Skip to content

Commit d504fc9

Browse files
committed
try_stream! and inline batch-building for manifests as well
1 parent c7aeced commit d504fc9

File tree

1 file changed

+79
-88
lines changed

1 file changed

+79
-88
lines changed

crates/iceberg/src/inspect/manifests.rs

+79-88
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,11 @@ use arrow_array::builder::{
2222
};
2323
use arrow_array::types::{Int32Type, Int64Type, Int8Type};
2424
use arrow_array::RecordBatch;
25-
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
25+
use arrow_schema::{DataType, Field, Fields, Schema};
26+
use async_stream::try_stream;
2627
use futures::StreamExt;
2728

28-
use crate::io::FileIO;
2929
use crate::scan::ArrowRecordBatchStream;
30-
use crate::spec::TableMetadata;
3130
use crate::table::Table;
3231
use crate::Result;
3332

@@ -83,96 +82,88 @@ impl<'a> ManifestsTable<'a> {
8382
let table_metadata = self.table.metadata_ref();
8483
let file_io = self.table.file_io().clone();
8584

86-
Ok(futures::stream::once(async move {
87-
Self::build_batch(arrow_schema, &table_metadata, &file_io).await
88-
})
89-
.boxed())
90-
}
91-
92-
async fn build_batch(
93-
arrow_schema: SchemaRef,
94-
table_metadata: &TableMetadata,
95-
file_io: &FileIO,
96-
) -> Result<RecordBatch> {
97-
let mut content = PrimitiveBuilder::<Int8Type>::new();
98-
let mut path = StringBuilder::new();
99-
let mut length = PrimitiveBuilder::<Int64Type>::new();
100-
let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
101-
let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
102-
let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new();
103-
let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new();
104-
let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new();
105-
let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
106-
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
107-
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
108-
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
109-
Fields::from(Self::partition_summary_fields()),
110-
0,
111-
))
112-
.with_field(Arc::new(Field::new_struct(
113-
"item",
114-
Self::partition_summary_fields(),
115-
false,
116-
)));
85+
Ok(try_stream! {
86+
let mut content = PrimitiveBuilder::<Int8Type>::new();
87+
let mut path = StringBuilder::new();
88+
let mut length = PrimitiveBuilder::<Int64Type>::new();
89+
let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
90+
let mut added_snapshot_id = PrimitiveBuilder::<Int64Type>::new();
91+
let mut added_data_files_count = PrimitiveBuilder::<Int32Type>::new();
92+
let mut existing_data_files_count = PrimitiveBuilder::<Int32Type>::new();
93+
let mut deleted_data_files_count = PrimitiveBuilder::<Int32Type>::new();
94+
let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
95+
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
96+
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
97+
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
98+
Fields::from(Self::partition_summary_fields()),
99+
0,
100+
))
101+
.with_field(Arc::new(Field::new_struct(
102+
"item",
103+
Self::partition_summary_fields(),
104+
false,
105+
)));
117106

