
feat(swordfish): plan caching with ActivePlansRegistry#6278

Open
colin-ho wants to merge 3 commits into colin/pipeline-message from colin/plan-caching

Conversation

@colin-ho
Collaborator

Summary

  • Implement `ActivePlansRegistry` that caches plan pipelines by fingerprint+query_id
  • `PlanState` tracks active input_ids and manages pipeline lifecycle
  • `MessageRouter` routes `PipelineMessage` outputs to per-input-id channels
  • `try_finish()` API for callers to signal input completion and collect stats
  • Python integration: `native_executor.py` and `flotilla.py` call `try_finish()`
  • Add fingerprint tests for plan reuse and isolation (`tests/local_plan/test_fingerprint.py`)

This is the final PR in the series that wires everything together. Multiple executions of the same logical plan now share a single pipeline via fingerprint-based caching.

Depends on: #6276 (Plan Fingerprinting) + #6277 (PipelineMessage)
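The registry and lifecycle described above could be sketched roughly as follows. This is a simplified, hypothetical model (the type aliases, `register()` helper, and `Vec`-based input tracking are illustrative assumptions, not the PR's actual code, which also holds pipeline handles and stat channels):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical stand-ins for the real key and state types.
type Fingerprint = u64;
type QueryId = String;
type InputId = u64;

#[derive(Default)]
struct PlanState {
    // input_ids still streaming through this shared pipeline
    active_inputs: Vec<InputId>,
}

#[derive(Default)]
struct ActivePlansRegistry {
    plans: Mutex<HashMap<(Fingerprint, QueryId), PlanState>>,
}

impl ActivePlansRegistry {
    /// Register an input against a plan, creating the entry on first use.
    /// Returns true if this call would need to build a new pipeline.
    fn register(&self, key: (Fingerprint, QueryId), input: InputId) -> bool {
        let mut plans = self.plans.lock().unwrap();
        let state = plans.entry(key).or_default();
        let created = state.active_inputs.is_empty();
        state.active_inputs.push(input);
        created
    }

    /// Signal that an input finished; drop the plan when the last one does.
    /// Returns true if the plan was removed from the registry.
    fn try_finish(&self, key: &(Fingerprint, QueryId), input: InputId) -> bool {
        let mut plans = self.plans.lock().unwrap();
        let done = match plans.get_mut(key) {
            Some(state) => {
                state.active_inputs.retain(|i| *i != input);
                state.active_inputs.is_empty()
            }
            None => false,
        };
        if done {
            plans.remove(key);
        }
        done
    }
}
```

The key design point is that cache removal is driven by the `try_finish()` path rather than by the caller dropping a handle, so the last finishing input can also await final pipeline stats.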

Test plan

  • `cargo check --all-features` passes (verified locally)
  • `DAFT_RUNNER=native make test` passes
  • `DAFT_RUNNER=native make test EXTRA_ARGS="-v tests/local_plan/test_fingerprint.py"` specifically

🤖 Generated with Claude Code

colin-ho and others added 3 commits February 21, 2026 21:16
Add a fingerprint() method that computes a structural hash of the plan
tree for plan caching. Two plans with identical structure (same operators,
expressions, schemas) produce the same fingerprint, enabling pipeline reuse
across multiple executions of the same logical plan.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
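The structural-hash idea in this commit can be illustrated with a toy plan tree. This is a minimal sketch under assumed types (`Plan`, its two variants, and string predicates are hypothetical); the real `fingerprint()` covers every node type and hashes operator fields, expressions, and schemas:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical two-variant plan tree for illustration only.
enum Plan {
    Scan { schema: Vec<String> },
    Filter { predicate: String, input: Box<Plan> },
}

impl Plan {
    // Each variant mixes a discriminant tag plus its semantic fields
    // into the hasher, then recurses into child plans.
    fn fingerprint_into(&self, hasher: &mut impl Hasher) {
        match self {
            Plan::Scan { schema } => {
                0u8.hash(hasher); // variant tag
                schema.hash(hasher);
            }
            Plan::Filter { predicate, input } => {
                1u8.hash(hasher); // variant tag
                predicate.hash(hasher);
                input.fingerprint_into(hasher);
            }
        }
    }

    fn fingerprint(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.fingerprint_into(&mut hasher);
        hasher.finish()
    }
}
```

Two trees with the same operators, expressions, and schemas produce the same `u64`, so the executor can use the fingerprint as a cache key; any differing field changes the hash.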
Wire plan caching by fingerprint in NativeExecutor, enabling pipeline
reuse across multiple executions of the same logical plan. Multiple
input_ids route through a single shared pipeline via MessageRouter.

Key changes:
- ActivePlansRegistry caches plan pipelines by fingerprint+query_id
- PlanState tracks active input_ids and manages pipeline lifecycle
- MessageRouter routes PipelineMessage outputs to per-input-id channels
- try_finish() API for callers to signal input completion
- Python integration: native_executor.py and flotilla.py call try_finish()
- Fingerprint tests for plan reuse and isolation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@colin-ho colin-ho requested a review from a team as a code owner February 22, 2026 05:35
@greptile-apps
Contributor

greptile-apps bot commented Feb 22, 2026

Greptile Summary

This PR implements plan caching in the native executor, allowing multiple executions of the same logical plan to share a single pipeline via fingerprint-based caching. The implementation includes:

  • ActivePlansRegistry: Caches active pipelines by fingerprint + query_id key, with PlanState tracking active input_ids for each plan
  • MessageRouter: Routes PipelineMessage outputs to per-input-id channels so multiple inputs can share one pipeline
  • try_finish() API: Allows callers to signal input completion and retrieve stats; removes plan from registry when last input finishes
  • Plan fingerprinting: Comprehensive fingerprint() implementation for all plan node types to enable structural plan matching
  • Python integration: Updates native_executor.py and flotilla.py to use new try_finish() API
  • Test coverage: New test_fingerprint.py validates plan reuse and isolation across different plans and query IDs
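The routing piece above can be sketched with standard-library channels. This is an assumed, simplified shape (a `String` payload and `u64` input ids stand in for the real `PipelineMessage` from #6277, which carries partitions):

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical message shape; the real PipelineMessage carries data
// partitions rather than strings.
struct PipelineMessage {
    input_id: u64,
    payload: String,
}

// Routes each pipeline output to the channel registered for its
// input_id, so several inputs can share one pipeline's output stream.
#[derive(Default)]
struct MessageRouter {
    outputs: HashMap<u64, Sender<String>>,
}

impl MessageRouter {
    /// Register an input and hand back the receiving end of its channel.
    fn register(&mut self, input_id: u64) -> Receiver<String> {
        let (tx, rx) = channel();
        self.outputs.insert(input_id, tx);
        rx
    }

    /// Deliver a message to the channel tagged with its input_id.
    fn route(&self, msg: PipelineMessage) {
        if let Some(tx) = self.outputs.get(&msg.input_id) {
            // Ignore send errors from receivers that have been dropped.
            let _ = tx.send(msg.payload);
        }
    }
}
```

Demultiplexing by `input_id` is what lets a single cached pipeline serve concurrent executions without their result streams interleaving.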

Issue found: The UDFProject fingerprint implementation extracts but doesn't hash the expr field, which could cause different UDF expressions to be incorrectly identified as the same plan.

Confidence Score: 4/5

  • Safe to merge after fixing the missing expr hash in UDFProject fingerprinting
  • The architecture is well-designed with proper synchronization via channels and mutex guards. The one critical issue is the missing hash for the expr field in UDFProject, which could lead to incorrect plan reuse when different UDF expressions have the same properties. Once fixed, this is production-ready.
  • Pay close attention to src/daft-local-plan/src/plan.rs - the UDFProject fingerprinting bug must be fixed before merge

Important Files Changed

  • `src/daft-local-execution/src/run.rs` — Implements ActivePlansRegistry for caching pipelines by fingerprint+query_id, MessageRouter for routing outputs to per-input-id channels, and the try_finish() API for cleanup. Found one missing field hash in UDFProject fingerprinting.
  • `src/daft-local-plan/src/plan.rs` — Adds a comprehensive fingerprint() implementation for all plan node types. Missing hash for the expr field in the UDFProject case; only UDF properties and passthrough columns are hashed.
  • `daft/execution/native_executor.py` — Updates finish() to try_finish() to match the new API for signaling input completion and collecting stats.
  • `daft/runners/flotilla.py` — Updates finish() to try_finish() and moves task_id extraction before resolving inputs for better code organization.
  • `tests/local_plan/test_fingerprint.py` — Comprehensive tests for plan fingerprinting and reuse, verifying that identical plans share pipelines and that different plans and query_ids stay isolated.

Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Python: executor.run] -->|1. compute fingerprint| B[NativeExecutor::run]
    B -->|2. check registry| C{Plan exists<br/>for fingerprint?}
    C -->|No| D[Create new pipeline]
    D -->|spawn task| E[Pipeline execution task]
    C -->|Yes| F[Reuse existing pipeline]
    F -->|get sender| G[EnqueueInputMessage]
    D -->|get sender| G
    G -->|3. send inputs| E
    E -->|4. route outputs| H[MessageRouter]
    H -->|by input_id| I[Per-input channel]
    I -->|5. stream results| J[Python iterator]
    J -->|6. try_finish| K{Last input_id<br/>for plan?}
    K -->|Yes| L[Remove from registry<br/>await final stats]
    K -->|No| M[Keep plan active<br/>return snapshot]
    L -->|stats| N[Return ExecutionEngineFinalResult]
    M -->|stats| N

    style E fill:#e1f5ff
    style H fill:#e1f5ff
    style C fill:#fff4e1
    style K fill:#fff4e1
```

Last reviewed commit: fb180dc


@greptile-apps greptile-apps bot left a comment


13 files reviewed, 1 comment


Comment on lines +170 to +192
```rust
Self::UDFProject(UDFProject {
    expr,
    udf_properties,
    passthrough_columns,
    schema,
    ..
}) => {
    // Hash UDF properties (excluding any RuntimePyObject)
    udf_properties.name.hash(hasher);
    udf_properties.resource_request.hash(hasher);
    udf_properties.batch_size.hash(hasher);
    udf_properties.concurrency.hash(hasher);
    udf_properties.use_process.hash(hasher);
    udf_properties.max_retries.hash(hasher);
    udf_properties.builtin_name.hash(hasher);
    udf_properties.is_async.hash(hasher);
    udf_properties.is_scalar.hash(hasher);
    udf_properties.on_error.hash(hasher);
    for expr in passthrough_columns {
        expr.hash(hasher);
    }
    schema.hash(hasher);
}
```


Missing hash for the `expr` field in UDFProject fingerprinting: the `expr` field is extracted but never hashed; only the UDF properties and passthrough columns contribute to the fingerprint.

Suggested change (adds the missing `expr.hash(hasher)`):

```rust
Self::UDFProject(UDFProject {
    expr,
    udf_properties,
    passthrough_columns,
    schema,
    ..
}) => {
    // Hash the expression
    expr.hash(hasher);
    // Hash UDF properties (excluding any RuntimePyObject)
    udf_properties.name.hash(hasher);
    udf_properties.resource_request.hash(hasher);
    udf_properties.batch_size.hash(hasher);
    udf_properties.concurrency.hash(hasher);
    udf_properties.use_process.hash(hasher);
    udf_properties.max_retries.hash(hasher);
    udf_properties.builtin_name.hash(hasher);
    udf_properties.is_async.hash(hasher);
    udf_properties.is_scalar.hash(hasher);
    udf_properties.on_error.hash(hasher);
    for expr in passthrough_columns {
        expr.hash(hasher);
    }
    schema.hash(hasher);
}
```
