Skip to content

Commit

Permalink
Preserve original field order when merging parquet Fields
Browse files Browse the repository at this point in the history
We use the `Migrations.mergeSchema` function in the loaders, for
combining a series of schemas (e.g. `1-0-0`, `1-0-1`, `1-0-2`) into a
reconciled column

Before this PR, `mergeSchemas` contained some logic to preserve field
order... but it was the wrong logic. It preserved field order of the
newer schema, ignoring whether a field was present in the older schema.

After this PR, we preserve field order of the older schema.  New fields
added in a later schema are appended to the end of the field list.

This feature change is needed for loaders that only allow field
additions when they are appended to the end of a struct. E.g. the Lake
Loader for Hudi/Glue.
  • Loading branch information
istreeter committed Nov 15, 2024
1 parent 6aa722d commit 052450f
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,15 @@ object Field {
Field(name, Type.Array(constructedType.value, itemNullability), nullability)
}

def normalize(field: Field): Field = {
val fieldType = field.fieldType match {
case Type.Struct(fields) => Type.Struct(collapseDuplicateFields(fields.map(normalize)))
case Type.Array(Type.Struct(fields), nullability) => Type.Array(
Type.Struct(collapseDuplicateFields(fields.map(normalize)))
, nullability)
def normalize(field: Field): Field =
field.copy(name = normalizeName(field), fieldType = normalizeType(field.fieldType))

private def normalizeType(t: Type): Type =
t match {
case Type.Struct(fields) => Type.Struct(collapseDuplicateFields(fields.map(normalize)).sortBy(_.name))
case Type.Array(el, nullability) => Type.Array(normalizeType(el), nullability)
case other => other
}
field.copy(name = normalizeName(field), fieldType = fieldType)
}

private def collapseDuplicateFields(normFields: NonEmptyVector[Field]): NonEmptyVector[Field] = {
val endMap = normFields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ object Migrations {
val mergedType: Option[Type] = sourceType match {
case sourceStruct@Struct(sourceFields) => targetType match {
case targetStruct@Type.Struct(targetFields) =>
val forwardMigration = sourceFields.map(srcField => MigrationFieldPair(srcField.name :: path, srcField, targetStruct.focus(srcField.name)).migrations)
val forwardMigration = sourceFields.map {
srcField =>
MigrationFieldPair(srcField.name :: path, srcField, targetStruct.focus(srcField.name)).migrations
}

// Comparing struct target fields to the source. This will detect additions.
val reverseMigration = targetFields.map(tgtField => MigrationFieldPair(tgtField.name :: path, tgtField, sourceStruct.focus(tgtField.name)).migrations)
Expand All @@ -96,22 +99,23 @@ object Migrations {
val tgtFields = reverseMigration.toVector.traverse(_.result).toVector.flatten
val tgtFieldNames = tgtFields.map(_.name)
val allSrcFields = forwardMigration.toVector.traverse(_.result).toVector.flatten
val allSrcFieldMap = allSrcFields.map(f => f.name -> f).toMap
// swap fields in src and target as they would be rearranged in nested structs or arrays
val reorderedTgtFields = tgtFields.map { t =>
allSrcFieldMap.get(t.name) match {
case Some(value) if value.fieldType.isInstanceOf[Struct] => value
case Some(value) if value.fieldType.isInstanceOf[Array] => value
case _ => t
}
val allSrcFieldNames = allSrcFields.map(_.name)

val srcFields: Vector[Field] = allSrcFields.map {
srcField =>
if (tgtFieldNames.contains(srcField.name))
srcField
else
// drop not null constraints from removed fields.
srcField.copy(nullability = Type.Nullability.Nullable)
}
val newTgtFields = tgtFields.filter {
tgtField =>
!allSrcFieldNames.contains(tgtField.name)
}
val srcFields: Vector[Field] = allSrcFields.filter(srcField => !tgtFieldNames.contains(srcField.name)).map(
// drop not null constrains from removed fields.
_.copy(nullability = Type.Nullability.Nullable)
)

// failed migration would produce no fields in source
NonEmptyVector.fromVector(reorderedTgtFields ++ srcFields).map { nonEmpty =>
NonEmptyVector.fromVector(srcFields ++ newTgtFields).map { nonEmpty =>
Type.Struct(nonEmpty)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class FieldSpec extends org.specs2.Specification { def is = s2"""
normalName handles camel case and disallowed characters $e13
normalize would collapse colliding names $e14
normalize would collapse colliding names with deterministic type selection $e15
normalize would sort fields in order of name $e16
normalize would sort nested fields in order of name $e17
"""

// a helper
Expand Down Expand Up @@ -409,15 +411,15 @@ class FieldSpec extends org.specs2.Specification { def is = s2"""
fieldType = Type.Struct(
fields = NonEmptyVector.of(
Field(
name = "_ga",
name = "__b",
fieldType = Type.Integer,
nullability = Nullable,
accessors = Set("_ga", "_Ga")
nullability = Nullable
),
Field(
name = "__b",
name = "_ga",
fieldType = Type.Integer,
nullability = Nullable
nullability = Nullable,
accessors = Set("_ga", "_Ga")
),
)
),
Expand Down Expand Up @@ -458,5 +460,65 @@ class FieldSpec extends org.specs2.Specification { def is = s2"""
(input1 must_== expected) and (input2 must_== expected)
}

def e16 = {
val input = {
val struct = Type.Struct(
NonEmptyVector.of(
Field("Apple", Type.String, Nullable),
Field("cherry", Type.String, Nullable),
Field("banana", Type.String, Nullable),
Field("Damson", Type.String, Nullable)
)
)
Field("top", struct, Nullable)
}

val expected = {
val struct = Type.Struct(
NonEmptyVector.of(
Field("apple", Type.String, Nullable, Set("Apple")),
Field("banana", Type.String, Nullable, Set("banana")),
Field("cherry", Type.String, Nullable, Set("cherry")),
Field("damson", Type.String, Nullable, Set("Damson"))
)
)
Field("top", struct, Nullable)
}

Field.normalize(input) must beEqualTo(expected)
}

def e17 = {
val input = {
val nested = Type.Struct(
NonEmptyVector.of(
Field("Apple", Type.String, Nullable),
Field("cherry", Type.String, Nullable),
Field("banana", Type.String, Nullable),
Field("Damson", Type.String, Nullable)
)
)
val arr = Field("nested_arr", Type.Array(nested, Nullable), Nullable)
val struct = Field("nested_obj", nested, Nullable)
Field("top", Type.Struct(NonEmptyVector.of(arr, struct)), Nullable)
}

val expected = {
val nested = Type.Struct(
NonEmptyVector.of(
Field("apple", Type.String, Nullable, Set("Apple")),
Field("banana", Type.String, Nullable, Set("banana")),
Field("cherry", Type.String, Nullable, Set("cherry")),
Field("damson", Type.String, Nullable, Set("Damson"))
)
)
val arr = Field("nested_arr", Type.Array(nested, Nullable), Nullable)
val struct = Field("nested_obj", nested, Nullable)
Field("top", Type.Struct(NonEmptyVector.of(arr, struct)), Nullable)
}

Field.normalize(input) must beEqualTo(expected)
}

private def fieldNormalName(name: String) = Field.normalizeName(Field(name, Type.String, nullability = Nullable))
}
Loading

0 comments on commit 052450f

Please sign in to comment.