Skip to content

Commit 094655a

Browse files
author
ZENOTME
committed
refine writer interface(wip)
1 parent 4603b64 commit 094655a

File tree

11 files changed

+275
-126
lines changed

11 files changed

+275
-126
lines changed

crates/iceberg/src/writer/base_writer/data_file_writer.rs

+13-9
Original file line numberDiff line numberDiff line change
@@ -22,34 +22,38 @@ use itertools::Itertools;
2222

2323
use crate::spec::{DataContentType, DataFile, Struct};
2424
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
25-
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
25+
use crate::writer::output_file_generator::OutputFileGenerator;
26+
use crate::writer::{CurrentFileStatus, IcebergWriter, SinglePartitionWriterBuilder};
2627
use crate::Result;
2728

2829
/// Builder for `DataFileWriter`.
2930
#[derive(Clone, Debug)]
3031
pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
3132
inner: B,
32-
partition_value: Option<Struct>,
33+
outfile_genenerator: OutputFileGenerator,
3334
}
3435

3536
impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
3637
/// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
37-
pub fn new(inner: B, partition_value: Option<Struct>) -> Self {
38+
pub fn new(inner: B, outfile_genenerator: OutputFileGenerator) -> Self {
3839
Self {
3940
inner,
40-
partition_value,
41+
outfile_genenerator,
4142
}
4243
}
4344
}
4445

4546
#[async_trait::async_trait]
46-
impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
47+
impl<B: FileWriterBuilder> SinglePartitionWriterBuilder for DataFileWriterBuilder<B> {
4748
type R = DataFileWriter<B>;
4849

49-
async fn build(self) -> Result<Self::R> {
50+
async fn build(self, partition: Option<Struct>) -> Result<Self::R> {
51+
let output_file = self
52+
.outfile_genenerator
53+
.create_output_file(&partition)?;
5054
Ok(DataFileWriter {
51-
inner_writer: Some(self.inner.clone().build().await?),
52-
partition_value: self.partition_value.unwrap_or(Struct::empty()),
55+
inner_writer: Some(self.inner.clone().build(output_file).await?),
56+
partition_value: partition.unwrap_or(Struct::empty()),
5357
})
5458
}
5559
}
@@ -114,7 +118,7 @@ mod test {
114118
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
115119
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
116120
use crate::writer::file_writer::ParquetWriterBuilder;
117-
use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};
121+
use crate::writer::{IcebergWriter, SinglePartitionWriterBuilder, RecordBatch};
118122
use crate::Result;
119123

120124
#[tokio::test]

crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

+23-16
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,30 @@ use crate::arrow::record_batch_projector::RecordBatchProjector;
2828
use crate::arrow::schema_to_arrow_schema;
2929
use crate::spec::{DataFile, SchemaRef, Struct};
3030
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
31-
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
31+
use crate::writer::output_file_generator::OutputFileGenerator;
32+
use crate::writer::{IcebergWriter, SinglePartitionWriterBuilder};
3233
use crate::{Error, ErrorKind, Result};
3334

3435
/// Builder for `EqualityDeleteWriter`.
3536
#[derive(Clone, Debug)]
3637
pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
3738
inner: B,
3839
config: EqualityDeleteWriterConfig,
40+
outfile_genenerator: OutputFileGenerator,
3941
}
4042

4143
impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
4244
/// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`.
43-
pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self {
44-
Self { inner, config }
45+
pub fn new(
46+
inner: B,
47+
config: EqualityDeleteWriterConfig,
48+
outfile_genenerator: OutputFileGenerator,
49+
) -> Self {
50+
Self {
51+
inner,
52+
config,
53+
outfile_genenerator,
54+
}
4555
}
4656
}
4757

@@ -52,16 +62,11 @@ pub struct EqualityDeleteWriterConfig {
5262
equality_ids: Vec<i32>,
5363
// Projector used to project the data chunk into specific fields.
5464
projector: RecordBatchProjector,
55-
partition_value: Struct,
5665
}
5766

