Commit 09b7e1b: revert compact and recluster

1 parent: 4df79c6

20 files changed, +322 −1558 lines
src/query/service/src/pipelines/builders/builder_commit.rs (+24 −22)

@@ -20,9 +20,9 @@ use databend_common_sql::executor::physical_plans::CommitSink as PhysicalCommitS
 use databend_common_sql::executor::physical_plans::CommitType;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_common_sql::plans::TruncateMode;
-use databend_common_storages_fuse::operations::add_table_mutation_aggregator;
 use databend_common_storages_fuse::operations::CommitSink;
 use databend_common_storages_fuse::operations::MutationGenerator;
+use databend_common_storages_fuse::operations::TableMutationAggregator;
 use databend_common_storages_fuse::operations::TransformMergeCommitMeta;
 use databend_common_storages_fuse::operations::TruncateGenerator;
 use databend_common_storages_fuse::FuseTable;
@@ -81,28 +81,30 @@ impl PipelineBuilder {
                 TransformMergeCommitMeta::create(cluster_key_id)
             });
         } else {
-            let base_segments = if matches!(
-                kind,
-                MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster
-            ) {
-                vec![]
-            } else {
-                plan.snapshot.segments().to_vec()
-            };
+            self.main_pipeline.add_async_accumulating_transformer(|| {
+                let base_segments = if matches!(
+                    kind,
+                    MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster
+                ) {
+                    vec![]
+                } else {
+                    plan.snapshot.segments().to_vec()
+                };

-            // extract re-cluster related mutations from physical plan
-            let recluster_info = plan.recluster_info.clone().unwrap_or_default();
-            add_table_mutation_aggregator(
-                &mut self.main_pipeline,
-                table,
-                self.ctx.clone(),
-                base_segments,
-                recluster_info.merged_blocks,
-                recluster_info.removed_segment_indexes,
-                recluster_info.removed_statistics,
-                *kind,
-                plan.table_meta_timestamps,
-            );
+                // extract re-cluster related mutations from physical plan
+                let recluster_info = plan.recluster_info.clone().unwrap_or_default();
+
+                TableMutationAggregator::create(
+                    table,
+                    self.ctx.clone(),
+                    base_segments,
+                    recluster_info.merged_blocks,
+                    recluster_info.removed_segment_indexes,
+                    recluster_info.removed_statistics,
+                    *kind,
+                    plan.table_meta_timestamps,
+                )
+            });
         }

         let snapshot_gen = MutationGenerator::new(plan.snapshot.clone(), *kind);
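Note: the revert swaps the free helper add_table_mutation_aggregator (which pushed the aggregator onto the pipeline itself) back to add_async_accumulating_transformer, which takes a factory closure and calls it once per pipe item. A minimal sketch of that pattern, with stand-in types rather than Databend's real Pipeline/TableMutationAggregator:

    // Sketch only: a pipeline helper that receives a factory closure and
    // invokes it once per output pipe, so every pipe gets its own aggregator
    // while the closure captures shared inputs by clone.
    struct Aggregator {
        base_segments: Vec<String>,
    }

    struct Pipeline {
        width: usize,
        aggregators: Vec<Aggregator>,
    }

    impl Pipeline {
        fn add_accumulating_transformer<F>(&mut self, f: F)
        where
            F: Fn() -> Aggregator,
        {
            // one instance per existing pipe item
            for _ in 0..self.width {
                self.aggregators.push(f());
            }
        }
    }

    fn main() {
        let mut pipeline = Pipeline { width: 2, aggregators: vec![] };
        let segments = vec!["seg-0".to_string()];
        pipeline.add_accumulating_transformer(|| Aggregator {
            base_segments: segments.clone(),
        });
        assert_eq!(pipeline.aggregators.len(), 2);
    }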

src/query/service/src/pipelines/builders/builder_compact.rs (+40 −63)

@@ -26,14 +26,11 @@ use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
 use databend_common_sql::executor::physical_plans::CompactSource as PhysicalCompactSource;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_common_sql::StreamContext;
-use databend_common_storages_fuse::io::read::ColumnOrientedSegmentReader;
-use databend_common_storages_fuse::io::read::RowOrientedSegmentReader;
-use databend_common_storages_fuse::operations::add_table_mutation_aggregator;
 use databend_common_storages_fuse::operations::BlockCompactMutator;
-use databend_common_storages_fuse::operations::ColumnOrientedSegmentsWithIndices;
-use databend_common_storages_fuse::operations::CompactSegmentsWithIndices;
+use databend_common_storages_fuse::operations::CompactLazyPartInfo;
 use databend_common_storages_fuse::operations::CompactSource;
 use databend_common_storages_fuse::operations::CompactTransform;
+use databend_common_storages_fuse::operations::TableMutationAggregator;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::FuseTable;

@@ -57,60 +54,39 @@ impl PipelineBuilder {
         let thresholds = table.get_block_thresholds();
         let cluster_key_id = table.cluster_key_id();
         let mut max_threads = self.ctx.get_settings().get_max_threads()? as usize;
-        let partitions = compact_block.parts.partitions.clone();

         if is_lazy {
             let query_ctx = self.ctx.clone();
-            let column_ids = compact_block.column_ids.clone();
-            let is_column_oriented = table.is_column_oriented();

+            let lazy_parts = compact_block
+                .parts
+                .partitions
+                .iter()
+                .map(|v| {
+                    v.as_any()
+                        .downcast_ref::<CompactLazyPartInfo>()
+                        .unwrap()
+                        .clone()
+                })
+                .collect::<Vec<_>>();
+
+            let column_ids = compact_block.column_ids.clone();
             self.main_pipeline.set_on_init(move || {
                 let ctx = query_ctx.clone();
-                let partitions = Runtime::with_worker_threads(
-                    2,
-                    Some("build_compact_tasks".to_string()),
-                )?
-                .block_on(async move {
-                    let partitions = if is_column_oriented {
-                        let lazy_parts = partitions
-                            .iter()
-                            .map(|v| {
-                                v.as_any()
-                                    .downcast_ref::<ColumnOrientedSegmentsWithIndices>()
-                                    .unwrap()
-                                    .clone()
-                            })
-                            .collect::<Vec<_>>();
-                        BlockCompactMutator::<ColumnOrientedSegmentReader>::build_compact_tasks(
-                            ctx.clone(),
-                            column_ids.clone(),
-                            cluster_key_id,
-                            thresholds,
-                            lazy_parts,
-                        )
-                        .await?
-                    } else {
-                        let lazy_parts = partitions
-                            .iter()
-                            .map(|v| {
-                                v.as_any()
-                                    .downcast_ref::<CompactSegmentsWithIndices>()
-                                    .unwrap()
-                                    .clone()
-                            })
-                            .collect::<Vec<_>>();
-                        BlockCompactMutator::<RowOrientedSegmentReader>::build_compact_tasks(
-                            ctx.clone(),
-                            column_ids.clone(),
-                            cluster_key_id,
-                            thresholds,
-                            lazy_parts,
-                        )
-                        .await?
-                    };
+                let partitions =
+                    Runtime::with_worker_threads(2, Some("build_compact_tasks".to_string()))?
+                        .block_on(async move {
+                            let partitions = BlockCompactMutator::build_compact_tasks(
+                                ctx.clone(),
+                                column_ids.clone(),
+                                cluster_key_id,
+                                thresholds,
+                                lazy_parts,
+                            )
+                            .await?;

-                    Result::<_>::Ok(partitions)
-                })?;
+                            Result::<_>::Ok(partitions)
+                        })?;

                 let partitions = Partitions::create(PartitionsShuffleKind::Mod, partitions);
                 query_ctx.set_partitions(partitions)?;
@@ -179,17 +155,18 @@ impl PipelineBuilder {

         if is_lazy {
             self.main_pipeline.try_resize(1)?;
-            add_table_mutation_aggregator(
-                &mut self.main_pipeline,
-                table,
-                self.ctx.clone(),
-                vec![],
-                vec![],
-                vec![],
-                Default::default(),
-                MutationKind::Compact,
-                compact_block.table_meta_timestamps,
-            );
+            self.main_pipeline.add_async_accumulating_transformer(|| {
+                TableMutationAggregator::create(
+                    table,
+                    self.ctx.clone(),
+                    vec![],
+                    vec![],
+                    vec![],
+                    Default::default(),
+                    MutationKind::Compact,
+                    compact_block.table_meta_timestamps,
+                )
+            });
         }
         Ok(())
     }
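Note: the second hunk hoists the as_any().downcast_ref::<CompactLazyPartInfo>() recovery of concrete part infos out of the async block. A self-contained sketch of that std::any::Any downcast pattern, where LazyPart is a stand-in for CompactLazyPartInfo:

    use std::any::Any;

    // Parts travel through the planner as trait objects; the builder recovers
    // the concrete type with an Any downcast before handing them to the mutator.
    #[derive(Clone, Debug, PartialEq)]
    struct LazyPart {
        segment_index: usize,
    }

    trait PartInfo: Any {
        fn as_any(&self) -> &dyn Any;
    }

    impl PartInfo for LazyPart {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    fn main() {
        let partitions: Vec<Box<dyn PartInfo>> =
            vec![Box::new(LazyPart { segment_index: 7 })];
        let lazy_parts = partitions
            .iter()
            .map(|v| v.as_any().downcast_ref::<LazyPart>().unwrap().clone())
            .collect::<Vec<_>>();
        assert_eq!(lazy_parts, vec![LazyPart { segment_index: 7 }]);
    }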

src/query/service/src/pipelines/builders/builder_insert_multi_table.rs (+6 −18)

@@ -34,8 +34,6 @@ use databend_common_sql::executor::physical_plans::ChunkFilter;
 use databend_common_sql::executor::physical_plans::ChunkMerge;
 use databend_common_sql::executor::physical_plans::Duplicate;
 use databend_common_sql::executor::physical_plans::Shuffle;
-use databend_common_storages_fuse::io::read::ColumnOrientedSegmentReader;
-use databend_common_storages_fuse::io::read::RowOrientedSegmentReader;
 use databend_common_storages_fuse::operations::CommitMultiTableInsert;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::TableContext;
@@ -291,22 +289,12 @@ impl PipelineBuilder {
                 block_thresholds,
                 target.table_meta_timestamps,
             )?));
-            if table.is_column_oriented() {
-                mutation_aggregator_builders.push(Box::new(
-                    self.mutation_aggregator_transform_builder::<ColumnOrientedSegmentReader>(
-                        table.clone(),
-                        target.table_meta_timestamps,
-                    )?,
-                ));
-            } else {
-                mutation_aggregator_builders.push(Box::new(
-                    self.mutation_aggregator_transform_builder::<RowOrientedSegmentReader>(
-                        table.clone(),
-                        target.table_meta_timestamps,
-                    )?,
-                ));
-            }
-
+            mutation_aggregator_builders.push(Box::new(
+                self.mutation_aggregator_transform_builder(
+                    table.clone(),
+                    target.table_meta_timestamps,
+                )?,
+            ));
             table_meta_timestampss.insert(table.get_id(), target.table_meta_timestamps);
             tables.insert(table.get_id(), table);
         }
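Note: with the aggregator builder no longer generic over a segment reader, the caller-side is_column_oriented() branch collapses into a single push. A sketch of why dropping the type parameter removes the branch; the types here are illustrative, not Databend's:

    // Hypothetical shapes: a generic builder forces the caller to branch just
    // to name the type parameter; the reverted, non-generic builder does not.
    trait SegmentReader {
        const KIND: &'static str;
    }
    struct RowOrientedSegmentReader;
    struct ColumnOrientedSegmentReader;
    impl SegmentReader for RowOrientedSegmentReader {
        const KIND: &'static str = "row";
    }
    impl SegmentReader for ColumnOrientedSegmentReader {
        const KIND: &'static str = "column";
    }

    // Before the revert (sketch): one monomorphized builder per reader type.
    fn builder_generic<R: SegmentReader>() -> String {
        format!("aggregator over {} segments", R::KIND)
    }

    // After the revert (sketch): a single builder, no call-site branch needed.
    fn builder() -> String {
        "aggregator".to_string()
    }

    fn main() {
        let is_column_oriented = true;
        let before = if is_column_oriented {
            builder_generic::<ColumnOrientedSegmentReader>()
        } else {
            builder_generic::<RowOrientedSegmentReader>()
        };
        println!("{before} -> {}", builder());
    }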

src/query/service/src/pipelines/builders/builder_mutation.rs (+4 −4)

@@ -34,8 +34,8 @@ use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
 use databend_common_sql::binder::MutationStrategy;
 use databend_common_sql::executor::physical_plans::Mutation;
 use databend_common_sql::executor::physical_plans::MutationKind;
-use databend_common_storages_fuse::operations::new_serialize_segment_pipe_item;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
+use databend_common_storages_fuse::operations::TransformSerializeSegment;
 use databend_common_storages_fuse::operations::UnMatchedExprs;
 use databend_common_storages_fuse::FuseTable;

@@ -61,13 +61,13 @@ impl PipelineBuilder {
         let io_request_semaphore =
             Arc::new(Semaphore::new(self.settings.get_max_threads()? as usize));

-        let serialize_segment_transform = new_serialize_segment_pipe_item(
+        let serialize_segment_transform = TransformSerializeSegment::new(
             InputPort::create(),
             OutputPort::create(),
             table,
             block_thresholds,
             merge_into.table_meta_timestamps,
-        )?;
+        );

         // For row_id port, create rowid_aggregate_mutator
         // For matched data port and unmatched port, do serialize
@@ -161,7 +161,7 @@ impl PipelineBuilder {
                 vec.push(create_dummy_item());
             }
             // data port
-            vec.push(serialize_segment_transform);
+            vec.push(serialize_segment_transform.into_pipe_item());
             vec
         };
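Note: the same swap recurs in builder_replace_into.rs below. new_serialize_segment_pipe_item was fallible and returned a finished pipe item; the reverted TransformSerializeSegment::new constructs the transform infallibly and defers conversion to into_pipe_item() at the point of placement. A sketch of that two-step shape, with stand-in types:

    // Stand-ins for TransformSerializeSegment and PipeItem; the point is the
    // split: infallible construction first, then explicit conversion where
    // the item is actually pushed into a pipe.
    struct TransformSketch {
        threshold: usize,
    }

    struct PipeItemSketch {
        transform: TransformSketch,
    }

    impl TransformSketch {
        fn new(threshold: usize) -> Self {
            Self { threshold }
        }

        fn into_pipe_item(self) -> PipeItemSketch {
            PipeItemSketch { transform: self }
        }
    }

    fn main() {
        // construction no longer needs `?` ...
        let transform = TransformSketch::new(8192);
        // ... and conversion happens only where the pipe is assembled
        let item = transform.into_pipe_item();
        assert_eq!(item.transform.threshold, 8192);
    }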

src/query/service/src/pipelines/builders/builder_replace_into.rs (+5 −5)

@@ -49,10 +49,10 @@ use databend_common_sql::BindContext;
 use databend_common_sql::Metadata;
 use databend_common_sql::MetadataRef;
 use databend_common_sql::NameResolutionContext;
-use databend_common_storages_fuse::operations::new_serialize_segment_pipe_item;
 use databend_common_storages_fuse::operations::BroadcastProcessor;
 use databend_common_storages_fuse::operations::ReplaceIntoProcessor;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
+use databend_common_storages_fuse::operations::TransformSerializeSegment;
 use databend_common_storages_fuse::operations::UnbranchedReplaceIntoProcessor;
 use databend_common_storages_fuse::FuseTable;
 use parking_lot::RwLock;
@@ -134,13 +134,13 @@ impl PipelineBuilder {
         let mut block_builder = serialize_block_transform.get_block_builder();
         block_builder.source_schema = table.schema_with_stream();

-        let serialize_segment_transform = new_serialize_segment_pipe_item(
+        let serialize_segment_transform = TransformSerializeSegment::new(
             InputPort::create(),
             OutputPort::create(),
             table,
             *block_thresholds,
             replace.table_meta_timestamps,
-        )?;
+        );
         if !*need_insert {
             if segment_partition_num == 0 {
                 return Ok(());
@@ -207,7 +207,7 @@ impl PipelineBuilder {
         if segment_partition_num == 0 {
             let dummy_item = create_dummy_item();
             self.main_pipeline.add_pipe(Pipe::create(2, 2, vec![
-                serialize_segment_transform,
+                serialize_segment_transform.into_pipe_item(),
                 dummy_item,
             ]));
         } else {
@@ -230,7 +230,7 @@ impl PipelineBuilder {
             let item_size = segment_partition_num + 1;
             let mut pipe_items = Vec::with_capacity(item_size);
             // setup the dummy transform
-            pipe_items.push(serialize_segment_transform);
+            pipe_items.push(serialize_segment_transform.into_pipe_item());

             let max_threads = self.settings.get_max_threads()?;
             let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize));

src/query/service/src/pipelines/builders/transform_builder.rs (+6 −6)

@@ -34,10 +34,9 @@ use databend_common_sql::evaluator::BlockOperator;
 use databend_common_sql::evaluator::CompoundBlockOperator;
 use databend_common_sql::ColumnSet;
 use databend_common_storages_factory::Table;
-use databend_common_storages_fuse::io::read::SegmentReader;
-use databend_common_storages_fuse::operations::new_serialize_segment_processor;
 use databend_common_storages_fuse::operations::TableMutationAggregator;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
+use databend_common_storages_fuse::operations::TransformSerializeSegment;
 use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
 use databend_common_storages_fuse::FuseTable;
 use databend_storages_common_table_meta::meta
@@ -146,25 +145,26 @@ impl PipelineBuilder {
     ) -> Result<impl Fn(Arc<InputPort>, Arc<OutputPort>) -> Result<ProcessorPtr>> {
         Ok(move |input, output| {
             let fuse_table = FuseTable::try_from_table(table.as_ref())?;
-            new_serialize_segment_processor(
+            let proc = TransformSerializeSegment::new(
                 input,
                 output,
                 fuse_table,
                 block_thresholds,
                 table_meta_timestamps,
-            )
+            );
+            proc.into_processor()
         })
     }

-    pub(crate) fn mutation_aggregator_transform_builder<R: SegmentReader>(
+    pub(crate) fn mutation_aggregator_transform_builder(
         &self,
         table: Arc<dyn Table>,
         table_meta_timestamps: TableMetaTimestamps,
     ) -> Result<impl Fn(Arc<InputPort>, Arc<OutputPort>) -> Result<ProcessorPtr>> {
         let ctx = self.ctx.clone();
         Ok(move |input, output| {
             let fuse_table = FuseTable::try_from_table(table.as_ref())?;
-            let aggregator = TableMutationAggregator::<R>::create(
+            let aggregator = TableMutationAggregator::create(
                 fuse_table,
                 ctx.clone(),
                 vec![],
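Note: these builder methods return a closure of shape Fn(Arc<InputPort>, Arc<OutputPort>) -> Result<ProcessorPtr>, so the pipeline can instantiate one processor per port pair; the revert also drops the <R: SegmentReader> parameter from mutation_aggregator_transform_builder. A minimal sketch of the capture-and-rebuild pattern, with stand-in types rather than the real ports:

    use std::sync::Arc;

    // Stand-ins: the returned closure captures a cloned context and builds a
    // fresh processor for every (input, output) pair the pipeline hands it.
    #[derive(Clone)]
    struct Ctx {
        query_id: u64,
    }

    struct Port;

    struct Processor {
        _ctx: Ctx,
    }

    fn processor_builder(ctx: Ctx) -> impl Fn(Arc<Port>, Arc<Port>) -> Result<Processor, String> {
        move |_input, _output| Ok(Processor { _ctx: ctx.clone() })
    }

    fn main() -> Result<(), String> {
        let build = processor_builder(Ctx { query_id: 42 });
        // the pipeline invokes the builder once per pipe item
        let _p0 = build(Arc::new(Port), Arc::new(Port))?;
        let _p1 = build(Arc::new(Port), Arc::new(Port))?;
        Ok(())
    }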

src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs (+3 −4)

@@ -27,7 +27,6 @@ use databend_common_sql::executor::physical_plans::CommitType;
 use databend_common_sql::executor::physical_plans::CompactSource;
 use databend_common_sql::executor::physical_plans::MutationKind;
 use databend_common_sql::executor::PhysicalPlan;
-use databend_common_storages_fuse::io::read::RowOrientedSegmentReader;
 use databend_common_storages_fuse::io::SegmentsIO;
 use databend_common_storages_fuse::operations::BlockCompactMutator;
 use databend_common_storages_fuse::operations::CompactBlockPartInfo;
@@ -244,7 +243,7 @@ async fn test_safety() -> Result<()> {
     };

     eprintln!("running target select");
-    let mut block_compact_mutator = BlockCompactMutator::<RowOrientedSegmentReader>::new(
+    let mut block_compact_mutator = BlockCompactMutator::new(
         ctx.clone(),
         threshold,
         compact_params,
@@ -287,7 +286,7 @@ pub async fn verify_compact_tasks(
                 compact_segment_indices.extend(extra.removed_segment_indexes.iter());
                 actual_blocks_number += extra.unchanged_blocks.len();
                 for b in &extra.unchanged_blocks {
-                    actual_block_ids.insert(b.1.location.0.clone());
+                    actual_block_ids.insert(b.1.location.clone());
                 }
             }
             CompactBlockPartInfo::CompactTaskInfo(task) => {
@@ -318,7 +317,7 @@ pub async fn verify_compact_tasks(
         let segment = SegmentInfo::try_from(compact_segment)?;
         except_blocks_number += segment.blocks.len();
         for b in &segment.blocks {
-            except_block_ids.insert(b.location.0.clone());
+            except_block_ids.insert(b.location.clone());
         }
     }
     assert_eq!(except_blocks_number, actual_blocks_number);
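Note: the test hunks change the inserted key from location.0 to the whole location. Assuming the fuse Location is a (path, format_version) tuple, which the .0 access suggests but the diff does not show, the sets now deduplicate on the full pair rather than the path alone:

    use std::collections::HashSet;

    // Assumption: Location is (path, format_version); not confirmed by the diff.
    type Location = (String, u64);

    fn main() {
        let loc: Location = ("1/7/_b/blk1.parquet".to_string(), 2);

        // Reverted code: key on the whole (path, version) pair.
        let mut block_ids: HashSet<Location> = HashSet::new();
        block_ids.insert(loc.clone());

        // Previous code: key on the path component only.
        let mut block_paths: HashSet<String> = HashSet::new();
        block_paths.insert(loc.0.clone());

        assert_eq!(block_ids.len(), 1);
        assert_eq!(block_paths.len(), 1);
    }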
