Skip to content

refactor: remove EmptyScan plan node#6275

Open
colin-ho wants to merge 2 commits intomainfrom
colin/remove-empty-scan
Open

refactor: remove EmptyScan plan node#6275
colin-ho wants to merge 2 commits intomainfrom
colin/remove-empty-scan

Conversation

@colin-ho
Copy link
Collaborator

@colin-ho colin-ho commented Feb 22, 2026

Summary

Remove the EmptyScan plan node variant, instead in favor of InMemoryScan or PhysicalScan with 0 inputs. This is so that we will be able to reuse a streaming swordfish task.

EmptyScan was a special-case node for producing empty partitions.
Replace all usages with InMemoryScan (with size_bytes=0), which
has the same semantics and simplifies the plan node hierarchy.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@colin-ho colin-ho requested a review from a team as a code owner February 22, 2026 05:13
@greptile-apps
Copy link
Contributor

greptile-apps bot commented Feb 22, 2026

Greptile Summary

This PR removes the EmptyScan plan node variant and replaces all usages with existing alternatives (InMemoryScan with size_bytes=0 or PhysicalScan with empty scan tasks). The refactor successfully consolidates empty partition handling into existing infrastructure:

  • limit.rs: Uses InMemoryScan with size_bytes=0 to produce empty partitions when all rows are skipped
  • scan_source.rs: Uses PhysicalScan with empty scan task list, leveraging existing empty batch handling in ScanTaskSource
  • plan.rs: Removes EmptyScan variant, struct, and constructor method
  • Deletions: Removes empty_scan.rs source implementation and all associated imports

Both replacement approaches are correct: InMemorySource handles empty partition lists by producing empty MicroPartitions, and ScanTaskSource has explicit logic to handle empty scan task batches (scan_task.rs:117-132).

Confidence Score: 5/5

  • This PR is safe to merge - it's a clean refactoring that removes duplicate functionality
  • The refactoring is mechanical and correct: all EmptyScan usages are replaced with appropriate alternatives that have verified empty-handling logic, the deleted code is no longer referenced, and the changes simplify the plan node hierarchy without altering behavior
  • No files require special attention

Important Files Changed

Filename Overview
src/daft-distributed/src/pipeline_node/limit.rs Replaced empty_scan with in_memory_scan call, correctly passing all required parameters including size_bytes=0
src/daft-distributed/src/pipeline_node/scan_source.rs Replaced empty_scan with physical_scan for empty scan tasks - correctly leverages existing empty scan task handling in ScanTaskSource
src/daft-local-execution/src/pipeline.rs Removed EmptyScan variant from pattern matching and cleaned up imports
src/daft-local-plan/src/plan.rs Removed EmptyScan variant, struct definition, and empty_scan constructor from LocalPhysicalPlan

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Empty Partition Scenarios] --> B[limit.rs: All rows skipped]
    A --> C[scan_source.rs: No scan tasks]
    
    B -->|Before| D[EmptyScan plan node]
    B -->|After| E[InMemoryScan with size_bytes=0]
    
    C -->|Before| F[EmptyScan plan node]
    C -->|After| G[PhysicalScan with empty scan task list]
    
    D --> H[EmptyScanSource]
    E --> I[InMemorySource]
    G --> J[ScanTaskSource]
    
    H -->|Produces| K[Empty MicroPartition]
    I -->|Produces| K
    J -->|Produces| K
    
    style D fill:#ffcccc
    style F fill:#ffcccc
    style H fill:#ffcccc
    style E fill:#ccffcc
    style G fill:#ccffcc
Loading

Last reviewed commit: f9af1f4

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8 files reviewed, 2 comments

Edit Code Review Agent Settings | Greptile

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Feb 22, 2026

Additional Comments (2)

src/daft-distributed/src/pipeline_node/limit.rs
InMemoryScan with no inputs may produce 0 partitions instead of 1 empty partition. When the plan is executed, a channel is created but no data is sent (since inputs is empty), so when the sender is dropped, InMemorySource receives None and exits without producing any partition. The old EmptyScanSource always produced exactly 1 empty partition. Check if downstream aggregates require at least one partition.


src/daft-distributed/src/pipeline_node/scan_source.rs
InMemoryScan with no inputs may produce 0 partitions instead of 1 empty partition. When the plan is executed, a channel is created but no data is sent (since inputs is empty), so when the sender is dropped, InMemorySource receives None and exits without producing any partition. The old EmptyScanSource always produced exactly 1 empty partition.

