Skip to content
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.hypertrace.core.kafkastreams.framework.punctuators;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -8,6 +10,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Punctuator;
Expand All @@ -17,15 +21,24 @@

@Slf4j
public abstract class AbstractThrottledPunctuator<T> implements Punctuator {
private static final ConcurrentHashMap<String, AtomicInteger> totalEventCountGauge =
new ConcurrentHashMap<>();
private static final String TOTAL_EVENT_COUNT_GAUGE_NAME =
"abstract.throttled.punctuator.total.events.count";
private final Clock clock;
private final KeyValueStore<Long, List<T>> eventStore;
private final ThrottledPunctuatorConfig config;
private final MeterRegistry meterRegistry;

public AbstractThrottledPunctuator(
Clock clock, ThrottledPunctuatorConfig config, KeyValueStore<Long, List<T>> eventStore) {
Clock clock,
ThrottledPunctuatorConfig config,
KeyValueStore<Long, List<T>> eventStore,
MeterRegistry meterRegistry) {
this.clock = clock;
this.config = config;
this.eventStore = eventStore;
this.meterRegistry = meterRegistry;
}

public void scheduleTask(long scheduleMs, T event) {
Expand Down Expand Up @@ -68,6 +81,7 @@ public final void punctuate(long timestamp) {
long startTime = clock.millis();
int totalProcessedWindows = 0;
int totalProcessedTasks = 0;
int totalEventCount = 0;

log.debug(
"Processing tasks with throttling yield of {} until timestamp {}",
Expand All @@ -81,6 +95,7 @@ public final void punctuate(long timestamp) {
totalProcessedWindows++;
List<T> events = kv.value;
long windowMs = kv.key;
totalEventCount += events.size();
// collect all tasks to be rescheduled by key to perform bulk reschedules
Map<Long, List<T>> rescheduledTasks = new HashMap<>();
// loop through all events for this key until yield timeout is reached
Expand Down Expand Up @@ -124,11 +139,16 @@ public final void punctuate(long timestamp) {
}
}
}
long timeTakenMs = clock.millis() - startTime;
boolean yielded = shouldYieldNow(startTime);
updateTotalEventCountGauge(totalEventCount, yielded);

log.debug(
"processed windows: {}, processed tasks: {}, time taken: {}",
"processed windows: {}, processed tasks: {}, total events: {}, time taken: {}ms",
totalProcessedWindows,
totalProcessedTasks,
clock.millis() - startTime);
totalEventCount,
timeTakenMs);
}

protected abstract TaskResult executeTask(long punctuateTimestamp, T object);
Expand All @@ -148,4 +168,23 @@ private boolean shouldYieldNow(long startTimestamp) {
private long normalize(long timestamp) {
return timestamp - (timestamp % config.getWindowMs());
}

private void updateTotalEventCountGauge(int totalEventCount, boolean yielded) {
if (meterRegistry == null) {
return;
}

String tagValue = String.valueOf(yielded);

AtomicInteger gauge =
totalEventCountGauge.computeIfAbsent(
tagValue,
key -> {
AtomicInteger newGauge = new AtomicInteger(0);
meterRegistry.gauge(TOTAL_EVENT_COUNT_GAUGE_NAME, Tags.of("yielded", key), newGauge);
return newGauge;
});

gauge.set(totalEventCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public TestPunctuator(
Clock clock,
ThrottledPunctuatorConfig config,
KeyValueStore<Long, List<String>> objectStore) {
super(clock, config, objectStore);
super(clock, config, objectStore, null);
}

void setReturnResult(String object, TaskResult result) {
Expand Down