diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index f55a2fb1a..b3eabb4d1 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -19,6 +19,7 @@ plugins { } repositories { + mavenLocal() gradlePluginPortal() } diff --git a/buildSrc/src/main/kotlin/responsive.java-common-conventions.gradle.kts b/buildSrc/src/main/kotlin/responsive.java-common-conventions.gradle.kts index af46b8cf5..5d362a331 100644 --- a/buildSrc/src/main/kotlin/responsive.java-common-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/responsive.java-common-conventions.gradle.kts @@ -44,6 +44,7 @@ checkstyle { repositories { mavenCentral() + mavenLocal() } tasks.test { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index 59c08a990..ad02e7306 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -34,6 +34,7 @@ import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; import dev.responsive.kafka.api.config.CompatibilityMode; import dev.responsive.kafka.api.config.ResponsiveConfig; +import dev.responsive.kafka.api.stores.ResponsiveDslStoreSuppliers; import dev.responsive.kafka.internal.clients.ResponsiveKafkaClientSupplier; import dev.responsive.kafka.internal.config.ConfigUtils; import dev.responsive.kafka.internal.config.InternalSessionConfigs; @@ -271,7 +272,6 @@ private static Properties propsWithOverrides( ) { final Properties propsWithOverrides = new Properties(); - final InternalSessionConfigs.Builder internalConfBuilder = new InternalSessionConfigs.Builder() .withSessionClients(sessionClients) .withStoreRegistry(storeRegistry) @@ -313,6 +313,11 @@ private static Properties propsWithOverrides( throw new ConfigException(errorMsg); } + propsWithOverrides.putIfAbsent( + StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, + ResponsiveDslStoreSuppliers.class.getName() + ); + return propsWithOverrides; } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java new file mode 100644 index 000000000..fcfa97372 --- /dev/null +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveDslStoreSuppliers.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package dev.responsive.kafka.api.stores; + +import org.apache.kafka.streams.state.DslKeyValueParams; +import org.apache.kafka.streams.state.DslSessionParams; +import org.apache.kafka.streams.state.DslStoreSuppliers; +import org.apache.kafka.streams.state.DslWindowParams; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; + +public class ResponsiveDslStoreSuppliers implements DslStoreSuppliers { + + @Override + public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams dslKeyValueParams) { + return ResponsiveStores.keyValueStore( + ResponsiveKeyValueParams.keyValue(dslKeyValueParams.name()) + ); + } + + @Override + public WindowBytesStoreSupplier windowStore(final DslWindowParams dslWindowParams) { + return ResponsiveStores.windowStoreSupplier( + ResponsiveWindowParams.window( + dslWindowParams.name(), + dslWindowParams.windowSize(), + dslWindowParams.retentionPeriod().minus(dslWindowParams.windowSize()) + ) + ); + } + + @Override + public SessionBytesStoreSupplier sessionStore(final DslSessionParams dslSessionParams) { + return ResponsiveStores.sessionStoreSupplier( + ResponsiveSessionParams.session( + dslSessionParams.name(), + dslSessionParams.retentionPeriod() + ) + ); + } +} diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java index 43b18ad8f..d7db019ea 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveSessionParams.java @@ -28,35 +28,36 @@ public final class ResponsiveSessionParams { private final TableName name; private final SessionSchema schemaType; - private final long inactivityGapMs; - private final long gracePeriodMs; private final long retentionPeriodMs; - - private long numSegments; + private final long numSegments; private ResponsiveSessionParams( final String name, final SessionSchema schemaType, - final Duration inactivityGap, - final Duration gracePeriod + final long retentionPeriodMs ) { this.name = new TableName(name); this.schemaType = schemaType; - this.inactivityGapMs = durationToMillis(inactivityGap, "inactivityGap"); - this.gracePeriodMs = durationToMillis(gracePeriod, "gracePeriod"); - - this.retentionPeriodMs = this.inactivityGapMs + this.gracePeriodMs; + this.retentionPeriodMs = retentionPeriodMs; this.numSegments = computeDefaultNumSegments(retentionPeriodMs); } + public static ResponsiveSessionParams session( + final String name, + final Duration retention + ) { + return new ResponsiveSessionParams(name, SessionSchema.SESSION, retention.toMillis()); + } + public static ResponsiveSessionParams session( final String name, final Duration inactivityGap, final Duration gracePeriod ) { - return new ResponsiveSessionParams( - name, SessionSchema.SESSION, inactivityGap, gracePeriod - ); + final long inactivityGapMs = durationToMillis(inactivityGap, "inactivityGap"); + final long gracePeriodMs = durationToMillis(gracePeriod, "gracePeriod"); + final long retentionPeriodMs = inactivityGapMs + gracePeriodMs; + return new ResponsiveSessionParams(name, SessionSchema.SESSION, retentionPeriodMs); } public SessionSchema schemaType() { @@ -75,10 +76,6 @@ public long numSegments() { return this.numSegments; } - public long gracePeriodMs() { - return 
this.gracePeriodMs; - } - private static long computeDefaultNumSegments(final long retentionPeriodMs) { // TODO: Smart implementation. return DEFAULT_NUM_SEGMENTS; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java index d71a359c6..fc113c4dd 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/stores/ResponsiveStores.java @@ -316,6 +316,19 @@ public static Materialized> windowMateri ); } + /** + * See for example {@link Stores#inMemorySessionStore(String, Duration)} + * + * @param params the {@link ResponsiveSessionParams} for this store + * @return a supplier for a session store with the given options + * that uses Responsive's storage for its backend + */ + public static SessionBytesStoreSupplier sessionStoreSupplier( + final ResponsiveSessionParams params + ) { + return new ResponsiveSessionStoreSupplier(params); + } + /** * Create a {@link StoreBuilder} that can be used to build a Responsive * {@link SessionStore} and connect it via the Processor API. If using the DSL, use diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java index b24cd8ba0..94a65f061 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/DelegatingConsumer.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; public abstract class DelegatingConsumer implements Consumer { @@ -292,4 +293,9 @@ public void close(final Duration timeout) { public void wakeup() { delegate.wakeup(); } + + @Override + public Uuid clientInstanceId(final Duration duration) { + return delegate.clientInstanceId(duration); + } } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java index ad5d6890b..507342634 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/clients/ResponsiveProducer.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ProducerFencedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,6 +147,11 @@ public List partitionsFor(final String topic) { return wrapped.metrics(); } + @Override + public Uuid clientInstanceId(final Duration duration) { + return wrapped.clientInstanceId(duration); + } + @Override public void close() { wrapped.close(); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/CustomAssignmentTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/CustomAssignmentTest.java new file mode 100644 index 000000000..35405a60b --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/CustomAssignmentTest.java @@ -0,0 +1,260 @@ +/* + * Copyright 2024 Responsive Computing, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dev.responsive.kafka.integration; + +import static dev.responsive.kafka.testutils.IntegrationTestUtils.createTopicsAndWait; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeInput; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTION_TIMEOUT_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2; +import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; +import static org.apache.kafka.streams.StreamsConfig.producerPrefix; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import dev.responsive.kafka.api.ResponsiveKafkaStreams; +import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; +import dev.responsive.kafka.api.stores.ResponsiveStores; +import dev.responsive.kafka.internal.stores.SchemaTypes; +import dev.responsive.kafka.testutils.ResponsiveConfigParam; +import dev.responsive.kafka.testutils.ResponsiveExtension; +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.api.Processor; 
+import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ExtendWith(ResponsiveExtension.class) +public class CustomAssignmentTest { + private static final Logger LOG = LoggerFactory.getLogger(CustomAssignmentTest.class); + + private static final int MAX_POLL_MS = 5000; + private static final String INPUT_TOPIC = "input"; + private static final String OUTPUT_TOPIC = "output"; + + private final Map responsiveProps = new HashMap<>(); + + private String name; + private Admin admin; + private ScheduledExecutorService executor; + + @BeforeEach + public void before( + final TestInfo info, + final Admin admin, + @ResponsiveConfigParam final Map responsiveProps + ) { + // add displayName to name to account for parameterized tests + name = info.getTestMethod().orElseThrow().getName() + "-" + new Random().nextInt(); + executor = new ScheduledThreadPoolExecutor(2); + + this.responsiveProps.putAll(responsiveProps); + + this.admin = admin; + createTopicsAndWait(admin, Map.of(inputTopic(), 2, outputTopic(), 1)); + } + + @AfterEach + public void after() { + admin.deleteTopics(List.of(inputTopic(), outputTopic())); + } + + private String inputTopic() { + return name + "." + INPUT_TOPIC; + } + + private String outputTopic() { + return name + "." 
+ OUTPUT_TOPIC; + } + + @Test + public void shouldUseCustomAssignorInRebalance() throws Exception { + // Given: + final Map properties = getMutableProperties(); + final KafkaProducer producer = new KafkaProducer<>(properties); + final SharedState state = new SharedState(); + + // When: + try ( + final ResponsiveKafkaStreams streamsA = buildStreams(properties, "a", state); + ) { + startAppAndAwaitRunning(Duration.ofSeconds(10), streamsA); + pipeInput(inputTopic(), 2, producer, System::currentTimeMillis, 0, 10, 0, 1, 2, 3); + + Thread.sleep(5_000); + assertThat(state.numRecords.get(), equalTo(0)); + } + } + + private Map getMutableProperties() { + final Map properties = new HashMap<>(responsiveProps); + + properties.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + properties.put(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + properties.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + + properties.put(APPLICATION_ID_CONFIG, name); + properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); + properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); + properties.put(PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2); + properties.put(NUM_STREAM_THREADS_CONFIG, 1); + + // this ensures we can control the commits by explicitly requesting a commit + properties.put(COMMIT_INTERVAL_MS_CONFIG, 20_000); + properties.put(producerPrefix(TRANSACTION_TIMEOUT_CONFIG), 20_000); + + properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); + properties.put(consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000"); + properties.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); + properties.put(consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_MS); + properties.put(consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), MAX_POLL_MS); + properties.put(consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), MAX_POLL_MS - 1); + + properties.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, CompleteStallAssignor.class.getName()); + + return properties; + } + + private StoreBuilder> storeSupplier(SchemaTypes.KVSchema type) { + return ResponsiveStores.keyValueStoreBuilder( + ResponsiveStores.keyValueStore( + type == SchemaTypes.KVSchema.FACT + ? 
ResponsiveKeyValueParams.fact(name) + : ResponsiveKeyValueParams.keyValue(name) + ), + Serdes.Long(), + Serdes.Long() + ).withLoggingEnabled( + Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)); + } + + private ResponsiveKafkaStreams buildStreams( + final Map originals, + final String instance, + final SharedState state + ) { + final Map properties = new HashMap<>(originals); + properties.put(APPLICATION_SERVER_CONFIG, instance + ":1024"); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.addStateStore(storeSupplier(SchemaTypes.KVSchema.KEY_VALUE)); + + final KStream input = builder.stream(inputTopic()); + input + .process(() -> new TestProcessor(instance, state, name), name) + .to(outputTopic()); + + return new ResponsiveKafkaStreams(builder.build(), properties); + } + + public static class CompleteStallAssignor implements TaskAssignor { + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final Map assignments = new HashMap<>(); + final Collection states = applicationState.kafkaStreamsStates(false).values(); + for (final KafkaStreamsState state : states) { + LOG.info("Client state: {} | {}", state.processId(), state.numProcessingThreads()); + assignments.put(state.processId(), KafkaStreamsAssignment.of(state.processId(), new HashSet<>())); + } + + return new TaskAssignment(assignments.values()); + } + } + + private static class SharedState { + private final AtomicInteger numRecords = new AtomicInteger(0); + } + + private static class TestProcessor implements Processor { + + private final String instance; + private final SharedState state; + private final String name; + private ProcessorContext context; + private KeyValueStore store; + + public TestProcessor(final String instance, final SharedState state, String name) { + this.instance = instance; + this.state = state; + this.name = name; + } + + @Override + public void init(final ProcessorContext context) { + this.context = context; + this.store = context.getStateStore(name); + } + + @Override + public void process(final Record record) { + state.numRecords.incrementAndGet(); + final long sum = updateSum(record.key(), record.value()); + context.forward(new Record<>(record.key(), sum, System.currentTimeMillis())); + } + + private long updateSum(final long key, final long value) { + Long sum = store.get(key); + sum = (sum == null) ? 
value : sum + value; + store.put(key, sum); + return sum; + } + } +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java index e93ea2b33..cba1aad00 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/MinimalIntegrationTest.java @@ -37,8 +37,6 @@ import dev.responsive.kafka.api.ResponsiveKafkaStreams; import dev.responsive.kafka.api.config.StorageBackend; -import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; -import dev.responsive.kafka.api.stores.ResponsiveStores; import dev.responsive.kafka.testutils.ResponsiveConfigParam; import dev.responsive.kafka.testutils.ResponsiveExtension; import java.time.Duration; @@ -56,6 +54,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -138,7 +137,7 @@ private ResponsiveKafkaStreams buildStreams(final Map properties final KStream input = builder.stream(inputTopic()); input .groupByKey() - .count(ResponsiveStores.materialized(ResponsiveKeyValueParams.keyValue("countStore"))) + .count(Named.as("count")) .toStream() .to(outputTopic()); diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java new file mode 100644 index 000000000..66b0234ec --- /dev/null +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKafkaStreamsIntegrationTest.java @@ -0,0 +1,171 @@ +/* + * Copyright 2023 Responsive Computing, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package dev.responsive.kafka.integration; + +import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeTimestampedRecords; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning; +import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.is; + +import dev.responsive.kafka.api.ResponsiveKafkaStreams; +import dev.responsive.kafka.api.config.StorageBackend; +import dev.responsive.kafka.internal.utils.SessionUtil; +import dev.responsive.kafka.testutils.IntegrationTestUtils; +import dev.responsive.kafka.testutils.KeyValueTimestamp; +import dev.responsive.kafka.testutils.ResponsiveConfigParam; +import dev.responsive.kafka.testutils.ResponsiveExtension; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Named; +import org.bson.types.Binary; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.MongoDBContainer; + +public class ResponsiveKafkaStreamsIntegrationTest { + + public static final String COUNT_TABLE_NAME = "count"; + @RegisterExtension + static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.MONGO_DB); + + private static final String INPUT_TOPIC = "input"; + private static final String OUTPUT_TOPIC = "output"; + + private final Map responsiveProps = new HashMap<>(); + + private String name; + private MongoDBContainer mongo; + + @BeforeEach + public void before( + final TestInfo info, + final Admin admin, + final MongoDBContainer mongo, + @ResponsiveConfigParam final Map responsiveProps + ) throws InterruptedException, ExecutionException { + // add displayName to name to account for parameterized tests + name = info.getTestMethod().orElseThrow().getName() + "-" + new Random().nextInt(); + this.mongo = mongo; + + this.responsiveProps.putAll(responsiveProps); + + final var result = admin.createTopics( + List.of( + new NewTopic(inputTopic(), Optional.of(1), Optional.empty()), + new NewTopic(outputTopic(), Optional.of(1), Optional.empty()) + ) + ); + result.all().get(); + } + + private String inputTopic() { + return name + "." + INPUT_TOPIC; + } + + private String outputTopic() { + return name + "." 
+ OUTPUT_TOPIC; + } + + @Test + public void shouldDefaultToResponsiveStoresWhenUsingDsl() throws Exception { + // Given: + final List> inputEvents = Arrays.asList( + new KeyValueTimestamp<>("key", "a", 0L), + new KeyValueTimestamp<>("key", "b", 2_000L), + new KeyValueTimestamp<>("key", "c", 3_000L), + new KeyValueTimestamp<>("STOP", "ignored", 18_000L) + ); + final CountDownLatch outputLatch = new CountDownLatch(1); + + final StreamsBuilder builder = new StreamsBuilder(); + final KStream input = builder.stream(inputTopic()); + input + .groupByKey() + .count(Named.as(COUNT_TABLE_NAME)) + .toStream() + .peek((k, v) -> { + if (k.equals("STOP")) { + outputLatch.countDown(); + } + }) + .selectKey((k, v) -> k) + .to(outputTopic()); + + // When: + final Map properties = + IntegrationTestUtils.getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + properties.put(STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); + final KafkaProducer producer = new KafkaProducer<>(properties); + try ( + final ResponsiveKafkaStreams kafkaStreams = + new ResponsiveKafkaStreams(builder.build(), properties) + ) { + startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); + pipeTimestampedRecords(producer, inputTopic(), inputEvents); + + final long maxWait = inputEvents.get(inputEvents.size() - 1).timestamp() + 2_000; + assertThat( + outputLatch.await(maxWait, TimeUnit.MILLISECONDS), + Matchers.equalTo(true) + ); + } + + // Then: + try ( + final var mongoClient = SessionUtil.connect(mongo.getConnectionString(), null, null); + final var deserializer = new StringDeserializer(); + ) { + final List dbs = new ArrayList<>(); + mongoClient.listDatabaseNames().into(dbs); + assertThat(dbs, hasItem("kstream_aggregate_state_store_0000000001")); + + final var db = mongoClient.getDatabase("kstream_aggregate_state_store_0000000001"); + final var collection = db.getCollection("kv_data"); + final long numDocs = collection.countDocuments(); + assertThat(numDocs, is(2L)); + + final List keys = new ArrayList<>(); + collection.find() + .map(doc -> doc.get("_id", Binary.class).getData()) + .map(doc -> deserializer.deserialize("", doc)) + .into(keys); + assertThat(keys, hasItems("key", "STOP")); + } + } + +} diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreIntegrationTest.java index 907f72aa1..825221821 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveKeyValueStoreIntegrationTest.java @@ -18,26 +18,14 @@ import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeTimestampedRecords; import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning; -import static org.apache.kafka.clients.CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; -import static 
org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.hamcrest.MatcherAssert.assertThat; import dev.responsive.kafka.api.ResponsiveKafkaStreams; -import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.config.StorageBackend; import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams; import dev.responsive.kafka.api.stores.ResponsiveStores; +import dev.responsive.kafka.testutils.IntegrationTestUtils; import dev.responsive.kafka.testutils.KeyValueTimestamp; import dev.responsive.kafka.testutils.ResponsiveConfigParam; import dev.responsive.kafka.testutils.ResponsiveExtension; @@ -55,11 +43,6 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; @@ -84,7 +67,6 @@ public class ResponsiveKeyValueStoreIntegrationTest { private final Map responsiveProps = new HashMap<>(); private String name; - private Admin admin; @BeforeEach public void before( @@ -97,7 +79,6 @@ public void before( this.responsiveProps.putAll(responsiveProps); - this.admin = admin; final var result = admin.createTopics( List.of( new NewTopic(inputTopic(), Optional.of(1), Optional.empty()), @@ -169,7 +150,8 @@ public void shouldMatchRocksDB() throws Exception { .to(outputTopic()); // When: - final Map properties = getMutablePropertiesWithStringSerdes(); + final Map properties = + IntegrationTestUtils.getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); properties.put(STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); final KafkaProducer producer = new KafkaProducer<>(properties); try ( @@ -187,38 +169,4 @@ public void shouldMatchRocksDB() throws Exception { } } - private Map getMutablePropertiesWithStringSerdes() { - final Map properties = getMutableProperties(); - properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); - return properties; - } - - private Map getMutableProperties() { - final Map properties = new HashMap<>(responsiveProps); - - properties.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - properties.put(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - properties.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
LongDeserializer.class); - - properties.put(APPLICATION_ID_CONFIG, name); - properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); - properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName()); - properties.put(NUM_STREAM_THREADS_CONFIG, 1); - properties.put(COMMIT_INTERVAL_MS_CONFIG, 1); // commit as often as possible - - properties.put(consumerPrefix(SESSION_TIMEOUT_MS_CONFIG), 5_000 - 1); - - properties.put(consumerPrefix(MAX_POLL_RECORDS_CONFIG), 1); - - properties.put(ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 1); - - return properties; - } - } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java index 869860238..c36e984eb 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java @@ -1,6 +1,18 @@ package dev.responsive.kafka.testutils; +import static org.apache.kafka.clients.CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import dev.responsive.kafka.api.ResponsiveKafkaStreams; import dev.responsive.kafka.api.config.ResponsiveConfig; @@ -34,6 +46,9 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; @@ -368,6 +383,29 @@ public static void startAppAndAwaitRunning( } } + public static Map getDefaultMutablePropertiesWithStringSerdes( + final Map responsiveProps, + final String name + ) { + final Map properties = new HashMap<>(responsiveProps); + properties.put(APPLICATION_ID_CONFIG, name); + properties.put(NUM_STREAM_THREADS_CONFIG, 1); + properties.put(COMMIT_INTERVAL_MS_CONFIG, 1); // commit as often as possible + + properties.put(consumerPrefix(SESSION_TIMEOUT_MS_CONFIG), 5_000 - 1); + properties.put(consumerPrefix(MAX_POLL_RECORDS_CONFIG), 1); + properties.put(ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 1); + + properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + 
properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + + return properties; + } + private IntegrationTestUtils() { } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java index 5a903f253..3ee5ee515 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/testutils/ResponsiveExtension.java @@ -89,6 +89,7 @@ public void beforeAll(final ExtensionContext context) throws Exception { @Override public void afterAll(final ExtensionContext context) throws Exception { cassandra.stop(); + mongo.stop(); kafka.stop(); admin.close(); } @@ -100,6 +101,7 @@ public boolean supportsParameter( ) throws ParameterResolutionException { return parameterContext.getParameter().getType().equals(CassandraContainer.class) || parameterContext.getParameter().getType().equals(KafkaContainer.class) + || parameterContext.getParameter().getType().equals(MongoDBContainer.class) || parameterContext.getParameter().getType().equals(Admin.class) || isContainerConfig(parameterContext); } @@ -111,6 +113,8 @@ public Object resolveParameter( ) throws ParameterResolutionException { if (parameterContext.getParameter().getType() == CassandraContainer.class) { return cassandra; + } else if (parameterContext.getParameter().getType() == MongoDBContainer.class) { + return mongo; } else if (parameterContext.getParameter().getType() == KafkaContainer.class) { return kafka; } else if (parameterContext.getParameter().getType() == Admin.class) { diff --git a/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java b/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java index 62a329a18..e293a8548 100644 --- a/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java +++ b/kafka-client/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadIntegrationTest.java @@ -270,12 +270,13 @@ private GlobalStreamThread getThread( final Time time = new SystemTime(); final InternalTopologyBuilder builder = new InternalTopologyBuilder(); builder.addGlobalStore( - new KeyValueStoreBuilder<>( - storeSupplier, - new ByteArraySerde(), - new ByteArraySerde(), - time - ).withLoggingDisabled(), + new StoreBuilderWrapper( + new KeyValueStoreBuilder<>( + storeSupplier, + new ByteArraySerde(), + new ByteArraySerde(), + time).withLoggingDisabled() + ), "global", null, null, @@ -294,7 +295,8 @@ public void init(final ProcessorContext context) { public void process(final Record record) { global.put(record.key(), record.value()); } - } + }, + false ); final String baseDirectoryName = tempDir.getAbsolutePath(); diff --git a/settings.gradle.kts b/settings.gradle.kts index 20b033a15..8a2ffb945 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -45,7 +45,7 @@ dependencyResolutionManagement { versionCatalogs { create("libs") { version("jackson", "2.14.2") - version("kafka", "3.6.0") + version("kafka", "3.9.0-SNAPSHOT") 
version("scylla", "4.15.0.0") version("javaoperatorsdk", "4.3.0") version("grpc", "1.52.1")