Skip to content

Commit d8d4e04

Browse files
committed
WIP update to main on datafusion
1 parent 030873b commit d8d4e04

File tree

11 files changed

+283
-247
lines changed

11 files changed

+283
-247
lines changed

Cargo.lock

Lines changed: 241 additions & 226 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ pyo3 = { version = "0.25", features = ["extension-module", "abi3", "abi3-py39"]
3939
pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"]}
4040
pyo3-log = "0.12.4"
4141
arrow = { version = "56", features = ["pyarrow"] }
42-
datafusion = { version = "50", features = ["avro", "unicode_expressions"] }
43-
datafusion-substrait = { version = "50", optional = true }
44-
datafusion-proto = { version = "50" }
45-
datafusion-ffi = { version = "50" }
42+
datafusion = { version = "50.3.0", features = ["avro", "unicode_expressions"] }
43+
datafusion-substrait = { version = "50.3.0", optional = true }
44+
datafusion-proto = { version = "50.3.0" }
45+
datafusion-ffi = { version = "50.3.0" }
4646
prost = "0.13.1" # keep in line with `datafusion-substrait`
4747
uuid = { version = "1.18", features = ["v4"] }
4848
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
@@ -64,3 +64,9 @@ crate-type = ["cdylib", "rlib"]
6464
[profile.release]
6565
lto = true
6666
codegen-units = 1
67+
68+
[patch.crates-io]
69+
datafusion = { git = "https://github.com/apache/datafusion", branch = "main" }
70+
datafusion-substrait = { git = "https://github.com/apache/datafusion", branch = "main" }
71+
datafusion-proto = { git = "https://github.com/apache/datafusion", branch = "main" }
72+
datafusion-ffi = { git = "https://github.com/apache/datafusion", branch = "main" }

src/common/data_type.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use datafusion::arrow::array::Array;
1919
use datafusion::arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
2020
use datafusion::common::ScalarValue;
21-
use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;
21+
use datafusion::logical_expr::expr::NullTreatment as DFNullTreatment;
2222
use pyo3::exceptions::PyNotImplementedError;
2323
use pyo3::{exceptions::PyValueError, prelude::*};
2424

@@ -261,6 +261,12 @@ impl DataTypeMap {
261261
ScalarValue::Float16(_) => Ok(DataType::Float16),
262262
ScalarValue::Float32(_) => Ok(DataType::Float32),
263263
ScalarValue::Float64(_) => Ok(DataType::Float64),
264+
ScalarValue::Decimal32(_, precision, scale) => {
265+
Ok(DataType::Decimal32(*precision, *scale))
266+
}
267+
ScalarValue::Decimal64(_, precision, scale) => {
268+
Ok(DataType::Decimal64(*precision, *scale))
269+
}
264270
ScalarValue::Decimal128(_, precision, scale) => {
265271
Ok(DataType::Decimal128(*precision, *scale))
266272
}

src/dataframe.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ impl PyDataFrame {
412412

413413
/// Returns the schema from the logical plan
414414
fn schema(&self) -> PyArrowType<Schema> {
415-
PyArrowType(self.df.schema().into())
415+
PyArrowType(self.df.schema().as_arrow().to_owned())
416416
}
417417

418418
/// Convert this DataFrame into a Table Provider that can be used in register_table
@@ -925,7 +925,7 @@ impl PyDataFrame {
925925
requested_schema: Option<Bound<'py, PyCapsule>>,
926926
) -> PyDataFusionResult<Bound<'py, PyCapsule>> {
927927
let mut batches = wait_for_future(py, self.df.as_ref().clone().collect())??;
928-
let mut schema: Schema = self.df.schema().to_owned().into();
928+
let mut schema: Schema = self.df.schema().as_arrow().clone();
929929

930930
if let Some(schema_capsule) = requested_schema {
931931
validate_pycapsule(&schema_capsule, "arrow_schema")?;

src/expr/create_external_table.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,15 @@ impl Display for PyCreateExternalTable {
6666
impl PyCreateExternalTable {
6767
#[allow(clippy::too_many_arguments)]
6868
#[new]
69-
#[pyo3(signature = (schema, name, location, file_type, table_partition_cols, if_not_exists, temporary, order_exprs, unbounded, options, constraints, column_defaults, definition=None))]
69+
#[pyo3(signature = (schema, name, location, file_type, table_partition_cols, if_not_exists, or_replace, temporary, order_exprs, unbounded, options, constraints, column_defaults, definition=None))]
7070
pub fn new(
7171
schema: PyDFSchema,
7272
name: String,
7373
location: String,
7474
file_type: String,
7575
table_partition_cols: Vec<String>,
7676
if_not_exists: bool,
77+
or_replace: bool,
7778
temporary: bool,
7879
order_exprs: Vec<Vec<PySortExpr>>,
7980
unbounded: bool,
@@ -89,6 +90,7 @@ impl PyCreateExternalTable {
8990
file_type,
9091
table_partition_cols,
9192
if_not_exists,
93+
or_replace,
9294
temporary,
9395
definition,
9496
order_exprs: order_exprs

src/expr/placeholder.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@ impl PyPlaceholder {
3838
self.placeholder.id.clone()
3939
}
4040

41+
// TODO now that the inner struct holds field we should expose it instead
4142
fn data_type(&self) -> Option<PyDataType> {
4243
self.placeholder
43-
.data_type
44+
.field
4445
.as_ref()
45-
.map(|e| e.clone().into())
46+
.map(|f| f.data_type().clone().into())
4647
}
4748
}

src/expr/statement.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use arrow::datatypes::{DataType, Field};
1819
use datafusion::logical_expr::{
1920
Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion,
2021
TransactionEnd, TransactionIsolationLevel, TransactionStart,
2122
};
2223
use pyo3::{prelude::*, IntoPyObjectExt};
24+
use std::sync::Arc;
2325

2426
use crate::{common::data_type::PyDataType, sql::logical::PyLogicalPlan};
2527

@@ -334,17 +336,22 @@ impl LogicalNode for PyPrepare {
334336

335337
#[pymethods]
336338
impl PyPrepare {
339+
// TODO Now that we have field instead of datatype in the inner struct
340+
// we should update the signatures below to match
337341
#[new]
338342
pub fn new(name: String, data_types: Vec<PyDataType>, input: PyLogicalPlan) -> Self {
339343
let input = input.plan().clone();
340-
let data_types = data_types
344+
let fields = data_types
341345
.into_iter()
342-
.map(|data_type| data_type.into())
346+
.map(|data_type| DataType::from(data_type))
347+
.enumerate()
348+
.map(|(idx, data_type)| Field::new(format!("field_{idx}"), data_type, true))
349+
.map(Arc::new)
343350
.collect();
344351
PyPrepare {
345352
prepare: Prepare {
346353
name,
347-
data_types,
354+
fields,
348355
input,
349356
},
350357
}
@@ -356,10 +363,9 @@ impl PyPrepare {
356363

357364
pub fn data_types(&self) -> Vec<PyDataType> {
358365
self.prepare
359-
.data_types
360-
.clone()
361-
.into_iter()
362-
.map(|t| t.into())
366+
.fields
367+
.iter()
368+
.map(|f| f.data_type().to_owned().into())
363369
.collect()
364370
}
365371

src/functions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use datafusion::functions;
4141
use datafusion::functions_aggregate;
4242
use datafusion::functions_window;
4343
use datafusion::logical_expr::expr::Alias;
44-
use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;
44+
use datafusion::logical_expr::expr::NullTreatment as DFNullTreatment;
4545
use datafusion::logical_expr::{expr::WindowFunction, lit, Expr, WindowFunctionDefinition};
4646

4747
fn add_builder_fns_to_aggregate(

src/physical_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl PyExecutionPlan {
8383
})?;
8484

8585
let codec = DefaultPhysicalExtensionCodec {};
86-
let plan = proto_plan.try_into_physical_plan(&ctx.ctx, &ctx.ctx.runtime_env(), &codec)?;
86+
let plan = proto_plan.try_into_physical_plan(&ctx.ctx.task_ctx(), &codec)?;
8787
Ok(Self::new(plan))
8888
}
8989

src/sql/logical.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ impl PyLogicalPlan {
206206
})?;
207207

208208
let codec = DefaultLogicalExtensionCodec {};
209-
let plan = proto_plan.try_into_logical_plan(&ctx.ctx, &codec)?;
209+
let plan = proto_plan.try_into_logical_plan(&ctx.ctx.task_ctx(), &codec)?;
210210
Ok(Self::new(plan))
211211
}
212212
}

0 commit comments

Comments
 (0)