Commit 25c65a6

feat: generalize dynamic state injection via with_new_state on ExecutionPlan
Introduce a generic `with_new_state` method to the `ExecutionPlan` trait, superseding the specialized `with_work_table`. This provides a uniform mechanism for execution nodes to accept late-bound, run-time state of any type, improving extensibility for custom operators.

Motivation: `RecursiveQueryExec` and other potential operators need to retrofit execution plans with run-time state (e.g. the working table for recursive CTEs). The previous `with_work_table` API was hard-coded to one concrete type, preventing wrapper or third-party nodes from re-using the same mechanism for different kinds of state.

Changes:
- Replace `with_work_table` with `with_new_state(&self, state: Arc<dyn Any + Send + Sync>)` on `ExecutionPlan`; default implementation returns `None`.
- Update rustdoc on the trait to describe the generic contract and thread-safety (`Send + Sync`) requirements.
- Implement `with_new_state` in `WorkTableExec`, down-casting the supplied state to `Arc<WorkTable>` and returning an updated plan when applicable.
- Refactor `RecursiveQueryExec::assign_work_table` to use the new trait method, passing the work table as generic state.
- Remove the now-unused re-export of `WorkTable` from `execution_plan.rs`.
- Refresh documentation throughout to emphasise that the mechanism is generic, using `WorkTableExec` solely as an illustrative example.

This refactor future-proofs DataFusion’s execution API, enabling any custom node to inject and propagate arbitrary shared state without introducing new trait methods each time.
1 parent aac67eb commit 25c65a6

4 files changed: +40 -27 lines changed

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 16 additions & 12 deletions
@@ -23,7 +23,6 @@ use crate::filter_pushdown::{
 pub use crate::metrics::Metric;
 pub use crate::ordering::InputOrderMode;
 pub use crate::stream::EmptyRecordBatchStream;
-pub use crate::work_table::WorkTable;

 pub use datafusion_common::hash_utils;
 pub use datafusion_common::utils::project_schema;
@@ -572,20 +571,25 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         ))
     }

-    /// Returns a new execution plan that uses the provided work table, if supported.
-    /// This enables recursive query execution by allowing work table injection.
+    /// Injects arbitrary run-time state into this execution plan, returning a new plan
+    /// instance that incorporates that state *if* it is relevant to the concrete
+    /// node implementation.
     ///
-    /// Primarily implemented by [`WorkTableExec`], but custom execution nodes that wrap
-    /// or contain `WorkTableExec` instances should also implement this to propagate
-    /// work table injection to their inner components.
+    /// This is a generic entry point: the `state` can be any type wrapped in
+    /// `Arc<dyn Any + Send + Sync>`. A node that cares about the state should
+    /// down-cast it to the concrete type it expects and, if successful, return a
+    /// modified copy of itself that captures the provided value. If the state is
+    /// not applicable, the default behaviour is to return `None` so that parent
+    /// nodes can continue propagating the attempt further down the plan tree.
     ///
-    /// See [`WorkTableExec::with_work_table`] for the reference implementation.
-    ///
-    /// [`WorkTableExec`]: crate::work_table::WorkTableExec
-    /// [`WorkTableExec::with_work_table`]: crate::work_table::WorkTableExec::with_work_table
-    fn with_work_table(
+    /// For example, [`WorkTableExec`](crate::work_table::WorkTableExec)
+    /// down-casts the supplied state to an `Arc<WorkTable>`
+    /// in order to wire up the working table used during recursive-CTE execution.
+    /// Similar patterns can be followed by custom nodes that need late-bound
+    /// dependencies or shared state.
+    fn with_new_state(
         &self,
-        _work_table: Arc<WorkTable>,
+        _state: Arc<dyn Any + Send + Sync>,
     ) -> Option<Arc<dyn ExecutionPlan>> {
         None
     }
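
Reviewer sketch (not part of the commit): the contract described in the rustdoc above can be tried out with the standard library alone. Everything here is hypothetical: `Node` stands in for `ExecutionPlan`, and `RowLimit`, `Scan` and `LimitedScan` are invented for illustration. Only `Arc<dyn Any + Send + Sync>::downcast` is a real API.

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical stand-in for `ExecutionPlan`; only the state hook is modelled.
trait Node: Send + Sync {
    fn name(&self) -> String;

    // Mirrors the default in the diff: nodes that do not care return `None`.
    fn with_new_state(&self, _state: Arc<dyn Any + Send + Sync>) -> Option<Arc<dyn Node>> {
        None
    }
}

// Hypothetical late-bound state (not a DataFusion type).
struct RowLimit(usize);

// A node that ignores all injected state and keeps the default method.
struct Scan;

// A node that understands exactly one kind of state: `RowLimit`.
struct LimitedScan {
    limit: Option<usize>,
}

impl Node for Scan {
    fn name(&self) -> String {
        "Scan".to_string()
    }
}

impl Node for LimitedScan {
    fn name(&self) -> String {
        format!("LimitedScan(limit={:?})", self.limit)
    }

    fn with_new_state(&self, state: Arc<dyn Any + Send + Sync>) -> Option<Arc<dyn Node>> {
        // `downcast` yields `Err` on a type mismatch; `.ok()?` turns that into `None`.
        let limit = state.downcast::<RowLimit>().ok()?;
        Some(Arc::new(LimitedScan { limit: Some(limit.0) }))
    }
}

fn main() {
    let state: Arc<dyn Any + Send + Sync> = Arc::new(RowLimit(10));
    // The default implementation declines the state.
    assert!(Scan.with_new_state(Arc::clone(&state)).is_none());
    // The interested node returns a modified copy of itself.
    let updated = LimitedScan { limit: None }.with_new_state(state).unwrap();
    println!("{}", updated.name());
}
```

Returning a fresh node rather than mutating in place matches the `&self` receiver in the trait, so existing `Arc`-shared plans stay untouched.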

datafusion/physical-plan/src/lib.rs

Lines changed: 0 additions & 1 deletion
@@ -50,7 +50,6 @@ pub use crate::ordering::InputOrderMode;
 pub use crate::stream::EmptyRecordBatchStream;
 pub use crate::topk::TopK;
 pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
-pub use crate::work_table::WorkTable;
 pub use spill::spill_manager::SpillManager;

 mod ordering;

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 3 additions & 1 deletion
@@ -351,7 +351,9 @@ fn assign_work_table(
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let mut work_table_refs = 0;
     plan.transform_down(|plan| {
-        if let Some(new_plan) = plan.with_work_table(Arc::clone(&work_table)) {
+        if let Some(new_plan) =
+            plan.with_new_state(Arc::clone(&work_table) as Arc<dyn Any + Send + Sync>)
+        {
             if work_table_refs > 0 {
                 not_impl_err!(
                     "Multiple recursive references to the same CTE are not supported"

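Reviewer sketch (not part of the commit): the hunk above offers the type-erased state to each node during a top-down rewrite and rejects a second acceptance. The standalone model below loosely mirrors that flow. `Plan`, `assign_state` and the `u64` token are hypothetical, and a hand-written recursion replaces DataFusion's `transform_down`.

```rust
use std::any::Any;
use std::sync::Arc;

// Hypothetical miniature plan tree; not DataFusion's `ExecutionPlan`.
enum Plan {
    // Placeholder waiting for a `u64` token (stands in for the work table).
    Placeholder(Option<Arc<u64>>),
    // Node that only forwards to its children.
    Wrapper(Vec<Plan>),
}

impl Plan {
    // Only placeholders accept state; wrappers decline with `None`.
    fn with_new_state(&self, state: Arc<dyn Any + Send + Sync>) -> Option<Plan> {
        match self {
            Plan::Placeholder(_) => state
                .downcast::<u64>()
                .ok()
                .map(|token| Plan::Placeholder(Some(token))),
            Plan::Wrapper(_) => None,
        }
    }
}

// Top-down rewrite: offer the state to each node and fail on a second acceptance.
fn assign_state(plan: &Plan, state: &Arc<u64>, refs: &mut usize) -> Result<Plan, String> {
    // Same unsizing cast as in the diff: `Arc<T>` to `Arc<dyn Any + Send + Sync>`.
    let erased = Arc::clone(state) as Arc<dyn Any + Send + Sync>;
    if let Some(new_plan) = plan.with_new_state(erased) {
        if *refs > 0 {
            return Err("multiple references to the same state are not supported".to_string());
        }
        *refs += 1;
        return Ok(new_plan);
    }
    match plan {
        Plan::Wrapper(children) => {
            let mut rewritten = Vec::with_capacity(children.len());
            for child in children {
                rewritten.push(assign_state(child, state, refs)?);
            }
            Ok(Plan::Wrapper(rewritten))
        }
        // A placeholder that declined the state is kept as it was.
        Plan::Placeholder(slot) => Ok(Plan::Placeholder(slot.clone())),
    }
}

fn main() {
    let token = Arc::new(7u64);

    let single = Plan::Wrapper(vec![Plan::Placeholder(None)]);
    let mut refs = 0;
    assert!(assign_state(&single, &token, &mut refs).is_ok());

    let double = Plan::Wrapper(vec![Plan::Placeholder(None), Plan::Placeholder(None)]);
    let mut refs = 0;
    assert!(assign_state(&double, &token, &mut refs).is_err());
}
```
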
datafusion/physical-plan/src/work_table.rs

Lines changed: 21 additions & 13 deletions
@@ -57,7 +57,7 @@ impl ReservedBatches {
 /// See <https://wiki.postgresql.org/wiki/CTEReadme#How_Recursion_Works>
 /// This table serves as a mirror or buffer between each iteration of a recursive query.
 #[derive(Debug)]
-pub struct WorkTable {
+pub(super) struct WorkTable {
     batches: Mutex<Option<ReservedBatches>>,
 }

@@ -225,20 +225,28 @@ impl ExecutionPlan for WorkTableExec {
         Ok(Statistics::new_unknown(&self.schema()))
     }

-    /// Creates a new `WorkTableExec` with the provided work table for recursive query execution.
-    /// During query planning, `WorkTableExec` nodes are created as placeholders; this method
-    /// "wires up" the actual work table that coordinates data between recursive iterations.
-    fn with_work_table(
+    /// Injects run-time state into this `WorkTableExec`.
+    ///
+    /// The only state this node currently understands is an [`Arc<WorkTable>`].
+    /// If `state` can be down-cast to that type, a new `WorkTableExec` backed
+    /// by the provided work table is returned. Otherwise `None` is returned
+    /// so that callers can attempt to propagate the state further down the
+    /// execution plan tree.
+    fn with_new_state(
         &self,
-        work_table: Arc<WorkTable>,
+        state: Arc<dyn Any + Send + Sync>,
     ) -> Option<Arc<dyn ExecutionPlan>> {
-        Some(Arc::new(Self {
-            name: self.name.clone(),
-            schema: Arc::clone(&self.schema),
-            metrics: ExecutionPlanMetricsSet::new(),
-            work_table,
-            cache: self.cache.clone(),
-        }))
+        // Attempt to down-cast the supplied state to a WorkTable
+        match state.downcast::<WorkTable>() {
+            Ok(work_table) => Some(Arc::new(Self {
+                name: self.name.clone(),
+                schema: Arc::clone(&self.schema),
+                metrics: ExecutionPlanMetricsSet::new(),
+                work_table,
+                cache: self.cache.clone(),
+            })),
+            Err(_) => None,
+        }
     }
 }

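Reviewer sketch (not part of the commit): the `match state.downcast::<WorkTable>()` above relies on `Arc::downcast` handing back the same allocation rather than a copy, so the node and the caller keep sharing one mutex-guarded table. The snippet below checks that property with a hypothetical `Table` type standing in for `WorkTable`; the helper `as_table` is likewise invented.

```rust
use std::any::Any;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for `WorkTable`: a mutex-guarded buffer.
struct Table {
    batches: Mutex<Vec<u32>>,
}

// Recover the concrete type, or `None` when the state is something else.
fn as_table(state: Arc<dyn Any + Send + Sync>) -> Option<Arc<Table>> {
    state.downcast::<Table>().ok()
}

fn main() {
    let table = Arc::new(Table { batches: Mutex::new(Vec::new()) });

    // Erase the type the same way the caller in `recursive_query.rs` does.
    let erased = Arc::clone(&table) as Arc<dyn Any + Send + Sync>;
    let recovered = as_table(erased).expect("state is a Table");

    // Both handles point at the same allocation; nothing was cloned.
    assert!(Arc::ptr_eq(&table, &recovered));

    // A write through one handle is visible through the other.
    table.batches.lock().unwrap().push(1);
    assert_eq!(recovered.batches.lock().unwrap().len(), 1);

    // Unrelated state is declined instead of panicking.
    let other: Arc<dyn Any + Send + Sync> = Arc::new(String::from("not a table"));
    assert!(as_table(other).is_none());
}
```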