From 54dedb63e9dc45bd197fa79f7f313c71a7158adf Mon Sep 17 00:00:00 2001
From: Bryan Keller
Date: Fri, 19 May 2023 18:00:05 -0700
Subject: [PATCH] memory optimizations

---
 .../iceberg/connect/IntegrationCdcTest.java   |  1 -
 .../connect/IntegrationDynamicTableTest.java  |  1 -
 .../connect/IntegrationMultiTableTest.java    |  1 -
 .../iceberg/connect/IntegrationTest.java      |  1 -
 kafka-connect/build.gradle                    |  2 +-
 .../iceberg/connect/channel/Coordinator.java  | 19 ++++++------------
 .../iceberg/connect/channel/Worker.java       | 11 ++++++----
 .../iceberg/connect/data/IcebergWriter.java   | 20 ++++++-------------
 8 files changed, 20 insertions(+), 36 deletions(-)

diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java
index 0e608f89..cbefe816 100644
--- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java
+++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationCdcTest.java
@@ -85,7 +85,6 @@ public void teardown() {
   @Test
   public void testIcebergSink() throws Exception {
     // set offset reset to earliest so we don't miss any test messages
-    // TODO: get bootstrap.servers from worker properties?
     KafkaConnectContainer.Config connectorConfig =
         new KafkaConnectContainer.Config(CONNECTOR_NAME)
             .config("topics", TEST_TOPIC)
diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java
index 63e653c7..2ca773b8 100644
--- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java
+++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationDynamicTableTest.java
@@ -80,7 +80,6 @@ public void teardown() {
   @Test
   public void testIcebergSink() {
     // set offset reset to earliest so we don't miss any test messages
-    // TODO: get bootstrap.servers from worker properties?
     KafkaConnectContainer.Config connectorConfig =
         new KafkaConnectContainer.Config(CONNECTOR_NAME)
             .config("topics", TEST_TOPIC)
diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java
index 8f3ce5b1..1ccb20a7 100644
--- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java
+++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationMultiTableTest.java
@@ -79,7 +79,6 @@ public void teardown() {
   @Test
   public void testIcebergSink() {
     // set offset reset to earliest so we don't miss any test messages
-    // TODO: get bootstrap.servers from worker properties?
     KafkaConnectContainer.Config connectorConfig =
         new KafkaConnectContainer.Config(CONNECTOR_NAME)
             .config("topics", TEST_TOPIC)
diff --git a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java
index b0b2e376..18b6afa4 100644
--- a/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java
+++ b/kafka-connect-runtime/src/test/java/io/tabular/iceberg/connect/IntegrationTest.java
@@ -77,7 +77,6 @@ public void teardown() {
   @Test
   public void testIcebergSink() throws Exception {
     // set offset reset to earliest so we don't miss any test messages
-    // TODO: get bootstrap.servers from worker properties?
     KafkaConnectContainer.Config connectorConfig =
         new KafkaConnectContainer.Config(CONNECTOR_NAME)
             .config("topics", TEST_TOPIC)
diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle
index 78dfc456..ca78f77e 100644
--- a/kafka-connect/build.gradle
+++ b/kafka-connect/build.gradle
@@ -6,7 +6,7 @@ dependencies {
   compileOnly libs.bundles.kafka.connect
 
   // REST catalog needs this when loading the FileIO
-  // TODO: remove with upgrade to Iceberg 1.3
+  // TODO: move to runtime with upgrade to Iceberg 1.3
   runtimeOnly libs.hadoop.common
 
   testImplementation libs.junit.api
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java
index 4c15944d..ff9cd196 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java
@@ -31,11 +31,11 @@
 import io.tabular.iceberg.connect.channel.events.EventType;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.AppendFiles;
@@ -59,7 +59,6 @@ public class Coordinator extends Channel {
   private static final String CONTROL_OFFSETS_SNAPSHOT_PREFIX = "kafka.connect.control.offsets.";
 
   private final Catalog catalog;
-  private final Map<TableIdentifier, Table> tableCache;
   private final IcebergSinkConfig config;
   private final List<Envelope> commitBuffer = new LinkedList<>();
   private final List<CommitReadyPayload> readyBuffer = new LinkedList<>();
@@ -69,12 +68,13 @@ public class Coordinator extends Channel {
   private final String snapshotOffsetsProp;
   private final ExecutorService exec;
 
+  private static final Duration CACHE_TTL = Duration.ofMinutes(60);
+
   public Coordinator(Catalog catalog, IcebergSinkConfig config) {
     // pass consumer group ID to which we commit low watermark offsets
     super("coordinator", config.getControlGroupId() + "-coord", config);
 
     this.catalog = catalog;
-    this.tableCache = new ConcurrentHashMap<>(); // TODO: LRU cache
     this.config = config;
     this.totalPartitionCount = getTotalPartitionCount();
     this.snapshotOffsetsProp = CONTROL_OFFSETS_SNAPSHOT_PREFIX + config.getControlTopic();
@@ -208,9 +208,8 @@ private void doCommit() {
 
   private void commitToTable(
       TableIdentifier tableIdentifier, List<Envelope> envelopeList, String offsetsJson) {
-    Table table = getTable(tableIdentifier);
-    table.refresh();
-    Map<Integer, Long> commitedOffsets = getLastCommittedOffsetsForTable(tableIdentifier);
+    Table table = catalog.loadTable(tableIdentifier);
+    Map<Integer, Long> commitedOffsets = getLastCommittedOffsetsForTable(table);
 
     List<CommitResponsePayload> payloads =
         envelopeList.stream()
@@ -256,11 +255,10 @@ private void commitToTable(
     }
   }
 
-  private Map<Integer, Long> getLastCommittedOffsetsForTable(TableIdentifier tableIdentifier) {
+  private Map<Integer, Long> getLastCommittedOffsetsForTable(Table table) {
     // TODO: support branches
     String offsetsProp = CONTROL_OFFSETS_SNAPSHOT_PREFIX + config.getControlTopic();
 
-    Table table = getTable(tableIdentifier);
     Snapshot snapshot = table.currentSnapshot();
 
     while (snapshot != null) {
@@ -279,9 +277,4 @@ private Map<Integer, Long> getLastCommittedOffsetsForTable(TableIdentifier table
     }
     return ImmutableMap.of();
   }
-
-  private Table getTable(TableIdentifier tableIdentifier) {
-    return tableCache.computeIfAbsent(
-        tableIdentifier, notUsed -> catalog.loadTable(tableIdentifier));
-  }
 }
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java
index 7eb627b7..59ef0900 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java
@@ -55,10 +55,10 @@ public class Worker extends Channel {
 
   private final Catalog catalog;
   private final IcebergSinkConfig config;
-  private final Map<String, IcebergWriter> writers = new HashMap<>();
   private final SinkTaskContext context;
   private final String controlGroupId;
-  private final Map<String, Boolean> tableExistsMap = new HashMap<>();
+  private final Map<String, IcebergWriter> writers;
+  private final Map<String, Boolean> tableExistsMap;
 
   public Worker(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context) {
     // pass transient consumer group ID to which we never commit offsets
@@ -68,6 +68,8 @@ public Worker(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context
     this.config = config;
     this.context = context;
     this.controlGroupId = config.getControlGroupId();
+    this.writers = new HashMap<>();
+    this.tableExistsMap = new HashMap<>();
   }
 
   public void syncCommitOffsets() {
@@ -95,11 +97,12 @@ protected boolean receive(Envelope envelope) {
       return false;
     }
 
-    tableExistsMap.clear(); // refresh in case of new or dropped tables
-
    List<WriterResult> writeResults =
         writers.values().stream().map(IcebergWriter::complete).collect(toList());
 
+    tableExistsMap.clear();
+    writers.clear();
+
     Map<TopicPartition, Long> offsets = new HashMap<>();
     writeResults.stream()
         .flatMap(writerResult -> writerResult.getOffsets().entrySet().stream())
diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java
index 2c541fe0..7bc47607 100644
--- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java
+++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java
@@ -105,20 +105,12 @@ public WriterResult complete() {
       throw new UncheckedIOException(e);
     }
 
-    WriterResult result =
-        new WriterResult(
-            tableIdentifier,
-            Arrays.asList(writeResult.dataFiles()),
-            Arrays.asList(writeResult.deleteFiles()),
-            table.spec().partitionType(),
-            offsets);
-
-    table.refresh();
-    recordConverter = new RecordConverter(table, config.getJsonConverter());
-    writer = Utilities.createTableWriter(table, config);
-    offsets = new HashMap<>();
-
-    return result;
+    return new WriterResult(
+        tableIdentifier,
+        Arrays.asList(writeResult.dataFiles()),
+        Arrays.asList(writeResult.deleteFiles()),
+        table.spec().partitionType(),
+        offsets);
   }
 
   @Override
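
Note (not part of the patch): the patch removes the Coordinator's unbounded ConcurrentHashMap table
cache in favor of calling catalog.loadTable() on every commit, but it also introduces a CACHE_TTL
constant that is not yet used in the hunks above, suggesting an expiring cache as a follow-up. Below
is a minimal sketch of what CACHE_TTL could back, assuming the Caffeine library
(com.github.ben-manes.caffeine:caffeine) were added as a dependency; it is not added by this patch,
and the class name ExpiringTableCache is hypothetical.

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.time.Duration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;

    // Hypothetical sketch, not part of this patch: a TTL-bounded table cache
    // that could replace the removed ConcurrentHashMap, which grew without
    // limit and never refreshed its entries.
    class ExpiringTableCache {
      private static final Duration CACHE_TTL = Duration.ofMinutes(60);

      private final Catalog catalog;
      private final Cache<TableIdentifier, Table> tableCache;

      ExpiringTableCache(Catalog catalog) {
        this.catalog = catalog;
        // Entries expire after the TTL, so dropped or re-created tables are
        // eventually reloaded and the cache cannot grow without bound.
        this.tableCache = Caffeine.newBuilder().expireAfterWrite(CACHE_TTL).build();
      }

      Table get(TableIdentifier tableIdentifier) {
        // Load through the catalog on a miss, as the removed getTable() did.
        return tableCache.get(tableIdentifier, catalog::loadTable);
      }
    }

For now the patch takes the simpler route of loading the table from the catalog on each commit,
trading one catalog round trip per commit for predictable memory use and always-fresh metadata,
which is also why the explicit table.refresh() calls could be dropped.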