diff --git a/README.md b/README.md
index c0ce623..189b13b 100644
--- a/README.md
+++ b/README.md
@@ -26,8 +26,7 @@ retries, built-in metrics, and support for both parallel and sequential processi
 - Message processors are **pure functions** (e.g., `UnaryOperator<Map<String, Object>>` for JSON or
   `UnaryOperator<GenericRecord>` for Avro) that transform data without side effects.
 - Build complex, high-performance pipelines that minimize serialization/deserialization cycles.
-- **Declarative processing** lets you describe *what* to do, not *how* to do it.
-- **Optimized Pipelines**: Deserialization happens once at the start, and serialization happens once at the end,
+- **Single SerDe Cycle**: Deserialization happens once at the start, and serialization happens once at the end,
   regardless of the number of transformations.
 - Teams can **register their own operators** in a central registry via:
@@ -36,12 +35,10 @@ retries, built-in metrics, and support for both parallel and sequential processi
   final var registry = new MessageProcessorRegistry("myApp");

   // Register team-specific JSON operators
-  registry.registerJsonOperator("sanitizeData",
-    JsonMessageProcessor.removeFields("password", "ssn"));
+  registry.registerJsonOperator("sanitizeData", JsonMessageProcessor.removeFields("password", "ssn"));

   // Create an optimized JSON pipeline from registered operators
-  final var pipeline = registry.jsonPipeline(
-    "sanitizeData", "addTimestamp");
+  final var pipeline = registry.jsonPipeline("sanitizeData", "addTimestamp");

   // Apply transformations with built-in error handling and retry logic
   final var consumer = new KPipeConsumer.builder()
@@ -116,6 +113,65 @@ retries, built-in metrics, and support for both parallel and sequential processi

 ---

+## Architecture and Reliability
+
+KPipe is designed to be a lightweight, high-performance alternative to existing Kafka consumer libraries, focusing on
+modern Java 25+ features and predictable behavior.
+
+### 1. The "Single SerDe Cycle" Strategy
+
+Unlike traditional pipelines that often perform `byte[] -> Object -> byte[]` at every transformation step, KPipe
+optimizes for throughput:
+
+- **Single Deserialization**: Messages are deserialized **once** into a mutable representation
+  (e.g., `Map<String, Object>` for JSON, `GenericRecord` for Avro).
+- **In-Place Transformations**: A chain of `UnaryOperator` functions is applied to the same object.
+- **Single Serialization**: The final object is serialized back to `byte[]` only once before being sent to the sink.
+
+This approach significantly reduces CPU overhead and GC pressure.
+
+### 2. Virtual Threads
+
+By default, KPipe uses **Java Virtual Threads** (Project Loom) to process messages. This provides a "thread-per-record"
+model that excels at I/O-bound tasks (e.g., enrichment via REST/DB).
+
+- **I/O Bound**: Use the default parallel mode with virtual threads for maximum concurrency.
+- **Sequential**: Use `.withSequentialProcessing(true)` for stateful operations where strict FIFO ordering per
+  partition is required.
+
+### 3. Reliable "At-Least-Once" Delivery
+
+KPipe implements a **Lowest Pending Offset** strategy to ensure reliability even with parallel processing:
+
+- **In-Flight Tracking**: Every record's offset is tracked in a `ConcurrentSkipListSet` per partition.
+- **No-Gap Commits**: Even if message 102 finishes before 101, offset 102 will **not** be committed until 101 is
+  successfully processed.
+- **Crash Recovery**: If the consumer crashes, it will resume from the last committed "safe" offset. While this may
+  result in some records being re-processed (standard "at-least-once" behavior), it guarantees no message is ever
+  skipped.
+
+### 4. Error Handling & Retries
+
+KPipe provides a robust, multi-layered error handling mechanism:
+
+- **Built-in Retries**: Configure `.withRetry(maxRetries, backoff)` to automatically retry transient failures.
+- **Dead Letter Handling**: Provide a `.withErrorHandler()` to redirect messages that fail after all retries to an
+  error topic or database.
+- **Safe Pipelines**: Use `MessageProcessorRegistry.withErrorHandling()` to wrap individual processors with default
+  values or logging, preventing a single malformed message from blocking the partition.
+
+### 5. Graceful Shutdown & Interrupt Handling
+
+KPipe respects JVM signals and ensures timely shutdown without data loss:
+
+- **Interrupt Awareness**: Interrupts trigger a coordinated shutdown sequence. They do **not** cause records to be
+  skipped.
+- **Reliable Redelivery**: If a record's processing is interrupted (e.g., during retry backoff or transformation),
+  the offset is NOT marked as processed. This ensures it will be safely picked up by the next consumer instance,
+  guaranteeing "at-least-once" delivery even during shutdown.
+
+---
+
 ## 📁 Project Structure

 KPipe is organized into two main modules:
@@ -219,18 +275,22 @@ Monitor your consumer with built-in metrics:

 ```java
 // Access consumer metrics
-Map metrics = consumer.getMetrics();
-System.out.println("Messages received: " + metrics.get("messagesReceived"));
-System.out.println("Successfully processed: " + metrics.get("messagesProcessed"));
-System.out.println("Processing errors: " + metrics.get("processingErrors"));
+final var metrics = consumer.getMetrics();
+final var log = System.getLogger("org.kpipe.metrics");
+log.log(Level.INFO, "Messages received: " + metrics.get("messagesReceived"));
+log.log(Level.INFO, "Successfully processed: " + metrics.get("messagesProcessed"));
+log.log(Level.INFO, "Processing errors: " + metrics.get("processingErrors"));
 ```

 Configure automatic metrics reporting:

 ```java
-new App(config)
-  .withMetricsInterval(Duration.ofSeconds(30))
-  .start();
+final var runner = ConsumerRunner.builder(consumer)
+  .withMetricsReporters(List.of(new ConsumerMetricsReporter(consumer::getMetrics, () -> uptimeMs, null)))
+  .withMetricsInterval(30_000)
+  .build();
+
+runner.start();
 ```

 ---

@@ -239,17 +299,16 @@ Configure automatic metrics reporting:
 The consumer supports graceful shutdown with in-flight message handling:

 ```java
+final var log = System.getLogger("org.kpipe.app.Shutdown");
+
 // Initiate graceful shutdown with 5-second timeout
-boolean allProcessed = kafkaApp.shutdownGracefully(5000);
-if (allProcessed) {
-  LOGGER.info("All messages processed successfully before shutdown");
-} else {
-  LOGGER.warning("Shutdown completed with some messages still in flight");
-}
+boolean allProcessed = runner.shutdownGracefully(5000);
+if (allProcessed) log.log(Level.INFO, "All messages processed successfully before shutdown");
+else log.log(Level.WARNING, "Shutdown completed with some messages still in flight");

 // Register as JVM shutdown hook
 Runtime.getRuntime().addShutdownHook(
-  new Thread(() -> app.shutdownGracefully(5000))
+  new Thread(() -> runner.close())
 );
 ```

@@ -325,17 +384,11 @@ public interface MessageSink {
 KPipe provides several built-in sinks:

 ```java
-// Create a JSON console sink that logs messages at INFO level
-final var jsonConsoleSink = new JsonConsoleSink<>(
-  System.getLogger("org.kpipe.sink.JsonConsoleSink"),
-  Level.INFO
-);
-
-// Create an Avro console sink that logs messages at INFO level
-final var avroConsoleSink = new AvroConsoleSink<>(
-  System.getLogger("org.kpipe.sink.AvroConsoleSink"),
-  Level.INFO
-);
+// Create a JSON console sink
+final var jsonConsoleSink = new JsonConsoleSink<>();
+
+// Create an Avro console sink
+final var avroConsoleSink = new AvroConsoleSink<>();

 // Use a sink with a consumer
 final var consumer = new KPipeConsumer.builder()
@@ -355,15 +408,15 @@ You can create custom sinks using lambda expressions:

 MessageSink databaseSink = (record, processedValue) -> {
   try {
     // Parse the processed value
-    Map data = JsonMessageProcessor.parseJson().apply(processedValue);
+    final var data = JsonMessageProcessor.parseJson().apply(processedValue);

     // Write to database
     databaseService.insert(data);

     // Log success
-    logger.info("Successfully wrote message to database: " + record.key());
+    log.log(Level.INFO, "Successfully wrote message to database: " + record.key());
   } catch (Exception e) {
-    logger.error("Failed to write message to database", e);
+    log.log(Level.ERROR, "Failed to write message to database", e);
   }
 };

@@ -382,7 +435,7 @@ The `MessageSinkRegistry` provides a centralized repository for registering and
 final var registry = new MessageSinkRegistry();

 // Register sinks
-registry.register("console", new JsonConsoleSink<>(logger, Level.INFO));
+registry.register("console", new JsonConsoleSink<>());
 registry.register("database", databaseSink);
 registry.register("metrics", (record, value) -> metricsService.recordMessage(record.topic(), value.length));

@@ -401,16 +454,11 @@ The registry provides utilities for adding error handling to sinks:

 ```java
 // Create a sink with error handling
-final var safeSink = MessageSinkRegistry.withErrorHandling(
-  riskySink,
-  (record, value, error) -> logger.error("Error in sink: " + error.getMessage())
-);
-
-// Or use the registry's error handling
-final var safePipeline = registry.pipelineWithErrorHandling(
-  "console", "database", "metrics",
-  (record, value, error) -> errorService.reportError(record.topic(), error)
-);
+final var safeSink = MessageSinkRegistry.withErrorHandling(riskySink);
+
+// Register and use the wrapped sink
+registry.register("safeDatabase", safeSink);
+final var safePipeline = registry.pipeline("console", "safeDatabase", "metrics");
 ```

 ---

@@ -438,33 +486,27 @@ The `ConsumerRunner` supports extensive configuration options:

 ```java
 // Create a consumer runner with advanced configuration
-ConsumerRunner> runner = ConsumerRunner.builder(consumer)
+final var runner = ConsumerRunner.builder(consumer)
   // Configure metrics reporting
-  .withMetricsReporter(new ConsumerMetricsReporter(
-    consumer::getMetrics,
-    () -> System.currentTimeMillis() - startTime
+  .withMetricsReporters(List.of(
+    new ConsumerMetricsReporter(consumer::getMetrics, () -> System.currentTimeMillis() - startTime, null)
   ))
   .withMetricsInterval(30000) // Report metrics every 30 seconds
-  // Configure health checks
-  .withHealthCheck(c -> c.getState() == ConsumerState.RUNNING)
-
+  .withHealthCheck(KPipeConsumer::isRunning)
   // Configure graceful shutdown
   .withShutdownTimeout(10000) // 10 seconds timeout for shutdown
   .withShutdownHook(true) // Register JVM shutdown hook
-  // Configure custom start action
   .withStartAction(c -> {
-    logger.info("Starting consumer for topic: " + c.getTopic());
+    log.log(Level.INFO, "Starting consumer");
     c.start();
   })
-  // Configure custom graceful shutdown
   .withGracefulShutdown((c, timeoutMs) -> {
-    logger.info("Initiating graceful shutdown with timeout: " + timeoutMs + "ms");
-    return c.shutdownGracefully(timeoutMs);
+    log.log(Level.INFO, "Initiating graceful shutdown with timeout: " + timeoutMs + "ms");
+    return ConsumerRunner.performGracefulConsumerShutdown(c, timeoutMs);
   })
-
   .build();
 ```

@@ -494,7 +536,7 @@ The `ConsumerRunner` integrates with metrics reporting:
 // Add multiple metrics reporters
 ConsumerRunner> runner = ConsumerRunner.builder(consumer)
   .withMetricsReporters(List.of(
-    new ConsumerMetricsReporter(consumer::getMetrics, () -> System.currentTimeMillis() - startTime),
+    new ConsumerMetricsReporter(consumer::getMetrics, () -> System.currentTimeMillis() - startTime, null),
     new ProcessorMetricsReporter(registry)
   ))
   .withMetricsInterval(60000) // Report every minute

@@ -521,26 +563,27 @@ Here's a concise example of a KPipe application:

 ```java
 public class KPipeApp implements AutoCloseable {

+  private static final System.Logger LOGGER = System.getLogger(KPipeApp.class.getName());
   private final ConsumerRunner> runner;

   static void main() {
     // Load configuration from environment variables
     final var config = AppConfig.fromEnv();

-    try (final var app = new MyKafkaApp(config)) {
+    try (final var app = new KPipeApp(config)) {
       app.start();
       app.awaitShutdown();
     } catch (final Exception e) {
-      System.getLogger(MyKafkaApp.class.getName())
-        .log(System.Logger.Level.ERROR, "Fatal error in application", e);
+      LOGGER.log(Level.ERROR, "Fatal error in application", e);
       System.exit(1);
     }
   }

-  public MyKafkaApp(final AppConfig config) {
+  public KPipeApp(final AppConfig config) {
     // Create processor and sink registries
     final var processorRegistry = new MessageProcessorRegistry(config.appName());
     final var sinkRegistry = new MessageSinkRegistry();
+    final var commandQueue = new ConcurrentLinkedQueue();

     // Create the functional consumer
     final var functionalConsumer = KPipeConsumer.builder()
@@ -549,11 +592,13 @@ public class KPipeApp implements AutoCloseable {
       .withTopic(config.topic())
       .withProcessor(processorRegistry.jsonPipeline(
         "addSource", "markProcessed", "addTimestamp"))
-      .withMessageSink(sinkRegistry.pipeline("logging"))
+      .withMessageSink(sinkRegistry.pipeline("jsonLogging"))
+      .withCommandQueue(commandQueue)
       .withOffsetManagerProvider(consumer ->
-        OffsetManager.builder(consumer)
-          .withCommitInterval(Duration.ofSeconds(30))
-          .build())
+        OffsetManager.builder(consumer)
+          .withCommandQueue(commandQueue)
+          .withCommitInterval(Duration.ofSeconds(30))
+          .build())
       .withMetrics(true)
       .build();
@@ -584,12 +629,9 @@ public class KPipeApp implements AutoCloseable {
 export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
 export KAFKA_CONSUMER_GROUP=my-group
 export KAFKA_TOPIC=json-events
-
-# Run the application
-./gradlew run
-
-# Test with a sample message
-echo '{"message":"Hello from KPipe!"}' | kcat -P -b localhost:9092 -t json-events
+export KAFKA_PROCESSORS=parseJson,validateSchema,addTimestamp
+export METRICS_INTERVAL_MS=30000
+export SHUTDOWN_TIMEOUT_MS=5000
 ```

 ---

@@ -774,9 +816,9 @@ multiple records concurrently.
 - **Execution Order**: Messages start in order but may finish out of order (e.g., a small message finishing before a
   large one).
-- **No-Gap Offset Management**: Even with parallel processing, KPipe's `OffsetManager` uses a **Lowest Pending Offset**
+- **At-Least-Once Delivery**: Even with parallel processing, KPipe's `OffsetManager` uses a **Lowest Pending Offset**
   strategy. It will never commit a higher offset until all lower offsets in that partition are successfully processed.
-  This ensures **at-least-once delivery** and prevents "holes" in your data if the application crashes.
+  This ensures **no gaps** in your committed data.

 ### 3. Key-Level Ordering for Critical Systems (e.g., Payments)
diff --git a/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmark.java b/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmark.java
index 8db9e0d..5fddbb8 100644
--- a/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmark.java
+++ b/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmark.java
@@ -18,7 +18,7 @@
 ///
 /// ### Design Integrity:
 /// - Both frameworks start from the beginning of the topic (`earliest`) for each iteration.
-/// - Both process exactly **1,000 messages** per iteration.
+/// - Both process exactly **10,000 messages** per iteration.
 /// - KPipe uses Loom; Confluent uses a max concurrency of **100**.
 ///
 /// ### Running the Benchmark:
diff --git a/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java b/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java
index a7b4865..223da53 100644
--- a/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java
+++ b/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java
@@ -259,7 +259,7 @@ void start() {
   @TearDown(Level.Invocation)
   public void tearDown() {
     if (processor != null) processor.closeDontDrainFirst(PC_CLOSE_TIMEOUT);
-    if (kafkaConsumer != null) kafkaConsumer.close(PC_CLOSE_TIMEOUT);
+    if (kafkaConsumer != null) kafkaConsumer.close();
   }

   /// @return invocation-scoped processed message counter
diff --git a/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java b/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java
index d6c7d85..18dfe12 100644
--- a/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java
+++ b/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java
@@ -2,12 +2,12 @@
 import java.lang.System.Logger;
 import java.lang.System.Logger.Level;
+import java.nio.channels.ClosedByInterruptException;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -695,65 +695,60 @@ protected void processRecords(final ConsumerRecords records) {
   protected void processRecord(final ConsumerRecord record) {
     if (enableMetrics) metrics.get(METRIC_MESSAGES_RECEIVED).incrementAndGet();

-    final var retryProcessor = new BiFunction<Integer, Exception, V>() {
-      @Override
-      public V apply(final Integer attempt, final Exception previousException) {
-        // If we've exceeded max retries, handle the error and return null
-        if (attempt > maxRetries) {
+    for (int attempt = 0; attempt <= maxRetries; attempt++) {
+      if (attempt > 0) {
+        if (enableMetrics) metrics.get(METRIC_RETRIES).incrementAndGet();
+        LOGGER.log(
+          Level.INFO,
+          "Retrying message at offset %d (attempt %d of %d)".formatted(record.offset(), attempt, maxRetries)
+        );
+
+        try {
+          Thread.sleep(retryBackoff.toMillis());
+        } catch (final InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          return; // Interrupts must not mark the record as processed.
+        }
+      }
+
+      try {
+        final var processedValue = processor.apply(record.value());
+        messageSink.send(record, processedValue);
+        if (enableMetrics) metrics.get(METRIC_MESSAGES_PROCESSED).incrementAndGet();
+        if (offsetManager != null) commandQueue.offer(ConsumerCommand.MARK_OFFSET_PROCESSED.withRecord(record));
+        return;
+      } catch (final Exception e) {
+        if (isInterruptionRelated(e)) {
+          Thread.currentThread().interrupt();
+          return; // Interrupt-like failures should be retried after restart/rebalance.
+        }
+
+        if (attempt == maxRetries) {
           if (enableMetrics) metrics.get(METRIC_PROCESSING_ERRORS).incrementAndGet();
           LOGGER.log(
             Level.WARNING,
             "Failed to process message at offset %d after %d attempts: %s".formatted(
               record.offset(),
-              attempt,
-              previousException.getMessage()
+              maxRetries + 1,
+              e.getMessage()
             ),
-            previousException
-          );
-
-          errorHandler.accept(new ProcessingError<>(record, previousException, maxRetries));
-          return null;
-        }
-
-        // If this is a retry attempt, log and increment metrics
-        if (attempt > 0) {
-          if (enableMetrics) metrics.get(METRIC_RETRIES).incrementAndGet();
-
-          LOGGER.log(
-            Level.INFO,
-            "Retrying message at offset %d (attempt %d of %d)".formatted(record.offset(), attempt, maxRetries)
+            e
           );
-
-          // Wait before retry
-          try {
-            Thread.sleep(retryBackoff.toMillis());
-          } catch (final InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            return null;
-          }
-        }
-
-        // Attempt to process the record
-        try {
-          return processor.apply(record.value());
-        } catch (final Exception e) {
-          // If processing fails, retry with an incremented attempt count
-          return apply(attempt + 1, e);
+          errorHandler.accept(new ProcessingError<>(record, e, maxRetries));
+          if (offsetManager != null) commandQueue.offer(ConsumerCommand.MARK_OFFSET_PROCESSED.withRecord(record));
+          return;
         }
       }
-    };
-
-    // Process the record with retry logic starting at attempt 0
-    final V processedValue = retryProcessor.apply(0, null);
-
-    // If processing was successful, send to sink and update metrics
-    if (processedValue != null) {
-      messageSink.send(record, processedValue);
-      if (enableMetrics) metrics.get(METRIC_MESSAGES_PROCESSED).incrementAndGet();
     }
+  }

-    // Mark offset as processed regardless of success or failure
-    if (offsetManager != null) commandQueue.offer(ConsumerCommand.MARK_OFFSET_PROCESSED.withRecord(record));
+  private static boolean isInterruptionRelated(final Throwable error) {
+    Throwable current = error;
+    while (current != null) {
+      if (current instanceof InterruptedException || current instanceof ClosedByInterruptException) return true;
+      current = current.getCause();
+    }
+    return false;
   }

   private ConsumerRecords pollRecords() {
diff --git a/lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java b/lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java
index 21b00c6..56411ea 100644
--- a/lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java
+++ b/lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java
@@ -83,7 +83,7 @@ void shouldProcessRecordsWithProcessor() throws Exception {
     final var partition = new TopicPartition(TOPIC, PARTITION);
     final var commandQueue = new ConcurrentLinkedQueue();
     final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "test-key", "test-value"));
-    final var records = consumerRecords(Map.of(partition, recordsList));
+    final var records = new ConsumerRecords<>(Map.of(partition, recordsList), Map.of());
     final var functionalConsumer = new
 TestableKPipeConsumer<>(
       properties,
       TOPIC,
@@ -132,7 +132,7 @@ void shouldHandleProcessorExceptions() throws Exception {
     // Create mock records
     final var partition = new TopicPartition(TOPIC, PARTITION);
     final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "test-key", "test-value"));
-    final var records = consumerRecords(Map.of(partition, recordsList));
+    final var records = new ConsumerRecords<>(Map.of(partition, recordsList), Map.of());
     final var functionalConsumer = new TestableKPipeConsumer<>(
       properties,
       TOPIC,
@@ -199,7 +199,7 @@ void shouldRetryProcessingOnFailureUpToMaxRetries() throws Exception {
     final var record = new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value");
     final var partition = new TopicPartition(TOPIC, PARTITION);
     final var recordsList = List.of(record);
-    final var records = consumerRecords(Map.of(partition, recordsList));
+    final var records = new ConsumerRecords<>(Map.of(partition, recordsList), Map.of());
     final var functionalConsumer = new TestableKPipeConsumer<>(
       properties,
       TOPIC,
@@ -249,7 +249,7 @@ void shouldNotRetryWhenMaxRetriesIsZero() throws Exception {
     final var record = new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value");
     final var partition = new TopicPartition(TOPIC, PARTITION);
     final var recordsList = List.of(record);
-    final var records = consumerRecords(Map.of(partition, recordsList));
+    final var records = new ConsumerRecords<>(Map.of(partition, recordsList), Map.of());
     // Create a consumer with no retries
     final var functionalConsumer = new TestableKPipeConsumer<>(
@@ -391,7 +391,7 @@ void shouldUpdateMetricsOnSuccessfulProcessing() throws Exception {
     // Create mock records
     var partition = new TopicPartition(TOPIC, PARTITION);
     var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value"));
-    var records = consumerRecords(Map.of(partition, recordsList));
+    var records = new ConsumerRecords<>(Map.of(partition, recordsList), Map.of());
     final var functionalConsumer = new TestableKPipeConsumer<>(
       properties,
@@ -438,7 +438,7 @@ void shouldUpdateMetricsOnProcessingError() throws Exception {
     // Create mock records
     final var partition = new TopicPartition(TOPIC, PARTITION);
     final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value"));
-    final var records = consumerRecords(Map.of(partition, recordsList));
+    final var records = new ConsumerRecords<>(Map.of(partition, recordsList), Map.of());
     final var functionalConsumer = new TestableKPipeConsumer<>(
       properties,
       TOPIC,
@@ -583,7 +583,7 @@ void shouldHandleNullValueInRecord() throws Exception {
     // Create record with null value
     final var partition = new TopicPartition(TOPIC, PARTITION);
     final var recordsList = List.of(new ConsumerRecord(TOPIC, PARTITION, 0L, "key", null));
-    final var records = consumerRecords(Map.of(partition, recordsList));
+    final var records = new ConsumerRecords<>(Map.of(partition, recordsList), Map.of());

     // Process records
     functionalConsumer.executeProcessRecords(records);
@@ -688,7 +688,7 @@ void shouldTrackInFlightMessagesCorrectly() throws Exception {
       new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key1", "value1"),
       new ConsumerRecord<>(TOPIC, PARTITION, 1L, "key2", "value2")
     );
-    final var records = consumerRecords(Map.of(partition, recordsList));
+    final var records = new ConsumerRecords<>(Map.of(partition, recordsList), Map.of());

     // Use CountDownLatch to control when processing completes
     final var startLatch = new CountDownLatch(2);
@@ -843,7 +843,7 @@ void shouldMarkOffsetAsProcessedEvenWhenProcessingFails() throws Exception {
     // Create a record that will fail processing
     final var record = new ConsumerRecord<>(TOPIC, PARTITION, 123L, "key", "value");
     final var partition = new TopicPartition(TOPIC, PARTITION);
-    final var records = consumerRecords(Map.of(partition, List.of(record)));
+    final var records = new ConsumerRecords<>(Map.of(partition, List.of(record)), Map.of());

     // Process record
     functionalConsumer.executeProcessRecords(records);
@@ -1039,10 +1039,10 @@ void shouldProcessRecordsConcurrently() throws Exception {
     // Create consumer records
     final var recordsMap = Map.of(new TopicPartition(TOPIC, PARTITION), records);
-    final var consumerRecords = consumerRecords(recordsMap);
+    final var recordsBatch = new ConsumerRecords<>(recordsMap, Map.of());

     // Process records
-    functionalConsumer.executeProcessRecords(consumerRecords);
+    functionalConsumer.executeProcessRecords(recordsBatch);

     // Wait for all records to be processed
     assertTrue(completionLatch.await(3, TimeUnit.SECONDS), "Records processing timed out");
@@ -1051,13 +1051,6 @@ void shouldProcessRecordsConcurrently() throws Exception {
     assertTrue(maxConcurrent.get() > 1, "Records should be processed concurrently");
   }

-  private static ConsumerRecords<String, String> consumerRecords(
-    final Map<TopicPartition, List<ConsumerRecord<String, String>>> records
-  ) {
-    // Kafka 4.2 deprecates the single-arg constructor; provide explicit nextOffsets.
-    return new ConsumerRecords<>(records, Map.of());
-  }
-
   public static class TestableKPipeConsumer extends KPipeConsumer {

     private static final String METRIC_MESSAGES_RECEIVED = "messagesReceived";
diff --git a/lib/src/test/java/org/kpipe/consumer/KPipeInterruptTest.java b/lib/src/test/java/org/kpipe/consumer/KPipeInterruptTest.java
new file mode 100644
index 0000000..15b21fb
--- /dev/null
+++ b/lib/src/test/java/org/kpipe/consumer/KPipeInterruptTest.java
@@ -0,0 +1,169 @@
+package org.kpipe.consumer;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.kafka.clients.consumer.*;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.kpipe.consumer.enums.ConsumerCommand;
+import org.kpipe.sink.MessageSink;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class KPipeInterruptTest {
+
+  @Mock
+  private Function<String, String> processor;
+
+  @Mock
+  private KafkaConsumer<String, String> mockConsumer;
+
+  @Mock
+  private MessageSink<String, String> messageSink;
+
+  @Mock
+  private Consumer<ProcessingError<String, String>> errorHandler;
+
+  @Mock
+  private OffsetManager offsetManager;
+
+  private KPipeConsumer<String, String> createConsumer(
+    final String topic,
+    final Queue<ConsumerCommand> commandQueue,
+    final int maxRetries,
+    final Duration backoff
+  ) {
+    final var props = new Properties();
+    props.put("bootstrap.servers", "localhost:9092");
+    props.put("group.id", "test-group");
+    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+    return KPipeConsumer
+      .builder()
+      .withProperties(props)
+      .withTopic(topic)
+      .withProcessor(processor)
+      .withMessageSink(messageSink)
+      .withRetry(maxRetries, backoff)
+      .withErrorHandler(errorHandler)
+      .withCommandQueue(commandQueue)
+      .withOffsetManager(offsetManager)
+      .withConsumer(() -> mockConsumer)
+      .build();
+  }
+
+  private static boolean hasMarkOffsetProcessed(final Queue<ConsumerCommand> commandQueue, final long offset) {
+    return commandQueue
+      .stream()
+      .anyMatch(cmd ->
+        cmd == ConsumerCommand.MARK_OFFSET_PROCESSED && cmd.getRecord() != null && cmd.getRecord().offset() == offset
+      );
+  }
+
+  @Test
+  void interruptDuringRetryShouldNotMarkOffsetAsProcessed() throws Exception {
+    final var topic = "test-topic";
+    final var record = new ConsumerRecord<>(topic, 0, 123L, "key", "value");
+    final var commandQueue = new LinkedBlockingQueue<ConsumerCommand>();
+
+    final var consumer = createConsumer(topic, commandQueue, 1, Duration.ofMillis(1000));
+
+    when(processor.apply("value")).thenThrow(new RuntimeException("first failure"));
+
+    final var threadStarted = new CountDownLatch(1);
+    final var threadFinished = new CountDownLatch(1);
+
+    final var processingThread = Thread
+      .ofVirtual()
+      .start(() -> {
+        threadStarted.countDown();
+        try {
+          consumer.processRecord(record);
+        } finally {
+          threadFinished.countDown();
+        }
+      });
+
+    assertTrue(threadStarted.await(1, TimeUnit.SECONDS));
+    Thread.sleep(100); // let retry sleep begin
+    processingThread.interrupt();
+    assertTrue(threadFinished.await(1, TimeUnit.SECONDS));
+
+    verify(messageSink, never()).send(any(), any());
+    verify(errorHandler, never()).accept(any());
+    assertFalse(hasMarkOffsetProcessed(commandQueue, 123L));
+  }
+
+  @Test
+  void interruptionRelatedExceptionShouldNotMarkOffsetAsProcessed() throws Exception {
+    final var topic = "test-topic";
+    final var record = new ConsumerRecord<>(topic, 0, 456L, "key", "value");
+    final var commandQueue = new LinkedBlockingQueue<ConsumerCommand>();
+    final var consumer = createConsumer(topic, commandQueue, 0, Duration.ofMillis(1));
+
+    when(processor.apply("value")).thenThrow(new RuntimeException(new InterruptedException("interrupted")));
+
+    final var interruptedFlag = new CompletableFuture<Boolean>();
+    final var done = new CountDownLatch(1);
+
+    Thread
+      .ofVirtual()
+      .start(() -> {
+        try {
+          consumer.processRecord(record);
+          interruptedFlag.complete(Thread.currentThread().isInterrupted());
+        } finally {
+          // clear interrupted status on this worker thread before it exits
+          Thread.interrupted();
+          done.countDown();
+        }
+      });
+
+    assertTrue(done.await(1, TimeUnit.SECONDS));
+    assertTrue(interruptedFlag.get(1, TimeUnit.SECONDS));
+
+    verify(messageSink, never()).send(any(), any());
+    verify(errorHandler, never()).accept(any());
+    assertFalse(hasMarkOffsetProcessed(commandQueue, 456L));
+  }
+
+  @Test
+  void terminalNonInterruptFailureShouldReportAndMarkOffset() {
+    final var topic = "test-topic";
+    final var record = new ConsumerRecord<>(topic, 0, 789L, "key", "value");
+    final var commandQueue = new LinkedBlockingQueue<ConsumerCommand>();
+    final var consumer = createConsumer(topic, commandQueue, 0, Duration.ofMillis(1));
+
+    when(processor.apply("value")).thenThrow(new RuntimeException("boom"));
+
+    consumer.processRecord(record);
+
+    verify(messageSink, never()).send(any(), any());
+    verify(errorHandler, times(1)).accept(any());
+    assertTrue(hasMarkOffsetProcessed(commandQueue, 789L));
+  }
+
+  @Test
+  void successShouldSendAndMarkOffset() {
+    final var topic = "test-topic";
+    final var record = new ConsumerRecord<>(topic, 0, 999L, "key", "value");
+    final var commandQueue = new LinkedBlockingQueue<ConsumerCommand>();
+    final var consumer = createConsumer(topic, commandQueue, 0, Duration.ofMillis(1));
+
+    when(processor.apply("value")).thenReturn("processed");
+
+    consumer.processRecord(record);
+
+    verify(messageSink, times(1)).send(record, "processed");
+    verify(errorHandler, never()).accept(any());
+    assertTrue(hasMarkOffsetProcessed(commandQueue, 999L));
+  }
+}
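
---

The README section added by this patch describes the **Lowest Pending Offset** strategy (track every in-flight offset in a `ConcurrentSkipListSet`, and never commit past the lowest offset that is still pending), but the commit-side arithmetic itself is not shown in the diff. The sketch below illustrates that idea in isolation; the class and method names are hypothetical and are not KPipe's actual `OffsetManager` API:

```java
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative per-partition tracker: the committable offset can only advance
// past offsets that are no longer pending, so out-of-order completions never
// create gaps in the committed stream.
public class LowestPendingOffsetTracker {

  // Offsets that have been polled but not yet fully processed.
  private final ConcurrentSkipListSet<Long> pending = new ConcurrentSkipListSet<>();
  private final AtomicLong highestTracked = new AtomicLong(-1L);

  /** Called when a record is handed to a worker. */
  public void track(final long offset) {
    pending.add(offset);
    highestTracked.accumulateAndGet(offset, Math::max);
  }

  /** Called when a record finishes processing (possibly out of order). */
  public void complete(final long offset) {
    pending.remove(offset);
  }

  /**
   * The offset that is safe to commit (Kafka commits the *next* offset to consume):
   * the lowest still-pending offset, or highest tracked + 1 once nothing is pending.
   */
  public long committableOffset() {
    final Long lowestPending = pending.ceiling(Long.MIN_VALUE); // first element, or null if empty
    return lowestPending != null ? lowestPending : highestTracked.get() + 1;
  }

  public static void main(final String[] args) {
    final var tracker = new LowestPendingOffsetTracker();
    tracker.track(100);
    tracker.track(101);
    tracker.track(102);

    tracker.complete(102); // 102 finishes before 101: still unsafe to commit past 100
    System.out.println(tracker.committableOffset()); // 100

    tracker.complete(100); // now 101 is the lowest pending offset
    System.out.println(tracker.committableOffset()); // 101

    tracker.complete(101); // nothing pending: safe to commit past the highest offset
    System.out.println(tracker.committableOffset()); // 103
  }
}
```

This is exactly the behavior the patch's `KPipeInterruptTest` relies on: an interrupted record is never marked processed, so it stays in the pending set and holds the committable offset back until a restarted consumer re-processes it.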