Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import dev.responsive.examples.regression.model.EnrichedOrder;
import dev.responsive.examples.regression.model.GroupedOrder;
import dev.responsive.examples.regression.model.Order;
import dev.responsive.examples.regression.model.StoredOrder;
import dev.responsive.examples.regression.model.OrderMetadata;
import org.apache.kafka.common.serialization.Serde;

public class RegressionSchema {
Expand All @@ -40,8 +40,8 @@ public static Serde<GroupedOrder> groupedOrderSerde() {
return new JsonSerde<>(GroupedOrder.class);
}

public static Serde<StoredOrder> storedOrderSerde() {
return new JsonSerde<>(StoredOrder.class);
/**
 * Creates a new {@link JsonSerde} bound to {@link OrderMetadata}, for use wherever
 * a {@code Serde<OrderMetadata>} is required (e.g. as a state store value serde).
 *
 * @return a fresh serde instance on every call
 */
public static Serde<OrderMetadata> orderMetadataSerde() {
return new JsonSerde<>(OrderMetadata.class);
}

public static class CustomerSerializer extends JsonSerializer<Customer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

/**
* A batch of one or more purchases grouped into a single order
*/
public record GroupedOrder(
@JsonProperty("orders") List<Order> orders
) implements Comparable<GroupedOrder> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

import com.fasterxml.jackson.annotation.JsonProperty;

/**
* An instance of a single purchase order
*/
public record Order(
@JsonProperty("orderId") String orderId,
@JsonProperty("customerId") String customerId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,11 @@
package dev.responsive.examples.regression.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Optional;

public record StoredOrder(
@JsonProperty("order") Optional<Order> order,
@JsonProperty("meta") Optional<Meta> meta
public record OrderMetadata(
@JsonProperty("timestamp") long timestamp,
@JsonProperty("count") long count,
@JsonProperty("size") long size
) {

public record Meta(
@JsonProperty("timestamp") long timestamp,
@JsonProperty("count") long count,
@JsonProperty("size") long size
) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,33 @@
import dev.responsive.examples.regression.RegressionSchema;
import dev.responsive.examples.regression.model.GroupedOrder;
import dev.responsive.examples.regression.model.Order;
import dev.responsive.examples.regression.model.StoredOrder;
import dev.responsive.examples.regression.model.OrderMetadata;
import dev.responsive.kafka.api.stores.ResponsiveStores;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class KeyBatchExample extends AbstractKSExampleService {

private static final String METADATA_STORE_NAME = "metadata";
private static final String PURCHASES_STORE_NAME = "purchases";

private final UrandomGenerator randomGenerator = new UrandomGenerator();

public KeyBatchExample(final Map<String, Object> props, final boolean responsive) {
Expand All @@ -51,26 +57,12 @@ public KeyBatchExample(final Map<String, Object> props, final boolean responsive
);
}

@SuppressWarnings("deprecation") // using Transformer interface for compatibility testing
@Override
protected Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
if (responsive) {
builder.addStateStore(ResponsiveStores.keyValueStoreBuilder(
ResponsiveStores.keyValueStore("grouped-orders-store"),
Serdes.String(),
RegressionSchema.storedOrderSerde()
));
} else {
builder.addStateStore(Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("grouped-orders-store"),
Serdes.String(),
RegressionSchema.storedOrderSerde()
));
}

builder.stream(ORDERS, Consumed.with(Serdes.String(), RegressionSchema.orderSerde()))
.transform(BatchTransformer::new, "grouped-orders-store")
.process(new BatchProcessorSupplier(responsive), PURCHASES_STORE_NAME, METADATA_STORE_NAME)
.peek((k, v) -> {
if (responsive) {
final var random = Math.abs(randomGenerator.nextLong() % 10000);
Expand All @@ -84,117 +76,145 @@ protected Topology buildTopology() {
return builder.build();
}

private static class BatchTransformer
implements Transformer<String, Order, KeyValue<String, GroupedOrder>> {
private static class BatchProcessorSupplier implements
ProcessorSupplier<String, Order, String, GroupedOrder> {

final boolean responsive;

private ProcessorContext context;
private KeyValueStore<String, StoredOrder> store;
/**
 * Creates a supplier for the batching processor.
 *
 * @param responsive stored as-is; {@code stores()} reads it to pick between
 *                   {@code ResponsiveStores} builders ({@code true}) and in-memory
 *                   {@code Stores} builders ({@code false})
 */
public BatchProcessorSupplier(final boolean responsive) {
this.responsive = responsive;
}

/**
 * Supplies the processor for this topology node.
 *
 * @return a newly constructed {@code BatchProcessor} on every call
 */
@Override
public Processor<String, Order, String, GroupedOrder> get() {
return new BatchProcessor();
}

@Override
public void init(final ProcessorContext context) {
public Set<StoreBuilder<?>> stores() {
  // Both variants declare the same two String-keyed stores: one holding the
  // individual purchases and one holding the per-key batch metadata. Only the
  // store implementation differs, selected by the `responsive` flag.
  if (!responsive) {
    final StoreBuilder<?> inMemoryPurchases = Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore(PURCHASES_STORE_NAME),
        Serdes.String(),
        RegressionSchema.orderSerde()
    );
    final StoreBuilder<?> inMemoryMetadata = Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore(METADATA_STORE_NAME),
        Serdes.String(),
        RegressionSchema.orderMetadataSerde()
    );
    return Set.of(inMemoryPurchases, inMemoryMetadata);
  }

  final StoreBuilder<?> responsivePurchases = ResponsiveStores.keyValueStoreBuilder(
      ResponsiveStores.keyValueStore(PURCHASES_STORE_NAME),
      Serdes.String(),
      RegressionSchema.orderSerde()
  );
  final StoreBuilder<?> responsiveMetadata = ResponsiveStores.keyValueStoreBuilder(
      ResponsiveStores.keyValueStore(METADATA_STORE_NAME),
      Serdes.String(),
      RegressionSchema.orderMetadataSerde()
  );
  return Set.of(responsivePurchases, responsiveMetadata);
}
}

// TODO: use FixedKeyProcessor after fixing https://issues.apache.org/jira/browse/KAFKA-16585
private static class BatchProcessor implements Processor<String, Order, String, GroupedOrder> {

private ProcessorContext<String, GroupedOrder> context;
private KeyValueStore<String, Order> purchasesStore;
private KeyValueStore<String, OrderMetadata> metadataStore;

@Override
public void init(final ProcessorContext<String, GroupedOrder> context) {
this.context = context;
this.store = context.getStateStore("grouped-orders-store");
this.purchasesStore = context.getStateStore(PURCHASES_STORE_NAME);
this.metadataStore = context.getStateStore(METADATA_STORE_NAME);
this.context.schedule(
Duration.ofSeconds(30),
PunctuationType.STREAM_TIME,
this::flushExpired
this::flushReadyOrders
);
}

@Override
public KeyValue<String, GroupedOrder> transform(final String key, final Order value) {
final long ts = context.timestamp();

// first add the order to the list of orders that are stored
store.put(storedKey(key, ts), new StoredOrder(Optional.of(value), Optional.empty()));

// next, we need to update the tracked metadata row to
// check whether the value ought to be emitted
final String mKey = metaKey(key);
final StoredOrder.Meta meta = Optional.ofNullable(store.get(mKey))
.orElse(new StoredOrder(Optional.empty(), Optional.of(new StoredOrder.Meta(ts, 0, 0))))
.meta()
.orElseThrow();

// instead of computing the actual size, for now just
// use the value amount and assume that it should be emitted
// after a certain amount of $$$ is spent
final StoredOrder.Meta newMeta = new StoredOrder.Meta(
ts,
meta.count() + 1,
meta.size() + (long) value.amount()
);

if (shouldFlush(newMeta, ts)) {
doFlush(key);
store.delete(mKey);
public void process(final Record<String, Order> newPurchase) {
final String key = newPurchase.key();
final long newPurchaseTimestamp = newPurchase.timestamp();
final long newPurchaseSize = (long) newPurchase.value().amount();

// first store the purchase under the key+timestamp
purchasesStore.put(storedKey(key, newPurchaseTimestamp), newPurchase.value());

// next, we need to look up and update the tracked metadata for this key
final OrderMetadata orderMetadata = metadataStore.get(key);

final OrderMetadata newOrderMetadata =
orderMetadata == null
? new OrderMetadata(newPurchaseTimestamp, 1, newPurchaseSize)
: new OrderMetadata(
orderMetadata.timestamp(),
orderMetadata.count() + 1,
orderMetadata.size() + newPurchaseSize
);

// check if the key's purchases are ready to be batched and flushed,
// otherwise just overwrite the metadata row with the new info
if (shouldFlush(newOrderMetadata, newPurchaseTimestamp)) {
doFlush(key, newOrderMetadata.timestamp());
} else {
store.put(mKey, new StoredOrder(Optional.empty(), Optional.of(newMeta)));
metadataStore.put(key, newOrderMetadata);
}

return null;
}

private void flushExpired(long ts) {
// iterate through all the metadata keys and check whether
// the corresponding values should be flushed - we can end
// at "null" because all keys that are not metadata keys
// start with "k." (which is less than "m.")
try (KeyValueIterator<String, StoredOrder> range = store.range("m.", "n.")) {
private void flushReadyOrders(long ts) {
// iterate through all the metadata rows and check whether the purchases
// for each key are ready to be batched and flushed
try (KeyValueIterator<String, OrderMetadata> range = metadataStore.all()) {
while (range.hasNext()) {
final KeyValue<String, StoredOrder> kv = range.next();
final StoredOrder.Meta meta = kv.value.meta()
.orElseThrow(() -> new IllegalStateException(
"Got stored meta key with no meta: " + kv));
if (shouldFlush(meta, ts)) {
doFlush(kv.key.split("\\.")[1]);
store.delete(kv.key);
final KeyValue<String, OrderMetadata> kv = range.next();
final OrderMetadata orderMetadata = kv.value;
if (shouldFlush(orderMetadata, ts)) {
doFlush(kv.key, orderMetadata.timestamp());
}
}
}
}

private void doFlush(final String key) {
private void doFlush(final String key, final long batchTimestamp) {
try (
KeyValueIterator<String, StoredOrder> range = store.range(
KeyValueIterator<String, Order> range = purchasesStore.range(
storedKey(key, 0),
storedKey(key, Long.MAX_VALUE)
)
) {
final GroupedOrder result = new GroupedOrder(new ArrayList<>());
final GroupedOrder groupedOrder = new GroupedOrder(new ArrayList<>());

while (range.hasNext()) {
final KeyValue<String, StoredOrder> kv = range.next();
store.delete(kv.key);

final StoredOrder value = kv.value;
result.orders()
.add(value.order()
.orElseThrow(() -> new IllegalStateException(
"Got stored order with no order! %s".formatted(value))));
final KeyValue<String, Order> kv = range.next();
purchasesStore.delete(kv.key);
groupedOrder.orders().add(kv.value);
}

context.forward(key, result);
context.forward(new Record<>(key, groupedOrder, batchTimestamp));
}
}

@Override
public void close() {

}

static boolean shouldFlush(final StoredOrder.Meta meta, final long now) {
return ((meta.timestamp() - now) > 60_000)
|| (meta.count() > 50)
|| (meta.size() > 1_000);
// make sure to delete from the metadata store once the key is fully flushed
metadataStore.delete(key);
}

static String metaKey(final String key) {
return "m." + key;
/**
 * Decides whether the purchases batched for a key should be emitted now.
 *
 * <p>A batch is flushed when any one of the following holds:
 * <ul>
 *   <li>it is older than 60_000 time units (presumably milliseconds, matching
 *       record/stream-time timestamps -- confirm), measured from the timestamp
 *       recorded in the metadata up to {@code now};</li>
 *   <li>it holds more than 50 purchases;</li>
 *   <li>its accumulated size exceeds 1_000.</li>
 * </ul>
 *
 * @param orderMetadata the tracked timestamp, count and size for the key's batch
 * @param now           the current time to measure the batch age against (callers
 *                      pass the incoming record's timestamp or the punctuation time)
 * @return {@code true} if the batch should be flushed
 */
private static boolean shouldFlush(final OrderMetadata orderMetadata, final long now) {
  // Age is "now minus batch timestamp". The previous form (timestamp - now) is
  // never positive once `now` has reached the batch timestamp, so the time-based
  // flush could not trigger for in-order data.
  final long batchAge = now - orderMetadata.timestamp();
  return (batchAge > 60_000)
      || (orderMetadata.count() > 50)
      || (orderMetadata.size() > 1_000);
}

static String storedKey(final String key, final long ts) {
return "s.%s.%d".formatted(key, ts);
/**
 * Builds the purchases-store key for a single purchase: the record key and the
 * timestamp joined by a dot, e.g. {@code storedKey("a", 5)} gives {@code "a.5"}.
 *
 * <p>NOTE(review): the timestamp is not zero-padded, while {@code doFlush} range-scans
 * from {@code storedKey(key, 0)} to {@code storedKey(key, Long.MAX_VALUE)}. Whether
 * every timestamp falls inside that range depends on how the store orders its
 * serialized keys -- confirm.
 *
 * @param key the record key the purchase belongs to
 * @param ts  the purchase timestamp
 * @return the composite store key
 */
private static String storedKey(final String key, final long ts) {
  return String.format("%s.%d", key, ts);
}
}
}
Loading