diff --git a/kafka-client-examples/e2e-test/build.gradle.kts b/kafka-client-examples/e2e-test/build.gradle.kts index a554a07da..eaed1938f 100644 --- a/kafka-client-examples/e2e-test/build.gradle.kts +++ b/kafka-client-examples/e2e-test/build.gradle.kts @@ -31,7 +31,7 @@ dependencies { implementation(libs.guava) implementation(libs.slf4j.log4j2) implementation(libs.bundles.scylla) - implementation(libs.jackson) + implementation(libs.bundles.jackson) implementation(libs.mongodb.driver.core) testImplementation(testlibs.bundles.base) diff --git a/kafka-client/build.gradle.kts b/kafka-client/build.gradle.kts index 08efad9c3..f93de5e66 100644 --- a/kafka-client/build.gradle.kts +++ b/kafka-client/build.gradle.kts @@ -137,7 +137,7 @@ dependencies { implementation("dev.responsive:controller-api:0.16.0") implementation(libs.bundles.scylla) implementation(libs.bundles.commons) - implementation(libs.jackson) + implementation(libs.bundles.jackson) implementation(libs.mongodb.driver.sync) implementation(libs.bundles.otel) implementation(libs.bundles.grpc) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 68cabc143..acec9c989 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -60,6 +60,7 @@ import dev.responsive.kafka.internal.metrics.exporter.MetricsExportService; import dev.responsive.kafka.internal.metrics.exporter.NoopMetricsExporterService; import dev.responsive.kafka.internal.metrics.exporter.otel.OtelMetricsService; +import dev.responsive.kafka.internal.snapshot.KafkaStreamsSnapshotContext; import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import dev.responsive.kafka.internal.utils.SessionClients; import dev.responsive.kafka.internal.utils.SessionUtil; @@ -471,6 +472,11 @@ public Params build() { ) : innerClientSupplier; this.oeReporter = reporter(responsiveConfig, license); + final var snapshotCtx = KafkaStreamsSnapshotContext.create( + responsiveConfig, + streamsConfig, + topology.describe() + ); this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier( delegateKafkaClientSupplier, responsiveConfig, @@ -478,7 +484,8 @@ public Params build() { storeRegistry, metrics, oeReporter, - storageBackend + storageBackend, + snapshotCtx ); final var admin = responsiveKafkaClientSupplier.getAdmin(responsiveConfig.originals()); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index 6916380d2..6c1e9fd4b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -19,6 +19,7 @@ import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import dev.responsive.kafka.api.ResponsiveKafkaStreams; import dev.responsive.kafka.internal.db.partitioning.Murmur3Hasher; +import dev.responsive.kafka.internal.snapshot.SnapshotSupport; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -329,6 +330,24 @@ public class ResponsiveConfig extends AbstractConfig { private static final String ORIGIN_EVENT_REPORT_INTERVAL_MS_DOC = "How often to report origin event usage information. 
This should generally not be changed in production environments"; + // ------------------ Snapshot Configs + public static final String SNAPSHOTS_CONFIG = "responsive.snapshots"; + private static final String SNAPSHOTS_DEFAULT = SnapshotSupport.DISABLED.name(); + private static final String SNAPSHOTS_DOC + = "Set to LOCAL to enable snapshot support. This feature is experimental."; + + public static final String SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX + = "responsive.snapshots.local.store.topic.suffix"; + public static final String SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX_DEFAULT + = "snapshots"; + private static final String SNAPSHOT_LOCAL_STORE_TOPIC_DOC + = "The suffix of the topic used to store snapshot metadata when using local snapshot coordination."; + + public static final String SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR + = "responsive.snapshots.local.store.topic.replication.factor"; + public static final short SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DEFAULT = (short) 3; + private static final String SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DOC + = "Replication factor for the snapshot store topic when using local snapshot coordination."; // ------------------ StreamsConfig overrides ---------------------- // These configuration values are required by Responsive, and a ConfigException will @@ -660,6 +679,29 @@ public class ResponsiveConfig extends AbstractConfig { ORIGIN_EVENT_REPORT_INTERVAL_MS_DEFAULT, Importance.LOW, ORIGIN_EVENT_REPORT_INTERVAL_MS_DOC + ).define( + SNAPSHOTS_CONFIG, + Type.STRING, + SNAPSHOTS_DEFAULT, + ConfigDef.CaseInsensitiveValidString.in( + Arrays.stream(SnapshotSupport.values()) + .map(Enum::name) + .toArray(String[]::new) + ), + Importance.LOW, + SNAPSHOTS_DOC + ).define( + SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX, + Type.STRING, + SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX_DEFAULT, + Importance.LOW, + SNAPSHOT_LOCAL_STORE_TOPIC_DOC + ).define( + SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR, + Type.SHORT, + SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DEFAULT, + Importance.LOW, + SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DOC ); /** diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java index 8c944ac8e..e3832a5b1 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplier.java @@ -22,6 +22,8 @@ import dev.responsive.kafka.internal.metrics.EndOffsetsPoller; import dev.responsive.kafka.internal.metrics.MetricPublishingCommitListener; import dev.responsive.kafka.internal.metrics.ResponsiveMetrics; +import dev.responsive.kafka.internal.snapshot.KafkaStreamsSnapshotContext; +import dev.responsive.kafka.internal.snapshot.SnapshotCommitListener; import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import java.io.Closeable; import java.io.IOException; @@ -30,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.OptionalLong; import java.util.function.Function; import org.apache.kafka.clients.CommonClientConfigs; @@ -68,6 +71,7 @@ public final class ResponsiveKafkaClientSupplier implements KafkaClientSupplier private final boolean eos; private final StorageBackend storageBackend; private final boolean repairRestoreOffsetOutOfRange; + private final Optional<KafkaStreamsSnapshotContext> snapshotCtx; public
ResponsiveKafkaClientSupplier( final KafkaClientSupplier clientSupplier, @@ -76,7 +80,8 @@ public ResponsiveKafkaClientSupplier( final ResponsiveStoreRegistry storeRegistry, final ResponsiveMetrics metrics, final OriginEventReporter oeReporter, - final StorageBackend storageBackend + final StorageBackend storageBackend, + final Optional snapshotCtx ) { this( new Factories() { @@ -87,7 +92,8 @@ public ResponsiveKafkaClientSupplier( metrics, storageBackend, oeReporter, - responsiveConfig.getBoolean(ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG) + responsiveConfig.getBoolean(ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG), + snapshotCtx ); } @@ -99,7 +105,8 @@ public ResponsiveKafkaClientSupplier( final ResponsiveMetrics metrics, final StorageBackend storageBackend, final OriginEventReporter oeReporter, - final boolean repairRestoreOffsetOutOfRange + final boolean repairRestoreOffsetOutOfRange, + final Optional snapshotCtx ) { this.factories = factories; this.wrapped = wrapped; @@ -117,6 +124,7 @@ public ResponsiveKafkaClientSupplier( this ); applicationId = configs.getString(StreamsConfig.APPLICATION_ID_CONFIG); + this.snapshotCtx = snapshotCtx; } @Override @@ -138,7 +146,8 @@ public Producer getProducer(final Map config) { config, endOffsetsPoller, storeRegistry, - factories + factories, + snapshotCtx ); return factories.createResponsiveProducer( (String) config.get(CommonClientConfigs.CLIENT_ID_CONFIG), @@ -169,7 +178,8 @@ public Consumer getConsumer(final Map config) { config, endOffsetsPoller, storeRegistry, - factories + factories, + snapshotCtx ); // TODO: the end offsets poller call is kind of heavy for a synchronized block return factories.createResponsiveConsumer( @@ -253,7 +263,8 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread( final Map configs, final EndOffsetsPoller endOffsetsPoller, final ResponsiveStoreRegistry storeRegistry, - final Factories factories + final Factories factories, + final Optional snapshotContext ) { if (threadListeners.containsKey(threadId)) { final var tl = threadListeners.get(threadId); @@ -262,6 +273,16 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread( } final var offsetRecorder = factories.createOffsetRecorder(eos, threadId); final var originEventRecorder = factories.createOriginEventRecorder(oeReporter, eos); + final var storeCommitListener = new StoreCommitListener(storeRegistry, offsetRecorder); + final var snapshotCommitListener = snapshotContext.map( + ctx -> new SnapshotCommitListener( + ctx.orchestrator(), + ctx.generationStorage(), + storeRegistry, + ctx.topologyTaskInfo(), + offsetRecorder + ) + ); final var tl = new ReferenceCounted<>( String.format("ListenersForThread(%s)", threadId), new ListenersForThread( @@ -272,9 +293,10 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread( threadId, offsetRecorder ), - new StoreCommitListener(storeRegistry, offsetRecorder), + storeCommitListener, endOffsetsPoller.addForThread(threadId), - originEventRecorder + originEventRecorder, + snapshotCommitListener ) ); threadListeners.put(threadId, tl); @@ -306,6 +328,7 @@ private static class ListenersForThread implements Closeable { final StoreCommitListener storeCommitListener; final EndOffsetsPoller.Listener endOffsetsPollerListener; final OriginEventTracker originEventTracker; + final Optional snapshotCommitListener; public ListenersForThread( final String threadId, @@ -313,7 +336,8 @@ public ListenersForThread( final MetricPublishingCommitListener 
committedOffsetMetricListener, final StoreCommitListener storeCommitListener, final EndOffsetsPoller.Listener endOffsetsPollerListener, - final OriginEventTracker originEventTracker + final OriginEventTracker originEventTracker, + final Optional<SnapshotCommitListener> snapshotCommitListener ) { this.threadId = threadId; this.offsetRecorder = offsetRecorder; @@ -321,12 +345,14 @@ public ListenersForThread( this.storeCommitListener = storeCommitListener; this.endOffsetsPollerListener = endOffsetsPollerListener; this.originEventTracker = originEventTracker; + this.snapshotCommitListener = snapshotCommitListener; } @Override public void close() { committedOffsetMetricListener.close(); endOffsetsPollerListener.close(); + snapshotCommitListener.ifPresent(SnapshotCommitListener::close); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/GenerationStorage.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/GenerationStorage.java new file mode 100644 index 000000000..4cea01ea0 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/GenerationStorage.java @@ -0,0 +1,20 @@ +package dev.responsive.kafka.internal.snapshot; + +import org.apache.kafka.streams.processor.TaskId; + +/** + * Interface that abstracts away how we look up a task's current generation. + * For synchronized snapshots we want to do one of the following so we can record a task's + * generation metadata transactionally alongside the rows with new generation markers when + * we bump generations: + * (1) store this information in the offset metadata as part of the transaction that + * bumps the generation + * (2) if/when Kafka supports 2PC, store this information in another store (like rs3 or + * the snapshot store) + * + * For simple uncoordinated snapshots we'll support looking up the generation by looking + * at the snapshot's state in the snapshot store.
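+ *
+ * <p>A minimal caller sketch (hypothetical, for illustration only; assumes a
+ * GenerationStorage implementation such as SnapshotStoreBasedGenerationStorage):
+ * <pre>{@code
+ * final GenerationStorage generations = new SnapshotStoreBasedGenerationStorage(store);
+ * final long generation = generations.lookupGeneration(new TaskId(0, 1));
+ * }</pre>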
+ */ +public interface GenerationStorage { + long lookupGeneration(final TaskId taskId); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/KafkaStreamsSnapshotContext.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/KafkaStreamsSnapshotContext.java new file mode 100644 index 000000000..86f8c86dc --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/KafkaStreamsSnapshotContext.java @@ -0,0 +1,75 @@ +package dev.responsive.kafka.internal.snapshot; + +import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.internal.snapshot.topic.TopicSnapshotStore; +import dev.responsive.kafka.internal.utils.TopologyTaskInfo; +import java.util.Optional; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TopologyDescription; + +public class KafkaStreamsSnapshotContext { + private final SnapshotOrchestrator orchestrator; + private final GenerationStorage generationStorage; + private final TopologyTaskInfo topologyTaskInfo; + private final SnapshotStore snapshotStore; + + private KafkaStreamsSnapshotContext( + final SnapshotOrchestrator orchestrator, + final GenerationStorage generationStorage, + final SnapshotStore snapshotStore, + final TopologyTaskInfo topologyTaskInfo + ) { + this.orchestrator = orchestrator; + this.generationStorage = generationStorage; + this.snapshotStore = snapshotStore; + this.topologyTaskInfo = topologyTaskInfo; + } + + public SnapshotOrchestrator orchestrator() { + return orchestrator; + } + + public GenerationStorage generationStorage() { + return generationStorage; + } + + public TopologyTaskInfo topologyTaskInfo() { + return topologyTaskInfo; + } + + public static Optional create( + final ResponsiveConfig config, + final StreamsConfig streamsConfig, + final TopologyDescription topologyDescription + ) { + final SnapshotSupport support = SnapshotSupport + .valueOf(config.getString(ResponsiveConfig.SNAPSHOTS_CONFIG)); + switch (support) { + case LOCAL: { + final SnapshotStore store = TopicSnapshotStore.create(config.originals()); + final TopologyTaskInfo tti; + try (final Admin admin = Admin.create(config.originals())) { + tti = TopologyTaskInfo.forTopology( + topologyDescription, + admin + ); + } + final SnapshotOrchestrator orchestrator = new LocalSnapshotOrchestrator( + store, + tti.partitionsByTask().keySet() + ); + final GenerationStorage generationStorage = new SnapshotStoreBasedGenerationStorage(store); + return Optional.of(new KafkaStreamsSnapshotContext( + orchestrator, + generationStorage, + store, + tti + )); + } + default: { + return Optional.empty(); + } + } + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotApi.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotApi.java new file mode 100644 index 000000000..8ce602b8d --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotApi.java @@ -0,0 +1,41 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.List; +import java.util.Objects; + +/** + * Implementation of SnapshotApi that directly interacts with the Snapshot Store rather + * than calling out to an API service. 
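+ *
+ * <p>Usage sketch (illustrative only; assumes a SnapshotStore was already created,
+ * e.g. via TopicSnapshotStore.create):
+ * <pre>{@code
+ * try (final SnapshotApi api = new LocalSnapshotApi(store)) {
+ *   final Snapshot created = api.createSnapshot(); // bumps the generation
+ * }
+ * }</pre>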
+ */ +public class LocalSnapshotApi implements SnapshotApi { + private final SnapshotStore snapshotStore; + + public LocalSnapshotApi(final SnapshotStore snapshotStore) { + this.snapshotStore = Objects.requireNonNull(snapshotStore); + } + + @Override + public Snapshot createSnapshot() { + return snapshotStore.updateCurrentSnapshot(snapshot -> { + if (snapshot.state().equals(Snapshot.State.CREATED)) { + throw new RuntimeException("Snapshot is currently in progress"); + } + return snapshot.nextSnapshot(); + }); + } + + @Override + public Snapshot getCurrentSnapshot() { + return snapshotStore.currentSnapshot(true); + } + + @Override + public List getSnapshots() { + return snapshotStore.listSnapshots(true); + } + + @Override + public void close() { + snapshotStore.close(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestrator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestrator.java new file mode 100644 index 000000000..3282266ed --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestrator.java @@ -0,0 +1,99 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.streams.processor.TaskId; + +/** + * An implementation of the orchestrator that runs within the application and interacts + * directly with the snapshot store. + */ +public class LocalSnapshotOrchestrator implements SnapshotOrchestrator { + private final SnapshotStore snapshotStore; + private final Set allTasks; + + public LocalSnapshotOrchestrator( + final SnapshotStore snapshotStore, + final Set allTasks + ) { + this.snapshotStore = Objects.requireNonNull(snapshotStore); + this.allTasks = Objects.requireNonNull(allTasks); + } + + @Override + public long getCurrentGeneration() { + return snapshotStore.currentSnapshot(false).generation(); + } + + @Override + public void reportTaskSnapshotMetadata( + final long generation, + final List taskSnapshots + ) { + snapshotStore.updateCurrentSnapshot(snapshot -> { + // check that we're still working on this snapshot + if (snapshot.generation() != generation) { + throw new RuntimeException( + String.format("generation too old: %d > %d", snapshot.generation(), generation)); + } + if (!snapshot.state().equals(Snapshot.State.CREATED)) { + throw new RuntimeException("Snapshot is currently completed. 
Cannot update"); + } + + // check that for all the specified tasks, we either haven't collected its metadata + // or the metadata is the same + final List<Snapshot.TaskSnapshotMetadata> newlyCompletedTaskSnapshots = new ArrayList<>(); + for (final var taskSnapshot : taskSnapshots) { + final var found = snapshot.taskSnapshots() + .stream() + .filter(s -> s.taskId().equals(taskSnapshot.taskId())) + .collect(Collectors.toList()); + if (found.size() > 1) { + throw new IllegalStateException( + "found multiple snapshots for task " + taskSnapshot.taskId()); + } + if (found.isEmpty()) { + newlyCompletedTaskSnapshots.add(taskSnapshot); + } else if (!found.get(0).equals(taskSnapshot)) { + throw new IllegalStateException( + "found conflicting snapshots for task " + taskSnapshot.taskId()); + } + } + + // if we've collected snapshots for all tasks, mark the snapshot as completed + final Set<TaskId> completedTasks = Stream.concat( + newlyCompletedTaskSnapshots.stream().map(Snapshot.TaskSnapshotMetadata::taskId), + snapshot.taskSnapshots().stream().map(Snapshot.TaskSnapshotMetadata::taskId) + ).collect(Collectors.toSet()); + Snapshot.State state = snapshot.state(); + if (completedTasks.equals(this.allTasks)) { + state = Snapshot.State.COMPLETED; + } + + return snapshot.withTaskSnapshots(newlyCompletedTaskSnapshots, state); + }); + } + + @Override + public void failSnapshot(long snapshotGeneration) { + snapshotStore.updateCurrentSnapshot(snapshot -> { + if (snapshot.generation() != snapshotGeneration) { + // todo: do something more reasonable here + throw new RuntimeException("Generation mismatch"); + } + if (snapshot.state().equals(Snapshot.State.COMPLETED)) { + throw new RuntimeException("Cannot fail completed snapshot"); + } + return snapshot.withStateFailed(); + }); + } + + @Override + public void close() { + snapshotStore.close(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java new file mode 100644 index 000000000..4b5aa7de2 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/Snapshot.java @@ -0,0 +1,272 @@ +package dev.responsive.kafka.internal.snapshot; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.streams.processor.TaskId; + +/** + * POJO representing an application snapshot + */ +public class Snapshot { + public enum State { + CREATED, + COMPLETED, + // TODO: add state between CREATED and COMPLETED to reflect a snapshot + // that needs to be finalized by converting expiring checkpoints to + // nonexpiring checkpoints + FAILED + // TODO: add state between CREATED and FAILED to reflect a snapshot that has + // failed but is not yet cleaned up + } + + private final Instant createdAt; + private final long generation; + private final State state; + private final List<TaskSnapshotMetadata> taskSnapshots; + + public static Snapshot initial() { + return new Snapshot( + Instant.EPOCH, + 0, + State.COMPLETED, + List.of() + ); + } + + @JsonCreator + public Snapshot( + @JsonProperty("createdAt") final Instant createdAt, + @JsonProperty("generation") final long generation, + @JsonProperty("state") final State state, + @JsonProperty("taskSnapshots") final List<TaskSnapshotMetadata> taskSnapshots + ) { + this.createdAt = createdAt; +
this.generation = generation; + this.state = state; + this.taskSnapshots = List.copyOf(taskSnapshots); + } + + @JsonProperty("createdAt") + public Instant createdAt() { + return createdAt; + } + + @JsonProperty("generation") + public long generation() { + return generation; + } + + @JsonProperty("state") + public State state() { + return state; + } + + @JsonProperty("taskSnapshots") + public List taskSnapshots() { + return taskSnapshots; + } + + public Snapshot nextSnapshot() { + return new Snapshot( + Instant.now(), + generation + 1, + State.CREATED, + List.of() + ); + } + + public Snapshot withTaskSnapshots(List taskSnapshots, State state) { + return new Snapshot( + createdAt, + generation, + state, + Stream.concat(this.taskSnapshots.stream(), taskSnapshots.stream()) + .collect(Collectors.toList()) + ); + } + + public Snapshot withStateFailed() { + return new Snapshot( + createdAt, + generation, + State.FAILED, + taskSnapshots + ); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof Snapshot)) { + return false; + } + final Snapshot snapshot = (Snapshot) o; + return generation == snapshot.generation + && Objects.equals(createdAt, snapshot.createdAt) + && state == snapshot.state && Objects.equals(taskSnapshots, snapshot.taskSnapshots); + } + + @Override + public int hashCode() { + return Objects.hash(createdAt, generation, state, taskSnapshots); + } + + @Override + public String toString() { + return "Snapshot{" + + "createdAt=" + createdAt + + ", generation=" + generation + + ", state=" + state + + ", taskSnapshots=" + taskSnapshots + + '}'; + } + + public static class TaskSnapshotMetadata { + private final TaskId taskId; + private final List committedOffsets; + private final Map checkpoints; + private final Instant timestamp; + + @JsonCreator + public TaskSnapshotMetadata( + @JsonProperty("taskId") final TaskId taskId, + @JsonProperty("committedOffsets") final List committedOffsets, + @JsonProperty("checkpoints") final Map checkpoints, + @JsonProperty("timestamp") final Instant timestamp + ) { + this.taskId = taskId; + this.committedOffsets = List.copyOf(committedOffsets); + this.checkpoints = Map.copyOf(checkpoints); + this.timestamp = Objects.requireNonNull(timestamp); + } + + @JsonProperty("taskId") + public TaskId taskId() { + return taskId; + } + + @JsonProperty("committedOffsets") + public List committedOffsets() { + return committedOffsets; + } + + @JsonProperty("checkpoints") + public Map checkpoints() { + return checkpoints; + } + + @JsonProperty("timestamp") + public Instant timestamp() { + return timestamp; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TaskSnapshotMetadata)) { + return false; + } + final TaskSnapshotMetadata that = (TaskSnapshotMetadata) o; + if (checkpoints == null ^ that.checkpoints == null) { + return false; + } + if (checkpoints != null) { + if (checkpoints.size() != that.checkpoints.size()) { + return false; + } + for (final Map.Entry entry : checkpoints.entrySet()) { + if (!Arrays.equals(entry.getValue(), that.checkpoints.get(entry.getKey()))) { + return false; + } + } + } + return Objects.equals(taskId, that.taskId) + && Objects.equals(committedOffsets, that.committedOffsets); + } + + @Override + public int hashCode() { + return Objects.hash(taskId, committedOffsets, checkpoints); + } + + @Override + public String toString() { + return "TaskSnapshotMetadata{" + + "taskId=" + taskId + + ", committedOffsets=" + 
committedOffsets + + ", checkpoints=" + checkpoints + + '}'; + } + } + + public static class CommittedOffset { + private final String topic; + private final int partition; + private final long offset; + + @JsonCreator + public CommittedOffset( + @JsonProperty("topic") final String topic, + @JsonProperty("partition") final int partition, + @JsonProperty("offset") final long offset + ) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + } + + @JsonProperty("topic") + public String topic() { + return topic; + } + + @JsonProperty("partition") + public int partition() { + return partition; + } + + @JsonProperty("offset") + public long offset() { + return offset; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CommittedOffset)) { + return false; + } + final CommittedOffset that = (CommittedOffset) o; + return partition == that.partition + && offset == that.offset + && Objects.equals(topic, that.topic); + } + + @Override + public int hashCode() { + return Objects.hash(topic, partition, offset); + } + + @Override + public String toString() { + return "CommittedOffset{" + + "topic='" + topic + '\'' + + ", partition=" + partition + + ", offset=" + offset + + '}'; + } + } +} \ No newline at end of file diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotApi.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotApi.java new file mode 100644 index 000000000..f0c2a3bf7 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotApi.java @@ -0,0 +1,18 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.List; + +/** + * Interface for interacting with Snapshots from outside an application. Supports + * reading the current/past snapshots, and creating a new snapshot. 
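+ *
+ * <p>For example, given a SnapshotApi instance, a client could poll for snapshot
+ * completion (a sketch only, not part of this change):
+ * <pre>{@code
+ * final Snapshot requested = api.createSnapshot();
+ * Snapshot current = api.getCurrentSnapshot();
+ * while (current.generation() == requested.generation()
+ *     && current.state() == Snapshot.State.CREATED) {
+ *   Thread.sleep(1000); // may throw InterruptedException
+ *   current = api.getCurrentSnapshot();
+ * }
+ * }</pre>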
+ */ +public interface SnapshotApi extends AutoCloseable { + Snapshot createSnapshot(); + + List getSnapshots(); + + Snapshot getCurrentSnapshot(); + + @Override + void close(); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListener.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListener.java new file mode 100644 index 000000000..b60fde43d --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListener.java @@ -0,0 +1,180 @@ +package dev.responsive.kafka.internal.snapshot; + +import com.google.common.annotations.VisibleForTesting; +import dev.responsive.kafka.internal.clients.OffsetRecorder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; +import dev.responsive.kafka.internal.utils.TopologyTaskInfo; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnapshotCommitListener { + private static final Logger LOG = LoggerFactory.getLogger(SnapshotCommitListener.class); + + private final SnapshotOrchestrator orchestrator; + private final GenerationStorage generationStorage; + private final ResponsiveStoreRegistry storeRegistry; + private final TopologyTaskInfo topologyTaskInfo; + private final Supplier clock; + + public SnapshotCommitListener( + final SnapshotOrchestrator orchestrator, + final GenerationStorage generationStorage, + final ResponsiveStoreRegistry storeRegistry, + final TopologyTaskInfo topologyTaskInfo, + final OffsetRecorder offsetRecorder + ) { + this( + orchestrator, + generationStorage, + storeRegistry, + topologyTaskInfo, + offsetRecorder, + Instant::now + ); + } + + SnapshotCommitListener( + final SnapshotOrchestrator orchestrator, + final GenerationStorage generationStorage, + final ResponsiveStoreRegistry storeRegistry, + final TopologyTaskInfo topologyTaskInfo, + final OffsetRecorder offsetRecorder, + final Supplier clock + ) { + this.orchestrator = Objects.requireNonNull(orchestrator); + this.generationStorage = Objects.requireNonNull(generationStorage); + this.storeRegistry = Objects.requireNonNull(storeRegistry); + this.topologyTaskInfo = Objects.requireNonNull(topologyTaskInfo); + this.clock = Objects.requireNonNull(clock); + offsetRecorder.addCommitCallback(this::onCommit); + } + + @VisibleForTesting + void onCommit( + final String threadId, + final Map recordedCommittedOffsetsFromCommit, + final Map writtenOffsetsFromCommit + ) { + try { + maybeTakeTaskSnapshots( + threadId, + recordedCommittedOffsetsFromCommit, + writtenOffsetsFromCommit + ); + } catch (final RuntimeException e) { + LOG.warn("error taking task snapshots", e); + } + } + + private void maybeTakeTaskSnapshots( + final String threadId, + final Map recordedCommittedOffsetsFromCommit, + final Map writtenOffsetsFromCommit + ) { + final Map committedOffsetsFromCommit = recordedCommittedOffsetsFromCommit + .entrySet() + .stream() + .collect(Collectors.toMap(e -> e.getKey().getPartition(), Map.Entry::getValue)); + // get all tasks represented in the commit and snapshot them + // TODO: this means we won't ever snapshot a task if 
it's idle, or if its partitions + // don't commit together. This means that (in the former case for certain) + // we may not ever finish the snapshot. We should do a few things: + // (1) trigger this logic when either commits happen or when we observe the app is + // totally idle. Alternatively we could trigger a commit for all tasks when the + // generation bumps, but that requires a KIP to trigger commits. + // (2) derive the set of tasks to snapshot from the assignment rather than what's + // in the commit. + // (3) to do the above we need to make sure to filter out restoring tasks + final Set<TaskId> tasksInCommit = committedOffsetsFromCommit.keySet() + .stream() + .map(p -> topologyTaskInfo.tasksByPartition().get(p)) + .collect(Collectors.toSet()); + + final long generation = orchestrator.getCurrentGeneration(); + + // go task by task + final List<Snapshot.TaskSnapshotMetadata> taskSnapshots = new ArrayList<>(tasksInCommit.size()); + for (final TaskId taskId : tasksInCommit) { + final long taskGeneration = generationStorage.lookupGeneration(taskId); + if (taskGeneration >= generation) { + // we already have a snapshot for this task, skip + continue; + } + final List<TopicPartition> partitionsInTask = topologyTaskInfo.partitionsByTask().get(taskId); + final List<Snapshot.CommittedOffset> snapshotOffsets + = new ArrayList<>(partitionsInTask.size()); + final Set<TopicPartition> partitionsNotInCommit = new HashSet<>(); + for (final TopicPartition p : partitionsInTask) { + if (committedOffsetsFromCommit.containsKey(p)) { + snapshotOffsets.add(new Snapshot.CommittedOffset( + p.topic(), + p.partition(), + committedOffsetsFromCommit.get(p) + )); + } else { + partitionsNotInCommit.add(p); + } + } + if (!partitionsNotInCommit.isEmpty()) { + // we can only take the snapshot if the commit includes all partitions from the task; + // if it doesn't, we don't know the source offset. In that case, wait for a later + // commit + LOG.warn("commit missing partitions from task {}: {}. 
Skip snapshot for this commit", + taskId, + partitionsNotInCommit.stream() + .map(TopicPartition::toString) + .collect(Collectors.joining(", ")) + ); + continue; + } + // checkpoint stores + final Map storeCheckpoints = new HashMap<>(); + final var stores = storeRegistry.getRegisteredStoresForTask(taskId, threadId); + for (final var store : stores) { + // get the store's changelog offset from either the written or committed offsets + Optional changelogOffset; + if (committedOffsetsFromCommit.containsKey(store.changelogTopicPartition())) { + // for stores sourced from a source topic + changelogOffset + = Optional.of(committedOffsetsFromCommit.get(store.changelogTopicPartition())); + } else if (writtenOffsetsFromCommit.containsKey(store.changelogTopicPartition())) { + // for stores with an internal changelog topic + changelogOffset + = Optional.of(writtenOffsetsFromCommit.get(store.changelogTopicPartition())); + } else { + // the task's source offset advanced, but it didn't write anything + changelogOffset = Optional.empty(); + } + final var checkpoint = store.callbacks().checkpoint(changelogOffset); + storeCheckpoints.put(store.storeName(), checkpoint); + } + final Snapshot.TaskSnapshotMetadata tsm = new Snapshot.TaskSnapshotMetadata( + taskId, + snapshotOffsets, + storeCheckpoints, + clock.get() + ); + taskSnapshots.add(tsm); + } + if (!taskSnapshots.isEmpty()) { + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + } + } + + public void close() { + orchestrator.close(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrator.java new file mode 100644 index 000000000..4482c7ba0 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrator.java @@ -0,0 +1,40 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.List; + +/** + * SnapshotOrchestrator is responsible for coordinating snapshot execution by initiating + * task-level snapshots and updating the snapshot's state as they complete. + */ +public interface SnapshotOrchestrator { + /** + * Gets the application's current snapshot generation + * + * @return The current snapshot generation. + */ + long getCurrentGeneration(); + + /** + * Called by the stream thread to report task snapshot metadata for a given task. + * + * @param snapshotGeneration The generation of the task snapshot(s) being reported. + * @param metadata The task snapshot(s) being reported. + */ + void reportTaskSnapshotMetadata( + long snapshotGeneration, + List metadata + ); + + /** + * Called by the stream thread to report a failed task snapshot. This should only + * be called to report terminal failures. + * + * @param snapshotGeneration The generation of the snapshot to fail. 
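+ *
+ * <p>For example (illustrative only), a thread that hits a terminal error while
+ * checkpointing might call:
+ * <pre>{@code
+ * orchestrator.failSnapshot(orchestrator.getCurrentGeneration());
+ * }</pre>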
+ */ + void failSnapshot(long snapshotGeneration); + + /** + * Clean up any resources held by the orchestrator + */ + void close(); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStore.java new file mode 100644 index 000000000..e9b76b2fd --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStore.java @@ -0,0 +1,39 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.List; +import java.util.function.Function; + +/** + * A store for the Snapshot metadata for a given application + */ +public interface SnapshotStore extends AutoCloseable { + /** + * Returns the current snapshot + * + * @param block if true, then the call blocks until the store has observed the latest update + * @return the current snapshot of the application + */ + Snapshot currentSnapshot(boolean block); + + /** + * List all snapshots + * + * @param block if true, then the call blocks until the store has observed the latest update + * @return a list of all snapshots of the application + */ + List<Snapshot> listSnapshots(boolean block); + + /** + * Updates the current snapshot by applying the given updater function. The update is + * guaranteed to be isolated from other concurrent updates. + * + * @param updater is passed the current snapshot and returns an updated snapshot. + * @return the updated snapshot + */ + Snapshot updateCurrentSnapshot( + Function<Snapshot, Snapshot> updater + ); + + @Override + void close(); +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java new file mode 100644 index 000000000..572ff9a86 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorage.java @@ -0,0 +1,33 @@ +package dev.responsive.kafka.internal.snapshot; + +import org.apache.kafka.streams.processor.TaskId; + +/** + * Computes a task's generation from the current Snapshot metadata. If the current snapshot + * contains a task snapshot for a task, the task is considered to be on the current snapshot's + * generation. Otherwise, it's considered to be on the previous generation. + */ +public class SnapshotStoreBasedGenerationStorage implements GenerationStorage { + private final SnapshotStore snapshotStore; + + public SnapshotStoreBasedGenerationStorage(final SnapshotStore snapshotStore) { + this.snapshotStore = snapshotStore; + } + + @Override + public long lookupGeneration(final TaskId taskId) { + final var currentSnapshot = snapshotStore.currentSnapshot(false); + // todo: move to a fn + if (currentSnapshot.state() == Snapshot.State.COMPLETED + || currentSnapshot.state() == Snapshot.State.FAILED) { + return currentSnapshot.generation(); + } + if (currentSnapshot.taskSnapshots().stream() + .anyMatch(s -> s.taskId().equals(taskId))) { + return currentSnapshot.generation(); + } + // this task has not completed. 
set generation to previous generation + // todo: make previous generation a field rather than computing it here + return currentSnapshot.generation() - 1; + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotSupport.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotSupport.java new file mode 100644 index 000000000..4a0bc8049 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/SnapshotSupport.java @@ -0,0 +1,7 @@ +package dev.responsive.kafka.internal.snapshot; + +public enum SnapshotSupport { + DISABLED, + LOCAL, + // eventually add an option for responsive platform +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecord.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecord.java new file mode 100644 index 000000000..0ff5fa0a5 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecord.java @@ -0,0 +1,56 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import dev.responsive.kafka.internal.snapshot.Snapshot; +import java.util.Objects; +import java.util.Optional; + +public class SnapshotStoreRecord { + private final SnapshotStoreRecordType type; + private final Snapshot snapshot; + + @JsonCreator + public SnapshotStoreRecord( + @JsonProperty("type") final SnapshotStoreRecordType type, + @JsonProperty("snapshot") final Snapshot snapshot + ) { + this.type = Objects.requireNonNull(type); + this.snapshot = snapshot; + } + + @JsonProperty("type") + public SnapshotStoreRecordType type() { + return type; + } + + @JsonProperty("snapshot") + public Optional snapshot() { + return Optional.ofNullable(snapshot); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SnapshotStoreRecord)) { + return false; + } + final SnapshotStoreRecord that = (SnapshotStoreRecord) o; + return type == that.type && Objects.equals(snapshot, that.snapshot); + } + + @Override + public int hashCode() { + return Objects.hash(type, snapshot); + } + + @Override + public String toString() { + return "SnapshotStoreRecord{" + + "type=" + type + + ", snapshot=" + snapshot + + '}'; + } +} \ No newline at end of file diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordKey.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordKey.java new file mode 100644 index 000000000..161fa3781 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordKey.java @@ -0,0 +1,56 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; +import java.util.Optional; + +public class SnapshotStoreRecordKey { + + private final SnapshotStoreRecordType type; + private final Long generation; + + @JsonCreator + public SnapshotStoreRecordKey( + @JsonProperty("type") final SnapshotStoreRecordType type, + @JsonProperty("generation") final Long generation + ) { + this.type = Objects.requireNonNull(type); + this.generation = generation; + } + + @JsonProperty("type") + public SnapshotStoreRecordType type() { + return type; + } + + 
@JsonProperty("generation") + public Optional generation() { + return Optional.ofNullable(generation); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SnapshotStoreRecordKey)) { + return false; + } + final SnapshotStoreRecordKey that = (SnapshotStoreRecordKey) o; + return type == that.type && Objects.equals(generation, that.generation); + } + + @Override + public int hashCode() { + return Objects.hash(type, generation); + } + + @Override + public String toString() { + return "SnapshotStoreRecordKey{" + + "type=" + type + + ", generation=" + generation + + '}'; + } +} \ No newline at end of file diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordType.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordType.java new file mode 100644 index 000000000..d14cfc6bf --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreRecordType.java @@ -0,0 +1,5 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +public enum SnapshotStoreRecordType { + Snapshot +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdes.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdes.java new file mode 100644 index 000000000..df8483255 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdes.java @@ -0,0 +1,99 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import java.io.IOException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.TaskId; + +public class SnapshotStoreSerdes { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + static { + MAPPER.registerModule(new JavaTimeModule()); + MAPPER.registerModule(new Jdk8Module()); + final SimpleModule module = new SimpleModule(); + module.addSerializer(TaskId.class, new TaskIDJacksonSerializer()); + module.addDeserializer(TaskId.class, new TaskIDJacksonDeserializer()); + MAPPER.registerModule(module); + MAPPER.setSerializationInclusion(JsonInclude.Include.NON_ABSENT); + } + + public static class SnapshotStoreRecordKeySerializer + implements Serializer { + @Override + public byte[] serialize(String topic, SnapshotStoreRecordKey data) { + try { + return MAPPER.writeValueAsBytes(data); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class SnapshotStoreRecordKeyDeserializer + implements Deserializer { + @Override + public SnapshotStoreRecordKey deserialize(String topic, byte[] data) { + try { + return MAPPER.readValue(data, SnapshotStoreRecordKey.class); + } catch (final IOException e) { + throw new RuntimeException(e); 
+ } + } + } + + public static class SnapshotStoreRecordSerializer implements Serializer { + @Override + public byte[] serialize(String topic, SnapshotStoreRecord data) { + try { + return MAPPER.writeValueAsBytes(data); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class SnapshotStoreRecordDeserializer implements Deserializer { + @Override + public SnapshotStoreRecord deserialize(String topic, byte[] data) { + try { + return MAPPER.readValue(data, SnapshotStoreRecord.class); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class TaskIDJacksonSerializer extends JsonSerializer { + @Override + public void serialize( + final TaskId taskId, + final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider + ) throws IOException { + jsonGenerator.writeString(taskId.toString()); + } + } + + public static class TaskIDJacksonDeserializer extends JsonDeserializer { + + @Override + public TaskId deserialize( + final JsonParser jsonParser, + final DeserializationContext deserializationContext + ) throws IOException { + return TaskId.parse(jsonParser.getValueAsString()); + } + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SynchronizedConsumerPosition.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SynchronizedConsumerPosition.java new file mode 100644 index 000000000..a26f6309f --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/SynchronizedConsumerPosition.java @@ -0,0 +1,20 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +class SynchronizedConsumerPosition { + private long position = 0; + + synchronized void waitTillConsumerPosition(long targetPosition) { + while (position < targetPosition) { + try { + wait(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + synchronized void updateConsumerPosition(long position) { + this.position = position; + notifyAll(); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java new file mode 100644 index 000000000..3274b15c8 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/snapshot/topic/TopicSnapshotStore.java @@ -0,0 +1,273 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.internal.snapshot.Snapshot; +import dev.responsive.kafka.internal.snapshot.SnapshotStore; +import java.time.Duration; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import 
org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.streams.StreamsConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implements SnapshotStore by storing Snapshot metadata for a single application on a Kafka topic. + * The topic has a single partition. Each record represents an update to a single snapshot of an + * application. The key includes the generation of the snapshot the update applies to. The + * value contains an instance of Snapshot. + */ +public class TopicSnapshotStore implements SnapshotStore { + private static final Logger LOG = LoggerFactory.getLogger(TopicSnapshotStore.class); + + private final TopicPartition topicPartition; + private final Supplier> producerSupplier; + private final Supplier> consumerSupplier; + private final Thread readerThread; + private final AtomicBoolean running = new AtomicBoolean(true); + private final AtomicReference currentSnapshot + = new AtomicReference<>(Snapshot.initial()); + private final ConcurrentMap snapshots = new ConcurrentHashMap<>(); + private final SynchronizedConsumerPosition consumedOffset = new SynchronizedConsumerPosition(); + private final Consumer endOffsetConsumer; + + public TopicSnapshotStore( + final String topic, + final short replicas, + final Supplier> consumerSupplier, + final Supplier> producerSupplier, + final Admin admin + ) { + this.topicPartition = new TopicPartition(topic, 0); + this.producerSupplier = producerSupplier; + this.consumerSupplier = consumerSupplier; + createTopic(admin, replicas); + final var consumer = consumerSupplier.get(); + consumer.assign(List.of(topicPartition)); + consumer.seekToBeginning(List.of(topicPartition)); + readerThread = new Thread(() -> runReader( + consumer, + currentSnapshot, + snapshots, + consumedOffset, + running + )); + readerThread.start(); + this.endOffsetConsumer = consumerSupplier.get(); + this.endOffsetConsumer.assign(List.of(topicPartition)); + waitTillConsumedAll(); + } + + public static TopicSnapshotStore create(final Map config) { + final ResponsiveConfig responsiveConfig = ResponsiveConfig.responsiveConfig(config); + final String applicationId = config.get(StreamsConfig.APPLICATION_ID_CONFIG).toString(); + final String topicSuffix = responsiveConfig + .getString(ResponsiveConfig.SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX); + final String snapshotStoreTopic + = String.format("_responsive-%s-%s", applicationId, topicSuffix); + return create( + snapshotStoreTopic, + responsiveConfig.getShort(ResponsiveConfig.SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR), + config + ); + } + + public static TopicSnapshotStore create( + final String topic, + final short replicas, + final Map config + ) { + final Map consumerConfig = new HashMap<>(config); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, null); + consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + consumerConfig.put( + ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString()); + final Supplier> consumerSupplier = () -> + new KafkaConsumer<>( + consumerConfig, + new 
SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer(), + new SnapshotStoreSerdes.SnapshotStoreRecordDeserializer() + ); + final Map producerConfig = new HashMap<>(config); + final String appId = config.get(StreamsConfig.APPLICATION_ID_CONFIG).toString(); + producerConfig.put( + ProducerConfig.TRANSACTIONAL_ID_CONFIG, + String.format("__responsive-%s-snapshot-store", appId) + ); + final Supplier> producerSupplier = () -> + new KafkaProducer<>( + producerConfig, + new SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer(), + new SnapshotStoreSerdes.SnapshotStoreRecordSerializer() + ); + try (final Admin admin = Admin.create(config)) { + return new TopicSnapshotStore( + topic, replicas, consumerSupplier, producerSupplier, admin); + } + } + + @Override + public Snapshot currentSnapshot(boolean block) { + if (block) { + waitTillConsumedAll(); + } + return currentSnapshot.get(); + } + + @Override + public List listSnapshots(boolean block) { + if (block) { + waitTillConsumedAll(); + } + return snapshots.values().stream() + .sorted(Comparator.comparingLong(Snapshot::generation)) + .collect(Collectors.toList()); + } + + @Override + public Snapshot updateCurrentSnapshot( + final Function updater + ) { + final Future sendFut; + final Snapshot updated; + try (final var producer = producerSupplier.get()) { + producer.initTransactions(); + producer.beginTransaction(); + try { + waitTillConsumedAll(); + updated = updater.apply(currentSnapshot.get()); + final var record = createRecord(updated); + sendFut = producer.send(record); + producer.commitTransaction(); + } catch (final RuntimeException e) { + producer.abortTransaction(); + throw e; + } + } + final RecordMetadata recordMetadata; + try { + recordMetadata = sendFut.get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + waitTillConsumerPosition(recordMetadata.offset() + 1); + return updated; + } + + @Override + public void close() { + try { + endOffsetConsumer.close(); + } catch (final RuntimeException e) { + LOG.warn("error closing end offset consumer", e); + } + running.set(false); + try { + readerThread.join(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + + private ProducerRecord createRecord( + final Snapshot snapshot + ) { + return new ProducerRecord<>( + topicPartition.topic(), + topicPartition.partition(), + new SnapshotStoreRecordKey(SnapshotStoreRecordType.Snapshot, snapshot.generation()), + new SnapshotStoreRecord(SnapshotStoreRecordType.Snapshot, snapshot) + ); + } + + private void createTopic(final Admin admin, final short replicas) { + try { + final var result = admin.createTopics(List.of( + new NewTopic(topicPartition.topic(), 1, replicas) + )); + result.all().get(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } catch (final ExecutionException e) { + if (e.getCause() instanceof TopicExistsException) { + LOG.info("snapshot store topic already exists."); + } else { + throw new RuntimeException(e); + } + } catch (final TopicExistsException e) { + LOG.info("snapshot store topic already exists."); + } + } + + private void waitTillConsumedAll() { + waitTillConsumerPosition(endOffset()); + } + + private void waitTillConsumerPosition(final long offset) { + consumedOffset.waitTillConsumerPosition(offset); + } + + private long endOffset() { + synchronized (endOffsetConsumer) { + return endOffsetConsumer.endOffsets(List.of(topicPartition)).get(topicPartition); + } + } + + private void runReader( + final Consumer 
consumer, + final AtomicReference currentSnapshot, + final ConcurrentMap allSnapshots, + final SynchronizedConsumerPosition consumedOffset, + final AtomicBoolean running + ) { + while (running.get()) { + final ConsumerRecords records + = consumer.poll(Duration.ofMillis(100)); + for (final ConsumerRecord record : records) { + switch (record.key().type()) { + case Snapshot: { + final Snapshot update = record.value().snapshot().get(); + currentSnapshot.getAndUpdate(c -> { + if (update.generation() >= c.generation()) { + return update; + } else { + return c; + } + }); + allSnapshots.put(update.generation(), update); + break; + } + default: { + throw new IllegalStateException(); + } + } + } + consumedOffset.updateConsumerPosition(consumer.position(topicPartition)); + } + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java index 85c900400..eb1084cb7 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java @@ -48,6 +48,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; @@ -93,6 +94,7 @@ public class CommitBuffer, P> implements Closeable { private final Sensor flushSensor; private final Sensor flushLatencySensor; private final Sensor flushErrorsSensor; + private Optional lastKnownConsumedOffset = Optional.empty(); private KafkaFuture deleteRecordsFuture = KafkaFuture.completedFuture(null); @@ -261,10 +263,12 @@ private static boolean hasSourceTopicChangelog(final String changelogTopicName) } public void put(final K key, final byte[] value, long timestamp) { + lastKnownConsumedOffset = Optional.empty(); buffer.put(key, Result.value(key, value, timestamp)); } public void tombstone(final K key, long timestamp) { + lastKnownConsumedOffset = Optional.empty(); buffer.put(key, Result.tombstone(key, timestamp)); } @@ -382,8 +386,45 @@ private boolean triggerFlush() { return recordsTrigger || bytesTrigger || timeTrigger; } + /** + * Force a flush of the commit buffer. When forcing a flush of the commit buffer, the + * caller can specify the offset in the changelog topic up to which the commit buffer + * has data. If not specified, then the commit buffer will use the last offset specified + * in a call to flush or forceFlush, unless there were writes done since the last flush. + * In that case, this call throws an IllegalStateException. 
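+ *
+ * <p>For example (hypothetical callers, for illustration):
+ * <pre>{@code
+ * buffer.forceFlush(Optional.of(committedOffset)); // flush up to a known changelog offset
+ * buffer.forceFlush(Optional.empty()); // reuse the last offset passed to flush/forceFlush
+ * }</pre>
+ *
+ * @param maybeConsumedOffset the changelog offset up to which the buffer has data, if known
+ * @throws IllegalStateException if no offset was given and there have been new writes since
+ *         the last flush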
+ */ + public void forceFlush(final Optional<Long> maybeConsumedOffset) { + if (buffer.sizeInRecords() == 0) { + return; + } + final long consumedOffset; + if (maybeConsumedOffset.isPresent()) { + consumedOffset = maybeConsumedOffset.get(); + } else if (lastKnownConsumedOffset.isPresent()) { + consumedOffset = lastKnownConsumedOffset.get(); + } else { + throw new IllegalStateException( + "tried to force-flush buffer with new writes since last flush without specifying" + + " a consumed offset" + ); + } + flush(consumedOffset, true); + } + public void flush(final long consumedOffset) { - if (!triggerFlush()) { + flush(consumedOffset, false); + } + + private void flush(final long consumedOffset, final boolean force) { + if (lastKnownConsumedOffset.isPresent() && lastKnownConsumedOffset.get() > consumedOffset) { + throw new IllegalStateException(String.format( + "tried to flush to an earlier offset (%d) than last known consumed offset (%d)", + consumedOffset, + lastKnownConsumedOffset.get() + )); + } + lastKnownConsumedOffset = Optional.of(consumedOffset); + if (!force && !triggerFlush()) { return; } @@ -471,6 +512,7 @@ private void restoreCassandraBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) { if (consumedOffset >= 0) { + lastKnownConsumedOffset = Optional.of(consumedOffset); doFlush(consumedOffset, records.size()); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java index 9496d33da..df73d01a0 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/PartitionedOperations.java @@ -81,7 +81,6 @@ public static PartitionedOperations create( final StateStoreContext storeContext, final ResponsiveKeyValueParams params ) throws InterruptedException, TimeoutException { - final var log = new LogContext( String.format("store [%s] ", name.kafkaName()) ).logger(PartitionedOperations.class); @@ -177,11 +176,13 @@ public void notifyCommit(long committedOffset) { } @Override - public byte[] checkpoint() { + public byte[] checkpoint(final Optional<Long> committedOffset) { + initializedBuffer.forceFlush(committedOffset); return table.checkpoint(); } }, - streamThreadId() + streamThreadId(), + storeContext.taskId() ); storeRegistry.registerStore(registration); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java index 54b6907c1..be47a1325 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/RemoteWindowOperations.java @@ -169,7 +169,8 @@ public static RemoteWindowOperations create( ?
OptionalLong.empty() : OptionalLong.of(restoreStartOffset), buffer::flush, - streamThreadId() + streamThreadId(), + storeContext.taskId() ); storeRegistry.registerStore(registration); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java index 7d4b50a55..329d7654c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistration.java @@ -14,9 +14,11 @@ import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import java.util.Objects; +import java.util.Optional; import java.util.OptionalLong; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; public final class ResponsiveStoreRegistration { @@ -27,6 +29,7 @@ public final class ResponsiveStoreRegistration { private final TopicPartition changelogTopicPartition; private final StoreCallbacks callbacks; private final String threadId; + private final TaskId taskId; private final InjectedStoreArgs injectedStoreArgs = new InjectedStoreArgs(); private final OptionalLong startOffset; // stored offset during init, (where restore should start) @@ -37,13 +40,15 @@ public ResponsiveStoreRegistration( final TopicPartition changelogTopicPartition, final OptionalLong startOffset, final StoreCallbacks callbacks, - final String threadId + final String threadId, + final TaskId taskId ) { this.storeName = Objects.requireNonNull(storeName); this.changelogTopicPartition = Objects.requireNonNull(changelogTopicPartition); this.startOffset = startOffset; this.callbacks = Objects.requireNonNull(callbacks); this.threadId = Objects.requireNonNull(threadId); + this.taskId = Objects.requireNonNull(taskId); this.log = new LogContext( String.format("changelog [%s]", changelogTopicPartition) ).logger(ResponsiveStoreRegistration.class); @@ -74,10 +79,14 @@ public String threadId() { return threadId; } + public TaskId taskId() { + return taskId; + } + public interface StoreCallbacks { void notifyCommit(long committedOffset); - default byte[] checkpoint() { + default byte[] checkpoint(Optional committedOffset) { throw new UnsupportedOperationException("checkpoints not supported for store type"); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistry.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistry.java index c409c99b0..8db52bd77 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistry.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistry.java @@ -18,6 +18,7 @@ import java.util.OptionalLong; import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,25 @@ public synchronized OptionalLong getCommittedOffset( .max(); } + private List filterStoresForThread( + final List stores, + final String threadId, + final String context + ) { + if (stores.isEmpty()) { + return stores; + } + final List storesForThread = stores.stream() + .filter(s -> s.threadId().equals(threadId)) + .collect(Collectors.toList()); + if 
(storesForThread.isEmpty()) { + throw new IllegalStateException(String.format( + "there should always be a store for the thread (%s) if there are stores registered " + + "for this %s", threadId, context)); + } + return storesForThread; + } + public synchronized List<ResponsiveStoreRegistration> getRegisteredStoresForChangelog( final TopicPartition topicPartition ) { @@ -53,18 +73,25 @@ public synchronized List<ResponsiveStoreRegistration> getRegisteredStoresForChangelog( final List<ResponsiveStoreRegistration> storesForTopicPartition = stores.stream() .filter(s -> s.changelogTopicPartition().equals(topicPartition)) .collect(Collectors.toList()); - if (storesForTopicPartition.isEmpty()) { - return storesForTopicPartition; - } - final List<ResponsiveStoreRegistration> storesForThread = storesForTopicPartition.stream() - .filter(s -> s.threadId().equals(threadId)) + return filterStoresForThread( + storesForTopicPartition, + threadId, + String.format("topic partition (%s)", topicPartition) + ); + } + + public synchronized List<ResponsiveStoreRegistration> getRegisteredStoresForTask( + final TaskId taskId, + final String threadId + ) { + final List<ResponsiveStoreRegistration> storesForTask = stores.stream() + .filter(s -> s.taskId().equals(taskId)) .collect(Collectors.toList()); - if (storesForThread.isEmpty()) { - throw new IllegalStateException(String.format( - "there should always be a store for the thread (%s) if there are stores registered " - + "for this topic partition (%s)", threadId, topicPartition)); - } - return storesForThread; + return filterStoresForThread( + storesForTask, + threadId, + String.format("task (%s)", taskId) + ); } public synchronized void registerStore(final ResponsiveStoreRegistration registration) { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java index 3d3e554af..7d7df378b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SessionOperationsImpl.java @@ -125,7 +125,8 @@ public static SessionOperationsImpl create( ?
OptionalLong.empty() : OptionalLong.of(restoreStartOffset), buffer::flush, - streamThreadId() + streamThreadId(), + storeContext.taskId() ); storeRegistry.registerStore(registration); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/TopologyTaskInfo.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/TopologyTaskInfo.java new file mode 100644 index 000000000..c0ed37e7d --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/utils/TopologyTaskInfo.java @@ -0,0 +1,114 @@ +package dev.responsive.kafka.internal.utils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.processor.TaskId; + +public class TopologyTaskInfo { + private final Map tasksByPartition; + private final Map> partitionsByTask; + // todo: add info about internal topics + + @VisibleForTesting + TopologyTaskInfo( + final Map tasksByPartition, + final Map> partitionsByTask + ) { + this.tasksByPartition = Map.copyOf(tasksByPartition); + this.partitionsByTask = Map.copyOf(partitionsByTask); + } + + public Map tasksByPartition() { + return tasksByPartition; + } + + public Map> partitionsByTask() { + return partitionsByTask; + } + + public static TopologyTaskInfo forTopology( + final TopologyDescription topology, + final Admin admin + ) { + final Map tasksByPartition = new HashMap<>(); + final Map> partitionsByTask = new HashMap<>(); + final Set sinkTopics = sinkTopics(topology); + for (final var st : topology.subtopologies()) { + final Set topics = new HashSet<>(); + for (final TopologyDescription.Node node : st.nodes()) { + if (node instanceof TopologyDescription.Source) { + topics.addAll(((TopologyDescription.Source) node).topicSet()); + if (((TopologyDescription.Source) node).topicPattern() != null) { + throw new TopologyTaskInfoException( + "topic patterns are not supported for snapshots"); + } + } + } + if (!Sets.intersection(topics, sinkTopics).isEmpty()) { + throw new TopologyTaskInfoException( + "internal topics are not supported for snapshots" + ); + } + final Map descriptions; + try { + descriptions = admin.describeTopics(topics).allTopicNames().get(); + } catch (final ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + final Set partitionCounts = descriptions.values().stream() + .map(d -> d.partitions().size()) + .collect(Collectors.toSet()); + if (partitionCounts.size() != 1) { + throw new TopologyTaskInfoException( + "unexpected topics with different partition counts"); + } + final int nPartitions = partitionCounts.iterator().next(); + for (int i = 0; i < nPartitions; i++) { + final var taskId = new TaskId(st.id(), i); + partitionsByTask.put(taskId, new ArrayList<>(nPartitions)); + for (final var topic : topics) { + final var tp = new TopicPartition(topic, i); + tasksByPartition.put(tp, taskId); + partitionsByTask.get(taskId).add(tp); + } + } + } + return new TopologyTaskInfo(tasksByPartition, partitionsByTask); + } + + private static Set sinkTopics(TopologyDescription topology) { + final Set sinkTopics = 
new HashSet<>(); + for (final var st : topology.subtopologies()) { + for (final TopologyDescription.Node node : st.nodes()) { + if (node instanceof TopologyDescription.Sink) { + final String sinkTopic = ((TopologyDescription.Sink) node).topic(); + if (sinkTopic == null) { + throw new TopologyTaskInfoException("non-explicit sink topics not yet supported"); + } + sinkTopics.add(sinkTopic); + } + } + } + return sinkTopics; + } + + public static class TopologyTaskInfoException extends RuntimeException { + private static final long serialVersionUID = 0L; + + public TopologyTaskInfoException(final String message) { + super(message); + } + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java index 459911bbc..61800697d 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/ResponsiveKafkaClientSupplierTest.java @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.Producer; @@ -308,7 +309,8 @@ private ResponsiveKafkaClientSupplier supplier( metrics, storageBackend, oeReporter, - false + false, + Optional.empty() ); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java index 10cef9fcf..e3f7f1a13 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/clients/StoreCommitListenerTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -50,14 +51,16 @@ public void setup() { PARTITION1, OptionalLong.of(0), store1Callbacks, - "thread1" + "thread1", + new TaskId(0, 0) )); registry.registerStore(new ResponsiveStoreRegistration( "store2", PARTITION2, OptionalLong.of(0), store2Callbacks, - "thread1" + "thread1", + new TaskId(0, 1) )); commitListener = new StoreCommitListener(registry, offsetRecorder); } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestratorTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestratorTest.java new file mode 100644 index 000000000..32f6a394f --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/LocalSnapshotOrchestratorTest.java @@ -0,0 +1,195 @@ +package dev.responsive.kafka.internal.snapshot; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; + +class LocalSnapshotOrchestratorTest { + private static final TaskId TASK_0 = new TaskId(0, 0); + private static final TaskId TASK_1 = new TaskId(0, 1); + private static final TaskId TASK_2 = new TaskId(0, 2); + + private final TestSnapshotStore snapshotStore = new TestSnapshotStore(); + private final SnapshotApi api = new LocalSnapshotApi(snapshotStore); + private long generation; + private final LocalSnapshotOrchestrator orchestrator = new LocalSnapshotOrchestrator( + snapshotStore, + Set.of(TASK_0, TASK_1, TASK_2) + ); + + @BeforeEach + public void setup() { + final var snapshot = api.createSnapshot(); + generation = snapshot.generation(); + } + + @Test + public void shouldFailUpdateWithConflictingTaskSnapshot() { + // given: + orchestrator.reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 123L)), + Map.of(), + Instant.now() + )) + ); + + // when/then: + assertThrows( + IllegalStateException.class, + () -> orchestrator.reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 456L)), + Map.of(), + Instant.now() + )) + ) + ); + } + + @Test + public void shouldDoIdempotentTaskSnapshotUpdate() { + // given: + final var taskSnapshots = List.of(new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 123L)), + Map.of(), + Instant.now() + )); + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + + // when: + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + + // then: + final var snapshot = api.getCurrentSnapshot(); + assertThat(snapshot.taskSnapshots(), is(taskSnapshots)); + } + + @Test + public void shouldFailUpdateForWrongGeneration() { + // when/then: + assertThrows(RuntimeException.class, + () -> orchestrator.reportTaskSnapshotMetadata(generation - 1, List.of())); + } + + @Test + public void shouldCompleteSnapshot() { + // given: + final var taskSnapshots = List.of( + new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 123L)), + Map.of(), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + TASK_1, + List.of(new Snapshot.CommittedOffset("foo", 1, 456L)), + Map.of(), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + TASK_2, + List.of(new Snapshot.CommittedOffset("foo", 2, 100L)), + Map.of(), + Instant.now() + ) + ); + + // when: + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + + // then: + final var snapshot = api.getCurrentSnapshot(); + assertThat(snapshot.state(), is(Snapshot.State.COMPLETED)); + assertThat(snapshot.taskSnapshots(), is(taskSnapshots)); + } + + @Test + public void shouldCompleteSnapshotWhenFinishedTasksInSeparateUpdates() { + // given: + final var taskSnapshots = List.of( + new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 123L)), + Map.of(), + Instant.now() + ) + ); + final var taskSnapshots2 = List.of( + new Snapshot.TaskSnapshotMetadata( + TASK_1, + List.of(new Snapshot.CommittedOffset("foo", 1, 456L)), + Map.of(), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + TASK_2, + List.of(new Snapshot.CommittedOffset("foo", 2, 100L)), + Map.of(), + Instant.now() + ) + ); + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + + // when: + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots2); + + // then: + final var snapshot = 
api.getCurrentSnapshot(); + assertThat(snapshot.state(), is(Snapshot.State.COMPLETED)); + final var allTaskSnapshots = new ArrayList<>(taskSnapshots); + allTaskSnapshots.addAll(taskSnapshots2); + assertThat(snapshot.taskSnapshots(), is(allTaskSnapshots)); + } + + @Test + public void shouldFailUpdateForCompletedSnapshot() { + // given: + final var taskSnapshots = List.of( + new Snapshot.TaskSnapshotMetadata( + TASK_0, + List.of(new Snapshot.CommittedOffset("foo", 0, 123L)), + Map.of(), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + TASK_1, + List.of(new Snapshot.CommittedOffset("foo", 1, 456L)), + Map.of(), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + TASK_2, + List.of(new Snapshot.CommittedOffset("foo", 2, 100L)), + Map.of(), + Instant.now() + ) + ); + orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots); + + // when/then: + assertThrows( + RuntimeException.class, + () -> orchestrator.reportTaskSnapshotMetadata(generation, taskSnapshots)); + } + + @Test + public void shouldGetCurrentGeneration() { + assertThat(orchestrator.getCurrentGeneration(), is(generation)); + } +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListenerTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListenerTest.java new file mode 100644 index 000000000..12011e31d --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotCommitListenerTest.java @@ -0,0 +1,295 @@ +package dev.responsive.kafka.internal.snapshot; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import dev.responsive.kafka.internal.clients.OffsetRecorder; +import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration; +import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; +import dev.responsive.kafka.internal.utils.TopologyTaskInfo; +import dev.responsive.kafka.internal.utils.TopologyTaskInfoUtils; +import java.nio.charset.Charset; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SnapshotCommitListenerTest { + private static final int SUBTOPOLOGIES = 1; + private static final int TOPICS = 2; + private static final int PARTITIONS = 2; + private static final String STORE0_NAME = "store0"; + private static final byte[] TASK0_STORE0_CHECKPOINT + = "task0topic0cp".getBytes(Charset.defaultCharset()); + + private final TestSnapshotStore snapshotStore = new TestSnapshotStore(); + private final TopologyTaskInfo topologyTaskInfo = createTopologyTaskInfoWith( + SUBTOPOLOGIES, + TOPICS, + PARTITIONS + ); + private final GenerationStorage generationStorage + = new 
SnapshotStoreBasedGenerationStorage(snapshotStore); + private final LocalSnapshotOrchestrator orchestrator = new LocalSnapshotOrchestrator( + snapshotStore, + Set.copyOf(topologyTaskInfo.partitionsByTask().keySet()) + ); + private final SnapshotApi api = new LocalSnapshotApi(snapshotStore); + private final AtomicReference clock = new AtomicReference<>(Instant.now()); + + @Mock + private ResponsiveStoreRegistry storeRegistry; + @Spy + private SnapshotOrchestrator orchestratorSpy = orchestrator; + @Mock + private ResponsiveStoreRegistration task0t0storeRegistration; + @Mock + private ResponsiveStoreRegistration.StoreCallbacks task0t0storeCallbacks; + @Mock + private OffsetRecorder offsetRecorder; + private long generation; + + private SnapshotCommitListener listener; + + @BeforeEach + public void setup() { + listener = new SnapshotCommitListener( + orchestratorSpy, + generationStorage, + storeRegistry, + topologyTaskInfo, + offsetRecorder, + clock::get + ); + generation = api.createSnapshot().generation(); + lenient().when(task0t0storeRegistration.storeName()).thenReturn(STORE0_NAME); + lenient().when(task0t0storeRegistration.callbacks()).thenReturn(task0t0storeCallbacks); + lenient().when(task0t0storeCallbacks.checkpoint(any())).thenReturn(TASK0_STORE0_CHECKPOINT); + lenient().when(storeRegistry.getRegisteredStoresForTask(eq(taskId(0, 0)), any())) + .thenReturn(List.of(task0t0storeRegistration)); + } + + @Test + public void shouldSkipTasksOnTheCurrentGeneration() { + // given: + orchestrator.reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + taskId(0, 0), List.of(), Map.of(), Instant.now())) + ); + + // when: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition(0, 0, 0)), 100L, + recordingKey(partition(0, 1, 0)), 200L + ), + Map.of() + ); + + // then: + verify(orchestratorSpy, times(0)) + .reportTaskSnapshotMetadata(anyLong(), any()); + } + + @Test + public void shouldSkipTasksWithPartitionsThatDidNotCommit() { + // when: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition(0, 0, 0)), 100L + ), + Map.of() + ); + + // then: + verify(orchestratorSpy, times(0)) + .reportTaskSnapshotMetadata(anyLong(), any()); + } + + @Test + public void shouldTakeCheckpointOfStoreUsingSourceTopicOffset() { + // given: + when(task0t0storeRegistration.changelogTopicPartition()) + .thenReturn(partition(0, 0, 0)); + final TopicPartition partition0 = partition(0, 0, 0); + final TopicPartition partition1 = partition(0, 1, 0); + + // when: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition0), 100L, + recordingKey(partition1), 200L + ), + Map.of() + ); + + // then: + verify(task0t0storeCallbacks).checkpoint(Optional.of(100L)); + verify(orchestratorSpy).reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + taskId(0, 0), + List.of( + new Snapshot.CommittedOffset(partition0.topic(), partition0.partition(), 100L), + new Snapshot.CommittedOffset(partition1.topic(), partition1.partition(), 200L) + ), + Map.of(STORE0_NAME, TASK0_STORE0_CHECKPOINT), + clock.get() + )) + ); + } + + @Test + public void shouldTakeCheckpointOfStoreUsingChangelogOffset() { + // given: + final TopicPartition changelog = new TopicPartition("store0-changelog", 0); + when(task0t0storeRegistration.changelogTopicPartition()) + .thenReturn(changelog); + final TopicPartition partition0 = partition(0, 0, 0); + final TopicPartition partition1 = partition(0, 1, 0); + + // when: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition0), 
100L, + recordingKey(partition1), 200L + ), + Map.of(changelog, 300L) + ); + + // then: + verify(task0t0storeCallbacks).checkpoint(Optional.of(300L)); + verify(orchestratorSpy).reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + taskId(0, 0), + List.of( + new Snapshot.CommittedOffset(partition0.topic(), partition0.partition(), 100L), + new Snapshot.CommittedOffset(partition1.topic(), partition1.partition(), 200L) + ), + Map.of(STORE0_NAME, TASK0_STORE0_CHECKPOINT), + clock.get() + )) + ); + } + + @Test + public void shouldTakeCheckpointOfStoreWithoutSpecifyingOffsetIfChangelogNotInCommit() { + // given: + final TopicPartition changelog = new TopicPartition("store0-changelog", 0); + when(task0t0storeRegistration.changelogTopicPartition()) + .thenReturn(changelog); + final TopicPartition partition0 = partition(0, 0, 0); + final TopicPartition partition1 = partition(0, 1, 0); + + // when: + listener.onCommit( + "foo", + Map.of( + recordingKey(partition0), 100L, + recordingKey(partition1), 200L + ), + Map.of() + ); + + // then: + verify(task0t0storeCallbacks).checkpoint(Optional.empty()); + verify(orchestratorSpy).reportTaskSnapshotMetadata( + generation, + List.of(new Snapshot.TaskSnapshotMetadata( + taskId(0, 0), + List.of( + new Snapshot.CommittedOffset(partition0.topic(), partition0.partition(), 100L), + new Snapshot.CommittedOffset(partition1.topic(), partition1.partition(), 200L) + ), + Map.of(STORE0_NAME, TASK0_STORE0_CHECKPOINT), + clock.get() + )) + ); + } + + @Test + public void shouldNotThrowOnErrorFromSnapshotting() { + // given: + doThrow(new RuntimeException("oops")) + .when(orchestratorSpy) + .reportTaskSnapshotMetadata(anyLong(), any()); + final TopicPartition changelog = new TopicPartition("store0-changelog", 0); + when(task0t0storeRegistration.changelogTopicPartition()) + .thenReturn(changelog); + final TopicPartition partition0 = partition(0, 0, 0); + final TopicPartition partition1 = partition(0, 1, 0); + + // when/then (no throw): + listener.onCommit( + "foo", + Map.of( + recordingKey(partition0), 100L, + recordingKey(partition1), 200L + ), + Map.of() + ); + verify(orchestratorSpy).reportTaskSnapshotMetadata(anyLong(), any()); + } + + private OffsetRecorder.RecordingKey recordingKey(final TopicPartition topicPartition) { + return new OffsetRecorder.RecordingKey(topicPartition, ""); + } + + private TaskId taskId(final int subtopology, final int partition) { + return new TaskId(subtopology, partition); + } + + private static TopicPartition partition( + final int subtopology, + final int topic, + final int partition + ) { + return new TopicPartition(String.format("%s-%s", subtopology, topic), partition); + } + + private static TopologyTaskInfo createTopologyTaskInfoWith( + final int subtopologies, + final int topics, + final int partitions + ) { + final Map<TopicPartition, TaskId> partitionToTask = new HashMap<>(); + final Map<TaskId, List<TopicPartition>> taskToPartitions = new HashMap<>(); + for (int s = 0; s < subtopologies; s++) { + for (int t = 0; t < topics; t++) { + for (int p = 0; p < partitions; p++) { + final TaskId taskId = new TaskId(s, p); + taskToPartitions.putIfAbsent(new TaskId(s, p), new ArrayList<>(t)); + final TopicPartition tp = partition(s, t, p); + taskToPartitions.get(taskId).add(tp); + partitionToTask.put(tp, taskId); + } + } + } + return TopologyTaskInfoUtils.createWith(partitionToTask, taskToPartitions); + } +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrationIntegrationTest.java
b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrationIntegrationTest.java new file mode 100644 index 000000000..e48bda4cc --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotOrchestrationIntegrationTest.java @@ -0,0 +1,303 @@ +package dev.responsive.kafka.internal.snapshot; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import dev.responsive.kafka.internal.snapshot.topic.SnapshotStoreRecord; +import dev.responsive.kafka.internal.snapshot.topic.SnapshotStoreRecordKey; +import dev.responsive.kafka.internal.snapshot.topic.SnapshotStoreSerdes; +import dev.responsive.kafka.internal.snapshot.topic.TopicSnapshotStore; +import dev.responsive.kafka.testutils.TestConstants; +import java.nio.charset.Charset; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.streams.processor.TaskId; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.lifecycle.Startables; + +@ExtendWith(MockitoExtension.class) +class SnapshotOrchestrationIntegrationTest { + private static KafkaContainer KAFKA = new KafkaContainer(TestConstants.KAFKA) + .withEnv("KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS", "1000") + .withEnv("KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS", "60000"); + private static final Set TASKS = Set.of( + new TaskId(0, 0), + new TaskId(0, 1), + new TaskId(0, 2) + ); + + static { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + KAFKA.stop(); + })); + } + + private String topic; + private TestCtx ctx; + + @BeforeAll + public static void setupAll() { + final var fut = Startables.deepStart(KAFKA); + try { + fut.get(); + } catch (final InterruptedException | ExecutionException e) { + System.out.println("LOGS: " + KAFKA.getLogs()); + throw new RuntimeException(e); + } + } + + @AfterAll + public static void teardownAll() { + KAFKA.stop(); + } + + @BeforeEach + public void setup(final TestInfo testInfo) { + topic = testInfo.getTestMethod().get().getName(); + ctx = createCtx(); + } + + @Test + public void shouldReturnGenerationZeroSnapshot() { + // when: + final var snapshot = ctx.store.currentSnapshot(true); + + // then: + assertThat(snapshot, is(Snapshot.initial())); + } + + @Test + public void shouldCreateInitialSnapshot() { + // when: + final var snapshot = ctx.api.createSnapshot(); + + // then: + MatcherAssert.assertThat(snapshot, Matchers.is(ctx.store.currentSnapshot(true))); + 
assertThat(snapshot.generation(), is(1L)); + assertThat(snapshot.state(), is(Snapshot.State.CREATED)); + } + + @Test + public void shouldRefreshStore() { + // given: + final var store2 = createCtx(); + store2.api.createSnapshot(); + + // when: + final var snapshot = ctx.store.currentSnapshot(true); + + // then: + MatcherAssert.assertThat(snapshot, Matchers.is(ctx.store.currentSnapshot(true))); + MatcherAssert.assertThat(snapshot.generation(), is(1L)); + MatcherAssert.assertThat(snapshot.state(), is(Snapshot.State.CREATED)); + } + + @Test + public void shouldFailCreateNextSnapshotIfNotCompleted() { + // given: + ctx.api.createSnapshot(); + + // when/then: + assertThrows(RuntimeException.class, () -> ctx.api.createSnapshot()); + } + + @Test + public void shouldCreateNextSnapshot() { + // given: + ctx.api.createSnapshot(); + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of( + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 1), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 2), List.of(), Map.of(), Instant.now()) + )); + + // when: + final var snapshot = ctx.api.createSnapshot(); + + // then: + assertThat(snapshot.generation(), is(2L)); + assertThat(snapshot.state(), is(Snapshot.State.CREATED)); + } + + @Test + public void shouldAddTaskSnapshotToSnapshot() { + // given: + ctx.api.createSnapshot(); + final var metadata = new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now()); + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of(metadata)); + + // when: + final var snapshot = ctx.store.currentSnapshot(true); + + // then: + MatcherAssert.assertThat(snapshot.state(), is(Snapshot.State.CREATED)); + MatcherAssert.assertThat(snapshot.taskSnapshots(), contains(metadata)); + } + + @Test + public void shouldFailAddTaskSnapshotIfConflictingMetadata() { + // given: + ctx.api.createSnapshot(); + final var metadata = new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), + List.of(), + Map.of("foo", "bar".getBytes(Charset.defaultCharset())), + Instant.now() + ); + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of(metadata)); + + // when/then: + assertThrows( + RuntimeException.class, + () -> ctx.orchestrator.reportTaskSnapshotMetadata( + 1, + List.of(new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now())) + ) + ); + } + + @Test + public void shouldFailAddTaskSnapshotIfCompleted() { + // given: + ctx.api.createSnapshot(); + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of( + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 1), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 2), List.of(), Map.of(), Instant.now()) + )); + + // when/then: + assertThrows( + RuntimeException.class, + () -> ctx.orchestrator.reportTaskSnapshotMetadata( + 1, + List.of(new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now())) + ) + ); + } + + @Test + public void shouldCompleteSnapshot() { + // given: + ctx.api.createSnapshot(); + + // when: + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of( + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 1), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0,
2), List.of(), Map.of(), Instant.now()) + )); + + // then: + MatcherAssert.assertThat(ctx.store.currentSnapshot(true).state(), is(Snapshot.State.COMPLETED)); + } + + @Test + public void shouldFailSnapshot() { + // given: + ctx.api.createSnapshot(); + + // when: + ctx.orchestrator.failSnapshot(1); + + // then: + MatcherAssert.assertThat(ctx.store.currentSnapshot(true).state(), is(Snapshot.State.FAILED)); + } + + @Test + public void shouldFailFailSnapshotIfCompleted() { + // given: + ctx.api.createSnapshot(); + ctx.orchestrator.reportTaskSnapshotMetadata(1, List.of( + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 1), List.of(), Map.of(), Instant.now()), + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 2), List.of(), Map.of(), Instant.now()) + )); + MatcherAssert.assertThat(ctx.store.currentSnapshot(true).state(), is(Snapshot.State.COMPLETED)); + + // when/then: + assertThrows(RuntimeException.class, () -> ctx.orchestrator.failSnapshot(1)); + } + + private TestCtx createCtx() { + return new TestCtx(topic); + } + + private static class TestCtx { + private final TopicSnapshotStore store; + private final SnapshotOrchestrator orchestrator; + private final SnapshotApi api; + + private TestCtx(final String topic) { + final Admin admin + = Admin.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers())); + final Map consumerProps = Map.of( + BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers(), + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false + ); + final Supplier> consumerSupplier = () -> + new KafkaConsumer<>( + consumerProps, + new SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer(), + new SnapshotStoreSerdes.SnapshotStoreRecordDeserializer() + ); + final Map producerProps = Map.of( + BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers(), + ProducerConfig.TRANSACTIONAL_ID_CONFIG, String.format("%s-snapshot-store", topic) + ); + final Supplier> producerSupplier = () -> + new KafkaProducer<>( + producerProps, + new SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer(), + new SnapshotStoreSerdes.SnapshotStoreRecordSerializer() + ); + this.store = new TopicSnapshotStore( + topic, + (short) 1, + consumerSupplier, + producerSupplier, + admin + ); + this.orchestrator = new LocalSnapshotOrchestrator(this.store, TASKS); + this.api = new LocalSnapshotApi(this.store); + } + } +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorageTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorageTest.java new file mode 100644 index 000000000..438287d89 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/SnapshotStoreBasedGenerationStorageTest.java @@ -0,0 +1,59 @@ +package dev.responsive.kafka.internal.snapshot; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class SnapshotStoreBasedGenerationStorageTest { + private static final TaskId TASK_0 = new TaskId(0, 0); + + private final TestSnapshotStore snapshotStore = new TestSnapshotStore(); + private final SnapshotApi api = new LocalSnapshotApi(snapshotStore); + private final GenerationStorage generationStorage + = new 
SnapshotStoreBasedGenerationStorage(snapshotStore); + private long oldGeneration; + private long generation; + + @BeforeEach + public void setup() { + oldGeneration = snapshotStore.currentSnapshot(true).generation(); + final var snapshot = api.createSnapshot(); + generation = snapshot.generation(); + } + + @Test + public void shouldReturnNewGenerationForCompletedSnapshot() { + // given: + snapshotStore + .updateCurrentSnapshot(s -> s.withTaskSnapshots(List.of(), Snapshot.State.COMPLETED)); + + // when/then: + assertThat(generationStorage.lookupGeneration(TASK_0), is(generation)); + } + + @Test + public void shouldReturnNewGenerationForTaskWithCompletedTaskSnapshot() { + // given: + snapshotStore + .updateCurrentSnapshot( + s -> s.withTaskSnapshots( + List.of(new Snapshot.TaskSnapshotMetadata( + TASK_0, List.of(), Map.of(), Instant.now())), + s.state() + )); + + // when/then: + assertThat(generationStorage.lookupGeneration(TASK_0), is(generation)); + } + + @Test + public void shouldReturnOldGenerationForTaskWithoutCompletedSnapshot() { + assertThat(generationStorage.lookupGeneration(TASK_0), is(oldGeneration)); + } +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/TestSnapshotStore.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/TestSnapshotStore.java new file mode 100644 index 000000000..4777b6414 --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/TestSnapshotStore.java @@ -0,0 +1,37 @@ +package dev.responsive.kafka.internal.snapshot; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +public class TestSnapshotStore implements SnapshotStore { + private final Map snapshots = new HashMap<>(); + + public TestSnapshotStore() { + final var initial = Snapshot.initial(); + snapshots.put(initial.generation(), initial); + } + + @Override + public synchronized Snapshot currentSnapshot(final boolean block) { + return snapshots.get(snapshots.keySet().stream().max(Long::compare).get()); + } + + @Override + public synchronized List listSnapshots(final boolean block) { + return new ArrayList<>(snapshots.values()); + } + + @Override + public synchronized Snapshot updateCurrentSnapshot(final Function updater) { + final var updated = updater.apply(currentSnapshot(false)); + snapshots.put(updated.generation(), updated); + return updated; + } + + @Override + public void close() { + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdesTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdesTest.java new file mode 100644 index 000000000..fd1b7496d --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/snapshot/topic/SnapshotStoreSerdesTest.java @@ -0,0 +1,78 @@ +package dev.responsive.kafka.internal.snapshot.topic; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import dev.responsive.kafka.internal.snapshot.Snapshot; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.Test; + +class SnapshotStoreSerdesTest { + private final SnapshotStoreSerdes.SnapshotStoreRecordSerializer recordSerializer + = new SnapshotStoreSerdes.SnapshotStoreRecordSerializer(); + private final 
SnapshotStoreSerdes.SnapshotStoreRecordDeserializer recordDeserializer + = new SnapshotStoreSerdes.SnapshotStoreRecordDeserializer(); + private final SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer keySerializer + = new SnapshotStoreSerdes.SnapshotStoreRecordKeySerializer(); + private final SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer keyDeserializer + = new SnapshotStoreSerdes.SnapshotStoreRecordKeyDeserializer(); + + @Test + public void shouldSerializeRecord() { + // given: + final Snapshot snapshot = new Snapshot( + Instant.now(), + 123, + Snapshot.State.COMPLETED, + List.of( + new Snapshot.TaskSnapshotMetadata( + new TaskId(0, 0), + List.of( + new Snapshot.CommittedOffset("foo", 0, 100), + new Snapshot.CommittedOffset("bar", 0, 200) + ), + Map.of( + "store1", "store1-cp".getBytes(StandardCharsets.UTF_8), + "store2", "store2-cp".getBytes(StandardCharsets.UTF_8) + ), + Instant.now() + ), + new Snapshot.TaskSnapshotMetadata( + new TaskId(1, 0), + List.of(), + Map.of(), + Instant.now() + ) + ) + ); + final SnapshotStoreRecord record + = new SnapshotStoreRecord(SnapshotStoreRecordType.Snapshot, snapshot); + + // when: + final byte[] serialized = recordSerializer.serialize("", record); + System.out.println(new String(serialized)); + final SnapshotStoreRecord deserialized = recordDeserializer.deserialize("", serialized); + + // then: + assertThat(deserialized, is(record)); + } + + @Test + public void shouldSerializeKey() { + // given: + final SnapshotStoreRecordKey key + = new SnapshotStoreRecordKey(SnapshotStoreRecordType.Snapshot, 100L); + + // when: + final byte[] serialized = keySerializer.serialize("", key); + System.out.println(new String(serialized)); + final SnapshotStoreRecordKey deserialized = keyDeserializer.deserialize("", serialized); + + // then: + assertThat(deserialized, is(key)); + } +} \ No newline at end of file diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java index 2b14e27ee..944f61bbc 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/CommitBufferTest.java @@ -503,6 +503,96 @@ public void shouldOnlyFlushWhenIntervalTriggerElapsed() { } } + @Test + public void shouldForceFlush() { + // Given: + try (final CommitBuffer buffer = createCommitBuffer( + FlushTriggers.ofRecords(100), + 100, + Instant::now + )) { + + for (byte i = 0; i < 9; i++) { + buffer.put(Bytes.wrap(new byte[]{i}), VALUE, CURRENT_TS); + } + + // when: + buffer.forceFlush(Optional.of(9L)); + + // Then: + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(9L)); + } + } + + @Test + public void shouldForceFlushUsingLastKnownConsumedOffset() { + // Given: + try (final CommitBuffer buffer = createCommitBuffer( + FlushTriggers.ofRecords(100), + 100, + Instant::now + )) { + + for (byte i = 0; i < 9; i++) { + buffer.put(Bytes.wrap(new byte[]{i}), VALUE, CURRENT_TS); + } + buffer.flush(9L); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L)); + + // when: + buffer.forceFlush(Optional.empty()); + + // Then: + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(9L)); + } + } + + @Test + public void shouldFailForceFlushIfPutsSinceLastFlushAndNoOffsetSpecified() { + // Given: + try (final CommitBuffer buffer = createCommitBuffer( + FlushTriggers.ofRecords(100), + 100, + Instant::now + )) { + + for (byte i = 0; i < 9; i++) {
buffer.put(Bytes.wrap(new byte[]{i}), VALUE, CURRENT_TS); + } + buffer.flush(9L); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L)); + buffer.put(Bytes.wrap(new byte[]{0}), VALUE, CURRENT_TS); + + // when/then: + assertThrows( + IllegalStateException.class, + () -> buffer.forceFlush(Optional.empty())); + } + } + + @Test + public void shouldFailForceFlushIfDeletesSinceLastFlushAndNoOffsetSpecified() { + // Given: + try (final CommitBuffer buffer = createCommitBuffer( + FlushTriggers.ofRecords(100), + 100, + Instant::now + )) { + + for (byte i = 0; i < 9; i++) { + buffer.put(Bytes.wrap(new byte[]{i}), VALUE, CURRENT_TS); + } + buffer.flush(9L); + assertThat(table.lastWrittenOffset(KAFKA_PARTITION), is(-1L)); + buffer.tombstone(Bytes.wrap(new byte[]{0}), CURRENT_TS); + + // when/then: + assertThrows( + IllegalStateException.class, + () -> buffer.forceFlush(Optional.empty())); + } + } + @Test public void shouldUpdateOffsetWhenNoRecordsInBuffer() { // Given: diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistryTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistryTest.java index 0c07d289a..d14fd0b96 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistryTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/stores/ResponsiveStoreRegistryTest.java @@ -19,6 +19,7 @@ import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry; import java.util.OptionalLong; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -31,7 +32,8 @@ class ResponsiveStoreRegistryTest { TOPIC_PARTITION, OptionalLong.of(123L), o -> {}, - "thread" + "thread", + new TaskId(0, 5) ); private static final ResponsiveStoreRegistration UNINIT_REGISTRATION = @@ -40,7 +42,8 @@ class ResponsiveStoreRegistryTest { UNINIT_TOPIC_PARTITION, OptionalLong.empty(), o -> { }, - "thread" + "thread", + new TaskId(0, 2) ); private final ResponsiveStoreRegistry registry = new ResponsiveStoreRegistry(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoTest.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoTest.java new file mode 100644 index 000000000..17ebdd70b --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoTest.java @@ -0,0 +1,120 @@ +package dev.responsive.kafka.internal.utils; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import 
org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.streams.processor.TaskId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class TopologyTaskInfoTest { + @Mock + private Admin admin; + + @SuppressWarnings("unchecked") + @BeforeEach + public void setup() { + when(admin.describeTopics(any(Collection.class))).thenAnswer( + i -> { + final var topics = i.>getArgument(0); + final Map> futures = topics.stream() + .collect(Collectors.toMap( + t -> t, + t -> KafkaFuture.completedFuture(new TopicDescription( + t, + false, + List.of( + new TopicPartitionInfo(0, null, List.of(), List.of()), + new TopicPartitionInfo(1, null, List.of(), List.of()) + ) + )) + )); + return new TestDescribeTopicsResult(futures); + } + ); + } + + @Test + public void shouldComputeTaskAndPartitionMappings() { + // given: + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("source", Consumed.with(Serdes.Long(), Serdes.Long())) + .groupByKey(Grouped.as("groupBy")) + .count(Named.as("count")) + .toStream(Named.as("toStream")) + .to("sink"); + final TopologyDescription description = builder.build().describe(); + + // when: + final var tti = TopologyTaskInfo.forTopology(description, admin); + + // then: + assertThat( + tti.partitionsByTask(), + is( + Map.of( + new TaskId(0, 0), List.of(new TopicPartition("source", 0)), + new TaskId(0, 1), List.of(new TopicPartition("source", 1)) + ) + ) + ); + assertThat( + tti.tasksByPartition(), + is( + Map.of( + new TopicPartition("source", 0), new TaskId(0, 0), + new TopicPartition("source", 1), new TaskId(0, 1) + ) + ) + ); + } + + @Test + public void shouldThrowIfRepartition() { + // given: + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("source", Consumed.with(Serdes.Long(), Serdes.Long())) + .groupBy((k, v) -> v, Grouped.as("groupBy")) + .count(Named.as("count")) + .toStream(Named.as("toStream")) + .to("sink"); + final TopologyDescription description = builder.build().describe(); + + // when/then: + assertThrows( + TopologyTaskInfo.TopologyTaskInfoException.class, + () -> TopologyTaskInfo.forTopology(description, admin) + ); + } + + private static class TestDescribeTopicsResult extends DescribeTopicsResult { + protected TestDescribeTopicsResult( + final Map> nameFutures + ) { + super(null, nameFutures); + } + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoUtils.java new file mode 100644 index 000000000..428e01a5a --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/internal/utils/TopologyTaskInfoUtils.java @@ -0,0 +1,18 @@ +package dev.responsive.kafka.internal.utils; + +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.processor.TaskId; + +public final class TopologyTaskInfoUtils { + public static TopologyTaskInfo createWith( + final Map tasksByPartition, + final Map> partitionsByTask + ) { + return new TopologyTaskInfo(tasksByPartition, partitionsByTask); + } + + private TopologyTaskInfoUtils() { + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java index 7aae1d30e..535f035c0 100644 
--- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/TestConstants.java @@ -17,7 +17,7 @@ public class TestConstants { public static final DockerImageName CASSANDRA = DockerImageName.parse("cassandra:4.1.0"); - public static final DockerImageName KAFKA = DockerImageName.parse("confluentinc/cp-kafka:7.3.2"); + public static final DockerImageName KAFKA = DockerImageName.parse("confluentinc/cp-kafka:7.9.0"); public static final DockerImageName MONGODB = DockerImageName.parse("mongo:7.0.2"); } \ No newline at end of file diff --git a/operator/build.gradle.kts b/operator/build.gradle.kts index 0536dfd6a..1bf08b090 100644 --- a/operator/build.gradle.kts +++ b/operator/build.gradle.kts @@ -24,7 +24,7 @@ dependencies { implementation(project(":controller-api")) implementation(libs.crd.generator.atp) - implementation(libs.jackson) + implementation(libs.bundles.jackson) implementation(libs.javaoperatorsdk) implementation(libs.bundles.commons) diff --git a/settings.gradle.kts b/settings.gradle.kts index ea9325b38..d70615b5d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -52,7 +52,9 @@ dependencyResolutionManagement { version("mongoDB", "4.10.2") version("fabric8", "6.13.4") - library("jackson", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8").versionRef("jackson") + library("jackson-jdk8", "com.fasterxml.jackson.datatype", "jackson-datatype-jdk8").versionRef("jackson") + library("jackson-jsr310", "com.fasterxml.jackson.datatype", "jackson-datatype-jsr310").versionRef("jackson") + bundle("jackson", listOf("jackson-jdk8", "jackson-jsr310")) library("kafka-clients", "org.apache.kafka", "kafka-clients").versionRef("kafka") library("kafka-streams", "org.apache.kafka", "kafka-streams").versionRef("kafka")
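Note on the Jackson change above: the snapshot metadata introduced in this diff (Snapshot and Snapshot.TaskSnapshotMetadata) carries java.time.Instant fields, which Jackson databind cannot handle without the jsr310 datatype module; the new bundle pairs it with the jdk8 module that was already a dependency. A minimal sketch of the mapper wiring this enables (illustrative only; the actual SnapshotStoreSerdes setup is not shown in this diff):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

    // Register both datatype modules from the libs.bundles.jackson bundle:
    // Jdk8Module covers Optional fields, JavaTimeModule covers Instant fields.
    final ObjectMapper mapper = new ObjectMapper()
        .registerModule(new Jdk8Module())
        .registerModule(new JavaTimeModule());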