diff --git a/build.gradle.kts b/build.gradle.kts index e7cc1d5..5df088b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,7 +8,7 @@ plugins { id("org.hypertrace.publish-plugin") version "1.1.1" apply false id("org.hypertrace.jacoco-report-plugin") version "0.3.0" apply false id("org.hypertrace.code-style-plugin") version "2.1.2" apply false - id("org.owasp.dependencycheck") version "12.1.0" + id("org.owasp.dependencycheck") version "12.1.3" } subprojects { diff --git a/kafka-bom/build.gradle.kts b/kafka-bom/build.gradle.kts index 115344a..77e1eee 100644 --- a/kafka-bom/build.gradle.kts +++ b/kafka-bom/build.gradle.kts @@ -22,6 +22,10 @@ dependencies { api("org.apache.commons:commons-compress:1.26.0") { because("https://www.tenable.com/cve/CVE-2024-25710") } + api("org.apache.commons:commons-lang3:3.18.0") { + because("CVE-2025-48924 is fixed in 3.18.0") + } + api("io.confluent:kafka-streams-avro-serde:$confluentVersion") api("io.confluent:kafka-protobuf-serializer:$confluentVersion") @@ -30,6 +34,6 @@ dependencies { api("org.apache.kafka:kafka-clients:$confluentCcsVersion") api("org.apache.kafka:kafka-streams:$confluentCcsVersion") api("org.apache.kafka:kafka-streams-test-utils:$confluentCcsVersion") - api("org.apache.avro:avro:1.11.4") + api("org.apache.avro:avro:1.12.0") } } diff --git a/kafka-streams-framework/build.gradle.kts b/kafka-streams-framework/build.gradle.kts index 692e2a4..69db434 100644 --- a/kafka-streams-framework/build.gradle.kts +++ b/kafka-streams-framework/build.gradle.kts @@ -18,13 +18,13 @@ dependencies { api(platform(project(":kafka-bom"))) api("org.apache.kafka:kafka-streams") api("io.confluent:kafka-streams-avro-serde") - api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.14") + api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.16") implementation("org.apache.avro:avro") implementation("org.apache.kafka:kafka-clients") implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.89") implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.89") - implementation("org.apache.commons:commons-lang3:3.12.0") + implementation("org.apache.commons:commons-lang3:3.18.0") testCompileOnly("org.projectlombok:lombok:1.18.26") testAnnotationProcessor("org.projectlombok:lombok:1.18.26") diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java index b90576a..640ab79 100644 --- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java +++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/punctuators/AbstractThrottledPunctuator.java @@ -1,5 +1,7 @@ package org.hypertrace.core.kafkastreams.framework.punctuators; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; import java.time.Clock; import java.util.ArrayList; import java.util.Collections; @@ -20,12 +22,29 @@ public abstract class AbstractThrottledPunctuator implements Punctuator { private final Clock clock; private final KeyValueStore> eventStore; private final ThrottledPunctuatorConfig config; + private final MeterRegistry meterRegistry; + private final String punctuatorName; public AbstractThrottledPunctuator( - Clock clock, ThrottledPunctuatorConfig config, KeyValueStore> eventStore) { + Clock clock, + ThrottledPunctuatorConfig config, + KeyValueStore> eventStore, + MeterRegistry meterRegistry, + String punctuatorName) { this.clock = clock; this.config = config; this.eventStore = eventStore; + this.meterRegistry = meterRegistry; + this.punctuatorName = resolvePunctuatorName(punctuatorName); + } + + public AbstractThrottledPunctuator( + Clock clock, ThrottledPunctuatorConfig config, KeyValueStore> eventStore) { + this(clock, config, eventStore, null, null); + } + + private String resolvePunctuatorName(String name) { + return (name != null && !name.isBlank()) ? name : this.getClass().getSimpleName(); } public void scheduleTask(long scheduleMs, T event) { @@ -124,6 +143,8 @@ public final void punctuate(long timestamp) { } } } + boolean yielded = shouldYieldNow(startTime); + publishMetrics(totalProcessedTasks, yielded); log.debug( "processed windows: {}, processed tasks: {}, time taken: {}", totalProcessedWindows, @@ -148,4 +169,15 @@ private boolean shouldYieldNow(long startTimestamp) { private long normalize(long timestamp) { return timestamp - (timestamp % config.getWindowMs()); } + + private void publishMetrics(int totalProcessedTasks, boolean yielded) { + if (meterRegistry != null) { + meterRegistry + .counter( + "throttled.punctuator.processed.task.count", Tags.of("punctuator", punctuatorName)) + .increment(totalProcessedTasks); + meterRegistry.gauge( + "throttled.punctuator.yielded", Tags.of("punctuator", punctuatorName), yielded ? 1 : 0); + } + } } diff --git a/kafka-streams-partitioners/weighted-group-partitioner/build.gradle.kts b/kafka-streams-partitioners/weighted-group-partitioner/build.gradle.kts index 6c2f360..0e75c5f 100644 --- a/kafka-streams-partitioners/weighted-group-partitioner/build.gradle.kts +++ b/kafka-streams-partitioners/weighted-group-partitioner/build.gradle.kts @@ -15,7 +15,7 @@ dependencies { api(platform(project(":kafka-bom"))) api("org.apache.kafka:kafka-streams") - api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.14") + api("org.hypertrace.core.grpcutils:grpc-client-utils:0.13.16") api("com.typesafe:config:1.4.2") implementation("com.google.guava:guava:32.0.1-jre") implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.13.14")