Skip to content

Commit 935db91

Browse files
committed
Use Tokio's task budget consistently
1 parent 6cd03e2 commit 935db91

File tree

19 files changed

+272
-367
lines changed

19 files changed

+272
-367
lines changed

datafusion/common/src/config.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -722,15 +722,6 @@ config_namespace! {
722722
/// then the output will be coerced to a non-view.
723723
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
724724
pub expand_views_at_output: bool, default = false
725-
726-
/// When DataFusion detects that a plan might not be promply cancellable
727-
/// due to the presence of tight-looping operators, it will attempt to
728-
/// mitigate this by inserting explicit yielding (in as few places as
729-
/// possible to avoid performance degradation). This value represents the
730-
/// yielding period (in batches) at such explicit yielding points. The
731-
/// default value is 64. If set to 0, no DataFusion will not perform
732-
/// any explicit yielding.
733-
pub yield_period: usize, default = 64
734725
}
735726
}
736727

datafusion/core/tests/execution/infinite_cancel.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use datafusion_expr_common::operator::Operator::Gt;
3838
use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, Literal};
3939
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
4040
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
41-
use datafusion_physical_optimizer::insert_yield_exec::InsertYieldExec;
41+
use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
4242
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4343
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
4444
use datafusion_physical_plan::filter::FilterExec;
@@ -146,7 +146,7 @@ async fn test_infinite_agg_cancel(
146146

147147
// 3) optimize the plan with InsertYieldExec to auto-insert Yield
148148
let config = ConfigOptions::new();
149-
let optimized = InsertYieldExec::new().optimize(aggr, &config)?;
149+
let optimized = EnsureCooperative::new().optimize(aggr, &config)?;
150150

151151
// 4) get the stream
152152
let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
@@ -199,7 +199,7 @@ async fn test_infinite_sort_cancel(
199199

200200
// 4) optimize the plan with InsertYieldExec to auto-insert Yield
201201
let config = ConfigOptions::new();
202-
let optimized = InsertYieldExec::new().optimize(sort_exec, &config)?;
202+
let optimized = EnsureCooperative::new().optimize(sort_exec, &config)?;
203203

204204
// 5) get the stream
205205
let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
@@ -284,7 +284,7 @@ async fn test_infinite_interleave_cancel(
284284
// 6) Apply InsertYieldExec to insert YieldStreamExec under every leaf.
285285
// Each InfiniteExec → FilterExec → CoalesceBatchesExec chain will yield periodically.
286286
let config = ConfigOptions::new();
287-
let optimized = InsertYieldExec::new().optimize(coalesced_top, &config)?;
287+
let optimized = EnsureCooperative::new().optimize(coalesced_top, &config)?;
288288

289289
// 7) Execute the optimized plan with a 1-second timeout.
290290
// Because the top-level FilterExec always discards rows and the inputs are infinite,
@@ -385,7 +385,7 @@ async fn test_infinite_interleave_agg_cancel(
385385
// That way, each InfiniteExec (through the FilterExec/CoalesceBatchesExec/RepartitionExec chain)
386386
// yields to the runtime periodically instead of spinning CPU.
387387
let config = ConfigOptions::new();
388-
let optimized = InsertYieldExec::new().optimize(aggr, &config)?;
388+
let optimized = EnsureCooperative::new().optimize(aggr, &config)?;
389389

390390
// 6) Execute the stream. Because AggregateExec(mode=Single) only emits a final batch
391391
// after all inputs finish—and those inputs are infinite—we expect no output
@@ -472,7 +472,7 @@ async fn test_infinite_join_cancel(
472472

473473
// 3) Wrap yields under each infinite leaf
474474
let config = ConfigOptions::new();
475-
let optimized = InsertYieldExec::new().optimize(join, &config)?;
475+
let optimized = EnsureCooperative::new().optimize(join, &config)?;
476476

477477
// 4) Execute + 1 sec timeout
478478
let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
@@ -590,7 +590,7 @@ async fn test_infinite_join_agg_cancel(
590590

591591
// 5) Wrap yields under each infinite leaf
592592
let config = ConfigOptions::new();
593-
let optimized = InsertYieldExec::new().optimize(aggr, &config)?;
593+
let optimized = EnsureCooperative::new().optimize(aggr, &config)?;
594594

595595
// 6) Execute + 1 sec timeout
596596
let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
@@ -644,7 +644,7 @@ async fn test_filter_reject_all_batches_cancel(
644644

645645
// 3) InsertYieldExec to insert YieldExec—so that the InfiniteExec yields control between batches
646646
let config = ConfigOptions::new();
647-
let optimized = InsertYieldExec::new().optimize(coalesced, &config)?;
647+
let optimized = EnsureCooperative::new().optimize(coalesced, &config)?;
648648

649649
// 4) Execute with a 1-second timeout. Because Filter discards all 8192 rows each time
650650
// without ever producing output, no batch will arrive within 1 second. And since
@@ -723,7 +723,7 @@ async fn test_infinite_hash_join_without_repartition_and_no_agg(
723723
// because there is no aggregation so no wrapper is inserted. Here we simply do
724724
// not call InsertYieldExec, ensuring the plan has neither aggregation nor repartition.
725725
let config = ConfigOptions::new();
726-
let optimized = InsertYieldExec::new().optimize(join, &config)?;
726+
let optimized = EnsureCooperative::new().optimize(join, &config)?;
727727

728728
// 4) Execute with a 1 second timeout
729729
let mut stream = physical_plan::execute_stream(optimized, session_ctx.task_ctx())?;
@@ -801,7 +801,7 @@ async fn test_infinite_sort_merge_join_without_repartition_and_no_agg(
801801

802802
// 3) Do not apply InsertYieldExec (no aggregation, no repartition → no built-in yields).
803803
let config = ConfigOptions::new();
804-
let optimized = InsertYieldExec::new().optimize(join, &config)?;
804+
let optimized = EnsureCooperative::new().optimize(join, &config)?;
805805

806806
// 4) Execute with a 1-second timeout. Because both sides are infinite and never match,
807807
// the SortMergeJoin will never produce output within 1s.

datafusion/datasource/src/source.rs

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ use datafusion_common::{Constraints, Result, Statistics};
3535
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3636
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
3737
use datafusion_physical_expr_common::sort_expr::LexOrdering;
38+
use datafusion_physical_plan::coop::cooperative_wrapper;
3839
use datafusion_physical_plan::filter_pushdown::{
3940
ChildPushdownResult, FilterPushdownPropagation,
4041
};
41-
use datafusion_physical_plan::yield_stream::wrap_yield_stream;
4242

4343
/// A source of data, typically a list of files or memory
4444
///
@@ -186,8 +186,6 @@ pub struct DataSourceExec {
186186
data_source: Arc<dyn DataSource>,
187187
/// Cached plan properties such as sort order
188188
cache: PlanProperties,
189-
/// Indicates whether to enable cooperative yielding mode.
190-
cooperative: bool,
191189
}
192190

193191
impl DisplayAs for DataSourceExec {
@@ -261,11 +259,11 @@ impl ExecutionPlan for DataSourceExec {
261259
) -> Result<SendableRecordBatchStream> {
262260
self.data_source
263261
.open(partition, Arc::clone(&context))
264-
.map(|stream| wrap_yield_stream(stream, &context, self.cooperative))
262+
.map(|stream| cooperative_wrapper(stream))
265263
}
266264

267265
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
268-
self.cooperative.then_some(self)
266+
Some(self)
269267
}
270268

271269
fn metrics(&self) -> Option<MetricsSet> {
@@ -298,11 +296,7 @@ impl ExecutionPlan for DataSourceExec {
298296
let data_source = self.data_source.with_fetch(limit)?;
299297
let cache = self.cache.clone();
300298

301-
Some(Arc::new(Self {
302-
data_source,
303-
cache,
304-
cooperative: self.cooperative,
305-
}))
299+
Some(Arc::new(Self { data_source, cache }))
306300
}
307301

308302
fn fetch(&self) -> Option<usize> {
@@ -353,11 +347,7 @@ impl DataSourceExec {
353347
// Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`.
354348
pub fn new(data_source: Arc<dyn DataSource>) -> Self {
355349
let cache = Self::compute_properties(Arc::clone(&data_source));
356-
Self {
357-
data_source,
358-
cache,
359-
cooperative: true,
360-
}
350+
Self { data_source, cache }
361351
}
362352

363353
/// Return the source object
@@ -383,12 +373,6 @@ impl DataSourceExec {
383373
self
384374
}
385375

386-
/// Assign yielding mode
387-
pub fn with_cooperative(mut self, cooperative: bool) -> Self {
388-
self.cooperative = cooperative;
389-
self
390-
}
391-
392376
fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
393377
PlanProperties::new(
394378
data_source.eq_properties(),

datafusion/physical-optimizer/src/insert_yield_exec.rs renamed to datafusion/physical-optimizer/src/ensure_coop.rs

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! The [`InsertYieldExec`] optimizer rule inspects the physical plan to find all leaf
19-
//! nodes corresponding to tight-looping operators. It first attempts to replace
20-
//! each leaf with a cooperative-yielding variant via `with_cooperative_yields`,
21-
//! and only if no built-in variant exists does it wrap the node in a
22-
//! [`YieldStreamExec`] operator to enforce periodic yielding, ensuring the plan
23-
//! remains cancellation-friendly.
18+
//! The [`EnsureCooperative`] optimizer rule inspects the physical plan to find all
19+
//! portions of the plan that will not yield cooperatively.
20+
//! It will insert `CooperativeExec` nodes where appropriate to ensure execution plans
21+
//! always yield cooperatively.
2422
2523
use std::fmt::{Debug, Formatter};
2624
use std::sync::Arc;
@@ -30,65 +28,59 @@ use crate::PhysicalOptimizerRule;
3028
use datafusion_common::config::ConfigOptions;
3129
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
3230
use datafusion_common::Result;
33-
use datafusion_physical_plan::yield_stream::YieldStreamExec;
31+
use datafusion_physical_plan::coop::CooperativeExec;
3432
use datafusion_physical_plan::ExecutionPlan;
3533

36-
/// `InsertYieldExec` is a [`PhysicalOptimizerRule`] that finds every leaf node in
34+
/// `EnsureCooperative` is a [`PhysicalOptimizerRule`] that finds every leaf node in
3735
/// the plan and replaces it with a variant that yields cooperatively if supported.
3836
/// If the node does not provide a built-in yielding variant via
39-
/// [`ExecutionPlan::with_cooperative_yields`], it is wrapped in a [`YieldStreamExec`] parent to
40-
/// enforce a configured yield frequency.
41-
pub struct InsertYieldExec {}
37+
/// [`ExecutionPlan::with_cooperative_yields`], it is wrapped in a [`CooperativeExec`] parent.
38+
pub struct EnsureCooperative {}
4239

43-
impl InsertYieldExec {
40+
impl EnsureCooperative {
4441
pub fn new() -> Self {
4542
Self {}
4643
}
4744
}
4845

49-
impl Default for InsertYieldExec {
46+
impl Default for EnsureCooperative {
5047
fn default() -> Self {
5148
Self::new()
5249
}
5350
}
5451

55-
impl Debug for InsertYieldExec {
52+
impl Debug for EnsureCooperative {
5653
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
5754
f.debug_struct("InsertYieldExec").finish()
5855
}
5956
}
6057

61-
impl PhysicalOptimizerRule for InsertYieldExec {
58+
impl PhysicalOptimizerRule for EnsureCooperative {
6259
fn name(&self) -> &str {
6360
"insert_yield_exec"
6461
}
6562

6663
fn optimize(
6764
&self,
6865
plan: Arc<dyn ExecutionPlan>,
69-
config: &ConfigOptions,
66+
_config: &ConfigOptions,
7067
) -> Result<Arc<dyn ExecutionPlan>> {
71-
// Only activate if user has configured a non-zero yield frequency.
72-
let yield_period = config.optimizer.yield_period;
73-
if yield_period != 0 {
74-
plan.transform_down(|plan| {
75-
if !plan.children().is_empty() {
76-
// Not a leaf, keep recursing down.
77-
return Ok(Transformed::no(plan));
78-
}
79-
// For leaf nodes, try to get a built-in cooperative-yielding variant.
80-
let new_plan = Arc::clone(&plan)
68+
plan.transform_down(|plan| {
69+
if !plan.children().is_empty() {
70+
// Not a leaf, keep recursing down.
71+
return Ok(Transformed::no(plan));
72+
}
73+
// For leaf nodes, try to get a built-in cooperative-yielding variant.
74+
let new_plan =
75+
Arc::clone(&plan)
8176
.with_cooperative_yields()
8277
.unwrap_or_else(|| {
83-
// Only if no built-in variant exists, insert a `YieldStreamExec`.
84-
Arc::new(YieldStreamExec::new(plan, yield_period))
78+
// Only if no built-in variant exists, insert a `CooperativeExec`.
79+
Arc::new(CooperativeExec::new(plan))
8580
});
86-
Ok(Transformed::new(new_plan, true, TreeNodeRecursion::Jump))
87-
})
88-
.map(|t| t.data)
89-
} else {
90-
Ok(plan)
91-
}
81+
Ok(Transformed::new(new_plan, true, TreeNodeRecursion::Jump))
82+
})
83+
.map(|t| t.data)
9284
}
9385

9486
fn schema_check(&self) -> bool {
@@ -105,10 +97,10 @@ mod tests {
10597
use insta::assert_snapshot;
10698

10799
#[tokio::test]
108-
async fn test_yield_stream_exec_for_custom_exec() {
100+
async fn test_cooperative_exec_for_custom_exec() {
109101
let test_custom_exec = scan_partitioned(1);
110102
let config = ConfigOptions::new();
111-
let optimized = InsertYieldExec::new()
103+
let optimized = EnsureCooperative::new()
112104
.optimize(test_custom_exec, &config)
113105
.unwrap();
114106

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ pub mod coalesce_batches;
2929
pub mod combine_partial_final_agg;
3030
pub mod enforce_distribution;
3131
pub mod enforce_sorting;
32+
pub mod ensure_coop;
3233
pub mod filter_pushdown;
33-
pub mod insert_yield_exec;
3434
pub mod join_selection;
3535
pub mod limit_pushdown;
3636
pub mod limited_distinct_aggregation;

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ use crate::coalesce_batches::CoalesceBatches;
2525
use crate::combine_partial_final_agg::CombinePartialFinalAggregate;
2626
use crate::enforce_distribution::EnforceDistribution;
2727
use crate::enforce_sorting::EnforceSorting;
28+
use crate::ensure_coop::EnsureCooperative;
2829
use crate::filter_pushdown::FilterPushdown;
29-
use crate::insert_yield_exec::InsertYieldExec;
3030
use crate::join_selection::JoinSelection;
3131
use crate::limit_pushdown::LimitPushdown;
3232
use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
@@ -138,7 +138,7 @@ impl PhysicalOptimizer {
138138
// are not present, the load of executors such as join or union will be
139139
// reduced by narrowing their input tables.
140140
Arc::new(ProjectionPushdown::new()),
141-
Arc::new(InsertYieldExec::new()),
141+
Arc::new(EnsureCooperative::new()),
142142
// The SanityCheckPlan rule checks whether the order and
143143
// distribution requirements of each node in the plan
144144
// is satisfied. It will also reject non-runnable query

0 commit comments

Comments
 (0)