Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/common/metrics/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use crate::{ATTR_NODE_ID, ATTR_NODE_TYPE, NodeID};
pub enum NodeType {
// Sources
// Produces MicroPartitions, never consumes
#[default] // For testing purposes
EmptyScan,
#[default]
GlobScan,
InMemoryScan,
ScanTask,
Expand Down
10 changes: 7 additions & 3 deletions src/daft-distributed/src/pipeline_node/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use common_metrics::{
snapshot::DefaultSnapshot,
};
use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::stats::StatsState;
use daft_logical_plan::stats::{PlanStats, StatsState};
use daft_schema::schema::SchemaRef;
use futures::StreamExt;
use opentelemetry::{KeyValue, metrics::Meter};
Expand Down Expand Up @@ -345,15 +345,19 @@ impl LimitNode {
if next_tasks.is_empty() {
// If all rows need to be skipped, send an empty scan task to allow downstream tasks to
// continue running, such as aggregate tasks
let empty_plan = LocalPhysicalPlan::empty_scan(
let empty_plan = LocalPhysicalPlan::in_memory_scan(
self.node_id(),
self.config.schema.clone(),
0,
StatsState::Materialized(PlanStats::empty().into()),
LocalNodeContext {
origin_node_id: Some(self.node_id() as usize),
additional: None,
},
);
let empty_scan_builder =
SwordfishTaskBuilder::new(empty_plan, self.as_ref());
SwordfishTaskBuilder::new(empty_plan, self.as_ref())
.with_psets(self.node_id(), vec![]);
if result_tx.send(empty_scan_builder).await.is_err() {
return Ok(());
}
Expand Down
9 changes: 7 additions & 2 deletions src/daft-distributed/src/pipeline_node/scan_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,19 @@ impl PipelineNodeImpl for ScanSourceNode {
_plan_context: &mut PlanExecutionContext,
) -> TaskBuilderStream {
if self.scan_tasks.is_empty() {
let transformed_plan = LocalPhysicalPlan::empty_scan(
let physical_scan = LocalPhysicalPlan::physical_scan(
self.node_id(),
self.pushdowns.clone(),
self.config.schema.clone(),
StatsState::NotMaterialized,
LocalNodeContext {
origin_node_id: Some(self.node_id() as usize),
additional: None,
},
);
let empty_scan_task = SwordfishTaskBuilder::new(transformed_plan, self.as_ref());

let empty_scan_task = SwordfishTaskBuilder::new(physical_scan, self.as_ref())
.with_scan_tasks(self.node_id(), vec![]);
TaskBuilderStream::new(stream::iter(std::iter::once(empty_scan_task)).boxed())
} else {
let slf = self.clone();
Expand Down
17 changes: 4 additions & 13 deletions src/daft-local-execution/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use common_scan_info::ScanTaskLikeRef;
use daft_core::{join::JoinSide, prelude::Schema};
use daft_dsl::{common_treenode::ConcreteTreeNode, join::get_common_join_cols};
use daft_local_plan::{
CommitWrite, Concat, CrossJoin, Dedup, EmptyScan, Explode, Filter, GlobScan, HashAggregate,
HashJoin, InMemoryScan, InputId, IntoBatches, Limit, LocalNodeContext, LocalPhysicalPlan,
CommitWrite, Concat, CrossJoin, Dedup, Explode, Filter, GlobScan, HashAggregate, HashJoin,
InMemoryScan, InputId, IntoBatches, Limit, LocalNodeContext, LocalPhysicalPlan,
MonotonicallyIncreasingId, PhysicalScan, PhysicalWrite, Pivot, Project, Sample, Sort,
SortMergeJoin, SourceId, TopN, UDFProject, UnGroupedAggregate, Unpivot, VLLMProject,
WindowOrderByOnly, WindowPartitionAndDynamicFrame, WindowPartitionAndOrderBy,
Expand Down Expand Up @@ -63,9 +63,8 @@ use crate::{
write::{WriteFormat, WriteSink},
},
sources::{
empty_scan::EmptyScanSource, flight_shuffle_read::FlightShuffleReadSource,
glob_scan::GlobScanSource, in_memory::InMemorySource, scan_task::ScanTaskSource,
source::SourceNode,
flight_shuffle_read::FlightShuffleReadSource, glob_scan::GlobScanSource,
in_memory::InMemorySource, scan_task::ScanTaskSource, source::SourceNode,
},
streaming_sink::{
async_udf::AsyncUdfSink, base::StreamingSinkNode, limit::LimitSink,
Expand Down Expand Up @@ -357,14 +356,6 @@ fn physical_plan_to_pipeline(
LocalPhysicalPlan::PlaceholderScan(_) => {
panic!("PlaceholderScan should not be converted to a pipeline node")
}
LocalPhysicalPlan::EmptyScan(EmptyScan {
schema,
stats_state,
context,
}) => {
let source = EmptyScanSource::new(schema.clone());
SourceNode::new(Box::new(source), stats_state.clone(), ctx, context).boxed()
}
LocalPhysicalPlan::PhysicalScan(PhysicalScan {
source_id,
pushdowns,
Expand Down
55 changes: 0 additions & 55 deletions src/daft-local-execution/src/sources/empty_scan.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/daft-local-execution/src/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod empty_scan;
pub mod flight_shuffle_read;
pub mod glob_scan;
pub mod in_memory;
Expand Down
13 changes: 6 additions & 7 deletions src/daft-local-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ use daft_micropartition::MicroPartitionRef;
#[cfg(feature = "python")]
pub use plan::{CatalogWrite, DataSink, DistributedActorPoolProject, LanceWrite};
pub use plan::{
CommitWrite, Concat, CrossJoin, Dedup, EmptyScan, Explode, Filter, FlightShuffleRead,
FlightShuffleWrite, GlobScan, HashAggregate, HashJoin, InMemoryScan, IntoBatches,
IntoPartitions, Limit, LocalNodeContext, LocalPhysicalPlan, LocalPhysicalPlanRef,
MonotonicallyIncreasingId, PhysicalScan, PhysicalWrite, Pivot, Project, Repartition, Sample,
SamplingMethod, Sort, SortMergeJoin, TopN, UDFProject, UnGroupedAggregate, Unpivot,
VLLMProject, WindowOrderByOnly, WindowPartitionAndDynamicFrame, WindowPartitionAndOrderBy,
WindowPartitionOnly,
CommitWrite, Concat, CrossJoin, Dedup, Explode, Filter, FlightShuffleRead, FlightShuffleWrite,
GlobScan, HashAggregate, HashJoin, InMemoryScan, IntoBatches, IntoPartitions, Limit,
LocalNodeContext, LocalPhysicalPlan, LocalPhysicalPlanRef, MonotonicallyIncreasingId,
PhysicalScan, PhysicalWrite, Pivot, Project, Repartition, Sample, SamplingMethod, Sort,
SortMergeJoin, TopN, UDFProject, UnGroupedAggregate, Unpivot, VLLMProject, WindowOrderByOnly,
WindowPartitionAndDynamicFrame, WindowPartitionAndOrderBy, WindowPartitionOnly,
};
#[cfg(feature = "python")]
pub use python::{PyLocalPhysicalPlan, register_modules};
Expand Down
33 changes: 5 additions & 28 deletions src/daft-local-plan/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ pub enum LocalPhysicalPlan {
InMemoryScan(InMemoryScan),
PhysicalScan(PhysicalScan),
GlobScan(GlobScan),
EmptyScan(EmptyScan),
PlaceholderScan(PlaceholderScan),
Project(Project),
UDFProject(UDFProject),
Expand Down Expand Up @@ -120,7 +119,6 @@ impl LocalPhysicalPlan {
| Self::PhysicalScan(PhysicalScan { stats_state, .. })
| Self::GlobScan(GlobScan { stats_state, .. })
| Self::PlaceholderScan(PlaceholderScan { stats_state, .. })
| Self::EmptyScan(EmptyScan { stats_state, .. })
| Self::Project(Project { stats_state, .. })
| Self::UDFProject(UDFProject { stats_state, .. })
| Self::Filter(Filter { stats_state, .. })
Expand Down Expand Up @@ -171,7 +169,6 @@ impl LocalPhysicalPlan {
| Self::PhysicalScan(PhysicalScan { context, .. })
| Self::GlobScan(GlobScan { context, .. })
| Self::PlaceholderScan(PlaceholderScan { context, .. })
| Self::EmptyScan(EmptyScan { context, .. })
| Self::Project(Project { context, .. })
| Self::UDFProject(UDFProject { context, .. })
| Self::Filter(Filter { context, .. })
Expand Down Expand Up @@ -279,15 +276,6 @@ impl LocalPhysicalPlan {
.arced()
}

pub fn empty_scan(schema: SchemaRef, context: LocalNodeContext) -> LocalPhysicalPlanRef {
Self::EmptyScan(EmptyScan {
schema,
stats_state: StatsState::Materialized(PlanStats::empty().into()),
context,
})
.arced()
}

pub fn filter(
input: LocalPhysicalPlanRef,
predicate: BoundExpr,
Expand Down Expand Up @@ -1038,7 +1026,6 @@ impl LocalPhysicalPlan {
Self::PhysicalScan(PhysicalScan { schema, .. })
| Self::GlobScan(GlobScan { schema, .. })
| Self::PlaceholderScan(PlaceholderScan { schema, .. })
| Self::EmptyScan(EmptyScan { schema, .. })
| Self::Filter(Filter { schema, .. })
| Self::IntoBatches(IntoBatches { schema, .. })
| Self::Limit(Limit { schema, .. })
Expand Down Expand Up @@ -1119,7 +1106,6 @@ impl LocalPhysicalPlan {
Self::PhysicalScan(_)
| Self::GlobScan(_)
| Self::PlaceholderScan(_)
| Self::EmptyScan(_)
| Self::InMemoryScan(_) => vec![],
Self::Filter(Filter { input, .. })
| Self::Limit(Limit { input, .. })
Expand Down Expand Up @@ -1175,12 +1161,11 @@ impl LocalPhysicalPlan {
"LocalPhysicalPlan::with_new_children: Empty children not handled for FlightShuffleRead"
),
[new_child] => match self {
Self::PhysicalScan(_)
| Self::PlaceholderScan(_)
| Self::EmptyScan(_)
| Self::InMemoryScan(_) => panic!(
"LocalPhysicalPlan::with_new_children: PhysicalScan, PlaceholderScan, EmptyScan, and InMemoryScan do not have children"
),
Self::PhysicalScan(_) | Self::PlaceholderScan(_) | Self::InMemoryScan(_) => {
panic!(
"LocalPhysicalPlan::with_new_children: PhysicalScan, PlaceholderScan, and InMemoryScan do not have children"
)
}
Self::Filter(Filter {
predicate, context, ..
}) => Self::filter(
Expand Down Expand Up @@ -1816,14 +1801,6 @@ pub struct PlaceholderScan {
pub context: LocalNodeContext,
}

#[derive(Serialize, Deserialize)]
#[cfg_attr(debug_assertions, derive(Debug))]
pub struct EmptyScan {
pub schema: SchemaRef,
pub stats_state: StatsState,
pub context: LocalNodeContext,
}

#[derive(Serialize, Deserialize)]
#[cfg_attr(debug_assertions, derive(Debug))]
pub struct Project {
Expand Down
Loading