Skip to content

Commit 1c885ab

Browse files
authored
Merge pull request JanKaul#255 from JanKaul/feat-snapshot-column-bounds
compute snapshot column bounds
2 parents 6659e0d + 3e061c8 commit 1c885ab

File tree

3 files changed

+103
-3
lines changed

3 files changed

+103
-3
lines changed

iceberg-rust-spec/src/spec/types.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::{collections::HashMap, fmt, ops::Index, slice::Iter};
1919

2020
use derive_builder::Builder;
2121

22+
use itertools::Itertools;
2223
use serde::{
2324
de::{self, Error as SerdeError, IntoDeserializer, MapAccess, Visitor},
2425
Deserialize, Deserializer, Serialize, Serializer,
@@ -360,6 +361,29 @@ impl StructType {
360361
pub fn iter(&self) -> Iter<'_, StructField> {
361362
self.fields.iter()
362363
}
364+
365+
/// Returns an iterator over all field IDs in this struct, sorted in ascending order
366+
///
367+
/// # Returns
368+
/// * An iterator yielding field IDs (i32) in sorted order
369+
pub fn field_ids(&self) -> impl Iterator<Item = i32> {
370+
self.lookup.keys().map(ToOwned::to_owned).sorted()
371+
}
372+
373+
/// Returns an iterator over field IDs of primitive-type fields only, sorted in ascending order
374+
///
375+
/// This method filters the struct's fields to return only those with primitive types
376+
/// (boolean, numeric, string, etc.), excluding complex types like structs, lists, and maps.
377+
///
378+
/// # Returns
379+
/// * An iterator yielding field IDs (i32) of primitive fields in sorted order
380+
pub fn primitive_field_ids(&self) -> impl Iterator<Item = i32> {
381+
self.lookup
382+
.iter()
383+
.filter(|(_, x)| matches!(self.fields[**x].field_type, Type::Primitive(_)))
384+
.map(|x| x.0.to_owned())
385+
.sorted()
386+
}
363387
}
364388

365389
impl Index<usize> for StructType {

iceberg-rust/src/table/manifest_list.rs

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::{
1212
use apache_avro::{
1313
types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema, Writer as AvroWriter,
1414
};
15-
use futures::future::join_all;
15+
use futures::{future::join_all, TryStreamExt};
1616
use iceberg_rust_spec::{
1717
manifest::{partition_value_schema, DataFile, ManifestEntry, Status},
1818
manifest_list::{
@@ -28,7 +28,8 @@ use smallvec::SmallVec;
2828

2929
use crate::{
3030
error::Error,
31-
util::{summary_to_rectangle, Rectangle},
31+
table::datafiles,
32+
util::{summary_to_rectangle, Rectangle, Vec4},
3233
};
3334

3435
use super::{
@@ -213,6 +214,81 @@ pub async fn snapshot_partition_bounds(
213214
})
214215
}
215216

217+
/// Computes the column bounds (minimum and maximum values) for all primitive fields
218+
/// across all data files in a snapshot.
219+
///
220+
/// This function reads all manifests in the snapshot, extracts data files from them,
221+
/// and computes a bounding rectangle that encompasses the lower and upper bounds
222+
/// of all primitive columns across all data files.
223+
///
224+
/// # Arguments
225+
///
226+
/// * `snapshot` - The snapshot to compute column bounds for
227+
/// * `table_metadata` - Metadata of the table containing schema information
228+
/// * `object_store` - Object store implementation for reading manifest files
229+
///
230+
/// # Returns
231+
///
232+
/// Returns `Ok(Some(Rectangle))` containing the computed bounds, or `Ok(None)` if
233+
/// no data files are found. Returns an error if:
234+
/// - Schema cannot be resolved for the snapshot
235+
/// - Manifest files cannot be read
236+
/// - Column bounds are missing for any primitive field in any data file
237+
///
238+
/// # Errors
239+
///
240+
/// * `Error::NotFound` - When column bounds are missing for a primitive field
241+
/// * Other I/O errors from reading manifest or data files
242+
pub async fn snapshot_column_bounds(
243+
snapshot: &Snapshot,
244+
table_metadata: &TableMetadata,
245+
object_store: Arc<dyn ObjectStore>,
246+
) -> Result<Option<Rectangle>, Error> {
247+
let schema = table_metadata
248+
.schema(*snapshot.snapshot_id())
249+
.or(table_metadata.current_schema(None))?;
250+
let manifests = read_snapshot(snapshot, table_metadata, object_store.clone())
251+
.await?
252+
.collect::<Result<Vec<_>, _>>()?;
253+
let datafiles = datafiles(object_store, &manifests, None, (None, None)).await?;
254+
255+
let primitive_field_ids = schema.primitive_field_ids().collect::<Vec<_>>();
256+
let n = primitive_field_ids.len();
257+
datafiles
258+
.try_fold(None::<Rectangle>, |acc, (_, manifest)| {
259+
let primitive_field_ids = &primitive_field_ids;
260+
async move {
261+
let mut mins = Vec4::with_capacity(n);
262+
let mut maxs = Vec4::with_capacity(n);
263+
for (i, id) in primitive_field_ids.iter().enumerate() {
264+
let min = manifest
265+
.data_file()
266+
.lower_bounds()
267+
.as_ref()
268+
.and_then(|x| x.get(id));
269+
let max = manifest
270+
.data_file()
271+
.upper_bounds()
272+
.as_ref()
273+
.and_then(|x| x.get(id));
274+
let (Some(min), Some(max)) = (min, max) else {
275+
return Err(Error::NotFound("column bounds".to_string()));
276+
};
277+
mins[i] = min.clone();
278+
maxs[i] = max.clone();
279+
}
280+
let rect = Rectangle::new(mins, maxs);
281+
if let Some(mut acc) = acc {
282+
acc.expand(&rect);
283+
Ok(Some(acc))
284+
} else {
285+
Ok(Some(rect))
286+
}
287+
}
288+
})
289+
.await
290+
}
291+
216292
/// A writer for Iceberg manifest list files that manages the creation and updating of manifest lists.
217293
///
218294
/// The ManifestListWriter is responsible for:

iceberg-rust/src/util/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use smallvec::SmallVec;
88

99
use crate::error::Error;
1010

11-
type Vec4<T> = SmallVec<[T; 4]>;
11+
pub(crate) type Vec4<T> = SmallVec<[T; 4]>;
1212

1313
#[derive(Debug, Clone, PartialEq, Eq)]
1414
pub struct Rectangle {

0 commit comments

Comments
 (0)