Skip to content

Commit 5b8593f

Browse files
Jan Kaul (JanKaul)
authored and committed
track number of written manifests
1 parent 8079cca commit 5b8593f

File tree

1 file changed

+55
-51
lines changed

1 file changed

+55
-51
lines changed

iceberg-rust/src/table/manifest_list.rs

Lines changed: 55 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -250,6 +250,7 @@ pub(crate) struct ManifestListWriter<'schema, 'metadata> {
250250
bounding_partition_values: Rectangle,
251251
n_existing_files: usize,
252252
commit_uuid: String,
253+
manifest_count: usize,
253254
branch: Option<String>,
254255
}
255256

@@ -312,6 +313,7 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
312313
bounding_partition_values,
313314
n_existing_files: 0,
314315
commit_uuid,
316+
manifest_count: 0,
315317
branch: branch.map(ToOwned::to_owned),
316318
})
317319
}
@@ -403,6 +405,7 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
403405
bounding_partition_values,
404406
n_existing_files: file_count_all_entries,
405407
commit_uuid,
408+
manifest_count: 0,
406409
branch: branch.map(ToOwned::to_owned),
407410
})
408411
}
@@ -506,6 +509,7 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
506509
bounding_partition_values,
507510
n_existing_files: file_count_all_entries,
508511
commit_uuid,
512+
manifest_count: 0,
509513
branch: branch.map(ToOwned::to_owned),
510514
},
511515
manifests,
@@ -702,8 +706,7 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
702706
{
703707
let manifest_bytes = manifest_bytes.await??;
704708

705-
manifest.manifest_path =
706-
new_manifest_location(&self.table_metadata.location, &self.commit_uuid, 0);
709+
manifest.manifest_path = self.next_manifest_location();
707710

708711
let manifest_reader = ManifestReader::new(manifest_bytes.as_ref())?;
709712

@@ -725,8 +728,7 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
725728
)?
726729
}
727730
} else {
728-
let manifest_location =
729-
new_manifest_location(&self.table_metadata.location, &self.commit_uuid, 0);
731+
let manifest_location = self.next_manifest_location();
730732

731733
ManifestWriter::new(
732734
&manifest_location,
@@ -956,10 +958,8 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
956958

957959
let manifest_futures = splits
958960
.into_iter()
959-
.enumerate()
960-
.map(|(i, entries)| {
961-
let manifest_location =
962-
new_manifest_location(&self.table_metadata.location, &self.commit_uuid, i);
961+
.map(|entries| {
962+
let manifest_location = self.next_manifest_location();
963963

964964
let mut manifest_writer = ManifestWriter::new(
965965
&manifest_location,
@@ -1078,7 +1078,6 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
10781078
data_files_to_filter: &HashMap<String, Vec<String>>,
10791079
object_store: Arc<dyn ObjectStore>,
10801080
) -> Result<(), Error> {
1081-
let table_metadata = &self.table_metadata;
10821081
let partition_fields = self
10831082
.table_metadata
10841083
.current_partition_fields(self.branch.as_deref())?;
@@ -1088,48 +1087,43 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
10881087
&self.table_metadata.format_version,
10891088
)?);
10901089

1091-
let futures = manifests_to_overwrite
1092-
.into_iter()
1093-
.enumerate()
1094-
.map(|(i, mut manifest)| {
1095-
let object_store = object_store.clone();
1096-
let location = self.table_metadata.location.clone();
1097-
let commit_uuid = self.commit_uuid.clone();
1098-
let manifest_schema = manifest_schema.clone();
1099-
let branch = self.branch.clone();
1100-
async move {
1101-
let data_files_to_filter: HashSet<String> = data_files_to_filter
1102-
.get(&manifest.manifest_path)
1103-
.ok_or(Error::NotFound("Datafiles for manifest".to_owned()))?
1104-
.iter()
1105-
.map(ToOwned::to_owned)
1106-
.collect();
1107-
1108-
let bytes = object_store
1109-
.clone()
1110-
.get(&strip_prefix(&manifest.manifest_path).into())
1111-
.await?
1112-
.bytes()
1113-
.await?;
1114-
1115-
let manifest_location = new_manifest_location(&location, &commit_uuid, i);
1116-
1117-
manifest.manifest_path = manifest_location;
1118-
1119-
let manifest_writer = ManifestWriter::from_existing_with_filter(
1120-
&bytes,
1121-
manifest,
1122-
&data_files_to_filter,
1123-
&manifest_schema,
1124-
table_metadata,
1125-
branch.as_deref(),
1126-
)?;
1127-
1128-
let new_manifest = manifest_writer.finish(object_store.clone()).await?;
1129-
1130-
Ok::<_, Error>(new_manifest)
1131-
}
1132-
});
1090+
let futures = manifests_to_overwrite.into_iter().map(|mut manifest| {
1091+
let object_store = object_store.clone();
1092+
let manifest_schema = manifest_schema.clone();
1093+
let branch = self.branch.clone();
1094+
let manifest_location = self.next_manifest_location();
1095+
let table_metadata = self.table_metadata;
1096+
async move {
1097+
let data_files_to_filter: HashSet<String> = data_files_to_filter
1098+
.get(&manifest.manifest_path)
1099+
.ok_or(Error::NotFound("Datafiles for manifest".to_owned()))?
1100+
.iter()
1101+
.map(ToOwned::to_owned)
1102+
.collect();
1103+
1104+
let bytes = object_store
1105+
.clone()
1106+
.get(&strip_prefix(&manifest.manifest_path).into())
1107+
.await?
1108+
.bytes()
1109+
.await?;
1110+
1111+
manifest.manifest_path = manifest_location;
1112+
1113+
let manifest_writer = ManifestWriter::from_existing_with_filter(
1114+
&bytes,
1115+
manifest,
1116+
&data_files_to_filter,
1117+
&manifest_schema,
1118+
table_metadata,
1119+
branch.as_deref(),
1120+
)?;
1121+
1122+
let new_manifest = manifest_writer.finish(object_store.clone()).await?;
1123+
1124+
Ok::<_, Error>(new_manifest)
1125+
}
1126+
});
11331127
for manifest_res in join_all(futures).await {
11341128
let manifest = manifest_res?;
11351129
self.writer.append_ser(manifest)?;
@@ -1140,4 +1134,14 @@ impl<'schema, 'metadata> ManifestListWriter<'schema, 'metadata> {
11401134
pub(crate) fn selected_manifest(&self) -> Option<&ManifestListEntry> {
11411135
self.selected_data_manifest.as_ref()
11421136
}
1137+
1138+
/// Get the next manifest location, tracking and numbering preceding manifests written by this
1139+
/// writer.
1140+
fn next_manifest_location(&mut self) -> String {
1141+
let next_id = self.manifest_count;
1142+
1143+
self.manifest_count += 1;
1144+
1145+
new_manifest_location(&self.table_metadata.location, &self.commit_uuid, next_id)
1146+
}
11431147
}

0 commit comments

Comments
 (0)