Skip to content

Conversation

@rodesai
Copy link
Contributor

@rodesai rodesai commented May 10, 2025

This patch adds a number of support types that are used to coordinate execution of a snapshot. They correspond to the entities defined in https://www.notion.so/responsivedev/Snapshot-Protocol-1a627549be4c8075a909cfd05adae3af

SnapshotStore defines an interface for a store for Snapshot metadata. Its essentially a table of all the snapshots taken of an application keyed by generation.

TopicSnapshotStore implements SnapshotStore by writing snapshot metadata to a topic and using a Kafka transaction to do transactional updates to the current snapshot.

SnapshotOrchestrator defines the interface between Kafka Streams tasks and the snapshot orchestration process. Its how the tasks discover what the target generation should be, and how they report their task snapshot metadata (offsets+checkpoints).

LocalSnapshotOrchestrator implements SnapshotOrchestrator for settings where the orchestrator runs embedded in the application.

SnapshotApi defines the interface for interacting with snapshots from outside the application.

LocalSnapshotApi implements SnapshotApi for settings where the api interacts directly with the SnapshotStore (rather than going through another api service like we might for responsive cloud) - e.g. in settings where we're just orchestrating snapshots locally by storing metadata in a topic using TopicSnapshotStore.

This patch adds a number of support types that are used to coordinate execution
of a snapshot. They correspond to the entities defined in
https://www.notion.so/responsivedev/Snapshot-Protocol-1a627549be4c8075a909cfd05adae3af

SnapshotStore defines an interface for a store for Snapshot metadata. Its essentially
a table of all the snapshots taken of an application keyed by generation.

TopicSnapshotStore implements SnapshotStore by writing snapshot metadata to a topic
and using a Kafka transaction to do transactional updates to the current snapshot.

SnapshotOrchestrator defines the interface between Kafka Streams tasks and the
snapshot orchestration process. Its how the tasks discover what the target generation
should be, and how they report their task snapshot metadata (offsets+checkpoints).

LocalSnapshotOrchestrator implements SnapshotOrchestrator for settings where the
orchestrator runs embedded in the application.

SnapshotApi defines the interface for interacting with snapshots from outside the
application.

LocalSnapshotApi implements SnapshotApi for settings where the api interacts directly
with the SnapshotStore (rather than going through another api service like we might
for responsive cloud) - e.g. in settings where we're just orchestrating snapshots
locally by storing metadata in a topic using TopicSnapshotStore.
* Interface for interacting with Snapshots from outside an application. Supports
* reading the current/past snapshots, and creating a new snapshot.
*/
public interface SnapshotApi extends AutoCloseable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we eventually support snapshot removal through the same API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that's the intention.

/**
* Returns the current snapshot
*
* @param block if true, then the call blocks until the store has observed the latest update
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little unclear. I guess the intent is that block=false allows us to use a cached Snapshot while block=true requires that we check with the storage system? In what scenario would we use this with block=false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's right. It gets used by the local orchestrator when coordinating snapshot execution from a stream thread. The stream thread just needs to eventually discover that the target generation for the app has bumped so it can start bumping the generations of tasks assigned to it. It does the check after committing, where we don't want to block it.

/**
* Called by the stream thread to report task snapshot metadata for a given task.
*
* @param snapshotGeneration The generation of the task snapshot(s) being reported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing there is a monotonic assumption here? Do we throw some kind of fencing error for older generations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's in the orchestrator implementation.

public static class TaskSnapshotMetadata {
private final TaskId taskId;
private final List<CommittedOffset> committedOffsets;
private final Map<String, byte[]> checkpoints;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the key type representing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the state store name. I'll add a comment.

@Override
public Snapshot createSnapshot() {
return snapshotStore.updateCurrentSnapshot(snapshot -> {
if (snapshot.state().equals(Snapshot.State.CREATED)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess updateCurrentSnapshot guarantees that the latest snapshot is either in progress or has been completed. So failed snapshots would not show up here?

// if we've collected snapshots for all tasks, mark the snapshot as completed
final Set<TaskId> completedTasks = Stream.concat(
newlyCompletedTaskSnapshots.stream().map(Snapshot.TaskSnapshotMetadata::taskId),
snapshot.taskSnapshots().stream().map(Snapshot.TaskSnapshotMetadata::taskId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Might be nice to move this helper to get completed task Ids to Snapshot.

* Returns the current snapshot
*
* @param block if true, then the call blocks until the store has observed the latest update
* @return the current snapshot of the application
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this return an empty snapshot with offsets at 0 if no snapshots have been taken yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'll leave that detail in the docs. It doesn't have offsets set to 0 - there just aren't any offsets or checkpoints recorded inside, and its generation is set to 0.

}

@Override
public long lookupGeneration(final TaskId taskId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for the task id to not be part of the snapshot (i.e. if a task was added)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a task is pretty unusual. It requires that either the user added partitions to some topic or the user changed the topology to change the set of subtopologies. In either case the user would need to restart the application, so the set of tasks should get reinitialized when that happens.

final Future<RecordMetadata> sendFut;
final Snapshot updated;
try (final var producer = producerSupplier.get()) {
producer.initTransactions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to be transactional? I guess we require a single writer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Any thread/process can try to update the store, so we need to write to the store transactionally.

private void createTopic(final Admin admin, final short replicas) {
try {
final var result = admin.createTopics(List.of(
new NewTopic(topicPartition.topic(), 1, replicas)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do we expect the retention policy to be?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently it's implemented to retain indefinitely. I think for snapshots that's actually appropriate, and then we can add some mechanism for expiring snapshots explicitly in a follow-up. I'd rather not use retention for this since you'd want to be able to set different expiries/retention for different snapshots (e.g. maybe you want a daily snapshot that you retain for a long time, and on-demand or more frequent snapshots that you retain for less time).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants