Skip to content

Commit e2ca9ec

Browse files
committed
Merge pull request JanKaul#251 from splitgraph/new-dsn-2
AppendSequenceGroups operation
2 parents a6442fc + cf3a12b commit e2ca9ec

File tree

2 files changed

+206
-5
lines changed

2 files changed

+206
-5
lines changed

iceberg-rust/src/table/transaction/mod.rs

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use tracing::debug;
2121
use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
2222

2323
use crate::table::transaction::append::append_summary;
24+
use crate::table::transaction::operation::SequenceGroup;
2425
use crate::{catalog::commit::CommitTable, error::Error, table::Table};
2526

2627
use self::operation::Operation;
@@ -32,12 +33,13 @@ pub(crate) mod overwrite;
3233
pub(crate) static ADD_SCHEMA_INDEX: usize = 0;
3334
pub(crate) static SET_DEFAULT_SPEC_INDEX: usize = 1;
3435
pub(crate) static APPEND_INDEX: usize = 2;
35-
pub(crate) static REPLACE_INDEX: usize = 3;
36-
pub(crate) static OVERWRITE_INDEX: usize = 4;
37-
pub(crate) static UPDATE_PROPERTIES_INDEX: usize = 5;
38-
pub(crate) static SET_SNAPSHOT_REF_INDEX: usize = 6;
36+
pub(crate) static APPEND_SEQUENCE_GROUPS_INDEX: usize = 3;
37+
pub(crate) static REPLACE_INDEX: usize = 4;
38+
pub(crate) static OVERWRITE_INDEX: usize = 5;
39+
pub(crate) static UPDATE_PROPERTIES_INDEX: usize = 6;
40+
pub(crate) static SET_SNAPSHOT_REF_INDEX: usize = 7;
3941

40-
pub(crate) static NUM_OPERATIONS: usize = 7;
42+
pub(crate) static NUM_OPERATIONS: usize = 8;
4143

