Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/common/metrics/src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use crate::{ATTR_NODE_ID, ATTR_NODE_TYPE, NodeID};
pub enum NodeType {
// Sources
// Produces MicroPartitions, never consumes
#[default] // For testing purposes
EmptyScan,
#[default]
GlobScan,
InMemoryScan,
ScanTask,
Expand Down
7 changes: 5 additions & 2 deletions src/daft-distributed/src/pipeline_node/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use common_metrics::{
snapshot::DefaultSnapshot,
};
use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::stats::StatsState;
use daft_logical_plan::stats::{PlanStats, StatsState};
use daft_schema::schema::SchemaRef;
use futures::StreamExt;
use opentelemetry::{KeyValue, metrics::Meter};
Expand Down Expand Up @@ -345,8 +345,11 @@ impl LimitNode {
if next_tasks.is_empty() {
// If all rows need to be skipped, send an empty scan task to allow downstream tasks to
// continue running, such as aggregate tasks
let empty_plan = LocalPhysicalPlan::empty_scan(
let empty_plan = LocalPhysicalPlan::in_memory_scan(
self.node_id(),
self.config.schema.clone(),
0,
StatsState::Materialized(PlanStats::empty().into()),
LocalNodeContext {
origin_node_id: Some(self.node_id() as usize),
additional: None,
Expand Down
10 changes: 8 additions & 2 deletions src/daft-distributed/src/pipeline_node/scan_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use common_metrics::{
};
use common_scan_info::{Pushdowns, ScanTaskLikeRef};
use daft_local_plan::{LocalNodeContext, LocalPhysicalPlan};
use daft_logical_plan::{ClusteringSpec, stats::StatsState};
use daft_logical_plan::{
ClusteringSpec,
stats::{PlanStats, StatsState},
};
use daft_schema::schema::SchemaRef;
use futures::{StreamExt, stream};
use opentelemetry::{KeyValue, metrics::Meter};
Expand Down Expand Up @@ -220,8 +223,11 @@ impl PipelineNodeImpl for ScanSourceNode {
_plan_context: &mut PlanExecutionContext,
) -> TaskBuilderStream {
if self.scan_tasks.is_empty() {
let transformed_plan = LocalPhysicalPlan::empty_scan(
let transformed_plan = LocalPhysicalPlan::in_memory_scan(
self.node_id(),
self.config.schema.clone(),
0,
StatsState::Materialized(PlanStats::empty().into()),
LocalNodeContext {
origin_node_id: Some(self.node_id() as usize),
additional: None,
Expand Down
17 changes: 4 additions & 13 deletions src/daft-local-execution/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use common_scan_info::ScanTaskLikeRef;
use daft_core::{join::JoinSide, prelude::Schema};
use daft_dsl::{common_treenode::ConcreteTreeNode, join::get_common_join_cols};
use daft_local_plan::{
CommitWrite, Concat, CrossJoin, Dedup, EmptyScan, Explode, Filter, GlobScan, HashAggregate,
HashJoin, InMemoryScan, InputId, IntoBatches, Limit, LocalNodeContext, LocalPhysicalPlan,
CommitWrite, Concat, CrossJoin, Dedup, Explode, Filter, GlobScan, HashAggregate, HashJoin,
InMemoryScan, InputId, IntoBatches, Limit, LocalNodeContext, LocalPhysicalPlan,
MonotonicallyIncreasingId, PhysicalScan, PhysicalWrite, Pivot, Project, Sample, Sort,
SortMergeJoin, SourceId, TopN, UDFProject, UnGroupedAggregate, Unpivot, VLLMProject,
WindowOrderByOnly, WindowPartitionAndDynamicFrame, WindowPartitionAndOrderBy,
Expand Down Expand Up @@ -63,9 +63,8 @@ use crate::{
write::{WriteFormat, WriteSink},
},
sources::{
empty_scan::EmptyScanSource, flight_shuffle_read::FlightShuffleReadSource,
glob_scan::GlobScanSource, in_memory::InMemorySource, scan_task::ScanTaskSource,
source::SourceNode,
flight_shuffle_read::FlightShuffleReadSource, glob_scan::GlobScanSource,
in_memory::InMemorySource, scan_task::ScanTaskSource, source::SourceNode,
},
streaming_sink::{
async_udf::AsyncUdfSink, base::StreamingSinkNode, limit::LimitSink,
Expand Down Expand Up @@ -357,14 +356,6 @@ fn physical_plan_to_pipeline(
LocalPhysicalPlan::PlaceholderScan(_) => {
panic!("PlaceholderScan should not be converted to a pipeline node")
}
LocalPhysicalPlan::EmptyScan(EmptyScan {
schema,
stats_state,
context,
}) => {
let source = EmptyScanSource::new(schema.clone());
SourceNode::new(Box::new(source), stats_state.clone(), ctx, context).boxed()
}
LocalPhysicalPlan::PhysicalScan(PhysicalScan {
source_id,
pushdowns,
Expand Down
55 changes: 0 additions & 55 deletions src/daft-local-execution/src/sources/empty_scan.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/daft-local-execution/src/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod empty_scan;
pub mod flight_shuffle_read;
pub mod glob_scan;
pub mod in_memory;
Expand Down
13 changes: 6 additions & 7 deletions src/daft-local-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ use daft_micropartition::MicroPartitionRef;
#[cfg(feature = "python")]
pub use plan::{CatalogWrite, DataSink, DistributedActorPoolProject, LanceWrite};
pub use plan::{
CommitWrite, Concat, CrossJoin, Dedup, EmptyScan, Explode, Filter, FlightShuffleRead,
FlightShuffleWrite, GlobScan, HashAggregate, HashJoin, InMemoryScan, IntoBatches,
IntoPartitions, Limit, LocalNodeContext, LocalPhysicalPlan, LocalPhysicalPlanRef,
MonotonicallyIncreasingId, PhysicalScan, PhysicalWrite, Pivot, Project, Repartition, Sample,
SamplingMethod, Sort, SortMergeJoin, TopN, UDFProject, UnGroupedAggregate, Unpivot,
VLLMProject, WindowOrderByOnly, WindowPartitionAndDynamicFrame, WindowPartitionAndOrderBy,
WindowPartitionOnly,
CommitWrite, Concat, CrossJoin, Dedup, Explode, Filter, FlightShuffleRead, FlightShuffleWrite,
GlobScan, HashAggregate, HashJoin, InMemoryScan, IntoBatches, IntoPartitions, Limit,
LocalNodeContext, LocalPhysicalPlan, LocalPhysicalPlanRef, MonotonicallyIncreasingId,
PhysicalScan, PhysicalWrite, Pivot, Project, Repartition, Sample, SamplingMethod, Sort,
SortMergeJoin, TopN, UDFProject, UnGroupedAggregate, Unpivot, VLLMProject, WindowOrderByOnly,
WindowPartitionAndDynamicFrame, WindowPartitionAndOrderBy, WindowPartitionOnly,
};
#[cfg(feature = "python")]
pub use python::{PyLocalPhysicalPlan, register_modules};
Expand Down
33 changes: 5 additions & 28 deletions src/daft-local-plan/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ pub enum LocalPhysicalPlan {
InMemoryScan(InMemoryScan),
PhysicalScan(PhysicalScan),
GlobScan(GlobScan),
EmptyScan(EmptyScan),
PlaceholderScan(PlaceholderScan),
Project(Project),
UDFProject(UDFProject),
Expand Down Expand Up @@ -120,7 +119,6 @@ impl LocalPhysicalPlan {
| Self::PhysicalScan(PhysicalScan { stats_state, .. })
| Self::GlobScan(GlobScan { stats_state, .. })
| Self::PlaceholderScan(PlaceholderScan { stats_state, .. })
| Self::EmptyScan(EmptyScan { stats_state, .. })
| Self::Project(Project { stats_state, .. })
| Self::UDFProject(UDFProject { stats_state, .. })
| Self::Filter(Filter { stats_state, .. })
Expand Down Expand Up @@ -171,7 +169,6 @@ impl LocalPhysicalPlan {
| Self::PhysicalScan(PhysicalScan { context, .. })
| Self::GlobScan(GlobScan { context, .. })
| Self::PlaceholderScan(PlaceholderScan { context, .. })
| Self::EmptyScan(EmptyScan { context, .. })
| Self::Project(Project { context, .. })
| Self::UDFProject(UDFProject { context, .. })
| Self::Filter(Filter { context, .. })
Expand Down Expand Up @@ -279,15 +276,6 @@ impl LocalPhysicalPlan {
.arced()
}

pub fn empty_scan(schema: SchemaRef, context: LocalNodeContext) -> LocalPhysicalPlanRef {
Self::EmptyScan(EmptyScan {
schema,
stats_state: StatsState::Materialized(PlanStats::empty().into()),
context,
})
.arced()
}

pub fn filter(
input: LocalPhysicalPlanRef,
predicate: BoundExpr,
Expand Down Expand Up @@ -1038,7 +1026,6 @@ impl LocalPhysicalPlan {
Self::PhysicalScan(PhysicalScan { schema, .. })
| Self::GlobScan(GlobScan { schema, .. })
| Self::PlaceholderScan(PlaceholderScan { schema, .. })
| Self::EmptyScan(EmptyScan { schema, .. })
| Self::Filter(Filter { schema, .. })
| Self::IntoBatches(IntoBatches { schema, .. })
| Self::Limit(Limit { schema, .. })
Expand Down Expand Up @@ -1119,7 +1106,6 @@ impl LocalPhysicalPlan {
Self::PhysicalScan(_)
| Self::GlobScan(_)
| Self::PlaceholderScan(_)
| Self::EmptyScan(_)
| Self::InMemoryScan(_) => vec![],
Self::Filter(Filter { input, .. })
| Self::Limit(Limit { input, .. })
Expand Down Expand Up @@ -1175,12 +1161,11 @@ impl LocalPhysicalPlan {
"LocalPhysicalPlan::with_new_children: Empty children not handled for FlightShuffleRead"
),
[new_child] => match self {
Self::PhysicalScan(_)
| Self::PlaceholderScan(_)
| Self::EmptyScan(_)
| Self::InMemoryScan(_) => panic!(
"LocalPhysicalPlan::with_new_children: PhysicalScan, PlaceholderScan, EmptyScan, and InMemoryScan do not have children"
),
Self::PhysicalScan(_) | Self::PlaceholderScan(_) | Self::InMemoryScan(_) => {
panic!(
"LocalPhysicalPlan::with_new_children: PhysicalScan, PlaceholderScan, and InMemoryScan do not have children"
)
}
Self::Filter(Filter {
predicate, context, ..
}) => Self::filter(
Expand Down Expand Up @@ -1816,14 +1801,6 @@ pub struct PlaceholderScan {
pub context: LocalNodeContext,
}

#[derive(Serialize, Deserialize)]
#[cfg_attr(debug_assertions, derive(Debug))]
pub struct EmptyScan {
pub schema: SchemaRef,
pub stats_state: StatsState,
pub context: LocalNodeContext,
}

#[derive(Serialize, Deserialize)]
#[cfg_attr(debug_assertions, derive(Debug))]
pub struct Project {
Expand Down
Loading