Skip to content

Commit 965510c

Browse files
author
kishansairam9
committed
name single threads of kafka live event listener
1 parent f555a3e commit 965510c

File tree

1 file changed

+18
-1
lines changed

1 file changed

+18
-1
lines changed

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListener.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.concurrent.ExecutorService;
77
import java.util.concurrent.Executors;
88
import java.util.concurrent.Future;
9+
import java.util.concurrent.ThreadFactory;
910
import java.util.concurrent.TimeUnit;
1011
import java.util.function.BiConsumer;
1112
import org.apache.kafka.clients.consumer.Consumer;
@@ -64,7 +65,7 @@ public void close() throws Exception {
6465
public static final class Builder<K, V> {
6566
private final Collection<BiConsumer<? super K, ? super V>> callbacks =
6667
new ConcurrentLinkedQueue<>();
67-
private ExecutorService executorService = Executors.newSingleThreadExecutor();
68+
private ExecutorService executorService;
6869
private boolean cleanupExecutor =
6970
true; // if builder creates executor shutdown executor while closing event listener
7071

@@ -84,10 +85,26 @@ public Builder<K, V> withExecutorService(
8485

8586
public KafkaLiveEventListener<K, V> build(
8687
String consumerName, Config kafkaConfig, Consumer<K, V> kafkaConsumer) {
88+
if (executorService == null) {
89+
executorService =
90+
Executors.newSingleThreadExecutor(new ListenerThreadFactory(consumerName));
91+
}
8792
return new KafkaLiveEventListener<>(
8893
new KafkaLiveEventListenerCallable<>(consumerName, kafkaConfig, kafkaConsumer, callbacks),
8994
executorService,
9095
cleanupExecutor);
9196
}
9297
}
9398
}
99+
100+
class ListenerThreadFactory implements ThreadFactory {
101+
private final String name;
102+
103+
public ListenerThreadFactory(String consumerName) {
104+
this.name = "kafka-live-event-listener-" + consumerName;
105+
}
106+
107+
public Thread newThread(Runnable r) {
108+
return new Thread(r, name);
109+
}
110+
}

0 commit comments

Comments
 (0)