feat(parquet-writer)!: enable multiple compaction output #5292

Open · wants to merge 2 commits into base: main
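This PR moves output-size capping from the compaction picker into the parquet writer: TwcsPicker no longer pre-splits its merge groups, the configured limit instead travels as `PickerOutput::max_file_size` through the compactor into `WriteOptions::max_file_size`, and the writer starts a new SST file once the current one crosses that soft limit. A single compaction output can therefore produce several files, which is the breaking change the `!` flags: as the new test at the bottom shows, `ParquetWriter::write_all` now returns metadata for every file it wrote.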
2 changes: 1 addition & 1 deletion src/mito2/src/access_layer.rs

@@ -278,7 +278,7 @@ impl FilePathProvider for WriteCachePathProvider {
 /// Path provider that builds paths in region storage path.
 #[derive(Clone, Debug)]
 pub(crate) struct RegionFilePathFactory {
-    region_dir: String,
+    pub(crate) region_dir: String,
 }
 
 impl RegionFilePathFactory {
1 change: 1 addition & 0 deletions src/mito2/src/compaction/compactor.rs

@@ -282,6 +282,7 @@ impl Compactor for DefaultCompactor {
             compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
             let write_opts = WriteOptions {
                 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
+                max_file_size: picker_output.max_file_size,
                 ..Default::default()
             };
6 changes: 6 additions & 0 deletions src/mito2/src/compaction/picker.rs

@@ -45,6 +45,8 @@ pub struct PickerOutput {
     pub outputs: Vec<CompactionOutput>,
     pub expired_ssts: Vec<FileHandle>,
     pub time_window_size: i64,
+    /// Max single output file size in bytes.
+    pub max_file_size: Option<usize>,
 }
 
 /// SerializedPickerOutput is a serialized version of PickerOutput by replacing [CompactionOutput] and [FileHandle] with [SerializedCompactionOutput] and [FileMeta].
@@ -53,6 +55,7 @@ pub struct SerializedPickerOutput {
     pub outputs: Vec<SerializedCompactionOutput>,
     pub expired_ssts: Vec<FileMeta>,
     pub time_window_size: i64,
+    pub max_file_size: Option<usize>,
 }
 
 impl From<&PickerOutput> for SerializedPickerOutput {
@@ -76,6 +79,7 @@ impl From<&PickerOutput> for SerializedPickerOutput {
             outputs,
             expired_ssts,
             time_window_size: input.time_window_size,
+            max_file_size: input.max_file_size,
         }
     }
 }
@@ -111,6 +115,7 @@ impl PickerOutput {
             outputs,
             expired_ssts,
             time_window_size: input.time_window_size,
+            max_file_size: input.max_file_size,
         }
     }
 }
@@ -179,6 +184,7 @@ mod tests {
             ],
             expired_ssts: expired_ssts_file_handle.clone(),
             time_window_size: 1000,
+            max_file_size: None,
         };
 
         let picker_output_str =
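`SerializedPickerOutput` lets a picked compaction plan be persisted and rebuilt later, so the new field has to survive the round trip in both directions (the two middle hunks above). A standalone, runnable sketch of that invariant, with the structs trimmed down to the fields this PR touches; the real ones also carry the outputs and expired SSTs shown above:

    // Trimmed stand-ins for PickerOutput / SerializedPickerOutput.
    #[derive(Debug, Clone, PartialEq)]
    struct PickerOutput {
        time_window_size: i64,
        /// Max single output file size in bytes.
        max_file_size: Option<usize>,
    }

    #[derive(Debug, Clone, PartialEq)]
    struct SerializedPickerOutput {
        time_window_size: i64,
        max_file_size: Option<usize>,
    }

    impl From<&PickerOutput> for SerializedPickerOutput {
        fn from(input: &PickerOutput) -> Self {
            Self {
                time_window_size: input.time_window_size,
                // Omitting this line is exactly the bug the round-trip test
                // above guards against: a restored plan would fall back to
                // writing one unbounded output file.
                max_file_size: input.max_file_size,
            }
        }
    }

    fn main() {
        let picked = PickerOutput { time_window_size: 1000, max_file_size: Some(16 * 1024) };
        let serialized = SerializedPickerOutput::from(&picked);
        assert_eq!(serialized.max_file_size, picked.max_file_size);
    }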
97 changes: 4 additions & 93 deletions src/mito2/src/compaction/twcs.rs

@@ -118,20 +118,7 @@ impl TwcsPicker {
                 continue;
             };
 
-            let split_inputs = if !filter_deleted
-                && let Some(max_output_file_size) = self.max_output_file_size
-            {
-                let len_before_split = inputs.len();
-                let maybe_split = enforce_max_output_size(inputs, max_output_file_size);
-                if maybe_split.len() != len_before_split {
-                    info!("Compaction output file size exceeds threshold {}, split compaction inputs to: {:?}", max_output_file_size, maybe_split);
-                }
-                maybe_split
-            } else {
-                inputs
-            };
-
-            for input in split_inputs {
+            for input in inputs {
                 debug_assert!(input.len() > 1);
                 output.push(CompactionOutput {
                     output_level: LEVEL_COMPACTED, // always compact to l1
@@ -145,43 +132,6 @@ impl TwcsPicker {
     }
 }
 
-/// Limits the size of compaction output in a naive manner.
-/// todo(hl): we can find the output file size more precisely by checking the time range
-/// of each row group and adding the sizes of those non-overlapping row groups. But now
-/// we'd better not to expose the SST details in this level.
-fn enforce_max_output_size(
-    inputs: Vec<Vec<FileHandle>>,
-    max_output_file_size: u64,
-) -> Vec<Vec<FileHandle>> {
-    inputs
-        .into_iter()
-        .flat_map(|input| {
-            debug_assert!(input.len() > 1);
-            let estimated_output_size = input.iter().map(|f| f.size()).sum::<u64>();
-            if estimated_output_size < max_output_file_size {
-                // total file size does not exceed the threshold, just return the original input.
-                return vec![input];
-            }
-            let mut splits = vec![];
-            let mut new_input = vec![];
-            let mut new_input_size = 0;
-            for f in input {
-                if new_input_size + f.size() > max_output_file_size {
-                    splits.push(std::mem::take(&mut new_input));
-                    new_input_size = 0;
-                }
-                new_input_size += f.size();
-                new_input.push(f);
-            }
-            if !new_input.is_empty() {
-                splits.push(new_input);
-            }
-            splits
-        })
-        .filter(|p| p.len() > 1)
-        .collect()
-}
-
 /// Merges consecutive files so that file num does not exceed `max_file_num`, and chooses
 /// the solution with minimum overhead according to files sizes to be merged.
 /// `enforce_file_num` only merges consecutive files so that it won't create overlapping outputs.
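The hunk above deletes `enforce_max_output_size`, which capped output size on the picker side by splitting merge groups before compaction ran. For reference while reviewing, here is the same greedy algorithm as a runnable, standalone sketch over plain `u64` sizes standing in for `FileHandle`s; the singleton filter at the end is why the old approach could silently drop a planned merge:

    /// Split each group of input file sizes so that no group sums to more
    /// than `max` bytes, dropping singleton groups since merging a single
    /// file is pointless. A port of the removed picker-side logic.
    fn enforce_max_output_size(inputs: Vec<Vec<u64>>, max: u64) -> Vec<Vec<u64>> {
        inputs
            .into_iter()
            .flat_map(|input| {
                if input.iter().sum::<u64>() < max {
                    return vec![input]; // under the threshold: keep as-is
                }
                let mut splits = vec![];
                let mut current = vec![];
                let mut current_size = 0;
                for size in input {
                    if current_size + size > max {
                        // Cut the group before it would exceed the limit.
                        splits.push(std::mem::take(&mut current));
                        current_size = 0;
                    }
                    current_size += size;
                    current.push(size);
                }
                if !current.is_empty() {
                    splits.push(current);
                }
                splits
            })
            .filter(|group| group.len() > 1)
            .collect()
    }

    fn main() {
        // Six size-1 files in one merge group, capped at 2 bytes per
        // output, split into three pairs.
        let split = enforce_max_output_size(vec![vec![1; 6]], 2);
        assert_eq!(split, vec![vec![1, 1], vec![1, 1], vec![1, 1]]);
    }

The next hunk shows the replacement: the picker no longer splits anything and simply forwards its `max_output_file_size` to the writer.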
@@ -246,10 +196,12 @@ impl Picker for TwcsPicker {
             return None;
         }
 
+        let max_file_size = self.max_output_file_size.map(|v| v as usize);
         Some(PickerOutput {
            outputs,
            expired_ssts,
            time_window_size,
+            max_file_size,
         })
     }
 }
@@ -368,12 +320,10 @@ fn find_latest_window_in_seconds<'a>(
 #[cfg(test)]
 mod tests {
     use std::collections::HashSet;
-    use std::sync::Arc;
 
     use super::*;
     use crate::compaction::test_util::{new_file_handle, new_file_handles};
-    use crate::sst::file::{FileId, FileMeta, Level};
-    use crate::test_util::NoopFilePurger;
+    use crate::sst::file::{FileId, Level};
 
     #[test]
     fn test_get_latest_window_in_seconds() {
@@ -741,44 +691,5 @@ mod tests {
             .check();
     }
 
-    fn make_file_handles(inputs: &[(i64, i64, u64)]) -> Vec<FileHandle> {
-        inputs
-            .iter()
-            .map(|(start, end, size)| {
-                FileHandle::new(
-                    FileMeta {
-                        region_id: Default::default(),
-                        file_id: Default::default(),
-                        time_range: (
-                            Timestamp::new_millisecond(*start),
-                            Timestamp::new_millisecond(*end),
-                        ),
-                        level: 0,
-                        file_size: *size,
-                        available_indexes: Default::default(),
-                        index_file_size: 0,
-                        num_rows: 0,
-                        num_row_groups: 0,
-                        sequence: None,
-                    },
-                    Arc::new(NoopFilePurger),
-                )
-            })
-            .collect()
-    }
-
-    #[test]
-    fn test_limit_output_size() {
-        let mut files = make_file_handles(&[(1, 1, 1)].repeat(6));
-        let runs = find_sorted_runs(&mut files);
-        assert_eq!(6, runs.len());
-        let files_to_merge = reduce_runs(runs, 2);
-
-        let enforced = enforce_max_output_size(files_to_merge, 2);
-        assert_eq!(2, enforced.len());
-        assert_eq!(2, enforced[0].len());
-        assert_eq!(2, enforced[1].len());
-    }
-
     // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected.
 }
1 change: 1 addition & 0 deletions src/mito2/src/compaction/window.rs

@@ -115,6 +115,7 @@ impl Picker for WindowedCompactionPicker {
             outputs,
             expired_ssts,
             time_window_size: time_window,
+            max_file_size: None, // todo(hl): we may need to support `max_file_size` parameter in manual compaction.
         })
     }
 }
65 changes: 63 additions & 2 deletions src/mito2/src/sst/parquet.rs

@@ -49,13 +49,18 @@ pub struct WriteOptions {
     pub write_buffer_size: ReadableSize,
     /// Row group size.
     pub row_group_size: usize,
+    /// Max single output file size.
+    /// Note: This is not a hard limit as we can only observe the file size when
+    /// ArrowWriter writes to underlying writers.
+    pub max_file_size: Option<usize>,
 }
 
 impl Default for WriteOptions {
     fn default() -> Self {
         WriteOptions {
             write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
             row_group_size: DEFAULT_ROW_GROUP_SIZE,
+            max_file_size: None,
         }
     }
 }
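The note on `max_file_size` is the crux of the new behavior: byte counts are only observable after the arrow writer flushes a row group, so the option is a soft threshold at which the writer finishes the current file and opens the next one, not a hard cap. A minimal runnable model of that rolling check (hypothetical types; the actual `ParquetWriter` changes live in writer code that this excerpt does not show):

    /// Standalone model of the soft limit described above. Sizes are only
    /// observable at row-group granularity, so a file may overshoot
    /// `max_file_size` by up to one row group before the writer rolls over.
    struct RollingWriter {
        max_file_size: Option<usize>, // mirrors `WriteOptions::max_file_size`
        current_size: usize,          // bytes flushed into the current file
        finished_files: Vec<usize>,   // sizes of files already rolled
    }

    impl RollingWriter {
        /// Called after each row-group flush with the bytes just written.
        fn on_row_group_flushed(&mut self, bytes: usize) {
            self.current_size += bytes;
            if self
                .max_file_size
                .is_some_and(|limit| self.current_size >= limit)
            {
                // Finish the current file and start a new one.
                self.finished_files.push(std::mem::take(&mut self.current_size));
            }
        }
    }

    fn main() {
        let mut w = RollingWriter {
            max_file_size: Some(100),
            current_size: 0,
            finished_files: Vec::new(),
        };
        for _ in 0..7 {
            w.on_row_group_flushed(30); // seven row groups of 30 bytes each
        }
        // The first file rolls at 120 bytes, overshooting the 100-byte limit;
        // 90 bytes remain pending in the second, still-open file.
        assert_eq!(w.finished_files, vec![120]);
        assert_eq!(w.current_size, 90);
    }

Overshoot is bounded by one row group, which is presumably why the new test below pairs a small `row_group_size` (50 rows) with a 16 KiB limit.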
@@ -97,16 +102,18 @@
     use tokio_util::compat::FuturesAsyncWriteCompatExt;
 
     use super::*;
-    use crate::access_layer::FilePathProvider;
+    use crate::access_layer::{FilePathProvider, RegionFilePathFactory};
     use crate::cache::{CacheManager, CacheStrategy, PageKey};
     use crate::read::BatchReader;
     use crate::sst::index::{Indexer, IndexerBuilder};
     use crate::sst::parquet::format::WriteFormat;
     use crate::sst::parquet::reader::ParquetReaderBuilder;
     use crate::sst::parquet::writer::ParquetWriter;
     use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
     use crate::test_util::sst_util::{
         assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
-        new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
+        new_batch_with_binary, new_source, sst_file_handle, sst_file_handle_with_file_id,
+        sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};
@@ -536,4 +543,58 @@
         )
         .await;
     }
+
+    #[tokio::test]
+    async fn test_write_multiple_files() {
+        common_telemetry::init_default_ut_logging();
+        // create test env
+        let mut env = TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let metadata = Arc::new(sst_region_metadata());
+        let batches = &[
+            new_batch_by_range(&["a", "d"], 0, 1000),
+            new_batch_by_range(&["b", "f"], 0, 1000),
+            new_batch_by_range(&["b", "h"], 100, 200),
+            new_batch_by_range(&["b", "h"], 200, 300),
+            new_batch_by_range(&["b", "h"], 300, 1000),
+        ];
+        let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum();
+
+        let source = new_source(batches);
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            max_file_size: Some(1024 * 16),
+            ..Default::default()
+        };
+
+        let path_provider = RegionFilePathFactory {
+            region_dir: "test".to_string(),
+        };
+        let mut writer = ParquetWriter::new_with_object_store(
+            object_store.clone(),
+            metadata.clone(),
+            NoopIndexBuilder,
+            path_provider,
+        )
+        .await;
+
+        let files = writer.write_all(source, None, &write_opts).await.unwrap();
+        assert_eq!(2, files.len());
+
+        let mut rows_read = 0;
+        for f in &files {
+            let file_handle = sst_file_handle_with_file_id(
+                f.file_id,
+                f.time_range.0.value(),
+                f.time_range.1.value(),
+            );
+            let builder =
+                ParquetReaderBuilder::new("test".to_string(), file_handle, object_store.clone());
+            let mut reader = builder.build().await.unwrap();
+            while let Some(batch) = reader.next_batch().await.unwrap() {
+                rows_read += batch.num_rows();
+            }
+        }
+        assert_eq!(total_rows, rows_read);
+    }
 }