Skip to content

Commit cb0746d

Browse files
branch-4.0: [pipeline](conf) make blocking scheduler configurable #57354 (#57392)
Cherry-picked from #57354 Co-authored-by: Gabriel <[email protected]>
1 parent e836969 commit cb0746d

File tree

5 files changed

+14
-3
lines changed

5 files changed

+14
-3
lines changed

be/src/common/config.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,7 @@ DEFINE_Bool(enable_graceful_exit_check, "false");
10911091
DEFINE_Bool(enable_debug_points, "false");
10921092

10931093
DEFINE_Int32(pipeline_executor_size, "0");
1094+
DEFINE_Int32(blocking_pipeline_executor_size, "0");
10941095
DEFINE_Bool(enable_workload_group_for_scan, "false");
10951096
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
10961097

be/src/common/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,7 @@ DECLARE_Bool(enable_graceful_exit_check);
11221122
DECLARE_Bool(enable_debug_points);
11231123

11241124
DECLARE_Int32(pipeline_executor_size);
1125+
DECLARE_Int32(blocking_pipeline_executor_size);
11251126

11261127
// block file cache
11271128
DECLARE_Bool(enable_file_cache);

be/src/pipeline/task_scheduler.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,11 @@ class TaskScheduler {
8080

8181
class HybridTaskScheduler MOCK_REMOVE(final) : public TaskScheduler {
8282
public:
83-
HybridTaskScheduler(int core_num, std::string name,
83+
HybridTaskScheduler(int exec_thread_num, int blocking_exec_thread_num, std::string name,
8484
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
85-
: _blocking_scheduler(core_num * 2, name + "_blocking_scheduler", cgroup_cpu_ctl),
86-
_simple_scheduler(core_num, name + "_simple_scheduler", cgroup_cpu_ctl) {}
85+
: _blocking_scheduler(blocking_exec_thread_num, name + "_blocking_scheduler",
86+
cgroup_cpu_ctl),
87+
_simple_scheduler(exec_thread_num, name + "_simple_scheduler", cgroup_cpu_ctl) {}
8788

8889
Status submit(PipelineTaskSPtr task) override;
8990

be/src/runtime/workload_group/workload_group.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,10 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
414414
if (exec_thread_num <= 0) {
415415
exec_thread_num = CpuInfo::num_cores();
416416
}
417+
int blocking_exec_thread_num = config::blocking_pipeline_executor_size;
418+
if (blocking_exec_thread_num <= 0) {
419+
blocking_exec_thread_num = CpuInfo::num_cores() * 2;
420+
}
417421

418422
int num_disk = 1;
419423
int num_cpus = 1;
@@ -484,6 +488,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
484488
.total_query_slot_count = total_query_slot_count,
485489
.slot_mem_policy = slot_mem_policy,
486490
.pipeline_exec_thread_num = exec_thread_num,
491+
.blocking_pipeline_exec_thread_num = blocking_exec_thread_num,
487492
.max_flush_thread_num = max_flush_thread_num,
488493
.min_flush_thread_num = min_flush_thread_num};
489494
}
@@ -522,6 +527,7 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
522527
uint64_t wg_id = wg_info->id;
523528
std::string wg_name = wg_info->name;
524529
int pipeline_exec_thread_num = wg_info->pipeline_exec_thread_num;
530+
int blocking_exec_thread_num = wg_info->blocking_pipeline_exec_thread_num;
525531
int scan_thread_num = wg_info->scan_thread_num;
526532
int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
527533
int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
@@ -532,6 +538,7 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
532538
if (_task_sched == nullptr) {
533539
std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
534540
std::make_unique<pipeline::HybridTaskScheduler>(pipeline_exec_thread_num,
541+
blocking_exec_thread_num,
535542
"p_" + wg_name, cg_cpu_ctl_ptr);
536543
Status ret = pipeline_task_scheduler->start();
537544
if (ret.ok()) {

be/src/runtime/workload_group/workload_group.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@ struct WorkloadGroupInfo {
285285
int cgroup_cpu_hard_limit = 0;
286286
const bool valid = true;
287287
const int pipeline_exec_thread_num = 0;
288+
const int blocking_pipeline_exec_thread_num = 0;
288289
const int max_flush_thread_num = 0;
289290
const int min_flush_thread_num = 0;
290291

0 commit comments

Comments
 (0)