Skip to content

Commit 841f9b9

Browse files
Multiple data sequence numbers per commit
1 parent 2e2cf62 commit 841f9b9

File tree

3 files changed

+119
-42
lines changed

3 files changed

+119
-42
lines changed

iceberg-rust/src/materialized_view/transaction/mod.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ use crate::{
1414
error::Error,
1515
table::{
1616
delete_all_table_files,
17-
transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX},
17+
transaction::{
18+
operation::{DsnGroup, Operation as TableOperation},
19+
APPEND_INDEX, REPLACE_INDEX,
20+
},
1821
},
1922
view::transaction::operation::Operation as ViewOperation,
2023
};
@@ -105,12 +108,17 @@ impl<'view> Transaction<'view> {
105108
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
106109
if let TableOperation::Append {
107110
branch: _,
108-
data_files: old,
109-
delete_files: _,
111+
dsn_groups,
110112
additional_summary: old_lineage,
111113
} = operation
112114
{
113-
old.extend_from_slice(&files);
115+
match dsn_groups.last_mut() {
116+
Some(g) => g.data_files.extend_from_slice(&files),
117+
None => dsn_groups.push(DsnGroup {
118+
data_files: files,
119+
delete_files: vec![],
120+
}),
121+
};
114122
*old_lineage = Some(HashMap::from_iter(vec![(
115123
REFRESH_STATE.to_owned(),
116124
refresh_state.clone(),
@@ -119,8 +127,10 @@ impl<'view> Transaction<'view> {
119127
} else {
120128
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
121129
branch: self.branch.clone(),
122-
data_files: files,
123-
delete_files: Vec::new(),
130+
dsn_groups: vec![DsnGroup {
131+
data_files: files,
132+
delete_files: vec![],
133+
}],
124134
additional_summary: Some(HashMap::from_iter(vec![(
125135
REFRESH_STATE.to_owned(),
126136
refresh_state,
@@ -140,12 +150,17 @@ impl<'view> Transaction<'view> {
140150
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
141151
if let TableOperation::Append {
142152
branch: _,
143-
data_files: _,
144-
delete_files: old,
153+
dsn_groups,
145154
additional_summary: old_lineage,
146155
} = operation
147156
{
148-
old.extend_from_slice(&files);
157+
match dsn_groups.last_mut() {
158+
Some(g) => g.delete_files.extend_from_slice(&files),
159+
None => dsn_groups.push(DsnGroup {
160+
data_files: vec![],
161+
delete_files: files,
162+
}),
163+
};
149164
*old_lineage = Some(HashMap::from_iter(vec![(
150165
REFRESH_STATE.to_owned(),
151166
refresh_state.clone(),
@@ -154,8 +169,10 @@ impl<'view> Transaction<'view> {
154169
} else {
155170
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
156171
branch: self.branch.clone(),
157-
data_files: Vec::new(),
158-
delete_files: files,
172+
dsn_groups: vec![DsnGroup {
173+
data_files: vec![],
174+
delete_files: files,
175+
}],
159176
additional_summary: Some(HashMap::from_iter(vec![(
160177
REFRESH_STATE.to_owned(),
161178
refresh_state,

iceberg-rust/src/table/transaction/mod.rs

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::collections::HashMap;
2020
use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
2121

2222
use crate::table::transaction::append::append_summary;
23+
pub use crate::table::transaction::operation::DsnGroup;
2324
use crate::{catalog::commit::CommitTable, error::Error, table::Table};
2425

2526
use self::operation::Operation;
@@ -120,17 +121,22 @@ impl<'table> TableTransaction<'table> {
120121
let summary = append_summary(&files);
121122

122123
if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
123-
if let Operation::Append {
124-
data_files: old, ..
125-
} = operation
126-
{
127-
old.extend_from_slice(&files);
124+
if let Operation::Append { dsn_groups, .. } = operation {
125+
match dsn_groups.last_mut() {
126+
Some(g) => g.data_files.extend_from_slice(&files),
127+
None => dsn_groups.push(DsnGroup {
128+
data_files: files,
129+
delete_files: vec![],
130+
}),
131+
};
128132
}
129133
} else {
130134
self.operations[APPEND_INDEX] = Some(Operation::Append {
131135
branch: self.branch.clone(),
132-
data_files: files,
133-
delete_files: Vec::new(),
136+
dsn_groups: vec![DsnGroup {
137+
data_files: files,
138+
delete_files: vec![],
139+
}],
134140
additional_summary: summary,
135141
});
136142
}
@@ -159,23 +165,45 @@ impl<'table> TableTransaction<'table> {
159165
if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
160166
if let Operation::Append {
161167
branch: _,
162-
data_files: _,
163-
delete_files: old,
168+
dsn_groups,
164169
additional_summary: None,
165170
} = operation
166171
{
167-
old.extend_from_slice(&files);
172+
match dsn_groups.last_mut() {
173+
Some(g) => g.delete_files.extend_from_slice(&files),
174+
None => dsn_groups.push(DsnGroup {
175+
data_files: vec![],
176+
delete_files: files,
177+
}),
178+
};
168179
}
169180
} else {
170181
self.operations[APPEND_INDEX] = Some(Operation::Append {
171182
branch: self.branch.clone(),
172-
data_files: Vec::new(),
173-
delete_files: files,
183+
dsn_groups: vec![DsnGroup {
184+
data_files: vec![],
185+
delete_files: files,
186+
}],
174187
additional_summary: None,
175188
});
176189
}
177190
self
178191
}
192+
/// Create a new data sequence number for subsequent appends
193+
pub fn new_data_sequence_number(mut self) -> Self {
194+
if let Some(Operation::Append {
195+
branch: _,
196+
ref mut dsn_groups,
197+
additional_summary: None,
198+
}) = self.operations[APPEND_INDEX]
199+
{
200+
dsn_groups.push(DsnGroup {
201+
data_files: vec![],
202+
delete_files: vec![],
203+
});
204+
}
205+
self
206+
}
179207
/// Overwrites specific data files in the table with new ones
180208
///
181209
/// This operation replaces specified existing data files with new ones, rather than

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ use super::append::split_datafiles;
3737
/// The target number of datafiles per manifest is dynamic, but we don't want to go below this number.
3838
static MIN_DATAFILES_PER_MANIFEST: usize = 4;
3939

40+
#[derive(Debug, Clone)]
41+
/// A group of writes that share a single data sequence number (DSN)
42+
pub struct DsnGroup {
43+
/// Delete files. These apply to data files inserted by previous data sequence number groups
44+
pub delete_files: Vec<DataFile>,
45+
/// Insert files
46+
pub data_files: Vec<DataFile>,
47+
}
48+
4049
#[derive(Debug)]
4150
///Table operations
4251
pub enum Operation {
@@ -55,8 +64,7 @@ pub enum Operation {
5564
/// Append new files to the table
5665
Append {
5766
branch: Option<String>,
58-
data_files: Vec<DataFile>,
59-
delete_files: Vec<DataFile>,
67+
dsn_groups: Vec<DsnGroup>,
6068
additional_summary: Option<HashMap<String, String>>,
6169
},
6270
// /// Quickly append new files to the table
@@ -98,8 +106,7 @@ impl Operation {
98106
match self {
99107
Operation::Append {
100108
branch,
101-
data_files,
102-
delete_files,
109+
dsn_groups,
103110
additional_summary,
104111
} => {
105112
let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;
@@ -109,14 +116,30 @@ impl Operation {
109116
FormatVersion::V2 => manifest_list_schema_v2(),
110117
};
111118

119+
let mut dsn_offset = 0;
120+
let mut data_files: Vec<(DataFile, i64 /* DSN offset */)> = vec![];
121+
let mut delete_files: Vec<(DataFile, i64 /* DSN offset */)> = vec![];
122+
for dsn_group in dsn_groups.into_iter() {
123+
if !dsn_group.data_files.is_empty() || !dsn_group.delete_files.is_empty() {
124+
dsn_offset += 1;
125+
for data_file in dsn_group.data_files.into_iter() {
126+
data_files.push((data_file, dsn_offset));
127+
}
128+
for delete_file in dsn_group.delete_files.into_iter() {
129+
delete_files.push((delete_file, dsn_offset));
130+
}
131+
}
132+
}
133+
let multiple_data_sequence_numbers = dsn_offset > 1;
134+
112135
let n_data_files = data_files.len();
113136
let n_delete_files = delete_files.len();
114137

115138
if n_data_files + n_delete_files == 0 {
116139
return Ok((None, Vec::new()));
117140
}
118141

119-
let data_files_iter = delete_files.iter().chain(data_files.iter());
142+
let data_files_iter = delete_files.iter().chain(data_files.iter()).map(|(x, _)| x);
120143

121144
let manifest_list_writer = if let Some(manifest_list_bytes) =
122145
prefetch_manifest_list(old_snapshot, &object_store)
@@ -140,19 +163,26 @@ impl Operation {
140163

141164
let n_splits = manifest_list_writer.n_splits(n_data_files + n_delete_files);
142165

143-
let new_datafile_iter =
144-
delete_files
145-
.into_iter()
146-
.chain(data_files.into_iter())
147-
.map(|data_file| {
148-
ManifestEntry::builder()
149-
.with_format_version(table_metadata.format_version)
150-
.with_status(Status::Added)
151-
.with_data_file(data_file)
152-
.build()
153-
.map_err(crate::spec::error::Error::from)
154-
.map_err(Error::from)
155-
});
166+
let new_datafile_iter = delete_files.into_iter().chain(data_files.into_iter()).map(
167+
|(data_file, dsn_offset)| {
168+
let mut builder = ManifestEntry::builder();
169+
builder
170+
.with_format_version(table_metadata.format_version)
171+
.with_status(Status::Added)
172+
.with_data_file(data_file);
173+
// If there is only one data sequence number in this commit, we can just use sequence number inheritance
174+
// If there are multiple data sequence numbers in this commit, we need to set the data sequence number explicitly on each manifest entry
175+
if multiple_data_sequence_numbers {
176+
builder.with_sequence_number(
177+
table_metadata.last_sequence_number + dsn_offset,
178+
);
179+
}
180+
builder
181+
.build()
182+
.map_err(crate::spec::error::Error::from)
183+
.map_err(Error::from)
184+
},
185+
);
156186

157187
let snapshot_id = generate_snapshot_id();
158188

@@ -180,11 +210,13 @@ impl Operation {
180210
(_, _) => Ok(SnapshotOperation::Overwrite),
181211
}?;
182212

213+
let snapshot_sequence_number = table_metadata.last_sequence_number
214+
+ if dsn_offset >= 1 { dsn_offset } else { 1 };
183215
let mut snapshot_builder = SnapshotBuilder::default();
184216
snapshot_builder
185217
.with_snapshot_id(snapshot_id)
186218
.with_manifest_list(new_manifest_list_location)
187-
.with_sequence_number(table_metadata.last_sequence_number + 1)
219+
.with_sequence_number(snapshot_sequence_number)
188220
.with_summary(Summary {
189221
operation: snapshot_operation,
190222
other: additional_summary.unwrap_or_default(),

0 commit comments

Comments
 (0)