diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java index d0108d1ea..c908d7fba 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/ResponsiveKafkaStreams.java @@ -12,8 +12,6 @@ package dev.responsive.kafka.api; -import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG; -import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.METRICS_ENABLED_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ADDITIONAL_CONNECTION_STRING_PARAMS_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.MONGO_ENDPOINT_CONFIG; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java index 81e3ba163..00efebc0d 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java @@ -12,9 +12,9 @@ package dev.responsive.kafka.api.async.internals; +import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_FLUSH_INTERVAL_MS_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG; -import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry; import dev.responsive.kafka.api.async.AsyncProcessorSupplier; import dev.responsive.kafka.api.async.internals.contexts.AsyncUserProcessorContext; @@ -40,7 +40,6 @@ import java.util.Optional; import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; @@ -218,7 +217,7 @@ private void initFields( final long punctuationInterval = configs.getLong(ASYNC_FLUSH_INTERVAL_MS_CONFIG); final int maxEventsPerKey = configs.getInt(ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG); - this.asyncThreadPoolRegistration = getAsyncThreadPool(taskContext, streamThreadName); + this.asyncThreadPoolRegistration = getAsyncThreadPool(appConfigs, streamThreadName); asyncThreadPoolRegistration.registerAsyncProcessor(taskId, this::flushPendingEventsForCommit); asyncThreadPoolRegistration.threadPool().maybeInitThreadPoolMetrics(); @@ -756,17 +755,5 @@ private void verifyConnectedStateStores( } } - private static AsyncThreadPoolRegistration getAsyncThreadPool( - final ProcessingContext context, - final String streamThreadName - ) { - try { - final AsyncThreadPoolRegistry registry = loadAsyncThreadPoolRegistry(context.appConfigs()); - return registry.asyncThreadPoolForStreamThread(streamThreadName); - } catch (final Exception e) { - throw new ConfigException( - "Unable to locate async thread pool registry. 
Make sure to configure " - + ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, e); - } - } + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java index 9f31be5cf..860dcd547 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java @@ -182,7 +182,7 @@ public void scheduleForProcessing( = inFlight.computeIfAbsent(asyncProcessorId, k -> new ConcurrentHashMap<>()); for (final AsyncEvent event : events) { - log.info("Scheduled event {}", event.inputRecord()); + log.trace("Scheduled event {}", event.inputRecord()); try { queueSemaphore.acquire(); } catch (final InterruptedException e) { diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java index 9b8d5d3f6..926237fcf 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncUtils.java @@ -15,6 +15,7 @@ import static dev.responsive.kafka.api.async.internals.AsyncThreadPool.ASYNC_THREAD_NAME; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_ASYNC_THREAD_CONFIG; import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; +import static dev.responsive.kafka.internal.config.InternalSessionConfigs.loadAsyncThreadPoolRegistry; import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder; import dev.responsive.kafka.api.config.ResponsiveConfig; @@ -26,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.kafka.common.config.ConfigException; import 
org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.state.StoreBuilder; @@ -155,4 +157,18 @@ public static Optional configuredAsyncThreadPool( } } + public static AsyncThreadPoolRegistration getAsyncThreadPool( + final Map appConfigs, + final String streamThreadName + ) { + try { + final AsyncThreadPoolRegistry registry = loadAsyncThreadPoolRegistry(appConfigs); + return registry.asyncThreadPoolForStreamThread(streamThreadName); + } catch (final Exception e) { + throw new ConfigException( + "Unable to locate async thread pool registry. Make sure to configure " + + ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG, e); + } + } + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java index a3142576d..1c24d3643 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/Segmenter.java @@ -143,6 +143,10 @@ public Segmenter( ); } + public long retentionPeriodMs() { + return retentionPeriodMs; + } + public long segmentIntervalMs() { return segmentIntervalMs; } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java index 0acfde7b1..3ac86d74f 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriver.java @@ -12,9 +12,11 @@ package dev.responsive.kafka.api; +import static dev.responsive.kafka.api.async.internals.AsyncUtils.getAsyncThreadPool; +import static dev.responsive.kafka.api.config.ResponsiveConfig.ASYNC_THREAD_POOL_SIZE_CONFIG; 
import static dev.responsive.kafka.internal.stores.TTDRestoreListener.mockRestoreListener; -import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry; +import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistration; import dev.responsive.kafka.api.async.internals.AsyncUtils; import dev.responsive.kafka.api.config.ResponsiveConfig; import dev.responsive.kafka.internal.clients.TTDCassandraClient; @@ -26,20 +28,26 @@ import java.time.Duration; import java.time.Instant; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.Properties; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TTDUtils.TopologyTestDriverAccessor; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.test.TestRecord; -public class ResponsiveTopologyTestDriver extends TopologyTestDriver { +public class ResponsiveTopologyTestDriver extends TopologyTestDriverAccessor { public static final String RESPONSIVE_TTD_ORG = "Responsive"; public static final String RESPONSIVE_TTD_ENV = "TopologyTestDriver"; private final TTDCassandraClient client; + private final Optional asyncThreadPool; /** * Create a new test diver instance with default test properties. 
@@ -97,7 +105,7 @@ public ResponsiveTopologyTestDriver( ) { this( topology, - config, + baseProps(config), initialWallClockTime, new TTDCassandraClient( new TTDMockAdmin(baseProps(config), topology), @@ -115,9 +123,16 @@ public ResponsiveTopologyTestDriver( @Override public void advanceWallClockTime(final Duration advance) { client.advanceWallClockTime(advance); + client.flush(); super.advanceWallClockTime(advance); } + public void flush() { + asyncThreadPool.ifPresent(AsyncThreadPoolRegistration::flushAllAsyncEvents); + client.flush(); + super.advanceWallClockTime(Duration.ZERO); + } + private ResponsiveTopologyTestDriver( final Topology topology, final Properties config, @@ -130,23 +145,34 @@ private ResponsiveTopologyTestDriver( initialWallClockTime ); this.client = cassandraClient; + this.asyncThreadPool = getAsyncThreadPoolRegistration(super.props()); + } + + @Override + protected void pipeRecord( + final String topic, + final TestRecord record, + final Serializer keySerializer, + final Serializer valueSerializer, + final Instant time + ) { + super.pipeRecord(topic, record, keySerializer, valueSerializer, time); + flush(); } private static Properties testDriverProps( - final Properties userProps, + final Properties baseProps, final TopologyDescription topologyDescription, final TTDCassandraClient client ) { - final Properties props = baseProps(userProps); - final SessionClients sessionClients = new SessionClients( Optional.empty(), Optional.of(client), Optional.empty(), false, client.mockAdmin() ); - final var restoreListener = mockRestoreListener(props); + final var restoreListener = mockRestoreListener(baseProps); sessionClients.initialize(restoreListener.metrics(), restoreListener); final var metrics = new ResponsiveMetrics(new Metrics()); - final String appId = userProps.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); + final String appId = baseProps.getProperty(StreamsConfig.APPLICATION_ID_CONFIG); metrics.initializeTags( appId, appId + "-client", @@ 
-160,14 +186,14 @@ private static Properties testDriverProps( .withMetrics(metrics) .withTopologyDescription(topologyDescription); - AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(props), 1, metrics) + AsyncUtils.configuredAsyncThreadPool(ResponsiveConfig.responsiveConfig(baseProps), 1, metrics) .ifPresent(threadPoolRegistry -> { - threadPoolRegistry.startNewAsyncThreadPool("Test worker"); + threadPoolRegistry.startNewAsyncThreadPool(Thread.currentThread().getName()); sessionConfig.withAsyncThreadPoolRegistry(threadPoolRegistry); }); - props.putAll(sessionConfig.build()); - return props; + baseProps.putAll(sessionConfig.build()); + return baseProps; } @SuppressWarnings("deprecation") @@ -197,6 +223,20 @@ private static MockTime mockTime(final Instant initialWallClockTime) { return mockTime; } + private static Optional getAsyncThreadPoolRegistration( + final Properties props + ) { + final int asyncThreadPoolSize = Integer.parseInt(props.getOrDefault(ASYNC_THREAD_POOL_SIZE_CONFIG, 0).toString()); + if (asyncThreadPoolSize > 0) { + final Map configMap = new HashMap<>(); + // copy Properties into a Map, since the registry loader expects a Map rather than Properties + props.forEach((key, value) -> configMap.put(key.toString(), value)); + + return Optional.of(getAsyncThreadPool(configMap, Thread.currentThread().getName())); + } else { + return Optional.empty(); + } + } } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java index 57a46328b..aa027fc61 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/clients/TTDCassandraClient.java @@ -67,7 +67,6 @@ public long currentWallClockTimeMs() { } public void advanceWallClockTime(final Duration advance) { - flush(); time.sleep(advance.toMillis()); } diff --git 
a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java index 09571bc62..94fbf0128 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowTable.java @@ -46,7 +46,7 @@ public TTDWindowTable( ) { super(client); this.name = spec.tableName(); - this.stub = new WindowStoreStub(); + this.stub = new WindowStoreStub(partitioner.segmenter().retentionPeriodMs()); this.partitioner = partitioner; } @@ -207,17 +207,17 @@ protected RemoteWriteResult updateOffsetAndStreamTime( final long consumedOffset, final long streamTime ) { - return null; + return RemoteWriteResult.success(null); } @Override protected RemoteWriteResult createSegment(final SegmentPartition partition) { - return null; + return RemoteWriteResult.success(null); } @Override protected RemoteWriteResult deleteSegment(final SegmentPartition partition) { - return null; + return RemoteWriteResult.success(null); } } } diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java index b2abaa74e..dd2a1303c 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java @@ -28,9 +28,8 @@ public class WindowStoreStub { private final long retentionPeriod; private long observedStreamTime = 0L; - public WindowStoreStub() { - // TODO: how can we pass the actual retention period through to the store stub? 
- this.retentionPeriod = 15L; + public WindowStoreStub(final long retentionPeriod) { + this.retentionPeriod = retentionPeriod; } public void put(final WindowedKey key, final byte[] value) { @@ -48,7 +47,7 @@ public byte[] fetch( final long windowStart ) { final WindowedKey windowedKey = new WindowedKey(key, windowStart); - if (windowStart < minValidTimestamp() && records.containsKey(windowedKey)) { + if (windowStart > minValidTimestamp() && records.containsKey(windowedKey)) { return records.get(windowedKey); } else { return null; diff --git a/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java b/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java index 51136525b..ab42a6333 100644 --- a/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java +++ b/responsive-test-utils/src/main/java/org/apache/kafka/streams/TTDUtils.java @@ -14,9 +14,13 @@ import static org.apache.kafka.streams.processor.internals.ProcessorStateManager.storeChangelogTopic; +import java.time.Instant; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.test.TestRecord; /** * A utility class that lives in the o.a.k.streams package so we can access @@ -47,4 +51,32 @@ public static Set extractChangelogTopics(final Topology topology) { .flatMap(t -> t.stateChangelogTopics.keySet().stream()) .collect(Collectors.toSet()); } + + public static class TopologyTestDriverAccessor extends TopologyTestDriver { + + private final Properties props; + + public TopologyTestDriverAccessor( + final Topology topology, + final Properties config, + final Instant initialWallClockTime + ) { + super(topology, config, initialWallClockTime); + this.props = config; + } + + public Properties props() { + return props; + } + + @Override + protected void pipeRecord(final String topic, + final TestRecord record, + final 
Serializer keySerializer, + final Serializer valueSerializer, + final Instant time) { + super.pipeRecord(topic, record, keySerializer, valueSerializer, time); + } + + } } diff --git a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java index c5c918e31..3f49a91ca 100644 --- a/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java +++ b/responsive-test-utils/src/test/java/dev/responsive/kafka/api/ResponsiveTopologyTestDriverKeyValueStoreTest.java @@ -66,7 +66,7 @@ private static ResponsiveKeyValueParams paramsForType(final KVSchema type) { public void shouldRunWithoutResponsiveConnectionAndNoTtl(final KVSchema type) { // Given: final Topology topology = topology(paramsForType(type)); - try (final TopologyTestDriver driver = setupDriver(topology)) { + try (final ResponsiveTopologyTestDriver driver = setupDriver(topology)) { final TestInputTopic bids = driver.createInputTopic( "bids", new StringSerializer(), new StringSerializer()); @@ -206,7 +206,7 @@ public void shouldEnforceKeyBasedTtlByAdvancingWallclockTime(final KVSchema type // Then: final List outputs = output.readValuesToList(); - MatcherAssert.assertThat(outputs, Matchers.contains( + MatcherAssert.assertThat(outputs, Matchers.containsInAnyOrder( "a,100,1,1,alice,CA", "d,103,3,3,carol,CA", "e,104,1,1,alex,CA", @@ -257,7 +257,7 @@ public void shouldDeduplicateWithTtlProviderToExpireOldRecords(final KVSchema ty assertNull(transactionIdStore.get(key), "should have no txn id in state store"); // send the same event in again, outside the dedupe window - inputTopic.pipeInput(key, value); + inputTopic.pipeInput(key, value, ttlMs + 1); assertNotNull( transactionIdStore.get(key), "should have a single txn id in state store, again"); } @@ -290,20 +290,21 @@ private Topology topology(final 
ResponsiveKeyValueParams storeParams) { .join(people, (bid, person) -> bid + "," + person) // state is the 6th column .filter((k, v) -> v.split(",")[5].equals("CA")) - .processValues(createAsyncProcessorSupplier(() -> new FixedKeyProcessor() { + .processValues(createAsyncProcessorSupplier( + () -> new FixedKeyProcessor() { - private FixedKeyProcessorContext context; + private FixedKeyProcessorContext context; - @Override - public void init(final FixedKeyProcessorContext context) { - this.context = context; - } + @Override + public void init(final FixedKeyProcessorContext context) { + this.context = context; + } - @Override - public void process(final FixedKeyRecord fixedKeyRecord) { - context.forward(fixedKeyRecord); - } - })) + @Override + public void process(final FixedKeyRecord fixedKeyRecord) { + context.forward(fixedKeyRecord); + } + })) .to("output"); return builder.build();