From e942a59a33b81bb0010dedf0bb942fd49eb6e551 Mon Sep 17 00:00:00 2001 From: Mathew Fournier <160646114+tabmatfournier@users.noreply.github.com> Date: Thu, 23 May 2024 11:12:11 -0700 Subject: [PATCH] log-records-on-table-actions (#256) - log kafka metadata for record that creates tables - log kafka metadata for record that causes schema evolution --- .../java/io/tabular/iceberg/connect/data/IcebergWriter.java | 6 ++++++ .../tabular/iceberg/connect/data/IcebergWriterFactory.java | 1 + 2 files changed, 7 insertions(+) diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java index e9edf0e1..f0952bf3 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java @@ -32,8 +32,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IcebergWriter implements RecordWriter { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class); + private final Table table; private final String tableName; private final IcebergSinkConfig config; @@ -91,6 +96,7 @@ private Record convertToRow(SinkRecord record) { flush(); // apply the schema updates, this will refresh the table SchemaUtils.applySchemaUpdates(table, updates); + LOG.info("Table schema evolution on table {} caused by record at topic: {}, partition: {}, offset: {}", table.name(), record.topic(), record.kafkaPartition(), record.kafkaOffset()); // initialize a new writer with the new schema initNewWriter(); // convert the row again, this time using the new table schema diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java index a11d1cf1..2bbf0788 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java @@ -105,6 +105,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { result.set( catalog.createTable( identifier, schema, partitionSpec, config.autoCreateProps())); + LOG.info("Created new table {} from record at topic: {}, partition: {}, offset: {}", identifier, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset()); } }); return result.get();