diff --git a/Cargo.lock b/Cargo.lock
index dc5eb73a0b1d..975b1cc7ee63 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2476,6 +2476,7 @@ dependencies = [
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-functions-aggregate",
+ "datafusion-functions-aggregate-common",
  "datafusion-functions-window",
  "datafusion-functions-window-common",
  "datafusion-physical-expr",
diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs
index 878094950808..866c39ceed13 100644
--- a/benchmarks/src/util/options.rs
+++ b/benchmarks/src/util/options.rs
@@ -83,7 +83,7 @@ impl CommonOpt {
             config = config.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes);
         }
-
+
         config
     }
diff --git a/datafusion-examples/examples/advanced_udaf.rs b/datafusion-examples/examples/advanced_udaf.rs
index 9c29e6b40d10..32a785f2379a 100644
--- a/datafusion-examples/examples/advanced_udaf.rs
+++ b/datafusion-examples/examples/advanced_udaf.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use arrow::datatypes::{Field, Schema};
-use datafusion::physical_expr::NullState;
+use datafusion::physical_expr::FlatNullState;
 use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
 use std::{any::Any, sync::Arc};
@@ -217,7 +217,7 @@ struct GeometricMeanGroupsAccumulator {
     prods: Vec<f64>,
 
     /// Track nulls in the input / filters
-    null_state: NullState,
+    null_state: FlatNullState,
 }
 
 impl GeometricMeanGroupsAccumulator {
@@ -227,7 +227,7 @@ impl GeometricMeanGroupsAccumulator {
             return_data_type: DataType::Float64,
             counts: vec![],
             prods: vec![],
-            null_state: NullState::new(),
+            null_state: FlatNullState::new(),
         }
     }
 }
@@ -248,13 +248,16 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
         // increment counts, update sums
         self.counts.resize(total_num_groups, 0);
         self.prods.resize(total_num_groups, 1.0);
-        // Use the `NullState` structure to generate specialized code for null / non null input elements
+        // Use the `NullState` structure to generate specialized code for null / non null input elements.
+        // `block_id` is ignored in `value_fn`, because `GeometricMeanGroupsAccumulator`
+        // does not support blocked groups yet.
+        // See `GroupsAccumulator::supports_blocked_groups` for more details.
         self.null_state.accumulate(
             group_indices,
             values,
             opt_filter,
             total_num_groups,
-            |group_index, new_value| {
+            |_, group_index, new_value| {
                 let prod = &mut self.prods[group_index];
                 *prod = prod.mul_wrapping(new_value);
@@ -279,12 +282,15 @@ impl GroupsAccumulator for GeometricMeanGroupsAccumulator {
         let partial_counts = values[1].as_primitive::<UInt64Type>();
         // update counts with partial counts
         self.counts.resize(total_num_groups, 0);
+        // `block_id` is ignored in `value_fn`, because `GeometricMeanGroupsAccumulator`
+        // does not support blocked groups yet.
+        // See `GroupsAccumulator::supports_blocked_groups` for more details.
         self.null_state.accumulate(
             group_indices,
             partial_counts,
             opt_filter,
             total_num_groups,
-            |group_index, partial_count| {
+            |_, group_index, partial_count| {
                 self.counts[group_index] += partial_count;
             },
         );
@@ -296,7 +302,7 @@
             partial_prods,
             opt_filter,
             total_num_groups,
-            |group_index, new_value: <Float64Type as ArrowPrimitiveType>::Native| {
+            |_, group_index, new_value: <Float64Type as ArrowPrimitiveType>::Native| {
                 let prod = &mut self.prods[group_index];
                 *prod = prod.mul_wrapping(new_value);
             },
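With `FlatNullState`, `value_fn` now receives `(block_id, block_offset, value)` instead of `(group_index, value)`; flat accumulators can simply ignore the first argument. A minimal standalone sketch of the two callback shapes, assuming hypothetical `Vec`-based states (illustrative only, not code from this patch):

```rust
use arrow::array::ArrowNativeTypeOp;

// Flat layout: one Vec, block_id is always 0, so block_offset == group_index.
fn flat_update(prods: &mut [f64], _block_id: usize, block_offset: usize, v: f64) {
    prods[block_offset] = prods[block_offset].mul_wrapping(v);
}

// Blocked layout: one Vec per block; both coordinates are needed.
fn blocked_update(prods: &mut [Vec<f64>], block_id: usize, block_offset: usize, v: f64) {
    prods[block_id][block_offset] = prods[block_id][block_offset].mul_wrapping(v);
}
```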
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index b701b7130bc8..11bfd1df34c1 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -411,6 +411,17 @@ config_namespace! {
     /// written, it may be necessary to increase this size to avoid errors from
     /// the remote end point.
     pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
+
+    /// Should DataFusion use a blocked approach to manage grouping state.
+    /// By default, the blocked approach is used: capacity is first allocated
+    /// based on a predefined block size. When a block reaches its limit, we
+    /// allocate a new block (again with the predefined block size as its
+    /// capacity) instead of expanding the current one and copying the data.
+    /// If `false`, a single allocation approach is used, where values are
+    /// managed within a single large memory block. As this block grows, it
+    /// often triggers numerous copies, resulting in poor performance.
+    pub enable_aggregation_blocked_groups: bool, default = true
 }
}
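For reference, the new option can be toggled per session in the same way the fuzzer below does it. A minimal sketch, assuming the standard `SessionConfig`/`SessionContext` re-exports:

```rust
use datafusion::common::ScalarValue;
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_without_blocked_groups() -> SessionContext {
    // Fall back to the single-allocation approach for this session only.
    let config = SessionConfig::new().set(
        "datafusion.execution.enable_aggregation_blocked_groups",
        &ScalarValue::Boolean(Some(false)),
    );
    SessionContext::new_with_config(config)
}
```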
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 98373ce4eb95..ac8afdcb2702 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -233,6 +233,46 @@ async fn test_median() {
     .await;
 }
 
+// Testing the `blocked groups` optimization.
+// For details of this optimization, see:
+// https://github.com/apache/datafusion/issues/7065
+#[tokio::test(flavor = "multi_thread")]
+async fn test_blocked_groups_optimization() {
+    let data_gen_config = baseline_config();
+
+    // Implementations that currently support blocked groups:
+    //
+    // `GroupsAccumulator`:
+    //   - PrimitiveGroupsAccumulator
+    //
+    // `GroupValues`:
+    //   - GroupValuesPrimitive
+    //
+
+    // Test `Numeric aggregation` + `Single group by`
+    let aggr_functions = ["sum", "min", "max"];
+    let aggr_arguments = data_gen_config.numeric_columns();
+    let groups_by_columns = data_gen_config.numeric_columns();
+
+    let mut query_builder = QueryBuilder::new()
+        .with_table_name("fuzz_table")
+        .with_aggregate_arguments(aggr_arguments)
+        .set_group_by_columns(groups_by_columns)
+        .with_min_group_by_columns(1)
+        .with_max_group_by_columns(1)
+        .with_no_grouping(false);
+
+    for func in aggr_functions {
+        query_builder = query_builder.with_aggregate_function(func);
+    }
+
+    AggregationFuzzerBuilder::from(data_gen_config)
+        .add_query_builder(query_builder)
+        .build()
+        .run()
+        .await;
+}
+
 /// Return a standard set of columns for testing data generation
 ///
 /// Includes numeric and string types
diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs
index 3c9fe2917251..5d37b5b29f47 100644
--- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs
@@ -103,6 +103,7 @@ impl SessionContextGenerator {
             target_partitions,
             skip_partial_params,
             sort_hint: false,
+            enable_aggregation_blocked_groups: false,
             table_name: self.table_name.clone(),
             table_provider: Arc::new(provider),
         };
@@ -146,11 +147,14 @@ impl SessionContextGenerator {
             (provider, false)
         };
 
+        let enable_aggregation_blocked_groups = rng.gen_bool(0.5);
+
         let builder = GeneratedSessionContextBuilder {
             batch_size,
             target_partitions,
             sort_hint,
             skip_partial_params,
+            enable_aggregation_blocked_groups,
             table_name: self.table_name.clone(),
             table_provider: Arc::new(provider),
         };
@@ -174,6 +178,7 @@ struct GeneratedSessionContextBuilder {
     target_partitions: usize,
     sort_hint: bool,
     skip_partial_params: SkipPartialParams,
+    enable_aggregation_blocked_groups: bool,
     table_name: String,
     table_provider: Arc<dyn TableProvider>,
 }
@@ -198,6 +203,10 @@ impl GeneratedSessionContextBuilder {
             "datafusion.execution.skip_partial_aggregation_probe_ratio_threshold",
             &ScalarValue::Float64(Some(self.skip_partial_params.ratio_threshold)),
         );
+        session_config = session_config.set(
+            "datafusion.execution.enable_aggregation_blocked_groups",
+            &ScalarValue::Boolean(Some(self.enable_aggregation_blocked_groups)),
+        );
 
         let ctx = SessionContext::new_with_config(session_config);
         ctx.register_table(self.table_name, self.table_provider)?;
@@ -207,6 +216,7 @@ impl GeneratedSessionContextBuilder {
             target_partitions: self.target_partitions,
             sort_hint: self.sort_hint,
             skip_partial_params: self.skip_partial_params,
+            enable_aggregation_blocked_groups: self.enable_aggregation_blocked_groups,
         };
 
         Ok(SessionContextWithParams { ctx, params })
@@ -221,6 +231,7 @@ pub struct SessionContextParams {
     target_partitions: usize,
     sort_hint: bool,
     skip_partial_params: SkipPartialParams,
+    enable_aggregation_blocked_groups: bool,
 }
 
 /// Partial skipping parameters
diff --git a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/query_builder.rs b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/query_builder.rs
index 274bc761accc..70c89957f6c9 100644
--- a/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/query_builder.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregation_fuzzer/query_builder.rs
@@ -247,7 +247,6 @@ impl QueryBuilder {
     fn generate_query(&self) -> String {
         let group_by = self.random_group_by();
-        dbg!(&group_by);
         let mut query = String::from("SELECT ");
         query.push_str(&group_by.join(", "));
         if !group_by.is_empty() {
diff --git a/datafusion/expr-common/src/groups_accumulator.rs b/datafusion/expr-common/src/groups_accumulator.rs
index 5ff1c1d07216..697089ae0030 100644
--- a/datafusion/expr-common/src/groups_accumulator.rs
+++ b/datafusion/expr-common/src/groups_accumulator.rs
@@ -18,12 +18,12 @@
 //! Vectorized [`GroupsAccumulator`]
 
 use arrow::array::{ArrayRef, BooleanArray};
-use datafusion_common::{not_impl_err, Result};
+use datafusion_common::{not_impl_err, DataFusionError, Result};
 
 /// Describes how many rows should be emitted during grouping.
 #[derive(Debug, Clone, Copy)]
 pub enum EmitTo {
-    /// Emit all groups
+    /// Emit all groups; this clears all existing group indexes
     All,
     /// Emit only the first `n` groups and shift all existing group
     /// indexes down by `n`.
@@ -31,6 +31,10 @@
     /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
     /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
     First(usize),
+    /// Emit the next block of the block-managed groups
+    ///
+    /// Similar to `EmitTo::All`, this also clears all existing group indexes
+    NextBlock,
 }
 
 impl EmitTo {
@@ -39,6 +43,10 @@
     /// remaining values in `v`.
     ///
     /// This avoids copying if Self::All
+    ///
+    /// NOTICE: only the `Self::All` and `Self::First` emit strategies are
+    /// supported; this will panic if called with `Self::NextBlock`
+    ///
     pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
         match self {
             Self::All => {
@@ -52,6 +60,7 @@
                 std::mem::swap(v, &mut t);
                 t
             }
+            Self::NextBlock => unreachable!("take_needed does not support EmitTo::NextBlock"),
         }
     }
 }
@@ -250,4 +259,49 @@ pub trait GroupsAccumulator: Send {
     /// This function is called once per batch, so it should be `O(n)` to
     /// compute, not `O(num_groups)`
     fn size(&self) -> usize;
+
+    /// Returns `true` if this accumulator supports blocked groups.
+    ///
+    /// Blocked groups (also called the blocked management approach) is an
+    /// optimization to reduce the cost of managing aggregation intermediate states.
+    ///
+    /// A brief introduction to the two state management approaches:
+    ///   - Blocked approach: states are stored and managed in multiple `Vec`s,
+    ///     which we call `Block`s. Organizing them this way avoids resizing a
+    ///     `Vec` (we allocate a new `Vec` instead), reducing cost and improving
+    ///     performance. When locating data in `Block`s, we first use `block_id`
+    ///     to find the needed `Block`, and then use `block_offset` to locate the
+    ///     needed data within it.
+    ///
+    ///   - Single approach: all states are stored and managed in a single large
+    ///     `Block`. When locating data, `block_id` is always 0, and only
+    ///     `block_offset` is needed to locate data in the single `Block`.
+    ///
+    /// More details can be found at:
+    /// <https://github.com/apache/datafusion/issues/7065>
+    ///
+    fn supports_blocked_groups(&self) -> bool {
+        false
+    }
+
+    /// Alter the block size in the accumulator
+    ///
+    /// If the target block size is `None`, a single big block (think of it
+    /// as a `Vec`) is used to manage the state.
+    ///
+    /// If the target block size is `Some(blk_size)`, it will try to set the
+    /// block size to `blk_size`; the attempt only succeeds if the accumulator
+    /// supports blocked mode.
+    ///
+    /// NOTICE: after altering the block size, all data in existing accumulators will be cleared.
+    ///
+    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
+        if block_size.is_some() {
+            return Err(DataFusionError::NotImplemented(
+                "this accumulator doesn't support blocked mode yet".to_string(),
+            ));
+        }
+
+        Ok(())
+    }
 }
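Before using `EmitTo::NextBlock`, a caller is expected to negotiate blocked support through the two new trait methods. A hypothetical driver sketch (the `block_size` choice is an assumption, not part of this patch):

```rust
use datafusion_common::Result;
use datafusion_expr_common::groups_accumulator::GroupsAccumulator;

// Enable blocked mode only when the accumulator opts in; `None` (the flat,
// single-block mode) is always accepted by the default implementation.
fn configure_blocked_mode(
    acc: &mut dyn GroupsAccumulator,
    block_size: usize,
) -> Result<()> {
    if acc.supports_blocked_groups() {
        acc.alter_block_size(Some(block_size))
    } else {
        acc.alter_block_size(None)
    }
}
```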
diff --git a/datafusion/functions-aggregate-common/Cargo.toml b/datafusion/functions-aggregate-common/Cargo.toml
index cf065ca1cb17..0ce85e20d454 100644
--- a/datafusion/functions-aggregate-common/Cargo.toml
+++ b/datafusion/functions-aggregate-common/Cargo.toml
@@ -50,3 +50,7 @@ rand = { workspace = true }
 [[bench]]
 harness = false
 name = "accumulate"
+
+[[bench]]
+harness = false
+name = "null_state_accumulate"
diff --git a/datafusion/functions-aggregate-common/benches/null_state_accumulate.rs b/datafusion/functions-aggregate-common/benches/null_state_accumulate.rs
new file mode 100644
index 000000000000..8a4c01a8397e
--- /dev/null
+++ b/datafusion/functions-aggregate-common/benches/null_state_accumulate.rs
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+extern crate criterion;
+
+use std::sync::Arc;
+
+use arrow::array::ArrowNativeTypeOp;
+use arrow::{
+    array::{ArrayRef, AsArray, BooleanArray, Int64Array},
+    datatypes::Int64Type,
+};
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion_functions_aggregate_common::aggregate::groups_accumulator::group_index_operations::FlatGroupIndexOperations;
+use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
+    accumulate,
+    blocks::GeneralBlocks,
+    group_index_operations::{BlockedGroupIndexOperations, GroupIndexOperations},
+};
+
+fn generate_group_indices(len: usize) -> Vec<usize> {
+    (0..len).collect()
+}
+
+fn generate_values(len: usize, has_null: bool) -> ArrayRef {
+    if has_null {
+        let values = (0..len)
+            .map(|i| if i % 7 == 0 { None } else { Some(i as i64) })
+            .collect::<Vec<_>>();
+        Arc::new(Int64Array::from(values))
+    } else {
+        let values = (0..len).map(|i| Some(i as i64)).collect::<Vec<_>>();
+        Arc::new(Int64Array::from(values))
+    }
+}
+
+fn generate_filter(len: usize) -> Option<BooleanArray> {
+    let values = (0..len)
+        .map(|i| {
+            if i % 7 == 0 {
+                None
+            } else if i % 5 == 0 {
+                Some(false)
+            } else {
+                Some(true)
+            }
+        })
+        .collect::<Vec<_>>();
+    Some(BooleanArray::from(values))
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let batch_size = 8192;
+    let len = batch_size * 4096;
+    let group_indices = generate_group_indices(len);
+    let values = generate_values(len, false);
+    let opt_filter = generate_filter(len);
+
+    let num_chunks = len.div_ceil(batch_size);
+    let last_chunk_size = len % batch_size;
+    let last_chunk_size = if last_chunk_size > 0 {
+        last_chunk_size
+    } else {
+        batch_size
+    };
+
+    let group_indices_chunks = group_indices
+        .chunks(batch_size)
+        .map(|chunk| chunk.to_vec())
+        .collect::<Vec<_>>();
+
+    let mut values_chunks = vec![];
+    for chunk_idx in 0..num_chunks {
+        let chunk_size = if chunk_idx == num_chunks - 1 {
+            last_chunk_size
+        } else {
+            batch_size
+        };
+        let chunk = values.slice(chunk_idx * batch_size, chunk_size);
+
+        values_chunks.push(chunk);
+    }
+
+    let mut total_num_groups_chunks = vec![];
+    for group_indices in &group_indices_chunks {
+        let total_num_groups = *group_indices.iter().max().unwrap() + 1;
+        total_num_groups_chunks.push(total_num_groups);
+    }
+
+    let mode = std::env::var("ACC_MODE").unwrap_or("all".to_string());
+    let block_factor = std::env::var("BLOCK_FACTOR")
+        .unwrap_or("1".to_string())
+        .parse::<usize>()
+        .unwrap();
+
+    if &mode == "blocked" || &mode == "all" {
+        c.bench_function("Blocked accumulate", |b| {
+            b.iter(|| {
+                let block_size = block_factor * batch_size;
+                let mut blocks = GeneralBlocks::<i64>::new(Some(block_size));
+                let group_index_operation = BlockedGroupIndexOperations::new(block_size);
+
+                let group_indices_iter = group_indices_chunks.iter();
+                let values_iter = values_chunks.iter();
+                let total_num_groups_iter = total_num_groups_chunks.iter();
+                let iter = group_indices_iter
+                    .zip(values_iter)
+                    .zip(total_num_groups_iter);
+                for ((group_indices, values), &total_num_groups) in iter {
+                    let values = values.as_primitive::<Int64Type>();
+                    blocks.expand(total_num_groups, 0);
+
+                    accumulate::accumulate(
+                        group_indices,
+                        values,
+                        None,
+                        |group_index, value| {
+                            let block_id =
+                                group_index_operation.get_block_id(group_index);
+                            let block_offset =
+                                group_index_operation.get_block_offset(group_index);
+                            sum(&mut blocks, block_id, block_offset, value);
+                        },
+                    );
+                }
+            })
+        });
+    }
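+
+    // The flat variant below runs the same workload against a single
+    // resizable block (created with `GeneralBlocks::new(None)`, so `block_id`
+    // is always 0); comparing the two benchmarks isolates the overhead of
+    // blocked indexing against the cost of grow-and-copy reallocation.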
+
+    if &mode == "flat" || &mode == "all" {
+        c.bench_function("Flat accumulate", |b| {
+            b.iter(|| {
+                let mut blocks = GeneralBlocks::<i64>::new(None);
+                let group_index_operation = FlatGroupIndexOperations;
+
+                let group_indices_iter = group_indices_chunks.iter();
+                let values_iter = values_chunks.iter();
+                let total_num_groups_iter = total_num_groups_chunks.iter();
+                let iter = group_indices_iter
+                    .zip(values_iter)
+                    .zip(total_num_groups_iter);
+                for ((group_indices, values), &total_num_groups) in iter {
+                    let values = values.as_primitive::<Int64Type>();
+                    blocks.expand(total_num_groups, 0);
+
+                    accumulate::accumulate(
+                        group_indices,
+                        values,
+                        None,
+                        |group_index, value| {
+                            let block_id =
+                                group_index_operation.get_block_id(group_index);
+                            let block_offset =
+                                group_index_operation.get_block_offset(group_index);
+                            sum(&mut blocks, block_id, block_offset, value);
+                        },
+                    );
+                }
+            })
+        });
+    }
+}
+
+#[inline]
+fn sum(
+    blocks: &mut GeneralBlocks<i64>,
+    block_id: usize,
+    block_offset: usize,
+    new_value: i64,
+) {
+    let value = blocks.get_mut(block_id, block_offset);
+    *value = value.add_wrapping(new_value);
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
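The `(block_id, block_offset)` pair used above relates to a flat group index by plain block-size arithmetic; here is a standalone sketch of that mapping (the real `GroupIndexOperations` implementations may encode it differently, e.g. with bit shifts):

```rust
// Split a flat group index into blocked coordinates, and flatten it back.
fn split(group_index: usize, block_size: usize) -> (usize, usize) {
    (group_index / block_size, group_index % block_size)
}

fn main() {
    let block_size = 4;
    for group_index in [0usize, 3, 4, 9] {
        let (block_id, block_offset) = split(group_index, block_size);
        // The unit tests in accumulate.rs rebuild the flat index the same way:
        // flatten_index = block_id * block_size + block_offset
        assert_eq!(group_index, block_id * block_size + block_offset);
        println!("group {group_index} -> block {block_id}, offset {block_offset}");
    }
}
```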
diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
index aa2f5a586e87..77c4a89b2ac5 100644
--- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
@@ -19,7 +19,9 @@
 //! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`]
 
 pub mod accumulate;
+pub mod blocks;
 pub mod bool_op;
+pub mod group_index_operations;
 pub mod nulls;
 pub mod prim_op;
diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs
index e629e99e1657..8acfd527abdf 100644
--- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs
@@ -19,11 +19,19 @@
 //!
 //! [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator
 
+use std::fmt::Debug;
+use std::marker::PhantomData;
+
 use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, PrimitiveArray};
 use arrow::buffer::{BooleanBuffer, NullBuffer};
 use arrow::datatypes::ArrowPrimitiveType;
 
+use crate::aggregate::groups_accumulator::blocks::{EmitBlockBuilder, EmitBlocksState};
+use crate::aggregate::groups_accumulator::group_index_operations::{
+    BlockedGroupIndexOperations, FlatGroupIndexOperations, GroupIndexOperations,
+};
 use datafusion_expr_common::groups_accumulator::EmitTo;
+
 /// Track the accumulator null state per row: if any values for that
 /// group were null and if any values have been seen at all for that group.
 ///
@@ -50,7 +58,7 @@ use datafusion_expr_common::groups_accumulator::EmitTo;
 ///
 /// [`GroupsAccumulator`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator
 #[derive(Debug)]
-pub struct NullState {
+pub struct NullState<O: GroupIndexOperations> {
     /// Have we seen any non-filtered input values for `group_index`?
     ///
     /// If `seen_values[i]` is true, have seen at least one non null value for group `i`
     ///
     /// If `seen_values[i]` is false, have not seen any values that
     /// pass the filter yet for group `i`
+    ///
+    /// NOTICE: we deliberately do not organize `seen_values` itself with the
+    /// `blocked approach`, and only adapt it to support `blocked emitting`
+    /// (see [`BlockedNullState`]). According to CPU profiling, the total cost
+    /// of `set_bit` becomes even larger if we do, likely because the bitmap
+    /// (`seen_values` is indeed a bitmap) is small:
+    ///   - Resizing it is not really expensive
+    ///   - Organizing it with the `blocked approach` understandably makes each
+    ///     `set_bit` more costly (indirection, noncontiguous memory, ...)
+    ///   - So the total cost increases
+    ///
     seen_values: BooleanBufferBuilder,
-}

-impl Default for NullState {
-    fn default() -> Self {
-        Self::new()
-    }
-}
+    /// Block size
+    ///
+    /// In the `blocked approach` it will be `Some(block_size)`,
+    /// otherwise it will be `None`.
+    ///
+    block_size: Option<usize>,

-impl NullState {
-    pub fn new() -> Self {
-        Self {
-            seen_values: BooleanBufferBuilder::new(0),
-        }
-    }
+    /// Group index operations for the required operations type `O`
+    group_index_operation: O,
+}

+impl<O: GroupIndexOperations> NullState<O> {
     /// return the size of all buffers allocated by this null state, not including self
     pub fn size(&self) -> usize {
         // capacity is in bits, so convert to bytes
@@ -105,15 +123,18 @@ impl NullState {
         mut value_fn: F,
     ) where
         T: ArrowPrimitiveType + Send,
-        F: FnMut(usize, T::Native) + Send,
+        F: FnMut(usize, usize, T::Native) + Send,
     {
         // ensure the seen_values is big enough (start everything at
         // "not seen" valid)
         let seen_values =
             initialize_builder(&mut self.seen_values, total_num_groups, false);
+        let block_size = self.block_size.unwrap_or_default();
 
         accumulate(group_indices, values, opt_filter, |group_index, value| {
+            let block_id = self.group_index_operation.get_block_id(group_index);
+            let block_offset = self.group_index_operation.get_block_offset(group_index);
             seen_values.set_bit(group_index, true);
-            value_fn(group_index, value);
+            value_fn(block_id, block_offset, value);
         });
     }
 
@@ -135,17 +156,17 @@ impl NullState {
         total_num_groups: usize,
         mut value_fn: F,
     ) where
-        F: FnMut(usize, bool) + Send,
+        F: FnMut(usize, usize, bool) + Send,
     {
         let data = values.values();
         assert_eq!(data.len(), group_indices.len());
+        // These could be made more performant by iterating in chunks of 64 bits at a time
 
         // ensure the seen_values is big enough (start everything at
         // "not seen" valid)
         let seen_values =
             initialize_builder(&mut self.seen_values, total_num_groups, false);
-
-        // These could be made more performant by iterating in chunks of 64 bits at a time
+        let block_size = self.block_size.unwrap_or_default();
 
         match (values.null_count() > 0, opt_filter) {
             // no nulls, no filter,
             (false, None) => {
                 // buffer is big enough (start everything at valid)
                 group_indices.iter().zip(data.iter()).for_each(
                     |(&group_index, new_value)| {
+                        let block_id =
+                            self.group_index_operation.get_block_id(group_index);
+                        let block_offset =
+                            self.group_index_operation.get_block_offset(group_index);
                         seen_values.set_bit(group_index,
true); - value_fn(group_index, new_value) + value_fn(block_id, block_offset, new_value) }, ) } @@ -167,8 +192,12 @@ impl NullState { .zip(nulls.iter()) .for_each(|((&group_index, new_value), is_valid)| { if is_valid { + let block_id = + self.group_index_operation.get_block_id(group_index); + let block_offset = + self.group_index_operation.get_block_offset(group_index); seen_values.set_bit(group_index, true); - value_fn(group_index, new_value); + value_fn(block_id, block_offset, new_value); } }) } @@ -182,8 +211,12 @@ impl NullState { .zip(filter.iter()) .for_each(|((&group_index, new_value), filter_value)| { if let Some(true) = filter_value { + let block_id = + self.group_index_operation.get_block_id(group_index); + let block_offset = + self.group_index_operation.get_block_offset(group_index); seen_values.set_bit(group_index, true); - value_fn(group_index, new_value); + value_fn(block_id, block_offset, new_value); } }) } @@ -197,39 +230,361 @@ impl NullState { .for_each(|((filter_value, &group_index), new_value)| { if let Some(true) = filter_value { if let Some(new_value) = new_value { + let block_id = + self.group_index_operation.get_block_id(group_index); + let block_offset = self + .group_index_operation + .get_block_offset(group_index); seen_values.set_bit(group_index, true); - value_fn(group_index, new_value) + value_fn(block_id, block_offset, new_value); } } }) } } } +} - /// Creates the a [`NullBuffer`] representing which group_indices - /// should have null values (because they never saw any values) - /// for the `emit_to` rows. - /// - /// resets the internal state appropriately +/// Ensures that `builder` contains a `BooleanBufferBuilder with at +/// least `total_num_groups`. +/// +/// All new entries are initialized to `default_value` +fn initialize_builder( + builder: &mut BooleanBufferBuilder, + total_num_groups: usize, + default_value: bool, +) -> &mut BooleanBufferBuilder { + if builder.len() < total_num_groups { + let new_groups = total_num_groups - builder.len(); + builder.append_n(new_groups, default_value); + } + builder +} + +/// Adapter for supporting dynamic dispatching of [`FlatNullState`] and [`BlockedNullState`]. +/// For performance, the cost of batch-level dynamic dispatching is acceptable. 
+#[derive(Debug)] +pub enum NullStateAdapter { + Flat(FlatNullState), + Blocked(BlockedNullState), +} + +impl NullStateAdapter { + pub fn new(block_size: Option) -> Self { + if let Some(blk_size) = block_size { + Self::Blocked(BlockedNullState::new(blk_size)) + } else { + Self::Flat(FlatNullState::new()) + } + } + + #[inline] + pub fn accumulate( + &mut self, + group_indices: &[usize], + values: &PrimitiveArray, + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + value_fn: F, + ) where + T: ArrowPrimitiveType + Send, + F: FnMut(usize, usize, T::Native) + Send, + { + match self { + NullStateAdapter::Flat(null_state) => null_state.accumulate( + group_indices, + values, + opt_filter, + total_num_groups, + value_fn, + ), + NullStateAdapter::Blocked(null_state) => null_state.accumulate( + group_indices, + values, + opt_filter, + total_num_groups, + value_fn, + ), + } + } + + #[inline] + pub fn accumulate_boolean( + &mut self, + group_indices: &[usize], + values: &BooleanArray, + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + value_fn: F, + ) where + F: FnMut(usize, usize, bool) + Send, + { + match self { + NullStateAdapter::Flat(null_state) => null_state.accumulate_boolean( + group_indices, + values, + opt_filter, + total_num_groups, + value_fn, + ), + NullStateAdapter::Blocked(null_state) => null_state.accumulate_boolean( + group_indices, + values, + opt_filter, + total_num_groups, + value_fn, + ), + } + } + + #[inline] pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer { - let nulls: BooleanBuffer = self.seen_values.finish(); + match self { + NullStateAdapter::Flat(null_state) => null_state.build(emit_to), + NullStateAdapter::Blocked(null_state) => null_state.build(), + } + } + + #[inline] + pub fn size(&self) -> usize { + match self { + NullStateAdapter::Flat(null_state) => null_state.size(), + NullStateAdapter::Blocked(null_state) => null_state.size(), + } + } + + // Clone and build a single [`BooleanBuffer`] from `seen_values`, + // only used for testing. + #[cfg(test)] + fn build_cloned_seen_values(&self) -> BooleanBuffer { + match self { + NullStateAdapter::Flat(null_state) => null_state.seen_values.finish_cloned(), + NullStateAdapter::Blocked(null_state) => { + null_state.inner.seen_values.finish_cloned() + } + } + } + + #[cfg(test)] + fn build_all_in_once(&mut self) -> NullBuffer { + match self { + NullStateAdapter::Flat(null_state) => null_state.build(EmitTo::All), + NullStateAdapter::Blocked(null_state) => { + let mut return_builder = BooleanBufferBuilder::new(0); + let total_groups = null_state.inner.seen_values.len(); + let block_size = null_state.inner.block_size.unwrap(); + let num_blocks = total_groups.div_ceil(block_size); + for _ in 0..num_blocks { + let blocked_nulls = null_state.build(); + for bit in blocked_nulls.inner().iter() { + return_builder.append(bit); + } + } - let nulls = match emit_to { - EmitTo::All => nulls, + NullBuffer::new(return_builder.finish()) + } + } + } +} + +/// [`NullState`] for `flat groups input` +/// +/// The input are organized like: +/// +/// ```text +/// row_0 group_index_0 +/// row_1 group_index_1 +/// row_2 group_index_2 +/// ... +/// row_n group_index_n +/// ``` +/// +/// If `row_x group_index_x` is not filtered(`group_index_x` is seen) +/// `seen_values[group_index_x]` will be set to `true`. 
+///
+pub type FlatNullState = NullState<FlatGroupIndexOperations>;
+
+impl FlatNullState {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl Default for FlatNullState {
+    fn default() -> Self {
+        Self {
+            seen_values: BooleanBufferBuilder::new(0),
+            block_size: None,
+            group_index_operation: FlatGroupIndexOperations,
+        }
+    }
+}
+
+impl FlatNullState {
+    pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
+        match emit_to {
+            EmitTo::All => NullBuffer::new(self.seen_values.finish()),
             EmitTo::First(n) => {
                 // split off the first N values in seen_values
                 //
                 // TODO make this more efficient rather than two
                 // copies and bitwise manipulation
+                let nulls = self.seen_values.finish();
                 let first_n_null: BooleanBuffer = nulls.iter().take(n).collect();
                 // reset the existing seen buffer
                 for seen in nulls.iter().skip(n) {
                     self.seen_values.append(seen);
                 }
-                first_n_null
+                NullBuffer::new(first_n_null)
             }
+            EmitTo::NextBlock => unreachable!(),
+        }
+    }
+}
+
+/// [`NullState`] for `blocked groups input`
+///
+/// The `input` and `set_bit` logic are the same as in [`FlatNullState`];
+/// we just define an `emit_state` for it to support emitting blocks.
+///
+/// For example, `seen_values` will be organized with a flat approach like:
+///
+/// ```text
+/// true
+/// false
+/// false
+/// true
+/// ```
+///
+/// Assuming `block_size` is 2, we first split the flat booleans into 2 blocks,
+/// and then emit them block by block:
+///
+/// ```text
+/// // block0
+/// true
+/// false
+///
+/// // block1
+/// false
+/// true
+/// ```
+///
+/// The reason why we don't organize the data itself with the `blocked approach`
+/// is explained in [`NullState`].
+///
+#[derive(Debug)]
+pub struct BlockedNullState {
+    /// Null state of the blocked approach
+    inner: NullState<BlockedGroupIndexOperations>,
+
+    /// State used to control emitting the blocks
+    emit_state: EmitBlocksState<BooleanBuffer>,
+}
+
+impl BlockedNullState {
+    pub fn new(block_size: usize) -> Self {
+        let inner = NullState {
+            seen_values: BooleanBufferBuilder::new(0),
+            block_size: Some(block_size),
+            group_index_operation: BlockedGroupIndexOperations::new(block_size),
+        };
+
+        Self {
+            inner,
+            emit_state: EmitBlocksState::Init,
+        }
+    }
+
+    #[inline]
+    pub fn accumulate<T, F>(
+        &mut self,
+        group_indices: &[usize],
+        values: &PrimitiveArray<T>,
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+        value_fn: F,
+    ) where
+        T: ArrowPrimitiveType + Send,
+        F: FnMut(usize, usize, T::Native) + Send,
+    {
+        assert!(!self.is_emitting(), "cannot update groups during emitting");
+        self.inner.accumulate(
+            group_indices,
+            values,
+            opt_filter,
+            total_num_groups,
+            value_fn,
+        );
+    }
+
+    #[inline]
+    pub fn accumulate_boolean<F>(
+        &mut self,
+        group_indices: &[usize],
+        values: &BooleanArray,
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+        value_fn: F,
+    ) where
+        F: FnMut(usize, usize, bool) + Send,
+    {
+        assert!(!self.is_emitting(), "cannot update groups during emitting");
+        self.inner.accumulate_boolean(
+            group_indices,
+            values,
+            opt_filter,
+            total_num_groups,
+            value_fn,
+        );
+    }
+
+    pub fn build(&mut self) -> NullBuffer {
+        let (total_num_groups, block_size) = if !self.is_emitting() {
+            (self.inner.seen_values.len(), self.inner.block_size.unwrap())
+        } else {
+            (0, 0)
+        };
+
+        let init_block_builder = || self.inner.seen_values.finish();
+        // TODO: maybe we should return `None` rather than unwrap
+        let emit_block = self
+            .emit_state
+            .emit_block(total_num_groups, block_size, init_block_builder)
+            .expect("should not emit empty null state");
+
+        NullBuffer::new(emit_block)
+    }
+
+    #[inline]
+    
fn is_emitting(&self) -> bool { + self.emit_state.is_emitting() + } + + #[inline] + fn size(&self) -> usize { + // Unnecessary to take care of `emit_state`, it is just the intermediate + // data used during emitting + self.inner.size() + } +} + +impl EmitBlockBuilder for BooleanBuffer { + type B = BooleanBuffer; + + fn build( + &mut self, + emit_index: usize, + block_size: usize, + is_last_block: bool, + last_block_len: usize, + ) -> Self::B { + let slice_offset = emit_index * block_size; + let slice_len = if is_last_block { + last_block_len + } else { + block_size + }; + + self.slice(slice_offset, slice_len) } } @@ -386,6 +741,8 @@ pub fn accumulate( /// * `group_idx`: The group index for the current row /// * `batch_idx`: The index of the current row in the input arrays /// * `columns`: Reference to all input arrays for accessing values +// TODO: support `blocked group index` for `accumulate_multiple` +// (for supporting `blocked group index` for correlation group accumulator) pub fn accumulate_multiple( group_indices: &[usize], value_columns: &[&PrimitiveArray], @@ -449,6 +806,8 @@ pub fn accumulate_multiple( /// /// See [`NullState::accumulate`], for more details on other /// arguments. +// TODO: support `blocked group index` for `accumulate_indices` +// (for supporting `blocked group index` for count group accumulator) pub fn accumulate_indices( group_indices: &[usize], nulls: Option<&NullBuffer>, @@ -577,29 +936,13 @@ pub fn accumulate_indices( } } -/// Ensures that `builder` contains a `BooleanBufferBuilder with at -/// least `total_num_groups`. -/// -/// All new entries are initialized to `default_value` -fn initialize_builder( - builder: &mut BooleanBufferBuilder, - total_num_groups: usize, - default_value: bool, -) -> &mut BooleanBufferBuilder { - if builder.len() < total_num_groups { - let new_groups = total_num_groups - builder.len(); - builder.append_n(new_groups, default_value); - } - builder -} - #[cfg(test)] mod test { use super::*; use arrow::array::{Int32Array, UInt32Array}; use rand::{rngs::ThreadRng, Rng}; - use std::collections::HashSet; + use std::{cmp, collections::HashSet}; #[test] fn accumulate() { @@ -625,11 +968,14 @@ mod test { }) .collect(); + // Test flat style Fixture { group_indices, values, values_with_nulls, filter, + block_size: 4, + acc_rounds: 5, } .run() } @@ -656,6 +1002,13 @@ mod test { /// filter (defaults to None) filter: BooleanArray, + + /// block size for testing [`BlockedNullState`] + block_size: usize, + + /// how many rounds we call the `accumulate`, use this to test situation + /// about calling `accumulate` multiple times for better coverage + acc_rounds: usize, } impl Fixture { @@ -672,6 +1025,12 @@ mod test { let values: Vec = (0..num_values).map(|_| rng.gen()).collect(); + // random block size + let block_size = rng.gen_range(1..num_groups).next_power_of_two(); + + // random acc rounds + let acc_rounds = rng.gen_range(1..=group_indices.len()); + // 10% chance of false // 10% change of null // 80% chance of true @@ -707,6 +1066,8 @@ mod test { values, values_with_nulls, filter, + block_size, + acc_rounds, } } @@ -731,7 +1092,14 @@ mod test { let filter = &self.filter; // no null, no filters - Self::accumulate_test(group_indices, &values_array, None, total_num_groups); + Self::accumulate_test( + group_indices, + &values_array, + None, + total_num_groups, + self.block_size, + self.acc_rounds, + ); // nulls, no filters Self::accumulate_test( @@ -739,6 +1107,8 @@ mod test { &values_with_nulls_array, None, total_num_groups, + self.block_size, 
+ self.acc_rounds, ); // no nulls, filters @@ -747,6 +1117,8 @@ mod test { &values_array, Some(filter), total_num_groups, + self.block_size, + self.acc_rounds, ); // nulls, filters @@ -755,6 +1127,8 @@ mod test { &values_with_nulls_array, Some(filter), total_num_groups, + self.block_size, + self.acc_rounds, ); } @@ -766,26 +1140,97 @@ mod test { values: &UInt32Array, opt_filter: Option<&BooleanArray>, total_num_groups: usize, + block_size: usize, + acc_rounds: usize, ) { + // Test `accumulate` of `FlatNullState` + accumulate in once Self::accumulate_values_test( group_indices, values, opt_filter, total_num_groups, + None, + None, + ); + + // Test `accumulate` of `FlatNullState` + accumulate in multiple times + Self::accumulate_values_test( + group_indices, + values, + opt_filter, + total_num_groups, + None, + Some(acc_rounds), + ); + + // Test `accumulate` of `BlockedNullState` + accumulate in once + Self::accumulate_values_test( + group_indices, + values, + opt_filter, + total_num_groups, + Some(block_size), + None, + ); + + // Test `accumulate` of `BlockedNullState` + accumulate in multiple times + Self::accumulate_values_test( + group_indices, + values, + opt_filter, + total_num_groups, + Some(block_size), + Some(acc_rounds), ); - Self::accumulate_indices_test(group_indices, values.nulls(), opt_filter); // Convert values into a boolean array (anything above the // average is true, otherwise false) let avg: usize = values.iter().filter_map(|v| v.map(|v| v as usize)).sum(); let boolean_values: BooleanArray = values.iter().map(|v| v.map(|v| v as usize > avg)).collect(); + + // Test `accumulate_boolean` of `FlatNullState` + accumulate in once + Self::accumulate_boolean_test( + group_indices, + &boolean_values, + opt_filter, + total_num_groups, + None, + None, + ); + + // Test `accumulate_boolean` of `FlatNullState` + accumulate in multiple times Self::accumulate_boolean_test( group_indices, &boolean_values, opt_filter, total_num_groups, + None, + Some(acc_rounds), ); + + // Test `accumulate_boolean` of `BlockedNullState` + accumulate in once + Self::accumulate_boolean_test( + group_indices, + &boolean_values, + opt_filter, + total_num_groups, + Some(block_size), + None, + ); + + // Test `accumulate_boolean` of `BlockedNullState` + accumulate in multiple times + Self::accumulate_boolean_test( + group_indices, + &boolean_values, + opt_filter, + total_num_groups, + Some(block_size), + Some(acc_rounds), + ); + + // Test `accumulate_indices` + Self::accumulate_indices_test(group_indices, values.nulls(), opt_filter); } /// This is effectively a different implementation of @@ -795,19 +1240,82 @@ mod test { values: &UInt32Array, opt_filter: Option<&BooleanArray>, total_num_groups: usize, + block_size: Option, + acc_rounds: Option, ) { - let mut accumulated_values = vec![]; - let mut null_state = NullState::new(); + // Chunking `group_indices`, `values`, `opt_filter`, and we also need to generate + // `chunked acc_group_indices` basing on `group_indices` + let (group_indices_chunks, values_chunks, opt_filter_chunks) = + if let Some(rounds) = acc_rounds { + let chunk_size = group_indices.len() / rounds; + + let group_indices_chunks = group_indices + .chunks(chunk_size) + .map(|chunk| chunk.to_vec()) + .collect::>(); + + let values_chunks = values + .iter() + .collect::>() + .chunks(chunk_size) + .map(|chunk| UInt32Array::from_iter(chunk.iter().copied())) + .collect::>(); + + let opt_filter_chunks = if let Some(filter) = opt_filter { + filter + .iter() + .collect::>() + .chunks(chunk_size) + 
.map(|chunk| Some(BooleanArray::from_iter(chunk.iter()))) + .collect::>() + } else { + vec![None; values_chunks.len()] + }; - null_state.accumulate( - group_indices, - values, - opt_filter, - total_num_groups, - |group_index, value| { - accumulated_values.push((group_index, value)); - }, - ); + (group_indices_chunks, values_chunks, opt_filter_chunks) + } else { + ( + vec![group_indices.to_vec()], + vec![values.clone()], + vec![opt_filter.cloned()], + ) + }; + + let mut total_num_groups_chunks = vec![]; + let mut cur_total_num_groups = usize::MIN; + for group_indices in &group_indices_chunks { + let num_groups = *group_indices.iter().max().unwrap() + 1; + cur_total_num_groups = cmp::max(cur_total_num_groups, num_groups); + total_num_groups_chunks.push(cur_total_num_groups); + } + + // Build needed test contexts + let (mut null_state, block_size) = if let Some(blk_size) = block_size { + (NullStateAdapter::new(Some(blk_size)), blk_size) + } else { + (NullStateAdapter::new(None), 0) + }; + + // Start the test + let mut accumulated_values = vec![]; + for (((acc_group_indices, values), total_num_groups), cur_opt_filter) in + group_indices_chunks + .into_iter() + .zip(values_chunks) + .zip(total_num_groups_chunks) + .zip(opt_filter_chunks) + { + null_state.accumulate( + &acc_group_indices, + &values, + cur_opt_filter.as_ref(), + total_num_groups, + |block_id, block_offset, value| { + let flatten_index = block_id * block_size + block_offset; + accumulated_values.push((flatten_index, value)); + }, + ); + } // Figure out the expected values let mut expected_values = vec![]; @@ -841,13 +1349,13 @@ mod test { assert_eq!(accumulated_values, expected_values, "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}"); - let seen_values = null_state.seen_values.finish_cloned(); + let seen_values = null_state.build_cloned_seen_values(); mock.validate_seen_values(&seen_values); // Validate the final buffer (one value per group) let expected_null_buffer = mock.expected_null_buffer(total_num_groups); - let null_buffer = null_state.build(EmitTo::All); + let null_buffer = null_state.build_all_in_once(); assert_eq!(null_buffer, expected_null_buffer); } @@ -911,19 +1419,82 @@ mod test { values: &BooleanArray, opt_filter: Option<&BooleanArray>, total_num_groups: usize, + block_size: Option, + acc_rounds: Option, ) { - let mut accumulated_values = vec![]; - let mut null_state = NullState::new(); + // Chunking `group_indices`, `values`, `opt_filter`, and we also need to generate + // `chunked acc_group_indices` basing on `group_indices` + let (group_indices_chunks, values_chunks, opt_filter_chunks) = + if let Some(rounds) = acc_rounds { + let chunk_size = group_indices.len() / rounds; + + let group_indices_chunks = group_indices + .chunks(chunk_size) + .map(|chunk| chunk.to_vec()) + .collect::>(); + + let values_chunks = values + .iter() + .collect::>() + .chunks(chunk_size) + .map(|chunk| BooleanArray::from_iter(chunk.iter().copied())) + .collect::>(); + + let opt_filter_chunks = if let Some(filter) = opt_filter { + filter + .iter() + .collect::>() + .chunks(chunk_size) + .map(|chunk| Some(BooleanArray::from_iter(chunk.iter()))) + .collect::>() + } else { + vec![None; values_chunks.len()] + }; - null_state.accumulate_boolean( - group_indices, - values, - opt_filter, - total_num_groups, - |group_index, value| { - accumulated_values.push((group_index, value)); - }, - ); + (group_indices_chunks, values_chunks, opt_filter_chunks) + } else { + ( + vec![group_indices.to_vec()], + 
vec![values.clone()], + vec![opt_filter.cloned()], + ) + }; + + let mut total_num_groups_chunks = vec![]; + let mut cur_total_num_groups = usize::MIN; + for group_indices in &group_indices_chunks { + let num_groups = *group_indices.iter().max().unwrap() + 1; + cur_total_num_groups = cmp::max(cur_total_num_groups, num_groups); + total_num_groups_chunks.push(cur_total_num_groups); + } + + // Build needed test contexts + let (mut null_state, block_size) = if let Some(blk_size) = block_size { + (NullStateAdapter::new(Some(blk_size)), blk_size) + } else { + (NullStateAdapter::new(None), 0) + }; + + // Start the test + let mut accumulated_values = vec![]; + for (((acc_group_indices, values), total_num_groups), opt_filter) in + group_indices_chunks + .into_iter() + .zip(values_chunks) + .zip(total_num_groups_chunks) + .zip(opt_filter_chunks) + { + null_state.accumulate_boolean( + &acc_group_indices, + &values, + opt_filter.as_ref(), + total_num_groups, + |block_id, block_offset, value| { + let flatten_index = block_id * block_size + block_offset; + accumulated_values.push((flatten_index, value)); + }, + ); + } // Figure out the expected values let mut expected_values = vec![]; @@ -958,13 +1529,13 @@ mod test { assert_eq!(accumulated_values, expected_values, "\n\naccumulated_values:{accumulated_values:#?}\n\nexpected_values:{expected_values:#?}"); - let seen_values = null_state.seen_values.finish_cloned(); + let seen_values = null_state.build_cloned_seen_values(); mock.validate_seen_values(&seen_values); // Validate the final buffer (one value per group) let expected_null_buffer = mock.expected_null_buffer(total_num_groups); - let null_buffer = null_state.build(EmitTo::All); + let null_buffer = null_state.build_all_in_once(); assert_eq!(null_buffer, expected_null_buffer); } diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/blocks.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/blocks.rs new file mode 100644 index 000000000000..f8386a24d40b --- /dev/null +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/blocks.rs @@ -0,0 +1,729 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Aggregation intermediate result blocks for the blocked approach
+
+use std::{
+    fmt::Debug,
+    iter, mem,
+    ops::{Index, IndexMut},
+};
+
+use datafusion_expr_common::groups_accumulator::EmitTo;
+
+// ========================================================================
+// Basic abstractions: `Blocks` and `Block`
+// ========================================================================
+
+/// Structure used to store aggregation intermediate results in the `blocked approach`
+///
+/// Aggregation intermediate results are stored as multiple [`Block`]s
+/// (you can simply think of a [`Block`] as a `Vec`), and `Blocks` is the
+/// structure representing such multiple [`Block`]s.
+///
+/// More details about the `blocked approach` can be found in: [`GroupsAccumulator::supports_blocked_groups`].
+///
+/// [`GroupsAccumulator::supports_blocked_groups`]: datafusion_expr_common::groups_accumulator::GroupsAccumulator::supports_blocked_groups
+///
+#[derive(Debug)]
+pub struct Blocks<B: Block, E: EmitBlockBuilder<B = B>> {
+    /// Data in blocks
+    inner: Vec<B>,
+
+    /// Optimization for the high-cardinality case
+    current: B,
+
+    /// Block size
+    ///
+    /// It is:
+    ///   - `Some(blk_size)`: multiple blocks exist, each with a capacity of
+    ///     `blk_size`.
+    ///   - `None`: only a single block exists.
+    block_size: Option<usize>,
+
+    /// Total number of groups in the blocks
+    total_num_groups: usize,
+
+    /// Emit state used to control the emitting process
+    emit_state: EmitBlocksState<E>,
+}
+
+impl<B: Block, E: EmitBlockBuilder<B = B>> Blocks<B, E> {
+    #[inline]
+    pub fn new(block_size: Option<usize>) -> Self {
+        Self {
+            inner: Vec::with_capacity(1024),
+            current: B::default(),
+            total_num_groups: 0,
+            block_size,
+            emit_state: EmitBlocksState::Init,
+        }
+    }
+
+    /// Expand the blocks to make them large enough to store `total_num_groups`
+    /// groups, filling newly allocated blocks with `default_val`
+    pub fn expand(&mut self, total_num_groups: usize, default_val: B::T) {
+        assert!(!self.is_emitting(), "cannot update groups during emitting");
+        if self.total_num_groups >= total_num_groups {
+            return;
+        }
+
+        // Compute how many blocks are needed to store `total_num_groups`
+        // groups; if the existing blocks are not enough, allocate more.
+        let needed_blocks =
+            total_num_groups.div_ceil(self.block_size.unwrap_or(usize::MAX));
+        let exist_blocks = self
+            .total_num_groups
+            .div_ceil(self.block_size.unwrap_or(usize::MAX));
+        if exist_blocks < needed_blocks {
+            // Take `current` and push it into `inner`, because it is no longer the last
+            let old_last = mem::take(&mut self.current);
+
+            if !old_last.is_empty() {
+                self.inner.push(old_last);
+            }
+
+            // allocate blocks
+            let allocated_blocks = needed_blocks - exist_blocks;
+            self.inner.extend(
+                iter::repeat_with(|| {
+                    let build_ctx = self.block_size.map(|blk_size| {
+                        BuildBlockContext::new(blk_size, default_val.clone())
+                    });
+                    B::build(build_ctx)
+                })
+                .take(allocated_blocks),
+            );
+
+            // pop the last block and make it `current`
+            let new_last = self.inner.pop().unwrap();
+            self.current = new_last;
+        }
+
+        // In the `blocked approach`, we can return now. But in the `flat
+        // approach`, we keep only a single block; if that single block is not
+        // large enough, we allocate a larger one and copy the existing data
+        // into it (such copying is really expensive).
+        if self.block_size.is_none() {
+            self.current.expand(total_num_groups, default_val.clone());
+        }
+
+        self.total_num_groups = total_num_groups;
+    }
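+
+    // Illustration of `expand` in blocked mode, mirroring the (currently
+    // commented-out) tests at the bottom of this file: with `block_size = 3`,
+    // `expand(5, 42)` allocates two blocks of `[42; 3]`, and a later
+    // `expand(10, 42)` appends two more blocks instead of reallocating and
+    // copying the existing ones.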
+
+    /// Push a block
+    pub fn push_block(&mut self, block: B) {
+        assert!(!self.is_emitting(), "cannot update groups during emitting");
+
+        let old_last = mem::take(&mut self.current);
+        if !old_last.is_empty() {
+            self.inner.push(old_last);
+        }
+        let block_len = block.len();
+        self.current = block;
+        self.total_num_groups += block_len;
+    }
+
+    /// Emit the blocks iteratively
+    ///
+    /// Because we don't know here how to initialize the [`EmitBlockBuilder`],
+    /// we expose `init_block_builder` and let the caller define it.
+    pub fn emit_next_block<F>(&mut self, mut init_block_builder: F) -> Option<E::B>
+    where
+        F: FnMut(&mut Vec<B>) -> E,
+    {
+        let (total_num_groups, block_size) = if !self.is_emitting() {
+            // TODO: I think we should set `total_num_groups` to 0 when block
+            // emitting starts, because it represents the number of *existing*
+            // groups, and the *emitting* groups no longer really exist.
+            // But we can't do that yet, to keep the same semantics as `GroupValues`.
+
+            let old_last = mem::take(&mut self.current);
+            if !old_last.is_empty() {
+                self.inner.push(old_last);
+            }
+
+            (self.total_num_groups, self.block_size.unwrap_or(usize::MAX))
+        } else {
+            (0, 0)
+        };
+
+        let emit_block =
+            self.emit_state
+                .emit_block(total_num_groups, block_size, || {
+                    init_block_builder(&mut self.inner)
+                })?;
+
+        self.total_num_groups -= emit_block.len();
+
+        Some(emit_block)
+    }
+
+    #[inline]
+    pub fn num_blocks(&self) -> usize {
+        self.total_num_groups
+            .div_ceil(self.block_size.unwrap_or(usize::MAX))
+    }
+
+    #[inline]
+    pub fn total_num_groups(&self) -> usize {
+        self.total_num_groups
+    }
+
+    // FIXME
+    #[inline]
+    pub fn iter(&self) -> impl Iterator<Item = &B> {
+        self.inner.iter()
+    }
+
+    #[inline]
+    pub fn clear(&mut self) {
+        self.inner.clear();
+        self.current = B::default();
+        self.total_num_groups = 0;
+    }
+
+    #[inline]
+    fn is_emitting(&self) -> bool {
+        self.emit_state.is_emitting()
+    }
+}
+
+impl<B: Block, E: EmitBlockBuilder<B = B>> Index<usize> for Blocks<B, E> {
+    type Output = B;
+
+    #[inline]
+    fn index(&self, index: usize) -> &Self::Output {
+        &self.inner[index]
+    }
+}
+
+impl<B: Block, E: EmitBlockBuilder<B = B>> IndexMut<usize> for Blocks<B, E> {
+    #[inline]
+    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
+        &mut self.inner[index]
+    }
+}
+
+/// The abstraction representing one aggregation intermediate result block
+/// in the `blocked approach`; multiple blocks compose a [`Blocks`]
+///
+/// Many types of aggregation intermediate results exist, so we define an
+/// interface to abstract the necessary behaviors of the various types.
+///
+pub trait Block: Debug + Default {
+    type T: Clone;
+
+    /// How to build the block
+    fn build(build_ctx: Option<BuildBlockContext<Self::T>>) -> Self;
+
+    /// Expand the block to `new_len` with `default_val`
+    ///
+    /// In the `flat approach`, we keep only a single block, and need to
+    /// expand it when it is not large enough.
+    fn expand(&mut self, new_len: usize, default_val: Self::T);
+
+    /// Truncate the block to `new_len`
+    fn truncate(&mut self, new_len: usize);
+
+    /// Block len
+    fn len(&self) -> usize;
+
+    fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+}
+
+pub struct BuildBlockContext<T: Clone> {
+    block_size: usize,
+    default_val: T,
+}
+
+impl<T: Clone> BuildBlockContext<T> {
+    pub fn new(block_size: usize, default_val: T) -> Self {
+        Self {
+            block_size,
+            default_val,
+        }
+    }
+}
+
+// ========================================================================
+// The common blocks emitting logic
+// ========================================================================
+
+/// Emit blocks state
+///
+/// There are two states:
+///   - `Init`: blocks can only be updated in this state
+///
+///   - `Emitting`: blocks can't be updated in this state until all
+///     blocks are emitted and the state is reset to `Init`
+///
+#[derive(Debug)]
+pub enum EmitBlocksState<E: EmitBlockBuilder> {
+    Init,
+    Emitting(EmitBlocksContext<E>),
+}
+
+/// Emit blocks context
+#[derive(Debug)]
+pub struct EmitBlocksContext<E: EmitBlockBuilder> {
+    /// Index of the next emitted block
+    next_emit_index: usize,
+
+    /// Block size of the emitting [`Blocks`]
+    block_size: usize,
+
+    /// Number of blocks to emit
+    num_blocks: usize,
+
+    /// The len of the last block
+    ///
+    /// Because the last block is possibly non-full, we compute
+    /// and store its len.
+    ///
+    last_block_len: usize,
+
+    /// Emitted block builder
+    block_builder: E,
+}
+
+impl<E: EmitBlockBuilder> EmitBlocksState<E> {
+    pub fn emit_block<F>(
+        &mut self,
+        total_num_groups: usize,
+        block_size: usize,
+        mut init_block_builder: F,
+    ) -> Option<E::B>
+    where
+        F: FnMut() -> E,
+    {
+        loop {
+            match self {
+                Self::Init => {
+                    // Init the needed contexts
+                    let num_blocks = total_num_groups.div_ceil(block_size);
+                    let mut last_block_len = total_num_groups % block_size;
+                    last_block_len = if last_block_len > 0 {
+                        last_block_len
+                    } else {
+                        block_size
+                    };
+
+                    let block_builder = init_block_builder();
+
+                    let emit_ctx = EmitBlocksContext {
+                        next_emit_index: 0,
+                        block_size,
+                        num_blocks,
+                        last_block_len,
+                        block_builder,
+                    };
+
+                    *self = Self::Emitting(emit_ctx);
+                }
+
+                Self::Emitting(EmitBlocksContext {
+                    next_emit_index,
+                    block_size,
+                    num_blocks,
+                    last_block_len,
+                    block_builder,
+                }) => {
+                    // Empty blocks found, reset and return directly
+                    if next_emit_index == num_blocks {
+                        *self = Self::Init;
+                        break None;
+                    }
+
+                    // Get the current emit block idx
+                    let emit_index = *next_emit_index;
+                    // And then advance the block idx
+                    *next_emit_index += 1;
+
+                    // Process and generate the emit block
+                    let is_last_block = next_emit_index == num_blocks;
+                    let emit_block = block_builder.build(
+                        emit_index,
+                        *block_size,
+                        is_last_block,
+                        *last_block_len,
+                    );
+
+                    // Finally, check whether all blocks are emitted; if so,
+                    // reset the emit context to allow new updates
+                    if next_emit_index == num_blocks {
+                        *self = Self::Init;
+                    }
+
+                    break Some(emit_block);
+                }
+            }
+        }
+    }
+
+    #[inline]
+    pub fn is_emitting(&self) -> bool {
+        !matches!(self, Self::Init)
+    }
+}
+
+pub trait EmitBlockBuilder: Debug {
+    type B;
+
+    fn build(
+        &mut self,
+        emit_index: usize,
+        block_size: usize,
+        is_last_block: bool,
+        last_block_len: usize,
+    ) -> Self::B;
+}
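+
+// Putting the emitting pieces together: the first `emit_block` call moves the
+// state from `Init` to `Emitting`, creating the builder once from the live
+// data; each later call slices out one block, and when `next_emit_index`
+// reaches `num_blocks` the state resets to `Init`. A caller can therefore
+// drain a blocked container with a simple loop such as:
+//
+//     while let Some(block) = blocks.emit(EmitTo::NextBlock) {
+//         // turn `block` into an output batch
+//     }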
+
+// ========================================================================
+// The most commonly used implementation `GeneralBlocks`
+// ========================================================================
+
+/// We usually use `Vec` to represent a `Block`, so we define
+/// `Blocks<Vec<T>, Vec<Vec<T>>>` as `GeneralBlocks<T>`
+pub type GeneralBlocks<T> = Blocks<Vec<T>, Vec<Vec<T>>>;
+
+/// As mentioned in [`GeneralBlocks`], we usually use `Vec` to represent a
+/// `Block`, so we implement the `Block` trait for `Vec`
+impl<Ty: Clone + Debug> Block for Vec<Ty> {
+    type T = Ty;
+
+    fn build(build_ctx: Option<BuildBlockContext<Self::T>>) -> Self {
+        if let Some(BuildBlockContext {
+            block_size,
+            default_val,
+        }) = build_ctx
+        {
+            vec![default_val; block_size]
+        } else {
+            Vec::new()
+        }
+    }
+
+    #[inline]
+    fn expand(&mut self, new_len: usize, default_val: Self::T) {
+        self.resize(new_len, default_val);
+    }
+
+    #[inline]
+    fn truncate(&mut self, new_len: usize) {
+        self.truncate(new_len);
+    }
+
+    #[inline]
+    fn len(&self) -> usize {
+        self.len()
+    }
+}
+
+impl<T: Clone + Debug> GeneralBlocks<T> {
+    pub fn emit(&mut self, emit_to: EmitTo) -> Option<Vec<T>> {
+        let init_block_builder = |inner: &mut Vec<Vec<T>>| mem::take(inner);
+
+        if matches!(emit_to, EmitTo::NextBlock) {
+            assert!(
+                self.block_size.is_some(),
+                "only support emitting the next block in blocked groups"
+            );
+            self.emit_next_block(init_block_builder)
+        } else {
+            // TODO: maybe remove `EmitTo::take_needed` and move the
+            // pattern matching code here after supporting the blocked
+            // approach for all existing accumulators, to avoid matching twice
+            assert!(
+                self.block_size.is_none(),
+                "only support emit all/first in flat groups"
+            );
+
+            // We perform single block emitting in steps:
+            //   - Pop the `block` first
+            //   - Take the needed rows from the `block`
+            //   - Push the `block` back if some rows remain in it
+            let mut block = self.emit_next_block(init_block_builder)?;
+            let emit_block = emit_to.take_needed(&mut block);
+
+            if !block.is_empty() {
+                self.push_block(block);
+            }
+
+            Some(emit_block)
+        }
+    }
+
+    #[inline(always)]
+    pub fn get_mut(&mut self, block_id: usize, block_offset: usize) -> &mut T {
+        if block_id == self.inner.len() {
+            unsafe {
+                return self.current.get_unchecked_mut(block_offset);
+            }
+        }
+
+        unsafe {
+            self.inner
+                .get_unchecked_mut(block_id)
+                .get_unchecked_mut(block_offset)
+        }
+    }
+
+    #[inline(always)]
+    pub fn get(&self, block_id: usize, block_offset: usize) -> &T {
+        if block_id == self.inner.len() {
+            unsafe {
+                return self.current.get_unchecked(block_offset);
+            }
+        }
+
+        &self.inner[block_id][block_offset]
+    }
+}
+
+impl<T: Debug> EmitBlockBuilder for Vec<Vec<T>> {
+    type B = Vec<T>;
+
+    fn build(
+        &mut self,
+        emit_index: usize,
+        _block_size: usize,
+        is_last_block: bool,
+        last_block_len: usize,
+    ) -> Self::B {
+        let mut emit_block = mem::take(&mut self[emit_index]);
+        if is_last_block {
+            emit_block.truncate(last_block_len);
+        }
+        emit_block
+    }
+}
+
+#[cfg(test)]
+mod test {
+    // use datafusion_expr_common::groups_accumulator::EmitTo;
+
+    // use crate::aggregate::groups_accumulator::{
+    //     blocks::GeneralBlocks,
+    //     group_index_operations::{
+    //         BlockedGroupIndexOperations, FlatGroupIndexOperations, GroupIndexOperations,
+    //     },
+    // };
+
+    // type TestBlocks = GeneralBlocks<i32>;
+
+    // fn fill_blocks<O: GroupIndexOperations>(
+    //     blocks: &mut TestBlocks,
+    //     offset: usize,
+    //     values: &[i32],
+    // ) {
+    //     let block_size = blocks.block_size.unwrap_or_default();
+    //     for (idx, &val) in values.iter().enumerate() {
+    //         let group_index = offset + idx;
+    //         let block_id = O::get_block_id(group_index, block_size);
+    //         let block_offset = O::get_block_offset(group_index, block_size);
+    //         blocks[block_id][block_offset] = val;
+    //     }
+    // }
+
+    // #[test]
+    // fn test_single_block() {
+    //     let mut blocks = TestBlocks::new(None);
+    //     assert_eq!(blocks.num_blocks(), 0);
+    //     assert_eq!(blocks.total_num_groups(), 0);
+
+    //     for _ in 0..2 {
+    //         // Expand to 5 groups with 42, contexts to check:
+    //         //   - 
Exist num blocks: 1 + // // - Exist num groups: 5 + // // - Exist groups: `[42; 5]` + // blocks.expand(5, 42); + // assert_eq!(blocks.num_blocks(), 1); + // assert_eq!(blocks.total_num_groups(), 5); + // assert_eq!(&blocks[0], &[42; 5]); + + // // Modify the first 5 groups to `[0, 1, 2, 3, 4]`, contexts to check: + // // - Exist groups: `[0, 1, 2, 3, 4]` + // let values0 = [0, 1, 2, 3, 4]; + // fill_blocks::(&mut blocks, 0, &values0); + // assert_eq!(&blocks[0], &values0); + + // // Expand to 10 groups with 42, contexts to check: + // // - Exist num blocks: 1 + // // - Exist num groups: 10 + // // - Exist groups: `[0, 1, 2, 3, 4, 42, 42, 42, 42, 42]` + // blocks.expand(10, 42); + // assert_eq!(blocks.num_blocks(), 1); + // assert_eq!(blocks.total_num_groups(), 10); + // assert_eq!(&blocks[0], &[0, 1, 2, 3, 4, 42, 42, 42, 42, 42]); + + // // Modify the last 5 groups to `[5, 6, 7, 8, 9]`, contexts to check: + // // - Exist groups: `[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]` + // let values1 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; + // fill_blocks::(&mut blocks, 5, &values1[5..10]); + // assert_eq!(&blocks[0], &values1); + + // // Emit first 4 groups, contexts to check: + // // - Emitted num groups: 4 + // // - Emitted groups: `[0, 1 ,2, 3]` + // // - Exist num blocks: 1 + // // - Exist num groups: 6 + // // - Exist groups: `[4, 5, 6, 7, 8, 9]` + // let emit_block = blocks.emit(EmitTo::First(4)).unwrap(); + // assert_eq!(emit_block.len(), 4); + // assert_eq!(&emit_block, &[0, 1, 2, 3]); + // assert_eq!(blocks.num_blocks(), 1); + // assert_eq!(blocks.total_num_groups(), 6); + // assert_eq!(&blocks[0], &[4, 5, 6, 7, 8, 9]); + + // // Resize back to 12 groups after emit first 4 with 42, contexts to check: + // // - Exist num blocks: 1 + // // - Exist num groups: 12 + // // - Exist groups: `[4, 5, 6, 7, 8, 9, 42, 42, 42, 42, 42, 42]` + // blocks.expand(12, 42); + // assert_eq!(blocks.num_blocks(), 1); + // assert_eq!(blocks.total_num_groups(), 12); + // assert_eq!(&blocks[0], &[4, 5, 6, 7, 8, 9, 42, 42, 42, 42, 42, 42]); + + // // Modify the last 6 groups to `[20, 21, 22, 23, 24, 25]`, contexts to check: + // // - Exist groups: `[4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25]` + // let values2 = [4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25]; + // fill_blocks::(&mut blocks, 6, &values2[6..12]); + // assert_eq!(&blocks[0], &values2); + + // // Emit all, contexts to check: + // // - Emitted num groups: 12 + // // - Emitted groups: `[4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25]` + // // - Exist num blocks: 0 + // // - Exist num groups: 0 + // let emit_block = blocks.emit(EmitTo::All).unwrap(); + // assert_eq!(emit_block.len(), 12); + // assert_eq!(&emit_block, &[4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25]); + // assert_eq!(blocks.num_blocks(), 0); + // assert_eq!(blocks.total_num_groups(), 0); + + // // Check emit empty blocks + // assert!(blocks.emit(EmitTo::All).is_none()); + // assert!(blocks.emit(EmitTo::First(1)).is_none()); + + // // Test in next round + // } + // } + + // #[test] + // fn test_multi_blocks() { + // let mut blocks = TestBlocks::new(Some(3)); + // assert_eq!(blocks.num_blocks(), 0); + // assert_eq!(blocks.total_num_groups(), 0); + + // for _ in 0..2 { + // // Expand to 5 groups with 42, contexts to check: + // // - Exist num blocks: 2 + // // - Exist num groups: 5 + // // - Exist block 0: `[42; 3]` + // // - Exist block 1: `[42; 3]` + // // (in `blocked approach`, groups will always be expanded to len + // // of `block_size * N`) + // blocks.expand(5, 42); + // assert_eq!(blocks.num_blocks(), 2); + // 
assert_eq!(blocks.total_num_groups(), 5); + // assert_eq!(&blocks[0], &[42; 3]); + // assert_eq!(&blocks[1], &[42; 3]); + + // // Modify the first 5 groups to `[0, 1, 2, 3, 4]`, contexts to check: + // // - Exist block 0: `[0, 1, 2]` + // // - Exist block 1: `[3, 4, 42]` + // let values = [0, 1, 2, 3, 4]; + // fill_blocks::(&mut blocks, 0, &values); + // assert_eq!(&blocks[0], &[0, 1, 2]); + // assert_eq!(&blocks[1], &[3, 4, 42]); + + // // Expand to 10 groups with 42, contexts to check: + // // - Exist num blocks: 4 + // // - Exist num groups: 10 + // // - Exist block 0: `[0, 1, 2]` + // // - Exist block 1: `[3, 4, 42]` + // // - Exist block 2: `[42, 42, 42]` + // // - Exist block 3: `[42, 42, 42]` + // blocks.expand(10, 42); + // assert_eq!(blocks.num_blocks(), 4); + // assert_eq!(blocks.total_num_groups(), 10); + // assert_eq!(&blocks[0], &[0, 1, 2]); + // assert_eq!(&blocks[1], &[3, 4, 42]); + // assert_eq!(&blocks[2], &[42, 42, 42]); + // assert_eq!(&blocks[3], &[42, 42, 42]); + + // // Modify the last 5 groups to `[5, 6, 7, 8, 9]`, contexts to check: + // // - Exist block 0: `[0, 1, 2]` + // // - Exist block 1: `[3, 4, 5]` + // // - Exist block 2: `[6, 7, 8]` + // // - Exist block 3: `[9, 42, 42]` + // let values = [5, 6, 7, 8, 9]; + // fill_blocks::(&mut blocks, 5, &values); + // assert_eq!(&blocks[0], &[0, 1, 2]); + // assert_eq!(&blocks[1], &[3, 4, 5]); + // assert_eq!(&blocks[2], &[6, 7, 8]); + // assert_eq!(&blocks[3], &[9, 42, 42]); + + // // Emit blocks, it is actually an alternative of `EmitTo::All`, so when we + // // start to emit the first block, contexts should be: + // // - Emitted block 0: `[0, 1, 2]` + // // - Exist num blocks: 3 + // // - Exist num groups: 7 + // // - Emitting flag: true + // let emit_block = blocks.emit(EmitTo::NextBlock).unwrap(); + // assert_eq!(blocks.num_blocks(), 3); + // assert_eq!(blocks.total_num_groups(), 7); + // assert!(blocks.is_emitting()); + // assert_eq!(&emit_block, &[0, 1, 2]); + + // // Continue to emit rest 3 blocks, contexts to check: + // // - Exist num blocks checking + // // - Exist num groups checking + // // - Emitting flag checking + // // - Emitted block 1: `[3, 4, 5]` + // // - Emitted block 2: `[6, 7, 8]` + // // - Emitted block 3: `[9]` + // let emit_block = blocks.emit(EmitTo::NextBlock).unwrap(); + // assert_eq!(blocks.num_blocks(), 2); + // assert_eq!(blocks.total_num_groups(), 4); + // assert!(blocks.is_emitting()); + // assert_eq!(&emit_block, &[3, 4, 5]); + + // let emit_block = blocks.emit(EmitTo::NextBlock).unwrap(); + // assert_eq!(blocks.num_blocks(), 1); + // assert_eq!(blocks.total_num_groups(), 1); + // assert!(blocks.is_emitting()); + // assert_eq!(&emit_block, &[6, 7, 8]); + + // let emit_block = blocks.emit(EmitTo::NextBlock).unwrap(); + // assert_eq!(blocks.num_blocks(), 0); + // assert_eq!(blocks.total_num_groups(), 0); + // assert!(!blocks.is_emitting()); + // assert_eq!(&emit_block, &[9]); + + // // Check emit empty blocks + // assert!(blocks.emit(EmitTo::NextBlock).is_none()); + // // Again for check if it will always be `None` + // assert!(blocks.emit(EmitTo::NextBlock).is_none()); + + // // Test in next round + // } + // } +} diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs index 149312e5a9c0..395fe6df9fbb 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs +++ 
b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs
@@ -20,10 +20,10 @@ use std::sync::Arc;
 use crate::aggregate::groups_accumulator::nulls::filtered_null_mask;
 use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder};
 use arrow::buffer::BooleanBuffer;
-use datafusion_common::Result;
+use datafusion_common::{internal_err, Result};
 use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
 
-use super::accumulate::NullState;
+use super::accumulate::FlatNullState;
 
 /// An accumulator that implements a single operation over a
 /// [`BooleanArray`] where the accumulated state is also boolean (such
@@ -43,7 +43,7 @@ where
     values: BooleanBufferBuilder,
 
     /// Track nulls in the input / filters
-    null_state: NullState,
+    null_state: FlatNullState,
 
     /// Function that computes the output
     bool_fn: F,
@@ -60,7 +60,7 @@ where
     pub fn new(bool_fn: F, identity: bool) -> Self {
         Self {
            values: BooleanBufferBuilder::new(0),
-           null_state: NullState::new(),
+           null_state: FlatNullState::new(),
            bool_fn,
            identity,
        }
@@ -94,7 +94,7 @@ where
            values,
            opt_filter,
            total_num_groups,
-            |group_index, new_value| {
+            |_, group_index, new_value| {
                let current_value = self.values.get_bit(group_index);
                let value = (self.bool_fn)(current_value, new_value);
                self.values.set_bit(group_index, value);
@@ -117,6 +117,9 @@ where
                }
                first_n
            }
+            EmitTo::NextBlock => {
+                return internal_err!("boolean_op does not support blocked groups")
+            }
        };

        let nulls = self.null_state.build(emit_to);
diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/group_index_operations.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/group_index_operations.rs
new file mode 100644
index 000000000000..551d8cab6ea0
--- /dev/null
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/group_index_operations.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Useful tools for operating on group indexes
+
+use std::fmt::Debug;
+
+/// Operations for parsing group indexes
+///
+/// There are mainly two kinds of `group index` that need parsing:
+/// `flat` and `blocked`.
+///
+/// # Flat group index
+/// The `flat group index` format is:
+///
+/// ```text
+/// | block_offset(64bit) |
+/// ```
+///
+/// It is used in flat `GroupValues`/`GroupsAccumulator`s, where only a
+/// single block exists, so the `block_id` is always 0 and all 64 bits
+/// store the `block_offset`.
+///
+/// # Blocked group index
+/// The `blocked group index` format is:
+///
+/// ```text
+/// | block_id(high bits) | block_offset(low bits) |
+/// ```
+///
+/// It is used in blocked `GroupValues`/`GroupsAccumulator`s, where multiple
+/// blocks exist. The index is split by the (power-of-two) block size: the
+/// high bits store the `block_id` and the low bits store the `block_offset`.
+///
+/// Both methods return `usize`, so flat and blocked group indexes can be
+/// parsed through the same interface.
+///
+pub trait GroupIndexOperations: Debug + Send + Sync {
+    fn get_block_id(&self, group_index: usize) -> usize;
+
+    fn get_block_offset(&self, group_index: usize) -> usize;
+}
+
+#[derive(Debug)]
+pub struct BlockedGroupIndexOperations {
+    block_size: usize,
+    exponent: usize,
+}
+
+impl BlockedGroupIndexOperations {
+    #[inline]
+    pub fn new(block_size: usize) -> Self {
+        assert!(
+            block_size.is_power_of_two(),
+            "block size must be power of two"
+        );
+        let exponent = block_size.trailing_zeros() as usize;
+        Self {
+            block_size,
+            exponent,
+        }
+    }
+}
+
+impl GroupIndexOperations for BlockedGroupIndexOperations {
+    #[inline]
+    fn get_block_id(&self, group_index: usize) -> usize {
+        group_index >> self.exponent
+    }
+
+    #[inline]
+    fn get_block_offset(&self, group_index: usize) -> usize {
+        group_index & (self.block_size - 1)
+    }
+}
+
+#[derive(Debug)]
+pub struct FlatGroupIndexOperations;
+
+impl GroupIndexOperations for FlatGroupIndexOperations {
+    #[inline]
+    fn get_block_id(&self, _group_index: usize) -> usize {
+        0
+    }
+
+    #[inline]
+    fn get_block_offset(&self, group_index: usize) -> usize {
+        group_index
+    }
+}
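To make the encoding concrete, here is a small standalone sketch (illustrative values, not part of the patch) of the shift/mask decomposition that `BlockedGroupIndexOperations` implements above:

```rust
fn main() {
    let block_size: usize = 4096; // must be a power of two
    let exponent = block_size.trailing_zeros() as usize;
    let mask = block_size - 1;

    // A "blocked" group index is just a flat index split by block size:
    // high bits select the block, low bits select the slot inside it.
    let group_index: usize = 10_000;
    let block_id = group_index >> exponent; // 10_000 / 4096 = 2
    let block_offset = group_index & mask; // 10_000 % 4096 = 1808
    assert_eq!(block_id, 2);
    assert_eq!(block_offset, 1808);
    // Recombining recovers the original flat index
    assert_eq!(block_id * block_size + block_offset, group_index);
}
```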
diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs
index 078982c983fc..a96c395ec7ae 100644
--- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs
@@ -26,7 +26,8 @@ use arrow::datatypes::DataType;
 use datafusion_common::{internal_datafusion_err, DataFusionError, Result};
 use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
 
-use super::accumulate::NullState;
+use crate::aggregate::groups_accumulator::accumulate::NullStateAdapter;
+use crate::aggregate::groups_accumulator::blocks::{Blocks, GeneralBlocks};
 
 /// An accumulator that implements a single operation over
 /// [`ArrowPrimitiveType`] where the accumulated state is the same as
@@ -43,8 +44,8 @@ where
     T: ArrowPrimitiveType + Send,
     F: Fn(&mut T::Native, T::Native) + Send + Sync,
 {
-    /// values per group, stored as the native type
-    values: Vec<T::Native>,
+    /// Values per group, stored as the native type
+    values: GeneralBlocks<T::Native>,
 
     /// The output type (needed for Decimal precision and scale)
     data_type: DataType,
@@ -53,7 +54,7 @@ where
     starting_value: T::Native,
 
     /// Track nulls in the input / filters
-    null_state: NullState,
+    null_state: NullStateAdapter,
 
     /// Function that computes the primitive result
     prim_fn: F,
@@ -66,9 +67,9 @@ where
     pub fn new(data_type: &DataType, prim_fn: F) -> Self {
         Self {
-            values: vec![],
+            values: Blocks::new(None),
             data_type: data_type.clone(),
-            null_state: NullState::new(),
+            null_state: NullStateAdapter::new(None),
             starting_value: T::default_value(),
             prim_fn,
         }
     }
@@ -96,8 +97,8 @@ where
         assert_eq!(values.len(), 1, "single argument to update_batch");
         let values = values[0].as_primitive::<T>();
 
-        // update values
-        self.values.resize(total_num_groups, self.starting_value);
+        // Expand to ensure the values are large enough
+        self.values.expand(total_num_groups, self.starting_value);
 
         // NullState dispatches / handles tracking nulls and groups that saw no values
         self.null_state.accumulate(
@@ -105,8 +106,8 @@ where
             values,
             opt_filter,
             total_num_groups,
-            |group_index, new_value| {
-                let value = &mut self.values[group_index];
+            |block_id, block_offset, new_value| {
+                let value = self.values.get_mut(block_id, block_offset);
                 (self.prim_fn)(value, new_value);
             },
         );
@@ -115,7 +116,9 @@ where
     }
 
     fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
-        let values = emit_to.take_needed(&mut self.values);
+        let values = self.values.emit(emit_to).ok_or_else(|| {
+            internal_datafusion_err!("tried to evaluate an empty accumulator")
+        })?;
         let nulls = self.null_state.build(emit_to);
         let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)) // no copy
             .with_data_type(self.data_type.clone());
@@ -196,6 +199,24 @@ where
     }
 
     fn size(&self) -> usize {
-        self.values.capacity() * size_of::<T::Native>() + self.null_state.size()
+        let values_cap = self.values.iter().map(|b| b.capacity()).sum::<usize>();
+        let values_size = values_cap * size_of::<T::Native>();
+        values_size + self.null_state.size()
+    }
+
+    fn supports_blocked_groups(&self) -> bool {
+        true
+    }
+
+    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
+        block_size
+            .as_ref()
+            .map(|blk_size| assert!(blk_size.is_power_of_two()));
+
+        self.values.clear();
+        self.values = Blocks::new(block_size);
+        self.null_state = NullStateAdapter::new(block_size);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs
index 798a039f50b1..86411a27be1b 100644
--- a/datafusion/functions-aggregate/src/average.rs
+++ b/datafusion/functions-aggregate/src/average.rs
@@ -40,7 +40,7 @@ use datafusion_expr::{
     ReversedUDAF, Signature,
 };
 
-use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState;
+use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::FlatNullState;
 use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::{
     filtered_null_mask, set_nulls,
 };
@@ -533,7 +533,7 @@ where
     sums: Vec<T::Native>,
 
     /// Track nulls in the input / filters
-    null_state: NullState,
+    null_state: FlatNullState,
 
     /// Function that computes the final average (value / count)
     avg_fn: F,
@@ -555,7 +555,7 @@ where
             sum_data_type: sum_data_type.clone(),
             counts: vec![],
             sums: vec![],
-            null_state: NullState::new(),
+            null_state: FlatNullState::new(),
             avg_fn,
         }
     }
@@ -579,12 +579,16 @@ where
         // increment counts, update sums
         self.counts.resize(total_num_groups, 0);
         self.sums.resize(total_num_groups, T::default_value());
+
+        // `block_id` is ignored in `value_fn`, because `AvgGroupsAccumulator`
+        // does not support blocked groups yet.
+        // See `GroupsAccumulator::supports_blocked_groups` for more details.
        self.null_state.accumulate(
            group_indices,
            values,
            opt_filter,
            total_num_groups,
-            |group_index, new_value| {
+            |_, group_index, new_value| {
                let sum = &mut self.sums[group_index];
                *sum = sum.add_wrapping(new_value);
@@ -662,24 +666,31 @@ where
        let partial_sums = values[1].as_primitive::<T>();
        // update counts with partial counts
        self.counts.resize(total_num_groups, 0);
+
+        // `block_id` is ignored in `value_fn`, because `AvgGroupsAccumulator`
+        // does not support blocked groups yet.
+        // See `GroupsAccumulator::supports_blocked_groups` for more details.
        self.null_state.accumulate(
            group_indices,
            partial_counts,
            opt_filter,
            total_num_groups,
-            |group_index, partial_count| {
+            |_, group_index, partial_count| {
                self.counts[group_index] += partial_count;
            },
        );

        // update sums
        self.sums.resize(total_num_groups, T::default_value());
+        // `block_id` is ignored in `value_fn`, because `AvgGroupsAccumulator`
+        // does not support blocked groups yet.
+        // See `GroupsAccumulator::supports_blocked_groups` for more details.
        self.null_state.accumulate(
            group_indices,
            partial_sums,
            opt_filter,
            total_num_groups,
-            |group_index, new_value: <T as ArrowPrimitiveType>::Native| {
+            |_, group_index, new_value: <T as ArrowPrimitiveType>::Native| {
                let sum = &mut self.sums[group_index];
                *sum = sum.add_wrapping(new_value);
            },
diff --git a/datafusion/functions-aggregate/src/correlation.rs b/datafusion/functions-aggregate/src/correlation.rs
index ac57256ce882..61dd06b4028d 100644
--- a/datafusion/functions-aggregate/src/correlation.rs
+++ b/datafusion/functions-aggregate/src/correlation.rs
@@ -38,7 +38,7 @@ use log::debug;
 
 use crate::covariance::CovarianceAccumulator;
 use crate::stddev::StddevAccumulator;
-use datafusion_common::{plan_err, Result, ScalarValue};
+use datafusion_common::{internal_err, plan_err, Result, ScalarValue};
 use datafusion_expr::{
     function::{AccumulatorArgs, StateFieldsArgs},
     type_coercion::aggregates::NUMERICS,
@@ -448,6 +448,9 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
        let n = match emit_to {
            EmitTo::All => self.count.len(),
            EmitTo::First(n) => n,
+            EmitTo::NextBlock => {
+                return internal_err!("correlation does not support blocked groups")
+            }
        };
 
        let mut values = Vec::with_capacity(n);
@@ -501,6 +504,9 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
        let n = match emit_to {
            EmitTo::All => self.count.len(),
            EmitTo::First(n) => n,
+            EmitTo::NextBlock => {
+                return internal_err!("correlation does not support blocked groups")
+            }
        };
 
        Ok(vec![
diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs
index 530b4620809b..8ea067e6916c 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -427,18 +427,21 @@ where
        })
    }
 
-    fn take_orderings(&mut self, emit_to: EmitTo) -> Vec<Vec<ScalarValue>> {
+    fn take_orderings(&mut self, emit_to: EmitTo) -> Result<Vec<Vec<ScalarValue>>> {
        let result = emit_to.take_needed(&mut self.orderings);
 
        match emit_to {
            EmitTo::All => self.size_of_orderings = 0,
            EmitTo::First(_) => {
                self.size_of_orderings -=
-                    result.iter().map(ScalarValue::size_of_vec).sum::<usize>()
+                    result.iter().map(ScalarValue::size_of_vec).sum::<usize>();
+            }
+            EmitTo::NextBlock => {
+                return internal_err!("first_last does not support blocked groups")
            }
        }
 
-        result
+        Ok(result)
    }
 
    fn take_need(
@@ -460,6 +463,9 @@ where
            }
            first_n
        }
+        EmitTo::NextBlock => {
+            unreachable!("this group values does not support blocked groups yet")
+        }
    }
 }
@@ -512,17 +518,18 @@ where
    fn take_state(
        &mut self,
        emit_to: EmitTo,
-    ) -> (ArrayRef, Vec<Vec<ScalarValue>>, BooleanBuffer) {
+    ) -> Result<(ArrayRef, Vec<Vec<ScalarValue>>, BooleanBuffer)> {
        emit_to.take_needed(&mut self.min_of_each_group_buf.0);
        self.min_of_each_group_buf
            .1
            .truncate(self.min_of_each_group_buf.0.len());
 
-        (
+        let orderings = self.take_orderings(emit_to)?;
+        Ok((
            self.take_vals_and_null_buf(emit_to),
-            self.take_orderings(emit_to),
+            orderings,
            Self::take_need(&mut self.is_sets, emit_to),
-        )
+        ))
    }
 
    // should be used in test only
@@ -674,11 +681,11 @@ where
    }
 
    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
-        Ok(self.take_state(emit_to).0)
+        self.take_state(emit_to).map(|state| state.0)
    }
 
    fn
state(&mut self, emit_to: EmitTo) -> Result> { - let (val_arr, orderings, is_sets) = self.take_state(emit_to); + let (val_arr, orderings, is_sets) = self.take_state(emit_to)?; let mut result = Vec::with_capacity(self.orderings.len() + 2); result.push(val_arr); diff --git a/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs b/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs index 05321c2ff52d..f361ee8ad667 100644 --- a/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs +++ b/datafusion/functions-aggregate/src/min_max/min_max_bytes.rs @@ -199,7 +199,7 @@ impl GroupsAccumulator for MinMaxBytesAccumulator { } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let (data_capacity, min_maxes) = self.inner.emit_to(emit_to); + let (data_capacity, min_maxes) = self.inner.emit_to(emit_to)?; // Convert the Vec of bytes to a vec of Strings (at no cost) fn bytes_to_str( @@ -494,13 +494,13 @@ impl MinMaxBytesState { /// /// - `data_capacity`: the total length of all strings and their contents, /// - `min_maxes`: the actual min/max values for each group - fn emit_to(&mut self, emit_to: EmitTo) -> (usize, Vec>>) { + fn emit_to(&mut self, emit_to: EmitTo) -> Result<(usize, Vec>>)> { match emit_to { EmitTo::All => { - ( + Ok(( std::mem::take(&mut self.total_data_bytes), // reset total bytes and min_max std::mem::take(&mut self.min_max), - ) + )) } EmitTo::First(n) => { let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect(); @@ -509,7 +509,10 @@ impl MinMaxBytesState { .map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0)) .sum(); self.total_data_bytes -= first_data_capacity; - (first_data_capacity, first_min_maxes) + Ok((first_data_capacity, first_min_maxes)) + } + EmitTo::NextBlock => { + internal_err!("min/max bytes does not support blocked groups") } } } diff --git a/datafusion/functions-aggregate/src/min_max/min_max_struct.rs b/datafusion/functions-aggregate/src/min_max/min_max_struct.rs index 8038f2f01d90..e2ae29cace59 100644 --- a/datafusion/functions-aggregate/src/min_max/min_max_struct.rs +++ b/datafusion/functions-aggregate/src/min_max/min_max_struct.rs @@ -100,7 +100,7 @@ impl GroupsAccumulator for MinMaxStructAccumulator { } fn evaluate(&mut self, emit_to: EmitTo) -> Result { - let (_, min_maxes) = self.inner.emit_to(emit_to); + let (_, min_maxes) = self.inner.emit_to(emit_to)?; let fields = match &self.inner.data_type { DataType::Struct(fields) => fields, _ => return internal_err!("Data type is not a struct"), @@ -274,13 +274,13 @@ impl MinMaxStructState { /// /// - `data_capacity`: the total length of all strings and their contents, /// - `min_maxes`: the actual min/max values for each group - fn emit_to(&mut self, emit_to: EmitTo) -> (usize, Vec>) { + fn emit_to(&mut self, emit_to: EmitTo) -> Result<(usize, Vec>)> { match emit_to { EmitTo::All => { - ( + Ok(( std::mem::take(&mut self.total_data_bytes), // reset total bytes and min_max std::mem::take(&mut self.min_max), - ) + )) } EmitTo::First(n) => { let first_min_maxes: Vec<_> = self.min_max.drain(..n).collect(); @@ -289,7 +289,10 @@ impl MinMaxStructState { .map(|opt| opt.as_ref().map(|s| s.len()).unwrap_or(0)) .sum(); self.total_data_bytes -= first_data_capacity; - (first_data_capacity, first_min_maxes) + Ok((first_data_capacity, first_min_maxes)) + } + EmitTo::NextBlock => { + internal_err!("min/max struct does not support blocked groups") } } } diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 49912954ac81..66fa71107d14 100644 --- 
a/datafusion/physical-expr/src/aggregate.rs
+++ b/datafusion/physical-expr/src/aggregate.rs
@@ -18,10 +18,10 @@
 pub(crate) mod groups_accumulator {
     #[allow(unused_imports)]
     pub(crate) mod accumulate {
-        pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState;
+        pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::FlatNullState;
     }
     pub use datafusion_functions_aggregate_common::aggregate::groups_accumulator::{
-        accumulate::NullState, GroupsAccumulatorAdapter,
+        accumulate::FlatNullState, GroupsAccumulatorAdapter,
     };
 }
 pub(crate) mod stats {
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 9f795c81fa48..f7e5f7c7d2ea 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -47,7 +47,7 @@ pub mod execution_props {
     pub use datafusion_expr::var_provider::{VarProvider, VarType};
 }
 
-pub use aggregate::groups_accumulator::{GroupsAccumulatorAdapter, NullState};
+pub use aggregate::groups_accumulator::{FlatNullState, GroupsAccumulatorAdapter};
 pub use analysis::{analyze, AnalysisContext, ExprBoundaries};
 pub use equivalence::{
     calculate_union, AcrossPartitions, ConstExpr, EquivalenceProperties,
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index 4f58b575f3a0..263794be293a 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -52,6 +52,7 @@ datafusion-common = { workspace = true, default-features = true }
 datafusion-common-runtime = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-functions-aggregate-common = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
 datafusion-physical-expr-common = { workspace = true }
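For context (not part of the patch): the two new `GroupValues` methods introduced in the next file are meant to be used together, roughly as in this hedged, self-contained sketch. `AnyGroupValues` and `try_enable_blocked` are invented names for illustration only:

```rust
// Stand-ins for the real traits; this only models the negotiation shape.
type Result<T> = std::result::Result<T, String>;

trait AnyGroupValues {
    fn supports_blocked_groups(&self) -> bool;
    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()>;
}

/// Only switch a `GroupValues` into blocked mode when it declares support;
/// otherwise keep (or reset to) the single-block flat mode.
fn try_enable_blocked(gv: &mut dyn AnyGroupValues, block_size: usize) -> Result<bool> {
    if gv.supports_blocked_groups() {
        // Per the trait docs below, this clears any existing group data.
        gv.alter_block_size(Some(block_size))?;
        Ok(true)
    } else {
        gv.alter_block_size(None)?;
        Ok(false)
    }
}
```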
diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
index ce56ca4f7dfd..63a54a4da9a2 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -24,7 +24,7 @@ use arrow::array::types::{
 };
 use arrow::array::{downcast_primitive, ArrayRef, RecordBatch};
 use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
 
 use datafusion_expr::EmitTo;
 
@@ -110,6 +110,51 @@ pub(crate) trait GroupValues: Send {
     /// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
     fn clear_shrink(&mut self, batch: &RecordBatch);
+
+    /// Returns `true` if this `GroupValues` supports blocked groups.
+    ///
+    /// Blocked groups (also called the blocked management approach) is an
+    /// optimization to reduce the cost of managing aggregation intermediate states.
+    ///
+    /// Here is a brief introduction to the two state management approaches:
+    /// - Blocked approach: states are stored and managed in multiple `Vec`s,
+    ///   which we call `Block`s. Organizing states like this avoids resizing a
+    ///   `Vec`; we allocate a new `Vec` instead, which reduces cost and gives
+    ///   better performance. When locating data in `Block`s, we first use the
+    ///   `block_id` to locate the needed `Block`, and then use the
+    ///   `block_offset` to locate the needed data in that `Block`.
+    ///
+    /// - Single approach: all states are stored and managed in a single large
+    ///   `Block`. When locating data, the `block_id` is always 0, and only the
+    ///   `block_offset` is needed to locate data in the single `Block`.
+    ///
+    /// More details can be found at:
+    /// <https://github.com/apache/datafusion/issues/7065>
+    ///
+    fn supports_blocked_groups(&self) -> bool {
+        false
+    }
+
+    /// Alter the block size in the `group values`
+    ///
+    /// If the target block size is `None`, a single big block (think of it
+    /// as one large `Vec`) is used to manage the state.
+    ///
+    /// If the target block size is `Some(blk_size)`, it will try to set the
+    /// block size to `blk_size`, and the attempt only succeeds when the
+    /// `group values` supports blocked mode.
+    ///
+    /// NOTICE: after altering the block size, all data in the existing
+    /// `group values` will be cleared.
+    ///
+    fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> {
+        if block_size.is_some() {
+            return Err(DataFusionError::NotImplemented(
+                "this group values doesn't support blocked mode yet".to_string(),
+            ));
+        }
+
+        Ok(())
+    }
 }
 
 /// Return a specialized implementation of [`GroupValues`] for the given schema.
diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
index dee482cab186..92f292f8bea2 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs
@@ -40,7 +40,7 @@ use arrow::datatypes::{
     UInt8Type,
 };
 use datafusion_common::hash_utils::create_hashes;
-use datafusion_common::{not_impl_err, DataFusionError, Result};
+use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
 use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
 use datafusion_expr::EmitTo;
 use datafusion_physical_expr::binary_map::OutputType;
@@ -1154,6 +1154,11 @@ impl GroupValues for GroupValuesColumn {
                 output
             }
+            EmitTo::NextBlock => {
+                return internal_err!(
+                    "group_values_column does not support blocked groups"
+                )
+            }
         };
 
         // TODO: Materialize dictionaries in group keys (#7647)
diff --git a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs
index aa9eee5157b6..2d8cb938ca6a 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/row.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs
@@ -22,7 +22,7 @@ use arrow::compute::cast;
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::row::{RowConverter, Rows, SortField};
 use datafusion_common::hash_utils::create_hashes;
-use datafusion_common::Result;
+use datafusion_common::{internal_err, Result};
 use datafusion_execution::memory_pool::proxy::{HashTableAllocExt, VecAllocExt};
 use datafusion_expr::EmitTo;
 use hashbrown::hash_table::HashTable;
@@ -230,6 +230,9 @@ impl GroupValues for GroupValuesRows {
                 });
                 output
             }
+            EmitTo::NextBlock => {
+                return internal_err!("group_values_rows does not support blocked groups")
+            }
         };
 
         // TODO: Materialize dictionaries in group keys
diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
index 9686b8c3521d..f2f04efbc1cf 100644
--- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
+++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs
@@ -17,6 +17,7 @@
 
 use
crate::aggregates::group_values::GroupValues; use arrow::array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch}; +use datafusion_common::internal_err; use datafusion_expr::EmitTo; use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType}; use std::mem::size_of; @@ -116,6 +117,11 @@ impl GroupValues for GroupValuesByes { emit_group_values } + EmitTo::NextBlock => { + return internal_err!( + "group_values_bytes does not support blocked groups" + ) + } }; Ok(vec![group_values]) diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs index be9a0334e3ee..07ab92f32826 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs @@ -17,6 +17,7 @@ use crate::aggregates::group_values::GroupValues; use arrow::array::{Array, ArrayRef, RecordBatch}; +use datafusion_common::internal_err; use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap; @@ -117,6 +118,11 @@ impl GroupValues for GroupValuesBytesView { emit_group_values } + EmitTo::NextBlock => { + return internal_err!( + "group_values_bytes_view does not support blocked groups" + ) + } }; Ok(vec![group_values]) diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index 279caa50b0a6..383e87b2fcd6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -24,12 +24,16 @@ use arrow::array::{ }; use arrow::datatypes::{i256, DataType}; use arrow::record_batch::RecordBatch; -use datafusion_common::Result; +use datafusion_common::{internal_datafusion_err, internal_err, Result}; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_expr::EmitTo; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::blocks::EmitBlocksState; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::group_index_operations::{ + BlockedGroupIndexOperations, FlatGroupIndexOperations, GroupIndexOperations, +}; use half::f16; use hashbrown::hash_table::HashTable; -use std::mem::size_of; +use std::mem::{self, size_of}; use std::sync::Arc; /// A trait to allow hashing of floating point numbers @@ -81,6 +85,7 @@ hash_float!(f16, f32, f64); pub struct GroupValuesPrimitive { /// The data type of the output array data_type: DataType, + /// Stores the `(group_index, hash)` based on the hash of its value /// /// We also store `hash` is for reducing cost of rehashing. 
/// Such cost
@@ -89,25 +94,67 @@ pub struct GroupValuesPrimitive {
    ///
    ///
    map: HashTable<(usize, u64)>,
+
    /// The group index of the null value if any
    null_group: Option<usize>,
+
    /// The values for each group index
-    values: Vec<T::Native>,
+    values: Vec<Vec<T::Native>>,
+
    /// The random state used to generate hashes
    random_state: RandomState,
+
+    /// Block size of the current `GroupValues` if it exists:
+    /// - If `None`, block optimization is disabled, and all
+    ///   `group values` will be stored in a single `Vec`
+    ///
+    /// - If `Some(blk_size)`, block optimization is enabled,
+    ///   `group values` will be stored in multiple `Vec`s, each
+    ///   `Vec` is of `blk_size` length, and we call it a `block`
+    ///
+    block_size: Option<usize>,
+
+    /// Number of currently stored groups
+    ///
+    /// We maintain it to avoid the expensive dynamic computation of the
+    /// `groups number` and the `target group index` in the `blocked approach`.
+    ///
+    /// The computation of the `target group index` especially has to be
+    /// performed at row level, which is very expensive.
+    ///
+    /// So even though maintaining it introduces some complexity, it is
+    /// still worth doing.
+    ///
+    total_num_groups: usize,
+
+    /// Flag used when emitting in the `blocked approach`.
+    /// It marks whether blocks are currently being emitted; if so, states
+    /// can't be updated until all blocks are emitted
+    emit_state: EmitBlocksState<Vec<Vec<T::Native>>>,
 }
 
 impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> {
     pub fn new(data_type: DataType) -> Self {
         assert!(PrimitiveArray::<T>::is_compatible(&data_type));
+
+        // As an optimization, we ensure the `single block` always exists
+        // in flat mode; this eliminates an expensive row-level empty check
         Self {
             data_type,
             map: HashTable::with_capacity(128),
-            values: Vec::with_capacity(128),
+            values: vec![vec![]],
             null_group: None,
             random_state: Default::default(),
+            block_size: None,
+            total_num_groups: 0,
+            emit_state: EmitBlocksState::Init,
         }
     }
+
+    #[inline]
+    fn is_emitting(&self) -> bool {
+        self.emit_state.is_emitting()
+    }
 }
 
 impl<T: ArrowPrimitiveType> GroupValues for GroupValuesPrimitive<T>
 where
     T::Native: HashValue,
 {
     fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
-        assert_eq!(cols.len(), 1);
-        groups.clear();
-
-        for v in cols[0].as_primitive::<T>() {
-            let group_id = match v {
-                None => *self.null_group.get_or_insert_with(|| {
-                    let group_id = self.values.len();
-                    self.values.push(Default::default());
-                    group_id
-                }),
-                Some(key) => {
-                    let state = &self.random_state;
-                    let hash = key.hash(state);
-                    let insert = self.map.entry(
-                        hash,
-                        |&(g, _)| unsafe { self.values.get_unchecked(g).is_eq(key) },
-                        |&(_, h)| h,
-                    );
+        if self.is_emitting() {
+            return internal_err!("can not update groups during blocks emitting");
+        }
 
-                    match insert {
-                        hashbrown::hash_table::Entry::Occupied(o) => o.get().0,
-                        hashbrown::hash_table::Entry::Vacant(v) => {
-                            let g = self.values.len();
-                            v.insert((g, hash));
-                            self.values.push(key);
-                            g
-                        }
-                    }
+        if let Some(block_size) = self.block_size {
+            let before_add_group = |group_values: &mut Vec<Vec<T::Native>>| {
+                if group_values.is_empty()
+                    || group_values.last().unwrap().len() == block_size
+                {
+                    let new_block = Vec::with_capacity(block_size);
+                    group_values.push(new_block);
                 }
             };
-            groups.push(group_id)
+
+            let group_index_operation = BlockedGroupIndexOperations::new(block_size);
+            self.get_or_create_groups(
+                cols,
+                groups,
+                before_add_group,
+                group_index_operation,
+            )
+        } else {
+            let group_index_operation = FlatGroupIndexOperations;
+            self.get_or_create_groups(
+                cols,
+                groups,
+                |_: &mut Vec<Vec<T::Native>>| {},
+                group_index_operation,
+            )
         }
-        Ok(())
     }
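A standalone sketch (names invented, not part of the patch) of the allocation strategy `before_add_group` applies above: when the current block is full, open a fresh fixed-capacity block instead of resizing one big `Vec`:

```rust
/// Append `value`, opening a fresh fixed-capacity block when the current
/// one is full, so no existing block is ever grown and re-copied.
fn blocked_push<T>(blocks: &mut Vec<Vec<T>>, block_size: usize, value: T) {
    let need_new_block = blocks
        .last()
        .map_or(true, |block| block.len() == block_size);
    if need_new_block {
        blocks.push(Vec::with_capacity(block_size));
    }
    blocks.last_mut().unwrap().push(value);
}

fn main() {
    let mut blocks: Vec<Vec<u32>> = Vec::new();
    for v in 0..10u32 {
        blocked_push(&mut blocks, 4, v);
    }
    // 10 values with block size 4 => 3 blocks: [0..4], [4..8], [8..10]
    assert_eq!(blocks.len(), 3);
    assert_eq!(blocks[2], vec![8, 9]);
}
```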
 
     fn size(&self) -> usize {
-        self.map.capacity() * size_of::<(usize, u64)>() + self.values.allocated_size()
+        let map_size = self.map.capacity() * size_of::<(usize, u64)>();
+        let values_size = if !self.values.is_empty() {
+            // The last block may be non-full, so we compute the size in two steps:
+            // - Compute the size of the first `n - 1` full blocks
+            // - Add the size of the last block
+            let num_blocks = self.values.len();
+            let full_blocks_size = (num_blocks - 1)
+                * self
+                    .values
+                    .first()
+                    .map(|blk| blk.allocated_size())
+                    .unwrap_or_default();
+            let last_block_size = self
+                .values
+                .last()
+                .map(|blk| blk.allocated_size())
+                .unwrap_or_default();
+            full_blocks_size + last_block_size
+        } else {
+            0
+        };
+
+        map_size + values_size
     }
 
     fn is_empty(&self) -> bool {
-        self.values.is_empty()
+        self.len() == 0
     }
 
     fn len(&self) -> usize {
-        self.values.len()
+        self.total_num_groups
     }
 
     fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
@@ -179,24 +245,42 @@ where
         }
 
         let array: PrimitiveArray<T> = match emit_to {
+            // ===============================================
+            // Emitting in flat mode
+            // ===============================================
             EmitTo::All => {
+                assert!(
+                    self.block_size.is_none(),
+                    "only support EmitTo::All in flat mode"
+                );
+
                 self.map.clear();
-                build_primitive(std::mem::take(&mut self.values), self.null_group.take())
+                build_primitive(
+                    mem::take(self.values.last_mut().unwrap()),
+                    self.null_group.take(),
+                )
             }
+
             EmitTo::First(n) => {
+                assert!(
+                    self.block_size.is_none(),
+                    "only support EmitTo::First in flat mode"
+                );
+
                 self.map.retain(|bucket| {
                     // Decrement group index by n
                     let group_idx = bucket.0;
                     match group_idx.checked_sub(n) {
                         // Group index was >= n, shift value down
                         Some(sub) => {
                             bucket.0 = sub;
                             true
                         }
                         // Group index was < n, so remove from table
                         None => false,
                     }
                 });
+
                 let null_group = match &mut self.null_group {
                     Some(v) if *v >= n => {
                         *v -= n;
@@ -205,20 +289,475 @@ where
                     }
                     Some(_) => self.null_group.take(),
                     None => None,
                 };
-                let mut split = self.values.split_off(n);
-                std::mem::swap(&mut self.values, &mut split);
+
+                let single_block = self.values.last_mut().unwrap();
+                let mut split = single_block.split_off(n);
+                mem::swap(single_block, &mut split);
                 build_primitive(split, null_group)
             }
+
+            // ===============================================
+            // Emitting in blocked mode
+            // ===============================================
+            EmitTo::NextBlock => {
+                let (total_num_groups, block_size) = if !self.is_emitting() {
+                    // Similar to `EmitTo::All`, we clear the old index infos (like `map`)
+                    // TODO: we should set `total_num_groups` to 0 when blocks emitting
+                    // starts, because it represents the number of existing groups, and
+                    // the emitting groups don't really exist anymore. We can't do that
+                    // yet, because we currently use `total_num_groups` (via `len`) to
+                    // judge when all blocks have been emitted. We should instead judge
+                    // that by checking whether `emit` returns `None`.
+ self.map.clear(); + ( + self.total_num_groups, + self.block_size + .expect("only support EmitTo::Next in blocked group values"), + ) + } else { + (0, 0) + }; + + let init_block_builder = || mem::take(&mut self.values); + let emit_block = self + .emit_state + .emit_block(total_num_groups, block_size, init_block_builder) + .ok_or_else(|| { + internal_datafusion_err!("try to evaluate empty group values") + })?; + + // Check if `null` is in current block: + // - If so, we take it + // - If not, we shift down the group index + let block_size = self.block_size.unwrap(); + let null_group = match &mut self.null_group { + Some(v) if *v >= block_size => { + *v -= block_size; + None + } + Some(_) => self.null_group.take(), + None => None, + }; + + let null_idx = null_group.map(|group_idx| { + let group_index_operation = + BlockedGroupIndexOperations::new(block_size); + group_index_operation.get_block_offset(group_idx) + }); + + build_primitive(emit_block, null_idx) + } }; + self.total_num_groups -= array.len(); + Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))]) } fn clear_shrink(&mut self, batch: &RecordBatch) { let count = batch.num_rows(); - self.values.clear(); - self.values.shrink_to(count); + + // Clear values + // TODO: Only reserve room of values in `flat mode` currently, + // we may need to consider it again when supporting spilling + // for `blocked mode`. + if self.block_size.is_none() { + let single_block = self.values.last_mut().unwrap(); + single_block.clear(); + single_block.shrink_to(count); + } else { + self.values.clear(); + } + + // Clear mappings self.map.clear(); self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared + self.null_group = None; + + // Clear helping structures + self.emit_state = EmitBlocksState::Init; + self.total_num_groups = 0; + } + + fn supports_blocked_groups(&self) -> bool { + true + } + + fn alter_block_size(&mut self, block_size: Option) -> Result<()> { + block_size + .as_ref() + .map(|blk_size| assert!(blk_size.is_power_of_two())); + + // Clear values + self.values.clear(); + + // Clear mappings + self.map.clear(); + self.null_group = None; + + // Clear helping structures + self.emit_state = EmitBlocksState::Init; + self.total_num_groups = 0; + + // As mentioned above, we ensure the `single block` always exist + // in `flat mode` + if block_size.is_none() { + self.values.push(Vec::new()); + } + self.block_size = block_size; + + Ok(()) + } +} + +impl GroupValuesPrimitive +where + T::Native: HashValue, +{ + fn get_or_create_groups( + &mut self, + cols: &[ArrayRef], + groups: &mut Vec, + mut before_add_group: F, + group_index_operation: O, + ) -> Result<()> + where + F: FnMut(&mut Vec>), + O: GroupIndexOperations, + { + assert_eq!(cols.len(), 1); + groups.clear(); + + let block_size = self.block_size.unwrap_or_default(); + for v in cols[0].as_primitive::() { + let group_index = match v { + None => *self.null_group.get_or_insert_with(|| { + // Actions before add new group like checking if room is enough + before_add_group(&mut self.values); + + // Get block infos and update block, + // we need `current block` and `next offset in block` + let current_block = self.values.last_mut().unwrap(); + current_block.push(Default::default()); + + // Compute group index + let group_index = self.total_num_groups; + self.total_num_groups += 1; + + // Get group index and finish actions needed it + group_index + }), + Some(key) => { + let state = &self.random_state; + let hash = key.hash(state); + let insert = self.map.entry( + 
hash, + |g| unsafe { + g.1 == hash && { + let block_id = group_index_operation.get_block_id(g.0); + let block_offset = + group_index_operation.get_block_offset(g.0); + self.values + .get_unchecked(block_id) + .get_unchecked(block_offset) + .is_eq(key) + } + }, + |g| g.1, + ); + + match insert { + hashbrown::hash_table::Entry::Occupied(o) => o.get().0, + hashbrown::hash_table::Entry::Vacant(v) => { + // Actions before add new group like checking if room is enough + before_add_group(&mut self.values); + + // Get block infos and update block, + // we need `current block` and `next offset in block` + let current_block = self.values.last_mut().unwrap(); + current_block.push(key); + + // Compute group index + let group_index = self.total_num_groups; + self.total_num_groups += 1; + + v.insert((group_index, hash)); + group_index + } + } + } + }; + + groups.push(group_index) + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + use std::sync::Arc; + + use crate::aggregates::group_values::single_group_by::primitive::GroupValuesPrimitive; + use crate::aggregates::group_values::GroupValues; + use arrow::array::{Array, AsArray, UInt32Array}; + use arrow::datatypes::{DataType, UInt32Type}; + use datafusion_expr::EmitTo; + + #[test] + fn test_flat_primitive_group_values() { + // Will cover such insert cases: + // 1.1 Non-null row + distinct + // 1.2 Null row + distinct + // 1.3 Non-null row + non-distinct + // 1.4 Null row + non-distinct + // + // Will cover such emit cases: + // 2.1 Emit first n + // 2.2 Emit all + // 2.3 Insert again + emit + // + + let mut group_values = GroupValuesPrimitive::::new(DataType::UInt32); + assert_eq!(group_values.len(), 0); + assert!(group_values.is_empty()); + + let mut group_indices = vec![]; + let data0 = Arc::new(UInt32Array::from(vec![ + Some(1), + None, + Some(1), + None, + Some(2), + Some(3), + ])); + let data1 = Arc::new(UInt32Array::from(vec![ + Some(3), + Some(3), + None, + Some(4), + Some(5), + ])); + + // 1. 
Insert case 1.1, 1.3, 1.4 + Emit case 2.1, 2.2 + + // 1.1 Insert data0, contexts to check: + // - Exist num groups: 4 + // - Exist groups empty: false + group_values + .intern(&[Arc::clone(&data0) as _], &mut group_indices) + .unwrap(); + assert_eq!(group_values.len(), 4); + assert!(!group_values.is_empty()); + + // 1.2 Emit first 3 groups, contexts to check: + // - Exist num groups: 1 + // - Exist groups empty: false + // - Emitted groups are top 3 data sorted by their before group indices + let mut expected = BTreeMap::new(); + for (&group_index, value) in group_indices.iter().zip(data0.iter()) { + expected.insert(group_index, value); + } + let mut expected = expected.into_iter().collect::>(); + let last_group_index = expected.len() - 1; + let last_value = expected.last().unwrap().1; + expected.pop(); + + let emit_result = group_values.emit(EmitTo::First(3)).unwrap(); + assert_eq!(group_values.len(), 1); + assert!(!group_values.is_empty()); + let actual = emit_result[0] + .as_primitive::() + .iter() + .enumerate() + .map(|(group_idx, val)| { + assert!(group_idx < last_group_index); + (group_idx, val) + }) + .collect::>(); + + assert_eq!(expected, actual); + + // 1.3 Emit last 1 group(by emitting all), contexts to check: + // - Exist num groups: 0 + // - Exist groups empty: true + // - Emitted group equal to `last_value` + let emit_result = group_values.emit(EmitTo::All).unwrap(); + assert_eq!(group_values.len(), 0); + assert!(group_values.is_empty()); + let emit_col = emit_result[0].as_primitive::(); + let actual = emit_col.is_valid(0).then(|| emit_col.value(0)); + assert_eq!(actual, last_value); + + // 2. Insert case 1.1, 1.2, 1.3 + Emit case 2.1, 2.2, 2.3 + + // 2.1 Insert data1, contexts to check: + // - Exist num groups: 4 + // - Exist groups empty: false + group_values + .intern(&[Arc::clone(&data1) as _], &mut group_indices) + .unwrap(); + assert_eq!(group_values.len(), 4); + assert!(!group_values.is_empty()); + + // 2.2 Emit first 2 groups, contexts to check: + // - Exist num groups: 2 + // - Exist groups empty: false + // - Emitted groups are top 2 data sorted by their before group indices + let mut expected = BTreeMap::new(); + for (&group_index, value) in group_indices.iter().zip(data1.iter()) { + expected.insert(group_index, value); + } + let mut expected = expected.into_iter().collect::>(); + let mut last_twos = Vec::new(); + let last_value0 = expected.pop().unwrap().1; + let last_value1 = expected.pop().unwrap().1; + last_twos.extend([(0, last_value1), (1, last_value0)]); + + let emit_result = group_values.emit(EmitTo::First(2)).unwrap(); + assert_eq!(group_values.len(), 2); + assert!(!group_values.is_empty()); + let actual = emit_result[0] + .as_primitive::() + .iter() + .enumerate() + .map(|(group_idx, val)| { + assert!(group_idx < last_group_index); + (group_idx, val) + }) + .collect::>(); + + assert_eq!(actual, expected); + + // 2.3 Emit last 2 group(by emitting all), contexts to check: + // - Exist num groups: 0 + // - Exist groups empty: true + // - Emitted groups equal to `last_twos` + let emit_result = group_values.emit(EmitTo::All).unwrap(); + assert_eq!(group_values.len(), 0); + assert!(group_values.is_empty()); + let actual = emit_result[0] + .as_primitive::() + .iter() + .enumerate() + .map(|(group_idx, val)| { + assert!(group_idx < last_group_index); + (group_idx, val) + }) + .collect::>(); + + assert_eq!(actual, last_twos); + } + + #[test] + fn test_blocked_primitive_group_values() { + // Will cover such insert cases: + // 1.1 Non-null row + distinct + // 1.2 
Null row + distinct + // 1.3 Non-null row + non-distinct + // 1.4 Null row + non-distinct + // + // Will cover such emit cases: + // 2.1 Emit block + `num_groups % block_size == 0` + // 2.2 Insert again + emit block + `num_groups % block_size != 0` + // + + let mut group_values = GroupValuesPrimitive::::new(DataType::UInt32); + let block_size = 2; + group_values.alter_block_size(Some(block_size)).unwrap(); + assert_eq!(group_values.len(), 0); + assert!(group_values.is_empty()); + + let mut group_indices = vec![]; + let data0 = Arc::new(UInt32Array::from(vec![ + Some(1), + None, + Some(1), + None, + Some(2), + Some(3), + ])); + let data1 = Arc::new(UInt32Array::from(vec![Some(3), None, Some(4)])); + + // 1. Insert case 1.1, 1.3, 1.4 + Emit case 2.1 + + // 1.1 Insert data0, contexts to check: + // - Exist num groups: 4 + // - Exist groups empty: false + group_values + .intern(&[Arc::clone(&data0) as _], &mut group_indices) + .unwrap(); + assert_eq!(group_values.len(), 4); + assert!(!group_values.is_empty()); + + // 1.2 Emit blocks, contexts to check: + // - Exist num groups: 0 + // - Exist groups empty: true + // - Emitting flag check, + // - Emitted block len check + // - Emitted groups are equal to data sorted by their before group indices + let mut expected = BTreeMap::new(); + for (&group_index, value) in group_indices.iter().zip(data0.iter()) { + expected.insert(group_index, value); + } + let expected = expected.into_iter().collect::>(); + + let emit_result0 = group_values.emit(EmitTo::NextBlock).unwrap(); + assert_eq!(group_values.len(), 2); + assert!(!group_values.is_empty()); + assert!(group_values.is_emitting()); + assert_eq!(emit_result0[0].len(), block_size); + + let emit_result1 = group_values.emit(EmitTo::NextBlock).unwrap(); + assert_eq!(group_values.len(), 0); + assert!(group_values.is_empty()); + assert!(!group_values.is_emitting()); + assert_eq!(emit_result1[0].len(), block_size); + + let iter0 = emit_result0[0].as_primitive::().iter(); + let iter1 = emit_result1[0].as_primitive::().iter(); + let actual = iter0.chain(iter1).enumerate().collect::>(); + assert_eq!(actual, expected); + + // 2. 
Insert case 1.1, 1.2 + Emit case 2.2 + + // 2.1 Insert data0, contexts to check: + // - Exist num groups: 3 + // - Exist groups empty: false + group_values + .intern(&[Arc::clone(&data1) as _], &mut group_indices) + .unwrap(); + assert_eq!(group_values.len(), 3); + assert!(!group_values.is_empty()); + + // 1.2 Emit blocks, contexts to check: + // - Exist num groups: 0 + // - Exist groups empty: true + // - Emitting flag check, + // - Emitted block len check + // - Emitted groups are equal to data sorted by their before group indices + let mut expected = BTreeMap::new(); + for (&group_index, value) in group_indices.iter().zip(data1.iter()) { + expected.insert(group_index, value); + } + let expected = expected.into_iter().collect::>(); + + let emit_result0 = group_values.emit(EmitTo::NextBlock).unwrap(); + assert_eq!(group_values.len(), 1); + assert!(!group_values.is_empty()); + assert!(group_values.is_emitting()); + assert_eq!(emit_result0[0].len(), block_size); + + let emit_result1 = group_values.emit(EmitTo::NextBlock).unwrap(); + assert_eq!(group_values.len(), 0); + assert!(group_values.is_empty()); + assert!(!group_values.is_emitting()); + assert_eq!(emit_result1[0].len(), 1); + + let iter0 = emit_result0[0].as_primitive::().iter(); + let iter1 = emit_result1[0].as_primitive::().iter(); + let actual = iter0.chain(iter1).enumerate().collect::>(); + assert_eq!(actual, expected); } } diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 232565a04466..9cc4da92b766 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -41,7 +41,7 @@ use arrow::datatypes::SchemaRef; use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::proxy::VecAllocExt; -use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion_execution::memory_pool::{MemoryConsumer, MemoryLimit, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_expr::{EmitTo, GroupsAccumulator}; use datafusion_physical_expr::expressions::Column; @@ -62,6 +62,11 @@ pub(crate) enum ExecutionState { /// When producing output, the remaining rows to output are stored /// here and are sliced off as needed in batch_size chunks ProducingOutput(RecordBatch), + /// Producing output block by block. + /// + /// It is the blocked version `ProducingOutput` and will be used + /// when blocked optimization is enabled. + ProducingBlocks(Option), /// Produce intermediate aggregate state for each input row without /// aggregation. /// @@ -339,6 +344,35 @@ impl SkipAggregationProbe { /// │ 2 │ 2 │ 3.0 │ │ 2 │ 2 │ 3.0 │ └────────────┘ /// └─────────────────┘ └─────────────────┘ /// ``` +/// +/// # Blocked approach for intermediate results +/// +/// An important optimization for [`group_values`] and [`accumulators`] +/// is to manage such intermediate results using the blocked approach. +/// +/// In the original method, intermediate results are managed within a single large block +/// (can think of it as a Vec). As this block grows, it often triggers numerous +/// copies, resulting in poor performance. +/// +/// In contrast, the blocked approach allocates capacity for the block +/// based on a predefined block size firstly. 
+/// When the block reaches its limit, we allocate a new block
+/// (also with the same predefined block-size-based capacity)
+/// instead of expanding the current one and copying the data.
+/// This method eliminates unnecessary copies and significantly improves performance.
+///
+/// You can find some implementation details (like how to locate data in the
+/// two approaches) in [`GroupsAccumulator::supports_blocked_groups`] and
+/// [`GroupValues::supports_blocked_groups`].
+///
+/// For a detailed introduction to the design of the blocked approach, see [#7065].
+///
+/// The conditions that trigger the blocked groups optimization can be found in
+/// [`maybe_enable_blocked_groups`].
+///
+/// [`group_values`]: Self::group_values
+/// [`accumulators`]: Self::accumulators
+/// [#7065]: https://github.com/apache/datafusion/issues/7065
+///
 pub(crate) struct GroupedHashAggregateStream {
 // ========================================================================
 // PROPERTIES:
@@ -423,6 +457,9 @@ pub(crate) struct GroupedHashAggregateStream {
     /// current stream.
     skip_aggregation_probe: Option<SkipAggregationProbe>,
 
+    /// Whether the blocked optimization is enabled for group values and accumulators
+    enable_blocked_groups: bool,
+
     // ========================================================================
     // EXECUTION RESOURCES:
     // Fields related to managing execution resources and monitoring performance.
@@ -478,7 +515,7 @@ impl GroupedHashAggregateStream {
         };
 
         // Instantiate the accumulators
-        let accumulators: Vec<_> = aggregate_exprs
+        let mut accumulators: Vec<_> = aggregate_exprs
             .iter()
             .map(create_group_accumulator)
             .collect::<Result<_>>()?;
@@ -543,7 +580,7 @@ impl GroupedHashAggregateStream {
             ordering.as_ref(),
         )?;
 
-        let group_values = new_group_values(group_schema, &group_ordering)?;
+        let mut group_values = new_group_values(group_schema, &group_ordering)?;
         timer.done();
 
         let exec_state = ExecutionState::ReadingInput;
@@ -596,6 +633,15 @@ impl GroupedHashAggregateStream {
             None
         };
 
+        // Check if we can enable the blocked optimization for `GroupValues` and `GroupsAccumulator`s.
+        let enable_blocked_groups = maybe_enable_blocked_groups(
+            &context,
+            group_values.as_mut(),
+            &mut accumulators,
+            batch_size,
+            &group_ordering,
+        )?;
+
         Ok(GroupedHashAggregateStream {
             schema: agg_schema,
             input,
@@ -615,6 +661,7 @@ impl GroupedHashAggregateStream {
             spill_state,
             group_values_soft_limit: agg.limit,
             skip_aggregation_probe,
+            enable_blocked_groups,
         })
     }
 }
@@ -639,6 +686,54 @@ pub(crate) fn create_group_accumulator(
     }
 }
 
+/// Check if we can enable the blocked optimization for `GroupValues` and `GroupsAccumulator`s.
+/// The blocked optimization will be enabled when:
+/// - `enable_aggregation_blocked_groups` is true (the default)
+/// - It is not a streaming aggregation (blocked mode can't support `EmitTo::First(exact n)`)
+/// - Spilling is disabled (supporting it efficiently still needs more thought)
+/// - The accumulators are not empty (the logic for that case is not settled yet)
+/// - [`GroupValues::supports_blocked_groups`] and all [`GroupsAccumulator::supports_blocked_groups`] are true
+///
+/// [`GroupValues::supports_blocked_groups`]: crate::aggregates::group_values::GroupValues::supports_blocked_groups
+/// [`GroupsAccumulator::supports_blocked_groups`]: datafusion_expr::GroupsAccumulator::supports_blocked_groups
+///
+// TODO: support the blocked optimization in the streaming, spilling, and maybe empty-accumulator cases?
+/// Check if we can enable the blocked optimization for `GroupValues` and
+/// `GroupsAccumulator`s. The blocked optimization is enabled when:
+/// - `enable_aggregation_blocked_groups` is true (the default)
+/// - It is not a streaming aggregation (blocked mode can't support
+///   `EmitTo::First(n)`, the exact prefix emission)
+/// - Spilling is disabled (more work is needed to support it efficiently)
+/// - The accumulator list is not empty (the semantics of the empty case
+///   are still unclear)
+/// - [`GroupValues::supports_blocked_groups`] and all
+///   [`GroupsAccumulator::supports_blocked_groups`] return true
+///
+/// [`GroupValues::supports_blocked_groups`]: crate::aggregates::group_values::GroupValues::supports_blocked_groups
+/// [`GroupsAccumulator::supports_blocked_groups`]: datafusion_expr::GroupsAccumulator::supports_blocked_groups
+///
+// TODO: support the blocked optimization in the streaming, spilling, and
+// maybe the empty-accumulators cases?
+fn maybe_enable_blocked_groups(
+    context: &TaskContext,
+    group_values: &mut dyn GroupValues,
+    accumulators: &mut [Box<dyn GroupsAccumulator>],
+    block_size: usize,
+    group_ordering: &GroupOrdering,
+) -> Result<bool> {
+    if !context
+        .session_config()
+        .options()
+        .execution
+        .enable_aggregation_blocked_groups
+        || !matches!(group_ordering, GroupOrdering::None)
+        || accumulators.is_empty()
+        || !matches!(context.memory_pool().memory_limit(), MemoryLimit::Infinite)
+    {
+        return Ok(false);
+    }
+
+    let group_values_supports_blocked = group_values.supports_blocked_groups();
+    let accumulators_support_blocked =
+        accumulators.iter().all(|acc| acc.supports_blocked_groups());
+
+    match (group_values_supports_blocked, accumulators_support_blocked) {
+        (true, true) => {
+            let block_size = (block_size * 8).next_power_of_two();
+            group_values.alter_block_size(Some(block_size))?;
+            accumulators
+                .iter_mut()
+                .try_for_each(|acc| acc.alter_block_size(Some(block_size)))?;
+            Ok(true)
+        }
+        _ => Ok(false),
+    }
+}
+
 impl Stream for GroupedHashAggregateStream {
     type Item = Result<RecordBatch>;
 
     fn poll_next(
@@ -802,6 +897,52 @@ impl Stream for GroupedHashAggregateStream {
                     )));
                 }
 
+                ExecutionState::ProducingBlocks(None) => {
+                    // Try to emit the next block, then:
+                    //   - on `Err`, propagate it and end the stream abnormally
+                    //   - on `None`, all blocks have been emitted, so end the
+                    //     stream normally
+                    //   - on `Some`, return the block and wait for the next poll
+                    let emit_result = self.emit(EmitTo::NextBlock, false);
+                    let Ok(batch_opt) = emit_result else {
+                        return Poll::Ready(Some(Err(emit_result.unwrap_err())));
+                    };
+
+                    // Decide the next state based on whether a block was emitted
+                    self.exec_state = if let Some(batch) = batch_opt {
+                        ExecutionState::ProducingBlocks(Some(batch))
+                    } else {
+                        if self.input_done {
+                            ExecutionState::Done
+                        } else if self.should_skip_aggregation() {
+                            ExecutionState::SkippingAggregation
+                        } else {
+                            ExecutionState::ReadingInput
+                        }
+                    };
+                }
+
+                ExecutionState::ProducingBlocks(Some(batch)) => {
+                    // Slice off a part of the batch, if needed
+                    let output_batch;
+                    let size = self.batch_size;
+                    (self.exec_state, output_batch) = if batch.num_rows() <= size {
+                        (ExecutionState::ProducingBlocks(None), batch.clone())
+                    } else {
+                        // Output the first batch_size rows and keep the rest
+                        let num_remaining = batch.num_rows() - size;
+                        let remaining = batch.slice(size, num_remaining);
+                        let output = batch.slice(0, size);
+                        (ExecutionState::ProducingBlocks(Some(remaining)), output)
+                    };
+                    // Empty record batches should not be emitted; the "no more
+                    // blocks" case is represented as `None` and handled above.
+                    debug_assert!(output_batch.num_rows() > 0);
+                    return Poll::Ready(Some(Ok(
+                        output_batch.record_output(&self.baseline_metrics)
+                    )));
+                }
+
                 ExecutionState::Done => {
                     // release the memory reservation since sending back output batch itself needs
                     // some memory reservation, so make some room for it.
@@ -982,6 +1123,9 @@ impl GroupedHashAggregateStream {
             && self.update_memory_reservation().is_err()
         {
             assert_ne!(self.mode, AggregateMode::Partial);
+            // TODO: support spilling when the blocked groups optimization is
+            // on (`enable_blocked_groups` is true)
+            assert!(!self.enable_blocked_groups);
             self.spill()?;
             self.clear_shrink(batch);
         }
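The `ProducingBlocks(Some(batch))` arm above carves an emitted block into `batch_size`-row outputs with zero-copy `RecordBatch::slice` calls. A self-contained sketch of that pattern, assuming the `arrow` crate; `split_into_outputs` is a hypothetical helper, and the real operator keeps the remainder in its state machine rather than collecting a `Vec`:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;

fn split_into_outputs(mut batch: RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    let mut outputs = Vec::new();
    loop {
        if batch.num_rows() <= batch_size {
            // Last chunk: emit it whole and stop.
            outputs.push(batch);
            return outputs;
        }
        // Emit the first `batch_size` rows and keep the remainder;
        // `slice` only adjusts offsets, it does not copy buffers.
        outputs.push(batch.slice(0, batch_size));
        batch = batch.slice(batch_size, batch.num_rows() - batch_size);
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let col: ArrayRef = Arc::new(Int32Array::from((0..10).collect::<Vec<i32>>()));
    let batch = RecordBatch::try_from_iter(vec![("a", col)])?;
    let outputs = split_into_outputs(batch, 4);
    let sizes: Vec<usize> = outputs.iter().map(|b| b.num_rows()).collect();
    assert_eq!(sizes, vec![4, 4, 2]);
    Ok(())
}
```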
@@ -1033,11 +1177,16 @@ impl GroupedHashAggregateStream {
     /// Currently only [`GroupOrdering::None`] is supported for early emitting.
     /// TODO: support group_ordering for early emitting
     fn emit_early_if_necessary(&mut self) -> Result<()> {
         if self.group_values.len() >= self.batch_size
             && matches!(self.group_ordering, GroupOrdering::None)
             && self.update_memory_reservation().is_err()
         {
             assert_eq!(self.mode, AggregateMode::Partial);
+            // TODO: support spilling when the blocked groups optimization is
+            // on (`enable_blocked_groups` is true)
+            assert!(!self.enable_blocked_groups);
             let n = self.group_values.len() / self.batch_size * self.batch_size;
             if let Some(batch) = self.emit(EmitTo::First(n), false)? {
                 self.exec_state = ExecutionState::ProducingOutput(batch);
@@ -1100,9 +1249,16 @@ impl GroupedHashAggregateStream {
         let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
         let timer = elapsed_compute.timer();
         self.exec_state = if self.spill_state.spills.is_empty() {
-            let batch = self.emit(EmitTo::All, false)?;
-            batch.map_or(ExecutionState::Done, ExecutionState::ProducingOutput)
+            if !self.enable_blocked_groups {
+                let batch = self.emit(EmitTo::All, false)?;
+                batch.map_or(ExecutionState::Done, ExecutionState::ProducingOutput)
+            } else {
+                ExecutionState::ProducingBlocks(None)
+            }
         } else {
+            // TODO: support spilling when the blocked groups optimization is
+            // on (`enable_blocked_groups` is true)
+            assert!(!self.enable_blocked_groups);
             // If spill files exist, stream-merge them.
             self.update_merged_stream()?;
             ExecutionState::ReadingInput
@@ -1130,9 +1286,13 @@ impl GroupedHashAggregateStream {
     fn switch_to_skip_aggregation(&mut self) -> Result<()> {
         if let Some(probe) = self.skip_aggregation_probe.as_mut() {
             if probe.should_skip() {
-                if let Some(batch) = self.emit(EmitTo::All, false)? {
-                    self.exec_state = ExecutionState::ProducingOutput(batch);
-                };
+                if !self.enable_blocked_groups {
+                    if let Some(batch) = self.emit(EmitTo::All, false)? {
+                        self.exec_state = ExecutionState::ProducingOutput(batch);
+                    };
+                } else {
+                    self.exec_state = ExecutionState::ProducingBlocks(None);
+                }
             }
         }
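Putting the two `ProducingBlocks` states together, the drain loop is roughly the following sketch. `next_block` stands in for `self.emit(EmitTo::NextBlock, false)?` and the whole function is illustrative, not DataFusion code:

```rust
// Condensed model of the block-by-block output path: pull whole blocks until
// the source reports `None`, then fall through to the terminal state.
fn drain_blocks(
    mut next_block: impl FnMut() -> Option<Vec<u64>>,
    mut output: impl FnMut(Vec<u64>),
) {
    // `ProducingBlocks(None)`: ask for the next block.
    while let Some(block) = next_block() {
        // `ProducingBlocks(Some(batch))`: forward the block downstream
        // (the real operator may first re-slice it to `batch_size` rows).
        output(block);
    }
    // `None` returned: all blocks emitted; transition to `Done`,
    // `SkippingAggregation`, or `ReadingInput` as appropriate.
}

fn main() {
    let mut blocks = vec![vec![0u64, 1, 2, 3], vec![4, 5]].into_iter();
    let mut seen = Vec::new();
    drain_blocks(|| blocks.next(), |b| seen.extend(b));
    assert_eq!(seen, vec![0, 1, 2, 3, 4, 5]);
}
```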
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 3a98a4d18523..4fe73fe357ac 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -217,6 +217,7 @@ datafusion.catalog.newlines_in_values false
 datafusion.execution.batch_size 8192
 datafusion.execution.coalesce_batches true
 datafusion.execution.collect_statistics false
+datafusion.execution.enable_aggregation_blocked_groups true
 datafusion.execution.enable_recursive_ctes true
 datafusion.execution.enforce_batch_size_in_joins false
 datafusion.execution.keep_partition_by_columns false
@@ -327,6 +328,7 @@ datafusion.catalog.newlines_in_values false Specifies whether newlines in (quote
 datafusion.execution.batch_size 8192 Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption
 datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting
 datafusion.execution.collect_statistics false Should DataFusion collect statistics after listing files
+datafusion.execution.enable_aggregation_blocked_groups true Should DataFusion use a blocked approach to manage grouping state. By default, the blocked approach is used which allocates capacity based on a predefined block size firstly. When the block reaches its limit, we allocate a new block (also with the same predefined block size based capacity) instead of expanding the current one and copying the data. If `false`, a single allocation approach is used, where values are managed within a single large memory block. As this block grows, it often triggers numerous copies, resulting in poor performance.
 datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
 datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower.
 datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index fe9c57857bef..2db6f81d63d5 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -98,6 +98,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.use_row_number_estimates_to_optimize_partitioning | false | Should DataFusion use row number estimates at the input to decide whether increasing parallelism is beneficial or not. By default, only exact row numbers (not estimates) are used for this decision. Setting this flag to `true` will likely produce better plans. if the source of statistics is accurate. We plan to make this the default in the future. |
 | datafusion.execution.enforce_batch_size_in_joins | false | Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. |
 | datafusion.execution.objectstore_writer_buffer_size | 10485760 | Size (bytes) of data buffer DataFusion uses when writing output files. This affects the size of the data chunks that are uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being written, it may be necessary to increase this size to avoid errors from the remote end point. |
+| datafusion.execution.enable_aggregation_blocked_groups | true | Should DataFusion use a blocked approach to manage grouping state. By default, the blocked approach is used which allocates capacity based on a predefined block size firstly. When the block reaches its limit, we allocate a new block (also with the same predefined block size based capacity) instead of expanding the current one and copying the data. If `false`, a single allocation approach is used, where values are managed within a single large memory block. As this block grows, it often triggers numerous copies, resulting in poor performance. |
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. |
 | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
 | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
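To experiment with the new flag (for example, to A/B the blocked and single-allocation paths), it can be toggled per session. A small end-to-end sketch, assuming the `datafusion` and `tokio` crates; `SessionConfig::set_bool` and `SessionContext::new_with_config` are existing DataFusion APIs, and the SQL equivalent is `SET datafusion.execution.enable_aggregation_blocked_groups = false`:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Opt out of the blocked groups optimization for this session;
    // it defaults to true.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.enable_aggregation_blocked_groups", false);
    let ctx = SessionContext::new_with_config(config);

    // Grouped aggregations in this session now use the single-block path.
    let df = ctx
        .sql("SELECT count(*) FROM (VALUES (1), (1), (2)) AS t(a) GROUP BY a")
        .await?;
    df.show().await?;
    Ok(())
}
```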