diff --git a/kafka-client-examples/e2e-test/build.gradle.kts b/kafka-client-examples/e2e-test/build.gradle.kts
index a554a07da..eaed1938f 100644
--- a/kafka-client-examples/e2e-test/build.gradle.kts
+++ b/kafka-client-examples/e2e-test/build.gradle.kts
@@ -31,7 +31,7 @@ dependencies {
   implementation(libs.guava)
   implementation(libs.slf4j.log4j2)
   implementation(libs.bundles.scylla)
-  implementation(libs.jackson)
+  implementation(libs.bundles.jackson)
   implementation(libs.mongodb.driver.core)
 
   testImplementation(testlibs.bundles.base)
diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts
index 08efad9c3..f93de5e66 100644
--- a/kafka-client/build.gradle.kts
+++ b/kafka-client/build.gradle.kts
@@ -137,7 +137,7 @@ dependencies {
   implementation("dev.responsive:controller-api:0.16.0")
   implementation(libs.bundles.scylla)
   implementation(libs.bundles.commons)
-  implementation(libs.jackson)
+  implementation(libs.bundles.jackson)
   implementation(libs.mongodb.driver.sync)
   implementation(libs.bundles.otel)
   implementation(libs.bundles.grpc)
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/GenerationStorage.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/GenerationStorage.java
new file mode 100644
index 000000000..4cea01ea0
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/GenerationStorage.java
@@ -0,0 +1,20 @@
+package dev.responsive.kafka.internal.snapshot;
+
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Interface that abstracts away how we look up a task's current generation.
+ * For synchronized snapshots we want to do one of the following so we can record a task's
+ * generation metadata transactionally alongside the rows with new generation markers when
+ * we bump generations:
+ * (1) store this information in the offset metadata as part of the transaction that
+ * bumps the generation
+ * (2) if/when Kafka supports 2PC, store this information in another store (like rs3 or
+ * the snapshot store)
+ *
+ * For simple uncoordinated snapshots we'll support looking up the generation by looking
+ * at the snapshot's state in the generation store.
+ */
+public interface GenerationStorage {
+  long lookupGeneration(final TaskId taskId);
+}
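To make the lookup contract concrete, here is a minimal usage sketch (not part of this change): it assumes a snapshotStore and taskId are in scope, and the generation-tagged rowWriter sink is a hypothetical stand-in.

    // Look up the generation a task is currently on and tag a write with it, so rows
    // written before and after a generation bump can be told apart.
    final GenerationStorage generations = new SnapshotStoreBasedGenerationStorage(snapshotStore);
    final long generation = generations.lookupGeneration(taskId);
    rowWriter.put(key, value, generation); // hypothetical sink that records the generation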
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotApi.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotApi.java
new file mode 100644
index 000000000..8ce602b8d
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotApi.java
@@ -0,0 +1,41 @@
+package dev.responsive.kafka.internal.snapshot;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Implementation of SnapshotApi that directly interacts with the Snapshot Store rather
+ * than calling out to an API service.
+ */
+public class LocalSnapshotApi implements SnapshotApi {
+  private final SnapshotStore snapshotStore;
+
+  public LocalSnapshotApi(final SnapshotStore snapshotStore) {
+    this.snapshotStore = Objects.requireNonNull(snapshotStore);
+  }
+
+  @Override
+  public Snapshot createSnapshot() {
+    return snapshotStore.updateCurrentSnapshot(snapshot -> {
+      if (snapshot.state().equals(Snapshot.State.CREATED)) {
+        throw new RuntimeException("Snapshot is currently in progress");
+      }
+      return snapshot.nextSnapshot();
+    });
+  }
+
+  @Override
+  public Snapshot getCurrentSnapshot() {
+    return snapshotStore.currentSnapshot(true);
+  }
+
+  @Override
+  public List<Snapshot> getSnapshots() {
+    return snapshotStore.listSnapshots(true);
+  }
+
+  @Override
+  public void close() {
+    snapshotStore.close();
+  }
+}
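As a usage sketch (assuming the LocalSnapshotApi above and an already-constructed SnapshotStore), a caller can trigger a snapshot and poll for a terminal state; the polling loop and interval are illustrative, not part of this change.

    static Snapshot snapshotAndWait(final SnapshotApi api) throws InterruptedException {
      final Snapshot started = api.createSnapshot(); // new generation in State.CREATED
      while (true) {
        final Snapshot current = api.getCurrentSnapshot();
        if (current.generation() == started.generation()
            && current.state() == Snapshot.State.CREATED) {
          Thread.sleep(1000); // still in progress; real code would bound the wait
          continue;
        }
        return current; // COMPLETED or FAILED (or already a later generation)
      }
    }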
Cannot update"); + } + + // check that for all the specified tasks, we either haven't collected its metadata + // or the metadata is the same + final List newlyCompletedTaskSnapshots = new ArrayList<>(); + for (final var taskSnapshot : taskSnapshots) { + final var found = snapshot.taskSnapshots() + .stream() + .filter(s -> s.taskId().equals(taskSnapshot.taskId())) + .collect(Collectors.toList()); + if (found.size() > 1) { + throw new IllegalStateException( + "found multiple snapshots for task " + taskSnapshot.taskId()); + } + if (found.isEmpty()) { + newlyCompletedTaskSnapshots.add(taskSnapshot); + } else if (!found.get(0).equals(taskSnapshot)) { + throw new IllegalStateException( + "found conflicting snapshots for task" + taskSnapshot.taskId()); + } + } + + // if we've collected snapshots for all tasks, mark the snapshot as completed + final Set completedTasks = Stream.concat( + newlyCompletedTaskSnapshots.stream().map(Snapshot.TaskSnapshotMetadata::taskId), + snapshot.taskSnapshots().stream().map(Snapshot.TaskSnapshotMetadata::taskId) + ).collect(Collectors.toSet()); + Snapshot.State state = snapshot.state(); + if (completedTasks.equals(this.allTasks)) { + state = Snapshot.State.COMPLETED; + } + + return snapshot.withTaskSnapshots(newlyCompletedTaskSnapshots, state); + }); + } + + @Override + public void failSnapshot(long snapshotGeneration) { + snapshotStore.updateCurrentSnapshot(snapshot -> { + if (snapshot.generation() != snapshotGeneration) { + // todo: do something more reasonable here + throw new RuntimeException("Generation mismatch"); + } + if (snapshot.state().equals(Snapshot.State.COMPLETED)) { + throw new RuntimeException("Cannot fail completed snapshot"); + } + return snapshot.withStateFailed(); + }); + } + + @Override + public void close() { + snapshotStore.close(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java new file mode 100644 index 000000000..4b5aa7de2 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java @@ -0,0 +1,272 @@ +package dev.responsive.kafka.internal.snapshot; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.streams.processor.TaskId; + +/** + * POJO representing an application snapshot + */ +public class Snapshot { + public enum State { + CREATED, + COMPLETED, + // TODO: add state between CREATED and COMPLETED to reflect a snapshot + // that needs to be finalized by converting expiring checkpoints to + // nonexpiring checkpoints + FAILED + // TODO: add state beteen CREATED and FAILED to reflect a snapshot that has + // failed but is not yet cleaned up + } + + private final Instant createdAt; + private final long generation; + private final State state; + private final List taskSnapshots; + + public static Snapshot initial() { + return new Snapshot( + Instant.EPOCH, + 0, + State.COMPLETED, + List.of() + ); + } + + @JsonCreator + public Snapshot( + @JsonProperty("createdAt") final Instant createdAt, + @JsonProperty("generation") final long generation, + @JsonProperty("state") final State state, + @JsonProperty("taskSnapshots") final List taskSnapshots + ) { + this.createdAt = createdAt; + 
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java
new file mode 100644
index 000000000..4b5aa7de2
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java
@@ -0,0 +1,272 @@
+package dev.responsive.kafka.internal.snapshot;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * POJO representing an application snapshot
+ */
+public class Snapshot {
+  public enum State {
+    CREATED,
+    COMPLETED,
+    // TODO: add state between CREATED and COMPLETED to reflect a snapshot
+    // that needs to be finalized by converting expiring checkpoints to
+    // nonexpiring checkpoints
+    FAILED
+    // TODO: add state between CREATED and FAILED to reflect a snapshot that has
+    // failed but is not yet cleaned up
+  }
+
+  private final Instant createdAt;
+  private final long generation;
+  private final State state;
+  private final List<TaskSnapshotMetadata> taskSnapshots;
+
+  public static Snapshot initial() {
+    return new Snapshot(
+        Instant.EPOCH,
+        0,
+        State.COMPLETED,
+        List.of()
+    );
+  }
+
+  @JsonCreator
+  public Snapshot(
+      @JsonProperty("createdAt") final Instant createdAt,
+      @JsonProperty("generation") final long generation,
+      @JsonProperty("state") final State state,
+      @JsonProperty("taskSnapshots") final List<TaskSnapshotMetadata> taskSnapshots
+  ) {
+    this.createdAt = createdAt;
+    this.generation = generation;
+    this.state = state;
+    this.taskSnapshots = List.copyOf(taskSnapshots);
+  }
+
+  @JsonProperty("createdAt")
+  public Instant createdAt() {
+    return createdAt;
+  }
+
+  @JsonProperty("generation")
+  public long generation() {
+    return generation;
+  }
+
+  @JsonProperty("state")
+  public State state() {
+    return state;
+  }
+
+  @JsonProperty("taskSnapshots")
+  public List<TaskSnapshotMetadata> taskSnapshots() {
+    return taskSnapshots;
+  }
+
+  public Snapshot nextSnapshot() {
+    return new Snapshot(
+        Instant.now(),
+        generation + 1,
+        State.CREATED,
+        List.of()
+    );
+  }
+
+  public Snapshot withTaskSnapshots(List<TaskSnapshotMetadata> taskSnapshots, State state) {
+    return new Snapshot(
+        createdAt,
+        generation,
+        state,
+        Stream.concat(this.taskSnapshots.stream(), taskSnapshots.stream())
+            .collect(Collectors.toList())
+    );
+  }
+
+  public Snapshot withStateFailed() {
+    return new Snapshot(
+        createdAt,
+        generation,
+        State.FAILED,
+        taskSnapshots
+    );
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof Snapshot)) {
+      return false;
+    }
+    final Snapshot snapshot = (Snapshot) o;
+    return generation == snapshot.generation
+        && Objects.equals(createdAt, snapshot.createdAt)
+        && state == snapshot.state && Objects.equals(taskSnapshots, snapshot.taskSnapshots);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(createdAt, generation, state, taskSnapshots);
+  }
+
+  @Override
+  public String toString() {
+    return "Snapshot{"
+        + "createdAt=" + createdAt
+        + ", generation=" + generation
+        + ", state=" + state
+        + ", taskSnapshots=" + taskSnapshots
+        + '}';
+  }
+
+  public static class TaskSnapshotMetadata {
+    private final TaskId taskId;
+    private final List<CommittedOffset> committedOffsets;
+    private final Map<String, byte[]> checkpoints;
+    private final Instant timestamp;
+
+    @JsonCreator
+    public TaskSnapshotMetadata(
+        @JsonProperty("taskId") final TaskId taskId,
+        @JsonProperty("committedOffsets") final List<CommittedOffset> committedOffsets,
+        @JsonProperty("checkpoints") final Map<String, byte[]> checkpoints,
+        @JsonProperty("timestamp") final Instant timestamp
+    ) {
+      this.taskId = taskId;
+      this.committedOffsets = List.copyOf(committedOffsets);
+      this.checkpoints = Map.copyOf(checkpoints);
+      this.timestamp = Objects.requireNonNull(timestamp);
+    }
+
+    @JsonProperty("taskId")
+    public TaskId taskId() {
+      return taskId;
+    }
+
+    @JsonProperty("committedOffsets")
+    public List<CommittedOffset> committedOffsets() {
+      return committedOffsets;
+    }
+
+    @JsonProperty("checkpoints")
+    public Map<String, byte[]> checkpoints() {
+      return checkpoints;
+    }
+
+    @JsonProperty("timestamp")
+    public Instant timestamp() {
+      return timestamp;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof TaskSnapshotMetadata)) {
+        return false;
+      }
+      final TaskSnapshotMetadata that = (TaskSnapshotMetadata) o;
+      if (checkpoints == null ^ that.checkpoints == null) {
+        return false;
+      }
+      if (checkpoints != null) {
+        if (checkpoints.size() != that.checkpoints.size()) {
+          return false;
+        }
+        for (final Map.Entry<String, byte[]> entry : checkpoints.entrySet()) {
+          if (!Arrays.equals(entry.getValue(), that.checkpoints.get(entry.getKey()))) {
+            return false;
+          }
+        }
+      }
+      return Objects.equals(taskId, that.taskId)
+          && Objects.equals(committedOffsets, that.committedOffsets);
+    }
+
+    @Override
+    public int hashCode() {
+      // hash the checkpoint keys only: the values are byte[], whose hashCode is
+      // identity-based and would break consistency with equals above
+      return Objects.hash(taskId, committedOffsets, checkpoints == null ? null : checkpoints.keySet());
+    }
+
+    @Override
+    public String toString() {
+      return "TaskSnapshotMetadata{"
+          + "taskId=" + taskId
+          + ", committedOffsets=" + committedOffsets
+          + ", checkpoints=" + checkpoints
+          + '}';
+    }
+  }
+
+  public static class CommittedOffset {
+    private final String topic;
+    private final int partition;
+    private final long offset;
+
+    @JsonCreator
+    public CommittedOffset(
+        @JsonProperty("topic") final String topic,
+        @JsonProperty("partition") final int partition,
+        @JsonProperty("offset") final long offset
+    ) {
+      this.topic = topic;
+      this.partition = partition;
+      this.offset = offset;
+    }
+
+    @JsonProperty("topic")
+    public String topic() {
+      return topic;
+    }
+
+    @JsonProperty("partition")
+    public int partition() {
+      return partition;
+    }
+
+    @JsonProperty("offset")
+    public long offset() {
+      return offset;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof CommittedOffset)) {
+        return false;
+      }
+      final CommittedOffset that = (CommittedOffset) o;
+      return partition == that.partition
+          && offset == that.offset
+          && Objects.equals(topic, that.topic);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(topic, partition, offset);
+    }
+
+    @Override
+    public String toString() {
+      return "CommittedOffset{"
+          + "topic='" + topic + '\''
+          + ", partition=" + partition
+          + ", offset=" + offset
+          + '}';
+    }
+  }
+}
\ No newline at end of file
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotApi.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotApi.java
new file mode 100644
index 000000000..f0c2a3bf7
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotApi.java
@@ -0,0 +1,18 @@
+package dev.responsive.kafka.internal.snapshot;
+
+import java.util.List;
+
+/**
+ * Interface for interacting with Snapshots from outside an application. Supports
+ * reading the current/past snapshots, and creating a new snapshot.
+ */
+public interface SnapshotApi extends AutoCloseable {
+  Snapshot createSnapshot();
+
+  List<Snapshot> getSnapshots();
+
+  Snapshot getCurrentSnapshot();
+
+  @Override
+  void close();
+}
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrator.java
new file mode 100644
index 000000000..4482c7ba0
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrator.java
@@ -0,0 +1,40 @@
+package dev.responsive.kafka.internal.snapshot;
+
+import java.util.List;
+
+/**
+ * SnapshotOrchestrator is responsible for coordinating snapshot execution by initiating
+ * task-level snapshots and updating the snapshot's state as they complete.
+ */
+public interface SnapshotOrchestrator {
+  /**
+   * Gets the application's current snapshot generation.
+   *
+   * @return The current snapshot generation.
+   */
+  long getCurrentGeneration();
+
+  /**
+   * Called by the stream thread to report task snapshot metadata for a given task.
+   *
+   * @param snapshotGeneration The generation of the task snapshot(s) being reported.
+   * @param metadata The task snapshot(s) being reported.
+   */
+  void reportTaskSnapshotMetadata(
+      long snapshotGeneration,
+      List<Snapshot.TaskSnapshotMetadata> metadata
+  );
+
+  /**
+   * Called by the stream thread to report a failed task snapshot. This should only
+   * be called to report terminal failures.
+   *
+   * @param snapshotGeneration The generation of the snapshot to fail.
+   */
+  void failSnapshot(long snapshotGeneration);
+
+  /**
+   * Clean up any resources held by the orchestrator.
+   */
+  void close();
+}
+ */ + void failSnapshot(long snapshotGeneration); + + /** + * Clean up any resources held by the orchestrator + */ + void close(); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStore.java new file mode 100644 index 000000000..e9b76b2fd --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStore.java @@ -0,0 +1,39 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.List; +import java.util.function.Function; + +/** + * A store for the Snapshot metadata for a given applicaiton + */ +public interface SnapshotStore extends AutoCloseable { + /** + * Returns the current snapshot + * + * @param block if true, then the call blocks until the store has observed the latest update + * @return the current snapshot of the application + */ + Snapshot currentSnapshot(boolean block); + + /** + * List all snapshots + * + * @param block if true, then the call will block until the store has observed the latest update + * @return a list of all snapshots of the application + */ + List listSnapshots(boolean block); + + /** + * Updates a snapshot given an updater function. The update is guaranteed to be isolated from + * other concurrent updates. + * + * @param updater is passed the current snapshot and returns an updated snapshot. + * @return the updated snapshot + */ + Snapshot updateCurrentSnapshot( + Function updater + ); + + @Override + void close(); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java new file mode 100644 index 000000000..572ff9a86 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java @@ -0,0 +1,33 @@ +package dev.responsive.kafka.internal.snapshot; + +import org.apache.kafka.streams.processor.TaskId; + +/** + * Computes a task's generation from the current Snapshot metadata. If the current snapshot + * contains a task snapshot for a task, the task is considered to be on the current snapshot's + * generation. Otherwise, it's considered to be on the previous generation. + */ +public class SnapshotStoreBasedGenerationStorage implements GenerationStorage { + private final SnapshotStore snapshotStore; + + public SnapshotStoreBasedGenerationStorage(final SnapshotStore snapshotStore) { + this.snapshotStore = snapshotStore; + } + + @Override + public long lookupGeneration(final TaskId taskId) { + final var currentSnapshot = snapshotStore.currentSnapshot(false); + // todo: move to a fn + if (currentSnapshot.state() == Snapshot.State.COMPLETED + || currentSnapshot.state() == Snapshot.State.FAILED) { + return currentSnapshot.generation(); + } + if (currentSnapshot.taskSnapshots().stream() + .anyMatch(s -> s.taskId().equals(taskId))) { + return currentSnapshot.generation(); + } + // this task has not completed. 
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java
new file mode 100644
index 000000000..572ff9a86
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java
@@ -0,0 +1,33 @@
+package dev.responsive.kafka.internal.snapshot;
+
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Computes a task's generation from the current Snapshot metadata. If the current snapshot
+ * contains a task snapshot for a task, the task is considered to be on the current snapshot's
+ * generation. Otherwise, it's considered to be on the previous generation.
+ */
+public class SnapshotStoreBasedGenerationStorage implements GenerationStorage {
+  private final SnapshotStore snapshotStore;
+
+  public SnapshotStoreBasedGenerationStorage(final SnapshotStore snapshotStore) {
+    this.snapshotStore = snapshotStore;
+  }
+
+  @Override
+  public long lookupGeneration(final TaskId taskId) {
+    final var currentSnapshot = snapshotStore.currentSnapshot(false);
+    // todo: move to a fn
+    if (currentSnapshot.state() == Snapshot.State.COMPLETED
+        || currentSnapshot.state() == Snapshot.State.FAILED) {
+      return currentSnapshot.generation();
+    }
+    if (currentSnapshot.taskSnapshots().stream()
+        .anyMatch(s -> s.taskId().equals(taskId))) {
+      return currentSnapshot.generation();
+    }
+    // this task has not completed. set generation to previous generation
+    // todo: make previous generation a field rather than computing it here
+    return currentSnapshot.generation() - 1;
+  }
+}
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecord.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecord.java
new file mode 100644
index 000000000..0ff5fa0a5
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecord.java
@@ -0,0 +1,56 @@
+package dev.responsive.kafka.internal.snapshot.topic;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import dev.responsive.kafka.internal.snapshot.Snapshot;
+import java.util.Objects;
+import java.util.Optional;
+
+public class SnapshotStoreRecord {
+  private final SnapshotStoreRecordType type;
+  private final Snapshot snapshot;
+
+  @JsonCreator
+  public SnapshotStoreRecord(
+      @JsonProperty("type") final SnapshotStoreRecordType type,
+      @JsonProperty("snapshot") final Snapshot snapshot
+  ) {
+    this.type = Objects.requireNonNull(type);
+    this.snapshot = snapshot;
+  }
+
+  @JsonProperty("type")
+  public SnapshotStoreRecordType type() {
+    return type;
+  }
+
+  @JsonProperty("snapshot")
+  public Optional<Snapshot> snapshot() {
+    return Optional.ofNullable(snapshot);
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof SnapshotStoreRecord)) {
+      return false;
+    }
+    final SnapshotStoreRecord that = (SnapshotStoreRecord) o;
+    return type == that.type && Objects.equals(snapshot, that.snapshot);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(type, snapshot);
+  }
+
+  @Override
+  public String toString() {
+    return "SnapshotStoreRecord{"
+        + "type=" + type
+        + ", snapshot=" + snapshot
+        + '}';
+  }
+}
\ No newline at end of file
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordKey.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordKey.java
new file mode 100644
index 000000000..161fa3781
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordKey.java
@@ -0,0 +1,56 @@
+package dev.responsive.kafka.internal.snapshot.topic;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Objects;
+import java.util.Optional;
+
+public class SnapshotStoreRecordKey {
+
+  private final SnapshotStoreRecordType type;
+  private final Long generation;
+
+  @JsonCreator
+  public SnapshotStoreRecordKey(
+      @JsonProperty("type") final SnapshotStoreRecordType type,
+      @JsonProperty("generation") final Long generation
+  ) {
+    this.type = Objects.requireNonNull(type);
+    this.generation = generation;
+  }
+
+  @JsonProperty("type")
+  public SnapshotStoreRecordType type() {
+    return type;
+  }
+
+  @JsonProperty("generation")
+  public Optional<Long> generation() {
+    return Optional.ofNullable(generation);
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof SnapshotStoreRecordKey)) {
+      return false;
+    }
+    final SnapshotStoreRecordKey that = (SnapshotStoreRecordKey) o;
+    return type == that.type && Objects.equals(generation, that.generation);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(type, generation);
+  }
return "SnapshotStoreRecordKey{" + + "type=" + type + + ", generation=" + generation + + '}'; + } +} \ No newline at end of file diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordType.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordType.java new file mode 100644 index 000000000..d14cfc6bf --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordType.java @@ -0,0 +1,5 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +public enum SnapshotStoreRecordType { + Snapshot +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdes.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdes.java new file mode 100644 index 000000000..df8483255 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdes.java @@ -0,0 +1,99 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import java.io.IOException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.TaskId; + +public class SnapshotStoreSerdes { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + static { + MAPPER.registerModule(new JavaTimeModule()); + MAPPER.registerModule(new Jdk8Module()); + final SimpleModule module = new SimpleModule(); + module.addSerializer(TaskId.class, new TaskIDJacksonSerializer()); + module.addDeserializer(TaskId.class, new TaskIDJacksonDeserializer()); + MAPPER.registerModule(module); + MAPPER.setSerializationInclusion(JsonInclude.Include.NON_ABSENT); + } + + public static class SnapshotStoreRecordKeySerializer + implements Serializer { + @Override + public byte[] serialize(String topic, SnapshotStoreRecordKey data) { + try { + return MAPPER.writeValueAsBytes(data); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class SnapshotStoreRecordKeyDeserializer + implements Deserializer { + @Override + public SnapshotStoreRecordKey deserialize(String topic, byte[] data) { + try { + return MAPPER.readValue(data, SnapshotStoreRecordKey.class); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class SnapshotStoreRecordSerializer implements Serializer { + @Override + public byte[] serialize(String topic, SnapshotStoreRecord data) { + try { + return MAPPER.writeValueAsBytes(data); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class SnapshotStoreRecordDeserializer implements Deserializer { + @Override + public SnapshotStoreRecord deserialize(String topic, byte[] data) { + try { + return MAPPER.readValue(data, SnapshotStoreRecord.class); + } catch 
+
+  public static class SnapshotStoreRecordDeserializer implements Deserializer<SnapshotStoreRecord> {
+    @Override
+    public SnapshotStoreRecord deserialize(String topic, byte[] data) {
+      try {
+        return MAPPER.readValue(data, SnapshotStoreRecord.class);
+      } catch (final IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public static class TaskIdJacksonSerializer extends JsonSerializer<TaskId> {
+    @Override
+    public void serialize(
+        final TaskId taskId,
+        final JsonGenerator jsonGenerator,
+        final SerializerProvider serializerProvider
+    ) throws IOException {
+      jsonGenerator.writeString(taskId.toString());
+    }
+  }
+
+  public static class TaskIdJacksonDeserializer extends JsonDeserializer<TaskId> {
+
+    @Override
+    public TaskId deserialize(
+        final JsonParser jsonParser,
+        final DeserializationContext deserializationContext
+    ) throws IOException {
+      return TaskId.parse(jsonParser.getValueAsString());
+    }
+  }
+}
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SynchronizedConsumerPosition.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SynchronizedConsumerPosition.java
new file mode 100644
index 000000000..a26f6309f
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SynchronizedConsumerPosition.java
@@ -0,0 +1,20 @@
+package dev.responsive.kafka.internal.snapshot.topic;
+
+class SynchronizedConsumerPosition {
+  private long position = 0;
+
+  synchronized void waitTillConsumerPosition(long targetPosition) {
+    while (position < targetPosition) {
+      try {
+        wait();
+      } catch (final InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  synchronized void updateConsumerPosition(long position) {
+    this.position = position;
+    notifyAll();
+  }
+}
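This helper is a small wait/notify latch over a monotonically advancing consumer position; a sketch of how the two sides interact (the thread roles are illustrative):

    final SynchronizedConsumerPosition position = new SynchronizedConsumerPosition();
    // reader thread: publish its position after each poll
    position.updateConsumerPosition(42L);
    // caller thread: block until the reader has reached at least offset 42
    position.waitTillConsumerPosition(42L);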
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java
new file mode 100644
index 000000000..150275e87
--- /dev/null
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java
@@ -0,0 +1,258 @@
+package dev.responsive.kafka.internal.snapshot.topic;
+
+import dev.responsive.kafka.internal.snapshot.Snapshot;
+import dev.responsive.kafka.internal.snapshot.SnapshotStore;
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.streams.StreamsConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements SnapshotStore by storing Snapshot metadata for a single application on a Kafka topic.
+ * The topic has a single partition. Each record represents an update to a single snapshot of an
+ * application. The key includes the generation of the snapshot the update applies to. The
+ * value contains an instance of Snapshot.
+ */
+public class TopicSnapshotStore implements SnapshotStore {
+  private static final Logger LOG = LoggerFactory.getLogger(TopicSnapshotStore.class);
+
+  private final TopicPartition topicPartition;
+  private final Supplier<Producer<SnapshotStoreRecordKey, SnapshotStoreRecord>> producerSupplier;
+  private final Supplier<Consumer<SnapshotStoreRecordKey, SnapshotStoreRecord>> consumerSupplier;
+  private final Thread readerThread;
+  private final AtomicBoolean running = new AtomicBoolean(true);
+  private final AtomicReference<Snapshot> currentSnapshot
+      = new AtomicReference<>(Snapshot.initial());
+  private final ConcurrentMap<Long, Snapshot> snapshots = new ConcurrentHashMap<>();
+  private final SynchronizedConsumerPosition consumedOffset = new SynchronizedConsumerPosition();
+  private final Consumer<SnapshotStoreRecordKey, SnapshotStoreRecord> endOffsetConsumer;
+
+  public TopicSnapshotStore(
+      final String topic,
+      final short replicas,
+      final Supplier<Consumer<SnapshotStoreRecordKey, SnapshotStoreRecord>> consumerSupplier,
+      final Supplier<Producer<SnapshotStoreRecordKey, SnapshotStoreRecord>> producerSupplier,
+      final Admin admin
+  ) {
+    this.topicPartition = new TopicPartition(topic, 0);
+    this.producerSupplier = producerSupplier;
+    this.consumerSupplier = consumerSupplier;
+    createTopic(admin, replicas);
+    final var consumer = consumerSupplier.get();
+    consumer.assign(List.of(topicPartition));
+    consumer.seekToBeginning(List.of(topicPartition));
+    readerThread = new Thread(() -> runReader(
+        consumer,
+        currentSnapshot,
+        snapshots,
+        consumedOffset,
+        running
+    ));
+    readerThread.start();
+    this.endOffsetConsumer = consumerSupplier.get();
+    this.endOffsetConsumer.assign(List.of(topicPartition));
+    waitTillConsumedAll();
+  }
+
+  public static TopicSnapshotStore create(
+      final String topic,
+      final short replicas,
+      final Map<String, Object> config
+  ) {
+    final Map<String, Object> consumerConfig = new HashMap<>(config);
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, null);
+    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    consumerConfig.put(
+        ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
+    final Supplier<Consumer<SnapshotStoreRecordKey, SnapshotStoreRecord>> consumerSupplier = () ->
+        new KafkaConsumer<>(
+            consumerConfig,
+            new SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer(),
+            new SnapshotStoreSerdes.SnapshotStoreRecordDeserializer()
+        );
+    final Map<String, Object> producerConfig = new HashMap<>(config);
+    final String appId = config.get(StreamsConfig.APPLICATION_ID_CONFIG).toString();
+    producerConfig.put(
+        ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+        String.format("__responsive-%s-snapshot-store", appId)
+    );
+    final Supplier<Producer<SnapshotStoreRecordKey, SnapshotStoreRecord>> producerSupplier = () ->
+        new KafkaProducer<>(
+            producerConfig,
+            new SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer(),
+            new SnapshotStoreSerdes.SnapshotStoreRecordSerializer()
+        );
+    try (final Admin admin = Admin.create(config)) {
+      return new TopicSnapshotStore(
+          topic, replicas, consumerSupplier, producerSupplier, admin);
+    }
+  }
+
+  @Override
+  public Snapshot currentSnapshot(boolean block) {
+    if (block) {
+      waitTillConsumedAll();
+    }
+    return currentSnapshot.get();
+  }
+
+  @Override
+  public List<Snapshot> listSnapshots(boolean block) {
+    if (block) {
+      waitTillConsumedAll();
+    }
+    return snapshots.values().stream()
+        .sorted(Comparator.comparingLong(Snapshot::generation))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public Snapshot updateCurrentSnapshot(
+      final Function<Snapshot, Snapshot> updater
+  ) {
+    final Future<RecordMetadata> sendFut;
+    final Snapshot updated;
+    try (final var producer = producerSupplier.get()) {
+      producer.initTransactions();
+      producer.beginTransaction();
+      try {
+        waitTillConsumedAll();
+        updated = updater.apply(currentSnapshot.get());
+        final var record = createRecord(updated);
+        sendFut = producer.send(record);
+        producer.commitTransaction();
+      } catch (final RuntimeException e) {
+        producer.abortTransaction();
+        throw e;
+      }
+    }
+    final RecordMetadata recordMetadata;
+    try {
+      recordMetadata = sendFut.get();
+    } catch (final InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+    waitTillConsumerPosition(recordMetadata.offset() + 1);
+    return updated;
+  }
+
+  @Override
+  public void close() {
+    try {
+      endOffsetConsumer.close();
+    } catch (final RuntimeException e) {
+      LOG.warn("error closing end offset consumer", e);
+    }
+    running.set(false);
+    try {
+      readerThread.join();
+    } catch (final InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ProducerRecord<SnapshotStoreRecordKey, SnapshotStoreRecord> createRecord(
+      final Snapshot snapshot
+  ) {
+    return new ProducerRecord<>(
+        topicPartition.topic(),
+        topicPartition.partition(),
+        new SnapshotStoreRecordKey(SnapshotStoreRecordType.Snapshot, snapshot.generation()),
+        new SnapshotStoreRecord(SnapshotStoreRecordType.Snapshot, snapshot)
+    );
+  }
+
+  private void createTopic(final Admin admin, final short replicas) {
+    try {
+      final var result = admin.createTopics(List.of(
+          new NewTopic(topicPartition.topic(), 1, replicas)
+      ));
+      result.all().get();
+    } catch (final InterruptedException e) {
+      throw new RuntimeException(e);
+    } catch (final ExecutionException e) {
+      if (e.getCause() instanceof TopicExistsException) {
+        LOG.info("snapshot store topic already exists.");
+      } else {
+        throw new RuntimeException(e);
+      }
+    } catch (final TopicExistsException e) {
+      LOG.info("snapshot store topic already exists.");
+    }
+  }
+
+  private void waitTillConsumedAll() {
+    waitTillConsumerPosition(endOffset());
+  }
+
+  private void waitTillConsumerPosition(final long offset) {
+    consumedOffset.waitTillConsumerPosition(offset);
+  }
+
+  private long endOffset() {
+    synchronized (endOffsetConsumer) {
+      return endOffsetConsumer.endOffsets(List.of(topicPartition)).get(topicPartition);
+    }
+  }
+
+  private void runReader(
+      final Consumer<SnapshotStoreRecordKey, SnapshotStoreRecord> consumer,
+      final AtomicReference<Snapshot> currentSnapshot,
+      final ConcurrentMap<Long, Snapshot> allSnapshots,
+      final SynchronizedConsumerPosition consumedOffset,
+      final AtomicBoolean running
+  ) {
+    while (running.get()) {
+      final ConsumerRecords<SnapshotStoreRecordKey, SnapshotStoreRecord> records
+          = consumer.poll(Duration.ofMillis(100));
+      for (final ConsumerRecord<SnapshotStoreRecordKey, SnapshotStoreRecord> record : records) {
+        switch (record.key().type()) {
+          case Snapshot: {
+            final Snapshot update = record.value().snapshot().get();
+            currentSnapshot.getAndUpdate(c -> {
+              if (update.generation() >= c.generation()) {
+                return update;
+              } else {
+                return c;
+              }
+            });
+            allSnapshots.put(update.generation(), update);
+            break;
+          }
+          default: {
+            throw new IllegalStateException();
+          }
+        }
+      }
+      consumedOffset.updateConsumerPosition(consumer.position(topicPartition));
+    }
+  }
+}
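A wiring sketch for the create(...) factory above; the topic name and config values are examples, and create() reads the bootstrap settings for its clients plus StreamsConfig.APPLICATION_ID_CONFIG for the transactional id.

    final Map<String, Object> config = Map.of(
        CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
        StreamsConfig.APPLICATION_ID_CONFIG, "my-app"
    );
    try (final SnapshotStore store = TopicSnapshotStore.create(
        "my-app-snapshot-store", (short) 3, config)) {
      final Snapshot current = store.currentSnapshot(true); // block for the latest state
    }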
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestratorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestratorTest.java
new file mode 100644
index 000000000..32f6a394f
--- /dev/null
+++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestratorTest.java
@@ -0,0 +1,195 @@
+package dev.responsive.kafka.internal.snapshot;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class LocalSnapshotOrchestratorTest {
+  private static final TaskId TASK_0 = new TaskId(0, 0);
+  private static final TaskId TASK_1 = new TaskId(0, 1);
+  private static final TaskId TASK_2 = new TaskId(0, 2);
+
+  private final TestSnapshotStore snapshotStore = new TestSnapshotStore();
+  private final SnapshotApi api = new LocalSnapshotApi(snapshotStore);
+  private long generation;
+  private final LocalSnapshotOrchestrator orchestrator = new LocalSnapshotOrchestrator(
+      snapshotStore,
+      Set.of(TASK_0, TASK_1, TASK_2)
+  );
+
+  @BeforeEach
+  public void setup() {
+    final var snapshot = api.createSnapshot();
+    generation = snapshot.generation();
+  }
+
+  @Test
+  public void shouldFailUpdateWithConflictingTaskSnapshot() {
+    // given:
+    orchestrator.reportTaskSnapshotMetadata(
+        generation,
+        List.of(new Snapshot.TaskSnapshotMetadata(
+            TASK_0,
+            List.of(new Snapshot.CommittedOffset("foo", 0, 123L)),
+            Map.of(),
+            Instant.now()
+        ))
+    );
+
+    // when/then:
+    assertThrows(
+        IllegalStateException.class,
+        () -> orchestrator.reportTaskSnapshotMetadata(
+            generation,
+            List.of(new Snapshot.TaskSnapshotMetadata(
+                TASK_0,
+                List.of(new Snapshot.CommittedOffset("foo", 0, 456L)),
+                Map.of(),
+                Instant.now()
+            ))
+        )
+    );
+  }
+
+  @Test
+  public void shouldDoIdempotentTaskSnapshotUpdate() {
+    // given:
+    final var taskSnapshots = List.of(new Snapshot.TaskSnapshotMetadata(
+        TASK_0,
+        List.of(new Snapshot.CommittedOffset("foo", 0, 123L)),
+        Map.of(),
+        Instant.now()
+    ));
+    orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots);
+
+    // when:
+    orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots);
+
+    // then:
+    final var snapshot = api.getCurrentSnapshot();
+    assertThat(snapshot.taskSnapshots(), is(taskSnapshots));
+  }
+
+  @Test
+  public void shouldFailUpdateForWrongGeneration() {
+    // when/then:
+    assertThrows(RuntimeException.class,
+        () -> orchestrator.reportTaskSnapshotMetadata(generation - 1, List.of()));
+  }
+
+  @Test
+  public void shouldCompleteSnapshot() {
+    // given:
+    final var taskSnapshots = List.of(
+        new Snapshot.TaskSnapshotMetadata(
+            TASK_0,
+            List.of(new Snapshot.CommittedOffset("foo", 0, 123L)),
+            Map.of(),
+            Instant.now()
+        ),
+        new Snapshot.TaskSnapshotMetadata(
+            TASK_1,
+            List.of(new Snapshot.CommittedOffset("foo", 1, 456L)),
+            Map.of(),
+            Instant.now()
+        ),
+        new Snapshot.TaskSnapshotMetadata(
+            TASK_2,
+            List.of(new Snapshot.CommittedOffset("foo", 2, 100L)),
+            Map.of(),
+            Instant.now()
+        )
+    );
+
+    // when:
+    orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots);
+
+    // then:
+    final var snapshot = api.getCurrentSnapshot();
+    assertThat(snapshot.state(), is(Snapshot.State.COMPLETED));
+    assertThat(snapshot.taskSnapshots(), is(taskSnapshots));
+  }
+
+  @Test
+  public void shouldCompleteSnapshotWhenFinishedTasksInSeparateUpdates() {
+    // given:
+    final var taskSnapshots = List.of(
+        new Snapshot.TaskSnapshotMetadata(
+            TASK_0,
+            List.of(new Snapshot.CommittedOffset("foo", 0, 123L)),
+            Map.of(),
+            Instant.now()
+        )
+    );
+    final var taskSnapshots2 = List.of(
+        new Snapshot.TaskSnapshotMetadata(
+            TASK_1,
+            List.of(new Snapshot.CommittedOffset("foo", 1, 456L)),
+            Map.of(),
+            Instant.now()
+        ),
+        new Snapshot.TaskSnapshotMetadata(
+            TASK_2,
+            List.of(new Snapshot.CommittedOffset("foo", 2, 100L)),
+            Map.of(),
+            Instant.now()
+        )
+    );
+    orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots);
+
+    // when:
+    orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots2);
+
+    // then:
+    final var snapshot = api.getCurrentSnapshot();
+    assertThat(snapshot.state(), is(Snapshot.State.COMPLETED));
+    final var allTaskSnapshots = new ArrayList<>(taskSnapshots);
+    allTaskSnapshots.addAll(taskSnapshots2);
+    assertThat(snapshot.taskSnapshots(), is(allTaskSnapshots));
+  }
+
+  @Test
+  public void shouldFailUpdateForCompletedSnapshot() {
+    // given:
+    final var taskSnapshots = List.of(
+        new Snapshot.TaskSnapshotMetadata(
+            TASK_0,
+            List.of(new Snapshot.CommittedOffset("foo", 0, 123L)),
+            Map.of(),
+            Instant.now()
+        ),
+        new Snapshot.TaskSnapshotMetadata(
+            TASK_1,
+            List.of(new Snapshot.CommittedOffset("foo", 1, 456L)),
+            Map.of(),
+            Instant.now()
+        ),
+        new Snapshot.TaskSnapshotMetadata(
+            TASK_2,
+            List.of(new Snapshot.CommittedOffset("foo", 2, 100L)),
+            Map.of(),
+            Instant.now()
+        )
+    );
+    orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots);
+
+    // when/then:
+    assertThrows(
+        RuntimeException.class,
+        () -> orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots));
+  }
+
+  @Test
+  public void shouldGetCurrentGeneration() {
+    assertThat(orchestrator.getCurrentGeneration(), is(generation));
+  }
+}
\ No newline at end of file
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrationIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrationIntegrationTest.java
new file mode 100644
index 000000000..e48bda4cc
--- /dev/null
+++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrationIntegrationTest.java
@@ -0,0 +1,303 @@
+package dev.responsive.kafka.internal.snapshot;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import dev.responsive.kafka.internal.snapshot.topic.SnapshotStoreRecord;
+import dev.responsive.kafka.internal.snapshot.topic.SnapshotStoreRecordKey;
+import dev.responsive.kafka.internal.snapshot.topic.SnapshotStoreSerdes;
+import dev.responsive.kafka.internal.snapshot.topic.TopicSnapshotStore;
+import dev.responsive.kafka.testutils.TestConstants;
+import java.nio.charset.Charset;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.lifecycle.Startables;
+
+@ExtendWith(MockitoExtension.class)
+class SnapshotOrchestrationIntegrationTest {
+  private static final KafkaContainer KAFKA = new KafkaContainer(TestConstants.KAFKA)
+      .withEnv("KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS", "1000")
+      .withEnv("KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS", "60000");
+  private static final Set<TaskId> TASKS = Set.of(
+      new TaskId(0, 0),
+      new TaskId(0, 1),
+      new TaskId(0, 2)
+  );
+
+  static {
+    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+      KAFKA.stop();
+    }));
+  }
+
+  private String topic;
+  private TestCtx ctx;
+
+  @BeforeAll
+  public static void setupAll() {
+    final var fut = Startables.deepStart(KAFKA);
+    try {
+      fut.get();
+    } catch (final InterruptedException | ExecutionException e) {
+      System.out.println("LOGS: " + KAFKA.getLogs());
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterAll
+  public static void teardownAll() {
+    KAFKA.stop();
+  }
+
+  @BeforeEach
+  public void setup(final TestInfo testInfo) {
+    topic = testInfo.getTestMethod().get().getName();
+    ctx = createCtx();
+  }
+
+  @Test
+  public void shouldReturnGenerationZeroSnapshot() {
+    // when:
+    final var snapshot = ctx.store.currentSnapshot(true);
+
+    // then:
+    assertThat(snapshot, is(Snapshot.initial()));
+  }
+
+  @Test
+  public void shouldCreateInitialSnapshot() {
+    // when:
+    final var snapshot = ctx.api.createSnapshot();
+
+    // then:
+    MatcherAssert.assertThat(snapshot, Matchers.is(ctx.store.currentSnapshot(true)));
+    assertThat(snapshot.generation(), is(1L));
+    assertThat(snapshot.state(), is(Snapshot.State.CREATED));
+  }
+
+  @Test
+  public void shouldRefreshStore() {
+    // given:
+    final var ctx2 = createCtx();
+    ctx2.api.createSnapshot();
+
+    // when:
+    final var snapshot = ctx.store.currentSnapshot(true);
+
+    // then:
+    MatcherAssert.assertThat(snapshot, Matchers.is(ctx.store.currentSnapshot(true)));
+    MatcherAssert.assertThat(snapshot.generation(), is(1L));
+    MatcherAssert.assertThat(snapshot.state(), is(Snapshot.State.CREATED));
+  }
+
+  @Test
+  public void shouldFailCreateNextSnapshotIfNotCompleted() {
+    // given:
+    ctx.api.createSnapshot();
+
+    // when/then:
+    assertThrows(RuntimeException.class, () -> ctx.api.createSnapshot());
+  }
+
+  @Test
+  public void shouldCreateNextSnapshot() {
+    // given:
+    ctx.api.createSnapshot();
+    ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of(
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 0), List.of(), Map.of(), Instant.now()),
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 1), List.of(), Map.of(), Instant.now()),
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 2), List.of(), Map.of(), Instant.now())
+    ));
+
+    // when:
+    final var snapshot = ctx.api.createSnapshot();
+
+    // then:
+    assertThat(snapshot.generation(), is(2L));
+    assertThat(snapshot.state(), is(Snapshot.State.CREATED));
+  }
+
+  @Test
+  public void shouldAddTaskSnapshotToSnapshot() {
+    // given:
+    ctx.api.createSnapshot();
+    final var metadata = new Snapshot.TaskSnapshotMetadata(
+        new TaskId(0, 0), List.of(), Map.of(), Instant.now());
+    ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of(metadata));
+
+    // when:
+    final var snapshot = ctx.store.currentSnapshot(true);
+
+    // then:
+    MatcherAssert.assertThat(snapshot.state(), is(Snapshot.State.CREATED));
+    MatcherAssert.assertThat(snapshot.taskSnapshots(), contains(metadata));
+  }
+
+  @Test
+  public void shouldFailAddTaskSnapshotIfConflictingMetadata() {
+    // given:
+    ctx.api.createSnapshot();
+    final var metadata = new Snapshot.TaskSnapshotMetadata(
+        new TaskId(0, 0),
+        List.of(),
+        Map.of("foo", "bar".getBytes(Charset.defaultCharset())),
+        Instant.now()
+    );
+    ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of(metadata));
+
+    // when/then:
+    assertThrows(
+        RuntimeException.class,
+        () -> ctx.orchestrator.reportTaskSnapshotMetadata(
+            1,
+            List.of(new Snapshot.TaskSnapshotMetadata(
+                new TaskId(0, 0), List.of(), Map.of(), Instant.now()))
+        )
+    );
+  }
+
+  @Test
+  public void shouldFailAddTaskSnapshotIfCompleted() {
+    // given:
+    ctx.api.createSnapshot();
+    ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of(
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 0), List.of(), Map.of(), Instant.now()),
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 1), List.of(), Map.of(), Instant.now()),
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 2), List.of(), Map.of(), Instant.now())
+    ));
+
+    // when/then:
+    assertThrows(
+        RuntimeException.class,
+        () -> ctx.orchestrator.reportTaskSnapshotMetadata(
+            1,
+            List.of(new Snapshot.TaskSnapshotMetadata(
+                new TaskId(0, 0), List.of(), Map.of(), Instant.now()))
+        )
+    );
+  }
+
+  @Test
+  public void shouldCompleteSnapshot() {
+    // given:
+    ctx.api.createSnapshot();
+
+    // when:
+    ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of(
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 0), List.of(), Map.of(), Instant.now()),
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 1), List.of(), Map.of(), Instant.now()),
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 2), List.of(), Map.of(), Instant.now())
+    ));
+
+    // then:
+    MatcherAssert.assertThat(ctx.store.currentSnapshot(true).state(), is(Snapshot.State.COMPLETED));
+  }
+
+  @Test
+  public void shouldFailSnapshot() {
+    // given:
+    ctx.api.createSnapshot();
+
+    // when:
+    ctx.orchestrator.failSnapshot(1);
+
+    // then:
+    MatcherAssert.assertThat(ctx.store.currentSnapshot(true).state(), is(Snapshot.State.FAILED));
+  }
+
+  @Test
+  public void shouldFailFailSnapshotIfCompleted() {
+    // given:
+    ctx.api.createSnapshot();
+    ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of(
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 0), List.of(), Map.of(), Instant.now()),
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 1), List.of(), Map.of(), Instant.now()),
+        new Snapshot.TaskSnapshotMetadata(
+            new TaskId(0, 2), List.of(), Map.of(), Instant.now())
+    ));
+    MatcherAssert.assertThat(ctx.store.currentSnapshot(true).state(), is(Snapshot.State.COMPLETED));
+
+    // when/then:
+    assertThrows(RuntimeException.class, () -> ctx.orchestrator.failSnapshot(1));
+  }
+
+  private TestCtx createCtx() {
+    return new TestCtx(topic);
+  }
+
+  private static class TestCtx {
+    private final TopicSnapshotStore store;
+    private final SnapshotOrchestrator orchestrator;
+    private final SnapshotApi api;
+
+    private TestCtx(final String topic) {
+      final Admin admin
+          = Admin.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()));
+      final Map<String, Object> consumerProps = Map.of(
+          BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers(),
+          ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false
+      );
+      final Supplier<Consumer<SnapshotStoreRecordKey, SnapshotStoreRecord>> consumerSupplier = () ->
+          new KafkaConsumer<>(
+              consumerProps,
+              new SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer(),
+              new SnapshotStoreSerdes.SnapshotStoreRecordDeserializer()
+          );
+      final Map<String, Object> producerProps = Map.of(
+          BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers(),
+          ProducerConfig.TRANSACTIONAL_ID_CONFIG, String.format("%s-snapshot-store", topic)
+      );
+      final Supplier<Producer<SnapshotStoreRecordKey, SnapshotStoreRecord>> producerSupplier = () ->
+          new KafkaProducer<>(
+              producerProps,
+              new SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer(),
+              new SnapshotStoreSerdes.SnapshotStoreRecordSerializer()
+          );
+      this.store = new TopicSnapshotStore(
+          topic,
+          (short) 1,
+          consumerSupplier,
+          producerSupplier,
+          admin
+      );
+      this.orchestrator = new LocalSnapshotOrchestrator(this.store, TASKS);
+      this.api = new LocalSnapshotApi(this.store);
+    }
+  }
+}
\ No newline at end of file
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorageTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorageTest.java
new file mode 100644
index 000000000..438287d89
--- /dev/null
+++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorageTest.java
@@ -0,0 +1,59 @@
+package dev.responsive.kafka.internal.snapshot;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class SnapshotStoreBasedGenerationStorageTest {
+  private static final TaskId TASK_0 = new TaskId(0, 0);
+
+  private final TestSnapshotStore snapshotStore = new TestSnapshotStore();
+  private final SnapshotApi api = new LocalSnapshotApi(snapshotStore);
+  private final GenerationStorage generationStorage
+      = new SnapshotStoreBasedGenerationStorage(snapshotStore);
+  private long oldGeneration;
+  private long generation;
+
+  @BeforeEach
+  public void setup() {
+    oldGeneration = snapshotStore.currentSnapshot(true).generation();
+    final var snapshot = api.createSnapshot();
+    generation = snapshot.generation();
+  }
+
+  @Test
+  public void shouldReturnNewGenerationForCompletedSnapshot() {
+    // given:
+    snapshotStore
+        .updateCurrentSnapshot(s -> s.withTaskSnapshots(List.of(), Snapshot.State.COMPLETED));
+
+    // when/then:
+    assertThat(generationStorage.lookupGeneration(TASK_0), is(generation));
+  }
+
+  @Test
+  public void shouldReturnNewGenerationForTaskWithCompletedTaskSnapshot() {
+    // given:
+    snapshotStore
+        .updateCurrentSnapshot(
+            s -> s.withTaskSnapshots(
+                List.of(new Snapshot.TaskSnapshotMetadata(
+                    TASK_0, List.of(), Map.of(), Instant.now())),
+                s.state()
+            ));
+
+    // when/then:
+    assertThat(generationStorage.lookupGeneration(TASK_0), is(generation));
+  }
+
+  @Test
+  public void shouldReturnOldGenerationForTaskWithoutCompletedSnapshot() {
+    assertThat(generationStorage.lookupGeneration(TASK_0), is(oldGeneration));
+  }
+}
\ No newline at end of file
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/TestSnapshotStore.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/TestSnapshotStore.java
new file mode 100644
index 000000000..4777b6414
--- /dev/null
+++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/TestSnapshotStore.java
@@ -0,0 +1,37 @@
+package dev.responsive.kafka.internal.snapshot;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class TestSnapshotStore implements SnapshotStore {
+  private final Map<Long, Snapshot> snapshots = new HashMap<>();
+
+  public TestSnapshotStore() {
+    final var initial = Snapshot.initial();
+    snapshots.put(initial.generation(), initial);
+  }
+
+  @Override
+  public synchronized Snapshot currentSnapshot(final boolean block) {
+    return snapshots.get(snapshots.keySet().stream().max(Long::compare).get());
+  }
+
+  @Override
+  public synchronized List<Snapshot> listSnapshots(final boolean block) {
+    return new ArrayList<>(snapshots.values());
+  }
+
+  @Override
+  public synchronized Snapshot updateCurrentSnapshot(final Function<Snapshot, Snapshot> updater) {
+    final var updated = updater.apply(currentSnapshot(false));
+    snapshots.put(updated.generation(), updated);
+    return updated;
+  }
+
+  @Override
+  public void close() {
+  }
+}
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdesTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdesTest.java
new file mode 100644
index 000000000..fd1b7496d
--- /dev/null
+++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdesTest.java
@@ -0,0 +1,78 @@
+package dev.responsive.kafka.internal.snapshot.topic;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import dev.responsive.kafka.internal.snapshot.Snapshot;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.jupiter.api.Test;
+
+class SnapshotStoreSerdesTest {
+  private final SnapshotStoreSerdes.SnapshotStoreRecordSerializer recordSerializer
+      = new SnapshotStoreSerdes.SnapshotStoreRecordSerializer();
+  private final SnapshotStoreSerdes.SnapshotStoreRecordDeserializer recordDeserializer
+      = new SnapshotStoreSerdes.SnapshotStoreRecordDeserializer();
+  private final SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer keySerializer
+      = new SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer();
+  private final SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer keyDeserializer
+      = new SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer();
+
+  @Test
+  public void shouldSerializeRecord() {
+    // given:
+    final Snapshot snapshot = new Snapshot(
+        Instant.now(),
+        123,
+        Snapshot.State.COMPLETED,
+        List.of(
+            new Snapshot.TaskSnapshotMetadata(
+                new TaskId(0, 0),
+                List.of(
+                    new Snapshot.CommittedOffset("foo", 0, 100),
+                    new Snapshot.CommittedOffset("bar", 0, 200)
+                ),
+                Map.of(
+                    "store1", "store1-cp".getBytes(StandardCharsets.UTF_8),
+                    "store2", "store2-cp".getBytes(StandardCharsets.UTF_8)
+                ),
+                Instant.now()
+            ),
+            new Snapshot.TaskSnapshotMetadata(
+                new TaskId(1, 0),
+                List.of(),
+                Map.of(),
+                Instant.now()
+            )
+        )
+    );
+    final SnapshotStoreRecord record
+        = new SnapshotStoreRecord(SnapshotStoreRecordType.Snapshot, snapshot);
+
+    // when:
+    final byte[] serialized = recordSerializer.serialize("", record);
+    System.out.println(new String(serialized));
+    final SnapshotStoreRecord deserialized = recordDeserializer.deserialize("", serialized);
+
+    // then:
+    assertThat(deserialized, is(record));
+  }
+
+  @Test
+  public void shouldSerializeKey() {
+    // given:
+    final SnapshotStoreRecordKey key
+        = new SnapshotStoreRecordKey(SnapshotStoreRecordType.Snapshot, 100L);
+
+    // when:
+    final byte[] serialized = keySerializer.serialize("", key);
+    System.out.println(new String(serialized));
+    final SnapshotStoreRecordKey deserialized = keyDeserializer.deserialize("", serialized);
+
+    // then:
+    assertThat(deserialized, is(key));
+  }
+}
\ No newline at end of file
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java
index 7aae1d30e..535f035c0 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java
@@ -17,7 +17,7 @@ public class TestConstants {
 
   public static final DockerImageName CASSANDRA = DockerImageName.parse("cassandra:4.1.0");
 
-  public static final DockerImageName KAFKA = DockerImageName.parse("confluentinc/cp-kafka:7.3.2");
+  public static final DockerImageName KAFKA = DockerImageName.parse("confluentinc/cp-kafka:7.9.0");
 
   public static final DockerImageName MONGODB = DockerImageName.parse("mongo:7.0.2");
 }
\ No newline at end of file
diff --git a/operator/build.gradle.kts b/operator/build.gradle.kts
index 0536dfd6a..1bf08b090 100644
--- a/operator/build.gradle.kts
+++ b/operator/build.gradle.kts
@@ -24,7 +24,7 @@ dependencies {
   implementation(project(":controller-api"))
   implementation(libs.crd.generator.atp)
 
-  implementation(libs.jackson)
+  implementation(libs.bundles.jackson)
   implementation(libs.javaoperatorsdk)
 
   implementation(libs.bundles.commons)
diff --git a/settings.gradle.kts b/settings.gradle.kts
index ea9325b38..d70615b5d 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -52,7 +52,9 @@ dependencyResolutionManagement {
     version("mongoDB", "4.10.2")
     version("fabric8", "6.13.4")
 
-    library("jackson", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8").versionRef("jackson")
+    library("jackson-jdk8", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8").versionRef("jackson")
+    library("jackson-jsr310", "com.fasterxml.jackson.datatype", "jackson-datatype-jsr310").versionRef("jackson")
+    bundle("jackson", listOf("jackson-jdk8", "jackson-jsr310"))
 
     library("kafka-clients", "org.apache.kafka", "kafka-clients").versionRef("kafka")
     library("kafka-streams", "org.apache.kafka", "kafka-streams").versionRef("kafka")