Skip to content

Commit 4df79c6

Browse files
committed
fix prune
1 parent bdfac61 commit 4df79c6

File tree

1 file changed

+167
-133
lines changed

1 file changed

+167
-133
lines changed

Diff for: src/query/storages/fuse/src/pruning_pipeline/column_oriented_block_prune.rs

+167-133
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::sync::Arc;
1717

1818
use async_channel::Sender;
1919
use chrono::DateTime;
20+
use databend_common_base::base::tokio::sync::OwnedSemaphorePermit;
2021
use databend_common_catalog::plan::block_id_in_segment;
2122
use databend_common_catalog::plan::PartInfoPtr;
2223
use databend_common_exception::ErrorCode;
@@ -36,6 +37,7 @@ use databend_storages_common_table_meta::meta::ColumnMeta;
3637
use databend_storages_common_table_meta::meta::ColumnMetaV0;
3738
use databend_storages_common_table_meta::meta::ColumnStatistics;
3839
use databend_storages_common_table_meta::meta::Compression;
40+
use futures_util::future;
3941

4042
use super::PrunedColumnOrientedSegmentMeta;
4143
use crate::pruning::BlockPruner;
@@ -96,163 +98,195 @@ impl AsyncSink for ColumnOrientedBlockPruneSink {
9698
let block_size_col = segment.block_size_col();
9799
let row_count_col = segment.row_count_col();
98100

101+
let pruning_runtime = &self.block_pruner.pruning_ctx.pruning_runtime;
102+
let pruning_semaphore = &self.block_pruner.pruning_ctx.pruning_semaphore;
103+
104+
let mut pruning_tasks = Vec::with_capacity(block_num);
105+
99106
for block_idx in 0..block_num {
100-
// 1. prune internal column
101-
let location_path = location_path_col.index(block_idx).unwrap();
107+
let location_path = location_path_col.index(block_idx).unwrap().to_string();
108+
109+
// Skip blocks that don't pass internal column pruning
102110
if self
103111
.block_pruner
104112
.pruning_ctx
105113
.internal_column_pruner
106114
.as_ref()
107-
.is_some_and(|pruner| !pruner.should_keep(BLOCK_NAME_COL_NAME, location_path))
115+
.is_some_and(|pruner| !pruner.should_keep(BLOCK_NAME_COL_NAME, &location_path))
108116
{
109117
continue;
110118
}
111119

112-
// 2. prune columns by range index
113-
let mut columns_stat = HashMap::with_capacity(self.column_ids.len());
114-
let mut columns_meta = HashMap::with_capacity(self.column_ids.len());
120+
// Clone necessary data for the async task
121+
let column_ids = self.column_ids.clone();
122+
let segment = segment.clone();
123+
let segment_location = segment_location.clone();
124+
let range_pruner = range_pruner.clone();
125+
let bloom_pruner = bloom_pruner.clone();
126+
let sender = self.sender.as_ref().unwrap().clone();
127+
let location_path = location_path.clone();
128+
let compression_col = compression_col.clone();
129+
let block_size_col = block_size_col.clone();
130+
let row_count_col = row_count_col.clone();
131+
let create_on_col = create_on_col.clone();
132+
let bloom_index_location_col = bloom_index_location_col.clone();
133+
let bloom_index_size_col = bloom_index_size_col.clone();
115134

116-
for column_id in &self.column_ids {
117-
if let Some(stat) = segment.stat_col(*column_id) {
118-
let stat = stat.index(block_idx).unwrap();
119-
let stat = stat.as_tuple().unwrap();
120-
let min = stat[0].to_owned();
121-
let max = stat[1].to_owned();
122-
let null_count = stat[2].as_number().unwrap().as_u_int64().unwrap();
123-
let in_memory_size = stat[3].as_number().unwrap().as_u_int64().unwrap();
124-
let distinct_of_values = match stat[4] {
125-
ScalarRef::Number(number_scalar) => {
126-
Some(*number_scalar.as_u_int64().unwrap())
135+
pruning_tasks.push(move |permit: OwnedSemaphorePermit| {
136+
Box::pin(async move {
137+
let _permit = permit;
138+
// 2. prune columns by range index
139+
let mut columns_stat = HashMap::with_capacity(column_ids.len());
140+
let mut columns_meta = HashMap::with_capacity(column_ids.len());
141+
142+
for column_id in &column_ids {
143+
if let Some(stat) = segment.stat_col(*column_id) {
144+
let stat = stat.index(block_idx).unwrap();
145+
let stat = stat.as_tuple().unwrap();
146+
let min = stat[0].to_owned();
147+
let max = stat[1].to_owned();
148+
let null_count = stat[2].as_number().unwrap().as_u_int64().unwrap();
149+
let in_memory_size = stat[3].as_number().unwrap().as_u_int64().unwrap();
150+
let distinct_of_values = match stat[4] {
151+
ScalarRef::Number(number_scalar) => {
152+
Some(*number_scalar.as_u_int64().unwrap())
153+
}
154+
ScalarRef::Null => None,
155+
_ => unreachable!(),
156+
};
157+
columns_stat.insert(
158+
*column_id,
159+
ColumnStatistics::new(
160+
min,
161+
max,
162+
*null_count,
163+
*in_memory_size,
164+
distinct_of_values,
165+
),
166+
);
127167
}
128-
ScalarRef::Null => None,
129-
_ => unreachable!(),
130-
};
131-
columns_stat.insert(
132-
*column_id,
133-
ColumnStatistics::new(
134-
min,
135-
max,
136-
*null_count,
137-
*in_memory_size,
138-
distinct_of_values,
139-
),
140-
);
141-
}
142-
}
168+
}
143169

144-
if !range_pruner.should_keep(&columns_stat, None) {
145-
continue;
146-
}
170+
if !range_pruner.should_keep(&columns_stat, None) {
171+
return Ok::<_, ()>(());
172+
}
173+
174+
let row_count = row_count_col[block_idx];
175+
let compression = Compression::from_u8(compression_col[block_idx]);
176+
let block_size = block_size_col[block_idx];
147177

148-
let row_count = row_count_col[block_idx];
149-
let compression = compression_col[block_idx];
150-
let compression = Compression::from_u8(compression);
151-
let block_size = block_size_col[block_idx];
178+
// Bloom filter pruning
179+
if let Some(bloom_pruner) = bloom_pruner {
180+
let location_scalar = bloom_index_location_col.index(block_idx).unwrap();
181+
let index_location = match location_scalar {
182+
ScalarRef::Null => None,
183+
ScalarRef::Tuple(tuple) => {
184+
if tuple.len() != 2 {
185+
unreachable!()
186+
}
187+
Some((
188+
tuple[0].as_string().unwrap().to_string(),
189+
*tuple[1].as_number().unwrap().as_u_int64().unwrap(),
190+
))
191+
}
192+
_ => unreachable!(),
193+
};
194+
let index_size = bloom_index_size_col[block_idx];
152195

153-
// TODO(Sky): execute bloom filter pruning parallel
154-
if let Some(bloom_pruner) = bloom_pruner {
155-
let location_scalar = bloom_index_location_col.index(block_idx).unwrap();
156-
let index_location = match location_scalar {
157-
ScalarRef::Null => None,
158-
ScalarRef::Tuple(tuple) => {
159-
if tuple.len() != 2 {
160-
unreachable!()
196+
// used to rebuild bloom index
197+
let block_read_info = BlockReadInfo {
198+
location: location_path.clone(),
199+
row_count,
200+
col_metas: columns_meta.clone(),
201+
compression,
202+
block_size,
203+
};
204+
205+
if !bloom_pruner
206+
.should_keep(
207+
&index_location,
208+
index_size,
209+
&columns_stat,
210+
column_ids.clone(),
211+
&block_read_info,
212+
)
213+
.await
214+
{
215+
return Ok(());
161216
}
162-
Some((
163-
tuple[0].as_string().unwrap().to_string(),
164-
*tuple[1].as_number().unwrap().as_u_int64().unwrap(),
165-
))
166217
}
167-
_ => unreachable!(),
168-
};
169-
let index_size = bloom_index_size_col[block_idx];
170-
171-
// used to rebuild bloom index
172-
let block_read_info = BlockReadInfo {
173-
location: location_path.to_string(),
174-
row_count,
175-
col_metas: columns_meta.clone(),
176-
compression,
177-
block_size,
178-
};
179-
if !bloom_pruner
180-
.should_keep(
181-
&index_location,
182-
index_size,
183-
&columns_stat,
184-
self.column_ids.clone(),
185-
&block_read_info,
186-
)
187-
.await
188-
{
189-
continue;
190-
}
191-
}
192218

193-
// TODO(Sky): add inverted index pruning
219+
// Get create_on value
220+
let create_on = create_on_col.index(block_idx).unwrap();
221+
let create_on = match create_on {
222+
ScalarRef::Null => None,
223+
ScalarRef::Number(number_scalar) => Some(
224+
DateTime::from_timestamp(*number_scalar.as_int64().unwrap(), 0)
225+
.unwrap(),
226+
),
227+
_ => unreachable!(),
228+
};
194229

195-
let create_on = create_on_col.index(block_idx).unwrap();
196-
let create_on = match create_on {
197-
ScalarRef::Null => None,
198-
ScalarRef::Number(number_scalar) => {
199-
Some(DateTime::from_timestamp(*number_scalar.as_int64().unwrap(), 0).unwrap())
200-
}
201-
_ => unreachable!(),
202-
};
230+
let block_meta_index = BlockMetaIndex {
231+
segment_idx: segment_location.segment_idx,
232+
block_idx,
233+
range: None,
234+
page_size: row_count as usize,
235+
block_id: block_id_in_segment(block_num, block_idx),
236+
block_location: location_path.clone(),
237+
segment_location: segment_location.location.0.clone(),
238+
snapshot_location: segment_location.snapshot_loc.clone(),
239+
matched_rows: None,
240+
virtual_block_meta: None,
241+
};
203242

204-
let block_meta_index = BlockMetaIndex {
205-
segment_idx: segment_location.segment_idx,
206-
block_idx,
207-
range: None,
208-
page_size: row_count as usize,
209-
block_id: block_id_in_segment(block_num, block_idx),
210-
// TODO(Sky): this is duplicate with FuseBlockPartInfo.location.
211-
block_location: location_path.to_string(),
212-
segment_location: segment_location.location.0.clone(),
213-
snapshot_location: segment_location.snapshot_loc.clone(),
214-
matched_rows: None,
215-
virtual_block_meta: None,
216-
};
243+
// Collect column metadata
244+
for column_id in &column_ids {
245+
if let Some(meta) = segment.meta_col(*column_id) {
246+
let meta = meta.index(block_idx).unwrap();
247+
let meta = meta.as_tuple().unwrap();
248+
let offset = meta[0].as_number().unwrap().as_u_int64().unwrap();
249+
let length = meta[1].as_number().unwrap().as_u_int64().unwrap();
250+
let num_values = meta[2].as_number().unwrap().as_u_int64().unwrap();
251+
columns_meta.insert(
252+
*column_id,
253+
ColumnMeta::Parquet(ColumnMetaV0 {
254+
offset: *offset,
255+
len: *length,
256+
num_values: *num_values,
257+
}),
258+
);
259+
}
260+
}
217261

218-
for column_id in &self.column_ids {
219-
if let Some(meta) = segment.meta_col(*column_id) {
220-
let meta = meta.index(block_idx).unwrap();
221-
let meta = meta.as_tuple().unwrap();
222-
let offset = meta[0].as_number().unwrap().as_u_int64().unwrap();
223-
let length = meta[1].as_number().unwrap().as_u_int64().unwrap();
224-
let num_values = meta[2].as_number().unwrap().as_u_int64().unwrap();
225-
columns_meta.insert(
226-
*column_id,
227-
ColumnMeta::Parquet(ColumnMetaV0 {
228-
offset: *offset,
229-
len: *length,
230-
num_values: *num_values,
231-
}),
262+
let part_info = FuseBlockPartInfo::create(
263+
location_path,
264+
row_count,
265+
columns_meta,
266+
Some(columns_stat),
267+
compression,
268+
None, // TODO(Sky): sort_min_max
269+
Some(block_meta_index),
270+
create_on,
232271
);
233-
}
234-
}
235272

236-
let part_info = FuseBlockPartInfo::create(
237-
location_path.to_string(),
238-
row_count,
239-
columns_meta,
240-
Some(columns_stat),
241-
compression,
242-
None, // TODO(Sky): sort_min_max
243-
Some(block_meta_index),
244-
create_on,
245-
);
246-
if self
247-
.sender
248-
.as_ref()
249-
.unwrap()
250-
.send(Ok(part_info))
273+
let _ = sender.send(Ok(part_info)).await;
274+
Ok(())
275+
})
276+
});
277+
}
278+
279+
// Execute all pruning tasks in parallel
280+
if !pruning_tasks.is_empty() {
281+
let join_handlers = pruning_runtime
282+
.try_spawn_batch_with_owned_semaphore(pruning_semaphore.clone(), pruning_tasks)
251283
.await
252-
.is_err()
253-
{
254-
break;
255-
}
284+
.map_err(|e| ErrorCode::StorageOther(format!("block pruning failure, {}", e)))?;
285+
286+
// Wait for all tasks to complete
287+
let _ = future::try_join_all(join_handlers)
288+
.await
289+
.map_err(|e| ErrorCode::StorageOther(format!("block pruning failure, {}", e)))?;
256290
}
257291
Ok(false)
258292
}

0 commit comments

Comments
 (0)