Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void shouldProcessRecordsWithProcessor() throws Exception {
final var partition = new TopicPartition(TOPIC, PARTITION);
final var commandQueue = new ConcurrentLinkedQueue<ConsumerCommand>();
final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "test-key", "test-value"));
final var records = new ConsumerRecords<>(Map.of(partition, recordsList));
final var records = consumerRecords(Map.of(partition, recordsList));
final var functionalConsumer = new TestableKPipeConsumer<>(
properties,
TOPIC,
Expand Down Expand Up @@ -132,7 +132,7 @@ void shouldHandleProcessorExceptions() throws Exception {
// Create mock records
final var partition = new TopicPartition(TOPIC, PARTITION);
final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "test-key", "test-value"));
final var records = new ConsumerRecords<>(Map.of(partition, recordsList));
final var records = consumerRecords(Map.of(partition, recordsList));
final var functionalConsumer = new TestableKPipeConsumer<>(
properties,
TOPIC,
Expand Down Expand Up @@ -199,7 +199,7 @@ void shouldRetryProcessingOnFailureUpToMaxRetries() throws Exception {
final var record = new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value");
final var partition = new TopicPartition(TOPIC, PARTITION);
final var recordsList = List.of(record);
final var records = new ConsumerRecords<>(Map.of(partition, recordsList));
final var records = consumerRecords(Map.of(partition, recordsList));
final var functionalConsumer = new TestableKPipeConsumer<>(
properties,
TOPIC,
Expand Down Expand Up @@ -249,7 +249,7 @@ void shouldNotRetryWhenMaxRetriesIsZero() throws Exception {
final var record = new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value");
final var partition = new TopicPartition(TOPIC, PARTITION);
final var recordsList = List.of(record);
final var records = new ConsumerRecords<>(Map.of(partition, recordsList));
final var records = consumerRecords(Map.of(partition, recordsList));

// Create a consumer with no retries
final var functionalConsumer = new TestableKPipeConsumer<>(
Expand Down Expand Up @@ -391,7 +391,7 @@ void shouldUpdateMetricsOnSuccessfulProcessing() throws Exception {
// Create mock records
var partition = new TopicPartition(TOPIC, PARTITION);
var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value"));
var records = new ConsumerRecords<>(Map.of(partition, recordsList));
var records = consumerRecords(Map.of(partition, recordsList));

final var functionalConsumer = new TestableKPipeConsumer<>(
properties,
Expand Down Expand Up @@ -438,7 +438,7 @@ void shouldUpdateMetricsOnProcessingError() throws Exception {
// Create mock records
final var partition = new TopicPartition(TOPIC, PARTITION);
final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value"));
final var records = new ConsumerRecords<>(Map.of(partition, recordsList));
final var records = consumerRecords(Map.of(partition, recordsList));
final var functionalConsumer = new TestableKPipeConsumer<>(
properties,
TOPIC,
Expand All @@ -456,6 +456,9 @@ void shouldUpdateMetricsOnProcessingError() throws Exception {

assertTrue(latch.await(1, TimeUnit.SECONDS), "Processing did not complete in time");

// errorHandler is invoked after processingErrors is incremented in KPipeConsumer.processRecord
verify(errorHandler, timeout(1000)).accept(any());

// Verify metrics
final var metrics = functionalConsumer.getMetrics();
assertEquals(1L, metrics.get("messagesReceived"));
Expand Down Expand Up @@ -539,7 +542,7 @@ void shouldHandleEmptyRecordBatch() {
);

// Create empty records
final var records = new ConsumerRecords<String, String>(Map.of());
final var records = ConsumerRecords.<String, String>empty();

// Process records
functionalConsumer.executeProcessRecords(records);
Expand Down Expand Up @@ -580,7 +583,7 @@ void shouldHandleNullValueInRecord() throws Exception {
// Create record with null value
final var partition = new TopicPartition(TOPIC, PARTITION);
final var recordsList = List.of(new ConsumerRecord<String, String>(TOPIC, PARTITION, 0L, "key", null));
final var records = new ConsumerRecords<>(Map.of(partition, recordsList));
final var records = consumerRecords(Map.of(partition, recordsList));

// Process records
functionalConsumer.executeProcessRecords(records);
Expand Down Expand Up @@ -685,7 +688,7 @@ void shouldTrackInFlightMessagesCorrectly() throws Exception {
new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key1", "value1"),
new ConsumerRecord<>(TOPIC, PARTITION, 1L, "key2", "value2")
);
final var records = new ConsumerRecords<>(Map.of(partition, recordsList));
final var records = consumerRecords(Map.of(partition, recordsList));

// Use CountDownLatch to control when processing completes
final var startLatch = new CountDownLatch(2);
Expand Down Expand Up @@ -840,7 +843,7 @@ void shouldMarkOffsetAsProcessedEvenWhenProcessingFails() throws Exception {
// Create a record that will fail processing
final var record = new ConsumerRecord<>(TOPIC, PARTITION, 123L, "key", "value");
final var partition = new TopicPartition(TOPIC, PARTITION);
final var records = new ConsumerRecords<>(Map.of(partition, List.of(record)));
final var records = consumerRecords(Map.of(partition, List.of(record)));

// Process record
functionalConsumer.executeProcessRecords(records);
Expand Down Expand Up @@ -1036,7 +1039,7 @@ void shouldProcessRecordsConcurrently() throws Exception {

// Create consumer records
final var recordsMap = Map.of(new TopicPartition(TOPIC, PARTITION), records);
final var consumerRecords = new ConsumerRecords<>(recordsMap);
final var consumerRecords = consumerRecords(recordsMap);

// Process records
functionalConsumer.executeProcessRecords(consumerRecords);
Expand All @@ -1048,6 +1051,13 @@ void shouldProcessRecordsConcurrently() throws Exception {
assertTrue(maxConcurrent.get() > 1, "Records should be processed concurrently");
}

/**
 * Builds a {@link ConsumerRecords} batch from per-partition record lists.
 *
 * <p>Delegates to the two-argument constructor with an empty {@code nextOffsets}
 * map, since Kafka 4.2 deprecates the single-argument variant.
 *
 * @param recordsByPartition records grouped by their topic partition
 * @return a batch wrapping the given records
 */
private static <K, V> ConsumerRecords<K, V> consumerRecords(
  final Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsByPartition
) {
  return new ConsumerRecords<>(recordsByPartition, Map.of());
}

public static class TestableKPipeConsumer<K, V> extends KPipeConsumer<K, V> {

private static final String METRIC_MESSAGES_RECEIVED = "messagesReceived";
Expand Down
3 changes: 2 additions & 1 deletion lib/src/test/java/org/kpipe/consumer/KPipeConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ void sequentialProcessingShouldProcessInOrder() {
Map.of(
new TopicPartition(TOPIC, 0),
List.of(createRecord(0, "k1", "v1"), createRecord(1, "k2", "v2"), createRecord(2, "k3", "v3"))
)
),
Map.of()
);

// Act
Expand Down