Skip to content

Commit 6cd03e2

Browse files
zhuqi-lucas and alamb authored
Minor: add test case for adding YieldStreamExec and polish docs (#16369)
* Add more test cases for adding YieldStreamExec
* fmt
* fix doc
* Update datafusion/physical-optimizer/src/insert_yield_exec.rs Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/physical-optimizer/src/insert_yield_exec.rs Co-authored-by: Andrew Lamb <[email protected]>
* Address comments
---------
Co-authored-by: Andrew Lamb <[email protected]>
1 parent 3a371ed commit 6cd03e2

File tree

5 files changed

+39
-10
lines changed

5 files changed

+39
-10
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-optimizer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,4 @@ recursive = { workspace = true, optional = true }
5454
datafusion-expr = { workspace = true }
5555
datafusion-functions-nested = { workspace = true }
5656
insta = { workspace = true }
57+
tokio = { workspace = true }

datafusion/physical-optimizer/src/insert_yield_exec.rs

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! The `InsertYieldExec` optimizer rule inspects the physical plan to look for
19-
//! tight-looping operators and inserts explicit yielding mechanisms (whether
20-
//! as a separate operator, or via a yielding variant) at leaf nodes to make
21-
//! the plan cancellation friendly.
18+
//! The [`InsertYieldExec`] optimizer rule inspects the physical plan to find all leaf
19+
//! nodes corresponding to tight-looping operators. It first attempts to replace
20+
//! each leaf with a cooperative-yielding variant via `with_cooperative_yields`,
21+
//! and only if no built-in variant exists does it wrap the node in a
22+
//! [`YieldStreamExec`] operator to enforce periodic yielding, ensuring the plan
23+
//! remains cancellation-friendly.
2224
2325
use std::fmt::{Debug, Formatter};
2426
use std::sync::Arc;
@@ -32,9 +34,10 @@ use datafusion_physical_plan::yield_stream::YieldStreamExec;
3234
use datafusion_physical_plan::ExecutionPlan;
3335

3436
/// `InsertYieldExec` is a [`PhysicalOptimizerRule`] that finds every leaf node in
35-
/// the plan, and replaces it with a variant that cooperatively yields
36-
/// either using the its yielding variant given by `with_cooperative_yields`,
37-
/// or, if none exists, by inserting a [`YieldStreamExec`] operator as a parent.
37+
/// the plan and replaces it with a variant that yields cooperatively if supported.
38+
/// If the node does not provide a built-in yielding variant via
39+
/// [`ExecutionPlan::with_cooperative_yields`], it is wrapped in a [`YieldStreamExec`] parent to
40+
/// enforce a configured yield frequency.
3841
pub struct InsertYieldExec {}
3942

4043
impl InsertYieldExec {
@@ -73,10 +76,11 @@ impl PhysicalOptimizerRule for InsertYieldExec {
7376
// Not a leaf, keep recursing down.
7477
return Ok(Transformed::no(plan));
7578
}
79+
// For leaf nodes, try to get a built-in cooperative-yielding variant.
7680
let new_plan = Arc::clone(&plan)
7781
.with_cooperative_yields()
7882
.unwrap_or_else(|| {
79-
// Otherwise, insert a `YieldStreamExec` to enforce periodic yielding.
83+
// Only if no built-in variant exists, insert a `YieldStreamExec`.
8084
Arc::new(YieldStreamExec::new(plan, yield_period))
8185
});
8286
Ok(Transformed::new(new_plan, true, TreeNodeRecursion::Jump))
@@ -92,3 +96,27 @@ impl PhysicalOptimizerRule for InsertYieldExec {
9296
true
9397
}
9498
}
99+
100+
#[cfg(test)]
101+
mod tests {
102+
use super::*;
103+
use datafusion_common::config::ConfigOptions;
104+
use datafusion_physical_plan::{displayable, test::scan_partitioned};
105+
use insta::assert_snapshot;
106+
107+
#[tokio::test]
108+
async fn test_yield_stream_exec_for_custom_exec() {
109+
let test_custom_exec = scan_partitioned(1);
110+
let config = ConfigOptions::new();
111+
let optimized = InsertYieldExec::new()
112+
.optimize(test_custom_exec, &config)
113+
.unwrap();
114+
115+
let display = displayable(optimized.as_ref()).indent(true).to_string();
116+
// Use insta snapshot to ensure full plan structure
117+
assert_snapshot!(display, @r###"
118+
YieldStreamExec frequency=64
119+
DataSourceExec: partitions=1, partition_sizes=[1]
120+
"###);
121+
}
122+
}

datafusion/physical-plan/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,5 @@ pub mod udaf {
9292
}
9393

9494
pub mod coalesce;
95-
#[cfg(any(test, feature = "bench"))]
9695
pub mod test;
9796
pub mod yield_stream;

datafusion/physical-plan/src/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ impl TestMemoryExec {
300300
}
301301

302302
/// refer to `try_with_sort_information` at MemorySourceConfig for more information.
303-
/// https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs
303+
/// <https://github.com/apache/datafusion/tree/main/datafusion/datasource/src/memory.rs>
304304
pub fn try_with_sort_information(
305305
mut self,
306306
mut sort_information: Vec<LexOrdering>,

0 commit comments

Comments
 (0)