-
Notifications
You must be signed in to change notification settings - Fork 5
Add some support types for snapshot orchestration. #460
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| package dev.responsive.kafka.internal.snapshot; | ||
|
|
||
| import org.apache.kafka.streams.processor.TaskId; | ||
|
|
||
| /** | ||
| * Interface that abstracts away how we lookup a task's current generation. | ||
| * For synchronized snapshots we want to do one of the following so we can record a task's | ||
| * generation metadata transactionally alongside the rows with new generation markers when | ||
| * we bump generations: | ||
| * (1) store this information in the offset metadata as part of the transaction that | ||
| * bumps the generation | ||
| * (2) if/when kafka supports 2pc store this information in another store (like rs3 or | ||
| * the snapshot store) | ||
| * | ||
| * For simple uncoordinated snapshots we'll support looking up the generation by looking | ||
| * at the snapshot's state in the generation store. | ||
| */ | ||
| public interface GenerationStorage { | ||
| long lookupGeneration(final TaskId taskId); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| package dev.responsive.kafka.internal.snapshot; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Objects; | ||
|
|
||
| /** | ||
| * Implementation of SnapshotApi that directly interacts with the Snapshot Store rather | ||
| * than calling out to an API service. | ||
| */ | ||
| public class LocalSnapshotApi implements SnapshotApi { | ||
| private final SnapshotStore snapshotStore; | ||
|
|
||
| public LocalSnapshotApi(final SnapshotStore snapshotStore) { | ||
| this.snapshotStore = Objects.requireNonNull(snapshotStore); | ||
| } | ||
|
|
||
| @Override | ||
| public Snapshot createSnapshot() { | ||
| return snapshotStore.updateCurrentSnapshot(snapshot -> { | ||
| if (snapshot.state().equals(Snapshot.State.CREATED)) { | ||
| throw new RuntimeException("Snapshot is currently in progress"); | ||
| } | ||
| return snapshot.nextSnapshot(); | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public Snapshot getCurrentSnapshot() { | ||
| return snapshotStore.currentSnapshot(true); | ||
| } | ||
|
|
||
| @Override | ||
| public List<Snapshot> getSnapshots() { | ||
| return snapshotStore.listSnapshots(true); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| snapshotStore.close(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| package dev.responsive.kafka.internal.snapshot; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
| import org.apache.kafka.streams.processor.TaskId; | ||
|
|
||
| /** | ||
| * An implementation of the orchestrator that runs within the application and interacts | ||
| * directly with the snapshot store. | ||
| */ | ||
| public class LocalSnapshotOrchestrator implements SnapshotOrchestrator { | ||
| private final SnapshotStore snapshotStore; | ||
| private final Set<TaskId> allTasks; | ||
|
|
||
| public LocalSnapshotOrchestrator( | ||
| final SnapshotStore snapshotStore, | ||
| final Set<TaskId> allTasks | ||
| ) { | ||
| this.snapshotStore = Objects.requireNonNull(snapshotStore); | ||
| this.allTasks = Objects.requireNonNull(allTasks); | ||
| } | ||
|
|
||
| @Override | ||
| public long getCurrentGeneration() { | ||
| return snapshotStore.currentSnapshot(false).generation(); | ||
| } | ||
|
|
||
| @Override | ||
| public void reportTaskSnapshotMetadata( | ||
| final long generation, | ||
| final List<Snapshot.TaskSnapshotMetadata> taskSnapshots | ||
| ) { | ||
| snapshotStore.updateCurrentSnapshot(snapshot -> { | ||
| // check that we're still working on this snapshot | ||
| if (snapshot.generation() != generation) { | ||
| throw new RuntimeException( | ||
| String.format("generation too old: %d > %d", snapshot.generation(), generation)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: the check above is inequality |
||
| } | ||
| if (!snapshot.state().equals(Snapshot.State.CREATED)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can use |
||
| throw new RuntimeException("Snapshot is currently completed. Cannot update"); | ||
| } | ||
|
|
||
| // check that for all the specified tasks, we either haven't collected its metadata | ||
| // or the metadata is the same | ||
| final List<Snapshot.TaskSnapshotMetadata> newlyCompletedTaskSnapshots = new ArrayList<>(); | ||
| for (final var taskSnapshot : taskSnapshots) { | ||
| final var found = snapshot.taskSnapshots() | ||
| .stream() | ||
| .filter(s -> s.taskId().equals(taskSnapshot.taskId())) | ||
| .collect(Collectors.toList()); | ||
| if (found.size() > 1) { | ||
| throw new IllegalStateException( | ||
| "found multiple snapshots for task " + taskSnapshot.taskId()); | ||
| } | ||
| if (found.isEmpty()) { | ||
| newlyCompletedTaskSnapshots.add(taskSnapshot); | ||
| } else if (!found.get(0).equals(taskSnapshot)) { | ||
| throw new IllegalStateException( | ||
| "found conflicting snapshots for task" + taskSnapshot.taskId()); | ||
| } | ||
| } | ||
|
|
||
| // if we've collected snapshots for all tasks, mark the snapshot as completed | ||
| final Set<TaskId> completedTasks = Stream.concat( | ||
| newlyCompletedTaskSnapshots.stream().map(Snapshot.TaskSnapshotMetadata::taskId), | ||
| snapshot.taskSnapshots().stream().map(Snapshot.TaskSnapshotMetadata::taskId) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Might be nice to move this helper to get completed task Ids to |
||
| ).collect(Collectors.toSet()); | ||
| Snapshot.State state = snapshot.state(); | ||
| if (completedTasks.equals(this.allTasks)) { | ||
| state = Snapshot.State.COMPLETED; | ||
| } | ||
|
|
||
| return snapshot.withTaskSnapshots(newlyCompletedTaskSnapshots, state); | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void failSnapshot(long snapshotGeneration) { | ||
| snapshotStore.updateCurrentSnapshot(snapshot -> { | ||
| if (snapshot.generation() != snapshotGeneration) { | ||
| // todo: do something more reasonable here | ||
| throw new RuntimeException("Generation mismatch"); | ||
| } | ||
| if (snapshot.state().equals(Snapshot.State.COMPLETED)) { | ||
| throw new RuntimeException("Cannot fail completed snapshot"); | ||
| } | ||
| return snapshot.withStateFailed(); | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| snapshotStore.close(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess
updateCurrentSnapshotguarantees that the latest snapshot is either in progress or has been completed. So failed snapshots would not show up here?