Skip to content

Commit 036434e

Browse files
authored
Make async context an instance of InternalProcessorContext (#391)
1 parent 1495cb1 commit 036434e

File tree

7 files changed

+196
-100
lines changed

7 files changed

+196
-100
lines changed

kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public class AsyncProcessor<KIn, VIn, KOut, VOut>
102102
private Cancellable punctuator;
103103

104104
// the context passed to us in init, ie the one created for this task and owned by Kafka Streams
105-
private ProcessingContext taskContext;
105+
private InternalProcessorContext<KOut, VOut> taskContext;
106106

107107
// the async context owned by the StreamThread that is running this processor/task
108108
private StreamThreadProcessorContext<KOut, VOut> streamThreadContext;

kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/AsyncThreadPool.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import org.apache.kafka.common.utils.LogContext;
4141
import org.apache.kafka.streams.errors.StreamsException;
4242
import org.apache.kafka.streams.processor.TaskId;
43-
import org.apache.kafka.streams.processor.api.ProcessingContext;
43+
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
4444
import org.slf4j.Logger;
4545

4646
public class AsyncThreadPool {
@@ -168,7 +168,7 @@ public <KOut, VOut> void scheduleForProcessing(
168168
final TaskId taskId,
169169
final List<AsyncEvent> events,
170170
final FinalizingQueue finalizingQueue,
171-
final ProcessingContext taskContext,
171+
final InternalProcessorContext<KOut, VOut> taskContext,
172172
final AsyncUserProcessorContext<KOut, VOut> asyncProcessorContext,
173173
final AsyncProcessorMetricsRecorder processorMetricsRecorder
174174
) {
@@ -285,7 +285,7 @@ private static class AsyncEventTask<KOut, VOut> implements Supplier<StreamsExcep
285285

286286
private AsyncEventTask(
287287
final AsyncEvent event,
288-
final ProcessingContext taskContext,
288+
final InternalProcessorContext<KOut, VOut> taskContext,
289289
final AsyncUserProcessorContext<KOut, VOut> userContext,
290290
final Semaphore queueSemaphore,
291291
final AsyncProcessorMetricsRecorder metricsRecorder

kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/AsyncThreadProcessorContext.java

Lines changed: 7 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,18 @@
1414

1515
import dev.responsive.kafka.api.async.internals.events.AsyncEvent;
1616
import dev.responsive.kafka.api.async.internals.events.DelayedForward;
17-
import java.io.File;
1817
import java.time.Duration;
19-
import java.util.Map;
2018
import java.util.Optional;
21-
import org.apache.kafka.common.serialization.Serde;
22-
import org.apache.kafka.streams.StreamsMetrics;
2319
import org.apache.kafka.streams.processor.Cancellable;
2420
import org.apache.kafka.streams.processor.PunctuationType;
2521
import org.apache.kafka.streams.processor.Punctuator;
2622
import org.apache.kafka.streams.processor.StateStore;
27-
import org.apache.kafka.streams.processor.TaskId;
2823
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
29-
import org.apache.kafka.streams.processor.api.ProcessingContext;
3024
import org.apache.kafka.streams.processor.api.Processor;
3125
import org.apache.kafka.streams.processor.api.ProcessorContext;
3226
import org.apache.kafka.streams.processor.api.Record;
3327
import org.apache.kafka.streams.processor.api.RecordMetadata;
28+
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
3429

3530
/**
3631
* A special kind of mock/wrapper context to be used by the AsyncThread.
@@ -66,7 +61,8 @@
6661
* (ie one per AsyncThread per StreamThread per async processor per partition)
6762
* Equivalently, one per AsyncThread for each "original" ProcessorContext in Streams
6863
*/
69-
public class AsyncThreadProcessorContext<KOut, VOut> implements MergedProcessorContext<KOut, VOut> {
64+
public class AsyncThreadProcessorContext<KOut, VOut>
65+
extends DelegatingProcessorContext<KOut, VOut, InternalProcessorContext<KOut, VOut>> {
7066

7167
// The AsyncEvent that is currently being processed by this AsyncThread. Updated each
7268
// time a new event is picked up from the processing queue but before beginning
@@ -78,7 +74,7 @@ public class AsyncThreadProcessorContext<KOut, VOut> implements MergedProcessorC
7874
// in to the async processor during init. This MUST be protected from
7975
// any mutations and should only be delegated to in pure getters that
8076
// access immutable fields (such as applicationId)
81-
private final ProcessingContext taskContext;
77+
private final InternalProcessorContext<KOut, VOut> taskContext;
8278

8379
// TODO: we won't need to do this until we support async with the DSL and support
8480
// the new windowed emit semantics specifically, which is the only thing using it,
@@ -90,7 +86,7 @@ public class AsyncThreadProcessorContext<KOut, VOut> implements MergedProcessorC
9086
// further inspection but isn't supported by either the async framework or in
9187
// Responsive in general, so it's not urgent.
9288
public AsyncThreadProcessorContext(
93-
final ProcessingContext taskContext,
89+
final InternalProcessorContext<KOut, VOut> taskContext,
9490
final AsyncEvent currentAsyncEvent
9591
) {
9692
this.taskContext = taskContext;
@@ -199,50 +195,7 @@ public long currentStreamTimeMs() {
199195
}
200196

201197
@Override
202-
// This is an immutable field so it's safe to delegate
203-
public String applicationId() {
204-
return taskContext.applicationId();
205-
}
206-
207-
@Override
208-
// This is an immutable field so it's safe to delegate
209-
public TaskId taskId() {
210-
return taskContext.taskId();
211-
}
212-
213-
@Override
214-
// This just looks up the default serde in the configs so it's safe
215-
public Serde<?> keySerde() {
216-
return taskContext.keySerde();
217-
}
218-
219-
@Override
220-
// This just looks up the default serde in the configs so it's safe
221-
public Serde<?> valueSerde() {
222-
return taskContext.valueSerde();
223-
}
224-
225-
@Override
226-
// This is an immutable field so it's safe to delegate
227-
public File stateDir() {
228-
return taskContext.stateDir();
229-
}
230-
231-
@Override
232-
// This is an immutable field so it's safe to delegate
233-
public StreamsMetrics metrics() {
234-
return taskContext.metrics();
235-
}
236-
237-
@Override
238-
// Safe to delegate since all StreamThreads share the same configs anyway
239-
public Map<String, Object> appConfigs() {
240-
return taskContext.appConfigs();
241-
}
242-
243-
@Override
244-
// Safe to delegate since all StreamThreads share the same configs anyway
245-
public Map<String, Object> appConfigsWithPrefix(final String prefix) {
246-
return taskContext.appConfigsWithPrefix(prefix);
198+
public InternalProcessorContext<KOut, VOut> delegate() {
199+
return taskContext;
247200
}
248201
}

kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/DelegatingProcessorContext.java

Lines changed: 176 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,41 @@
1616
import java.time.Duration;
1717
import java.util.Map;
1818
import java.util.Optional;
19+
import org.apache.kafka.common.header.Headers;
1920
import org.apache.kafka.common.serialization.Serde;
20-
import org.apache.kafka.streams.StreamsMetrics;
21+
import org.apache.kafka.common.utils.Bytes;
2122
import org.apache.kafka.streams.processor.Cancellable;
23+
import org.apache.kafka.streams.processor.CommitCallback;
2224
import org.apache.kafka.streams.processor.PunctuationType;
2325
import org.apache.kafka.streams.processor.Punctuator;
26+
import org.apache.kafka.streams.processor.StateRestoreCallback;
2427
import org.apache.kafka.streams.processor.StateStore;
2528
import org.apache.kafka.streams.processor.TaskId;
29+
import org.apache.kafka.streams.processor.To;
2630
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
2731
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
2832
import org.apache.kafka.streams.processor.api.ProcessorContext;
2933
import org.apache.kafka.streams.processor.api.Record;
3034
import org.apache.kafka.streams.processor.api.RecordMetadata;
35+
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
36+
import org.apache.kafka.streams.processor.internals.ProcessorMetadata;
37+
import org.apache.kafka.streams.processor.internals.ProcessorNode;
38+
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
39+
import org.apache.kafka.streams.processor.internals.RecordCollector;
40+
import org.apache.kafka.streams.processor.internals.StreamTask;
41+
import org.apache.kafka.streams.processor.internals.Task;
42+
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
43+
import org.apache.kafka.streams.query.Position;
44+
import org.apache.kafka.streams.state.StoreBuilder;
45+
import org.apache.kafka.streams.state.internals.ThreadCache;
3146

3247
/**
3348
* Basic wrapper around a {@link ProcessorContext}/{@link FixedKeyProcessorContext}
3449
* that just delegates to the underlying context.
3550
*/
3651
@SuppressWarnings({"checkstyle:linelength", "checkstyle:overloadmethodsdeclarationorder"})
37-
public abstract class DelegatingProcessorContext<KOut, VOut, D extends ProcessorContext<KOut, VOut> & FixedKeyProcessorContext<KOut, VOut>>
52+
public abstract class DelegatingProcessorContext<KOut, VOut, D
53+
extends InternalProcessorContext<KOut, VOut> & FixedKeyProcessorContext<KOut, VOut>>
3854
implements MergedProcessorContext<KOut, VOut> {
3955

4056
public abstract D delegate();
@@ -70,7 +86,7 @@ public File stateDir() {
7086
}
7187

7288
@Override
73-
public StreamsMetrics metrics() {
89+
public StreamsMetricsImpl metrics() {
7490
return delegate().metrics();
7591
}
7692

@@ -138,4 +154,161 @@ public <K extends KOut, V extends VOut> void forward(
138154
) {
139155
delegate().forward(record, childName);
140156
}
157+
158+
@Override
159+
public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) {
160+
return delegate().getStateStore(builder);
161+
}
162+
163+
@Override
164+
public void setSystemTimeMs(final long timeMs) {
165+
delegate().setSystemTimeMs(timeMs);
166+
}
167+
168+
@Override
169+
public ProcessorRecordContext recordContext() {
170+
return delegate().recordContext();
171+
}
172+
173+
@Override
174+
public void setRecordContext(final ProcessorRecordContext recordContext) {
175+
delegate().setRecordContext(recordContext);
176+
}
177+
178+
@Override
179+
public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
180+
delegate().setCurrentNode(currentNode);
181+
}
182+
183+
@Override
184+
public ProcessorNode<?, ?, ?, ?> currentNode() {
185+
return delegate().currentNode();
186+
}
187+
188+
@Override
189+
public ThreadCache cache() {
190+
return delegate().cache();
191+
}
192+
193+
@Override
194+
public void initialize() {
195+
delegate().initialize();
196+
}
197+
198+
@Override
199+
public void uninitialize() {
200+
delegate().uninitialize();
201+
}
202+
203+
@Override
204+
public Task.TaskType taskType() {
205+
return delegate().taskType();
206+
}
207+
208+
@Override
209+
public void transitionToActive(
210+
final StreamTask streamTask,
211+
final RecordCollector recordCollector,
212+
final ThreadCache newCache
213+
) {
214+
delegate().transitionToActive(streamTask, recordCollector, newCache);
215+
}
216+
217+
@Override
218+
public void transitionToStandby(final ThreadCache newCache) {
219+
delegate().transitionToStandby(newCache);
220+
}
221+
222+
@Override
223+
public void registerCacheFlushListener(
224+
final String namespace,
225+
final ThreadCache.DirtyEntryFlushListener listener
226+
) {
227+
delegate().registerCacheFlushListener(namespace, listener);
228+
}
229+
230+
@Override
231+
public void logChange(
232+
final String storeName,
233+
final Bytes key,
234+
final byte[] value,
235+
final long timestamp,
236+
final Position position
237+
) {
238+
delegate().logChange(storeName, key, value, timestamp, position);
239+
}
240+
241+
@Override
242+
public String changelogFor(final String storeName) {
243+
return delegate().changelogFor(storeName);
244+
}
245+
246+
@Override
247+
public void addProcessorMetadataKeyValue(final String key, final long value) {
248+
delegate().addProcessorMetadataKeyValue(key, value);
249+
}
250+
251+
@Override
252+
public Long processorMetadataForKey(final String key) {
253+
return delegate().processorMetadataForKey(key);
254+
}
255+
256+
@Override
257+
public void setProcessorMetadata(final ProcessorMetadata metadata) {
258+
delegate().setProcessorMetadata(metadata);
259+
}
260+
261+
@Override
262+
public ProcessorMetadata getProcessorMetadata() {
263+
return delegate().getProcessorMetadata();
264+
}
265+
266+
@Override
267+
public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
268+
delegate().register(store, stateRestoreCallback);
269+
}
270+
271+
@Override
272+
public <K, V> void forward(final K key, final V value) {
273+
delegate().forward(key, value);
274+
}
275+
276+
@Override
277+
public <K, V> void forward(final K key, final V value, final To to) {
278+
delegate().forward(key, value, to);
279+
}
280+
281+
@Override
282+
public String topic() {
283+
return delegate().topic();
284+
}
285+
286+
@Override
287+
public int partition() {
288+
return delegate().partition();
289+
}
290+
291+
@Override
292+
public long offset() {
293+
return delegate().offset();
294+
}
295+
296+
@Override
297+
public Headers headers() {
298+
return delegate().headers();
299+
}
300+
301+
@Override
302+
public long timestamp() {
303+
return delegate().timestamp();
304+
}
305+
306+
@Override
307+
public void register(
308+
final StateStore store,
309+
final StateRestoreCallback stateRestoreCallback,
310+
final CommitCallback commitCallback
311+
) {
312+
delegate().register(store, stateRestoreCallback, commitCallback);
313+
}
141314
}

kafka-client/src/main/java/dev/responsive/kafka/api/async/internals/contexts/MergedProcessorContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
package dev.responsive.kafka.api.async.internals.contexts;
1414

1515
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
16-
import org.apache.kafka.streams.processor.api.ProcessorContext;
16+
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
1717

1818
/**
1919
* Simple interface for processor contexts that may be used for both
2020
* fixed-key and non-fixed-key processors
2121
*/
2222
public interface MergedProcessorContext<KOut, VOut>
23-
extends ProcessorContext<KOut, VOut>, FixedKeyProcessorContext<KOut, VOut> {
23+
extends InternalProcessorContext<KOut, VOut>, FixedKeyProcessorContext<KOut, VOut> {
2424
}

0 commit comments

Comments
 (0)