diff --git a/controller-api/src/main/external-protos/opentelemetry-proto b/controller-api/src/main/external-protos/opentelemetry-proto index 35c97806f..d9fd878d4 160000 --- a/controller-api/src/main/external-protos/opentelemetry-proto +++ b/controller-api/src/main/external-protos/opentelemetry-proto @@ -1 +1 @@ -Subproject commit 35c97806f233c17680f9a00461310b17e0085dd8 +Subproject commit d9fd878d46b1970cf72b3d11ea012fff60716aa8 diff --git a/kafka-client/src/main/external-protos/rs3 b/kafka-client/src/main/external-protos/rs3 index 8dcefbcb1..c03fb0c8a 160000 --- a/kafka-client/src/main/external-protos/rs3 +++ b/kafka-client/src/main/external-protos/rs3 @@ -1 +1 @@ -Subproject commit 8dcefbcb1ca1ab85fa2a7ea44ca67ec8b0dd334e +Subproject commit c03fb0c8a3d3b8521340434bc9a5043f231f3758 diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java index 9510af6e9..cdab6db66 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java @@ -12,69 +12,43 @@ package dev.responsive.kafka.integration; -import static dev.responsive.kafka.testutils.IntegrationTestUtils.minutesToMillis; +import static dev.responsive.kafka.testutils.IntegrationTestUtils.getDefaultMutablePropertiesWithStringSerdes; import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeTimestampedRecords; import static dev.responsive.kafka.testutils.IntegrationTestUtils.readOutput; import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning; -import static java.util.Arrays.asList; -import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.APPLICATION_SERVER_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG; -import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasSize; import dev.responsive.kafka.api.ResponsiveKafkaStreams; import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.api.config.StorageBackend; import dev.responsive.kafka.api.stores.ResponsiveStores; import dev.responsive.kafka.api.stores.ResponsiveWindowParams; +import dev.responsive.kafka.internal.utils.StoreUtil; import dev.responsive.kafka.testutils.KeyValueTimestamp; import dev.responsive.kafka.testutils.ResponsiveConfigParam; import dev.responsive.kafka.testutils.ResponsiveExtension; import java.time.Duration; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Queue; import java.util.Random; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serdes.LongSerde; -import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; @@ -87,6 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; public class ResponsiveWindowStoreIntegrationTest { + private static final String STOP = "STOP"; @RegisterExtension static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.MONGO_DB); @@ -124,241 +99,190 @@ public void before( result.all().get(); } - @AfterEach - public void after() { - admin.deleteTopics(List.of(inputTopic(), otherTopic(), outputTopic())); - } - @Test - public void shouldComputeTumblingWindowAggregateWithRetention() throws Exception { + public void shouldComputeTumblingWindowAggregate() throws Exception { // Given: - final Map properties = getMutableProperties(); - final StreamsBuilder builder = new StreamsBuilder(); - - final ConcurrentMap, Long> collect = new ConcurrentHashMap<>(); - final CountdownLatchWrapper outputLatch = new CountdownLatchWrapper(0); + final CountDownLatch latch = new CountDownLatch(1); + final Map, String> collect = new ConcurrentHashMap<>(); + final Duration windowSize = Duration.ofSeconds(5); - final CountDownLatch finalLatch = new CountDownLatch(2); - final KStream input = builder.stream(inputTopic()); - - input + final var builder = new StreamsBuilder(); + final KStream stream = builder.stream(inputTopic()); + stream .groupByKey() - .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(1))) - .aggregate(() -> 0L, (k, v, agg) -> agg + v, Materialized.as(name)) + .windowedBy(TimeWindows.ofSizeWithNoGrace(windowSize)) + .aggregate(() -> "", (k, v, agg) -> agg + v, Materialized.as(name)) .toStream() + .peek(collect::put) + // discards window for easier serialization since we're not checking + // the output topic anyway + .selectKey((k, v) -> k.key()) .peek((k, v) -> { - collect.put(k, v); - outputLatch.countDown(); - - if (v == 1000) { - finalLatch.countDown(); + if (k.equals(STOP)) { + latch.countDown(); } }) - // discard the window, so we don't have to serialize it - // we're not checking the output topic anyway - .selectKey((k, v) -> k.key()) .to(outputTopic()); - // When: - // use a base timestamp that is aligned with a minute boundary to - // ensure predictable test results - final long baseTs = (System.currentTimeMillis() / 60_000) * 60_000; - final AtomicLong timestamp = new AtomicLong(baseTs); - properties.put(APPLICATION_SERVER_CONFIG, "host1:1024"); - try ( - final ResponsiveKafkaStreams kafkaStreams = - new ResponsiveKafkaStreams(builder.build(), properties); - final KafkaProducer producer = new KafkaProducer<>(properties) - ) { - startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); - - // produce two keys each having values 0-2 that are 100ms apart - // which should result in just one window (per key) with the sum - // of 3 after the first Streams instance is closed - outputLatch.resetCountdown(6); - pipeInput(producer, inputTopic(), () -> timestamp.getAndAdd(100), 0, 3, 0, 1); - outputLatch.await(); - - assertThat(collect, Matchers.hasEntry(windowed(0, baseTs, 5000), 3L)); - } + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); - // force a commit/flush so that we can test Cassandra by closing - // the old Kafka Streams and creating a new one - properties.put(APPLICATION_SERVER_CONFIG, "host2:1024"); + // When: try ( - final ResponsiveKafkaStreams kafkaStreams = new ResponsiveKafkaStreams( - builder.build(), - properties - ); - final KafkaProducer producer = new KafkaProducer<>(properties) + final var producer = new KafkaProducer(props); + final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props) ) { startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); - - outputLatch.resetCountdown(4); - // Produce another two records with values 3-4, still within the first window - pipeInput(producer, inputTopic(), () -> timestamp.getAndAdd(100), 3, 5, 0, 1); - - outputLatch.await(); - assertThat(collect, Matchers.hasEntry(windowed(0, baseTs, 5000), 10L)); - - // Produce another set with values 5-10 in new window that advances stream-time - // enough to expire the first window - outputLatch.resetCountdown(10); - final long secondWindowStart = baseTs + 10_000L; - timestamp.set(secondWindowStart); - pipeInput(producer, inputTopic(), () -> timestamp.getAndAdd(100), 5, 10, 0, 1); - - outputLatch.await(); - assertThat(collect, Matchers.hasEntry(windowed(0, secondWindowStart, 5000), 35L)); - - // at this point the records produced by this pipe should be - // past the retention period and therefore be ignored from the - // first window - outputLatch.resetCountdown(2); - timestamp.set(baseTs); - pipeInput(producer, inputTopic(), timestamp::get, 100, 101, 0, 1); - - outputLatch.await(); - assertThat(collect, Matchers.hasEntry(windowed(0, baseTs, 5000), 10L)); - - // use this to signify that we're done processing and count - // down the latch - outputLatch.resetCountdown(2); - timestamp.set(baseTs + 15000); - - pipeInput(producer, inputTopic(), timestamp::get, 1000, 1001, 0, 1); - finalLatch.await(); + // final outputs of + // - [k1, window1] -> "ab", + // - [k2, window1] -> "ab" + // - [k1, window2] -> "cd", + // - [k2, window2] -> "cd" + pipeTimestampedRecords(producer, inputTopic(), List.of( + new KeyValueTimestamp<>("k1", "a", 0), + new KeyValueTimestamp<>("k2", "a", 1), + new KeyValueTimestamp<>("k1", "b", 2), + new KeyValueTimestamp<>("k2", "b", 3), + + new KeyValueTimestamp<>("k1", "c", 10_000), + new KeyValueTimestamp<>("k2", "c", 10_001), + new KeyValueTimestamp<>("k1", "d", 10_002), + new KeyValueTimestamp<>("k2", "d", 10_003), + + // this record is for an expired window and therefore should be ignored + new KeyValueTimestamp<>("k1", "d", 10), + + new KeyValueTimestamp<>(STOP, "", 50_000) + )); + + // Then: + assertThat("Latch should have been awaited within 30s", latch.await(30, TimeUnit.SECONDS)); + + final long sizeMs = windowSize.toMillis(); + assertThat(collect.entrySet(), hasSize(5)); + assertThat(collect, hasEntry(windowed("k1", 0, sizeMs), "ab")); + assertThat(collect, hasEntry(windowed("k2", 0, sizeMs), "ab")); + assertThat(collect, hasEntry(windowed("k1", 10_000, sizeMs), "cd")); + assertThat(collect, hasEntry(windowed("k2", 10_000, sizeMs), "cd")); } - - outputLatch.await(); - - // Then: - assertThat(collect.size(), Matchers.is(6)); - - assertThat(collect, Matchers.hasEntry(windowed(0, baseTs, 5000), 10L)); - assertThat(collect, Matchers.hasEntry(windowed(1, baseTs, 5000), 10L)); - - assertThat(collect, Matchers.hasEntry(windowed(0, baseTs + 10_000, 5000), 35L)); - assertThat(collect, Matchers.hasEntry(windowed(1, baseTs + 10_000, 5000), 35L)); - - assertThat(collect, Matchers.hasEntry(windowed(0, baseTs + 15_000, 5000), 1000L)); - assertThat(collect, Matchers.hasEntry(windowed(1, baseTs + 15_000, 5000), 1000L)); } @Test - public void shouldComputeMultipleWindowsPerSegment() throws Exception { + public void shouldTumbleWithSegmentedStore() throws Exception { // Given: - final Map properties = getMutablePropertiesWithStringSerdes(); - final StreamsBuilder builder = new StreamsBuilder(); - - final ConcurrentMap, String> results = new ConcurrentHashMap<>(); - final CountdownLatchWrapper outputLatch = new CountdownLatchWrapper(0); - - final KStream input = builder.stream(inputTopic()); - - final Duration windowSize = Duration.ofMinutes(15); - final Duration gracePeriod = Duration.ofDays(15); - final long numSegments = 35L; - - input + final CountDownLatch latch = new CountDownLatch(1); + final Map, String> collect = new ConcurrentHashMap<>(); + final Duration windowSize = Duration.ofSeconds(5); + final int numSegments = 5; + final long segInterval = StoreUtil.computeSegmentInterval(windowSize.toMillis(), numSegments); + + final var builder = new StreamsBuilder(); + final KStream stream = builder.stream(inputTopic()); + stream .groupByKey() - .windowedBy(TimeWindows.ofSizeAndGrace(windowSize, gracePeriod)) + .windowedBy(TimeWindows.ofSizeWithNoGrace(windowSize)) .aggregate( () -> "", (k, v, agg) -> agg + v, ResponsiveStores.windowMaterialized( ResponsiveWindowParams - .window(name, windowSize, gracePeriod, false) - .withNumSegments(numSegments) - )) + .window(name, windowSize, windowSize, false) + .withNumSegments(numSegments)) + ) .toStream() + .peek(collect::put) + // discards window for easier serialization since we're not checking + // the output topic anyway + .selectKey((k, v) -> k.key()) .peek((k, v) -> { - results.put(k, v); - outputLatch.countDown(); + if (k.equals(STOP)) { + latch.countDown(); + } }) - // discard the window, so we don't have to serialize it - // we're not checking the output topic anyway - .selectKey((k, v) -> k.key()) .to(outputTopic()); + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + // When: try ( - final ResponsiveKafkaStreams kafkaStreams = - new ResponsiveKafkaStreams(builder.build(), properties); - final KafkaProducer producer = new KafkaProducer<>(properties) + final var producer = new KafkaProducer(props); + final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props) ) { startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); - - outputLatch.resetCountdown(11); - - // Start from timestamp of 0L to get predictable results - // Write to multiple windows within a single segment and all within the large grace period - final List> input1 = asList( - new KeyValueTimestamp<>("key", "a", minutesToMillis(0L)), // [0, 15] --> "a" - new KeyValueTimestamp<>("key", "b", minutesToMillis(11L)), // [0, 15] --> "ab" - new KeyValueTimestamp<>("key", "c", minutesToMillis(7L)), // [0, 15] --> "abc" - new KeyValueTimestamp<>("key", "d", minutesToMillis(16L)), // [15, 30] --> "d" - new KeyValueTimestamp<>("key", "e", minutesToMillis(28L)), // [15, 30] --> "de" - new KeyValueTimestamp<>("key", "f", minutesToMillis(5L)), // [0, 15] --> "abcf" - new KeyValueTimestamp<>("key", "g", minutesToMillis(26L)), // [15, 30] --> "deg" - new KeyValueTimestamp<>("key", "h", minutesToMillis(41L)), // [30, 45] --> "h" - new KeyValueTimestamp<>("key", "i", minutesToMillis(31L)), // [30, 45] --> "hi" - new KeyValueTimestamp<>("key", "j", minutesToMillis(2L)), // [0, 15] --> "abcfj" - new KeyValueTimestamp<>("key", "k", minutesToMillis(16L)) // [15, 30] --> "degk" - ); - - pipeTimestampedRecords(producer, inputTopic(), input1); - - outputLatch.await(); - assertThat(results.size(), equalTo(3)); - assertThat(results, Matchers.hasEntry(windowedKeyInMinutes(0L, 15L), "abcfj")); - assertThat(results, Matchers.hasEntry(windowedKeyInMinutes(15L, 30L), "degk")); - assertThat(results, Matchers.hasEntry(windowedKeyInMinutes(30L, 45L), "hi")); + // final outputs of + // - [k1, window1] -> "abc", + // - [k2, window1] -> "abc" + // - [k1, window2] -> "de", + // - [k2, window2] -> "de" + pipeTimestampedRecords(producer, inputTopic(), List.of( + // these are within the first segment of the first window + new KeyValueTimestamp<>("k1", "a", 0), + new KeyValueTimestamp<>("k2", "a", 1), + new KeyValueTimestamp<>("k1", "b", 2), + new KeyValueTimestamp<>("k2", "b", 3), + + // these are within the second segment of the first window + new KeyValueTimestamp<>("k1", "c", segInterval + 1), + new KeyValueTimestamp<>("k2", "c", segInterval + 2), + + new KeyValueTimestamp<>("k1", "d", 10_000), + new KeyValueTimestamp<>("k2", "d", 10_001), + new KeyValueTimestamp<>("k1", "e", 10_002), + new KeyValueTimestamp<>("k2", "e", 10_003), + + // this record is for an expired window and therefore should be ignored + new KeyValueTimestamp<>("k1", "d", 10), + + new KeyValueTimestamp<>(STOP, "", 50_000) + )); + + // Then: + assertThat("Latch should have been awaited within 30s", latch.await(30, TimeUnit.SECONDS)); + + final long sizeMs = windowSize.toMillis(); + assertThat(collect.entrySet(), hasSize(5)); + assertThat(collect, hasEntry(windowed("k1", 0, sizeMs), "abc")); + assertThat(collect, hasEntry(windowed("k2", 0, sizeMs), "abc")); + assertThat(collect, hasEntry(windowed("k1", 10_000, sizeMs), "de")); + assertThat(collect, hasEntry(windowed("k2", 10_000, sizeMs), "de")); } } @Test - public void shouldComputeHoppingWindowAggregateWithRetention() throws Exception { + public void shouldComputeHoppingWindowAggregate() throws Exception { // Given: - final Map properties = getMutablePropertiesWithStringSerdes(); - - final StreamsBuilder builder = new StreamsBuilder(); - - final ConcurrentMap, String> results = new ConcurrentHashMap<>(); - final CountdownLatchWrapper outputLatch = new CountdownLatchWrapper(0); - + final CountDownLatch latch = new CountDownLatch(2); // STOP will be included in two windows + final Map, String> collect = new ConcurrentHashMap<>(); final Duration windowSize = Duration.ofSeconds(10); - final Duration gracePeriod = Duration.ofSeconds(5); + final Duration grace = Duration.ofSeconds(5); final Duration advance = Duration.ofSeconds(5); - final KStream input = builder.stream(inputTopic()); - input + final var builder = new StreamsBuilder(); + final KStream stream = builder.stream(inputTopic()); + stream .groupByKey() - .windowedBy(TimeWindows.ofSizeAndGrace(windowSize, gracePeriod).advanceBy(advance)) - .aggregate(() -> "", (k, v, agg) -> agg + v, Named.as(name)) + .windowedBy(TimeWindows.ofSizeAndGrace(windowSize, grace).advanceBy(advance)) + .aggregate(() -> "", (k, v, agg) -> agg + v, Materialized.as(name)) .toStream() + .peek(collect::put) + // discards window for easier serialization since we're not checking + // the output topic anyway + .selectKey((k, v) -> k.key()) .peek((k, v) -> { - results.put(k, v); - outputLatch.countDown(); + if (k.equals(STOP)) { + latch.countDown(); + } }) - // discard the window, so we don't have to serialize it - // we're not checking the output topic anyway - .selectKey((k, v) -> k.key()) .to(outputTopic()); + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + // When: - properties.put(APPLICATION_SERVER_CONFIG, "host1:1024"); - final KafkaProducer producer = new KafkaProducer<>(properties); try ( - final ResponsiveKafkaStreams kafkaStreams = - new ResponsiveKafkaStreams(builder.build(), properties); + final var producer = new KafkaProducer(props); + final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props) ) { startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); - - outputLatch.resetCountdown(11); - - // Start from timestamp of 0L to get predictable results - final List> input1 = asList( + pipeTimestampedRecords(producer, inputTopic(), List.of( new KeyValueTimestamp<>("key", "a", 0L), new KeyValueTimestamp<>("key", "b", 6_000L), new KeyValueTimestamp<>("key", "c", 8_000L), @@ -366,109 +290,69 @@ public void shouldComputeHoppingWindowAggregateWithRetention() throws Exception new KeyValueTimestamp<>("key", "e", 8_000L), // within grace for [5, 15s] window new KeyValueTimestamp<>("key", "f", 0L), // outside grace for all windows new KeyValueTimestamp<>("key", "g", 11_000L), - new KeyValueTimestamp<>("key", "h", 5_000L) // within grace for [5, 15s] window - ); - - pipeTimestampedRecords(producer, inputTopic(), input1); - outputLatch.await(); - - assertThat(results.size(), equalTo(4)); - assertThat(results, Matchers.hasEntry(windowedKey(0L), "abc")); // [0, 10s] - assertThat(results, Matchers.hasEntry(windowedKey(5000L), "bcegh")); // [5, 15s] - assertThat(results, Matchers.hasEntry(windowedKey(10000L), "dg")); // [10s, 20s] - assertThat(results, Matchers.hasEntry(windowedKey(15000L), "d")); // [15s, 25s] - } - } - - static class CountdownLatchWrapper { - private CountDownLatch currentLatch; - - public CountdownLatchWrapper(final int initialCountdown) { - currentLatch = new CountDownLatch(initialCountdown); - } + new KeyValueTimestamp<>("key", "h", 5_000L), // within grace for [5, 15s] window - public void countDown() { - currentLatch.countDown(); - } + new KeyValueTimestamp<>(STOP, "", 45_000) + )); - public void resetCountdown(final int countdown) { - currentLatch = new CountDownLatch(countdown); - } + // Then: + assertThat("Latch should have been awaited within 30s", latch.await(30, TimeUnit.SECONDS)); - public boolean await() { - try { - return currentLatch.await(60, TimeUnit.SECONDS); - } catch (final Exception e) { - throw new AssertionError(e); - } + assertThat(collect.toString(), collect.entrySet(), hasSize(6)); // STOP has 2 windows + assertThat(collect, Matchers.hasEntry(windowedKey(0L), "abc")); // [0, 10s] + assertThat(collect, Matchers.hasEntry(windowedKey(5_000L), "bcegh")); // [5, 15s] + assertThat(collect, Matchers.hasEntry(windowedKey(10_000L), "dg")); // [10s, 20s] + assertThat(collect, Matchers.hasEntry(windowedKey(15_000L), "d")); // [15s, 25s] } } - + @Test public void shouldDoStreamStreamJoin() throws Exception { // Given: - final Map properties = getMutablePropertiesWithStringSerdes(); - properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream input = builder.stream(inputTopic()); - final KStream other = builder.stream(otherTopic()); - final Map> collect = new ConcurrentHashMap<>(); + final Duration windowSize = Duration.ofSeconds(5); - final CountDownLatch latch1 = new CountDownLatch(1); + final var builder = new StreamsBuilder(); + final KStream left = builder.stream(inputTopic()); + final KStream right = builder.stream(otherTopic()); - final Duration windowSize = Duration.ofMillis(1000); - input - .peek((k, v) -> { - if (k.equals("STOP")) { - latch1.countDown(); - } - }) + left + .peek((k, v) -> System.out.println(k + ": " + v)) .join( - other, - (v1, v2) -> { - System.out.println("Joining: " + v1 + ", " + v2); - return v1 + "-" + v2; - }, + right, + (v1, v2) -> v1 + "-" + v2, JoinWindows.ofTimeDifferenceWithNoGrace(windowSize.dividedBy(2)), StreamJoined.as(name) ) - .peek((k, v) -> collect.computeIfAbsent(k, old -> new ArrayBlockingQueue<>(10)).add(v)) .to(outputTopic()); - //When: - try (final KafkaProducer producer = new KafkaProducer<>(properties)) { - final List> leftStreamInput = Arrays.asList( - new KeyValueTimestamp<>("A", "L:a", 0L), - new KeyValueTimestamp<>("A", "L:a2", 0L), - new KeyValueTimestamp<>("A", "L:a3", 0L), - new KeyValueTimestamp<>("B", "L:b", 300L), // should join with R:b - new KeyValueTimestamp<>("A", "L:a3", 2_000L), // should not join, outside window of R:a - new KeyValueTimestamp<>("STOP", "ignored", 10_000L) - ); - - final List> rightStreamInput = Arrays.asList( - new KeyValueTimestamp<>("A", "R:a", 30L), // should join with L:a and L:a2 - new KeyValueTimestamp<>("B", "R:b", 200L), - new KeyValueTimestamp<>("B", "R:b2", 500L) // should join with L:b - ); - - pipeTimestampedRecords(producer, inputTopic(), leftStreamInput); - pipeTimestampedRecords(producer, otherTopic(), rightStreamInput); - } + final var props = getDefaultMutablePropertiesWithStringSerdes(responsiveProps, name); + // set the max task idle just in case records come out of order on one topic or the other + props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1_000); - // Then: + // When: try ( - final ResponsiveKafkaStreams kafkaStreams = new ResponsiveKafkaStreams( - builder.build(), properties - ) + final var producer = new KafkaProducer(props); + final var kafkaStreams = new ResponsiveKafkaStreams(builder.build(), props) ) { startAppAndAwaitRunning(Duration.ofSeconds(15), kafkaStreams); - latch1.await(30, TimeUnit.SECONDS); + pipeTimestampedRecords(producer, otherTopic(), List.of( + new KeyValueTimestamp<>("A", "R:a", 30L), + new KeyValueTimestamp<>("B", "R:b", 200L), // should join with R:b + new KeyValueTimestamp<>("B", "R:b2", 500L) // should join with R:b + )); + pipeTimestampedRecords(producer, inputTopic(), List.of( + new KeyValueTimestamp<>("A", "L:a", 0L), + new KeyValueTimestamp<>("A", "L:a2", 0L), + new KeyValueTimestamp<>("A", "L:a3", 0L), + new KeyValueTimestamp<>("B", "L:b", 300L), // should join with R:b + new KeyValueTimestamp<>("A", "no_match", 2_000L), // should not join, outside of window + new KeyValueTimestamp<>(STOP, STOP, 10_000L) + )); + // Then: final List> output = - readOutput(outputTopic(), 0L, 5L, true, properties); + readOutput(outputTopic(), 0L, 5L, true, props); assertThat(output.get(0), equalTo(new KeyValue<>("A", "L:a-R:a"))); assertThat(output.get(1), equalTo(new KeyValue<>("A", "L:a2-R:a"))); @@ -478,80 +362,19 @@ public void shouldDoStreamStreamJoin() throws Exception { } } - private Windowed windowed(final long k, final long startMs, final long size) { - return new Windowed<>(k, new TimeWindow(startMs, startMs + size)); + @AfterEach + public void after() { + admin.deleteTopics(List.of(inputTopic(), otherTopic(), outputTopic())); } - private Windowed windowedKeyInMinutes( - final long startMinutes, - final long endMinutes - ) { - final long startMs = minutesToMillis(startMinutes); - final long endMs = minutesToMillis(endMinutes); - return new Windowed<>("key", new TimeWindow(startMs, endMs)); + private static Windowed windowed(final String k, final long startMs, final long size) { + return new Windowed<>(k, new TimeWindow(startMs, startMs + size)); } private Windowed windowedKey(final long startMs) { return new Windowed<>("key", new TimeWindow(startMs, startMs + 10_000)); } - private static void pipeInput( - final KafkaProducer producer, - final String topic, - final Supplier timestamp, - final long valFrom, - final long valTo, - final long... keys - ) { - for (final long k : keys) { - for (long v = valFrom; v < valTo; v++) { - producer.send(new ProducerRecord<>( - topic, - 0, - timestamp.get(), - k, - v - )); - } - } - producer.flush(); - } - - private Map getMutablePropertiesWithStringSerdes() { - final Map properties = getMutableProperties(); - properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class.getName()); - properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class.getName()); - return properties; - } - - private Map getMutableProperties() { - final Map properties = new HashMap<>(responsiveProps); - - properties.put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - properties.put(VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class); - properties.put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); - - properties.put(APPLICATION_ID_CONFIG, name); - properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, LongSerde.class.getName()); - properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, LongSerde.class.getName()); - properties.put(NUM_STREAM_THREADS_CONFIG, 1); - properties.put(COMMIT_INTERVAL_MS_CONFIG, 1); // commit as often as possible - - properties.put(consumerPrefix(REQUEST_TIMEOUT_MS_CONFIG), 5_000); - properties.put(consumerPrefix(SESSION_TIMEOUT_MS_CONFIG), 5_000 - 1); - - properties.put(consumerPrefix(MAX_POLL_RECORDS_CONFIG), 1); - - properties.put(ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 1); - - return properties; - } - private String inputTopic() { return name + "." + INPUT_TOPIC; }