Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions iceberg-rust/src/materialized_view/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use crate::{
error::Error,
table::{
delete_all_table_files,
transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX},
transaction::{
operation::{DataFileWithIncrement, Operation as TableOperation},
APPEND_INDEX, REPLACE_INDEX,
},
},
view::transaction::operation::Operation as ViewOperation,
};
Expand Down Expand Up @@ -102,6 +105,13 @@ impl<'view> Transaction<'view> {
refresh_state: RefreshState,
) -> Result<Self, Error> {
let refresh_state = serde_json::to_string(&refresh_state)?;
let files_with_increments: Vec<DataFileWithIncrement> = files
.into_iter()
.map(|f| DataFileWithIncrement {
data_file: f,
dsn_increment: None,
})
.collect();
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
if let TableOperation::Append {
branch: _,
Expand All @@ -110,7 +120,7 @@ impl<'view> Transaction<'view> {
additional_summary: old_lineage,
} = operation
{
old.extend_from_slice(&files);
old.extend_from_slice(&files_with_increments);
*old_lineage = Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state.clone(),
Expand All @@ -119,7 +129,7 @@ impl<'view> Transaction<'view> {
} else {
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
branch: self.branch.clone(),
data_files: files,
data_files: files_with_increments,
delete_files: Vec::new(),
additional_summary: Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
Expand All @@ -137,6 +147,13 @@ impl<'view> Transaction<'view> {
refresh_state: RefreshState,
) -> Result<Self, Error> {
let refresh_state = serde_json::to_string(&refresh_state)?;
let files_with_increments: Vec<DataFileWithIncrement> = files
.into_iter()
.map(|f| DataFileWithIncrement {
data_file: f,
dsn_increment: None,
})
.collect();
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
if let TableOperation::Append {
branch: _,
Expand All @@ -145,7 +162,7 @@ impl<'view> Transaction<'view> {
additional_summary: old_lineage,
} = operation
{
old.extend_from_slice(&files);
old.extend_from_slice(&files_with_increments);
*old_lineage = Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state.clone(),
Expand All @@ -155,7 +172,7 @@ impl<'view> Transaction<'view> {
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
branch: self.branch.clone(),
data_files: Vec::new(),
delete_files: files,
delete_files: files_with_increments,
additional_summary: Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state,
Expand Down
47 changes: 41 additions & 6 deletions iceberg-rust/src/table/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::collections::HashMap;
use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};

use crate::table::transaction::append::append_summary;
use crate::table::transaction::operation::DataFileWithIncrement;
use crate::{catalog::commit::CommitTable, error::Error, table::Table};

use self::operation::Operation;
Expand Down Expand Up @@ -116,20 +117,37 @@ impl<'table> TableTransaction<'table> {
/// .commit()
/// .await?;
/// ```
pub fn append_data(mut self, files: Vec<DataFile>) -> Self {
pub fn append_data(self, files: Vec<DataFile>) -> Self {
// Delegate to the general form, passing `None` for `dsn_increment` so that
// no data-sequence-number offset is requested for these files.
self.append_data_with_dsn_increment(files, None)
}

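/// Appends data files to the table, offsetting their data sequence number by `dsn_increment`.
///
/// When `dsn_increment` is `Some(n)`, the manifest entries written for `files` get the
/// sequence number `last_sequence_number + n`; when it is `None`, no explicit sequence
/// number is set on the entries.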
pub fn append_data_with_dsn_increment(
mut self,
files: Vec<DataFile>,
dsn_increment: Option<u64>,
) -> Self {
let summary = append_summary(&files);
let files_with_increments: Vec<DataFileWithIncrement> = files
.into_iter()
.map(|f| DataFileWithIncrement {
data_file: f,
dsn_increment,
})
.collect();

if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
if let Operation::Append {
data_files: old, ..
} = operation
{
old.extend_from_slice(&files);
old.extend_from_slice(&files_with_increments);
}
} else {
self.operations[APPEND_INDEX] = Some(Operation::Append {
branch: self.branch.clone(),
data_files: files,
data_files: files_with_increments,
delete_files: Vec::new(),
additional_summary: summary,
});
Expand All @@ -155,7 +173,24 @@ impl<'table> TableTransaction<'table> {
/// .commit()
/// .await?;
/// ```
pub fn append_delete(mut self, files: Vec<DataFile>) -> Self {
pub fn append_delete(self, files: Vec<DataFile>) -> Self {
// Delegate to the general form, passing `None` for `dsn_increment` so that
// no data-sequence-number offset is requested for these delete files.
self.append_delete_with_dsn_increment(files, None)
}

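/// Appends delete files to the table, offsetting their data sequence number by `dsn_increment`.
///
/// When `dsn_increment` is `Some(n)`, the manifest entries written for `files` get the
/// sequence number `last_sequence_number + n`; when it is `None`, no explicit sequence
/// number is set on the entries.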
pub fn append_delete_with_dsn_increment(
mut self,
files: Vec<DataFile>,
dsn_increment: Option<u64>,
) -> Self {
let files_with_increments: Vec<DataFileWithIncrement> = files
.into_iter()
.map(|f| DataFileWithIncrement {
data_file: f,
dsn_increment,
})
.collect();
if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
if let Operation::Append {
branch: _,
Expand All @@ -164,13 +199,13 @@ impl<'table> TableTransaction<'table> {
additional_summary: None,
} = operation
{
old.extend_from_slice(&files);
old.extend_from_slice(&files_with_increments);
}
} else {
self.operations[APPEND_INDEX] = Some(Operation::Append {
branch: self.branch.clone(),
data_files: Vec::new(),
delete_files: files,
delete_files: files_with_increments,
additional_summary: None,
});
}
Expand Down
30 changes: 24 additions & 6 deletions iceberg-rust/src/table/transaction/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ use super::append::split_datafiles;
/// The target number of datafiles per manifest is dynamic, but we don't want to go below this number.
static MIN_DATAFILES_PER_MANIFEST: usize = 4;

#[derive(Debug, Clone)]
/// A data file paired with an optional data sequence number (DSN) increment.
///
/// This is the element type of the `data_files` and `delete_files` lists of
/// `Operation::Append`.
pub struct DataFileWithIncrement {
/// The data or delete file to be added to the table.
pub data_file: DataFile,
/// Optional offset that is added to the table's `last_sequence_number` to form
/// the sequence number of this file's manifest entry. When `None`, no explicit
/// sequence number is set on the entry.
pub dsn_increment: Option<u64>,
}

#[derive(Debug)]
///Table operations
pub enum Operation {
Expand All @@ -55,8 +62,8 @@ pub enum Operation {
/// Append new files to the table
Append {
branch: Option<String>,
data_files: Vec<DataFile>,
delete_files: Vec<DataFile>,
data_files: Vec<DataFileWithIncrement>,
delete_files: Vec<DataFileWithIncrement>,
additional_summary: Option<HashMap<String, String>>,
},
// /// Quickly append new files to the table
Expand Down Expand Up @@ -116,7 +123,10 @@ impl Operation {
return Ok((None, Vec::new()));
}

let data_files_iter = delete_files.iter().chain(data_files.iter());
let data_files_iter = delete_files
.iter()
.chain(data_files.iter())
.map(|f| &f.data_file);

let manifest_list_writer = if let Some(manifest_list_bytes) =
prefetch_manifest_list(old_snapshot, &object_store)
Expand Down Expand Up @@ -144,11 +154,19 @@ impl Operation {
delete_files
.into_iter()
.chain(data_files.into_iter())
.map(|data_file| {
ManifestEntry::builder()
.map(|dfi| {
let mut builder = ManifestEntry::builder();
builder
.with_format_version(table_metadata.format_version)
.with_status(Status::Added)
.with_data_file(data_file)
.with_data_file(dfi.data_file);
if let Some(dsn_increment) = dfi.dsn_increment {
builder.with_sequence_number(
table_metadata.last_sequence_number + (dsn_increment as i64),
);
}

builder
.build()
.map_err(crate::spec::error::Error::from)
.map_err(Error::from)
Expand Down