Skip to content

Commit 0a2f2cf

Browse files
committed
Add some support types for snapshot orchestration.
This patch adds a number of support types that are used to coordinate execution of a snapshot. They correspond to the entities defined in https://www.notion.so/responsivedev/Snapshot-Protocol-1a627549be4c8075a909cfd05adae3af SnapshotStore defines an interface for a store of snapshot metadata. It's essentially a table of all the snapshots taken of an application, keyed by generation. TopicSnapshotStore implements SnapshotStore by writing snapshot metadata to a topic and using a Kafka transaction to do transactional updates to the current snapshot. SnapshotOrchestrator defines the interface between Kafka Streams tasks and the snapshot orchestration process. It's how the tasks discover what the target generation should be, and how they report their task snapshot metadata (offsets + checkpoints). LocalSnapshotOrchestrator implements SnapshotOrchestrator for settings where the orchestrator runs embedded in the application. SnapshotApi defines the interface for interacting with snapshots from outside the application. LocalSnapshotApi implements SnapshotApi for settings where the API interacts directly with the SnapshotStore (rather than going through another API service, as we might for Responsive Cloud) - e.g. in settings where we're just orchestrating snapshots locally by storing metadata in a topic using TopicSnapshotStore.
1 parent 339c583 commit 0a2f2cf

24 files changed

+1732
-5
lines changed

