diff --git a/Cargo.lock b/Cargo.lock index c3e581ccb0..15de0463a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3677,6 +3677,7 @@ dependencies = [ "parquet", "tempfile", "tokio", + "uuid", ] [[package]] diff --git a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs index 6b75c011cb..b1bc83514d 100644 --- a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs +++ b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs @@ -159,6 +159,9 @@ impl NanValueCountVisitor { let arrow_arr_partner_accessor = ArrowArrayAccessor {}; let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef; + // todo remove these log lines + println!("----StructArray from record stream: {:?}", struct_arr); + println!("----Schema.as_struct from table: {:?}", schema.as_struct()); visit_struct_with_partner( schema.as_struct(), &struct_arr, diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index f8fd380dd0..88f7f59ef4 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -440,10 +440,12 @@ impl PartnerAccessor for ArrowArrayAccessor { Ok(schema_partner) } + // todo generate field_pos in datafusion instead of passing to here fn field_partner<'a>( &self, struct_partner: &'a ArrayRef, field: &NestedField, + field_pos: Option, ) -> Result<&'a ArrayRef> { let struct_array = struct_partner .as_any() @@ -455,6 +457,14 @@ impl PartnerAccessor for ArrowArrayAccessor { ) })?; + // todo remove unneeded log lines + println!( + "!!!Accessor struct array from struct partner: {:?}", + struct_array + ); + + println!("!!!field: {:?}", field); + let field_pos = struct_array .fields() .iter() @@ -463,12 +473,12 @@ impl PartnerAccessor for ArrowArrayAccessor { .map(|id| id == field.id) .unwrap_or(false) }) - .ok_or_else(|| { + .unwrap_or(field_pos.ok_or_else(|| { Error::new( ErrorKind::DataInvalid, format!("Field id {} not found in struct array", field.id), ) - })?; + })?); Ok(struct_array.column(field_pos)) } diff --git a/crates/iceberg/src/spec/manifest/_serde.rs b/crates/iceberg/src/spec/manifest/_serde.rs index fd7bc2e69a..e9124faf67 100644 --- a/crates/iceberg/src/spec/manifest/_serde.rs +++ b/crates/iceberg/src/spec/manifest/_serde.rs @@ -21,7 +21,7 @@ use serde_derive::{Deserialize, Serialize}; use serde_with::serde_as; use super::{Datum, ManifestEntry, Schema, Struct}; -use crate::spec::{Literal, RawLiteral, StructType, Type}; +use crate::spec::{FormatVersion, Literal, RawLiteral, StructType, Type}; use crate::{Error, ErrorKind}; #[derive(Serialize, Deserialize)] @@ -40,7 +40,7 @@ impl ManifestEntryV2 { snapshot_id: value.snapshot_id, sequence_number: value.sequence_number, file_sequence_number: value.file_sequence_number, - data_file: DataFileSerde::try_from(value.data_file, partition_type, false)?, + data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V2)?, }) } @@ -74,7 +74,7 @@ impl ManifestEntryV1 { Ok(Self { status: value.status as i32, snapshot_id: value.snapshot_id.unwrap_or_default(), - data_file: DataFileSerde::try_from(value.data_file, partition_type, true)?, + data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V1)?, }) } @@ -129,9 +129,13 @@ impl DataFileSerde { pub fn try_from( value: super::DataFile, partition_type: &StructType, - is_version_1: bool, + format_version: FormatVersion, ) -> Result { - let block_size_in_bytes = if is_version_1 { Some(0) } else { None }; + let block_size_in_bytes = if format_version == FormatVersion::V1 { + Some(0) + } 
else { + None + }; Ok(Self { content: value.content as i32, file_path: value.file_path, diff --git a/crates/iceberg/src/spec/manifest/data_file.rs b/crates/iceberg/src/spec/manifest/data_file.rs index 1de59a3874..9ea1fcd0a7 100644 --- a/crates/iceberg/src/spec/manifest/data_file.rs +++ b/crates/iceberg/src/spec/manifest/data_file.rs @@ -297,8 +297,12 @@ pub fn write_data_files_to_avro( let mut writer = AvroWriter::new(&avro_schema, writer); for data_file in data_files { - let value = to_value(DataFileSerde::try_from(data_file, partition_type, true)?)? - .resolve(&avro_schema)?; + let value = to_value(DataFileSerde::try_from( + data_file, + partition_type, + FormatVersion::V1, + )?)? + .resolve(&avro_schema)?; writer.append(value)?; } diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index 33b7d38706..2a29bccd65 100644 --- a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -33,7 +33,7 @@ use super::{ Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, Schema, Struct, UNASSIGNED_SEQUENCE_NUMBER, }; -use crate::error::Result; +use crate::error::{Error, ErrorKind, Result}; /// A manifest contains metadata and a list of entries. #[derive(Debug, PartialEq, Eq, Clone)] @@ -119,12 +119,45 @@ impl Manifest { } } +/// Serialize a DataFile to a JSON string. +pub fn serialize_data_file_to_json( + data_file: DataFile, + partition_type: &super::StructType, + format_version: FormatVersion, +) -> Result { + let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?; + serde_json::to_string(&serde).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to serialize DataFile to JSON: {}", e), + ) + }) +} + +/// Deserialize a DataFile from a JSON string. 
+pub fn deserialize_data_file_from_json( + json: &str, + partition_spec_id: i32, + partition_type: &super::StructType, + schema: &Schema, +) -> Result { + let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to deserialize JSON to DataFile: {}", e), + ) + })?; + + serde.try_into(partition_spec_id, partition_type, schema) +} + #[cfg(test)] mod tests { use std::collections::HashMap; use std::fs; use std::sync::Arc; + use arrow_array::StringArray; use tempfile::TempDir; use super::*; @@ -1056,4 +1089,120 @@ mod tests { assert!(!partitions[2].clone().contains_null); assert_eq!(partitions[2].clone().contains_nan, Some(false)); } + + #[test] + fn test_data_file_serialization() { + // Create a simple schema + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_fields(vec![ + crate::spec::NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)) + .into(), + crate::spec::NestedField::required( + 2, + "name", + Type::Primitive(PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + // Create a partition spec + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_partition_field("id", "id_partition", crate::spec::Transform::Identity) + .unwrap() + .build() + .unwrap(); + + // Get partition type from the partition spec + let partition_type = partition_spec.partition_type(&schema).unwrap(); + + // Create a vector of DataFile objects + let data_files = vec![ + DataFileBuilder::default() + .content(crate::spec::DataContentType::Data) + .file_format(DataFileFormat::Parquet) + .file_path("path/to/file1.parquet".to_string()) + .file_size_in_bytes(1024) + .record_count(100) + .partition_spec_id(1) + .partition(Struct::empty()) + .column_sizes(HashMap::from([(1, 512), (2, 512)])) + .value_counts(HashMap::from([(1, 100), (2, 100)])) + .null_value_counts(HashMap::from([(1, 0), (2, 0)])) + .build() + .unwrap(), + DataFileBuilder::default() + .content(crate::spec::DataContentType::Data) + .file_format(DataFileFormat::Parquet) + .file_path("path/to/file2.parquet".to_string()) + .file_size_in_bytes(2048) + .record_count(200) + .partition_spec_id(1) + .partition(Struct::empty()) + .column_sizes(HashMap::from([(1, 1024), (2, 1024)])) + .value_counts(HashMap::from([(1, 200), (2, 200)])) + .null_value_counts(HashMap::from([(1, 10), (2, 5)])) + .build() + .unwrap(), + ]; + + // Serialize the DataFile objects + let serialized_files = data_files + .into_iter() + .map(|f| { + let json = + serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap(); + println!("Test serialized data file: {}", json); + json + }) + .collect::>(); + + // Verify we have the expected number of serialized files + assert_eq!(serialized_files.len(), 2); + + // Verify each serialized file contains expected data + for json in &serialized_files { + assert!(json.contains("path/to/file")); + assert!(json.contains("parquet")); + assert!(json.contains("record_count")); + assert!(json.contains("file_size_in_bytes")); + } + + // Convert Vec to StringArray and print it + let string_array = StringArray::from(serialized_files.clone()); + println!("StringArray: {:?}", string_array); + + // Now deserialize the JSON strings back into DataFile objects + println!("\nDeserializing back to DataFile objects:"); + let deserialized_files: Vec = serialized_files + .into_iter() + .map(|json| { + let data_file = deserialize_data_file_from_json( + &json, + 
partition_spec.spec_id(), + &partition_type, + &schema, + ) + .unwrap(); + + println!("Deserialized DataFile: {:?}", data_file); + data_file + }) + .collect(); + + // Verify we have the expected number of deserialized files + assert_eq!(deserialized_files.len(), 2); + + // Verify the deserialized files have the expected properties + for file in &deserialized_files { + assert_eq!(file.content_type(), crate::spec::DataContentType::Data); + assert_eq!(file.file_format(), DataFileFormat::Parquet); + assert!(file.file_path().contains("path/to/file")); + assert!(file.record_count() == 100 || file.record_count() == 200); + } + } } diff --git a/crates/iceberg/src/spec/schema/visitor.rs b/crates/iceberg/src/spec/schema/visitor.rs index ebb9b86bba..faad728a2a 100644 --- a/crates/iceberg/src/spec/schema/visitor.rs +++ b/crates/iceberg/src/spec/schema/visitor.rs @@ -192,7 +192,12 @@ pub trait PartnerAccessor
<P>
{ /// Get the struct partner from schema partner. fn struct_parner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>; /// Get the field partner from struct partner. - fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField) -> Result<&'a P>; + fn field_partner<'a>( + &self, + struct_partner: &'a P, + field: &NestedField, + field_pos: Option, + ) -> Result<&'a P>; /// Get the list element partner from list partner. fn list_element_partner<'a>(&self, list_partner: &'a P) -> Result<&'a P>; /// Get the map key partner from map partner. @@ -253,8 +258,8 @@ pub fn visit_struct_with_partner, A: PartnerAc accessor: &A, ) -> Result { let mut results = Vec::with_capacity(s.fields().len()); - for field in s.fields() { - let field_partner = accessor.field_partner(partner, field)?; + for (pos, field) in s.fields().iter().enumerate() { + let field_partner = accessor.field_partner(partner, field, Some(pos))?; visitor.before_struct_field(field, field_partner)?; let result = visit_type_with_partner(&field.field_type, field_partner, visitor, accessor)?; visitor.after_struct_field(field, field_partner)?; diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 0f0854f7fc..eed43b0530 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -119,6 +119,13 @@ pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeou /// Default value for total maximum retry time (ms). pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes +/// Default file format for data files +pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default"; +/// Default file format for delete files +pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default"; +/// Default value for data file format +pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet"; + /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 37ab97eb6d..80770b75ea 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,3 +19,5 @@ pub mod data_file_writer; pub mod equality_delete_writer; +/// Module providing writers that can automatically roll over to new files based on size thresholds. +pub mod rolling_writer; diff --git a/crates/iceberg/src/writer/base_writer/rolling_writer.rs b/crates/iceberg/src/writer/base_writer/rolling_writer.rs new file mode 100644 index 0000000000..31d7226422 --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/rolling_writer.rs @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
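A note on the rolling writer introduced below (illustrative, not part of the patch): RollingDataFileWriter keeps a running written_size and rolls to a new file as soon as the next batch would push it past target_size, measuring the incoming batch with Arrow's in-memory size estimate. A minimal, self-contained sketch of that decision:

use arrow_array::RecordBatch;

// Mirrors the threshold check in RollingDataFileWriter::should_roll below.
fn should_roll(written_size: u64, target_size: u64, batch: &RecordBatch) -> bool {
    // The incoming size is the Arrow in-memory footprint of the batch,
    // which generally overestimates the encoded Parquet size on disk.
    let input_size = batch.get_array_memory_size() as u64;
    written_size + input_size > target_size
}

Because the check is based on in-memory batch size rather than bytes emitted by the Parquet encoder, files will typically roll somewhat before target_size is reached on disk.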
+ +use std::mem::take; + +use arrow_array::RecordBatch; +use async_trait::async_trait; +use futures::future::try_join_all; + +use crate::runtime::{JoinHandle, spawn}; +use crate::spec::DataFile; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// A writer that can roll over to a new file when certain conditions are met. +/// +/// This trait extends `IcebergWriter` with the ability to determine when to start +/// writing to a new file based on the size of incoming data. +#[async_trait] +pub trait RollingFileWriter: IcebergWriter { + /// Determines if the writer should roll over to a new file. + /// + /// # Arguments + /// + /// * `input_size` - The size in bytes of the incoming data + /// + /// # Returns + /// + /// `true` if a new file should be started, `false` otherwise + fn should_roll(&mut self, input_size: u64) -> bool; +} + +/// Builder for creating a `RollingDataFileWriter` that rolls over to a new file +/// when the data size exceeds a target threshold. +#[derive(Clone)] +pub struct RollingDataFileWriterBuilder { + inner_builder: B, + target_size: u64, +} + +impl RollingDataFileWriterBuilder { + /// Creates a new `RollingDataFileWriterBuilder` with the specified inner builder and target size. + /// + /// # Arguments + /// + /// * `inner_builder` - The builder for the underlying file writer + /// * `target_size` - The target size in bytes before rolling over to a new file + pub fn new(inner_builder: B, target_size: u64) -> Self { + Self { + inner_builder, + target_size, + } + } +} + +#[async_trait] +impl IcebergWriterBuilder for RollingDataFileWriterBuilder { + type R = RollingDataFileWriter; + + async fn build(self) -> Result { + Ok(RollingDataFileWriter { + inner: None, + inner_builder: self.inner_builder, + target_size: self.target_size, + written_size: 0, + close_handles: vec![], + }) + } +} + +/// A writer that automatically rolls over to a new file when the data size +/// exceeds a target threshold. +/// +/// This writer wraps another file writer and tracks the amount of data written. +/// When the data size exceeds the target size, it closes the current file and +/// starts writing to a new one. +pub struct RollingDataFileWriter { + inner: Option, + inner_builder: B, + target_size: u64, + written_size: u64, + close_handles: Vec>>>, +} + +#[async_trait] +impl IcebergWriter for RollingDataFileWriter { + async fn write(&mut self, input: RecordBatch) -> Result<()> { + let input_size = input.get_array_memory_size() as u64; + if self.should_roll(input_size) { + if let Some(mut inner) = self.inner.take() { + // close the current writer, roll to a new file + let handle = spawn(async move { inner.close().await }); + self.close_handles.push(handle) + } + + // clear bytes written + self.written_size = 0; + } + + if self.inner.is_none() { + // start a new writer + self.inner = Some(self.inner_builder.clone().build().await?); + } + + // write the input and count bytes written + let Some(writer) = self.inner.as_mut() else { + return Err(Error::new( + ErrorKind::Unexpected, + "Writer is not initialized!", + )); + }; + writer.write(input).await?; + self.written_size += input_size; + Ok(()) + } + + async fn close(&mut self) -> Result> { + let mut data_files = try_join_all(take(&mut self.close_handles)) + .await? 
+ .into_iter() + .flatten() + .collect::>(); + + // close the current writer and merge the output + if let Some(mut current_writer) = take(&mut self.inner) { + data_files.extend(current_writer.close().await?); + } + + Ok(data_files) + } +} + +impl RollingFileWriter for RollingDataFileWriter { + fn should_roll(&mut self, input_size: u64) -> bool { + self.written_size + input_size > self.target_size + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{Int32Array, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Schema, Type}; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::tests::check_parquet_data_file; + use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch}; + + fn make_test_schema() -> Result { + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + } + + fn make_test_arrow_schema() -> ArrowSchema { + ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 1.to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + 2.to_string(), + )])), + ]) + } + + #[tokio::test] + async fn test_rolling_writer_basic() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = make_test_schema()?; + + // Create writer builders + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(schema), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + + // Set a large target size so no rolling occurs + let rolling_writer_builder = RollingDataFileWriterBuilder::new( + data_file_writer_builder, + 1024 * 1024, // 1MB, large enough to not trigger rolling + ); + + // Create writer + let mut writer = rolling_writer_builder.build().await?; + + // Create test data + let arrow_schema = make_test_arrow_schema(); + + let batch = RecordBatch::try_new(Arc::new(arrow_schema), vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ])?; + + // Write data + writer.write(batch.clone()).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify only one file was created + assert_eq!( + data_files.len(), + 1, + "Expected only one data file to be created" + ); + + // Verify file content + check_parquet_data_file(&file_io, &data_files[0], &batch).await; + + 
Ok(()) + } + + #[tokio::test] + async fn test_rolling_writer_with_rolling() -> Result<()> { + let temp_dir = TempDir::new()?; + let file_io = FileIOBuilder::new_fs_io().build()?; + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Create schema + let schema = make_test_schema()?; + + // Create writer builders + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(schema), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0); + + // Set a very small target size to trigger rolling + let rolling_writer_builder = RollingDataFileWriterBuilder::new( + data_file_writer_builder, + 100, // Very small target size to ensure rolling + ); + + // Create writer + let mut writer = rolling_writer_builder.build().await?; + + // Create test data + let arrow_schema = make_test_arrow_schema(); + + // Create multiple batches to trigger rolling + let batch1 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ])?; + + let batch2 = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(StringArray::from(vec!["Dave", "Eve", "Frank"])), + ])?; + + let batch3 = RecordBatch::try_new(Arc::new(arrow_schema), vec![ + Arc::new(Int32Array::from(vec![7, 8, 9])), + Arc::new(StringArray::from(vec!["Grace", "Heidi", "Ivan"])), + ])?; + + // Write data + writer.write(batch1.clone()).await?; + writer.write(batch2.clone()).await?; + writer.write(batch3.clone()).await?; + + // Close writer and get data files + let data_files = writer.close().await?; + + // Verify multiple files were created (at least 2) + assert!( + data_files.len() > 1, + "Expected multiple data files to be created, got {}", + data_files.len() + ); + + // Verify total record count across all files + let total_records: u64 = data_files.iter().map(|file| file.record_count).sum(); + assert_eq!( + total_records, 9, + "Expected 9 total records across all files" + ); + + // Verify each file has the correct content + // Note: We can't easily verify which records went to which file without more complex logic, + // but we can verify the total count and that each file has valid content + + Ok(()) + } +} diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 6954950b06..0ee1738b4f 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -34,7 +34,9 @@ async-trait = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } +parquet = { workspace = true } tokio = { workspace = true } +uuid = { workspace = true } [dev-dependencies] expect-test = { workspace = true } diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs new file mode 100644 index 0000000000..f186d8e816 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use datafusion::arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt64Array}; +use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, +}; +use datafusion::common::{DataFusionError, Result as DFResult}; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use futures::StreamExt; +use iceberg::Catalog; +use iceberg::spec::{DataFile, deserialize_data_file_from_json}; +use iceberg::table::Table; +use iceberg::transaction::Transaction; + +use crate::physical_plan::DATA_FILES_COL_NAME; +use crate::to_datafusion_error; + +/// IcebergCommitExec is responsible for collecting the files written and use +/// [`Transaction::fast_append`] to commit the data files written. +#[derive(Debug)] +pub(crate) struct IcebergCommitExec { + table: Table, + catalog: Arc, + input: Arc, + schema: ArrowSchemaRef, + count_schema: ArrowSchemaRef, + plan_properties: PlanProperties, +} + +impl IcebergCommitExec { + pub fn new( + table: Table, + catalog: Arc, + input: Arc, + schema: ArrowSchemaRef, + ) -> Self { + let plan_properties = Self::compute_properties(schema.clone()); + + Self { + table, + catalog, + input, + schema, + count_schema: Self::make_count_schema(), + plan_properties, + } + } + + // Compute the plan properties for this execution plan + fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + EmissionType::Final, + Boundedness::Bounded, + ) + } + + // Create a record batch with just the count of rows written + fn make_count_batch(count: u64) -> DFResult { + let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef; + + RecordBatch::try_from_iter_with_nullable(vec![("count", count_array, false)]).map_err(|e| { + DataFusionError::ArrowError(e, Some("Failed to make count batch!".to_string())) + }) + } + + fn make_count_schema() -> ArrowSchemaRef { + // Define a schema. 
+ Arc::new(ArrowSchema::new(vec![Field::new( + "count", + DataType::UInt64, + false, + )])) + } +} + +impl DisplayAs for IcebergCommitExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "IcebergCommitExec: table={}", self.table.identifier()) + } + DisplayFormatType::Verbose => { + write!( + f, + "IcebergCommitExec: table={}, schema={:?}", + self.table.identifier(), + self.schema + ) + } + DisplayFormatType::TreeRender => { + write!(f, "IcebergCommitExec: table={}", self.table.identifier()) + } + } + } +} + +impl ExecutionPlan for IcebergCommitExec { + fn name(&self) -> &str { + "IcebergCommitExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DFResult> { + if children.len() != 1 { + return Err(DataFusionError::Internal(format!( + "IcebergCommitExec expects exactly one child, but provided {}", + children.len() + ))); + } + + Ok(Arc::new(IcebergCommitExec::new( + self.table.clone(), + self.catalog.clone(), + children[0].clone(), + self.schema.clone(), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + // IcebergCommitExec only has one partition (partition 0) + if partition != 0 { + return Err(DataFusionError::Internal(format!( + "IcebergCommitExec only has one partition, but got partition {}", + partition + ))); + } + + let table = self.table.clone(); + let input_plan = self.input.clone(); + let count_schema = Arc::clone(&self.count_schema); + + // todo revisit this + let spec_id = self.table.metadata().default_partition_spec_id(); + let partition_type = self.table.metadata().default_partition_type().clone(); + let current_schema = self.table.metadata().current_schema().clone(); + + let _catalog = Arc::clone(&self.catalog); + + // Process the input streams from all partitions and commit the data files + let stream = futures::stream::once(async move { + let mut data_files: Vec = Vec::new(); + let mut total_record_count: u64 = 0; + + // Execute and collect results from the input coalesced plan + let mut batch_stream = input_plan.execute(0, context)?; + + while let Some(batch_result) = batch_stream.next().await { + let batch = batch_result?; + + let files_array = batch + .column_by_name(DATA_FILES_COL_NAME) + .ok_or_else(|| { + DataFusionError::Internal( + "Expected 'data_files' column in input batch".to_string(), + ) + })? 
+ .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal( + "Expected 'data_files' column to be StringArray".to_string(), + ) + })?; + + // todo remove log + println!("files_array to deserialize: {:?}", files_array); + + // Deserialize all data files from the StringArray + let batch_files: Vec = files_array + .into_iter() + .flatten() + .map(|f| -> DFResult { + // Parse JSON to DataFileSerde and convert to DataFile + deserialize_data_file_from_json( + f, + spec_id, + &partition_type, + ¤t_schema, + ) + .map_err(to_datafusion_error) + }) + .collect::>()?; + + // add record_counts from the current batch to total record count + total_record_count += batch_files.iter().map(|f| f.record_count()).sum::(); + + // Add all deserialized files to our collection + data_files.extend(batch_files); + } + + // If no data files were collected, return an empty result + if data_files.is_empty() { + return Ok(RecordBatch::new_empty(count_schema)); + } + + // Create a transaction and commit the data files + let tx = Transaction::new(&table); + let _action = tx.fast_append().add_data_files(data_files); + + // todo uncomment this + // // Apply the action and commit the transaction + // let updated_table = action + // .apply(tx) + // .map_err(to_datafusion_error)? + // .commit(catalog.as_ref()) + // .await + // .map_err(to_datafusion_error)?; + + Self::make_count_batch(total_record_count) + }) + .boxed(); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.count_schema), + stream, + ))) + } +} diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index 58fb065dde..19c8f79239 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -15,6 +15,10 @@ // specific language governing permissions and limitations // under the License. +pub(crate) mod commit; pub(crate) mod expr_to_predicate; pub(crate) mod metadata_scan; pub(crate) mod scan; +pub(crate) mod write; + +pub(crate) const DATA_FILES_COL_NAME: &str = "data_files"; diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs new file mode 100644 index 0000000000..0f198bde49 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
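A note on the handoff between the two new physical nodes (illustrative, not part of the patch): IcebergWriteExec, added below, emits one JSON-serialized DataFile per row in a single non-nullable Utf8 column named by DATA_FILES_COL_NAME, and IcebergCommitExec above reads that column back and deserializes each row. A sketch of the batch shape the two nodes agree on:

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::common::Result as DFResult;

const DATA_FILES_COL_NAME: &str = "data_files";

// One serialized DataFile JSON string per row, under the agreed column name.
fn data_files_batch(serialized_files: Vec<String>) -> DFResult<RecordBatch> {
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        DATA_FILES_COL_NAME,
        DataType::Utf8,
        false,
    )]));
    let files: ArrayRef = Arc::new(StringArray::from(serialized_files));
    Ok(RecordBatch::try_new(schema, vec![files])?)
}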
+ +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::str::FromStr; +use std::sync::Arc; + +use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray}; +use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, +}; +use datafusion::common::Result as DFResult; +use datafusion::error::DataFusionError; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + execute_input_stream, +}; +use futures::StreamExt; +use iceberg::arrow::schema_to_arrow_schema; +use iceberg::spec::{ + DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, + serialize_data_file_to_json, +}; +use iceberg::table::Table; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::base_writer::rolling_writer::RollingDataFileWriterBuilder; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Error, ErrorKind}; +use parquet::file::properties::WriterProperties; +use uuid::Uuid; + +use crate::physical_plan::DATA_FILES_COL_NAME; +use crate::to_datafusion_error; + +#[derive(Debug)] +pub(crate) struct IcebergWriteExec { + table: Table, + input: Arc, + result_schema: ArrowSchemaRef, + plan_properties: PlanProperties, +} + +impl IcebergWriteExec { + pub fn new(table: Table, input: Arc, schema: ArrowSchemaRef) -> Self { + let plan_properties = Self::compute_properties(&input, schema); + + Self { + table, + input, + result_schema: Self::make_result_schema(), + plan_properties, + } + } + + fn compute_properties( + input: &Arc, + schema: ArrowSchemaRef, + ) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()), + EmissionType::Final, + Boundedness::Bounded, + ) + } + + // Create a record batch with serialized data files + fn make_result_batch(data_files: Vec) -> DFResult { + let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef; + + RecordBatch::try_new(Self::make_result_schema(), vec![files_array]).map_err(|e| { + DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string())) + }) + } + + fn make_result_schema() -> ArrowSchemaRef { + // Define a schema. 
+ Arc::new(ArrowSchema::new(vec![Field::new( + DATA_FILES_COL_NAME, + DataType::Utf8, + false, + )])) + } +} + +impl DisplayAs for IcebergWriteExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "IcebergWriteExec: table={}", self.table.identifier()) + } + DisplayFormatType::Verbose => { + write!( + f, + "IcebergWriteExec: table={}, result_schema={:?}", + self.table.identifier(), + self.result_schema + ) + } + DisplayFormatType::TreeRender => { + write!(f, "IcebergWriteExec: table={}", self.table.identifier()) + } + } + } +} + +impl ExecutionPlan for IcebergWriteExec { + fn name(&self) -> &str { + "IcebergWriteExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DFResult> { + if children.len() != 1 { + return Err(DataFusionError::Internal(format!( + "IcebergWriteExec expects exactly one child, but provided {}", + children.len() + ))); + } + + Ok(Arc::new(Self::new( + self.table.clone(), + Arc::clone(&children[0]), + self.schema(), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + // todo non-default partition spec? + let spec_id = self.table.metadata().default_partition_spec_id(); + let partition_type = self.table.metadata().default_partition_type().clone(); + let format_version = self.table.metadata().format_version(); + + // Check data file format + let file_format = DataFileFormat::from_str( + self.table + .metadata() + .properties() + .get(PROPERTY_DEFAULT_FILE_FORMAT) + .unwrap_or(&PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()), + ) + .map_err(to_datafusion_error)?; + if file_format != DataFileFormat::Parquet { + return Err(to_datafusion_error(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "File format {} is not supported for insert_into yet!", + file_format + ), + ))); + } + + // Create data file writer builder + let parquet_file_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + self.table.metadata().current_schema().clone(), + self.table.file_io().clone(), + DefaultLocationGenerator::new(self.table.metadata().clone()) + .map_err(to_datafusion_error)?, + // todo filename prefix/suffix should be configurable + DefaultFileNameGenerator::new( + "datafusion".to_string(), + Some(Uuid::now_v7().to_string()), + file_format, + ), + ); + let data_file_writer_builder = + DataFileWriterBuilder::new(parquet_file_writer_builder, None, spec_id); + let rolling_writer_builder = RollingDataFileWriterBuilder::new( + data_file_writer_builder, + 100 * 1024 * 1024, // todo use a config + ); + + // Get input data + let data = execute_input_stream( + Arc::clone(&self.input), + Arc::new( + schema_to_arrow_schema(self.table.metadata().current_schema()) + .map_err(to_datafusion_error)?, + ), + partition, + Arc::clone(&context), + )?; + + // Create write stream + let stream = futures::stream::once(async move { + let mut writer = rolling_writer_builder + .build() + .await + .map_err(to_datafusion_error)?; + let mut input_stream = data; + + while let Some(batch) = input_stream.next().await { + writer.write(batch?).await.map_err(to_datafusion_error)?; + } + + let data_files = writer.close().await.map_err(to_datafusion_error)?; + + // Convert builders to data files and then to JSON strings + let data_files_strs: Vec = data_files + 
.into_iter() + .map(|data_file| -> DFResult { + // Serialize to JSON + let json = + serialize_data_file_to_json(data_file, &partition_type, format_version) + .map_err(to_datafusion_error)?; + + println!("Serialized data file: {}", json); // todo remove log + Ok(json) + }) + .collect::>>()?; + + Self::make_result_batch(data_files_strs) + }) + .boxed(); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.result_schema), + stream, + ))) + } +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 7f741a534a..34847c4985 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -24,17 +24,22 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; use datafusion::catalog::Session; +use datafusion::common::DataFusionError; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result as DFResult; +use datafusion::logical_expr::dml::InsertOp; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use iceberg::arrow::schema_to_arrow_schema; use iceberg::inspect::MetadataTableType; use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; use metadata_table::IcebergMetadataTableProvider; +use crate::physical_plan::commit::IcebergCommitExec; use crate::physical_plan::scan::IcebergTableScan; +use crate::physical_plan::write::IcebergWriteExec; /// Represents a [`TableProvider`] for the Iceberg [`Catalog`], /// managing access to a [`Table`]. @@ -46,6 +51,8 @@ pub struct IcebergTableProvider { snapshot_id: Option, /// A reference-counted arrow `Schema`. schema: ArrowSchemaRef, + /// The catalog that the table belongs to. 
+ catalog: Option>, } impl IcebergTableProvider { @@ -54,6 +61,7 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, + catalog: None, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] @@ -73,6 +81,7 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, + catalog: Some(client), }) } @@ -84,6 +93,7 @@ impl IcebergTableProvider { table, snapshot_id: None, schema, + catalog: None, }) } @@ -108,6 +118,7 @@ impl IcebergTableProvider { table, snapshot_id: Some(snapshot_id), schema, + catalog: None, }) } @@ -152,11 +163,51 @@ impl TableProvider for IcebergTableProvider { fn supports_filters_pushdown( &self, filters: &[&Expr], - ) -> std::result::Result, datafusion::error::DataFusionError> - { + ) -> std::result::Result, DataFusionError> { // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) } + + async fn insert_into( + &self, + _state: &dyn Session, + input: Arc, + _insert_op: InsertOp, + ) -> DFResult> { + if !self + .table + .metadata() + .default_partition_spec() + .is_unpartitioned() + { + // TODO add insert into support for partitioned tables + return Err(DataFusionError::NotImplemented( + "IcebergTableProvider::insert_into does not support partitioned tables yet" + .to_string(), + )); + } + + let Some(catalog) = self.catalog.clone() else { + return Err(DataFusionError::Execution( + "Catalog cannot be none for insert_into".to_string(), + )); + }; + + let write_plan = Arc::new(IcebergWriteExec::new( + self.table.clone(), + input, + self.schema.clone(), + )); + + let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan)); + + Ok(Arc::new(IcebergCommitExec::new( + self.table.clone(), + catalog, + coalesce_partitions, + self.schema.clone(), + ))) + } } #[cfg(test)] diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 1491e4dbff..ebe43f5a42 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -26,6 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datafusion::execution::context::SessionContext; use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY; use expect_test::expect; +use futures::StreamExt; use iceberg::io::FileIOBuilder; use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type}; use iceberg::test_utils::check_record_batches; @@ -432,3 +433,69 @@ async fn test_metadata_table() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_insert_into() -> Result<()> { + let iceberg_catalog = get_iceberg_catalog(); + let namespace = NamespaceIdent::new("test_provider_get_table_schema".to_string()); + set_test_namespace(&iceberg_catalog, &namespace).await?; + + let creation = get_table_creation(temp_path(), "my_table", None)?; + iceberg_catalog.create_table(&namespace, creation).await?; + + let client = Arc::new(iceberg_catalog); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?); + + let ctx = SessionContext::new(); + ctx.register_catalog("catalog", catalog); + + let provider = ctx.catalog("catalog").unwrap(); + let schema = provider.schema("test_provider_get_table_schema").unwrap(); + + let table = schema.table("my_table").await.unwrap().unwrap(); + let table_schema = table.schema(); + + let expected = [("foo1", 
&DataType::Int32), ("foo2", &DataType::Utf8)]; + + for (field, exp) in table_schema.fields().iter().zip(expected.iter()) { + assert_eq!(field.name(), exp.0); + assert_eq!(field.data_type(), exp.1); + assert!(!field.is_nullable()) + } + + let df = ctx + .sql("insert into catalog.test_provider_get_table_schema.my_table values (1, 'alan'),(2, 'turing')") + .await + .unwrap(); + + let task_ctx = Arc::new(df.task_ctx()); + let plan = df.create_physical_plan().await.unwrap(); + let mut stream = plan.execute(0, task_ctx).unwrap(); + + while let Some(batch_result) = stream.next().await { + match batch_result { + Ok(batch) => { + println!("Got RecordBatch with {} rows", batch.num_rows()); + for column in batch.columns() { + println!("{:?}", column); + } + } + Err(e) => { + eprintln!("Error reading batch: {:?}", e); + } + } + } + // Ensure both the plan and the stream conform to the same schema + // assert_eq!(plan.schema(), stream.schema()); + // assert_eq!( + // stream.schema().as_ref(), + // &ArrowSchema::new(vec![ + // Field::new("foo2", DataType::Utf8, false).with_metadata(HashMap::from([( + // PARQUET_FIELD_ID_META_KEY.to_string(), + // "2".to_string(), + // )])) + // ]), + // ); + + Ok(()) +}
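Taken together, insert_into builds IcebergWriteExec -> CoalescePartitionsExec -> IcebergCommitExec, and the commit node reports a single UInt64 "count" column. A hedged sketch of driving the same path through the DataFrame API instead of executing the physical plan by hand as in the test above (insert_and_count is a hypothetical helper; as the patch stands the catalog commit in IcebergCommitExec is still behind a todo, so the count reflects files written rather than a committed snapshot):

use datafusion::arrow::array::UInt64Array;
use datafusion::common::Result as DFResult;
use datafusion::execution::context::SessionContext;

// Hypothetical helper: run an INSERT and return the row count reported by
// IcebergCommitExec's single-row, single-column result batch.
async fn insert_and_count(ctx: &SessionContext, sql: &str) -> DFResult<u64> {
    let batches = ctx.sql(sql).await?.collect().await?;
    let counts = batches[0]
        .column(0)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .expect("expected a UInt64 count column");
    Ok(counts.value(0))
}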