Skip to content

Commit fcc8892

Browse files
authored
feat: Add summary functionality to SnapshotProduceAction (#1139)
## Which issue does this PR close? - Closes #724. ## What changes are included in this PR? Added summary functionality to the snapshot produce action.
1 parent ab5497b commit fcc8892

File tree

4 files changed

+78
-15
lines changed

4 files changed

+78
-15
lines changed

crates/iceberg/src/spec/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub use manifest_list::*;
4141
pub use partition::*;
4242
pub use schema::*;
4343
pub use snapshot::*;
44+
pub use snapshot_summary::*;
4445
pub use sort::*;
4546
pub use statistic_file::*;
4647
pub use table_metadata::*;

crates/iceberg/src/spec/snapshot_summary.rs

+12-5
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,10 @@ const TOTAL_FILE_SIZE: &str = "total-files-size";
4848
const CHANGED_PARTITION_COUNT_PROP: &str = "changed-partition-count";
4949
const CHANGED_PARTITION_PREFIX: &str = "partitions.";
5050

51+
/// `SnapshotSummaryCollector` collects and aggregates snapshot update metrics.
52+
/// It gathers metrics about added or removed data files and manifests, and tracks
53+
/// partition-specific updates.
5154
#[derive(Default)]
52-
#[allow(dead_code)]
5355
pub struct SnapshotSummaryCollector {
5456
metrics: UpdateMetrics,
5557
partition_metrics: HashMap<String, UpdateMetrics>,
@@ -58,17 +60,19 @@ pub struct SnapshotSummaryCollector {
5860
trust_partition_metrics: bool,
5961
}
6062

61-
#[allow(dead_code)]
6263
impl SnapshotSummaryCollector {
63-
// Set properties
64+
/// Sets a custom property on the snapshot summary.
///
/// `key` and `value` are copied into owned `String`s and inserted into the
/// collector's `properties`.
6465
pub fn set(&mut self, key: &str, value: &str) {
6566
self.properties.insert(key.to_string(), value.to_string());
6667
}
6768

69+
/// Sets the limit for including partition summaries. Summaries are not
70+
/// included if the number of partitions is exceeded.
///
/// `limit` is stored as-is in `max_changed_partitions_for_summaries`; no
/// validation is performed here.
6871
pub fn set_partition_summary_limit(&mut self, limit: u64) {
6972
self.max_changed_partitions_for_summaries = limit;
7073
}
7174

75+
/// Adds a data file to the summary collector
7276
pub fn add_file(
7377
&mut self,
7478
data_file: &DataFile,
@@ -81,6 +85,7 @@ impl SnapshotSummaryCollector {
8185
}
8286
}
8387

88+
/// Removes a data file from the summary collector
8489
pub fn remove_file(
8590
&mut self,
8691
data_file: &DataFile,
@@ -93,12 +98,14 @@ impl SnapshotSummaryCollector {
9398
}
9499
}
95100

101+
/// Adds a manifest to the summary collector.
///
/// Besides folding `manifest` into the overall metrics, this marks the
/// partition metrics as untrusted and discards any collected so far.
/// NOTE(review): presumably because a manifest does not carry a per-partition
/// breakdown of its files — confirm.
96102
pub fn add_manifest(&mut self, manifest: &ManifestFile) {
97103
// Per-partition numbers can no longer be relied on after this call.
self.trust_partition_metrics = false;
98104
// Drop whatever per-partition metrics were gathered before.
self.partition_metrics.clear();
99105
// The manifest still contributes to the snapshot-wide metrics.
self.metrics.add_manifest(manifest);
100106
}
101107

108+
/// Updates partition-specific metrics for a data file.
102109
pub fn update_partition_metrics(
103110
&mut self,
104111
schema: SchemaRef,
@@ -116,6 +123,7 @@ impl SnapshotSummaryCollector {
116123
}
117124
}
118125

126+
/// Merges another `SnapshotSummaryCollector` into the current one
119127
pub fn merge(&mut self, summary: SnapshotSummaryCollector) {
120128
self.metrics.merge(&summary.metrics);
121129
self.properties.extend(summary.properties);
@@ -133,6 +141,7 @@ impl SnapshotSummaryCollector {
133141
}
134142
}
135143

144+
/// Builds final map of summaries
136145
pub fn build(&self) -> HashMap<String, String> {
137146
let mut properties = self.metrics.to_map();
138147
let changed_partitions_count = self.partition_metrics.len() as u64;
@@ -507,8 +516,6 @@ fn update_totals(
507516
.insert(total_property.to_string(), new_total.to_string());
508517
}
509518

510-
// TODO: ancestors of function
511-
512519
#[cfg(test)]
513520
mod tests {
514521
use std::collections::HashMap;

crates/iceberg/src/spec/table_metadata.rs

+5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previo
7777
/// Default value for max number of previous versions to keep.
7878
pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
7979

80+
/// Table property key for the maximum number of partitions to keep summary
/// stats for.
81+
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit";
82+
/// Default value for [`PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT`], used when the
/// table property is not set.
/// NOTE(review): a default of `0` presumably means no per-partition summaries
/// are written — confirm against the collector's `build` logic.
83+
pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
84+
8085
/// Reserved Iceberg table properties list.
8186
///
8287
/// Reserved table properties are only used to control behaviors when creating or updating a

crates/iceberg/src/transaction/snapshot.rs

+60-10
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ use uuid::Uuid;
2424
use crate::error::Result;
2525
use crate::io::OutputFile;
2626
use crate::spec::{
27-
DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
28-
ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention, Struct,
29-
StructType, Summary, MAIN_BRANCH,
27+
update_snapshot_summaries, DataFile, DataFileFormat, FormatVersion, ManifestEntry,
28+
ManifestFile, ManifestListWriter, ManifestWriterBuilder, Operation, Snapshot,
29+
SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary,
30+
MAIN_BRANCH, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
31+
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
3032
};
3133
use crate::transaction::Transaction;
3234
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
@@ -221,13 +223,55 @@ impl<'a> SnapshotProduceAction<'a> {
221223
Ok(manifest_files)
222224
}
223225

224-
// # TODO
225-
// Fulfill this function
226-
fn summary<OP: SnapshotProduceOperation>(&self, snapshot_produce_operation: &OP) -> Summary {
227-
Summary {
228-
operation: snapshot_produce_operation.operation(),
229-
additional_properties: self.snapshot_properties.clone(),
226+
// Returns a `Summary` of the current snapshot
227+
fn summary<OP: SnapshotProduceOperation>(
228+
&self,
229+
snapshot_produce_operation: &OP,
230+
) -> Result<Summary> {
231+
let mut summary_collector = SnapshotSummaryCollector::default();
232+
let table_metadata = self.tx.table.metadata_ref();
233+
234+
let partition_summary_limit = if let Some(limit) = table_metadata
235+
.properties()
236+
.get(PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
237+
{
238+
if let Ok(limit) = limit.parse::<u64>() {
239+
limit
240+
} else {
241+
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
242+
}
243+
} else {
244+
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
245+
};
246+
247+
summary_collector.set_partition_summary_limit(partition_summary_limit);
248+
249+
for data_file in &self.added_data_files {
250+
summary_collector.add_file(
251+
data_file,
252+
table_metadata.current_schema().clone(),
253+
table_metadata.default_partition_spec().clone(),
254+
);
230255
}
256+
257+
let previous_snapshot = table_metadata
258+
.snapshot_by_id(self.snapshot_id)
259+
.and_then(|snapshot| snapshot.parent_snapshot_id())
260+
.and_then(|parent_id| table_metadata.snapshot_by_id(parent_id));
261+
262+
let mut additional_properties = summary_collector.build();
263+
additional_properties.extend(self.snapshot_properties.clone());
264+
265+
let summary = Summary {
266+
operation: snapshot_produce_operation.operation(),
267+
additional_properties,
268+
};
269+
270+
update_snapshot_summaries(
271+
summary,
272+
previous_snapshot.map(|s| s.summary()),
273+
snapshot_produce_operation.operation() == Operation::Overwrite,
274+
)
231275
}
232276

233277
fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
@@ -253,7 +297,13 @@ impl<'a> SnapshotProduceAction<'a> {
253297
.await?;
254298
let next_seq_num = self.tx.table.metadata().next_sequence_number();
255299

256-
let summary = self.summary(&snapshot_produce_operation);
300+
let summary = self
301+
.summary(&snapshot_produce_operation)
302+
.map_err(|err| {
303+
Error::new(ErrorKind::Unexpected, "Failed to create snapshot summary.")
304+
.with_source(err)
305+
})
306+
.unwrap();
257307

258308
let manifest_list_path = self.generate_manifest_list_file_path(0);
259309

0 commit comments

Comments
 (0)