diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 60096cfbb..6833f5444 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -30,6 +30,8 @@ import dev.responsive.kafka.api.config.CompatibilityMode; import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.api.config.ResponsiveMode; +import dev.responsive.kafka.bootstrap.EmbeddedChangelogMigrationRunner; import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier; import dev.responsive.kafka.internal.config.ConfigUtils; import dev.responsive.kafka.internal.config.InternalSessionConfigs; @@ -87,6 +89,10 @@ public class ResponsiveKafkaStreams extends KafkaStreams { private final ResponsiveStateListener responsiveStateListener; private final ResponsiveRestoreListener responsiveRestoreListener; private final SessionClients sessionClients; + private final String applicationId; + private final ResponsiveConfig responsiveConfig; + private final Topology topology; + private final boolean migrating; /** * Create a {@code ResponsiveKafkaStreams} instance. @@ -126,7 +132,7 @@ public ResponsiveKafkaStreams( final Map configs, final KafkaClientSupplier clientSupplier ) { - this(topology, configs, clientSupplier, Time.SYSTEM); + this(topology, configs, clientSupplier, Time.SYSTEM, false); } /** @@ -147,7 +153,7 @@ public ResponsiveKafkaStreams( final Map configs, final Time time ) { - this(topology, configs, new DefaultKafkaClientSupplier(), time); + this(topology, configs, new DefaultKafkaClientSupplier(), time, false); } /** @@ -169,12 +175,14 @@ public ResponsiveKafkaStreams( final Topology topology, final Map configs, final KafkaClientSupplier clientSupplier, - final Time time + final Time time, + final boolean migrating ) { this( new Params(topology, configs) .withClientSupplier(clientSupplier) .withTime(time) + .withMigrating(migrating) .build() ); } @@ -220,6 +228,25 @@ protected ResponsiveKafkaStreams(final Params params) { sessionClients.initialize(responsiveMetrics, responsiveRestoreListener); super.setGlobalStateRestoreListener(responsiveRestoreListener); super.setStateListener(responsiveStateListener); + + this.applicationId = applicationConfigs.getString(APPLICATION_ID_CONFIG); + this.responsiveConfig = params.responsiveConfig; + this.topology = params.topology; + this.migrating = params.migrating; + } + + @Override + public synchronized void start() throws IllegalStateException, StreamsException { + if (responsiveConfig.getString(ResponsiveConfig.RESPONSIVE_MODE) + .equals(ResponsiveMode.MIGRATE.name()) && !migrating) { + EmbeddedChangelogMigrationRunner.runMigration( + responsiveConfig, + applicationId, + topology + ); + } else { + super.start(); + } } private static ResponsiveMetrics createMetrics( @@ -387,6 +414,7 @@ protected static class Params { // initialized on init() private SessionClients sessionClients; private ResponsiveKafkaClientSupplier responsiveKafkaClientSupplier; + private boolean migrating = false; public Params(final Topology topology, final Map configs) { this.topology = topology; @@ -415,6 +443,11 @@ public Params withTime(final Time time) { return this; } + public Params withMigrating(final boolean migrating) { + this.migrating = migrating; + return this; + } + // we may want to consider wrapping this in an additional Builder to ensure // that it's impossible to use a Params instance that hasn't called build(), // but that felt a little extra diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java index 8246608bc..c15c65b65 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveKeyValueBytesStoreSupplier.java @@ -17,6 +17,7 @@ package dev.responsive.kafka.api.stores; import dev.responsive.kafka.internal.stores.ResponsiveKeyValueStore; +import dev.responsive.kafka.internal.stores.StoreAccumulator; import java.util.Locale; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; @@ -28,6 +29,7 @@ public class ResponsiveKeyValueBytesStoreSupplier implements KeyValueBytesStoreS public ResponsiveKeyValueBytesStoreSupplier(final ResponsiveKeyValueParams params) { this.params = params; + StoreAccumulator.INSTANCE.register(this); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/bootstrap/ChangelogMigrationTool.java b/kafka-client/src/main/java/dev/responsive/kafka/bootstrap/ChangelogMigrationTool.java index eeea5ff20..20cd33e5b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/bootstrap/ChangelogMigrationTool.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/bootstrap/ChangelogMigrationTool.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; @@ -42,6 +43,8 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; public class ChangelogMigrationTool { @@ -49,22 +52,24 @@ public class ChangelogMigrationTool { org.slf4j.LoggerFactory.getLogger(ChangelogMigrationTool.class); private final Consumer> processor; - private final ChangelogMigrationConfig config; private final Properties properties; - private final ResponsiveKeyValueParams params; + private final KeyValueBytesStoreSupplier storeSupplier; + private final String changelogTopic; @SuppressWarnings("unused") // reason: public API public ChangelogMigrationTool( final Properties properties, - final ResponsiveKeyValueParams params + final KeyValueBytesStoreSupplier storeSupplier, + final String changelogTopic ) { - this(properties, params, r -> {}); + this(properties, storeSupplier, changelogTopic, r -> {}); } // Visible for testing ChangelogMigrationTool( final Properties properties, - final ResponsiveKeyValueParams params, + final KeyValueBytesStoreSupplier storeSupplier, + final String changelogTopic, final Consumer> processor ) { this.processor = processor; @@ -80,23 +85,21 @@ public ChangelogMigrationTool( // stability properties.putIfAbsent(STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 10_000); - config = new ChangelogMigrationConfig(properties); this.properties = properties; - this.params = params; + this.changelogTopic = changelogTopic; + this.storeSupplier = storeSupplier; } public ResponsiveKafkaStreams buildStreams() { final StreamsBuilder builder = new StreamsBuilder(); - final String source = config.getString(CHANGELOG_TOPIC_CONFIG); - - final KTable table = - builder.table( - source, - Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()), - Materialized - .as(ResponsiveStores.keyValueStore(params)) - .withValueSerde(Serdes.ByteArray()) - .withKeySerde(Serdes.ByteArray()) + final AtomicLong processed = new AtomicLong(); + + final KTable table = builder.table( + changelogTopic, + Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()), + Materialized.as(storeSupplier) + .withValueSerde(Serdes.ByteArray()) + .withKeySerde(Serdes.ByteArray()) ); table @@ -124,6 +127,12 @@ public void process(final Record record) { } })); - return new ResponsiveKafkaStreams(builder.build(properties), properties); + return new ResponsiveKafkaStreams( + builder.build(properties), + properties, + new DefaultKafkaClientSupplier(), + Time.SYSTEM, + true + ); } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/bootstrap/EmbeddedChangelogMigrationRunner.java b/kafka-client/src/main/java/dev/responsive/kafka/bootstrap/EmbeddedChangelogMigrationRunner.java new file mode 100644 index 000000000..744b68cb5 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/bootstrap/EmbeddedChangelogMigrationRunner.java @@ -0,0 +1,101 @@ +package dev.responsive.kafka.bootstrap; + +import dev.responsive.kafka.api.ResponsiveKafkaStreams; +import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.api.config.ResponsiveMode; +import dev.responsive.kafka.api.stores.ResponsiveKeyValueBytesStoreSupplier; +import dev.responsive.kafka.internal.stores.StoreAccumulator; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.TopologyDescription.Node; +import org.apache.kafka.streams.TopologyDescription.Processor; +import org.apache.kafka.streams.TopologyDescription.Subtopology; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EmbeddedChangelogMigrationRunner { + private static final Logger LOG = LoggerFactory.getLogger(EmbeddedChangelogMigrationRunner.class); + + private EmbeddedChangelogMigrationRunner() { + } + + public static void runMigration( + final ResponsiveConfig responsiveConfig, + final String applicationId, + final Topology topology + ) { + final var kvBytesStores = StoreAccumulator.INSTANCE.getRegisteredKeyValueBytesStoreSuppliers(); + if (kvBytesStores.size() != 1) { + throw new IllegalStateException("migration only supported for a single state store"); + } + final ResponsiveKeyValueBytesStoreSupplier supplier = kvBytesStores.get(0); + final String storeName = supplier.name(); + validateStoreIsNotSource(storeName, topology); + final Properties properties = new Properties(); + properties.putAll(responsiveConfig.originals()); + final String changelogTopic = changelogFor( + responsiveConfig.originals(), + applicationId, + storeName, + topology instanceof NamedTopology ? ((NamedTopology) topology).name() : null + ); + final ChangelogMigrationTool tool = new ChangelogMigrationTool( + properties, + supplier, + changelogTopic + ); + properties.put(ResponsiveConfig.RESPONSIVE_MODE, ResponsiveMode.RUN.name()); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId + "-migrate"); + final ResponsiveKafkaStreams app = tool.buildStreams(); + final CountDownLatch closed = new CountDownLatch(1); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + app.close(Duration.ofMinutes(5)); + closed.countDown(); + })); + LOG.info("starting bootstrap application for store {}", storeName); + app.start(); + try { + LOG.info("blocking until JVM is shut down"); + closed.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + + private static void validateStoreIsNotSource(final String storeName, final Topology topology) { + final TopologyDescription desc = topology.describe(); + for (final Subtopology st : desc.subtopologies()) { + for (final Node n : st.nodes()) { + if (n instanceof Processor) { + final Processor processor = (Processor) n; + if (processor.stores().contains(storeName)) { + LOG.info("found store with name {} in processor {}", storeName, processor.name()); + return; + } + } + } + } + throw new RuntimeException("Could not find processor with store. " + + "Source and global stores not supported with embedded migration"); + } + + private static String changelogFor( + final Map configs, + final String applicationId, + final String storeName, + final String topologyName + ) { + final String prefix = ProcessorContextUtils.getPrefix(configs, applicationId); + return ProcessorStateManager.storeChangelogTopic(prefix, storeName, topologyName); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/StoreAccumulator.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/StoreAccumulator.java new file mode 100644 index 000000000..26ab2ca8c --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/StoreAccumulator.java @@ -0,0 +1,26 @@ +package dev.responsive.kafka.internal.stores; + +import com.google.common.collect.ImmutableList; +import dev.responsive.kafka.api.stores.ResponsiveKeyValueBytesStoreSupplier; +import java.util.LinkedList; +import java.util.List; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.StoreSupplier; + +public class StoreAccumulator { + public static final StoreAccumulator INSTANCE = new StoreAccumulator(); + + private final List registeredKeyValueBytesStoreSuppliers + = new LinkedList<>(); + + private StoreAccumulator() { + } + + public void register(final ResponsiveKeyValueBytesStoreSupplier instance) { + registeredKeyValueBytesStoreSuppliers.add(instance); + } + + public List getRegisteredKeyValueBytesStoreSuppliers() { + return ImmutableList.copyOf(registeredKeyValueBytesStoreSuppliers); + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/bootstrap/ChangelogMigrationToolIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/bootstrap/ChangelogMigrationToolIntegrationTest.java index 4c133873a..efc71ba30 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/bootstrap/ChangelogMigrationToolIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/bootstrap/ChangelogMigrationToolIntegrationTest.java @@ -147,7 +147,7 @@ public void test() throws Exception { final int numKeys = 100; final int numEvents = 1000; - final var params = ResponsiveKeyValueParams.keyValue(tableName); + final var params = ResponsiveStores.keyValueStore(tableName); final Map baseProps = getProperties(); final Properties bootProps = new Properties(); @@ -172,7 +172,8 @@ public void test() throws Exception { // 2: Run the changelog migration tool to bootstrap the new Cassandra table final CountDownLatch processedAllRecords = new CountDownLatch(numEvents); final Consumer> countdown = r -> processedAllRecords.countDown(); - try (final var app = new ChangelogMigrationTool(bootProps, params, countdown).buildStreams()) { + try (final var app + = new ChangelogMigrationTool(bootProps, params, changelog, countdown).buildStreams()) { LOG.info("Awaiting for Bootstrapping application to start"); startAppAndAwaitRunning(Duration.ofSeconds(120), app); @@ -218,7 +219,7 @@ public void testFactStore() throws Exception { final int numKeys = 100; final int numEvents = 200; - final var params = ResponsiveKeyValueParams.fact(tableName); + final var params = ResponsiveStores.factStore(tableName); final Map baseProps = getProperties(); final Properties bootProps = new Properties(); @@ -243,7 +244,8 @@ public void testFactStore() throws Exception { // 2: Run the changelog migration tool to bootstrap the new Cassandra table final CountDownLatch processedAllRecords = new CountDownLatch(numKeys); final Consumer> countdown = r -> processedAllRecords.countDown(); - final ChangelogMigrationTool app = new ChangelogMigrationTool(bootProps, params, countdown); + final ChangelogMigrationTool app + = new ChangelogMigrationTool(bootProps, params, changelog, countdown); LOG.info("Awaiting for Bootstrapping application to start"); try (final var migrationApp = app.buildStreams()) {