From 650c103b5b686d17d1f641ea5ff52f2b490a867f Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Tue, 18 Jul 2023 20:07:05 -0700 Subject: [PATCH] POC --- .../apache/kafka/streams/TopologyConfig.java | 6 ++ .../kafka/streams/kstream/Materialized.java | 4 +- .../internals/KeyValueStoreMaterializer.java | 23 +++++--- .../internals/MaterializedInternal.java | 6 ++ .../SessionWindowedCogroupedKStreamImpl.java | 36 +++++++----- .../internals/SessionWindowedKStreamImpl.java | 43 ++++++++------ .../SlidingWindowedCogroupedKStreamImpl.java | 47 ++++++++------- .../internals/SlidingWindowedKStreamImpl.java | 55 ++++++++++-------- .../TimeWindowedCogroupedKStreamImpl.java | 47 ++++++++------- .../internals/TimeWindowedKStreamImpl.java | 57 +++++++++++-------- .../kafka/streams/state/DSLStoreProvider.java | 25 ++++++++ .../state/internals/RocksDBStoreProvider.java | 26 +++++++++ 12 files changed, 245 insertions(+), 130 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/DSLStoreProvider.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStoreProvider.java diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index 61c1b0bf7a14f..25057e0c7fe56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; +import org.apache.kafka.streams.state.DSLStoreProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; @@ -216,6 +217,11 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo } } + public DSLStoreProvider storeProvider() { + // TODO(KIP-954): get this from 
StreamsConfig (or TopologyConfig overrides) + return null; + } + public Materialized.StoreType parseStoreType() { if (storeType.equals(IN_MEMORY)) { return Materialized.StoreType.IN_MEMORY; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java index f63b6b9773df6..672c3e304e8ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.DSLStoreProvider; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; @@ -65,6 +66,7 @@ public class Materialized { protected Map topicConfig = new HashMap<>(); protected Duration retention; public StoreType storeType; + public DSLStoreProvider storeProvider; // the built-in state store types public enum StoreType { @@ -107,7 +109,7 @@ protected Materialized(final Materialized materialized) { * @param key type of the store * @param value type of the store * @param type of the {@link StateStore} - * @return a new {@link Materialized} instance with the given storeName + * @return a new {@link Materialized} instance with the given storeType */ public static Materialized as(final StoreType storeType) { Objects.requireNonNull(storeType, "store type can't be null"); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java index 468846c7dee97..15f1ffc396f9a 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import java.time.Duration; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; @@ -46,15 +47,19 @@ public KeyValueStoreMaterializer(final MaterializedInternal materialize() { KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier(); if (supplier == null) { - switch (materialized.storeType()) { - case IN_MEMORY: - supplier = Stores.inMemoryKeyValueStore(materialized.storeName()); - break; - case ROCKS_DB: - supplier = Stores.persistentTimestampedKeyValueStore(materialized.storeName()); - break; - default: - throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + if (materialized.storeProvider() != null) { + supplier = materialized.storeProvider().timestampedKeyValueStore(materialized.storeName()); + } else { + switch (materialized.storeType()) { + case IN_MEMORY: + supplier = Stores.inMemoryKeyValueStore(materialized.storeName()); + break; + case ROCKS_DB: + supplier = Stores.persistentTimestampedKeyValueStore(materialized.storeName()); + break; + default: + throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java index e81934716b299..25a00f6d190d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -20,6 +20,7 @@ import 
org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.TopologyConfig; +import org.apache.kafka.streams.state.DSLStoreProvider; import org.apache.kafka.streams.state.StoreSupplier; import java.time.Duration; @@ -48,6 +49,7 @@ public MaterializedInternal(final Materialized materialized, // if store type is not configured during creating Materialized, then try to get the topologyConfigs from nameProvider // otherwise, set to default rocksDB if (storeType == null) { + // TODO(KIP-954): get DSLStoreProvider from TopologyConfigs here, if set storeType = StoreType.ROCKS_DB; if (nameProvider instanceof InternalStreamsBuilder) { final TopologyConfig topologyConfig = ((InternalStreamsBuilder) nameProvider).internalTopologyBuilder.topologyConfigs(); @@ -73,6 +75,10 @@ public StoreType storeType() { return storeType; } + public DSLStoreProvider storeProvider() { + return storeProvider; + } + public StoreSupplier storeSupplier() { return storeSupplier; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java index 4279224fa8127..728ae0a7c5489 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java @@ -122,21 +122,27 @@ private StoreBuilder> materialize(final MaterializedInternal + " retention=[" + retentionPeriod + "]"); } - switch (materialized.storeType()) { - case IN_MEMORY: - supplier = Stores.inMemorySessionStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod) - ); - break; - case ROCKS_DB: - supplier = Stores.persistentSessionStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod) - ); - break; - default: - throw new 
IllegalStateException("Unknown store type: " + materialized.storeType()); + if (materialized.storeProvider() != null) { + supplier = materialized.storeProvider().sessionStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod)); + } else { + switch (materialized.storeType()) { + case IN_MEMORY: + supplier = Stores.inMemorySessionStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod) + ); + break; + case ROCKS_DB: + supplier = Stores.persistentSessionStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod) + ); + break; + default: + throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index ef00325c58f30..b7ebe7c691962 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -253,27 +253,34 @@ private StoreBuilder> materialize(final MaterializedInt + " grace=[" + windows.gracePeriodMs() + "]," + " retention=[" + retentionPeriod + "]"); } - - switch (materialized.storeType()) { - case IN_MEMORY: - supplier = Stores.inMemorySessionStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod) - ); - break; - case ROCKS_DB: - supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ? 
- new RocksDbTimeOrderedSessionBytesStoreSupplier( - materialized.storeName(), - retentionPeriod, - true) : - Stores.persistentSessionStore( + if (materialized.storeProvider() != null) { + supplier = materialized.storeProvider().sessionStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod)); + } else { + switch (materialized.storeType()) { + case IN_MEMORY: + supplier = Stores.inMemorySessionStore( materialized.storeName(), Duration.ofMillis(retentionPeriod) ); - break; - default: - throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + break; + case ROCKS_DB: + // TODO(KIP-954): should the DSLStoreProvider include an API for this or is + // it a specific optimization that only applies to rocksdb? + supplier = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE ? + new RocksDbTimeOrderedSessionBytesStoreSupplier( + materialized.storeName(), + retentionPeriod, + true) : + Stores.persistentSessionStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod) + ); + break; + default: + throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java index 383fed70a690a..3e4b2cc4d760b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java @@ -113,26 +113,33 @@ private StoreBuilder> materialize(final Materialize + " retention=[" + retentionPeriod + "]"); } - - switch (materialized.storeType()) { - case IN_MEMORY: - supplier = Stores.inMemoryWindowStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod), - Duration.ofMillis(windows.timeDifferenceMs()), - false - ); 
- break; - case ROCKS_DB: - supplier = Stores.persistentTimestampedWindowStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod), - Duration.ofMillis(windows.timeDifferenceMs()), - false - ); - break; - default: - throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + if (materialized.storeProvider() != null) { + supplier = materialized.storeProvider().timestampedWindowStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.timeDifferenceMs()), + false); + } else { + switch (materialized.storeType()) { + case IN_MEMORY: + supplier = Stores.inMemoryWindowStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.timeDifferenceMs()), + false + ); + break; + case ROCKS_DB: + supplier = Stores.persistentTimestampedWindowStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.timeDifferenceMs()), + false + ); + break; + default: + throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java index e4c9b823c0d4d..4231eaec54478 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java @@ -219,34 +219,43 @@ private StoreBuilder> materialize(final Mater + " grace=[" + windows.gracePeriodMs() + "]," + " retention=[" + retentionPeriod + "]"); } - - switch (materialized.storeType()) { - case IN_MEMORY: - supplier = Stores.inMemoryWindowStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod), - Duration.ofMillis(windows.timeDifferenceMs()), - false - ); - break; - case ROCKS_DB: - supplier = 
emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ? - RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create( - materialized.storeName(), - Duration.ofMillis(retentionPeriod), - Duration.ofMillis(windows.timeDifferenceMs()), - false, - true - ) : - Stores.persistentTimestampedWindowStore( + if (materialized.storeProvider() != null) { + supplier = materialized.storeProvider().timestampedWindowStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.timeDifferenceMs()), + false); + } else { + switch (materialized.storeType()) { + case IN_MEMORY: + supplier = Stores.inMemoryWindowStore( materialized.storeName(), Duration.ofMillis(retentionPeriod), Duration.ofMillis(windows.timeDifferenceMs()), false ); - break; - default: - throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + break; + case ROCKS_DB: + // TODO(KIP-954): should the DSLStoreProvider include an API for this or is + // it a specific optimization that only applies to rocksdb? + supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ? 
+ RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.timeDifferenceMs()), + false, + true + ) : + Stores.persistentTimestampedWindowStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.timeDifferenceMs()), + false + ); + break; + default: + throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java index 07b75bd1454f3..84c4e7c6383ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java @@ -119,26 +119,33 @@ private StoreBuilder> materialize( + " retention=[" + retentionPeriod + "]"); } - - switch (materialized.storeType()) { - case IN_MEMORY: - supplier = Stores.inMemoryWindowStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod), - Duration.ofMillis(windows.size()), - false - ); - break; - case ROCKS_DB: - supplier = Stores.persistentTimestampedWindowStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod), - Duration.ofMillis(windows.size()), - false - ); - break; - default: - throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + if (materialized.storeProvider() != null) { + supplier = materialized.storeProvider().timestampedWindowStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.size()), + false); + } else { + switch (materialized.storeType()) { + case IN_MEMORY: + supplier = Stores.inMemoryWindowStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + 
Duration.ofMillis(windows.size()), + false + ); + break; + case ROCKS_DB: + supplier = Stores.persistentTimestampedWindowStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.size()), + false + ); + break; + default: + throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index a9818a604efa6..bc472cc1bb1b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -250,34 +250,43 @@ private StoreBuilder> materialize(final Mater + " grace=[" + windows.gracePeriodMs() + "]," + " retention=[" + retentionPeriod + "]"); } - - switch (materialized.storeType()) { - case IN_MEMORY: - supplier = Stores.inMemoryWindowStore( - materialized.storeName(), - Duration.ofMillis(retentionPeriod), - Duration.ofMillis(windows.size()), - false - ); - break; - case ROCKS_DB: - supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ? 
- RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create( - materialized.storeName(), - Duration.ofMillis(retentionPeriod), - Duration.ofMillis(windows.size()), - false, - false - ) : - Stores.persistentTimestampedWindowStore( + if (materialized.storeProvider() != null) { + supplier = materialized.storeProvider().timestampedWindowStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.size()), + false); + } else { + switch (materialized.storeType()) { + case IN_MEMORY: + supplier = Stores.inMemoryWindowStore( materialized.storeName(), Duration.ofMillis(retentionPeriod), Duration.ofMillis(windows.size()), false - ); - break; - default: - throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + ); + break; + case ROCKS_DB: + // TODO(KIP-954): should the DSLStoreProvider include an API for this or is + // it a specific optimization that only applies to rocksdb? + supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ? 
+ RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.size()), + false, + false + ) : + Stores.persistentTimestampedWindowStore( + materialized.storeName(), + Duration.ofMillis(retentionPeriod), + Duration.ofMillis(windows.size()), + false + ); + break; + default: + throw new IllegalStateException("Unknown store type: " + materialized.storeType()); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DSLStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/DSLStoreProvider.java new file mode 100644 index 0000000000000..469d26d0e0241 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/DSLStoreProvider.java @@ -0,0 +1,25 @@ +package org.apache.kafka.streams.state; + +import java.time.Duration; + /** Source of the bytes-store suppliers for key-value, window and session stores. Only the three plain variants are abstract; each timestamped variant has a default that delegates to its plain counterpart. */ +public interface DSLStoreProvider { + /** Returns the supplier for the key-value store with the given name. */ + KeyValueBytesStoreSupplier keyValueStore(final String name); + /** Timestamped key-value variant; the default simply returns {@code keyValueStore(name)}, so override it when the timestamped store needs a different supplier. */ + default KeyValueBytesStoreSupplier timestampedKeyValueStore(final String name) { + return keyValueStore(name); + } + /** Returns the supplier for a window store with the given name, retention period and window size; this variant has no {@code retainDuplicates} parameter. */ + WindowBytesStoreSupplier windowStore(final String name, final Duration retentionPeriod, final Duration windowSize); + /** Timestamped window variant; the default returns {@code windowStore(name, retentionPeriod, windowSize)} and does not forward {@code retainDuplicates}, so override it if that flag must be honored. */ + default WindowBytesStoreSupplier timestampedWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) { + return windowStore(name, retentionPeriod, windowSize); + } + /** Returns the supplier for a session store with the given name and retention period. */ + SessionBytesStoreSupplier sessionStore(final String name, final Duration retentionPeriod); + /** Timestamped session variant; the default returns {@code sessionStore(name, retentionPeriod)}. */ + default SessionBytesStoreSupplier timestampedSessionStore(final String name, final Duration retentionPeriod) { + return sessionStore(name, retentionPeriod); + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStoreProvider.java new file mode 100644 index 0000000000000..53bb79a357d46 --- /dev/null +++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStoreProvider.java
@@ -0,0 +1,85 @@
+package org.apache.kafka.streams.state.internals;
+
+import java.time.Duration;
+import org.apache.kafka.streams.state.DSLStoreProvider;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+
+/**
+ * {@link DSLStoreProvider} that builds every store from the persistent suppliers in {@link Stores}.
+ *
+ * <p>The timestamped key-value and window variants are overridden rather than inherited: the
+ * interface defaults delegate to the plain variants, which would hand the DSL a non-timestamped
+ * key-value supplier (the built-in {@code ROCKS_DB} store type uses
+ * {@link Stores#persistentTimestampedKeyValueStore(String)}) and would ignore {@code retainDuplicates}.
+ */
+public class RocksDBStoreProvider implements DSLStoreProvider {
+
+    /**
+     * Supplies the plain (non-timestamped) persistent key-value store.
+     *
+     * @param name name of the store
+     * @return supplier from {@link Stores#persistentKeyValueStore(String)}
+     */
+    @Override
+    public KeyValueBytesStoreSupplier keyValueStore(final String name) {
+        return Stores.persistentKeyValueStore(name);
+    }
+
+    /**
+     * Supplies the timestamped persistent key-value store, the same supplier the DSL uses for the
+     * built-in {@code ROCKS_DB} store type.
+     *
+     * @param name name of the store
+     * @return supplier from {@link Stores#persistentTimestampedKeyValueStore(String)}
+     */
+    @Override
+    public KeyValueBytesStoreSupplier timestampedKeyValueStore(final String name) {
+        return Stores.persistentTimestampedKeyValueStore(name);
+    }
+
+    /**
+     * Supplies a persistent window store that does not retain duplicates.
+     *
+     * <p>NOTE(review): this returns the timestamped supplier even though it is the plain variant;
+     * left unchanged so that existing callers keep getting the same store.
+     *
+     * @param name            name of the store
+     * @param retentionPeriod retention period passed through to {@link Stores}
+     * @param windowSize      window size passed through to {@link Stores}
+     * @return supplier from {@code Stores.persistentTimestampedWindowStore} with {@code retainDuplicates=false}
+     */
+    @Override
+    public WindowBytesStoreSupplier windowStore(final String name, final Duration retentionPeriod, final Duration windowSize) {
+        return Stores.persistentTimestampedWindowStore(name, retentionPeriod, windowSize, false);
+    }
+
+    /**
+     * Supplies the timestamped persistent window store, forwarding {@code retainDuplicates} instead of
+     * dropping it as the interface default does.
+     *
+     * @param name             name of the store
+     * @param retentionPeriod  retention period passed through to {@link Stores}
+     * @param windowSize       window size passed through to {@link Stores}
+     * @param retainDuplicates forwarded unchanged to {@link Stores}
+     * @return supplier from {@code Stores.persistentTimestampedWindowStore}
+     */
+    @Override
+    public WindowBytesStoreSupplier timestampedWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) {
+        return Stores.persistentTimestampedWindowStore(name, retentionPeriod, windowSize, retainDuplicates);
+    }
+
+    /**
+     * Supplies the persistent session store.
+     *
+     * @param name            name of the store
+     * @param retentionPeriod retention period passed through to {@link Stores}
+     * @return supplier from {@link Stores#persistentSessionStore(String, Duration)}
+     */
+    @Override
+    public SessionBytesStoreSupplier sessionStore(final String name, final Duration retentionPeriod) {
+        return Stores.persistentSessionStore(name, retentionPeriod);
+    }
+}