@codecov
Copy link

codecov bot commented Feb 22, 2026

Codecov Report

❌ Patch coverage is 0% with 13 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.45%. Comparing base (8cb880d) to head (f9af1f4).
⚠️ Report is 7 commits behind head on main.

Files with missing lines Patch % Lines
.../daft-distributed/src/pipeline_node/scan_source.rs 0.00% 6 Missing ⚠️
src/daft-distributed/src/pipeline_node/limit.rs 0.00% 5 Missing ⚠️
src/daft-local-plan/src/plan.rs 0.00% 2 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #6275      +/-   ##
==========================================
+ Coverage   73.44%   73.45%   +0.01%     
==========================================
  Files        1003     1002       -1     
  Lines      133436   133118     -318     
==========================================
- Hits        98005    97788     -217     
+ Misses      35431    35330     -101     
Files with missing lines Coverage Δ
src/common/metrics/src/ops.rs 78.57% <ø> (ø)
src/daft-local-execution/src/pipeline.rs 73.96% <ø> (+0.35%) ⬆️
src/daft-local-plan/src/lib.rs 100.00% <ø> (ø)
src/daft-local-plan/src/plan.rs 43.08% <0.00%> (+0.33%) ⬆️
src/daft-distributed/src/pipeline_node/limit.rs 13.54% <0.00%> (-0.15%) ⬇️
.../daft-distributed/src/pipeline_node/scan_source.rs 52.25% <0.00%> (-1.39%) ⬇️

... and 37 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@colin-ho
Copy link
Collaborator Author

colin-ho commented Feb 22, 2026

@greptileai limit and scan source now use with_psets and with_scantasks with empty inputs which will allow the corresponding local pipeline nodes to produce empty micropartitions

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Feb 22, 2026

Exactly! The refactoring elegantly delegates empty partition handling to the existing pipeline sources.

How it works:

  1. limit.rs: Uses InMemoryScan with .with_psets(self.node_id(), vec![]) - passes an empty partition set vector
  2. scan_source.rs: Uses PhysicalScan with .with_scan_tasks(self.node_id(), vec![]) - passes an empty scan task vector

What happens downstream:

Looking at InMemorySource (the local execution pipeline node), it has built-in handling for empty partition batches:

if partitions.is_empty() {
    let empty = Arc::new(MicroPartition::empty(Some(schema)));
    let _ = sender.send(empty).await;
}

So instead of needing a dedicated EmptyScan node type, the existing sources naturally produce empty micropartitions when given empty inputs. This is cleaner because:

  • Reuses existing infrastructure rather than special-casing
  • The empty handling logic lives in the source implementations where it belongs
  • One less plan node variant to maintain

The distributed layer just needs to pass empty vectors, and the local execution layer handles producing the appropriate empty results.

@colin-ho
Copy link
Collaborator Author

@greptileai re-review

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8 files reviewed, no comments

Edit Code Review Agent Settings | Greptile

@colin-ho colin-ho requested a review from srilman February 23, 2026 19:17
Copy link
Contributor

@srilman srilman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR looks good, but I'm a bit confused about the purpose. Why would this help with plan reuse?

@colin-ho
Copy link
Collaborator Author

The PR looks good, but I'm a bit confused about the purpose. Why would this help with plan reuse?

e.g. if there are existing tasks post-limit, we can reuse them.

@colin-ho
Copy link
Collaborator Author

The PR looks good, but I'm a bit confused about the purpose. Why would this help with plan reuse?

e.g. if there are existing tasks post-limit, we can reuse them.

on second thought, u might be right, if there were existing tasks post-limit, why would we need to emit this task to produce an empty mp anyway?

@colin-ho
Copy link
Collaborator Author

The PR looks good, but I'm a bit confused about the purpose. Why would this help with plan reuse?

e.g. if there are existing tasks post-limit, we can reuse them.

on second thought, u might be right, if there were existing tasks post-limit, why would we need to emit this task to produce an empty mp anyway?

Ah no ok i remember the REAL reason for this now.

Back in the streaming sources PR we made ScanTaskSource pull from a receiver<(scantask, inputid)> (likewise for inmemory and glob), but not empty scan.

I didn't think it was necessary to introduce the receiver and inputid plumbing into empty scan, so i opted to just remove it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants