diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index ebee670f4..d910b5c8f 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -162,6 +162,10 @@ pub struct Table { } impl Table { + pub(crate) fn with_metadata(&mut self, metadata: TableMetadataRef) { + self.metadata = metadata; + } + /// Returns a TableBuilder to build a table pub fn builder() -> TableBuilder { TableBuilder::new() diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 007a3745f..46e86d6a9 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -22,6 +22,7 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::mem::discriminant; use std::ops::RangeFrom; +use std::sync::Arc; use arrow_array::StringArray; use futures::TryStreamExt; @@ -44,7 +45,8 @@ const META_ROOT_PATH: &str = "metadata"; /// Table transaction. pub struct Transaction<'a> { - table: &'a Table, + base_table: &'a Table, + current_table: Table, updates: Vec, requirements: Vec, } @@ -53,38 +55,60 @@ impl<'a> Transaction<'a> { /// Creates a new transaction. pub fn new(table: &'a Table) -> Self { Self { - table, + base_table: table, + current_table: table.clone(), updates: vec![], requirements: vec![], } } - fn append_updates(&mut self, updates: Vec) -> Result<()> { - for update in &updates { - for up in &self.updates { - if discriminant(up) == discriminant(update) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot apply update with same type at same time: {:?}", - update - ), - )); - } - } + fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> { + let mut metadata_builder = self.current_table.metadata().clone().into_builder(None); + for update in updates { + metadata_builder = update.clone().apply(metadata_builder)?; } - self.updates.extend(updates); + + self.current_table + .with_metadata(Arc::new(metadata_builder.build()?.metadata)); + Ok(()) } - fn append_requirements(&mut self, requirements: Vec) -> Result<()> { - self.requirements.extend(requirements); + fn apply( + &mut self, + updates: Vec, + requirements: Vec, + ) -> Result<()> { + for requirement in &requirements { + requirement.check(Some(self.current_table.metadata()))?; + } + + self.update_table_metadata(&updates)?; + + self.updates.extend(updates); + + // For the requirements, it does not make sense to add a requirement more than once + // For example, you cannot assert that the current schema has two different IDs + for new_requirement in requirements { + if self + .requirements + .iter() + .map(discriminant) + .all(|d| d != discriminant(&new_requirement)) + { + self.requirements.push(new_requirement); + } + } + + // # TODO + // Support auto commit later. + Ok(()) } /// Sets table to a new version. pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { - let current_version = self.table.metadata().format_version(); + let current_version = self.current_table.metadata().format_version(); match current_version.cmp(&format_version) { Ordering::Greater => { return Err(Error::new( @@ -96,7 +120,7 @@ impl<'a> Transaction<'a> { )); } Ordering::Less => { - self.append_updates(vec![UpgradeFormatVersion { format_version }])?; + self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?; } Ordering::Equal => { // Do nothing. @@ -107,7 +131,7 @@ impl<'a> Transaction<'a> { /// Update table's property. pub fn set_properties(mut self, props: HashMap) -> Result { - self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?; + self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?; Ok(self) } @@ -123,7 +147,7 @@ impl<'a> Transaction<'a> { }; let mut snapshot_id = generate_random_id(); while self - .table + .current_table .metadata() .snapshots() .any(|s| s.snapshot_id() == snapshot_id) @@ -159,14 +183,17 @@ impl<'a> Transaction<'a> { /// Remove properties in table. pub fn remove_properties(mut self, keys: Vec) -> Result { - self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?; + self.apply( + vec![TableUpdate::RemoveProperties { removals: keys }], + vec![], + )?; Ok(self) } /// Commit transaction. pub async fn commit(self, catalog: &dyn Catalog) -> Result { let table_commit = TableCommit::builder() - .ident(self.table.identifier().clone()) + .ident(self.base_table.identifier().clone()) .updates(self.updates) .requirements(self.requirements) .build(); @@ -215,7 +242,7 @@ impl<'a> FastAppendAction<'a> { if !self .snapshot_produce_action .tx - .table + .current_table .metadata() .default_spec .is_unpartitioned() @@ -226,12 +253,10 @@ impl<'a> FastAppendAction<'a> { )); } - let table_metadata = self.snapshot_produce_action.tx.table.metadata(); - let data_files = ParquetWriter::parquet_files_to_data_files( - self.snapshot_produce_action.tx.table.file_io(), + self.snapshot_produce_action.tx.current_table.file_io(), file_path, - table_metadata, + self.snapshot_produce_action.tx.current_table.metadata(), ) .await?; @@ -253,7 +278,7 @@ impl<'a> FastAppendAction<'a> { let mut manifest_stream = self .snapshot_produce_action .tx - .table + .current_table .inspect() .manifests() .scan() @@ -314,14 +339,19 @@ impl SnapshotProduceOperation for FastAppendOperation { &self, snapshot_produce: &SnapshotProduceAction<'_>, ) -> Result> { - let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else { + let Some(snapshot) = snapshot_produce + .tx + .current_table + .metadata() + .current_snapshot() + else { return Ok(vec![]); }; let manifest_list = snapshot .load_manifest_list( - snapshot_produce.tx.table.file_io(), - &snapshot_produce.tx.table.metadata_ref(), + snapshot_produce.tx.current_table.file_io(), + snapshot_produce.tx.current_table.metadata(), ) .await?; @@ -435,7 +465,7 @@ impl<'a> SnapshotProduceAction<'a> { for data_file in data_files { Self::validate_partition_value( data_file.partition(), - self.tx.table.metadata().default_partition_type(), + self.tx.current_table.metadata().default_partition_type(), )?; if data_file.content_type() == DataContentType::Data { self.added_data_files.push(data_file); @@ -449,13 +479,16 @@ impl<'a> SnapshotProduceAction<'a> { fn new_manifest_output(&mut self) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", - self.tx.table.metadata().location(), + self.tx.current_table.metadata().location(), META_ROOT_PATH, self.commit_uuid, self.manifest_counter.next().unwrap(), DataFileFormat::Avro ); - self.tx.table.file_io().new_output(new_manifest_path) + self.tx + .current_table + .file_io() + .new_output(new_manifest_path) } // Write manifest file for added data files and return the ManifestFile for ManifestList. @@ -464,6 +497,7 @@ impl<'a> SnapshotProduceAction<'a> { added_data_files: Vec, ) -> Result { let snapshot_id = self.snapshot_id; + let format_version = self.tx.current_table.metadata().format_version(); let content_type = { let mut data_num = 0; let mut delete_num = 0; @@ -489,7 +523,7 @@ impl<'a> SnapshotProduceAction<'a> { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) .data_file(data_file); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { + if format_version == FormatVersion::V1 { builder.snapshot_id(snapshot_id).build() } else { // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when @@ -502,15 +536,15 @@ impl<'a> SnapshotProduceAction<'a> { self.new_manifest_output()?, Some(self.snapshot_id), self.key_metadata.clone(), - self.tx.table.metadata().current_schema().clone(), + self.tx.current_table.metadata().current_schema().clone(), self.tx - .table + .current_table .metadata() .default_partition_spec() .as_ref() .clone(), ); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { + if self.tx.current_table.metadata().format_version() == FormatVersion::V1 { builder.build_v1() } else { match content_type { @@ -560,7 +594,7 @@ impl<'a> SnapshotProduceAction<'a> { fn generate_manifest_list_file_path(&self, attempt: i64) -> String { format!( "{}/{}/snap-{}-{}-{}.{}", - self.tx.table.metadata().location(), + self.tx.current_table.metadata().location(), META_ROOT_PATH, self.snapshot_id, attempt, @@ -578,28 +612,28 @@ impl<'a> SnapshotProduceAction<'a> { let new_manifests = self .manifest_file(&snapshot_produce_operation, &process) .await?; - let next_seq_num = self.tx.table.metadata().next_sequence_number(); + let next_seq_num = self.tx.current_table.metadata().next_sequence_number(); let summary = self.summary(&snapshot_produce_operation); let manifest_list_path = self.generate_manifest_list_file_path(0); - let mut manifest_list_writer = match self.tx.table.metadata().format_version() { + let mut manifest_list_writer = match self.tx.current_table.metadata().format_version() { FormatVersion::V1 => ManifestListWriter::v1( self.tx - .table + .current_table .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.table.metadata().current_snapshot_id(), + self.tx.current_table.metadata().current_snapshot_id(), ), FormatVersion::V2 => ManifestListWriter::v2( self.tx - .table + .current_table .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.table.metadata().current_snapshot_id(), + self.tx.current_table.metadata().current_snapshot_id(), next_seq_num, ), }; @@ -610,34 +644,36 @@ impl<'a> SnapshotProduceAction<'a> { let new_snapshot = Snapshot::builder() .with_manifest_list(manifest_list_path) .with_snapshot_id(self.snapshot_id) - .with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id()) + .with_parent_snapshot_id(self.tx.current_table.metadata().current_snapshot_id()) .with_sequence_number(next_seq_num) .with_summary(summary) - .with_schema_id(self.tx.table.metadata().current_schema_id()) + .with_schema_id(self.tx.current_table.metadata().current_schema_id()) .with_timestamp_ms(commit_ts) .build(); - self.tx.append_updates(vec![ - TableUpdate::AddSnapshot { - snapshot: new_snapshot, - }, - TableUpdate::SetSnapshotRef { - ref_name: MAIN_BRANCH.to_string(), - reference: SnapshotReference::new( - self.snapshot_id, - SnapshotRetention::branch(None, None, None), - ), - }, - ])?; - self.tx.append_requirements(vec![ - TableRequirement::UuidMatch { - uuid: self.tx.table.metadata().uuid(), - }, - TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRANCH.to_string(), - snapshot_id: self.tx.table.metadata().current_snapshot_id(), - }, - ])?; + self.tx.apply( + vec![ + TableUpdate::AddSnapshot { + snapshot: new_snapshot.clone(), + }, + TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: SnapshotReference::new( + self.snapshot_id, + SnapshotRetention::branch(None, None, None), + ), + }, + ], + vec![ + TableRequirement::UuidMatch { + uuid: self.tx.current_table.metadata().uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: self.tx.current_table.metadata().current_snapshot_id(), + }, + ], + )?; Ok(self.tx) } } @@ -674,15 +710,24 @@ impl<'a> ReplaceSortOrderAction<'a> { let requirements = vec![ TableRequirement::CurrentSchemaIdMatch { - current_schema_id: self.tx.table.metadata().current_schema().schema_id(), + current_schema_id: self + .tx + .current_table + .metadata() + .current_schema() + .schema_id(), }, TableRequirement::DefaultSortOrderIdMatch { - default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id, + default_sort_order_id: self + .tx + .current_table + .metadata() + .default_sort_order() + .order_id, }, ]; - self.tx.append_requirements(requirements)?; - self.tx.append_updates(updates)?; + self.tx.apply(updates, requirements)?; Ok(self.tx) } @@ -694,7 +739,7 @@ impl<'a> ReplaceSortOrderAction<'a> { ) -> Result { let field_id = self .tx - .table + .current_table .metadata() .current_schema() .field_id_by_name(name) @@ -924,14 +969,15 @@ mod tests { assert!( matches!((&tx.updates[0],&tx.updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) ); + // requriments is based on original table metadata assert_eq!( vec![ TableRequirement::UuidMatch { - uuid: tx.table.metadata().uuid() + uuid: table.metadata().uuid() }, TableRequirement::RefSnapshotIdMatch { r#ref: MAIN_BRANCH.to_string(), - snapshot_id: tx.table.metadata().current_snapshot_id + snapshot_id: table.metadata().current_snapshot_id() } ], tx.requirements @@ -973,22 +1019,6 @@ mod tests { assert_eq!(data_file, *manifest.entries()[0].data_file()); } - #[test] - fn test_do_same_update_in_same_transaction() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx - .remove_properties(vec!["a".to_string(), "b".to_string()]) - .unwrap(); - - let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]); - - assert!( - tx.is_err(), - "Should not allow to do same kinds update in same transaction" - ); - } - #[tokio::test] async fn test_add_existing_parquet_files_to_unpartitioned_table() { let mut fixture = TableTestFixture::new_unpartitioned(); @@ -1067,4 +1097,22 @@ mod tests { assert!(manifest_paths.contains(&path)); } } + + #[tokio::test] + async fn test_transaction_apply_upgrade() { + let table = make_v1_table(); + let tx = Transaction::new(&table); + // Upgrade v1 to v1, do nothing. + let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap(); + // Upgrade v1 to v2, success. + let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); + assert_eq!( + vec![TableUpdate::UpgradeFormatVersion { + format_version: FormatVersion::V2 + }], + tx.updates + ); + // Upgrade v2 to v1, return error. + assert!(tx.upgrade_table_version(FormatVersion::V1).is_err()); + } }