Merged
61 changes: 25 additions & 36 deletions README.md
@@ -7,7 +7,7 @@
[![Build Status](https://github.com/eschizoid/kpipe/actions/workflows/ci.yaml/badge.svg)](https://github.com/eschizoid/kpipe/actions/workflows/ci.yaml)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)

- A **modern, functional, and high-performance Kafka consumer** built using **Java 24** features like **virtual threads**,
+ A **modern, functional, and high-performance Kafka consumer** built using **Java 25** features like **virtual threads**,
**composable message processors**, and **DslJson** for JSON processing. Features robust error handling, configurable
retries, built-in metrics, and support for both parallel and sequential processing. Ideal for large-scale systems.

@@ -44,7 +44,7 @@ retries, built-in metrics, and support for both parallel and sequential processi
"sanitizeData", "addTimestamp");

// Apply transformations with built-in error handling and retry logic
- final var consumer = new FunctionalConsumer.<byte[], byte[]>builder()
+ final var consumer = new KPipeConsumer.<byte[], byte[]>builder()
.withProcessor(pipeline)
.withRetry(3, Duration.ofSeconds(1))
.build();
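The `withRetry(3, Duration.ofSeconds(1))` call above can be approximated with plain JDK types. This is a hedged sketch of the retry idea only — `RetrySketch` and its `withRetry` helper are illustrative names, not KPipe's actual implementation:

```java
import java.time.Duration;
import java.util.function.UnaryOperator;

class RetrySketch {
  // Wrap a processing step so it is retried up to `attempts` times with a
  // fixed backoff between attempts, mirroring withRetry(3, Duration.ofSeconds(1)).
  static <T> UnaryOperator<T> withRetry(UnaryOperator<T> step, int attempts, Duration backoff) {
    return value -> {
      RuntimeException last = null;
      for (int i = 0; i < attempts; i++) {
        try {
          return step.apply(value);
        } catch (RuntimeException e) {
          last = e; // remember the failure and back off before the next attempt
          try {
            Thread.sleep(backoff.toMillis());
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
          }
        }
      }
      throw last; // all attempts exhausted
    };
  }

  public static void main(String[] args) {
    int[] counter = {0};
    // A step that fails twice before succeeding.
    UnaryOperator<String> flaky = s -> {
      if (counter[0]++ < 2) throw new IllegalStateException("transient");
      return s + "-ok";
    };
    System.out.println(withRetry(flaky, 3, Duration.ofMillis(1)).apply("msg"));
  }
}
```

The key design point is that the retry wrapper composes like any other `UnaryOperator`, so it can sit anywhere in a processor pipeline.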
@@ -70,7 +70,7 @@ retries, built-in metrics, and support for both parallel and sequential processi
"addTimestamp_userSchema");

// Create a consumer with the optimized pipeline
- final var consumer = new FunctionalConsumer.<byte[], byte[]>builder()
+ final var consumer = new KPipeConsumer.<byte[], byte[]>builder()
.withProperties(kafkaProps)
.withTopic("team-topic")
.withProcessor(avroPipeline)
@@ -127,7 +127,7 @@ The core library that provides the Kafka consumer functionality:
```
├── src/main/java/org/kpipe/
│ ├── consumer/ # Core consumer components
- │ │ ├── FunctionalConsumer.java # Main functional consumer implementation
+ │ │ ├── KPipeConsumer.java # Main functional consumer implementation
│ │ ├── OffsetManager.java # Manages Kafka offsets for reliable processing
│ │ ├── MessageTracker.java # Tracks message processing state
│ │ ├── RebalanceListener.java # Handles Kafka consumer rebalancing
@@ -200,7 +200,7 @@ Extend the registry like this:
);

// Use the pipeline with a consumer
- final var consumer = new FunctionalConsumer.<byte[], byte[]>builder()
+ final var consumer = new KPipeConsumer.<byte[], byte[]>builder()
.withProperties(kafkaProps)
.withTopic("events")
.withProcessor(pipeline)
@@ -338,7 +338,7 @@ final var avroConsoleSink = new AvroConsoleSink<>(
);

// Use a sink with a consumer
- final var consumer = new FunctionalConsumer.<String, byte[]>builder()
+ final var consumer = new KPipeConsumer.<String, byte[]>builder()
.withProperties(kafkaProps)
.withTopic("events")
.withProcessor(pipeline)
@@ -368,7 +368,7 @@ MessageSink<String, byte[]> databaseSink = (record, processedValue) -> {
};

// Use the custom sink with a consumer
- final var consumer = new FunctionalConsumer.<String, byte[]>builder()
+ final var consumer = new KPipeConsumer.<String, byte[]>builder()
.withMessageSink(databaseSink)
.build();
```
@@ -390,7 +390,7 @@ registry.register("metrics", (record, value) -> metricsService.recordMessage(rec
final var sinkPipeline = registry.<String, byte[]>pipeline("console", "database", "metrics");

// Use the sink pipeline with a consumer
- final var consumer = new FunctionalConsumer.<String, byte[]>builder()
+ final var consumer = new KPipeConsumer.<String, byte[]>builder()
.withMessageSink(sinkPipeline)
.build();
```
@@ -422,7 +422,7 @@ graceful shutdown:

```java
// Create a consumer runner with default settings
- ConsumerRunner<FunctionalConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
+ ConsumerRunner<KPipeConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
.build();

// Start the consumer
@@ -438,7 +438,7 @@ The `ConsumerRunner` supports extensive configuration options:

```java
// Create a consumer runner with advanced configuration
- ConsumerRunner<FunctionalConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
+ ConsumerRunner<KPipeConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
// Configure metrics reporting
.withMetricsReporter(new ConsumerMetricsReporter(
consumer::getMetrics,
@@ -492,7 +492,7 @@ The `ConsumerRunner` integrates with metrics reporting:

```java
// Add multiple metrics reporters
- ConsumerRunner<FunctionalConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
+ ConsumerRunner<KPipeConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
.withMetricsReporters(List.of(
new ConsumerMetricsReporter(consumer::getMetrics, () -> System.currentTimeMillis() - startTime),
new ProcessorMetricsReporter(registry)
@@ -506,7 +506,7 @@ ConsumerRunner<FunctionalConsumer<String, String>> runner = ConsumerRunner.build
The `ConsumerRunner` implements `AutoCloseable` for use with try-with-resources:

```java
- try (ConsumerRunner<FunctionalConsumer<String, String>> runner = ConsumerRunner.builder(consumer).build()) {
+ try (ConsumerRunner<KPipeConsumer<String, String>> runner = ConsumerRunner.builder(consumer).build()) {
runner.start();
// Application logic here
// Runner will be automatically closed when exiting the try block
@@ -521,7 +521,7 @@ Here's a concise example of a KPipe application:

```java
public class KPipeApp implements AutoCloseable {
- private final ConsumerRunner<FunctionalConsumer<byte[], byte[]>> runner;
+ private final ConsumerRunner<KPipeConsumer<byte[], byte[]>> runner;

static void main() {
// Load configuration from environment variables
@@ -543,7 +543,7 @@ public class KPipeApp implements AutoCloseable {
final var sinkRegistry = new MessageSinkRegistry();

// Create the functional consumer
- final var functionalConsumer = FunctionalConsumer.<byte[], byte[]>builder()
+ final var functionalConsumer = KPipeConsumer.<byte[], byte[]>builder()
.withProperties(KafkaConsumerConfig.createConsumerConfig(
config.bootstrapServers(), config.consumerGroup()))
.withTopic(config.topic())
@@ -739,8 +739,8 @@ The library provides a built-in `when()` method for conditional processing:
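A `when()`-style conditional combinator can be sketched with plain JDK functional interfaces. This is illustrative only — KPipe's actual `when()` signature may differ:

```java
import java.util.function.Function;
import java.util.function.Predicate;

class ConditionalSketch {
  // A generic "when" combinator: apply the processor only if the predicate
  // matches, otherwise pass the value through unchanged.
  static <T> Function<T, T> when(Predicate<T> condition, Function<T, T> processor) {
    return value -> condition.test(value) ? processor.apply(value) : value;
  }

  public static void main(String[] args) {
    var upperShort = when((String s) -> s.length() < 5, String::toUpperCase);
    System.out.println(upperShort.apply("abc"));    // short input: transformed
    System.out.println(upperShort.apply("abcdef")); // long input: unchanged
  }
}
```

Because the result is itself a `Function<T, T>`, a conditional step composes into a pipeline exactly like any unconditional processor.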

### High-Performance Architecture

- KPipe is designed to beat traditional Kafka consumers (and even heavy libraries like Confluent Parallel Consumer) by
- leveraging modern Java features and aggressive optimizations:
+ KPipe is designed for high-throughput, low-overhead Kafka processing using modern Java features and pipeline
+ optimizations. Performance depends on workload shape (I/O vs CPU bound), partitioning, and message size.

- **Single SerDe Cycle**: Traditional pipelines often perform `byte[] -> Object -> byte[]` at every step. KPipe's
optimized pipelines deserialize once at the entry, apply any number of `UnaryOperator` transformations on the object,
@@ -749,23 +749,12 @@ leveraging modern Java features and aggressive optimizations:
`offset` parameter that allows skipping magic bytes and schema IDs without performing expensive `Arrays.copyOfRange`
operations.
  - **Virtual Threads (Project Loom)**: Every Kafka record can be processed in its own virtual thread. This allows for
-   massive concurrency (thousands of concurrent "threads") with almost zero memory overhead and no complex thread pool
-   management.
- - **DslJson Integration**: Uses one of the world's fastest JSON libraries for maximum throughput and minimum Garbage
-   Collection (GC) pressure.
+   massive concurrency with simpler coordination than large platform-thread pools.
+ - **DslJson Integration**: Uses a high-performance JSON library to reduce parsing overhead and GC pressure.
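The single-SerDe-cycle idea above can be sketched with plain JDK types. This is a hedged illustration — `SingleSerDeSketch` is not KPipe API, and a `String` stands in for the decoded object:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

class SingleSerDeSketch {
  // Build a byte[] -> byte[] pipeline that deserializes ONCE at the entry,
  // applies every transformation on the decoded object, and serializes ONCE
  // at the exit — instead of byte[] -> Object -> byte[] at every step.
  static Function<byte[], byte[]> pipeline(List<UnaryOperator<String>> steps) {
    Function<byte[], String> deserialize = b -> new String(b, StandardCharsets.UTF_8);
    Function<String, byte[]> serialize = s -> s.getBytes(StandardCharsets.UTF_8);
    Function<String, String> body = Function.identity();
    for (UnaryOperator<String> step : steps) {
      body = body.andThen(step); // compose object-level transformations
    }
    return deserialize.andThen(body).andThen(serialize);
  }

  public static void main(String[] args) {
    var p = pipeline(List.of(String::trim, String::toUpperCase));
    byte[] out = p.apply("  hello  ".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(out, StandardCharsets.UTF_8));
  }
}
```

Adding a step costs only one extra object-level function call, never another serialization round-trip, which is the essence of the optimization described above.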

### Benchmarking

- KPipe includes a dedicated `benchmarks` module using JMH (Java Microbenchmark Harness) to compare its performance
- against manual implementations and other libraries like Confluent Parallel Consumer.
-
- For detailed information on benchmark scenarios and how to run them, see the [Benchmarks README](benchmarks/README.md).
-
- To run the benchmarks:
-
- ```bash
- ./gradlew :benchmarks:jmh
- ```
+ Latest parallel benchmark snapshot (see `benchmarks/README.md`) shows a throughput edge for KPipe in that scenario,
+ with a higher allocation footprint than Confluent Parallel Consumer. Treat these as scenario-specific results, not
+ universal guarantees.

---

@@ -775,13 +764,13 @@ KPipe provides configurable ordering guarantees to balance throughput and strict

### 1. Sequential Processing (Strict Ordering)

- When `sequentialProcessing` is enabled (default in some configs), KPipe processes messages one by one in the order they
+ When `sequentialProcessing` is enabled, KPipe processes messages one by one in the order they
were received from Kafka. This ensures **strict FIFO (First-In-First-Out) ordering** per partition.

### 2. Parallel Processing (Virtual Threads)

- When `sequentialProcessing` is `false`, KPipe leverages Java 24's Virtual Threads to process multiple records
- concurrently.
+ When `sequentialProcessing` is `false` (the builder default), KPipe leverages Java 25 Virtual Threads to process
+ multiple records concurrently.

- **Execution Order**: Messages start in order but may finish out of order (e.g., a small message finishing before a
large one).
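The parallel mode described above can be illustrated with JDK virtual threads alone — a sketch of the concurrency model, not KPipe's internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class OrderingSketch {
  // Process records concurrently, one virtual thread per record. Tasks START
  // in submission order but may FINISH out of order; collecting results by
  // submission order restores FIFO output despite concurrent execution.
  static List<String> processAll(List<String> records) throws Exception {
    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
      List<Future<String>> futures = records.stream()
          .map(r -> executor.submit(() -> r.toUpperCase())) // per-record task
          .toList();
      List<String> out = new ArrayList<>();
      for (Future<String> f : futures) out.add(f.get()); // submission order
      return out;
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(processAll(List.of("a", "b", "c")));
  }
}
```

If strict per-partition ordering matters for side effects (not just results), the side effects themselves must be serialized, which is what the sequential mode above provides.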
@@ -833,4 +822,4 @@ This Kafka consumer is:
- **Extensible**
- **Future-proof**

- Use it to modernize your Kafka stack with **Java 24 elegance and performance**.
+ Use it to modernize your Kafka stack with **Java 25 elegance and performance**.
16 changes: 8 additions & 8 deletions app/avro/src/main/java/org/kpipe/App.java
@@ -14,7 +14,7 @@
import org.kpipe.config.AppConfig;
import org.kpipe.config.KafkaConsumerConfig;
import org.kpipe.consumer.ConsumerRunner;
- import org.kpipe.consumer.FunctionalConsumer;
+ import org.kpipe.consumer.KPipeConsumer;
import org.kpipe.consumer.OffsetManager;
import org.kpipe.consumer.enums.ConsumerCommand;
import org.kpipe.metrics.ConsumerMetricsReporter;
@@ -36,8 +36,8 @@ public class App implements AutoCloseable {
private static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://schema-registry:8081";

private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
- private final FunctionalConsumer<byte[], byte[]> functionalConsumer;
- private final ConsumerRunner<FunctionalConsumer<byte[], byte[]>> runner;
+ private final KPipeConsumer<byte[], byte[]> functionalConsumer;
+ private final ConsumerRunner<KPipeConsumer<byte[], byte[]>> runner;
private final AtomicReference<Map<String, Long>> currentMetrics = new AtomicReference<>();
private final MessageProcessorRegistry processorRegistry;
private final MessageSinkRegistry sinkRegistry;
@@ -91,7 +91,7 @@ private static String resolveSchemaRegistryUrl() {
}

/// Creates the consumer runner with appropriate lifecycle hooks.
- private ConsumerRunner<FunctionalConsumer<byte[], byte[]>> createConsumerRunner(
+ private ConsumerRunner<KPipeConsumer<byte[], byte[]>> createConsumerRunner(
final AppConfig config,
final MetricsReporter consumerMetricsReporter,
final MetricsReporter processorMetricsReporter,
@@ -103,7 +103,7 @@
c.start();
LOGGER.log(Level.INFO, "Kafka consumer application started successfully");
})
- .withHealthCheck(FunctionalConsumer::isRunning)
+ .withHealthCheck(KPipeConsumer::isRunning)
.withGracefulShutdown(ConsumerRunner::performGracefulConsumerShutdown)
.withMetricsReporters(List.of(consumerMetricsReporter, processorMetricsReporter, sinkMetricsReporter))
.withMetricsInterval(config.metricsInterval().toMillis())
@@ -118,7 +118,7 @@
/// @param processorRegistry Map of processor functions
/// @param sinkRegistry Map of sink functions
/// @return A configured functional consumer
- public static FunctionalConsumer<byte[], byte[]> createConsumer(
+ public static KPipeConsumer<byte[], byte[]> createConsumer(
final AppConfig config,
final MessageProcessorRegistry processorRegistry,
final MessageSinkRegistry sinkRegistry,
@@ -127,7 +127,7 @@ public static FunctionalConsumer<byte[], byte[]> createConsumer(
final var kafkaProps = KafkaConsumerConfig.createConsumerConfig(config.bootstrapServers(), config.consumerGroup());
final var commandQueue = new ConcurrentLinkedQueue<ConsumerCommand>();

- return FunctionalConsumer
+ return KPipeConsumer
.<byte[], byte[]>builder()
.withProperties(kafkaProps)
.withTopic(config.topic())
@@ -140,7 +140,7 @@ public static FunctionalConsumer<byte[], byte[]> createConsumer(
.build();
}

- /// Creates an OffsetManager provider function that can be used with FunctionalConsumer builder
+ /// Creates an OffsetManager provider function that can be used with KPipeConsumer builder
///
/// @param commitInterval The interval at which to automatically commit offsets
/// @param commandQueue The command queue
22 changes: 11 additions & 11 deletions app/json/src/main/java/org/kpipe/App.java
@@ -14,7 +14,7 @@
import org.kpipe.config.AppConfig;
import org.kpipe.config.KafkaConsumerConfig;
import org.kpipe.consumer.ConsumerRunner;
- import org.kpipe.consumer.FunctionalConsumer;
+ import org.kpipe.consumer.KPipeConsumer;
import org.kpipe.consumer.OffsetManager;
import org.kpipe.consumer.enums.ConsumerCommand;
import org.kpipe.metrics.ConsumerMetricsReporter;
@@ -32,8 +32,8 @@ public class App implements AutoCloseable {
private static final Logger LOGGER = System.getLogger(App.class.getName());

private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
- private final FunctionalConsumer<byte[], byte[]> functionalConsumer;
- private final ConsumerRunner<FunctionalConsumer<byte[], byte[]>> runner;
+ private final KPipeConsumer<byte[], byte[]> kpipeConsumer;
+ private final ConsumerRunner<KPipeConsumer<byte[], byte[]>> runner;
private final AtomicReference<Map<String, Long>> currentMetrics = new AtomicReference<>();
private final MessageProcessorRegistry processorRegistry;
private final MessageSinkRegistry sinkRegistry;
@@ -59,10 +59,10 @@ public App(final AppConfig config) {
processorRegistry = new MessageProcessorRegistry(config.appName(), MessageFormat.JSON);
sinkRegistry = new MessageSinkRegistry();

- functionalConsumer = createConsumer(config, processorRegistry, sinkRegistry);
+ kpipeConsumer = createConsumer(config, processorRegistry, sinkRegistry);

final var consumerMetricsReporter = new ConsumerMetricsReporter(
- functionalConsumer::getMetrics,
+ kpipeConsumer::getMetrics,
() -> System.currentTimeMillis() - startTime.get(),
null
);
@@ -73,18 +73,18 @@
}

/// Creates the consumer runner with appropriate lifecycle hooks.
- private ConsumerRunner<FunctionalConsumer<byte[], byte[]>> createConsumerRunner(
+ private ConsumerRunner<KPipeConsumer<byte[], byte[]>> createConsumerRunner(
final AppConfig config,
final MetricsReporter consumerMetricsReporter,
final MetricsReporter processorMetricsReporter
) {
return ConsumerRunner
- .builder(functionalConsumer)
+ .builder(kpipeConsumer)
.withStartAction(c -> {
c.start();
LOGGER.log(Level.INFO, "Kafka consumer application started successfully");
})
- .withHealthCheck(FunctionalConsumer::isRunning)
+ .withHealthCheck(KPipeConsumer::isRunning)
.withGracefulShutdown(ConsumerRunner::performGracefulConsumerShutdown)
.withMetricsReporters(List.of(consumerMetricsReporter, processorMetricsReporter))
.withMetricsInterval(config.metricsInterval().toMillis())
@@ -99,15 +99,15 @@ private ConsumerRunner<FunctionalConsumer<byte[], byte[]>> createConsumerRunner(
/// @param processorRegistry Map of processor functions
/// @param sinkRegistry Map of sink functions
/// @return A configured functional consumer
- public static FunctionalConsumer<byte[], byte[]> createConsumer(
+ public static KPipeConsumer<byte[], byte[]> createConsumer(
final AppConfig config,
final MessageProcessorRegistry processorRegistry,
final MessageSinkRegistry sinkRegistry
) {
final var kafkaProps = KafkaConsumerConfig.createConsumerConfig(config.bootstrapServers(), config.consumerGroup());
final var commandQueue = new ConcurrentLinkedQueue<ConsumerCommand>();

- return FunctionalConsumer
+ return KPipeConsumer
.<byte[], byte[]>builder()
.withProperties(kafkaProps)
.withTopic(config.topic())
@@ -120,7 +120,7 @@ public static FunctionalConsumer<byte[], byte[]> createConsumer(
.build();
}

- /// Creates an OffsetManager provider function that can be used with FunctionalConsumer builder
+ /// Creates an OffsetManager provider function that can be used with KPipeConsumer builder
///
/// @param commitInterval The interval at which to automatically commit offsets
/// @param commandQueue The command queue