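Changes from all commits: this change set adds an embedded changelog-migration path to Responsive's Kafka Streams wrapper. ResponsiveKafkaStreams.start() now checks ResponsiveConfig.RESPONSIVE_MODE; in MIGRATE mode it hands off to the new EmbeddedChangelogMigrationRunner, which discovers the topology's single key-value store through the new StoreAccumulator registry, derives that store's changelog topic, and replays it into the remote store with the reworked ChangelogMigrationTool.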
ResponsiveKafkaStreams.java

@@ -30,6 +30,8 @@
 import dev.responsive.kafka.api.config.CompatibilityMode;
 import dev.responsive.kafka.api.config.ResponsiveConfig;
+import dev.responsive.kafka.api.config.ResponsiveMode;
+import dev.responsive.kafka.bootstrap.EmbeddedChangelogMigrationRunner;
 import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier;
 import dev.responsive.kafka.internal.config.ConfigUtils;
 import dev.responsive.kafka.internal.config.InternalSessionConfigs;
@@ -87,6 +89,10 @@ public class ResponsiveKafkaStreams extends KafkaStreams {
   private final ResponsiveStateListener responsiveStateListener;
   private final ResponsiveRestoreListener responsiveRestoreListener;
   private final SessionClients sessionClients;
+  private final String applicationId;
+  private final ResponsiveConfig responsiveConfig;
+  private final Topology topology;
+  private final boolean migrating;
 
   /**
    * Create a {@code ResponsiveKafkaStreams} instance.
@@ -126,7 +132,7 @@ public ResponsiveKafkaStreams(
       final Map<?, ?> configs,
       final KafkaClientSupplier clientSupplier
   ) {
-    this(topology, configs, clientSupplier, Time.SYSTEM);
+    this(topology, configs, clientSupplier, Time.SYSTEM, false);
   }
 
   /**
@@ -147,7 +153,7 @@ public ResponsiveKafkaStreams(
       final Map<?, ?> configs,
       final Time time
   ) {
-    this(topology, configs, new DefaultKafkaClientSupplier(), time);
+    this(topology, configs, new DefaultKafkaClientSupplier(), time, false);
   }
 
   /**
@@ -169,12 +175,14 @@ public ResponsiveKafkaStreams(
       final Topology topology,
       final Map<?, ?> configs,
       final KafkaClientSupplier clientSupplier,
-      final Time time
+      final Time time,
+      final boolean migrating
   ) {
     this(
         new Params(topology, configs)
            .withClientSupplier(clientSupplier)
            .withTime(time)
+           .withMigrating(migrating)
            .build()
     );
   }
@@ -220,6 +228,25 @@ protected ResponsiveKafkaStreams(final Params params) {
     sessionClients.initialize(responsiveMetrics, responsiveRestoreListener);
     super.setGlobalStateRestoreListener(responsiveRestoreListener);
     super.setStateListener(responsiveStateListener);
+
+    this.applicationId = applicationConfigs.getString(APPLICATION_ID_CONFIG);
+    this.responsiveConfig = params.responsiveConfig;
+    this.topology = params.topology;
+    this.migrating = params.migrating;
   }
 
+  @Override
+  public synchronized void start() throws IllegalStateException, StreamsException {
+    if (responsiveConfig.getString(ResponsiveConfig.RESPONSIVE_MODE)
+        .equals(ResponsiveMode.MIGRATE.name()) && !migrating) {
+      EmbeddedChangelogMigrationRunner.runMigration(
+          responsiveConfig,
+          applicationId,
+          topology
+      );
+    } else {
+      super.start();
+    }
+  }
+
   private static ResponsiveMetrics createMetrics(
@@ -387,6 +414,7 @@ protected static class Params {
     // initialized on init()
     private SessionClients sessionClients;
     private ResponsiveKafkaClientSupplier responsiveKafkaClientSupplier;
+    private boolean migrating = false;
 
     public Params(final Topology topology, final Map<?, ?> configs) {
       this.topology = topology;
@@ -415,6 +443,11 @@ public Params withTime(final Time time) {
       return this;
     }
 
+    public Params withMigrating(final boolean migrating) {
+      this.migrating = migrating;
+      return this;
+    }
+
     // we may want to consider wrapping this in an additional Builder to ensure
     // that it's impossible to use a Params instance that hasn't called build(),
     // but that felt a little extra
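With these changes, migration is driven purely by configuration: when ResponsiveConfig.RESPONSIVE_MODE is set to MIGRATE, start() routes into the embedded migration runner instead of starting the streams app, and the new migrating flag keeps the tool's own inner instance from recursing into the same path. A minimal sketch of the caller's side; the application id, broker address, and topology builder are hypothetical placeholders, and the real app would also carry its usual Responsive storage configs:

    import dev.responsive.kafka.api.ResponsiveKafkaStreams;
    import dev.responsive.kafka.api.config.ResponsiveConfig;
    import dev.responsive.kafka.api.config.ResponsiveMode;
    import java.util.Properties;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;

    public final class MigrateModeExample {

      public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-app");        // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        // flip the app into migrate mode; no topology changes required
        props.put(ResponsiveConfig.RESPONSIVE_MODE, ResponsiveMode.MIGRATE.name());

        final ResponsiveKafkaStreams streams = new ResponsiveKafkaStreams(
            buildTopology(),  // the application's unchanged topology
            props,
            new DefaultKafkaClientSupplier(),
            Time.SYSTEM,
            false             // migrating = false: start() may enter the migration path
        );
        streams.start();      // in MIGRATE mode this blocks until the JVM shuts down
      }

      private static Topology buildTopology() {
        // placeholder: the real app builds its topology (with one Responsive kv store) here
        throw new UnsupportedOperationException("illustrative stub");
      }
    }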
ResponsiveKeyValueBytesStoreSupplier.java

@@ -17,6 +17,7 @@
 package dev.responsive.kafka.api.stores;
 
 import dev.responsive.kafka.internal.stores.ResponsiveKeyValueStore;
+import dev.responsive.kafka.internal.stores.StoreAccumulator;
 import java.util.Locale;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -28,6 +29,7 @@ public class ResponsiveKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupplier {
 
   public ResponsiveKeyValueBytesStoreSupplier(final ResponsiveKeyValueParams params) {
     this.params = params;
+    StoreAccumulator.INSTANCE.register(this);
   }
 
   @Override
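The one-line registration added here is what feeds the StoreAccumulator registry introduced later in this change set: constructing a supplier is enough to make it discoverable, with no explicit wiring. A small sketch of the effect, using a hypothetical store name:

    // constructing the supplier registers it with the process-wide accumulator
    final ResponsiveKeyValueBytesStoreSupplier supplier =
        new ResponsiveKeyValueBytesStoreSupplier(ResponsiveKeyValueParams.keyValue("orders"));

    // the embedded migration runner finds stores through the same registry
    final boolean discoverable = StoreAccumulator.INSTANCE
        .getRegisteredKeyValueBytesStoreSuppliers()
        .contains(supplier);  // true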
ChangelogMigrationTool.java

@@ -35,36 +35,41 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 
 public class ChangelogMigrationTool {
 
   private static final org.slf4j.Logger LOG =
       org.slf4j.LoggerFactory.getLogger(ChangelogMigrationTool.class);
 
   private final Consumer<Record<byte[], byte[]>> processor;
-  private final ChangelogMigrationConfig config;
-  private final ResponsiveKeyValueParams params;
+  private final Properties properties;
+  private final KeyValueBytesStoreSupplier storeSupplier;
+  private final String changelogTopic;
 
   @SuppressWarnings("unused") // reason: public API
   public ChangelogMigrationTool(
       final Properties properties,
-      final ResponsiveKeyValueParams params
+      final KeyValueBytesStoreSupplier storeSupplier,
+      final String changelogTopic
   ) {
-    this(properties, params, r -> {});
+    this(properties, storeSupplier, changelogTopic, r -> {});
   }
 
   // Visible for testing
   ChangelogMigrationTool(
       final Properties properties,
-      final ResponsiveKeyValueParams params,
+      final KeyValueBytesStoreSupplier storeSupplier,
+      final String changelogTopic,
       final Consumer<Record<byte[], byte[]>> processor
   ) {
     this.processor = processor;
@@ -80,23 +85,21 @@ ChangelogMigrationTool(
     // stability
     properties.putIfAbsent(STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 10_000);
 
-    config = new ChangelogMigrationConfig(properties);
-    this.params = params;
+    this.properties = properties;
+    this.changelogTopic = changelogTopic;
+    this.storeSupplier = storeSupplier;
   }
 
   public ResponsiveKafkaStreams buildStreams() {
     final StreamsBuilder builder = new StreamsBuilder();
-    final String source = config.getString(CHANGELOG_TOPIC_CONFIG);
-
-    final KTable<byte[], byte[]> table =
-        builder.table(
-            source,
-            Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()),
-            Materialized
-                .<byte[], byte[]>as(ResponsiveStores.keyValueStore(params))
-                .withValueSerde(Serdes.ByteArray())
-                .withKeySerde(Serdes.ByteArray())
+    final AtomicLong processed = new AtomicLong();
+
+    final KTable<byte[], byte[]> table = builder.table(
+        changelogTopic,
+        Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()),
+        Materialized.<byte[], byte[]>as(storeSupplier)
+            .withValueSerde(Serdes.ByteArray())
+            .withKeySerde(Serdes.ByteArray())
     );
 
     table
@@ -124,6 +127,12 @@ public void process(final Record<byte[], byte[]> record) {
           }
         }
       }));
-    return new ResponsiveKafkaStreams(builder.build(properties), properties);
+    return new ResponsiveKafkaStreams(
+        builder.build(properties),
+        properties,
+        new DefaultKafkaClientSupplier(),
+        Time.SYSTEM,
+        true
+    );
   }
 }
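The tool no longer reads its source topic from a dedicated ChangelogMigrationConfig: the caller passes the changelog topic and store supplier explicitly, and buildStreams() constructs the inner ResponsiveKafkaStreams with migrating = true so that instance always starts normally rather than re-entering migration. A hedged sketch of standalone use; the store and topic names are placeholders, and the properties are assumed to already carry broker and Responsive storage settings:

    final Properties props = new Properties();
    // assumed: application.id, bootstrap.servers, and Responsive storage configs are set

    final ChangelogMigrationTool tool = new ChangelogMigrationTool(
        props,
        ResponsiveStores.keyValueStore("orders"),  // supplier for the target store
        "orders-app-orders-changelog"              // "<application.id>-<store>-changelog"
    );

    try (final ResponsiveKafkaStreams streams = tool.buildStreams()) {
      streams.start();  // replays the changelog records into the remote store
    }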
EmbeddedChangelogMigrationRunner.java (new file)

@@ -0,0 +1,101 @@
package dev.responsive.kafka.bootstrap;

import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.api.config.ResponsiveMode;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueBytesStoreSupplier;
import dev.responsive.kafka.internal.stores.StoreAccumulator;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.TopologyDescription.Node;
import org.apache.kafka.streams.TopologyDescription.Processor;
import org.apache.kafka.streams.TopologyDescription.Subtopology;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmbeddedChangelogMigrationRunner {
  private static final Logger LOG =
      LoggerFactory.getLogger(EmbeddedChangelogMigrationRunner.class);

  private EmbeddedChangelogMigrationRunner() {
  }

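  /**
   * Replays the changelog of the topology's single registered key-value store into
   * the remote store, then blocks until the JVM shuts down. Runs under the derived
   * application id "applicationId-migrate" so the bootstrap pass keeps its own
   * consumer group and committed offsets, separate from the real application's.
   */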
  public static void runMigration(
      final ResponsiveConfig responsiveConfig,
      final String applicationId,
      final Topology topology
  ) {
    final var kvBytesStores =
        StoreAccumulator.INSTANCE.getRegisteredKeyValueBytesStoreSuppliers();
    if (kvBytesStores.size() != 1) {
      throw new IllegalStateException(
          "embedded migration is only supported for topologies with exactly one "
              + "key-value state store, but found " + kvBytesStores.size());
    }
    final ResponsiveKeyValueBytesStoreSupplier supplier = kvBytesStores.get(0);
    final String storeName = supplier.name();
    validateStoreIsNotSource(storeName, topology);

    final Properties properties = new Properties();
    properties.putAll(responsiveConfig.originals());
    final String changelogTopic = changelogFor(
        responsiveConfig.originals(),
        applicationId,
        storeName,
        topology instanceof NamedTopology ? ((NamedTopology) topology).name() : null
    );
    final ChangelogMigrationTool tool = new ChangelogMigrationTool(
        properties,
        supplier,
        changelogTopic
    );
    // run the inner app in RUN mode under a separate application id so it cannot
    // re-enter the migration path
    properties.put(ResponsiveConfig.RESPONSIVE_MODE, ResponsiveMode.RUN.name());
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + "-migrate");

    final ResponsiveKafkaStreams app = tool.buildStreams();
    final CountDownLatch closed = new CountDownLatch(1);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      app.close(Duration.ofMinutes(5));
      closed.countDown();
    }));
    LOG.info("starting bootstrap application for store {}", storeName);
    app.start();
    try {
      LOG.info("blocking until JVM is shut down");
      closed.await();
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  private static void validateStoreIsNotSource(final String storeName, final Topology topology) {
    final TopologyDescription desc = topology.describe();
    for (final Subtopology st : desc.subtopologies()) {
      for (final Node n : st.nodes()) {
        if (n instanceof Processor) {
          final Processor processor = (Processor) n;
          if (processor.stores().contains(storeName)) {
            LOG.info("found store with name {} in processor {}", storeName, processor.name());
            return;
          }
        }
      }
    }
    throw new RuntimeException("Could not find a processor node that owns store " + storeName
        + "; source and global stores are not supported with embedded migration");
  }

  private static String changelogFor(
      final Map<String, Object> configs,
      final String applicationId,
      final String storeName,
      final String topologyName
  ) {
    final String prefix = ProcessorContextUtils.getPrefix(configs, applicationId);
    return ProcessorStateManager.storeChangelogTopic(prefix, storeName, topologyName);
  }
}
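changelogFor delegates to the same helpers Kafka Streams uses when it provisions changelogs, so the derived name matches what the runtime would have created: by default, the familiar <applicationId>-<storeName>-changelog pattern. A hypothetical example:

    // with application id "orders-app", store "orders", and no named topology:
    final String changelog =
        ProcessorStateManager.storeChangelogTopic("orders-app", "orders", null);
    // changelog == "orders-app-orders-changelog"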
StoreAccumulator.java (new file)

@@ -0,0 +1,26 @@
package dev.responsive.kafka.internal.stores;

import com.google.common.collect.ImmutableList;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueBytesStoreSupplier;
import java.util.LinkedList;
import java.util.List;

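/**
 * Process-wide registry of Responsive store suppliers. Suppliers register themselves
 * on construction (see ResponsiveKeyValueBytesStoreSupplier), which lets the embedded
 * migration runner discover a topology's stores without re-walking the topology.
 * Registration is unsynchronized: suppliers are assumed to be constructed on a single
 * thread while the topology is being assembled.
 */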
public class StoreAccumulator {

  public static final StoreAccumulator INSTANCE = new StoreAccumulator();

  private final List<ResponsiveKeyValueBytesStoreSupplier> registeredKeyValueBytesStoreSuppliers =
      new LinkedList<>();

  private StoreAccumulator() {
  }

  public void register(final ResponsiveKeyValueBytesStoreSupplier instance) {
    registeredKeyValueBytesStoreSuppliers.add(instance);
  }

  public List<ResponsiveKeyValueBytesStoreSupplier> getRegisteredKeyValueBytesStoreSuppliers() {
    return ImmutableList.copyOf(registeredKeyValueBytesStoreSuppliers);
  }
}
ChangelogMigrationTool integration test

@@ -147,7 +147,7 @@ public void test() throws Exception {
     final int numKeys = 100;
     final int numEvents = 1000;
 
-    final var params = ResponsiveKeyValueParams.keyValue(tableName);
+    final var params = ResponsiveStores.keyValueStore(tableName);
 
     final Map<String, Object> baseProps = getProperties();
     final Properties bootProps = new Properties();
@@ -172,7 +172,8 @@ public void test() throws Exception {
     // 2: Run the changelog migration tool to bootstrap the new Cassandra table
     final CountDownLatch processedAllRecords = new CountDownLatch(numEvents);
     final Consumer<Record<byte[], byte[]>> countdown = r -> processedAllRecords.countDown();
-    try (final var app = new ChangelogMigrationTool(bootProps, params, countdown).buildStreams()) {
+    try (final var app
+        = new ChangelogMigrationTool(bootProps, params, changelog, countdown).buildStreams()) {
       LOG.info("Awaiting for Bootstrapping application to start");
       startAppAndAwaitRunning(Duration.ofSeconds(120), app);
 
@@ -218,7 +219,7 @@ public void testFactStore() throws Exception {
     final int numKeys = 100;
     final int numEvents = 200;
 
-    final var params = ResponsiveKeyValueParams.fact(tableName);
+    final var params = ResponsiveStores.factStore(tableName);
 
     final Map<String, Object> baseProps = getProperties();
     final Properties bootProps = new Properties();
@@ -243,7 +244,8 @@ public void testFactStore() throws Exception {
     // 2: Run the changelog migration tool to bootstrap the new Cassandra table
     final CountDownLatch processedAllRecords = new CountDownLatch(numKeys);
     final Consumer<Record<byte[], byte[]>> countdown = r -> processedAllRecords.countDown();
-    final ChangelogMigrationTool app = new ChangelogMigrationTool(bootProps, params, countdown);
+    final ChangelogMigrationTool app
+        = new ChangelogMigrationTool(bootProps, params, changelog, countdown);
 
     LOG.info("Awaiting for Bootstrapping application to start");
     try (final var migrationApp = app.buildStreams()) {