Skip to content

Commit 7d5f42f

Browse files
author
Kishan Sairam Adapa
authored
feat: add kafka event listener (#87)
* feat: add kafka event listener * wrap up * nit * update * Nit * wrap up * nit: visibility * nit: docs * update * pay more attention next time :) * nit * update * update
1 parent a81cc48 commit 7d5f42f

File tree

6 files changed

+394
-2
lines changed

6 files changed

+394
-2
lines changed

kafka-event-listener/build.gradle.kts

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
plugins {
2+
`java-library`
3+
jacoco
4+
id("org.hypertrace.publish-plugin")
5+
id("org.hypertrace.jacoco-report-plugin")
6+
}
7+
8+
dependencies {
9+
annotationProcessor("org.projectlombok:lombok:1.18.26")
10+
compileOnly("org.projectlombok:lombok:1.18.26")
11+
12+
api(platform(project(":kafka-bom")))
13+
api("org.apache.kafka:kafka-clients")
14+
15+
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.62")
16+
testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
17+
testImplementation("org.mockito:mockito-core:5.2.0")
18+
testImplementation("com.github.ben-manes.caffeine:caffeine:3.1.8")
19+
}
20+
21+
tasks.test {
22+
useJUnitPlatform()
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package org.hypertrace.core.kafka.event.listener;
2+
3+
import com.typesafe.config.Config;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.Future;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.function.BiConsumer;
11+
import org.apache.kafka.clients.consumer.Consumer;
12+
import org.apache.kafka.common.serialization.Deserializer;
13+
14+
/**
15+
* KafkaLiveEventListener consumes events produced to a single Kafka Topic from its initialisation
16+
* and on every message invokes provided callbacks. The thread safety of callback method must be
17+
* ensured by provider. It is important to note that there is no guarantee that all messages on
18+
* topic are consumed by this listener as every time we create a listener we consume from latest
19+
* offsets by design.
20+
*
21+
* <p>Typical usage of this listener is to back the remote caches to have lower latency of refresh
22+
* by generating respective information on kafka topics
23+
*/
24+
public class KafkaLiveEventListener<K, V> implements AutoCloseable {
25+
private final Future<Void> kafkaLiveEventListenerCallableFuture;
26+
private final ExecutorService executorService;
27+
private final boolean cleanupExecutor;
28+
29+
private KafkaLiveEventListener(
30+
KafkaLiveEventListenerCallable<K, V> kafkaLiveEventListenerCallable,
31+
ExecutorService executorService,
32+
boolean cleanupExecutor) {
33+
this.executorService = executorService;
34+
this.cleanupExecutor = cleanupExecutor;
35+
this.kafkaLiveEventListenerCallableFuture =
36+
executorService.submit(kafkaLiveEventListenerCallable);
37+
}
38+
39+
@Override
40+
public void close() throws Exception {
41+
kafkaLiveEventListenerCallableFuture.cancel(true);
42+
if (cleanupExecutor) {
43+
executorService.shutdown();
44+
executorService.awaitTermination(10, TimeUnit.SECONDS);
45+
}
46+
}
47+
48+
public static final class Builder<K, V> {
49+
List<BiConsumer<? super K, ? super V>> callbacks = new ArrayList<>();
50+
ExecutorService executorService = Executors.newSingleThreadExecutor();
51+
boolean cleanupExecutor =
52+
true; // if builder creates executor shutdown executor while closing event listener
53+
54+
public Builder<K, V> registerCallback(BiConsumer<? super K, ? super V> callbackFunction) {
55+
callbacks.add(callbackFunction);
56+
return this;
57+
}
58+
59+
public Builder<K, V> withExecutorService(
60+
ExecutorService executorService, boolean cleanupExecutor) {
61+
this.executorService = executorService;
62+
this.cleanupExecutor = cleanupExecutor;
63+
return this;
64+
}
65+
66+
public KafkaLiveEventListener<K, V> build(
67+
String consumerName, Config kafkaConfig, Consumer<K, V> kafkaConsumer) {
68+
assertCallbacksPresent();
69+
return new KafkaLiveEventListener<>(
70+
new KafkaLiveEventListenerCallable<>(consumerName, kafkaConfig, kafkaConsumer, callbacks),
71+
executorService,
72+
cleanupExecutor);
73+
}
74+
75+
public KafkaLiveEventListener<K, V> build(
76+
String consumerName,
77+
Config kafkaConfig,
78+
Deserializer<K> keyDeserializer,
79+
Deserializer<V> valueDeserializer) {
80+
assertCallbacksPresent();
81+
return new KafkaLiveEventListener<>(
82+
new KafkaLiveEventListenerCallable<>(
83+
consumerName, kafkaConfig, keyDeserializer, valueDeserializer, callbacks),
84+
executorService,
85+
cleanupExecutor);
86+
}
87+
88+
private void assertCallbacksPresent() {
89+
if (callbacks.isEmpty()) {
90+
throw new IllegalArgumentException("no call backs are provided to KafkaLiveEventListener");
91+
}
92+
}
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package org.hypertrace.core.kafka.event.listener;
2+
3+
import com.typesafe.config.Config;
4+
import com.typesafe.config.ConfigFactory;
5+
import com.typesafe.config.ConfigValue;
6+
import io.micrometer.core.instrument.Counter;
7+
import java.time.Duration;
8+
import java.util.Collections;
9+
import java.util.HashMap;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.Map.Entry;
13+
import java.util.Properties;
14+
import java.util.Set;
15+
import java.util.concurrent.Callable;
16+
import java.util.function.BiConsumer;
17+
import java.util.stream.Collectors;
18+
import lombok.extern.slf4j.Slf4j;
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerConfig;
21+
import org.apache.kafka.clients.consumer.ConsumerRecords;
22+
import org.apache.kafka.clients.consumer.KafkaConsumer;
23+
import org.apache.kafka.common.PartitionInfo;
24+
import org.apache.kafka.common.TopicPartition;
25+
import org.apache.kafka.common.errors.InterruptException;
26+
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
27+
import org.apache.kafka.common.serialization.Deserializer;
28+
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
29+
30+
@Slf4j
31+
class KafkaLiveEventListenerCallable<K, V> implements Callable<Void> {
32+
private static final String EVENT_CONSUMER_ERROR_COUNT = "event.consumer.error.count";
33+
private static final String TOPIC_NAME = "topic.name";
34+
private static final String POLL_TIMEOUT = "poll.timeout";
35+
private final List<TopicPartition> topicPartitions;
36+
private final Consumer<K, V> kafkaConsumer;
37+
private final Duration pollTimeout;
38+
private final Counter errorCounter;
39+
private final List<BiConsumer<? super K, ? super V>> callbacks;
40+
41+
KafkaLiveEventListenerCallable(
42+
String consumerName,
43+
Config kafkaConfig,
44+
Deserializer<K> keyDeserializer,
45+
Deserializer<V> valueDeserializer,
46+
List<BiConsumer<? super K, ? super V>> callbacks) {
47+
this(
48+
consumerName,
49+
kafkaConfig,
50+
new KafkaConsumer<>(
51+
getKafkaConsumerConfigs(kafkaConfig.withFallback(getDefaultKafkaConsumerConfigs())),
52+
keyDeserializer,
53+
valueDeserializer),
54+
callbacks);
55+
}
56+
57+
KafkaLiveEventListenerCallable(
58+
String consumerName,
59+
Config kafkaConfig,
60+
Consumer<K, V> kafkaConsumer,
61+
List<BiConsumer<? super K, ? super V>> callbacks) {
62+
this.callbacks = callbacks;
63+
this.pollTimeout =
64+
kafkaConfig.hasPath(POLL_TIMEOUT)
65+
? kafkaConfig.getDuration(POLL_TIMEOUT)
66+
: Duration.ofSeconds(30);
67+
String topic = kafkaConfig.getString(TOPIC_NAME);
68+
this.kafkaConsumer = kafkaConsumer;
69+
// fetch partitions and seek to end of partitions to consume live events
70+
List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(topic);
71+
topicPartitions =
72+
partitions.stream()
73+
.map(p -> new TopicPartition(p.topic(), p.partition()))
74+
.collect(Collectors.toList());
75+
kafkaConsumer.assign(topicPartitions);
76+
kafkaConsumer.seekToEnd(topicPartitions);
77+
this.errorCounter =
78+
PlatformMetricsRegistry.registerCounter(
79+
consumerName + "." + EVENT_CONSUMER_ERROR_COUNT, Collections.emptyMap());
80+
}
81+
82+
@Override
83+
public Void call() {
84+
do {
85+
try {
86+
ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeout);
87+
records.forEach(
88+
r -> this.callbacks.forEach(callback -> callback.accept(r.key(), r.value())));
89+
if (log.isDebugEnabled()) {
90+
for (TopicPartition partition : topicPartitions) {
91+
long position = kafkaConsumer.position(partition);
92+
log.debug(
93+
"Consumer state topic: {}, partition:{}, offset: {}",
94+
partition.topic(),
95+
partition.partition(),
96+
position);
97+
}
98+
}
99+
} catch (InterruptException interruptedException) {
100+
log.warn("Received interrupt exception from kafka poll ", interruptedException);
101+
kafkaConsumer.close();
102+
return null;
103+
} catch (Exception ex) {
104+
this.errorCounter.increment();
105+
log.error("Consumer Error ", ex);
106+
}
107+
108+
} while (true);
109+
}
110+
111+
private static Config getDefaultKafkaConsumerConfigs() {
112+
Map<String, String> defaultKafkaConsumerConfigMap = new HashMap<>();
113+
defaultKafkaConsumerConfigMap.put(
114+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
115+
defaultKafkaConsumerConfigMap.put(
116+
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
117+
defaultKafkaConsumerConfigMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
118+
defaultKafkaConsumerConfigMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
119+
return ConfigFactory.parseMap(defaultKafkaConsumerConfigMap);
120+
}
121+
122+
private static Properties getKafkaConsumerConfigs(Config configs) {
123+
Map<String, String> configMap = new HashMap<>();
124+
Set<Entry<String, ConfigValue>> entries = configs.entrySet();
125+
for (Entry<String, ConfigValue> entry : entries) {
126+
String key = entry.getKey();
127+
configMap.put(key, configs.getString(key));
128+
}
129+
Properties props = new Properties();
130+
props.putAll(configMap);
131+
return props;
132+
}
133+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package org.hypertrace.core.kafka.event.listener;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertFalse;
5+
import static org.junit.jupiter.api.Assertions.assertThrows;
6+
import static org.mockito.Mockito.mock;
7+
8+
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
9+
import com.github.benmanes.caffeine.cache.Caffeine;
10+
import com.typesafe.config.Config;
11+
import com.typesafe.config.ConfigException;
12+
import com.typesafe.config.ConfigFactory;
13+
import java.time.Duration;
14+
import java.util.HashMap;
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.concurrent.CompletableFuture;
18+
import java.util.concurrent.TimeUnit;
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
import org.apache.kafka.clients.consumer.MockConsumer;
22+
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
23+
import org.apache.kafka.common.Node;
24+
import org.apache.kafka.common.PartitionInfo;
25+
import org.apache.kafka.common.TopicPartition;
26+
import org.junit.jupiter.api.Test;
27+
28+
class KafkaLiveEventListenerTest {
29+
30+
@Test
31+
void testThrowOnInvalidInputs() {
32+
// no callback
33+
assertThrows(
34+
IllegalArgumentException.class,
35+
() ->
36+
new KafkaLiveEventListener.Builder<String, Long>()
37+
.build(
38+
"",
39+
ConfigFactory.parseMap(Map.of("topic.name", "")),
40+
new MockConsumer<>(OffsetResetStrategy.LATEST)));
41+
// no topic name
42+
assertThrows(
43+
ConfigException.class,
44+
() ->
45+
new KafkaLiveEventListener.Builder<String, Long>()
46+
.registerCallback((String k, Long v) -> System.out.println(k + ":" + v))
47+
.build("", ConfigFactory.empty(), new MockConsumer<>(OffsetResetStrategy.LATEST)));
48+
}
49+
50+
@Test
51+
void testEventModificationCache() throws Exception {
52+
// kafka consumer mock setup
53+
MockConsumer<String, Long> kafkaConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
54+
String topic = "event-update-topic";
55+
kafkaConsumer.updatePartitions(
56+
topic,
57+
List.of(
58+
getPartitionInfo(topic, 0),
59+
getPartitionInfo(topic, 1),
60+
getPartitionInfo(topic, 2),
61+
getPartitionInfo(topic, 3)));
62+
HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
63+
endOffsets.put(new TopicPartition(topic, 0), 50L);
64+
endOffsets.put(new TopicPartition(topic, 1), 50L);
65+
endOffsets.put(new TopicPartition(topic, 2), 50L);
66+
endOffsets.put(new TopicPartition(topic, 3), 50L);
67+
kafkaConsumer.updateEndOffsets(endOffsets);
68+
// create instance of event modification cache consuming from this consumer
69+
EventModificationCache eventModificationCache =
70+
new EventModificationCache(
71+
"modification-event-consumer",
72+
ConfigFactory.parseMap(Map.of("topic.name", topic, "poll.timeout", "5ms")),
73+
kafkaConsumer);
74+
Thread.sleep(10);
75+
assertEquals(10L, eventModificationCache.get(10));
76+
assertEquals(100L, eventModificationCache.get(100));
77+
// not present key won't trigger any population but callback function should be called
78+
kafkaConsumer.addRecord(new ConsumerRecord<>(topic, 0, 100, "32", 89L));
79+
Thread.sleep(100);
80+
assertFalse(eventModificationCache.hasKey(32));
81+
// existing key will be modified based on entry
82+
kafkaConsumer.addRecord(new ConsumerRecord<>(topic, 3, 200, "10", -3L));
83+
Thread.sleep(100);
84+
assertEquals(-3L, eventModificationCache.get(10));
85+
eventModificationCache.close();
86+
}
87+
88+
private PartitionInfo getPartitionInfo(String topic, int partition) {
89+
return new PartitionInfo(topic, partition, mock(Node.class), new Node[0], new Node[0]);
90+
}
91+
92+
static class EventModificationCache {
93+
private final AsyncLoadingCache<Integer, Long> cache;
94+
private final KafkaLiveEventListener<String, Long> eventListener;
95+
96+
EventModificationCache(
97+
String consumerName, Config kafkaConfig, Consumer<String, Long> consumer) {
98+
cache =
99+
Caffeine.newBuilder()
100+
.maximumSize(10_000)
101+
.expireAfterAccess(Duration.ofHours(6))
102+
.refreshAfterWrite(Duration.ofHours(1))
103+
.buildAsync(this::load);
104+
eventListener =
105+
new KafkaLiveEventListener.Builder<String, Long>()
106+
.registerCallback(this::actOnEvent)
107+
.registerCallback(this::log)
108+
.build(consumerName, kafkaConfig, consumer);
109+
}
110+
111+
public void close() throws Exception {
112+
eventListener.close();
113+
}
114+
115+
long get(int key) throws Exception {
116+
return cache.get(key).get(10, TimeUnit.SECONDS);
117+
}
118+
119+
boolean hasKey(int key) {
120+
return cache.asMap().containsKey(key);
121+
}
122+
123+
private Long load(Integer key) {
124+
// ideally this will be remote call, just for sake of dummy test we returned a cast
125+
return (long) (key);
126+
}
127+
128+
public void actOnEvent(String eventKey, Long eventValue) {
129+
int key = Integer.parseInt(eventKey);
130+
if (cache.asMap().containsKey(key)) {
131+
long value = eventValue;
132+
cache.put(key, CompletableFuture.completedFuture(value));
133+
}
134+
}
135+
136+
// just a dummy logger to showcase multiple callbacks
137+
public void log(String eventKey, Long eventValue) {
138+
System.out.println("updated cache with event data from topic " + eventKey + ":" + eventValue);
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)