Commit

basic implementation

edgao committed Feb 7, 2025
1 parent 04bbe45 commit 84ac6ab

Showing 3 changed files with 58 additions and 13 deletions.

@@ -83,6 +83,9 @@ data class DestinationStream(
         return importType is Overwrite ||
             (minimumGenerationId == generationId && minimumGenerationId > 0)
     }
+
+    fun isSingleGenerationTruncate() =
+        shouldBeTruncatedAtEndOfSync() && minimumGenerationId == generationId
 }
 
 @Singleton
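
For context, shouldBeTruncatedAtEndOfSync() is true for overwrite syncs and for syncs where the minimum retained generation equals the current (positive) generation; the new isSingleGenerationTruncate() narrows that to truncates where the minimum and current generation IDs match. A minimal standalone sketch of the two predicates, using a hypothetical StreamGenerations stand-in for the relevant DestinationStream fields (importType is simplified to a boolean here):

// Hypothetical stand-in for the relevant DestinationStream fields, for illustration only.
data class StreamGenerations(
    val isOverwrite: Boolean, // simplification of `importType is Overwrite`
    val minimumGenerationId: Long,
    val generationId: Long,
) {
    // Mirrors shouldBeTruncatedAtEndOfSync() from the diff above.
    fun shouldBeTruncatedAtEndOfSync(): Boolean =
        isOverwrite || (minimumGenerationId == generationId && minimumGenerationId > 0)

    // Mirrors the newly added isSingleGenerationTruncate().
    fun isSingleGenerationTruncate(): Boolean =
        shouldBeTruncatedAtEndOfSync() && minimumGenerationId == generationId
}

fun main() {
    // Truncate refresh keeping only the current generation: both predicates hold.
    println(StreamGenerations(isOverwrite = false, minimumGenerationId = 3, generationId = 3)
        .isSingleGenerationTruncate()) // true
    // Regular incremental sync (minimum generation 0): no truncate at all.
    println(StreamGenerations(isOverwrite = false, minimumGenerationId = 0, generationId = 3)
        .isSingleGenerationTruncate()) // false
}
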
@@ -49,7 +49,18 @@ class S3DataLakeStreamLoader(
                 properties = properties
             )
 
-        s3DataLakeTableSynchronizer.applySchemaChanges(table, incomingSchema)
+        // If we're executing a truncate, then force the schema change.
+        val columnTypeChangeBehavior =
+            if (stream.isSingleGenerationTruncate()) {
+                ColumnTypeChangeBehavior.OVERWRITE
+            } else {
+                ColumnTypeChangeBehavior.SAFE_SUPERTYPE
+            }
+        s3DataLakeTableSynchronizer.applySchemaChanges(
+            table,
+            incomingSchema,
+            columnTypeChangeBehavior
+        )
 
         try {
             logger.info {
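
The comment in the new block states the intent: only a sync that truncates the table down to a single generation may force an incompatible column-type change; every other sync takes the safe supertype path. A compact sketch of that selection in isolation (ColumnTypeChangeBehavior has the shape introduced in this commit; the Stream type here is a hypothetical stand-in for DestinationStream):

// Same shape as the enum introduced in this commit.
enum class ColumnTypeChangeBehavior { SAFE_SUPERTYPE, OVERWRITE }

// Hypothetical minimal stand-in for the stream, for illustration only.
data class Stream(val isSingleGenerationTruncate: Boolean)

// Mirrors the branch added to S3DataLakeStreamLoader: force the schema change
// only when the whole table is being rewritten for a single generation anyway.
fun chooseColumnTypeChangeBehavior(stream: Stream): ColumnTypeChangeBehavior =
    if (stream.isSingleGenerationTruncate) ColumnTypeChangeBehavior.OVERWRITE
    else ColumnTypeChangeBehavior.SAFE_SUPERTYPE

fun main() {
    println(chooseColumnTypeChangeBehavior(Stream(isSingleGenerationTruncate = true)))  // OVERWRITE
    println(chooseColumnTypeChangeBehavior(Stream(isSingleGenerationTruncate = false))) // SAFE_SUPERTYPE
}
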
@@ -13,6 +13,18 @@ import org.apache.iceberg.UpdateSchema
 import org.apache.iceberg.types.Type
 import org.apache.iceberg.types.Type.PrimitiveType
 
+/** Describes how the [S3DataLakeTableSynchronizer] handles column type changes. */
+enum class ColumnTypeChangeBehavior {
+    /**
+     * Find the supertype between the old and new types, throwing an error if Iceberg does not
+     * support safely altering the column in this way.
+     */
+    SAFE_SUPERTYPE,
+
+    /** Set the column's type to the new type, executing an incompatible schema change if needed. */
+    OVERWRITE,
+}
+
 /**
  * Applies schema changes to an Iceberg [Table], including nested columns (struct fields).
  *
@@ -30,7 +42,6 @@ class S3DataLakeTableSynchronizer(
     private val comparator: S3DataLakeTypesComparator,
     private val superTypeFinder: S3DataLakeSuperTypeFinder,
 ) {
-
     /**
      * Compare [table]'s current schema with [incomingSchema] and apply changes as needed:
      *
@@ -43,7 +54,11 @@
      * @param incomingSchema The schema describing incoming data.
      * @return The updated [Schema], after changes have been applied and committed.
      */
-    fun applySchemaChanges(table: Table, incomingSchema: Schema): Schema {
+    fun applySchemaChanges(
+        table: Table,
+        incomingSchema: Schema,
+        columnTypeChangeBehavior: ColumnTypeChangeBehavior,
+    ): Schema {
         val existingSchema = table.schema()
         val diff = comparator.compareSchemas(incomingSchema, existingSchema)
 
@@ -66,18 +81,34 @@
                 incomingSchema.findField(columnName)
                     ?: error("Field \"$columnName\" not found in the incoming schema!")
 
-            val superType: Type =
-                superTypeFinder.findSuperType(
-                    existingType = existingField.type(),
-                    incomingType = incomingField.type(),
-                    columnName = columnName
-                )
-            require(superType is PrimitiveType) {
-                "Currently only primitive type updates are supported. Attempted type: $superType"
-            }
+            val newType: Type =
+                when (columnTypeChangeBehavior) {
+                    ColumnTypeChangeBehavior.SAFE_SUPERTYPE -> {
+                        val superType: Type =
+                            superTypeFinder.findSuperType(
+                                existingType = existingField.type(),
+                                incomingType = incomingField.type(),
+                                columnName = columnName
+                            )
+                        require(superType is PrimitiveType) {
+                            "Currently only primitive type updates are supported. Attempted type: $superType"
+                        }
+                        superType
+                    }
+                    ColumnTypeChangeBehavior.OVERWRITE -> incomingField.type()
+                }
 
             // Update the column to the supertype
-            update.updateColumn(columnName, superType)
+            if (newType is PrimitiveType) {
+                // Iceberg directly supports altering columns to a primitive type
+                update.updateColumn(columnName, newType)
+            } else {
+                // We shouldn't need to do this (b/c we map all types to primitives,
+                // including objects/arrays/unions, which we map to STRING).
+                throw IllegalStateException(
+                    "We should never alter a column into a non-primitive type. This indicates a bug in our AirbyteType -> Iceberg schema conversion."
+                )
+            }
         }
 
         // 3) Mark columns newly optional
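
To make the behavioral difference concrete: under SAFE_SUPERTYPE the existing and incoming types must resolve to a legal Iceberg promotion (for example int to long), otherwise the sync fails; under OVERWRITE the incoming type simply wins, which the connector only allows when the table is being truncated to a single generation anyway. A simplified sketch against Iceberg's type API (this findSuperType is a hypothetical stand-in for S3DataLakeSuperTypeFinder, which handles far more cases; the iceberg-api dependency is assumed):

import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Types

// Same shape as the enum introduced in this commit.
enum class ColumnTypeChangeBehavior { SAFE_SUPERTYPE, OVERWRITE }

// Hypothetical, drastically simplified supertype rule: identical types pass through,
// int -> long is the only promotion modeled, anything else is rejected.
fun findSuperType(existing: Type, incoming: Type): Type =
    when {
        existing == incoming -> existing
        existing.typeId() == Type.TypeID.INTEGER &&
            incoming.typeId() == Type.TypeID.LONG -> incoming
        else -> error("Incompatible column type change: $existing -> $incoming")
    }

// Mirrors the when-expression in applySchemaChanges: pick the target type per behavior.
fun resolveNewType(existing: Type, incoming: Type, behavior: ColumnTypeChangeBehavior): Type =
    when (behavior) {
        ColumnTypeChangeBehavior.SAFE_SUPERTYPE -> findSuperType(existing, incoming)
        ColumnTypeChangeBehavior.OVERWRITE -> incoming
    }

fun main() {
    val intType = Types.IntegerType.get()
    val longType = Types.LongType.get()
    val stringType = Types.StringType.get()

    // Safe widening: int -> long is an allowed Iceberg promotion.
    println(resolveNewType(intType, longType, ColumnTypeChangeBehavior.SAFE_SUPERTYPE))  // long
    // Incompatible change: long -> string only goes through under OVERWRITE.
    println(resolveNewType(longType, stringType, ColumnTypeChangeBehavior.OVERWRITE))    // string
}
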
