Skip to content

Commit

Permalink
Handle null and empty values better when inferring schema (#162)
Browse files — browse the repository at this point in the history
  • Loading branch information
bryanck authored Nov 22, 2023
1 parent 92e4d98 commit afb2aaf
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.Tasks;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -68,7 +69,10 @@ public RecordWriter createWriter(
Table autoCreateTable(String tableName, SinkRecord sample) {
StructType structType;
if (sample.valueSchema() == null) {
structType = SchemaUtils.inferIcebergType(sample.value(), config).asStructType();
structType =
SchemaUtils.inferIcebergType(sample.value(), config)
.orElseThrow(() -> new DataException("Unable to create table from empty object"))
.asStructType();
} else {
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -171,12 +172,15 @@ private GenericRecord convertToStruct(
NestedField tableField = lookupStructField(recordFieldName, schema, structFieldId);
if (tableField == null) {
// add the column if schema evolution is on, otherwise skip the value,
// skip the add column if the value is null as we can't infer the type
if (schemaUpdateConsumer != null && recordFieldValue != null) {
String parentFieldName =
structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
Type type = SchemaUtils.inferIcebergType(recordFieldValue, config);
schemaUpdateConsumer.accept(new AddColumn(parentFieldName, recordFieldName, type));
// skip the add column if we can't infer the type
if (schemaUpdateConsumer != null) {
Optional<Type> type = SchemaUtils.inferIcebergType(recordFieldValue, config);
if (type.isPresent()) {
String parentFieldName =
structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
schemaUpdateConsumer.accept(
new AddColumn(parentFieldName, recordFieldName, type.get()));
}
}
} else {
result.setField(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.iceberg.PartitionSpec;
Expand Down Expand Up @@ -214,7 +216,7 @@ public static Type toIcebergType(Schema valueSchema, IcebergSinkConfig config) {
return new SchemaGenerator(config).toIcebergType(valueSchema);
}

public static Type inferIcebergType(Object value, IcebergSinkConfig config) {
public static Optional<Type> inferIcebergType(Object value, IcebergSinkConfig config) {
return new SchemaGenerator(config).inferIcebergType(value);
}

Expand Down Expand Up @@ -292,10 +294,14 @@ Type toIcebergType(Schema valueSchema) {
}
}

Optional<Type> inferIcebergType(Object value) {
return Optional.ofNullable(doInferIcebergType(value));
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
Type inferIcebergType(Object value) {
private Type doInferIcebergType(Object value) {
if (value == null) {
throw new UnsupportedOperationException("Cannot infer type from null value");
return null;
} else if (value instanceof String) {
return StringType.get();
} else if (value instanceof Boolean) {
Expand All @@ -321,27 +327,33 @@ Type inferIcebergType(Object value) {
return TimestampType.withoutZone();
} else if (value instanceof List) {
List<?> list = (List<?>) value;
if (!list.isEmpty()) {
Type elementType = inferIcebergType(list.get(0));
return ListType.ofOptional(nextId(), elementType);
} else {
return ListType.ofOptional(nextId(), StringType.get());
if (list.isEmpty()) {
return null;
}
Optional<Type> elementType = inferIcebergType(list.get(0));
return elementType.map(type -> ListType.ofOptional(nextId(), type)).orElse(null);
} else if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
List<NestedField> structFields =
map.entrySet().stream()
.filter(entry -> entry.getKey() != null && entry.getValue() != null)
.map(
entry ->
NestedField.optional(
nextId(),
entry.getKey().toString(),
inferIcebergType(entry.getValue())))
entry -> {
Optional<Type> valueType = inferIcebergType(entry.getValue());
return valueType
.map(
type ->
NestedField.optional(nextId(), entry.getKey().toString(), type))
.orElse(null);
})
.filter(Objects::nonNull)
.collect(toList());
if (structFields.isEmpty()) {
return null;
}
return StructType.of(structFields);
} else {
return StringType.get();
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package io.tabular.iceberg.connect.data;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.matches;
Expand All @@ -38,12 +37,15 @@
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -233,38 +235,64 @@ public void testToIcebergType(boolean forceOptional) {
public void testInferIcebergType() {
IcebergSinkConfig config = mock(IcebergSinkConfig.class);

assertThatThrownBy(() -> SchemaUtils.inferIcebergType(null, config))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Cannot infer type from null value");
assertThat(SchemaUtils.inferIcebergType(1, config).get()).isInstanceOf(LongType.class);
assertThat(SchemaUtils.inferIcebergType(1L, config).get()).isInstanceOf(LongType.class);
assertThat(SchemaUtils.inferIcebergType(1.1f, config).get()).isInstanceOf(DoubleType.class);
assertThat(SchemaUtils.inferIcebergType(1.1d, config).get()).isInstanceOf(DoubleType.class);
assertThat(SchemaUtils.inferIcebergType("foobar", config).get()).isInstanceOf(StringType.class);
assertThat(SchemaUtils.inferIcebergType(true, config).get()).isInstanceOf(BooleanType.class);
assertThat(SchemaUtils.inferIcebergType(LocalDate.now(), config).get())
.isInstanceOf(DateType.class);
assertThat(SchemaUtils.inferIcebergType(LocalTime.now(), config).get())
.isInstanceOf(TimeType.class);

assertThat(SchemaUtils.inferIcebergType(1, config)).isInstanceOf(LongType.class);
assertThat(SchemaUtils.inferIcebergType(1L, config)).isInstanceOf(LongType.class);
assertThat(SchemaUtils.inferIcebergType(1.1f, config)).isInstanceOf(DoubleType.class);
assertThat(SchemaUtils.inferIcebergType(1.1d, config)).isInstanceOf(DoubleType.class);
assertThat(SchemaUtils.inferIcebergType("foobar", config)).isInstanceOf(StringType.class);
assertThat(SchemaUtils.inferIcebergType(true, config)).isInstanceOf(BooleanType.class);
assertThat(SchemaUtils.inferIcebergType(LocalDate.now(), config)).isInstanceOf(DateType.class);
assertThat(SchemaUtils.inferIcebergType(LocalTime.now(), config)).isInstanceOf(TimeType.class);

Type timestampType = SchemaUtils.inferIcebergType(new java.util.Date(), config);
Type timestampType = SchemaUtils.inferIcebergType(new java.util.Date(), config).get();
assertThat(timestampType).isInstanceOf(TimestampType.class);
assertThat(((TimestampType) timestampType).shouldAdjustToUTC()).isTrue();

timestampType = SchemaUtils.inferIcebergType(OffsetDateTime.now(), config);
timestampType = SchemaUtils.inferIcebergType(OffsetDateTime.now(), config).get();
assertThat(timestampType).isInstanceOf(TimestampType.class);
assertThat(((TimestampType) timestampType).shouldAdjustToUTC()).isTrue();

timestampType = SchemaUtils.inferIcebergType(LocalDateTime.now(), config);
timestampType = SchemaUtils.inferIcebergType(LocalDateTime.now(), config).get();
assertThat(timestampType).isInstanceOf(TimestampType.class);
assertThat(((TimestampType) timestampType).shouldAdjustToUTC()).isFalse();

Type decimalType = SchemaUtils.inferIcebergType(new BigDecimal("12.345"), config);
Type decimalType = SchemaUtils.inferIcebergType(new BigDecimal("12.345"), config).get();
assertThat(decimalType).isInstanceOf(DecimalType.class);
assertThat(((DecimalType) decimalType).scale()).isEqualTo(3);

assertThat(SchemaUtils.inferIcebergType(ImmutableList.of("foobar"), config))
assertThat(SchemaUtils.inferIcebergType(ImmutableList.of("foobar"), config).get())
.isInstanceOf(ListType.class);
assertThat(SchemaUtils.inferIcebergType(ImmutableMap.of("foo", "bar"), config))
assertThat(SchemaUtils.inferIcebergType(ImmutableMap.of("foo", "bar"), config).get())
.isInstanceOf(StructType.class);
}

@Test
public void testInferIcebergTypeEmpty() {
IcebergSinkConfig config = mock(IcebergSinkConfig.class);

// skip infer for null
assertThat(SchemaUtils.inferIcebergType(null, config)).isNotPresent();

// skip infer for empty list
assertThat(SchemaUtils.inferIcebergType(ImmutableList.of(), config)).isNotPresent();
// skip infer for list if first element is null
List<?> list = Lists.newArrayList();
list.add(null);
assertThat(SchemaUtils.inferIcebergType(list, config)).isNotPresent();
// skip infer for list if first element is an empty object
assertThat(SchemaUtils.inferIcebergType(ImmutableList.of(ImmutableMap.of()), config))
.isNotPresent();

// skip infer for empty object
assertThat(SchemaUtils.inferIcebergType(ImmutableMap.of(), config)).isNotPresent();
// skip infer for object if values are null
Map<String, ?> map = Maps.newHashMap();
map.put("col", null);
assertThat(SchemaUtils.inferIcebergType(map, config)).isNotPresent();
// skip infer for object if values are empty objects
assertThat(SchemaUtils.inferIcebergType(ImmutableMap.of("nested", ImmutableMap.of()), config))
.isNotPresent();
}
}

0 comments on commit afb2aaf

Please sign in to comment.