diff --git a/README.md b/README.md index 0b7c03f..c0ce623 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ [![Build Status](https://github.com/eschizoid/kpipe/actions/workflows/ci.yaml/badge.svg)](https://github.com/eschizoid/kpipe/actions/workflows/ci.yaml) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) -A **modern, functional, and high-performance Kafka consumer** built using **Java 24** features like **virtual threads**, +A **modern, functional, and high-performance Kafka consumer** built using **Java 25** features like **virtual threads**, **composable message processors**, and **DslJson** for JSON processing. Features robust error handling, configurable retries, built-in metrics, and support for both parallel and sequential processing. Ideal for large-scale systems. @@ -44,7 +44,7 @@ retries, built-in metrics, and support for both parallel and sequential processi "sanitizeData", "addTimestamp"); // Apply transformations with built-in error handling and retry logic - final var consumer = new FunctionalConsumer.builder() + final var consumer = new KPipeConsumer.builder() .withProcessor(pipeline) .withRetry(3, Duration.ofSeconds(1)) .build(); @@ -70,7 +70,7 @@ retries, built-in metrics, and support for both parallel and sequential processi "addTimestamp_userSchema"); // Create a consumer with the optimized pipeline - final var consumer = new FunctionalConsumer.builder() + final var consumer = new KPipeConsumer.builder() .withProperties(kafkaProps) .withTopic("team-topic") .withProcessor(avroPipeline) @@ -127,7 +127,7 @@ The core library that provides the Kafka consumer functionality: ``` ├── src/main/java/org/kpipe/ │ ├── consumer/ # Core consumer components -│ │ ├── FunctionalConsumer.java # Main functional consumer implementation +│ │ ├── KPipeConsumer.java # Main functional consumer implementation │ │ ├── OffsetManager.java # Manages Kafka offsets for reliable processing │ │ ├── 
MessageTracker.java # Tracks message processing state │ │ ├── RebalanceListener.java # Handles Kafka consumer rebalancing @@ -200,7 +200,7 @@ Extend the registry like this: ); // Use the pipeline with a consumer - final var consumer = new FunctionalConsumer.builder() + final var consumer = new KPipeConsumer.builder() .withProperties(kafkaProps) .withTopic("events") .withProcessor(pipeline) @@ -338,7 +338,7 @@ final var avroConsoleSink = new AvroConsoleSink<>( ); // Use a sink with a consumer -final var consumer = new FunctionalConsumer.builder() +final var consumer = new KPipeConsumer.builder() .withProperties(kafkaProps) .withTopic("events") .withProcessor(pipeline) @@ -368,7 +368,7 @@ MessageSink databaseSink = (record, processedValue) -> { }; // Use the custom sink with a consumer -final var consumer = new FunctionalConsumer.builder() +final var consumer = new KPipeConsumer.builder() .withMessageSink(databaseSink) .build(); ``` @@ -390,7 +390,7 @@ registry.register("metrics", (record, value) -> metricsService.recordMessage(rec final var sinkPipeline = registry.pipeline("console", "database", "metrics"); // Use the sink pipeline with a consumer -final var consumer = new FunctionalConsumer.builder() +final var consumer = new KPipeConsumer.builder() .withMessageSink(sinkPipeline) .build(); ``` @@ -422,7 +422,7 @@ graceful shutdown: ```java // Create a consumer runner with default settings -ConsumerRunner> runner = ConsumerRunner.builder(consumer) +ConsumerRunner> runner = ConsumerRunner.builder(consumer) .build(); // Start the consumer @@ -438,7 +438,7 @@ The `ConsumerRunner` supports extensive configuration options: ```java // Create a consumer runner with advanced configuration -ConsumerRunner> runner = ConsumerRunner.builder(consumer) +ConsumerRunner> runner = ConsumerRunner.builder(consumer) // Configure metrics reporting .withMetricsReporter(new ConsumerMetricsReporter( consumer::getMetrics, @@ -492,7 +492,7 @@ The `ConsumerRunner` integrates with metrics 
reporting: ```java // Add multiple metrics reporters -ConsumerRunner> runner = ConsumerRunner.builder(consumer) +ConsumerRunner> runner = ConsumerRunner.builder(consumer) .withMetricsReporters(List.of( new ConsumerMetricsReporter(consumer::getMetrics, () -> System.currentTimeMillis() - startTime), new ProcessorMetricsReporter(registry) @@ -506,7 +506,7 @@ ConsumerRunner> runner = ConsumerRunner.build The `ConsumerRunner` implements `AutoCloseable` for use with try-with-resources: ```java -try (ConsumerRunner> runner = ConsumerRunner.builder(consumer).build()) { +try (ConsumerRunner> runner = ConsumerRunner.builder(consumer).build()) { runner.start(); // Application logic here // Runner will be automatically closed when exiting the try block @@ -521,7 +521,7 @@ Here's a concise example of a KPipe application: ```java public class KPipeApp implements AutoCloseable { - private final ConsumerRunner> runner; + private final ConsumerRunner> runner; static void main() { // Load configuration from environment variables @@ -543,7 +543,7 @@ public class KPipeApp implements AutoCloseable { final var sinkRegistry = new MessageSinkRegistry(); // Create the functional consumer - final var functionalConsumer = FunctionalConsumer.builder() + final var functionalConsumer = KPipeConsumer.builder() .withProperties(KafkaConsumerConfig.createConsumerConfig( config.bootstrapServers(), config.consumerGroup())) .withTopic(config.topic()) @@ -739,8 +739,8 @@ The library provides a built-in `when()` method for conditional processing: ### High-Performance Architecture -KPipe is designed to beat traditional Kafka consumers (and even heavy libraries like Confluent Parallel Consumer) by -leveraging modern Java features and aggressive optimizations: +KPipe is designed for high-throughput, low-overhead Kafka processing using modern Java features and pipeline +optimizations. Performance depends on workload shape (I/O vs CPU bound), partitioning, and message size. 
- **Single SerDe Cycle**: Traditional pipelines often perform `byte[] -> Object -> byte[]` at every step. KPipe's optimized pipelines deserialize once at the entry, apply any number of `UnaryOperator` transformations on the object, @@ -749,23 +749,12 @@ leveraging modern Java features and aggressive optimizations: `offset` parameter that allows skipping magic bytes and schema IDs without performing expensive `Arrays.copyOfRange` operations. - **Virtual Threads (Project Loom)**: Every Kafka record can be processed in its own virtual thread. This allows for - massive concurrency (thousands of concurrent "threads") with almost zero memory overhead and no complex thread pool - management. -- **DslJson Integration**: Uses one of the world's fastest JSON libraries for maximum throughput and minimum Garbage - Collection (GC) pressure. + massive concurrency with simpler coordination than large platform-thread pools. +- **DslJson Integration**: Uses a high-performance JSON library to reduce parsing overhead and GC pressure. -### Benchmarking - -KPipe includes a dedicated `benchmarks` module using JMH (Java Microbenchmark Harness) to compare its performance -against manual implementations and other libraries like Confluent Parallel Consumer. - -For detailed information on benchmark scenarios and how to run them, see the [Benchmarks README](benchmarks/README.md). - -To run the benchmarks: - -```bash -./gradlew :benchmarks:jmh -``` +Latest parallel benchmark snapshot (see `benchmarks/README.md`) shows a throughput edge for KPipe in that scenario, +with a higher allocation footprint than Confluent Parallel Consumer. Treat these as scenario-specific results, not +universal guarantees. --- @@ -775,13 +764,13 @@ KPipe provides configurable ordering guarantees to balance throughput and strict ### 1. 
Sequential Processing (Strict Ordering) -When `sequentialProcessing` is enabled (default in some configs), KPipe processes messages one by one in the order they +When `sequentialProcessing` is enabled, KPipe processes messages one by one in the order they were received from Kafka. This ensures **strict FIFO (First-In-First-Out) ordering** per partition. ### 2. Parallel Processing (Virtual Threads) -When `sequentialProcessing` is `false`, KPipe leverages Java 24's Virtual Threads to process multiple records -concurrently. +When `sequentialProcessing` is `false` (the builder default), KPipe leverages Java 25 Virtual Threads to process +multiple records concurrently. - **Execution Order**: Messages start in order but may finish out of order (e.g., a small message finishing before a large one). @@ -833,4 +822,4 @@ This Kafka consumer is: - **Extensible** - **Future-proof** -Use it to modernize your Kafka stack with **Java 24 elegance and performance**. +Use it to modernize your Kafka stack with **Java 25 elegance and performance**. 
diff --git a/app/avro/src/main/java/org/kpipe/App.java b/app/avro/src/main/java/org/kpipe/App.java index 7bac516..72f0619 100644 --- a/app/avro/src/main/java/org/kpipe/App.java +++ b/app/avro/src/main/java/org/kpipe/App.java @@ -14,7 +14,7 @@ import org.kpipe.config.AppConfig; import org.kpipe.config.KafkaConsumerConfig; import org.kpipe.consumer.ConsumerRunner; -import org.kpipe.consumer.FunctionalConsumer; +import org.kpipe.consumer.KPipeConsumer; import org.kpipe.consumer.OffsetManager; import org.kpipe.consumer.enums.ConsumerCommand; import org.kpipe.metrics.ConsumerMetricsReporter; @@ -36,8 +36,8 @@ public class App implements AutoCloseable { private static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://schema-registry:8081"; private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis()); - private final FunctionalConsumer functionalConsumer; - private final ConsumerRunner> runner; + private final KPipeConsumer functionalConsumer; + private final ConsumerRunner> runner; private final AtomicReference> currentMetrics = new AtomicReference<>(); private final MessageProcessorRegistry processorRegistry; private final MessageSinkRegistry sinkRegistry; @@ -91,7 +91,7 @@ private static String resolveSchemaRegistryUrl() { } /// Creates the consumer runner with appropriate lifecycle hooks. 
- private ConsumerRunner> createConsumerRunner( + private ConsumerRunner> createConsumerRunner( final AppConfig config, final MetricsReporter consumerMetricsReporter, final MetricsReporter processorMetricsReporter, @@ -103,7 +103,7 @@ private ConsumerRunner> createConsumerRunner( c.start(); LOGGER.log(Level.INFO, "Kafka consumer application started successfully"); }) - .withHealthCheck(FunctionalConsumer::isRunning) + .withHealthCheck(KPipeConsumer::isRunning) .withGracefulShutdown(ConsumerRunner::performGracefulConsumerShutdown) .withMetricsReporters(List.of(consumerMetricsReporter, processorMetricsReporter, sinkMetricsReporter)) .withMetricsInterval(config.metricsInterval().toMillis()) @@ -118,7 +118,7 @@ private ConsumerRunner> createConsumerRunner( /// @param processorRegistry Map of processor functions /// @param sinkRegistry Map of sink functions /// @return A configured functional consumer - public static FunctionalConsumer createConsumer( + public static KPipeConsumer createConsumer( final AppConfig config, final MessageProcessorRegistry processorRegistry, final MessageSinkRegistry sinkRegistry, @@ -127,7 +127,7 @@ public static FunctionalConsumer createConsumer( final var kafkaProps = KafkaConsumerConfig.createConsumerConfig(config.bootstrapServers(), config.consumerGroup()); final var commandQueue = new ConcurrentLinkedQueue(); - return FunctionalConsumer + return KPipeConsumer .builder() .withProperties(kafkaProps) .withTopic(config.topic()) @@ -140,7 +140,7 @@ public static FunctionalConsumer createConsumer( .build(); } - /// Creates an OffsetManager provider function that can be used with FunctionalConsumer builder + /// Creates an OffsetManager provider function that can be used with KPipeConsumer builder /// /// @param commitInterval The interval at which to automatically commit offsets /// @param commandQueue The command queue diff --git a/app/json/src/main/java/org/kpipe/App.java b/app/json/src/main/java/org/kpipe/App.java index 92b9f4a..23b3654 
100644 --- a/app/json/src/main/java/org/kpipe/App.java +++ b/app/json/src/main/java/org/kpipe/App.java @@ -14,7 +14,7 @@ import org.kpipe.config.AppConfig; import org.kpipe.config.KafkaConsumerConfig; import org.kpipe.consumer.ConsumerRunner; -import org.kpipe.consumer.FunctionalConsumer; +import org.kpipe.consumer.KPipeConsumer; import org.kpipe.consumer.OffsetManager; import org.kpipe.consumer.enums.ConsumerCommand; import org.kpipe.metrics.ConsumerMetricsReporter; @@ -32,8 +32,8 @@ public class App implements AutoCloseable { private static final Logger LOGGER = System.getLogger(App.class.getName()); private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis()); - private final FunctionalConsumer functionalConsumer; - private final ConsumerRunner> runner; + private final KPipeConsumer kpipeConsumer; + private final ConsumerRunner> runner; private final AtomicReference> currentMetrics = new AtomicReference<>(); private final MessageProcessorRegistry processorRegistry; private final MessageSinkRegistry sinkRegistry; @@ -59,10 +59,10 @@ public App(final AppConfig config) { processorRegistry = new MessageProcessorRegistry(config.appName(), MessageFormat.JSON); sinkRegistry = new MessageSinkRegistry(); - functionalConsumer = createConsumer(config, processorRegistry, sinkRegistry); + kpipeConsumer = createConsumer(config, processorRegistry, sinkRegistry); final var consumerMetricsReporter = new ConsumerMetricsReporter( - functionalConsumer::getMetrics, + kpipeConsumer::getMetrics, () -> System.currentTimeMillis() - startTime.get(), null ); @@ -73,18 +73,18 @@ public App(final AppConfig config) { } /// Creates the consumer runner with appropriate lifecycle hooks. 
- private ConsumerRunner> createConsumerRunner( + private ConsumerRunner> createConsumerRunner( final AppConfig config, final MetricsReporter consumerMetricsReporter, final MetricsReporter processorMetricsReporter ) { return ConsumerRunner - .builder(functionalConsumer) + .builder(kpipeConsumer) .withStartAction(c -> { c.start(); LOGGER.log(Level.INFO, "Kafka consumer application started successfully"); }) - .withHealthCheck(FunctionalConsumer::isRunning) + .withHealthCheck(KPipeConsumer::isRunning) .withGracefulShutdown(ConsumerRunner::performGracefulConsumerShutdown) .withMetricsReporters(List.of(consumerMetricsReporter, processorMetricsReporter)) .withMetricsInterval(config.metricsInterval().toMillis()) @@ -99,7 +99,7 @@ private ConsumerRunner> createConsumerRunner( /// @param processorRegistry Map of processor functions /// @param sinkRegistry Map of sink functions /// @return A configured functional consumer - public static FunctionalConsumer createConsumer( + public static KPipeConsumer createConsumer( final AppConfig config, final MessageProcessorRegistry processorRegistry, final MessageSinkRegistry sinkRegistry @@ -107,7 +107,7 @@ public static FunctionalConsumer createConsumer( final var kafkaProps = KafkaConsumerConfig.createConsumerConfig(config.bootstrapServers(), config.consumerGroup()); final var commandQueue = new ConcurrentLinkedQueue(); - return FunctionalConsumer + return KPipeConsumer .builder() .withProperties(kafkaProps) .withTopic(config.topic()) @@ -120,7 +120,7 @@ public static FunctionalConsumer createConsumer( .build(); } - /// Creates an OffsetManager provider function that can be used with FunctionalConsumer builder + /// Creates an OffsetManager provider function that can be used with KPipeConsumer builder /// /// @param commitInterval The interval at which to automatically commit offsets /// @param commandQueue The command queue diff --git a/app/protobuf/src/main/java/org/kpipe/App.java b/app/protobuf/src/main/java/org/kpipe/App.java 
index cce7f30..87803cf 100644 --- a/app/protobuf/src/main/java/org/kpipe/App.java +++ b/app/protobuf/src/main/java/org/kpipe/App.java @@ -14,7 +14,7 @@ import org.kpipe.config.AppConfig; import org.kpipe.config.KafkaConsumerConfig; import org.kpipe.consumer.ConsumerRunner; -import org.kpipe.consumer.FunctionalConsumer; +import org.kpipe.consumer.KPipeConsumer; import org.kpipe.consumer.OffsetManager; import org.kpipe.consumer.enums.ConsumerCommand; import org.kpipe.metrics.ConsumerMetricsReporter; @@ -32,8 +32,8 @@ public class App implements AutoCloseable { private static final Logger LOGGER = System.getLogger(App.class.getName()); private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis()); - private final FunctionalConsumer functionalConsumer; - private final ConsumerRunner> runner; + private final KPipeConsumer kpipeConsumer; + private final ConsumerRunner> runner; private final AtomicReference> currentMetrics = new AtomicReference<>(); private final MessageProcessorRegistry processorRegistry; private final MessageSinkRegistry sinkRegistry; @@ -58,10 +58,10 @@ static void main() { public App(final AppConfig config) { processorRegistry = new MessageProcessorRegistry(config.appName(), MessageFormat.PROTOBUF); sinkRegistry = new MessageSinkRegistry(); - functionalConsumer = createConsumer(config, processorRegistry, sinkRegistry); + kpipeConsumer = createConsumer(config, processorRegistry, sinkRegistry); final var consumerMetricsReporter = new ConsumerMetricsReporter( - functionalConsumer::getMetrics, + kpipeConsumer::getMetrics, () -> System.currentTimeMillis() - startTime.get(), null ); @@ -71,18 +71,18 @@ public App(final AppConfig config) { } /// Creates the consumer runner with appropriate lifecycle hooks. 
- private ConsumerRunner> createConsumerRunner( + private ConsumerRunner> createConsumerRunner( final AppConfig config, final MetricsReporter consumerMetricsReporter, final MetricsReporter processorMetricsReporter ) { return ConsumerRunner - .builder(functionalConsumer) + .builder(kpipeConsumer) .withStartAction(c -> { c.start(); LOGGER.log(Level.INFO, "Kafka consumer application started successfully"); }) - .withHealthCheck(FunctionalConsumer::isRunning) + .withHealthCheck(KPipeConsumer::isRunning) .withGracefulShutdown(ConsumerRunner::performGracefulConsumerShutdown) .withMetricsReporters(List.of(consumerMetricsReporter, processorMetricsReporter)) .withMetricsInterval(config.metricsInterval().toMillis()) @@ -97,7 +97,7 @@ private ConsumerRunner> createConsumerRunner( /// @param processorRegistry Map of processor functions /// @param sinkRegistry Map of sink functions /// @return A configured functional consumer - public static FunctionalConsumer createConsumer( + public static KPipeConsumer createConsumer( final AppConfig config, final MessageProcessorRegistry processorRegistry, final MessageSinkRegistry sinkRegistry @@ -105,7 +105,7 @@ public static FunctionalConsumer createConsumer( final var kafkaProps = KafkaConsumerConfig.createConsumerConfig(config.bootstrapServers(), config.consumerGroup()); final var commandQueue = new ConcurrentLinkedQueue(); - return FunctionalConsumer + return KPipeConsumer .builder() .withProperties(kafkaProps) .withTopic(config.topic()) @@ -118,7 +118,7 @@ public static FunctionalConsumer createConsumer( .build(); } - /// Creates an OffsetManager provider function that can be used with FunctionalConsumer builder. + /// Creates an OffsetManager provider function that can be used with KPipeConsumer builder. 
/// /// @param commitInterval The interval at which to automatically commit offsets /// @param commandQueue The command queue diff --git a/benchmarks/README.md b/benchmarks/README.md index 85a7954..582c164 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -24,8 +24,8 @@ Measures the efficiency of KPipe's magic byte offset handling vs. traditional by ### 3. Parallel Processing Overhead (`ParallelProcessingBenchmark`) -Evaluates the throughput of KPipe's Java 24 Virtual Thread-based parallel processing engine against the Confluent -Parallel Consumer. +Evaluates the throughput of KPipe's Java Virtual Thread-based parallel processing engine against the Confluent Parallel +Consumer. - **KPipe Parallel Mode**: Leverages a thread-per-record model using Loom to process message batches concurrently with minimal overhead. @@ -98,7 +98,7 @@ abstraction without a performance penalty. ### 3. Parallel Processing: Virtual Threads (Loom) vs. Confluent -This benchmark compares KPipe's "thread-per-record" model using Java 24 Virtual Threads against the industry-standard +This benchmark compares KPipe's "thread-per-record" model using Java Virtual Threads against the industry-standard Confluent Parallel Consumer. | Benchmark | Mode | Cnt | Score | Error | Units | @@ -130,7 +130,7 @@ Based on the latest snapshot results, we can derive the following throughput exp Key performance indicators to watch for: -- **SerDe Tax**: The drop in throughput as more transformation steps are added in the manual vs. optimized KPipe +- **SerDe Tax**: The drop in throughput as more transformation steps are added in the manual vs. optimized KPipe pipeline. - **GC Pressure**: While not explicitly measured by throughput, the zero-copy Avro benchmark significantly reduces memory allocation and garbage collection overhead. @@ -152,12 +152,12 @@ Key performance indicators to watch for: ## Requirements -- **Java 24+**: Required for Virtual Threads (Project Loom). 
+- **Java 25+** - **Gradle**: Used to compile and execute the benchmark harness. ### CPU/CPI Profiling For Parallel Benchmark -For KPipe vs Confluent parallel processing, keep benchmark target fixed and enable a profiler: +For KPipe vs Confluent parallel processing, keep the benchmark target fixed and enable a profiler: ```bash # Linux: collect normalized hardware counters (includes CPI) diff --git a/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmark.java b/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmark.java index b6e45dd..8db9e0d 100644 --- a/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmark.java +++ b/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmark.java @@ -11,7 +11,7 @@ /// Kafka broker powered by Apache Kafka's test kit. /// /// ### Scenarios: -/// 1. **KPipe Parallel Mode**: Leverages Java 24 Virtual Threads (Project Loom) for +/// 1. **KPipe Parallel Mode**: Leverages Java Virtual Threads (Project Loom) for /// record-level parallelism with minimal overhead. /// 2. **Confluent Parallel Consumer**: Industry-standard library for parallel /// processing, using traditional platform thread pools. 
diff --git a/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java b/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java index 87b1ca6..a7b4865 100644 --- a/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java +++ b/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java @@ -20,7 +20,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.test.KafkaClusterTestKit; import org.apache.kafka.common.test.TestKitNodes; -import org.kpipe.consumer.FunctionalConsumer; +import org.kpipe.consumer.KPipeConsumer; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; @@ -177,7 +177,7 @@ private static void createTopicIfMissing(final Properties clientProperties) { public static class KpipeInvocationContext { private AtomicInteger processedCount; - private FunctionalConsumer consumer; + private KPipeConsumer consumer; @Setup(Level.Invocation) public void setup(final KafkaContext kafkaContext) { @@ -186,7 +186,7 @@ public void setup(final KafkaContext kafkaContext) { kpipeProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); consumer = - FunctionalConsumer + KPipeConsumer .builder() .withProperties(kpipeProps) .withTopic(TOPIC) diff --git a/lib/src/main/java/org/kpipe/consumer/ConsumerRunner.java b/lib/src/main/java/org/kpipe/consumer/ConsumerRunner.java index ac46fcd..ca28d50 100644 --- a/lib/src/main/java/org/kpipe/consumer/ConsumerRunner.java +++ b/lib/src/main/java/org/kpipe/consumer/ConsumerRunner.java @@ -15,7 +15,7 @@ import java.util.function.Predicate; import org.kpipe.metrics.MetricsReporter; -/// A thread-safe runner for {@link FunctionalConsumer} instances that manages the consumer +/// A thread-safe runner for {@link KPipeConsumer} instances that manages the consumer /// lifecycle. 
/// /// The ConsumerRunner provides: @@ -28,13 +28,13 @@ /// Example usage: /// /// ```java -/// final var consumer = new FunctionalConsumer.Builder<>() +/// final var consumer = new KPipeConsumer.Builder<>() /// .withTopic("my-topic") /// .withProcessor(message -> processMessage(message)) /// .build(); /// /// final var runner = ConsumerRunner.builder(consumer) -/// .withHealthCheck(FunctionalConsumer::isRunning) +/// .withHealthCheck(KPipeConsumer::isRunning) /// .withShutdownHook(true) /// .withShutdownTimeout(5000) /// .build(); @@ -43,8 +43,8 @@ /// runner.awaitShutdown(); /// ``` /// -/// @param the type of consumer being managed, must extend FunctionalConsumer -public class ConsumerRunner> implements AutoCloseable { +/// @param the type of consumer being managed, must extend KPipeConsumer +public class ConsumerRunner> implements AutoCloseable { private static final Logger LOGGER = System.getLogger(ConsumerRunner.class.getName()); @@ -84,7 +84,7 @@ private ConsumerRunner(final Builder builder) { /// @param the type of consumer to run /// @param consumer the consumer instance to manage /// @return a new builder instance - public static > Builder builder(final T consumer) { + public static > Builder builder(final T consumer) { return new Builder<>(consumer); } @@ -184,7 +184,7 @@ public void close() { shutdownGracefully(shutdownTimeoutMs); } - /// Performs a graceful shutdown of a FunctionalConsumer by handling in-flight messages. + /// Performs a graceful shutdown of a KPipeConsumer by handling in-flight messages. /// /// This method follows these steps to ensure a clean shutdown: /// @@ -199,13 +199,13 @@ public void close() { /// The method calls `getInFlightMessageCount()` twice - first to determine if any /// messages need processing, and then after the wait period to confirm the final state. 
/// - /// @param the type of consumer extending FunctionalConsumer + /// @param the type of consumer extending KPipeConsumer /// @param consumer the consumer to shut down /// @param timeoutMs maximum time in milliseconds to wait for in-flight messages to complete /// @return `true` if all in-flight messages were successfully processed before shutdown, /// `false` if the timeout was reached with messages still in-flight /// @throws RuntimeException if an exception occurs during the shutdown process - public static > boolean performGracefulConsumerShutdown( + public static > boolean performGracefulConsumerShutdown( final T consumer, final long timeoutMs ) { @@ -318,7 +318,7 @@ private void stopMetricsThread() { /// Builder for creating ConsumerRunner instances with custom configuration. /// /// @param the type of consumer being managed - public static class Builder> { + public static class Builder> { private final T consumer; private Consumer startAction; diff --git a/lib/src/main/java/org/kpipe/consumer/FunctionalConsumer.java b/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java similarity index 97% rename from lib/src/main/java/org/kpipe/consumer/FunctionalConsumer.java rename to lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java index 9d1b516..d6c7d85 100644 --- a/lib/src/main/java/org/kpipe/consumer/FunctionalConsumer.java +++ b/lib/src/main/java/org/kpipe/consumer/KPipeConsumer.java @@ -49,7 +49,7 @@ /// Example usage: /// /// ```java -/// final var consumer = FunctionalConsumer.builder() +/// final var consumer = KPipeConsumer.builder() /// .withProperties(kafkaProps) /// .withTopic("example-topic") /// .withProcessor(value -> processValue(value)) @@ -68,9 +68,9 @@ /// /// @param the type of keys in the consumed records /// @param the type of values in the consumed records -public class FunctionalConsumer implements AutoCloseable { +public class KPipeConsumer implements AutoCloseable { - private static final Logger LOGGER = 
System.getLogger(FunctionalConsumer.class.getName()); + private static final Logger LOGGER = System.getLogger(KPipeConsumer.class.getName()); // Metric key constants private static final String METRIC_MESSAGES_RECEIVED = "messagesReceived"; @@ -109,7 +109,7 @@ public class FunctionalConsumer implements AutoCloseable { /// @param retryCount the number of retry attempts made public record ProcessingError(ConsumerRecord record, Exception exception, int retryCount) {} - /// Creates a new builder for constructing {@link FunctionalConsumer} instances. + /// Creates a new builder for constructing {@link KPipeConsumer} instances. /// /// @param the type of keys in the consumed records /// @param the type of values in the consumed records @@ -118,7 +118,7 @@ public static Builder builder() { return new Builder<>(); } - /// Builder for creating and configuring {@link FunctionalConsumer} instances. + /// Builder for creating and configuring {@link KPipeConsumer} instances. /// /// @param the type of keys in the consumed records /// @param the type of values in the consumed records @@ -304,12 +304,10 @@ public Builder withConsumer(final Supplier> provider) { return this; } - /// Builds a new FunctionalConsumer with the configured settings. + /// Builds a new KPipeConsumer with the configured settings. 
/// - /// @return a new FunctionalConsumer instance - /// @throws IllegalArgumentException if any required parameters are invalid - /// @throws NullPointerException if any required parameters are null - public FunctionalConsumer build() { + /// @return a new KPipeConsumer instance + public KPipeConsumer build() { Objects.requireNonNull(kafkaProps, "Kafka properties must be provided"); Objects.requireNonNull(topic, "Topic must be provided"); Objects.requireNonNull(processor, "Processor function must be provided"); @@ -326,14 +324,14 @@ public FunctionalConsumer build() { kafkaProps.setProperty("enable.auto.commit", "false"); } - return new FunctionalConsumer<>(this); + return new KPipeConsumer<>(this); } } - /// Creates a new FunctionalConsumer using the provided builder. + /// Creates a new KPipeConsumer using the provided builder. /// /// @param builder the builder containing the consumer configuration - public FunctionalConsumer(final Builder builder) { + public KPipeConsumer(final Builder builder) { this.kafkaConsumer = builder.consumerProvider != null ? builder.consumerProvider.get() diff --git a/lib/src/main/java/org/kpipe/consumer/OffsetManager.java b/lib/src/main/java/org/kpipe/consumer/OffsetManager.java index 28b1417..1e4c079 100644 --- a/lib/src/main/java/org/kpipe/consumer/OffsetManager.java +++ b/lib/src/main/java/org/kpipe/consumer/OffsetManager.java @@ -97,7 +97,7 @@ private Builder(final Consumer consumer) { this.kafkaConsumer = Objects.requireNonNull(consumer, "Consumer cannot be null"); } - /// Shared command queue for the FunctionalConsumer and OffsetManager. + /// Shared command queue for the KPipeConsumer and OffsetManager. 
/// /// @param commandQueue The command queue to use /// @return This builder instance diff --git a/lib/src/test/java/org/kpipe/consumer/ConsumerRunnerTest.java b/lib/src/test/java/org/kpipe/consumer/ConsumerRunnerTest.java index 410f67f..af88ddd 100644 --- a/lib/src/test/java/org/kpipe/consumer/ConsumerRunnerTest.java +++ b/lib/src/test/java/org/kpipe/consumer/ConsumerRunnerTest.java @@ -18,7 +18,7 @@ class ConsumerRunnerTest { @Mock - private FunctionalConsumer mockConsumer; + private KPipeConsumer mockConsumer; @Mock private MessageTracker mockTracker; @@ -26,7 +26,7 @@ class ConsumerRunnerTest { @Mock private MetricsReporter mockReporter; - private ConsumerRunner> runner; + private ConsumerRunner> runner; @Test void shouldStartConsumer() { @@ -35,7 +35,7 @@ void shouldStartConsumer() { runner = ConsumerRunner .builder(mockConsumer) - .withHealthCheck(FunctionalConsumer::isRunning) // Use the isRunning method in health check + .withHealthCheck(KPipeConsumer::isRunning) // Use the isRunning method in health check .build(); // Act @@ -127,7 +127,7 @@ void shouldTimeoutWhenInFlightMessagesRemain() { void shouldUseCustomGracefulShutdown() { // Arrange final var customShutdownCalled = new AtomicBoolean(false); - final BiFunction, Long, Boolean> customShutdown = (consumer, timeout) -> { + final BiFunction, Long, Boolean> customShutdown = (consumer, timeout) -> { customShutdownCalled.set(true); return true; }; @@ -231,10 +231,7 @@ void shouldApplyCustomShutdownTimeout() { // Arrange final var customTimeout = 5000L; final var timeoutCaptured = new AtomicBoolean(false); - final BiFunction, Long, Boolean> timeoutCapturingShutdown = ( - consumer, - timeout - ) -> { + final BiFunction, Long, Boolean> timeoutCapturingShutdown = (consumer, timeout) -> { timeoutCaptured.set(timeout == customTimeout); return true; }; @@ -353,7 +350,7 @@ void performGracefulConsumerShutdownShouldHandleNullTracker() { void shouldUseCustomStartAction() { // Arrange final var 
customStartActionCalled = new AtomicBoolean(false); - final Consumer> customStartAction = consumer -> { + final Consumer> customStartAction = consumer -> { customStartActionCalled.set(true); consumer.start(); }; diff --git a/lib/src/test/java/org/kpipe/consumer/FunctionalConsumerMockingTest.java b/lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java similarity index 93% rename from lib/src/test/java/org/kpipe/consumer/FunctionalConsumerMockingTest.java rename to lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java index 4d40c35..233ea20 100644 --- a/lib/src/test/java/org/kpipe/consumer/FunctionalConsumerMockingTest.java +++ b/lib/src/test/java/org/kpipe/consumer/KPipeConsumerMockingTest.java @@ -22,7 +22,7 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -class FunctionalConsumerMockingTest { +class KPipeConsumerMockingTest { private static final int PARTITION = 0; private static final String TOPIC = "test-topic"; @@ -35,7 +35,7 @@ class FunctionalConsumerMockingTest { private KafkaConsumer mockConsumer; @Mock - private Consumer> errorHandler; + private Consumer> errorHandler; @Mock private OffsetManager offsetManager; @@ -44,7 +44,7 @@ class FunctionalConsumerMockingTest { private ArgumentCaptor> topicCaptor; @Captor - private ArgumentCaptor> errorCaptor; + private ArgumentCaptor> errorCaptor; @BeforeEach void setUp() { @@ -58,7 +58,7 @@ void setUp() { @Test void shouldSubscribeToTopic() { final var commandQueue = new ConcurrentLinkedQueue(); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -84,7 +84,7 @@ void shouldProcessRecordsWithProcessor() throws Exception { final var commandQueue = new ConcurrentLinkedQueue(); final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "test-key", "test-value")); final var records = new ConsumerRecords<>(Map.of(partition, 
recordsList)); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -133,7 +133,7 @@ void shouldHandleProcessorExceptions() throws Exception { final var partition = new TopicPartition(TOPIC, PARTITION); final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "test-key", "test-value")); final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -159,7 +159,7 @@ void shouldHandleProcessorExceptions() throws Exception { void shouldCloseKafkaConsumerWhenClosed() { // Setup final var commandQueue = new ConcurrentLinkedQueue(); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -200,7 +200,7 @@ final var record = new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value"); final var partition = new TopicPartition(TOPIC, PARTITION); final var recordsList = List.of(record); final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -252,7 +252,7 @@ final var record = new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value"); final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); // Create a consumer with no retries - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -288,7 +288,7 @@ void shouldPauseConsumerWhenPauseCalled() { when(mockConsumer.assignment()).thenReturn(partitions); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var 
functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -317,7 +317,7 @@ void shouldResumeConsumerWhenResumeCalled() { when(mockConsumer.assignment()).thenReturn(partitions); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -352,7 +352,7 @@ void pauseAndResumeShouldBeIdempotent() { final var partitions = Set.of(new TopicPartition(TOPIC, PARTITION)); when(mockConsumer.assignment()).thenReturn(partitions); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -393,7 +393,7 @@ void shouldUpdateMetricsOnSuccessfulProcessing() throws Exception { var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value")); var records = new ConsumerRecords<>(Map.of(partition, recordsList)); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -439,7 +439,7 @@ void shouldUpdateMetricsOnProcessingError() throws Exception { final var partition = new TopicPartition(TOPIC, PARTITION); final var recordsList = List.of(new ConsumerRecord<>(TOPIC, PARTITION, 0L, "key", "value")); final var records = new ConsumerRecords<>(Map.of(partition, recordsList)); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -467,7 +467,7 @@ void shouldUpdateMetricsOnProcessingError() throws Exception { void shouldNotCollectMetricsWhenDisabled() { // Create consumer with disabled metrics try ( - final var consumer = FunctionalConsumer + final var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -484,13 +484,13 @@ void shouldNotCollectMetricsWhenDisabled() { void builderShouldRespectAllOptions() { 
// Setup final var pollTimeout = Duration.ofMillis(200); - final Consumer> errorHandler = error -> {}; + final Consumer> errorHandler = error -> {}; final var maxRetries = 3; final var retryBackoff = Duration.ofMillis(100); final var enableMetrics = true; // Create a consumer with all options - final var consumer = FunctionalConsumer + final var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -511,7 +511,7 @@ void builderShouldRespectAllOptions() { @Test void builderShouldThrowNullPointerExceptionWhenMissingRequiredFields() { - final var builder = FunctionalConsumer.builder(); + final var builder = KPipeConsumer.builder(); assertThrows(NullPointerException.class, builder::build); @@ -526,7 +526,7 @@ void builderShouldThrowNullPointerExceptionWhenMissingRequiredFields() { void shouldHandleEmptyRecordBatch() { // Setup final var commandQueue = new ConcurrentLinkedQueue(); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -565,7 +565,7 @@ void shouldHandleNullValueInRecord() throws Exception { .when(processor) .apply(null); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -610,7 +610,7 @@ void shouldNotInterruptShutdownWithInFlightMessages() throws Exception { return value; }; - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, slowProcessor, @@ -637,7 +637,7 @@ void processCommandsShouldHandlePauseAndResume() { // Arrange final var commandQueue = new ConcurrentLinkedQueue(); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -664,7 +664,7 @@ void processCommandsShouldHandlePauseAndResume() { void 
shouldTrackInFlightMessagesCorrectly() throws Exception { // Create a custom consumer with tracking methods final var commandQueue = new ConcurrentLinkedQueue(); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -727,7 +727,7 @@ void shouldHandleInterruptedPollOperation() throws Exception { .thenThrow(new RuntimeException("Poll interrupted")) .thenThrow(new RuntimeException("Poll interrupted again")); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -762,7 +762,7 @@ void shouldHandleInterruptedPollOperation() throws Exception { @Test void stateShouldTransitionCorrectlyDuringLifecycle() throws InterruptedException { // Create consumer - final var functionalConsumer = FunctionalConsumer + final var functionalConsumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -825,7 +825,7 @@ void shouldMarkOffsetAsProcessedEvenWhenProcessingFails() throws Exception { .when(processor) .apply(anyString()); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, processor, @@ -870,7 +870,7 @@ void shouldIntegrateWithOffsetManager() { final var commandQueue = new ConcurrentLinkedQueue(); properties.put("enable.auto.commit", "false"); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, String::toUpperCase, @@ -919,7 +919,7 @@ void shouldCommitOffsetsViaCommandQueue() { final var commandQueue = new ConcurrentLinkedQueue(); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, String::toUpperCase, @@ -955,7 +955,7 @@ void shouldHandleCommitFailure() { 
when(offsetManager.createRebalanceListener()).thenReturn(mock(ConsumerRebalanceListener.class)); final var commandQueue = new ConcurrentLinkedQueue(); - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, String::toUpperCase, @@ -1015,7 +1015,7 @@ void shouldProcessRecordsConcurrently() throws Exception { return value.toUpperCase(); }; - final var functionalConsumer = new TestableFunctionalConsumer<>( + final var functionalConsumer = new TestableKPipeConsumer<>( properties, TOPIC, concurrentProcessor, @@ -1048,25 +1048,25 @@ void shouldProcessRecordsConcurrently() throws Exception { assertTrue(maxConcurrent.get() > 1, "Records should be processed concurrently"); } - public static class TestableFunctionalConsumer extends FunctionalConsumer { + public static class TestableKPipeConsumer extends KPipeConsumer { private static final String METRIC_MESSAGES_RECEIVED = "messagesReceived"; private static final String METRIC_MESSAGES_PROCESSED = "messagesProcessed"; private static final String METRIC_PROCESSING_ERRORS = "processingErrors"; - public TestableFunctionalConsumer( + public TestableKPipeConsumer( final Properties props, final String topic, final Function processor, final KafkaConsumer mockConsumer, final int maxRetries, final Duration retryBackoff, - final Consumer> errorHandler, + final Consumer> errorHandler, final Queue mockCommandQueue, final OffsetManager mockOffsetManager ) { super( - FunctionalConsumer + KPipeConsumer .builder() .withProperties(props) .withTopic(topic) diff --git a/lib/src/test/java/org/kpipe/consumer/FunctionalConsumerTest.java b/lib/src/test/java/org/kpipe/consumer/KPipeConsumerTest.java similarity index 90% rename from lib/src/test/java/org/kpipe/consumer/FunctionalConsumerTest.java rename to lib/src/test/java/org/kpipe/consumer/KPipeConsumerTest.java index 7728cc3..ea1120b 100644 --- 
a/lib/src/test/java/org/kpipe/consumer/FunctionalConsumerTest.java +++ b/lib/src/test/java/org/kpipe/consumer/KPipeConsumerTest.java @@ -25,7 +25,7 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -class FunctionalConsumerTest { +class KPipeConsumerTest { private static final String TOPIC = "test-topic"; private static final Duration POLL_TIMEOUT = Duration.ofMillis(100); @@ -45,8 +45,8 @@ void setUp() { properties.put("enable.auto.commit", "true"); } - private FunctionalConsumer createConsumer(Function processor) { - return FunctionalConsumer + private KPipeConsumer createConsumer(Function processor) { + return KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -63,7 +63,7 @@ void constructorWithValidParametersShouldNotThrowException() { // Arrange & Act & Assert assertDoesNotThrow(() -> createConsumer(mockProcessor)); assertDoesNotThrow(() -> - FunctionalConsumer + KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -76,11 +76,11 @@ void constructorWithValidParametersShouldNotThrowException() { @Test void constructorWithNullParametersShouldThrowNullPointerException() { // Arrange & Act & Assert - assertThrows(NullPointerException.class, () -> FunctionalConsumer.builder().build()); + assertThrows(NullPointerException.class, () -> KPipeConsumer.builder().build()); assertThrows( NullPointerException.class, () -> - FunctionalConsumer + KPipeConsumer .builder() .withProperties(null) .withTopic(TOPIC) @@ -90,7 +90,7 @@ void constructorWithNullParametersShouldThrowNullPointerException() { assertThrows( NullPointerException.class, () -> - FunctionalConsumer + KPipeConsumer .builder() .withProperties(properties) .withTopic(null) @@ -100,12 +100,7 @@ void constructorWithNullParametersShouldThrowNullPointerException() { assertThrows( NullPointerException.class, () -> - FunctionalConsumer - .builder() - .withProperties(properties) - .withTopic(TOPIC) - .withProcessor(null) - .build() + 
KPipeConsumer.builder().withProperties(properties).withTopic(TOPIC).withProcessor(null).build() ); } @@ -163,7 +158,7 @@ void processorWithRetryEnabledShouldRetryFailedMessages() throws InterruptedExce if (attempts.getAndIncrement() == 0) throw new RuntimeException("First attempt failure"); return value.toUpperCase(); }; - var consumer = FunctionalConsumer + var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -192,8 +187,8 @@ void processorWithMaxRetriesExceededShouldCallErrorHandler() throws InterruptedE final Function failingProcessor = value -> { throw new RuntimeException("Always failing"); }; - Consumer> errorHandler = mock(Consumer.class); - var consumer = FunctionalConsumer + Consumer> errorHandler = mock(Consumer.class); + var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -209,7 +204,7 @@ final var record = createRecord(0, "key", "hello"); Thread.sleep(100); // Assert - final var errorCaptor = ArgumentCaptor.forClass(FunctionalConsumer.ProcessingError.class); + final var errorCaptor = ArgumentCaptor.forClass(KPipeConsumer.ProcessingError.class); verify(errorHandler).accept(errorCaptor.capture()); final var error = errorCaptor.getValue(); assertEquals(record, error.record()); @@ -231,7 +226,7 @@ void metricsTrackingWithFailuresAndRetriesShouldIncrementCorrectly() throws Inte if (count % 3 != 0) throw new RuntimeException("Intermittent failure #" + count); return value.toUpperCase(); }; - var consumer = FunctionalConsumer + var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -260,7 +255,7 @@ var record = createRecord(i, "key" + i, "value" + i); void pauseShouldChangeStateAndEnqueuePauseCommand() { // Arrange final var commandQueue = new ConcurrentLinkedQueue(); - final var consumer = FunctionalConsumer + final var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -280,7 +275,7 @@ void 
pauseShouldChangeStateAndEnqueuePauseCommand() { void resumeShouldChangeStateAndEnqueueResumeCommand() { // Arrange final var commandQueue = new ConcurrentLinkedQueue(); - final var consumer = FunctionalConsumer + final var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -301,7 +296,7 @@ void resumeShouldChangeStateAndEnqueueResumeCommand() { void closeShouldEnqueueCloseCommand() { // Arrange final var commandQueue = new ConcurrentLinkedQueue(); - final var consumer = FunctionalConsumer + final var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -334,7 +329,7 @@ void customMessageSinkShouldReceiveProcessedMessages() { // Arrange @SuppressWarnings("unchecked") final var sink = mock(MessageSink.class); - var consumer = FunctionalConsumer + var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -354,7 +349,7 @@ var record = createRecord(1, "k", "v"); @Test void metricsShouldBeEmptyWhenDisabled() { // Arrange - final var consumer = FunctionalConsumer + final var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC) @@ -375,7 +370,7 @@ final var record = createRecord(1, "k", "v"); void sequentialProcessingShouldProcessInOrder() { // Arrange final var processed = new ArrayList<>(); - var consumer = FunctionalConsumer + var consumer = KPipeConsumer .builder() .withProperties(properties) .withTopic(TOPIC)