
Commit a38203b

Author: ZENOTME
Message: add position delete writer support
1 parent: b2fb803

File tree: 3 files changed, +335 -1 lines changed


crates/iceberg/src/arrow/schema.rs (+14 -1)

@@ -27,7 +27,7 @@ use arrow_array::{
     BooleanArray, Date32Array, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array,
     Int64Array, PrimitiveArray, Scalar, StringArray, TimestampMicrosecondArray,
 };
-use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
+use arrow_schema::{DataType, Field, FieldRef, Fields, Schema as ArrowSchema, TimeUnit};
 use bitvec::macros::internal::funty::Fundamental;
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 use parquet::file::statistics::Statistics;
@@ -607,6 +607,19 @@ impl SchemaVisitor for ToArrowSchemaConverter {
     }
 }

+/// Convert iceberg field to an arrow field.
+pub fn field_to_arrow_field(field: &crate::spec::NestedFieldRef) -> Result<FieldRef> {
+    let mut converter = ToArrowSchemaConverter;
+    converter.before_struct_field(field)?;
+    let result = crate::spec::visit_type(&field.field_type, &mut converter)?;
+    converter.after_struct_field(field)?;
+    let result = converter.field(field, result)?;
+    match result {
+        ArrowSchemaOrFieldOrType::Field(field) => Ok(field.into()),
+        _ => unreachable!(),
+    }
+}
+
 /// Convert iceberg schema to an arrow schema.
 pub fn schema_to_arrow_schema(schema: &crate::spec::Schema) -> crate::Result<ArrowSchema> {
     let mut converter = ToArrowSchemaConverter;
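The new `field_to_arrow_field` helper converts a single Iceberg field, rather than a whole schema, into an Arrow `FieldRef`. A minimal in-crate usage sketch (hypothetical caller, not part of this commit):

// Convert one Iceberg field to an Arrow field and check its name.
fn id_field_to_arrow() -> crate::Result<()> {
    use std::sync::Arc;

    use crate::arrow::field_to_arrow_field;
    use crate::spec::{NestedField, PrimitiveType, Type};

    let field = Arc::new(NestedField::required(
        1,
        "id",
        Type::Primitive(PrimitiveType::Long),
    ));
    let arrow_field = field_to_arrow_field(&field)?;
    assert_eq!(arrow_field.name(), "id");
    Ok(())
}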

crates/iceberg/src/writer/base_writer/mod.rs (+1)

@@ -18,3 +18,4 @@
 //! Base writer module contains the basic writer provide by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.

 pub mod data_file_writer;
+pub mod position_delete_file_writer;
crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs (new file, +320)

@@ -0,0 +1,320 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Position delete file writer.
use std::collections::BTreeMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::{Schema, SchemaRef};
use once_cell::sync::Lazy;

use crate::arrow::field_to_arrow_field;
use crate::spec::{DataFile, NestedField, PrimitiveType, Struct, Type};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::Result;

/// Builder for `MemoryPositionDeleteWriter`.
#[derive(Clone)]
pub struct MemoryPositionDeleteWriterBuilder<B: FileWriterBuilder> {
    inner: B,
}

impl<B: FileWriterBuilder> MemoryPositionDeleteWriterBuilder<B> {
    /// Create a new `MemoryPositionDeleteWriterBuilder` using a `FileWriterBuilder`.
    pub fn new(inner: B) -> Self {
        Self { inner }
    }
}

/// The config for `MemoryPositionDeleteWriter`.
pub struct MemoryPositionDeleteWriterConfig {
    /// The number of max rows to cache in memory.
    pub cache_num: usize,
    /// The partition value of the position delete file.
    pub partition_value: Struct,
}

impl MemoryPositionDeleteWriterConfig {
    /// Create a new `MemoryPositionDeleteWriterConfig`.
    pub fn new(cache_num: usize, partition_value: Option<Struct>) -> Self {
        Self {
            cache_num,
            partition_value: partition_value.unwrap_or(Struct::empty()),
        }
    }
}
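// Config sketch (illustrative, editor's addition): cache up to 10 entries and write
// unpartitioned delete files; passing `None` for the partition value falls back to
// `Struct::empty()`.
fn unpartitioned_config_sketch() -> MemoryPositionDeleteWriterConfig {
    MemoryPositionDeleteWriterConfig::new(10, None)
}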
static POSITION_DELETE_SCHEMA: Lazy<SchemaRef> = Lazy::new(|| {
    Schema::new(vec![
        field_to_arrow_field(&Arc::new(NestedField::required(
            2147483546,
            "file_path",
            Type::Primitive(PrimitiveType::String),
        )))
        .unwrap(),
        field_to_arrow_field(&Arc::new(NestedField::required(
            2147483545,
            "pos",
            Type::Primitive(PrimitiveType::Long),
        )))
        .unwrap(),
    ])
    .into()
});
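// The ids above are not arbitrary: 2147483546 and 2147483545 are the field ids the Iceberg
// spec reserves for the position-delete columns `file_path` and `pos`. Written out by hand,
// the converted Arrow schema looks roughly like the sketch below (illustrative only; the
// converter carries the field id as Parquet field-id metadata on each Arrow field):
fn position_delete_arrow_schema_sketch() -> arrow_schema::Schema {
    let field_id_meta = |id: &str| {
        std::collections::HashMap::from([(
            parquet::arrow::PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )])
    };
    arrow_schema::Schema::new(vec![
        arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false)
            .with_metadata(field_id_meta("2147483546")),
        arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false)
            .with_metadata(field_id_meta("2147483545")),
    ])
}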
#[async_trait::async_trait]
impl<'a, B: FileWriterBuilder> IcebergWriterBuilder<PositionDeleteInput<'a>, Vec<DataFile>>
    for MemoryPositionDeleteWriterBuilder<B>
{
    type R = MemoryPositionDeleteWriter<B>;
    type C = MemoryPositionDeleteWriterConfig;

    async fn build(self, config: Self::C) -> Result<Self::R> {
        Ok(MemoryPositionDeleteWriter {
            inner_writer_builder: self.inner.clone(),
            cache_num: config.cache_num,
            cache: BTreeMap::new(),
            data_files: Vec::new(),
            partition_value: config.partition_value,
        })
    }
}

/// Position delete input.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct PositionDeleteInput<'a> {
    /// The path of the file.
    pub path: &'a str,
    /// The offset of the position delete.
    pub offset: i64,
}

/// The memory position delete writer.
pub struct MemoryPositionDeleteWriter<B: FileWriterBuilder> {
    inner_writer_builder: B,
    cache_num: usize,
    cache: BTreeMap<String, Vec<i64>>,
    data_files: Vec<DataFile>,
    partition_value: Struct,
}
impl<B: FileWriterBuilder> MemoryPositionDeleteWriter<B> {
    /// Sort the cached offsets per file path, flatten them into a single
    /// `RecordBatch`, and write it out as one position delete file.
    async fn write_cache_out(&mut self) -> Result<()> {
        let mut keys = Vec::new();
        let mut values = Vec::new();
        let mut cache = std::mem::take(&mut self.cache);
        for (key, offsets) in cache.iter_mut() {
            offsets.sort();
            let key_ref = key.as_str();
            for offset in offsets {
                keys.push(key_ref);
                values.push(*offset);
            }
        }
        let key_array = Arc::new(StringArray::from(keys)) as ArrayRef;
        let value_array = Arc::new(Int64Array::from(values)) as ArrayRef;
        let record_batch =
            RecordBatch::try_new(POSITION_DELETE_SCHEMA.clone(), vec![key_array, value_array])?;
        let mut writer = self.inner_writer_builder.clone().build().await?;
        writer.write(&record_batch).await?;
        self.data_files
            .extend(writer.close().await?.into_iter().map(|mut res| {
                res.content(crate::spec::DataContentType::PositionDeletes);
                res.partition(self.partition_value.clone());
                res.build().expect("Guaranteed to be valid")
            }));
        Ok(())
    }
}
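// Flush-order sketch (illustrative, not part of the writer): because the cache is a
// `BTreeMap` and the offsets are sorted per path before flattening, the flushed rows come
// out ordered by file path and then position, which matches the file_path-then-pos ordering
// the Iceberg spec expects for position delete files.
fn flush_order_sketch() {
    let mut cache: std::collections::BTreeMap<String, Vec<i64>> = std::collections::BTreeMap::new();
    cache.insert("b.parquet".to_string(), vec![3, 1]);
    cache.insert("a.parquet".to_string(), vec![7]);

    let mut keys = Vec::new();
    let mut values = Vec::new();
    for (key, offsets) in cache.iter_mut() {
        offsets.sort();
        for offset in offsets.iter() {
            keys.push(key.clone());
            values.push(*offset);
        }
    }
    assert_eq!(keys, vec!["a.parquet", "b.parquet", "b.parquet"]);
    assert_eq!(values, vec![7, 1, 3]);
}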

/// Implement `IcebergWriter` for `MemoryPositionDeleteWriter`.
impl<'a, B: FileWriterBuilder> IcebergWriter<PositionDeleteInput<'a>>
    for MemoryPositionDeleteWriter<B>
{
    #[doc = " Write data to iceberg table."]
    #[must_use]
    #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
    fn write<'life0, 'async_trait>(
        &'life0 mut self,
        input: PositionDeleteInput<'a>,
    ) -> ::core::pin::Pin<
        Box<dyn ::core::future::Future<Output = Result<()>> + ::core::marker::Send + 'async_trait>,
    >
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        // Fast path: write to cache directly.
        if self.cache.len() < self.cache_num {
            if let Some(v) = self.cache.get_mut(input.path) {
                v.push(input.offset);
            } else {
                self.cache
                    .insert(input.path.to_string(), vec![input.offset]);
            }
            return Box::pin(async move { Ok(()) });
        }

        // Slow path: write the cache out to a file first.
        let path = input.path.to_string();
        Box::pin(async move {
            self.write_cache_out().await?;
            self.cache.insert(path, vec![input.offset]);
            Ok(())
        })
    }

    #[doc = " Close the writer and return the written data files."]
    #[doc = " If close failed, the data written before may be lost. User may need to recreate the writer and rewrite the data again."]
    #[doc = " # NOTE"]
    #[doc = " After close, regardless of success or failure, the writer should never be used again, otherwise the writer will panic."]
    #[must_use]
    #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
    fn close<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> ::core::pin::Pin<
        Box<
            dyn ::core::future::Future<Output = Result<Vec<DataFile>>>
                + ::core::marker::Send
                + 'async_trait,
        >,
    >
    where
        'life0: 'async_trait,
        Self: 'async_trait,
    {
        Box::pin(async move {
            self.write_cache_out().await?;
            Ok(std::mem::take(&mut self.data_files))
        })
    }
}
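// Usage sketch (hypothetical helper, editor's addition): record one deleted row position
// against a data file and collect the resulting position delete files. `B` can be any
// `FileWriterBuilder`, e.g. the `ParquetWriterBuilder` used in the test below.
async fn delete_one_row_sketch<B: FileWriterBuilder>(
    mut writer: MemoryPositionDeleteWriter<B>,
) -> Result<Vec<DataFile>> {
    writer
        .write(PositionDeleteInput {
            path: "s3://bucket/data/file-0001.parquet",
            offset: 42,
        })
        .await?;
    // `close` flushes whatever is still cached and returns the delete `DataFile`s.
    writer.close().await
}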

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow_array::{Int64Array, StringArray};
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use crate::arrow::arrow_schema_to_schema;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataContentType, DataFileFormat, Struct};
    use crate::writer::base_writer::position_delete_file_writer::{
        MemoryPositionDeleteWriterBuilder, MemoryPositionDeleteWriterConfig, PositionDeleteInput,
        POSITION_DELETE_SCHEMA,
    };
    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::{IcebergWriter, IcebergWriterBuilder};
    use crate::Result;

    #[tokio::test]
    async fn test_position_delete_writer() -> Result<()> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new("memory").build().unwrap();
        let location_gen =
            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        let pw = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            Arc::new(arrow_schema_to_schema(&POSITION_DELETE_SCHEMA).unwrap()),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let mut position_delete_writer = MemoryPositionDeleteWriterBuilder::new(pw)
            .build(MemoryPositionDeleteWriterConfig::new(10, None))
            .await?;

        // Write some position delete inputs
        let mut inputs = [
            PositionDeleteInput {
                path: "file2.parquet",
                offset: 2,
            },
            PositionDeleteInput {
                path: "file2.parquet",
                offset: 1,
            },
            PositionDeleteInput {
                path: "file2.parquet",
                offset: 3,
            },
            PositionDeleteInput {
                path: "file3.parquet",
                offset: 2,
            },
            PositionDeleteInput {
                path: "file1.parquet",
                offset: 5,
            },
            PositionDeleteInput {
                path: "file1.parquet",
                offset: 4,
            },
            PositionDeleteInput {
                path: "file1.parquet",
                offset: 1,
            },
        ];
        for input in inputs.iter() {
            position_delete_writer.write(*input).await?;
        }

        let data_files = position_delete_writer.close().await.unwrap();
        assert_eq!(data_files.len(), 1);
        assert_eq!(data_files[0].file_format, DataFileFormat::Parquet);
        assert_eq!(data_files[0].content, DataContentType::PositionDeletes);
        assert_eq!(data_files[0].partition, Struct::empty());

        let parquet_file = file_io
            .new_input(&data_files[0].file_path)?
            .read()
            .await
            .unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(parquet_file).unwrap();
        let reader = builder.build().unwrap();
        let batches = reader.map(|x| x.unwrap()).collect::<Vec<_>>();

        let path_column = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        let offset_column = batches[0]
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();

        inputs.sort_by(|a, b| a.path.cmp(b.path).then_with(|| a.offset.cmp(&b.offset)));
        for (i, input) in inputs.iter().enumerate() {
            assert_eq!(path_column.value(i), input.path);
            assert_eq!(offset_column.value(i), input.offset);
        }

        Ok(())
    }
}
