Fixes for schema evolution with structs in collections (#167)
bryanck authored Nov 29, 2023
1 parent c698a39 commit 64b824a
Showing 6 changed files with 333 additions and 160 deletions.
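In the hunks shown below, the `Consumer<SchemaUpdate>` callback and `List<SchemaUpdate>` plumbing in the writer and record converter are replaced with a dedicated `SchemaUpdate.Consumer` that collects `AddColumn`, `UpdateType`, and `MakeOptional` requests in maps keyed by column name. Structs nested in lists and maps report the same update once per element, so keying the updates deduplicates them before they are applied to the table. The map converter also picks up a one-line bug fix: the value field ID is now read from `type.fields().get(1)` rather than the key's entry at index 0.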
@@ -19,6 +19,7 @@
 package io.tabular.iceberg.connect.data;
 
 import io.tabular.iceberg.connect.IcebergSinkConfig;
+import io.tabular.iceberg.connect.data.SchemaUpdate.Consumer;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Arrays;
@@ -82,18 +83,18 @@ private Record convertToRow(SinkRecord record) {
       return recordConverter.convert(record.value());
     }
 
-    List<SchemaUpdate> updates = Lists.newArrayList();
-    Record row = recordConverter.convert(record.value(), updates::add);
+    SchemaUpdate.Consumer updates = new Consumer();
+    Record row = recordConverter.convert(record.value(), updates);
 
-    if (!updates.isEmpty()) {
+    if (!updates.empty()) {
       // complete the current file
       flush();
       // apply the schema updates, this will refresh the table
       SchemaUtils.applySchemaUpdates(table, updates);
       // initialize a new writer with the new schema
       initNewWriter();
       // convert the row again, this time using the new table schema
-      row = recordConverter.convert(record.value(), updates::add);
+      row = recordConverter.convert(record.value(), null);
     }
 
     return row;
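The flow in convertToRow above: convert the record once while collecting any needed schema updates; if any were collected, flush the current file, apply the updates (which refreshes the table), initialize a new writer against the new schema, and convert the record again. The second convert passes null for the consumer, since the updates have already been applied and nothing further needs to be collected.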
@@ -22,9 +22,6 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.tabular.iceberg.connect.IcebergSinkConfig;
-import io.tabular.iceberg.connect.data.SchemaUpdate.AddColumn;
-import io.tabular.iceberg.connect.data.SchemaUpdate.MakeOptional;
-import io.tabular.iceberg.connect.data.SchemaUpdate.UpdateType;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.math.BigDecimal;
@@ -46,7 +43,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.function.Consumer;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -94,7 +90,7 @@ public Record convert(Object data) {
     return convert(data, null);
   }
 
-  public Record convert(Object data, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+  public Record convert(Object data, SchemaUpdate.Consumer schemaUpdateConsumer) {
     if (data instanceof Struct || data instanceof Map) {
       return convertStructValue(data, tableSchema.asStruct(), -1, schemaUpdateConsumer);
     }
@@ -107,7 +103,7 @@ private NameMapping createNameMapping(Table table) {
   }
 
   private Object convertValue(
-      Object value, Type type, int fieldId, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      Object value, Type type, int fieldId, SchemaUpdate.Consumer schemaUpdateConsumer) {
     if (value == null) {
       return null;
     }
@@ -151,7 +147,7 @@ protected GenericRecord convertStructValue(
       Object value,
       StructType schema,
       int parentFieldId,
-      Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
     if (value instanceof Map) {
       return convertToStruct((Map<?, ?>) value, schema, parentFieldId, schemaUpdateConsumer);
     } else if (value instanceof Struct) {
@@ -164,7 +160,7 @@ private GenericRecord convertToStruct(
       Map<?, ?> map,
       StructType schema,
       int structFieldId,
-      Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
     GenericRecord result = GenericRecord.create(schema);
     map.forEach(
         (recordFieldNameObj, recordFieldValue) -> {
@@ -178,8 +174,7 @@ private GenericRecord convertToStruct(
             if (type.isPresent()) {
               String parentFieldName =
                   structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
-              schemaUpdateConsumer.accept(
-                  new AddColumn(parentFieldName, recordFieldName, type.get()));
+              schemaUpdateConsumer.addColumn(parentFieldName, recordFieldName, type.get());
             }
           }
         } else {
@@ -199,7 +194,7 @@ private GenericRecord convertToStruct(
       Struct struct,
       StructType schema,
       int structFieldId,
-      Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      SchemaUpdate.Consumer schemaUpdateConsumer) {
     GenericRecord result = GenericRecord.create(schema);
     struct
         .schema()
@@ -213,8 +208,7 @@ private GenericRecord convertToStruct(
              String parentFieldName =
                  structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
              Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
-              schemaUpdateConsumer.accept(
-                  new AddColumn(parentFieldName, recordField.name(), type));
+              schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
            }
          } else {
            boolean hasSchemaUpdates = false;
@@ -224,13 +218,13 @@ private GenericRecord convertToStruct(
                SchemaUtils.needsDataTypeUpdate(tableField.type(), recordField.schema());
            if (evolveDataType != null) {
              String fieldName = tableSchema.findColumnName(tableField.fieldId());
-              schemaUpdateConsumer.accept(new UpdateType(fieldName, evolveDataType));
+              schemaUpdateConsumer.updateType(fieldName, evolveDataType);
              hasSchemaUpdates = true;
            }
            // make optional if needed and schema evolution is on
            if (tableField.isRequired() && recordField.schema().isOptional()) {
              String fieldName = tableSchema.findColumnName(tableField.fieldId());
-              schemaUpdateConsumer.accept(new MakeOptional(fieldName));
+              schemaUpdateConsumer.makeOptional(fieldName);
              hasSchemaUpdates = true;
            }
          }
@@ -277,7 +271,7 @@ private Map<String, NestedField> createStructNameMap(StructType schema) {
   }
 
   protected List<Object> convertListValue(
-      Object value, ListType type, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      Object value, ListType type, SchemaUpdate.Consumer schemaUpdateConsumer) {
     Preconditions.checkArgument(value instanceof List);
     List<?> list = (List<?>) value;
     return list.stream()
@@ -290,14 +284,14 @@ protected List<Object> convertListValue(
   }
 
   protected Map<Object, Object> convertMapValue(
-      Object value, MapType type, Consumer<SchemaUpdate> schemaUpdateConsumer) {
+      Object value, MapType type, SchemaUpdate.Consumer schemaUpdateConsumer) {
     Preconditions.checkArgument(value instanceof Map);
     Map<?, ?> map = (Map<?, ?>) value;
     Map<Object, Object> result = Maps.newHashMap();
     map.forEach(
         (k, v) -> {
           int keyFieldId = type.fields().get(0).fieldId();
-          int valueFieldId = type.fields().get(0).fieldId();
+          int valueFieldId = type.fields().get(1).fieldId();
           result.put(
               convertValue(k, type.keyType(), keyFieldId, schemaUpdateConsumer),
               convertValue(v, type.valueType(), valueFieldId, schemaUpdateConsumer));
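A minimal sketch (not part of the commit) of why the one-line valueFieldId fix above matters: in Iceberg's MapType, fields() returns the key field at index 0 and the value field at index 1, so the old code resolved map values, including nested structs, against the key's field ID. The field IDs below are hypothetical.

```java
import org.apache.iceberg.types.Types;

public class MapFieldIdSketch {
  public static void main(String[] args) {
    // A map<string, string> whose key field was assigned ID 1 and value field ID 2.
    Types.MapType map =
        Types.MapType.ofRequired(1, 2, Types.StringType.get(), Types.StringType.get());

    System.out.println(map.fields().get(0).fieldId()); // 1 -> the key field
    System.out.println(map.fields().get(1).fieldId()); // 2 -> the value field (the fix)
  }
}
```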
@@ -18,11 +18,49 @@
  */
 package io.tabular.iceberg.connect.data;
 
+import java.util.Collection;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Type.PrimitiveType;
 
 public class SchemaUpdate {
 
+  public static class Consumer {
+    private final Map<String, AddColumn> addColumns = Maps.newHashMap();
+    private final Map<String, UpdateType> updateTypes = Maps.newHashMap();
+    private final Map<String, MakeOptional> makeOptionals = Maps.newHashMap();
+
+    public Collection<AddColumn> addColumns() {
+      return addColumns.values();
+    }
+
+    public Collection<UpdateType> updateTypes() {
+      return updateTypes.values();
+    }
+
+    public Collection<MakeOptional> makeOptionals() {
+      return makeOptionals.values();
+    }
+
+    public boolean empty() {
+      return addColumns.isEmpty() && updateTypes.isEmpty() && makeOptionals.isEmpty();
+    }
+
+    public void addColumn(String parentName, String name, Type type) {
+      AddColumn addCol = new AddColumn(parentName, name, type);
+      addColumns.put(addCol.key(), addCol);
+    }
+
+    public void updateType(String name, PrimitiveType type) {
+      updateTypes.put(name, new UpdateType(name, type));
+    }
+
+    public void makeOptional(String name) {
+      makeOptionals.put(name, new MakeOptional(name));
+    }
+  }
+
   public static class AddColumn extends SchemaUpdate {
     private final String parentName;
     private final String name;
@@ -42,6 +80,10 @@ public String name() {
       return name;
     }
 
+    public String key() {
+      return parentName == null ? name : parentName + "." + name;
+    }
+
     public Type type() {
       return type;
     }
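A short usage sketch (assumed, not from the commit) of the new Consumer: reporting the same missing column from two struct elements of a collection leaves a single AddColumn entry, keyed by its parent.name path. The column and type here are hypothetical, and the sketch is assumed to sit in the same package as SchemaUpdate.

```java
package io.tabular.iceberg.connect.data;

import org.apache.iceberg.types.Types;

public class ConsumerDedupeSketch {
  public static void main(String[] args) {
    SchemaUpdate.Consumer updates = new SchemaUpdate.Consumer();

    // Two elements of a list of structs both report the same missing column.
    updates.addColumn("address", "zip", Types.StringType.get());
    updates.addColumn("address", "zip", Types.StringType.get());

    System.out.println(updates.empty());             // false
    System.out.println(updates.addColumns().size()); // 1 (keyed by "address.zip")
  }
}
```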
@@ -82,8 +82,8 @@ public static PrimitiveType needsDataTypeUpdate(Type currentIcebergType, Schema
     return null;
   }
 
-  public static void applySchemaUpdates(Table table, List<SchemaUpdate> updates) {
-    if (updates == null || updates.isEmpty()) {
+  public static void applySchemaUpdates(Table table, SchemaUpdate.Consumer updates) {
+    if (updates == null || updates.empty()) {
       // no updates to apply
       return;
     }
@@ -93,31 +93,25 @@ public static void applySchemaUpdates(Table table, List<SchemaUpdate> updates) {
         .run(notUsed -> commitSchemaUpdates(table, updates));
   }
 
-  private static void commitSchemaUpdates(Table table, List<SchemaUpdate> updates) {
+  private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updates) {
     // get the latest schema in case another process updated it
     table.refresh();
 
     // filter out columns that have already been added
     List<AddColumn> addColumns =
-        updates.stream()
-            .filter(update -> update instanceof AddColumn)
-            .map(update -> (AddColumn) update)
+        updates.addColumns().stream()
             .filter(addCol -> !columnExists(table.schema(), addCol))
            .collect(toList());
 
     // filter out columns that have the updated type
     List<UpdateType> updateTypes =
-        updates.stream()
-            .filter(update -> update instanceof UpdateType)
-            .map(update -> (UpdateType) update)
+        updates.updateTypes().stream()
            .filter(updateType -> !typeMatches(table.schema(), updateType))
            .collect(toList());
 
     // filter out columns that have already been made optional
     List<MakeOptional> makeOptionals =
-        updates.stream()
-            .filter(update -> update instanceof MakeOptional)
-            .map(update -> (MakeOptional) update)
+        updates.makeOptionals().stream()
            .filter(makeOptional -> !isOptional(table.schema(), makeOptional))
            .collect(toList());
 
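The commit of the filtered updates is below the fold here, but a sketch of how the filtered lists could then be applied with Iceberg's UpdateSchema API might look as follows; the parentName(), name(), and type() accessors on the update classes are assumed from the fields shown earlier, and the helper class is hypothetical.

```java
package io.tabular.iceberg.connect.data;

import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;

class SchemaApplySketch {
  // Sketch only: apply the already-filtered updates in one schema transaction.
  static void commitFiltered(
      Table table,
      List<SchemaUpdate.AddColumn> addColumns,
      List<SchemaUpdate.UpdateType> updateTypes,
      List<SchemaUpdate.MakeOptional> makeOptionals) {
    UpdateSchema updateSchema = table.updateSchema();
    addColumns.forEach(
        addCol -> updateSchema.addColumn(addCol.parentName(), addCol.name(), addCol.type()));
    updateTypes.forEach(
        updateType -> updateSchema.updateColumn(updateType.name(), updateType.type()));
    makeOptionals.forEach(
        makeOptional -> updateSchema.makeColumnOptional(makeOptional.name()));
    updateSchema.commit();
  }
}
```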
(Diffs for the remaining 2 changed files did not load on this page.)
