From 1fb278213235e750134eec2d9e588e747cd612ca Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Fri, 6 Dec 2024 02:51:13 -0500 Subject: [PATCH 1/7] done --- .../kafka/api/ResponsiveKafkaStreams.java | 2 - .../api/async/internals/AsyncProcessor.java | 19 +----- .../kafka/api/async/internals/AsyncUtils.java | 16 +++++ .../api/ResponsiveTopologyTestDriver.java | 68 +++++++++++++++---- .../internal/clients/TTDCassandraClient.java | 1 - .../org/apache/kafka/streams/TTDUtils.java | 32 +++++++++ ...veTopologyTestDriverKeyValueStoreTest.java | 27 ++++---- 7 files changed, 119 insertions(+), 46 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index d0108d1ea..c908d7fba 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -12,8 +12,6 @@ package dev.responsive.kafka.api; -import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG; -import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.METRICS_ENABLED_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java index 81e3ba163..00efebc0d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java @@ -12,9 +12,9 @@ package dev.responsive.kafka.api.async.internals; +import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_FLUSH_INTERVAL_MS_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG; -import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry; import dev.responsive.kafka.api.async.AsyncProcessorSupplier; import dev.responsive.kafka.api.async.internals.contexts.AsyncUserProcessorContext; @@ -40,7 +40,6 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; @@ -218,7 +217,7 @@ private void initFields( final long punctuationInterval = configs.getLong(ASYNC_FLUSH_INTERVAL_MS_CONFIG); final int maxEventsPerKey = configs.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG); - this.asyncThreadPoolRegistration = getAsyncThreadPool(taskContext, streamThreadName); + this.asyncThreadPoolRegistration = getAsyncThreadPool(appConfigs, streamThreadName); asyncThreadPoolRegistration.registerAsyncProcessor(taskId, this::flushPendingEventsForCommit); asyncThreadPoolRegistration.threadPool().maybeInitThreadPoolMetrics(); @@ -756,17 +755,5 @@ private void verifyConnectedStateStores( } } - private static AsyncThreadPoolRegistration getAsyncThreadPool( - final ProcessingContext context, - final String streamThreadName - ) { - try { - final AsyncThreadPoolRegistry registry = loadAsyncThreadPoolRegistry(context.appConfigs()); - return registry.asyncThreadPoolForStreamThread(streamThreadName); - } catch (final Exception e) { - throw new ConfigException( - "Unable to locate async thread pool registry. Make sure to configure " - + ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, e); - } - } + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java index 9b8d5d3f6..926237fcf 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java @@ -15,6 +15,7 @@ import static dev.responsive.kafka.api.async.internals.AsyncThreadPool.ASYNC_THREAD_NAME; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; +import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; import dev.responsive.kafka.api.config.ResponsiveConfig; @@ -26,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.state.StoreBuilder; @@ -155,4 +157,18 @@ public static Optional configuredAsyncThreadPool( } } + public static AsyncThreadPoolRegistration getAsyncThreadPool( + final Map appConfigs, + final String streamThreadName + ) { + try { + final AsyncThreadPoolRegistry registry = loadAsyncThreadPoolRegistry(appConfigs); + return registry.asyncThreadPoolForStreamThread(streamThreadName); + } catch (final Exception e) { + throw new ConfigException( + "Unable to locate async thread pool registry. Make sure to configure " + + ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, e); + } + } + } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java index 0acfde7b1..0a206a94e 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java @@ -12,9 +12,11 @@ package dev.responsive.kafka.api; +import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool; +import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; import static dev.responsive.kafka.internal.stores.TTDRestoreListener.mockRestoreListener; -import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; +import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistration; import dev.responsive.kafka.api.async.internals.AsyncUtils; import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.clients.TTDCassandraClient; @@ -26,20 +28,26 @@ import java.time.Duration; import java.time.Instant; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.Properties; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TTDUtils.TopologyTestDriverAccessor; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.test.TestRecord; -public class ResponsiveTopologyTestDriver extends TopologyTestDriver { +public class ResponsiveTopologyTestDriver extends TopologyTestDriverAccessor { public static final String RESPONSIVE_TTD_ORG = "Responsive"; public static final String RESPONSIVE_TTD_ENV = "TopologyTestDriver"; private final TTDCassandraClient client; + private final Optional asyncThreadPool; /** * Create a new test diver instance with default test properties. @@ -97,7 +105,7 @@ public ResponsiveTopologyTestDriver( ) { this( topology, - config, + baseProps(config), initialWallClockTime, new TTDCassandraClient( new TTDMockAdmin(baseProps(config), topology), @@ -115,9 +123,16 @@ public ResponsiveTopologyTestDriver( @Override public void advanceWallClockTime(final Duration advance) { client.advanceWallClockTime(advance); + client.flush(); super.advanceWallClockTime(advance); } + public void flush() { + asyncThreadPool.ifPresent(AsyncThreadPoolRegistration::flushAllAsyncEvents); + client.flush(); + super.advanceWallClockTime(Duration.ZERO); + } + private ResponsiveTopologyTestDriver( final Topology topology, final Properties config, @@ -126,27 +141,38 @@ private ResponsiveTopologyTestDriver( ) { super( topology, - testDriverProps(config, topology.describe(), cassandraClient), + injectTestDriverProps(config, topology.describe(), cassandraClient), initialWallClockTime ); this.client = cassandraClient; + this.asyncThreadPool = getAsyncThreadPoolRegistration(super.props()); + } + + @Override + protected void pipeRecord( + final String topic, + final TestRecord record, + final Serializer keySerializer, + final Serializer valueSerializer, + final Instant time + ) { + super.pipeRecord(topic, record, keySerializer, valueSerializer, time); + flush(); } - private static Properties testDriverProps( - final Properties userProps, + private static Properties injectTestDriverProps( + final Properties baseProps, final TopologyDescription topologyDescription, final TTDCassandraClient client ) { - final Properties props = baseProps(userProps); - final SessionClients sessionClients = new SessionClients( Optional.empty(), Optional.of(client), Optional.empty(), false, client.mockAdmin() ); - final var restoreListener = mockRestoreListener(props); + final var restoreListener = mockRestoreListener(baseProps); sessionClients.initialize(restoreListener.metrics(), restoreListener); final var metrics = new ResponsiveMetrics(new Metrics()); - final String appId = userProps.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); + final String appId = baseProps.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); metrics.initializeTags( appId, appId + "-client", @@ -160,14 +186,14 @@ private static Properties testDriverProps( .withMetrics(metrics) .withTopologyDescription(topologyDescription); - AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(props), 1, metrics) + AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(baseProps), 1, metrics) .ifPresent(threadPoolRegistry -> { - threadPoolRegistry.startNewAsyncThreadPool("Test worker"); + threadPoolRegistry.startNewAsyncThreadPool(Thread.currentThread().getName()); sessionConfig.withAsyncThreadPoolRegistry(threadPoolRegistry); }); - props.putAll(sessionConfig.build()); - return props; + baseProps.putAll(sessionConfig.build()); + return baseProps; } @SuppressWarnings("deprecation") @@ -197,6 +223,20 @@ private static MockTime mockTime(final Instant initialWallClockTime) { return mockTime; } + private static Optional getAsyncThreadPoolRegistration( + final Properties props + ) { + final int asyncThreadPoolSize = (int) props.getOrDefault(ASYNC_THREAD_POOL_SIZE_CONFIG, 0); + if (asyncThreadPoolSize > 0) { + // stupid conversion to deal with Map vs Properties type discrepancy + final Map configMap = new HashMap<>(); + props.forEach((key, value) -> configMap.put(key.toString(), value)); + + return Optional.of(getAsyncThreadPool(configMap, Thread.currentThread().getName())); + } else { + return Optional.empty(); + } + } } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java index 57a46328b..aa027fc61 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java @@ -67,7 +67,6 @@ public long currentWallClockTimeMs() { } public void advanceWallClockTime(final Duration advance) { - flush(); time.sleep(advance.toMillis()); } diff --git a/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java b/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java index 51136525b..ab42a6333 100644 --- a/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java +++ b/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java @@ -14,9 +14,13 @@ import static org.apache.kafka.streams.processor.internals.ProcessorStateManager.storeChangelogTopic; +import java.time.Instant; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.test.TestRecord; /** * A utility class that lives in the o.a.k.streams package so we can access @@ -47,4 +51,32 @@ public static Set extractChangelogTopics(final Topology topology) { .flatMap(t -> t.stateChangelogTopics.keySet().stream()) .collect(Collectors.toSet()); } + + public static class TopologyTestDriverAccessor extends TopologyTestDriver { + + private final Properties props; + + public TopologyTestDriverAccessor( + final Topology topology, + final Properties config, + final Instant initialWallClockTime + ) { + super(topology, config, initialWallClockTime); + this.props = config; + } + + public Properties props() { + return props; + } + + @Override + protected void pipeRecord(final String topic, + final TestRecord record, + final Serializer keySerializer, + final Serializer valueSerializer, + final Instant time) { + super.pipeRecord(topic, record, keySerializer, valueSerializer, time); + } + + } } diff --git a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java index c5c918e31..83fbbea01 100644 --- a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java +++ b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java @@ -66,7 +66,7 @@ private static ResponsiveKeyValueParams paramsForType(final KVSchema type) { public void shouldRunWithoutResponsiveConnectionAndNoTtl(final KVSchema type) { // Given: final Topology topology = topology(paramsForType(type)); - try (final TopologyTestDriver driver = setupDriver(topology)) { + try (final ResponsiveTopologyTestDriver driver = setupDriver(topology)) { final TestInputTopic bids = driver.createInputTopic( "bids", new StringSerializer(), new StringSerializer()); @@ -206,7 +206,7 @@ public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type // Then: final List outputs = output.readValuesToList(); - MatcherAssert.assertThat(outputs, Matchers.contains( + MatcherAssert.assertThat(outputs, Matchers.containsInAnyOrder( "a,100,1,1,alice,CA", "d,103,3,3,carol,CA", "e,104,1,1,alex,CA", @@ -290,20 +290,21 @@ private Topology topology(final ResponsiveKeyValueParams storeParams) { .join(people, (bid, person) -> bid + "," + person) // state is the 6th column .filter((k, v) -> v.split(",")[5].equals("CA")) - .processValues(createAsyncProcessorSupplier(() -> new FixedKeyProcessor() { + .processValues(createAsyncProcessorSupplier( + () -> new FixedKeyProcessor() { - private FixedKeyProcessorContext context; + private FixedKeyProcessorContext context; - @Override - public void init(final FixedKeyProcessorContext context) { - this.context = context; - } + @Override + public void init(final FixedKeyProcessorContext context) { + this.context = context; + } - @Override - public void process(final FixedKeyRecord fixedKeyRecord) { - context.forward(fixedKeyRecord); - } - })) + @Override + public void process(final FixedKeyRecord fixedKeyRecord) { + context.forward(fixedKeyRecord); + } + })) .to("output"); return builder.build(); From f17898307476a2e2aa7e39b7fa6901f75a4eea45 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 9 Dec 2024 15:02:53 -0800 Subject: [PATCH 2/7] temp fix for TTD test --- .../api/ResponsiveTopologyTestDriverKeyValueStoreTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java index 83fbbea01..3f49a91ca 100644 --- a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java +++ b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java @@ -257,7 +257,7 @@ public void shouldDeduplicateWithTtlProviderToExpireOldRecords(final KVSchema ty assertNull(transactionIdStore.get(key), "should have no txn id in state store"); // send the same event in again, outside the dedupe window - inputTopic.pipeInput(key, value); + inputTopic.pipeInput(key, value, ttlMs + 1); assertNotNull( transactionIdStore.get(key), "should have a single txn id in state store, again"); } From 28a36ab56992df53a37f3587d464c12e3c8ce37b Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 9 Dec 2024 15:57:08 -0800 Subject: [PATCH 3/7] demote log from info to trace --- .../responsive/kafka/api/async/internals/AsyncThreadPool.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java index 9f31be5cf..860dcd547 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java @@ -182,7 +182,7 @@ public void scheduleForProcessing( = inFlight.computeIfAbsent(asyncProcessorId, k -> new ConcurrentHashMap<>()); for (final AsyncEvent event : events) { - log.info("Scheduled event {}", event.inputRecord()); + log.trace("Scheduled event {}", event.inputRecord()); try { queueSemaphore.acquire(); } catch (final InterruptedException e) { From ef7b5d6ba81ecf407b58cb0aed65807c7b2339de Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 9 Dec 2024 15:57:44 -0800 Subject: [PATCH 4/7] undo name change --- .../responsive/kafka/api/ResponsiveTopologyTestDriver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java index 0a206a94e..8c0a95a6c 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java @@ -141,7 +141,7 @@ private ResponsiveTopologyTestDriver( ) { super( topology, - injectTestDriverProps(config, topology.describe(), cassandraClient), + testDriverProps(config, topology.describe(), cassandraClient), initialWallClockTime ); this.client = cassandraClient; @@ -160,7 +160,7 @@ protected void pipeRecord( flush(); } - private static Properties injectTestDriverProps( + private static Properties testDriverProps( final Properties baseProps, final TopologyDescription topologyDescription, final TTDCassandraClient client From 584e60b64e2b492af59fd9eba9c0ac04f5920074 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 9 Dec 2024 16:00:31 -0800 Subject: [PATCH 5/7] move --- .../dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java index 8c0a95a6c..3ac86d74f 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java @@ -229,8 +229,8 @@ private static Optional getAsyncThreadPoolRegistrat final int asyncThreadPoolSize = (int) props.getOrDefault(ASYNC_THREAD_POOL_SIZE_CONFIG, 0); if (asyncThreadPoolSize > 0) { - // stupid conversion to deal with Map vs Properties type discrepancy final Map configMap = new HashMap<>(); + // stupid conversion to deal with Map vs Properties type discrepancy props.forEach((key, value) -> configMap.put(key.toString(), value)); return Optional.of(getAsyncThreadPool(configMap, Thread.currentThread().getName())); From 99f175f76f0d9c6cc3fa24527cbd738fca697dbf Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 9 Dec 2024 17:25:08 -0800 Subject: [PATCH 6/7] ttd window bug --- .../kafka/internal/db/partitioning/Segmenter.java | 4 ++++ .../kafka/internal/db/TTDWindowTable.java | 8 ++++---- .../kafka/internal/stores/WindowStoreStub.java | 7 +++---- ...esponsiveTopologyTestDriverWindowStoreTest.java | 14 +++++++------- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java index a3142576d..1c24d3643 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java @@ -143,6 +143,10 @@ public Segmenter( ); } + public long retentionPeriodMs() { + return retentionPeriodMs; + } + public long segmentIntervalMs() { return segmentIntervalMs; } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java index 09571bc62..94fbf0128 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java @@ -46,7 +46,7 @@ public TTDWindowTable( ) { super(client); this.name = spec.tableName(); - this.stub = new WindowStoreStub(); + this.stub = new WindowStoreStub(partitioner.segmenter().retentionPeriodMs()); this.partitioner = partitioner; } @@ -207,17 +207,17 @@ protected RemoteWriteResult updateOffsetAndStreamTime( final long consumedOffset, final long streamTime ) { - return null; + return RemoteWriteResult.success(null); } @Override protected RemoteWriteResult createSegment(final SegmentPartition partition) { - return null; + return RemoteWriteResult.success(null); } @Override protected RemoteWriteResult deleteSegment(final SegmentPartition partition) { - return null; + return RemoteWriteResult.success(null); } } } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java index b2abaa74e..dd2a1303c 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java @@ -28,9 +28,8 @@ public class WindowStoreStub { private final long retentionPeriod; private long observedStreamTime = 0L; - public WindowStoreStub() { - // TODO: how can we pass the actual retention period through to the store stub? - this.retentionPeriod = 15L; + public WindowStoreStub(final long retentionPeriod) { + this.retentionPeriod = retentionPeriod; } public void put(final WindowedKey key, final byte[] value) { @@ -48,7 +47,7 @@ public byte[] fetch( final long windowStart ) { final WindowedKey windowedKey = new WindowedKey(key, windowStart); - if (windowStart < minValidTimestamp() && records.containsKey(windowedKey)) { + if (windowStart > minValidTimestamp() && records.containsKey(windowedKey)) { return records.get(windowedKey); } else { return null; diff --git a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverWindowStoreTest.java b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverWindowStoreTest.java index 10f8c1bcb..5caaf0353 100644 --- a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverWindowStoreTest.java +++ b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverWindowStoreTest.java @@ -58,13 +58,13 @@ public void shouldRunWindowStoreWithoutResponsiveConnection() { "output", new StringDeserializer(), new LongDeserializer()); // When: - bids.pipeInput("luna:1", 1L); // streamTime = 1 update <[0-9], 1> - bids.pipeInput("luna:6", 2L); // streamTime = 6 update <[0-9], 3> - bids.pipeInput("luna:10", 3L); // streamTime = 10 update <[10-19], 3> - bids.pipeInput("luna:5", 4L); // streamTime = 10 update <[0-9], 7> - bids.pipeInput("luna:18", 5L); // streamTime = 18 update <[10-19], 8> - bids.pipeInput("luna:2", 6L); // streamTime = 18 no update (outside grace) - bids.pipeInput("luna:23", 7L); // time = 23 update <[20-29], 7> + bids.pipeInput("luna:1", 1L, 1L); // streamTime = 1 update <[0-9], 1> + bids.pipeInput("luna:6", 2L, 6L); // streamTime = 6 update <[0-9], 3> + bids.pipeInput("luna:10", 3L, 10L); // streamTime = 10 update <[10-19], 3> + bids.pipeInput("luna:5", 4L, 5L); // streamTime = 10 update <[0-9], 7> + bids.pipeInput("luna:18", 5L, 18L); // streamTime = 18 update <[10-19], 8> + bids.pipeInput("luna:2", 6L, 2L); // streamTime = 18 no update (outside grace) + bids.pipeInput("luna:23", 7L, 23L); // time = 23 update <[20-29], 7> // Then: final List outputs = output.readValuesToList(); From 297aede57f21f7a0c1e862e11e0c108eb235d7a5 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 9 Dec 2024 17:25:33 -0800 Subject: [PATCH 7/7] simplify --- ...esponsiveTopologyTestDriverWindowStoreTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverWindowStoreTest.java b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverWindowStoreTest.java index 5caaf0353..10f8c1bcb 100644 --- a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverWindowStoreTest.java +++ b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverWindowStoreTest.java @@ -58,13 +58,13 @@ public void shouldRunWindowStoreWithoutResponsiveConnection() { "output", new StringDeserializer(), new LongDeserializer()); // When: - bids.pipeInput("luna:1", 1L, 1L); // streamTime = 1 update <[0-9], 1> - bids.pipeInput("luna:6", 2L, 6L); // streamTime = 6 update <[0-9], 3> - bids.pipeInput("luna:10", 3L, 10L); // streamTime = 10 update <[10-19], 3> - bids.pipeInput("luna:5", 4L, 5L); // streamTime = 10 update <[0-9], 7> - bids.pipeInput("luna:18", 5L, 18L); // streamTime = 18 update <[10-19], 8> - bids.pipeInput("luna:2", 6L, 2L); // streamTime = 18 no update (outside grace) - bids.pipeInput("luna:23", 7L, 23L); // time = 23 update <[20-29], 7> + bids.pipeInput("luna:1", 1L); // streamTime = 1 update <[0-9], 1> + bids.pipeInput("luna:6", 2L); // streamTime = 6 update <[0-9], 3> + bids.pipeInput("luna:10", 3L); // streamTime = 10 update <[10-19], 3> + bids.pipeInput("luna:5", 4L); // streamTime = 10 update <[0-9], 7> + bids.pipeInput("luna:18", 5L); // streamTime = 18 update <[10-19], 8> + bids.pipeInput("luna:2", 6L); // streamTime = 18 no update (outside grace) + bids.pipeInput("luna:23", 7L); // time = 23 update <[20-29], 7> // Then: final List outputs = output.readValuesToList();