4244
/// A transaction that can perform multiple operations on a table atomically
4345
///
@@ -118,6 +120,9 @@ impl<'table> TableTransaction<'table> {
118120
/// .await?;
119121
/// ```
120122
pub fn append_data(mut self, files: Vec<DataFile>) -> Self {
123+
if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
124+
panic!("Cannot use append and append_sequence_group in the same transaction");
125+
}
121126
let summary = append_summary(&files);
122127

123128
if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
@@ -157,6 +162,9 @@ impl<'table> TableTransaction<'table> {
157162
/// .await?;
158163
/// ```
159164
pub fn append_delete(mut self, files: Vec<DataFile>) -> Self {
165+
if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
166+
panic!("Cannot use append and append_sequence_group in the same transaction");
167+
}
160168
if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
161169
if let Operation::Append {
162170
delete_files: old, ..
@@ -174,6 +182,39 @@ impl<'table> TableTransaction<'table> {
174182
}
175183
self
176184
}
185+
186+
/// Appends a group of data and delete files to the table
187+
///
188+
pub fn append_sequence_group(
189+
mut self,
190+
data_files: Vec<DataFile>,
191+
delete_files: Vec<DataFile>,
192+
) -> Self {
193+
if self.operations[APPEND_INDEX].is_some() {
194+
panic!("Cannot use append and append_sequence_group in the same transaction");
195+
}
196+
if let Some(ref mut operation) = self.operations[APPEND_SEQUENCE_GROUPS_INDEX] {
197+
if let Operation::AppendSequenceGroups {
198+
sequence_groups: old,
199+
..
200+
} = operation
201+
{
202+
old.push(SequenceGroup {
203+
delete_files,
204+
data_files,
205+
});
206+
}
207+
} else {
208+
self.operations[APPEND_SEQUENCE_GROUPS_INDEX] = Some(Operation::AppendSequenceGroups {
209+
branch: self.branch.clone(),
210+
sequence_groups: vec![SequenceGroup {
211+
delete_files,
212+
data_files,
213+
}],
214+
});
215+
}
216+
self
217+
}
177218
/// Overwrites specific data files in the table with new ones
178219
///
179220
/// This operation replaces specified existing data files with new ones, rather than

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use tracing::{debug, instrument};
2828

2929
use crate::table::manifest::ManifestWriter;
3030
use crate::table::manifest_list::ManifestListWriter;
31+
use crate::table::transaction::append::append_summary;
3132
use crate::{
3233
catalog::commit::{TableRequirement, TableUpdate},
3334
error::Error,
@@ -39,6 +40,15 @@ use super::append::split_datafiles;
3940
/// The target number of datafiles per manifest is dynamic, but we don't want to go below this number.
4041
static MIN_DATAFILES_PER_MANIFEST: usize = 4;
4142

43+
#[derive(Debug, Clone)]
44+
/// Group of writes sharing a Data Sequence Number
45+
pub struct SequenceGroup {
46+
/// Delete files. These apply to insert files from previous Sequence Groups
47+
pub delete_files: Vec<DataFile>,
48+
/// Insert files
49+
pub data_files: Vec<DataFile>,
50+
}
51+
4252
#[derive(Debug)]
4353
///Table operations
4454
pub enum Operation {
@@ -61,6 +71,11 @@ pub enum Operation {
6171
delete_files: Vec<DataFile>,
6272
additional_summary: Option<HashMap<String, String>>,
6373
},
74+
/// Append new sequence groups to the table; each group is written under its own data sequence number
75+
AppendSequenceGroups {
76+
branch: Option<String>,
77+
sequence_groups: Vec<SequenceGroup>,
78+
},
6479
// /// Quickly append new files to the table
6580
// NewFastAppend {
6681
// paths: Vec<String>,
@@ -99,6 +114,151 @@ impl Operation {
99114
object_store: Arc<dyn ObjectStore>,
100115
) -> Result<(Option<TableRequirement>, Vec<TableUpdate>), Error> {
101116
match self {
117+
// Commits all sequence groups as one snapshot. Each group is written with its own
// data sequence number (`last_sequence_number + 1`, `+ 2`, ... in group order) and
// the snapshot takes the sequence number of the last group.
Operation::AppendSequenceGroups {
    branch,
    sequence_groups,
} => {
    let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;

    let manifest_list_schema = match table_metadata.format_version {
        FormatVersion::V1 => manifest_list_schema_v1(),
        FormatVersion::V2 => manifest_list_schema_v2(),
    };

    let n_data_files: usize = sequence_groups.iter().map(|d| d.data_files.len()).sum();
    let n_delete_files: usize = sequence_groups
        .iter()
        .map(|d| d.delete_files.len())
        .sum();
    // Nothing to commit: emit neither a requirement nor any update.
    if n_data_files + n_delete_files == 0 {
        return Ok((None, vec![]));
    }

    // The snapshot summary and the manifest list writer are both given every file
    // of the commit, independent of the group it belongs to.
    let all_files: Vec<DataFile> = sequence_groups
        .iter()
        .flat_map(|d| d.delete_files.iter().chain(d.data_files.iter()))
        .cloned()
        .collect();
    let additional_summary = append_summary(&all_files);

    // Continue from the previous snapshot's manifest list when one can be
    // prefetched, otherwise start a new one.
    let mut manifest_list_writer = if let Some(manifest_list_bytes) =
        prefetch_manifest_list(old_snapshot, &object_store)
    {
        let bytes = manifest_list_bytes.await??;
        ManifestListWriter::from_existing(
            &bytes,
            all_files.iter(),
            manifest_list_schema,
            table_metadata,
            branch.as_deref(),
        )?
    } else {
        ManifestListWriter::new(
            all_files.iter(),
            manifest_list_schema,
            table_metadata,
            branch.as_deref(),
        )?
    };

    let snapshot_id = generate_snapshot_id();

    // Number of groups processed so far; added to the table's last sequence
    // number to obtain the sequence number of the current group.
    let mut dsn_offset: i64 = 0;
    for SequenceGroup {
        data_files,
        delete_files,
    } in sequence_groups.into_iter()
    {
        dsn_offset += 1;
        let sequence_number = table_metadata.last_sequence_number + dsn_offset;
        let n_data_files_in_group = data_files.len();
        let n_delete_files_in_group = delete_files.len();

        let new_datafile_iter = data_files.into_iter().map(|data_file| {
            ManifestEntry::builder()
                .with_format_version(table_metadata.format_version)
                .with_status(Status::Added)
                .with_data_file(data_file)
                .with_sequence_number(sequence_number)
                .build()
                .map_err(Error::from)
        });

        let new_deletefile_iter = delete_files.into_iter().map(|data_file| {
            ManifestEntry::builder()
                .with_format_version(table_metadata.format_version)
                .with_status(Status::Added)
                .with_data_file(data_file)
                .with_sequence_number(sequence_number)
                .build()
                .map_err(Error::from)
        });

        // Write the group's data entries and delete entries separately, skipping
        // whichever kind is empty. `Either` gives the two differently-typed
        // iterators a common type so they can share one loop.
        for (content, files, n_files) in [
            (
                Content::Data,
                Either::Left(new_datafile_iter),
                n_data_files_in_group,
            ),
            (
                Content::Deletes,
                Either::Right(new_deletefile_iter),
                n_delete_files_in_group,
            ),
        ] {
            if n_files != 0 {
                manifest_list_writer
                    .append(files, snapshot_id, object_store.clone(), content)
                    .await?;
            }
        }
    }

    let new_manifest_list_location = manifest_list_writer
        .finish(snapshot_id, object_store)
        .await?;

    // The all-empty case returned early above, so at least one count is non-zero.
    let snapshot_operation = match (n_data_files, n_delete_files) {
        (_, 0) => SnapshotOperation::Append,
        (0, _) => SnapshotOperation::Delete,
        (_, _) => SnapshotOperation::Overwrite,
    };

    let mut snapshot_builder = SnapshotBuilder::default();
    snapshot_builder
        .with_snapshot_id(snapshot_id)
        .with_manifest_list(new_manifest_list_location)
        // After the loop `dsn_offset` equals the number of groups, so the snapshot
        // carries the sequence number of the last group.
        .with_sequence_number(table_metadata.last_sequence_number + dsn_offset)
        .with_summary(Summary {
            operation: snapshot_operation,
            other: additional_summary.unwrap_or_default(),
        })
        .with_schema_id(
            *table_metadata
                .current_schema(branch.as_deref())?
                .schema_id(),
        );
    if let Some(snapshot) = old_snapshot {
        snapshot_builder.with_parent_snapshot_id(*snapshot.snapshot_id());
    }
    let snapshot = snapshot_builder.build()?;

    Ok((
        // Only when a previous snapshot exists: require that the ref still
        // points at it.
        old_snapshot.map(|x| TableRequirement::AssertRefSnapshotId {
            r#ref: branch.clone().unwrap_or_else(|| "main".to_owned()),
            snapshot_id: *x.snapshot_id(),
        }),
        vec![
            TableUpdate::AddSnapshot { snapshot },
            TableUpdate::SetSnapshotRef {
                ref_name: branch.unwrap_or_else(|| "main".to_owned()),
                snapshot_reference: SnapshotReference {
                    snapshot_id,
                    retention: SnapshotRetention::default(),
                },
            },
        ],
    ))
}
102262
Operation::Append {
103263
branch,
104264
data_files,

0 commit comments

Comments
 (0)