feat: Add experimental Grace Hash Join operator with spill-to-disk support [research, will not merge]#3564
Draft
andygrove wants to merge 53 commits intoapache:mainfrom
Draft
feat: Add experimental Grace Hash Join operator with spill-to-disk support [research, will not merge]#3564andygrove wants to merge 53 commits intoapache:mainfrom
andygrove wants to merge 53 commits intoapache:mainfrom
Conversation
Implement a Grace Hash Join operator that partitions both build and probe sides into N buckets by hashing join keys, then performs per-partition hash joins using DataFusion's HashJoinExec. Spills partitions to disk via Arrow IPC when memory pressure is detected. Key features: - SpillWriter for efficient incremental append I/O (no read-rewrite) - All join types supported (Inner, Left, Right, Full, Semi, Anti) - Build side selection (BuildLeft/BuildRight) via planner - Recursive repartitioning for oversized partitions (max depth 3) - Production metrics (build_time, probe_time, join_time, spill_count, etc.) - CometGraceHashJoinExec Spark-side integration with metrics wiring - Comprehensive test suite including fuzz tests with ParquetGenerator Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Benchmarks all join types (Inner, Left, Right, Full, LeftSemi, LeftAnti) plus filtered joins across Spark Sort Merge Join, Comet Sort Merge Join, Comet Hash Join, and Comet Grace Hash Join implementations. Sets COMET_REPLACE_SMJ appropriately for each case and uses auto shuffle mode. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…in is enabled Grace Hash Join supports all join type / build side combinations, so bypass the BuildRight+LeftAnti/LeftSemi guard and the canBuildShuffledHashJoinLeft/Right restrictions when it is enabled. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
RewriteJoin converts SortMergeJoin to Spark's ShuffledHashJoinExec, which doesn't support LeftSemi/LeftAnti with BuildLeft. The previous commit bypassed these restrictions when Grace Hash Join was enabled, but the intermediate ShuffledHashJoinExec fails validation before CometExecRule can convert it to GraceHashJoinExec. This reverts commit 02809ec. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace N separate take() calls per batch with a prefix-sum approach borrowed from the shuffle partitioner (multi_partition.rs): - Add ScratchSpace struct with reusable buffers for hashes, partition IDs, and row indices, allocated once and reused across all batches - Use interleave_record_batch with contiguous index slices instead of per-partition UInt32Array allocation + take() - Concatenate small sub-batches with concat_batches before feeding to HashJoinExec to reduce per-batch join overhead - Estimate per-partition memory sizes proportionally from total batch size instead of calling get_array_memory_size() on every sub-batch Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
87810fd to
a8d1f07
Compare
…sthrough Two performance optimizations: 1. Replace interleave_record_batch with Arrow's take() kernel in take_partition - SIMD-optimized and avoids (batch_idx, row_idx) tuple overhead for single-batch case 2. Skip take_partition when entire batch goes to one partition - use batch directly via cheap clone instead of copying through take() Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
a8d1f07 to
60400c2
Compare
Member
Author
Remove the build_batches.len() > 1 guard that skipped the repartition check for single large batches, and multiply build_size by 3 to account for hash table overhead (~2-3x raw data). Add info-level logging for build/probe phase summaries and per-partition join decisions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The probe side of GraceHashJoin was accumulating batches without tracking memory in the reservation, causing OOM when the probe side is massive and the build side is tiny (e.g. TPC-DS q72 with 171M probe rows vs 15 build rows). Now probe-side memory is tracked in the shared reservation and partitions are spilled (both build and probe sides) when memory pressure is detected during the probe phase. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When memory pressure is detected during the probe phase, spill enough partitions to free at least 50% of in-memory data instead of just the single largest partition. Previously, spilling one ~200MB partition barely made room for the next sub-batch, leaving ~5GB of probe data in memory and causing OS OOM kills. Now the spill loop continues until meaningful headroom is created. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The 50%-target approach was insufficient: with multiple concurrent GHJ instances sharing a pool, each would spill a few partitions then re-accumulate data in the remaining ones, maintaining ~1GB in-memory per instance. With 8+ concurrent instances this totals 8+GB. Now on the first try_grow failure during probe, we spill ALL non-spilled partitions at once. After that, all subsequent probe data goes directly to spill writers with zero in-memory accumulation, keeping the footprint near zero regardless of probe-side size. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The join phase was reading ALL spilled probe data back into memory via read_spilled_batches(), then concatenating into a single batch. For q72 this meant loading 3-11GB per partition (up to 11.7GB for a single partition with 32M rows), across 16 partitions per join instance. Add SpillReaderExec, a streaming ExecutionPlan that reads from IPC spill files batch-by-batch on demand. HashJoinExec in CollectLeft mode builds the hash table from the build side (tiny — 1 row for q72) and then streams through the probe side without holding it all in memory. Falls back to the eager read path when the build side needs repartitioning. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…Join When the build side has no spills and fits in a hash table (~3x build size), stream the probe input directly through HashJoinExec instead of partitioning and potentially spilling it. This eliminates all spill I/O for the common case where the build side is tiny (e.g. TPC-DS q72 with 15-1000 build rows vs 171M probe rows). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ce Hash Join" This reverts commit 5749096.
The default BufReader/BufWriter buffer size (8KB) causes excessive syscalls when reading/writing multi-GB spill files. Increasing to 1MB provides much better sequential throughput for the spill-and-stream join path. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Two changes to improve join-phase throughput when probe data is spilled: 1. SpillReaderExec now reads spill files on a blocking thread pool via tokio::task::spawn_blocking with a channel, instead of synchronous reads inside futures::stream::iter. This prevents spill file I/O from blocking the async executor. 2. Partition result streams are interleaved via select_all instead of sequential try_flatten. Combined with async I/O, this lets multiple partitions' spill file reads overlap, improving CPU utilization on I/O-bound workloads. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The RefCountedTempFile was being dropped when execute() returned, deleting the temp file before the blocking reader thread could open it. Move the handle into the spawn_blocking closure so the file stays alive until the reader finishes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Spill files contain many tiny sub-batches (from hash partitioning). Each one incurs channel send/recv overhead plus a separate hash join kernel invocation. Coalescing into ~8192-row batches in the blocking reader thread reduces this overhead dramatically, improving CPU utilization during the join phase. Also increases channel buffer from 2 to 4 for better I/O overlap. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…tion Replace select_all (single-threaded round-robin) with tokio::spawn for each partition's HashJoinExec stream. The multi-threaded tokio runtime now schedules partition joins across all available cores, so hash join computation runs in parallel rather than sequentially on one thread. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When the build side has no spills and fits in a hash table, skip probe partitioning entirely. Instead, wrap the probe input stream as a StreamSourceExec and feed it directly through HashJoinExec — zero disk I/O, zero buffering. Unlike the earlier failed fast-path attempt, this version keeps the GraceHashJoin's spill-capable memory reservation alive for the duration of the stream. This preserves a spillable consumer in the memory pool, allowing other non-spillable consumers (HashJoinInput from other joins in the query) to reclaim memory when needed. The key difference: the reservation is captured in the output stream's closure and only dropped when the stream is fully consumed, maintaining the memory pool's ability to handle pressure from concurrent operators. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…-counting The fast path kept the build-side memory in the GHJ reservation while also passing the build data to HashJoinExec (which creates its own HashJoinInput reservation). This double-counted the build memory in the pool, consuming ~42MB of headroom that other operators needed. Fix: shrink the reservation to 0 before executing HashJoinExec. The reservation stays registered as a spillable consumer (at 0 bytes) while HashJoinInput manages the actual hash table memory. Also adds comprehensive Grace Hash Join design document. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
c0a8cd8 to
54350c7
Compare
DataFusion manages parallelism externally by calling execute(partition) from multiple async tasks. GHJ should not spawn its own parallel tasks internally, as this fights the runtime's scheduling and memory management, creating too many concurrent HashJoinInput reservations. Replace tokio::spawn fan-out with a sequential stream that processes partitions one at a time using futures::stream::iter + then + flatten. Build-side spill reads still use spawn_blocking to avoid blocking the async executor. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The FairSpillPool calculates fair limits based on spillable memory. When GHJ shrinks its reservation to 0 bytes, non-spillable HashJoinInput consumers have a tight fair limit and OOM (seen in TPC-DS q72). Two fixes: 1. Fast path: don't shrink reservation after build phase. The intentional double-counting (our reservation + HashJoinInput) keeps spillable headroom in the pool. 2. Slow path: capture reservation in the output stream closure so it stays alive until the stream is fully consumed, rather than being dropped when execute_grace_hash_join returns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The GHJ reservation tracks memory during build/probe phases. Once Phase 3 begins, partition data moves into per-partition HashJoinExec instances which track memory via their own HashJoinInput reservations. Keeping the GHJ reservation alive double-counts the memory, consuming pool space that HashJoinInput consumers need. Fix: shrink reservation to 0 in the fast path (before creating the single HashJoinExec) and call free() in the slow path (before creating per-partition HashJoinExecs). This frees ~68 MB of pool headroom that was previously wasted on double-counting. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The fast-path memory check was using build_mem_size, a proportional estimate (total_batch_size * sub_rows / total_rows) that can underestimate actual memory by 5-20x. This caused GHJ to create massive non-spillable HashJoinInput consumers (885+ MB) when it should have taken the slow path. Fix: compute actual build size from get_array_memory_size() on the real batches instead of the proportional estimate. Also free the GHJ reservation completely (both fast and slow paths) before creating per-partition HashJoinExec instances, since HashJoinInput tracks its own memory. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The fast path skipped probe partitioning and created a single HashJoinExec with ALL build data. This produced one massive non-spillable HashJoinInput consumer (e.g. 460 MB build → 1.3 GB hash table) that exhausted the memory pool. The slow path always partitions both sides, producing per-partition hash tables that are ~1/N of the total and processed sequentially. Only one HashJoinInput consumer exists at any time, keeping peak memory low. Also removes the now-unused StreamSourceExec (was only used by the fast path to wrap the probe stream). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Document why the fast path was removed (non-spillable HashJoinInput, inaccurate build_mem_size estimates, point-in-time memory checks). Update Phase 3 to describe sequential partition processing. Add lessons learned about internal parallelism and fast path pitfalls. Remove join_time metric (no longer tracked). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
With fully sequential partition processing, Phase 3 was I/O bound at ~30% CPU — each partition's spill file read had to complete before the next could start. Add a semaphore with 3 permits so up to 3 partitions can overlap disk I/O with CPU work, without the memory explosion from unlimited parallelism (at most 3 small HashJoinInput consumers active at once). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The fast path was removed entirely because it accepted large build sides (460 MB) that produced 1.3 GB non-spillable hash tables. But it's essential for cases like TPC-DS q72's outer join with a ~10-row build side and a ~170M-row probe side — without it, gigabytes of probe data are spilled to disk for a trivial hash table. Restore the fast path with a conservative 10 MB threshold based on actual batch sizes (get_array_memory_size), not the unreliable proportional estimate. A 10 MB build produces at most a ~30 MB hash table, which is always safe regardless of concurrent operators. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add spark.comet.exec.graceHashJoin.fastPathThreshold config (default 10 MB, set to 0 to disable) so users can tune the boundary between the fast path (single HashJoinExec) and the slow path (spill-based). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The configured fastPathThreshold is a total budget. Since all Spark tasks run concurrently and each creates a non-spillable HashJoinExec hash table, divide by spark.executor.cores so each task stays within its fair share of memory. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use Arrow IPC LZ4_FRAME compression when writing spill files. The reader auto-detects compression from IPC metadata. This reduces spill file sizes and I/O time with minimal CPU overhead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Member
Author
|
q72 completes without error, but performance is 💩 (755 seconds vs Spark's 90 seconds) |
Coalesce the many small per-partition sub-batches (one per original input batch) into a single batch in join_single_partition before passing to join_partition_recursive or join_with_spilled_probe. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
After Phase 2, merge adjacent FinishedPartitions so each group has roughly 32 MB of build data (TARGET_PARTITION_BUILD_SIZE). For a 48 MB build split into 16 partitions, this reduces from 16 HashJoinExec calls to 2, significantly cutting per-partition overhead (plan creation, hash table build, probe) on the slow path. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Spilled partitions had build_mem_size=0 (cleared on spill), causing the merge algorithm to see ~0 total build bytes and collapse all partitions into one. The resulting single giant hash table (1.3 GB) exceeded the memory pool. Fix by using SpillWriter.bytes_written to correctly track actual build-side data size for merge decisions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The previous needs_repartition check used try_grow(build_size * 3) to estimate hash table overhead, but this underestimates actual usage — 500 MB raw data can expand to 1.6 GB hash table (>3x). With skewed data, a single partition's HashJoinInput consumed 1.6 GB and OOMed. Replace the memory pool probe with a simple size threshold: if the build side exceeds TARGET_PARTITION_BUILD_SIZE (32 MB), always repartition recursively. This guarantees no single HashJoinExec gets a build side large enough to create an oversized hash table. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nstrained tests Adds info logging at every HashJoinExec::try_new call site (FAST PATH, SPILLED PROBE PATH, RECURSIVE PATH) to identify which code path creates the oversized 1.6 GB hash table in production. Adds two new tests with a 256 MB FairSpillPool and 134 MB build side that verify repartitioning works correctly for both build_left=true and build_left=false. Both tests pass, confirming the repartition logic works — the production issue may be in a different code path. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The GHJ logs show only FAST PATH entries with tiny builds (max 586 KB). The 1.6 GB HashJoinInput is NOT created by GHJ. Adding logging to the planner's plain HashJoinExec fallback (PartitionMode::Partitioned) to determine if/when it's reached despite grace_enabled=true. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Log the full execution plan tree (via DisplayableExecutionPlan) at FAST PATH, SPILLED PROBE PATH, and RECURSIVE PATH to diagnose where the 1630 MB HashJoinInput allocation originates from. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add progress logging every ~1M rows emitted from slow-path GHJ joins. Also enhance slow-path log to include row count, join_type, and build_left. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add unique GHJ instance IDs (GHJ#N) to all log lines for correlation. Log pool reserved bytes and reservation sizes at key points: - GHJ start, fast path, slow path entry - Probe phase completion (with reservation size) - Before reservation.free() (Phase 3 transition) - Probe accumulation progress every 5M rows - Before HashJoinExec creation in recursive path This will reveal which GHJ's probe accumulation causes the 1802 MB HashJoinInput consumer that triggers OOM. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
DataSourceExec wraps all output with BatchSplitStream, which slices batches exceeding batch_size (default 8192 rows). Arrow's batch.slice() shares underlying buffers, so get_record_batch_memory_size() reports the full buffer size for every slice. This causes collect_left_input to vastly over-count memory (e.g. 696K-row build batch split into 85 slices, each reporting 22 MB = 1.87 GB instead of the actual 22 MB). Fix: use a TaskContext with batch_size=MAX for Phase 3 partition joins so BatchSplitStream passes batches through without slicing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Member
Author
|
q72 is now 19 seconds! 🍾 |
Member
Author
|
I have to give Claude Code a shoutout for tracking down this bug. It took many iterations on debugging. |
HashJoinExec uses batch_size for output buffer allocation, so usize::MAX causes capacity overflow in JoinHashMap. Instead, set batch_size to the max row count across input batches — just enough to prevent BatchSplitStream from slicing while keeping output buffers reasonable. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…erflow Replace DataSourceExec(MemorySourceConfig) with StreamSourceExec for the build side in all GHJ join paths. DataSourceExec wraps output with BatchSplitStream, which slices large batches. Arrow's zero-copy slicing shares buffers, inflating get_record_batch_memory_size() per slice and causing spurious OOM. The previous fix (overriding batch_size) caused Arrow i32 offset overflow because HashJoinExec uses batch_size for output buffer allocation — with large batch_size, string columns can exceed the 2 GB i32 offset limit. StreamSourceExec returns batches directly without splitting, fixing both the memory over-counting and the offset overflow. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Also use memory_source_exec (StreamSourceExec) for the probe side in the recursive and spilled-probe paths. With context_for_join removed, the probe side's DataSourceExec was splitting concatenated batches into many tiny batches via BatchSplitStream, adding per-batch overhead that slowed down queries like TPC-DS q72. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… overhead With the default Comet batch_size (8192), HashJoinExec produces thousands of small output batches for large joins, causing significant per-batch overhead. This explains the q72 regression from b8d0e1d (usize::MAX batch_size, fast) to bef66d6 (actual row count, slow). Use a dedicated GHJ_OUTPUT_BATCH_SIZE (128K) for HashJoinExec output. This is independent of the StreamSourceExec fix for input splitting — StreamSourceExec prevents build-side memory over-counting, while the larger output batch_size reduces join output overhead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.

Summary
Add a Grace Hash Join (GHJ) operator that replaces Spark's
ShuffledHashJoinExecwith a spill-capable native implementation. Instead of loading the entire build side into a single hash table (which OOMs on large builds), GHJ hash-partitions both sides into N buckets and joins them independently, spilling to disk under memory pressure.This is an experimental feature, disabled by default (
spark.comet.exec.graceHashJoin.enabled=false).How it works
Phase 1 — Build partitioning: Hash-partition build input into 16 buckets using a prefix-sum algorithm. When memory is tight, spill the largest partition to disk (Arrow IPC with LZ4 compression).
Phase 2 — Probe partitioning: Same hash-partitioning on probe input. On first memory pressure, aggressively spill ALL non-spilled partitions (both sides) to prevent thrashing with concurrent operators sharing the memory pool.
Phase 3 — Per-partition joins: Join each partition sequentially via
HashJoinExec. Adjacent partitions are merged when builds are small (target 32 MB per group) to reduce overhead. Oversized build partitions are recursively repartitioned (up to 3 levels = 4096 effective partitions). A fast path streams probe data directly for tiny builds (< 10 MB / executor cores).Key design decisions
SpillReaderExecinstead of loading into memory, keeping peak memory at ~1/N of the full datasetHashJoinExeccalls (e.g., 16 partitions with 48 MB total build → 2 merged groups)DataSourceExecwraps output withBatchSplitStream, which slices large batches into 8192-row chunks. Arrow's zero-copy slicing shares buffers, causingget_record_batch_memory_size()to report the full buffer size per slice. This inflated memory accounting 85x incollect_left_input, causing phantom OOM. Fixed by settingbatch_sizeto the actual row count in theTaskContextpassed to Phase 3 joins.Configuration
spark.comet.exec.graceHashJoin.enabledfalsespark.comet.exec.graceHashJoin.numPartitions16spark.comet.exec.graceHashJoin.fastPathThreshold10485760(10 MB)Files changed
native/core/src/execution/operators/grace_hash_join.rs— GHJ operator implementation (~2600 lines)native/core/src/execution/planner.rs— Wire GHJ into the physical plannercommon/.../CometConf.scala— Configuration entriesspark/.../operators.scala—CometGraceHashJoinExecSpark operator and metricsspark/.../CometMetricNode.scala— Metric mapping for GHJdocs/.../grace-hash-join-design.md— Design document