diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/RegressionSchema.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/RegressionSchema.java index 594af6eb4..248d47711 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/RegressionSchema.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/RegressionSchema.java @@ -19,7 +19,7 @@ import dev.responsive.examples.regression.model.EnrichedOrder; import dev.responsive.examples.regression.model.GroupedOrder; import dev.responsive.examples.regression.model.Order; -import dev.responsive.examples.regression.model.StoredOrder; +import dev.responsive.examples.regression.model.OrderMetadata; import org.apache.kafka.common.serialization.Serde; public class RegressionSchema { @@ -40,8 +40,8 @@ public static Serde groupedOrderSerde() { return new JsonSerde<>(GroupedOrder.class); } - public static Serde storedOrderSerde() { - return new JsonSerde<>(StoredOrder.class); + public static Serde orderMetadataSerde() { + return new JsonSerde<>(OrderMetadata.class); } public static class CustomerSerializer extends JsonSerializer { diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/GroupedOrder.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/GroupedOrder.java index ec94e2c86..9a7bab592 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/GroupedOrder.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/GroupedOrder.java @@ -18,6 +18,9 @@ import java.util.function.ToIntFunction; import java.util.stream.Collectors; +/** + * A batch of one or more purchases grouped into a single order + */ public record GroupedOrder( @JsonProperty("orders") List orders ) implements Comparable { diff --git 
a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/Order.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/Order.java index b3d0dc667..c3fdd3e92 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/Order.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/Order.java @@ -14,6 +14,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; +/** + * An instance of a single purchase order + */ public record Order( @JsonProperty("orderId") String orderId, @JsonProperty("customerId") String customerId, diff --git a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/StoredOrder.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/OrderMetadata.java similarity index 61% rename from kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/StoredOrder.java rename to kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/OrderMetadata.java index 9d9d799e0..cfbc8609a 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/StoredOrder.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/model/OrderMetadata.java @@ -13,17 +13,11 @@ package dev.responsive.examples.regression.model; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.Optional; -public record StoredOrder( - @JsonProperty("order") Optional order, - @JsonProperty("meta") Optional meta +public record OrderMetadata( + @JsonProperty("timestamp") long timestamp, + @JsonProperty("count") long count, + @JsonProperty("size") long size ) { - public record Meta( - @JsonProperty("timestamp") long timestamp, - @JsonProperty("count") long count, - @JsonProperty("size") long size - ) { - } } diff --git 
a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/tests/KeyBatchExample.java b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/tests/KeyBatchExample.java index af9b96b56..04a91b85c 100644 --- a/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/tests/KeyBatchExample.java +++ b/kafka-client-examples/e2e-test/src/main/java/dev/responsive/examples/regression/tests/KeyBatchExample.java @@ -20,27 +20,33 @@ import dev.responsive.examples.regression.RegressionSchema; import dev.responsive.examples.regression.model.GroupedOrder; import dev.responsive.examples.regression.model.Order; -import dev.responsive.examples.regression.model.StoredOrder; +import dev.responsive.examples.regression.model.OrderMetadata; import dev.responsive.kafka.api.stores.ResponsiveStores; import java.time.Duration; import java.util.ArrayList; import java.util.Map; -import java.util.Optional; +import java.util.Set; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; public class KeyBatchExample extends AbstractKSExampleService { + private static final String 
METADATA_STORE_NAME = "metadata"; + private static final String PURCHASES_STORE_NAME = "purchases"; + private final UrandomGenerator randomGenerator = new UrandomGenerator(); public KeyBatchExample(final Map props, final boolean responsive) { @@ -51,26 +57,12 @@ public KeyBatchExample(final Map props, final boolean responsive ); } - @SuppressWarnings("deprecation") // using Transformer interface for compatibility testing @Override protected Topology buildTopology() { final StreamsBuilder builder = new StreamsBuilder(); - if (responsive) { - builder.addStateStore(ResponsiveStores.keyValueStoreBuilder( - ResponsiveStores.keyValueStore("grouped-orders-store"), - Serdes.String(), - RegressionSchema.storedOrderSerde() - )); - } else { - builder.addStateStore(Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("grouped-orders-store"), - Serdes.String(), - RegressionSchema.storedOrderSerde() - )); - } builder.stream(ORDERS, Consumed.with(Serdes.String(), RegressionSchema.orderSerde())) - .transform(BatchTransformer::new, "grouped-orders-store") + .process(new BatchProcessorSupplier(responsive), PURCHASES_STORE_NAME, METADATA_STORE_NAME) .peek((k, v) -> { if (responsive) { final var random = Math.abs(randomGenerator.nextLong() % 10000); @@ -84,117 +76,145 @@ protected Topology buildTopology() { return builder.build(); } - private static class BatchTransformer - implements Transformer> { + private static class BatchProcessorSupplier implements + ProcessorSupplier { + + final boolean responsive; - private ProcessorContext context; - private KeyValueStore store; + public BatchProcessorSupplier(final boolean responsive) { + this.responsive = responsive; + } + + @Override + public Processor get() { + return new BatchProcessor(); + } @Override - public void init(final ProcessorContext context) { + public Set> stores() { + if (responsive) { + return Set.of( + ResponsiveStores.keyValueStoreBuilder( + ResponsiveStores.keyValueStore(PURCHASES_STORE_NAME), + Serdes.String(), 
+ RegressionSchema.orderSerde() + ), + ResponsiveStores.keyValueStoreBuilder( + ResponsiveStores.keyValueStore(METADATA_STORE_NAME), + Serdes.String(), + RegressionSchema.orderMetadataSerde() + ) + ); + } else { + return Set.of( + Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(PURCHASES_STORE_NAME), + Serdes.String(), + RegressionSchema.orderSerde() + ), + Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(METADATA_STORE_NAME), + Serdes.String(), + RegressionSchema.orderMetadataSerde() + ) + ); + } + } + } + + // TODO: use FixedKeyProcessor after fixing https://issues.apache.org/jira/browse/KAFKA-16585 + private static class BatchProcessor implements Processor { + + private ProcessorContext context; + private KeyValueStore purchasesStore; + private KeyValueStore metadataStore; + + @Override + public void init(final ProcessorContext context) { this.context = context; - this.store = context.getStateStore("grouped-orders-store"); + this.purchasesStore = context.getStateStore(PURCHASES_STORE_NAME); + this.metadataStore = context.getStateStore(METADATA_STORE_NAME); this.context.schedule( Duration.ofSeconds(30), PunctuationType.STREAM_TIME, - this::flushExpired + this::flushReadyOrders ); } @Override - public KeyValue transform(final String key, final Order value) { - final long ts = context.timestamp(); - - // first add the order to the list of orders that are stored - store.put(storedKey(key, ts), new StoredOrder(Optional.of(value), Optional.empty())); - - // next, we need to update the tracked metadata row to - // check whether the value ought to be emitted - final String mKey = metaKey(key); - final StoredOrder.Meta meta = Optional.ofNullable(store.get(mKey)) - .orElse(new StoredOrder(Optional.empty(), Optional.of(new StoredOrder.Meta(ts, 0, 0)))) - .meta() - .orElseThrow(); - - // instead of computing the actual size, for now just - // use the value amount and assume that it should be emitted - // after a certain amount of $$$ is spent - final 
StoredOrder.Meta newMeta = new StoredOrder.Meta( - ts, - meta.count() + 1, - meta.size() + (long) value.amount() - ); - - if (shouldFlush(newMeta, ts)) { - doFlush(key); - store.delete(mKey); + public void process(final Record newPurchase) { + final String key = newPurchase.key(); + final long newPurchaseTimestamp = newPurchase.timestamp(); + final long newPurchaseSize = (long) newPurchase.value().amount(); + + // first store the purchase under the key+timestamp + purchasesStore.put(storedKey(key, newPurchaseTimestamp), newPurchase.value()); + + // next, we need to look up and update the tracked metadata for this key + final OrderMetadata orderMetadata = metadataStore.get(key); + + final OrderMetadata newOrderMetadata = + orderMetadata == null + ? new OrderMetadata(newPurchaseTimestamp, 1, newPurchaseSize) + : new OrderMetadata( + orderMetadata.timestamp(), + orderMetadata.count() + 1, + orderMetadata.size() + newPurchaseSize + ); + + // check if the key's purchases are ready to be batched and flushed, + // otherwise just overwrite the metadata row with the new info + if (shouldFlush(newOrderMetadata, newPurchaseTimestamp)) { + doFlush(key, newOrderMetadata.timestamp()); } else { - store.put(mKey, new StoredOrder(Optional.empty(), Optional.of(newMeta))); + metadataStore.put(key, newOrderMetadata); } - - return null; } - private void flushExpired(long ts) { - // iterate through all the metadata keys and check whether - // the corresponding values should be flushed - we can end - // at "null" because all keys that are not metadata keys - // start with "k." 
(which is less than "m.") - try (KeyValueIterator range = store.range("m.", "n.")) { + private void flushReadyOrders(long ts) { + // iterate through all the metadata rows and check whether the purchases + // for each key are ready to be batched and flushed + try (KeyValueIterator range = metadataStore.all()) { while (range.hasNext()) { - final KeyValue kv = range.next(); - final StoredOrder.Meta meta = kv.value.meta() - .orElseThrow(() -> new IllegalStateException( - "Got stored meta key with no meta: " + kv)); - if (shouldFlush(meta, ts)) { - doFlush(kv.key.split("\\.")[1]); - store.delete(kv.key); + final KeyValue kv = range.next(); + final OrderMetadata orderMetadata = kv.value; + if (shouldFlush(orderMetadata, ts)) { + doFlush(kv.key, orderMetadata.timestamp()); } } } } - private void doFlush(final String key) { + private void doFlush(final String key, final long batchTimestamp) { try ( - KeyValueIterator range = store.range( + KeyValueIterator range = purchasesStore.range( storedKey(key, 0), storedKey(key, Long.MAX_VALUE) ) ) { - final GroupedOrder result = new GroupedOrder(new ArrayList<>()); + final GroupedOrder groupedOrder = new GroupedOrder(new ArrayList<>()); while (range.hasNext()) { - final KeyValue kv = range.next(); - store.delete(kv.key); - - final StoredOrder value = kv.value; - result.orders() - .add(value.order() - .orElseThrow(() -> new IllegalStateException( - "Got stored order with no order! 
%s".formatted(value)))); + final KeyValue kv = range.next(); + purchasesStore.delete(kv.key); + groupedOrder.orders().add(kv.value); } - context.forward(key, result); + context.forward(new Record<>(key, groupedOrder, batchTimestamp)); } - } - - @Override - public void close() { - - } - static boolean shouldFlush(final StoredOrder.Meta meta, final long now) { - return ((meta.timestamp() - now) > 60_000) - || (meta.count() > 50) - || (meta.size() > 1_000); + // make sure to delete from the metadata store once the key is fully flushed + metadataStore.delete(key); } - static String metaKey(final String key) { - return "m." + key; + private static boolean shouldFlush(final OrderMetadata orderMetadata, final long now) { + return ((now - orderMetadata.timestamp()) > 60_000) + || (orderMetadata.count() > 50) + || (orderMetadata.size() > 1_000); } - static String storedKey(final String key, final long ts) { - return "s.%s.%d".formatted(key, ts); + private static String storedKey(final String key, final long ts) { + return "%s.%d".formatted(key, ts); } } }