118-
if let Some(snapshot) = table_metadata.current_snapshot() {
119-
let manifest_list = snapshot.load_manifest_list(file_io, table_metadata).await?;
120-
for manifest in manifest_list.entries() {
121-
content.append_value(manifest.content as i8);
122-
path.append_value(manifest.manifest_path.clone());
123-
length.append_value(manifest.manifest_length);
124-
partition_spec_id.append_value(manifest.partition_spec_id);
125-
added_snapshot_id.append_value(manifest.added_snapshot_id);
126-
added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32);
127-
existing_data_files_count
128-
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
129-
deleted_data_files_count
130-
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
131-
added_delete_files_count
132-
.append_value(manifest.added_files_count.unwrap_or(0) as i32);
133-
existing_delete_files_count
134-
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
135-
deleted_delete_files_count
136-
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
107+
if let Some(snapshot) = table_metadata.current_snapshot() {
108+
let manifest_list = snapshot.load_manifest_list(&file_io, &table_metadata).await?;
109+
for manifest in manifest_list.entries() {
110+
content.append_value(manifest.content as i8);
111+
path.append_value(manifest.manifest_path.clone());
112+
length.append_value(manifest.manifest_length);
113+
partition_spec_id.append_value(manifest.partition_spec_id);
114+
added_snapshot_id.append_value(manifest.added_snapshot_id);
115+
added_data_files_count.append_value(manifest.added_files_count.unwrap_or(0) as i32);
116+
existing_data_files_count
117+
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
118+
deleted_data_files_count
119+
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
120+
added_delete_files_count
121+
.append_value(manifest.added_files_count.unwrap_or(0) as i32);
122+
existing_delete_files_count
123+
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
124+
deleted_delete_files_count
125+
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);
137126

138-
let partition_summaries_builder = partition_summaries.values();
139-
for summary in &manifest.partitions {
140-
partition_summaries_builder
141-
.field_builder::<BooleanBuilder>(0)
142-
.unwrap()
143-
.append_value(summary.contains_null);
144-
partition_summaries_builder
145-
.field_builder::<BooleanBuilder>(1)
146-
.unwrap()
147-
.append_option(summary.contains_nan);
148-
partition_summaries_builder
149-
.field_builder::<StringBuilder>(2)
150-
.unwrap()
151-
.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
152-
partition_summaries_builder
153-
.field_builder::<StringBuilder>(3)
154-
.unwrap()
155-
.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
156-
partition_summaries_builder.append(true);
127+
let partition_summaries_builder = partition_summaries.values();
128+
for summary in &manifest.partitions {
129+
partition_summaries_builder
130+
.field_builder::<BooleanBuilder>(0)
131+
.unwrap()
132+
.append_value(summary.contains_null);
133+
partition_summaries_builder
134+
.field_builder::<BooleanBuilder>(1)
135+
.unwrap()
136+
.append_option(summary.contains_nan);
137+
partition_summaries_builder
138+
.field_builder::<StringBuilder>(2)
139+
.unwrap()
140+
.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
141+
partition_summaries_builder
142+
.field_builder::<StringBuilder>(3)
143+
.unwrap()
144+
.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
145+
partition_summaries_builder.append(true);
146+
}
147+
partition_summaries.append(true);
157148
}
158-
partition_summaries.append(true);
159149
}
160-
}
161150

162-
Ok(RecordBatch::try_new(arrow_schema, vec![
163-
Arc::new(content.finish()),
164-
Arc::new(path.finish()),
165-
Arc::new(length.finish()),
166-
Arc::new(partition_spec_id.finish()),
167-
Arc::new(added_snapshot_id.finish()),
168-
Arc::new(added_data_files_count.finish()),
169-
Arc::new(existing_data_files_count.finish()),
170-
Arc::new(deleted_data_files_count.finish()),
171-
Arc::new(added_delete_files_count.finish()),
172-
Arc::new(existing_delete_files_count.finish()),
173-
Arc::new(deleted_delete_files_count.finish()),
174-
Arc::new(partition_summaries.finish()),
175-
])?)
151+
yield RecordBatch::try_new(arrow_schema, vec![
152+
Arc::new(content.finish()),
153+
Arc::new(path.finish()),
154+
Arc::new(length.finish()),
155+
Arc::new(partition_spec_id.finish()),
156+
Arc::new(added_snapshot_id.finish()),
157+
Arc::new(added_data_files_count.finish()),
158+
Arc::new(existing_data_files_count.finish()),
159+
Arc::new(deleted_data_files_count.finish()),
160+
Arc::new(added_delete_files_count.finish()),
161+
Arc::new(existing_delete_files_count.finish()),
162+
Arc::new(deleted_delete_files_count.finish()),
163+
Arc::new(partition_summaries.finish()),
164+
])?;
165+
}
166+
.boxed())
176167
}
177168
}
178169

0 commit comments

Comments
 (0)