diff --git a/.gitignore b/.gitignore index 05b37627bd..15cac247ed 100644 --- a/.gitignore +++ b/.gitignore @@ -17,8 +17,10 @@ filtered_rat.txt dev/dist apache-rat-*.jar venv +.venv dev/release/comet-rm/workdir spark/benchmarks .DS_Store comet-event-trace.json __pycache__ +output diff --git a/benchmarks/tpc/README.md b/benchmarks/tpc/README.md index 779ad1753a..5a9188eda2 100644 --- a/benchmarks/tpc/README.md +++ b/benchmarks/tpc/README.md @@ -26,6 +26,20 @@ For full instructions on running these benchmarks on an EC2 instance, see the [C [Comet Benchmarking on EC2 Guide]: https://datafusion.apache.org/comet/contributor-guide/benchmarking_aws_ec2.html +## Setup + +TPC queries are bundled in `benchmarks/tpc/queries/` (derived from TPC-H/DS under the TPC Fair Use Policy). + +Create a virtual environment and install dependencies: + +```shell +cd benchmarks/tpc +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + + ## Usage All benchmarks are run via `run.py`: @@ -41,8 +55,10 @@ python3 run.py --engine --benchmark [options] | `--iterations` | Number of iterations (default: 1) | | `--output` | Output directory (default: `.`) | | `--query` | Run a single query number | -| `--no-restart` | Skip Spark master/worker restart | -| `--dry-run` | Print the spark-submit command without executing | +| `--no-restart` | Skip Spark master/worker restart | +| `--dry-run` | Print the spark-submit command without executing | +| `--profile` | Enable executor metrics profiling via Spark REST API | +| `--profile-interval` | Profiling poll interval in seconds (default: 2.0) | Available engines: `spark`, `comet`, `comet-iceberg`, `gluten` @@ -55,10 +71,9 @@ export SPARK_HOME=/opt/spark-3.5.3-bin-hadoop3/ export SPARK_MASTER=spark://yourhostname:7077 ``` -Set path to queries and data: +Set path to data (TPC queries are bundled in `benchmarks/tpc/queries/`): ```shell -export TPCH_QUERIES=/mnt/bigdata/tpch/queries/ export TPCH_DATA=/mnt/bigdata/tpch/sf100/ 
``` @@ -100,6 +115,21 @@ Generating charts: python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json ``` +## Profiling + +Use `--profile` to collect executor memory metrics from the Spark REST API during the benchmark run. +A background thread polls `/api/v1/applications/{appId}/executors` at a configurable interval and +writes the time-series data to a CSV file alongside the benchmark results. + +```shell +python3 run.py --engine comet --benchmark tpch --profile +python3 run.py --engine comet --benchmark tpch --profile --profile-interval 1.0 +``` + +The output CSV is written to `{output}/{name}-{benchmark}-metrics.csv` and contains per-executor +columns including `memoryUsed`, `maxMemory`, heap/off-heap storage metrics, and peak memory metrics. + + ## Engine Configuration Each engine is defined by a TOML file in `engines/`. The config specifies JARs, Spark conf overrides, @@ -135,9 +165,9 @@ $SPARK_HOME/bin/spark-submit \ --master $SPARK_MASTER \ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 \ --conf spark.driver.memory=8G \ - --conf spark.executor.instances=1 \ + --conf spark.executor.instances=2 \ --conf spark.executor.cores=8 \ - --conf spark.cores.max=8 \ + --conf spark.cores.max=16 \ --conf spark.executor.memory=16g \ create-iceberg-tables.py \ --benchmark tpch \ @@ -166,7 +196,6 @@ export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64 export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12-1.8.1.jar export ICEBERG_WAREHOUSE=/mnt/bigdata/iceberg-warehouse -export TPCH_QUERIES=/mnt/bigdata/tpch/queries/ sudo ./drop-caches.sh python3 run.py --engine comet-iceberg --benchmark tpch ``` @@ -185,6 +214,164 @@ physical plan output. 
| `--catalog` | No | `local` | Iceberg catalog name | | `--database` | No | benchmark name | Database name for the tables | +## Running with Docker + +A Docker Compose setup is provided in `infra/docker/` for running benchmarks in an isolated +Spark standalone cluster. The Docker image supports both **Linux (amd64)** and **macOS (arm64)** +via architecture-agnostic Java symlinks created at build time. + +### Build the image + +The image must be built for the correct platform to match the native libraries in the +engine JARs (e.g. Comet bundles `libcomet.so` for a specific OS/arch). + +**Linux (amd64):** + +```shell +docker build -t comet-bench -f benchmarks/tpc/infra/docker/Dockerfile . +``` + +**macOS (Apple Silicon):** Engine JARs contain platform-specific native libraries. A +Comet JAR built on macOS includes `darwin/aarch64` libraries which won't work inside +Linux containers. You need a JAR with Linux native libraries. + +Build a Comet JAR with Linux native libraries using the provided build Dockerfile: + +```shell +mkdir -p output +docker build -t comet-builder \ + -f benchmarks/tpc/infra/docker/Dockerfile.build-comet . +docker run --rm -v $(pwd)/output:/output comet-builder +export COMET_JAR=$(pwd)/output/comet-spark-spark3.5_2.12-*.jar +``` + +Then build the benchmark image (the architecture is auto-detected): + +```shell +docker build -t comet-bench -f benchmarks/tpc/infra/docker/Dockerfile . +``` + +### Platform notes + +**macOS (Apple Silicon):** Docker Desktop is required. + +- **Memory:** Docker Desktop defaults to a small memory allocation (often 8 GB) which + is not enough for Spark benchmarks. Go to **Docker Desktop > Settings > Resources > + Memory** and increase it to at least 48 GB (each worker requests 16 GB for its executor + plus overhead, and the driver needs 8 GB). Without enough memory, executors will be + OOM-killed (exit code 137). +- **File Sharing:** You may need to add your data directory (e.g. 
`/opt`) to + **Docker Desktop > Settings > Resources > File Sharing** before mounting host volumes. + +**Linux (amd64):** No special configuration is needed. Docker uses cgroup memory limits +directly without a VM layer. + +The Docker image auto-detects the container architecture (amd64/arm64) and sets up +arch-agnostic Java symlinks. The compose file uses `BENCH_JAVA_HOME` (not `JAVA_HOME`) +to avoid inheriting the host's Java path into the container. + +### Start the cluster + +Set environment variables pointing to your host paths, then start the Spark master and +two workers: + +```shell +export DATA_DIR=/mnt/bigdata/tpch/sf100 +export RESULTS_DIR=/tmp/bench-results +export COMET_JAR=/opt/comet/comet-spark-spark3.5_2.12-0.10.0.jar + +docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml up -d +``` + +Set `COMET_JAR`, `GLUTEN_JAR`, or `ICEBERG_JAR` to the host path of the engine JAR you +want to use. Each JAR is mounted individually into the container, so you can easily switch +between versions by changing the path and restarting. + +### Run benchmarks + +Use `docker compose run --rm` to execute benchmarks. The `--rm` flag removes the +container when it exits, preventing port conflicts on subsequent runs. Pass +`--no-restart` since the cluster is already managed by Compose, and `--output /results` +so that output files land in the mounted results directory (alongside cgroup metrics +CSVs): + +```shell +docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml \ + run --rm -p 4040:4040 bench \ + sh -c 'mkdir -p /tmp/spark-events && \ + python3 /opt/benchmarks/run.py \ + --engine comet --benchmark tpch --output /results --no-restart' +``` + +> **Note:** The `mkdir -p /tmp/spark-events` is needed because the common Spark +> config enables event logging. The bench container is ephemeral so this directory +> does not persist between runs. + +The `-p 4040:4040` flag exposes the Spark Application UI on the host. 
The following +UIs are available during a benchmark run: + +| UI | URL | +| -------------------- | ---------------------- | +| Spark Master | http://localhost:8080 | +| Worker 1 | http://localhost:8081 | +| Worker 2 | http://localhost:8082 | +| Spark Application | http://localhost:4040 | + +> **Note:** The Master UI links to the Application UI using the container's internal +> hostname, which is not reachable from the host. Use `http://localhost:4040` directly +> to access the Application UI. + +For Gluten (requires Java 8), you must restart the **entire cluster** with `BENCH_JAVA_HOME` +set so that all services (master, workers, and bench) use Java 8: + +```shell +export BENCH_JAVA_HOME=/usr/lib/jvm/java-8-openjdk +docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml down +docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml up -d + +docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml \ + run --rm bench sh -c 'mkdir -p /tmp/spark-events && \ + python3 /opt/benchmarks/run.py \ + --engine gluten --benchmark tpch --output /results --no-restart' +``` + +> **Important:** Only passing `-e JAVA_HOME=...` to the `bench` container is not +> sufficient -- the workers also need Java 8 or Gluten will fail at runtime with +> `sun.misc.Unsafe` errors. Unset `BENCH_JAVA_HOME` (or switch it back to Java 17) +> and restart the cluster before running Comet or Spark benchmarks. + +### Memory limits and metrics + +Hard memory limits are enforced on all worker and bench containers. A metrics collector +runs inside each worker container to collect that worker's cgroup metrics. Configure via environment +variables: `WORKER_MEM_LIMIT` (default: 32g per worker), `BENCH_MEM_LIMIT` (default: 10g), +`METRICS_INTERVAL` (default: 1 second). + +Raw cgroup metrics are continuously written to +`$RESULTS_DIR/container-metrics-spark-worker-{1,2}.csv`. These files are overwritten each +time the cluster restarts.
+ +When `--profile` is used, the profiler automatically snapshots the cgroup data for the +benchmark time window, producing per-engine files: + +- `{name}-{benchmark}-metrics.csv` -- JVM executor metrics +- `{name}-{benchmark}-container-metrics-spark-worker-1.csv` -- cgroup snapshot for worker 1 +- `{name}-{benchmark}-container-metrics-spark-worker-2.csv` -- cgroup snapshot for worker 2 + +This ensures each engine run has its own paired JVM + cgroup dataset even when multiple +engines are benchmarked on the same cluster. + +Use `visualize-metrics.py` to generate memory charts from these files: + +```shell +python3 visualize-metrics.py \ + --jvm-metrics /tmp/bench-results/comet-tpch-metrics.csv \ + --cgroup-metrics /tmp/bench-results/comet-tpch-container-metrics-spark-worker-1.csv \ + /tmp/bench-results/comet-tpch-container-metrics-spark-worker-2.csv \ + --output-dir /tmp/comet-charts --title "Comet TPC-H" +``` + + ### Comparing Parquet vs Iceberg performance Run both benchmarks and compare: diff --git a/benchmarks/tpc/infra/docker/Dockerfile b/benchmarks/tpc/infra/docker/Dockerfile new file mode 100644 index 0000000000..202ec66de6 --- /dev/null +++ b/benchmarks/tpc/infra/docker/Dockerfile @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# Benchmark image for running TPC-H and TPC-DS benchmarks across engines +# (Spark, Comet, Gluten). +# +# Build (from repository root): +# docker build -t comet-bench -f benchmarks/tpc/infra/docker/Dockerfile . + +ARG SPARK_IMAGE=apache/spark:3.5.2-python3 +FROM ${SPARK_IMAGE} + +USER root + +# Install Java 8 (Gluten) and Java 17 (Comet) plus Python 3. +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + openjdk-8-jdk-headless \ + openjdk-17-jdk-headless \ + python3 python3-pip procps \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Default to Java 17 (override with JAVA_HOME at runtime for Gluten). +# Detect architecture (amd64 or arm64) so the image works on both Linux and macOS. +ARG TARGETARCH +RUN ln -s /usr/lib/jvm/java-17-openjdk-${TARGETARCH} /usr/lib/jvm/java-17-openjdk && \ + ln -s /usr/lib/jvm/java-8-openjdk-${TARGETARCH} /usr/lib/jvm/java-8-openjdk +ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk + +# Copy the benchmark scripts into the image. +COPY benchmarks/tpc/run.py /opt/benchmarks/run.py +COPY benchmarks/tpc/tpcbench.py /opt/benchmarks/tpcbench.py +COPY benchmarks/tpc/profiling.py /opt/benchmarks/profiling.py +COPY benchmarks/tpc/engines /opt/benchmarks/engines +COPY benchmarks/tpc/queries /opt/benchmarks/queries +COPY benchmarks/tpc/create-iceberg-tables.py /opt/benchmarks/create-iceberg-tables.py +COPY benchmarks/tpc/generate-comparison.py /opt/benchmarks/generate-comparison.py +COPY benchmarks/tpc/visualize-metrics.py /opt/benchmarks/visualize-metrics.py + +# Copy the metrics collector script. +COPY benchmarks/tpc/infra/docker/collect-metrics.sh /opt/benchmarks/collect-metrics.sh +RUN chmod +x /opt/benchmarks/collect-metrics.sh + +# Engine JARs are bind-mounted or copied in at runtime via --jars. +# Data and query paths are also bind-mounted. 
+ +WORKDIR /opt/benchmarks + +# Defined in the base apache/spark image. +ARG spark_uid +USER ${spark_uid} diff --git a/benchmarks/tpc/infra/docker/Dockerfile.build-comet b/benchmarks/tpc/infra/docker/Dockerfile.build-comet new file mode 100644 index 0000000000..7c0c5e4cd2 --- /dev/null +++ b/benchmarks/tpc/infra/docker/Dockerfile.build-comet @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Build a Comet JAR with native libraries for the current platform. +# +# This is useful on macOS (Apple Silicon) where the host-built JAR contains +# darwin/aarch64 native libraries but Docker containers need linux/aarch64. +# +# Usage (from repository root): +# docker build -t comet-builder -f benchmarks/tpc/infra/docker/Dockerfile.build-comet . +# docker run --rm -v $(pwd)/output:/output comet-builder +# +# The JAR is copied to ./output/ on the host. + +# Use Ubuntu 20.04 to match the GLIBC version (2.31) in apache/spark images. +FROM ubuntu:20.04 AS builder + +ARG TARGETARCH +ENV DEBIAN_FRONTEND=noninteractive + +# Install build dependencies: Java 17, Maven wrapper prerequisites, GCC 11. +# Ubuntu 20.04's default GCC 9 has a memcmp bug (GCC #95189) that breaks aws-lc-sys. 
+RUN apt-get update && apt-get install -y --no-install-recommends \ + openjdk-17-jdk-headless \ + curl ca-certificates git pkg-config \ + libssl-dev unzip software-properties-common \ + && add-apt-repository -y ppa:ubuntu-toolchain-r/test \ + && apt-get update \ + && apt-get install -y --no-install-recommends gcc-11 g++-11 make \ + && update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-11 110 \ + && update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-11 110 \ + && update-alternatives --install /usr/bin/cc cc /usr/bin/gcc-11 110 \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +# Install protoc 25.x (Ubuntu 20.04's protoc is too old for proto3 optional fields). +ARG PROTOC_VERSION=25.6 +RUN ARCH=$(uname -m) && \ + if [ "$ARCH" = "aarch64" ]; then PROTOC_ARCH="linux-aarch_64"; \ + else PROTOC_ARCH="linux-x86_64"; fi && \ + curl -sLO "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-${PROTOC_ARCH}.zip" && \ + unzip -o "protoc-${PROTOC_VERSION}-${PROTOC_ARCH}.zip" -d /usr/local bin/protoc && \ + rm "protoc-${PROTOC_VERSION}-${PROTOC_ARCH}.zip" && \ + protoc --version + +# Set JAVA_HOME and LD_LIBRARY_PATH so the Rust build can find libjvm. +RUN ln -s /usr/lib/jvm/java-17-openjdk-${TARGETARCH} /usr/lib/jvm/java-17-openjdk +ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk +ENV LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${LD_LIBRARY_PATH} + +# Install Rust. +RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y +ENV PATH="/root/.cargo/bin:${PATH}" + +WORKDIR /build + +# Copy the full source tree. +COPY . . + +# Build native code + package the JAR (skip tests). +RUN make release-nogit + +# The entrypoint copies the built JAR to /output (bind-mounted from host).
+RUN mkdir -p /output +CMD ["sh", "-c", "cp spark/target/comet-spark-spark3.5_2.12-*-SNAPSHOT.jar /output/ && echo 'Comet JAR copied to /output/' && ls -lh /output/*.jar"] diff --git a/benchmarks/tpc/infra/docker/collect-metrics.sh b/benchmarks/tpc/infra/docker/collect-metrics.sh new file mode 100755 index 0000000000..fd9c1d848f --- /dev/null +++ b/benchmarks/tpc/infra/docker/collect-metrics.sh @@ -0,0 +1,103 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Container-level memory metrics collector. +# +# Polls cgroup memory stats at a fixed interval and writes a CSV with +# columns: timestamp_ms, memory_usage_bytes, memory_limit_bytes, rss_bytes, +# cache_bytes, swap_bytes. +# +# Works with both cgroup v1 and v2.
+# +# Usage: +# collect-metrics.sh [INTERVAL_SECS] [OUTPUT_CSV] +# +# Defaults: interval=1, output=/results/container-metrics.csv + +set -e + +INTERVAL="${1:-1}" +OUTPUT="${2:-/results/container-metrics.csv}" + +# Detect cgroup version +if [ -f /sys/fs/cgroup/memory/memory.usage_in_bytes ]; then + CGROUP_VERSION=1 +elif [ -f /sys/fs/cgroup/memory.current ]; then + CGROUP_VERSION=2 +else + echo "Warning: cannot detect cgroup memory files; polling disabled" >&2 + # Still write a header so downstream tools don't break on a missing file. + echo "timestamp_ms,memory_usage_bytes,memory_limit_bytes,rss_bytes,cache_bytes,swap_bytes" > "$OUTPUT" + # Sleep forever so the container stays up (compose expects it to keep running). + exec sleep infinity +fi + +# ---- helpers ---- + +read_file() { + # Return the contents of a file, or "0" if it doesn't exist. + if [ -f "$1" ]; then cat "$1"; else echo "0"; fi +} + +read_stat() { + # Extract a named field from memory.stat (cgroup v1 format: "key value"). 
+ grep "^$1 " "$2" 2>/dev/null | awk '{print $2}' || echo "0" +} + +poll_v1() { + local usage limit rss cache swap + usage=$(read_file /sys/fs/cgroup/memory/memory.usage_in_bytes) + limit=$(read_file /sys/fs/cgroup/memory/memory.limit_in_bytes) + local stat=/sys/fs/cgroup/memory/memory.stat + rss=$(read_stat total_rss "$stat") + cache=$(read_stat total_cache "$stat") + swap=$(read_file /sys/fs/cgroup/memory/memory.memsw.usage_in_bytes) + # swap file reports memory+swap; subtract memory to get swap only + if [ "$swap" != "0" ]; then + swap=$((swap - usage)) + [ "$swap" -lt 0 ] && swap=0 + fi + echo "$usage,$limit,$rss,$cache,$swap" +} + +poll_v2() { + local usage limit rss cache swap + usage=$(read_file /sys/fs/cgroup/memory.current) + limit=$(read_file /sys/fs/cgroup/memory.max) + [ "$limit" = "max" ] && limit=0 + local stat=/sys/fs/cgroup/memory.stat + rss=$(read_stat anon "$stat") + cache=$(read_stat file "$stat") + swap=$(read_file /sys/fs/cgroup/memory.swap.current) + echo "$usage,$limit,$rss,$cache,$swap" +} + +# ---- main loop ---- + +echo "timestamp_ms,memory_usage_bytes,memory_limit_bytes,rss_bytes,cache_bytes,swap_bytes" > "$OUTPUT" +echo "Collecting container memory metrics every ${INTERVAL}s -> ${OUTPUT} (cgroup v${CGROUP_VERSION})" >&2 + +while true; do + ts=$(date +%s%3N 2>/dev/null || python3 -c 'import time; print(int(time.time()*1000))') + if [ "$CGROUP_VERSION" = "1" ]; then + vals=$(poll_v1) + else + vals=$(poll_v2) + fi + echo "${ts},${vals}" >> "$OUTPUT" + sleep "$INTERVAL" +done diff --git a/benchmarks/tpc/infra/docker/docker-compose.yml b/benchmarks/tpc/infra/docker/docker-compose.yml new file mode 100644 index 0000000000..a8a8d807c1 --- /dev/null +++ b/benchmarks/tpc/infra/docker/docker-compose.yml @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Spark standalone cluster for TPC benchmarks. +# +# Two workers are used so that shuffles go through the network stack, +# which better reflects real cluster behavior. +# +# Usage: +# export COMET_JAR=/path/to/comet-spark-0.10.0.jar +# docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml up -d +# +# Environment variables (set in .env or export before running): +# BENCH_IMAGE - Docker image to use (default: comet-bench) +# DATA_DIR - Host path to TPC data (default: /tmp/tpc-data) +# RESULTS_DIR - Host path for results output (default: /tmp/bench-results) +# COMET_JAR - Host path to Comet JAR +# GLUTEN_JAR - Host path to Gluten JAR +# ICEBERG_JAR - Host path to Iceberg Spark runtime JAR +# WORKER_MEM_LIMIT - Hard memory limit per worker container (default: 32g) +# BENCH_MEM_LIMIT - Hard memory limit for the bench runner (default: 10g) +# METRICS_INTERVAL - Metrics collection interval in seconds (default: 1) +# BENCH_JAVA_HOME - Java home inside container (default: /usr/lib/jvm/java-17-openjdk) +# Set to /usr/lib/jvm/java-8-openjdk for Gluten + +x-volumes: &volumes + - ${DATA_DIR:-/tmp/tpc-data}:/data:ro + - ${RESULTS_DIR:-/tmp/bench-results}:/results + - ${COMET_JAR:-/dev/null}:/jars/comet.jar:ro + - ${GLUTEN_JAR:-/dev/null}:/jars/gluten.jar:ro + - ${ICEBERG_JAR:-/dev/null}:/jars/iceberg.jar:ro + - ${RESULTS_DIR:-/tmp/bench-results}/logs:/opt/spark/logs + - 
${RESULTS_DIR:-/tmp/bench-results}/work:/opt/spark/work + +x-worker: &worker + image: ${BENCH_IMAGE:-comet-bench} + depends_on: + - spark-master + # The metrics collector runs inside the worker container so that it + # reads the worker's own cgroup memory stats (a separate sidecar + # container would only see its own cgroup, not the worker's). + command: + - sh + - -c + - >- + /opt/benchmarks/collect-metrics.sh ${METRICS_INTERVAL:-1} + /results/container-metrics-$${HOSTNAME}.csv & + exec /opt/spark/sbin/start-worker.sh spark://spark-master:7077 + volumes: *volumes + environment: + - JAVA_HOME=${BENCH_JAVA_HOME:-/usr/lib/jvm/java-17-openjdk} + - SPARK_WORKER_CORES=${WORKER_CORES:-8} + - SPARK_WORKER_MEMORY=${WORKER_MEMORY:-16g} + - SPARK_NO_DAEMONIZE=true + mem_limit: ${WORKER_MEM_LIMIT:-32g} + memswap_limit: ${WORKER_MEM_LIMIT:-32g} + +services: + spark-master: + image: ${BENCH_IMAGE:-comet-bench} + container_name: spark-master + hostname: spark-master + command: /opt/spark/sbin/start-master.sh --host spark-master + ports: + - "7077:7077" + - "8080:8080" + volumes: *volumes + environment: + - JAVA_HOME=${BENCH_JAVA_HOME:-/usr/lib/jvm/java-17-openjdk} + - SPARK_MASTER_HOST=spark-master + - SPARK_NO_DAEMONIZE=true + + spark-worker-1: + <<: *worker + container_name: spark-worker-1 + hostname: spark-worker-1 + ports: + - "8081:8081" + + spark-worker-2: + <<: *worker + container_name: spark-worker-2 + hostname: spark-worker-2 + ports: + - "8082:8081" + + bench: + image: ${BENCH_IMAGE:-comet-bench} + container_name: bench-runner + depends_on: + - spark-master + - spark-worker-1 + - spark-worker-2 + # Override 'command' to run a specific benchmark, e.g.: + # docker compose run bench python3 /opt/benchmarks/run.py \ + # --engine comet --benchmark tpch --no-restart + command: ["echo", "Use 'docker compose run bench python3 /opt/benchmarks/run.py ...' 
to run benchmarks"] + volumes: *volumes + environment: + - JAVA_HOME=${BENCH_JAVA_HOME:-/usr/lib/jvm/java-17-openjdk} + - SPARK_HOME=/opt/spark + - SPARK_MASTER=spark://spark-master:7077 + - COMET_JAR=/jars/comet.jar + - GLUTEN_JAR=/jars/gluten.jar + - ICEBERG_JAR=/jars/iceberg.jar + - TPCH_DATA=/data + - TPCDS_DATA=/data + mem_limit: ${BENCH_MEM_LIMIT:-10g} + memswap_limit: ${BENCH_MEM_LIMIT:-10g} + diff --git a/benchmarks/tpc/profiling.py b/benchmarks/tpc/profiling.py new file mode 100644 index 0000000000..23a8a7f34e --- /dev/null +++ b/benchmarks/tpc/profiling.py @@ -0,0 +1,246 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Level 1 profiling hooks: JVM metrics via the Spark REST API. + +Polls ``/api/v1/applications/{appId}/executors`` at a configurable interval +and records executor memory metrics as a time-series CSV alongside the +benchmark results. + +Usage:: + + profiler = SparkMetricsProfiler(spark, interval_secs=2) + profiler.start() + # ... run benchmark ... 
+ profiler.stop() + profiler.write_csv("/path/to/output/metrics.csv") +""" + +import csv +import glob +import os +import threading +import time +from typing import Any, Dict, List, Optional + +from pyspark.sql import SparkSession + +try: + from urllib.request import urlopen + import json as _json + + def _fetch_json(url: str) -> Any: + with urlopen(url, timeout=5) as resp: + return _json.loads(resp.read().decode()) +except ImportError: + _fetch_json = None # type: ignore[assignment] + + +# Metrics we extract per executor from the REST API response +_EXECUTOR_METRICS = [ + "memoryUsed", + "maxMemory", + "totalOnHeapStorageMemory", + "usedOnHeapStorageMemory", + "totalOffHeapStorageMemory", + "usedOffHeapStorageMemory", +] + +# Metrics nested under peakMemoryMetrics (if available) +_PEAK_MEMORY_METRICS = [ + "JVMHeapMemory", + "JVMOffHeapMemory", + "OnHeapExecutionMemory", + "OffHeapExecutionMemory", + "OnHeapStorageMemory", + "OffHeapStorageMemory", + "OnHeapUnifiedMemory", + "OffHeapUnifiedMemory", + "ProcessTreeJVMRSSMemory", +] + + +class SparkMetricsProfiler: + """Periodically polls executor metrics from the Spark REST API.""" + + def __init__( + self, + spark: SparkSession, + interval_secs: float = 2.0, + ): + self._spark = spark + self._interval = interval_secs + self._samples: List[Dict[str, Any]] = [] + self._stop_event = threading.Event() + self._thread: Optional[threading.Thread] = None + self._start_time: float = 0.0 + self._stop_time: float = 0.0 + + @property + def samples(self) -> List[Dict[str, Any]]: + """Return collected samples (each is a flat dict).""" + return list(self._samples) + + def _ui_url(self) -> Optional[str]: + """Return the Spark UI base URL, or None if unavailable.""" + url = self._spark.sparkContext.uiWebUrl + if url: + return url.rstrip("/") + return None + + def _app_id(self) -> str: + return self._spark.sparkContext.applicationId + + def _poll_once(self) -> None: + """Fetch executor metrics and append a timestamped sample.""" + 
base = self._ui_url() + if base is None or _fetch_json is None: + return + + url = f"{base}/api/v1/applications/{self._app_id()}/executors" + try: + executors = _fetch_json(url) + except Exception: + return + + now = time.time() + elapsed = now - self._start_time + timestamp_ms = int(now * 1000) + for exc in executors: + row: Dict[str, Any] = { + "timestamp_ms": timestamp_ms, + "elapsed_secs": round(elapsed, 2), + "executor_id": exc.get("id", ""), + "is_active": exc.get("isActive", True), + } + for key in _EXECUTOR_METRICS: + row[key] = exc.get(key, 0) + + peak = exc.get("peakMemoryMetrics", {}) + for key in _PEAK_MEMORY_METRICS: + row[f"peak_{key}"] = peak.get(key, 0) + + self._samples.append(row) + + def _run(self) -> None: + """Background polling loop.""" + while not self._stop_event.is_set(): + self._poll_once() + self._stop_event.wait(self._interval) + + def start(self) -> None: + """Start background polling thread.""" + if self._thread is not None: + return + self._start_time = time.time() + self._stop_event.clear() + self._thread = threading.Thread( + target=self._run, name="spark-metrics-profiler", daemon=True + ) + self._thread.start() + print( + f"Profiler started (interval={self._interval}s, " + f"ui={self._ui_url()})" + ) + + def stop(self) -> None: + """Stop the polling thread and collect a final sample.""" + if self._thread is None: + return + self._stop_event.set() + self._thread.join(timeout=self._interval + 2) + self._thread = None + # One last poll to capture final state + self._poll_once() + self._stop_time = time.time() + print(f"Profiler stopped ({len(self._samples)} samples collected)") + + def write_csv(self, path: str) -> str: + """Write collected samples to a CSV file. 
Returns the path.""" + if not self._samples: + print("Profiler: no samples to write") + return path + + fieldnames = list(self._samples[0].keys()) + with open(path, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for row in self._samples: + writer.writerow(row) + print(f"Profiler: wrote {len(self._samples)} samples to {path}") + return path + + def snapshot_cgroup_metrics( + self, output_dir: str, name: str, benchmark: str + ) -> List[str]: + """Filter cgroup CSVs to the profiling time window and write snapshots. + + Looks for ``container-metrics-*.csv`` in *output_dir*, keeps only + rows whose ``timestamp_ms`` falls within the profiler's start/stop + window, and writes each filtered file as + ``{name}-{benchmark}-container-metrics-{label}.csv``. + + Returns the list of snapshot file paths written. + """ + start_ms = int(self._start_time * 1000) + stop_ms = int(self._stop_time * 1000) if self._stop_time else int( + time.time() * 1000 + ) + + source_files = sorted( + glob.glob(os.path.join(output_dir, "container-metrics-*.csv")) + ) + if not source_files: + print("Profiler: no container-metrics CSVs found to snapshot") + return [] + + written: List[str] = [] + for src in source_files: + # Extract label, e.g. 
"spark-worker-1" from + # "container-metrics-spark-worker-1.csv" + basename = os.path.basename(src) + label = basename.replace("container-metrics-", "").replace( + ".csv", "" + ) + dest = os.path.join( + output_dir, + f"{name}-{benchmark}-container-metrics-{label}.csv", + ) + + with open(src, "r", newline="") as fin: + reader = csv.DictReader(fin) + fieldnames = reader.fieldnames + if not fieldnames: + continue + rows = [ + row + for row in reader + if start_ms <= int(row["timestamp_ms"]) <= stop_ms + ] + + with open(dest, "w", newline="") as fout: + writer = csv.DictWriter(fout, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + + print( + f"Profiler: snapshot {len(rows)} cgroup rows -> {dest}" + ) + written.append(dest) + + return written diff --git a/benchmarks/tpc/requirements.txt b/benchmarks/tpc/requirements.txt new file mode 100644 index 0000000000..48f63d718f --- /dev/null +++ b/benchmarks/tpc/requirements.txt @@ -0,0 +1 @@ +pyspark==3.5.2 diff --git a/benchmarks/tpc/run.py b/benchmarks/tpc/run.py index 223a7d08ee..f562039324 100755 --- a/benchmarks/tpc/run.py +++ b/benchmarks/tpc/run.py @@ -120,9 +120,9 @@ def load_toml(path): BENCHMARK_PROFILES = { "tpch": { - "executor_instances": "1", + "executor_instances": "2", "executor_cores": "8", - "max_cores": "8", + "max_cores": "16", "data_env": "TPCH_DATA", "format": "parquet", }, @@ -293,6 +293,11 @@ def build_spark_submit_cmd(config, benchmark, args): if profile["format"] and not use_iceberg: cmd += ["--format", profile["format"]] + if args.profile: + cmd += ["--profile"] + cmd += ["--profile-interval", str(args.profile_interval)] + + return cmd @@ -360,6 +365,17 @@ def main(): action="store_true", help="Print the spark-submit command without executing", ) + parser.add_argument( + "--profile", + action="store_true", + help="Enable executor metrics profiling via Spark REST API", + ) + parser.add_argument( + "--profile-interval", + type=float, + default=2.0, + help="Profiling poll 
interval in seconds (default: 2.0)", + ) args = parser.parse_args() config = load_engine_config(args.engine) diff --git a/benchmarks/tpc/tpcbench.py b/benchmarks/tpc/tpcbench.py index 400ccd175a..1ff54ed3c7 100644 --- a/benchmarks/tpc/tpcbench.py +++ b/benchmarks/tpc/tpcbench.py @@ -26,6 +26,7 @@ import argparse from datetime import datetime import json +import os from pyspark.sql import SparkSession import time from typing import Dict @@ -50,22 +51,33 @@ def main( data_path: str, catalog: str, database: str, - query_path: str, iterations: int, output: str, name: str, format: str, query_num: int = None, write_path: str = None, - options: Dict[str, str] = None + options: Dict[str, str] = None, + profile: bool = False, + profile_interval: float = 2.0, ): if options is None: options = {} + query_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "queries", benchmark + ) + spark = SparkSession.builder \ .appName(f"{name} benchmark derived from {benchmark}") \ .getOrCreate() + profiler = None + if profile: + from profiling import SparkMetricsProfiler + profiler = SparkMetricsProfiler(spark, interval_secs=profile_interval) + profiler.start() + # Define tables for each benchmark if benchmark == "tpch": num_queries = 22 @@ -94,7 +106,10 @@ def main( print(f"Registering table {table} from {source}") df = spark.table(source) else: + # Support both "customer/" and "customer.parquet/" layouts source = f"{data_path}/{table}.{format}" + if not os.path.exists(source): + source = f"{data_path}/{table}" print(f"Registering table {table} from {source}") df = spark.read.format(format).options(**options).load(source) df.createOrReplaceTempView(table) @@ -104,7 +119,6 @@ def main( results = { 'engine': 'datafusion-comet', 'benchmark': benchmark, - 'query_path': query_path, 'spark_conf': conf_dict, } if using_iceberg: @@ -176,6 +190,12 @@ def main( with open(results_path, "w") as f: f.write(result_str) + if profiler is not None: + profiler.stop() + metrics_path = 
f"{output}/{name}-{benchmark}-metrics.csv" + profiler.write_csv(metrics_path) + profiler.snapshot_cgroup_metrics(output, name, benchmark) + spark.stop() @@ -215,10 +235,6 @@ def main( help="Database containing TPC tables (only used with --catalog)" ) - parser.add_argument( - "--queries", required=True, - help="Path to query SQL files" - ) parser.add_argument( "--iterations", type=int, default=1, help="Number of iterations" @@ -239,6 +255,14 @@ def main( "--write", help="Path to save query results as Parquet" ) + parser.add_argument( + "--profile", action="store_true", + help="Enable executor metrics profiling via Spark REST API" + ) + parser.add_argument( + "--profile-interval", type=float, default=2.0, + help="Profiling poll interval in seconds (default: 2.0)" + ) args = parser.parse_args() main( @@ -246,12 +270,13 @@ def main( args.data, args.catalog, args.database, - args.queries, args.iterations, args.output, args.name, args.format, args.query, args.write, - args.options + args.options, + args.profile, + args.profile_interval, ) diff --git a/benchmarks/tpc/visualize-metrics.py b/benchmarks/tpc/visualize-metrics.py new file mode 100644 index 0000000000..4859265135 --- /dev/null +++ b/benchmarks/tpc/visualize-metrics.py @@ -0,0 +1,403 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +import argparse +import csv +import os +from collections import defaultdict + +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt + + +# Stable colors: driver=blue, executor-0/worker-1=orange, executor-1/worker-2=green +ENTITY_COLORS = { + 'driver': 'tab:blue', + '0': 'tab:orange', + '1': 'tab:green', + '2': 'tab:red', + '3': 'tab:purple', + 'worker-1': 'tab:orange', + 'worker-2': 'tab:green', +} + +DEFAULT_COLORS = ['tab:orange', 'tab:green', 'tab:red', 'tab:purple', 'tab:brown', 'tab:pink'] + +PEAK_SERIES = [ + ('peak_JVMHeapMemory', 'Heap', 'solid', 'tab:blue'), + ('peak_JVMOffHeapMemory', 'OffHeap', 'solid', 'tab:orange'), + ('peak_OffHeapExecutionMemory', 'OffHeapExec', 'solid', 'tab:red'), +] + +PEAK_BAR_FIELDS = [ + 'peak_JVMHeapMemory', + 'peak_JVMOffHeapMemory', + 'peak_OnHeapExecutionMemory', + 'peak_OffHeapExecutionMemory', +] + + +def color_for(entity_id): + """Return a stable color for the given entity identifier.""" + if entity_id in ENTITY_COLORS: + return ENTITY_COLORS[entity_id] + # Hash-based fallback for unknown entities + idx = hash(entity_id) % len(DEFAULT_COLORS) + return DEFAULT_COLORS[idx] + + +def auto_unit(max_bytes): + """Pick MB or GB and return (divisor, label).""" + if max_bytes > 1e9: + return 1e9, 'GB' + return 1e6, 'MB' + + +def load_jvm_metrics(path): + """Load JVM metrics CSV and return data grouped by executor_id.""" + data = defaultdict(lambda: defaultdict(list)) + with open(path, newline='') as f: + reader = csv.DictReader(f) + for row in reader: + eid = row['executor_id'] + for key, val in row.items(): + if key == 'executor_id': + data[eid][key].append(val) + elif key == 'is_active': + data[eid][key].append(val == 'True') + else: + data[eid][key].append(float(val)) + return data + + +def load_cgroup_metrics(path, label): + """Load a cgroup metrics CSV and return dict with lists of values.""" + data 
= {'label': label} + with open(path, newline='') as f: + reader = csv.DictReader(f) + rows = list(reader) + if not rows: + return data + timestamps = [float(r['timestamp_ms']) for r in rows] + data['timestamp_ms'] = timestamps + min_ts = min(timestamps) + data['elapsed_secs'] = [(ts - min_ts) / 1000.0 for ts in timestamps] + for key in ['memory_usage_bytes', 'memory_limit_bytes', 'rss_bytes', 'cache_bytes', 'swap_bytes']: + if key in rows[0]: + data[key] = [float(r[key]) for r in rows] + return data + + +def _shared_time_zero(jvm_data, cgroup_datasets): + """Compute the global time-zero from absolute timestamps across both sources. + + Returns ``t_zero`` (ms) or ``None`` if absolute timestamps are unavailable. + """ + exec_data = {eid: s for eid, s in jvm_data.items() if eid != 'driver'} + has_jvm_ts = any('timestamp_ms' in s for s in exec_data.values()) + has_cg_ts = any('timestamp_ms' in ds for ds in cgroup_datasets) + if not (has_jvm_ts and has_cg_ts): + return None + all_abs = [] + for s in exec_data.values(): + all_abs.extend(s['timestamp_ms']) + for ds in cgroup_datasets: + all_abs.extend(ds.get('timestamp_ms', [])) + return min(all_abs) + + +def _jvm_time_range(jvm_data, cgroup_datasets): + """Compute the x-axis limits from JVM data using absolute timestamps. + + Returns ``(x_min, x_max)`` — the elapsed-time window (in seconds, relative + to the global time-zero across both data sources) during which the JVM + profiler was active, or ``None`` if absolute timestamps are not available. 
+ """ + t_zero = _shared_time_zero(jvm_data, cgroup_datasets) + if t_zero is None: + return None + exec_data = {eid: s for eid, s in jvm_data.items() if eid != 'driver'} + jvm_min = float('inf') + jvm_max = 0 + for s in exec_data.values(): + jvm_min = min(jvm_min, min(s['timestamp_ms'])) + jvm_max = max(jvm_max, max(s['timestamp_ms'])) + return ((jvm_min - t_zero) / 1000.0, (jvm_max - t_zero) / 1000.0) + + +def generate_jvm_memory_usage(jvm_data, output_dir, title): + """Generate one JVM peak-memory time-series chart per executor (driver excluded).""" + executors = {eid: s for eid, s in jvm_data.items() if eid != 'driver'} + if not executors: + print(' Skipped jvm_memory (no executor data)') + return + + for eid, series in sorted(executors.items()): + all_values = [] + for field, _, _, _ in PEAK_SERIES: + all_values.extend(series.get(field, [])) + if not all_values or max(all_values) == 0: + print(f' Skipped jvm_memory_executor_{eid}.png (all peak values are zero)') + continue + + divisor, unit = auto_unit(max(all_values)) + + fig, ax = plt.subplots(figsize=(14, 6)) + for field, label, ls, color in PEAK_SERIES: + vals = series.get(field, []) + if vals and max(vals) > 0: + ax.plot(series['elapsed_secs'], + [v / divisor for v in vals], + color=color, linestyle=ls, linewidth=1.5, + label=label) + + suffix = f' — Executor {eid} JVM Peak Memory' if title else f'Executor {eid} JVM Peak Memory' + ax.set_title(f'{title}{suffix}') + ax.set_xlabel('Elapsed Time (seconds)') + ax.set_ylabel(f'Peak Memory ({unit})') + ax.legend(fontsize=8) + ax.yaxis.grid(True) + plt.tight_layout() + fname = f'jvm_memory_executor_{eid}.png' + plt.savefig(os.path.join(output_dir, fname), format='png') + plt.close(fig) + print(f' Created {fname}') + + +def generate_jvm_peak_memory(jvm_data, output_dir, title): + """Peak memory breakdown per executor, excluding driver (grouped bar).""" + executor_data = {eid: s for eid, s in jvm_data.items() if eid != 'driver'} + + # Check if all peak values are 
zero — skip chart if so + all_zero = True + for eid, series in executor_data.items(): + for field in PEAK_BAR_FIELDS: + if field in series and max(series[field]) > 0: + all_zero = False + break + if not all_zero: + break + if all_zero: + print(' Skipped jvm_peak_memory.png (all peak values are zero)') + return + + executors = sorted(executor_data.keys()) + # Use the max of each peak field per executor + peak_values = {} + all_vals = [] + for eid in executors: + peak_values[eid] = {} + for field in PEAK_BAR_FIELDS: + val = max(executor_data[eid].get(field, [0])) + peak_values[eid][field] = val + all_vals.append(val) + + divisor, unit = auto_unit(max(all_vals)) if all_vals else (1e6, 'MB') + + import numpy as np + x = np.arange(len(executors)) + n_fields = len(PEAK_BAR_FIELDS) + bar_width = 0.8 / n_fields + field_colors = ['tab:blue', 'tab:orange', 'tab:green', 'tab:red'] + + fig, ax = plt.subplots(figsize=(max(8, len(executors) * 2), 6)) + for i, field in enumerate(PEAK_BAR_FIELDS): + vals = [peak_values[eid][field] / divisor for eid in executors] + short_label = field.replace('peak_', '') + ax.bar(x + i * bar_width, vals, bar_width, label=short_label, color=field_colors[i]) + + ax.set_title(f'{title} — JVM Peak Memory' if title else 'JVM Peak Memory') + ax.set_xlabel('Executor') + ax.set_ylabel(f'Peak Memory ({unit})') + ax.set_xticks(x + bar_width * (n_fields - 1) / 2) + ax.set_xticklabels([f'executor {eid}' for eid in executors]) + ax.legend(fontsize=8) + ax.yaxis.grid(True) + plt.tight_layout() + plt.savefig(os.path.join(output_dir, 'jvm_peak_memory.png'), format='png') + plt.close(fig) + print(f' Created jvm_peak_memory.png') + + +def generate_cgroup_memory(cgroup_datasets, jvm_data, output_dir, title): + """Generate one cgroup memory chart per worker.""" + jvm_range = _jvm_time_range(jvm_data, cgroup_datasets) + t_zero = _shared_time_zero(jvm_data, cgroup_datasets) + + for ds in cgroup_datasets: + label = ds['label'] + c = color_for(label) + + if t_zero is not 
None and 'timestamp_ms' in ds: + elapsed = [(t - t_zero) / 1000.0 for t in ds['timestamp_ms']] + else: + elapsed = ds.get('elapsed_secs', []) + + all_values = [] + all_values.extend(ds.get('memory_usage_bytes', [])) + all_values.extend(ds.get('rss_bytes', [])) + if not all_values: + continue + + divisor, unit = auto_unit(max(all_values)) + + fig, ax = plt.subplots(figsize=(14, 6)) + if 'memory_usage_bytes' in ds: + ax.plot(elapsed, + [v / divisor for v in ds['memory_usage_bytes']], + color='tab:blue', linewidth=1.5, label='usage') + if 'rss_bytes' in ds: + ax.plot(elapsed, + [v / divisor for v in ds['rss_bytes']], + color='tab:orange', linewidth=1.5, label='RSS') + + if jvm_range is not None: + x_min, x_max = jvm_range + ax.set_xlim(left=x_min, right=x_max * 1.02) + + suffix = f' — {label} Container Memory (cgroup)' + ax.set_title(f'{title}{suffix}' if title else suffix.lstrip(' — ')) + ax.set_xlabel('Elapsed Time (seconds)') + ax.set_ylabel(f'Memory ({unit})') + ax.legend(fontsize=8) + ax.yaxis.grid(True) + plt.tight_layout() + fname = f'cgroup_memory_{label}.png' + plt.savefig(os.path.join(output_dir, fname), format='png') + plt.close(fig) + print(f' Created {fname}') + + +def generate_combined_memory(jvm_data, cgroup_datasets, cgroup_offset, output_dir, title): + """Generate one combined log-scale chart per worker, pairing executor N with worker-(N+1).""" + exec_data = {eid: s for eid, s in jvm_data.items() if eid != 'driver'} + t_zero = _shared_time_zero(jvm_data, cgroup_datasets) + jvm_range = _jvm_time_range(jvm_data, cgroup_datasets) + + # Build elapsed-time helpers + if t_zero is not None: + def jvm_elapsed(series): + return [(t - t_zero) / 1000.0 for t in series['timestamp_ms']] + def cg_elapsed(ds): + return [(t - t_zero) / 1000.0 for t in ds['timestamp_ms']] + else: + def jvm_elapsed(series): + return series['elapsed_secs'] + def cg_elapsed(ds): + elapsed = ds.get('elapsed_secs', []) + if cgroup_offset != 0: + return [t + cgroup_offset for t in elapsed] 
+ return elapsed + + # Pair executor IDs with cgroup datasets by index: + # sorted executors ['0','1'] map to cgroup_datasets [worker-1, worker-2] + sorted_eids = sorted(exec_data.keys()) + + for idx, ds in enumerate(cgroup_datasets): + label = ds['label'] + eid = sorted_eids[idx] if idx < len(sorted_eids) else None + + fig, ax = plt.subplots(figsize=(14, 6)) + + # All series plotted in GB on a single log-scale axis + divisor = 1e9 + unit = 'GB' + + # --- JVM: peak memory for this executor --- + if eid is not None: + series = exec_data[eid] + for field, flabel, ls, color in PEAK_SERIES: + vals = series.get(field, []) + if vals and max(vals) > 0: + ax.plot(jvm_elapsed(series), + [v / divisor for v in vals], + color=color, linestyle=ls, linewidth=1.5, + label=f'JVM {flabel}') + + # --- Cgroup: usage for this worker --- + if 'memory_usage_bytes' in ds: + ax.plot(cg_elapsed(ds), + [v / divisor for v in ds['memory_usage_bytes']], + color='tab:purple', linewidth=1.5, linestyle='--', label='cgroup usage') + if 'rss_bytes' in ds: + ax.plot(cg_elapsed(ds), + [v / divisor for v in ds['rss_bytes']], + color='tab:brown', linewidth=1, linestyle='--', label='cgroup RSS') + + ax.set_yscale('log') + ax.set_xlabel('Elapsed Time (seconds)') + ax.set_ylabel(f'Memory ({unit}, log scale)') + + if jvm_range is not None: + x_min, x_max = jvm_range + ax.set_xlim(left=x_min, right=x_max * 1.02) + + ax.legend(fontsize=8, loc='upper left') + + eid_label = f'executor {eid} / {label}' if eid else label + suffix = f' — {eid_label} Combined Memory' + ax.set_title(f'{title}{suffix}' if title else suffix.lstrip(' — ')) + ax.yaxis.grid(True, which='both') + plt.tight_layout() + fname = f'combined_memory_{label}.png' + plt.savefig(os.path.join(output_dir, fname), format='png') + plt.close(fig) + print(f' Created {fname}') + + +def main(): + parser = argparse.ArgumentParser( + description='Visualize TPC benchmark memory metrics as time-series charts.') + parser.add_argument('--jvm-metrics', type=str, 
required=True, + help='Path to JVM metrics CSV (comet-tpch-metrics.csv)') + parser.add_argument('--cgroup-metrics', type=str, nargs='+', default=None, + help='Paths to cgroup metrics CSVs (container-metrics-worker-*.csv)') + parser.add_argument('--output-dir', type=str, default='.', + help='Directory to write chart PNGs (default: current directory)') + parser.add_argument('--title', type=str, default='', + help='Title prefix for charts') + parser.add_argument('--cgroup-offset', type=float, default=0, + help='Seconds to shift cgroup timestamps for alignment with JVM data (default: 0)') + args = parser.parse_args() + + os.makedirs(args.output_dir, exist_ok=True) + + print(f'Loading JVM metrics from {args.jvm_metrics}') + jvm_data = load_jvm_metrics(args.jvm_metrics) + print(f' Found executors: {", ".join(sorted(jvm_data.keys()))}') + + generate_jvm_memory_usage(jvm_data, args.output_dir, args.title) + generate_jvm_peak_memory(jvm_data, args.output_dir, args.title) + + if args.cgroup_metrics: + cgroup_datasets = [] + for i, path in enumerate(args.cgroup_metrics, start=1): + label = f'worker-{i}' + print(f'Loading cgroup metrics from {path} (as {label})') + cgroup_datasets.append(load_cgroup_metrics(path, label)) + + generate_cgroup_memory(cgroup_datasets, jvm_data, + args.output_dir, args.title) + generate_combined_memory(jvm_data, cgroup_datasets, args.cgroup_offset, + args.output_dir, args.title) + + print(f'Done. 
Charts written to {args.output_dir}') + + +if __name__ == '__main__': + main() diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt index 3511e1483e..36c4b877d6 100644 --- a/dev/release/rat_exclude_files.txt +++ b/dev/release/rat_exclude_files.txt @@ -29,3 +29,4 @@ spark/src/test/resources/test-data/*.csv spark/src/test/resources/test-data/*.ndjson spark/inspections/CometTPC*results.txt benchmarks/tpc/queries/**/*.sql +benchmarks/tpc/requirements.txt diff --git a/docs/source/contributor-guide/benchmarking.md b/docs/source/contributor-guide/benchmarking.md index 49af73376f..ce98fc4be6 100644 --- a/docs/source/contributor-guide/benchmarking.md +++ b/docs/source/contributor-guide/benchmarking.md @@ -21,7 +21,7 @@ under the License. To track progress on performance, we regularly run benchmarks derived from TPC-H and TPC-DS. -The benchmarking scripts are contained [here](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc). +The benchmarking scripts are contained at [https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc](https://github.com/apache/datafusion-comet/tree/main/benchmarks/tpc). Data generation scripts are available in the [DataFusion Benchmarks](https://github.com/apache/datafusion-benchmarks) GitHub repository. diff --git a/pom.xml b/pom.xml index 2eaccd37d4..50d9dbd4df 100644 --- a/pom.xml +++ b/pom.xml @@ -1101,6 +1101,7 @@ under the License. dev/release/requirements.txt native/proto/src/generated/** benchmarks/tpc/queries/** + benchmarks/tpc/requirements.txt