kafka-client-examples/e2e-test/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ dependencies {
3131
implementation(libs.guava)
3232
implementation(libs.slf4j.log4j2)
3333
implementation(libs.bundles.scylla)
34-
implementation(libs.jackson)
34+
implementation(libs.bundles.jackson)
3535
implementation(libs.mongodb.driver.core)
3636

3737
testImplementation(testlibs.bundles.base)

kafka-client/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ dependencies {
137137
implementation("dev.responsive:controller-api:0.16.0")
138138
implementation(libs.bundles.scylla)
139139
implementation(libs.bundles.commons)
140-
implementation(libs.jackson)
140+
implementation(libs.bundles.jackson)
141141
implementation(libs.mongodb.driver.sync)
142142
implementation(libs.bundles.otel)
143143
implementation(libs.bundles.grpc)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package dev.responsive.kafka.internal.snapshot;
2+
3+
import org.apache.kafka.streams.processor.TaskId;
4+
5+
/**
6+
* Interface that abstracts away how we lookup a task's current generation.
7+
* For synchronized snapshots we want to do one of the following so we can record a task's
8+
* generation metadata transactionally alongside the rows with new generation markers when
9+
* we bump generations:
10+
* (1) store this information in the offset metadata as part of the transaction that
11+
* bumps the generation
12+
* (2) if/when kafka supports 2pc store this information in another store (like rs3 or
13+
* the snapshot store)
14+
*
15+
* For simple uncoordinated snapshots we'll support looking up the generation by looking
16+
* at the snapshot's state in the generation store.
17+
*/
18+
public interface GenerationStorage {
19+
long lookupGeneration(final TaskId taskId);
20+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package dev.responsive.kafka.internal.snapshot;
2+
3+
import java.util.List;
4+
import java.util.Objects;
5+
6+
/**
7+
* Implementation of SnapshotApi that directly interacts with the Snapshot Store rather
8+
* than calling out to an API service.
9+
*/
10+
public class LocalSnapshotApi implements SnapshotApi {
11+
private final SnapshotStore snapshotStore;
12+
13+
public LocalSnapshotApi(final SnapshotStore snapshotStore) {
14+
this.snapshotStore = Objects.requireNonNull(snapshotStore);
15+
}
16+
17+
@Override
18+
public Snapshot createSnapshot() {
19+
return snapshotStore.updateCurrentSnapshot(snapshot -> {
20+
if (snapshot.state().equals(Snapshot.State.CREATED)) {
21+
throw new RuntimeException("Snapshot is currently in progress");
22+
}
23+
return snapshot.nextSnapshot();
24+
});
25+
}
26+
27+
@Override
28+
public Snapshot getCurrentSnapshot() {
29+
return snapshotStore.currentSnapshot(true);
30+
}
31+
32+
@Override
33+
public List<Snapshot> getSnapshots() {
34+
return snapshotStore.listSnapshots(true);
35+
}
36+
37+
@Override
38+
public void close() {
39+
snapshotStore.close();
40+
}
41+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package dev.responsive.kafka.internal.snapshot;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.Objects;
6+
import java.util.Set;
7+
import java.util.stream.Collectors;
8+
import java.util.stream.Stream;
9+
import org.apache.kafka.streams.processor.TaskId;
10+
11+
/**
12+
* An implementation of the orchestrator that runs within the application and interacts
13+
* directly with the snapshot store.
14+
*/
15+
public class LocalSnapshotOrchestrator implements SnapshotOrchestrator {
16+
private final SnapshotStore snapshotStore;
17+
private final Set<TaskId> allTasks;
18+
19+
public LocalSnapshotOrchestrator(
20+
final SnapshotStore snapshotStore,
21+
final Set<TaskId> allTasks
22+
) {
23+
this.snapshotStore = Objects.requireNonNull(snapshotStore);
24+
this.allTasks = Objects.requireNonNull(allTasks);
25+
}
26+
27+
@Override
28+
public long getCurrentGeneration() {
29+
return snapshotStore.currentSnapshot(false).generation();
30+
}
31+
32+
@Override
33+
public void reportTaskSnapshotMetadata(
34+
final long generation,
35+
final List<Snapshot.TaskSnapshotMetadata> taskSnapshots
36+
) {
37+
snapshotStore.updateCurrentSnapshot(snapshot -> {
38+
// check that we're still working on this snapshot
39+
if (snapshot.generation() != generation) {
40+
throw new RuntimeException(
41+
String.format("generation too old: %d > %d", snapshot.generation(), generation));
42+
}
43+
if (!snapshot.state().equals(Snapshot.State.CREATED)) {
44+
throw new RuntimeException("Snapshot is currently completed. Cannot update");
45+
}
46+
47+
// check that for all the specified tasks, we either haven't collected its metadata
48+
// or the metadata is the same
49+
final List<Snapshot.TaskSnapshotMetadata> newlyCompletedTaskSnapshots = new ArrayList<>();
50+
for (final var taskSnapshot : taskSnapshots) {
51+
final var found = snapshot.taskSnapshots()
52+
.stream()
53+
.filter(s -> s.taskId().equals(taskSnapshot.taskId()))
54+
.collect(Collectors.toList());
55+
if (found.size() > 1) {
56+
throw new IllegalStateException(
57+
"found multiple snapshots for task " + taskSnapshot.taskId());
58+
}
59+
if (found.isEmpty()) {
60+
newlyCompletedTaskSnapshots.add(taskSnapshot);
61+
} else if (!found.get(0).equals(taskSnapshot)) {
62+
throw new IllegalStateException(
63+
"found conflicting snapshots for task" + taskSnapshot.taskId());
64+
}
65+
}
66+
67+
// if we've collected snapshots for all tasks, mark the snapshot as completed
68+
final Set<TaskId> completedTasks = Stream.concat(
69+
newlyCompletedTaskSnapshots.stream().map(Snapshot.TaskSnapshotMetadata::taskId),
70+
snapshot.taskSnapshots().stream().map(Snapshot.TaskSnapshotMetadata::taskId)
71+
).collect(Collectors.toSet());
72+
Snapshot.State state = snapshot.state();
73+
if (completedTasks.equals(this.allTasks)) {
74+
state = Snapshot.State.COMPLETED;
75+
}
76+
77+
return snapshot.withTaskSnapshots(newlyCompletedTaskSnapshots, state);
78+
});
79+
}
80+
81+
@Override
82+
public void failSnapshot(long snapshotGeneration) {
83+
snapshotStore.updateCurrentSnapshot(snapshot -> {
84+
if (snapshot.generation() != snapshotGeneration) {
85+
// todo: do something more reasonable here
86+
throw new RuntimeException("Generation mismatch");
87+
}
88+
if (snapshot.state().equals(Snapshot.State.COMPLETED)) {
89+
throw new RuntimeException("Cannot fail completed snapshot");
90+
}
91+
return snapshot.withStateFailed();
92+
});
93+
}
94+
95+
@Override
96+
public void close() {
97+
snapshotStore.close();
98+
}
99+
}

0 commit comments

Comments
 (0)