Skip to content

Commit b4c85b6

Browse files
authored
vortex-datafusion: Pipe session through to converter (#8591)
## Rationale for this change

To be able to use #7824 in vortex-datafusion.

## What changes are included in this PR?

Piping the session through to the expression convertor, plus a test for converting UUID -> `FixedSizeBinary(16)`.

## What APIs are changed? Are there any user-facing changes?

The default still has the same behavior, so while there is a new API, there is no change for existing users other than the convertor now adhering to the session they already passed.

Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
1 parent 6fe605b commit b4c85b6

2 files changed

Lines changed: 60 additions & 8 deletions

File tree

vortex-datafusion/src/convert/exprs.rs

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use std::sync::Arc;
55

66
use arrow_schema::DataType;
7+
use arrow_schema::Field;
78
use arrow_schema::Schema;
89
use datafusion_common::Result as DFResult;
910
use datafusion_common::exec_datafusion_err;
@@ -20,9 +21,9 @@ use datafusion_physical_expr::utils::collect_columns;
2021
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
2122
use datafusion_physical_plan::expressions as df_expr;
2223
use itertools::Itertools;
23-
use vortex::dtype::DType;
24+
use vortex::VortexSessionDefault;
25+
use vortex::array::arrow::ArrowSessionExt;
2426
use vortex::dtype::Nullability;
25-
use vortex::dtype::arrow::FromArrowType;
2627
use vortex::expr::Expression;
2728
use vortex::expr::and_collect;
2829
use vortex::expr::byte_length;
@@ -42,6 +43,7 @@ use vortex::scalar_fn::fns::binary::Binary;
4243
use vortex::scalar_fn::fns::like::Like;
4344
use vortex::scalar_fn::fns::like::LikeOptions;
4445
use vortex::scalar_fn::fns::operators::Operator;
46+
use vortex::session::VortexSession;
4547

4648
use crate::convert::FromDataFusion;
4749

@@ -109,10 +111,29 @@ pub trait ExpressionConvertor: Send + Sync {
109111
}
110112

111113
/// The default [`ExpressionConvertor`] implementation.
112-
#[derive(Default)]
113-
pub struct DefaultExpressionConvertor {}
114+
pub struct DefaultExpressionConvertor {
115+
/// Session used to resolve Arrow → Vortex dtypes through the extension
116+
/// plugin registry, so registered extension types (e.g. UUID ⇄
117+
/// `FixedSizeBinary[16]`) convert correctly instead of hitting the static,
118+
/// non-plugin-aware `DType::from_arrow`.
119+
session: VortexSession,
120+
}
121+
122+
impl Default for DefaultExpressionConvertor {
123+
fn default() -> Self {
124+
Self {
125+
session: VortexSession::default(),
126+
}
127+
}
128+
}
114129

115130
impl DefaultExpressionConvertor {
131+
/// Create a convertor that resolves Arrow extension types using `session`'s
132+
/// dtype registry.
133+
pub fn new(session: VortexSession) -> Self {
134+
Self { session }
135+
}
136+
116137
/// Attempts to convert DataFusion's `octet_length` function to Vortex `byte_length`.
117138
fn try_convert_octet_length(&self, scalar_fn: &ScalarFunctionExpr) -> DFResult<Expression> {
118139
let [input] = scalar_fn.args() else {
@@ -122,8 +143,15 @@ impl DefaultExpressionConvertor {
122143
};
123144

124145
let input = self.convert(input.as_ref())?;
125-
let return_dtype =
126-
DType::from_arrow((scalar_fn.return_type(), scalar_fn.nullable().into()));
146+
let return_dtype = self
147+
.session
148+
.arrow()
149+
.from_arrow_field(&Field::new(
150+
"",
151+
scalar_fn.return_type().clone(),
152+
scalar_fn.nullable(),
153+
))
154+
.map_err(|e| exec_datafusion_err!("Failed to convert return type to dtype: {e}"))?;
127155
Ok(cast(byte_length(input), return_dtype))
128156
}
129157

@@ -246,7 +274,11 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
246274
}
247275

248276
if let Some(cast_expr) = df.downcast_ref::<df_expr::CastExpr>() {
249-
let cast_dtype = DType::from_arrow(cast_expr.target_field().as_ref());
277+
let cast_dtype = self
278+
.session
279+
.arrow()
280+
.from_arrow_field(cast_expr.target_field().as_ref())
281+
.map_err(|e| exec_datafusion_err!("Failed to convert cast target to dtype: {e}"))?;
250282
let child = self.convert(cast_expr.expr().as_ref())?;
251283
return Ok(cast(child, cast_dtype));
252284
}
@@ -975,6 +1007,25 @@ mod tests {
9751007
Ok(())
9761008
}
9771009

1010+
/// A cast whose target is a UUID-tagged `FixedSizeBinary(16)` must resolve
1011+
/// through the dtype extension registry (UUID is registered on the default
1012+
/// session) instead of the static, non-plugin-aware `DType::from_arrow`,
1013+
/// which does not support `FixedSizeBinary` and previously panicked here.
1014+
#[test]
1015+
fn test_cast_to_uuid_resolves_via_registry() -> anyhow::Result<()> {
1016+
use arrow_schema::extension::Uuid;
1017+
1018+
let mut uuid_field = Field::new("id", DataType::FixedSizeBinary(16), true);
1019+
uuid_field.try_with_extension_type(Uuid)?;
1020+
1021+
let child = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;
1022+
let cast = df_expr::CastExpr::new_with_target_field(child, Arc::new(uuid_field), None);
1023+
1024+
// Must convert without panicking — the static path would `unimplemented!()`.
1025+
DefaultExpressionConvertor::default().convert(&cast)?;
1026+
Ok(())
1027+
}
1028+
9781029
/// Test that applying a CASE expression to an Arrow RecordBatch using DataFusion
9791030
/// matches the result of applying the converted Vortex expression.
9801031
#[test]

vortex-datafusion/src/persistent/source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ impl VortexSource {
220220
let full_schema = table_schema.table_schema();
221221
let indices = (0..full_schema.fields().len()).collect::<Vec<_>>();
222222
let projection = ProjectionExprs::from_indices(&indices, full_schema);
223+
let expression_convertor = Arc::new(DefaultExpressionConvertor::new(session.clone()));
223224

224225
Self {
225226
session,
@@ -231,7 +232,7 @@ impl VortexSource {
231232
_unused_df_metrics: Default::default(),
232233
layout_readers: Arc::new(DashMap::default()),
233234
natural_split_ranges: Arc::new(DashMap::default()),
234-
expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
235+
expression_convertor,
235236
vortex_reader_factory: None,
236237
vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
237238
file_metadata_cache: None,

0 commit comments

Comments
 (0)