Skip to content

Commit 99ad49e

Browse files
author
ZENOTME
committed
refine input of writer
1 parent c1c3e9d commit 99ad49e

File tree

1 file changed

+84
-52
lines changed

1 file changed

+84
-52
lines changed

crates/iceberg/src/writer/base_writer/position_delete_file_writer.rs

+84-52
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
// under the License.
1717

1818
//! Position delete file writer.
19+
use std::future::Future;
20+
use std::pin::Pin;
1921
use std::sync::Arc;
2022

2123
use arrow_array::builder::{PrimitiveBuilder, StringBuilder};
@@ -29,17 +31,21 @@ use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
2931
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
3032
use crate::{Error, ErrorKind, Result};
3133

34+
const POS_DELETE_FIELD1_NAME: &str = "file_path";
35+
const POS_DELETE_FIELD1_ID: i32 = 2147483546;
36+
const POS_DELETE_FIELD2_NAME: &str = "pos";
37+
const POS_DELETE_FIELD2_ID: i32 = 2147483545;
3238
static POSITION_DELETE_SCHEMA: Lazy<Schema> = Lazy::new(|| {
3339
Schema::builder()
3440
.with_fields(vec![
3541
Arc::new(NestedField::required(
36-
2147483546,
37-
"file_path",
42+
POS_DELETE_FIELD1_ID,
43+
POS_DELETE_FIELD1_NAME,
3844
Type::Primitive(PrimitiveType::String),
3945
)),
4046
Arc::new(NestedField::required(
41-
2147483545,
42-
"pos",
47+
POS_DELETE_FIELD2_ID,
48+
POS_DELETE_FIELD2_NAME,
4349
Type::Primitive(PrimitiveType::Long),
4450
)),
4551
])
@@ -49,17 +55,17 @@ static POSITION_DELETE_SCHEMA: Lazy<Schema> = Lazy::new(|| {
4955

5056
/// Position delete input.
5157
#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Debug)]
52-
pub struct PositionDeleteInput {
58+
pub struct PositionDeleteInput<'a> {
5359
/// The path of the file.
54-
pub path: String,
55-
/// The offset of the position delete.
56-
pub offsets: Vec<i64>,
60+
pub path: &'a str,
61+
/// The row number in the data file.
62+
pub pos: i64,
5763
}
5864

59-
impl PositionDeleteInput {
65+
impl<'a> PositionDeleteInput<'a> {
6066
/// Create a new `PositionDeleteInput`.
61-
pub fn new(path: String, offsets: Vec<i64>) -> Self {
62-
PositionDeleteInput { path, offsets }
67+
pub fn new(path: &'a str, row: i64) -> Self {
68+
Self { path, pos: row }
6369
}
6470
}
6571
/// Builder for `PositionDeleteWriter`.
@@ -80,7 +86,7 @@ impl<B: FileWriterBuilder> PositionDeleteWriterBuilder<B> {
8086
}
8187

8288
#[async_trait::async_trait]
83-
impl<B: FileWriterBuilder> IcebergWriterBuilder<Vec<PositionDeleteInput>>
89+
impl<'a, B: FileWriterBuilder> IcebergWriterBuilder<Vec<PositionDeleteInput<'a>>>
8490
for PositionDeleteWriterBuilder<B>
8591
{
8692
type R = PositionDeleteWriter<B>;
@@ -99,16 +105,22 @@ pub struct PositionDeleteWriter<B: FileWriterBuilder> {
99105
partition_value: Struct,
100106
}
101107

102-
#[async_trait::async_trait]
103-
impl<B: FileWriterBuilder> IcebergWriter<Vec<PositionDeleteInput>> for PositionDeleteWriter<B> {
104-
async fn write(&mut self, inputs: Vec<PositionDeleteInput>) -> Result<()> {
108+
impl<'a, B: FileWriterBuilder> IcebergWriter<Vec<PositionDeleteInput<'a>>>
109+
for PositionDeleteWriter<B>
110+
{
111+
fn write<'life0, 'async_trait>(
112+
&'life0 mut self,
113+
input: Vec<PositionDeleteInput<'a>>,
114+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
115+
where
116+
'life0: 'async_trait,
117+
Self: 'async_trait,
118+
{
105119
let mut path_column_builder = StringBuilder::new();
106120
let mut offset_column_builder = PrimitiveBuilder::<Int64Type>::new();
107-
for pd_input in inputs.into_iter() {
108-
for offset in pd_input.offsets {
109-
path_column_builder.append_value(&pd_input.path);
110-
offset_column_builder.append_value(offset);
111-
}
121+
for pd_input in input.into_iter() {
122+
path_column_builder.append_value(pd_input.path);
123+
offset_column_builder.append_value(pd_input.pos);
112124
}
113125
let record_batch = RecordBatch::try_new(
114126
Arc::new(schema_to_arrow_schema(&POSITION_DELETE_SCHEMA).unwrap()),
@@ -117,28 +129,38 @@ impl<B: FileWriterBuilder> IcebergWriter<Vec<PositionDeleteInput>> for PositionD
117129
Arc::new(offset_column_builder.finish()),
118130
],
119131
)
120-
.map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string()))?;
132+
.map_err(|e| Error::new(ErrorKind::DataInvalid, e.to_string()));
121133

122-
if let Some(inner_writer) = &mut self.inner_writer {
123-
inner_writer.write(&record_batch).await?;
124-
} else {
125-
return Err(Error::new(ErrorKind::Unexpected, "write has been closed"));
126-
}
127-
Ok(())
134+
Box::pin(async move {
135+
if let Some(inner_writer) = &mut self.inner_writer {
136+
inner_writer.write(&record_batch?).await?;
137+
} else {
138+
return Err(Error::new(ErrorKind::Unexpected, "write has been closed"));
139+
}
140+
Ok(())
141+
})
128142
}
129143

130-
async fn close(&mut self) -> Result<Vec<DataFile>> {
131-
let writer = self.inner_writer.take().unwrap();
132-
Ok(writer
133-
.close()
134-
.await?
135-
.into_iter()
136-
.map(|mut res| {
137-
res.content(DataContentType::PositionDeletes);
138-
res.partition(self.partition_value.clone());
139-
res.build().expect("Guaranteed to be valid")
140-
})
141-
.collect())
144+
fn close<'life0, 'async_trait>(
145+
&'life0 mut self,
146+
) -> Pin<Box<dyn Future<Output = Result<Vec<DataFile>>> + Send + 'async_trait>>
147+
where
148+
'life0: 'async_trait,
149+
Self: 'async_trait,
150+
{
151+
Box::pin(async move {
152+
let writer = self.inner_writer.take().unwrap();
153+
Ok(writer
154+
.close()
155+
.await?
156+
.into_iter()
157+
.map(|mut res| {
158+
res.content(DataContentType::PositionDeletes);
159+
res.partition(self.partition_value.clone());
160+
res.build().expect("Guaranteed to be valid")
161+
})
162+
.collect())
163+
})
142164
}
143165
}
144166

@@ -184,28 +206,38 @@ mod test {
184206
// Write some position delete inputs
185207
let inputs: Vec<PositionDeleteInput> = vec![
186208
PositionDeleteInput {
187-
path: "file2.parquet".to_string(),
188-
offsets: vec![2, 1, 3],
209+
path: "file2.parquet",
210+
pos: 2,
211+
},
212+
PositionDeleteInput {
213+
path: "file2.parquet",
214+
pos: 1,
215+
},
216+
PositionDeleteInput {
217+
path: "file2.parquet",
218+
pos: 3,
219+
},
220+
PositionDeleteInput {
221+
path: "file3.parquet",
222+
pos: 2,
223+
},
224+
PositionDeleteInput {
225+
path: "file1.parquet",
226+
pos: 5,
189227
},
190228
PositionDeleteInput {
191-
path: "file3.parquet".to_string(),
192-
offsets: vec![2],
229+
path: "file1.parquet",
230+
pos: 4,
193231
},
194232
PositionDeleteInput {
195-
path: "file1.parquet".to_string(),
196-
offsets: vec![5, 4, 1],
233+
path: "file1.parquet",
234+
pos: 1,
197235
},
198236
];
199237
let expect_inputs = inputs
200238
.clone()
201239
.into_iter()
202-
.flat_map(|input| {
203-
input
204-
.offsets
205-
.iter()
206-
.map(|off| (input.path.clone(), *off))
207-
.collect::<Vec<_>>()
208-
})
240+
.map(|input| (input.path.to_string(), input.pos))
209241
.collect_vec();
210242
position_delete_writer.write(inputs.clone()).await?;
211243

0 commit comments

Comments
 (0)