Skip to content

Commit cd95677

Browse files
authored
chore: calc rows per block for recluster (#17639)
* fix calc rows per block
* fix
* enable compact when block_size_threshold changed
* fix
* fix hilbert recluster get small blocks
* add unit test
* fix test
1 parent 602e159 commit cd95677

File tree

23 files changed

+314
-176
lines changed

23 files changed

+314
-176
lines changed

Diff for: src/common/io/src/constants.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,13 @@ pub const INF_BYTES_LOWER: &str = "inf";
2525
pub const INF_BYTES_LONG: &str = "Infinity";
2626

2727
// The size of the I/O read/write block buffer by default.
28-
pub const DEFAULT_BLOCK_BUFFER_SIZE: usize = 100 * 1024 * 1024;
28+
pub const DEFAULT_BLOCK_BUFFER_SIZE: usize = 125 * 1024 * 1024;
2929
// The size of the block compressed by default.
30-
pub const DEFAULT_BLOCK_COMPRESSED_SIZE: usize = 10 * 1024 * 1024;
30+
pub const DEFAULT_BLOCK_COMPRESSED_SIZE: usize = 16 * 1024 * 1024;
3131
// The size of the I/O read/write block index buffer by default.
3232
pub const DEFAULT_BLOCK_INDEX_BUFFER_SIZE: usize = 300 * 1024;
3333
// The max number of a block by default.
34-
pub const DEFAULT_BLOCK_MAX_ROWS: usize = 1000 * 1000;
35-
// The min number of a block by default.
36-
pub const DEFAULT_BLOCK_MIN_ROWS: usize = 800 * 1000;
34+
pub const DEFAULT_BLOCK_ROW_COUNT: usize = 1000 * 1000;
3735
// The number of blocks in a segment by default.
3836
pub const DEFAULT_BLOCK_PER_SEGMENT: usize = 1000;
3937
/// The number of bytes read at the end of the file on first read

Diff for: src/query/catalog/src/table.rs

+1-12
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@ use databend_common_expression::BlockThresholds;
2828
use databend_common_expression::ColumnId;
2929
use databend_common_expression::Scalar;
3030
use databend_common_expression::TableSchema;
31-
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
32-
use databend_common_io::constants::DEFAULT_BLOCK_COMPRESSED_SIZE;
33-
use databend_common_io::constants::DEFAULT_BLOCK_MAX_ROWS;
34-
use databend_common_io::constants::DEFAULT_BLOCK_MIN_ROWS;
35-
use databend_common_io::constants::DEFAULT_BLOCK_PER_SEGMENT;
3631
use databend_common_meta_app::app_error::AppError;
3732
use databend_common_meta_app::app_error::UnknownTableId;
3833
use databend_common_meta_app::schema::TableIdent;
@@ -382,13 +377,7 @@ pub trait Table: Sync + Send {
382377
}
383378

384379
fn get_block_thresholds(&self) -> BlockThresholds {
385-
BlockThresholds {
386-
max_rows_per_block: DEFAULT_BLOCK_MAX_ROWS,
387-
min_rows_per_block: DEFAULT_BLOCK_MIN_ROWS,
388-
max_bytes_per_block: DEFAULT_BLOCK_BUFFER_SIZE,
389-
max_bytes_per_file: DEFAULT_BLOCK_COMPRESSED_SIZE,
390-
block_per_segment: DEFAULT_BLOCK_PER_SEGMENT,
391-
}
380+
BlockThresholds::default()
392381
}
393382

394383
#[async_backtrace::framed]

Diff for: src/query/expression/src/utils/block_thresholds.rs

+72-33
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,32 @@
1414

1515
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
1616
use databend_common_io::constants::DEFAULT_BLOCK_COMPRESSED_SIZE;
17-
use databend_common_io::constants::DEFAULT_BLOCK_MAX_ROWS;
18-
use databend_common_io::constants::DEFAULT_BLOCK_MIN_ROWS;
1917
use databend_common_io::constants::DEFAULT_BLOCK_PER_SEGMENT;
18+
use databend_common_io::constants::DEFAULT_BLOCK_ROW_COUNT;
2019

2120
#[derive(Clone, Copy, Debug, serde::Serialize, serde::Deserialize)]
2221
pub struct BlockThresholds {
2322
pub max_rows_per_block: usize,
2423
pub min_rows_per_block: usize,
24+
2525
pub max_bytes_per_block: usize,
26-
pub max_bytes_per_file: usize,
26+
pub min_bytes_per_block: usize,
27+
28+
pub max_compressed_per_block: usize,
29+
pub min_compressed_per_block: usize,
30+
2731
pub block_per_segment: usize,
2832
}
2933

3034
impl Default for BlockThresholds {
3135
fn default() -> BlockThresholds {
3236
BlockThresholds {
33-
max_rows_per_block: DEFAULT_BLOCK_MAX_ROWS,
34-
min_rows_per_block: DEFAULT_BLOCK_MIN_ROWS,
37+
max_rows_per_block: DEFAULT_BLOCK_ROW_COUNT,
38+
min_rows_per_block: (DEFAULT_BLOCK_ROW_COUNT * 4).div_ceil(5),
3539
max_bytes_per_block: DEFAULT_BLOCK_BUFFER_SIZE,
36-
max_bytes_per_file: DEFAULT_BLOCK_COMPRESSED_SIZE,
40+
min_bytes_per_block: (DEFAULT_BLOCK_BUFFER_SIZE * 4).div_ceil(5),
41+
max_compressed_per_block: DEFAULT_BLOCK_COMPRESSED_SIZE,
42+
min_compressed_per_block: (DEFAULT_BLOCK_COMPRESSED_SIZE * 4).div_ceil(5),
3743
block_per_segment: DEFAULT_BLOCK_PER_SEGMENT,
3844
}
3945
}
@@ -42,16 +48,17 @@ impl Default for BlockThresholds {
4248
impl BlockThresholds {
4349
pub fn new(
4450
max_rows_per_block: usize,
45-
min_rows_per_block: usize,
4651
max_bytes_per_block: usize,
47-
max_bytes_per_file: usize,
52+
max_compressed_per_block: usize,
4853
block_per_segment: usize,
4954
) -> Self {
5055
BlockThresholds {
5156
max_rows_per_block,
52-
min_rows_per_block,
57+
min_rows_per_block: (max_rows_per_block * 4).div_ceil(5),
5358
max_bytes_per_block,
54-
max_bytes_per_file,
59+
min_bytes_per_block: (max_bytes_per_block * 4).div_ceil(5),
60+
max_compressed_per_block,
61+
min_compressed_per_block: (max_compressed_per_block * 4).div_ceil(5),
5562
block_per_segment,
5663
}
5764
}
@@ -64,8 +71,8 @@ impl BlockThresholds {
6471
file_size: usize,
6572
) -> bool {
6673
row_count >= self.min_rows_per_block
67-
|| block_size >= self.max_bytes_per_block
68-
|| file_size >= self.max_bytes_per_file
74+
|| block_size >= self.min_bytes_per_block
75+
|| file_size >= self.min_compressed_per_block
6976
}
7077

7178
#[inline]
@@ -78,53 +85,85 @@ impl BlockThresholds {
7885
) -> bool {
7986
total_blocks >= self.block_per_segment
8087
&& (total_rows >= self.min_rows_per_block * self.block_per_segment
81-
|| total_bytes >= self.max_bytes_per_block * self.block_per_segment
82-
|| total_compressed >= self.max_bytes_per_file * self.block_per_segment)
88+
|| total_bytes >= self.min_bytes_per_block * self.block_per_segment
89+
|| total_compressed >= self.min_compressed_per_block * self.block_per_segment)
8390
}
8491

8592
#[inline]
8693
pub fn check_large_enough(&self, row_count: usize, block_size: usize) -> bool {
87-
row_count >= self.min_rows_per_block || block_size >= self.max_bytes_per_block
94+
row_count >= self.min_rows_per_block || block_size >= self.min_bytes_per_block
8895
}
8996

9097
#[inline]
9198
pub fn check_for_compact(&self, row_count: usize, block_size: usize) -> bool {
92-
row_count < 2 * self.min_rows_per_block && block_size < 2 * self.max_bytes_per_block
99+
row_count < 2 * self.min_rows_per_block && block_size < 2 * self.min_bytes_per_block
93100
}
94101

95102
#[inline]
96103
pub fn check_too_small(&self, row_count: usize, block_size: usize, file_size: usize) -> bool {
97104
row_count < self.min_rows_per_block / 2
98-
&& block_size < self.max_bytes_per_block / 2
99-
&& file_size <= self.max_bytes_per_file / 2
105+
&& block_size < self.min_bytes_per_block / 2
106+
&& file_size < self.min_compressed_per_block / 2
100107
}
101108

102109
#[inline]
103-
pub fn calc_rows_per_block(
110+
pub fn calc_rows_for_compact(&self, total_bytes: usize, total_rows: usize) -> usize {
111+
if self.check_for_compact(total_rows, total_bytes) {
112+
return total_rows;
113+
}
114+
115+
let block_num_by_rows = std::cmp::max(total_rows / self.min_rows_per_block, 1);
116+
let block_num_by_size = total_bytes / self.min_bytes_per_block;
117+
if block_num_by_rows >= block_num_by_size {
118+
return self.max_rows_per_block;
119+
}
120+
total_rows.div_ceil(block_num_by_size)
121+
}
122+
123+
/// Calculates the optimal number of rows per block based on total data size and row count.
124+
///
125+
/// # Parameters
126+
/// - `total_bytes`: The total size of the data in bytes.
127+
/// - `total_rows`: The total number of rows in the data.
128+
/// - `total_compressed`: The total compressed size of the data in bytes.
129+
///
130+
/// # Returns
131+
/// - The calculated number of rows per block that satisfies the thresholds.
132+
#[inline]
133+
pub fn calc_rows_for_recluster(
104134
&self,
105-
total_bytes: usize,
106135
total_rows: usize,
136+
total_bytes: usize,
107137
total_compressed: usize,
108138
) -> usize {
109-
if self.check_for_compact(total_rows, total_bytes) {
139+
// Check if the data is compact enough to skip further calculations.
140+
if self.check_for_compact(total_rows, total_bytes)
141+
&& total_compressed < 2 * self.min_compressed_per_block
142+
{
110143
return total_rows;
111144
}
112145

113146
let block_num_by_rows = std::cmp::max(total_rows / self.min_rows_per_block, 1);
114-
let block_num_by_size = std::cmp::max(
115-
total_bytes / self.max_bytes_per_block,
116-
total_compressed / self.max_bytes_per_file,
117-
);
118-
if block_num_by_rows >= block_num_by_size {
147+
let block_num_by_compressed = total_compressed.div_ceil(self.max_compressed_per_block);
148+
// If row-based block count exceeds compressed-based block count, use max rows per block.
149+
if block_num_by_rows >= block_num_by_compressed {
119150
return self.max_rows_per_block;
120151
}
121152

122-
let mut rows_per_block = total_rows.div_ceil(block_num_by_size);
123-
if rows_per_block < self.max_rows_per_block / 2 {
124-
// If block rows < 500_000, max_bytes_per_block set to 125M
125-
let block_num_by_size = (4 * block_num_by_size / 5).max(1);
126-
rows_per_block = total_rows.div_ceil(block_num_by_size);
127-
}
128-
rows_per_block
153+
let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
154+
// Adjust the number of blocks based on block size thresholds.
155+
let max_bytes_per_block = (4 * self.min_bytes_per_block).min(400 * 1024 * 1024);
156+
let min_bytes_per_block = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
157+
let block_nums = if bytes_per_block > max_bytes_per_block {
158+
// Case 1: If the block size is too large.
159+
total_bytes.div_ceil(max_bytes_per_block)
160+
} else if bytes_per_block < min_bytes_per_block {
161+
// Case 2: If the block size is too small.
162+
total_bytes / min_bytes_per_block
163+
} else {
164+
// Case 3: Otherwise, use the compressed-based block count.
165+
block_num_by_compressed
166+
};
167+
total_rows.div_ceil(block_nums.max(1)).max(1)
129168
}
130169
}

Diff for: src/query/expression/tests/it/block.rs

+14
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
use databend_common_column::buffer::Buffer;
216
use databend_common_expression::block_debug::box_render;
317
use databend_common_expression::types::number::NumberScalar;

Diff for: src/query/expression/tests/it/block_thresholds.rs

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright 2023 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_expression::BlockThresholds;
16+
17+
fn default_thresholds() -> BlockThresholds {
18+
BlockThresholds::new(1000, 1_000_000, 100_000, 4)
19+
}
20+
21+
#[test]
22+
fn test_check_perfect_block() {
23+
let t = default_thresholds();
24+
25+
// All below threshold
26+
assert!(!t.check_perfect_block(100, 1000, 1000));
27+
28+
// Any one above threshold
29+
assert!(t.check_perfect_block(800, 1000, 1000));
30+
assert!(t.check_perfect_block(100, 800_000, 1000));
31+
assert!(t.check_perfect_block(100, 1000, 80_000));
32+
}
33+
34+
#[test]
35+
fn test_check_perfect_segment() {
36+
let t = default_thresholds();
37+
let total_blocks = 4;
38+
39+
// Below threshold
40+
assert!(!t.check_perfect_segment(total_blocks, 100, 1000, 1000));
41+
42+
// One condition meets threshold
43+
assert!(t.check_perfect_segment(total_blocks, 4000, 1000, 1000));
44+
assert!(t.check_perfect_segment(total_blocks, 100, 4_000_000, 1000));
45+
assert!(t.check_perfect_segment(total_blocks, 100, 1000, 320_000));
46+
}
47+
48+
#[test]
49+
fn test_check_large_enough() {
50+
let t = default_thresholds();
51+
52+
assert!(!t.check_large_enough(100, 1000));
53+
assert!(t.check_large_enough(800, 1000));
54+
assert!(t.check_large_enough(100, 800_000));
55+
}
56+
57+
#[test]
58+
fn test_check_for_compact() {
59+
let t = default_thresholds();
60+
61+
assert!(t.check_for_compact(1500, 1_500_000)); // just under 2x min
62+
assert!(!t.check_for_compact(3000, 3_000_000)); // too large
63+
}
64+
65+
#[test]
66+
fn test_check_too_small() {
67+
let t = default_thresholds();
68+
69+
assert!(t.check_too_small(50, 10_000, 10_000)); // All very small
70+
assert!(!t.check_too_small(800, 10_000, 10_000)); // Row count not too small
71+
assert!(!t.check_too_small(50, 800_000, 10_000)); // Block size not too small
72+
}
73+
74+
#[test]
75+
fn test_calc_rows_for_compact() {
76+
let t = default_thresholds();
77+
78+
assert_eq!(t.calc_rows_for_compact(500_000, 1000), 1000);
79+
80+
// Block number by rows wins → max_rows_per_block
81+
assert_eq!(
82+
t.calc_rows_for_compact(2_000_000, 10_000),
83+
t.max_rows_per_block
84+
);
85+
86+
// Size-based block number wins
87+
assert_eq!(t.calc_rows_for_compact(4_000_000, 2000), 400);
88+
}
89+
90+
#[test]
91+
fn test_calc_rows_for_recluster() {
92+
let t = default_thresholds();
93+
94+
// compact enough to skip further calculations
95+
assert_eq!(t.calc_rows_for_recluster(1000, 500_000, 100_000), 1000);
96+
97+
// row-based block count exceeds compressed-based block count, use max rows per block.
98+
assert_eq!(
99+
t.calc_rows_for_recluster(10_000, 2_000_000, 100_000),
100+
t.max_rows_per_block
101+
);
102+
103+
// Case 1: If the block size is too large.
104+
let result = t.calc_rows_for_recluster(4_000, 30_000_000, 600_000);
105+
assert_eq!(result, 400);
106+
107+
// Case 2: If the block size is too small.
108+
let result = t.calc_rows_for_recluster(4_000, 2_000_000, 600_000);
109+
assert_eq!(result, 800);
110+
111+
// Case 3: use the compressed-based block count.
112+
let result = t.calc_rows_for_recluster(4_000, 10_000_000, 600_000);
113+
assert_eq!(result, 667);
114+
}

Diff for: src/query/expression/tests/it/main.rs

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ extern crate core;
2626

2727
mod arrow;
2828
mod block;
29+
mod block_thresholds;
2930
mod common;
3031
mod decimal;
3132
mod fill_field_default_value;

Diff for: src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ impl AccumulatingTransform for BlockCompactBuilder {
6969
// holding slices of blocks to merge later may lead to oom, so
7070
// 1. we expect blocks from file formats are not slice.
7171
// 2. if block is split here, cut evenly and emit them at once.
72-
let rows_per_block = self.thresholds.calc_rows_per_block(num_bytes, num_rows, 0);
72+
let rows_per_block = self.thresholds.calc_rows_for_compact(num_bytes, num_rows);
7373
Ok(vec![DataBlock::empty_with_meta(Box::new(
7474
BlockCompactMeta::Split {
7575
block: data,

Diff for: src/query/service/src/interpreters/common/table_option_validation.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use chrono::Duration;
2020
use databend_common_ast::ast::Engine;
2121
use databend_common_exception::ErrorCode;
2222
use databend_common_expression::TableSchemaRef;
23-
use databend_common_io::constants::DEFAULT_BLOCK_MAX_ROWS;
23+
use databend_common_io::constants::DEFAULT_BLOCK_ROW_COUNT;
2424
use databend_common_settings::Settings;
2525
use databend_common_sql::BloomIndexColumns;
2626
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
@@ -154,7 +154,7 @@ pub fn is_valid_row_per_block(
154154
let row_per_block = value.parse::<u64>()?;
155155
let error_str = "invalid row_per_block option, can't be over 1000000";
156156

157-
if row_per_block > DEFAULT_BLOCK_MAX_ROWS as u64 {
157+
if row_per_block > DEFAULT_BLOCK_ROW_COUNT as u64 {
158158
error!("{}", error_str);
159159
return Err(ErrorCode::TableOptionInvalid(error_str));
160160
}

0 commit comments

Comments
 (0)