Skip to content

Commit bbcdcfb

Browse files
committed
refactor(physical-expr): adopt new proto helpers in already-migrated expressions
Ports the existing `try_to_proto` / `try_from_proto` implementations onto the three helpers introduced in the previous commit (`expect_expr_variant!`, `expr_node`, and the wider use of `decode_required_expression` / `decode_children_expressions` / `encode_children_expressions` from apache#22513). Covers every expression already migrated under apache#22418: - `Column`, `BinaryExpr` (originally apache#21929) - `LikeExpr` (apache#22471) - `InListExpr` (apache#22503) - `NegativeExpr` (apache#22483) - `HashTableLookupExpr` (apache#22451) — uses `expr_node` for its `lit(true)` replacement node `BinaryExpr` additionally switches its `l`/`r` legacy-decode arms to `decode_required_expression`, removing two more hand-rolled "missing required field" strings. One existing test changes assertion text — `InListExpr`'s rejected- variant message was the only one using the article "an" instead of the macro's article-free "a"; updated to match. No wire-format change; `cargo test -p datafusion-proto --test proto_integration` is green (173 / 173).
1 parent 87c540e commit bbcdcfb

6 files changed

Lines changed: 73 additions & 92 deletions

File tree

datafusion/physical-expr/src/expressions/binary.rs

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,7 @@ impl PhysicalExpr for BinaryExpr {
616616
&self,
617617
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
618618
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
619+
use datafusion_physical_expr_common::physical_expr::proto_encode::expr_node;
619620
use datafusion_proto_models::protobuf;
620621

621622
// Linearize a nested binary expression tree of the same operator
@@ -638,22 +639,18 @@ impl PhysicalExpr for BinaryExpr {
638639
// Reverse so operands are ordered from left innermost to right outermost.
639640
operand_refs.reverse();
640641

641-
let operands = operand_refs
642-
.iter()
643-
.map(|e| ctx.encode_child(e))
644-
.collect::<Result<Vec<_>>>()?;
642+
let operands = ctx.encode_children_expressions(operand_refs)?;
645643

646-
Ok(Some(protobuf::PhysicalExprNode {
647-
expr_id: None,
648-
expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
649-
Box::new(protobuf::PhysicalBinaryExprNode {
644+
Ok(Some(expr_node(
645+
protobuf::physical_expr_node::ExprType::BinaryExpr(Box::new(
646+
protobuf::PhysicalBinaryExprNode {
650647
l: None,
651648
r: None,
652649
op: format!("{op:?}"),
653650
operands,
654-
}),
651+
},
655652
)),
656-
}))
653+
)))
657654
}
658655
}
659656

@@ -675,11 +672,13 @@ impl BinaryExpr {
675672
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
676673
ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
677674
) -> Result<Arc<dyn PhysicalExpr>> {
675+
use datafusion_physical_expr_common::expect_expr_variant;
678676
use datafusion_proto_models::protobuf;
679-
let node = match &node.expr_type {
680-
Some(protobuf::physical_expr_node::ExprType::BinaryExpr(b)) => b.as_ref(),
681-
_ => return internal_err!("PhysicalExprNode is not a BinaryExpr"),
682-
};
677+
let node = expect_expr_variant!(
678+
node,
679+
protobuf::physical_expr_node::ExprType::BinaryExpr,
680+
"BinaryExpr",
681+
);
683682
let op = Operator::from_proto_name(&node.op).ok_or_else(|| {
684683
datafusion_common::DataFusionError::Internal(format!(
685684
"Unsupported binary operator '{}'",
@@ -690,17 +689,12 @@ impl BinaryExpr {
690689
if !node.operands.is_empty() {
691690
// New linearized format: reduce the flat operands list back into
692691
// a nested binary expression tree.
693-
let operands = node
694-
.operands
695-
.iter()
696-
.map(|e| ctx.decode(e))
697-
.collect::<Result<Vec<_>>>()?;
692+
let operands = ctx.decode_children_expressions(&node.operands)?;
698693

699694
if operands.len() < 2 {
700-
return Err(datafusion_common::DataFusionError::Internal(
695+
return internal_err!(
701696
"A binary expression must always have at least 2 operands"
702-
.to_string(),
703-
));
697+
);
704698
}
705699

706700
Ok(operands
@@ -711,21 +705,11 @@ impl BinaryExpr {
711705
.expect("Binary expression could not be reduced to a single expression."))
712706
} else {
713707
// Legacy format with l/r fields.
714-
let left = node.l.as_deref().ok_or_else(|| {
715-
datafusion_common::DataFusionError::Internal(
716-
"BinaryExpr is missing required field 'left'".to_string(),
717-
)
718-
})?;
719-
let right = node.r.as_deref().ok_or_else(|| {
720-
datafusion_common::DataFusionError::Internal(
721-
"BinaryExpr is missing required field 'right'".to_string(),
722-
)
723-
})?;
724-
Ok(Arc::new(BinaryExpr::new(
725-
ctx.decode(left)?,
726-
op,
727-
ctx.decode(right)?,
728-
)))
708+
let left =
709+
ctx.decode_required_expression(node.l.as_deref(), "BinaryExpr", "left")?;
710+
let right =
711+
ctx.decode_required_expression(node.r.as_deref(), "BinaryExpr", "right")?;
712+
Ok(Arc::new(BinaryExpr::new(left, op, right)))
729713
}
730714
}
731715
}

datafusion/physical-expr/src/expressions/column.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -152,16 +152,14 @@ impl PhysicalExpr for Column {
152152
&self,
153153
_ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
154154
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
155+
use datafusion_physical_expr_common::physical_expr::proto_encode::expr_node;
155156
use datafusion_proto_models::protobuf;
156-
Ok(Some(protobuf::PhysicalExprNode {
157-
expr_id: None,
158-
expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
159-
protobuf::PhysicalColumn {
160-
name: self.name.clone(),
161-
index: self.index as u32,
162-
},
163-
)),
164-
}))
157+
Ok(Some(expr_node(
158+
protobuf::physical_expr_node::ExprType::Column(protobuf::PhysicalColumn {
159+
name: self.name.clone(),
160+
index: self.index as u32,
161+
}),
162+
)))
165163
}
166164
}
167165

@@ -182,11 +180,13 @@ impl Column {
182180
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
183181
_ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
184182
) -> Result<Arc<dyn PhysicalExpr>> {
183+
use datafusion_physical_expr_common::expect_expr_variant;
185184
use datafusion_proto_models::protobuf;
186-
let protobuf::PhysicalColumn { name, index } = match &node.expr_type {
187-
Some(protobuf::physical_expr_node::ExprType::Column(c)) => c,
188-
_ => return internal_err!("PhysicalExprNode is not a Column"),
189-
};
185+
let protobuf::PhysicalColumn { name, index } = expect_expr_variant!(
186+
node,
187+
protobuf::physical_expr_node::ExprType::Column,
188+
"Column",
189+
);
190190
Ok(Arc::new(Column::new(name, *index as usize)))
191191
}
192192
}

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -252,16 +252,14 @@ impl InListExpr {
252252
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
253253
ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
254254
) -> Result<Arc<dyn PhysicalExpr>> {
255+
use datafusion_physical_expr_common::expect_expr_variant;
255256
use datafusion_proto_models::protobuf;
256257

257-
let node = match &node.expr_type {
258-
Some(protobuf::physical_expr_node::ExprType::InList(n)) => n,
259-
_ => {
260-
return datafusion_common::internal_err!(
261-
"PhysicalExprNode is not an InList"
262-
);
263-
}
264-
};
258+
let node = expect_expr_variant!(
259+
node,
260+
protobuf::physical_expr_node::ExprType::InList,
261+
"InList",
262+
);
265263

266264
let expr =
267265
ctx.decode_required_expression(node.expr.as_deref(), "InListExpr", "expr")?;
@@ -476,18 +474,18 @@ impl PhysicalExpr for InListExpr {
476474
&self,
477475
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
478476
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
477+
use datafusion_physical_expr_common::physical_expr::proto_encode::expr_node;
479478
use datafusion_proto_models::protobuf;
480479

481-
Ok(Some(protobuf::PhysicalExprNode {
482-
expr_id: None,
483-
expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
480+
Ok(Some(expr_node(
481+
protobuf::physical_expr_node::ExprType::InList(Box::new(
484482
protobuf::PhysicalInListNode {
485483
expr: Some(Box::new(ctx.encode_child(&self.expr)?)),
486484
list: ctx.encode_children_expressions(&self.list)?,
487485
negated: self.negated,
488486
},
489-
))),
490-
}))
487+
)),
488+
)))
491489
}
492490
}
493491

@@ -3981,7 +3979,7 @@ mod proto_tests {
39813979
let err = InListExpr::try_from_proto(&node, &ctx).unwrap_err();
39823980
assert!(matches!(
39833981
err,
3984-
DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not an InList")
3982+
DataFusionError::Internal(msg) if msg.contains("PhysicalExprNode is not a InList")
39853983
));
39863984
}
39873985

datafusion/physical-expr/src/expressions/like.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -151,19 +151,19 @@ impl PhysicalExpr for LikeExpr {
151151
&self,
152152
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
153153
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
154+
use datafusion_physical_expr_common::physical_expr::proto_encode::expr_node;
154155
use datafusion_proto_models::protobuf;
155156

156-
Ok(Some(protobuf::PhysicalExprNode {
157-
expr_id: None,
158-
expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
157+
Ok(Some(expr_node(
158+
protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
159159
protobuf::PhysicalLikeExprNode {
160160
negated: self.negated,
161161
case_insensitive: self.case_insensitive,
162162
expr: Some(Box::new(ctx.encode_child(&self.expr)?)),
163163
pattern: Some(Box::new(ctx.encode_child(&self.pattern)?)),
164164
},
165-
))),
166-
}))
165+
)),
166+
)))
167167
}
168168
}
169169

@@ -180,15 +180,14 @@ impl LikeExpr {
180180
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
181181
ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
182182
) -> Result<Arc<dyn PhysicalExpr>> {
183-
use datafusion_common::internal_err;
183+
use datafusion_physical_expr_common::expect_expr_variant;
184184
use datafusion_proto_models::protobuf;
185185

186-
let like_expr = match &node.expr_type {
187-
Some(protobuf::physical_expr_node::ExprType::LikeExpr(like_expr)) => {
188-
like_expr.as_ref()
189-
}
190-
_ => return internal_err!("PhysicalExprNode is not a LikeExpr"),
191-
};
186+
let like_expr = expect_expr_variant!(
187+
node,
188+
protobuf::physical_expr_node::ExprType::LikeExpr,
189+
"LikeExpr",
190+
);
192191

193192
Ok(Arc::new(LikeExpr::new(
194193
like_expr.negated,

datafusion/physical-expr/src/expressions/negative.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -180,16 +180,16 @@ impl PhysicalExpr for NegativeExpr {
180180
&self,
181181
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
182182
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
183+
use datafusion_physical_expr_common::physical_expr::proto_encode::expr_node;
183184
use datafusion_proto_models::protobuf;
184185

185-
Ok(Some(protobuf::PhysicalExprNode {
186-
expr_id: None,
187-
expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
186+
Ok(Some(expr_node(
187+
protobuf::physical_expr_node::ExprType::Negative(Box::new(
188188
protobuf::PhysicalNegativeNode {
189189
expr: Some(Box::new(ctx.encode_child(&self.arg)?)),
190190
},
191-
))),
192-
}))
191+
)),
192+
)))
193193
}
194194
}
195195

