Merged
192 changes: 117 additions & 75 deletions README.md
Original file line number Diff line number Diff line change
@@ -26,8 +26,7 @@ retries, built-in metrics, and support for both parallel and sequential processi
- Message processors are **pure functions** (e.g., `UnaryOperator<Map<String, Object>>` for JSON or
`UnaryOperator<GenericRecord>` for Avro) that transform data without side effects.
- Build complex, high-performance pipelines that minimize serialization/deserialization cycles.
- **Declarative processing** lets you describe *what* to do, not *how* to do it.
- **Optimized Pipelines**: Deserialization happens once at the start, and serialization happens once at the end,
- **Single SerDe Cycle**: Deserialization happens once at the start, and serialization happens once at the end,
regardless of the number of transformations.
- Teams can **register their own operators** in a central registry via:

@@ -36,12 +35,10 @@ retries, built-in metrics, and support for both parallel and sequential processi
final var registry = new MessageProcessorRegistry("myApp");

// Register team-specific JSON operators
registry.registerJsonOperator("sanitizeData",
JsonMessageProcessor.removeFields("password", "ssn"));
registry.registerJsonOperator("sanitizeData", JsonMessageProcessor.removeFields("password", "ssn"));

// Create an optimized JSON pipeline from registered operators
final var pipeline = registry.jsonPipeline(
"sanitizeData", "addTimestamp");
final var pipeline = registry.jsonPipeline("sanitizeData", "addTimestamp");

// Apply transformations with built-in error handling and retry logic
final var consumer = new KPipeConsumer.<byte[], byte[]>builder()
@@ -116,6 +113,65 @@ retries, built-in metrics, and support for both parallel and sequential processi

---

## Architecture and Reliability

KPipe is designed to be a lightweight, high-performance alternative to existing Kafka consumer libraries, focusing on
modern Java 25+ features and predictable behavior.

### 1. The "Single SerDe Cycle" Strategy

Unlike traditional pipelines that often perform `byte[] -> Object -> byte[]` at every transformation step, KPipe
optimizes for throughput:

- **Single Deserialization**: Messages are deserialized **once** into a mutable representation (e.g., `Map` for JSON,
`GenericRecord` for Avro).
- **In-Place Transformations**: A chain of `UnaryOperator` functions is applied to the same object.
- **Single Serialization**: The final object is serialized back to `byte[]` only once before being sent to the sink.

This approach significantly reduces CPU overhead and GC pressure.
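
The single-cycle idea can be sketched with plain JDK types. This is a simplified stand-in, not KPipe's actual codec or registry API: the map-based representation and the chain of `UnaryOperator` steps mirror the description above, while the parse and serialize steps are placeholders.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class SingleSerDeSketch {

    // One deserialization, then the whole operator chain on the same object,
    // then one serialization -- no byte[] round-trips between steps.
    static Map<String, Object> applyChain(Map<String, Object> message,
                                          List<UnaryOperator<Map<String, Object>>> steps) {
        for (UnaryOperator<Map<String, Object>> step : steps) {
            message = step.apply(message); // each step mutates and returns the same map
        }
        return message;
    }

    public static void main(String[] args) {
        // stand-in for "deserialize once": byte[] -> Map
        Map<String, Object> msg = new HashMap<>(Map.of("user", "alice"));

        applyChain(msg, List.of(
            m -> { m.put("ts", 1_700_000_000L); return m; },
            m -> { m.put("processed", true); return m; }));

        // stand-in for "serialize once": Map -> byte[]
        byte[] out = msg.toString().getBytes();
        System.out.println(msg);
    }
}
```

Because every step receives the already-deserialized map, adding a tenth transformation costs one method call, not another parse/format cycle.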

### 2. Virtual Threads

By default, KPipe uses **Java Virtual Threads** (Project Loom) to process messages. This provides a "thread-per-record"
model that excels at I/O-bound tasks (e.g., enrichment via REST/DB).

- **I/O Bound**: Use the default parallel mode with virtual threads for maximum concurrency.
- **Sequential**: Use `.withSequentialProcessing(true)` for stateful operations where strict FIFO ordering per
partition is required.
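
The thread-per-record model can be shown with nothing but the JDK. This is a minimal sketch, not KPipe's consumer loop; the record payloads and the "enrichment" body are illustrative:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPerRecordSketch {

    static int processAll(List<String> records) {
        AtomicInteger processed = new AtomicInteger();
        // One cheap virtual thread per record: blocking I/O inside a task
        // parks the virtual thread without pinning an OS carrier thread.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (String rec : records) {
                executor.submit(() -> {
                    // stand-in for an I/O-bound enrichment call (REST/DB)
                    processed.incrementAndGet();
                });
            }
        } // close() blocks until all submitted tasks have finished
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(List.of("a", "b", "c")));
    }
}
```

Since virtual threads cost roughly a few hundred bytes each, spawning one per record is viable even at high poll sizes, which is what makes the default parallel mode practical for I/O-bound pipelines.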

### 3. Reliable "At-Least-Once" Delivery

KPipe implements a **Lowest Pending Offset** strategy to ensure reliability even with parallel processing:

- **In-Flight Tracking**: Every record's offset is tracked in a `ConcurrentSkipListSet` per partition.
- **No-Gap Commits**: Even if message 102 finishes before 101, offset 102 will **not** be committed until 101 is
successfully processed.
- **Crash Recovery**: If the consumer crashes, it will resume from the last committed "safe" offset. While this may
result in some records being re-processed (standard "at-least-once" behavior), it guarantees no message is ever
skipped.
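
The lowest-pending-offset rule above can be captured in a few lines. The method names here are illustrative, not KPipe's actual `OffsetManager` API, but the `ConcurrentSkipListSet` mechanics match the description:

```java
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;

public class LowestPendingOffsetSketch {

    private final ConcurrentSkipListSet<Long> pending = new ConcurrentSkipListSet<>();
    private final AtomicLong highestSeen = new AtomicLong(-1);

    /** Called when a record is handed to a worker. */
    public void track(long offset) {
        pending.add(offset);
        highestSeen.accumulateAndGet(offset, Math::max);
    }

    /** Called when a record finishes processing successfully. */
    public void complete(long offset) {
        pending.remove(offset);
    }

    /** Safe commit position: everything below it is done (Kafka commits the next offset to consume). */
    public long safeCommitOffset() {
        return pending.isEmpty() ? highestSeen.get() + 1 : pending.first();
    }

    public static void main(String[] args) {
        var offsets = new LowestPendingOffsetSketch();
        offsets.track(100); offsets.track(101); offsets.track(102);

        offsets.complete(102);                          // 102 finishes first...
        System.out.println(offsets.safeCommitOffset()); // ...but 100 is still pending -> 100

        offsets.complete(100);
        offsets.complete(101);
        System.out.println(offsets.safeCommitOffset()); // all done -> 103
    }
}
```

Completing 102 out of order never advances the commit position past the lowest unfinished offset, which is exactly the no-gap guarantee.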

### 4. Error Handling & Retries

KPipe provides a robust, multi-layered error handling mechanism:

- **Built-in Retries**: Configure `.withRetry(maxRetries, backoff)` to automatically retry transient failures.
- **Dead Letter Handling**: Provide a `.withErrorHandler()` to redirect messages that fail after all retries to an
error topic or database.
- **Safe Pipelines**: Use `MessageProcessorRegistry.withErrorHandling()` to wrap individual processors with default
values or logging, preventing a single malformed message from blocking the partition.
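
The retry-then-dead-letter layering can be sketched as a generic helper. This is an assumption-laden illustration of the pattern, not KPipe's `withRetry` implementation; the fixed backoff and fallback value are simplifications:

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

public class RetrySketch {

    /** Try the task, retry up to maxRetries times, then hand the failure to the error handler. */
    static <T> T withRetry(Supplier<T> task, int maxRetries, long backoffMs,
                           Consumer<Exception> errorHandler, T fallback) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.get();
            } catch (Exception e) {
                if (attempt == maxRetries) {
                    errorHandler.accept(e); // e.g. publish to a dead-letter topic
                    return fallback;
                }
                try {
                    Thread.sleep(backoffMs); // fixed backoff; real pipelines often use exponential
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // stay interrupt-aware during shutdown
                    errorHandler.accept(ie);
                    return fallback;
                }
            }
        }
        return fallback; // unreachable
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = withRetry(
            () -> {
                if (++calls[0] < 3) throw new IllegalStateException("transient");
                return "ok";
            },
            5, 10L, e -> System.err.println("dead-lettered: " + e), "fallback");
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Transient failures are absorbed by the retry loop; only a record that exhausts all attempts reaches the error handler, so one poison message cannot stall the rest of the partition.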

### 5. Graceful Shutdown & Interrupt Handling

KPipe respects JVM signals and ensures timely shutdown without data loss:

- **Interrupt Awareness**: Interrupts trigger a coordinated shutdown sequence. They do **not** cause records to be
skipped.
- **Reliable Redelivery**: If a record's processing is interrupted (e.g., during retry backoff or transformation),
  the offset is **not** marked as processed. This ensures it will be safely picked up by the next consumer instance,
  guaranteeing "at-least-once" delivery even during shutdown.
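
The redelivery guarantee boils down to one invariant: success is the only path that marks an offset. A minimal sketch, with hypothetical method and variable names rather than KPipe's internals:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

public class InterruptAwareSketch {

    /**
     * Marks the offset as processed only if the work completes. An interrupt
     * (or any failure) leaves the offset unmarked, so the committed position
     * stays below it and the record is redelivered after restart.
     */
    static boolean processRecord(long offset, Runnable work, Set<Long> processed) {
        if (Thread.currentThread().isInterrupted()) {
            return false; // shutdown in progress: do not start, leave offset pending
        }
        try {
            work.run();
            processed.add(offset); // success is the only path that marks the offset
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Set<Long> processed = new ConcurrentSkipListSet<>();

        processRecord(1L, () -> {}, processed); // normal record: marked

        Thread.currentThread().interrupt();     // simulate a shutdown signal
        processRecord(2L, () -> {}, processed); // not started, not marked
        Thread.interrupted();                   // clear the flag for the demo

        System.out.println(processed);          // -> [1]
    }
}
```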

---

## 📁 Project Structure

KPipe is organized into two main modules:
@@ -219,18 +275,22 @@ Monitor your consumer with built-in metrics:

```java
// Access consumer metrics
Map<String, Long> metrics = consumer.getMetrics();
System.out.println("Messages received: " + metrics.get("messagesReceived"));
System.out.println("Successfully processed: " + metrics.get("messagesProcessed"));
System.out.println("Processing errors: " + metrics.get("processingErrors"));
final var metrics = consumer.getMetrics();
final var log = System.getLogger("org.kpipe.metrics");
log.log(Level.INFO, "Messages received: " + metrics.get("messagesReceived"));
log.log(Level.INFO, "Successfully processed: " + metrics.get("messagesProcessed"));
log.log(Level.INFO, "Processing errors: " + metrics.get("processingErrors"));
```

Configure automatic metrics reporting:

```java
new App(config)
.withMetricsInterval(Duration.ofSeconds(30))
.start();
final var runner = ConsumerRunner.builder(consumer)
.withMetricsReporters(List.of(new ConsumerMetricsReporter(consumer::getMetrics, () -> uptimeMs, null)))
.withMetricsInterval(30_000)
.build();

runner.start();
```

---
@@ -239,17 +299,16 @@
The consumer supports graceful shutdown with in-flight message handling:

```java
final var log = System.getLogger("org.kpipe.app.Shutdown");

// Initiate graceful shutdown with 5-second timeout
boolean allProcessed = kafkaApp.shutdownGracefully(5000);
if (allProcessed) {
LOGGER.info("All messages processed successfully before shutdown");
} else {
LOGGER.warning("Shutdown completed with some messages still in flight");
}
boolean allProcessed = runner.shutdownGracefully(5000);
if (allProcessed) log.log(Level.INFO, "All messages processed successfully before shutdown");
else log.log(Level.WARNING, "Shutdown completed with some messages still in flight");

// Register as JVM shutdown hook
Runtime.getRuntime().addShutdownHook(
new Thread(() -> app.shutdownGracefully(5000))
new Thread(() -> runner.close())
);
```

@@ -325,17 +384,11 @@ public interface MessageSink<K, V> {
KPipe provides several built-in sinks:

```java
// Create a JSON console sink that logs messages at INFO level
final var jsonConsoleSink = new JsonConsoleSink<>(
System.getLogger("org.kpipe.sink.JsonConsoleSink"),
Level.INFO
);

// Create an Avro console sink that logs messages at INFO level
final var avroConsoleSink = new AvroConsoleSink<>(
System.getLogger("org.kpipe.sink.AvroConsoleSink"),
Level.INFO
);
// Create a JSON console sink
final var jsonConsoleSink = new JsonConsoleSink<>();

// Create an Avro console sink
final var avroConsoleSink = new AvroConsoleSink<>();

// Use a sink with a consumer
final var consumer = new KPipeConsumer.<String, byte[]>builder()
@@ -355,15 +408,15 @@ You can create custom sinks using lambda expressions:
MessageSink<String, byte[]> databaseSink = (record, processedValue) -> {
try {
// Parse the processed value
Map<String, Object> data = JsonMessageProcessor.parseJson().apply(processedValue);
final var data = JsonMessageProcessor.parseJson().apply(processedValue);

// Write to database
databaseService.insert(data);

// Log success
logger.info("Successfully wrote message to database: " + record.key());
log.log(Level.INFO, "Successfully wrote message to database: " + record.key());
} catch (Exception e) {
logger.error("Failed to write message to database", e);
log.log(Level.ERROR, "Failed to write message to database", e);
}
};

@@ -382,7 +435,7 @@ The `MessageSinkRegistry` provides a centralized repository for registering and
final var registry = new MessageSinkRegistry();

// Register sinks
registry.register("console", new JsonConsoleSink<>(logger, Level.INFO));
registry.register("console", new JsonConsoleSink<>());
registry.register("database", databaseSink);
registry.register("metrics", (record, value) -> metricsService.recordMessage(record.topic(), value.length));

@@ -401,16 +454,11 @@ The registry provides utilities for adding error handling to sinks:

```java
// Create a sink with error handling
final var safeSink = MessageSinkRegistry.withErrorHandling(
riskySink,
(record, value, error) -> logger.error("Error in sink: " + error.getMessage())
);

// Or use the registry's error handling
final var safePipeline = registry.<String, byte[]>pipelineWithErrorHandling(
"console", "database", "metrics",
(record, value, error) -> errorService.reportError(record.topic(), error)
);
final var safeSink = MessageSinkRegistry.withErrorHandling(riskySink);

// Register and use the wrapped sink
registry.register("safeDatabase", safeSink);
final var safePipeline = registry.<String, byte[]>pipeline("console", "safeDatabase", "metrics");
```

---
@@ -438,33 +486,27 @@ The `ConsumerRunner` supports extensive configuration options:

```java
// Create a consumer runner with advanced configuration
ConsumerRunner<KPipeConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
final var runner = ConsumerRunner.builder(consumer)
// Configure metrics reporting
.withMetricsReporter(new ConsumerMetricsReporter(
consumer::getMetrics,
() -> System.currentTimeMillis() - startTime
.withMetricsReporters(List.of(
new ConsumerMetricsReporter(consumer::getMetrics, () -> System.currentTimeMillis() - startTime, null)
))
.withMetricsInterval(30000) // Report metrics every 30 seconds

// Configure health checks
.withHealthCheck(c -> c.getState() == ConsumerState.RUNNING)

.withHealthCheck(KPipeConsumer::isRunning)
// Configure graceful shutdown
.withShutdownTimeout(10000) // 10 seconds timeout for shutdown
.withShutdownHook(true) // Register JVM shutdown hook

// Configure custom start action
.withStartAction(c -> {
logger.info("Starting consumer for topic: " + c.getTopic());
log.log(Level.INFO, "Starting consumer");
c.start();
})

// Configure custom graceful shutdown
.withGracefulShutdown((c, timeoutMs) -> {
logger.info("Initiating graceful shutdown with timeout: " + timeoutMs + "ms");
return c.shutdownGracefully(timeoutMs);
log.log(Level.INFO, "Initiating graceful shutdown with timeout: " + timeoutMs + "ms");
return ConsumerRunner.performGracefulConsumerShutdown(c, timeoutMs);
})

.build();
```

@@ -494,7 +536,7 @@ The `ConsumerRunner` integrates with metrics reporting:
// Add multiple metrics reporters
ConsumerRunner<KPipeConsumer<String, String>> runner = ConsumerRunner.builder(consumer)
.withMetricsReporters(List.of(
new ConsumerMetricsReporter(consumer::getMetrics, () -> System.currentTimeMillis() - startTime),
new ConsumerMetricsReporter(consumer::getMetrics, () -> System.currentTimeMillis() - startTime, null),
new ProcessorMetricsReporter(registry)
))
.withMetricsInterval(60000) // Report every minute
@@ -521,26 +563,27 @@ Here's a concise example of a KPipe application:

```java
public class KPipeApp implements AutoCloseable {
private static final System.Logger LOGGER = System.getLogger(KPipeApp.class.getName());
private final ConsumerRunner<KPipeConsumer<byte[], byte[]>> runner;

static void main() {
// Load configuration from environment variables
final var config = AppConfig.fromEnv();

try (final var app = new MyKafkaApp(config)) {
try (final var app = new KPipeApp(config)) {
app.start();
app.awaitShutdown();
} catch (final Exception e) {
System.getLogger(MyKafkaApp.class.getName())
.log(System.Logger.Level.ERROR, "Fatal error in application", e);
LOGGER.log(Level.ERROR, "Fatal error in application", e);
System.exit(1);
}
}

public MyKafkaApp(final AppConfig config) {
public KPipeApp(final AppConfig config) {
// Create processor and sink registries
final var processorRegistry = new MessageProcessorRegistry(config.appName());
final var sinkRegistry = new MessageSinkRegistry();
final var commandQueue = new ConcurrentLinkedQueue<ConsumerCommand>();

// Create the functional consumer
final var functionalConsumer = KPipeConsumer.<byte[], byte[]>builder()
Expand All @@ -549,11 +592,13 @@ public class KPipeApp implements AutoCloseable {
.withTopic(config.topic())
.withProcessor(processorRegistry.jsonPipeline(
"addSource", "markProcessed", "addTimestamp"))
.withMessageSink(sinkRegistry.<byte[], byte[]>pipeline("logging"))
.withMessageSink(sinkRegistry.<byte[], byte[]>pipeline("jsonLogging"))
.withCommandQueue(commandQueue)
.withOffsetManagerProvider(consumer ->
OffsetManager.builder(consumer)
.withCommitInterval(Duration.ofSeconds(30))
.build())
OffsetManager.builder(consumer)
.withCommandQueue(commandQueue)
.withCommitInterval(Duration.ofSeconds(30))
.build())
.withMetrics(true)
.build();

@@ -584,12 +629,9 @@ public class KPipeApp implements AutoCloseable {
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export KAFKA_CONSUMER_GROUP=my-group
export KAFKA_TOPIC=json-events

# Run the application
./gradlew run

# Test with a sample message
echo '{"message":"Hello from KPipe!"}' | kcat -P -b localhost:9092 -t json-events
export KAFKA_PROCESSORS=parseJson,validateSchema,addTimestamp
export METRICS_INTERVAL_MS=30000
export SHUTDOWN_TIMEOUT_MS=5000
```

---
@@ -774,9 +816,9 @@ multiple records concurrently.

- **Execution Order**: Messages start in order but may finish out of order (e.g., a small message finishing before a
large one).
- **No-Gap Offset Management**: Even with parallel processing, KPipe's `OffsetManager` uses a **Lowest Pending Offset**
- **At-Least-Once Delivery**: Even with parallel processing, KPipe's `OffsetManager` uses a **Lowest Pending Offset**
strategy. It will never commit a higher offset until all lower offsets in that partition are successfully processed.
This ensures **at-least-once delivery** and prevents "holes" in your data if the application crashes.
This ensures **no gaps** in your committed data.

### 3. Key-Level Ordering for Critical Systems (e.g., Payments)

@@ -18,7 +18,7 @@
///
/// ### Design Integrity:
/// - Both frameworks start from the beginning of the topic (`earliest`) for each iteration.
/// - Both process exactly **1,000 messages** per iteration.
/// - Both process exactly **10,000 messages** per iteration.
/// - KPipe uses Loom; Confluent uses a max concurrency of **100**.
///
/// ### Running the Benchmark:
@@ -259,7 +259,7 @@ void start() {
@TearDown(Level.Invocation)
public void tearDown() {
if (processor != null) processor.closeDontDrainFirst(PC_CLOSE_TIMEOUT);
if (kafkaConsumer != null) kafkaConsumer.close(PC_CLOSE_TIMEOUT);
if (kafkaConsumer != null) kafkaConsumer.close();
}

/// @return invocation-scoped processed message counter