From 64b824adfa7cc76716b272e3554128d704553f00 Mon Sep 17 00:00:00 2001
From: Bryan Keller
Date: Wed, 29 Nov 2023 08:09:00 -0800
Subject: [PATCH] Fixes for schema evolution with structs in collections (#167)

---
 .../iceberg/connect/data/IcebergWriter.java   |   9 +-
 .../iceberg/connect/data/RecordConverter.java |  30 +-
 .../iceberg/connect/data/SchemaUpdate.java    |  42 +++
 .../iceberg/connect/data/SchemaUtils.java     |  18 +-
 .../connect/data/RecordConverterTest.java     | 355 +++++++++++++-----
 .../iceberg/connect/data/SchemaUtilsTest.java |  39 +-
 6 files changed, 333 insertions(+), 160 deletions(-)

diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java
index 263bb477..3e4d74e9 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java
@@ -19,6 +19,7 @@
 package io.tabular.iceberg.connect.data;
 
 import io.tabular.iceberg.connect.IcebergSinkConfig;
+import io.tabular.iceberg.connect.data.SchemaUpdate.Consumer;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Arrays;
@@ -82,10 +83,10 @@ private Record convertToRow(SinkRecord record) {
       return recordConverter.convert(record.value());
     }
 
-    List<SchemaUpdate> updates = Lists.newArrayList();
-    Record row = recordConverter.convert(record.value(), updates::add);
+    SchemaUpdate.Consumer updates = new Consumer();
+    Record row = recordConverter.convert(record.value(), updates);
 
-    if (!updates.isEmpty()) {
+    if (!updates.empty()) {
       // complete the current file
       flush();
       // apply the schema updates, this will refresh the table
@@ -93,7 +94,7 @@ private Record convertToRow(SinkRecord record) {
       // initialize a new writer with the new schema
       initNewWriter();
       // convert the row again, this time using the new table schema
-      row = recordConverter.convert(record.value(), updates::add);
+      row = recordConverter.convert(record.value(), null);
     }
 
     return row;
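
Note on the IcebergWriter change: the converter used to collect schema changes into a plain List, so a record holding N copies of the same struct (one per list element or map value) reported the same column N times. The SchemaUpdate.Consumer added later in this patch deduplicates as it collects, and the post-evolution reconvert now passes null because the freshly refreshed table schema should already match the record. A condensed, hypothetical sketch of that contract, assuming the patched classes are visible on the classpath; EvolveOnWriteSketch and write() are illustrative names, and the real writer also flushes its current data file and re-creates its underlying file writer, which is elided here:

    import io.tabular.iceberg.connect.data.RecordConverter;
    import io.tabular.iceberg.connect.data.SchemaUpdate;
    import io.tabular.iceberg.connect.data.SchemaUtils;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.Record;

    class EvolveOnWriteSketch {
      Record write(Table table, RecordConverter converter, Object value) {
        SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();
        Record row = converter.convert(value, updates);
        if (!updates.empty()) {
          // commit the collected adds/updates/optionals; this also refreshes the table
          SchemaUtils.applySchemaUpdates(table, updates);
          // second pass against the evolved schema; null means "collect nothing"
          row = converter.convert(value, null);
        }
        return row;
      }
    }
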
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java
index 5f8d90ea..b62bd1a7 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java
@@ -22,9 +22,6 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.tabular.iceberg.connect.IcebergSinkConfig;
-import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;
-import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional;
-import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.math.BigDecimal;
@@ -46,7 +43,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.function.Consumer;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -94,7 +90,7 @@ public Record convert(Object data) {
     return convert(data, null);
   }
 
-  public Record convert(Object data, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+  public Record convert(Object data, SchemaUpdate.Consumer schemaUpdateConsumer) {
     if (data instanceof Struct || data instanceof Map) {
       return convertStructValue(data, tableSchema.asStruct(), -1, schemaUpdateConsumer);
     }
@@ -107,7 +103,7 @@ private NameMapping createNameMapping(Table table) {
   }
 
   private Object convertValue(
-      Object value, Type type, int fieldId, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      Object value, Type type, int fieldId, SchemaUpdate.Consumer schemaUpdateConsumer) {
     if (value == null) {
       return null;
     }
@@ -151,7 +147,7 @@ protected GenericRecord convertStructValue(
       Object value,
       StructType schema,
       int parentFieldId,
-      Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
     if (value instanceof Map) {
       return convertToStruct((Map<?, ?>) value, schema, parentFieldId, schemaUpdateConsumer);
     } else if (value instanceof Struct) {
@@ -164,7 +160,7 @@ private GenericRecord convertToStruct(
       Map<?, ?> map,
       StructType schema,
       int structFieldId,
-      Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
     GenericRecord result = GenericRecord.create(schema);
     map.forEach(
         (recordFieldNameObj, recordFieldValue) -> {
@@ -178,8 +174,7 @@ private GenericRecord convertToStruct(
               if (type.isPresent()) {
                 String parentFieldName =
                     structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
-                schemaUpdateConsumer.accept(
-                    new AddColumn(parentFieldName, recordFieldName, type.get()));
+                schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type.get());
               }
             }
           } else {
@@ -199,7 +194,7 @@ private GenericRecord convertToStruct(
       Struct struct,
       StructType schema,
       int structFieldId,
-      Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
     GenericRecord result = GenericRecord.create(schema);
     struct
         .schema()
@@ -213,8 +208,7 @@ private GenericRecord convertToStruct(
                 String parentFieldName =
                     structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
                 Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
-                schemaUpdateConsumer.accept(
-                    new AddColumn(parentFieldName, recordField.name(), type));
+                schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
               }
             } else {
               boolean hasSchemaUpdates = false;
@@ -224,13 +218,13 @@
                   SchemaUtils.needsDataTypeUpdate(tableField.type(), recordField.schema());
               if (evolveDataType != null) {
                 String fieldName = tableSchema.findColumnName(tableField.fieldId());
-                schemaUpdateConsumer.accept(new UpdateType(fieldName, evolveDataType));
+                schemaUpdateConsumer.updateType(fieldName, evolveDataType);
                 hasSchemaUpdates = true;
               }
               // make optional if needed and schema evolution is on
               if (tableField.isRequired() && recordField.schema().isOptional()) {
                 String fieldName = tableSchema.findColumnName(tableField.fieldId());
-                schemaUpdateConsumer.accept(new MakeOptional(fieldName));
+                schemaUpdateConsumer.makeOptional(fieldName);
                 hasSchemaUpdates = true;
               }
             }
@@ -277,7 +271,7 @@ private Map<String, NestedField> createStructNameMap(StructType schema) {
   }
 
   protected List<Object> convertListValue(
-      Object value, ListType type, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      Object value, ListType type, SchemaUpdate.Consumer schemaUpdateConsumer) {
     Preconditions.checkArgument(value instanceof List);
     List<?> list = (List<?>) value;
     return list.stream()
@@ -290,14 +284,14 @@ protected Map<Object, Object> convertMapValue(
-      Object value, MapType type, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      Object value, MapType type, SchemaUpdate.Consumer schemaUpdateConsumer) {
     Preconditions.checkArgument(value instanceof Map);
     Map<?, ?> map = (Map<?, ?>) value;
     Map<Object, Object> result = Maps.newHashMap();
     map.forEach(
         (k, v) -> {
           int keyFieldId = type.fields().get(0).fieldId();
-          int valueFieldId = type.fields().get(0).fieldId();
+          int valueFieldId = type.fields().get(1).fieldId();
           result.put(
               convertValue(k, type.keyType(), keyFieldId, schemaUpdateConsumer),
               convertValue(v, type.valueType(), valueFieldId, schemaUpdateConsumer));
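
Note on the one-character fix in convertMapValue(): both the key and the value were being converted with the key's field ID, because Iceberg's MapType exposes its two synthetic nested fields, key then value, through fields(). With the wrong ID, findColumnName() resolved struct values in maps to the key's path, so schema updates for them were mis-parented. A quick standalone check against the plain Iceberg API (class and variable names here are illustrative):

    import org.apache.iceberg.types.Types;

    class MapFieldIds {
      public static void main(String[] args) {
        Types.MapType map =
            Types.MapType.ofRequired(101, 102, Types.StringType.get(), Types.LongType.get());
        System.out.println(map.fields().get(0).fieldId()); // 101 -> the "key" field
        System.out.println(map.fields().get(1).fieldId()); // 102 -> the "value" field
      }
    }
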
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUpdate.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUpdate.java
index ab54f9e3..653b28c2 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUpdate.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUpdate.java
@@ -18,11 +18,49 @@
  */
 package io.tabular.iceberg.connect.data;
 
+import java.util.Collection;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Type.PrimitiveType;
 
 public class SchemaUpdate {
 
+  public static class Consumer {
+    private final Map<String, AddColumn> addColumns = Maps.newHashMap();
+    private final Map<String, UpdateType> updateTypes = Maps.newHashMap();
+    private final Map<String, MakeOptional> makeOptionals = Maps.newHashMap();
+
+    public Collection<AddColumn> addColumns() {
+      return addColumns.values();
+    }
+
+    public Collection<UpdateType> updateTypes() {
+      return updateTypes.values();
+    }
+
+    public Collection<MakeOptional> makeOptionals() {
+      return makeOptionals.values();
+    }
+
+    public boolean empty() {
+      return addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty();
+    }
+
+    public void addColumn(String parentName, String name, Type type) {
+      AddColumn addCol = new AddColumn(parentName, name, type);
+      addColumns.put(addCol.key(), addCol);
+    }
+
+    public void updateType(String name, PrimitiveType type) {
+      updateTypes.put(name, new UpdateType(name, type));
+    }
+
+    public void makeOptional(String name) {
+      makeOptionals.put(name, new MakeOptional(name));
+    }
+  }
+
   public static class AddColumn extends SchemaUpdate {
     private final String parentName;
     private final String name;
@@ -42,6 +80,10 @@ public String name() {
       return name;
     }
 
+    public String key() {
+      return parentName == null ? name : parentName + "." + name;
+    }
+
     public Type type() {
       return type;
     }
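
Note on SchemaUpdate.Consumer: updates are keyed, AddColumn by key() ("parent.name"), UpdateType and MakeOptional by column name, so reporting the same change once per element of a collection collapses to a single update. A small sketch of that behavior, assuming the patched SchemaUpdate class is on the classpath (ConsumerDedup is an illustrative name):

    import io.tabular.iceberg.connect.data.SchemaUpdate;
    import org.apache.iceberg.types.Types;

    class ConsumerDedup {
      public static void main(String[] args) {
        SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
        // convertToStruct() reports this once per element of a list of structs
        consumer.addColumn("stli.element", "st", Types.StringType.get());
        consumer.addColumn("stli.element", "st", Types.StringType.get());
        consumer.makeOptional("stli.element.st");
        consumer.makeOptional("stli.element.st");
        System.out.println(consumer.addColumns().size());    // 1
        System.out.println(consumer.makeOptionals().size()); // 1
        System.out.println(consumer.empty());                // false
      }
    }
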
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUtils.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUtils.java
index b15417d9..43c7944b 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUtils.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/SchemaUtils.java
@@ -82,8 +82,8 @@ public static PrimitiveType needsDataTypeUpdate(Type currentIcebergType, Schema valueSchema) {
     return null;
   }
 
-  public static void applySchemaUpdates(Table table, List<SchemaUpdate> updates) {
-    if (updates == null || updates.isEmpty()) {
+  public static void applySchemaUpdates(Table table, SchemaUpdate.Consumer updates) {
+    if (updates == null || updates.empty()) {
       // no updates to apply
       return;
     }
@@ -93,31 +93,25 @@ public static void applySchemaUpdates(Table table, List<SchemaUpdate> updates) {
         .run(notUsed -> commitSchemaUpdates(table, updates));
   }
 
-  private static void commitSchemaUpdates(Table table, List<SchemaUpdate> updates) {
+  private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updates) {
     // get the latest schema in case another process updated it
     table.refresh();
 
     // filter out columns that have already been added
     List<AddColumn> addColumns =
-        updates.stream()
-            .filter(update -> update instanceof AddColumn)
-            .map(update -> (AddColumn) update)
+        updates.addColumns().stream()
            .filter(addCol -> !columnExists(table.schema(), addCol))
            .collect(toList());
 
     // filter out columns that have the updated type
     List<UpdateType> updateTypes =
-        updates.stream()
-            .filter(update -> update instanceof UpdateType)
-            .map(update -> (UpdateType) update)
+        updates.updateTypes().stream()
            .filter(updateType -> !typeMatches(table.schema(), updateType))
            .collect(toList());
 
     // filter out columns that have already been made optional
     List<MakeOptional> makeOptionals =
-        updates.stream()
-            .filter(update -> update instanceof MakeOptional)
-            .map(update -> (MakeOptional) update)
+        updates.makeOptionals().stream()
            .filter(makeOptional -> !isOptional(table.schema(), makeOptional))
            .collect(toList());
 
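
Note on SchemaUtils: commitSchemaUpdates() re-filters the collected updates against a freshly refreshed schema, so a retry (or a concurrent writer landing first) degrades to a no-op instead of failing on a duplicate column. The add-column filter amounts to something like the following hedged sketch, which assumes columnExists() resolves AddColumn.key() as a dotted path the way Schema.findField() does (PendingAdds and pending() are illustrative names):

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.StreamSupport;
    import org.apache.iceberg.Schema;
    import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;

    class PendingAdds {
      // keep only the columns the refreshed schema does not already contain
      static List<AddColumn> pending(Schema schema, Iterable<AddColumn> requested) {
        return StreamSupport.stream(requested.spliterator(), false)
            .filter(addCol -> schema.findField(addCol.key()) == null)
            .collect(Collectors.toList());
      }
    }
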
diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java
index e90855b2..2c716c69 100644
--- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java
+++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java
@@ -18,7 +18,6 @@
  */
 package io.tabular.iceberg.connect.data;
 
-import static java.util.stream.Collectors.toList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -38,10 +37,12 @@
 import java.time.ZoneOffset;
 import java.time.temporal.Temporal;
 import java.util.Base64;
+import java.util.Collection;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Function;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
@@ -51,8 +52,8 @@
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.BinaryType;
 import org.apache.iceberg.types.Types.DateType;
@@ -125,6 +126,31 @@ public class RecordConverterTest {
   private static final org.apache.iceberg.Schema ID_SCHEMA =
       new org.apache.iceberg.Schema(Types.NestedField.required(1, "ii", Types.IntegerType.get()));
 
+  private static final org.apache.iceberg.Schema STRUCT_IN_LIST_SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(
+              100, "stli", Types.ListType.ofRequired(101, NESTED_SCHEMA.asStruct())));
+
+  private static final org.apache.iceberg.Schema STRUCT_IN_LIST_BASIC_SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(
+              100, "stli", Types.ListType.ofRequired(101, ID_SCHEMA.asStruct())));
+
+  private static final org.apache.iceberg.Schema STRUCT_IN_MAP_SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(
+              100,
+              "stma",
+              Types.MapType.ofRequired(
+                  101, 102, Types.StringType.get(), NESTED_SCHEMA.asStruct())));
+
+  private static final org.apache.iceberg.Schema STRUCT_IN_MAP_BASIC_SCHEMA =
+      new org.apache.iceberg.Schema(
+          Types.NestedField.required(
+              100,
+              "stma",
+              Types.MapType.ofRequired(101, 102, Types.StringType.get(), ID_SCHEMA.asStruct())));
+
   private static final Schema CONNECT_SCHEMA =
       SchemaBuilder.struct()
           .field("i", Schema.INT32_SCHEMA)
@@ -146,6 +172,14 @@ public class RecordConverterTest {
   private static final Schema CONNECT_NESTED_SCHEMA =
       SchemaBuilder.struct().field("ii", Schema.INT32_SCHEMA).field("st", CONNECT_SCHEMA);
 
+  private static final Schema CONNECT_STRUCT_IN_LIST_SCHEMA =
+      SchemaBuilder.struct().field("stli", SchemaBuilder.array(CONNECT_NESTED_SCHEMA)).build();
+
+  private static final Schema CONNECT_STRUCT_IN_MAP_SCHEMA =
+      SchemaBuilder.struct()
+          .field("stma", SchemaBuilder.map(Schema.STRING_SCHEMA, CONNECT_NESTED_SCHEMA))
+          .build();
+
   private static final LocalDate DATE_VAL = LocalDate.parse("2023-05-18");
   private static final LocalTime TIME_VAL = LocalTime.parse("07:14:21");
   private static final LocalDateTime TS_VAL = LocalDateTime.parse("2023-05-18T07:14:21");
@@ -213,6 +247,35 @@ public void testMapToString() throws Exception {
     assertThat(map).hasSize(MAPPED_CNT);
   }
 
+  @Test
+  public void testMapValueInListConvert() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(STRUCT_IN_LIST_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Map<String, Object> data = createNestedMapData();
+    Record record = converter.convert(ImmutableMap.of("stli", ImmutableList.of(data, data)));
+
+    List<?> fieldVal = (List<?>) record.getField("stli");
+    Record elementVal = (Record) fieldVal.get(0);
+    assertNestedRecordValues(elementVal);
+  }
+
+  @Test
+  public void testMapValueInMapConvert() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(STRUCT_IN_MAP_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Map<String, Object> data = createNestedMapData();
+    Record record =
+        converter.convert(ImmutableMap.of("stma", ImmutableMap.of("key1", data, "key2", data)));
+
+    Map<?, ?> fieldVal = (Map<?, ?>) record.getField("stma");
+    Record mapVal = (Record) fieldVal.get("key1");
+    assertNestedRecordValues(mapVal);
+  }
+
   @Test
   public void testStructConvert() {
     Table table = mock(Table.class);
@@ -250,6 +313,39 @@ public void testStructToString() throws Exception {
     assertThat(map).hasSize(MAPPED_CNT);
   }
 
+  @Test
+  public void testStructValueInListConvert() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(STRUCT_IN_LIST_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Struct data = createNestedStructData();
+    Struct struct =
+        new Struct(CONNECT_STRUCT_IN_LIST_SCHEMA).put("stli", ImmutableList.of(data, data));
+    Record record = converter.convert(struct);
+
+    List<?> fieldVal = (List<?>) record.getField("stli");
+    Record elementVal = (Record) fieldVal.get(0);
+    assertNestedRecordValues(elementVal);
+  }
+
+  @Test
+  public void testStructValueInMapConvert() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(STRUCT_IN_MAP_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Struct data = createNestedStructData();
+    Struct struct =
+        new Struct(CONNECT_STRUCT_IN_MAP_SCHEMA)
+            .put("stma", ImmutableMap.of("key1", data, "key2", data));
+    Record record = converter.convert(struct);
+
+    Map<?, ?> fieldVal = (Map<?, ?>) record.getField("stma");
+    Record mapVal = (Record) fieldVal.get("key1");
+    assertNestedRecordValues(mapVal);
+  }
+
   @Test
   public void testNameMapping() {
     Table table = mock(Table.class);
@@ -416,30 +512,16 @@ public void testMissingColumnDetectionMap() {
     Map<String, Object> data = Maps.newHashMap(createMapData());
     data.put("null", null);
 
-    List<SchemaUpdate> updates = Lists.newArrayList();
-    converter.convert(data, updates::add);
-    List<AddColumn> addCols = updates.stream().map(update -> (AddColumn) update).collect(toList());
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
 
     assertThat(addCols).hasSize(15);
 
     Map<String, AddColumn> newColMap = Maps.newHashMap();
     addCols.forEach(addCol -> newColMap.put(addCol.name(), addCol));
 
-    assertThat(newColMap.get("i").type()).isInstanceOf(LongType.class);
-    assertThat(newColMap.get("l").type()).isInstanceOf(LongType.class);
-    assertThat(newColMap.get("d").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("t").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("ts").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("tsz").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("fl").type()).isInstanceOf(DoubleType.class);
-    assertThat(newColMap.get("do").type()).isInstanceOf(DoubleType.class);
-    assertThat(newColMap.get("dec").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("s").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("u").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("f").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("b").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("li").type()).isInstanceOf(ListType.class);
-    assertThat(newColMap.get("ma").type()).isInstanceOf(StructType.class);
+    assertTypesAddedFromMap(col -> newColMap.get(col).type());
 
     // null values should be ignored
     assertThat(newColMap).doesNotContainKey("null");
@@ -452,33 +534,59 @@ public void testMissingColumnDetectionMapNested() {
     RecordConverter converter = new RecordConverter(table, config);
 
     Map<String, Object> nestedData = createNestedMapData();
-    List<SchemaUpdate> addCols = Lists.newArrayList();
-    converter.convert(nestedData, addCols::add);
-
-    assertThat(addCols).hasSize(1);
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(nestedData, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
 
     assertThat(addCols).hasSize(1);
 
-    AddColumn addCol = (AddColumn) addCols.get(0);
+    AddColumn addCol = addCols.iterator().next();
     assertThat(addCol.name()).isEqualTo("st");
 
     StructType addedType = addCol.type().asStructType();
     assertThat(addedType.fields()).hasSize(15);
-    assertThat(addedType.field("i").type()).isInstanceOf(LongType.class);
-    assertThat(addedType.field("l").type()).isInstanceOf(LongType.class);
-    assertThat(addedType.field("d").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("t").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("ts").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("tsz").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("fl").type()).isInstanceOf(DoubleType.class);
-    assertThat(addedType.field("do").type()).isInstanceOf(DoubleType.class);
-    assertThat(addedType.field("dec").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("s").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("u").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("f").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("b").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("li").type()).isInstanceOf(ListType.class);
-    assertThat(addedType.field("ma").type()).isInstanceOf(StructType.class);
+    assertTypesAddedFromMap(col -> addedType.field(col).type());
+  }
+
+  @Test
+  public void testMissingColumnDetectionMapListValue() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(STRUCT_IN_LIST_BASIC_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Map<String, Object> nestedData = createNestedMapData();
+    Map<String, Object> map = ImmutableMap.of("stli", ImmutableList.of(nestedData, nestedData));
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(map, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.parentName()).isEqualTo("stli.element");
+    assertThat(addCol.name()).isEqualTo("st");
+
+    StructType nestedElementType = addCol.type().asStructType();
+    assertThat(nestedElementType.fields()).hasSize(15);
+    assertTypesAddedFromMap(col -> nestedElementType.field(col).type());
+  }
+
+  private void assertTypesAddedFromMap(Function<String, Type> fn) {
+    assertThat(fn.apply("i")).isInstanceOf(LongType.class);
+    assertThat(fn.apply("l")).isInstanceOf(LongType.class);
+    assertThat(fn.apply("d")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("t")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("ts")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("tsz")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("fl")).isInstanceOf(DoubleType.class);
+    assertThat(fn.apply("do")).isInstanceOf(DoubleType.class);
+    assertThat(fn.apply("dec")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("s")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("u")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("f")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("b")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("li")).isInstanceOf(ListType.class);
+    assertThat(fn.apply("ma")).isInstanceOf(StructType.class);
   }
 
   @Test
@@ -488,30 +596,105 @@ public void testMissingColumnDetectionStruct() {
     RecordConverter converter = new RecordConverter(table, config);
 
     Struct data = createStructData();
-    List<SchemaUpdate> updates = Lists.newArrayList();
-    converter.convert(data, updates::add);
-    List<AddColumn> addCols = updates.stream().map(update -> (AddColumn) update).collect(toList());
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
 
     assertThat(addCols).hasSize(15);
 
     Map<String, AddColumn> newColMap = Maps.newHashMap();
     addCols.forEach(addCol -> newColMap.put(addCol.name(), addCol));
 
-    assertThat(newColMap.get("i").type()).isInstanceOf(IntegerType.class);
-    assertThat(newColMap.get("l").type()).isInstanceOf(LongType.class);
-    assertThat(newColMap.get("d").type()).isInstanceOf(DateType.class);
-    assertThat(newColMap.get("t").type()).isInstanceOf(TimeType.class);
-    assertThat(newColMap.get("ts").type()).isInstanceOf(TimestampType.class);
-    assertThat(newColMap.get("tsz").type()).isInstanceOf(TimestampType.class);
-    assertThat(newColMap.get("fl").type()).isInstanceOf(FloatType.class);
-    assertThat(newColMap.get("do").type()).isInstanceOf(DoubleType.class);
-    assertThat(newColMap.get("dec").type()).isInstanceOf(DecimalType.class);
-    assertThat(newColMap.get("s").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("u").type()).isInstanceOf(StringType.class);
-    assertThat(newColMap.get("f").type()).isInstanceOf(BinaryType.class);
-    assertThat(newColMap.get("b").type()).isInstanceOf(BinaryType.class);
-    assertThat(newColMap.get("li").type()).isInstanceOf(ListType.class);
-    assertThat(newColMap.get("ma").type()).isInstanceOf(MapType.class);
+    assertTypesAddedFromStruct(col -> newColMap.get(col).type());
+  }
+
+  @Test
+  public void testMissingColumnDetectionStructNested() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(ID_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Struct nestedData = createNestedStructData();
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(nestedData, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.name()).isEqualTo("st");
+
+    StructType addedType = addCol.type().asStructType();
+    assertThat(addedType.fields()).hasSize(15);
+    assertTypesAddedFromStruct(col -> addedType.field(col).type());
+  }
+
+  @Test
+  public void testMissingColumnDetectionStructListValue() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(STRUCT_IN_LIST_BASIC_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Struct nestedData = createNestedStructData();
+    Struct struct =
+        new Struct(CONNECT_STRUCT_IN_LIST_SCHEMA)
+            .put("stli", ImmutableList.of(nestedData, nestedData));
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(struct, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.parentName()).isEqualTo("stli.element");
+    assertThat(addCol.name()).isEqualTo("st");
+
+    StructType nestedElementType = addCol.type().asStructType();
+    assertThat(nestedElementType.fields()).hasSize(15);
+    assertTypesAddedFromStruct(col -> nestedElementType.field(col).type());
+  }
+
+  @Test
+  public void testMissingColumnDetectionStructMapValue() {
+    Table table = mock(Table.class);
+    when(table.schema()).thenReturn(STRUCT_IN_MAP_BASIC_SCHEMA);
+    RecordConverter converter = new RecordConverter(table, config);
+
+    Struct nestedData = createNestedStructData();
+    Struct struct =
+        new Struct(CONNECT_STRUCT_IN_MAP_SCHEMA)
+            .put("stma", ImmutableMap.of("key1", nestedData, "key2", nestedData));
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(struct, consumer);
+    Collection<AddColumn> addCols = consumer.addColumns();
+
+    assertThat(addCols).hasSize(1);
+
+    AddColumn addCol = addCols.iterator().next();
+    assertThat(addCol.parentName()).isEqualTo("stma.value");
+    assertThat(addCol.name()).isEqualTo("st");
+
+    StructType nestedValueType = addCol.type().asStructType();
+    assertThat(nestedValueType.fields()).hasSize(15);
+    assertTypesAddedFromStruct(col -> nestedValueType.field(col).type());
+  }
+
+  private void assertTypesAddedFromStruct(Function<String, Type> fn) {
+    assertThat(fn.apply("i")).isInstanceOf(IntegerType.class);
+    assertThat(fn.apply("l")).isInstanceOf(LongType.class);
+    assertThat(fn.apply("d")).isInstanceOf(DateType.class);
+    assertThat(fn.apply("t")).isInstanceOf(TimeType.class);
+    assertThat(fn.apply("ts")).isInstanceOf(TimestampType.class);
+    assertThat(fn.apply("tsz")).isInstanceOf(TimestampType.class);
+    assertThat(fn.apply("fl")).isInstanceOf(FloatType.class);
+    assertThat(fn.apply("do")).isInstanceOf(DoubleType.class);
+    assertThat(fn.apply("dec")).isInstanceOf(DecimalType.class);
+    assertThat(fn.apply("s")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("u")).isInstanceOf(StringType.class);
+    assertThat(fn.apply("f")).isInstanceOf(BinaryType.class);
+    assertThat(fn.apply("b")).isInstanceOf(BinaryType.class);
+    assertThat(fn.apply("li")).isInstanceOf(ListType.class);
+    assertThat(fn.apply("ma")).isInstanceOf(MapType.class);
   }
 
   @Test
@@ -529,15 +712,14 @@ public void testEvolveTypeDetectionStruct() {
         SchemaBuilder.struct().field("ii", Schema.INT64_SCHEMA).field("ff", Schema.FLOAT64_SCHEMA);
     Struct data = new Struct(valueSchema).put("ii", 11L).put("ff", 22d);
 
-    List<SchemaUpdate> updates = Lists.newArrayList();
-    converter.convert(data, updates::add);
-    List<UpdateType> addCols =
-        updates.stream().map(update -> (UpdateType) update).collect(toList());
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<UpdateType> updates = consumer.updateTypes();
 
-    assertThat(addCols).hasSize(2);
+    assertThat(updates).hasSize(2);
 
     Map<String, UpdateType> updateMap = Maps.newHashMap();
-    addCols.forEach(update -> updateMap.put(update.name(), update));
+    updates.forEach(update -> updateMap.put(update.name(), update));
 
     assertThat(updateMap.get("ii").type()).isInstanceOf(LongType.class);
     assertThat(updateMap.get("ff").type()).isInstanceOf(DoubleType.class);
@@ -566,54 +748,19 @@ public void testEvolveTypeDetectionStructNested() {
     Struct structValue = new Struct(structSchema).put("ii", 11L).put("ff", 22d);
     Struct data = new Struct(schema).put("i", 1).put("st", structValue);
 
-    List<SchemaUpdate> updates = Lists.newArrayList();
-    converter.convert(data, updates::add);
-    List<UpdateType> addCols =
-        updates.stream().map(update -> (UpdateType) update).collect(toList());
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    converter.convert(data, consumer);
+    Collection<UpdateType> updates = consumer.updateTypes();
 
-    assertThat(addCols).hasSize(2);
+    assertThat(updates).hasSize(2);
 
     Map<String, UpdateType> updateMap = Maps.newHashMap();
-    addCols.forEach(update -> updateMap.put(update.name(), update));
+    updates.forEach(update -> updateMap.put(update.name(), update));
 
     assertThat(updateMap.get("st.ii").type()).isInstanceOf(LongType.class);
     assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class);
   }
 
-  @Test
-  public void testMissingColumnDetectionStructNested() {
-    Table table = mock(Table.class);
-    when(table.schema()).thenReturn(ID_SCHEMA);
-    RecordConverter converter = new RecordConverter(table, config);
-
-    Struct nestedData = createNestedStructData();
-    List<SchemaUpdate> addCols = Lists.newArrayList();
-    converter.convert(nestedData, addCols::add);
-
-    assertThat(addCols).hasSize(1);
-
-    AddColumn addCol = (AddColumn) addCols.get(0);
-    assertThat(addCol.name()).isEqualTo("st");
-
-    StructType addedType = addCol.type().asStructType();
-    assertThat(addedType.fields()).hasSize(15);
-    assertThat(addedType.field("i").type()).isInstanceOf(IntegerType.class);
-    assertThat(addedType.field("l").type()).isInstanceOf(LongType.class);
-    assertThat(addedType.field("d").type()).isInstanceOf(DateType.class);
-    assertThat(addedType.field("t").type()).isInstanceOf(TimeType.class);
-    assertThat(addedType.field("ts").type()).isInstanceOf(TimestampType.class);
-    assertThat(addedType.field("tsz").type()).isInstanceOf(TimestampType.class);
-    assertThat(addedType.field("fl").type()).isInstanceOf(FloatType.class);
-    assertThat(addedType.field("do").type()).isInstanceOf(DoubleType.class);
-    assertThat(addedType.field("dec").type()).isInstanceOf(DecimalType.class);
-    assertThat(addedType.field("s").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("u").type()).isInstanceOf(StringType.class);
-    assertThat(addedType.field("f").type()).isInstanceOf(BinaryType.class);
-    assertThat(addedType.field("b").type()).isInstanceOf(BinaryType.class);
-    assertThat(addedType.field("li").type()).isInstanceOf(ListType.class);
-    assertThat(addedType.field("ma").type()).isInstanceOf(MapType.class);
-  }
 
   private Map<String, Object> createMapData() {
     return ImmutableMap.<String, Object>builder()
         .put("i", 1)
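
Note on the parent names pinned by the new tests: "stli.element" and "stma.value" are not connector inventions; they are Iceberg's canonical column names for a list's element field and a map's value field, which is what makes AddColumn.key() line up with Schema.findColumnName()/findField() for structs inside collections. A standalone check against the plain Iceberg API (NestedColumnNames is an illustrative name):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    class NestedColumnNames {
      public static void main(String[] args) {
        Schema schema =
            new Schema(
                Types.NestedField.required(
                    1, "stli", Types.ListType.ofRequired(2, Types.StringType.get())),
                Types.NestedField.required(
                    3,
                    "stma",
                    Types.MapType.ofRequired(4, 5, Types.StringType.get(), Types.LongType.get())));
        System.out.println(schema.findColumnName(2)); // stli.element
        System.out.println(schema.findColumnName(5)); // stma.value
      }
    }
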
diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/SchemaUtilsTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/SchemaUtilsTest.java
index 86713fe7..b74691a9 100644
--- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/SchemaUtilsTest.java
+++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/SchemaUtilsTest.java
@@ -30,9 +30,6 @@
 import static org.mockito.Mockito.when;
 
 import io.tabular.iceberg.connect.IcebergSinkConfig;
-import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;
-import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional;
-import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -105,15 +102,14 @@ public void testApplySchemaUpdates() {
     when(table.updateSchema()).thenReturn(updateSchema);
 
     // the updates to "i" should be ignored as it already exists and is the same type
-    List<SchemaUpdate> updates =
-        ImmutableList.of(
-            new AddColumn(null, "i", IntegerType.get()),
-            new UpdateType("i", IntegerType.get()),
-            new MakeOptional("i"),
-            new UpdateType("f", DoubleType.get()),
-            new AddColumn(null, "s", StringType.get()));
-
-    SchemaUtils.applySchemaUpdates(table, updates);
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    consumer.addColumn(null, "i", IntegerType.get());
+    consumer.updateType("i", IntegerType.get());
+    consumer.makeOptional("i");
+    consumer.updateType("f", DoubleType.get());
+    consumer.addColumn(null, "s", StringType.get());
+
+    SchemaUtils.applySchemaUpdates(table, consumer);
 
     verify(table).refresh();
     verify(table).updateSchema();
@@ -136,15 +132,14 @@ public void testApplyNestedSchemaUpdates() {
     when(table.updateSchema()).thenReturn(updateSchema);
 
     // the updates to "st.i" should be ignored as it already exists and is the same type
-    List<SchemaUpdate> updates =
-        ImmutableList.of(
-            new AddColumn("st", "i", IntegerType.get()),
-            new UpdateType("st.i", IntegerType.get()),
-            new MakeOptional("st.i"),
-            new UpdateType("st.f", DoubleType.get()),
-            new AddColumn("st", "s", StringType.get()));
-
-    SchemaUtils.applySchemaUpdates(table, updates);
+    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
+    consumer.addColumn("st", "i", IntegerType.get());
+    consumer.updateType("st.i", IntegerType.get());
+    consumer.makeOptional("st.i");
+    consumer.updateType("st.f", DoubleType.get());
+    consumer.addColumn("st", "s", StringType.get());
+
+    SchemaUtils.applySchemaUpdates(table, consumer);
 
     verify(table).refresh();
     verify(table).updateSchema();
@@ -168,7 +163,7 @@ public void testApplySchemaUpdatesNoUpdates() {
     verify(table, times(0)).refresh();
     verify(table, times(0)).updateSchema();
 
-    SchemaUtils.applySchemaUpdates(table, ImmutableList.of());
+    SchemaUtils.applySchemaUpdates(table, new SchemaUpdate.Consumer());
     verify(table, times(0)).refresh();
     verify(table, times(0)).updateSchema();
   }
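
Closing note: the SchemaUtilsTest changes still verify(table).updateSchema() exactly once per batch, reflecting that all collected updates land in a single schema transaction. The applying half of commitSchemaUpdates() sits outside the hunks above; based on the public Iceberg UpdateSchema API it plausibly looks like the sketch below. This is a guess at internals, not the patch's code; MakeOptional.name() is assumed to exist by analogy with UpdateType, and ApplySketch and apply() are illustrative names:

    import io.tabular.iceberg.connect.data.SchemaUpdate;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.UpdateSchema;

    class ApplySketch {
      static void apply(Table table, SchemaUpdate.Consumer updates) {
        UpdateSchema pending = table.updateSchema();
        updates.addColumns()
            .forEach(add -> pending.addColumn(add.parentName(), add.name(), add.type()));
        updates.updateTypes().forEach(upd -> pending.updateColumn(upd.name(), upd.type()));
        updates.makeOptionals().forEach(opt -> pending.makeColumnOptional(opt.name()));
        pending.commit(); // one transaction for the whole batch of updates
      }
    }
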