Skip to content

Support MemoryExec in proto try_from_physical_plan #14082

Open
@jayzhan211

Description

@jayzhan211

Is your feature request related to a problem or challenge?

In try_from_physical_plan

fn try_from_physical_plan(
plan: Arc<dyn ExecutionPlan>,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Self>
where
Self: Sized,
{
let plan_clone = Arc::clone(&plan);
let plan = plan.as_any();
if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Explain(
protobuf::ExplainExecNode {
schema: Some(exec.schema().as_ref().try_into()?),
stringified_plans: exec
.stringified_plans()
.iter()
.map(|plan| plan.into())
.collect(),
verbose: exec.verbose(),
},
)),
});
}
if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let expr = exec
.expr()
.iter()
.map(|expr| serialize_physical_expr(&expr.0, extension_codec))
.collect::<Result<Vec<_>>>()?;
let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
protobuf::ProjectionExecNode {
input: Some(Box::new(input)),
expr,
expr_name,
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
protobuf::AnalyzeExecNode {
verbose: exec.verbose(),
show_statistics: exec.show_statistics(),
input: Some(Box::new(input)),
schema: Some(exec.schema().as_ref().try_into()?),
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<FilterExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
protobuf::FilterExecNode {
input: Some(Box::new(input)),
expr: Some(serialize_physical_expr(
exec.predicate(),
extension_codec,
)?),
default_filter_selectivity: exec.default_selectivity() as u32,
projection: exec
.projection()
.as_ref()
.map_or_else(Vec::new, |v| {
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
}),
},
))),
});
}
if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
limit.input().to_owned(),
extension_codec,
)?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
protobuf::GlobalLimitExecNode {
input: Some(Box::new(input)),
skip: limit.skip() as u32,
fetch: match limit.fetch() {
Some(n) => n as i64,
_ => -1, // no limit
},
},
))),
});
}
if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
limit.input().to_owned(),
extension_codec,
)?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
protobuf::LocalLimitExecNode {
input: Some(Box::new(input)),
fetch: limit.fetch() as u32,
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.left().to_owned(),
extension_codec,
)?;
let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.right().to_owned(),
extension_codec,
)?;
let on: Vec<protobuf::JoinOn> = exec
.on()
.iter()
.map(|tuple| {
let l = serialize_physical_expr(&tuple.0, extension_codec)?;
let r = serialize_physical_expr(&tuple.1, extension_codec)?;
Ok::<_, DataFusionError>(protobuf::JoinOn {
left: Some(l),
right: Some(r),
})
})
.collect::<Result<_>>()?;
let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
let filter = exec
.filter()
.as_ref()
.map(|f| {
let expression =
serialize_physical_expr(f.expression(), extension_codec)?;
let column_indices = f
.column_indices()
.iter()
.map(|i| {
let side: protobuf::JoinSide = i.side.to_owned().into();
protobuf::ColumnIndex {
index: i.index as u32,
side: side.into(),
}
})
.collect();
let schema = f.schema().try_into()?;
Ok(protobuf::JoinFilter {
expression: Some(expression),
column_indices,
schema: Some(schema),
})
})
.map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
let partition_mode = match exec.partition_mode() {
PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
PartitionMode::Auto => protobuf::PartitionMode::Auto,
};
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
protobuf::HashJoinExecNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
on,
join_type: join_type.into(),
partition_mode: partition_mode.into(),
null_equals_null: exec.null_equals_null(),
filter,
projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
}),
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.left().to_owned(),
extension_codec,
)?;
let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.right().to_owned(),
extension_codec,
)?;
let on = exec
.on()
.iter()
.map(|tuple| {
let l = serialize_physical_expr(&tuple.0, extension_codec)?;
let r = serialize_physical_expr(&tuple.1, extension_codec)?;
Ok::<_, DataFusionError>(protobuf::JoinOn {
left: Some(l),
right: Some(r),
})
})
.collect::<Result<_>>()?;
let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
let filter = exec
.filter()
.as_ref()
.map(|f| {
let expression =
serialize_physical_expr(f.expression(), extension_codec)?;
let column_indices = f
.column_indices()
.iter()
.map(|i| {
let side: protobuf::JoinSide = i.side.to_owned().into();
protobuf::ColumnIndex {
index: i.index as u32,
side: side.into(),
}
})
.collect();
let schema = f.schema().try_into()?;
Ok(protobuf::JoinFilter {
expression: Some(expression),
column_indices,
schema: Some(schema),
})
})
.map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
let partition_mode = match exec.partition_mode() {
StreamJoinPartitionMode::SinglePartition => {
protobuf::StreamPartitionMode::SinglePartition
}
StreamJoinPartitionMode::Partitioned => {
protobuf::StreamPartitionMode::PartitionedExec
}
};
let left_sort_exprs = exec
.left_sort_exprs()
.map(|exprs| {
exprs
.iter()
.map(|expr| {
Ok(protobuf::PhysicalSortExprNode {
expr: Some(Box::new(serialize_physical_expr(
&expr.expr,
extension_codec,
)?)),
asc: !expr.options.descending,
nulls_first: expr.options.nulls_first,
})
})
.collect::<Result<Vec<_>>>()
})
.transpose()?
.unwrap_or(vec![]);
let right_sort_exprs = exec
.right_sort_exprs()
.map(|exprs| {
exprs
.iter()
.map(|expr| {
Ok(protobuf::PhysicalSortExprNode {
expr: Some(Box::new(serialize_physical_expr(
&expr.expr,
extension_codec,
)?)),
asc: !expr.options.descending,
nulls_first: expr.options.nulls_first,
})
})
.collect::<Result<Vec<_>>>()
})
.transpose()?
.unwrap_or(vec![]);
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
protobuf::SymmetricHashJoinExecNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
on,
join_type: join_type.into(),
partition_mode: partition_mode.into(),
null_equals_null: exec.null_equals_null(),
left_sort_exprs,
right_sort_exprs,
filter,
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.left().to_owned(),
extension_codec,
)?;
let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.right().to_owned(),
extension_codec,
)?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
protobuf::CrossJoinExecNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
let groups: Vec<bool> = exec
.group_expr()
.groups()
.iter()
.flatten()
.copied()
.collect();
let group_names = exec
.group_expr()
.expr()
.iter()
.map(|expr| expr.1.to_owned())
.collect();
let filter = exec
.filter_expr()
.iter()
.map(|expr| serialize_maybe_filter(expr.to_owned(), extension_codec))
.collect::<Result<Vec<_>>>()?;
let agg = exec
.aggr_expr()
.iter()
.map(|expr| {
serialize_physical_aggr_expr(expr.to_owned(), extension_codec)
})
.collect::<Result<Vec<_>>>()?;
let agg_names = exec
.aggr_expr()
.iter()
.map(|expr| expr.name().to_string())
.collect::<Vec<_>>();
let agg_mode = match exec.mode() {
AggregateMode::Partial => protobuf::AggregateMode::Partial,
AggregateMode::Final => protobuf::AggregateMode::Final,
AggregateMode::FinalPartitioned => {
protobuf::AggregateMode::FinalPartitioned
}
AggregateMode::Single => protobuf::AggregateMode::Single,
AggregateMode::SinglePartitioned => {
protobuf::AggregateMode::SinglePartitioned
}
};
let input_schema = exec.input_schema();
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let null_expr = exec
.group_expr()
.null_expr()
.iter()
.map(|expr| serialize_physical_expr(&expr.0, extension_codec))
.collect::<Result<Vec<_>>>()?;
let group_expr = exec
.group_expr()
.expr()
.iter()
.map(|expr| serialize_physical_expr(&expr.0, extension_codec))
.collect::<Result<Vec<_>>>()?;
let limit = exec.limit().map(|value| protobuf::AggLimit {
limit: value as u64,
});
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
protobuf::AggregateExecNode {
group_expr,
group_expr_name: group_names,
aggr_expr: agg,
filter_expr: filter,
aggr_expr_name: agg_names,
mode: agg_mode as i32,
input: Some(Box::new(input)),
input_schema: Some(input_schema.as_ref().try_into()?),
null_expr,
groups,
limit,
},
))),
});
}
if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
let schema = empty.schema().as_ref().try_into()?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Empty(
protobuf::EmptyExecNode {
schema: Some(schema),
},
)),
});
}
if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
let schema = empty.schema().as_ref().try_into()?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
protobuf::PlaceholderRowExecNode {
schema: Some(schema),
},
)),
});
}
if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
coalesce_batches.input().to_owned(),
extension_codec,
)?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
protobuf::CoalesceBatchesExecNode {
input: Some(Box::new(input)),
target_batch_size: coalesce_batches.target_batch_size() as u32,
fetch: coalesce_batches.fetch().map(|n| n as u32),
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<CsvExec>() {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::CsvScan(
protobuf::CsvScanExecNode {
base_conf: Some(serialize_file_scan_config(
exec.base_config(),
extension_codec,
)?),
has_header: exec.has_header(),
delimiter: byte_to_string(exec.delimiter(), "delimiter")?,
quote: byte_to_string(exec.quote(), "quote")?,
optional_escape: if let Some(escape) = exec.escape() {
Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(
byte_to_string(escape, "escape")?,
))
} else {
None
},
optional_comment: if let Some(comment) = exec.comment() {
Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
byte_to_string(comment, "comment")?,
))
} else {
None
},
newlines_in_values: exec.newlines_in_values(),
},
)),
});
}
#[cfg(feature = "parquet")]
if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
let predicate = exec
.predicate()
.map(|pred| serialize_physical_expr(pred, extension_codec))
.transpose()?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
protobuf::ParquetScanExecNode {
base_conf: Some(serialize_file_scan_config(
exec.base_config(),
extension_codec,
)?),
predicate,
},
)),
});
}
if let Some(exec) = plan.downcast_ref::<AvroExec>() {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::AvroScan(
protobuf::AvroScanExecNode {
base_conf: Some(serialize_file_scan_config(
exec.base_config(),
extension_codec,
)?),
},
)),
});
}
if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
protobuf::CoalescePartitionsExecNode {
input: Some(Box::new(input)),
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let pb_partitioning =
serialize_partitioning(exec.partitioning(), extension_codec)?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
protobuf::RepartitionExecNode {
input: Some(Box::new(input)),
partitioning: Some(pb_partitioning),
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<SortExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let expr = exec
.expr()
.iter()
.map(|expr| {
let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
expr: Some(Box::new(serialize_physical_expr(
&expr.expr,
extension_codec,
)?)),
asc: !expr.options.descending,
nulls_first: expr.options.nulls_first,
});
Ok(protobuf::PhysicalExprNode {
expr_type: Some(ExprType::Sort(sort_expr)),
})
})
.collect::<Result<Vec<_>>>()?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
protobuf::SortExecNode {
input: Some(Box::new(input)),
expr,
fetch: match exec.fetch() {
Some(n) => n as i64,
_ => -1,
},
preserve_partitioning: exec.preserve_partitioning(),
},
))),
});
}
if let Some(union) = plan.downcast_ref::<UnionExec>() {
let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
for input in union.inputs() {
inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
input.to_owned(),
extension_codec,
)?);
}
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Union(
protobuf::UnionExecNode { inputs },
)),
});
}
if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
for input in interleave.inputs() {
inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
input.to_owned(),
extension_codec,
)?);
}
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Interleave(
protobuf::InterleaveExecNode { inputs },
)),
});
}
if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let expr = exec
.expr()
.iter()
.map(|expr| {
let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
expr: Some(Box::new(serialize_physical_expr(
&expr.expr,
extension_codec,
)?)),
asc: !expr.options.descending,
nulls_first: expr.options.nulls_first,
});
Ok(protobuf::PhysicalExprNode {
expr_type: Some(ExprType::Sort(sort_expr)),
})
})
.collect::<Result<Vec<_>>>()?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(
Box::new(protobuf::SortPreservingMergeExecNode {
input: Some(Box::new(input)),
expr,
fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
}),
)),
});
}
if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.left().to_owned(),
extension_codec,
)?;
let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.right().to_owned(),
extension_codec,
)?;
let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
let filter = exec
.filter()
.as_ref()
.map(|f| {
let expression =
serialize_physical_expr(f.expression(), extension_codec)?;
let column_indices = f
.column_indices()
.iter()
.map(|i| {
let side: protobuf::JoinSide = i.side.to_owned().into();
protobuf::ColumnIndex {
index: i.index as u32,
side: side.into(),
}
})
.collect();
let schema = f.schema().try_into()?;
Ok(protobuf::JoinFilter {
expression: Some(expression),
column_indices,
schema: Some(schema),
})
})
.map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
protobuf::NestedLoopJoinExecNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
join_type: join_type.into(),
filter,
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let window_expr = exec
.window_expr()
.iter()
.map(|e| serialize_physical_window_expr(e, extension_codec))
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
let partition_keys = exec
.partition_keys
.iter()
.map(|e| serialize_physical_expr(e, extension_codec))
.collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
protobuf::WindowAggExecNode {
input: Some(Box::new(input)),
window_expr,
partition_keys,
input_order_mode: None,
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let window_expr = exec
.window_expr()
.iter()
.map(|e| serialize_physical_window_expr(e, extension_codec))
.collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
let partition_keys = exec
.partition_keys
.iter()
.map(|e| serialize_physical_expr(e, extension_codec))
.collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
let input_order_mode = match &exec.input_order_mode {
InputOrderMode::Linear => window_agg_exec_node::InputOrderMode::Linear(
protobuf::EmptyMessage {},
),
InputOrderMode::PartiallySorted(columns) => {
window_agg_exec_node::InputOrderMode::PartiallySorted(
protobuf::PartiallySortedInputOrderMode {
columns: columns.iter().map(|c| *c as u64).collect(),
},
)
}
InputOrderMode::Sorted => window_agg_exec_node::InputOrderMode::Sorted(
protobuf::EmptyMessage {},
),
};
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
protobuf::WindowAggExecNode {
input: Some(Box::new(input)),
window_expr,
partition_keys,
input_order_mode: Some(input_order_mode),
},
))),
});
}
if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let sort_order = match exec.sort_order() {
Some(requirements) => {
let expr = requirements
.iter()
.map(|requirement| {
let expr: PhysicalSortExpr = requirement.to_owned().into();
let sort_expr = protobuf::PhysicalSortExprNode {
expr: Some(Box::new(serialize_physical_expr(
&expr.expr,
extension_codec,
)?)),
asc: !expr.options.descending,
nulls_first: expr.options.nulls_first,
};
Ok(sort_expr)
})
.collect::<Result<Vec<_>>>()?;
Some(protobuf::PhysicalSortExprNodeCollection {
physical_sort_expr_nodes: expr,
})
}
None => None,
};
if let Some(sink) = exec.sink().as_any().downcast_ref::<JsonSink>() {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
protobuf::JsonSinkExecNode {
input: Some(Box::new(input)),
sink: Some(sink.try_into()?),
sink_schema: Some(exec.schema().as_ref().try_into()?),
sort_order,
},
))),
});
}
if let Some(sink) = exec.sink().as_any().downcast_ref::<CsvSink>() {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
protobuf::CsvSinkExecNode {
input: Some(Box::new(input)),
sink: Some(sink.try_into()?),
sink_schema: Some(exec.schema().as_ref().try_into()?),
sort_order,
},
))),
});
}
#[cfg(feature = "parquet")]
if let Some(sink) = exec.sink().as_any().downcast_ref::<ParquetSink>() {
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
protobuf::ParquetSinkExecNode {
input: Some(Box::new(input)),
sink: Some(sink.try_into()?),
sink_schema: Some(exec.schema().as_ref().try_into()?),
sort_order,
},
))),
});
}
// If unknown DataSink then let extension handle it
}
if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
return Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
protobuf::UnnestExecNode {
input: Some(Box::new(input)),
schema: Some(exec.schema().try_into()?),
list_type_columns: exec
.list_column_indices()
.iter()
.map(|c| ProtoListUnnest {
index_in_input_schema: c.index_in_input_schema as _,
depth: c.depth as _,
})
.collect(),
struct_type_columns: exec
.struct_column_indices()
.iter()
.map(|c| *c as _)
.collect(),
options: Some(exec.options().into()),
},
))),
});
}
let mut buf: Vec<u8> = vec![];
match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
Ok(_) => {
let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
.children()
.into_iter()
.cloned()
.map(|i| {
protobuf::PhysicalPlanNode::try_from_physical_plan(
i,
extension_codec,
)
})
.collect::<Result<_>>()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Extension(
protobuf::PhysicalExtensionNode { node: buf, inputs },
)),
})
}
Err(e) => internal_err!(
"Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
),
}
}
}

MemoryExec is not supported yet

Describe the solution you'd like

Support memory exec with ordering

Describe alternatives you've considered

No response

Additional context

No response

Metadata

Metadata

Assignees

Labels

enhancementNew feature or requestprotoRelated to proto crate

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions