Skip to content

Commit 0ff65cc

Browse files
adriangbclaude
andcommitted
Migrate BinaryExpr to PhysicalExpr::try_to_proto / try_from_proto
Second expression to use the new hooks, following the same pattern as Column. The BinaryExpr downcast arm in `serialize_physical_expr_with_converter` and the inline match arm in `parse_physical_expr_with_converter` are removed; both directions now live on `BinaryExpr` itself. Linearization of nested same-op chains stays bit-for-bit identical with the previous behavior, just expressed in terms of `PhysicalExprEncodeCtx::encode_child` / `PhysicalExprDecodeCtx::decode` instead of the proto converter. `BinaryExpr::try_from_proto` takes the whole `PhysicalExprNode` and parses the operator string itself. The proto-string-to-enum mapping moves from `from_proto_binary_op` in `datafusion-proto` down to `Operator::from_proto_name` in `datafusion-expr-common`, so the logical plan path and the physical `BinaryExpr` decoder share one source of truth without `physical-expr` depending on `datafusion-proto`; `from_proto_binary_op` now delegates to it. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 1bf7239 commit 0ff65cc

5 files changed

Lines changed: 174 additions & 127 deletions

File tree

datafusion/expr-common/src/operator.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,50 @@ impl Operator {
390390
| Operator::StringConcat => false,
391391
}
392392
}
393+
394+
/// Parse an `Operator` from the string name `datafusion-proto` uses on the
395+
/// wire (the `Debug` name of the variant, e.g. `"Eq"`).
396+
///
397+
/// Returns `None` for names with no binary-operator counterpart. This is
398+
/// the canonical proto-string mapping, shared by `datafusion-proto`
399+
/// (logical plans) and `PhysicalExpr` decoders such as `BinaryExpr`, so the
400+
/// mapping is not duplicated across crates.
401+
pub fn from_proto_name(name: &str) -> Option<Operator> {
402+
Some(match name {
403+
"And" => Operator::And,
404+
"Or" => Operator::Or,
405+
"Eq" => Operator::Eq,
406+
"NotEq" => Operator::NotEq,
407+
"LtEq" => Operator::LtEq,
408+
"Lt" => Operator::Lt,
409+
"Gt" => Operator::Gt,
410+
"GtEq" => Operator::GtEq,
411+
"Plus" => Operator::Plus,
412+
"Minus" => Operator::Minus,
413+
"Multiply" => Operator::Multiply,
414+
"Divide" => Operator::Divide,
415+
"Modulo" => Operator::Modulo,
416+
"IsDistinctFrom" => Operator::IsDistinctFrom,
417+
"IsNotDistinctFrom" => Operator::IsNotDistinctFrom,
418+
"BitwiseAnd" => Operator::BitwiseAnd,
419+
"BitwiseOr" => Operator::BitwiseOr,
420+
"BitwiseXor" => Operator::BitwiseXor,
421+
"BitwiseShiftLeft" => Operator::BitwiseShiftLeft,
422+
"BitwiseShiftRight" => Operator::BitwiseShiftRight,
423+
"RegexIMatch" => Operator::RegexIMatch,
424+
"RegexMatch" => Operator::RegexMatch,
425+
"RegexNotIMatch" => Operator::RegexNotIMatch,
426+
"RegexNotMatch" => Operator::RegexNotMatch,
427+
"LikeMatch" => Operator::LikeMatch,
428+
"ILikeMatch" => Operator::ILikeMatch,
429+
"NotLikeMatch" => Operator::NotLikeMatch,
430+
"NotILikeMatch" => Operator::NotILikeMatch,
431+
"StringConcat" => Operator::StringConcat,
432+
"AtArrow" => Operator::AtArrow,
433+
"ArrowAt" => Operator::ArrowAt,
434+
_ => return None,
435+
})
436+
}
393437
}
394438

395439
impl fmt::Display for Operator {

datafusion/physical-expr/src/expressions/binary.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,124 @@ impl PhysicalExpr for BinaryExpr {
610610
write!(f, " {} ", self.op)?;
611611
write_child(f, self.right.as_ref(), precedence)
612612
}
613+
614+
#[cfg(feature = "proto")]
615+
fn try_to_proto(
616+
&self,
617+
ctx: &datafusion_physical_expr_common::physical_expr::proto_encode::PhysicalExprEncodeCtx<'_>,
618+
) -> Result<Option<datafusion_proto_models::protobuf::PhysicalExprNode>> {
619+
use datafusion_proto_models::protobuf;
620+
621+
// Linearize a nested binary expression tree of the same operator
622+
// into a flat vector of operands to avoid deep recursion in proto.
623+
let op = self.op;
624+
let mut operand_refs: Vec<&Arc<dyn PhysicalExpr>> = vec![&self.right];
625+
let mut current_expr: &BinaryExpr = self;
626+
loop {
627+
match current_expr.left.downcast_ref::<BinaryExpr>() {
628+
Some(bin) if bin.op == op => {
629+
operand_refs.push(&bin.right);
630+
current_expr = bin;
631+
}
632+
_ => {
633+
operand_refs.push(&current_expr.left);
634+
break;
635+
}
636+
}
637+
}
638+
// Reverse so operands are ordered from left innermost to right outermost.
639+
operand_refs.reverse();
640+
641+
let operands = operand_refs
642+
.iter()
643+
.map(|e| ctx.encode_child(e))
644+
.collect::<Result<Vec<_>>>()?;
645+
646+
Ok(Some(protobuf::PhysicalExprNode {
647+
expr_id: None,
648+
expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
649+
Box::new(protobuf::PhysicalBinaryExprNode {
650+
l: None,
651+
r: None,
652+
op: format!("{op:?}"),
653+
operands,
654+
}),
655+
)),
656+
}))
657+
}
658+
}
659+
660+
#[cfg(feature = "proto")]
661+
impl BinaryExpr {
662+
/// Reconstruct a [`BinaryExpr`] (or a left-deep tree of them when the proto
663+
/// uses the linearized `operands` form) from its protobuf representation.
664+
///
665+
/// Takes the whole [`PhysicalExprNode`] — the exact inverse of what
666+
/// [`PhysicalExpr::try_to_proto`] produces — so every expression's
667+
/// `try_from_proto` shares one signature. The operator string is parsed
668+
/// via the canonical [`Operator::from_proto_name`] mapping, so no `op`
669+
/// argument needs to be threaded in by the caller.
670+
///
671+
/// [`PhysicalExprNode`]: datafusion_proto_models::protobuf::PhysicalExprNode
672+
/// [`PhysicalExpr::try_to_proto`]: datafusion_physical_expr_common::physical_expr::PhysicalExpr::try_to_proto
673+
/// [`PhysicalExprDecodeCtx::decode`]: datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx::decode
674+
pub fn try_from_proto(
675+
node: &datafusion_proto_models::protobuf::PhysicalExprNode,
676+
ctx: &datafusion_physical_expr_common::physical_expr::proto_decode::PhysicalExprDecodeCtx<'_>,
677+
) -> Result<Arc<dyn PhysicalExpr>> {
678+
use datafusion_proto_models::protobuf;
679+
let node = match &node.expr_type {
680+
Some(protobuf::physical_expr_node::ExprType::BinaryExpr(b)) => b.as_ref(),
681+
_ => return internal_err!("PhysicalExprNode is not a BinaryExpr"),
682+
};
683+
let op = Operator::from_proto_name(&node.op).ok_or_else(|| {
684+
datafusion_common::DataFusionError::Internal(format!(
685+
"Unsupported binary operator '{}'",
686+
node.op
687+
))
688+
})?;
689+
690+
if !node.operands.is_empty() {
691+
// New linearized format: reduce the flat operands list back into
692+
// a nested binary expression tree.
693+
let operands = node
694+
.operands
695+
.iter()
696+
.map(|e| ctx.decode(e))
697+
.collect::<Result<Vec<_>>>()?;
698+
699+
if operands.len() < 2 {
700+
return Err(datafusion_common::DataFusionError::Internal(
701+
"A binary expression must always have at least 2 operands"
702+
.to_string(),
703+
));
704+
}
705+
706+
Ok(operands
707+
.into_iter()
708+
.reduce(|left, right| {
709+
Arc::new(BinaryExpr::new(left, op, right)) as Arc<dyn PhysicalExpr>
710+
})
711+
.expect("Binary expression could not be reduced to a single expression."))
712+
} else {
713+
// Legacy format with l/r fields.
714+
let left = node.l.as_deref().ok_or_else(|| {
715+
datafusion_common::DataFusionError::Internal(
716+
"BinaryExpr is missing required field 'left'".to_string(),
717+
)
718+
})?;
719+
let right = node.r.as_deref().ok_or_else(|| {
720+
datafusion_common::DataFusionError::Internal(
721+
"BinaryExpr is missing required field 'right'".to_string(),
722+
)
723+
})?;
724+
Ok(Arc::new(BinaryExpr::new(
725+
ctx.decode(left)?,
726+
op,
727+
ctx.decode(right)?,
728+
)))
729+
}
730+
}
613731
}
614732

615733
/// Casts dictionary array to result type for binary numerical operators. Such operators

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 5 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -753,42 +753,11 @@ fn parse_escape_char(s: &str) -> Result<Option<char>> {
753753
}
754754

755755
pub fn from_proto_binary_op(op: &str) -> Result<Operator, Error> {
756-
match op {
757-
"And" => Ok(Operator::And),
758-
"Or" => Ok(Operator::Or),
759-
"Eq" => Ok(Operator::Eq),
760-
"NotEq" => Ok(Operator::NotEq),
761-
"LtEq" => Ok(Operator::LtEq),
762-
"Lt" => Ok(Operator::Lt),
763-
"Gt" => Ok(Operator::Gt),
764-
"GtEq" => Ok(Operator::GtEq),
765-
"Plus" => Ok(Operator::Plus),
766-
"Minus" => Ok(Operator::Minus),
767-
"Multiply" => Ok(Operator::Multiply),
768-
"Divide" => Ok(Operator::Divide),
769-
"Modulo" => Ok(Operator::Modulo),
770-
"IsDistinctFrom" => Ok(Operator::IsDistinctFrom),
771-
"IsNotDistinctFrom" => Ok(Operator::IsNotDistinctFrom),
772-
"BitwiseAnd" => Ok(Operator::BitwiseAnd),
773-
"BitwiseOr" => Ok(Operator::BitwiseOr),
774-
"BitwiseXor" => Ok(Operator::BitwiseXor),
775-
"BitwiseShiftLeft" => Ok(Operator::BitwiseShiftLeft),
776-
"BitwiseShiftRight" => Ok(Operator::BitwiseShiftRight),
777-
"RegexIMatch" => Ok(Operator::RegexIMatch),
778-
"RegexMatch" => Ok(Operator::RegexMatch),
779-
"RegexNotIMatch" => Ok(Operator::RegexNotIMatch),
780-
"RegexNotMatch" => Ok(Operator::RegexNotMatch),
781-
"LikeMatch" => Ok(Operator::LikeMatch),
782-
"ILikeMatch" => Ok(Operator::ILikeMatch),
783-
"NotLikeMatch" => Ok(Operator::NotLikeMatch),
784-
"NotILikeMatch" => Ok(Operator::NotILikeMatch),
785-
"StringConcat" => Ok(Operator::StringConcat),
786-
"AtArrow" => Ok(Operator::AtArrow),
787-
"ArrowAt" => Ok(Operator::ArrowAt),
788-
other => Err(proto_error(format!(
789-
"Unsupported binary operator '{other:?}'"
790-
))),
791-
}
756+
// The proto-string <-> `Operator` mapping is canonically owned by
757+
// `datafusion-expr-common` so `datafusion-proto` (logical plans) and
758+
// `PhysicalExpr` decoders (e.g. `BinaryExpr`) share one source of truth.
759+
Operator::from_proto_name(op)
760+
.ok_or_else(|| proto_error(format!("Unsupported binary operator '{op:?}'")))
792761
}
793762

