diff --git a/benchmarks/README.md b/benchmarks/README.md index b3bdb19..85a7954 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -66,7 +66,7 @@ JMH parameters can be configured in `benchmarks/build.gradle.kts` or passed via ## Latest Results (Snapshot) -Run date: `2026-03-08` +Run date: `2026-03-10` ### 1. Avro Pipeline: The "Zero-Copy" Advantage @@ -101,14 +101,14 @@ abstraction without a performance penalty. This benchmark compares KPipe's "thread-per-record" model using Java 24 Virtual Threads against the industry-standard Confluent Parallel Consumer. -| Benchmark | Mode | Cnt | Score | Error | Units | -|-----------------------------------------------------------|--------:|-----:|----------:|------------:|--------:| -| `ParallelProcessingBenchmark.confluentParallelProcessing` | `thrpt` | `16` | `329.594` | `+/- 0.757` | `ops/s` | -| `ParallelProcessingBenchmark.kpipeParallelProcessing` | `thrpt` | `16` | `331.248` | `+/- 0.774` | `ops/s` | +| Benchmark | Mode | Cnt | Score | Error | Units | +|-----------------------------------------------------------|--------:|-----:|------------:|-------------:|--------:| +| `ParallelProcessingBenchmark.confluentParallelProcessing` | `thrpt` | `16` | `3,235.415` | `+/- 14.876` | `ops/s` | +| `ParallelProcessingBenchmark.kpipeParallelProcessing` | `thrpt` | `16` | `3,306.732` | `+/- 3.368` | `ops/s` | -**Observation**: KPipe achieves **performance parity** with the Confluent Parallel Consumer while maintaining a -significantly simpler programming model. We reach these numbers using standard Java 24 Virtual Threads, avoiding the -complexity of managed thread pools or proprietary scheduling logic. +**Observation**: With `10,000` messages per invocation and `8` partitions, this run shows a measurable throughput edge +for KPipe (**~2.2%** over Confluent). 
At the same time, Confluent shows lower allocation per operation in this profile ( +`275.078 B/op` vs `1457.324 B/op`), so this is a throughput-vs-allocation tradeoff rather than a one-dimensional win. ## Understanding Results @@ -122,11 +122,11 @@ Based on the latest snapshot results, we can derive the following throughput expectations when Kafka I/O is excluded. - **JSON (In-Memory)**: Up to **~405,000 records/s**. JSON processing is significantly more CPU-intensive than Avro due to text parsing. -- **End-to-End Parallel Processing**: **~331,000 messages/s**. This is the most realistic metric as it includes a real - Kafka broker (embedded), network polling, and Virtual Thread scheduling. +- **End-to-End Parallel Processing**: **~3,235 to ~3,307 messages/s**. The reported score is already a per-message rate, + because JMH divides each invocation's time by the `@OperationsPerInvocation(10000)` count; do not multiply it by `10,000` again. -> **Note**: The `ParallelProcessingBenchmark` uses `@OperationsPerInvocation(1000)`, so the reported `331.248 ops/s` is -> normalized to reflect **331,248 messages per second**. +> **Note**: The `ParallelProcessingBenchmark` uses `@OperationsPerInvocation(10000)`, so the reported +> `3,306.732 ops/s` already means roughly **3,307 messages per second**. Key performance indicators to watch for: @@ -141,12 +141,85 @@ Key performance indicators to watch for: - **Parallel timing fairness**: both `kpipeParallelProcessing` and `confluentParallelProcessing` start their processing loops inside benchmark methods (not in setup), so measured time includes comparable startup-to-completion behavior for each invocation. -- **Parallel throughput normalization**: `ParallelProcessingBenchmark` uses `@OperationsPerInvocation(1000)`, so its +- **Parallel throughput normalization**: `ParallelProcessingBenchmark` uses `@OperationsPerInvocation(10000)`, so its reported throughput is normalized per processed message rather than per full benchmark invocation.
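JMH's `@OperationsPerInvocation` normalization can be sanity-checked with simple arithmetic. The sketch below uses hypothetical numbers (a ~3 s invocation that processes 10,000 messages, consistent with the table's ~3,300 ops/s scores); `OpsNormalizationCheck` is an illustrative class name, not part of the benchmark suite:

```java
// Sanity check for JMH's @OperationsPerInvocation normalization.
// Assumption (hypothetical numbers): one benchmark invocation seeds and awaits
// 10,000 messages and takes about 3 seconds end to end.
public class OpsNormalizationCheck {

    public static void main(String[] args) {
        int opsPerInvocation = 10_000;   // mirrors @OperationsPerInvocation(10000)
        double invocationSeconds = 3.0;  // hypothetical wall time per invocation

        // JMH divides each invocation's elapsed time by the declared operation
        // count, so the printed "ops/s" is already a per-message rate.
        double reportedScore = opsPerInvocation / invocationSeconds;

        // A per-invocation reading would instead be ~0.33 invocations/s.
        double invocationsPerSecond = 1.0 / invocationSeconds;

        System.out.printf("score = %.0f ops/s (messages/s), %.2f invocations/s%n",
            reportedScore, invocationsPerSecond);
    }
}
```

Under these assumptions the computed score lands near the ~3,300 ops/s reported in the table, which is why the score should be read directly as messages per second.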
- **Logging noise control**: KPipe parallel benchmark uses a no-op sink in benchmark runs to avoid console I/O from distorting throughput numbers. +- **CPU efficiency (Linux only)**: compare CPI and related normalized counters from `perfnorm` for + `kpipeParallelProcessing` vs `confluentParallelProcessing`. +- **Platform caveat for CPI**: macOS runs can still compare throughput and GC behavior, but CPI should be + collected/reported only from Linux perf-enabled runs. ## Requirements - **Java 24+**: Required for Virtual Threads (Project Loom). - **Gradle**: Used to compile and execute the benchmark harness. + +### CPU/CPI Profiling for the Parallel Benchmark + +For the KPipe vs Confluent parallel comparison, keep the benchmark target fixed and enable a profiler: + +```bash +# Linux: collect normalized hardware counters (includes CPI) +./gradlew :benchmarks:jmh \ + -Pjmh.includes='ParallelProcessingBenchmark' \ + -Pjmh.profilers='perfnorm' \ + -Pjmh.resultFormat=TEXT + +# macOS: CPI is not available via perf counters in JMH; use GC counters as a CPU-adjacent signal instead +./gradlew :benchmarks:jmh \ + -Pjmh.includes='ParallelProcessingBenchmark' \ + -Pjmh.profilers='gc' \ + -Pjmh.resultFormat=TEXT +``` + +You can also use the helper script: + +```bash +# Linux (CPI mode) +PROFILE_MODE=cpi INCLUDES='ParallelProcessingBenchmark' ./scripts/run-benchmarks.sh + +# macOS (falls back from cpi -> gc with a warning) +PROFILE_MODE=cpi INCLUDES='ParallelProcessingBenchmark' ./scripts/run-benchmarks.sh + +# Heap/allocation view (portable) +PROFILE_MODE=heap INCLUDES='ParallelProcessingBenchmark' ./scripts/run-benchmarks.sh + +# Thread/runtime view (HotSpot) +PROFILE_MODE=threads INCLUDES='ParallelProcessingBenchmark' ./scripts/run-benchmarks.sh +``` + +Supported `PROFILE_MODE` values in `scripts/run-benchmarks.sh`: + +- `none`: no JMH profiler +- `gc`: allocation and GC counters (`gc`) +- `heap`: allocation/GC plus HotSpot GC internals (`gc,hs_gc`) +- `threads`: HotSpot thread/runtime signal
(`hs_thr,hs_rt`) +- `cpi`: Linux `perfnorm` (falls back to `gc` on macOS) + +Interpretation guidance for KPipe vs Confluent: + +- Throughput (`ops/s`) remains the primary metric. +- On Linux, `perfnorm` adds normalized counters; compare CPI (`cycles`/`instructions`) between the two benchmarks. +- Lower CPI at similar throughput usually indicates better instruction-path efficiency. +- On macOS, use throughput plus GC metrics; do not claim CPI without Linux perf counters. + +### Parallel Comparison Graph + +The latest visual comparison for KPipe vs Confluent parallel processing is at: + +- `benchmarks/graphs/parallel_processing_gc_comparison.svg` + +![KPipe vs Confluent Parallel Benchmark](graphs/parallel_processing_gc_comparison.svg) + +To regenerate the source benchmark results before producing or refreshing the graph: + +```bash +./gradlew :benchmarks:jmh \ + -Pjmh.includes='ParallelProcessingBenchmark' \ + -Pjmh.profilers='gc' \ + -Pjmh.resultFormat=TEXT +``` + +> Note: JMH output is written to `benchmarks/build/results/jmh/results.<format>`, where `<format>` is the +> result format name, lowercased. For `TEXT`, the file is typically `benchmarks/build/results/jmh/results.text`.
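To make the CPI interpretation above concrete, here is an illustrative sketch; the cycle and instruction counts are made-up placeholder values (not measured data), and `CpiComparison`/`PerfNormSample` are hypothetical names:

```java
// Illustrative CPI comparison from perfnorm-style counters.
// All counter values below are made-up placeholders, not measured data.
public class CpiComparison {

    // perfnorm reports hardware counters normalized per benchmark operation.
    record PerfNormSample(double cyclesPerOp, double instructionsPerOp) {
        double cpi() {
            return cyclesPerOp / instructionsPerOp; // cycles per instruction
        }
    }

    public static void main(String[] args) {
        var kpipe = new PerfNormSample(9.0e6, 6.0e6);     // hypothetical
        var confluent = new PerfNormSample(9.6e6, 6.0e6); // hypothetical

        System.out.printf("KPipe CPI = %.2f, Confluent CPI = %.2f%n",
            kpipe.cpi(), confluent.cpi());
        // At similar throughput, the lower CPI suggests a more efficient
        // instruction path for that implementation.
    }
}
```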
diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts index da675f9..18b1fe8 100644 --- a/benchmarks/build.gradle.kts +++ b/benchmarks/build.gradle.kts @@ -1,5 +1,3 @@ -import org.gradle.api.artifacts.VersionCatalogsExtension - plugins { java id("me.champeau.jmh") version "0.7.3" @@ -20,36 +18,43 @@ dependencies { // Logging for JMH forks implementation(libsCatalog.findLibrary("slf4jSimple").get()) - // Apache Kafka test-kit for embedded benchmark broker - val kafkaVersion = libsCatalog.findVersion("kafka").get().requiredVersion - implementation(libsCatalog.findLibrary("kafkaScala213").get()) - implementation("org.apache.kafka:kafka_2.13:$kafkaVersion:test") - implementation("org.apache.kafka:kafka-clients:$kafkaVersion:test") - implementation("org.apache.kafka:kafka-server-common:$kafkaVersion:test") + implementation(libsCatalog.findLibrary("kafkaTestCommonRuntime").get()) + implementation(libsCatalog.findLibrary("junitJupiterApi").get()) } jmh { - warmupIterations = providers.gradleProperty("jmh.warmupIterations").orNull?.toIntOrNull() ?: 3 - iterations = providers.gradleProperty("jmh.iterations").orNull?.toIntOrNull() ?: 5 - fork = providers.gradleProperty("jmh.fork").orNull?.toIntOrNull() ?: 1 - threads = providers.gradleProperty("jmh.threads").orNull?.toIntOrNull() ?: 1 - - providers - .gradleProperty("jmh.includes") - .orNull - ?.split(',') - ?.map(String::trim) - ?.filter(String::isNotEmpty) - ?.takeIf { it.isNotEmpty() } - ?.let { includes = it } + fun intProp(name: String, default: Int): Int { + return providers.gradleProperty(name).orNull?.toIntOrNull() ?: default + } + + fun stringProp(name: String, default: String): String { + return providers.gradleProperty(name).orNull?.trim()?.takeIf { it.isNotEmpty() } ?: default + } + + fun csvProp(name: String): List<String>?
{ + return providers.gradleProperty(name).orNull?.split(',')?.map(String::trim)?.filter(String::isNotEmpty) + ?.takeIf { it.isNotEmpty() } + } + + warmupIterations = intProp("jmh.warmupIterations", 3) + iterations = intProp("jmh.iterations", 5) + fork = intProp("jmh.fork", 1) + threads = intProp("jmh.threads", 1) + + csvProp("jmh.includes")?.let { includes = it } + csvProp("jmh.profilers")?.let { profilers = it } + val jmhResultFormat = stringProp("jmh.resultFormat", "TEXT") val jmhTmpDir = layout.buildDirectory.dir("tmp/jmh").get().asFile.absolutePath + val jmhResultFile = layout.buildDirectory.file("results/jmh/results.${jmhResultFormat.lowercase()}").get().asFile benchmarkMode = listOf("thrpt") timeUnit = "s" failOnError = true forceGC = true + resultFormat = jmhResultFormat + resultsFile = jmhResultFile jvmArgs = listOf("-Djava.io.tmpdir=$jmhTmpDir") } diff --git a/benchmarks/graphs/parallel_processing_gc_comparison.svg b/benchmarks/graphs/parallel_processing_gc_comparison.svg new file mode 100644 index 0000000..2d5354c --- /dev/null +++ b/benchmarks/graphs/parallel_processing_gc_comparison.svg @@ -0,0 +1,123 @@ [New SVG chart; markup omitted in this view. Recoverable text content: title "ParallelProcessingBenchmark: KPipe vs Confluent (Latest Baseline)"; subtitle "JMH Throughput + gc profiler (16 samples), TARGET_MESSAGES=10,000, partitions=8". Throughput (ops/s): Confluent 3235.415 +/- 14.876, KPipe 3306.732 +/- 3.368; readout: delta +2.20% (KPipe over Confluent), gap +71.317 ops/s. Cost profile: gc.alloc.rate (MB/sec) Confluent 0.729 vs KPipe 3.826; gc.alloc.rate.norm (B/op) Confluent 275.078 vs KPipe 1457.324. GC activity (lower is better): gc.count Confluent 80 vs KPipe 43; gc.time (ms) Confluent 128 vs KPipe 72. Summary: throughput, KPipe leads by +2.20% in this baseline; allocation, Confluent is lower on MB/sec and B/op; GC behavior, KPipe shows fewer events and less total GC time here.] diff --git a/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java b/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java index 9075cc1..87b1ca6 100644 --- a/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java +++ b/benchmarks/src/jmh/java/org/kpipe/benchmarks/ParallelProcessingBenchmarkInfrastructure.java @@ -9,8 +9,6 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import kafka.testkit.KafkaClusterTestKit; -import kafka.testkit.TestKitNodes; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -20,6 +18,8 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.test.KafkaClusterTestKit; +import org.apache.kafka.common.test.TestKitNodes; import org.kpipe.consumer.FunctionalConsumer; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Scope; @@ -50,7 +50,10 @@ public final class ParallelProcessingBenchmarkInfrastructure { static final String TOPIC = "benchmark-topic"; /// Number of records seeded and awaited per invocation. - static final int TARGET_MESSAGES = 1000; + static final int TARGET_MESSAGES = 10_000; + + /// Topic partitions used to expose parallel scheduler behavior. + static final int TOPIC_PARTITIONS = 8; /// Safety timeout for per-invocation message completion checks.
private static final long MAX_WAIT_NANOS = TimeUnit.SECONDS.toNanos(30); @@ -159,7 +162,7 @@ private static void createTopicIfMissing(final Properties clientProperties) { adminProps.putAll(clientProperties); try (final var admin = Admin.create(adminProps)) { - admin.createTopics(Collections.singletonList(new NewTopic(TOPIC, 1, (short) 1))).all().get(); + admin.createTopics(Collections.singletonList(new NewTopic(TOPIC, TOPIC_PARTITIONS, (short) 1))).all().get(); } catch (final Exception e) { throw new IllegalStateException("Unable to create benchmark topic", e); } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 24000d4..1b66edd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -13,6 +13,7 @@ parallelConsumer = "0.5.3.0" kafkaClients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka" } kafkaScala213 = { module = "org.apache.kafka:kafka_2.13", version.ref = "kafka" } kafkaServerCommon = { module = "org.apache.kafka:kafka-server-common", version.ref = "kafka" } +kafkaTestCommonRuntime = { module = "org.apache.kafka:kafka-test-common-runtime", version.ref = "kafka" } avro = { module = "org.apache.avro:avro", version.ref = "avro" } dslJson = { module = "com.dslplatform:dsl-json", version.ref = "dslJson" } @@ -33,4 +34,3 @@ testcontainersJunitJupiter = { module = "org.testcontainers:testcontainers-junit testcontainersKafka = { module = "org.testcontainers:testcontainers-kafka", version.ref = "testcontainers" } parallelConsumerCore = { module = "io.confluent.parallelconsumer:parallel-consumer-core", version.ref = "parallelConsumer" } - diff --git a/scripts/run-benchmarks.sh b/scripts/run-benchmarks.sh index f78a524..a98136f 100755 --- a/scripts/run-benchmarks.sh +++ b/scripts/run-benchmarks.sh @@ -8,34 +8,88 @@ ITERATIONS="${ITERATIONS:-8}" FORK="${FORK:-2}" THREADS="${THREADS:-1}" INCLUDES="${INCLUDES:-}" +PROFILE_MODE="${PROFILE_MODE:-none}" # none | gc | heap | threads | cpi 
+RESULT_FORMAT="${RESULT_FORMAT:-TEXT}" LOG_FILE="benchmarks_execution.log" +OS_NAME="$(uname -s)" +PROFILERS="" + +case "$PROFILE_MODE" in + none) + ;; + gc) + PROFILERS="gc" + ;; + heap) + # heap-oriented signal: allocation + GC counters + HotSpot GC internals + PROFILERS="gc,hs_gc" + ;; + threads) + # thread/runtime-oriented signal from HotSpot + PROFILERS="hs_thr,hs_rt" + ;; + cpi) + if [ "$OS_NAME" = "Linux" ]; then + # perfnorm reports normalized HW counters including CPI (cycles/instruction) + PROFILERS="perfnorm" + else + echo "WARN: PROFILE_MODE=cpi requires Linux perf events. Falling back to gc profiler on $OS_NAME." + PROFILERS="gc" + fi + ;; + *) + echo "ERROR: Unsupported PROFILE_MODE='$PROFILE_MODE'." + echo "Supported modes: none, gc, heap, threads, cpi" + exit 1 + ;; +esac + echo "Starting KPipe Benchmarks..." echo "Results will be saved to $LOG_FILE" -echo "Run config: warmup=$WARMUP iterations=$ITERATIONS fork=$FORK threads=$THREADS includes=${INCLUDES:-}" - -# Create stable temp directory for JMH -mkdir -p benchmarks/build/tmp/jmh && -cd .. 
+echo "Run config: warmup=$WARMUP iterations=$ITERATIONS fork=$FORK threads=$THREADS includes=${INCLUDES:-} profile=$PROFILE_MODE profilers=${PROFILERS:-} resultFormat=$RESULT_FORMAT" -# Clean and run all benchmarks +# Clean and run benchmarks from repository root GRADLE_CMD=(./gradlew :benchmarks:clean :benchmarks:jmh \ -Pjmh.warmupIterations="$WARMUP" \ -Pjmh.iterations="$ITERATIONS" \ -Pjmh.fork="$FORK" \ - -Pjmh.threads="$THREADS") + -Pjmh.threads="$THREADS" \ + -Pjmh.resultFormat="$RESULT_FORMAT") if [ -n "$INCLUDES" ]; then GRADLE_CMD+=("-Pjmh.includes=$INCLUDES") fi +if [ -n "$PROFILERS" ]; then + GRADLE_CMD+=("-Pjmh.profilers=$PROFILERS") +fi + "${GRADLE_CMD[@]}" 2>&1 | tee "$LOG_FILE" echo "--------------------------------------------------" echo "Benchmark Summary:" -if [ -f "benchmarks/build/results/jmh/results.txt" ]; then - cat benchmarks/build/results/jmh/results.txt + +RESULT_EXT="$(echo "$RESULT_FORMAT" | tr '[:upper:]' '[:lower:]')" +CANDIDATES=( + "benchmarks/build/results/jmh/results.${RESULT_EXT}" + "benchmarks/build/results/jmh/results.text" + "benchmarks/build/results/jmh/results.txt" +) + +SUMMARY_FILE="" +for file in "${CANDIDATES[@]}"; do + if [ -f "$file" ]; then + SUMMARY_FILE="$file" + break + fi +done + +if [ -n "$SUMMARY_FILE" ]; then + cat "$SUMMARY_FILE" else - echo "Results file not found. Check $LOG_FILE for details." + echo "Results file not found. Checked: ${CANDIDATES[*]}. Check $LOG_FILE for details." fi + +echo "Profiler outputs (if enabled) are emitted next to JMH results under benchmarks/build/results/jmh/." echo "--------------------------------------------------"
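For reference, the results-file naming rule that `benchmarks/build.gradle.kts` applies (and that the script's candidate list mirrors) can be sketched as follows; `JmhResultsPath` and `resultsFileName` are hypothetical names used only for illustration:

```java
import java.util.Locale;

// Hypothetical helper mirroring the naming rule in benchmarks/build.gradle.kts:
// results file = benchmarks/build/results/jmh/results.<resultFormat lowercased>
public class JmhResultsPath {

    static String resultsFileName(String resultFormat) {
        return "results." + resultFormat.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(resultsFileName("TEXT")); // results.text
        System.out.println(resultsFileName("JSON")); // results.json
        System.out.println(resultsFileName("CSV"));  // results.csv
    }
}
```

This is why the script also probes the legacy `results.txt` name as a fallback: only the lowercased format string is guaranteed by the build configuration.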