
feat: Implement shuffle map side spill support (not flight shuffle)#6133

Open
caican00 wants to merge 1 commit into Eventual-Inc:main from caican00:support-shuffle-middle-data-spill

Conversation

@caican00
Contributor

@caican00 caican00 commented Feb 6, 2026

Changes Made

Related Issues

@github-actions github-actions bot added the feat label Feb 6, 2026
@greptile-apps
Contributor

greptile-apps bot commented Feb 6, 2026

Greptile Overview

Greptile Summary

This PR implements map-side spill support for shuffle operations in the Daft execution engine. The key change is enabling partitions to contain multiple Ray ObjectRefs instead of a single ref, allowing the shuffle operator to incrementally emit chunks when memory thresholds are exceeded.

Major Changes:

  • Added shuffle_spill_threshold configuration parameter (bytes) to control when shuffle buffers are flushed
  • Refactored RepartitionNode from a blocking sink to a streaming pipeline node that can emit partial results
  • Changed RayPartitionRef from holding a single object_ref to object_refs: Vec<Arc<Py<PyAny>>>
  • Updated RayMaterializedResult to handle list[ObjectRef] and aggregate metadata across chunks
  • Modified partition packing logic in Flotilla to distribute multiple result refs round-robin across expected partitions
  • Added is_into_batches flag to distinguish IntoBatches (which dynamically generates partitions) from regular operators
  • Comprehensive test coverage including spill threshold tests and metadata aggregation tests
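The spill-threshold behavior described above can be sketched roughly as follows. This is a hypothetical Python illustration (names like `SpillBuffer` are invented for this sketch); the actual logic lives in the Rust repartition sink:

```python
# Hypothetical sketch of the map-side spill buffer described in this PR.
# SpillBuffer and its methods are illustrative names, not the Daft API.
class SpillBuffer:
    def __init__(self, num_partitions, spill_threshold=None):
        # One chunk list per output partition.
        self.buffers = [[] for _ in range(num_partitions)]
        self.current_size_bytes = 0
        self.spill_threshold = spill_threshold  # None means never spill

    def push(self, partitioned_chunks, sizes):
        """Buffer one partitioned batch; return flushed chunks if over threshold."""
        for idx, (chunk, size) in enumerate(zip(partitioned_chunks, sizes)):
            self.buffers[idx].append(chunk)
            self.current_size_bytes += size
        if (
            self.spill_threshold is not None
            and self.current_size_bytes >= self.spill_threshold
        ):
            return self.flush()
        return None

    def flush(self):
        """Emit each partition's buffered chunks and reset the buffer."""
        out = [list(chunks) for chunks in self.buffers]
        self.buffers = [[] for _ in self.buffers]
        self.current_size_bytes = 0
        return out
```

Each flush would correspond to one incremental emission of partial results (one extra ObjectRef per partition) rather than holding all map output in memory.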

Implementation Quality:

  • The spill mechanism correctly tracks current_size_bytes and flushes when threshold is exceeded
  • Proper handling of both IntoBatches (dynamic partitions) and regular ops (fixed partition count)
  • Backward compatibility maintained - threshold defaults to None (no spilling)
  • Added serialization support (__getstate__/__setstate__) for RayPartitionRef
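The per-partition metadata aggregation across chunks noted above could look roughly like this (a sketch with a hypothetical `ChunkMetadata` type; the real change is inside `RayMaterializedResult`):

```python
# Hypothetical sketch: aggregating metadata for a partition that is now
# backed by multiple ObjectRefs. ChunkMetadata is illustrative only.
from dataclasses import dataclass


@dataclass
class ChunkMetadata:
    num_rows: int
    size_bytes: int


def aggregate_metadata(chunks):
    """Sum row counts and byte sizes across all chunks of one partition."""
    return ChunkMetadata(
        num_rows=sum(c.num_rows for c in chunks),
        size_bytes=sum(c.size_bytes for c in chunks),
    )
```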

Confidence Score: 4/5

  • This PR is safe to merge with thorough testing and minor architectural considerations
  • The implementation is well-tested, with comprehensive coverage of the spill mechanism. The core logic is sound and properly handles both IntoBatches and regular operations. However, the significant refactoring of the repartition sink from blocking to streaming warrants careful integration testing; the score of 4 reflects high confidence, with a recommendation to exercise edge cases in production-like scenarios.
  • Pay close attention to src/daft-local-execution/src/sinks/repartition.rs (major refactoring), daft/runners/ray_runner.py (type signature changes throughout), and daft/runners/flotilla.py (partition packing logic)
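The round-robin packing in daft/runners/flotilla.py might look roughly like the sketch below (an assumption for illustration; the function name and shapes are invented, not the actual code):

```python
# Hypothetical sketch of round-robin distribution of result refs across
# expected output partitions, as described for flotilla.py in this PR.
def pack_round_robin(result_refs, num_partitions):
    """Distribute a flat list of chunk refs into num_partitions buckets,
    so each output partition holds a list of refs rather than one ref."""
    buckets = [[] for _ in range(num_partitions)]
    for i, ref in enumerate(result_refs):
        buckets[i % num_partitions].append(ref)
    return buckets
```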

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/common/daft-config/src/lib.rs | Added `shuffle_spill_threshold` config field with environment variable support and tests |
| src/daft-local-execution/src/sinks/repartition.rs | Refactored from `BlockingSink` to streaming `PipelineNode` with spill support and state management |
| daft/runners/ray_runner.py | Changed `RayPartitionSet` to handle `list[ObjectRef]` instead of a single `ObjectRef` for multi-chunk partitions |
| daft/runners/flotilla.py | Updated to pack multiple result refs into partitions with round-robin distribution logic |
| src/daft-distributed/src/python/ray/task.rs | Changed `RayPartitionRef` from a single `object_ref` to an `object_refs` vector with serialization support |
| tests/dataframe/test_shuffles.py | Added comprehensive spill tests verifying multi-chunk partition behavior |

Sequence Diagram

sequenceDiagram
    participant Client
    participant RepartitionNode
    participant State as RepartitionState
    participant TaskSet as OrderingAwareJoinSet
    participant Ray as Ray ObjectStore

    Client->>RepartitionNode: Input MicroPartitions
    RepartitionNode->>State: new(num_partitions)
    
    loop For each input batch
        RepartitionNode->>TaskSet: spawn partition task
        TaskSet->>TaskSet: partition_by_hash/random/range
        TaskSet-->>RepartitionNode: partitioned Vec<MicroPartition>
        RepartitionNode->>State: push(partitioned)
        State->>State: current_size_bytes += part.size_bytes()
        
        alt current_size_bytes >= shuffle_spill_threshold
            State->>State: flush_state()
            State->>State: MicroPartition::concat per partition
            State-->>RepartitionNode: Vec<Arc<MicroPartition>>
            RepartitionNode->>Ray: send outputs (as list[ObjectRef])
            State->>State: clear() & reset size_bytes
        end
    end
    
    RepartitionNode->>State: Final flush if not empty
    State->>State: MicroPartition::concat per partition
    State-->>RepartitionNode: Vec<Arc<MicroPartition>>
    RepartitionNode->>Ray: send final outputs (as list[ObjectRef])
    
    Ray-->>Client: RayMaterializedResult(list[ObjectRef])

Last reviewed commit: 5f6e8f8

@greptile-apps greptile-apps bot left a comment

4 files reviewed, 3 comments

@caican00 caican00 force-pushed the support-shuffle-middle-data-spill branch 7 times, most recently from 1bc2920 to 949b78d Compare February 9, 2026 08:29
@codecov

codecov bot commented Feb 9, 2026

Codecov Report

❌ Patch coverage is 24.66368% with 336 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.13%. Comparing base (c82288a) to head (42e4afe).
⚠️ Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| src/daft-local-execution/src/sinks/repartition.rs | 0.00% | 221 Missing ⚠️ |
| src/daft-distributed/src/python/ray/task.rs | 4.08% | 47 Missing ⚠️ |
| src/daft-distributed/src/pipeline_node/mod.rs | 0.00% | 11 Missing ⚠️ |
| ...daft-distributed/src/pipeline_node/into_batches.rs | 0.00% | 10 Missing ⚠️ |
| src/daft-distributed/src/scheduling/task.rs | 9.09% | 10 Missing ⚠️ |
| src/daft-local-plan/src/plan.rs | 0.00% | 10 Missing ⚠️ |
| daft/runners/flotilla.py | 75.86% | 7 Missing ⚠️ |
| src/daft-distributed/src/pipeline_node/sort.rs | 0.00% | 6 Missing ⚠️ |
| src/daft-local-execution/src/pipeline.rs | 0.00% | 6 Missing ⚠️ |
| daft/runners/ray_runner.py | 94.64% | 3 Missing ⚠️ |

... and 2 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #6133      +/-   ##
==========================================
- Coverage   73.32%   73.13%   -0.20%     
==========================================
  Files         994      994              
  Lines      129287   129827     +540     
==========================================
+ Hits        94799    94947     +148     
- Misses      34488    34880     +392     
| Files with missing lines | Coverage Δ |
| --- | --- |
| daft/context.py | 84.33% <ø> (ø) |
| src/common/daft-config/src/lib.rs | 97.77% <100.00%> (+0.21%) ⬆️ |
| ...tributed/src/pipeline_node/shuffles/repartition.rs | 42.85% <0.00%> (+0.30%) ⬆️ |
| daft/runners/ray_runner.py | 66.37% <94.64%> (+2.65%) ⬆️ |
| src/common/daft-config/src/python.rs | 55.31% <57.14%> (+0.04%) ⬆️ |
| src/daft-distributed/src/pipeline_node/sort.rs | 0.00% <0.00%> (ø) |
| src/daft-local-execution/src/pipeline.rs | 76.88% <0.00%> (-0.16%) ⬇️ |
| daft/runners/flotilla.py | 55.07% <75.86%> (+10.26%) ⬆️ |
| ...daft-distributed/src/pipeline_node/into_batches.rs | 0.00% <0.00%> (ø) |
| src/daft-distributed/src/scheduling/task.rs | 60.44% <9.09%> (-1.02%) ⬇️ |

... and 4 more

... and 7 files with indirect coverage changes


@caican00 caican00 force-pushed the support-shuffle-middle-data-spill branch 14 times, most recently from 22af7f9 to 0fca246 Compare February 11, 2026 06:16
@caican00 caican00 changed the title feat: Implement shuffle spill support with composite partition refs feat: Implement shuffle spill support (not flight shuffle) Feb 11, 2026
@caican00
Contributor Author

Hi @srilman, could you help review this PR? Thank you!

@caican00 caican00 changed the title feat: Implement shuffle spill support (not flight shuffle) feat: Implement shuffle map side spill support (not flight shuffle) Feb 11, 2026
@caican00 caican00 force-pushed the support-shuffle-middle-data-spill branch 2 times, most recently from 1fa45d2 to 8a2f978 Compare February 12, 2026 15:54
@caican00 caican00 force-pushed the support-shuffle-middle-data-spill branch 2 times, most recently from 9c6f278 to bcb2cda Compare February 13, 2026 03:11
@greptile-apps greptile-apps bot left a comment

22 files reviewed, no comments

@caican00 caican00 force-pushed the support-shuffle-middle-data-spill branch from bcb2cda to 5f6e8f8 Compare February 13, 2026 08:20
@caican00
Contributor Author

@greptile

@greptile-apps greptile-apps bot left a comment

22 files reviewed, no comments

@caican00 caican00 force-pushed the support-shuffle-middle-data-spill branch 3 times, most recently from 42e4afe to 496eae1 Compare February 14, 2026 17:41
@caican00 caican00 force-pushed the support-shuffle-middle-data-spill branch from 496eae1 to fbda57b Compare February 16, 2026 15:57
@srilman
Contributor

srilman commented Feb 18, 2026

hey @caican00! this looks like a really good change that I do want to get in soon, but can I ask you to put a pause on this for the time being? I ask because we're restructuring a lot of shuffle infra for Flight shuffle support, which will make it easier to implement PRs like this in the future. Once that's ready, can you start looking into this again?
