Skip to content
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
id("org.hypertrace.publish-plugin") version "1.1.1" apply false
id("org.hypertrace.jacoco-report-plugin") version "0.3.0" apply false
id("org.hypertrace.code-style-plugin") version "2.1.2" apply false
id("org.owasp.dependencycheck") version "12.1.0"
id("org.owasp.dependencycheck") version "12.1.3"
}

subprojects {
Expand Down
6 changes: 5 additions & 1 deletion kafka-bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ dependencies {
api("org.apache.commons:commons-compress:1.26.0") {
because("https://www.tenable.com/cve/CVE-2024-25710")
}
api("org.apache.commons:commons-lang3:3.18.0") {
because("CVE-2025-48924 is fixed in 3.18.0")
}


api("io.confluent:kafka-streams-avro-serde:$confluentVersion")
api("io.confluent:kafka-protobuf-serializer:$confluentVersion")
Expand All @@ -30,6 +34,6 @@ dependencies {
api("org.apache.kafka:kafka-clients:$confluentCcsVersion")
api("org.apache.kafka:kafka-streams:$confluentCcsVersion")
api("org.apache.kafka:kafka-streams-test-utils:$confluentCcsVersion")
api("org.apache.avro:avro:1.11.4")
api("org.apache.avro:avro:1.12.0")
}
}
4 changes: 2 additions & 2 deletions kafka-streams-framework/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ dependencies {
api(platform(project(":kafka-bom")))
api("org.apache.kafka:kafka-streams")
api("io.confluent:kafka-streams-avro-serde")
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.14")
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.16")

implementation("org.apache.avro:avro")
implementation("org.apache.kafka:kafka-clients")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.89")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.89")
implementation("org.apache.commons:commons-lang3:3.12.0")
implementation("org.apache.commons:commons-lang3:3.18.0")

testCompileOnly("org.projectlombok:lombok:1.18.26")
testAnnotationProcessor("org.projectlombok:lombok:1.18.26")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.hypertrace.core.kafkastreams.framework.punctuators;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -20,12 +22,29 @@ public abstract class AbstractThrottledPunctuator<T> implements Punctuator {
private final Clock clock;
private final KeyValueStore<Long, List<T>> eventStore;
private final ThrottledPunctuatorConfig config;
private final MeterRegistry meterRegistry;
private final String punctuatorName;

public AbstractThrottledPunctuator(
Clock clock, ThrottledPunctuatorConfig config, KeyValueStore<Long, List<T>> eventStore) {
Clock clock,
ThrottledPunctuatorConfig config,
KeyValueStore<Long, List<T>> eventStore,
MeterRegistry meterRegistry,
String punctuatorName) {
this.clock = clock;
this.config = config;
this.eventStore = eventStore;
this.meterRegistry = meterRegistry;
this.punctuatorName = resolvePunctuatorName(punctuatorName);
}

public AbstractThrottledPunctuator(
Clock clock, ThrottledPunctuatorConfig config, KeyValueStore<Long, List<T>> eventStore) {
this(clock, config, eventStore, null, null);
}

private String resolvePunctuatorName(String name) {
return (name != null && !name.isBlank()) ? name : this.getClass().getSimpleName();
}

public void scheduleTask(long scheduleMs, T event) {
Expand Down Expand Up @@ -124,6 +143,8 @@ public final void punctuate(long timestamp) {
}
}
}
boolean yielded = shouldYieldNow(startTime);
publishMetrics(totalProcessedTasks, yielded);
log.debug(
"processed windows: {}, processed tasks: {}, time taken: {}",
totalProcessedWindows,
Expand All @@ -148,4 +169,15 @@ private boolean shouldYieldNow(long startTimestamp) {
private long normalize(long timestamp) {
return timestamp - (timestamp % config.getWindowMs());
}

private void publishMetrics(int totalProcessedTasks, boolean yielded) {
if (meterRegistry != null) {
meterRegistry
.counter(
"throttled.punctuator.processed.task.count", Tags.of("punctuator", punctuatorName))
.increment(totalProcessedTasks);
meterRegistry.gauge(
"throttled.punctuator.yielded", Tags.of("punctuator", punctuatorName), yielded ? 1 : 0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies {

api(platform(project(":kafka-bom")))
api("org.apache.kafka:kafka-streams")
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.14")
api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.16")
api("com.typesafe:config:1.4.2")
implementation("com.google.guava:guava:32.0.1-jre")
implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.13.14")
Expand Down