diff --git a/build.gradle b/build.gradle
index 9664541a..25d05b38 100644
--- a/build.gradle
+++ b/build.gradle
@@ -13,7 +13,7 @@ subprojects {
   apply plugin: "maven-publish"
 
   group "io.tabular.connect"
-  version "0.4.7-SNAPSHOT"
+  version "0.4.7"
 
   repositories {
     mavenCentral()
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java
index fcf90eb5..e67dd774 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java
@@ -18,6 +18,8 @@
  */
 package io.tabular.iceberg.connect.data;
 
+import static java.lang.String.format;
+
 import io.tabular.iceberg.connect.IcebergSinkConfig;
 import java.io.Closeable;
 import java.io.IOException;
@@ -29,6 +31,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
 
 public class IcebergWriter implements Closeable {
@@ -59,8 +62,12 @@ public void write(SinkRecord record) {
           writer.write(new RecordWrapper(row, op));
         }
       }
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
+    } catch (Exception e) {
+      throw new DataException(
+          format(
+              "An error occurred converting record, topic: %s, partition: %d, offset: %d",
+              record.topic(), record.kafkaPartition(), record.kafkaOffset()),
+          e);
     }
   }
 
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java
index 3d47dd09..fccfb3d8 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/RecordConverter.java
@@ -59,6 +59,7 @@
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.types.Types.TimestampType;
 import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.json.JsonConverter;
 
@@ -164,7 +165,12 @@ private Object getFieldValue(Object value, String fieldName) {
     if (value instanceof Map) {
       return ((Map) value).get(fieldName);
     } else if (value instanceof Struct) {
-      return ((Struct) value).get(fieldName);
+      Struct struct = (Struct) value;
+      Field field = struct.schema().field(fieldName);
+      if (field == null) {
+        return null;
+      }
+      return struct.get(field);
     }
     throw new IllegalArgumentException("Cannot convert to struct: " + value.getClass().getName());
   }
diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java
index 27465f83..ef51bcd5 100644
--- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java
+++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/RecordConverterTest.java
@@ -80,7 +80,11 @@ public class RecordConverterTest {
           Types.NestedField.required(
               36,
               "ma",
-              Types.MapType.ofRequired(37, 38, Types.StringType.get(), Types.StringType.get())));
+              Types.MapType.ofRequired(37, 38, Types.StringType.get(), Types.StringType.get())),
+          Types.NestedField.optional(39, "extra", Types.StringType.get()));
+
+  // we have 1 unmapped column so exclude that from the count
+  private static final int MAPPED_CNT = SCHEMA.columns().size() - 1;
 
   private static final org.apache.iceberg.Schema NESTED_SCHEMA =
       new org.apache.iceberg.Schema(
@@ -169,7 +173,7 @@ public void testMapToString() throws Exception {
 
     String str = (String) record.getField("st");
     Map map = (Map) MAPPER.readValue(str, Map.class);
-    assertEquals(SCHEMA.columns().size(), map.size());
+    assertEquals(MAPPED_CNT, map.size());
   }
 
   @Test
@@ -206,7 +210,7 @@ public void testStructToString() throws Exception {
 
     String str = (String) record.getField("st");
     Map map = (Map) MAPPER.readValue(str, Map.class);
-    assertEquals(SCHEMA.columns().size(), map.size());
+    assertEquals(MAPPED_CNT, map.size());
   }
 
   @Test
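
Note on the RecordConverter change above: Kafka Connect's Struct.get(String) throws a DataException when the schema has no field by that name, so resolving the Field first and returning null makes unmapped columns a silent no-op instead of a hard failure. Below is a minimal standalone sketch of that behavior; the class name NullSafeFieldLookupDemo and the example schema are illustrative only, not part of this patch.

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class NullSafeFieldLookupDemo {

  // Same pattern as the patched getFieldValue: look the Field up first and
  // treat a missing field as null rather than letting Struct.get(String) throw.
  static Object getFieldValue(Struct struct, String fieldName) {
    Field field = struct.schema().field(fieldName); // null when the schema lacks the field
    if (field == null) {
      return null;
    }
    return struct.get(field);
  }

  public static void main(String[] args) {
    Schema schema = SchemaBuilder.struct().field("id", Schema.STRING_SCHEMA).build();
    Struct struct = new Struct(schema).put("id", "123");

    System.out.println(getFieldValue(struct, "id"));    // prints 123
    System.out.println(getFieldValue(struct, "extra")); // prints null; struct.get("extra") would throw
  }
}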
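
Similarly, the IcebergWriter change replaces the bare UncheckedIOException with a DataException that names the failing record's topic, partition, and offset. A rough sketch of the resulting failure mode follows; WriteErrorWrappingDemo and the simulated failure are hypothetical stand-ins for the writer's real conversion path.

import static java.lang.String.format;

import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;

public class WriteErrorWrappingDemo {

  // Mirrors the patched catch block: any failure during conversion or writing
  // is rethrown as a DataException carrying the record's coordinates.
  static void write(SinkRecord record) {
    try {
      throw new IllegalStateException("boom"); // stand-in for a real conversion/write failure
    } catch (Exception e) {
      throw new DataException(
          format(
              "An error occurred converting record, topic: %s, partition: %d, offset: %d",
              record.topic(), record.kafkaPartition(), record.kafkaOffset()),
          e);
    }
  }

  public static void main(String[] args) {
    // SinkRecord(topic, partition, keySchema, key, valueSchema, value, kafkaOffset)
    SinkRecord record = new SinkRecord("events", 0, null, null, null, null, 42L);
    write(record); // DataException: ... topic: events, partition: 0, offset: 42
  }
}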