5867
impl EqualityDeleteWriterConfig {
5968
/// Create a new `EqualityDeleteWriterConfig` with equality ids.
60-
pub fn new(
61-
equality_ids: Vec<i32>,
62-
original_schema: SchemaRef,
63-
partition_value: Option<Struct>,
64-
) -> Result<Self> {
69+
pub fn new(equality_ids: Vec<i32>, original_schema: SchemaRef) -> Result<Self> {
6570
let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
6671
let projector = RecordBatchProjector::new(
6772
original_arrow_schema,
@@ -97,7 +102,6 @@ impl EqualityDeleteWriterConfig {
97102
Ok(Self {
98103
equality_ids,
99104
projector,
100-
partition_value: partition_value.unwrap_or(Struct::empty()),
101105
})
102106
}
103107

@@ -108,15 +112,18 @@ impl EqualityDeleteWriterConfig {
108112
}
109113

110114
#[async_trait::async_trait]
111-
impl<B: FileWriterBuilder> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
115+
impl<B: FileWriterBuilder> SinglePartitionWriterBuilder for EqualityDeleteFileWriterBuilder<B> {
112116
type R = EqualityDeleteFileWriter<B>;
113117

114-
async fn build(self) -> Result<Self::R> {
118+
async fn build(self, partition: Option<Struct>) -> Result<Self::R> {
119+
let output_file = self
120+
.outfile_genenerator
121+
.create_output_file(&partition)?;
115122
Ok(EqualityDeleteFileWriter {
116-
inner_writer: Some(self.inner.clone().build().await?),
123+
inner_writer: Some(self.inner.clone().build(output_file).await?),
117124
projector: self.config.projector,
118125
equality_ids: self.config.equality_ids,
119-
partition_value: self.config.partition_value,
126+
partition_value: partition.unwrap_or(Struct::empty()),
120127
})
121128
}
122129
}
@@ -192,7 +199,7 @@ mod test {
192199
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
193200
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
194201
use crate::writer::file_writer::ParquetWriterBuilder;
195-
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
202+
use crate::writer::{IcebergWriter, SinglePartitionWriterBuilder};
196203

197204
async fn check_parquet_data_file_with_equality_delete_write(
198205
file_io: &FileIO,
@@ -385,7 +392,7 @@ mod test {
385392

386393
let equality_ids = vec![0_i32, 8];
387394
let equality_config =
388-
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
395+
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
389396
let delete_schema =
390397
arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
391398
let projector = equality_config.projector.clone();

crates/iceberg/src/writer/file_writer/mod.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,21 @@ use arrow_array::RecordBatch;
2121
use futures::Future;
2222

2323
use super::CurrentFileStatus;
24-
use crate::spec::DataFileBuilder;
24+
use crate::{io::OutputFile, spec::DataFileBuilder};
2525
use crate::Result;
2626

2727
mod parquet_writer;
2828
pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
2929
mod track_writer;
3030

31-
pub mod location_generator;
32-
3331
type DefaultOutput = Vec<DataFileBuilder>;
3432

3533
/// File writer builder trait.
3634
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
3735
/// The associated file writer type.
3836
type R: FileWriter<O>;
3937
/// Build file writer.
40-
fn build(self) -> impl Future<Output = Result<Self::R>> + Send;
38+
fn build(self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
4139
}
4240

4341
/// File writer focused on writing record batches to different physical file formats (such as Parquet or ORC).

crates/iceberg/src/writer/file_writer/parquet_writer.rs

+43-77
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,12 @@ use parquet::file::properties::WriterProperties;
3131
use parquet::file::statistics::{from_thrift, Statistics};
3232
use parquet::format::FileMetaData;
3333

34-
use super::location_generator::{FileNameGenerator, LocationGenerator};
3534
use super::track_writer::TrackWriter;
3635
use super::{FileWriter, FileWriterBuilder};
3736
use crate::arrow::{
3837
get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, DEFAULT_MAP_FIELD_NAME,
3938
};
40-
use crate::io::{FileIO, FileWrite, OutputFile};
39+
use crate::io::{FileWrite, OutputFile};
4140
use crate::spec::{
4241
visit_schema, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, NestedFieldRef,
4342
PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type,
@@ -47,45 +46,25 @@ use crate::{Error, ErrorKind, Result};
4746

4847
/// `ParquetWriterBuilder` is used to build a [`ParquetWriter`].
4948
#[derive(Clone, Debug)]
50-
pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
49+
pub struct ParquetWriterBuilder {
5150
props: WriterProperties,
5251
schema: SchemaRef,
53-
54-
file_io: FileIO,
55-
location_generator: T,
56-
file_name_generator: F,
5752
}
5853

59-
impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
54+
impl ParquetWriterBuilder {
6055
/// Create a new `ParquetWriterBuilder`
6156
/// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
62-
pub fn new(
63-
props: WriterProperties,
64-
schema: SchemaRef,
65-
file_io: FileIO,
66-
location_generator: T,
67-
file_name_generator: F,
68-
) -> Self {
69-
Self {
70-
props,
71-
schema,
72-
file_io,
73-
location_generator,
74-
file_name_generator,
75-
}
57+
pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
58+
Self { props, schema }
7659
}
7760
}
7861

79-
impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWriterBuilder<T, F> {
62+
impl FileWriterBuilder for ParquetWriterBuilder {
8063
type R = ParquetWriter;
8164

82-
async fn build(self) -> crate::Result<Self::R> {
65+
async fn build(self, out_file: OutputFile) -> crate::Result<Self::R> {
8366
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
8467
let written_size = Arc::new(AtomicI64::new(0));
85-
let out_file = self.file_io.new_output(
86-
self.location_generator
87-
.generate_location(&self.file_name_generator.generate_file_name()),
88-
)?;
8968
let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
9069
let async_writer = AsyncFileWriter::new(inner_writer);
9170
let writer =
@@ -668,14 +647,14 @@ mod tests {
668647
let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
669648

670649
// write data
650+
let output_file = file_io
651+
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
652+
.unwrap();
671653
let mut pw = ParquetWriterBuilder::new(
672654
WriterProperties::builder().build(),
673655
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
674-
file_io.clone(),
675-
location_gen,
676-
file_name_gen,
677656
)
678-
.build()
657+
.build(output_file)
679658
.await?;
680659
pw.write(&to_write).await?;
681660
pw.write(&to_write_null).await?;
@@ -864,15 +843,13 @@ mod tests {
864843
.unwrap();
865844

866845
// write data
867-
let mut pw = ParquetWriterBuilder::new(
868-
WriterProperties::builder().build(),
869-
Arc::new(schema),
870-
file_io.clone(),
871-
location_gen,
872-
file_name_gen,
873-
)
874-
.build()
875-
.await?;
846+
let output_file = file_io
847+
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
848+
.unwrap();
849+
let mut pw =
850+
ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
851+
.build(output_file)
852+
.await?;
876853
pw.write(&to_write).await?;
877854
let res = pw.close().await?;
878855
assert_eq!(res.len(), 1);
@@ -1054,15 +1031,13 @@ mod tests {
10541031
.unwrap();
10551032

10561033
// write data
1057-
let mut pw = ParquetWriterBuilder::new(
1058-
WriterProperties::builder().build(),
1059-
Arc::new(schema),
1060-
file_io.clone(),
1061-
loccation_gen,
1062-
file_name_gen,
1063-
)
1064-
.build()
1065-
.await?;
1034+
let output_file = file_io
1035+
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
1036+
.unwrap();
1037+
let mut pw =
1038+
ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
1039+
.build(output_file)
1040+
.await?;
10661041
pw.write(&to_write).await?;
10671042
let res = pw.close().await?;
10681043
assert_eq!(res.len(), 1);
@@ -1198,15 +1173,12 @@ mod tests {
11981173
.unwrap(),
11991174
);
12001175
let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1201-
let mut pw = ParquetWriterBuilder::new(
1202-
WriterProperties::builder().build(),
1203-
schema.clone(),
1204-
file_io.clone(),
1205-
loccation_gen.clone(),
1206-
file_name_gen.clone(),
1207-
)
1208-
.build()
1209-
.await?;
1176+
let output_file = file_io
1177+
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
1178+
.unwrap();
1179+
let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1180+
.build(output_file)
1181+
.await?;
12101182
let col0 = Arc::new(
12111183
Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
12121184
.with_data_type(DataType::Decimal128(28, 10)),
@@ -1250,15 +1222,12 @@ mod tests {
12501222
.unwrap(),
12511223
);
12521224
let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1253-
let mut pw = ParquetWriterBuilder::new(
1254-
WriterProperties::builder().build(),
1255-
schema.clone(),
1256-
file_io.clone(),
1257-
loccation_gen.clone(),
1258-
file_name_gen.clone(),
1259-
)
1260-
.build()
1261-
.await?;
1225+
let output_file = file_io
1226+
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
1227+
.unwrap();
1228+
let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
1229+
.build(output_file)
1230+
.await?;
12621231
let col0 = Arc::new(
12631232
Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
12641233
.with_data_type(DataType::Decimal128(28, 10)),
@@ -1305,15 +1274,12 @@ mod tests {
13051274
.unwrap(),
13061275
);
13071276
let arrow_schema: ArrowSchemaRef = Arc::new(schema_to_arrow_schema(&schema).unwrap());
1308-
let mut pw = ParquetWriterBuilder::new(
1309-
WriterProperties::builder().build(),
1310-
schema,
1311-
file_io.clone(),
1312-
loccation_gen,
1313-
file_name_gen,
1314-
)
1315-
.build()
1316-
.await?;
1277+
let output_file = file_io
1278+
.new_output(format!("{}/{}", loccation_gen.gen(), file_name_gen.gen()))
1279+
.unwrap();
1280+
let mut pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
1281+
.build(output_file)
1282+
.await?;
13171283
let col0 = Arc::new(
13181284
Decimal128Array::from(vec![
13191285
Some(decimal_max.mantissa()),

0 commit comments

Comments
 (0)