Skip to content

Commit 20cb183

Browse files
committed
feat: add removal of deleted data files for manifest writing
1 parent 50c01b4 commit 20cb183

File tree

1 file changed

+23
-11
lines changed

1 file changed

+23
-11
lines changed

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,10 @@ use uuid::Uuid;
2424
use crate::error::Result;
2525
use crate::io::OutputFile;
2626
use crate::spec::{
27-
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
28-
ManifestFile, ManifestListWriter, ManifestWriterBuilder, Operation,
29-
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
30-
Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType,
31-
Summary, update_snapshot_summaries,
27+
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestEntry, ManifestFile,
28+
ManifestListWriter, ManifestWriterBuilder, Operation, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
29+
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention,
30+
SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries,
3231
};
3332
use crate::transaction::Transaction;
3433
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
@@ -247,17 +246,30 @@ impl<'a> SnapshotProduceAction<'a> {
247246
let mut existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
248247

249248
if !self.added_data_files.is_empty() {
250-
let added_data_files = std::mem::take(&mut self.added_data_files);
251-
let added_manifest = self.write_added_manifest(added_data_files).await?;
249+
let added_manifest = self.write_added_manifest().await?;
252250
existing_manifests.push(added_manifest);
253251
}
254252

255253
if !self.added_delete_files.is_empty() {
256-
let added_delete_files = std::mem::take(&mut self.added_delete_files);
257-
let added_manifest = self.write_added_manifest(added_delete_files).await?;
258-
existing_manifests.push(added_manifest);
254+
for manifest in existing_manifests.clone() {
255+
let manifest_entry = manifest
256+
.load_manifest(self.tx.current_table.file_io())
257+
.await?;
258+
for entry in manifest_entry.entries() {
259+
// HACK: this will be quite slow.
260+
let idx = if let Some(idx) = self
261+
.added_delete_files
262+
.iter()
263+
.position(|d| d.file_path == entry.data_file.file_path)
264+
{
265+
idx
266+
} else {
267+
continue;
268+
};
269+
existing_manifests.swap_remove(idx);
270+
}
271+
}
259272
}
260-
261273
let manifest_files = manifest_process.process_manifests(existing_manifests);
262274
Ok(manifest_files)
263275
}

0 commit comments

Comments
 (0)