Skip to content

refactor(query): reduce memory usage during aggregate spill #17550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 85 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
Show all changes
85 commits
Select commit — hold Shift and click to select a range
f953fff
refactor(query): refactor aggreagte spill code
zhang2014 Mar 4, 2025
d6e7d7c
refactor(query): refactor aggreagte spill code
zhang2014 Mar 5, 2025
259fec8
refactor(query): refactor aggreagte spill code
zhang2014 Mar 6, 2025
3e397cd
refactor(query): refactor aggreagte spill code
zhang2014 Mar 11, 2025
e03ce66
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
e57cbe1
Merge branch 'main' into refactor/aggregate_spill
zhang2014 Mar 13, 2025
be7c1da
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
b7cf072
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
bf59696
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
b77c041
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
78a0199
refactor(query): refactor aggreagte spill code
zhang2014 Mar 13, 2025
22dad6e
refactor(query): refactor aggreagte spill code
zhang2014 Mar 16, 2025
417f64d
refactor(query): refactor aggreagte spill code
zhang2014 Mar 16, 2025
4193c6c
refactor(query): refactor aggreagte spill code
zhang2014 Mar 16, 2025
20ca2a9
refactor(query): refactor aggreagte spill code
zhang2014 Mar 16, 2025
472aa37
refactor(query): refactor aggreagte spill code
zhang2014 Mar 17, 2025
2f62d3d
refactor(query): refactor aggreagte spill code
zhang2014 Mar 17, 2025
3a0652b
refactor(query): refactor aggreagte spill code
zhang2014 Mar 17, 2025
9a56d22
refactor(query): refactor aggreagte spill code
zhang2014 Mar 17, 2025
552bcda
refactor(query): refactor aggreagte spill code
zhang2014 Mar 19, 2025
7d46563
refactor(query): refactor aggreagte spill code
zhang2014 Mar 19, 2025
f6465f3
Merge branch 'main' into refactor/aggregate_spill
zhang2014 Mar 19, 2025
5f1fbbc
refactor(query): refactor aggreagte spill code
zhang2014 Mar 19, 2025
d61d925
refactor(query): refactor aggreagte spill code
zhang2014 Mar 20, 2025
1206211
refactor(query): refactor aggreagte spill code
zhang2014 Mar 24, 2025
0374a7f
refactor(query): refactor aggreagte spill code
zhang2014 Mar 24, 2025
ce6e754
refactor(query): refactor aggreagte spill code
zhang2014 Mar 28, 2025
6a53857
refactor(query): refactor aggreagte spill code
zhang2014 Mar 29, 2025
713d2c7
Merge branch 'main' into refactor/aggregate_spill
zhang2014 Mar 30, 2025
31037d1
refactor(query): refactor aggreagte spill code
zhang2014 Mar 30, 2025
7038f10
Merge branch 'refactor/aggregate_spill' of github.com:zhang2014/dataf…
zhang2014 Mar 30, 2025
f687164
refactor(query): refactor aggreagte spill code
zhang2014 Apr 1, 2025
7e5963f
refactor(query): refactor aggreagte spill code
zhang2014 Apr 1, 2025
eaee69a
Merge branch 'main' of https://github.com/datafuselabs/databend into …
zhang2014 Apr 1, 2025
55f522d
refactor(query): refactor aggreagte spill code
zhang2014 Apr 1, 2025
2a7f2bc
refactor(query): refactor aggreagte spill code
zhang2014 Apr 1, 2025
7af4255
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
9728660
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
b37b273
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
5ae50c7
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
62bf28b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
cd8160a
refactor(query): refactor aggreagte spill code
zhang2014 Apr 2, 2025
0da9b3b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 3, 2025
715a50d
refactor(query): refactor aggreagte spill code
zhang2014 Apr 3, 2025
404af38
refactor(query): refactor aggreagte spill code
zhang2014 Apr 3, 2025
48237d6
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
00f9096
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
e39e64b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
a6b2dc2
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
8ae9c4c
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
b6b4fd9
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
b8d280d
refactor(query): refactor aggreagte spill code
zhang2014 Apr 5, 2025
dcbcd53
refactor(query): refactor aggreagte spill code
zhang2014 Apr 6, 2025
9dc9d1d
refactor(query): refactor aggreagte spill code
zhang2014 Apr 6, 2025
b90b6c1
refactor(query): refactor aggreagte spill code
zhang2014 Apr 6, 2025
be1e1ba
Merge branch 'main' into refactor/aggregate_spill
zhang2014 Apr 6, 2025
34d6770
refactor(query): refactor aggreagte spill code
zhang2014 Apr 7, 2025
e03481a
Merge branch 'refactor/aggregate_spill' of github.com:zhang2014/dataf…
zhang2014 Apr 7, 2025
6612c9d
Merge branch 'main' into refactor/aggregate_spill
zhang2014 Apr 7, 2025
b90f249
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
31069ce
Merge branch 'refactor/aggregate_spill' of github.com:zhang2014/dataf…
zhang2014 Apr 8, 2025
76eabfe
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
12ba91b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
98b58d8
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
b54d32a
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
6a6bace
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
1f80959
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
2d6bf9a
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
b8fad43
refactor(query): refactor aggreagte spill code
zhang2014 Apr 8, 2025
bb7ca69
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
2be5e4a
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
1551f0e
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
c7bc7da
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
b06e8ef
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
66ade37
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
0b9305a
refactor(query): refactor aggreagte spill code
zhang2014 Apr 9, 2025
87c911b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 13, 2025
d827b8c
refactor(query): refactor aggreagte spill code
zhang2014 Apr 13, 2025
d7e2a36
refactor(query): refactor aggreagte spill code
zhang2014 Apr 13, 2025
eb7cb15
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
5090728
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
580440b
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
9631855
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
d8fe94d
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
68cf7c0
refactor(query): refactor aggreagte spill code
zhang2014 Apr 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 111 additions & 99 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -587,9 +587,8 @@ result_large_err = "allow"
[profile.release]
debug = 1
lto = "thin"
overflow-checks = false
opt-level = "s" # defaults to be 3
incremental = true
opt-level = "s" # defaults to be 3
#incremental = true

