
Commit 37ea756

add IQ integration test
1 parent 2253dc2

File tree

kafka-client/src/test/java/dev/responsive/kafka/integration/InteractiveQueriesIntegrationTest.java
kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java

2 files changed: +298 −7 lines changed
kafka-client/src/test/java/dev/responsive/kafka/integration/InteractiveQueriesIntegrationTest.java

Lines changed: 262 additions & 0 deletions (new file)
/*
 * Copyright 2024 Responsive Computing, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package dev.responsive.kafka.integration;

import static dev.responsive.kafka.api.config.ResponsiveConfig.STORE_FLUSH_RECORDS_TRIGGER_CONFIG;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.createTopicsAndWait;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.getStore;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.pipeRecords;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.readOutput;
import static dev.responsive.kafka.testutils.IntegrationTestUtils.startAppAndAwaitRunning;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.NUM_STREAM_THREADS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.state.QueryableStoreTypes.timestampedKeyValueStore;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import dev.responsive.kafka.api.ResponsiveKafkaStreams;
import dev.responsive.kafka.api.config.StorageBackend;
import dev.responsive.kafka.api.stores.ResponsiveKeyValueParams;
import dev.responsive.kafka.api.stores.ResponsiveStores;
import dev.responsive.kafka.testutils.KeyValueTimestamp;
import dev.responsive.kafka.testutils.ResponsiveConfigParam;
import dev.responsive.kafka.testutils.ResponsiveExtension;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.RegisterExtension;

public class InteractiveQueriesIntegrationTest {

  @RegisterExtension
  static ResponsiveExtension EXTENSION = new ResponsiveExtension(StorageBackend.MONGO_DB);

  private static final String INPUT_TOPIC = "input";
  private static final String OUTPUT_TOPIC = "output";

  private final Map<String, Object> responsiveProps = new HashMap<>();

  private String name;
  private Admin admin;

  @BeforeEach
  public void before(
      final TestInfo info,
      final Admin admin,
      @ResponsiveConfigParam final Map<String, Object> responsiveProps
  ) {
    // add displayName to name to account for parameterized tests
    name = info.getDisplayName().replace("()", "");

    this.responsiveProps.putAll(responsiveProps);

    this.admin = admin;
    createTopicsAndWait(admin, Map.of(inputTopic(), 2, outputTopic(), 1));
  }

  @AfterEach
  public void after() {
    admin.deleteTopics(List.of(inputTopic(), outputTopic()));
  }

  private String inputTopic() {
    return name + "." + INPUT_TOPIC;
  }

  private String outputTopic() {
    return name + "." + OUTPUT_TOPIC;
  }

  @Test
  public void shouldReadLatestValueAndTimestampFromKVStoreWithIQv1() throws Exception {
    // Given:
    final Map<String, Object> properties = getMutableProperties();
    final KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    try (final ResponsiveKafkaStreams streams = buildKVStoreApp(properties)) {
      startAppAndAwaitRunning(Duration.ofSeconds(15), streams);

      final List<KeyValueTimestamp<String, String>> input = Arrays.asList(
          new KeyValueTimestamp<>("A", "a", 1L),
          new KeyValueTimestamp<>("B", "b", 2L),
          new KeyValueTimestamp<>("C", "c", 3L)
      );

      // When:
      pipeRecords(producer, inputTopic(), input);

      // await output to make sure we've processed all the input so far
      long readOffset = 0L;
      int outputCount = 3;
      final var output = readOutput(outputTopic(), readOffset, outputCount, true, properties);
      assertThat(output.size(), equalTo(outputCount));

      final var store = getStore(streams, kvStoreName(), timestampedKeyValueStore());

      // Then:
      assertThat(store.get("A"), equalTo(ValueAndTimestamp.make("a", 1L)));
      assertThat(store.get("B"), equalTo(ValueAndTimestamp.make("b", 2L)));
      assertThat(store.get("C"), equalTo(ValueAndTimestamp.make("c", 3L)));

      // And Given:
      final List<KeyValueTimestamp<String, String>> input2 = Arrays.asList(
          new KeyValueTimestamp<>("A", "x", 4L),
          new KeyValueTimestamp<>("B", "y", 5L),
          new KeyValueTimestamp<>("C", "z", 6L)
      );

      // When:
      pipeRecords(producer, inputTopic(), input2);

      readOffset = 3L;
      final var output2 = readOutput(outputTopic(), readOffset, outputCount, true, properties);
      assertThat(output2.size(), equalTo(outputCount));

      // Then:
      assertThat(store.get("A"), equalTo(ValueAndTimestamp.make("ax", 4L)));
      assertThat(store.get("B"), equalTo(ValueAndTimestamp.make("by", 5L)));
      assertThat(store.get("C"), equalTo(ValueAndTimestamp.make("cz", 6L)));
    }
  }

  private ResponsiveKafkaStreams buildKVStoreApp(final Map<String, Object> properties) {
    final StreamsBuilder builder = new StreamsBuilder();

    final KStream<String, String> input = builder.stream(inputTopic());
    final StoreBuilder<TimestampedKeyValueStore<String, String>> storeBuilder =
        ResponsiveStores.timestampedKeyValueStoreBuilder(
            ResponsiveStores.keyValueStore(ResponsiveKeyValueParams.keyValue(kvStoreName())),
            Serdes.String(),
            Serdes.String());
    input
        .process(new MyProcessorSupplier(storeBuilder), kvStoreName())
        .to(outputTopic());

    return new ResponsiveKafkaStreams(builder.build(), properties);
  }

  private static class TimestampAppendingProcessor
      implements Processor<String, String, String, String> {

    private final String storeName;

    private ProcessorContext<String, String> context;
    private TimestampedKeyValueStore<String, String> store;

    public TimestampAppendingProcessor(final String storeName) {
      this.storeName = storeName;
    }

    @Override
    public void init(final ProcessorContext<String, String> context) {
      Processor.super.init(context);
      this.context = context;
      this.store = context.getStateStore(storeName);
    }

    @Override
    public void process(final Record<String, String> record) {
      final ValueAndTimestamp<String> oldValue = store.get(record.key());
      final String newValue = oldValue == null
          ? record.value()
          : oldValue.value() + record.value();

      store.put(record.key(), ValueAndTimestamp.make(newValue, record.timestamp()));
      context.forward(new Record<>(record.key(), newValue, record.timestamp()));
    }
  }

  private static class MyProcessorSupplier
      implements ProcessorSupplier<String, String, String, String> {

    private final StoreBuilder<?> storeBuilder;

    public MyProcessorSupplier(final StoreBuilder<?> storeBuilder) {
      this.storeBuilder = storeBuilder;
    }

    @Override
    public Processor<String, String, String, String> get() {
      return new TimestampAppendingProcessor(storeBuilder.name());
    }

    @Override
    public Set<StoreBuilder<?>> stores() {
      return Collections.singleton(storeBuilder);
    }
  }

  private String kvStoreName() {
    return name + "-kv-store";
  }

  private Map<String, Object> getMutableProperties() {
    final Map<String, Object> properties = new HashMap<>(responsiveProps);

    properties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    properties.put(APPLICATION_ID_CONFIG, name);
    properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
    properties.put(NUM_STREAM_THREADS_CONFIG, 1);
    properties.put(STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
    properties.put(STORE_FLUSH_RECORDS_TRIGGER_CONFIG, 1);
    properties.put(COMMIT_INTERVAL_MS_CONFIG, 1);

    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    properties.put(consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
    properties.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");

    return properties;
  }
}
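
For reference, the IQv1 lookup that the getStore helper wraps boils down to a single KafkaStreams#store call. A minimal sketch, assuming a RUNNING streams instance and the hypothetical store name "my-kv-store" (not taken from this commit):

// Read the latest value and timestamp for key "A" via IQv1.
// KafkaStreams#store throws InvalidStateStoreException while the store is
// still initializing or migrating, which is why getStore retries in a loop.
final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store =
    streams.store(StoreQueryParameters.fromNameAndType(
        "my-kv-store",  // hypothetical store name
        QueryableStoreTypes.<String, String>timestampedKeyValueStore()));
final ValueAndTimestamp<String> latest = store.get("A");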

kafka-client/src/test/java/dev/responsive/kafka/testutils/IntegrationTestUtils.java

Lines changed: 36 additions & 7 deletions

@@ -25,6 +25,7 @@
 import java.util.function.Supplier;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -37,7 +38,10 @@
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KafkaStreams.StateListener;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.state.QueryableStoreType;
 import org.junit.jupiter.api.TestInfo;

 public final class IntegrationTestUtils {
@@ -216,10 +220,10 @@ public static <K, V> void pipeRecords(
   }


-  public static void awaitOutput(
+  public static <K, V> void awaitOutput(
       final String topic,
       final long from,
-      final Set<KeyValue<Long, Long>> expected,
+      final Set<KeyValue<K, V>> expected,
       final boolean readUncommitted,
       final Map<String, Object> originals
   ) throws TimeoutException {
@@ -230,15 +234,15 @@ public static void awaitOutput(

     final var allSeen = new HashSet<>();
     final var notYetSeen = new HashSet<>(expected);
-    try (final KafkaConsumer<Long, Long> consumer = new KafkaConsumer<>(properties)) {
+    try (final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
       final TopicPartition output = new TopicPartition(topic, 0);
       consumer.assign(List.of(output));
       consumer.seek(output, from);

       final long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
       while (!notYetSeen.isEmpty()) {
-        final ConsumerRecords<Long, Long> polled = consumer.poll(Duration.ofSeconds(30));
-        for (ConsumerRecord<Long, Long> rec : polled) {
+        final ConsumerRecords<K, V> polled = consumer.poll(Duration.ofSeconds(30));
+        for (ConsumerRecord<K, V> rec : polled) {
           final var kv = new KeyValue<>(rec.key(), rec.value());
           notYetSeen.remove(kv);
           allSeen.add(kv);
@@ -264,6 +268,10 @@ public static <K, V> List<KeyValue<K, V>> readOutput(
         ? IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)
         : IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));

+    // configured to only poll one record at a time so we can
+    // guarantee we won't accidentally poll more than numEvents
+    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
+
     try (final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
       final TopicPartition output = new TopicPartition(topic, 0);
       consumer.assign(List.of(output));
@@ -272,8 +280,6 @@ public static <K, V> List<KeyValue<K, V>> readOutput(
       final long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
       final List<KeyValue<K, V>> result = new ArrayList<>();
       while (result.size() < numEvents) {
-        // this is configured to only poll one record at a time, so we
-        // can guarantee we won't accidentally poll more than numEvents
         final ConsumerRecords<K, V> polled = consumer.poll(Duration.ofSeconds(30));
         for (ConsumerRecord<K, V> rec : polled) {
           result.add(new KeyValue<>(rec.key(), rec.value()));
@@ -334,6 +340,29 @@ public static void startAppAndAwaitRunning(
     }
   }

+  public static <S> S getStore(
+      final KafkaStreams streams,
+      final String storeName,
+      final QueryableStoreType<S> storeType
+  ) throws Exception {
+    final Duration defaultWaitTime = Duration.ofSeconds(10L);
+    final long deadline = System.currentTimeMillis() + defaultWaitTime.toMillis();
+    while (true) {
+      try {
+        return streams.store(StoreQueryParameters.fromNameAndType(storeName, storeType));
+      } catch (final InvalidStateStoreException e) {
+        if (System.currentTimeMillis() > deadline) {
+          throw e;
+        }
+      } catch (final Exception e) {
+        if (System.currentTimeMillis() > deadline) {
+          throw new AssertionError(e);
+        }
+      }
+      Thread.sleep(100L);
+    }
+  }
+
   private IntegrationTestUtils() {
   }
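
Since awaitOutput is now generic over the key and value types rather than fixed to Long, it can be called with the String serdes the new test configures. A hypothetical call, where the topic name and consumer properties map are placeholders:

// await two String records on partition 0 of the output topic,
// starting from offset 0 and reading uncommitted records
awaitOutput(
    "my-app.output",  // hypothetical topic name
    0L,
    Set.of(new KeyValue<>("A", "a"), new KeyValue<>("B", "b")),
    true,
    properties  // placeholder: the test's consumer config map
);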