Merged
99 changes: 86 additions & 13 deletions benchmarks/README.md
@@ -66,7 +66,7 @@ JMH parameters can be configured in `benchmarks/build.gradle.kts` or passed via

## Latest Results (Snapshot)

Run date: `2026-03-08`
Run date: `2026-03-10`

### 1. Avro Pipeline: The "Zero-Copy" Advantage

@@ -101,14 +101,14 @@ abstraction without a performance penalty.
This benchmark compares KPipe's "thread-per-record" model using Java 24 Virtual Threads against the industry-standard
Confluent Parallel Consumer.

| Benchmark | Mode | Cnt | Score | Error | Units |
|-----------------------------------------------------------|--------:|-----:|----------:|------------:|--------:|
| `ParallelProcessingBenchmark.confluentParallelProcessing` | `thrpt` | `16` | `329.594` | `+/- 0.757` | `ops/s` |
| `ParallelProcessingBenchmark.kpipeParallelProcessing` | `thrpt` | `16` | `331.248` | `+/- 0.774` | `ops/s` |
| Benchmark | Mode | Cnt | Score | Error | Units |
|-----------------------------------------------------------|--------:|-----:|------------:|-------------:|--------:|
| `ParallelProcessingBenchmark.confluentParallelProcessing` | `thrpt` | `16` | `3,235.415` | `+/- 14.876` | `ops/s` |
| `ParallelProcessingBenchmark.kpipeParallelProcessing` | `thrpt` | `16` | `3,306.732` | `+/- 3.368` | `ops/s` |

**Observation**: KPipe achieves **performance parity** with the Confluent Parallel Consumer while maintaining a
significantly simpler programming model. We reach these numbers using standard Java 24 Virtual Threads, avoiding the
complexity of managed thread pools or proprietary scheduling logic.
**Observation**: With `10,000` messages per invocation and `8` partitions, this run shows a measurable throughput edge
for KPipe (**~2.2%** over Confluent). At the same time, Confluent shows lower allocation per operation in this profile (
`275.078 B/op` vs `1457.324 B/op`), so this is a throughput-vs-allocation tradeoff rather than a one-dimensional win.
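To put the tradeoff in absolute terms, the snapshot numbers above can be combined into rough allocation rates (a back-of-the-envelope sketch, not part of the benchmark suite):

```java
// Sketch: turn the GC profiler's per-operation allocation (B/op) and the
// throughput score (ops/s) into an absolute allocation rate (B/s).
// Numbers are copied from the snapshot results above.
public class AllocationRate {

    static double bytesPerSecond(double bytesPerOp, double opsPerSecond) {
        return bytesPerOp * opsPerSecond;
    }

    public static void main(String[] args) {
        double confluent = bytesPerSecond(275.078, 3_235.415); // ~0.89 MB/s
        double kpipe = bytesPerSecond(1_457.324, 3_306.732);   // ~4.82 MB/s
        System.out.printf("confluent=%.0f B/s, kpipe=%.0f B/s%n", confluent, kpipe);
    }
}
```

Both rates are small in absolute terms for a modern collector, which is worth keeping in mind when weighing the throughput edge against the higher per-operation allocation.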

## Understanding Results

@@ -122,11 +122,11 @@ Based on the latest snapshot results, we can derive the following throughput expectations
when Kafka I/O is excluded.
- **JSON (In-Memory)**: Up to **~405,000 records/s**. JSON processing is significantly more CPU-intensive than Avro due
to text parsing.
- **End-to-End Parallel Processing**: **~331,000 messages/s**. This is the most realistic metric as it includes a real
Kafka broker (embedded), network polling, and Virtual Thread scheduling.
- **End-to-End Parallel Processing**: **~3,235 to ~3,307 messages/s**. The reported score already counts individual
  messages because `ParallelProcessingBenchmark` uses `@OperationsPerInvocation(10000)`, which normalizes the score per message.

> **Note**: The `ParallelProcessingBenchmark` uses `@OperationsPerInvocation(1000)`, so the reported `331.248 ops/s` is
> normalized to reflect **331,248 messages per second**.
> **Note**: The `ParallelProcessingBenchmark` uses `@OperationsPerInvocation(10000)`, so the reported `ops/s` score is
> already normalized per message; it should not be multiplied by `10,000` again.

Key performance indicators to watch for:

@@ -141,12 +141,85 @@ Key performance indicators to watch for:
- **Parallel timing fairness**: both `kpipeParallelProcessing` and `confluentParallelProcessing` start
their processing loops inside benchmark methods (not in setup), so measured time includes comparable
startup-to-completion behavior for each invocation.
- **Parallel throughput normalization**: `ParallelProcessingBenchmark` uses `@OperationsPerInvocation(1000)`, so its
- **Parallel throughput normalization**: `ParallelProcessingBenchmark` uses `@OperationsPerInvocation(10000)`, so its
reported throughput is normalized per processed message rather than per full benchmark invocation.
- **Logging noise control**: KPipe parallel benchmark uses a no-op sink in benchmark runs to avoid console I/O from
distorting throughput numbers.
- **CPU efficiency (Linux only)**: compare CPI and related normalized counters from `perfnorm` for
`kpipeParallelProcessing` vs `confluentParallelProcessing`.
- **Platform caveat for CPI**: macOS runs can still compare throughput and GC behavior, but CPI should be
collected/reported only from Linux perf-enabled runs.
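The normalization point above is easy to sanity-check with plain arithmetic (hypothetical timing, not measured data):

```java
// Sketch: how JMH's @OperationsPerInvocation(N) shapes the reported score.
// JMH divides the measured invocation time by N, so the published ops/s
// figure already counts individual messages, not whole 10,000-message batches.
public class ScoreNormalization {

    static double reportedScore(double invocationsPerSecond, int operationsPerInvocation) {
        return invocationsPerSecond * operationsPerInvocation;
    }

    public static void main(String[] args) {
        // Hypothetical: one 10,000-message batch completes in ~3.02 s.
        double invocationsPerSecond = 1.0 / 3.02;
        // The reported score is already messages per second (~3,311 here).
        System.out.printf("%.0f ops/s%n", reportedScore(invocationsPerSecond, 10_000));
    }
}
```

Read the snapshot scores the same way: `3,306.732 ops/s` means roughly 3,307 messages per second end to end.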

## Requirements

- **Java 24+**: Required for Virtual Threads (Project Loom).
- **Gradle**: Used to compile and execute the benchmark harness.

### CPU/CPI Profiling For Parallel Benchmark

For KPipe vs Confluent parallel processing, keep the benchmark target fixed and enable a profiler:

```bash
# Linux: collect normalized hardware counters (includes CPI)
./gradlew :benchmarks:jmh \
-Pjmh.includes='ParallelProcessingBenchmark' \
-Pjmh.profilers='perfnorm' \
-Pjmh.resultFormat=TEXT

# macOS: CPI is not available via perf counters in JMH; use GC/CPU-adjacent signal instead
./gradlew :benchmarks:jmh \
-Pjmh.includes='ParallelProcessingBenchmark' \
-Pjmh.profilers='gc' \
-Pjmh.resultFormat=TEXT
```

You can also use the helper script:

```bash
# Linux (CPI mode)
PROFILE_MODE=cpi INCLUDES='ParallelProcessingBenchmark' ./scripts/run-benchmarks.sh

# macOS (falls back from cpi -> gc with a warning)
PROFILE_MODE=cpi INCLUDES='ParallelProcessingBenchmark' ./scripts/run-benchmarks.sh

# Heap/allocation view (portable)
PROFILE_MODE=heap INCLUDES='ParallelProcessingBenchmark' ./scripts/run-benchmarks.sh

# Thread/runtime view (HotSpot)
PROFILE_MODE=threads INCLUDES='ParallelProcessingBenchmark' ./scripts/run-benchmarks.sh
```

Supported `PROFILE_MODE` values in `scripts/run-benchmarks.sh`:

- `none`: no JMH profiler
- `gc`: allocation and GC counters (`gc`)
- `heap`: allocation/GC plus HotSpot GC internals (`gc,hs_gc`)
- `threads`: HotSpot thread/runtime signal (`hs_thr,hs_rt`)
- `cpi`: Linux `perfnorm` (falls back to `gc` on macOS)

Interpretation guidance for KPipe vs Confluent:

- Throughput (`ops/s`) remains the primary metric.
- On Linux, `perfnorm` adds normalized counters; compare CPI (`cycles`/`instructions`) between both benchmarks.
- Lower CPI at similar throughput usually indicates better instruction-path efficiency.
- On macOS, use throughput plus GC metrics; do not claim CPI without Linux perf counters.
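For illustration, CPI is just the ratio of two `perfnorm` counters; the values below are invented, not taken from a real run:

```java
// Sketch: deriving CPI (cycles per instruction) from perfnorm-style
// normalized counters. Lower CPI at comparable throughput suggests a more
// efficient instruction path. All counter values here are hypothetical.
public class CpiComparison {

    static double cpi(double cyclesPerOp, double instructionsPerOp) {
        return cyclesPerOp / instructionsPerOp;
    }

    public static void main(String[] args) {
        double kpipe = cpi(5_200.0, 9_800.0);      // ~0.53
        double confluent = cpi(5_600.0, 9_700.0);  // ~0.58
        System.out.printf("kpipe CPI=%.2f, confluent CPI=%.2f%n", kpipe, confluent);
    }
}
```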

### Parallel Comparison Graph

The latest visual comparison for KPipe vs Confluent parallel processing is at:

- `benchmarks/graphs/parallel_processing_gc_comparison.svg`

![KPipe vs Confluent Parallel Benchmark](graphs/parallel_processing_gc_comparison.svg)

To regenerate the source benchmark results before producing or refreshing the graph:

```bash
./gradlew :benchmarks:jmh \
-Pjmh.includes='ParallelProcessingBenchmark' \
-Pjmh.profilers='gc' \
-Pjmh.resultFormat=TEXT
```

> Note: JMH output is written to `benchmarks/build/results/jmh/results.<resultFormat lowercase>`.
> For `TEXT`, the file is typically `benchmarks/build/results/jmh/results.text`.
47 changes: 26 additions & 21 deletions benchmarks/build.gradle.kts
@@ -1,5 +1,3 @@
import org.gradle.api.artifacts.VersionCatalogsExtension

plugins {
java
id("me.champeau.jmh") version "0.7.3"
@@ -20,36 +18,43 @@ dependencies {
// Logging for JMH forks
implementation(libsCatalog.findLibrary("slf4jSimple").get())

// Apache Kafka test-kit for embedded benchmark broker
val kafkaVersion = libsCatalog.findVersion("kafka").get().requiredVersion
implementation(libsCatalog.findLibrary("kafkaScala213").get())
implementation("org.apache.kafka:kafka_2.13:$kafkaVersion:test")
implementation("org.apache.kafka:kafka-clients:$kafkaVersion:test")
implementation("org.apache.kafka:kafka-server-common:$kafkaVersion:test")
implementation(libsCatalog.findLibrary("kafkaTestCommonRuntime").get())

implementation(libsCatalog.findLibrary("junitJupiterApi").get())
}

jmh {
warmupIterations = providers.gradleProperty("jmh.warmupIterations").orNull?.toIntOrNull() ?: 3
iterations = providers.gradleProperty("jmh.iterations").orNull?.toIntOrNull() ?: 5
fork = providers.gradleProperty("jmh.fork").orNull?.toIntOrNull() ?: 1
threads = providers.gradleProperty("jmh.threads").orNull?.toIntOrNull() ?: 1

providers
.gradleProperty("jmh.includes")
.orNull
?.split(',')
?.map(String::trim)
?.filter(String::isNotEmpty)
?.takeIf { it.isNotEmpty() }
?.let { includes = it }
fun intProp(name: String, default: Int): Int {
return providers.gradleProperty(name).orNull?.toIntOrNull() ?: default
}

fun stringProp(name: String, default: String): String {
return providers.gradleProperty(name).orNull?.trim()?.takeIf { it.isNotEmpty() } ?: default
}

fun csvProp(name: String): List<String>? {
return providers.gradleProperty(name).orNull?.split(',')?.map(String::trim)?.filter(String::isNotEmpty)
?.takeIf { it.isNotEmpty() }
}

warmupIterations = intProp("jmh.warmupIterations", 3)
iterations = intProp("jmh.iterations", 5)
fork = intProp("jmh.fork", 1)
threads = intProp("jmh.threads", 1)

csvProp("jmh.includes")?.let { includes = it }
csvProp("jmh.profilers")?.let { profilers = it }

val jmhResultFormat = stringProp("jmh.resultFormat", "TEXT")
val jmhTmpDir = layout.buildDirectory.dir("tmp/jmh").get().asFile.absolutePath
val jmhResultFile = layout.buildDirectory.file("results/jmh/results.${jmhResultFormat.lowercase()}").get().asFile

benchmarkMode = listOf("thrpt")
timeUnit = "s"
failOnError = true
forceGC = true
resultFormat = jmhResultFormat
resultsFile = jmhResultFile

jvmArgs = listOf("-Djava.io.tmpdir=$jmhTmpDir")
}
123 changes: 123 additions & 0 deletions benchmarks/graphs/parallel_processing_gc_comparison.svg
@@ -9,8 +9,6 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -20,6 +18,8 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.kpipe.consumer.FunctionalConsumer;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
@@ -50,7 +50,10 @@ public final class ParallelProcessingBenchmarkInfrastructure {
static final String TOPIC = "benchmark-topic";

/// Number of records seeded and awaited per invocation.
static final int TARGET_MESSAGES = 1000;
static final int TARGET_MESSAGES = 10_000;

/// Topic partitions used to expose parallel scheduler behavior.
static final int TOPIC_PARTITIONS = 8;

/// Safety timeout for per-invocation message completion checks.
private static final long MAX_WAIT_NANOS = TimeUnit.SECONDS.toNanos(30);
@@ -159,7 +162,7 @@ private static void createTopicIfMissing(final Properties clientProperties) {
adminProps.putAll(clientProperties);

try (final var admin = Admin.create(adminProps)) {
admin.createTopics(Collections.singletonList(new NewTopic(TOPIC, 1, (short) 1))).all().get();
admin.createTopics(Collections.singletonList(new NewTopic(TOPIC, TOPIC_PARTITIONS, (short) 1))).all().get();
} catch (final Exception e) {
throw new IllegalStateException("Unable to create benchmark topic", e);
}
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
@@ -13,6 +13,7 @@ parallelConsumer = "0.5.3.0"
kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka" }
kafkaScala213 = { module = "org.apache.kafka:kafka_2.13", version.ref = "kafka" }
kafkaServerCommon = { module = "org.apache.kafka:kafka-server-common", version.ref = "kafka" }
kafkaTestCommonRuntime = { module = "org.apache.kafka:kafka-test-common-runtime", version.ref = "kafka" }

avro = { module = "org.apache.avro:avro", version.ref = "avro" }
dslJson = { module = "com.dslplatform:dsl-json", version.ref = "dslJson" }
@@ -33,4 +34,3 @@ testcontainersJunitJupiter = { module = "org.testcontainers:testcontainers-junit
testcontainersKafka = { module = "org.testcontainers:testcontainers-kafka", version.ref = "testcontainers" }

parallelConsumerCore = { module = "io.confluent.parallelconsumer:parallel-consumer-core", version.ref = "parallelConsumer" }
