Skip to content

Commit d410f7f

Browse files
committed
- **Refactor RegionFilePathFactory to RegionFilePathProvider:** Updated references and implementations in access_layer.rs, write_cache.rs, and related test files to use the new struct name.
- **Add `max_file_size` support in compaction:** Introduced `max_file_size` option in `PickerOutput`, `SerializedPickerOutput`, and `WriteOptions` in `compactor.rs`, `picker.rs`, `twcs.rs`, and `window.rs`.
- **Enhance Parquet writing logic:** Modified `parquet.rs` and `parquet/writer.rs` to support optional `max_file_size` and added a test case `test_write_multiple_files` to verify writing multiple files based on size constraints.
1 parent 2c5ace5 commit d410f7f

File tree

8 files changed

+90
-19
lines changed

8 files changed

+90
-19
lines changed

src/mito2/src/access_layer.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ impl AccessLayer {
135135
.write_and_upload_sst(
136136
request,
137137
SstUploadRequest {
138-
dest_path_provider: RegionFilePathFactory {
138+
dest_path_provider: RegionFilePathProvider {
139139
region_dir: self.region_dir.clone(),
140140
},
141141
remote_store: self.object_store.clone(),
@@ -161,7 +161,7 @@ impl AccessLayer {
161161
self.object_store.clone(),
162162
request.metadata,
163163
indexer_builder,
164-
RegionFilePathFactory {
164+
RegionFilePathProvider {
165165
region_dir: self.region_dir.clone(),
166166
},
167167
)
@@ -266,11 +266,11 @@ impl FilePathProvider for WriteCachePathProvider {
266266

267267
/// Path provider that builds paths in region storage path.
268268
#[derive(Clone, Debug)]
269-
pub(crate) struct RegionFilePathFactory {
269+
pub(crate) struct RegionFilePathProvider {
270270
pub(crate) region_dir: String,
271271
}
272272

273-
impl FilePathProvider for RegionFilePathFactory {
273+
impl FilePathProvider for RegionFilePathProvider {
274274
fn build_index_file_path(&self, file_id: FileId) -> String {
275275
location::index_file_path(&self.region_dir, file_id)
276276
}

src/mito2/src/cache/write_cache.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use object_store::ObjectStore;
2424
use snafu::ResultExt;
2525

2626
use crate::access_layer::{
27-
new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
27+
new_fs_cache_store, FilePathProvider, RegionFilePathProvider, SstInfoArray, SstWriteRequest,
2828
WriteCachePathProvider,
2929
};
3030
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
@@ -328,7 +328,7 @@ impl WriteCache {
328328
/// Request to write and upload a SST.
329329
pub struct SstUploadRequest {
330330
/// Destination path provider of which SST files in write cache should be uploaded to.
331-
pub dest_path_provider: RegionFilePathFactory,
331+
pub dest_path_provider: RegionFilePathProvider,
332332
/// Remote object store to upload.
333333
pub remote_store: ObjectStore,
334334
}
@@ -355,7 +355,7 @@ mod tests {
355355
// and now just use local file system to mock.
356356
let mut env = TestEnv::new();
357357
let mock_store = env.init_object_store_manager();
358-
let path_provider = RegionFilePathFactory {
358+
let path_provider = RegionFilePathProvider {
359359
region_dir: "test".to_string(),
360360
};
361361

@@ -488,7 +488,7 @@ mod tests {
488488
..Default::default()
489489
};
490490
let upload_request = SstUploadRequest {
491-
dest_path_provider: RegionFilePathFactory {
491+
dest_path_provider: RegionFilePathProvider {
492492
region_dir: data_home.clone(),
493493
},
494494
remote_store: mock_store.clone(),

src/mito2/src/compaction/compactor.rs

+1
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ impl Compactor for DefaultCompactor {
283283

284284
let write_opts = WriteOptions {
285285
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
286+
max_file_size: picker_output.max_file_size,
286287
..Default::default()
287288
};
288289

src/mito2/src/compaction/picker.rs

+6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ pub struct PickerOutput {
4545
pub outputs: Vec<CompactionOutput>,
4646
pub expired_ssts: Vec<FileHandle>,
4747
pub time_window_size: i64,
48+
/// Max single output file size in bytes.
49+
pub max_file_size: Option<usize>,
4850
}
4951

5052
/// SerializedPickerOutput is a serialized version of PickerOutput by replacing [CompactionOutput] and [FileHandle] with [SerializedCompactionOutput] and [FileMeta].
@@ -53,6 +55,7 @@ pub struct SerializedPickerOutput {
5355
pub outputs: Vec<SerializedCompactionOutput>,
5456
pub expired_ssts: Vec<FileMeta>,
5557
pub time_window_size: i64,
58+
pub max_file_size: Option<usize>,
5659
}
5760

5861
impl From<&PickerOutput> for SerializedPickerOutput {
@@ -76,6 +79,7 @@ impl From<&PickerOutput> for SerializedPickerOutput {
7679
outputs,
7780
expired_ssts,
7881
time_window_size: input.time_window_size,
82+
max_file_size: input.max_file_size,
7983
}
8084
}
8185
}
@@ -111,6 +115,7 @@ impl PickerOutput {
111115
outputs,
112116
expired_ssts,
113117
time_window_size: input.time_window_size,
118+
max_file_size: input.max_file_size,
114119
}
115120
}
116121
}
@@ -179,6 +184,7 @@ mod tests {
179184
],
180185
expired_ssts: expired_ssts_file_handle.clone(),
181186
time_window_size: 1000,
187+
max_file_size: None,
182188
};
183189

184190
let picker_output_str =

src/mito2/src/compaction/twcs.rs

+2
Original file line numberDiff line numberDiff line change
@@ -196,10 +196,12 @@ impl Picker for TwcsPicker {
196196
return None;
197197
}
198198

199+
let max_file_size = self.max_output_file_size.map(|v| v as usize);
199200
Some(PickerOutput {
200201
outputs,
201202
expired_ssts,
202203
time_window_size,
204+
max_file_size,
203205
})
204206
}
205207
}

src/mito2/src/compaction/window.rs

+1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ impl Picker for WindowedCompactionPicker {
115115
outputs,
116116
expired_ssts,
117117
time_window_size: time_window,
118+
max_file_size: None, // todo (hl): we may need to support `max_file_size` parameter in manual compaction.
118119
})
119120
}
120121
}

src/mito2/src/sst/parquet.rs

+62-4
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,17 @@ pub struct WriteOptions {
5050
/// Row group size.
5151
pub row_group_size: usize,
5252
/// Max single output file size.
53-
pub max_file_size: usize,
53+
/// Note: This is not a hard limit as we can only observe the file size when
54+
/// ArrowWrite writes to underlying writers.
55+
pub max_file_size: Option<usize>,
5456
}
5557

5658
impl Default for WriteOptions {
5759
fn default() -> Self {
5860
WriteOptions {
5961
write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
6062
row_group_size: DEFAULT_ROW_GROUP_SIZE,
61-
max_file_size: usize::MAX,
63+
max_file_size: None,
6264
}
6365
}
6466
}
@@ -100,16 +102,18 @@ mod tests {
100102
use tokio_util::compat::FuturesAsyncWriteCompatExt;
101103

102104
use super::*;
103-
use crate::access_layer::FilePathProvider;
105+
use crate::access_layer::{FilePathProvider, RegionFilePathProvider};
104106
use crate::cache::{CacheManager, CacheStrategy, PageKey};
107+
use crate::read::BatchReader;
105108
use crate::sst::index::{Indexer, IndexerBuilder};
106109
use crate::sst::parquet::format::WriteFormat;
107110
use crate::sst::parquet::reader::ParquetReaderBuilder;
108111
use crate::sst::parquet::writer::ParquetWriter;
109112
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
110113
use crate::test_util::sst_util::{
111114
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
112-
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
115+
new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
116+
sst_region_metadata,
113117
};
114118
use crate::test_util::{check_reader_result, TestEnv};
115119

@@ -539,4 +543,58 @@ mod tests {
539543
)
540544
.await;
541545
}
546+
547+
#[tokio::test]
548+
async fn test_write_multiple_files() {
549+
common_telemetry::init_default_ut_logging();
550+
// create test env
551+
let mut env = TestEnv::new();
552+
let object_store = env.init_object_store_manager();
553+
let metadata = Arc::new(sst_region_metadata());
554+
let batches = &[
555+
new_batch_by_range(&["a", "d"], 0, 1000),
556+
new_batch_by_range(&["b", "f"], 0, 1000),
557+
new_batch_by_range(&["b", "h"], 100, 200),
558+
new_batch_by_range(&["b", "h"], 200, 300),
559+
new_batch_by_range(&["b", "h"], 300, 1000),
560+
];
561+
let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
562+
563+
let source = new_source(batches);
564+
let write_opts = WriteOptions {
565+
row_group_size: 50,
566+
max_file_size: Some(1024 * 16),
567+
..Default::default()
568+
};
569+
570+
let path_provider = RegionFilePathProvider {
571+
region_dir: "test".to_string(),
572+
};
573+
let mut writer = ParquetWriter::new_with_object_store(
574+
object_store.clone(),
575+
metadata.clone(),
576+
NoopIndexBuilder,
577+
path_provider,
578+
)
579+
.await;
580+
581+
let files = writer.write_all(source, None, &write_opts).await.unwrap();
582+
assert_eq!(2, files.len());
583+
584+
let mut rows_read = 0;
585+
for f in &files {
586+
let file_handle = sst_file_handle_with_file_id(
587+
f.file_id,
588+
f.time_range.0.value(),
589+
f.time_range.1.value(),
590+
);
591+
let builder =
592+
ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
593+
let mut reader = builder.build().await.unwrap();
594+
while let Some(batch) = reader.next_batch().await.unwrap() {
595+
rows_read += batch.num_rows();
596+
}
597+
}
598+
assert_eq!(total_rows, rows_read);
599+
}
542600
}

src/mito2/src/sst/parquet/writer.rs

+10-7
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,13 @@ where
148148
// At least one row has been written.
149149
assert!(stats.num_rows > 0);
150150

151+
debug!(
152+
"Finishing current file {}, file size: {}, num rows: {}",
153+
self.current_file,
154+
self.bytes_written.load(Ordering::Relaxed),
155+
stats.num_rows
156+
);
157+
151158
// Finish indexer and writer.
152159
// safety: writer and index can only be both present or not.
153160
let index_output = self.current_indexer.as_mut().unwrap().finish().await;
@@ -201,13 +208,9 @@ where
201208
stats.update(&batch);
202209
// safety: self.current_indexer must be set when first batch has been written.
203210
self.current_indexer.as_mut().unwrap().update(&batch).await;
204-
if self.bytes_written.load(Ordering::Relaxed) > opts.max_file_size {
205-
debug!(
206-
"Finishing current file {}, file size: {}, max file size: {}",
207-
self.current_file,
208-
self.bytes_written.load(Ordering::Relaxed),
209-
opts.max_file_size
210-
);
211+
if let Some(max_file_size) = opts.max_file_size
212+
&& self.bytes_written.load(Ordering::Relaxed) > max_file_size
213+
{
211214
self.finish_current_file(&mut results, &mut stats).await?;
212215
}
213216
}

0 commit comments

Comments
 (0)