2 changes: 1 addition & 1 deletion kafka-client-examples/e2e-test/build.gradle.kts
@@ -31,7 +31,7 @@ dependencies {
implementation(libs.guava)
implementation(libs.slf4j.log4j2)
implementation(libs.bundles.scylla)
-implementation(libs.jackson)
+implementation(libs.bundles.jackson)
implementation(libs.mongodb.driver.core)

testImplementation(testlibs.bundles.base)
2 changes: 1 addition & 1 deletion kafka-client/build.gradle.kts
@@ -137,7 +137,7 @@ dependencies {
implementation("dev.responsive:controller-api:0.16.0")
implementation(libs.bundles.scylla)
implementation(libs.bundles.commons)
-implementation(libs.jackson)
+implementation(libs.bundles.jackson)
implementation(libs.mongodb.driver.sync)
implementation(libs.bundles.otel)
implementation(libs.bundles.grpc)
@@ -60,6 +60,7 @@
import dev.responsive.kafka.internal.metrics.exporter.MetricsExportService;
import dev.responsive.kafka.internal.metrics.exporter.NoopMetricsExporterService;
import dev.responsive.kafka.internal.metrics.exporter.otel.OtelMetricsService;
import dev.responsive.kafka.internal.snapshot.KafkaStreamsSnapshotContext;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import dev.responsive.kafka.internal.utils.SessionClients;
import dev.responsive.kafka.internal.utils.SessionUtil;
@@ -471,14 +472,20 @@ public Params build() {
) : innerClientSupplier;

this.oeReporter = reporter(responsiveConfig, license);
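// snapshotCtx is Optional.empty() unless responsive.snapshots is set to
// LOCAL; see KafkaStreamsSnapshotContext.create below.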
final var snapshotCtx = KafkaStreamsSnapshotContext.create(
responsiveConfig,
streamsConfig,
topology.describe()
);
this.responsiveKafkaClientSupplier = new ResponsiveKafkaClientSupplier(
delegateKafkaClientSupplier,
responsiveConfig,
streamsConfig,
storeRegistry,
metrics,
oeReporter,
-storageBackend
+storageBackend,
+snapshotCtx
);

final var admin = responsiveKafkaClientSupplier.getAdmin(responsiveConfig.originals());
@@ -19,6 +19,7 @@
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.internal.db.partitioning.Murmur3Hasher;
import dev.responsive.kafka.internal.snapshot.SnapshotSupport;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -329,6 +330,24 @@ public class ResponsiveConfig extends AbstractConfig {
private static final String ORIGIN_EVENT_REPORT_INTERVAL_MS_DOC =
"How often to report origin event usage information. This should generally not be changed in production environments";

// ------------------ Snapshot Configs ----------------------
public static final String SNAPSHOTS_CONFIG = "responsive.snapshots";
private static final String SNAPSHOTS_DEFAULT = SnapshotSupport.DISABLED.name();
private static final String SNAPSHOTS_DOC
= "Set to LOCAL to enable snapshot support. This feature is experimental.";

public static final String SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX
= "responsive.snapshots.local.store.topic.suffix";
public static final String SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX_DEFAULT
= "snapshots";
private static final String SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX_DOC
= "The suffix of the topic used to store snapshot metadata when using local snapshot coordination.";

public static final String SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR
= "responsive.snapshots.local.store.topic.replication.factor";
public static final short SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DEFAULT = (short) 3;
private static final String SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DOC
= "replication factor for the snapshot store topic when using local snapshot coordination.";
// ------------------ StreamsConfig overrides ----------------------

// These configuration values are required by Responsive, and a ConfigException will
@@ -660,6 +679,29 @@ public class ResponsiveConfig extends AbstractConfig {
ORIGIN_EVENT_REPORT_INTERVAL_MS_DEFAULT,
Importance.LOW,
ORIGIN_EVENT_REPORT_INTERVAL_MS_DOC
).define(
SNAPSHOTS_CONFIG,
Type.STRING,
SNAPSHOTS_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(
Arrays.stream(SnapshotSupport.values())
.map(Enum::name)
.toArray(String[]::new)
),
Importance.LOW,
SNAPSHOTS_DOC
).define(
SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX,
Type.STRING,
SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX_DEFAULT,
Importance.LOW,
SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX_DOC
).define(
SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR,
Type.SHORT,
SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DEFAULT,
Importance.LOW,
SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR_DOC
);

/**
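For reference, a minimal usage sketch (not part of this diff) showing how an application could opt into the experimental LOCAL snapshot mode. It uses only the ResponsiveConfig constants defined above; the suffix and replication-factor values shown simply repeat the documented defaults:

import dev.responsive.kafka.api.config.ResponsiveConfig;
import java.util.HashMap;
import java.util.Map;

public class SnapshotConfigExample {
  // Extra properties needed to turn on LOCAL snapshots. Only the first
  // entry is strictly required; the other two restate the defaults.
  public static Map<String, Object> snapshotProps() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ResponsiveConfig.SNAPSHOTS_CONFIG, "LOCAL");
    props.put(ResponsiveConfig.SNAPSHOTS_LOCAL_STORE_TOPIC_SUFFIX, "snapshots");
    props.put(ResponsiveConfig.SNAPSHOTS_LOCAL_STORE_TOPIC_REPLICATION_FACTOR, (short) 3);
    return props;
  }
}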
@@ -22,6 +22,8 @@
import dev.responsive.kafka.internal.metrics.EndOffsetsPoller;
import dev.responsive.kafka.internal.metrics.MetricPublishingCommitListener;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import dev.responsive.kafka.internal.snapshot.KafkaStreamsSnapshotContext;
import dev.responsive.kafka.internal.snapshot.SnapshotCommitListener;
import dev.responsive.kafka.internal.stores.ResponsiveStoreRegistry;
import java.io.Closeable;
import java.io.IOException;
@@ -30,6 +32,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Function;
import org.apache.kafka.clients.CommonClientConfigs;
@@ -68,6 +71,7 @@ public final class ResponsiveKafkaClientSupplier implements KafkaClientSupplier
private final boolean eos;
private final StorageBackend storageBackend;
private final boolean repairRestoreOffsetOutOfRange;
private final Optional<KafkaStreamsSnapshotContext> snapshotCtx;

public ResponsiveKafkaClientSupplier(
final KafkaClientSupplier clientSupplier,
@@ -76,7 +80,8 @@
final ResponsiveStoreRegistry storeRegistry,
final ResponsiveMetrics metrics,
final OriginEventReporter oeReporter,
-final StorageBackend storageBackend
+final StorageBackend storageBackend,
+final Optional<KafkaStreamsSnapshotContext> snapshotCtx
) {
this(
new Factories() {
@@ -87,7 +92,8 @@
metrics,
storageBackend,
oeReporter,
-responsiveConfig.getBoolean(ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG)
+responsiveConfig.getBoolean(ResponsiveConfig.RESTORE_OFFSET_REPAIR_ENABLED_CONFIG),
+snapshotCtx
);
}

@@ -99,7 +105,8 @@
final ResponsiveMetrics metrics,
final StorageBackend storageBackend,
final OriginEventReporter oeReporter,
-final boolean repairRestoreOffsetOutOfRange
+final boolean repairRestoreOffsetOutOfRange,
+final Optional<KafkaStreamsSnapshotContext> snapshotCtx
) {
this.factories = factories;
this.wrapped = wrapped;
@@ -117,6 +124,7 @@
this
);
applicationId = configs.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.snapshotCtx = snapshotCtx;
}

@Override
@@ -138,7 +146,8 @@ public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
config,
endOffsetsPoller,
storeRegistry,
-factories
+factories,
+snapshotCtx
);
return factories.createResponsiveProducer(
(String) config.get(CommonClientConfigs.CLIENT_ID_CONFIG),
@@ -169,7 +178,8 @@ public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
config,
endOffsetsPoller,
storeRegistry,
-factories
+factories,
+snapshotCtx
);
// TODO: the end offsets poller call is kind of heavy for a synchronized block
return factories.createResponsiveConsumer(
@@ -253,7 +263,8 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread(
final Map<String, Object> configs,
final EndOffsetsPoller endOffsetsPoller,
final ResponsiveStoreRegistry storeRegistry,
-final Factories factories
+final Factories factories,
+final Optional<KafkaStreamsSnapshotContext> snapshotContext
) {
if (threadListeners.containsKey(threadId)) {
final var tl = threadListeners.get(threadId);
@@ -262,6 +273,16 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread(
}
final var offsetRecorder = factories.createOffsetRecorder(eos, threadId);
final var originEventRecorder = factories.createOriginEventRecorder(oeReporter, eos);
final var storeCommitListener = new StoreCommitListener(storeRegistry, offsetRecorder);
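// A snapshot commit listener is only wired in when a
// KafkaStreamsSnapshotContext was created, i.e. when snapshots are enabled.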
final var snapshotCommitListener = snapshotContext.map(
ctx -> new SnapshotCommitListener(
ctx.orchestrator(),
ctx.generationStorage(),
storeRegistry,
ctx.topologyTaskInfo(),
offsetRecorder
)
);
final var tl = new ReferenceCounted<>(
String.format("ListenersForThread(%s)", threadId),
new ListenersForThread(
@@ -272,9 +293,10 @@ private synchronized ListenersForThread getAndMaybeInitListenersForThread(
threadId,
offsetRecorder
),
-new StoreCommitListener(storeRegistry, offsetRecorder),
+storeCommitListener,
endOffsetsPoller.addForThread(threadId),
-originEventRecorder
+originEventRecorder,
+snapshotCommitListener
)
);
threadListeners.put(threadId, tl);
@@ -306,27 +328,31 @@ private static class ListenersForThread implements Closeable {
final StoreCommitListener storeCommitListener;
final EndOffsetsPoller.Listener endOffsetsPollerListener;
final OriginEventTracker originEventTracker;
final Optional<SnapshotCommitListener> snapshotCommitListener;

public ListenersForThread(
final String threadId,
final OffsetRecorder offsetRecorder,
final MetricPublishingCommitListener committedOffsetMetricListener,
final StoreCommitListener storeCommitListener,
final EndOffsetsPoller.Listener endOffsetsPollerListener,
-final OriginEventTracker originEventTracker
+final OriginEventTracker originEventTracker,
+final Optional<SnapshotCommitListener> snapshotCommitListener
) {
this.threadId = threadId;
this.offsetRecorder = offsetRecorder;
this.committedOffsetMetricListener = committedOffsetMetricListener;
this.storeCommitListener = storeCommitListener;
this.endOffsetsPollerListener = endOffsetsPollerListener;
this.originEventTracker = originEventTracker;
this.snapshotCommitListener = snapshotCommitListener;
}

@Override
public void close() {
committedOffsetMetricListener.close();
endOffsetsPollerListener.close();
snapshotCommitListener.ifPresent(SnapshotCommitListener::close);
}
}

@@ -0,0 +1,20 @@
package dev.responsive.kafka.internal.snapshot;

import org.apache.kafka.streams.processor.TaskId;

/**
 * Interface that abstracts away how we look up a task's current generation.
 * For synchronized snapshots we want to do one of the following, so that we can record a
 * task's generation metadata transactionally alongside the rows with new generation markers
 * when we bump generations:
 * (1) store this information in the offset metadata as part of the transaction that
 *     bumps the generation
 * (2) if/when Kafka supports 2PC, store this information in another store (like rs3 or
 *     the snapshot store)
 *
 * For simple uncoordinated snapshots we support looking up the generation from the
 * snapshot's state in the snapshot store.
*/
public interface GenerationStorage {
long lookupGeneration(final TaskId taskId);
}
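To make the uncoordinated path above concrete, a hedged sketch of one possible implementation of this interface. SnapshotStore#currentGeneration is an assumed method name, not an API from this diff (the PR's actual implementation is SnapshotStoreBasedGenerationStorage, whose source is not shown here):

package dev.responsive.kafka.internal.snapshot;

import org.apache.kafka.streams.processor.TaskId;

// Sketch only: resolve a task's generation from snapshot-store state, as the
// Javadoc above describes for simple uncoordinated snapshots.
public class SketchGenerationStorage implements GenerationStorage {
  private final SnapshotStore store;

  public SketchGenerationStorage(final SnapshotStore store) {
    this.store = store;
  }

  @Override
  public long lookupGeneration(final TaskId taskId) {
    // currentGeneration is a hypothetical accessor; the real SnapshotStore
    // interface is not part of this diff.
    return store.currentGeneration(taskId);
  }
}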
@@ -0,0 +1,75 @@
package dev.responsive.kafka.internal.snapshot;

import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.snapshot.topic.TopicSnapshotStore;
import dev.responsive.kafka.internal.utils.TopologyTaskInfo;
import java.util.Locale;
import java.util.Optional;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyDescription;

public class KafkaStreamsSnapshotContext {
private final SnapshotOrchestrator orchestrator;
private final GenerationStorage generationStorage;
private final TopologyTaskInfo topologyTaskInfo;
private final SnapshotStore snapshotStore;

private KafkaStreamsSnapshotContext(
final SnapshotOrchestrator orchestrator,
final GenerationStorage generationStorage,
final SnapshotStore snapshotStore,
final TopologyTaskInfo topologyTaskInfo
) {
this.orchestrator = orchestrator;
this.generationStorage = generationStorage;
this.snapshotStore = snapshotStore;
this.topologyTaskInfo = topologyTaskInfo;
}

public SnapshotOrchestrator orchestrator() {
return orchestrator;
}

public GenerationStorage generationStorage() {
return generationStorage;
}

public TopologyTaskInfo topologyTaskInfo() {
return topologyTaskInfo;
}

public static Optional<KafkaStreamsSnapshotContext> create(
final ResponsiveConfig config,
final StreamsConfig streamsConfig,
final TopologyDescription topologyDescription
) {
// the SNAPSHOTS_CONFIG validator accepts any case, so normalize the value
// before Enum.valueOf to avoid an IllegalArgumentException
final SnapshotSupport support = SnapshotSupport
.valueOf(config.getString(ResponsiveConfig.SNAPSHOTS_CONFIG).toUpperCase(Locale.ROOT));
switch (support) {
case LOCAL: {
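// LOCAL coordination: snapshot metadata lives on a Kafka topic and
// generations are resolved through the snapshot store itself.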
final SnapshotStore store = TopicSnapshotStore.create(config.originals());
final TopologyTaskInfo tti;
try (final Admin admin = Admin.create(config.originals())) {
tti = TopologyTaskInfo.forTopology(
topologyDescription,
admin
);
}
final SnapshotOrchestrator orchestrator = new LocalSnapshotOrchestrator(
store,
tti.partitionsByTask().keySet()
);
final GenerationStorage generationStorage = new SnapshotStoreBasedGenerationStorage(store);
return Optional.of(new KafkaStreamsSnapshotContext(
orchestrator,
generationStorage,
store,
tti
));
}
default: {
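// DISABLED (the default) and any other non-LOCAL mode: no snapshot context.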
return Optional.empty();
}
}
}
}