From a5593b4edac23a6063d321b26aff2e46ed8f9064 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 26 Jun 2025 16:14:49 -0700 Subject: [PATCH 01/31] Support Datafusion insert_into --- .../iceberg/src/arrow/nan_val_cnt_visitor.rs | 2 + crates/iceberg/src/arrow/value.rs | 13 +- crates/iceberg/src/spec/manifest/_serde.rs | 5 +- crates/iceberg/src/spec/manifest/mod.rs | 2 + crates/iceberg/src/spec/schema/visitor.rs | 11 +- crates/integrations/datafusion/Cargo.toml | 3 + .../datafusion/src/physical_plan/commit.rs | 288 +++++++++++++++ .../datafusion/src/physical_plan/mod.rs | 2 + .../datafusion/src/physical_plan/write.rs | 328 ++++++++++++++++++ .../integrations/datafusion/src/table/mod.rs | 36 ++ .../tests/integration_datafusion_test.rs | 67 ++++ 11 files changed, 751 insertions(+), 6 deletions(-) create mode 100644 crates/integrations/datafusion/src/physical_plan/commit.rs create mode 100644 crates/integrations/datafusion/src/physical_plan/write.rs diff --git a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs index 6b75c011cb..0a4bda28f9 100644 --- a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs +++ b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs @@ -159,6 +159,8 @@ impl NanValueCountVisitor { let arrow_arr_partner_accessor = ArrowArrayAccessor {}; let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef; + println!("----StructArray from record stream: {:?}", struct_arr); + println!("----Schema.as_struct from table: {:?}", schema.as_struct()); visit_struct_with_partner( schema.as_struct(), &struct_arr, diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index f8fd380dd0..29fb34ff8b 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -440,10 +440,12 @@ impl PartnerAccessor for ArrowArrayAccessor { Ok(schema_partner) } + // todo generate field_pos in datafusion instead of passing to here fn field_partner<'a>( &self, struct_partner: &'a ArrayRef, field: &NestedField, + field_pos: Option, ) -> Result<&'a ArrayRef> { let struct_array = struct_partner .as_any() @@ -455,6 +457,13 @@ impl PartnerAccessor for ArrowArrayAccessor { ) })?; + println!( + "!!!Accessor struct array from struct partner: {:?}", + struct_array + ); + + println!("!!!field: {:?}", field); + let field_pos = struct_array .fields() .iter() @@ -463,12 +472,12 @@ impl PartnerAccessor for ArrowArrayAccessor { .map(|id| id == field.id) .unwrap_or(false) }) - .ok_or_else(|| { + .unwrap_or(field_pos.ok_or_else(|| { Error::new( ErrorKind::DataInvalid, format!("Field id {} not found in struct array", field.id), ) - })?; + })?); Ok(struct_array.column(field_pos)) } diff --git a/crates/iceberg/src/spec/manifest/_serde.rs b/crates/iceberg/src/spec/manifest/_serde.rs index fd7bc2e69a..bc3d4a698f 100644 --- a/crates/iceberg/src/spec/manifest/_serde.rs +++ b/crates/iceberg/src/spec/manifest/_serde.rs @@ -96,9 +96,10 @@ impl ManifestEntryV1 { } } +/// todo doc #[serde_as] #[derive(Serialize, Deserialize)] -pub(super) struct DataFileSerde { +pub struct DataFileSerde { #[serde(default)] content: i32, file_path: String, @@ -126,6 +127,7 @@ pub(super) struct DataFileSerde { } impl DataFileSerde { + /// todo doc pub fn try_from( value: super::DataFile, partition_type: &StructType, @@ -160,6 +162,7 @@ impl DataFileSerde { }) } + /// todo doc pub fn try_into( self, partition_spec_id: i32, diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index 33b7d38706..48ce5d7b64 100644 --- 
a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. +// todo fix encapsulation mod _serde; +pub use _serde::*; mod data_file; pub use data_file::*; diff --git a/crates/iceberg/src/spec/schema/visitor.rs b/crates/iceberg/src/spec/schema/visitor.rs index ebb9b86bba..faad728a2a 100644 --- a/crates/iceberg/src/spec/schema/visitor.rs +++ b/crates/iceberg/src/spec/schema/visitor.rs @@ -192,7 +192,12 @@ pub trait PartnerAccessor
<P>
{ /// Get the struct partner from schema partner. fn struct_parner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>; /// Get the field partner from struct partner. - fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField) -> Result<&'a P>; + fn field_partner<'a>( + &self, + struct_partner: &'a P, + field: &NestedField, + field_pos: Option, + ) -> Result<&'a P>; /// Get the list element partner from list partner. fn list_element_partner<'a>(&self, list_partner: &'a P) -> Result<&'a P>; /// Get the map key partner from map partner. @@ -253,8 +258,8 @@ pub fn visit_struct_with_partner, A: PartnerAc accessor: &A, ) -> Result { let mut results = Vec::with_capacity(s.fields().len()); - for field in s.fields() { - let field_partner = accessor.field_partner(partner, field)?; + for (pos, field) in s.fields().iter().enumerate() { + let field_partner = accessor.field_partner(partner, field, Some(pos))?; visitor.before_struct_field(field, field_partner)?; let result = visit_type_with_partner(&field.field_type, field_partner, visitor, accessor)?; visitor.after_struct_field(field, field_partner)?; diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 6954950b06..88f5b1b523 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -34,9 +34,12 @@ async-trait = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } +parquet = { workspace = true } tokio = { workspace = true } +serde_json = { workspace = true } [dev-dependencies] expect-test = { workspace = true } +iceberg-catalog-memory = { workspace = true } parquet = { workspace = true } tempfile = { workspace = true } diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs new file mode 100644 index 0000000000..a1f133748f --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
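// Editorial sketch (not part of the patch): the `field_pos: Option<usize>` parameter
// added to `PartnerAccessor::field_partner` above lets the Arrow accessor fall back to
// the field's position within the struct when no column carries a matching field-id in
// its metadata. The free function below only illustrates that lookup order; its name
// and types are assumptions, not the crate's API.
fn resolve_column_index(
    column_field_ids: &[Option<i32>], // field-id parsed from each Arrow column's metadata, if any
    wanted_field_id: i32,
    field_pos: Option<usize>,
) -> Option<usize> {
    column_field_ids
        .iter()
        .position(|id| *id == Some(wanted_field_id))
        // Fallback: the position handed in by visit_struct_with_partner,
        // which now enumerates `s.fields()`.
        .or(field_pos)
}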
+ +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use datafusion::arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt64Array}; +use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, +}; +use datafusion::common::{DataFusionError, Result as DFResult}; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_stream_partitioned, +}; +use futures::{StreamExt, TryStreamExt}; +use iceberg::Catalog; +use iceberg::spec::{DataFile, DataFileSerde}; +use iceberg::table::Table; +use iceberg::transaction::Transaction; +use serde_json; + +use crate::to_datafusion_error; + +/// IcebergCommitExec is responsible for collecting results from multiple IcebergWriteExec +/// instances and using Transaction::fast_append to commit the data files written. +pub(crate) struct IcebergCommitExec { + table: Table, + catalog: Arc, + write_plan: Arc, + schema: ArrowSchemaRef, + count_schema: ArrowSchemaRef, + plan_properties: PlanProperties, +} + +impl IcebergCommitExec { + pub fn new( + table: Table, + catalog: Arc, + write_plan: Arc, + schema: ArrowSchemaRef, + ) -> Self { + let plan_properties = Self::compute_properties(schema.clone()); + + Self { + table, + catalog, + write_plan, + schema, + count_schema: Self::make_count_schema(), + plan_properties, + } + } + + /// Compute the plan properties for this execution plan + fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } + + // Create a record batch with just the count of rows written + fn make_count_batch(count: u64) -> RecordBatch { + let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef; + + RecordBatch::try_from_iter_with_nullable(vec![("count", count_array, false)]).unwrap() + } + + fn make_count_schema() -> ArrowSchemaRef { + // Define a schema. 
+ Arc::new(ArrowSchema::new(vec![Field::new( + "count", + DataType::UInt64, + false, + )])) + } +} + +impl Debug for IcebergCommitExec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "IcebergCommitExec") + } +} + +impl DisplayAs for IcebergCommitExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "IcebergCommitExec: table={}", self.table.identifier()) + } + DisplayFormatType::Verbose => { + write!( + f, + "IcebergCommitExec: table={}, schema={:?}", + self.table.identifier(), + self.schema + ) + } + DisplayFormatType::TreeRender => { + write!(f, "IcebergCommitExec: table={}", self.table.identifier()) + } + } + } +} + +impl ExecutionPlan for IcebergCommitExec { + fn name(&self) -> &str { + "IcebergCommitExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.write_plan] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DFResult> { + if children.len() != 1 { + return Err(DataFusionError::Internal( + "IcebergCommitExec expects exactly one child".to_string(), + )); + } + + Ok(Arc::new(IcebergCommitExec::new( + self.table.clone(), + self.catalog.clone(), + children[0].clone(), + self.schema.clone(), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + // IcebergCommitExec only has one partition (partition 0) + if partition != 0 { + return Err(DataFusionError::Internal(format!( + "IcebergCommitExec only has one partition, but got partition {}", + partition + ))); + } + + let table = self.table.clone(); + let input_plan = self.write_plan.clone(); + let count_schema = Arc::clone(&self.count_schema); + + // todo revisit this + let spec_id = self.table.metadata().default_partition_spec_id(); + let partition_type = self.table.metadata().default_partition_type().clone(); + let current_schema = self.table.metadata().current_schema().clone(); + + let _catalog = Arc::clone(&self.catalog); + + // Process the input streams from all partitions and commit the data files + let stream = futures::stream::once(async move { + let mut data_files: Vec = Vec::new(); + let mut total_count: u64 = 0; + + // Execute and collect results from all partitions of the input plan + let batches = execute_stream_partitioned(input_plan, context)?; + + // Collect all data files from this partition's stream + for mut batch_stream in batches { + while let Some(batch_result) = batch_stream.as_mut().next().await { + let batch = batch_result?; + + let count_array = batch + .column_by_name("count") + .ok_or_else(|| { + DataFusionError::Internal( + "Expected 'count' column in input batch".to_string(), + ) + })? + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal( + "Expected 'count' column to be UInt64Array".to_string(), + ) + })?; + + let files_array = batch + .column_by_name("data_files") + .ok_or_else(|| { + DataFusionError::Internal( + "Expected 'data_files' column in input batch".to_string(), + ) + })? 
+ .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal( + "Expected 'data_files' column to be StringArray".to_string(), + ) + })?; + + // todo remove log + println!("files_array to deserialize: {:?}", files_array); + + // Sum all values in the count_array + total_count += count_array.iter().flatten().sum::(); + + // Deserialize all data files from the StringArray + let batch_files: Vec> = (0..files_array.len()) + .map(|i| { + let files_json = files_array.value(i); + serde_json::from_str::(files_json) + .map_err(|e| { + DataFusionError::Internal(format!( + "Failed to deserialize data files: {}", + e + )) + })? + .try_into(spec_id, &partition_type, ¤t_schema) + .map_err(to_datafusion_error) + }) + .collect(); + + // Collect results, propagating any errors + let batch_files: Vec = batch_files + .into_iter() + .collect::>()?; + + // Add all deserialized files to our collection + data_files.extend(batch_files); + } + } + + // If no data files were collected, return an empty result + if data_files.is_empty() { + return Ok(RecordBatch::new_empty(count_schema)); + } + + // Create a transaction and commit the data files + let tx = Transaction::new(&table); + let _action = tx.fast_append().add_data_files(data_files); + + // todo uncomment this + // // Apply the action and commit the transaction + // let updated_table = action + // .apply(tx) + // .map_err(to_datafusion_error)? + // .commit(catalog.as_ref()) + // .await + // .map_err(to_datafusion_error)?; + + Ok(Self::make_count_batch(total_count)) + }) + .boxed(); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.count_schema), + stream, + ))) + } +} diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index 58fb065dde..1e86225127 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +pub(crate) mod commit; pub(crate) mod expr_to_predicate; pub(crate) mod metadata_scan; pub(crate) mod scan; +pub(crate) mod write; diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs new file mode 100644 index 0000000000..eb5cf24028 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -0,0 +1,328 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
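// Editorial sketch (not part of the patch): IcebergWriteExec and IcebergCommitExec above
// hand results over as a RecordBatch with a UInt64 "count" column and a Utf8 "data_files"
// column of JSON-encoded data files. A minimal, standalone illustration of that shape
// (the JSON payloads here are placeholders, not real serialized DataFile values):
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array};

fn example_handoff_batch() -> RecordBatch {
    let count: ArrayRef = Arc::new(UInt64Array::from(vec![2u64]));
    let files: ArrayRef = Arc::new(StringArray::from(vec![
        r#"{"file_path":"s3://warehouse/a.parquet"}"#,
        r#"{"file_path":"s3://warehouse/b.parquet"}"#,
    ]));
    // Same constructor the plan nodes use; column order follows make_result_batch.
    RecordBatch::try_from_iter_with_nullable(vec![
        ("count", count, false),
        ("data_files", files, false),
    ])
    .expect("columns are consistent with the declared nullability")
}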
+ +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array}; +use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, +}; +use datafusion::common::Result as DFResult; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use futures::StreamExt; +use iceberg::spec::{DataFile, DataFileFormat, DataFileSerde, FormatVersion}; +use iceberg::table::Table; +use iceberg::writer::CurrentFileStatus; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder}; +use parquet::file::properties::WriterProperties; + +use crate::to_datafusion_error; + +pub(crate) struct IcebergWriteExec { + table: Table, + input: Arc, + result_schema: ArrowSchemaRef, + plan_properties: PlanProperties, +} + +impl IcebergWriteExec { + pub fn new(table: Table, input: Arc, schema: ArrowSchemaRef) -> Self { + let plan_properties = Self::compute_properties(schema.clone()); + + Self { + table, + input, + result_schema: Self::make_result_schema(), + plan_properties, + } + } + + /// todo: Copied from scan.rs + fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties { + // TODO: + // This is more or less a placeholder, to be replaced + // once we support output-partitioning + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } + + // Create a record batch with count and serialized data files + fn make_result_batch(count: u64, data_files: Vec) -> RecordBatch { + let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef; + let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef; + + RecordBatch::try_from_iter_with_nullable(vec![ + ("count", count_array, false), + ("data_files", files_array, false), + ]) + .unwrap() + } + + fn make_result_schema() -> ArrowSchemaRef { + // Define a schema. 
+ Arc::new(ArrowSchema::new(vec![ + Field::new("data_files", DataType::Utf8, false), + Field::new("count", DataType::UInt64, false), + ])) + } +} + +impl Debug for IcebergWriteExec { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +impl DisplayAs for IcebergWriteExec { + fn fmt_as(&self, _t: DisplayFormatType, _f: &mut Formatter) -> std::fmt::Result { + todo!() + } +} + +impl ExecutionPlan for IcebergWriteExec { + fn name(&self) -> &str { + "IcebergWriteExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DFResult> { + Ok(self) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + let parquet_writer_fut = ParquetWriterBuilder::new( + WriterProperties::default(), + self.table.metadata().current_schema().clone(), + self.table.file_io().clone(), + DefaultLocationGenerator::new(self.table.metadata().clone()) + .map_err(to_datafusion_error)?, + // todo actual filename + DefaultFileNameGenerator::new("what".to_string(), None, DataFileFormat::Parquet), + ) + .build(); + + // todo repartition + let data = self.input.execute(partition, context)?; + let result_schema = Arc::clone(&self.result_schema); + + // todo non-default partition spec? + let spec_id = self.table.metadata().default_partition_spec_id(); + let partition_type = self.table.metadata().default_partition_type().clone(); + let is_version_1 = self.table.metadata().format_version() == FormatVersion::V1; + + let stream = futures::stream::once(async move { + let mut writer = parquet_writer_fut.await.map_err(to_datafusion_error)?; + + let mut input_stream = data; + + while let Some(batch_res) = input_stream.next().await { + let batch = batch_res?; + writer.write(&batch).await.map_err(to_datafusion_error)?; + } + + let count = writer.current_row_num() as u64; + let data_file_builders = writer.close().await.map_err(to_datafusion_error)?; + + // Convert builders to data files + let data_files = data_file_builders + .into_iter() + .map(|mut builder| builder.partition_spec_id(spec_id).build().unwrap()) + .collect::>(); + + let data_files = data_files + .into_iter() + .map(|f| { + let serde = DataFileSerde::try_from(f, &partition_type, is_version_1).unwrap(); + let json = serde_json::to_string(&serde).unwrap(); + println!("Serialized data file: {}", json); // todo remove log + json + }) + .collect::>(); + + Ok(Self::make_result_batch(count, data_files)) + }) + .boxed(); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + result_schema, + stream, + ))) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use datafusion::arrow::array::StringArray; + use iceberg::spec::{ + DataFile, DataFileBuilder, DataFileFormat, DataFileSerde, PartitionSpec, PrimitiveType, + Schema, Struct, Type, + }; + + // todo move this to DataFileSerde? 
+ #[test] + fn test_data_file_serialization() { + // Create a simple schema + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![ + iceberg::spec::NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)) + .into(), + iceberg::spec::NestedField::required( + 2, + "name", + Type::Primitive(PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + // Create a partition spec + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_partition_field("id", "id_partition", iceberg::spec::Transform::Identity) + .unwrap() + .build() + .unwrap(); + + // Get partition type from the partition spec + let partition_type = partition_spec.partition_type(&schema).unwrap(); + + // Set version flag + let is_version_1 = false; + + // Create a vector of DataFile objects + let data_files = vec![ + DataFileBuilder::default() + .content(iceberg::spec::DataContentType::Data) + .file_format(DataFileFormat::Parquet) + .file_path("path/to/file1.parquet".to_string()) + .file_size_in_bytes(1024) + .record_count(100) + .partition_spec_id(1) + .partition(Struct::empty()) + .column_sizes(HashMap::from([(1, 512), (2, 512)])) + .value_counts(HashMap::from([(1, 100), (2, 100)])) + .null_value_counts(HashMap::from([(1, 0), (2, 0)])) + .build() + .unwrap(), + DataFileBuilder::default() + .content(iceberg::spec::DataContentType::Data) + .file_format(DataFileFormat::Parquet) + .file_path("path/to/file2.parquet".to_string()) + .file_size_in_bytes(2048) + .record_count(200) + .partition_spec_id(1) + .partition(Struct::empty()) + .column_sizes(HashMap::from([(1, 1024), (2, 1024)])) + .value_counts(HashMap::from([(1, 200), (2, 200)])) + .null_value_counts(HashMap::from([(1, 10), (2, 5)])) + .build() + .unwrap(), + ]; + + // Serialize the DataFile objects + let serialized_files = data_files + .into_iter() + .map(|f| { + let serde = DataFileSerde::try_from(f, &partition_type, is_version_1).unwrap(); + let json = serde_json::to_string(&serde).unwrap(); + println!("Test serialized data file: {}", json); + json + }) + .collect::>(); + + // Verify we have the expected number of serialized files + assert_eq!(serialized_files.len(), 2); + + // Verify each serialized file contains expected data + for json in &serialized_files { + assert!(json.contains("path/to/file")); + assert!(json.contains("parquet")); + assert!(json.contains("record_count")); + assert!(json.contains("file_size_in_bytes")); + } + + // Convert Vec to StringArray and print it + let string_array = StringArray::from(serialized_files.clone()); + println!("StringArray: {:?}", string_array); + + // Now deserialize the JSON strings back into DataFile objects + println!("\nDeserializing back to DataFile objects:"); + let deserialized_files: Vec = serialized_files + .into_iter() + .map(|json| { + // First deserialize to DataFileSerde + let data_file_serde: DataFileSerde = + serde_json::from_str(&json).expect("Failed to deserialize to DataFileSerde"); + + // Then convert to DataFile + let data_file = data_file_serde + .try_into(partition_spec.spec_id(), &partition_type, &schema) + .expect("Failed to convert DataFileSerde to DataFile"); + + println!("Deserialized DataFile: {:?}", data_file); + data_file + }) + .collect(); + + // Verify we have the expected number of deserialized files + assert_eq!(deserialized_files.len(), 2); + + // Verify the deserialized files have the expected properties + for file in &deserialized_files { + assert_eq!(file.content_type(), 
iceberg::spec::DataContentType::Data); + assert_eq!(file.file_format(), DataFileFormat::Parquet); + assert!(file.file_path().contains("path/to/file")); + assert!(file.record_count() == 100 || file.record_count() == 200); + } + } +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 7f741a534a..6bf9ba8cf9 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -24,8 +24,10 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::catalog::Session; +use datafusion::common::DataFusionError; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result as DFResult; +use datafusion::logical_expr::dml::InsertOp; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; use iceberg::arrow::schema_to_arrow_schema; @@ -34,7 +36,9 @@ use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; use metadata_table::IcebergMetadataTableProvider; +use crate::physical_plan::commit::IcebergCommitExec; use crate::physical_plan::scan::IcebergTableScan; +use crate::physical_plan::write::IcebergWriteExec; /// Represents a [`TableProvider`] for the Iceberg [`Catalog`], /// managing access to a [`Table`]. @@ -46,6 +50,8 @@ pub struct IcebergTableProvider { snapshot_id: Option, /// A reference-counted arrow `Schema`. schema: ArrowSchemaRef, + /// The catalog that the table belongs to. + catalog: Option>, } impl IcebergTableProvider { @@ -54,6 +60,7 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, + catalog: None, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] @@ -73,6 +80,7 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, + catalog: Some(client), }) } @@ -84,6 +92,7 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, + catalog: None, }) } @@ -108,6 +117,7 @@ impl IcebergTableProvider { table, snapshot_id: Some(snapshot_id), schema, + catalog: None, }) } @@ -157,6 +167,32 @@ impl TableProvider for IcebergTableProvider { // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) } + + async fn insert_into( + &self, + _state: &dyn Session, + input: Arc, + _insert_op: InsertOp, + ) -> DFResult> { + let write_plan = Arc::new(IcebergWriteExec::new( + self.table.clone(), + input, + self.schema.clone(), + )); + + if let Some(catalog) = self.catalog.clone() { + Ok(Arc::new(IcebergCommitExec::new( + self.table.clone(), + catalog, + write_plan, + self.schema.clone(), + ))) + } else { + Err(DataFusionError::Execution( + "Catalog cannot be none for insert_into".to_string(), + )) + } + } } #[cfg(test)] diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 1491e4dbff..ebe43f5a42 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -26,6 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datafusion::execution::context::SessionContext; use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY; use expect_test::expect; +use futures::StreamExt; use iceberg::io::FileIOBuilder; use 
iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type}; use iceberg::test_utils::check_record_batches; @@ -432,3 +433,69 @@ async fn test_metadata_table() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_insert_into() -> Result<()> { + let iceberg_catalog = get_iceberg_catalog(); + let namespace = NamespaceIdent::new("test_provider_get_table_schema".to_string()); + set_test_namespace(&iceberg_catalog, &namespace).await?; + + let creation = get_table_creation(temp_path(), "my_table", None)?; + iceberg_catalog.create_table(&namespace, creation).await?; + + let client = Arc::new(iceberg_catalog); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); + + let ctx = SessionContext::new(); + ctx.register_catalog("catalog", catalog); + + let provider = ctx.catalog("catalog").unwrap(); + let schema = provider.schema("test_provider_get_table_schema").unwrap(); + + let table = schema.table("my_table").await.unwrap().unwrap(); + let table_schema = table.schema(); + + let expected = [("foo1", &DataType::Int32), ("foo2", &DataType::Utf8)]; + + for (field, exp) in table_schema.fields().iter().zip(expected.iter()) { + assert_eq!(field.name(), exp.0); + assert_eq!(field.data_type(), exp.1); + assert!(!field.is_nullable()) + } + + let df = ctx + .sql("insert into catalog.test_provider_get_table_schema.my_table values (1, 'alan'),(2, 'turing')") + .await + .unwrap(); + + let task_ctx = Arc::new(df.task_ctx()); + let plan = df.create_physical_plan().await.unwrap(); + let mut stream = plan.execute(0, task_ctx).unwrap(); + + while let Some(batch_result) = stream.next().await { + match batch_result { + Ok(batch) => { + println!("Got RecordBatch with {} rows", batch.num_rows()); + for column in batch.columns() { + println!("{:?}", column); + } + } + Err(e) => { + eprintln!("Error reading batch: {:?}", e); + } + } + } + // Ensure both the plan and the stream conform to the same schema + // assert_eq!(plan.schema(), stream.schema()); + // assert_eq!( + // stream.schema().as_ref(), + // &ArrowSchema::new(vec![ + // Field::new("foo2", DataType::Utf8, false).with_metadata(HashMap::from([( + // PARQUET_FIELD_ID_META_KEY.to_string(), + // "2".to_string(), + // )])) + // ]), + // ); + + Ok(()) +} From 558b40230d79dd4e44b9f82c9cbd37c3f83b8e8a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 15 Jul 2025 15:40:26 -0700 Subject: [PATCH 02/31] cleanup --- crates/integrations/datafusion/Cargo.toml | 1 - crates/integrations/datafusion/src/physical_plan/commit.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 88f5b1b523..ed05d65922 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -40,6 +40,5 @@ serde_json = { workspace = true } [dev-dependencies] expect-test = { workspace = true } -iceberg-catalog-memory = { workspace = true } parquet = { workspace = true } tempfile = { workspace = true } diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index a1f133748f..998bb8367c 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -31,7 +31,7 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_stream_partitioned, }; -use futures::{StreamExt, 
TryStreamExt}; +use futures::StreamExt; use iceberg::Catalog; use iceberg::spec::{DataFile, DataFileSerde}; use iceberg::table::Table; From 847a2bb798cc417370fff207ae563c4b64d24bde Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 15 Jul 2025 15:44:49 -0700 Subject: [PATCH 03/31] minor --- .../datafusion/src/physical_plan/write.rs | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index eb5cf24028..d965b66ef5 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -95,14 +95,29 @@ impl IcebergWriteExec { } impl Debug for IcebergWriteExec { - fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { - todo!() + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "IcebergWriteExec") } } impl DisplayAs for IcebergWriteExec { - fn fmt_as(&self, _t: DisplayFormatType, _f: &mut Formatter) -> std::fmt::Result { - todo!() + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "IcebergWriteExec: table={}", self.table.identifier()) + } + DisplayFormatType::Verbose => { + write!( + f, + "IcebergWriteExec: table={}, result_schema={:?}", + self.table.identifier(), + self.result_schema + ) + } + DisplayFormatType::TreeRender => { + write!(f, "IcebergWriteExec: table={}", self.table.identifier()) + } + } } } From b067656d9226ca0b4a817ca944e2e78fa576fcbf Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 15 Jul 2025 15:51:34 -0700 Subject: [PATCH 04/31] minor --- Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.lock b/Cargo.lock index 831b3aa4f4..29e9c0da1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3676,6 +3676,7 @@ dependencies = [ "futures", "iceberg", "parquet", + "serde_json", "tempfile", "tokio", ] From f52a69899820fcf3136ad800c9c9674bf63df6ad Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 15 Jul 2025 16:09:08 -0700 Subject: [PATCH 05/31] clippy ftw --- crates/integrations/datafusion/src/physical_plan/commit.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 998bb8367c..3e0926157a 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -36,7 +36,6 @@ use iceberg::Catalog; use iceberg::spec::{DataFile, DataFileSerde}; use iceberg::table::Table; use iceberg::transaction::Transaction; -use serde_json; use crate::to_datafusion_error; From d367a7cfb4fa0438ff38ca6e57d6703fde162b27 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 15 Jul 2025 17:52:56 -0700 Subject: [PATCH 06/31] minor --- crates/integrations/datafusion/src/physical_plan/write.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index d965b66ef5..9bd7279638 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -19,7 +19,7 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array}; +use datafusion::arrow::array::{ArrayRef, RecordBatch, 
StringArray, StructArray, UInt64Array}; use datafusion::arrow::datatypes::{ DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; @@ -74,7 +74,7 @@ impl IcebergWriteExec { } // Create a record batch with count and serialized data files - fn make_result_batch(count: u64, data_files: Vec) -> RecordBatch { + fn make_result_batch(count: u64, data_files: Vec) -> DFResult { let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef; let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef; @@ -82,7 +82,6 @@ impl IcebergWriteExec { ("count", count_array, false), ("data_files", files_array, false), ]) - .unwrap() } fn make_result_schema() -> ArrowSchemaRef { @@ -199,7 +198,7 @@ impl ExecutionPlan for IcebergWriteExec { }) .collect::>(); - Ok(Self::make_result_batch(count, data_files)) + Ok(Self::make_result_batch(count, data_files)?) }) .boxed(); From 99af43084758e4f5adfd2608a3942442232ebd3e Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 15 Jul 2025 18:13:02 -0700 Subject: [PATCH 07/31] minor --- crates/integrations/datafusion/src/physical_plan/write.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 9bd7279638..d01be99299 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -19,11 +19,12 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray, StructArray, UInt64Array}; +use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array}; use datafusion::arrow::datatypes::{ DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; use datafusion::common::Result as DFResult; +use datafusion::error::DataFusionError; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; @@ -82,6 +83,9 @@ impl IcebergWriteExec { ("count", count_array, false), ("data_files", files_array, false), ]) + .map_err(|e| { + DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string())) + }) } fn make_result_schema() -> ArrowSchemaRef { From 2f9efa805ad7712042826d8c4388a02ea3c1240c Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 15 Jul 2025 20:18:20 -0700 Subject: [PATCH 08/31] i luv cleaning up --- Cargo.lock | 1 + crates/integrations/datafusion/Cargo.toml | 1 + .../datafusion/src/physical_plan/commit.rs | 21 +++--- .../datafusion/src/physical_plan/write.rs | 65 ++++++++++++------- 4 files changed, 54 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 29e9c0da1b..8a13de28ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3679,6 +3679,7 @@ dependencies = [ "serde_json", "tempfile", "tokio", + "uuid", ] [[package]] diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index ed05d65922..3d653b295f 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -37,6 +37,7 @@ iceberg = { workspace = true } parquet = { workspace = true } tokio = { workspace = true } serde_json = { workspace = true } +uuid = { workspace = true } [dev-dependencies] expect-test = { workspace = true } diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs 
b/crates/integrations/datafusion/src/physical_plan/commit.rs index 3e0926157a..c229e21e71 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -80,10 +80,12 @@ impl IcebergCommitExec { } // Create a record batch with just the count of rows written - fn make_count_batch(count: u64) -> RecordBatch { + fn make_count_batch(count: u64) -> DFResult { let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef; - RecordBatch::try_from_iter_with_nullable(vec![("count", count_array, false)]).unwrap() + RecordBatch::try_from_iter_with_nullable(vec![("count", count_array, false)]).map_err(|e| { + DataFusionError::ArrowError(e, Some("Failed to make count batch!".to_string())) + }) } fn make_count_schema() -> ArrowSchemaRef { @@ -232,10 +234,10 @@ impl ExecutionPlan for IcebergCommitExec { total_count += count_array.iter().flatten().sum::(); // Deserialize all data files from the StringArray - let batch_files: Vec> = (0..files_array.len()) - .map(|i| { - let files_json = files_array.value(i); - serde_json::from_str::(files_json) + let batch_files: Vec = (0..files_array.len()) + .map(|i| -> DFResult { + // Parse JSON to DataFileSerde and convert to DataFile + serde_json::from_str::(files_array.value(i)) .map_err(|e| { DataFusionError::Internal(format!( "Failed to deserialize data files: {}", @@ -245,11 +247,6 @@ impl ExecutionPlan for IcebergCommitExec { .try_into(spec_id, &partition_type, ¤t_schema) .map_err(to_datafusion_error) }) - .collect(); - - // Collect results, propagating any errors - let batch_files: Vec = batch_files - .into_iter() .collect::>()?; // Add all deserialized files to our collection @@ -275,7 +272,7 @@ impl ExecutionPlan for IcebergCommitExec { // .await // .map_err(to_datafusion_error)?; - Ok(Self::make_count_batch(total_count)) + Self::make_count_batch(total_count) }) .boxed(); diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index d01be99299..966fd57b73 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -29,9 +29,12 @@ use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_input_stream, +}; use futures::StreamExt; -use iceberg::spec::{DataFile, DataFileFormat, DataFileSerde, FormatVersion}; +use iceberg::arrow::schema_to_arrow_schema; +use iceberg::spec::{DataFileFormat, DataFileSerde, FormatVersion}; use iceberg::table::Table; use iceberg::writer::CurrentFileStatus; use iceberg::writer::file_writer::location_generator::{ @@ -39,6 +42,7 @@ use iceberg::writer::file_writer::location_generator::{ }; use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder}; use parquet::file::properties::WriterProperties; +use uuid::Uuid; use crate::to_datafusion_error; @@ -159,14 +163,24 @@ impl ExecutionPlan for IcebergWriteExec { self.table.file_io().clone(), DefaultLocationGenerator::new(self.table.metadata().clone()) .map_err(to_datafusion_error)?, - // todo actual filename - 
DefaultFileNameGenerator::new("what".to_string(), None, DataFileFormat::Parquet), + // todo filename prefix/suffix should be configurable + DefaultFileNameGenerator::new( + "datafusion".to_string(), + Some(Uuid::now_v7().to_string()), + DataFileFormat::Parquet, + ), ) .build(); - // todo repartition - let data = self.input.execute(partition, context)?; - let result_schema = Arc::clone(&self.result_schema); + let data = execute_input_stream( + Arc::clone(&self.input), + Arc::new( + schema_to_arrow_schema(self.table.metadata().current_schema()) + .map_err(to_datafusion_error)?, + ), + partition, + Arc::clone(&context), + )?; // todo non-default partition spec? let spec_id = self.table.metadata().default_partition_spec_id(); @@ -175,7 +189,6 @@ impl ExecutionPlan for IcebergWriteExec { let stream = futures::stream::once(async move { let mut writer = parquet_writer_fut.await.map_err(to_datafusion_error)?; - let mut input_stream = data; while let Some(batch_res) = input_stream.next().await { @@ -186,28 +199,36 @@ impl ExecutionPlan for IcebergWriteExec { let count = writer.current_row_num() as u64; let data_file_builders = writer.close().await.map_err(to_datafusion_error)?; - // Convert builders to data files - let data_files = data_file_builders - .into_iter() - .map(|mut builder| builder.partition_spec_id(spec_id).build().unwrap()) - .collect::>(); - - let data_files = data_files + // Convert builders to data files and then to JSON strings + let data_files: Vec = data_file_builders .into_iter() - .map(|f| { - let serde = DataFileSerde::try_from(f, &partition_type, is_version_1).unwrap(); - let json = serde_json::to_string(&serde).unwrap(); + .map(|mut builder| -> DFResult { + // Build the data file + let data_file = builder.partition_spec_id(spec_id).build().map_err(|e| { + DataFusionError::Execution(format!("Failed to build data file: {}", e)) + })?; + + // Convert to DataFileSerde + let serde = DataFileSerde::try_from(data_file, &partition_type, is_version_1).map_err(|e| { + DataFusionError::Execution(format!("Failed to convert to DataFileSerde: {}", e)) + })?; + + // Serialize to JSON + let json = serde_json::to_string(&serde).map_err(|e| { + DataFusionError::Execution(format!("Failed to serialize to JSON: {}", e)) + })?; + println!("Serialized data file: {}", json); // todo remove log - json + Ok(json) }) - .collect::>(); + .collect::>>()?; - Ok(Self::make_result_batch(count, data_files)?) + Self::make_result_batch(count, data_files) }) .boxed(); Ok(Box::pin(RecordBatchStreamAdapter::new( - result_schema, + Arc::clone(&self.result_schema), stream, ))) } From 9d7c1c37ece92b1dc9c86a0f4f0f90b3bbae2740 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 15 Jul 2025 20:38:55 -0700 Subject: [PATCH 09/31] fmt not working? 
--- .../datafusion/src/physical_plan/write.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 966fd57b73..4290912b78 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -207,17 +207,21 @@ impl ExecutionPlan for IcebergWriteExec { let data_file = builder.partition_spec_id(spec_id).build().map_err(|e| { DataFusionError::Execution(format!("Failed to build data file: {}", e)) })?; - + // Convert to DataFileSerde - let serde = DataFileSerde::try_from(data_file, &partition_type, is_version_1).map_err(|e| { - DataFusionError::Execution(format!("Failed to convert to DataFileSerde: {}", e)) + let serde = DataFileSerde::try_from(data_file, &partition_type, is_version_1) + .map_err(|e| { + DataFusionError::Execution(format!( + "Failed to convert to DataFileSerde: {}", + e + )) })?; - + // Serialize to JSON let json = serde_json::to_string(&serde).map_err(|e| { DataFusionError::Execution(format!("Failed to serialize to JSON: {}", e)) })?; - + println!("Serialized data file: {}", json); // todo remove log Ok(json) }) From e25f8886ab121c1297f5b6d9abc615e405187280 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 16 Jul 2025 10:12:28 -0700 Subject: [PATCH 10/31] do not expose serde --- crates/iceberg/src/spec/manifest/_serde.rs | 5 +-- crates/iceberg/src/spec/manifest/mod.rs | 36 +++++++++++++++-- .../datafusion/src/physical_plan/commit.rs | 24 ++++++------ .../datafusion/src/physical_plan/write.rs | 39 +++++++------------ 4 files changed, 60 insertions(+), 44 deletions(-) diff --git a/crates/iceberg/src/spec/manifest/_serde.rs b/crates/iceberg/src/spec/manifest/_serde.rs index bc3d4a698f..fd7bc2e69a 100644 --- a/crates/iceberg/src/spec/manifest/_serde.rs +++ b/crates/iceberg/src/spec/manifest/_serde.rs @@ -96,10 +96,9 @@ impl ManifestEntryV1 { } } -/// todo doc #[serde_as] #[derive(Serialize, Deserialize)] -pub struct DataFileSerde { +pub(super) struct DataFileSerde { #[serde(default)] content: i32, file_path: String, @@ -127,7 +126,6 @@ pub struct DataFileSerde { } impl DataFileSerde { - /// todo doc pub fn try_from( value: super::DataFile, partition_type: &StructType, @@ -162,7 +160,6 @@ impl DataFileSerde { }) } - /// todo doc pub fn try_into( self, partition_spec_id: i32, diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index 48ce5d7b64..286e8d102c 100644 --- a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -// todo fix encapsulation mod _serde; -pub use _serde::*; mod data_file; pub use data_file::*; @@ -35,7 +33,7 @@ use super::{ Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct, UNASSIGNED_SEQUENCE_NUMBER, }; -use crate::error::Result; +use crate::error::{Error, ErrorKind, Result}; /// A manifest contains metadata and a list of entries. #[derive(Debug, PartialEq, Eq, Clone)] @@ -121,6 +119,38 @@ impl Manifest { } } +/// Serialize a DataFile to a JSON string. 
+pub fn serialize_data_file_to_json( + data_file: DataFile, + partition_type: &super::StructType, + is_version_1: bool, +) -> Result { + let serde = _serde::DataFileSerde::try_from(data_file, partition_type, is_version_1)?; + serde_json::to_string(&serde).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to serialize DataFile to JSON: {}", e), + ) + }) +} + +/// Deserialize a DataFile from a JSON string. +pub fn deserialize_data_file_from_json( + json: &str, + partition_spec_id: i32, + partition_type: &super::StructType, + schema: &Schema, +) -> Result { + let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to deserialize JSON to DataFile: {}", e), + ) + })?; + + serde.try_into(partition_spec_id, partition_type, schema) +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index c229e21e71..dd83cd3975 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -33,7 +33,7 @@ use datafusion::physical_plan::{ }; use futures::StreamExt; use iceberg::Catalog; -use iceberg::spec::{DataFile, DataFileSerde}; +use iceberg::spec::{DataFile, deserialize_data_file_from_json}; use iceberg::table::Table; use iceberg::transaction::Transaction; @@ -234,18 +234,18 @@ impl ExecutionPlan for IcebergCommitExec { total_count += count_array.iter().flatten().sum::(); // Deserialize all data files from the StringArray - let batch_files: Vec = (0..files_array.len()) - .map(|i| -> DFResult { + let batch_files: Vec = files_array + .into_iter() + .flatten() + .map(|f| -> DFResult { // Parse JSON to DataFileSerde and convert to DataFile - serde_json::from_str::(files_array.value(i)) - .map_err(|e| { - DataFusionError::Internal(format!( - "Failed to deserialize data files: {}", - e - )) - })? 
- .try_into(spec_id, &partition_type, ¤t_schema) - .map_err(to_datafusion_error) + deserialize_data_file_from_json( + f, + spec_id, + &partition_type, + ¤t_schema, + ) + .map_err(to_datafusion_error) }) .collect::>()?; diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 4290912b78..68aca23a0b 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -34,7 +34,7 @@ use datafusion::physical_plan::{ }; use futures::StreamExt; use iceberg::arrow::schema_to_arrow_schema; -use iceberg::spec::{DataFileFormat, DataFileSerde, FormatVersion}; +use iceberg::spec::{DataFileFormat, FormatVersion, serialize_data_file_to_json}; use iceberg::table::Table; use iceberg::writer::CurrentFileStatus; use iceberg::writer::file_writer::location_generator::{ @@ -208,19 +208,10 @@ impl ExecutionPlan for IcebergWriteExec { DataFusionError::Execution(format!("Failed to build data file: {}", e)) })?; - // Convert to DataFileSerde - let serde = DataFileSerde::try_from(data_file, &partition_type, is_version_1) - .map_err(|e| { - DataFusionError::Execution(format!( - "Failed to convert to DataFileSerde: {}", - e - )) - })?; - // Serialize to JSON - let json = serde_json::to_string(&serde).map_err(|e| { - DataFusionError::Execution(format!("Failed to serialize to JSON: {}", e)) - })?; + let json = + serialize_data_file_to_json(data_file, &partition_type, is_version_1) + .map_err(to_datafusion_error)?; println!("Serialized data file: {}", json); // todo remove log Ok(json) @@ -244,8 +235,8 @@ mod tests { use datafusion::arrow::array::StringArray; use iceberg::spec::{ - DataFile, DataFileBuilder, DataFileFormat, DataFileSerde, PartitionSpec, PrimitiveType, - Schema, Struct, Type, + DataFile, DataFileBuilder, DataFileFormat, PartitionSpec, PrimitiveType, Schema, Struct, + Type, deserialize_data_file_from_json, serialize_data_file_to_json, }; // todo move this to DataFileSerde? 
@@ -316,8 +307,7 @@ mod tests { let serialized_files = data_files .into_iter() .map(|f| { - let serde = DataFileSerde::try_from(f, &partition_type, is_version_1).unwrap(); - let json = serde_json::to_string(&serde).unwrap(); + let json = serialize_data_file_to_json(f, &partition_type, is_version_1).unwrap(); println!("Test serialized data file: {}", json); json }) @@ -343,14 +333,13 @@ mod tests { let deserialized_files: Vec = serialized_files .into_iter() .map(|json| { - // First deserialize to DataFileSerde - let data_file_serde: DataFileSerde = - serde_json::from_str(&json).expect("Failed to deserialize to DataFileSerde"); - - // Then convert to DataFile - let data_file = data_file_serde - .try_into(partition_spec.spec_id(), &partition_type, &schema) - .expect("Failed to convert DataFileSerde to DataFile"); + let data_file = deserialize_data_file_from_json( + &json, + partition_spec.spec_id(), + &partition_type, + &schema, + ) + .unwrap(); println!("Deserialized DataFile: {:?}", data_file); data_file From b554701ecabd6c63b49d488ff1a2750a1eca312a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 16 Jul 2025 10:24:02 -0700 Subject: [PATCH 11/31] cut it down --- Cargo.lock | 1 - crates/iceberg/src/spec/manifest/mod.rs | 119 ++++++++++++++++ crates/integrations/datafusion/Cargo.toml | 1 - .../datafusion/src/physical_plan/write.rs | 130 ------------------ 4 files changed, 119 insertions(+), 132 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8a13de28ad..4a8d086a41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3676,7 +3676,6 @@ dependencies = [ "futures", "iceberg", "parquet", - "serde_json", "tempfile", "tokio", "uuid", diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index 286e8d102c..dd0e288d06 100644 --- a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -157,6 +157,7 @@ mod tests { use std::fs; use std::sync::Arc; + use arrow_array::StringArray; use tempfile::TempDir; use super::*; @@ -1088,4 +1089,122 @@ mod tests { assert!(!partitions[2].clone().contains_null); assert_eq!(partitions[2].clone().contains_nan, Some(false)); } + + #[test] + fn test_data_file_serialization() { + // Create a simple schema + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![ + crate::spec::NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)) + .into(), + crate::spec::NestedField::required( + 2, + "name", + Type::Primitive(PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + // Create a partition spec + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_partition_field("id", "id_partition", crate::spec::Transform::Identity) + .unwrap() + .build() + .unwrap(); + + // Get partition type from the partition spec + let partition_type = partition_spec.partition_type(&schema).unwrap(); + + // Set version flag + let is_version_1 = false; + + // Create a vector of DataFile objects + let data_files = vec![ + DataFileBuilder::default() + .content(crate::spec::DataContentType::Data) + .file_format(DataFileFormat::Parquet) + .file_path("path/to/file1.parquet".to_string()) + .file_size_in_bytes(1024) + .record_count(100) + .partition_spec_id(1) + .partition(Struct::empty()) + .column_sizes(HashMap::from([(1, 512), (2, 512)])) + .value_counts(HashMap::from([(1, 100), (2, 100)])) + .null_value_counts(HashMap::from([(1, 0), (2, 0)])) + .build() + .unwrap(), + DataFileBuilder::default() + 
.content(crate::spec::DataContentType::Data) + .file_format(DataFileFormat::Parquet) + .file_path("path/to/file2.parquet".to_string()) + .file_size_in_bytes(2048) + .record_count(200) + .partition_spec_id(1) + .partition(Struct::empty()) + .column_sizes(HashMap::from([(1, 1024), (2, 1024)])) + .value_counts(HashMap::from([(1, 200), (2, 200)])) + .null_value_counts(HashMap::from([(1, 10), (2, 5)])) + .build() + .unwrap(), + ]; + + // Serialize the DataFile objects + let serialized_files = data_files + .into_iter() + .map(|f| { + let json = serialize_data_file_to_json(f, &partition_type, is_version_1).unwrap(); + println!("Test serialized data file: {}", json); + json + }) + .collect::>(); + + // Verify we have the expected number of serialized files + assert_eq!(serialized_files.len(), 2); + + // Verify each serialized file contains expected data + for json in &serialized_files { + assert!(json.contains("path/to/file")); + assert!(json.contains("parquet")); + assert!(json.contains("record_count")); + assert!(json.contains("file_size_in_bytes")); + } + + // Convert Vec to StringArray and print it + let string_array = StringArray::from(serialized_files.clone()); + println!("StringArray: {:?}", string_array); + + // Now deserialize the JSON strings back into DataFile objects + println!("\nDeserializing back to DataFile objects:"); + let deserialized_files: Vec = serialized_files + .into_iter() + .map(|json| { + let data_file = deserialize_data_file_from_json( + &json, + partition_spec.spec_id(), + &partition_type, + &schema, + ) + .unwrap(); + + println!("Deserialized DataFile: {:?}", data_file); + data_file + }) + .collect(); + + // Verify we have the expected number of deserialized files + assert_eq!(deserialized_files.len(), 2); + + // Verify the deserialized files have the expected properties + for file in &deserialized_files { + assert_eq!(file.content_type(), crate::spec::DataContentType::Data); + assert_eq!(file.file_format(), DataFileFormat::Parquet); + assert!(file.file_path().contains("path/to/file")); + assert!(file.record_count() == 100 || file.record_count() == 200); + } + } } diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 3d653b295f..0ee1738b4f 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -36,7 +36,6 @@ futures = { workspace = true } iceberg = { workspace = true } parquet = { workspace = true } tokio = { workspace = true } -serde_json = { workspace = true } uuid = { workspace = true } [dev-dependencies] diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 68aca23a0b..95425bcade 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -228,133 +228,3 @@ impl ExecutionPlan for IcebergWriteExec { ))) } } - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use datafusion::arrow::array::StringArray; - use iceberg::spec::{ - DataFile, DataFileBuilder, DataFileFormat, PartitionSpec, PrimitiveType, Schema, Struct, - Type, deserialize_data_file_from_json, serialize_data_file_to_json, - }; - - // todo move this to DataFileSerde? 
- #[test] - fn test_data_file_serialization() { - // Create a simple schema - let schema = Schema::builder() - .with_schema_id(1) - .with_identifier_field_ids(vec![1]) - .with_fields(vec![ - iceberg::spec::NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)) - .into(), - iceberg::spec::NestedField::required( - 2, - "name", - Type::Primitive(PrimitiveType::String), - ) - .into(), - ]) - .build() - .unwrap(); - - // Create a partition spec - let partition_spec = PartitionSpec::builder(schema.clone()) - .with_spec_id(1) - .add_partition_field("id", "id_partition", iceberg::spec::Transform::Identity) - .unwrap() - .build() - .unwrap(); - - // Get partition type from the partition spec - let partition_type = partition_spec.partition_type(&schema).unwrap(); - - // Set version flag - let is_version_1 = false; - - // Create a vector of DataFile objects - let data_files = vec![ - DataFileBuilder::default() - .content(iceberg::spec::DataContentType::Data) - .file_format(DataFileFormat::Parquet) - .file_path("path/to/file1.parquet".to_string()) - .file_size_in_bytes(1024) - .record_count(100) - .partition_spec_id(1) - .partition(Struct::empty()) - .column_sizes(HashMap::from([(1, 512), (2, 512)])) - .value_counts(HashMap::from([(1, 100), (2, 100)])) - .null_value_counts(HashMap::from([(1, 0), (2, 0)])) - .build() - .unwrap(), - DataFileBuilder::default() - .content(iceberg::spec::DataContentType::Data) - .file_format(DataFileFormat::Parquet) - .file_path("path/to/file2.parquet".to_string()) - .file_size_in_bytes(2048) - .record_count(200) - .partition_spec_id(1) - .partition(Struct::empty()) - .column_sizes(HashMap::from([(1, 1024), (2, 1024)])) - .value_counts(HashMap::from([(1, 200), (2, 200)])) - .null_value_counts(HashMap::from([(1, 10), (2, 5)])) - .build() - .unwrap(), - ]; - - // Serialize the DataFile objects - let serialized_files = data_files - .into_iter() - .map(|f| { - let json = serialize_data_file_to_json(f, &partition_type, is_version_1).unwrap(); - println!("Test serialized data file: {}", json); - json - }) - .collect::>(); - - // Verify we have the expected number of serialized files - assert_eq!(serialized_files.len(), 2); - - // Verify each serialized file contains expected data - for json in &serialized_files { - assert!(json.contains("path/to/file")); - assert!(json.contains("parquet")); - assert!(json.contains("record_count")); - assert!(json.contains("file_size_in_bytes")); - } - - // Convert Vec to StringArray and print it - let string_array = StringArray::from(serialized_files.clone()); - println!("StringArray: {:?}", string_array); - - // Now deserialize the JSON strings back into DataFile objects - println!("\nDeserializing back to DataFile objects:"); - let deserialized_files: Vec = serialized_files - .into_iter() - .map(|json| { - let data_file = deserialize_data_file_from_json( - &json, - partition_spec.spec_id(), - &partition_type, - &schema, - ) - .unwrap(); - - println!("Deserialized DataFile: {:?}", data_file); - data_file - }) - .collect(); - - // Verify we have the expected number of deserialized files - assert_eq!(deserialized_files.len(), 2); - - // Verify the deserialized files have the expected properties - for file in &deserialized_files { - assert_eq!(file.content_type(), iceberg::spec::DataContentType::Data); - assert_eq!(file.file_format(), DataFileFormat::Parquet); - assert!(file.file_path().contains("path/to/file")); - assert!(file.record_count() == 100 || file.record_count() == 200); - } - } -} From 
77b349bc8c1968ea364b7f74b613c2f6d20db5cb Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 16 Jul 2025 16:14:54 -0700 Subject: [PATCH 12/31] Use stricter wrapper data file wrapper --- crates/iceberg/src/spec/table_metadata.rs | 7 ++ .../datafusion/src/physical_plan/write.rs | 90 ++++++++++++------- 2 files changed, 66 insertions(+), 31 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 2604eac03d..43854252f8 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -118,6 +118,13 @@ pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeou /// Default value for total maximum retry time (ms). pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes +/// Default file format for data files +pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default"; +/// Default file format for delete files +pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default"; +/// Default value for data file format +pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet"; + /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 95425bcade..b5019ad374 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -17,6 +17,7 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; +use std::str::FromStr; use std::sync::Arc; use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array}; @@ -34,13 +35,18 @@ use datafusion::physical_plan::{ }; use futures::StreamExt; use iceberg::arrow::schema_to_arrow_schema; -use iceberg::spec::{DataFileFormat, FormatVersion, serialize_data_file_to_json}; +use iceberg::spec::{ + DataFileFormat, FormatVersion, PROPERTY_DEFAULT_FILE_FORMAT, + PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, serialize_data_file_to_json, +}; use iceberg::table::Table; -use iceberg::writer::CurrentFileStatus; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; -use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Error, ErrorKind}; use parquet::file::properties::WriterProperties; use uuid::Uuid; @@ -157,21 +163,50 @@ impl ExecutionPlan for IcebergWriteExec { partition: usize, context: Arc, ) -> DFResult { - let parquet_writer_fut = ParquetWriterBuilder::new( - WriterProperties::default(), - self.table.metadata().current_schema().clone(), - self.table.file_io().clone(), - DefaultLocationGenerator::new(self.table.metadata().clone()) - .map_err(to_datafusion_error)?, - // todo filename prefix/suffix should be configurable - DefaultFileNameGenerator::new( - "datafusion".to_string(), - Some(Uuid::now_v7().to_string()), - DataFileFormat::Parquet, - ), + // todo non-default partition spec? 
+ let spec_id = self.table.metadata().default_partition_spec_id(); + let partition_type = self.table.metadata().default_partition_type().clone(); + let is_version_1 = self.table.metadata().format_version() == FormatVersion::V1; + + // Check data file format + let file_format = DataFileFormat::from_str( + self.table + .metadata() + .properties() + .get(PROPERTY_DEFAULT_FILE_FORMAT) + .unwrap_or(&PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()), ) - .build(); + .map_err(to_datafusion_error)?; + if file_format != DataFileFormat::Parquet { + return Err(to_datafusion_error(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "File format {} is not supported for insert_into yet!", + file_format + ), + ))); + } + // Create data file writer builder + let data_file_writer_builder = DataFileWriterBuilder::new( + ParquetWriterBuilder::new( + WriterProperties::default(), + self.table.metadata().current_schema().clone(), + self.table.file_io().clone(), + DefaultLocationGenerator::new(self.table.metadata().clone()) + .map_err(to_datafusion_error)?, + // todo filename prefix/suffix should be configurable + DefaultFileNameGenerator::new( + "datafusion".to_string(), + Some(Uuid::now_v7().to_string()), + file_format, + ), + ), + None, + spec_id, + ); + + // Get input data let data = execute_input_stream( Arc::clone(&self.input), Arc::new( @@ -182,18 +217,16 @@ impl ExecutionPlan for IcebergWriteExec { Arc::clone(&context), )?; - // todo non-default partition spec? - let spec_id = self.table.metadata().default_partition_spec_id(); - let partition_type = self.table.metadata().default_partition_type().clone(); - let is_version_1 = self.table.metadata().format_version() == FormatVersion::V1; - + // Create write stream let stream = futures::stream::once(async move { - let mut writer = parquet_writer_fut.await.map_err(to_datafusion_error)?; + let mut writer = data_file_writer_builder + .build() + .await + .map_err(to_datafusion_error)?; let mut input_stream = data; - while let Some(batch_res) = input_stream.next().await { - let batch = batch_res?; - writer.write(&batch).await.map_err(to_datafusion_error)?; + while let Some(batch) = input_stream.next().await { + writer.write(batch?).await.map_err(to_datafusion_error)?; } let count = writer.current_row_num() as u64; @@ -202,12 +235,7 @@ impl ExecutionPlan for IcebergWriteExec { // Convert builders to data files and then to JSON strings let data_files: Vec = data_file_builders .into_iter() - .map(|mut builder| -> DFResult { - // Build the data file - let data_file = builder.partition_spec_id(spec_id).build().map_err(|e| { - DataFusionError::Execution(format!("Failed to build data file: {}", e)) - })?; - + .map(|data_file| -> DFResult { // Serialize to JSON let json = serialize_data_file_to_json(data_file, &partition_type, is_version_1) From 88afe824bcd7265031bc7b78f6c62e8e0b07172f Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 16 Jul 2025 16:32:13 -0700 Subject: [PATCH 13/31] fix partitioning, and fmt ofc --- .../datafusion/src/physical_plan/write.rs | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index b5019ad374..4ec2d1edd0 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -27,11 +27,11 @@ use datafusion::arrow::datatypes::{ use datafusion::common::Result as DFResult; use datafusion::error::DataFusionError; 
use datafusion::execution::{SendableRecordBatchStream, TaskContext}; -use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; -use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_input_stream, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + execute_input_stream, }; use futures::StreamExt; use iceberg::arrow::schema_to_arrow_schema; @@ -41,10 +41,10 @@ use iceberg::spec::{ }; use iceberg::table::Table; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; -use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; use iceberg::{Error, ErrorKind}; use parquet::file::properties::WriterProperties; @@ -61,7 +61,7 @@ pub(crate) struct IcebergWriteExec { impl IcebergWriteExec { pub fn new(table: Table, input: Arc, schema: ArrowSchemaRef) -> Self { - let plan_properties = Self::compute_properties(schema.clone()); + let plan_properties = Self::compute_properties(&input, schema.clone()); Self { table, @@ -71,16 +71,15 @@ impl IcebergWriteExec { } } - /// todo: Copied from scan.rs - fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties { - // TODO: - // This is more or less a placeholder, to be replaced - // once we support output-partitioning + fn compute_properties( + input: &Arc, + schema: ArrowSchemaRef, + ) -> PlanProperties { PlanProperties::new( EquivalenceProperties::new(schema), - Partitioning::UnknownPartitioning(1), - EmissionType::Incremental, - Boundedness::Bounded, + input.output_partitioning().clone(), + input.pipeline_behavior(), + input.boundedness(), ) } @@ -101,8 +100,8 @@ impl IcebergWriteExec { fn make_result_schema() -> ArrowSchemaRef { // Define a schema. 
Arc::new(ArrowSchema::new(vec![ - Field::new("data_files", DataType::Utf8, false), Field::new("count", DataType::UInt64, false), + Field::new("data_files", DataType::Utf8, false), ])) } } From 295e9b671d23d378c4419a354603b06a967419be Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 16 Jul 2025 20:38:57 -0700 Subject: [PATCH 14/31] minor --- crates/integrations/datafusion/src/physical_plan/commit.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index dd83cd3975..6969faa7c9 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -69,7 +69,7 @@ impl IcebergCommitExec { } } - /// Compute the plan properties for this execution plan + // Compute the plan properties for this execution plan fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties { PlanProperties::new( EquivalenceProperties::new(schema), From 92588f503c4aee5b7baa916ecde293f73f9b58c8 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 17 Jul 2025 14:31:53 -0700 Subject: [PATCH 15/31] partitioned shall not pass --- .../integrations/datafusion/src/table/mod.rs | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 6bf9ba8cf9..3f390164ad 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -162,7 +162,7 @@ impl TableProvider for IcebergTableProvider { fn supports_filters_pushdown( &self, filters: &[&Expr], - ) -> std::result::Result, datafusion::error::DataFusionError> + ) -> std::result::Result, DataFusionError> { // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) @@ -174,24 +174,31 @@ impl TableProvider for IcebergTableProvider { input: Arc, _insert_op: InsertOp, ) -> DFResult> { + if !self.table.metadata().default_partition_spec().is_unpartitioned() { + // TODO add insert into support for partitioned tables + return Err(DataFusionError::NotImplemented( + "IcebergTableProvider::insert_into does not support partitioned tables yet".to_string() + )); + } + + let Some(catalog) = self.catalog.clone() else { + return Err(DataFusionError::Execution( + "Catalog cannot be none for insert_into".to_string(), + )); + }; + let write_plan = Arc::new(IcebergWriteExec::new( self.table.clone(), input, self.schema.clone(), )); - if let Some(catalog) = self.catalog.clone() { - Ok(Arc::new(IcebergCommitExec::new( - self.table.clone(), - catalog, - write_plan, - self.schema.clone(), - ))) - } else { - Err(DataFusionError::Execution( - "Catalog cannot be none for insert_into".to_string(), - )) - } + Ok(Arc::new(IcebergCommitExec::new( + self.table.clone(), + catalog, + write_plan, + self.schema.clone(), + ))) } } From 7db94323ab4bd404910f5e8ce06c02dfc77c33b0 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 17 Jul 2025 15:12:55 -0700 Subject: [PATCH 16/31] implement children and with_new_children for write node, fix fmt --- .../datafusion/src/physical_plan/write.rs | 22 ++++++++++++++----- .../integrations/datafusion/src/table/mod.rs | 13 +++++++---- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs 
b/crates/integrations/datafusion/src/physical_plan/write.rs index 4ec2d1edd0..4762a5d747 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -27,7 +27,7 @@ use datafusion::arrow::datatypes::{ use datafusion::common::Result as DFResult; use datafusion::error::DataFusionError; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; -use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, @@ -61,7 +61,7 @@ pub(crate) struct IcebergWriteExec { impl IcebergWriteExec { pub fn new(table: Table, input: Arc, schema: ArrowSchemaRef) -> Self { - let plan_properties = Self::compute_properties(&input, schema.clone()); + let plan_properties = Self::compute_properties(&input, schema); Self { table, @@ -77,7 +77,7 @@ impl IcebergWriteExec { ) -> PlanProperties { PlanProperties::new( EquivalenceProperties::new(schema), - input.output_partitioning().clone(), + Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()), input.pipeline_behavior(), input.boundedness(), ) @@ -147,14 +147,24 @@ impl ExecutionPlan for IcebergWriteExec { } fn children(&self) -> Vec<&Arc> { - vec![] + vec![&self.input] } fn with_new_children( self: Arc, - _children: Vec>, + children: Vec>, ) -> DFResult> { - Ok(self) + if children.len() != 1 { + return Err(DataFusionError::Internal( + "IcebergWriteExec expects exactly one child".to_string(), + )); + } + + Ok(Arc::new(Self::new( + self.table.clone(), + Arc::clone(&children[0]), + self.schema(), + ))) } fn execute( diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 3f390164ad..9f629c38c4 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -162,8 +162,7 @@ impl TableProvider for IcebergTableProvider { fn supports_filters_pushdown( &self, filters: &[&Expr], - ) -> std::result::Result, DataFusionError> - { + ) -> std::result::Result, DataFusionError> { // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) } @@ -174,10 +173,16 @@ impl TableProvider for IcebergTableProvider { input: Arc, _insert_op: InsertOp, ) -> DFResult> { - if !self.table.metadata().default_partition_spec().is_unpartitioned() { + if !self + .table + .metadata() + .default_partition_spec() + .is_unpartitioned() + { // TODO add insert into support for partitioned tables return Err(DataFusionError::NotImplemented( - "IcebergTableProvider::insert_into does not support partitioned tables yet".to_string() + "IcebergTableProvider::insert_into does not support partitioned tables yet" + .to_string(), )); } From 8c780460b9d656e71f860bde931dc838ba9a0ccc Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 17 Jul 2025 15:53:48 -0700 Subject: [PATCH 17/31] get row counts from data files directly --- .../datafusion/src/physical_plan/commit.rs | 25 +++------------ .../datafusion/src/physical_plan/write.rs | 31 ++++++++----------- 2 files changed, 18 insertions(+), 38 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs 
index 6969faa7c9..5405ca7339 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -187,7 +187,7 @@ impl ExecutionPlan for IcebergCommitExec { // Process the input streams from all partitions and commit the data files let stream = futures::stream::once(async move { let mut data_files: Vec = Vec::new(); - let mut total_count: u64 = 0; + let mut total_record_count: u64 = 0; // Execute and collect results from all partitions of the input plan let batches = execute_stream_partitioned(input_plan, context)?; @@ -197,21 +197,6 @@ impl ExecutionPlan for IcebergCommitExec { while let Some(batch_result) = batch_stream.as_mut().next().await { let batch = batch_result?; - let count_array = batch - .column_by_name("count") - .ok_or_else(|| { - DataFusionError::Internal( - "Expected 'count' column in input batch".to_string(), - ) - })? - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Internal( - "Expected 'count' column to be UInt64Array".to_string(), - ) - })?; - let files_array = batch .column_by_name("data_files") .ok_or_else(|| { @@ -230,9 +215,6 @@ impl ExecutionPlan for IcebergCommitExec { // todo remove log println!("files_array to deserialize: {:?}", files_array); - // Sum all values in the count_array - total_count += count_array.iter().flatten().sum::(); - // Deserialize all data files from the StringArray let batch_files: Vec = files_array .into_iter() @@ -249,6 +231,9 @@ impl ExecutionPlan for IcebergCommitExec { }) .collect::>()?; + // add record_counts from the current batch to total record count + total_record_count += batch_files.iter().map(|f| f.record_count()).sum::(); + // Add all deserialized files to our collection data_files.extend(batch_files); } @@ -272,7 +257,7 @@ impl ExecutionPlan for IcebergCommitExec { // .await // .map_err(to_datafusion_error)?; - Self::make_count_batch(total_count) + Self::make_count_batch(total_record_count) }) .boxed(); diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 4762a5d747..4aba266128 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -20,7 +20,7 @@ use std::fmt::{Debug, Formatter}; use std::str::FromStr; use std::sync::Arc; -use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray, UInt64Array}; +use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray}; use datafusion::arrow::datatypes::{ DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; @@ -45,7 +45,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; -use iceberg::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Error, ErrorKind}; use parquet::file::properties::WriterProperties; use uuid::Uuid; @@ -83,26 +83,22 @@ impl IcebergWriteExec { ) } - // Create a record batch with count and serialized data files - fn make_result_batch(count: u64, data_files: Vec) -> DFResult { - let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef; + // Create a record batch with serialized data files + fn make_result_batch(data_files: Vec) -> DFResult { let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef; - RecordBatch::try_from_iter_with_nullable(vec![ - 
("count", count_array, false), - ("data_files", files_array, false), - ]) - .map_err(|e| { - DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string())) - }) + RecordBatch::try_from_iter_with_nullable(vec![("data_files", files_array, false)]).map_err( + |e| DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string())), + ) } fn make_result_schema() -> ArrowSchemaRef { // Define a schema. - Arc::new(ArrowSchema::new(vec![ - Field::new("count", DataType::UInt64, false), - Field::new("data_files", DataType::Utf8, false), - ])) + Arc::new(ArrowSchema::new(vec![Field::new( + "data_files", + DataType::Utf8, + false, + )])) } } @@ -238,7 +234,6 @@ impl ExecutionPlan for IcebergWriteExec { writer.write(batch?).await.map_err(to_datafusion_error)?; } - let count = writer.current_row_num() as u64; let data_file_builders = writer.close().await.map_err(to_datafusion_error)?; // Convert builders to data files and then to JSON strings @@ -255,7 +250,7 @@ impl ExecutionPlan for IcebergWriteExec { }) .collect::>>()?; - Self::make_result_batch(count, data_files) + Self::make_result_batch(data_files) }) .boxed(); From 724ec7d16422622a15a2c7a6f3f4878a974afecd Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 10:02:58 -0700 Subject: [PATCH 18/31] Update crates/integrations/datafusion/src/physical_plan/write.rs Co-authored-by: Renjie Liu --- crates/integrations/datafusion/src/physical_plan/write.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 4aba266128..e2e9124429 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -152,7 +152,7 @@ impl ExecutionPlan for IcebergWriteExec { ) -> DFResult> { if children.len() != 1 { return Err(DataFusionError::Internal( - "IcebergWriteExec expects exactly one child".to_string(), + "IcebergWriteExec expects exactly one child, but provided {} ".to_string(), )); } From 2f5616970bdd3ac74e7fba39c233f0f639442e0f Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 10:03:35 -0700 Subject: [PATCH 19/31] Update crates/integrations/datafusion/src/physical_plan/commit.rs Co-authored-by: Renjie Liu --- crates/integrations/datafusion/src/physical_plan/commit.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 5405ca7339..e78cf079ad 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -148,7 +148,7 @@ impl ExecutionPlan for IcebergCommitExec { ) -> DFResult> { if children.len() != 1 { return Err(DataFusionError::Internal( - "IcebergCommitExec expects exactly one child".to_string(), + "IcebergCommitExec expects exactly one child, but provided {children.len()}".to_string(), )); } From 53b8b828e03b3a36543494e5e911b6b388f4fb1a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 11:34:14 -0700 Subject: [PATCH 20/31] fix fmt, input boundedness --- crates/iceberg/src/arrow/nan_val_cnt_visitor.rs | 1 + crates/iceberg/src/arrow/value.rs | 1 + crates/integrations/datafusion/src/physical_plan/commit.rs | 5 +++-- crates/integrations/datafusion/src/physical_plan/write.rs | 5 +++-- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git 
a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs index 0a4bda28f9..b1bc83514d 100644 --- a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs +++ b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs @@ -159,6 +159,7 @@ impl NanValueCountVisitor { let arrow_arr_partner_accessor = ArrowArrayAccessor {}; let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef; + // todo remove these log lines println!("----StructArray from record stream: {:?}", struct_arr); println!("----Schema.as_struct from table: {:?}", schema.as_struct()); visit_struct_with_partner( diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 29fb34ff8b..88f7f59ef4 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -457,6 +457,7 @@ impl PartnerAccessor for ArrowArrayAccessor { ) })?; + // todo remove unneeded log lines println!( "!!!Accessor struct array from struct partner: {:?}", struct_array diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index e78cf079ad..85e7a47f59 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -74,7 +74,7 @@ impl IcebergCommitExec { PlanProperties::new( EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - EmissionType::Incremental, + EmissionType::Final, Boundedness::Bounded, ) } @@ -148,7 +148,8 @@ impl ExecutionPlan for IcebergCommitExec { ) -> DFResult> { if children.len() != 1 { return Err(DataFusionError::Internal( - "IcebergCommitExec expects exactly one child, but provided {children.len()}".to_string(), + "IcebergCommitExec expects exactly one child, but provided {children.len()}" + .to_string(), )); } diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index e2e9124429..3f869ca755 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -28,6 +28,7 @@ use datafusion::common::Result as DFResult; use datafusion::error::DataFusionError; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, @@ -78,8 +79,8 @@ impl IcebergWriteExec { PlanProperties::new( EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()), - input.pipeline_behavior(), - input.boundedness(), + EmissionType::Final, + Boundedness::Bounded, ) } From d2168f225abb5336c3db0bdd933280e9d356c949 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 11:43:33 -0700 Subject: [PATCH 21/31] make data_files constant --- .../datafusion/src/physical_plan/commit.rs | 3 ++- .../integrations/datafusion/src/physical_plan/mod.rs | 2 ++ .../integrations/datafusion/src/physical_plan/write.rs | 10 ++++++---- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 85e7a47f59..b6cb1c4614 100644 --- 
a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -37,6 +37,7 @@ use iceberg::spec::{DataFile, deserialize_data_file_from_json}; use iceberg::table::Table; use iceberg::transaction::Transaction; +use crate::physical_plan::DATA_FILES_COL_NAME; use crate::to_datafusion_error; /// IcebergCommitExec is responsible for collecting results from multiple IcebergWriteExec @@ -199,7 +200,7 @@ impl ExecutionPlan for IcebergCommitExec { let batch = batch_result?; let files_array = batch - .column_by_name("data_files") + .column_by_name(DATA_FILES_COL_NAME) .ok_or_else(|| { DataFusionError::Internal( "Expected 'data_files' column in input batch".to_string(), diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index 1e86225127..19c8f79239 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -20,3 +20,5 @@ pub(crate) mod expr_to_predicate; pub(crate) mod metadata_scan; pub(crate) mod scan; pub(crate) mod write; + +pub(crate) const DATA_FILES_COL_NAME: &str = "data_files"; diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 3f869ca755..c2503f9ce5 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -51,6 +51,7 @@ use iceberg::{Error, ErrorKind}; use parquet::file::properties::WriterProperties; use uuid::Uuid; +use crate::physical_plan::DATA_FILES_COL_NAME; use crate::to_datafusion_error; pub(crate) struct IcebergWriteExec { @@ -88,15 +89,16 @@ impl IcebergWriteExec { fn make_result_batch(data_files: Vec) -> DFResult { let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef; - RecordBatch::try_from_iter_with_nullable(vec![("data_files", files_array, false)]).map_err( - |e| DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string())), - ) + RecordBatch::try_from_iter_with_nullable(vec![(DATA_FILES_COL_NAME, files_array, false)]) + .map_err(|e| { + DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string())) + }) } fn make_result_schema() -> ArrowSchemaRef { // Define a schema. 
Arc::new(ArrowSchema::new(vec![Field::new( - "data_files", + DATA_FILES_COL_NAME, DataType::Utf8, false, )])) From 59a3428d143079b6dcbe1bca162d561d45075cac Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 12:05:25 -0700 Subject: [PATCH 22/31] use format version when serde datafiles --- crates/iceberg/src/spec/manifest/_serde.rs | 14 +++++++++----- crates/iceberg/src/spec/manifest/data_file.rs | 8 ++++++-- crates/iceberg/src/spec/manifest/mod.rs | 10 ++++------ .../datafusion/src/physical_plan/write.rs | 4 ++-- 4 files changed, 21 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/spec/manifest/_serde.rs b/crates/iceberg/src/spec/manifest/_serde.rs index fd7bc2e69a..e9124faf67 100644 --- a/crates/iceberg/src/spec/manifest/_serde.rs +++ b/crates/iceberg/src/spec/manifest/_serde.rs @@ -21,7 +21,7 @@ use serde_derive::{Deserialize, Serialize}; use serde_with::serde_as; use super::{Datum, ManifestEntry, Schema, Struct}; -use crate::spec::{Literal, RawLiteral, StructType, Type}; +use crate::spec::{FormatVersion, Literal, RawLiteral, StructType, Type}; use crate::{Error, ErrorKind}; #[derive(Serialize, Deserialize)] @@ -40,7 +40,7 @@ impl ManifestEntryV2 { snapshot_id: value.snapshot_id, sequence_number: value.sequence_number, file_sequence_number: value.file_sequence_number, - data_file: DataFileSerde::try_from(value.data_file, partition_type, false)?, + data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V2)?, }) } @@ -74,7 +74,7 @@ impl ManifestEntryV1 { Ok(Self { status: value.status as i32, snapshot_id: value.snapshot_id.unwrap_or_default(), - data_file: DataFileSerde::try_from(value.data_file, partition_type, true)?, + data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V1)?, }) } @@ -129,9 +129,13 @@ impl DataFileSerde { pub fn try_from( value: super::DataFile, partition_type: &StructType, - is_version_1: bool, + format_version: FormatVersion, ) -> Result { - let block_size_in_bytes = if is_version_1 { Some(0) } else { None }; + let block_size_in_bytes = if format_version == FormatVersion::V1 { + Some(0) + } else { + None + }; Ok(Self { content: value.content as i32, file_path: value.file_path, diff --git a/crates/iceberg/src/spec/manifest/data_file.rs b/crates/iceberg/src/spec/manifest/data_file.rs index 1de59a3874..9ea1fcd0a7 100644 --- a/crates/iceberg/src/spec/manifest/data_file.rs +++ b/crates/iceberg/src/spec/manifest/data_file.rs @@ -297,8 +297,12 @@ pub fn write_data_files_to_avro( let mut writer = AvroWriter::new(&avro_schema, writer); for data_file in data_files { - let value = to_value(DataFileSerde::try_from(data_file, partition_type, true)?)? - .resolve(&avro_schema)?; + let value = to_value(DataFileSerde::try_from( + data_file, + partition_type, + FormatVersion::V1, + )?)? 
+ .resolve(&avro_schema)?; writer.append(value)?; } diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index dd0e288d06..2a29bccd65 100644 --- a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -123,9 +123,9 @@ impl Manifest { pub fn serialize_data_file_to_json( data_file: DataFile, partition_type: &super::StructType, - is_version_1: bool, + format_version: FormatVersion, ) -> Result { - let serde = _serde::DataFileSerde::try_from(data_file, partition_type, is_version_1)?; + let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?; serde_json::to_string(&serde).map_err(|e| { Error::new( ErrorKind::DataInvalid, @@ -1120,9 +1120,6 @@ mod tests { // Get partition type from the partition spec let partition_type = partition_spec.partition_type(&schema).unwrap(); - // Set version flag - let is_version_1 = false; - // Create a vector of DataFile objects let data_files = vec![ DataFileBuilder::default() @@ -1157,7 +1154,8 @@ mod tests { let serialized_files = data_files .into_iter() .map(|f| { - let json = serialize_data_file_to_json(f, &partition_type, is_version_1).unwrap(); + let json = + serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap(); println!("Test serialized data file: {}", json); json }) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index c2503f9ce5..29738dd2ec 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -174,7 +174,7 @@ impl ExecutionPlan for IcebergWriteExec { // todo non-default partition spec? let spec_id = self.table.metadata().default_partition_spec_id(); let partition_type = self.table.metadata().default_partition_type().clone(); - let is_version_1 = self.table.metadata().format_version() == FormatVersion::V1; + let format_version = self.table.metadata().format_version(); // Check data file format let file_format = DataFileFormat::from_str( @@ -245,7 +245,7 @@ impl ExecutionPlan for IcebergWriteExec { .map(|data_file| -> DFResult { // Serialize to JSON let json = - serialize_data_file_to_json(data_file, &partition_type, is_version_1) + serialize_data_file_to_json(data_file, &partition_type, format_version) .map_err(to_datafusion_error)?; println!("Serialized data file: {}", json); // todo remove log From 3b4dc9db62a60c425db9f578739846bc5527a93c Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 14:35:24 -0700 Subject: [PATCH 23/31] use try_new instead --- crates/integrations/datafusion/src/physical_plan/write.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 29738dd2ec..3a1a4392c9 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -89,10 +89,9 @@ impl IcebergWriteExec { fn make_result_batch(data_files: Vec) -> DFResult { let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef; - RecordBatch::try_from_iter_with_nullable(vec![(DATA_FILES_COL_NAME, files_array, false)]) - .map_err(|e| { - DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string())) - }) + RecordBatch::try_new(Self::make_result_schema(), vec![files_array]).map_err(|e| { + DataFusionError::ArrowError(e, Some("Failed to make 
result batch".to_string())) + }) } fn make_result_schema() -> ArrowSchemaRef { From 2b1c3df19a5771ea0610853f081b595df93a4b86 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 15:19:42 -0700 Subject: [PATCH 24/31] minor --- crates/integrations/datafusion/src/physical_plan/commit.rs | 2 +- crates/integrations/datafusion/src/physical_plan/write.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index b6cb1c4614..db4f807083 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -196,7 +196,7 @@ impl ExecutionPlan for IcebergCommitExec { // Collect all data files from this partition's stream for mut batch_stream in batches { - while let Some(batch_result) = batch_stream.as_mut().next().await { + while let Some(batch_result) = batch_stream.next().await { let batch = batch_result?; let files_array = batch diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 3a1a4392c9..dd93e8e2b8 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -37,7 +37,7 @@ use datafusion::physical_plan::{ use futures::StreamExt; use iceberg::arrow::schema_to_arrow_schema; use iceberg::spec::{ - DataFileFormat, FormatVersion, PROPERTY_DEFAULT_FILE_FORMAT, + DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, serialize_data_file_to_json, }; use iceberg::table::Table; From db20df1da417b586b11b6980f9881db685fec877 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 16:25:55 -0700 Subject: [PATCH 25/31] coalesce partitions --- .../datafusion/src/physical_plan/commit.rs | 120 ++++++++---------- .../datafusion/src/physical_plan/write.rs | 18 +-- .../integrations/datafusion/src/table/mod.rs | 5 +- 3 files changed, 67 insertions(+), 76 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index db4f807083..cd33eeb4f1 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -29,7 +29,7 @@ use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, execute_stream_partitioned, + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, }; use futures::StreamExt; use iceberg::Catalog; @@ -42,10 +42,11 @@ use crate::to_datafusion_error; /// IcebergCommitExec is responsible for collecting results from multiple IcebergWriteExec /// instances and using Transaction::fast_append to commit the data files written. 
+#[derive(Debug)] pub(crate) struct IcebergCommitExec { table: Table, catalog: Arc, - write_plan: Arc, + input: Arc, schema: ArrowSchemaRef, count_schema: ArrowSchemaRef, plan_properties: PlanProperties, @@ -55,7 +56,7 @@ impl IcebergCommitExec { pub fn new( table: Table, catalog: Arc, - write_plan: Arc, + input: Arc, schema: ArrowSchemaRef, ) -> Self { let plan_properties = Self::compute_properties(schema.clone()); @@ -63,7 +64,7 @@ impl IcebergCommitExec { Self { table, catalog, - write_plan, + input, schema, count_schema: Self::make_count_schema(), plan_properties, @@ -99,12 +100,6 @@ impl IcebergCommitExec { } } -impl Debug for IcebergCommitExec { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "IcebergCommitExec") - } -} - impl DisplayAs for IcebergCommitExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { match t { @@ -140,7 +135,7 @@ impl ExecutionPlan for IcebergCommitExec { } fn children(&self) -> Vec<&Arc> { - vec![&self.write_plan] + vec![&self.input] } fn with_new_children( @@ -148,10 +143,10 @@ impl ExecutionPlan for IcebergCommitExec { children: Vec>, ) -> DFResult> { if children.len() != 1 { - return Err(DataFusionError::Internal( - "IcebergCommitExec expects exactly one child, but provided {children.len()}" - .to_string(), - )); + return Err(DataFusionError::Internal(format!( + "IcebergCommitExec expects exactly one child, but provided {}", + children.len() + ))); } Ok(Arc::new(IcebergCommitExec::new( @@ -176,7 +171,7 @@ impl ExecutionPlan for IcebergCommitExec { } let table = self.table.clone(); - let input_plan = self.write_plan.clone(); + let input_plan = self.input.clone(); let count_schema = Arc::clone(&self.count_schema); // todo revisit this @@ -191,54 +186,51 @@ impl ExecutionPlan for IcebergCommitExec { let mut data_files: Vec = Vec::new(); let mut total_record_count: u64 = 0; - // Execute and collect results from all partitions of the input plan - let batches = execute_stream_partitioned(input_plan, context)?; - - // Collect all data files from this partition's stream - for mut batch_stream in batches { - while let Some(batch_result) = batch_stream.next().await { - let batch = batch_result?; - - let files_array = batch - .column_by_name(DATA_FILES_COL_NAME) - .ok_or_else(|| { - DataFusionError::Internal( - "Expected 'data_files' column in input batch".to_string(), - ) - })? 
- .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Internal( - "Expected 'data_files' column to be StringArray".to_string(), - ) - })?; - - // todo remove log - println!("files_array to deserialize: {:?}", files_array); - - // Deserialize all data files from the StringArray - let batch_files: Vec = files_array - .into_iter() - .flatten() - .map(|f| -> DFResult { - // Parse JSON to DataFileSerde and convert to DataFile - deserialize_data_file_from_json( - f, - spec_id, - &partition_type, - ¤t_schema, - ) - .map_err(to_datafusion_error) - }) - .collect::>()?; - - // add record_counts from the current batch to total record count - total_record_count += batch_files.iter().map(|f| f.record_count()).sum::(); - - // Add all deserialized files to our collection - data_files.extend(batch_files); - } + // Execute and collect results from the input coalesced plan + let mut batch_stream = input_plan.execute(0, context)?; + + while let Some(batch_result) = batch_stream.next().await { + let batch = batch_result?; + + let files_array = batch + .column_by_name(DATA_FILES_COL_NAME) + .ok_or_else(|| { + DataFusionError::Internal( + "Expected 'data_files' column in input batch".to_string(), + ) + })? + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal( + "Expected 'data_files' column to be StringArray".to_string(), + ) + })?; + + // todo remove log + println!("files_array to deserialize: {:?}", files_array); + + // Deserialize all data files from the StringArray + let batch_files: Vec = files_array + .into_iter() + .flatten() + .map(|f| -> DFResult { + // Parse JSON to DataFileSerde and convert to DataFile + deserialize_data_file_from_json( + f, + spec_id, + &partition_type, + ¤t_schema, + ) + .map_err(to_datafusion_error) + }) + .collect::>()?; + + // add record_counts from the current batch to total record count + total_record_count += batch_files.iter().map(|f| f.record_count()).sum::(); + + // Add all deserialized files to our collection + data_files.extend(batch_files); } // If no data files were collected, return an empty result diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index dd93e8e2b8..9767666ab0 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -37,8 +37,8 @@ use datafusion::physical_plan::{ use futures::StreamExt; use iceberg::arrow::schema_to_arrow_schema; use iceberg::spec::{ - DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, - PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, serialize_data_file_to_json, + DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, + serialize_data_file_to_json, }; use iceberg::table::Table; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; @@ -54,6 +54,7 @@ use uuid::Uuid; use crate::physical_plan::DATA_FILES_COL_NAME; use crate::to_datafusion_error; +#[derive(Debug)] pub(crate) struct IcebergWriteExec { table: Table, input: Arc, @@ -104,12 +105,6 @@ impl IcebergWriteExec { } } -impl Debug for IcebergWriteExec { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "IcebergWriteExec") - } -} - impl DisplayAs for IcebergWriteExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { match t { @@ -153,9 +148,10 @@ impl ExecutionPlan for IcebergWriteExec { children: Vec>, ) -> DFResult> { if children.len() != 1 { - return Err(DataFusionError::Internal( - 
"IcebergWriteExec expects exactly one child, but provided {} ".to_string(), - )); + return Err(DataFusionError::Internal(format!( + "IcebergWriteExec expects exactly one child, but provided {}", + children.len() + ))); } Ok(Arc::new(Self::new( diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 9f629c38c4..34847c4985 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -30,6 +30,7 @@ use datafusion::error::Result as DFResult; use datafusion::logical_expr::dml::InsertOp; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use iceberg::arrow::schema_to_arrow_schema; use iceberg::inspect::MetadataTableType; use iceberg::table::Table; @@ -198,10 +199,12 @@ impl TableProvider for IcebergTableProvider { self.schema.clone(), )); + let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan)); + Ok(Arc::new(IcebergCommitExec::new( self.table.clone(), catalog, - write_plan, + coalesce_partitions, self.schema.clone(), ))) } From e56ab4ede9261a2cec855fd63ed9ba6008530772 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 16:31:43 -0700 Subject: [PATCH 26/31] minor --- crates/integrations/datafusion/src/physical_plan/commit.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index cd33eeb4f1..82a45bb19d 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -29,7 +29,7 @@ use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, }; use futures::StreamExt; use iceberg::Catalog; @@ -40,8 +40,8 @@ use iceberg::transaction::Transaction; use crate::physical_plan::DATA_FILES_COL_NAME; use crate::to_datafusion_error; -/// IcebergCommitExec is responsible for collecting results from multiple IcebergWriteExec -/// instances and using Transaction::fast_append to commit the data files written. +/// IcebergCommitExec is responsible for collecting the files written and use +/// [`Transaction::fast_append`] to commit the data files written. 
#[derive(Debug)] pub(crate) struct IcebergCommitExec { table: Table, From 04a44b3abb53c1d452adef5cf3514275357f293c Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 16:50:16 -0700 Subject: [PATCH 27/31] fmt --- crates/integrations/datafusion/src/physical_plan/commit.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 82a45bb19d..f186d8e816 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -28,9 +28,7 @@ use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, -}; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::StreamExt; use iceberg::Catalog; use iceberg::spec::{DataFile, deserialize_data_file_from_json}; From c5b1c38f1a9132219ad41029d132b9d6a92e51c6 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 18:20:54 -0700 Subject: [PATCH 28/31] rolling --- crates/iceberg/src/writer/base_writer/mod.rs | 1 + .../src/writer/base_writer/rolling_writer.rs | 112 ++++++++++++++++++ .../datafusion/src/physical_plan/write.rs | 35 +++--- 3 files changed, 132 insertions(+), 16 deletions(-) create mode 100644 crates/iceberg/src/writer/base_writer/rolling_writer.rs diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 37ab97eb6d..f156fba2a9 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,3 +19,4 @@ pub mod data_file_writer; pub mod equality_delete_writer; +pub mod rolling_writer; diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/base_writer/rolling_writer.rs new file mode 100644 index 0000000000..b8a8fd64d9 --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/rolling_writer.rs @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::mem::take; + +use arrow_array::RecordBatch; +use async_trait::async_trait; + +use crate::spec::DataFile; +use crate::writer::base_writer::data_file_writer::DataFileWriter; +use crate::writer::file_writer::FileWriterBuilder; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +#[async_trait] +pub trait RollingFileWriter: IcebergWriter { + fn should_roll(&mut self, input_size: u64) -> bool; +} + +#[derive(Clone)] +pub struct RollingDataFileWriterBuilder { + inner_builder: B, + target_size: u64, +} + +impl RollingDataFileWriterBuilder { + pub fn new(inner_builder: B, target_size: u64) -> Self { + Self { + inner_builder, + target_size, + } + } +} + +#[async_trait] +impl IcebergWriterBuilder for RollingDataFileWriterBuilder { + type R = RollingDataFileWriter; + + async fn build(self) -> Result { + Ok(RollingDataFileWriter { + inner: None, + inner_builder: self.inner_builder, + target_size: self.target_size, + written_size: 0, + data_files: vec![], + }) + } +} + +pub struct RollingDataFileWriter { + inner: Option>, + inner_builder: B, + target_size: u64, + written_size: u64, + data_files: Vec, +} + +#[async_trait] +impl IcebergWriter for RollingDataFileWriter { + async fn write(&mut self, input: RecordBatch) -> Result<()> { + let input_size = input.get_array_memory_size() as u64; + if self.should_roll(input_size) { + if let Some(mut inner) = self.inner.take() { + // close the current writer, roll to a new file + self.data_files.extend(inner.close().await?); + } + + // clear bytes written + self.written_size = 0; + } + + if self.inner.is_none() { + // start a new writer + self.inner = Some(self.inner_builder.clone().build().await?); + } + + // write the input and count bytes written + let Some(writer) = self.inner.as_mut() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Writer is not initialized!", + )); + }; + writer.write(input).await?; + self.written_size += input_size; + Ok(()) + } + + async fn close(&mut self) -> Result> { + Ok(take(&mut self.data_files)) + } +} + +impl RollingFileWriter for RollingDataFileWriter { + fn should_roll(&mut self, input_size: u64) -> bool { + self.written_size + input_size > self.target_size + } +} diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 9767666ab0..a3a4870a26 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -42,6 +42,7 @@ use iceberg::spec::{ }; use iceberg::table::Table; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::base_writer::rolling_writer::RollingDataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, @@ -191,22 +192,24 @@ impl ExecutionPlan for IcebergWriteExec { } // Create data file writer builder - let data_file_writer_builder = DataFileWriterBuilder::new( - ParquetWriterBuilder::new( - WriterProperties::default(), - self.table.metadata().current_schema().clone(), - self.table.file_io().clone(), - DefaultLocationGenerator::new(self.table.metadata().clone()) - .map_err(to_datafusion_error)?, - // todo filename prefix/suffix should be configurable - DefaultFileNameGenerator::new( - "datafusion".to_string(), - Some(Uuid::now_v7().to_string()), - file_format, - ), + let parquet_file_writer_builder = 
ParquetWriterBuilder::new( + WriterProperties::default(), + self.table.metadata().current_schema().clone(), + self.table.file_io().clone(), + DefaultLocationGenerator::new(self.table.metadata().clone()) + .map_err(to_datafusion_error)?, + // todo filename prefix/suffix should be configurable + DefaultFileNameGenerator::new( + "datafusion".to_string(), + Some(Uuid::now_v7().to_string()), + file_format, ), - None, - spec_id, + ); + let data_file_writer_builder = + DataFileWriterBuilder::new(parquet_file_writer_builder, None, spec_id); + let rolling_writer_builder = RollingDataFileWriterBuilder::new( + data_file_writer_builder, + 100 * 1024 * 1024, // todo use a config ); // Get input data @@ -222,7 +225,7 @@ impl ExecutionPlan for IcebergWriteExec { // Create write stream let stream = futures::stream::once(async move { - let mut writer = data_file_writer_builder + let mut writer = rolling_writer_builder .build() .await .map_err(to_datafusion_error)?; From 0f9bce0ff22d35d5a04ebb9392f21e4b9872cbae Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 21:39:03 -0700 Subject: [PATCH 29/31] rolling in the deep --- crates/iceberg/src/writer/base_writer/mod.rs | 1 + .../src/writer/base_writer/rolling_writer.rs | 65 +++++++++++++++---- .../datafusion/src/physical_plan/write.rs | 6 +- 3 files changed, 56 insertions(+), 16 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index f156fba2a9..80770b75ea 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,4 +19,5 @@ pub mod data_file_writer; pub mod equality_delete_writer; +/// Module providing writers that can automatically roll over to new files based on size thresholds. pub mod rolling_writer; diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/base_writer/rolling_writer.rs index b8a8fd64d9..31b40d5252 100644 --- a/crates/iceberg/src/writer/base_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/base_writer/rolling_writer.rs @@ -19,25 +19,46 @@ use std::mem::take; use arrow_array::RecordBatch; use async_trait::async_trait; +use futures::future::try_join_all; +use crate::runtime::{JoinHandle, spawn}; use crate::spec::DataFile; -use crate::writer::base_writer::data_file_writer::DataFileWriter; -use crate::writer::file_writer::FileWriterBuilder; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; +/// A writer that can roll over to a new file when certain conditions are met. +/// +/// This trait extends `IcebergWriter` with the ability to determine when to start +/// writing to a new file based on the size of incoming data. #[async_trait] pub trait RollingFileWriter: IcebergWriter { + /// Determines if the writer should roll over to a new file. + /// + /// # Arguments + /// + /// * `input_size` - The size in bytes of the incoming data + /// + /// # Returns + /// + /// `true` if a new file should be started, `false` otherwise fn should_roll(&mut self, input_size: u64) -> bool; } +/// Builder for creating a `RollingDataFileWriter` that rolls over to a new file +/// when the data size exceeds a target threshold. #[derive(Clone)] -pub struct RollingDataFileWriterBuilder { +pub struct RollingDataFileWriterBuilder { inner_builder: B, target_size: u64, } -impl RollingDataFileWriterBuilder { +impl RollingDataFileWriterBuilder { + /// Creates a new `RollingDataFileWriterBuilder` with the specified inner builder and target size. 
+ /// + /// # Arguments + /// + /// * `inner_builder` - The builder for the underlying file writer + /// * `target_size` - The target size in bytes before rolling over to a new file pub fn new(inner_builder: B, target_size: u64) -> Self { Self { inner_builder, @@ -47,7 +68,7 @@ impl RollingDataFileWriterBuilder { } #[async_trait] -impl IcebergWriterBuilder for RollingDataFileWriterBuilder { +impl IcebergWriterBuilder for RollingDataFileWriterBuilder { type R = RollingDataFileWriter; async fn build(self) -> Result { @@ -56,27 +77,34 @@ impl IcebergWriterBuilder for RollingDataFileWriterBuilder inner_builder: self.inner_builder, target_size: self.target_size, written_size: 0, - data_files: vec![], + close_handles: vec![], }) } } -pub struct RollingDataFileWriter { - inner: Option>, +/// A writer that automatically rolls over to a new file when the data size +/// exceeds a target threshold. +/// +/// This writer wraps another file writer and tracks the amount of data written. +/// When the data size exceeds the target size, it closes the current file and +/// starts writing to a new one. +pub struct RollingDataFileWriter { + inner: Option, inner_builder: B, target_size: u64, written_size: u64, - data_files: Vec, + close_handles: Vec>>>, } #[async_trait] -impl IcebergWriter for RollingDataFileWriter { +impl IcebergWriter for RollingDataFileWriter { async fn write(&mut self, input: RecordBatch) -> Result<()> { let input_size = input.get_array_memory_size() as u64; if self.should_roll(input_size) { if let Some(mut inner) = self.inner.take() { // close the current writer, roll to a new file - self.data_files.extend(inner.close().await?); + let handle = spawn(async move { inner.close().await }); + self.close_handles.push(handle) } // clear bytes written @@ -101,11 +129,22 @@ impl IcebergWriter for RollingDataFileWriter { } async fn close(&mut self) -> Result> { - Ok(take(&mut self.data_files)) + let mut data_files = try_join_all(take(&mut self.close_handles)) + .await? 
+ .into_iter() + .flatten() + .collect::>(); + + // close the current writer and merge the output + if let Some(mut current_writer) = take(&mut self.inner) { + data_files.extend(current_writer.close().await?); + } + + Ok(data_files) } } -impl RollingFileWriter for RollingDataFileWriter { +impl RollingFileWriter for RollingDataFileWriter { fn should_roll(&mut self, input_size: u64) -> bool { self.written_size + input_size > self.target_size } diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index a3a4870a26..0f198bde49 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -235,10 +235,10 @@ impl ExecutionPlan for IcebergWriteExec { writer.write(batch?).await.map_err(to_datafusion_error)?; } - let data_file_builders = writer.close().await.map_err(to_datafusion_error)?; + let data_files = writer.close().await.map_err(to_datafusion_error)?; // Convert builders to data files and then to JSON strings - let data_files: Vec = data_file_builders + let data_files_strs: Vec = data_files .into_iter() .map(|data_file| -> DFResult { // Serialize to JSON @@ -251,7 +251,7 @@ impl ExecutionPlan for IcebergWriteExec { }) .collect::>>()?; - Self::make_result_batch(data_files) + Self::make_result_batch(data_files_strs) }) .boxed(); From d8f05cfa001731c6a82c41356b03526be1b9215b Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 21:48:46 -0700 Subject: [PATCH 30/31] rolls the unit tests --- .../src/writer/base_writer/rolling_writer.rs | 189 ++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/base_writer/rolling_writer.rs index 31b40d5252..cdb4a6531c 100644 --- a/crates/iceberg/src/writer/base_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/base_writer/rolling_writer.rs @@ -149,3 +149,192 @@ impl RollingFileWriter for RollingDataFileWriter { self.written_size + input_size > self.target_size } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{Int32Array, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, Type}; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::tests::check_parquet_data_file; + use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch}; + + #[tokio::test] + async fn test_rolling_writer_basic() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", 
Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + // Create writer builders + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(schema), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + + // Set a large target size so no rolling occurs + let rolling_writer_builder = RollingDataFileWriterBuilder::new( + data_file_writer_builder, + 1024 * 1024, // 1MB, large enough to not trigger rolling + ); + + // Create writer + let mut writer = rolling_writer_builder.build().await?; + + // Create test data + let arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ]); + + let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ])?; + + // Write data + writer.write(batch.clone()).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify only one file was created + assert_eq!( + data_files.len(), + 1, + "Expected only one data file to be created" + ); + + // Verify file content + check_parquet_data_file(&file_io, &data_files[0], &batch).await; + + Ok(()) + } + + #[tokio::test] + async fn test_rolling_writer_with_rolling() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + // Create writer builders + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(schema), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + + // Set a very small target size to trigger rolling + let rolling_writer_builder = RollingDataFileWriterBuilder::new( + data_file_writer_builder, + 100, // Very small target size to ensure rolling + ); + + // Create writer + let mut writer = rolling_writer_builder.build().await?; + + // Create test data + let arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ]); + + // Create multiple batches to trigger rolling + let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ])?; + + let batch2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![4, 5, 6])), + 
Arc::new(StringArray::from(vec!["Dave", "Eve", "Frank"])), + ])?; + + let batch3 = RecordBatch::try_new(Arc::new(arrow_schema), vec![ + Arc::new(Int32Array::from(vec![7, 8, 9])), + Arc::new(StringArray::from(vec!["Grace", "Heidi", "Ivan"])), + ])?; + + // Write data + writer.write(batch1.clone()).await?; + writer.write(batch2.clone()).await?; + writer.write(batch3.clone()).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify multiple files were created (at least 2) + assert!( + data_files.len() > 1, + "Expected multiple data files to be created, got {}", + data_files.len() + ); + + // Verify total record count across all files + let total_records: u64 = data_files.iter().map(|file| file.record_count).sum(); + assert_eq!( + total_records, 9, + "Expected 9 total records across all files" + ); + + // Verify each file has the correct content + // Note: We can't easily verify which records went to which file without more complex logic, + // but we can verify the total count and that each file has valid content + + Ok(()) + } +} From 1ea4a0f0b2357511b4689de6596ca49816d4ab3a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 21 Jul 2025 21:55:51 -0700 Subject: [PATCH 31/31] could have it all for tests --- .../src/writer/base_writer/rolling_writer.rs | 69 +++++++++---------- 1 file changed, 31 insertions(+), 38 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/base_writer/rolling_writer.rs index cdb4a6531c..31d7226422 100644 --- a/crates/iceberg/src/writer/base_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/base_writer/rolling_writer.rs @@ -171,23 +171,40 @@ mod tests { use crate::writer::tests::check_parquet_data_file; use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch}; + fn make_test_schema() -> Result { + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + } + + fn make_test_arrow_schema() -> ArrowSchema { + ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ]) + } + #[tokio::test] async fn test_rolling_writer_basic() -> Result<()> { - let temp_dir = TempDir::new().unwrap(); - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; let location_gen = MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); // Create schema - let schema = Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), - ]) - .build()?; + let schema = make_test_schema()?; // Create writer builders let parquet_writer_builder = ParquetWriterBuilder::new( @@ -209,16 +226,7 @@ mod tests { let mut writer = rolling_writer_builder.build().await?; // Create test data - let arrow_schema = ArrowSchema::new(vec![ - Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( - 
PARQUET_FIELD_ID_META_KEY.to_string(), - 1.to_string(), - )])), - Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - 2.to_string(), - )])), - ]); + let arrow_schema = make_test_arrow_schema(); let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![ Arc::new(Int32Array::from(vec![1, 2, 3])), @@ -246,21 +254,15 @@ mod tests { #[tokio::test] async fn test_rolling_writer_with_rolling() -> Result<()> { - let temp_dir = TempDir::new().unwrap(); - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; let location_gen = MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); // Create schema - let schema = Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), - ]) - .build()?; + let schema = make_test_schema()?; // Create writer builders let parquet_writer_builder = ParquetWriterBuilder::new( @@ -282,16 +284,7 @@ mod tests { let mut writer = rolling_writer_builder.build().await?; // Create test data - let arrow_schema = ArrowSchema::new(vec![ - Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - 1.to_string(), - )])), - Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - 2.to_string(), - )])), - ]); + let arrow_schema = make_test_arrow_schema(); // Create multiple batches to trigger rolling let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
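
As a usage illustration of the builder chain introduced in this series (Parquet writer -> data file writer -> rolling writer), the following is a minimal sketch assembled only from the constructors and signatures that appear in the hunks above. The helper name write_all_batches, the hard-coded partition spec id 0 (an unpartitioned table, as in the unit tests), and the 100 MiB target size are illustrative assumptions, not part of the patch itself; the sketch also assumes the same crate dependencies (arrow_array, parquet, uuid with the v7 feature) used elsewhere in the diffs.

use arrow_array::RecordBatch;
use iceberg::Result;
use iceberg::spec::{DataFile, DataFileFormat};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::rolling_writer::RollingDataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use parquet::file::properties::WriterProperties;
use uuid::Uuid;

// Hypothetical helper: writes a set of batches to `table`, rolling to a new
// data file whenever the 100 MiB target is exceeded, and returns every
// DataFile produced.
async fn write_all_batches(table: &Table, batches: Vec<RecordBatch>) -> Result<Vec<DataFile>> {
    // Parquet writer builder, with the same five arguments used in the
    // IcebergWriteExec hunk above.
    let parquet_builder = ParquetWriterBuilder::new(
        WriterProperties::default(),
        table.metadata().current_schema().clone(),
        table.file_io().clone(),
        DefaultLocationGenerator::new(table.metadata().clone())?,
        DefaultFileNameGenerator::new(
            "datafusion".to_string(),
            Some(Uuid::now_v7().to_string()),
            DataFileFormat::Parquet,
        ),
    );

    // Wrap it in a data file writer (spec id 0: unpartitioned, as in the
    // unit tests), then in the rolling writer with a 100 MiB target size.
    let data_file_builder = DataFileWriterBuilder::new(parquet_builder, None, 0);
    let rolling_builder = RollingDataFileWriterBuilder::new(data_file_builder, 100 * 1024 * 1024);

    let mut writer = rolling_builder.build().await?;
    for batch in batches {
        // May close the current file and start a new one internally.
        writer.write(batch).await?;
    }

    // close() joins any in-flight file closes and merges their DataFiles
    // with those of the still-open writer.
    writer.close().await
}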