diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java
index 3a771bedf..81e3ba163 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java
@@ -102,7 +102,7 @@ public class AsyncProcessor
   private Cancellable punctuator;
 
   // the context passed to us in init, ie the one created for this task and owned by Kafka Streams
-  private ProcessingContext taskContext;
+  private InternalProcessorContext taskContext;
 
   // the async context owned by the StreamThread that is running this processor/task
   private StreamThreadProcessorContext streamThreadContext;
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java
index 1e7a3a8e1..84647e97d 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java
@@ -40,7 +40,7 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.api.ProcessingContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.slf4j.Logger;
 
 public class AsyncThreadPool {
@@ -168,7 +168,7 @@ public void scheduleForProcessing(
       final TaskId taskId,
       final List events,
       final FinalizingQueue finalizingQueue,
-      final ProcessingContext taskContext,
+      final InternalProcessorContext taskContext,
       final AsyncUserProcessorContext asyncProcessorContext,
       final AsyncProcessorMetricsRecorder processorMetricsRecorder
   ) {
@@ -285,7 +285,7 @@ private static class AsyncEventTask implements Supplier
-        final ProcessingContext taskContext,
+        final InternalProcessorContext taskContext,
         final AsyncUserProcessorContext userContext,
         final Semaphore queueSemaphore,
         final AsyncProcessorMetricsRecorder metricsRecorder
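Note on the `AsyncThreadPool` hunks above: widening the parameter type from the public `ProcessingContext` to the internal `InternalProcessorContext` is what gives the pool access to the record-context plumbing (`recordContext()` / `setRecordContext()`) that async hand-off relies on when an event leaves the StreamThread and its output is forwarded later. A minimal sketch of that capture/restore idea, using only methods that appear elsewhere in this diff; the class and method names below are hypothetical and not part of the PR:

```java
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;

// Hypothetical helper, for illustration only: snapshot the record context on the
// StreamThread before an event is queued, restore it before the result is forwarded.
final class RecordContextSnapshot {

  private final ProcessorRecordContext captured;

  RecordContextSnapshot(final InternalProcessorContext<?, ?> taskContext) {
    // capture while still on the StreamThread, when the record enters the processor
    this.captured = taskContext.recordContext();
  }

  void restoreTo(final InternalProcessorContext<?, ?> taskContext) {
    // restore on the StreamThread again, so downstream processors see the original
    // record metadata rather than whatever record happens to be current by then
    taskContext.setRecordContext(captured);
  }
}
```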
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/AsyncThreadProcessorContext.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/AsyncThreadProcessorContext.java
index c67d4fd96..319f1af82 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/AsyncThreadProcessorContext.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/AsyncThreadProcessorContext.java
@@ -14,23 +14,18 @@
 import dev.responsive.kafka.api.async.internals.events.AsyncEvent;
 import dev.responsive.kafka.api.async.internals.events.DelayedForward;
-import java.io.File;
 import java.time.Duration;
-import java.util.Map;
 import java.util.Optional;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.api.FixedKeyRecord;
-import org.apache.kafka.streams.processor.api.ProcessingContext;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 
 /**
  * A special kind of mock/wrapper context to be used by the AsyncThread.
@@ -66,7 +61,8 @@
  * (ie one per AsyncThread per StreamThread per async processor per partition)
  * Equivalently, one per AsyncThread for each "original" ProcessorContext in Streams
  */
-public class AsyncThreadProcessorContext implements MergedProcessorContext {
+public class AsyncThreadProcessorContext
+    extends DelegatingProcessorContext> {
 
   // The AsyncEvent that is currently being processed by this AsyncThread. Updated each
   // time a new event is picked up from the processing queue but before beginning
@@ -78,7 +74,7 @@ public class AsyncThreadProcessorContext implements MergedProcessorC
   // in to the async processor during init. This MUST be protected from
   // any mutations and should only be delegated to in pure getters that
   // access immutable fields (such as applicationId)
-  private final ProcessingContext taskContext;
+  private final InternalProcessorContext taskContext;
 
   // TODO: we won't need to do this until we support async with the DSL and support
   // the new windowed emit semantics specifically, which is the only thing using it,
@@ -90,7 +86,7 @@ public class AsyncThreadProcessorContext implements MergedProcessorC
   // further inspection but isn't supported by either the async framework or in
   // Responsive in general, so it's not urgent.
   public AsyncThreadProcessorContext(
-      final ProcessingContext taskContext,
+      final InternalProcessorContext taskContext,
       final AsyncEvent currentAsyncEvent
   ) {
     this.taskContext = taskContext;
@@ -199,50 +195,7 @@ public long currentStreamTimeMs() {
   }
 
   @Override
-  // This is an immutable field so it's safe to delegate
-  public String applicationId() {
-    return taskContext.applicationId();
-  }
-
-  @Override
-  // This is an immutable field so it's safe to delegate
-  public TaskId taskId() {
-    return taskContext.taskId();
-  }
-
-  @Override
-  // This just looks up the default serde in the configs so it's safe
-  public Serde keySerde() {
-    return taskContext.keySerde();
-  }
-
-  @Override
-  // This just looks up the default serde in the configs so it's safe
-  public Serde valueSerde() {
-    return taskContext.valueSerde();
-  }
-
-  @Override
-  // This is an immutable field so it's safe to delegate
-  public File stateDir() {
-    return taskContext.stateDir();
-  }
-
-  @Override
-  // This is an immutable field so it's safe to delegate
-  public StreamsMetrics metrics() {
-    return taskContext.metrics();
-  }
-
-  @Override
-  // Safe to delegate since all StreamThreads share the same configs anyway
-  public Map appConfigs() {
-    return taskContext.appConfigs();
-  }
-
-  @Override
-  // Safe to delegate since all StreamThreads share the same configs anyway
-  public Map appConfigsWithPrefix(final String prefix) {
-    return taskContext.appConfigsWithPrefix(prefix);
+  public InternalProcessorContext delegate() {
+    return taskContext;
   }
 }
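The `AsyncThreadProcessorContext` change above replaces eight hand-written pass-through getters with a single `delegate()` override inherited from `DelegatingProcessorContext`. A toy sketch of that pattern in isolation, using hypothetical `Greeter` types rather than PR code: the abstract base forwards every method to `delegate()`, so a concrete subclass only supplies the delegate and overrides the calls it actually needs to intercept.

```java
// Hypothetical types, for illustration only.
interface Greeter {
  String greet(String name);
  String farewell(String name);
}

abstract class DelegatingGreeter implements Greeter {

  // the single hook subclasses implement, like AsyncThreadProcessorContext#delegate()
  protected abstract Greeter delegate();

  @Override
  public String greet(final String name) {
    return delegate().greet(name);
  }

  @Override
  public String farewell(final String name) {
    return delegate().farewell(name);
  }
}

final class UpperCaseGreeter extends DelegatingGreeter {

  private final Greeter underlying;

  UpperCaseGreeter(final Greeter underlying) {
    this.underlying = underlying;
  }

  @Override
  protected Greeter delegate() {
    return underlying;
  }

  @Override
  public String greet(final String name) {
    // only the calls that need interception are overridden; the rest pass through
    return delegate().greet(name).toUpperCase();
  }
}
```

The trade-off is that the base class now has to cover the much larger `InternalProcessorContext` surface, which is exactly what the `DelegatingProcessorContext` diff below adds.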
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/DelegatingProcessorContext.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/DelegatingProcessorContext.java
index 3df563ea2..1f4eb1215 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/DelegatingProcessorContext.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/DelegatingProcessorContext.java
@@ -16,25 +16,41 @@
 import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.CommitCallback;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
 import org.apache.kafka.streams.processor.api.FixedKeyRecord;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorMetadata;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.Task;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
 
 /**
  * Basic wrapper around a {@link ProcessorContext}/{@link FixedKeyProcessorContext}
  * that just delegates to the underlying context.
  */
 @SuppressWarnings({"checkstyle:linelength", "checkstyle:overloadmethodsdeclarationorder"})
-public abstract class DelegatingProcessorContext & FixedKeyProcessorContext>
+public abstract class DelegatingProcessorContext & FixedKeyProcessorContext>
     implements MergedProcessorContext {
 
   public abstract D delegate();
@@ -70,7 +86,7 @@ public File stateDir() {
   }
 
   @Override
-  public StreamsMetrics metrics() {
+  public StreamsMetricsImpl metrics() {
     return delegate().metrics();
   }
@@ -138,4 +154,161 @@ public void forward(
   ) {
     delegate().forward(record, childName);
   }
+
+  @Override
+  public T getStateStore(final StoreBuilder builder) {
+    return delegate().getStateStore(builder);
+  }
+
+  @Override
+  public void setSystemTimeMs(final long timeMs) {
+    delegate().setSystemTimeMs(timeMs);
+  }
+
+  @Override
+  public ProcessorRecordContext recordContext() {
+    return delegate().recordContext();
+  }
+
+  @Override
+  public void setRecordContext(final ProcessorRecordContext recordContext) {
+    delegate().setRecordContext(recordContext);
+  }
+
+  @Override
+  public void setCurrentNode(final ProcessorNode currentNode) {
+    delegate().setCurrentNode(currentNode);
+  }
+
+  @Override
+  public ProcessorNode currentNode() {
+    return delegate().currentNode();
+  }
+
+  @Override
+  public ThreadCache cache() {
+    return delegate().cache();
+  }
+
+  @Override
+  public void initialize() {
+    delegate().initialize();
+  }
+
+  @Override
+  public void uninitialize() {
+    delegate().uninitialize();
+  }
+
+  @Override
+  public Task.TaskType taskType() {
+    return delegate().taskType();
+  }
+
+  @Override
+  public void transitionToActive(
+      final StreamTask streamTask,
+      final RecordCollector recordCollector,
+      final ThreadCache newCache
+  ) {
+    delegate().transitionToActive(streamTask, recordCollector, newCache);
+  }
+
+  @Override
+  public void transitionToStandby(final ThreadCache newCache) {
+    delegate().transitionToStandby(newCache);
+  }
+
+  @Override
+  public void registerCacheFlushListener(
+      final String namespace,
+      final ThreadCache.DirtyEntryFlushListener listener
+  ) {
+    delegate().registerCacheFlushListener(namespace, listener);
+  }
+
+  @Override
+  public void logChange(
+      final String storeName,
+      final Bytes key,
+      final byte[] value,
+      final long timestamp,
+      final Position position
+  ) {
+    delegate().logChange(storeName, key, value, timestamp, position);
+  }
+
+  @Override
+  public String changelogFor(final String storeName) {
+    return delegate().changelogFor(storeName);
+  }
+
+  @Override
+  public void addProcessorMetadataKeyValue(final String key, final long value) {
+    delegate().addProcessorMetadataKeyValue(key, value);
+  }
+
+  @Override
+  public Long processorMetadataForKey(final String key) {
+    return delegate().processorMetadataForKey(key);
+  }
+
+  @Override
+  public void setProcessorMetadata(final ProcessorMetadata metadata) {
+    delegate().setProcessorMetadata(metadata);
+  }
+
+  @Override
+  public ProcessorMetadata getProcessorMetadata() {
+    return delegate().getProcessorMetadata();
+  }
+
+  @Override
+  public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
+    delegate().register(store, stateRestoreCallback);
+  }
+
+  @Override
+  public void forward(final K key, final V value) {
+    delegate().forward(key, value);
+  }
+
+  @Override
+  public void forward(final K key, final V value, final To to) {
+    delegate().forward(key, value, to);
+  }
+
+  @Override
+  public String topic() {
+    return delegate().topic();
+  }
+
+  @Override
+  public int partition() {
+    return delegate().partition();
+  }
+
+  @Override
+  public long offset() {
+    return delegate().offset();
+  }
+
+  @Override
+  public Headers headers() {
+    return delegate().headers();
+  }
+
+  @Override
+  public long timestamp() {
+    return delegate().timestamp();
+  }
+
+  @Override
+  public void register(
+      final StateStore store,
+      final StateRestoreCallback stateRestoreCallback,
+      final CommitCallback commitCallback
+  ) {
+    delegate().register(store, stateRestoreCallback, commitCallback);
+  }
 }
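One detail worth calling out in the hunk above: `metrics()` now declares `StreamsMetricsImpl` instead of `StreamsMetrics`. Java allows an override to narrow its return type (covariant returns), so the same method can satisfy both the public `ProcessorContext#metrics()` contract and the narrower internal one. A toy illustration with hypothetical types, not Kafka code:

```java
// Hypothetical types, for illustration only: an override may narrow its return type,
// so one metrics() implementation satisfies both the wide public contract and the
// narrower internal one.
interface Metrics {
  long recordCount();
}

final class MetricsImpl implements Metrics {
  @Override
  public long recordCount() {
    return 42L;
  }
}

interface PublicContext {
  Metrics metrics();           // public API exposes the wide type
}

interface InternalContext extends PublicContext {
  @Override
  MetricsImpl metrics();       // internal API narrows it (covariant return)
}

abstract class DelegatingContext implements InternalContext {

  abstract InternalContext delegate();

  @Override
  public MetricsImpl metrics() {
    // the narrowed return type is accepted by callers of either interface
    return delegate().metrics();
  }
}
```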
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/MergedProcessorContext.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/MergedProcessorContext.java
index 455a0a93c..24c17b793 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/MergedProcessorContext.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/MergedProcessorContext.java
@@ -13,12 +13,12 @@
 package dev.responsive.kafka.api.async.internals.contexts;
 
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 
 /**
  * Simple interface for processor contexts that may be used for both
  * fixed-key and non-fixed-key processors
  */
 public interface MergedProcessorContext
-    extends ProcessorContext, FixedKeyProcessorContext {
+    extends InternalProcessorContext, FixedKeyProcessorContext {
 }
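`MergedProcessorContext` keeps its original purpose, a single context type usable by both fixed-key and non-fixed-key processors, but now merges the internal context interface instead of the public one, so a merged context can also be handed to Streams-internal code that expects an `InternalProcessorContext`. A toy sketch of the merged-interface idea with hypothetical names (the parents may even share accessors, much as `ProcessorContext` and `FixedKeyProcessorContext` share their read-only accessors):

```java
// Hypothetical interfaces, for illustration only: a "merged" context implements two
// context interfaces at once so the same instance can be passed to either call site.
interface PublicContext {
  String applicationId();
}

interface InternalContext extends PublicContext {
  void setSystemTimeMs(long timeMs);
}

interface FixedKeyContext {
  String applicationId();
}

// Plays the role of MergedProcessorContext: one type satisfying both parents.
interface MergedContext extends InternalContext, FixedKeyContext {
}

final class MergedContextDemo {

  static String viaInternal(final InternalContext ctx) {
    return ctx.applicationId();
  }

  static String viaFixedKey(final FixedKeyContext ctx) {
    return ctx.applicationId();
  }

  public static void main(final String[] args) {
    final MergedContext ctx = new MergedContext() {
      @Override
      public String applicationId() {
        return "demo-app";
      }

      @Override
      public void setSystemTimeMs(final long timeMs) {
        // no-op for the demo
      }
    };

    // the same instance is accepted by both call sites, no casting required
    System.out.println(viaInternal(ctx));
    System.out.println(viaFixedKey(ctx));
  }
}
```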
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncKeyValueStore.java b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncKeyValueStore.java
index b3c375155..845728f5d 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncKeyValueStore.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/stores/AsyncKeyValueStore.java
@@ -19,14 +19,13 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.slf4j.Logger;
 
 /**
@@ -41,7 +40,9 @@
  * -One for each AsyncThread per physical state store instance
  * (ie per state store per processor per partition per AsyncThread per StreamThread
  */
-public class AsyncKeyValueStore implements KeyValueStore {
+public class AsyncKeyValueStore
+    extends WrappedStateStore, KS, VS>
+    implements KeyValueStore {
 
   private final Logger log;
@@ -55,6 +56,7 @@ public AsyncKeyValueStore(
       final KeyValueStore userDelegate,
       final DelayedAsyncStoreWriter delayedWriter
   ) {
+    super(userDelegate);
    this.log = new LogContext(String.format(" async-store [%s-%d]", name, partition))
        .logger(AsyncKeyValueStore.class);
    this.userDelegate = (KeyValueStore) userDelegate;
@@ -123,33 +125,6 @@ public void init(
         + "init(StateStoreContext, StateStore) instead");
   }
 
-  @Override
-  public void init(final StateStoreContext context, final StateStore root) {
-    userDelegate.init(context, root);
-  }
-
-  @Override
-  public void flush() {
-    // TODO: how should we handle this, particularly in the ALOS case where it might be
-    // called as part of regular processing?
-    userDelegate.flush();
-  }
-
-  @Override
-  public void close() {
-    userDelegate.close();
-  }
-
-  @Override
-  public boolean persistent() {
-    return userDelegate.persistent();
-  }
-
-  @Override
-  public boolean isOpen() {
-    return userDelegate.isOpen();
-  }
-
   @Override
   public QueryResult query(
       final Query query,
@@ -159,11 +134,6 @@ public QueryResult query(
       final PositionBound positionBound,
       final QueryConfig config
   ) {
     throw new UnsupportedOperationException("IQv2 not yet supported with async processing");
   }
 
-  @Override
-  public Position getPosition() {
-    return userDelegate.getPosition();
-  }
-
   @Override
   public KeyValueIterator range(final KS from, final KS to) {
     throw new UnsupportedOperationException("#range is not yet supported with async processing");
diff --git a/kafka-client/src/test/java/dev/responsive/kafka/api/async/internals/AsyncThreadPoolTest.java b/kafka-client/src/test/java/dev/responsive/kafka/api/async/internals/AsyncThreadPoolTest.java
index 292b5224a..237cea625 100644
--- a/kafka-client/src/test/java/dev/responsive/kafka/api/async/internals/AsyncThreadPoolTest.java
+++ b/kafka-client/src/test/java/dev/responsive/kafka/api/async/internals/AsyncThreadPoolTest.java
@@ -36,8 +36,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.api.ProcessingContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -55,7 +55,7 @@ class AsyncThreadPoolTest {
   @Mock
   private AsyncUserProcessorContext userContext;
   @Mock
-  private ProcessingContext originalContext;
+  private InternalProcessorContext originalContext;
   @Mock
   private ProcessorRecordContext recordContext;
   private final FinalizingQueue finalizingQueue0 = new FinalizingQueue("fq", 0);
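Returning to the `AsyncKeyValueStore` change above: extending `WrappedStateStore` and calling `super(userDelegate)` is the store-side version of the same boilerplate-reduction move, since the inherited wrapper supplies the lifecycle pass-throughs (`init`, `flush`, `close`, `persistent`, `isOpen`, `getPosition`) that were previously written out by hand. A toy sketch of the wrapped-store pattern with hypothetical types, not the real Kafka Streams `WrappedStateStore` API:

```java
// Hypothetical types, for illustration only: a generic wrapper base class holds the
// inner store and supplies pass-through defaults, so the concrete wrapper only
// overrides the operations it actually needs to intercept.
interface SimpleStore {
  void open();
  void close();
  String get(String key);
}

abstract class WrappedSimpleStore<S extends SimpleStore> implements SimpleStore {

  private final S wrapped;

  protected WrappedSimpleStore(final S wrapped) {
    this.wrapped = wrapped;   // analogous to super(userDelegate) in the diff above
  }

  protected S wrapped() {
    return wrapped;
  }

  // lifecycle methods delegate by default
  @Override
  public void open() {
    wrapped.open();
  }

  @Override
  public void close() {
    wrapped.close();
  }

  @Override
  public String get(final String key) {
    return wrapped.get(key);
  }
}

final class AsyncSimpleStore extends WrappedSimpleStore<SimpleStore> {

  AsyncSimpleStore(final SimpleStore delegate) {
    super(delegate);
  }

  @Override
  public String get(final String key) {
    // only the read path is intercepted; everything else is inherited pass-through
    return wrapped().get(key);
  }
}
```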