Commits (42)
aab78d6
Add REE file column helpers
gbrgr Nov 4, 2025
ee21cab
Add helper tests
gbrgr Nov 4, 2025
37b52e2
Add constants
gbrgr Nov 4, 2025
44463a0
Add support for _file constant
gbrgr Nov 4, 2025
b5449f6
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 4, 2025
e034009
Update tests
gbrgr Nov 4, 2025
4f0a4f1
Fix clippy warning
gbrgr Nov 4, 2025
51f76d3
Fix doc test
gbrgr Nov 4, 2025
d84e16b
Track in field ids
gbrgr Nov 4, 2025
984dacd
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 4, 2025
bd478cb
Add test
gbrgr Nov 4, 2025
8593db0
Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb…
gbrgr Nov 4, 2025
9b186c7
Allow repeated virtual file column selection
gbrgr Nov 4, 2025
30ae5fb
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 5, 2025
adf0da0
Refactor into own transformer step
gbrgr Nov 7, 2025
f4336a8
Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb…
gbrgr Nov 7, 2025
ef3a965
Revert "Refactor into own transformer step"
gbrgr Nov 7, 2025
534490b
Avoid special casing in batch creation
gbrgr Nov 7, 2025
04bf463
.
gbrgr Nov 7, 2025
9e88edf
Modify record batch transformer to support reserved fields
gbrgr Nov 12, 2025
060b45d
Add metadata column helper functions
gbrgr Nov 12, 2025
8572dae
Store fields instead of constants
gbrgr Nov 12, 2025
f273add
Add comment
gbrgr Nov 12, 2025
5aa92ae
Adapt comment
gbrgr Nov 12, 2025
c05b886
.
gbrgr Nov 12, 2025
33bb0ad
Adapt error message
gbrgr Nov 12, 2025
42167ff
Consider field_id range
gbrgr Nov 12, 2025
cbc6b17
Merge remote-tracking branch 'upstream/main' into feature/gb/file-column
gbrgr Nov 13, 2025
977c813
Merge remote-tracking branch 'upstream/main' into feature/gb/file-column
gbrgr Nov 13, 2025
83443aa
Use REE encoding in record batch transformer
gbrgr Nov 14, 2025
35aba12
Fix clippy errors
gbrgr Nov 14, 2025
830e462
Format
gbrgr Nov 14, 2025
4eb8a63
Add `with_file_path_column` helper
gbrgr Nov 14, 2025
9d41b7f
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 17, 2025
0b8f15b
Rename field
gbrgr Nov 17, 2025
7ce462e
Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb…
gbrgr Nov 17, 2025
fca14bd
Rename method
gbrgr Nov 17, 2025
b7da6d3
Undo some changes
gbrgr Nov 17, 2025
7ebdf87
.
gbrgr Nov 17, 2025
edbc72a
Re-refactor tests
gbrgr Nov 17, 2025
4a08ee6
Undo reader test changes
gbrgr Nov 17, 2025
671fd4f
.
gbrgr Nov 17, 2025
314 changes: 308 additions & 6 deletions crates/iceberg/src/arrow/reader.rs
@@ -23,11 +23,14 @@ use std::str::FromStr;
use std::sync::Arc;

use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
-use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar};
use arrow_array::{
Array, ArrayRef, BooleanArray, Datum as ArrowDatum, Int32Array, RecordBatch, RunArray, Scalar,
StringArray,
};
use arrow_cast::cast::cast;
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{
-ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
ArrowError, DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use arrow_string::like::starts_with;
use bytes::Bytes;
@@ -59,6 +62,12 @@ use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

/// Reserved field ID for the file path (_file) column per Iceberg spec
pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646;

/// Column name for the file path metadata column per Iceberg spec
pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file";
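As background (not part of the diff): the Iceberg spec reserves field IDs near the top of the 32-bit range for metadata columns, and 2147483646 is `i32::MAX - 1`. A minimal standalone sketch of how a projected field ID can be recognized as the reserved `_file` column, mirroring the check the reader performs below:

```rust
// Illustrative only; the constant matches the one defined in this diff.
const RESERVED_FIELD_ID_FILE: i32 = 2147483646;

fn main() {
    // The reserved ID sits just below i32::MAX, far above user-assigned field IDs.
    assert_eq!(RESERVED_FIELD_ID_FILE, i32::MAX - 1);

    // A projection containing the reserved ID asks for the virtual _file column.
    let projected_ids = vec![1, 2, RESERVED_FIELD_ID_FILE];
    assert!(projected_ids.iter().any(|&id| id == RESERVED_FIELD_ID_FILE));
}
```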
/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
@@ -219,10 +228,26 @@ impl ArrowReader {
initial_stream_builder
};

// Check if _file column is requested and filter it out for projection
let mut file_column_positions = Vec::new();
let project_field_ids_without_virtual: Vec<i32> = task
.project_field_ids
.iter()
.enumerate()
.filter_map(|(idx, &field_id)| {
if field_id == RESERVED_FIELD_ID_FILE {
file_column_positions.push(idx);
None
} else {
Some(field_id)
}
})
.collect();

// Fallback IDs don't match Parquet's embedded field IDs (since they don't exist),
// so we must use position-based projection instead of field-ID matching
let projection_mask = Self::get_arrow_projection_mask(
-&task.project_field_ids,
&project_field_ids_without_virtual,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
@@ -236,7 +261,7 @@
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
-RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
RecordBatchTransformer::build(task.schema_ref(), &project_field_ids_without_virtual);

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
@@ -368,13 +393,34 @@ impl ArrowReader {
record_batch_stream_builder.with_row_groups(selected_row_group_indices);
}

// Clone data_file_path for use in the closure
let data_file_path = task.data_file_path.clone();

// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let record_batch_stream =
record_batch_stream_builder
.build()?
.map(move |batch| match batch {
-Ok(batch) => record_batch_transformer.process_record_batch(batch),
Ok(batch) => {
let mut processed_batch =
record_batch_transformer.process_record_batch(batch)?;

// Add the _file column at each requested position.
// The virtual columns are inserted back at their original indices so the
// output preserves the requested projection order.
for &position in &file_column_positions {
processed_batch = Self::add_file_path_column_ree_at_position(
processed_batch,
&data_file_path,
RESERVED_COL_NAME_FILE,
RESERVED_FIELD_ID_FILE,
position,
)?;
}

Ok(processed_batch)
}
Err(err) => Err(err.into()),
});
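A self-contained sketch (plain `Vec`s rather than the reader's actual types) of the position bookkeeping used above: reserved virtual columns are stripped before building the Parquet projection, and their constant replacements are inserted back at the recorded indices after each batch is transformed:

```rust
const RESERVED_FIELD_ID_FILE: i32 = 2147483646;

fn main() {
    let projected = vec![1, RESERVED_FIELD_ID_FILE, 3];

    // Pass 1: split physical field IDs from virtual-column positions.
    let mut file_column_positions = Vec::new();
    let physical: Vec<i32> = projected
        .iter()
        .enumerate()
        .filter_map(|(idx, &id)| {
            if id == RESERVED_FIELD_ID_FILE {
                file_column_positions.push(idx);
                None
            } else {
                Some(id)
            }
        })
        .collect();
    assert_eq!(physical, vec![1, 3]);
    assert_eq!(file_column_positions, vec![1]);

    // Pass 2: after the batch is read, insert a constant column at each
    // recorded position so the output matches the requested projection order.
    let mut columns = vec!["col_1".to_string(), "col_3".to_string()];
    for &pos in &file_column_positions {
        columns.insert(pos, "_file".to_string());
    }
    assert_eq!(columns, vec!["col_1", "_file", "col_3"]);
}
```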

@@ -523,6 +569,93 @@ impl ArrowReader {
Ok(results.into())
}

/// Helper function to add a `_file` column to a RecordBatch at a specific position.
/// Takes the array, the field to add, and the position at which to insert them.
fn create_file_field_at_position(
Contributor (review comment): I think this approach is not extensible. I prefer what's similar in this pr:

  1. Add constant_map for ArrowReader
  2. Add another variant of RecordBatchTransformer to handle constant field like _file

Author (reply): I sketched an approach in the record batch transformer; I took the path of the transformer just having a constant_map stored, which can be populated by the reader.

batch: RecordBatch,
file_array: ArrayRef,
file_field: Field,
field_id: i32,
position: usize,
) -> Result<RecordBatch> {
let file_field_with_metadata = Arc::new(file_field.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
field_id.to_string(),
)])));

// Build columns vector in a single pass without insert
let original_columns = batch.columns();
let mut columns = Vec::with_capacity(original_columns.len() + 1);
columns.extend_from_slice(&original_columns[..position]);
columns.push(file_array);
columns.extend_from_slice(&original_columns[position..]);

// Build fields vector in a single pass without insert
let schema = batch.schema();
let original_fields = schema.fields();
let mut fields = Vec::with_capacity(original_fields.len() + 1);
fields.extend(original_fields[..position].iter().cloned());
fields.push(file_field_with_metadata);
fields.extend(original_fields[position..].iter().cloned());

let schema = Arc::new(ArrowSchema::new(fields));
RecordBatch::try_new(schema, columns).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to add _file column to RecordBatch",
)
.with_source(e)
})
}
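A hypothetical sketch of the `constant_map` idea from the review thread above (names are illustrative, not the actual iceberg-rust API): the reader registers a constant value per reserved field ID, and the record batch transformer materializes it whenever a projected field ID has no matching Parquet column:

```rust
use std::collections::HashMap;

// Illustrative types only; the real transformer stores Iceberg fields/values.
struct ConstantColumn {
    name: String,
    value: String,
}

#[derive(Default)]
struct ConstantMap {
    constants: HashMap<i32, ConstantColumn>,
}

impl ConstantMap {
    // Populated by the reader, e.g. with (_file, data file path) per scan task.
    fn insert(&mut self, field_id: i32, name: &str, value: &str) {
        self.constants.insert(field_id, ConstantColumn {
            name: name.to_string(),
            value: value.to_string(),
        });
    }

    // Consulted by the transformer when a projected field ID is not found in the
    // Parquet schema; the value would be emitted as a constant (e.g. REE) array.
    fn get(&self, field_id: i32) -> Option<&ConstantColumn> {
        self.constants.get(&field_id)
    }
}

fn main() {
    let mut constants = ConstantMap::default();
    constants.insert(2147483646, "_file", "s3://bucket/data/part-0.parquet");
    let file_col = constants.get(2147483646).unwrap();
    assert_eq!(file_col.name, "_file");
    assert_eq!(file_col.value, "s3://bucket/data/part-0.parquet");
}
```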

/// Adds a `_file` column to the RecordBatch at a specific position.
/// Uses Run-End Encoding (REE) so the constant file path is stored once per
/// batch rather than once per row.
pub(crate) fn add_file_path_column_ree_at_position(
batch: RecordBatch,
file_path: &str,
field_name: &str,
field_id: i32,
position: usize,
) -> Result<RecordBatch> {
let num_rows = batch.num_rows();

// Use a Run-End Encoded array so the constant path is stored only once
let run_ends = if num_rows == 0 {
Int32Array::from(Vec::<i32>::new())
} else {
Int32Array::from(vec![num_rows as i32])
};
let values = if num_rows == 0 {
StringArray::from(Vec::<&str>::new())
} else {
StringArray::from(vec![file_path])
};

let file_array = RunArray::try_new(&run_ends, &values).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to create RunArray for _file column",
)
.with_source(e)
})?;

let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
let values_field = Arc::new(Field::new("values", DataType::Utf8, true));
let file_field = Field::new(
field_name,
DataType::RunEndEncoded(run_ends_field, values_field),
false,
);

Self::create_file_field_at_position(
batch,
Arc::new(file_array),
file_field,
field_id,
position,
)
}
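A standalone illustration (using the same `arrow_array` APIs as the function above) of why run-end encoding suits a constant per-file column: a single run end and a single value cover every row, so the cost does not grow with batch size:

```rust
use arrow_array::{Array, Int32Array, RunArray, StringArray};

fn main() {
    let num_rows = 1_000_000;

    // One run covering all rows, backed by one string value.
    let run_ends = Int32Array::from(vec![num_rows as i32]);
    let values = StringArray::from(vec!["s3://bucket/data/file.parquet"]);
    let file_col = RunArray::try_new(&run_ends, &values).unwrap();

    // Logically a million rows, physically a single run end and a single value.
    assert_eq!(file_col.len(), num_rows);
    assert_eq!(file_col.run_ends().values().len(), 1);
    assert_eq!(file_col.values().len(), 1);
}
```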

fn build_field_id_set_and_map(
parquet_schema: &SchemaDescriptor,
predicate: &BoundPredicate,
@@ -1626,6 +1759,7 @@ mod tests {
use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use as_any::Downcast;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::arrow::{ArrowWriter, ProjectionMask};
@@ -1639,7 +1773,9 @@

use crate::ErrorKind;
use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
-use crate::arrow::{ArrowReader, ArrowReaderBuilder};
use crate::arrow::{
ArrowReader, ArrowReaderBuilder, RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE,
};
use crate::delete_vector::DeleteVector;
use crate::expr::visitors::bound_predicate_visitor::visit;
use crate::expr::{Bind, Predicate, Reference};
@@ -2438,6 +2574,172 @@ message schema {
assert!(col_b.is_null(2));
}

#[test]
fn test_add_file_path_column_ree() {
use arrow_array::{Array, Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

// Create a simple test batch with 2 columns and 3 rows
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]));

let id_array = Int32Array::from(vec![1, 2, 3]);
let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);

let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(id_array),
Arc::new(name_array),
])
.unwrap();

assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 3);

// Add file path column with REE at the end (position 2)
let file_path = "/path/to/data/file.parquet";
let result = ArrowReader::add_file_path_column_ree_at_position(
batch,
file_path,
RESERVED_COL_NAME_FILE,
RESERVED_FIELD_ID_FILE,
2, // Position at the end after id and name columns
);
assert!(result.is_ok(), "Should successfully add file path column");

let new_batch = result.unwrap();

// Verify the new batch has 3 columns
assert_eq!(new_batch.num_columns(), 3);
assert_eq!(new_batch.num_rows(), 3);

// Verify schema has the _file column
let schema = new_batch.schema();
assert_eq!(schema.fields().len(), 3);

let file_field = schema.field(2);
assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE);
assert!(!file_field.is_nullable());

// Verify the field has the correct metadata
let metadata = file_field.metadata();
assert_eq!(
metadata.get(PARQUET_FIELD_ID_META_KEY),
Some(&RESERVED_FIELD_ID_FILE.to_string())
);

// Verify the data type is RunEndEncoded
match file_field.data_type() {
DataType::RunEndEncoded(run_ends_field, values_field) => {
assert_eq!(run_ends_field.name(), "run_ends");
assert_eq!(run_ends_field.data_type(), &DataType::Int32);
assert!(!run_ends_field.is_nullable());

assert_eq!(values_field.name(), "values");
assert_eq!(values_field.data_type(), &DataType::Utf8);
}
_ => panic!("Expected RunEndEncoded data type for _file column"),
}

// Verify the original columns are intact
let id_col = new_batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(id_col.values(), &[1, 2, 3]);

let name_col = new_batch.column(1).as_string::<i32>();
assert_eq!(name_col.value(0), "Alice");
assert_eq!(name_col.value(1), "Bob");
assert_eq!(name_col.value(2), "Charlie");

// Verify the file path column contains the correct value
// The _file column is a RunArray, so we need to decode it
let file_col = new_batch.column(2);
let run_array = file_col
.as_any()
.downcast_ref::<arrow_array::RunArray<arrow_array::types::Int32Type>>()
.expect("Expected RunArray for _file column");

// Verify the run array structure (should be optimally encoded)
let run_ends = run_array.run_ends();
assert_eq!(run_ends.values().len(), 1, "Should have only 1 run end");
assert_eq!(
run_ends.values()[0],
new_batch.num_rows() as i32,
"Run end should equal number of rows"
);

// Check that the single value in the RunArray is the expected file path
let values = run_array.values();
let string_values = values.as_string::<i32>();
assert_eq!(string_values.len(), 1, "Should have only 1 value");
assert_eq!(string_values.value(0), file_path);

assert!(
string_values
.downcast_ref::<StringArray>()
.unwrap()
.iter()
.all(|v| v == Some(file_path))
)
}

#[test]
fn test_add_file_path_column_ree_empty_batch() {
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

// Create an empty batch
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

let id_array = arrow_array::Int32Array::from(Vec::<i32>::new());
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap();

assert_eq!(batch.num_rows(), 0);

// Add file path column to empty batch with REE at position 1 (after id column)
let file_path = "/empty/file.parquet";
let result = ArrowReader::add_file_path_column_ree_at_position(
batch,
file_path,
RESERVED_COL_NAME_FILE,
RESERVED_FIELD_ID_FILE,
1, // Position 1, after the id column
);

// Should succeed with empty RunArray for empty batches
assert!(result.is_ok());
let new_batch = result.unwrap();
assert_eq!(new_batch.num_rows(), 0);
assert_eq!(new_batch.num_columns(), 2);

// Verify the _file column exists with correct schema
let schema = new_batch.schema();
let file_field = schema.field(1);
assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE);

// Should use RunEndEncoded even for empty batches
match file_field.data_type() {
DataType::RunEndEncoded(run_ends_field, values_field) => {
assert_eq!(run_ends_field.data_type(), &DataType::Int32);
assert_eq!(values_field.data_type(), &DataType::Utf8);
}
_ => panic!("Expected RunEndEncoded data type for _file column"),
}

// Verify metadata with reserved field ID
assert_eq!(
file_field.metadata().get(PARQUET_FIELD_ID_META_KEY),
Some(&RESERVED_FIELD_ID_FILE.to_string())
);

// Verify the file path column is empty but properly structured
let file_path_column = new_batch.column(1);
assert_eq!(file_path_column.len(), 0);
}

/// Test for bug where position deletes in later row groups are not applied correctly.
///
/// When a file has multiple row groups and a position delete targets a row in a later