Commit 79b5650

fix: Return error on reader task (#498)
1 parent ab4f69a commit 79b5650
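
This commit changes the reader's task loop. Previously the loop was written as `while let Some(Ok(task)) = tasks.next().await`, so an `Err` arriving on the task stream simply ended the loop and the error was silently dropped. The loop now matches on each task result explicitly and propagates failures into the output record-batch stream via `Err(e)?`.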

File tree

1 file changed: +45 −37 lines changed


crates/iceberg/src/arrow/reader.rs

Lines changed: 45 additions & 37 deletions
@@ -93,43 +93,51 @@ impl ArrowReader {
         let file_io = self.file_io.clone();
 
         Ok(try_stream! {
-            while let Some(Ok(task)) = tasks.next().await {
-                // Collect Parquet column indices from field ids
-                let mut collector = CollectFieldIdVisitor {
-                    field_ids: HashSet::default(),
-                };
-                if let Some(predicates) = task.predicate() {
-                    visit(&mut collector, predicates)?;
-                }
-
-                let parquet_file = file_io
-                    .new_input(task.data_file_path())?;
-                let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
-                let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-                let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
-                    .await?;
-
-                let parquet_schema = batch_stream_builder.parquet_schema();
-                let arrow_schema = batch_stream_builder.schema();
-                let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(), task.schema(), parquet_schema, arrow_schema)?;
-                batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
-
-                let parquet_schema = batch_stream_builder.parquet_schema();
-                let row_filter = self.get_row_filter(task.predicate(), parquet_schema, &collector)?;
-
-                if let Some(row_filter) = row_filter {
-                    batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-                }
-
-                if let Some(batch_size) = self.batch_size {
-                    batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-                }
-
-                let mut batch_stream = batch_stream_builder.build()?;
-
-                while let Some(batch) = batch_stream.next().await {
-                    yield batch?;
+            while let Some(task_result) = tasks.next().await {
+                match task_result {
+                    Ok(task) => {
+                        // Collect Parquet column indices from field ids
+                        let mut collector = CollectFieldIdVisitor {
+                            field_ids: HashSet::default(),
+                        };
+                        if let Some(predicates) = task.predicate() {
+                            visit(&mut collector, predicates)?;
+                        }
+
+                        let parquet_file = file_io
+                            .new_input(task.data_file_path())?;
+
+                        let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
+                        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+                        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
+                            .await?;
+
+                        let parquet_schema = batch_stream_builder.parquet_schema();
+                        let arrow_schema = batch_stream_builder.schema();
+                        let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(), task.schema(), parquet_schema, arrow_schema)?;
+                        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+
+                        let parquet_schema = batch_stream_builder.parquet_schema();
+                        let row_filter = self.get_row_filter(task.predicate(), parquet_schema, &collector)?;
+
+                        if let Some(row_filter) = row_filter {
+                            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
+                        }
+
+                        if let Some(batch_size) = self.batch_size {
+                            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+                        }
+
+                        let mut batch_stream = batch_stream_builder.build()?;
+
+                        while let Some(batch) = batch_stream.next().await {
+                            yield batch?;
+                        }
+                    }
+                    Err(e) => {
+                        Err(e)?
+                    }
                 }
             }
         }
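
For context, here is a minimal, self-contained sketch of why the two loop shapes behave differently. This is standalone Rust, not the iceberg-rust API: the task stream is stubbed as a plain iterator, and the task names and error type are made up for illustration.

    fn main() {
        // A stream of task results, stubbed as a plain iterator.
        let tasks: Vec<Result<&str, String>> = vec![
            Ok("task-1"),
            Err("scan failed".to_string()),
            Ok("task-3"),
        ];

        // Old shape: `while let Some(Ok(..))` stops at the first Err and
        // discards it -- the consumer sees a short stream, not a failure.
        let mut iter = tasks.clone().into_iter();
        while let Some(Ok(task)) = iter.next() {
            println!("old loop processed {task}");
        }

        // New shape: match every item, so the Err is surfaced instead of
        // silently ending the loop. Inside try_stream! this is what
        // `Err(e)?` does: it yields the error to the stream's consumer.
        for result in tasks {
            match result {
                Ok(task) => println!("new loop processed {task}"),
                Err(e) => {
                    eprintln!("new loop surfaces the error: {e}");
                    break; // in a fallible context, return or propagate here
                }
            }
        }
    }

With the old shape the program prints only "task-1" and exits cleanly; with the new shape the error is reported to the caller instead of vanishing.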
