Skip to content

Commit

Permalink
memory optimizations
Browse files (browse the repository at this point in the history)
  • Loading branch information
bryanck committed May 20, 2023
1 parent 0f19ad2 commit 54dedb6
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public void teardown() {
@Test
public void testIcebergSink() throws Exception {
// set offset reset to earliest so we don't miss any test messages
// TODO: get bootstrap.servers from worker properties?
KafkaConnectContainer.Config connectorConfig =
new KafkaConnectContainer.Config(CONNECTOR_NAME)
.config("topics", TEST_TOPIC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public void teardown() {
@Test
public void testIcebergSink() {
// set offset reset to earliest so we don't miss any test messages
// TODO: get bootstrap.servers from worker properties?
KafkaConnectContainer.Config connectorConfig =
new KafkaConnectContainer.Config(CONNECTOR_NAME)
.config("topics", TEST_TOPIC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public void teardown() {
@Test
public void testIcebergSink() {
// set offset reset to earliest so we don't miss any test messages
// TODO: get bootstrap.servers from worker properties?
KafkaConnectContainer.Config connectorConfig =
new KafkaConnectContainer.Config(CONNECTOR_NAME)
.config("topics", TEST_TOPIC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void teardown() {
@Test
public void testIcebergSink() throws Exception {
// set offset reset to earliest so we don't miss any test messages
// TODO: get bootstrap.servers from worker properties?
KafkaConnectContainer.Config connectorConfig =
new KafkaConnectContainer.Config(CONNECTOR_NAME)
.config("topics", TEST_TOPIC)
Expand Down
2 changes: 1 addition & 1 deletion kafka-connect/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies {
compileOnly libs.bundles.kafka.connect

// REST catalog needs this when loading the FileIO
// TODO: remove with upgrade to Iceberg 1.3
// TODO: move to runtime with upgrade to Iceberg 1.3
runtimeOnly libs.hadoop.common

testImplementation libs.junit.api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
import io.tabular.iceberg.connect.channel.events.EventType;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.AppendFiles;
Expand All @@ -59,7 +59,6 @@ public class Coordinator extends Channel {
private static final String CONTROL_OFFSETS_SNAPSHOT_PREFIX = "kafka.connect.control.offsets.";

private final Catalog catalog;
private final Map<TableIdentifier, Table> tableCache;
private final IcebergSinkConfig config;
private final List<Envelope> commitBuffer = new LinkedList<>();
private final List<CommitReadyPayload> readyBuffer = new LinkedList<>();
Expand All @@ -69,12 +68,13 @@ public class Coordinator extends Channel {
private final String snapshotOffsetsProp;
private final ExecutorService exec;

private static final Duration CACHE_TTL = Duration.ofMinutes(60);

public Coordinator(Catalog catalog, IcebergSinkConfig config) {
// pass consumer group ID to which we commit low watermark offsets
super("coordinator", config.getControlGroupId() + "-coord", config);

this.catalog = catalog;
this.tableCache = new ConcurrentHashMap<>(); // TODO: LRU cache
this.config = config;
this.totalPartitionCount = getTotalPartitionCount();
this.snapshotOffsetsProp = CONTROL_OFFSETS_SNAPSHOT_PREFIX + config.getControlTopic();
Expand Down Expand Up @@ -208,9 +208,8 @@ private void doCommit() {

private void commitToTable(
TableIdentifier tableIdentifier, List<Envelope> envelopeList, String offsetsJson) {
Table table = getTable(tableIdentifier);
table.refresh();
Map<Integer, Long> commitedOffsets = getLastCommittedOffsetsForTable(tableIdentifier);
Table table = catalog.loadTable(tableIdentifier);
Map<Integer, Long> commitedOffsets = getLastCommittedOffsetsForTable(table);

List<CommitResponsePayload> payloads =
envelopeList.stream()
Expand Down Expand Up @@ -256,11 +255,10 @@ private void commitToTable(
}
}

private Map<Integer, Long> getLastCommittedOffsetsForTable(TableIdentifier tableIdentifier) {
private Map<Integer, Long> getLastCommittedOffsetsForTable(Table table) {
// TODO: support branches

String offsetsProp = CONTROL_OFFSETS_SNAPSHOT_PREFIX + config.getControlTopic();
Table table = getTable(tableIdentifier);
Snapshot snapshot = table.currentSnapshot();

while (snapshot != null) {
Expand All @@ -279,9 +277,4 @@ private Map<Integer, Long> getLastCommittedOffsetsForTable(TableIdentifier table
}
return ImmutableMap.of();
}

/**
 * Returns the table for the given identifier, consulting {@code tableCache} first and calling
 * {@code catalog.loadTable} only when the cache has no entry for that identifier; the loaded
 * table is then stored in the cache by {@code computeIfAbsent}.
 *
 * <p>NOTE(review): no eviction or expiry of cache entries is visible in this method, so a cached
 * table presumably stays referenced until the owning object is discarded — confirm.
 *
 * @param tableIdentifier identifier of the table to look up
 * @return the cached table, or the table freshly loaded from the catalog
 */
private Table getTable(TableIdentifier tableIdentifier) {
return tableCache.computeIfAbsent(
tableIdentifier, notUsed -> catalog.loadTable(tableIdentifier));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ public class Worker extends Channel {

private final Catalog catalog;
private final IcebergSinkConfig config;
private final Map<String, IcebergWriter> writers = new HashMap<>();
private final SinkTaskContext context;
private final String controlGroupId;
private final Map<String, Boolean> tableExistsMap = new HashMap<>();
private final Map<String, IcebergWriter> writers;
private final Map<String, Boolean> tableExistsMap;

public Worker(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context) {
// pass transient consumer group ID to which we never commit offsets
Expand All @@ -68,6 +68,8 @@ public Worker(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context
this.config = config;
this.context = context;
this.controlGroupId = config.getControlGroupId();
this.writers = new HashMap<>();
this.tableExistsMap = new HashMap<>();
}

public void syncCommitOffsets() {
Expand Down Expand Up @@ -95,11 +97,12 @@ protected boolean receive(Envelope envelope) {
return false;
}

tableExistsMap.clear(); // refresh in case of new or dropped tables

List<WriterResult> writeResults =
writers.values().stream().map(IcebergWriter::complete).collect(toList());

tableExistsMap.clear();
writers.clear();

Map<TopicPartition, Long> offsets = new HashMap<>();
writeResults.stream()
.flatMap(writerResult -> writerResult.getOffsets().entrySet().stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,12 @@ public WriterResult complete() {
throw new UncheckedIOException(e);
}

WriterResult result =
new WriterResult(
tableIdentifier,
Arrays.asList(writeResult.dataFiles()),
Arrays.asList(writeResult.deleteFiles()),
table.spec().partitionType(),
offsets);

table.refresh();
recordConverter = new RecordConverter(table, config.getJsonConverter());
writer = Utilities.createTableWriter(table, config);
offsets = new HashMap<>();

return result;
return new WriterResult(
tableIdentifier,
Arrays.asList(writeResult.dataFiles()),
Arrays.asList(writeResult.deleteFiles()),
table.spec().partitionType(),
offsets);
}

@Override
Expand Down

0 comments on commit 54dedb6

Please sign in to comment.