Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support of References for AVRO Schemas #926

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions src/karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from __future__ import annotations

from avro.errors import SchemaParseException
from avro.schema import parse as avro_parse, Schema as AvroSchema
from avro.name import Names as AvroNames
from avro.schema import make_avsc_object, parse as avro_parse, Schema as AvroSchema
from collections.abc import Collection, Mapping, Sequence
from dataclasses import dataclass
from jsonschema import Draft7Validator
Expand All @@ -30,7 +31,9 @@
from typing import Any, cast, Final, final

import hashlib
import json
import logging
import re

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -153,6 +156,7 @@ def normalize_schema_str(
except JSONDecodeError as e:
LOG.info("Schema is not valid JSON")
raise e

elif schema_type == SchemaType.PROTOBUF:
if schema:
schema_str = str(schema)
Expand Down Expand Up @@ -195,6 +199,37 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema:
return parsed_typed_schema.schema


class AvroResolver:
def __init__(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None):
self.schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True)
self.dependencies = dependencies
self.unique_id = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is unique_id only used in tests?
Can you do without it?

self.regex = re.compile(r"^\s*\[")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regex does not seem to be used


def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> list:
"""To support references in AVRO we iteratively merge all referenced schemas with current schema"""
stack: list[tuple[str, Mapping[str, Dependency] | None]] = [(schema_str, dependencies)]
merge: list = []

while stack:
current_schema_str, current_dependencies = stack.pop()
if current_dependencies:
stack.append((current_schema_str, None))
for dependency in reversed(current_dependencies.values()):
stack.append((dependency.schema.schema_str, dependency.schema.dependencies))
else:
self.unique_id += 1
merge.append(current_schema_str)

return merge

def resolve(self) -> list:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolve() and build() can be merged in a single method, since builder() is only called by resolve().
Maybe the whole AvroResolver class can be just a static method?

"""Resolve the given ``schema_str`` with ``dependencies`` to a list of schemas
sorted in an order where all referenced schemas are located prior to their referrers.
"""
return self.builder(self.schema_str, self.dependencies)


def parse(
schema_type: SchemaType,
schema_str: str,
Expand All @@ -207,18 +242,33 @@ def parse(
) -> ParsedTypedSchema:
if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]:
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}")

parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema
if schema_type is SchemaType.AVRO:
try:
if dependencies:
schemas_list = AvroResolver(schema_str, dependencies).resolve()
names = AvroNames(validate_names=validate_avro_names)
merged_schema = None
for schema in schemas_list:
# Merge dep with all previously merged ones
merged_schema = make_avsc_object(json.loads(schema), names)
merged_schema_str = str(merged_schema)
else:
merged_schema_str = schema_str
parsed_schema = parse_avro_schema_definition(
Copy link
Contributor

@davide-armand davide-armand Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have realized now that it is possible to pass a list of schema strings to avro.schema.parse():
https://github.com/aiven/avro/blob/5a82d57f2a650fd87c819a30e433f1abb2c76ca2/lang/py/avro/schema.py#L1192

We currently use parse() to parse a single schema (see parse_avro_schema_definition()).

When called with a list of schemas it returns a UnionSchema (through make_avsc_object()), which itself contains a list of parsed schemas (UnionSchema.schemas).
The constructor of UnionSchema seems to be doing what we want (loop on the schemas, call make_avsc_object() and remember names).

I think passing to parse() the reference schemas + the main schema (the main schema should be passed as last in the list) and then getting the last schema from UnionSchema.schemas should be equivalent to what's the PR now.

If it works then we can delegate that logic to the Avro library

schema_str,
merged_schema_str,
validate_enum_symbols=validate_avro_enum_symbols,
validate_names=validate_avro_names,
)
return ParsedTypedSchema(
schema_type=schema_type,
schema_str=schema_str,
schema=parsed_schema,
references=references,
dependencies=dependencies,
)
except (SchemaParseException, JSONDecodeError, TypeError) as e:
raise InvalidSchema from e

elif schema_type is SchemaType.JSONSCHEMA:
try:
parsed_schema = parse_jsonschema_definition(schema_str)
Expand Down Expand Up @@ -287,7 +337,6 @@ def __init__(
dependencies: Mapping[str, Dependency] | None = None,
) -> None:
self._schema_cached: Draft7Validator | AvroSchema | ProtobufSchema | None = schema

super().__init__(
schema_type=schema_type,
schema_str=schema_str,
Expand Down
14 changes: 13 additions & 1 deletion src/karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,19 @@ def _handle_msg_schema(self, key: dict, value: dict | None) -> None:

parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema | None = None
resolved_dependencies: dict[str, Dependency] | None = None
if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]:
if schema_type_parsed == SchemaType.AVRO:
try:
if schema_references:
candidate_references = [reference_from_mapping(reference_data) for reference_data in schema_references]
resolved_references, resolved_dependencies = self.resolve_references(candidate_references)
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError as e:
LOG.warning("Schema is not valid JSON")
raise e
except InvalidReferences as e:
LOG.exception("Invalid AVRO references")
raise e
elif schema_type_parsed == SchemaType.JSONSCHEMA:
try:
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError as exc:
Expand Down
2 changes: 1 addition & 1 deletion src/karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,7 @@ def _validate_references(
content_type=content_type,
status=HTTPStatus.BAD_REQUEST,
)
if references and schema_type != SchemaType.PROTOBUF:
if references and schema_type != SchemaType.PROTOBUF and schema_type != SchemaType.AVRO:
self.r(
body={
"error_code": SchemaErrorCodes.REFERENCES_SUPPORT_NOT_IMPLEMENTED.value,
Expand Down
Loading
Loading