Skip to content

Commit

Permalink
Run coordinator in its own thread (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Jun 11, 2023
1 parent 0ca8528 commit 5fe818f
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package io.tabular.iceberg.connect;

import io.tabular.iceberg.connect.channel.Coordinator;
import io.tabular.iceberg.connect.channel.CoordinatorThread;
import io.tabular.iceberg.connect.channel.KafkaClientFactory;
import io.tabular.iceberg.connect.channel.NotRunningException;
import io.tabular.iceberg.connect.channel.Worker;
import io.tabular.iceberg.connect.data.IcebergWriterFactory;
import io.tabular.iceberg.connect.data.Utilities;
Expand All @@ -41,7 +43,7 @@ public class IcebergSinkTask extends SinkTask {

private IcebergSinkConfig config;
private Catalog catalog;
private Coordinator coordinator;
private CoordinatorThread coordinatorThread;
private Worker worker;

@Override
Expand All @@ -61,8 +63,9 @@ public void open(Collection<TopicPartition> partitions) {

if (isLeader(partitions)) {
LOG.info("Task elected leader, starting commit coordinator");
coordinator = new Coordinator(catalog, config, clientFactory);
coordinator.start();
Coordinator coordinator = new Coordinator(catalog, config, clientFactory);
coordinatorThread = new CoordinatorThread(coordinator);
coordinatorThread.start();
}

LOG.info("Starting commit worker");
Expand All @@ -84,14 +87,18 @@ boolean isLeader(Collection<TopicPartition> partitions) {

/**
 * Releases the task's resources when partitions are closed.
 *
 * <p>The {@code partitions} argument is not consulted here; the call is forwarded unchanged to
 * the no-argument {@code close()} helper.
 *
 * @param partitions the partitions being closed (unused)
 */
@Override
public void close(Collection<TopicPartition> partitions) {
  // Shutdown logic lives in one place: the private no-arg close().
  close();
}

private void close() {
if (worker != null) {
worker.stop();
worker = null;
}

if (coordinator != null) {
coordinator.stop();
coordinator = null;
if (coordinatorThread != null) {
coordinatorThread.terminate();
coordinatorThread = null;
}

if (catalog != null) {
Expand All @@ -111,12 +118,21 @@ public void put(Collection<SinkRecord> sinkRecords) {
if (sinkRecords != null && !sinkRecords.isEmpty() && worker != null) {
worker.save(sinkRecords);
}
coordinate();
processControlEvents();
}

/**
 * Processes pending control events when the task is asked to flush.
 *
 * <p>The {@code currentOffsets} argument is not read by this method.
 *
 * <p>NOTE(review): this text comes from a rendered diff and appears to show both the removed
 * call ({@code coordinate()}) and its replacement ({@code processControlEvents()}) -- confirm
 * against the actual file which call is present.
 *
 * @param currentOffsets offsets supplied by the caller (unused)
 */
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
coordinate();
processControlEvents();
}

/**
 * Checks that the coordinator thread (if this task started one) is still alive, then lets the
 * worker process its control events.
 *
 * @throws NotRunningException if a coordinator thread exists and reports itself terminated
 */
private void processControlEvents() {
  // Only a task that started a coordinator thread has one to check.
  final boolean coordinatorGone = coordinatorThread != null && coordinatorThread.isTerminated();
  if (coordinatorGone) {
    throw new NotRunningException("Coordinator unexpectedly terminated");
  }

  if (worker == null) {
    return;
  }
  worker.process();
}

@Override
Expand All @@ -128,22 +144,8 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(
return worker.getCommitOffsets();
}

/**
 * Runs one round of event processing: the worker first, then the coordinator. Either step is
 * skipped when the corresponding field has not been set.
 */
private void coordinate() {
  // Worker is processed before the coordinator; keep this order.
  if (worker != null) {
    worker.process();
  }

  if (coordinator != null) {
    coordinator.process();
  }
}

/**
 * Stops the task and releases its resources.
 *
 * <p>NOTE(review): this text comes from a rendered diff and appears to show both the removed
 * inline shutdown (stopping {@code worker} and {@code coordinator} directly) and its
 * replacement (the single {@code close()} call) -- confirm against the actual file which
 * statements are present.
 */
@Override
public void stop() {
if (worker != null) {
worker.stop();
}
if (coordinator != null) {
coordinator.stop();
}
close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.admin.Admin;
Expand Down Expand Up @@ -112,15 +111,7 @@ protected void send(List<Event> events, Map<TopicPartition, Offset> sourceOffset

protected abstract boolean receive(Envelope envelope);

/**
 * Consumes whatever events are available using a zero poll duration, passing each one to
 * {@code receive}.
 */
public void process() {
  final Duration noWait = Duration.ZERO;
  consumeAvailable(this::receive, noWait);
}

/**
 * Consumes available events, passing each one to {@code receive}.
 *
 * @param pollDuration duration forwarded to {@code consumeAvailable} for polling
 */
public void process(Duration pollDuration) {
  // receive(...) is the handler; the caller picks how long the poll may take.
  consumeAvailable(this::receive, pollDuration);
}

protected void consumeAvailable(Function<Envelope, Boolean> eventHandler, Duration pollDuration) {
protected void consumeAvailable(Duration pollDuration) {
ConsumerRecords<String, byte[]> records = consumer.poll(pollDuration);
while (!records.isEmpty()) {
records.forEach(
Expand All @@ -132,7 +123,7 @@ record -> {
Event event = Event.decode(record.value());

LOG.debug("Received event of type: {}", event.getType().name());
if (eventHandler.apply(new Envelope(event, record.partition(), record.offset()))) {
if (receive(new Envelope(event, record.partition(), record.offset()))) {
LOG.info("Handled event of type: {}", event.getType().name());
}
});
Expand Down Expand Up @@ -161,7 +152,7 @@ public void start() {
consumer.subscribe(ImmutableList.of(controlTopic));

// initial poll with longer duration so the consumer will initialize...
process(Duration.ofMillis(1000));
consumeAvailable(Duration.ofMillis(1000));
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.tabular.iceberg.connect.events.TopicPartitionOffset;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class Coordinator extends Channel {
private static final String CONTROL_OFFSETS_SNAPSHOT_PREFIX = "kafka.connect.control.offsets.";
private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commitId";
private static final String VTTS_SNAPSHOT_PROP = "kafka.connect.vtts";
private static final Duration POLL_DURATION = Duration.ofMillis(1000);

private final Catalog catalog;
private final IcebergSinkConfig config;
Expand All @@ -86,7 +88,6 @@ public Coordinator(Catalog catalog, IcebergSinkConfig config, KafkaClientFactory
this.exec = ThreadPools.newWorkerPool("iceberg-committer", config.getCommitThreads());
}

@Override
public void process() {
if (startTime == 0) {
startTime = System.currentTimeMillis();
Expand All @@ -101,7 +102,7 @@ public void process() {
startTime = System.currentTimeMillis();
}

super.process();
consumeAvailable(POLL_DURATION);

if (currentCommitId != null && isCommitTimedOut()) {
commit(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.channel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread that owns the lifecycle of a {@link Coordinator}: it starts the coordinator, calls
 * {@code process()} in a loop until termination is requested or a failure occurs, and then
 * stops the coordinator.
 *
 * <p>The {@code terminated} flag is {@code volatile} so that a request made through
 * {@link #terminate()} on another thread is seen by the run loop, and so that a failure inside
 * the loop is seen by callers of {@link #isTerminated()}.
 */
public class CoordinatorThread extends Thread {
  private static final Logger LOG = LoggerFactory.getLogger(CoordinatorThread.class);
  private static final String THREAD_NAME = "iceberg-coord";

  // Cleared at the end of run() once the coordinator has been stopped.
  private Coordinator coordinator;
  private volatile boolean terminated;

  /**
   * Creates the thread (named {@code iceberg-coord}) without starting it.
   *
   * @param coordinator the coordinator this thread will start, drive, and stop
   */
  public CoordinatorThread(Coordinator coordinator) {
    super(THREAD_NAME);
    this.coordinator = coordinator;
  }

  /**
   * Starts the coordinator, processes until terminated, then stops the coordinator.
   *
   * <p>Any {@link Exception} from start or process is logged and ends the loop. The shutdown
   * steps run in a {@code finally} block so that they also happen when a non-{@code Exception}
   * {@link Throwable} (for example an {@link Error}) escapes: the terminated flag is always set
   * and a stop is always attempted before the thread exits. Without this, such a failure would
   * kill the thread while {@link #isTerminated()} kept returning {@code false}.
   */
  @Override
  public void run() {
    try {
      try {
        coordinator.start();
      } catch (Exception e) {
        LOG.error("Coordinator error during start, exiting thread", e);
        terminated = true;
      }

      while (!terminated) {
        try {
          coordinator.process();
        } catch (Exception e) {
          LOG.error("Coordinator error during process, exiting thread", e);
          terminated = true;
        }
      }
    } finally {
      // Reached on normal exit and on any escaping Throwable; make the thread's death visible.
      terminated = true;

      try {
        coordinator.stop();
      } catch (Exception e) {
        LOG.error("Coordinator error during stop, ignoring", e);
      }
      coordinator = null;
    }
  }

  /**
   * Reports whether termination has been requested or the run loop has ended.
   *
   * @return {@code true} once {@link #terminate()} has been called or the thread has hit a
   *     failure or exited
   */
  public boolean isTerminated() {
    return terminated;
  }

  /**
   * Requests that the run loop exit. This only sets the flag; it does not interrupt the thread
   * or wait for it to finish.
   */
  public void terminate() {
    terminated = true;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.channel;

/**
 * Unchecked exception signalling that a component expected to be running is not, for example
 * when the coordinator has terminated unexpectedly.
 */
public class NotRunningException extends RuntimeException {
  /**
   * Creates the exception with a description only.
   *
   * @param msg description of what is not running
   */
  public NotRunningException(String msg) {
    super(msg);
  }

  /**
   * Creates the exception with a description and the underlying failure, so the original cause
   * is preserved in the stack trace.
   *
   * @param msg description of what is not running
   * @param cause the failure that led to this condition; may be {@code null}
   */
  public NotRunningException(String msg, Throwable cause) {
    super(msg, cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.tabular.iceberg.connect.events.EventType;
import io.tabular.iceberg.connect.events.TableName;
import io.tabular.iceberg.connect.events.TopicPartitionOffset;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -99,6 +100,10 @@ public Map<TopicPartition, OffsetAndMetadata> getCommitOffsets() {
}
}

/**
 * Consumes whatever control events are available, passing a zero poll duration to
 * {@code consumeAvailable}.
 */
public void process() {
  final Duration noWait = Duration.ZERO;
  consumeAvailable(noWait);
}

@Override
protected boolean receive(Envelope envelope) {
Event event = envelope.getEvent();
Expand Down

0 comments on commit 5fe818f

Please sign in to comment.