Skip to content

Commit e287264

Browse files
authored
fix(delete): fix position delete (#8)
* fix position * fix comm * fix comm
1 parent 8c486d6 commit e287264

File tree

1 file changed

+17
-10
lines changed

1 file changed

+17
-10
lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::expr::{BoundPredicate, BoundReference};
4848
use crate::io::{FileIO, FileMetadata, FileRead};
4949
use crate::runtime::spawn;
5050
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
51-
use crate::spec::{Datum, Schema};
51+
use crate::spec::{DataContentType, Datum, Schema};
5252
use crate::utils::available_parallelism;
5353
use crate::{Error, ErrorKind};
5454

@@ -210,12 +210,6 @@ impl ArrowReader {
210210
)?;
211211
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);
212212

213-
// RecordBatchTransformer performs any required transformations on the RecordBatches
214-
// that come back from the file, such as type promotion, default column insertion
215-
// and column re-ordering
216-
let mut record_batch_transformer =
217-
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
218-
219213
if let Some(batch_size) = batch_size {
220214
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
221215
}
@@ -269,9 +263,22 @@ impl ArrowReader {
269263
// to the requester.
270264
let mut record_batch_stream = record_batch_stream_builder.build()?;
271265

272-
while let Some(batch) = record_batch_stream.try_next().await? {
273-
tx.send(record_batch_transformer.process_record_batch(batch))
274-
.await?
266+
// The schema of a position delete file doesn't change, so we don't need to convert the schema.
267+
if matches!(task.data_file_content, DataContentType::PositionDeletes) {
268+
while let Some(batch) = record_batch_stream.try_next().await? {
269+
tx.send(Ok(batch)).await?
270+
}
271+
} else {
272+
// RecordBatchTransformer performs any required transformations on the RecordBatches
273+
// that come back from the file, such as type promotion, default column insertion
274+
// and column re-ordering.
275+
let mut record_batch_transformer =
276+
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
277+
278+
while let Some(batch) = record_batch_stream.try_next().await? {
279+
tx.send(record_batch_transformer.process_record_batch(batch))
280+
.await?
281+
}
275282
}
276283

277284
Ok(())

0 commit comments

Comments
 (0)