[profile.ci]
inherits = "release"
40 changes: 16 additions & 24 deletions src/common/base/src/runtime/memory/mem_stat.rs
Original file line number Diff line number Diff line change
@@ -40,14 +40,14 @@ pub struct MemStat {
name: Option<String>,

pub(crate) used: AtomicI64,
pub(crate) peek_used: AtomicI64,
pub(crate) peak_used: AtomicI64,

/// The limit of max used memory for this tracker.
///
/// Set to 0 to disable the limit.
limit: AtomicI64,

parent_memory_stat: Vec<Arc<MemStat>>,
parent_memory_stat: Option<Arc<MemStat>>,
}

impl MemStat {
@@ -56,17 +56,17 @@ impl MemStat {
id: 0,
name: None,
used: AtomicI64::new(0),
peek_used: AtomicI64::new(0),
peak_used: AtomicI64::new(0),
limit: AtomicI64::new(0),
parent_memory_stat: vec![],
parent_memory_stat: None,
}
}

pub fn create(name: String) -> Arc<MemStat> {
MemStat::create_child(name, vec![])
MemStat::create_child(name, None)
}

pub fn create_child(name: String, parent_memory_stat: Vec<Arc<MemStat>>) -> Arc<MemStat> {
pub fn create_child(name: String, parent_memory_stat: Option<Arc<MemStat>>) -> Arc<MemStat> {
let id = match GlobalSequence::next() {
0 => GlobalSequence::next(),
id => id,
@@ -76,16 +76,12 @@ impl MemStat {
id,
name: Some(name),
used: AtomicI64::new(0),
peek_used: AtomicI64::new(0),
peak_used: AtomicI64::new(0),
limit: AtomicI64::new(0),
parent_memory_stat,
})
}

pub fn get_parent_memory_stat(&self) -> Vec<Arc<MemStat>> {
self.parent_memory_stat.clone()
}

pub fn set_limit(&self, mut size: i64) {
// It may cause the process unable to run if memory limit is too low.
if size > 0 && size < MINIMUM_MEMORY_LIMIT {
@@ -107,19 +103,15 @@ impl MemStat {
let mut used = self.used.fetch_add(batch_memory_used, Ordering::Relaxed);

used += batch_memory_used;
self.peek_used.fetch_max(used, Ordering::Relaxed);
self.peak_used.fetch_max(used, Ordering::Relaxed);

for (idx, parent_memory_stat) in self.parent_memory_stat.iter().enumerate() {
if let Some(parent_memory_stat) = self.parent_memory_stat.as_ref() {
if let Err(cause) = parent_memory_stat
.record_memory::<NEED_ROLLBACK>(batch_memory_used, current_memory_alloc)
{
if NEED_ROLLBACK {
// We only roll back the memory that alloc failed
self.used.fetch_sub(current_memory_alloc, Ordering::Relaxed);

for index in 0..idx {
self.parent_memory_stat[index].rollback(current_memory_alloc);
}
}

return Err(cause);
@@ -142,8 +134,8 @@ impl MemStat {
pub fn rollback(&self, memory_usage: i64) {
self.used.fetch_sub(memory_usage, Ordering::Relaxed);

for parent_memory_stat in &self.parent_memory_stat {
parent_memory_stat.rollback(memory_usage)
if let Some(parent_memory_stat) = &self.parent_memory_stat {
parent_memory_stat.rollback(memory_usage);
}
}

@@ -171,7 +163,7 @@ impl MemStat {

#[inline]
pub fn get_peek_memory_usage(&self) -> i64 {
self.peek_used.load(Ordering::Relaxed)
self.peak_used.load(Ordering::Relaxed)
}
}

@@ -268,7 +260,7 @@ mod tests {
fn test_multiple_level_mem_stat() -> Result<()> {
let mem_stat = MemStat::create("TEST".to_string());
let child_mem_stat =
MemStat::create_child("TEST_CHILD".to_string(), vec![mem_stat.clone()]);
MemStat::create_child("TEST_CHILD".to_string(), Some(mem_stat.clone()));

mem_stat.record_memory::<false>(1, 1).unwrap();
mem_stat.record_memory::<false>(2, 2).unwrap();
@@ -292,7 +284,7 @@ mod tests {
let mem_stat = MemStat::create("TEST".to_string());
mem_stat.set_limit(MINIMUM_MEMORY_LIMIT * 2);
let child_mem_stat =
MemStat::create_child("TEST_CHILD".to_string(), vec![mem_stat.clone()]);
MemStat::create_child("TEST_CHILD".to_string(), Some(mem_stat.clone()));
child_mem_stat.set_limit(MINIMUM_MEMORY_LIMIT);

mem_stat.record_memory::<false>(1, 1).unwrap();
@@ -322,7 +314,7 @@ mod tests {
let mem_stat = MemStat::create("TEST".to_string());
mem_stat.set_limit(MINIMUM_MEMORY_LIMIT);
let child_mem_stat =
MemStat::create_child("TEST_CHILD".to_string(), vec![mem_stat.clone()]);
MemStat::create_child("TEST_CHILD".to_string(), Some(mem_stat.clone()));
child_mem_stat.set_limit(MINIMUM_MEMORY_LIMIT * 2);

assert!(child_mem_stat
@@ -335,7 +327,7 @@ mod tests {
let mem_stat = MemStat::create("TEST".to_string());
mem_stat.set_limit(MINIMUM_MEMORY_LIMIT * 2);
let child_mem_stat =
MemStat::create_child("TEST_CHILD".to_string(), vec![mem_stat.clone()]);
MemStat::create_child("TEST_CHILD".to_string(), Some(mem_stat.clone()));
child_mem_stat.set_limit(MINIMUM_MEMORY_LIMIT);

assert!(child_mem_stat
4 changes: 2 additions & 2 deletions src/common/base/src/runtime/memory/stat_buffer_global.rs
Original file line number Diff line number Diff line change
@@ -90,7 +90,7 @@ impl GlobalStatBuffer {
.used
.fetch_add(memory_usage, Ordering::Relaxed);
self.global_mem_stat
.peek_used
.peak_used
.fetch_max(used + memory_usage, Ordering::Relaxed);
return Ok(());
}
@@ -126,7 +126,7 @@ impl GlobalStatBuffer {
.used
.fetch_add(memory_usage, Ordering::Relaxed);
self.global_mem_stat
.peek_used
.peak_used
.fetch_max(used + memory_usage, Ordering::Relaxed);
return;
}
4 changes: 2 additions & 2 deletions src/common/base/src/runtime/memory/stat_buffer_mem_stat.rs
Original file line number Diff line number Diff line change
@@ -93,7 +93,7 @@ impl MemStatBuffer {
if self.destroyed_thread_local_macro {
let used = mem_stat.used.fetch_add(usage, Ordering::Relaxed);
mem_stat
.peek_used
.peak_used
.fetch_max(used + usage, Ordering::Relaxed);
return Ok(());
}
@@ -134,7 +134,7 @@ impl MemStatBuffer {
if self.destroyed_thread_local_macro {
let used = mem_stat.used.fetch_add(memory_usage, Ordering::Relaxed);
mem_stat
.peek_used
.peak_used
.fetch_max(used + memory_usage, Ordering::Relaxed);
return;
}
4 changes: 2 additions & 2 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
Original file line number Diff line number Diff line change
@@ -177,7 +177,7 @@ impl AggregateHashTable {
row_count: usize,
) -> Result<usize> {
state.row_count = row_count;
group_hash_columns(group_columns, &mut state.group_hashes);
group_hash_columns(group_columns, state.group_hashes.as_mut_slice());

let new_group_count = if self.direct_append {
for idx in 0..row_count {
@@ -337,7 +337,7 @@ impl AggregateHashTable {
unsafe {
row_match_columns(
group_columns,
&state.addresses,
state.addresses.as_slice(),
&mut state.group_compare_vector,
&mut state.temp_vector,
need_compare_count,
4 changes: 2 additions & 2 deletions src/query/expression/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
@@ -38,10 +38,10 @@ pub use payload::*;
pub use payload_flush::*;
pub use probe_state::*;

pub type SelectVector = [usize; BATCH_SIZE];
pub type SelectVector = Vec<usize>;

pub fn new_sel() -> SelectVector {
[0; BATCH_SIZE]
vec![0; BATCH_SIZE]
}

// A batch size to probe, flush, repartition, etc.
28 changes: 21 additions & 7 deletions src/query/expression/src/aggregate/partitioned_payload.rs
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@ use std::sync::Arc;

use bumpalo::Bump;
use itertools::Itertools;
use serde::Deserializer;
use serde::Serializer;

use super::payload::Payload;
use super::probe_state::ProbeState;
@@ -50,6 +52,18 @@ pub struct PartitionedPayload {
unsafe impl Send for PartitionedPayload {}
unsafe impl Sync for PartitionedPayload {}

impl serde::Serialize for PartitionedPayload {
fn serialize<S: Serializer>(&self, _: S) -> Result<S::Ok, S::Error> {
unreachable!("PartitionedPayload must not be exchanged between multiple nodes.")
}
}

impl<'de> serde::Deserialize<'de> for PartitionedPayload {
fn deserialize<D: Deserializer<'de>>(_: D) -> Result<Self, D::Error> {
unreachable!("PartitionedPayload must not be exchanged between multiple nodes.")
}
}

impl PartitionedPayload {
pub fn new(
group_types: Vec<DataType>,
@@ -69,7 +83,7 @@ impl PartitionedPayload {
let payloads = (0..partition_count)
.map(|_| {
Payload::new(
arenas[0].clone(),
arenas.clone(),
group_types.clone(),
aggrs.clone(),
states_layout.clone(),
@@ -116,9 +130,9 @@ impl PartitionedPayload {
if self.payloads.len() == 1 {
self.payloads[0].reserve_append_rows(
&state.empty_vector,
&state.group_hashes,
&mut state.addresses,
&mut state.page_index,
state.group_hashes.as_slice(),
state.addresses.as_mut_slice(),
state.page_index.as_mut_slice(),
new_group_rows,
group_columns,
);
@@ -143,9 +157,9 @@ impl PartitionedPayload {

self.payloads[partition_index].reserve_append_rows(
sel,
&state.group_hashes,
&mut state.addresses,
&mut state.page_index,
state.group_hashes.as_slice(),
state.addresses.as_mut_slice(),
state.page_index.as_mut_slice(),
count,
group_columns,
);
44 changes: 34 additions & 10 deletions src/query/expression/src/aggregate/payload.rs
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ use crate::MAX_PAGE_SIZE;
// [HASH] is the hash data of the groups
// [STATE_ADDRS] is the state_addrs of the aggregate functions, 8 bytes each
pub struct Payload {
pub arena: Arc<Bump>,
pub arena: Vec<Arc<Bump>>,
// if true, the states are moved out of the payload into other payload, and will not be dropped
pub state_move_out: bool,
pub group_types: Vec<DataType>,
@@ -94,7 +94,7 @@ pub type Pages = Vec<Page>;

impl Payload {
pub fn new(
arena: Arc<Bump>,
arena: Vec<Arc<Bump>>,
group_types: Vec<DataType>,
aggrs: Vec<AggregateFunctionRef>,
states_layout: Option<StatesLayout>,
@@ -267,7 +267,7 @@ impl Payload {

unsafe {
serialize_column_to_rowformat(
&self.arena,
&self.arena[0],
col,
select_vector,
new_group_rows,
@@ -297,7 +297,7 @@ impl Payload {
// write states
let (array_layout, padded_size) = layout.repeat(new_group_rows).unwrap();
// Bump only allocates but does not drop, so there is no use after free for any item.
let place = self.arena.alloc_layout(array_layout);
let place = self.arena[0].alloc_layout(array_layout);
for (idx, place) in select_vector
.iter()
.take(new_group_rows)
@@ -385,7 +385,11 @@ impl Payload {
);
}

pub fn scatter(&self, state: &mut PayloadFlushState, partition_count: usize) -> bool {
pub fn scatter_with_seed<const SEED: u64>(
&self,
state: &mut PayloadFlushState,
partitions: usize,
) -> bool {
if state.flush_page >= self.pages.len() {
return false;
}
@@ -397,23 +401,27 @@ impl Payload {
state.flush_page += 1;
state.flush_page_row = 0;
state.row_count = 0;
return self.scatter(state, partition_count);
return self.scatter_with_seed::<SEED>(state, partitions);
}

let end = (state.flush_page_row + BATCH_SIZE).min(page.rows);
let rows = end - state.flush_page_row;
state.row_count = rows;

state.probe_state.reset_partitions(partition_count);
state.probe_state.reset_partitions(partitions);

let mods: StrengthReducedU64 = StrengthReducedU64::new(partitions as u64);

let mods: StrengthReducedU64 = StrengthReducedU64::new(partition_count as u64);
for idx in 0..rows {
state.addresses[idx] = self.data_ptr(page, idx + state.flush_page_row);

let hash = unsafe { read::<u64>(state.addresses[idx].add(self.hash_offset) as _) };
let mut hash = unsafe { read::<u64>(state.addresses[idx].add(self.hash_offset) as _) };

let partition_idx = (hash % mods) as usize;
if SEED != 0 {
hash = Self::combine_hash(hash, SEED);
}

let partition_idx = (hash % mods) as usize;
let sel = &mut state.probe_state.partition_entries[partition_idx];
sel[state.probe_state.partition_count[partition_idx]] = idx;
state.probe_state.partition_count[partition_idx] += 1;
@@ -422,6 +430,10 @@ impl Payload {
true
}

pub fn scatter(&self, state: &mut PayloadFlushState, partitions: usize) -> bool {
self.scatter_with_seed::<0>(state, partitions)
}

pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
let fake_rows = fake_rows.unwrap_or(0);
let columns = (0..self.aggrs.len())
@@ -434,6 +446,18 @@ impl Payload {
.collect_vec();
DataBlock::new_from_columns(columns)
}

#[allow(unused_parens)]
fn combine_hash(hash: u64, seed: u64) -> u64 {
static KMUL: u64 = 0x9ddfea08eb382d69;

let mut a = (seed ^ hash).wrapping_mul(KMUL);
a ^= (a >> 47);

let mut b = (hash ^ a).wrapping_mul(KMUL);
b ^= (b >> 47);
b.wrapping_mul(KMUL)
}
}

impl Drop for Payload {
Loading