5 changes: 3 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
@@ -703,9 +703,10 @@ std::string PipelineTask::debug_string() {
     fmt::format_to(debug_string_buffer,
                    "PipelineTask[id = {}, open = {}, eos = {}, state = {}, dry run = "
                    "{}, _wake_up_early = {}, time elapsed since last state changing = {}s, spilling"
-                   " = {}, is running = {}]",
+                   " = {}, is running = {}, _on_blocking_scheduler = {}, thread_id = {}]",
                    _index, _opened, _eos, _to_string(_exec_state), _dry_run, _wake_up_early.load(),
-                   _state_change_watcher.elapsed_time() / NANOS_PER_SEC, _spilling, is_running());
+                   _state_change_watcher.elapsed_time() / NANOS_PER_SEC, _spilling, is_running(),
+                   _on_blocking_scheduler, get_thread_id());
     std::unique_lock<std::mutex> lc(_dependency_lock);
     auto* cur_blocked_dep = _blocked_dep;
     auto fragment = _fragment_context.lock();
28 changes: 20 additions & 8 deletions be/src/pipeline/pipeline_task.h
@@ -46,6 +46,12 @@ class PriorityTaskQueue;
 class Dependency;
 
 class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
+private:
+    struct TaskTracking {
+        int blocking_thread_id = -1;
+        int simple_thread_id = -1;
+    };
+
 public:
     PipelineTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
                  std::shared_ptr<PipelineFragmentContext> fragment_context,
@@ -66,15 +72,20 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
 
     std::weak_ptr<PipelineFragmentContext>& fragment_context() { return _fragment_context; }
 
-    int get_core_id() const { return _core_id; }
+    void set_on_blocking_scheduler(bool blocking_scheduler) {
+        _on_blocking_scheduler = blocking_scheduler;
+    }
+    int get_thread_id() const {
+        return _on_blocking_scheduler ? _tracking.blocking_thread_id : _tracking.simple_thread_id;
+    }
 
-    PipelineTask& set_core_id(int id) {
-        if (id != _core_id) {
-            if (_core_id != -1) {
-                COUNTER_UPDATE(_core_change_times, 1);
-            }
-            _core_id = id;
+    PipelineTask& set_thread_id(int thread_id) {
+        if (_on_blocking_scheduler) {
+            _tracking.blocking_thread_id = thread_id;
+        } else {
+            _tracking.simple_thread_id = thread_id;
Contributor review comment on this line: Don't do this. The pipeline task itself should not be aware of which scheduler it is running on; the structure of the hybrid task scheduler may change at any time. Consider instead using the task's own recorded core id % the number of sub queues of the target queue each time, to avoid core
         }
+        COUNTER_UPDATE(_core_change_times, 1);
         return *this;
     }
 
@@ -193,7 +204,8 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
     PipelinePtr _pipeline;
     bool _opened;
     RuntimeState* _state = nullptr;
-    int _core_id = -1;
+    TaskTracking _tracking;
+    bool _on_blocking_scheduler = false;
     uint32_t _schedule_time = 0;
     std::unique_ptr<vectorized::Block> _block;
 
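For context, here is a minimal, self-contained sketch of the alternative proposed in the review comment above: the task keeps only the single core id it last ran on, and each queue folds that id into its own sub-queue count with a modulo, so the task never has to know which scheduler it was submitted to. In the PR itself this would presumably land in MultiCoreTaskQueue::push_back (shown in the next file); all names below (ToyTask, ToyMultiCoreQueue, pick_sub_queue) are illustrative stand-ins modeled on the diff, not code from this PR.

#include <atomic>

// Toy model of the reviewer's suggestion: one recorded core id per task,
// folded into whatever sub-queue count the receiving queue happens to have.
struct ToyTask {
    int core_id = -1; // last worker thread that ran this task; -1 if it never ran
};

class ToyMultiCoreQueue {
public:
    explicit ToyMultiCoreQueue(int core_size) : _core_size(core_size) {}

    // Choose the sub queue a task should be pushed to.
    int pick_sub_queue(const ToyTask& task) {
        if (task.core_id < 0) {
            // Never scheduled before: round-robin across the sub queues.
            return _next_core.fetch_add(1) % _core_size;
        }
        // The recorded id may come from a queue with a different number of
        // sub queues; the modulo keeps it in range for this queue while still
        // preserving affinity whenever the sizes match.
        return task.core_id % _core_size;
    }

private:
    const int _core_size;
    std::atomic<int> _next_core {0};
};

Under that scheme the set_on_blocking_scheduler flag and the per-scheduler TaskTracking fields would be unnecessary: an id recorded while running on a queue of one size simply wraps into range when the task is pushed to a queue of another size.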
10 changes: 5 additions & 5 deletions be/src/pipeline/task_queue.cpp
@@ -191,11 +191,11 @@ PipelineTaskSPtr MultiCoreTaskQueue::_steal_take(int core_id) {
 }
 
 Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task) {
-    int core_id = task->get_core_id();
-    if (core_id < 0) {
-        core_id = _next_core.fetch_add(1) % _core_size;
+    int thread_id = task->get_thread_id();
+    if (thread_id < 0) {
+        thread_id = _next_core.fetch_add(1) % _core_size;
     }
-    return push_back(task, core_id);
+    return push_back(task, thread_id);
 }
 
 Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
@@ -207,7 +207,7 @@ Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
 void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
     // if the task not execute but exception early close, core_id == -1
     // should not do update_statistics
-    if (auto core_id = task->get_core_id(); core_id >= 0) {
+    if (auto core_id = task->get_thread_id(); core_id >= 0) {
         task->inc_runtime_ns(time_spent);
         _prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(), time_spent);
     }
13 changes: 7 additions & 6 deletions be/src/pipeline/task_scheduler.cpp
@@ -56,15 +56,14 @@ TaskScheduler::~TaskScheduler() {
 }
 
 Status TaskScheduler::start() {
-    int cores = _task_queue.cores();
     RETURN_IF_ERROR(ThreadPoolBuilder(_name)
-                            .set_min_threads(cores)
-                            .set_max_threads(cores)
+                            .set_min_threads(_num_threads)
+                            .set_max_threads(_num_threads)
                             .set_max_queue_size(0)
                             .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
                             .build(&_fix_thread_pool));
-    LOG_INFO("TaskScheduler set cores").tag("size", cores);
-    for (int32_t i = 0; i < cores; ++i) {
+    LOG_INFO("TaskScheduler set cores").tag("size", _num_threads);
+    for (int32_t i = 0; i < _num_threads; ++i) {
         RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); }));
     }
     return Status::OK();
@@ -117,7 +116,7 @@ void TaskScheduler::_do_work(int index) {
             // Fragment already finished
             continue;
         }
-        task->set_running(true).set_core_id(index);
+        task->set_running(true).set_thread_id(index);
         bool done = false;
         auto status = Status::OK();
         int64_t exec_ns = 0;
@@ -184,8 +183,10 @@ void TaskScheduler::stop() {
 
 Status HybridTaskScheduler::submit(PipelineTaskSPtr task) {
     if (task->is_blockable()) {
+        task->set_on_blocking_scheduler(true);
         return _blocking_scheduler.submit(task);
     } else {
+        task->set_on_blocking_scheduler(false);
         return _simple_scheduler.submit(task);
     }
 }
8 changes: 6 additions & 2 deletions be/src/pipeline/task_scheduler.h
@@ -61,14 +61,18 @@ class TaskScheduler {
     friend class HybridTaskScheduler;
 
     TaskScheduler(int core_num, std::string name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
-            : _name(std::move(name)), _task_queue(core_num), _cgroup_cpu_ctl(cgroup_cpu_ctl) {}
-    TaskScheduler() : _task_queue(0) {}
+            : _name(std::move(name)),
+              _task_queue(core_num),
+              _num_threads(core_num),
+              _cgroup_cpu_ctl(cgroup_cpu_ctl) {}
+    TaskScheduler() : _task_queue(0), _num_threads(0) {}
     std::string _name;
     std::unique_ptr<ThreadPool> _fix_thread_pool;
 
     MultiCoreTaskQueue _task_queue;
     bool _need_to_stop = false;
     bool _shutdown = false;
+    const int _num_threads;
     std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
 
     void _do_work(int index);