Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.jspecify.annotations.NullMarked;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

/**
* Kafka Client metrics binder. This should be closed on application shutdown to clean up
Expand Down Expand Up @@ -221,4 +224,70 @@ public KafkaClientMetrics(AdminClient adminClient) {
super(adminClient::metrics);
}

/**
* Kafka client metrics binder. The lifecycle of the custom scheduler passed is the
* responsibility of the caller. It will not be shut down when this instance is
* {@link #close() closed}. A scheduler can be shared among multiple instances of
* {@link KafkaClientMetrics} to reduce resource usage by reducing the number of
* threads if there will be many instances.
* @param metricsSupplier supplier of Kafka metrics, should come from the Java Kafka
* Client
* @param tags additional tags
* @param scheduler custom scheduler to check and bind metrics
* @since 1.16.0
*/
public KafkaClientMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> tags,
ScheduledExecutorService scheduler) {
super(metricsSupplier, tags, scheduler);
}

/**
* Kafka client metrics binder. The lifecycle of the custom scheduler passed is the
* responsibility of the caller. It will not be shut down when this instance is
* {@link #close() closed}. A scheduler can be shared among multiple instances of
* {@link KafkaClientMetrics} to reduce resource usage by reducing the number of
* threads if there will be many instances.
* <p>
* The refresh interval governs how frequently Micrometer should call the Kafka
* Client's Metrics API to discover new metrics to register and discard old ones since
* the Kafka Client can add/remove/recreate metrics on-the-fly. Please notice that
* this is not for fetching values for already registered metrics but for updating the
* list of registered metrics when the Kafka Client adds/removes/recreates them. It is
* the responsibility of the caller to choose the right value since this process can
* be expensive and metrics can appear and disappear without being published if the
* interval is not chosen appropriately.
* @param metricsSupplier supplier of Kafka metrics, should come from the Java Kafka
* Client
* @param tags additional tags
* @param scheduler custom scheduler to check and bind metrics
* @param refreshInterval interval of discovering new/removed/recreated metrics by the
* Kafka Client
* @since 1.16.0
*/
public KafkaClientMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> tags,
ScheduledExecutorService scheduler, Duration refreshInterval) {
super(metricsSupplier, tags, scheduler, refreshInterval);
}

/**
* Kafka client metrics binder
* @param metricsSupplier supplier of Kafka metrics, should come from the Java Kafka
* Client
* @param tags additional tags
* @since 1.16.0
*/
public KafkaClientMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier, Iterable<Tag> tags) {
super(metricsSupplier, tags);
}

/**
* Kafka client metrics binder
* @param metricsSupplier supplier of Kafka metrics, should come from the Java Kafka
* Client
* @since 1.16.0
*/
public KafkaClientMetrics(Supplier<Map<MetricName, ? extends Metric>> metricsSupplier) {
super(metricsSupplier);
}

}