Skip to content

Commit 9c6514d

Browse files
committed
Metadata table scans as streams
1 parent 1c632b8 commit 9c6514d

File tree

1 file changed

+60
-30
lines changed

1 file changed

+60
-30
lines changed

crates/iceberg/src/metadata_scan.rs

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ use arrow_array::builder::{
2424
};
2525
use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondType};
2626
use arrow_array::RecordBatch;
27-
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
27+
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
28+
use futures::StreamExt;
2829

30+
use crate::io::FileIO;
31+
use crate::scan::ArrowRecordBatchStream;
32+
use crate::spec::TableMetadata;
2933
use crate::table::Table;
3034
use crate::Result;
3135

@@ -95,7 +99,17 @@ impl<'a> SnapshotsTable<'a> {
9599
}
96100

97101
/// Scans the snapshots table.
98-
pub fn scan(&self) -> Result<RecordBatch> {
102+
/// Returns the snapshots metadata table as a stream of Arrow record batches.
///
/// The stream is created with `futures::stream::once`, so it yields exactly
/// one item: the `Result<RecordBatch>` produced by `Self::build_batch`.
///
/// # Errors
///
/// The outer `Result` is always `Ok` in this implementation; a failure while
/// building the batch is delivered as the stream's single item instead.
pub fn scan(&self) -> Result<ArrowRecordBatchStream> {
103+
let arrow_schema = Arc::new(self.schema());
104+
let table_metadata = self.table.metadata_ref();
105+
106+
Ok(
107+
// `async move` transfers `arrow_schema` and `table_metadata` into the future,
// so the batch is built from these captured values once the stream is polled.
futures::stream::once(async move { Self::build_batch(arrow_schema, &table_metadata) })
108+
.boxed(),
109+
)
110+
}
111+
112+
fn build_batch(arrow_schema: SchemaRef, table_metadata: &TableMetadata) -> Result<RecordBatch> {
99113
let mut committed_at =
100114
PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
101115
let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
@@ -104,7 +118,7 @@ impl<'a> SnapshotsTable<'a> {
104118
let mut manifest_list = StringBuilder::new();
105119
let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
106120

107-
for snapshot in self.table.metadata().snapshots() {
121+
for snapshot in table_metadata.snapshots() {
108122
committed_at.append_value(snapshot.timestamp_ms());
109123
snapshot_id.append_value(snapshot.snapshot_id());
110124
parent_id.append_option(snapshot.parent_snapshot_id());
@@ -117,7 +131,7 @@ impl<'a> SnapshotsTable<'a> {
117131
summary.append(true)?;
118132
}
119133

120-
Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
134+
Ok(RecordBatch::try_new(arrow_schema, vec![
121135
Arc::new(committed_at.finish()),
122136
Arc::new(snapshot_id.finish()),
123137
Arc::new(parent_id.finish()),
@@ -134,7 +148,7 @@ pub struct ManifestsTable<'a> {
134148
}
135149

136150
impl<'a> ManifestsTable<'a> {
137-
fn partition_summary_fields(&self) -> Vec<Field> {
151+
fn partition_summary_fields() -> Vec<Field> {
138152
vec![
139153
Field::new("contains_null", DataType::Boolean, false),
140154
Field::new("contains_nan", DataType::Boolean, true),
@@ -161,7 +175,7 @@ impl<'a> ManifestsTable<'a> {
161175
"partition_summaries",
162176
DataType::List(Arc::new(Field::new_struct(
163177
"item",
164-
self.partition_summary_fields(),
178+
Self::partition_summary_fields(),
165179
false,
166180
))),
167181
false,
@@ -170,7 +184,22 @@ impl<'a> ManifestsTable<'a> {
170184
}
171185

172186
/// Scans the manifests table.
173-
pub async fn scan(&self) -> Result<RecordBatch> {
187+
/// Returns the manifests metadata table as a stream of Arrow record batches.
///
/// The stream is created with `futures::stream::once`, so it yields exactly
/// one item: the `Result<RecordBatch>` obtained by awaiting
/// `Self::build_batch`.
///
/// # Errors
///
/// The outer `Result` is always `Ok` in this implementation; a failure while
/// building the batch is delivered as the stream's single item instead.
pub fn scan(&self) -> Result<ArrowRecordBatchStream> {
188+
let arrow_schema = Arc::new(self.schema());
189+
let table_metadata = self.table.metadata_ref();
190+
// Cloned so the `async move` block below can own its own `FileIO` value.
let file_io = self.table.file_io().clone();
191+
192+
Ok(futures::stream::once(async move {
193+
// Runs only once the stream is polled; the schema, metadata and `FileIO`
// used here are the values captured by the `async move` block.
Self::build_batch(arrow_schema, &table_metadata, &file_io).await
194+
})
195+
.boxed())
196+
}
197+
198+
async fn build_batch(
199+
arrow_schema: SchemaRef,
200+
table_metadata: &TableMetadata,
201+
file_io: &FileIO,
202+
) -> Result<RecordBatch> {
174203
let mut content = PrimitiveBuilder::<Int8Type>::new();
175204
let mut path = StringBuilder::new();
176205
let mut length = PrimitiveBuilder::<Int64Type>::new();
@@ -183,19 +212,17 @@ impl<'a> ManifestsTable<'a> {
183212
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
184213
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
185214
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
186-
Fields::from(self.partition_summary_fields()),
215+
Fields::from(Self::partition_summary_fields()),
187216
0,
188217
))
189218
.with_field(Arc::new(Field::new_struct(
190219
"item",
191-
self.partition_summary_fields(),
220+
Self::partition_summary_fields(),
192221
false,
193222
)));
194223

195-
if let Some(snapshot) = self.table.metadata().current_snapshot() {
196-
let manifest_list = snapshot
197-
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
198-
.await?;
224+
if let Some(snapshot) = table_metadata.current_snapshot() {
225+
let manifest_list = snapshot.load_manifest_list(file_io, table_metadata).await?;
199226
for manifest in manifest_list.entries() {
200227
content.append_value(manifest.content as i8);
201228
path.append_value(manifest.manifest_path.clone());
@@ -238,7 +265,7 @@ impl<'a> ManifestsTable<'a> {
238265
}
239266
}
240267

241-
Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
268+
Ok(RecordBatch::try_new(arrow_schema, vec![
242269
Arc::new(content.finish()),
243270
Arc::new(path.finish()),
244271
Arc::new(length.finish()),
@@ -257,7 +284,9 @@ impl<'a> ManifestsTable<'a> {
257284

258285
#[cfg(test)]
259286
mod tests {
287+
use arrow_select::concat::concat_batches;
260288
use expect_test::{expect, Expect};
289+
use futures::TryStreamExt;
261290
use itertools::Itertools;
262291

263292
use super::*;
@@ -271,13 +300,20 @@ mod tests {
271300
/// Check the doc of [`expect_test`] for more details.
272301
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
273302
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
274-
fn check_record_batch(
275-
record_batch: RecordBatch,
303+
async fn check_record_batches(
304+
batch_stream: ArrowRecordBatchStream,
276305
expected_schema: Expect,
277306
expected_data: Expect,
278307
ignore_check_columns: &[&str],
279308
sort_column: Option<&str>,
280309
) {
310+
let record_batches = batch_stream.try_collect::<Vec<_>>().await.unwrap();
311+
assert!(!record_batches.is_empty(), "Empty record batches");
312+
313+
// Combine record batches using the first batch's schema
314+
let first_batch = record_batches.first().unwrap();
315+
let record_batch = concat_batches(&first_batch.schema(), record_batches).unwrap();
316+
281317
let mut columns = record_batch.columns().to_vec();
282318
if let Some(sort_column) = sort_column {
283319
let column = record_batch.column_by_name(sort_column).unwrap();
@@ -310,12 +346,12 @@ mod tests {
310346
));
311347
}
312348

313-
#[test]
314-
fn test_snapshots_table() {
349+
#[tokio::test]
350+
async fn test_snapshots_table() {
315351
let table = TableTestFixture::new().table;
316-
let record_batch = table.metadata_table().snapshots().scan().unwrap();
317-
check_record_batch(
318-
record_batch,
352+
let batch_stream = table.metadata_table().snapshots().scan().unwrap();
353+
check_record_batches(
354+
batch_stream,
319355
expect![[r#"
320356
Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
321357
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
@@ -386,16 +422,10 @@ mod tests {
386422
let mut fixture = TableTestFixture::new();
387423
fixture.setup_manifest_files().await;
388424

389-
let record_batch = fixture
390-
.table
391-
.metadata_table()
392-
.manifests()
393-
.scan()
394-
.await
395-
.unwrap();
425+
let batch_stream = fixture.table.metadata_table().manifests().scan().unwrap();
396426

397-
check_record_batch(
398-
record_batch,
427+
check_record_batches(
428+
batch_stream,
399429
expect![[r#"
400430
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
401431
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },

0 commit comments

Comments
 (0)