Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move register callback to non builder inference #94

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.hypertrace.core.kafka.event.listener;

import com.typesafe.config.Config;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -30,6 +30,7 @@
* for sample usage and test. Note that testing requires Thread.sleep > poll timeout in between
*/
public class KafkaLiveEventListener<K, V> implements AutoCloseable {
private final KafkaLiveEventListenerCallable<K, V> kafkaLiveEventListenerCallable;
private final Future<Void> kafkaLiveEventListenerCallableFuture;
private final ExecutorService executorService;
private final boolean cleanupExecutor;
Expand All @@ -40,10 +41,17 @@ private KafkaLiveEventListener(
boolean cleanupExecutor) {
this.executorService = executorService;
this.cleanupExecutor = cleanupExecutor;
this.kafkaLiveEventListenerCallable = kafkaLiveEventListenerCallable;
this.kafkaLiveEventListenerCallableFuture =
executorService.submit(kafkaLiveEventListenerCallable);
}

public KafkaLiveEventListener<K, V> registerCallback(
BiConsumer<? super K, ? super V> callbackFunction) {
kafkaLiveEventListenerCallable.addCallback(callbackFunction);
return this;
}

@Override
public void close() throws Exception {
kafkaLiveEventListenerCallableFuture.cancel(true);
Expand All @@ -54,11 +62,14 @@ public void close() throws Exception {
}

public static final class Builder<K, V> {
List<BiConsumer<? super K, ? super V>> callbacks = new ArrayList<>();
ExecutorService executorService = Executors.newSingleThreadExecutor();
boolean cleanupExecutor =
private final Collection<BiConsumer<? super K, ? super V>> callbacks =
new ConcurrentLinkedQueue<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - no reason to use a concurrent queue here. No concurrency requirements in the builder and this will be copied into a concurrent queue when the listener is built.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I realised it, felt no harm to have a concurrent safe data structure because in deprecated method we suggested to have singleton instance of builder

private ExecutorService executorService = Executors.newSingleThreadExecutor();
private boolean cleanupExecutor =
true; // if builder creates executor shutdown executor while closing event listener

/** use registerCallback on the built instance instead */
@Deprecated(forRemoval = true)
public Builder<K, V> registerCallback(BiConsumer<? super K, ? super V> callbackFunction) {
callbacks.add(callbackFunction);
return this;
Expand All @@ -73,17 +84,10 @@ public Builder<K, V> withExecutorService(

public KafkaLiveEventListener<K, V> build(
String consumerName, Config kafkaConfig, Consumer<K, V> kafkaConsumer) {
assertCallbacksPresent();
return new KafkaLiveEventListener<>(
new KafkaLiveEventListenerCallable<>(consumerName, kafkaConfig, kafkaConsumer, callbacks),
executorService,
cleanupExecutor);
}

private void assertCallbacksPresent() {
if (callbacks.isEmpty()) {
throw new IllegalArgumentException("no call backs are provided to KafkaLiveEventListener");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import com.typesafe.config.Config;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -26,14 +28,15 @@ class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
private final Consumer<K, V> kafkaConsumer;
private final Duration pollTimeout;
private final Counter errorCounter;
private final List<BiConsumer<? super K, ? super V>> callbacks;
private final ConcurrentLinkedQueue<BiConsumer<? super K, ? super V>> callbacks;

KafkaLiveEventListenerCallable(
String consumerName,
Config kafkaConfig,
Consumer<K, V> kafkaConsumer,
List<BiConsumer<? super K, ? super V>> callbacks) {
this.callbacks = callbacks;
Collection<BiConsumer<? super K, ? super V>> callbackCollection) {
this.callbacks = new ConcurrentLinkedQueue<>();
callbackCollection.forEach(this::addCallback);
this.pollTimeout =
kafkaConfig.hasPath(POLL_TIMEOUT)
? kafkaConfig.getDuration(POLL_TIMEOUT)
Expand All @@ -53,6 +56,10 @@ class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
consumerName + "." + EVENT_CONSUMER_ERROR_COUNT, Collections.emptyMap());
}

void addCallback(BiConsumer<? super K, ? super V> callbackFunction) {
callbacks.add(callbackFunction);
}

@Override
public Void call() {
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,13 @@ class KafkaLiveEventListenerTest {

@Test
void testThrowOnInvalidInputs() {
// no callback
assertThrows(
IllegalArgumentException.class,
() ->
new KafkaLiveEventListener.Builder<String, Long>()
.build(
"",
ConfigFactory.parseMap(Map.of("topic.name", "")),
new MockConsumer<>(OffsetResetStrategy.LATEST)));
// no topic name
assertThrows(
ConfigException.class,
() ->
new KafkaLiveEventListener.Builder<String, Long>()
.registerCallback((String k, Long v) -> System.out.println(k + ":" + v))
.build("", ConfigFactory.empty(), new MockConsumer<>(OffsetResetStrategy.LATEST)));
.build("", ConfigFactory.empty(), new MockConsumer<>(OffsetResetStrategy.LATEST))
.registerCallback((String k, Long v) -> System.out.println(k + ":" + v)));
}

@Test
Expand Down Expand Up @@ -80,9 +71,9 @@ static class EventModificationCache {
.buildAsync(this::load);
eventListener =
new KafkaLiveEventListener.Builder<String, Long>()
.build(consumerName, kafkaConfig, consumer)
.registerCallback(this::actOnEvent)
.registerCallback(this::log)
.build(consumerName, kafkaConfig, consumer);
.registerCallback(this::log);
}

public void close() throws Exception {
Expand Down
Loading