Skip to content
4 changes: 4 additions & 0 deletions kafka-client-bootstrap/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ plugins {
id("responsive.docker")
}

repositories {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to put this into all the build.gradle files so I could compile from my local AK branch. I can remove all of these before merging if anyone feels strongly about it, but if it doesn't do any harm then I'd rather keep these for the next time I'm working off of a dev branch

mavenLocal()
}

application {
mainClass.set("dev.responsive.kafka.bootstrap.main.Main")
}
Expand Down
4 changes: 4 additions & 0 deletions kafka-client-examples/e2e-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ plugins {
id("responsive.docker")
}

repositories {
mavenLocal()
}

application {
mainClass.set("dev.responsive.examples.e2etest.Main")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
Expand Down Expand Up @@ -70,7 +71,7 @@ protected Topology buildTopology() {
}

builder.stream(ORDERS, Consumed.with(Serdes.String(), RegressionSchema.orderSerde()))
.transform(BatchTransformer::new, "grouped-orders-store")
.process(BatchTransformer::new, "grouped-orders-store")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not part of async -- related to the 4.0 upgrade

.peek((k, v) -> {
if (responsive) {
final var random = Math.abs(randomGenerator.nextLong() % 10000);
Expand All @@ -84,14 +85,13 @@ protected Topology buildTopology() {
return builder.build();
}

private static class BatchTransformer
implements Transformer<String, Order, KeyValue<String, GroupedOrder>> {
private static class BatchTransformer implements Processor<String, Order, String, GroupedOrder> {

private ProcessorContext context;
private ProcessorContext<String, GroupedOrder> context;
private KeyValueStore<String, StoredOrder> store;

@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<String, GroupedOrder> context) {
this.context = context;
this.store = context.getStateStore("grouped-orders-store");
this.context.schedule(
Expand All @@ -102,15 +102,15 @@ public void init(final ProcessorContext context) {
}

@Override
public KeyValue<String, GroupedOrder> transform(final String key, final Order value) {
final long ts = context.timestamp();
public void process(final Record<String, Order> record) {
final long ts = record.timestamp();

// first add the order to the list of orders that are stored
store.put(storedKey(key, ts), new StoredOrder(Optional.of(value), Optional.empty()));
store.put(storedKey(record.key(), ts), new StoredOrder(Optional.of(record.value()), Optional.empty()));

// next, we need to update the tracked metadata row to
// check whether the value ought to be emitted
final String mKey = metaKey(key);
final String mKey = metaKey(record.key());
final StoredOrder.Meta meta = Optional.ofNullable(store.get(mKey))
.orElse(new StoredOrder(Optional.empty(), Optional.of(new StoredOrder.Meta(ts, 0, 0))))
.meta()
Expand All @@ -122,17 +122,15 @@ public KeyValue<String, GroupedOrder> transform(final String key, final Order va
final StoredOrder.Meta newMeta = new StoredOrder.Meta(
ts,
meta.count() + 1,
meta.size() + (long) value.amount()
meta.size() + (long) record.value().amount()
);

if (shouldFlush(newMeta, ts)) {
doFlush(key);
doFlush(record.key());
store.delete(mKey);
} else {
store.put(mKey, new StoredOrder(Optional.empty(), Optional.of(newMeta)));
}

return null;
}

private void flushExpired(long ts) {
Expand Down Expand Up @@ -174,7 +172,7 @@ private void doFlush(final String key) {
"Got stored order with no order! %s".formatted(value))));
}

context.forward(key, result);
context.forward(new Record<>(key, result, 0L));
}
}

Expand Down
8 changes: 6 additions & 2 deletions kafka-client-examples/simple-example/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ application {
mainClass.set("dev.responsive.examples.simpleapp.Main")
}

repositories {
mavenLocal()
}

dependencies {
// todo: how to set the version here?
implementation(project(":kafka-client"))
implementation("com.google.guava:guava:32.1.1-jre")
implementation("org.apache.kafka:kafka-clients:3.4.0")
implementation("org.apache.kafka:kafka-streams:3.4.0")
implementation(libs.kafka.clients)
implementation(libs.kafka.streams)
implementation("io.opentelemetry.javaagent:opentelemetry-javaagent:1.25.0")
implementation("org.apache.logging.log4j:log4j-slf4j-impl:2.20.0")
implementation("org.apache.commons:commons-text:1.10.0")
Expand Down
4 changes: 4 additions & 0 deletions kafka-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ plugins {
id("java")
}

repositories {
mavenLocal()
}

/*********** Generated Resources ***********/

val gitCommitId: String by lazy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,16 +284,16 @@ private static Properties propsWithOverrides(
return propsWithOverrides;
}

final Object o = configs.originals().get(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS);
final Object o = configs.originals().get(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not part of async -- related to the 4.0 upgrade

if (o == null) {
propsWithOverrides.put(
InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS,
StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
TASK_ASSIGNOR_CLASS_OVERRIDE
);
} else if (!TASK_ASSIGNOR_CLASS_OVERRIDE.equals(o.toString())) {
final String errorMsg = String.format(
"Invalid Streams configuration value for '%s': got %s, expected '%s'",
InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS,
StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG,
o,
TASK_ASSIGNOR_CLASS_OVERRIDE
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public final class AsyncProcessorSupplier<KIn, VIn, KOut, VOut>
implements ProcessorSupplier<KIn, VIn, KOut, VOut> {

private final ProcessorSupplier<KIn, VIn, KOut, VOut> userProcessorSupplier;
private final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> asyncStoreBuilders;

private Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> asyncStoreBuilders = null;

/**
* Create an AsyncProcessorSupplier that wraps a custom {@link ProcessorSupplier}
Expand All @@ -132,32 +133,30 @@ public final class AsyncProcessorSupplier<KIn, VIn, KOut, VOut>
public static <KIn, VIn, KOut, VOut> AsyncProcessorSupplier<KIn, VIn, KOut, VOut> createAsyncProcessorSupplier(
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
) {
return new AsyncProcessorSupplier<>(processorSupplier, processorSupplier.stores());
return new AsyncProcessorSupplier<>(processorSupplier);
}

private AsyncProcessorSupplier(
final ProcessorSupplier<KIn, VIn, KOut, VOut> userProcessorSupplier,
final Set<StoreBuilder<?>> userStoreBuilders
final ProcessorSupplier<KIn, VIn, KOut, VOut> userProcessorSupplier
) {
if (userStoreBuilders == null || userStoreBuilders.isEmpty()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to KIP-1112 -- this check was supposed to be deleted already but I must have missed it before (the corresponding check in FixedKeyProcessorSupplier was correctly removed at that time)

throw new UnsupportedOperationException(
"Async processing currently requires at least one state store be "
+ "connected to the async processor, and that stores be connected "
+ "by implementing the #stores method in your processor supplier");
}

this.userProcessorSupplier = userProcessorSupplier;
this.asyncStoreBuilders = initializeAsyncBuilders(userStoreBuilders);
}

@Override
public AsyncProcessor<KIn, VIn, KOut, VOut> get() {
maybeInitializeAsyncStoreBuilders();
return createAsyncProcessor(userProcessorSupplier.get(), asyncStoreBuilders);
}

@Override
public Set<StoreBuilder<?>> stores() {
maybeInitializeAsyncStoreBuilders();
return new HashSet<>(asyncStoreBuilders.values());
}

private void maybeInitializeAsyncStoreBuilders() {
if (asyncStoreBuilders == null) {
asyncStoreBuilders = initializeAsyncBuilders(userProcessorSupplier.stores());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to delay the initialization of the async store builders because the call to the inner processor's #stores method might return different things at different points in the build process (for example, if a downstream operator forces materialization, the #stores will return null/empty until that downstream operator is processed)

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.kafka.api.async;

import org.apache.kafka.streams.ProcessorWrapper;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

public class AsyncProcessorWrapper implements ProcessorWrapper {


@Override
public <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(
final String processorName,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
) {
return AsyncProcessorSupplier.createAsyncProcessorSupplier(processorSupplier);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we really want to wrap every processor as async -- WDYT about only wrapping stateful operators by default?

(We can of course still allow users to inject their own custom async processors if they have some heavier processing without state like an RPC)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking we'd leverage configure() to allow us to plug into exactly this consideration. Not sure how we want to expose that as an API, but we can figure that out later. For now we can do just stateful ones. Like you said, if it's something stateless they want to make async it's pretty easy to write a processor for it manually.

}

@Override
public <KIn, VIn, VOut> FixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(
final String processorName,
final FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier
) {
return AsyncFixedKeyProcessorSupplier.createAsyncProcessorSupplier(fixedKeyProcessorSupplier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@

import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder;
import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder;
import dev.responsive.kafka.internal.stores.ResponsiveStoreBuilder.StoreType;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.StoreFactory.FactoryWrappingStoreBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.StoreBuilder.StoreType;
import org.apache.kafka.streams.state.internals.AsyncKeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.AsyncTimestampedKeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
import org.apache.kafka.streams.state.internals.DelayedAsyncStoreBuilder;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;

public class AsyncUtils {

Expand Down Expand Up @@ -70,33 +80,12 @@ public static boolean isStreamThreadOrAsyncThread(
final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> asyncStoreBuilders = new HashMap<>();
for (final StoreBuilder<?> builder : userConnectedStores) {
final String storeName = builder.name();
if (builder instanceof ResponsiveStoreBuilder) {
final ResponsiveStoreBuilder<?, ?, ?> responsiveBuilder =
(ResponsiveStoreBuilder<?, ?, ?>) builder;

final StoreType storeType = responsiveBuilder.storeType();
asyncStoreBuilders.put(
storeName,
new DelayedAsyncStoreBuilder<>(builder)
);

final AbstractAsyncStoreBuilder<?, ?, ?> storeBuilder;
if (storeType.equals(StoreType.TIMESTAMPED_KEY_VALUE)) {
storeBuilder = new AsyncTimestampedKeyValueStoreBuilder<>(responsiveBuilder);
} else if (storeType.equals(StoreType.KEY_VALUE)) {
storeBuilder = new AsyncKeyValueStoreBuilder<>(responsiveBuilder);
} else {
throw new UnsupportedOperationException(
"Only key-value stores are supported by async processors at this time");
}

asyncStoreBuilders.put(
storeName,
storeBuilder
);

} else {
throw new IllegalStateException(String.format(
"Detected the StoreBuilder for %s was not created via the ResponsiveStores factory, "
+ "please ensure that all store builders and suppliers are provided through the "
+ "appropriate API from ResponsiveStores", storeName));
}
}
return asyncStoreBuilders;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ public void setProcessorMetadata(final ProcessorMetadata metadata) {
}

@Override
public ProcessorMetadata getProcessorMetadata() {
return delegate().getProcessorMetadata();
public ProcessorMetadata processorMetadata() {
return delegate().processorMetadata();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public abstract class AbstractAsyncStoreBuilder<K, V, T extends StateStore>
implements StoreBuilder<T> {

protected final String name;
protected final Serde<K> keySerde;
protected final Serde<V> valueSerde;
protected final Time time;
protected Serde<K> keySerde;
protected Serde<V> valueSerde;
protected Time time;
protected final Map<String, String> logConfig = new HashMap<>();

private boolean cachingEnabled = false;
Expand All @@ -48,14 +48,17 @@ public abstract class AbstractAsyncStoreBuilder<K, V, T extends StateStore>
new ConcurrentHashMap<>();

public AbstractAsyncStoreBuilder(
final String name,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time
final String name
) {
Objects.requireNonNull(name, "name cannot be null");
Objects.requireNonNull(time, "time cannot be null");
this.name = name;
}

public void initialize(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
Objects.requireNonNull(time, "time cannot be null");

this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.time = time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
Expand Down Expand Up @@ -115,16 +114,6 @@ public String name() {
return userDelegate.name();
}

@Override
@Deprecated
public void init(
final org.apache.kafka.streams.processor.ProcessorContext context,
final StateStore root
) {
throw new UnsupportedOperationException("This init method is deprecated, please implement"
+ "init(StateStoreContext, StateStore) instead");
}

@Override
public <R> QueryResult<R> query(
final Query<R> query,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor;

/**
* Configurations for {@link ResponsiveKafkaStreams}
Expand Down
Loading
Loading