Skip to content

Commit f555a3e

Browse files
refactor: move kafka initialization off calling thread (#95)
1 parent f0f565e commit f555a3e

File tree

1 file changed

+16
-11
lines changed

1 file changed

+16
-11
lines changed

kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java

+16-11
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,18 @@
1616
import lombok.extern.slf4j.Slf4j;
1717
import org.apache.kafka.clients.consumer.Consumer;
1818
import org.apache.kafka.clients.consumer.ConsumerRecords;
19-
import org.apache.kafka.common.PartitionInfo;
2019
import org.apache.kafka.common.TopicPartition;
2120
import org.apache.kafka.common.errors.InterruptException;
2221
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
2322

2423
@Slf4j
2524
class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
25+
2626
private static final String EVENT_CONSUMER_ERROR_COUNT = "event.consumer.error.count";
27-
private final List<TopicPartition> topicPartitions;
2827
private final Consumer<K, V> kafkaConsumer;
2928
private final Duration pollTimeout;
3029
private final Counter errorCounter;
30+
private final String topic;
3131
private final ConcurrentLinkedQueue<BiConsumer<? super K, ? super V>> callbacks;
3232

3333
KafkaLiveEventListenerCallable(
@@ -41,16 +41,9 @@ class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
4141
kafkaConfig.hasPath(POLL_TIMEOUT)
4242
? kafkaConfig.getDuration(POLL_TIMEOUT)
4343
: Duration.ofSeconds(30);
44-
String topic = kafkaConfig.getString(TOPIC_NAME);
44+
this.topic = kafkaConfig.getString(TOPIC_NAME);
4545
this.kafkaConsumer = kafkaConsumer;
46-
// fetch partitions and seek to end of partitions to consume live events
47-
List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topic);
48-
topicPartitions =
49-
partitions.stream()
50-
.map(p -> new TopicPartition(p.topic(), p.partition()))
51-
.collect(Collectors.toList());
52-
kafkaConsumer.assign(topicPartitions);
53-
kafkaConsumer.seekToEnd(topicPartitions);
46+
5447
this.errorCounter =
5548
PlatformMetricsRegistry.registerCounter(
5649
consumerName + "." + EVENT_CONSUMER_ERROR_COUNT, Collections.emptyMap());
@@ -60,8 +53,20 @@ void addCallback(BiConsumer<? super K, ? super V> callbackFunction) {
6053
callbacks.add(callbackFunction);
6154
}
6255

56+
private List<TopicPartition> initializePartitions() {
57+
// fetch partitions and seek to end of partitions to consume live events
58+
List<TopicPartition> topicPartitions =
59+
kafkaConsumer.partitionsFor(this.topic).stream()
60+
.map(p -> new TopicPartition(p.topic(), p.partition()))
61+
.collect(Collectors.toUnmodifiableList());
62+
kafkaConsumer.assign(topicPartitions);
63+
kafkaConsumer.seekToEnd(topicPartitions);
64+
return topicPartitions;
65+
}
66+
6367
@Override
6468
public Void call() {
69+
List<TopicPartition> topicPartitions = this.initializePartitions();
6570
do {
6671
try {
6772
ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeout);

0 commit comments

Comments
 (0)