Commit 7bcd992: Specify control offsets with group ID (#30)

bryanck authored Jun 28, 2023
1 parent 59095a1 commit 7bcd992
Showing 11 changed files with 37 additions and 37 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -13,7 +13,7 @@ subprojects {
apply plugin: "maven-publish"

group "io.tabular.connect"
-version "0.3.9-SNAPSHOT"
+version "0.4.0"

repositories {
mavenCentral()
4 changes: 2 additions & 2 deletions docs/design.md
@@ -81,7 +81,7 @@ When a task starts up, the consumer offsets are initialized to those in the sink

#### Control topic

On coordinator startup, the control topic offsets are restored from the consumer group. Any data files events added after those offsets are processed during startup. If the consumer group had not yet been initialized, then the coordinator’s consumer starts reading from the latest.

The control topic offsets are also stored in the Iceberg snapshot as a summary property. Before committing to a table, this property is read from the table. Only data files events with offsets after this value are committed to the table.

@@ -110,7 +110,7 @@ The connector has exactly-once semantics. Workers ensure this by sending the dat

If a task encounters a very heavy GC cycle during a transaction that causes a pause longer than the consumer session timeout (45 seconds by default), a partition might be assigned to a different task even though the “zombie” is still alive (but in a degraded state).

In this circumstance, the new worker starts reading from the current committed offsets. When the zombie starts processing again, it completes the commit. This could lead to duplicates in this extreme case. Zombie fencing will be targeted for a future release.

## Error Handling

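The design paragraphs above describe the offsets recovery path that this commit re-keys. A minimal sketch of the lookup, assuming an Iceberg Table handle and a JSON-encoded partition-to-offset map (the snapshot walk and Jackson usage mirror getLastCommittedOffsetsForTable in the Coordinator diff further down; the kafka.connect.offsets.<topic>.<group> key shape is the one introduced here):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class OffsetsLookup {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Walks snapshots newest-first and returns the partition-to-offset map
  // written by the last successful commit, or an empty map if none exists.
  static Map<Integer, Long> lastCommittedOffsets(Table table, String topic, String groupId) {
    String prop = String.format("kafka.connect.offsets.%s.%s", topic, groupId);
    Snapshot snapshot = table.currentSnapshot();
    while (snapshot != null) {
      String value = snapshot.summary().get(prop);
      if (value != null) {
        try {
          return MAPPER.readValue(value, new TypeReference<Map<Integer, Long>>() {});
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
      Long parentId = snapshot.parentId();
      snapshot = parentId == null ? null : table.snapshot(parentId);
    }
    return Collections.emptyMap();
  }
}
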
@@ -32,7 +32,7 @@ public class Event implements Element {
private UUID id;
private EventType type;
private Long timestamp;
-private String connector;
+private String groupId;
private Payload payload;
private Schema avroSchema;

@@ -56,11 +56,11 @@ public Event(Schema avroSchema) {
this.avroSchema = avroSchema;
}

-public Event(String connector, EventType type, Payload payload) {
+public Event(String groupId, EventType type, Payload payload) {
this.id = UUID.randomUUID();
this.type = type;
this.timestamp = System.currentTimeMillis();
-this.connector = connector;
+this.groupId = groupId;
this.payload = payload;

this.avroSchema =
@@ -85,10 +85,9 @@ public Event(String connector, EventType type, Payload payload) {
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type(payload.getSchema())
.noDefault()
-.name("connector")
+.name("groupId")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type()
-.nullable() // for backwards compatibility
.stringType()
.noDefault()
.endRecord();
Expand All @@ -110,8 +109,8 @@ public Payload getPayload() {
return payload;
}

-public String getConnector() {
-return connector;
+public String getGroupId() {
+return groupId;
}

@Override
@@ -135,7 +134,7 @@ public void put(int i, Object v) {
this.payload = (Payload) v;
return;
case 4:
-this.connector = v == null ? null : v.toString();
+this.groupId = v == null ? null : v.toString();
return;
default:
// ignore the object, it must be from a newer version of the format
@@ -154,7 +153,7 @@ public Object get(int i) {
case 3:
return payload;
case 4:
-return connector;
+return groupId;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
@@ -34,7 +34,7 @@ public class EventSerializationTest {
public void testCommitRequestSerialization() {
UUID commitId = UUID.randomUUID();
Event event =
-new Event("connector", EventType.COMMIT_REQUEST, new CommitRequestPayload(commitId));
+new Event("cg-connector", EventType.COMMIT_REQUEST, new CommitRequestPayload(commitId));

byte[] data = Event.encode(event);
Event result = Event.decode(data);
@@ -49,7 +49,7 @@ public void testCommitResponseSerialization() {
UUID commitId = UUID.randomUUID();
Event event =
new Event(
"connector",
"cg-connector",
EventType.COMMIT_RESPONSE,
new CommitResponsePayload(
StructType.of(),
@@ -76,7 +76,7 @@ public void testCommitReadySerialization() {
UUID commitId = UUID.randomUUID();
Event event =
new Event(
"connector",
"cg-connector",
EventType.COMMIT_READY,
new CommitReadyPayload(
commitId,
@@ -99,7 +99,7 @@ public void testCommitTableSerialization() {
UUID commitId = UUID.randomUUID();
Event event =
new Event(
"connector",
"cg-connector",
EventType.COMMIT_TABLE,
new CommitTablePayload(
commitId, new TableName(Collections.singletonList("db"), "tbl"), 1L, 2L));
@@ -119,7 +119,8 @@ public void testCommitTableSerialization() {
public void testCommitCompleteSerialization() {
UUID commitId = UUID.randomUUID();
Event event =
-new Event("connector", EventType.COMMIT_COMPLETE, new CommitCompletePayload(commitId, 2L));
+new Event(
+"cg-connector", EventType.COMMIT_COMPLETE, new CommitCompletePayload(commitId, 2L));

byte[] data = Event.encode(event);
Event result = Event.decode(data);
@@ -76,7 +76,7 @@ protected void assertSnapshotProps(TableIdentifier tableIdentifier) {
new Condition<String>() {
@Override
public boolean matches(String str) {
-return str.startsWith("kafka.connect.control.offsets.");
+return str.startsWith("kafka.connect.offsets.");
}
});
assertThat(props).containsKey("kafka.connect.commitId");
@@ -47,7 +47,7 @@ public abstract class Channel {

private final String controlTopic;
private final String controlGroupId;
-private final String connector;
+private final String groupId;
private final Producer<String, byte[]> producer;
private final Consumer<String, byte[]> consumer;
private final Admin admin;
@@ -61,7 +61,7 @@ public Channel(
KafkaClientFactory clientFactory) {
this.controlTopic = config.getControlTopic();
this.controlGroupId = config.getControlGroupId();
-this.connector = config.getConnectorName();
+this.groupId = config.getControlGroupId();

String transactionalId = name + config.getTransactionalSuffix();
this.producer = clientFactory.createProducer(transactionalId);
@@ -124,7 +124,7 @@ record -> {

Event event = Event.decode(record.value());

-if (event.getConnector() == null || event.getConnector().equals(connector)) {
+if (event.getGroupId().equals(groupId)) {
LOG.debug("Received event of type: {}", event.getType().name());
if (receive(new Envelope(event, record.partition(), record.offset()))) {
LOG.info("Handled event of type: {}", event.getType().name());
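
Taken together with the Event changes above, every control event is now stamped with the control group ID, and the channel drops events from other groups with a strict equality check; the old null-tolerant check existed only because the connector field used to be nullable. A small round-trip sketch under those semantics (the events package import and the cg-connector value are assumptions, following the serialization tests):

import io.tabular.iceberg.connect.events.*; // assumed package for Event, EventType, payloads
import java.util.UUID;

public class GroupIdFilterSketch {
  public static void main(String[] args) {
    String groupId = "cg-connector"; // example value, matching the tests

    // Send side: the producing channel tags the event with its control group ID.
    Event event =
        new Event(groupId, EventType.COMMIT_REQUEST, new CommitRequestPayload(UUID.randomUUID()));
    byte[] wire = Event.encode(event);

    // Receive side: groupId is now a required string in the Avro schema, so a
    // strict equality check replaces the old null check on the connector name.
    Event received = Event.decode(wire);
    if (received.getGroupId().equals(groupId)) {
      System.out.println("Handled event of type: " + received.getType().name());
    }
  }
}
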
@@ -18,6 +18,7 @@
*/
package io.tabular.iceberg.connect.channel;

+import static java.lang.String.format;
import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.core.type.TypeReference;
@@ -56,7 +57,7 @@ public class Coordinator extends Channel {

private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
-private static final String CONTROL_OFFSETS_SNAPSHOT_PREFIX = "kafka.connect.control.offsets.";
+private static final String OFFSETS_SNAPSHOT_PROP_FMT = "kafka.connect.offsets.%s.%s";
private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commitId";
private static final String VTTS_SNAPSHOT_PROP = "kafka.connect.vtts";
private static final Duration POLL_DURATION = Duration.ofMillis(1000);
@@ -75,7 +76,8 @@ public Coordinator(Catalog catalog, IcebergSinkConfig config, KafkaClientFactory
this.catalog = catalog;
this.config = config;
this.totalPartitionCount = getTotalPartitionCount();
-this.snapshotOffsetsProp = CONTROL_OFFSETS_SNAPSHOT_PREFIX + config.getControlTopic();
+this.snapshotOffsetsProp =
+format(OFFSETS_SNAPSHOT_PROP_FMT, config.getControlTopic(), config.getControlGroupId());
this.exec = ThreadPools.newWorkerPool("iceberg-committer", config.getCommitThreads());
this.commitState = new CommitState(config);
}
@@ -86,7 +88,7 @@ public void process() {
commitState.startNewCommit();
Event event =
new Event(
-config.getConnectorName(),
+config.getControlGroupId(),
EventType.COMMIT_REQUEST,
new CommitRequestPayload(commitState.getCurrentCommitId()));
send(event);
@@ -160,7 +162,7 @@ private void doCommit(boolean partialCommit) {

Event event =
new Event(
-config.getConnectorName(),
+config.getControlGroupId(),
EventType.COMMIT_COMPLETE,
new CommitCompletePayload(commitState.getCurrentCommitId(), vtts));
send(event);
@@ -243,7 +245,7 @@ private void commitToTable(
Long snapshotId = table.currentSnapshot().snapshotId();
Event event =
new Event(
-config.getConnectorName(),
+config.getControlGroupId(),
EventType.COMMIT_TABLE,
new CommitTablePayload(
commitState.getCurrentCommitId(),
@@ -264,12 +266,10 @@ private void commitToTable(
private Map<Integer, Long> getLastCommittedOffsetsForTable(Table table) {
// TODO: support branches

-String offsetsProp = CONTROL_OFFSETS_SNAPSHOT_PREFIX + config.getControlTopic();
Snapshot snapshot = table.currentSnapshot();

while (snapshot != null) {
Map<String, String> summary = snapshot.summary();
-String value = summary.get(offsetsProp);
+String value = summary.get(snapshotOffsetsProp);
if (value != null) {
TypeReference<Map<Integer, Long>> typeRef = new TypeReference<Map<Integer, Long>>() {};
try {
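
The net effect in the Coordinator is that the snapshot property key changes shape: it is no longer the control topic appended to a fixed prefix, but a format string that also embeds the control group ID, so offsets recorded by different consumer groups against the same table can no longer collide. A quick illustration with hypothetical topic and group names (not taken from the commit):

public class OffsetsKeySketch {
  public static void main(String[] args) {
    String topic = "control-iceberg"; // hypothetical control topic name
    String groupId = "cg-connector";  // hypothetical control group ID

    // Before this commit: keyed by control topic only.
    String oldKey = "kafka.connect.control.offsets." + topic;
    System.out.println(oldKey); // kafka.connect.control.offsets.control-iceberg

    // After this commit: keyed by control topic and control group ID.
    String newKey = String.format("kafka.connect.offsets.%s.%s", topic, groupId);
    System.out.println(newKey); // kafka.connect.offsets.control-iceberg.cg-connector
  }
}
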
@@ -148,7 +148,7 @@ protected boolean receive(Envelope envelope) {
.map(
writeResult ->
new Event(
-config.getConnectorName(),
+config.getControlGroupId(),
EventType.COMMIT_RESPONSE,
new CommitResponsePayload(
writeResult.getPartitionStruct(),
@@ -160,7 +160,7 @@

Event readyEvent =
new Event(
-config.getConnectorName(),
+config.getControlGroupId(),
EventType.COMMIT_READY,
new CommitReadyPayload(commitId, assignments));
events.add(readyEvent);
@@ -79,7 +79,7 @@ public void setup() {
when(config.getControlTopic()).thenReturn(CTL_TOPIC_NAME);
when(config.getControlGroupId()).thenReturn("group");
when(config.getCommitThreads()).thenReturn(1);
-when(config.getConnectorName()).thenReturn("connector");
+when(config.getControlGroupId()).thenReturn("cg-connector");

TopicPartitionInfo partitionInfo = mock(TopicPartitionInfo.class);
when(partitionInfo.partition()).thenReturn(0);
@@ -64,7 +64,7 @@ public void testCommitAppend() {

ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(appendOp, times(3)).set(captor.capture(), notNull());
-assertThat(captor.getAllValues().get(0)).startsWith("kafka.connect.control.offsets.");
+assertThat(captor.getAllValues().get(0)).startsWith("kafka.connect.offsets.");
assertEquals("kafka.connect.commitId", captor.getAllValues().get(1));
assertEquals("kafka.connect.vtts", captor.getAllValues().get(2));

@@ -90,7 +90,7 @@ public void testCommitDelta() {

ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(deltaOp, times(3)).set(captor.capture(), notNull());
-assertThat(captor.getAllValues().get(0)).startsWith("kafka.connect.control.offsets.");
+assertThat(captor.getAllValues().get(0)).startsWith("kafka.connect.offsets.");
assertEquals("kafka.connect.commitId", captor.getAllValues().get(1));
assertEquals("kafka.connect.vtts", captor.getAllValues().get(2));
}
@@ -162,7 +162,7 @@ private UUID coordinatorTest(List<DataFile> dataFiles, List<DeleteFile> deleteFi

Event commitResponse =
new Event(
-config.getConnectorName(),
+config.getControlGroupId(),
EventType.COMMIT_RESPONSE,
new CommitResponsePayload(
StructType.of(),
@@ -175,7 +175,7 @@ private UUID coordinatorTest(List<DataFile> dataFiles, List<DeleteFile> deleteFi

Event commitReady =
new Event(
-config.getConnectorName(),
+config.getControlGroupId(),
EventType.COMMIT_READY,
new CommitReadyPayload(
commitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))));
@@ -97,7 +97,7 @@ private void workerTest(Map<String, Object> value) {
UUID commitId = UUID.randomUUID();
Event commitRequest =
new Event(
-config.getConnectorName(),
+config.getControlGroupId(),
EventType.COMMIT_REQUEST,
new CommitRequestPayload(commitId));
byte[] bytes = Event.encode(commitRequest);
