-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support of References for AVRO Schemas #926
base: main
Are you sure you want to change the base?
Changes from all commits
5ab6ba0
2e69844
47e903f
cba3c0d
445519e
a1f659f
ba4d640
22432f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,8 @@ | |
from __future__ import annotations | ||
|
||
from avro.errors import SchemaParseException | ||
from avro.schema import parse as avro_parse, Schema as AvroSchema | ||
from avro.name import Names as AvroNames | ||
from avro.schema import make_avsc_object, parse as avro_parse, Schema as AvroSchema | ||
from collections.abc import Collection, Mapping, Sequence | ||
from dataclasses import dataclass | ||
from jsonschema import Draft7Validator | ||
|
@@ -30,7 +31,9 @@ | |
from typing import Any, cast, Final, final | ||
|
||
import hashlib | ||
import json | ||
import logging | ||
import re | ||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
|
@@ -153,6 +156,7 @@ def normalize_schema_str( | |
except JSONDecodeError as e: | ||
LOG.info("Schema is not valid JSON") | ||
raise e | ||
|
||
elif schema_type == SchemaType.PROTOBUF: | ||
if schema: | ||
schema_str = str(schema) | ||
|
@@ -195,6 +199,37 @@ def schema(self) -> Draft7Validator | AvroSchema | ProtobufSchema: | |
return parsed_typed_schema.schema | ||
|
||
|
||
class AvroResolver: | ||
def __init__(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None): | ||
self.schema_str = json_encode(json_decode(schema_str), compact=True, sort_keys=True) | ||
self.dependencies = dependencies | ||
self.unique_id = 0 | ||
self.regex = re.compile(r"^\s*\[") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
def builder(self, schema_str: str, dependencies: Mapping[str, Dependency] | None = None) -> list: | ||
"""To support references in AVRO we iteratively merge all referenced schemas with current schema""" | ||
stack: list[tuple[str, Mapping[str, Dependency] | None]] = [(schema_str, dependencies)] | ||
merge: list = [] | ||
|
||
while stack: | ||
current_schema_str, current_dependencies = stack.pop() | ||
if current_dependencies: | ||
stack.append((current_schema_str, None)) | ||
for dependency in reversed(current_dependencies.values()): | ||
stack.append((dependency.schema.schema_str, dependency.schema.dependencies)) | ||
else: | ||
self.unique_id += 1 | ||
merge.append(current_schema_str) | ||
|
||
return merge | ||
|
||
def resolve(self) -> list: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
"""Resolve the given ``schema_str`` with ``dependencies`` to a list of schemas | ||
sorted in an order where all referenced schemas are located prior to their referrers. | ||
""" | ||
return self.builder(self.schema_str, self.dependencies) | ||
|
||
|
||
def parse( | ||
schema_type: SchemaType, | ||
schema_str: str, | ||
|
@@ -207,18 +242,33 @@ def parse( | |
) -> ParsedTypedSchema: | ||
if schema_type not in [SchemaType.AVRO, SchemaType.JSONSCHEMA, SchemaType.PROTOBUF]: | ||
raise InvalidSchema(f"Unknown parser {schema_type} for {schema_str}") | ||
|
||
parsed_schema: Draft7Validator | AvroSchema | ProtobufSchema | ||
if schema_type is SchemaType.AVRO: | ||
try: | ||
if dependencies: | ||
schemas_list = AvroResolver(schema_str, dependencies).resolve() | ||
names = AvroNames(validate_names=validate_avro_names) | ||
merged_schema = None | ||
for schema in schemas_list: | ||
# Merge dep with all previously merged ones | ||
merged_schema = make_avsc_object(json.loads(schema), names) | ||
merged_schema_str = str(merged_schema) | ||
else: | ||
merged_schema_str = schema_str | ||
parsed_schema = parse_avro_schema_definition( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have realized now that it is possible to pass a list of schema strings to We currently use When called with a list of schemas it returns a I think passing to If it works then we can delegate that logic to the Avro library |
||
schema_str, | ||
merged_schema_str, | ||
validate_enum_symbols=validate_avro_enum_symbols, | ||
validate_names=validate_avro_names, | ||
) | ||
return ParsedTypedSchema( | ||
schema_type=schema_type, | ||
schema_str=schema_str, | ||
schema=parsed_schema, | ||
references=references, | ||
dependencies=dependencies, | ||
) | ||
except (SchemaParseException, JSONDecodeError, TypeError) as e: | ||
raise InvalidSchema from e | ||
|
||
elif schema_type is SchemaType.JSONSCHEMA: | ||
try: | ||
parsed_schema = parse_jsonschema_definition(schema_str) | ||
|
@@ -287,7 +337,6 @@ def __init__( | |
dependencies: Mapping[str, Dependency] | None = None, | ||
) -> None: | ||
self._schema_cached: Draft7Validator | AvroSchema | ProtobufSchema | None = schema | ||
|
||
super().__init__( | ||
schema_type=schema_type, | ||
schema_str=schema_str, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is
unique_id
only used in tests?Can you do without it?