794763
fn parse_optional_expr(

datafusion/proto/src/physical_plan/from_proto.rs

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ use super::{
5858
PhysicalProtoConverterExtension,
5959
};
6060
use crate::convert::TryFromProto;
61-
use crate::logical_plan::{self};
6261
use crate::protobuf::physical_expr_node::ExprType;
6362
use crate::{convert_required, convert_required_proto, protobuf};
6463
use datafusion_physical_expr::expressions::{
@@ -263,8 +262,7 @@ pub fn parse_physical_expr_with_converter(
263262

264263
// Decoder context handed to per-expression `try_from_proto` constructors.
265264
// This is the new shape the codebase is migrating toward (see #21835);
266-
// for now only `Column` is migrated and the rest of the variants are still
267-
// matched inline.
265+
// the remaining `ExprType` variants stay matched inline until they migrate.
268266
let decoder = ConverterDecoder {
269267
ctx,
270268
proto_converter,
@@ -277,54 +275,12 @@ pub fn parse_physical_expr_with_converter(
277275

278276
let pexpr: Arc<dyn PhysicalExpr> = match expr_type {
279277
// Migrated expressions take the whole `PhysicalExprNode` and unwrap
280-
// their own `ExprType` variant (see #21835); this match only routes.
278+
// their own `ExprType` variant — see #21835. This match only routes
279+
// to the right constructor.
281280
ExprType::Column(_) => Column::try_from_proto(proto, &decode_ctx)?,
282281
ExprType::UnknownColumn(c) => Arc::new(UnKnownColumn::new(&c.name)),
283282
ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)),
284-
ExprType::BinaryExpr(binary_expr) => {
285-
let op = logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?;
286-
if !binary_expr.operands.is_empty() {
287-
// New linearized format: reduce the flat operands list back into
288-
// a nested binary expression tree.
289-
let operands: Vec<Arc<dyn PhysicalExpr>> = binary_expr
290-
.operands
291-
.iter()
292-
.map(|e| proto_converter.proto_to_physical_expr(e, input_schema, ctx))
293-
.collect::<Result<Vec<_>>>()?;
294-
295-
if operands.len() < 2 {
296-
return Err(proto_error(
297-
"A binary expression must always have at least 2 operands",
298-
));
299-
}
300-
301-
operands
302-
.into_iter()
303-
.reduce(|left, right| Arc::new(BinaryExpr::new(left, op, right)))
304-
.expect(
305-
"Binary expression could not be reduced to a single expression.",
306-
)
307-
} else {
308-
// Legacy format with l/r fields
309-
Arc::new(BinaryExpr::new(
310-
parse_required_physical_expr(
311-
binary_expr.l.as_deref(),
312-
ctx,
313-
"left",
314-
input_schema,
315-
proto_converter,
316-
)?,
317-
op,
318-
parse_required_physical_expr(
319-
binary_expr.r.as_deref(),
320-
ctx,
321-
"right",
322-
input_schema,
323-
proto_converter,
324-
)?,
325-
))
326-
}
327-
}
283+
ExprType::BinaryExpr(_) => BinaryExpr::try_from_proto(proto, &decode_ctx)?,
328284
ExprType::AggregateExpr(_) => {
329285
return not_impl_err!(
330286
"Cannot convert aggregate expr node to physical expression"

datafusion/proto/src/physical_plan/to_proto.rs

Lines changed: 3 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use datafusion_physical_expr::scalar_subquery::ScalarSubqueryExpr;
3636
use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
3737
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3838
use datafusion_physical_plan::expressions::{
39-
BinaryExpr, CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr,
40-
IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
39+
CaseExpr, CastExpr, DynamicFilterPhysicalExpr, InListExpr, IsNotNullExpr, IsNullExpr,
40+
LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
4141
};
4242
use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr};
4343
use datafusion_physical_plan::udaf::AggregateFunctionExpr;
@@ -252,7 +252,7 @@ pub fn serialize_physical_expr(
252252
}
253253

254254
/// Concrete [`PhysicalExprEncode`] driver used to back
255-
/// [`PhysicalExprEncodeCtx`] when expressions invoke `PhysicalExpr::try_to_proto`.
255+
/// [`PhysicalExprEncodeCtx`] when expressions invoke `PhysicalExpr::to_proto`.
256256
///
257257
/// Wraps the existing extension codec + converter pair so individual
258258
/// expressions can recurse into children without depending on
@@ -336,46 +336,6 @@ pub fn serialize_physical_expr_with_converter(
336336
},
337337
)),
338338
})
339-
} else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
340-
// Linearize a nested binary expression tree of the same operator
341-
// into a flat vector of operands to avoid deep recursion in proto.
342-
let op = expr.op();
343-
let mut operand_refs: Vec<&Arc<dyn PhysicalExpr>> = vec![expr.right()];
344-
let mut current_expr: &BinaryExpr = expr;
345-
loop {
346-
match current_expr.left().downcast_ref::<BinaryExpr>() {
347-
Some(bin) if bin.op() == op => {
348-
operand_refs.push(bin.right());
349-
current_expr = bin;
350-
}
351-
_ => {
352-
operand_refs.push(current_expr.left());
353-
break;
354-
}
355-
}
356-
}
357-
358-
// Reverse so operands are ordered from left innermost to right outermost
359-
operand_refs.reverse();
360-
361-
let operands = operand_refs
362-
.iter()
363-
.map(|e| proto_converter.physical_expr_to_proto(e, codec))
364-
.collect::<Result<Vec<_>>>()?;
365-
366-
let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
367-
l: None,
368-
r: None,
369-
op: format!("{:?}", op),
370-
operands,
371-
});
372-
373-
Ok(protobuf::PhysicalExprNode {
374-
expr_id,
375-
expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
376-
binary_expr,
377-
)),
378-
})
379339
} else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
380340
Ok(protobuf::PhysicalExprNode {
381341
expr_id,

0 commit comments

Comments
 (0)