@@ -200,14 +200,16 @@ impl NegativeExpr {
200200
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
201201
ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
202202
) -> Result<Arc<dyn PhysicalExpr>> {
203+
use datafusion_physical_expr_common::expect_expr_variant;
203204
use datafusion_proto_models::protobuf;
204205

205-
let expr = match &node.expr_type {
206-
Some(protobuf::physical_expr_node::ExprType::Negative(n)) => {
207-
ctx.decode_required_expression(n.expr.as_deref(), "NegativeExpr", "expr")?
208-
}
209-
_ => return internal_err!("PhysicalExprNode is not a Negative"),
210-
};
206+
let n = expect_expr_variant!(
207+
node,
208+
protobuf::physical_expr_node::ExprType::Negative,
209+
"Negative",
210+
);
211+
let expr =
212+
ctx.decode_required_expression(n.expr.as_deref(), "NegativeExpr", "expr")?;
211213

212214
Ok(Arc::new(NegativeExpr::new(expr)))
213215
}

datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,6 @@ impl PhysicalExpr for HashTableLookupExpr {
340340
&self,
341341
_ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
342342
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
343-
use datafusion_proto_models::protobuf;
344343
use datafusion_proto_models::protobuf::physical_expr_node::ExprType;
345344

346345
// HashTableLookupExpr holds a runtime Arc<Map> (the build-side hash
@@ -357,15 +356,14 @@ impl PhysicalExpr for HashTableLookupExpr {
357356
// HashTableLookupExpr is replaced during serialization. Re-executing
358357
// the plan requires reset_state(), after which HashJoinExec rebuilds
359358
// fresh dynamic filters at runtime.
359+
use datafusion_physical_expr_common::physical_expr::proto_encode::expr_node;
360+
360361
let value = datafusion_proto_common::ScalarValue {
361362
value: Some(datafusion_proto_common::scalar_value::Value::BoolValue(
362363
true,
363364
)),
364365
};
365-
Ok(Some(protobuf::PhysicalExprNode {
366-
expr_id: None,
367-
expr_type: Some(ExprType::Literal(value)),
368-
}))
366+
Ok(Some(expr_node(ExprType::Literal(value))))
369367
}
370368
fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
371369
write!(f, "{}", self.description)

0 commit comments

Comments
 (0)