Skip to content

Commit

Permalink
Move commit state to separate class (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Jun 12, 2023
1 parent 889a04d commit 93a7768
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 158 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ subprojects {
apply plugin: "maven-publish"

group "io.tabular.connect"
version "0.3.4-SNAPSHOT"
version "0.3.4"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.channel;

import static java.util.stream.Collectors.groupingBy;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.events.CommitReadyPayload;
import io.tabular.iceberg.connect.events.CommitResponsePayload;
import io.tabular.iceberg.connect.events.TopicPartitionOffset;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitState {
private static final Logger LOG = LoggerFactory.getLogger(CommitState.class);

private final List<Envelope> commitBuffer = new LinkedList<>();
private final List<CommitReadyPayload> readyBuffer = new LinkedList<>();
private long startTime;
private UUID currentCommitId;
private final IcebergSinkConfig config;

public CommitState(IcebergSinkConfig config) {
this.config = config;
}

public void addResponse(Envelope envelope) {
commitBuffer.add(envelope);
if (!isCommitInProgress()) {
LOG.warn(
"Received commit response when no commit in progress, this can happen during recovery");
}
}

public void addReady(Envelope envelope) {
readyBuffer.add((CommitReadyPayload) envelope.getEvent().getPayload());
if (!isCommitInProgress()) {
LOG.warn("Received commit ready when no commit in progress, this can happen during recovery");
}
}

public UUID getCurrentCommitId() {
return currentCommitId;
}

public boolean isCommitInProgress() {
return currentCommitId != null;
}

public boolean isCommitIntervalReached() {
if (startTime == 0) {
startTime = System.currentTimeMillis();
}

return (!isCommitInProgress()
&& System.currentTimeMillis() - startTime >= config.getCommitIntervalMs());
}

public void startNewCommit() {
currentCommitId = UUID.randomUUID();
startTime = System.currentTimeMillis();
}

public void endCurrentCommit() {
readyBuffer.clear();
currentCommitId = null;
}

public void clearResponses() {
commitBuffer.clear();
}

public boolean isCommitTimedOut() {
if (!isCommitInProgress()) {
return false;
}

if (System.currentTimeMillis() - startTime > config.getCommitTimeoutMs()) {
LOG.info("Commit timeout reached");
return true;
}
return false;
}

public boolean isCommitReady(int expectedPartitionCount) {
if (!isCommitInProgress()) {
return false;
}

int receivedPartitionCount =
readyBuffer.stream()
.filter(payload -> payload.getCommitId().equals(currentCommitId))
.mapToInt(payload -> payload.getAssignments().size())
.sum();

if (receivedPartitionCount >= expectedPartitionCount) {
LOG.info(
"Commit {} ready, received responses for all {} partitions",
currentCommitId,
receivedPartitionCount);
return true;
}

LOG.info(
"Commit {} not ready, received responses for {} of {} partitions, waiting for more",
currentCommitId,
receivedPartitionCount,
expectedPartitionCount);

return false;
}

public Map<TableIdentifier, List<Envelope>> getTableCommitMap() {
return commitBuffer.stream()
.collect(
groupingBy(
envelope ->
((CommitResponsePayload) envelope.getEvent().getPayload())
.getTableName()
.toIdentifier()));
}

public Long getVtts(boolean partialCommit) {
boolean validVtts =
!partialCommit
&& readyBuffer.stream()
.flatMap(event -> event.getAssignments().stream())
.allMatch(offset -> offset.getTimestamp() != null);

Long result;
if (validVtts) {
result =
readyBuffer.stream()
.flatMap(event -> event.getAssignments().stream())
.mapToLong(TopicPartitionOffset::getTimestamp)
.min()
.getAsLong();
} else {
result = null;
}
return result;
}
}
Loading

0 comments on commit 93a7768

Please sign in to comment.