Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-client-examples/e2e-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies {
implementation(libs.guava)
implementation(libs.slf4j.log4j2)
implementation(libs.bundles.scylla)
implementation(libs.jackson)
implementation(libs.bundles.jackson)
implementation(libs.mongodb.driver.core)

testImplementation(testlibs.bundles.base)
Expand Down
2 changes: 1 addition & 1 deletion kafka-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ dependencies {
implementation("dev.responsive:controller-api:0.16.0")
implementation(libs.bundles.scylla)
implementation(libs.bundles.commons)
implementation(libs.jackson)
implementation(libs.bundles.jackson)
implementation(libs.mongodb.driver.sync)
implementation(libs.bundles.otel)
implementation(libs.bundles.grpc)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package dev.responsive.kafka.internal.snapshot;

import org.apache.kafka.streams.processor.TaskId;

/**
 * Interface that abstracts away how we look up a task's current generation.
 *
 * <p>For synchronized snapshots we want to do one of the following so we can record a task's
 * generation metadata transactionally alongside the rows with new generation markers when
 * we bump generations:
 * <ol>
 *   <li>store this information in the offset metadata as part of the transaction that
 *       bumps the generation</li>
 *   <li>if/when Kafka supports two-phase commit (2PC), store this information in another
 *       store (like rs3 or the snapshot store)</li>
 * </ol>
 *
 * <p>For simple uncoordinated snapshots we'll support looking up the generation by looking
 * at the snapshot's state in the generation store.
 */
public interface GenerationStorage {
  /**
   * Looks up the current generation of the given task.
   *
   * @param taskId the task whose generation is requested
   * @return the task's current generation
   */
  long lookupGeneration(final TaskId taskId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package dev.responsive.kafka.internal.snapshot;

import java.util.List;
import java.util.Objects;

/**
* Implementation of SnapshotApi that directly interacts with the Snapshot Store rather
* than calling out to an API service.
*/
public class LocalSnapshotApi implements SnapshotApi {
private final SnapshotStore snapshotStore;

public LocalSnapshotApi(final SnapshotStore snapshotStore) {
this.snapshotStore = Objects.requireNonNull(snapshotStore);
}

@Override
public Snapshot createSnapshot() {
return snapshotStore.updateCurrentSnapshot(snapshot -> {
if (snapshot.state().equals(Snapshot.State.CREATED)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess updateCurrentSnapshot guarantees that the latest snapshot is either in progress or has been completed. So failed snapshots would not show up here?

throw new RuntimeException("Snapshot is currently in progress");
}
return snapshot.nextSnapshot();
});
}

@Override
public Snapshot getCurrentSnapshot() {
return snapshotStore.currentSnapshot(true);
}

@Override
public List<Snapshot> getSnapshots() {
return snapshotStore.listSnapshots(true);
}

@Override
public void close() {
snapshotStore.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package dev.responsive.kafka.internal.snapshot;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.streams.processor.TaskId;

/**
* An implementation of the orchestrator that runs within the application and interacts
* directly with the snapshot store.
*/
public class LocalSnapshotOrchestrator implements SnapshotOrchestrator {
private final SnapshotStore snapshotStore;
private final Set<TaskId> allTasks;

public LocalSnapshotOrchestrator(
final SnapshotStore snapshotStore,
final Set<TaskId> allTasks
) {
this.snapshotStore = Objects.requireNonNull(snapshotStore);
this.allTasks = Objects.requireNonNull(allTasks);
}

@Override
public long getCurrentGeneration() {
return snapshotStore.currentSnapshot(false).generation();
}

@Override
public void reportTaskSnapshotMetadata(
final long generation,
final List<Snapshot.TaskSnapshotMetadata> taskSnapshots
) {
snapshotStore.updateCurrentSnapshot(snapshot -> {
// check that we're still working on this snapshot
if (snapshot.generation() != generation) {
throw new RuntimeException(
String.format("generation too old: %d > %d", snapshot.generation(), generation));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the check above is inequality

}
if (!snapshot.state().equals(Snapshot.State.CREATED)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can use !=

throw new RuntimeException("Snapshot is currently completed. Cannot update");
}

// check that for all the specified tasks, we either haven't collected its metadata
// or the metadata is the same
final List<Snapshot.TaskSnapshotMetadata> newlyCompletedTaskSnapshots = new ArrayList<>();
for (final var taskSnapshot : taskSnapshots) {
final var found = snapshot.taskSnapshots()
.stream()
.filter(s -> s.taskId().equals(taskSnapshot.taskId()))
.collect(Collectors.toList());
if (found.size() > 1) {
throw new IllegalStateException(
"found multiple snapshots for task " + taskSnapshot.taskId());
}
if (found.isEmpty()) {
newlyCompletedTaskSnapshots.add(taskSnapshot);
} else if (!found.get(0).equals(taskSnapshot)) {
throw new IllegalStateException(
"found conflicting snapshots for task" + taskSnapshot.taskId());
}
}

// if we've collected snapshots for all tasks, mark the snapshot as completed
final Set<TaskId> completedTasks = Stream.concat(
newlyCompletedTaskSnapshots.stream().map(Snapshot.TaskSnapshotMetadata::taskId),
snapshot.taskSnapshots().stream().map(Snapshot.TaskSnapshotMetadata::taskId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Might be nice to move this helper to get completed task Ids to Snapshot.

).collect(Collectors.toSet());
Snapshot.State state = snapshot.state();
if (completedTasks.equals(this.allTasks)) {
state = Snapshot.State.COMPLETED;
}

return snapshot.withTaskSnapshots(newlyCompletedTaskSnapshots, state);
});
}

@Override
public void failSnapshot(long snapshotGeneration) {
snapshotStore.updateCurrentSnapshot(snapshot -> {
if (snapshot.generation() != snapshotGeneration) {
// todo: do something more reasonable here
throw new RuntimeException("Generation mismatch");
}
if (snapshot.state().equals(Snapshot.State.COMPLETED)) {
throw new RuntimeException("Cannot fail completed snapshot");
}
return snapshot.withStateFailed();
});
}

@Override
public void close() {
snapshotStore.close();
}
}
Loading
Loading