Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class AsyncProcessor<KIn, VIn, KOut, VOut>
private Cancellable punctuator;

// the context passed to us in init, ie the one created for this task and owned by Kafka Streams
private ProcessingContext taskContext;
private InternalProcessorContext<KOut, VOut> taskContext;

// the async context owned by the StreamThread that is running this processor/task
private StreamThreadProcessorContext<KOut, VOut> streamThreadContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.slf4j.Logger;

public class AsyncThreadPool {
Expand Down Expand Up @@ -168,7 +168,7 @@ public <KOut, VOut> void scheduleForProcessing(
final TaskId taskId,
final List<AsyncEvent> events,
final FinalizingQueue finalizingQueue,
final ProcessingContext taskContext,
final InternalProcessorContext<KOut, VOut> taskContext,
final AsyncUserProcessorContext<KOut, VOut> asyncProcessorContext,
final AsyncProcessorMetricsRecorder processorMetricsRecorder
) {
Expand Down Expand Up @@ -285,7 +285,7 @@ private static class AsyncEventTask<KOut, VOut> implements Supplier<StreamsExcep

private AsyncEventTask(
final AsyncEvent event,
final ProcessingContext taskContext,
final InternalProcessorContext<KOut, VOut> taskContext,
final AsyncUserProcessorContext<KOut, VOut> userContext,
final Semaphore queueSemaphore,
final AsyncProcessorMetricsRecorder metricsRecorder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,18 @@

import dev.responsive.kafka.api.async.internals.events.AsyncEvent;
import dev.responsive.kafka.api.async.internals.events.DelayedForward;
import java.io.File;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;

/**
* A special kind of mock/wrapper context to be used by the AsyncThread.
Expand Down Expand Up @@ -66,7 +61,8 @@
* (ie one per AsyncThread per StreamThread per async processor per partition)
* Equivalently, one per AsyncThread for each "original" ProcessorContext in Streams
*/
public class AsyncThreadProcessorContext<KOut, VOut> implements MergedProcessorContext<KOut, VOut> {
public class AsyncThreadProcessorContext<KOut, VOut>
extends DelegatingProcessorContext<KOut, VOut, InternalProcessorContext<KOut, VOut>> {

// The AsyncEvent that is currently being processed by this AsyncThread. Updated each
// time a new event is picked up from the processing queue but before beginning
Expand All @@ -78,7 +74,7 @@ public class AsyncThreadProcessorContext<KOut, VOut> implements MergedProcessorC
// in to the async processor during init. This MUST be protected from
// any mutations and should only be delegated to in pure getters that
// access immutable fields (such as applicationId)
private final ProcessingContext taskContext;
private final InternalProcessorContext<KOut, VOut> taskContext;

// TODO: we won't need to do this until we support async with the DSL and support
// the new windowed emit semantics specifically, which is the only thing using it,
Expand All @@ -90,7 +86,7 @@ public class AsyncThreadProcessorContext<KOut, VOut> implements MergedProcessorC
// further inspection but isn't supported by either the async framework or in
// Responsive in general, so it's not urgent.
public AsyncThreadProcessorContext(
final ProcessingContext taskContext,
final InternalProcessorContext<KOut, VOut> taskContext,
final AsyncEvent currentAsyncEvent
) {
this.taskContext = taskContext;
Expand Down Expand Up @@ -199,50 +195,7 @@ public long currentStreamTimeMs() {
}

@Override
// This is an immutable field so it's safe to delegate
public String applicationId() {
return taskContext.applicationId();
}

@Override
// This is an immutable field so it's safe to delegate
public TaskId taskId() {
return taskContext.taskId();
}

@Override
// This just looks up the default serde in the configs so it's safe
public Serde<?> keySerde() {
return taskContext.keySerde();
}

@Override
// This just looks up the default serde in the configs so it's safe
public Serde<?> valueSerde() {
return taskContext.valueSerde();
}

@Override
// This is an immutable field so it's safe to delegate
public File stateDir() {
return taskContext.stateDir();
}

@Override
// This is an immutable field so it's safe to delegate
public StreamsMetrics metrics() {
return taskContext.metrics();
}

@Override
// Safe to delegate since all StreamThreads share the same configs anyway
public Map<String, Object> appConfigs() {
return taskContext.appConfigs();
}

@Override
// Safe to delegate since all StreamThreads share the same configs anyway
public Map<String, Object> appConfigsWithPrefix(final String prefix) {
return taskContext.appConfigsWithPrefix(prefix);
public InternalProcessorContext<KOut, VOut> delegate() {
return taskContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,41 @@
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorMetadata;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.ThreadCache;

/**
* Basic wrapper around a {@link ProcessorContext}/{@link FixedKeyProcessorContext}
* that just delegates to the underlying context.
*/
@SuppressWarnings({"checkstyle:linelength", "checkstyle:overloadmethodsdeclarationorder"})
public abstract class DelegatingProcessorContext<KOut, VOut, D extends ProcessorContext<KOut, VOut> & FixedKeyProcessorContext<KOut, VOut>>
public abstract class DelegatingProcessorContext<KOut, VOut, D
extends InternalProcessorContext<KOut, VOut> & FixedKeyProcessorContext<KOut, VOut>>
implements MergedProcessorContext<KOut, VOut> {

public abstract D delegate();
Expand Down Expand Up @@ -70,7 +86,7 @@ public File stateDir() {
}

@Override
public StreamsMetrics metrics() {
public StreamsMetricsImpl metrics() {
return delegate().metrics();
}

Expand Down Expand Up @@ -138,4 +154,161 @@ public <K extends KOut, V extends VOut> void forward(
) {
delegate().forward(record, childName);
}

@Override
public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
return delegate().getStateStore(builder);
}

@Override
public void setSystemTimeMs(final long timeMs) {
delegate().setSystemTimeMs(timeMs);
}

@Override
public ProcessorRecordContext recordContext() {
return delegate().recordContext();
}

@Override
public void setRecordContext(final ProcessorRecordContext recordContext) {
delegate().setRecordContext(recordContext);
}

@Override
public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
delegate().setCurrentNode(currentNode);
}

@Override
public ProcessorNode<?, ?, ?, ?> currentNode() {
return delegate().currentNode();
}

@Override
public ThreadCache cache() {
return delegate().cache();
}

@Override
public void initialize() {
delegate().initialize();
}

@Override
public void uninitialize() {
delegate().uninitialize();
}

@Override
public Task.TaskType taskType() {
return delegate().taskType();
}

@Override
public void transitionToActive(
final StreamTask streamTask,
final RecordCollector recordCollector,
final ThreadCache newCache
) {
delegate().transitionToActive(streamTask, recordCollector, newCache);
}

@Override
public void transitionToStandby(final ThreadCache newCache) {
delegate().transitionToStandby(newCache);
}

@Override
public void registerCacheFlushListener(
final String namespace,
final ThreadCache.DirtyEntryFlushListener listener
) {
delegate().registerCacheFlushListener(namespace, listener);
}

@Override
public void logChange(
final String storeName,
final Bytes key,
final byte[] value,
final long timestamp,
final Position position
) {
delegate().logChange(storeName, key, value, timestamp, position);
}

@Override
public String changelogFor(final String storeName) {
return delegate().changelogFor(storeName);
}

@Override
public void addProcessorMetadataKeyValue(final String key, final long value) {
delegate().addProcessorMetadataKeyValue(key, value);
}

@Override
public Long processorMetadataForKey(final String key) {
return delegate().processorMetadataForKey(key);
}

@Override
public void setProcessorMetadata(final ProcessorMetadata metadata) {
delegate().setProcessorMetadata(metadata);
}

@Override
public ProcessorMetadata getProcessorMetadata() {
return delegate().getProcessorMetadata();
}

@Override
public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
delegate().register(store, stateRestoreCallback);
}

@Override
public <K, V> void forward(final K key, final V value) {
delegate().forward(key, value);
}

@Override
public <K, V> void forward(final K key, final V value, final To to) {
delegate().forward(key, value, to);
}

@Override
public String topic() {
return delegate().topic();
}

@Override
public int partition() {
return delegate().partition();
}

@Override
public long offset() {
return delegate().offset();
}

@Override
public Headers headers() {
return delegate().headers();
}

@Override
public long timestamp() {
return delegate().timestamp();
}

@Override
public void register(
final StateStore store,
final StateRestoreCallback stateRestoreCallback,
final CommitCallback commitCallback
) {
delegate().register(store, stateRestoreCallback, commitCallback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
package dev.responsive.kafka.api.async.internals.contexts;

import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;

/**
* Simple interface for processor contexts that may be used for both
* fixed-key and non-fixed-key processors
*/
public interface MergedProcessorContext<KOut, VOut>
extends ProcessorContext<KOut, VOut>, FixedKeyProcessorContext<KOut, VOut> {
extends InternalProcessorContext<KOut, VOut>, FixedKeyProcessorContext<KOut, VOut> {
}
Loading
Loading