Skip to content

Commit 62541fc

Browse files
ZENOTMEZENOTME
and
ZENOTME
authored
feat: Add equality delete writer (#703)
* copy from #372 * fix test and refine ArrowFieldProjector * add comment and rename to make code more clear * refine code * refine RecordBatchProjector * refine error * refine function name --------- Co-authored-by: ZENOTME <[email protected]>
1 parent 564f5a3 commit 62541fc

File tree

4 files changed

+796
-0
lines changed

4 files changed

+796
-0
lines changed

Diff for: crates/iceberg/src/arrow/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
mod schema;
2121
pub use schema::*;
2222
mod reader;
23+
pub(crate) mod record_batch_projector;
2324
pub(crate) mod record_batch_transformer;
2425

2526
pub use reader::*;

Diff for: crates/iceberg/src/arrow/record_batch_projector.rs

+291
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use arrow_array::{ArrayRef, RecordBatch, StructArray};
21+
use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
22+
23+
use crate::error::Result;
24+
use crate::{Error, ErrorKind};
25+
26+
/// Help to project specific field from `RecordBatch`` according to the fields id.
27+
#[derive(Clone)]
28+
pub(crate) struct RecordBatchProjector {
29+
// A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure.
30+
// E.g. [[0], [1, 2]] means the first field is accessed directly from the first column,
31+
// while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column).
32+
field_indices: Vec<Vec<usize>>,
33+
// The schema reference after projection. This schema is derived from the original schema based on the given field IDs.
34+
projected_schema: SchemaRef,
35+
}
36+
37+
impl RecordBatchProjector {
38+
/// Init ArrowFieldProjector
39+
///
40+
/// This function will iterate through the field and fetch the field from the original schema according to the field ids.
41+
/// The function to fetch the field id from the field is provided by `field_id_fetch_func`, return None if the field need to be skipped.
42+
/// This function will iterate through the nested fields if the field is a struct, `searchable_field_func` can be used to control whether
43+
/// iterate into the nested fields.
44+
pub(crate) fn new<F1, F2>(
45+
original_schema: SchemaRef,
46+
field_ids: &[i32],
47+
field_id_fetch_func: F1,
48+
searchable_field_func: F2,
49+
) -> Result<Self>
50+
where
51+
F1: Fn(&Field) -> Result<Option<i64>>,
52+
F2: Fn(&Field) -> bool,
53+
{
54+
let mut field_indices = Vec::with_capacity(field_ids.len());
55+
let mut fields = Vec::with_capacity(field_ids.len());
56+
for &id in field_ids {
57+
let mut field_index = vec![];
58+
let field = Self::fetch_field_index(
59+
original_schema.fields(),
60+
&mut field_index,
61+
id as i64,
62+
&field_id_fetch_func,
63+
&searchable_field_func,
64+
)?
65+
.ok_or_else(|| {
66+
Error::new(ErrorKind::Unexpected, "Field not found")
67+
.with_context("field_id", id.to_string())
68+
})?;
69+
fields.push(field.clone());
70+
field_indices.push(field_index);
71+
}
72+
let delete_arrow_schema = Arc::new(Schema::new(fields));
73+
Ok(Self {
74+
field_indices,
75+
projected_schema: delete_arrow_schema,
76+
})
77+
}
78+
79+
fn fetch_field_index<F1, F2>(
80+
fields: &Fields,
81+
index_vec: &mut Vec<usize>,
82+
target_field_id: i64,
83+
field_id_fetch_func: &F1,
84+
searchable_field_func: &F2,
85+
) -> Result<Option<FieldRef>>
86+
where
87+
F1: Fn(&Field) -> Result<Option<i64>>,
88+
F2: Fn(&Field) -> bool,
89+
{
90+
for (pos, field) in fields.iter().enumerate() {
91+
let id = field_id_fetch_func(field)?;
92+
if let Some(id) = id {
93+
if target_field_id == id {
94+
index_vec.push(pos);
95+
return Ok(Some(field.clone()));
96+
}
97+
}
98+
if let DataType::Struct(inner) = field.data_type() {
99+
if searchable_field_func(field) {
100+
if let Some(res) = Self::fetch_field_index(
101+
inner,
102+
index_vec,
103+
target_field_id,
104+
field_id_fetch_func,
105+
searchable_field_func,
106+
)? {
107+
index_vec.push(pos);
108+
return Ok(Some(res));
109+
}
110+
}
111+
}
112+
}
113+
Ok(None)
114+
}
115+
116+
/// Return the reference of projected schema
117+
pub(crate) fn projected_schema_ref(&self) -> &SchemaRef {
118+
&self.projected_schema
119+
}
120+
121+
/// Do projection with record batch
122+
pub(crate) fn project_bacth(&self, batch: RecordBatch) -> Result<RecordBatch> {
123+
RecordBatch::try_new(
124+
self.projected_schema.clone(),
125+
self.project_column(batch.columns())?,
126+
)
127+
.map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}")))
128+
}
129+
130+
/// Do projection with columns
131+
pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
132+
self.field_indices
133+
.iter()
134+
.map(|index_vec| Self::get_column_by_field_index(batch, index_vec))
135+
.collect::<Result<Vec<_>>>()
136+
}
137+
138+
fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result<ArrayRef> {
139+
let mut rev_iterator = field_index.iter().rev();
140+
let mut array = batch[*rev_iterator.next().unwrap()].clone();
141+
for idx in rev_iterator {
142+
array = array
143+
.as_any()
144+
.downcast_ref::<StructArray>()
145+
.ok_or(Error::new(
146+
ErrorKind::Unexpected,
147+
"Cannot convert Array to StructArray",
148+
))?
149+
.column(*idx)
150+
.clone();
151+
}
152+
Ok(array)
153+
}
154+
}
155+
156+
#[cfg(test)]
157+
mod test {
158+
use std::sync::Arc;
159+
160+
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, StructArray};
161+
use arrow_schema::{DataType, Field, Fields, Schema};
162+
163+
use crate::arrow::record_batch_projector::RecordBatchProjector;
164+
use crate::{Error, ErrorKind};
165+
166+
#[test]
167+
fn test_record_batch_projector_nested_level() {
168+
let inner_fields = vec![
169+
Field::new("inner_field1", DataType::Int32, false),
170+
Field::new("inner_field2", DataType::Utf8, false),
171+
];
172+
let fields = vec![
173+
Field::new("field1", DataType::Int32, false),
174+
Field::new(
175+
"field2",
176+
DataType::Struct(Fields::from(inner_fields.clone())),
177+
false,
178+
),
179+
];
180+
let schema = Arc::new(Schema::new(fields));
181+
182+
let field_id_fetch_func = |field: &Field| match field.name().as_str() {
183+
"field1" => Ok(Some(1)),
184+
"field2" => Ok(Some(2)),
185+
"inner_field1" => Ok(Some(3)),
186+
"inner_field2" => Ok(Some(4)),
187+
_ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")),
188+
};
189+
let projector =
190+
RecordBatchProjector::new(schema.clone(), &[1, 3], field_id_fetch_func, |_| true)
191+
.unwrap();
192+
193+
assert!(projector.field_indices.len() == 2);
194+
assert_eq!(projector.field_indices[0], vec![0]);
195+
assert_eq!(projector.field_indices[1], vec![0, 1]);
196+
197+
let int_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
198+
let inner_int_array = Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef;
199+
let inner_string_array = Arc::new(StringArray::from(vec!["x", "y", "z"])) as ArrayRef;
200+
let struct_array = Arc::new(StructArray::from(vec![
201+
(
202+
Arc::new(inner_fields[0].clone()),
203+
inner_int_array as ArrayRef,
204+
),
205+
(
206+
Arc::new(inner_fields[1].clone()),
207+
inner_string_array as ArrayRef,
208+
),
209+
])) as ArrayRef;
210+
let batch = RecordBatch::try_new(schema, vec![int_array, struct_array]).unwrap();
211+
212+
let projected_batch = projector.project_bacth(batch).unwrap();
213+
assert_eq!(projected_batch.num_columns(), 2);
214+
let projected_int_array = projected_batch
215+
.column(0)
216+
.as_any()
217+
.downcast_ref::<Int32Array>()
218+
.unwrap();
219+
let projected_inner_int_array = projected_batch
220+
.column(1)
221+
.as_any()
222+
.downcast_ref::<Int32Array>()
223+
.unwrap();
224+
225+
assert_eq!(projected_int_array.values(), &[1, 2, 3]);
226+
assert_eq!(projected_inner_int_array.values(), &[4, 5, 6]);
227+
}
228+
229+
#[test]
230+
fn test_field_not_found() {
231+
let inner_fields = vec![
232+
Field::new("inner_field1", DataType::Int32, false),
233+
Field::new("inner_field2", DataType::Utf8, false),
234+
];
235+
236+
let fields = vec![
237+
Field::new("field1", DataType::Int32, false),
238+
Field::new(
239+
"field2",
240+
DataType::Struct(Fields::from(inner_fields.clone())),
241+
false,
242+
),
243+
];
244+
let schema = Arc::new(Schema::new(fields));
245+
246+
let field_id_fetch_func = |field: &Field| match field.name().as_str() {
247+
"field1" => Ok(Some(1)),
248+
"field2" => Ok(Some(2)),
249+
"inner_field1" => Ok(Some(3)),
250+
"inner_field2" => Ok(Some(4)),
251+
_ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")),
252+
};
253+
let projector =
254+
RecordBatchProjector::new(schema.clone(), &[1, 5], field_id_fetch_func, |_| true);
255+
256+
assert!(projector.is_err());
257+
}
258+
259+
#[test]
260+
fn test_field_not_reachable() {
261+
let inner_fields = vec![
262+
Field::new("inner_field1", DataType::Int32, false),
263+
Field::new("inner_field2", DataType::Utf8, false),
264+
];
265+
266+
let fields = vec![
267+
Field::new("field1", DataType::Int32, false),
268+
Field::new(
269+
"field2",
270+
DataType::Struct(Fields::from(inner_fields.clone())),
271+
false,
272+
),
273+
];
274+
let schema = Arc::new(Schema::new(fields));
275+
276+
let field_id_fetch_func = |field: &Field| match field.name().as_str() {
277+
"field1" => Ok(Some(1)),
278+
"field2" => Ok(Some(2)),
279+
"inner_field1" => Ok(Some(3)),
280+
"inner_field2" => Ok(Some(4)),
281+
_ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")),
282+
};
283+
let projector =
284+
RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| false);
285+
assert!(projector.is_err());
286+
287+
let projector =
288+
RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true);
289+
assert!(projector.is_ok());
290+
}
291+
}

0 commit comments

Comments
 (0)