diff --git a/lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java b/lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java index 233ea20..21b00c6 100644 --- a/lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java +++ b/lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java @@ -83,7 +83,7 @@ void shouldProcessRecordsWithProcessor() throws Exception { final var partition = new TopicPartition(TOPIC, PARTITION); final var commandQueue = new ConcurrentLinkedQueue(); final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "test-key", "test-value")); - final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); + final var records = consumerRecords(Map.of(partition, recordsList)); final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, @@ -132,7 +132,7 @@ void shouldHandleProcessorExceptions() throws Exception { // Create mock records final var partition = new TopicPartition(TOPIC, PARTITION); final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "test-key", "test-value")); - final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); + final var records = consumerRecords(Map.of(partition, recordsList)); final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, @@ -199,7 +199,7 @@ void shouldRetryProcessingOnFailureUpToMaxRetries() throws Exception { final var record = new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value"); final var partition = new TopicPartition(TOPIC, PARTITION); final var recordsList = List.of(record); - final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); + final var records = consumerRecords(Map.of(partition, recordsList)); final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, @@ -249,7 +249,7 @@ void shouldNotRetryWhenMaxRetriesIsZero() throws Exception { final var record = new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value"); final var partition = new TopicPartition(TOPIC, PARTITION); final var recordsList = List.of(record); - final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); + final var records = consumerRecords(Map.of(partition, recordsList)); // Create a consumer with no retries final var functionalConsumer = new TestableKPipeConsumer<>( @@ -391,7 +391,7 @@ void shouldUpdateMetricsOnSuccessfulProcessing() throws Exception { // Create mock records var partition = new TopicPartition(TOPIC, PARTITION); var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value")); - var records = new ConsumerRecords<>(Map.of(partition, recordsList)); + var records = consumerRecords(Map.of(partition, recordsList)); final var functionalConsumer = new TestableKPipeConsumer<>( properties, @@ -438,7 +438,7 @@ void shouldUpdateMetricsOnProcessingError() throws Exception { // Create mock records final var partition = new TopicPartition(TOPIC, PARTITION); final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value")); - final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); + final var records = consumerRecords(Map.of(partition, recordsList)); final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, @@ -456,6 +456,9 @@ void shouldUpdateMetricsOnProcessingError() throws Exception { assertTrue(latch.await(1, TimeUnit.SECONDS), "Processing did not complete in time"); + // errorHandler is invoked after processingErrors is incremented in KPipeConsumer.processRecord + verify(errorHandler, timeout(1000)).accept(any()); + // Verify metrics final var metrics = functionalConsumer.getMetrics(); assertEquals(1L, metrics.get("messagesReceived")); @@ -539,7 +542,7 @@ void shouldHandleEmptyRecordBatch() { ); // Create empty records - final var records = new ConsumerRecords(Map.of()); + final var records = ConsumerRecords.empty(); // Process records functionalConsumer.executeProcessRecords(records); @@ -580,7 +583,7 @@ void shouldHandleNullValueInRecord() throws Exception { // Create record with null value final var partition = new TopicPartition(TOPIC, PARTITION); final var recordsList = List.of(new ConsumerRecord(TOPIC, PARTITION, 0L, "key", null)); - final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); + final var records = consumerRecords(Map.of(partition, recordsList)); // Process records functionalConsumer.executeProcessRecords(records); @@ -685,7 +688,7 @@ void shouldTrackInFlightMessagesCorrectly() throws Exception { new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key1", "value1"), new ConsumerRecord<>(TOPIC, PARTITION, 1L, "key2", "value2") ); - final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); + final var records = consumerRecords(Map.of(partition, recordsList)); // Use CountDownLatch to control when processing completes final var startLatch = new CountDownLatch(2); @@ -840,7 +843,7 @@ void shouldMarkOffsetAsProcessedEvenWhenProcessingFails() throws Exception { // Create a record that will fail processing final var record = new ConsumerRecord<>(TOPIC, PARTITION, 123L, "key", "value"); final var partition = new TopicPartition(TOPIC, PARTITION); - final var records = new ConsumerRecords<>(Map.of(partition, List.of(record))); + final var records = consumerRecords(Map.of(partition, List.of(record))); // Process record functionalConsumer.executeProcessRecords(records); @@ -1036,7 +1039,7 @@ void shouldProcessRecordsConcurrently() throws Exception { // Create consumer records final var recordsMap = Map.of(new TopicPartition(TOPIC, PARTITION), records); - final var consumerRecords = new ConsumerRecords<>(recordsMap); + final var consumerRecords = consumerRecords(recordsMap); // Process records functionalConsumer.executeProcessRecords(consumerRecords); @@ -1048,6 +1051,13 @@ void shouldProcessRecordsConcurrently() throws Exception { assertTrue(maxConcurrent.get() > 1, "Records should be processed concurrently"); } + private static ConsumerRecords consumerRecords( + final Map>> records + ) { + // Kafka 4.2 deprecates the single-arg constructor; provide explicit nextOffsets. + return new ConsumerRecords<>(records, Map.of()); + } + public static class TestableKPipeConsumer extends KPipeConsumer { private static final String METRIC_MESSAGES_RECEIVED = "messagesReceived"; diff --git a/lib/src/test/java/org/kpipe/consumer/KPipeConsumerTest.java b/lib/src/test/java/org/kpipe/consumer/KPipeConsumerTest.java index ea1120b..22a6382 100644 --- a/lib/src/test/java/org/kpipe/consumer/KPipeConsumerTest.java +++ b/lib/src/test/java/org/kpipe/consumer/KPipeConsumerTest.java @@ -385,7 +385,8 @@ void sequentialProcessingShouldProcessInOrder() { Map.of( new TopicPartition(TOPIC, 0), List.of(createRecord(0, "k1", "v1"), createRecord(1, "k2", "v2"), createRecord(2, "k3", "v3")) - ) + ), + Map.of() ); // Act