Skip to content

Commit f0f565e

Browse files
author
Kishan Sairam Adapa
authoredMar 26, 2024··
move register callback to non builder inference (#94)
* move register callback to non builder inference * add comment * address comments
1 parent df4f796 commit f0f565e

File tree

3 files changed

+30
-28
lines changed

3 files changed

+30
-28
lines changed
 

‎kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListener.java

+16-12
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package org.hypertrace.core.kafka.event.listener;
22

33
import com.typesafe.config.Config;
4-
import java.util.ArrayList;
5-
import java.util.List;
4+
import java.util.Collection;
5+
import java.util.concurrent.ConcurrentLinkedQueue;
66
import java.util.concurrent.ExecutorService;
77
import java.util.concurrent.Executors;
88
import java.util.concurrent.Future;
@@ -30,6 +30,7 @@
3030
* for sample usage and test. Note that testing requires Thread.sleep > poll timeout in between
3131
*/
3232
public class KafkaLiveEventListener<K, V> implements AutoCloseable {
33+
private final KafkaLiveEventListenerCallable<K, V> kafkaLiveEventListenerCallable;
3334
private final Future<Void> kafkaLiveEventListenerCallableFuture;
3435
private final ExecutorService executorService;
3536
private final boolean cleanupExecutor;
@@ -40,10 +41,17 @@ private KafkaLiveEventListener(
4041
boolean cleanupExecutor) {
4142
this.executorService = executorService;
4243
this.cleanupExecutor = cleanupExecutor;
44+
this.kafkaLiveEventListenerCallable = kafkaLiveEventListenerCallable;
4345
this.kafkaLiveEventListenerCallableFuture =
4446
executorService.submit(kafkaLiveEventListenerCallable);
4547
}
4648

49+
public KafkaLiveEventListener<K, V> registerCallback(
50+
BiConsumer<? super K, ? super V> callbackFunction) {
51+
kafkaLiveEventListenerCallable.addCallback(callbackFunction);
52+
return this;
53+
}
54+
4755
@Override
4856
public void close() throws Exception {
4957
kafkaLiveEventListenerCallableFuture.cancel(true);
@@ -54,11 +62,14 @@ public void close() throws Exception {
5462
}
5563

5664
public static final class Builder<K, V> {
57-
List<BiConsumer<? super K, ? super V>> callbacks = new ArrayList<>();
58-
ExecutorService executorService = Executors.newSingleThreadExecutor();
59-
boolean cleanupExecutor =
65+
private final Collection<BiConsumer<? super K, ? super V>> callbacks =
66+
new ConcurrentLinkedQueue<>();
67+
private ExecutorService executorService = Executors.newSingleThreadExecutor();
68+
private boolean cleanupExecutor =
6069
true; // if builder creates executor shutdown executor while closing event listener
6170

71+
/** use registerCallback on the built instance instead */
72+
@Deprecated(forRemoval = true)
6273
public Builder<K, V> registerCallback(BiConsumer<? super K, ? super V> callbackFunction) {
6374
callbacks.add(callbackFunction);
6475
return this;
@@ -73,17 +84,10 @@ public Builder<K, V> withExecutorService(
7384

7485
public KafkaLiveEventListener<K, V> build(
7586
String consumerName, Config kafkaConfig, Consumer<K, V> kafkaConsumer) {
76-
assertCallbacksPresent();
7787
return new KafkaLiveEventListener<>(
7888
new KafkaLiveEventListenerCallable<>(consumerName, kafkaConfig, kafkaConsumer, callbacks),
7989
executorService,
8090
cleanupExecutor);
8191
}
82-
83-
private void assertCallbacksPresent() {
84-
if (callbacks.isEmpty()) {
85-
throw new IllegalArgumentException("no call backs are provided to KafkaLiveEventListener");
86-
}
87-
}
8892
}
8993
}

‎kafka-event-listener/src/main/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerCallable.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
import com.typesafe.config.Config;
77
import io.micrometer.core.instrument.Counter;
88
import java.time.Duration;
9+
import java.util.Collection;
910
import java.util.Collections;
1011
import java.util.List;
1112
import java.util.concurrent.Callable;
13+
import java.util.concurrent.ConcurrentLinkedQueue;
1214
import java.util.function.BiConsumer;
1315
import java.util.stream.Collectors;
1416
import lombok.extern.slf4j.Slf4j;
@@ -26,14 +28,15 @@ class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
2628
private final Consumer<K, V> kafkaConsumer;
2729
private final Duration pollTimeout;
2830
private final Counter errorCounter;
29-
private final List<BiConsumer<? super K, ? super V>> callbacks;
31+
private final ConcurrentLinkedQueue<BiConsumer<? super K, ? super V>> callbacks;
3032

3133
KafkaLiveEventListenerCallable(
3234
String consumerName,
3335
Config kafkaConfig,
3436
Consumer<K, V> kafkaConsumer,
35-
List<BiConsumer<? super K, ? super V>> callbacks) {
36-
this.callbacks = callbacks;
37+
Collection<BiConsumer<? super K, ? super V>> callbackCollection) {
38+
this.callbacks = new ConcurrentLinkedQueue<>();
39+
callbackCollection.forEach(this::addCallback);
3740
this.pollTimeout =
3841
kafkaConfig.hasPath(POLL_TIMEOUT)
3942
? kafkaConfig.getDuration(POLL_TIMEOUT)
@@ -53,6 +56,10 @@ class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
5356
consumerName + "." + EVENT_CONSUMER_ERROR_COUNT, Collections.emptyMap());
5457
}
5558

59+
void addCallback(BiConsumer<? super K, ? super V> callbackFunction) {
60+
callbacks.add(callbackFunction);
61+
}
62+
5663
@Override
5764
public Void call() {
5865
do {

‎kafka-event-listener/src/test/java/org/hypertrace/core/kafka/event/listener/KafkaLiveEventListenerTest.java

+4-13
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,13 @@ class KafkaLiveEventListenerTest {
2222

2323
@Test
2424
void testThrowOnInvalidInputs() {
25-
// no callback
26-
assertThrows(
27-
IllegalArgumentException.class,
28-
() ->
29-
new KafkaLiveEventListener.Builder<String, Long>()
30-
.build(
31-
"",
32-
ConfigFactory.parseMap(Map.of("topic.name", "")),
33-
new MockConsumer<>(OffsetResetStrategy.LATEST)));
3425
// no topic name
3526
assertThrows(
3627
ConfigException.class,
3728
() ->
3829
new KafkaLiveEventListener.Builder<String, Long>()
39-
.registerCallback((String k, Long v) -> System.out.println(k + ":" + v))
40-
.build("", ConfigFactory.empty(), new MockConsumer<>(OffsetResetStrategy.LATEST)));
30+
.build("", ConfigFactory.empty(), new MockConsumer<>(OffsetResetStrategy.LATEST))
31+
.registerCallback((String k, Long v) -> System.out.println(k + ":" + v)));
4132
}
4233

4334
@Test
@@ -80,9 +71,9 @@ static class EventModificationCache {
8071
.buildAsync(this::load);
8172
eventListener =
8273
new KafkaLiveEventListener.Builder<String, Long>()
74+
.build(consumerName, kafkaConfig, consumer)
8375
.registerCallback(this::actOnEvent)
84-
.registerCallback(this::log)
85-
.build(consumerName, kafkaConfig, consumer);
76+
.registerCallback(this::log);
8677
}
8778

8879
public void close() throws Exception {

0 commit comments

Comments
 (0)
Please sign in to comment.