diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java index 96cd6c6aa..678b437a8 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/api/config/ResponsiveConfig.java @@ -180,12 +180,13 @@ public class ResponsiveConfig extends AbstractConfig { // ------------------ WindowStore configurations ---------------------- public static final String MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_CONFIG = "responsive.mongo.windowed.key.timestamp.first"; - private static final boolean MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_DEFAULT = false; + private static final boolean MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_DEFAULT = true; private static final String MONGO_WINDOWED_KEY_TIMESTAMP_FIRST_DOC = "Whether to put the window start timestamp " + "first in the composite windowed key format for MongoDB. This can be toggled true/false to get better " + "performance depending on the density of unique keys per window, and should be experimented " - + "with for best results. However it is important to note that this cannot be changed for " - + "an active application. Messing with this can corrupt existing state!"; + + "with for best results. Must be true for any application that uses range queries on window stores. 
" + + "It is also important to note that this cannot be changed for an active application -- flipping " + + "this can corrupt existing state."; public static final String WINDOW_BLOOM_FILTER_COUNT_CONFIG = "responsive.window.bloom.filter.count"; private static final int WINDOW_BLOOM_FILTER_COUNT_DEFAULT = 0; diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java index f95ff05b1..00dd8bcbf 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java @@ -935,6 +935,22 @@ public KeyValueIterator backFetchAll( ); } + @Override + public KeyValueIterator all( + final int kafkaPartition, + final long streamTime + ) { + throw new UnsupportedOperationException("all is not yet supported for Cassandra backends"); + } + + @Override + public KeyValueIterator backAll( + final int kafkaPartition, + final long streamTime + ) { + throw new UnsupportedOperationException("backAll is not yet supported for Cassandra backends"); + } + private static KeyValue windowRows(final Row row) { final long startTs = row.getInstant(WINDOW_START.column()).toEpochMilli(); final Bytes key = Bytes.wrap(row.getByteBuffer(DATA_KEY.column()).array()); diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteWindowedTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteWindowedTable.java index cbe3e8751..337a28e8b 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteWindowedTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/RemoteWindowedTable.java @@ -18,6 +18,7 @@ import dev.responsive.kafka.internal.utils.WindowedKey; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; import 
org.apache.kafka.streams.state.KeyValueIterator; public interface RemoteWindowedTable extends RemoteTable { @@ -157,4 +158,22 @@ KeyValueIterator backFetchAll( long timeFrom, long timeTo ); + + /** + * Retrieves the windows of the given {@code kafkaPartition} across all keys and timestamps. + * + * @param kafkaPartition the kafka partition + * @param streamTime the observed stream time, used to select which segments are scanned + * @return a forwards iterator over all the windows and values previously set + */ + KeyValueIterator all(int kafkaPartition, long streamTime); + + /** + * Retrieves the windows of the given {@code kafkaPartition} across all keys and timestamps. + * + * @param kafkaPartition the kafka partition + * @param streamTime the observed stream time, used to select which segments are scanned + * @return a backwards iterator over all the windows and values previously set + */ + KeyValueIterator backAll(int kafkaPartition, long streamTime); } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTable.java index 62997caeb..c4da3a215 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTable.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/mongo/MongoWindowedTable.java @@ -21,6 +21,7 @@ import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry; import static dev.responsive.kafka.internal.db.partitioning.SegmentPartitioner.UNINITIALIZED_STREAM_TIME; import static dev.responsive.kafka.internal.stores.ResponsiveStoreRegistration.NO_COMMITTED_OFFSET; +import static dev.responsive.kafka.internal.stores.SegmentedOperations.MIN_KEY; import static org.bson.codecs.configuration.CodecRegistries.fromProviders; import static org.bson.codecs.configuration.CodecRegistries.fromRegistries; @@ -52,6 +53,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.TaskMigratedException; +import org.apache.kafka.streams.kstream.Windowed; import 
org.apache.kafka.streams.state.KeyValueIterator; import org.bson.codecs.configuration.CodecProvider; import org.bson.codecs.configuration.CodecRegistry; @@ -457,13 +459,37 @@ public KeyValueIterator backFetch( @Override public KeyValueIterator fetchRange( final int kafkaPartition, - final Bytes fromKey, - final Bytes toKey, + final Bytes keyFrom, + final Bytes keyTo, final long timeFrom, final long timeTo ) { - throw new UnsupportedOperationException("fetchRange not yet supported for Mongo backends"); + final List> segmentIterators = new LinkedList<>(); + final var partitionSegments = kafkaPartitionToSegments.get(kafkaPartition); + + for (final var segment : partitioner.range(kafkaPartition, timeFrom, timeTo)) { + final var segmentWindows = partitionSegments.segmentWindows.get(segment); + + // Since we use a flat keyspace by concatenating the timestamp with the data key and have + // variable length data keys, it's impossible to request only valid data that's within + // the given bounds. 
Instead issue a broad request from the valid bounds and then post-filter + final var lowerBound = compositeKey(keyFrom, timeFrom); + final var upperBound = compositeKey(keyTo, timeTo); + final FindIterable fetchResults = segmentWindows.find( + Filters.and( + Filters.gte(WindowDoc.ID, lowerBound), + Filters.lte(WindowDoc.ID, upperBound)) + ); + + + segmentIterators.add( + Iterators.filterKv( + Iterators.kv(fetchResults.iterator(), MongoWindowedTable::windowFromDoc), + kv -> filterFetchRange(kv, timeFrom, timeTo, keyFrom, keyTo, timestampFirstOrder)) + ); + } + return Iterators.wrapped(segmentIterators); } @Override @@ -483,7 +509,35 @@ public KeyValueIterator fetchAll( final long timeFrom, final long timeTo ) { - throw new UnsupportedOperationException("fetchAll not yet supported for Mongo backends"); + if (!timestampFirstOrder) { + throw new UnsupportedOperationException("Range queries such as fetchAll require stores to be " + + "configured with timestamp-first order"); + } + + final List> segmentIterators = new LinkedList<>(); + final var partitionSegments = kafkaPartitionToSegments.get(kafkaPartition); + + for (final var segment : partitioner.range(kafkaPartition, timeFrom, timeTo)) { + final var segmentWindows = partitionSegments.segmentWindows.get(segment); + + // To avoid scanning the entire segment, we use the bytewise "minimum key" to start the scan + // at the lower time bound. 
Since there's no corresponding "maximum key" given the variable + // length keys, we have to set the upper bound at timeTo + 1, while using strict comparison + // (ie #lt rather than #lte) to exclude said upper bound + final var lowerBound = compositeKey(MIN_KEY, timeFrom); + final var upperBound = compositeKey(MIN_KEY, timeTo + 1); + + final FindIterable fetchResults = segmentWindows.find( + Filters.and( + Filters.gte(WindowDoc.ID, lowerBound), + Filters.lt(WindowDoc.ID, upperBound)) + ); + + segmentIterators.add( + Iterators.kv(fetchResults.iterator(), MongoWindowedTable::windowFromDoc) + ); + } + return Iterators.wrapped(segmentIterators); } @Override @@ -495,6 +549,34 @@ public KeyValueIterator backFetchAll( throw new UnsupportedOperationException("backFetchAll not yet supported for MongoDB backends"); } + @Override + public KeyValueIterator all( + final int kafkaPartition, + final long streamTime + ) { + final List> segmentIterators = new LinkedList<>(); + final var partitionSegments = kafkaPartitionToSegments.get(kafkaPartition); + + for (final var segment : partitioner.activeSegments(kafkaPartition, streamTime)) { + final var segmentWindows = partitionSegments.segmentWindows.get(segment); + + final FindIterable fetchResults = segmentWindows.find(); + + segmentIterators.add( + Iterators.kv(fetchResults.iterator(), MongoWindowedTable::windowFromDoc) + ); + } + return Iterators.wrapped(segmentIterators); + } + + @Override + public KeyValueIterator backAll( + final int kafkaPartition, + final long streamTime + ) { + throw new UnsupportedOperationException("backAll not yet supported for MongoDB backends"); + } + public BasicDBObject compositeKey(final WindowedKey windowedKey) { return compositeKey(windowedKey.key, windowedKey.windowStartMs); } @@ -511,5 +593,22 @@ private static KeyValue windowFromDoc(final WindowDoc windo return new KeyValue<>(WindowDoc.windowedKey(windowDoc.id), windowDoc.value); } + private static boolean filterFetchRange( + final WindowedKey 
windowedKey, + final long timeFrom, + final long timeTo, + final Bytes keyFrom, + final Bytes keyTo, + final boolean timestampFirstOrder + ) { + // With timestamp-first order the query bounds already guarantee valid timestamps, so only the + // keys need filtering (and vice versa); both ends are inclusive, matching the gte/lte query + if (timestampFirstOrder) { + return windowedKey.key.compareTo(keyFrom) >= 0 && windowedKey.key.compareTo(keyTo) <= 0; + } else { + return windowedKey.windowStartMs >= timeFrom && windowedKey.windowStartMs <= timeTo; + } + } + } diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java index af65bbd22..5a8fa5467 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/CommitBuffer.java @@ -342,6 +342,20 @@ public KeyValueIterator> range(final K from, final K to) { ); } + public KeyValueIterator> range( + final K from, + final K to, + final boolean fromInclusive, + final boolean toInclusive + ) { + return Iterators.kv( + Iterators.filter( + buffer.getReader().subMap(from, fromInclusive, to, toInclusive).entrySet().iterator(), + e -> keySpec.retain(e.getKey())), + result -> new KeyValue<>(result.getKey(), result.getValue()) + ); + } + public KeyValueIterator> range( final K from, final K to, diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java index 58833b41c..21295ad0a 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/ResponsiveWindowStore.java @@ -270,7 +270,7 @@ public KeyValueIterator, byte[]> fetchAll( @Override public KeyValueIterator, byte[]> all() { - 
return windowOperations.all(); + return windowOperations.all(observedStreamTime); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java index 282f6a01d..777bf7e54 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/SegmentedOperations.java @@ -55,6 +55,9 @@ public class SegmentedOperations implements WindowOperations { + // The "minimum" possible key when comparing bytewise, used to define range query bounds + public static final Bytes MIN_KEY = Bytes.wrap(new byte[0]); + @SuppressWarnings("rawtypes") private final InternalProcessorContext context; private final ResponsiveWindowParams params; @@ -267,12 +270,26 @@ public KeyValueIterator, byte[]> fetchAll( final long timeFrom, final long timeTo ) { - throw new UnsupportedOperationException("Not yet implemented"); + final WindowedKey lowerBound = new WindowedKey(MIN_KEY, timeFrom); + final WindowedKey upperBound = new WindowedKey(MIN_KEY, timeTo + 1); + + // set toInclusive bound to false due to the +1 in the upper bound + return Iterators.windowedKey( + new LocalRemoteKvIterator<>( + buffer.range(lowerBound, upperBound, true, false), + table.fetchAll(changelog.partition(), timeFrom, timeTo)), + params.windowSize() + ); } @Override - public KeyValueIterator, byte[]> all() { - throw new UnsupportedOperationException("Not yet implemented"); + public KeyValueIterator, byte[]> all(final long streamTime) { + return Iterators.windowedKey( + new LocalRemoteKvIterator<>( + buffer.all(), + table.all(changelog.partition(), streamTime)), + params.windowSize() + ); } @Override diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/WindowOperations.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/WindowOperations.java 
index bb96a12b9..9a06f2b8c 100644 --- a/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/WindowOperations.java +++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/stores/WindowOperations.java @@ -53,7 +53,7 @@ KeyValueIterator, byte[]> fetchAll( final long timeTo ); - KeyValueIterator, byte[]> all(); + KeyValueIterator, byte[]> all(final long streamTime); WindowStoreIterator backwardFetch( final Bytes key, diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/StoreQueryIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/StoreQueryIntegrationTest.java index 29afb4ac0..807f300b8 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/StoreQueryIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/StoreQueryIntegrationTest.java @@ -17,6 +17,7 @@ package dev.responsive.kafka.integration; import static dev.responsive.kafka.api.config.ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG; +import static dev.responsive.kafka.api.stores.ResponsiveWindowParams.window; import static dev.responsive.kafka.testutils.IntegrationTestUtils.createTopicsAndWait; import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeRecords; import static dev.responsive.kafka.testutils.IntegrationTestUtils.readOutput; @@ -58,6 +59,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; @@ -65,6 +67,7 @@ import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.WindowStore; import 
org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -74,7 +77,7 @@ public class StoreQueryIntegrationTest { @RegisterExtension - static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.CASSANDRA); + static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.MONGO_DB); private static final String INPUT_TOPIC = "input"; private static final String OUTPUT_TOPIC = "output"; @@ -178,6 +181,31 @@ public void shouldAggregateAllCapitalLettersUsingRangeQuery() throws Exception { } } + @Test + public void shouldAggregateAllWindowsInRangeQuery() throws Exception { + // Given: + final Map properties = getMutableProperties(); + final KafkaProducer producer = new KafkaProducer<>(properties); + try (final ResponsiveKafkaStreams streams = buildWindowStreams(properties, true)) { + startAppAndAwaitRunning(Duration.ofSeconds(10), streams); + + final List> records = Arrays.asList( + new KeyValueTimestamp<>("A", "A", 0L), + new KeyValueTimestamp<>("b", "b", 1L), + new KeyValueTimestamp<>("C", "C", 2L), + new KeyValueTimestamp<>("d", "d", 3L), + new KeyValueTimestamp<>("E", "E", 4L) + ); + + // When: + pipeRecords(producer, inputTopic(), records); + + // Then: + final var kvs = readOutput(outputTopic(), 0, 5, true, properties); + // TODO + } + } + private ResponsiveKafkaStreams buildKVStreams( final Map properties, final boolean range @@ -188,11 +216,34 @@ private ResponsiveKafkaStreams buildKVStreams( final StoreBuilder> storeBuilder = ResponsiveStores.keyValueStoreBuilder( - ResponsiveStores.keyValueStore(ResponsiveKeyValueParams.keyValue(kvStoreName())), + ResponsiveStores.keyValueStore(ResponsiveKeyValueParams.keyValue(storeName())), Serdes.String(), Serdes.String()); input - .processValues(new TransformerSupplier(range, storeBuilder), kvStoreName()) + .processValues(new TransformerSupplier(false, range, storeBuilder), storeName()) + .to(outputTopic()); + + return new 
ResponsiveKafkaStreams(builder.build(), properties); + } + + private ResponsiveKafkaStreams buildWindowStreams( + final Map properties, + final boolean range + ) { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream input = builder.stream(inputTopic()); + + final Duration windowSize = Duration.ofMillis(10L); + final Duration gracePeriod = Duration.ofMillis(100L); + + final StoreBuilder> storeBuilder = + ResponsiveStores.windowStoreBuilder( + ResponsiveStores.windowStoreSupplier(window(storeName(), windowSize, gracePeriod)), + Serdes.String(), + Serdes.String()); + input + .processValues(new TransformerSupplier(true, range, storeBuilder), storeName()) .to(outputTopic()); return new ResponsiveKafkaStreams(builder.build(), properties); @@ -200,17 +251,27 @@ private ResponsiveKafkaStreams buildKVStreams( private class TransformerSupplier implements FixedKeyProcessorSupplier { - private final StoreBuilder storeBuilder; + private final boolean windowed; private final boolean rangeQuery; + private final StoreBuilder storeBuilder; - public TransformerSupplier(final boolean rangeQuery, final StoreBuilder storeBuilder) { + public TransformerSupplier( + final boolean windowed, + final boolean rangeQuery, + final StoreBuilder storeBuilder + ) { + this.windowed = windowed; this.rangeQuery = rangeQuery; this.storeBuilder = storeBuilder; } @Override public FixedKeyProcessor get() { - return new CountingProcessor(rangeQuery); + if (windowed) { + return new CrossKeyWindowAggregator(rangeQuery); + } else { + return new CrossKeyKVAggregator(rangeQuery); + } } @Override @@ -222,11 +283,56 @@ public Set> stores() { } } - private class CountingProcessor implements FixedKeyProcessor { + private class CrossKeyWindowAggregator implements FixedKeyProcessor { + + private final boolean rangeQuery; + + public CrossKeyWindowAggregator(final boolean rangeQuery) { + this.rangeQuery = rangeQuery; + } + + private WindowStore windowStore; + private FixedKeyProcessorContext 
context; + + @Override + public void init(final FixedKeyProcessorContext context) { + FixedKeyProcessor.super.init(context); + this.windowStore = context.getStateStore(storeName()); + this.context = context; + } + + @Override + public void process(final FixedKeyRecord record) { + final StringBuilder builder = new StringBuilder(); + + KeyValueIterator, String> iterator = null; + try { + if (rangeQuery) { + iterator = windowStore.fetch("A", "Z", 0L, 5L); + } else { + iterator = windowStore.all(); + } + while (iterator.hasNext()) { + builder.append(iterator.next().value); + } + builder.append(record.value()); + } finally { + // the query may throw before the iterator is assigned; don't mask that with an NPE + if (iterator != null) { + iterator.close(); + } + } + + windowStore.put(record.key(), record.value(), record.timestamp()); + context.forward(record.withValue(builder.toString())); + } + } + + private class CrossKeyKVAggregator implements FixedKeyProcessor { private final boolean rangeQuery; - public CountingProcessor(final boolean rangeQuery) { + public CrossKeyKVAggregator(final boolean rangeQuery) { this.rangeQuery = rangeQuery; } @@ -236,7 +342,7 @@ public CountingProcessor(final boolean rangeQuery) { @Override public void init(final FixedKeyProcessorContext context) { FixedKeyProcessor.super.init(context); - this.kvStore = context.getStateStore(kvStoreName()); + this.kvStore = context.getStateStore(storeName()); this.context = context; } @@ -267,8 +373,8 @@ public void process(final FixedKeyRecord record) { } } - private String kvStoreName() { - return name + "-kv-store"; + private String storeName() { + return name + "-store"; } private Map getMutableProperties() { diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowedTable.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowedTable.java index 430c1ff9e..c7f25620b 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowedTable.java +++ 
b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/db/TTDWindowedTable.java @@ -148,6 +148,22 @@ public KeyValueIterator backFetchAll( return stub.backFetchAll(timeFrom, timeTo); } + @Override + public KeyValueIterator all( + final int kafkaPartition, + final long streamTime + ) { + return stub.all(); + } + + @Override + public KeyValueIterator backAll( + final int kafkaPartition, + final long streamTime + ) { + return stub.backAll(); + } + @Override public long count() { return 0; diff --git a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java index 8dd1fd754..4556b3de8 100644 --- a/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java +++ b/responsive-test-utils/src/main/java/dev/responsive/kafka/internal/stores/WindowStoreStub.java @@ -107,6 +107,14 @@ public KeyValueIterator backFetchAll( throw new UnsupportedOperationException("Not yet implemented."); } + public KeyValueIterator all() { + throw new UnsupportedOperationException("Not yet implemented."); + } + + public KeyValueIterator backAll() { + throw new UnsupportedOperationException("Not yet implemented."); + } + private long minValidTimestamp() { return observedStreamTime - retentionPeriod + 1; }