diff --git a/iceberg-rust/src/materialized_view/transaction/mod.rs b/iceberg-rust/src/materialized_view/transaction/mod.rs index 9b8893d9..51acd03d 100644 --- a/iceberg-rust/src/materialized_view/transaction/mod.rs +++ b/iceberg-rust/src/materialized_view/transaction/mod.rs @@ -14,7 +14,10 @@ use crate::{ error::Error, table::{ delete_all_table_files, - transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX}, + transaction::{ + operation::{DataFileWithIncrement, Operation as TableOperation}, + APPEND_INDEX, REPLACE_INDEX, + }, }, view::transaction::operation::Operation as ViewOperation, }; @@ -102,6 +105,13 @@ impl<'view> Transaction<'view> { refresh_state: RefreshState, ) -> Result { let refresh_state = serde_json::to_string(&refresh_state)?; + let files_with_increments: Vec = files + .into_iter() + .map(|f| DataFileWithIncrement { + data_file: f, + dsn_increment: None, + }) + .collect(); if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] { if let TableOperation::Append { branch: _, @@ -110,7 +120,7 @@ impl<'view> Transaction<'view> { additional_summary: old_lineage, } = operation { - old.extend_from_slice(&files); + old.extend_from_slice(&files_with_increments); *old_lineage = Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state.clone(), @@ -119,7 +129,7 @@ impl<'view> Transaction<'view> { } else { self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append { branch: self.branch.clone(), - data_files: files, + data_files: files_with_increments, delete_files: Vec::new(), additional_summary: Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), @@ -137,6 +147,13 @@ impl<'view> Transaction<'view> { refresh_state: RefreshState, ) -> Result { let refresh_state = serde_json::to_string(&refresh_state)?; + let files_with_increments: Vec = files + .into_iter() + .map(|f| DataFileWithIncrement { + data_file: f, + dsn_increment: None, + }) + .collect(); if let Some(ref mut operation) = 
self.storage_table_operations[APPEND_INDEX] { if let TableOperation::Append { branch: _, @@ -145,7 +162,7 @@ impl<'view> Transaction<'view> { additional_summary: old_lineage, } = operation { - old.extend_from_slice(&files); + old.extend_from_slice(&files_with_increments); *old_lineage = Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state.clone(), @@ -155,7 +172,7 @@ impl<'view> Transaction<'view> { self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append { branch: self.branch.clone(), data_files: Vec::new(), - delete_files: files, + delete_files: files_with_increments, additional_summary: Some(HashMap::from_iter(vec![( REFRESH_STATE.to_owned(), refresh_state, diff --git a/iceberg-rust/src/table/transaction/mod.rs b/iceberg-rust/src/table/transaction/mod.rs index 66654b0a..035b2b99 100644 --- a/iceberg-rust/src/table/transaction/mod.rs +++ b/iceberg-rust/src/table/transaction/mod.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference}; use crate::table::transaction::append::append_summary; +use crate::table::transaction::operation::DataFileWithIncrement; use crate::{catalog::commit::CommitTable, error::Error, table::Table}; use self::operation::Operation; @@ -116,20 +117,37 @@ impl<'table> TableTransaction<'table> { /// .commit() /// .await?; /// ``` - pub fn append_data(mut self, files: Vec) -> Self { + pub fn append_data(self, files: Vec) -> Self { + self.append_data_with_dsn_increment(files, None) + } + + /// Appends data files to the table, increasing the Data Sequence Number by a given amount + /// `dsn_increment` is stored alongside every file; `append_data` delegates here with `None`. + pub fn append_data_with_dsn_increment( + mut self, + files: Vec, + dsn_increment: Option, + ) -> Self { let summary = append_summary(&files); + let files_with_increments: Vec = files + .into_iter() + .map(|f| DataFileWithIncrement { + data_file: f, + dsn_increment, + }) + .collect(); if let Some(ref mut operation) = 
self.operations[APPEND_INDEX] { if let Operation::Append { data_files: old, .. } = operation { - old.extend_from_slice(&files); + old.extend_from_slice(&files_with_increments); } } else { self.operations[APPEND_INDEX] = Some(Operation::Append { branch: self.branch.clone(), - data_files: files, + data_files: files_with_increments, delete_files: Vec::new(), additional_summary: summary, }); @@ -155,7 +173,24 @@ impl<'table> TableTransaction<'table> { /// .commit() /// .await?; /// ``` - pub fn append_delete(mut self, files: Vec) -> Self { + pub fn append_delete(self, files: Vec) -> Self { + self.append_delete_with_dsn_increment(files, None) + } + + /// Appends delete files to the table, increasing the Data Sequence Number by a given amount + /// `dsn_increment` is stored alongside every file; `append_delete` delegates here with `None`. + pub fn append_delete_with_dsn_increment( + mut self, + files: Vec, + dsn_increment: Option, + ) -> Self { + let files_with_increments: Vec = files + .into_iter() + .map(|f| DataFileWithIncrement { + data_file: f, + dsn_increment, + }) + .collect(); if let Some(ref mut operation) = self.operations[APPEND_INDEX] { if let Operation::Append { branch: _, @@ -164,13 +199,13 @@ impl<'table> TableTransaction<'table> { additional_summary: None, } = operation { - old.extend_from_slice(&files); + old.extend_from_slice(&files_with_increments); } } else { self.operations[APPEND_INDEX] = Some(Operation::Append { branch: self.branch.clone(), data_files: Vec::new(), - delete_files: files, + delete_files: files_with_increments, additional_summary: None, }); } diff --git a/iceberg-rust/src/table/transaction/operation.rs b/iceberg-rust/src/table/transaction/operation.rs index 5f045dba..062b43bf 100644 --- a/iceberg-rust/src/table/transaction/operation.rs +++ b/iceberg-rust/src/table/transaction/operation.rs @@ -37,6 +37,13 @@ use super::append::split_datafiles; /// The target number of datafiles per manifest is dynamic, but we don't want to go below this number. 
static MIN_DATAFILES_PER_MANIFEST: usize = 4; +#[derive(Debug, Clone)] +/// A data file paired with an optional data sequence number (DSN) increment +pub struct DataFileWithIncrement { + pub data_file: DataFile, + pub dsn_increment: Option, +} + #[derive(Debug)] ///Table operations pub enum Operation { @@ -55,8 +62,8 @@ pub enum Operation { /// Append new files to the table Append { branch: Option, - data_files: Vec, - delete_files: Vec, + data_files: Vec, + delete_files: Vec, additional_summary: Option>, }, // /// Quickly append new files to the table @@ -116,7 +123,10 @@ impl Operation { return Ok((None, Vec::new())); } - let data_files_iter = delete_files.iter().chain(data_files.iter()); + let data_files_iter = delete_files + .iter() + .chain(data_files.iter()) + .map(|f| &f.data_file); let manifest_list_writer = if let Some(manifest_list_bytes) = prefetch_manifest_list(old_snapshot, &object_store) @@ -144,11 +154,19 @@ impl Operation { delete_files .into_iter() .chain(data_files.into_iter()) - .map(|data_file| { - ManifestEntry::builder() + .map(|dfi| { + let mut builder = ManifestEntry::builder(); + builder .with_format_version(table_metadata.format_version) .with_status(Status::Added) - .with_data_file(data_file) + .with_data_file(dfi.data_file); + if let Some(dsn_increment) = dfi.dsn_increment { + builder.with_sequence_number( + table_metadata.last_sequence_number + (dsn_increment as i64), + ); + } + + builder .build() .map_err(crate::spec::error::Error::from) .map_err(Error::from)