Merged · Changes from 1 commit
78 changes: 66 additions & 12 deletions README.md
@@ -4,17 +4,30 @@

## Contents

- [Features](#特点)
- [Module overview](#主要模块)
- [workbranch](#workbranch)
- [supervisor](#supervisor)
- [workspace](#workspace)
- [Auxiliary modules](#辅助模块)
- [futures](#futures)
- [benchmark](#benchmark)
- [How to use](#如何使用)
- [Notes](#注意事项)
- [Miscellaneous](#其它)
- [workspace](#workspace)
- [Contents](#目录)
- [Features](#特点)
- [Main modules](#主要模块)
- [**workbranch**](#workbranch)
- [**supervisor**](#supervisor)
- [**workspace**](#workspace-1)
- [Auxiliary modules](#辅助模块)
- [futures](#futures)
- [benchmark](#benchmark)
- [Empty-task benchmark](#空跑测试)
- [Latency benchmark](#延迟测试)
- [How to use](#如何使用)
- [Generating doxygen documentation](#生成doxygen文档)
- [Basic usage](#简单使用)
- [Running the bundled example (example)](#运行已有实例以example为例)
- [Installing system-wide (Win/Linux/Mac supported)](#安装到系统支持winlinuxmac)
- [Notes](#注意事项)
- [Pitfalls](#雷区)
- [Interface safety](#接口安全性)
- [Time units](#时间单位)
- [Miscellaneous](#其它)
- [References](#参考书目)
- [Contact](#联系我)

## Features

@@ -142,6 +155,28 @@ Caught error: YYYY
```


In addition, workbranch supports three different **wait strategies** for idle worker threads:
```
enum class WaitStrategy {
    LowLatencyMode, // busy-waiting (the original behavior)
    BalancedMode,   // adaptive: spin up to max_spin_count, then sleep 1 ms
    SleepMode       // sleep 1 ms per loop iteration, or use condition variables to control the loop
};
```
1. LowLatencyMode:
   - Implementation: when the task queue is empty, the worker thread calls std::this_thread::yield() to give up the CPU, but immediately re-checks the queue (busy-waiting).
   - Latency: the thread keeps polling the queue for new tasks, so it picks them up as soon as they arrive; both average and maximum latency are low.
   - CPU usage: high, because the thread loops continuously and never sleeps.
2. BalancedMode:
   - Implementation: for the first max_spin_count iterations the thread busy-waits with std::this_thread::yield(); beyond that it sleeps for 1 ms before checking the queue again.
   - Latency: under light load the thread responds to new tasks quickly; under heavy load or frequent submissions it may be asleep when a task arrives, so latency increases.
   - CPU usage: fairly high, but lower than LowLatencyMode.
3. SleepMode:
   - Implementation: the thread sleeps for a fixed interval (1 ms in the code) on every loop iteration before checking the queue.
   - Latency: with an empty queue the thread spends most of its time asleep, so even a freshly arrived task must wait for the current sleep to end; both average and maximum latency are high.
   - CPU usage: low, because the thread is asleep most of the time.


---

### **supervisor**
Expand Down Expand Up @@ -268,7 +303,7 @@ int main() {
### Empty-task benchmark

Test method: rapidly submit a large number of empty tasks to measure the framework's task-synchronization overhead.<br>
Test environment: Ubuntu 20.04 : 16 cores : AMD Ryzen 7 5800H with Radeon Graphics 3.20 GHz
Test environment: Ubuntu 20.04 : 8 cores / 16 threads : AMD Ryzen 7 5800H with Radeon Graphics 3.20 GHz

<**Test 1**><br> In test 1 we call `submit<wsp::task::seq>`, packaging 10 empty tasks per submission into the **workbranch** for execution. Results: (code in `workspace/benchmark/bench1.cc`)

@@ -309,6 +344,25 @@ threads: 12 | tasks: 100000000 | time-cost: 5.17316 (s)

**Summary**: when tasks are dispatched through workspace and each **workbranch** runs a single thread, the synchronization structure is static and the overhead is minimal. The more threads a **workbranch** holds, the fiercer the contention on its task queue under a flood of empty tasks, and the larger the framework overhead. (See `bench.md` for full results; the benchmark code lives in `workspace/bench`.)

### Latency benchmark
Test method: record each empty task's submission time and execution time to measure latency.<br>
Test environment: Windows 11 : 8 cores / 16 threads : Intel Core i7-11800H @ 2.30 GHz
<**Test 4**><br> In test 4 we submit tasks one at a time to the **workbranch**. Each task records its submission time and computes its latency when it runs. The goal is to compare task latency under the different scheduling strategies (`WaitStrategy`). Results: (code in `workspace/benchmark/bench4.cc`)

```
Strategy: LowLatencyMode | Threads: 8 | Tasks: 1000000 | Avg Latency: 12.65 us | Min Latency: 0 us | Max Latency: 2173 us
Strategy: BalancedMode | Threads: 8 | Tasks: 1000000 | Avg Latency: 4087.06 us | Min Latency: 0 us | Max Latency: 22699 us
Strategy: SleepMode | Threads: 8 | Tasks: 1000000 | Avg Latency: 15920.58 us | Min Latency: 1 us | Max Latency: 52847 us
```

Summary:

| Strategy | Waiting scheme | Latency profile | CPU usage | Suited for |
|------------------|--------------------------------|----------------|----------------|------------------------------------|
| **LowLatencyMode** | busy-waiting (`yield`) | lowest latency | high CPU usage | hard real-time needs, tasks arriving constantly |
| **BalancedMode** | hybrid (busy-wait + short sleep) | moderate latency | moderate CPU usage | balancing latency against resource use |
| **SleepMode** | pure sleep (fixed 1 ms) | higher latency | low CPU usage | background or long-running steady tasks |


## How to use

12 changes: 7 additions & 5 deletions benchmark/bench1.cc
@@ -9,13 +9,15 @@ int main(int argn, char** argvs) {
} else {
fprintf(stderr, "Invalid parameter! usage: [threads + tasks]\n");
return -1;
}
}
wsp::workbranch wb(thread_nums);
auto time_cost = timewait([&]{
auto task = []{/* empty task */};
for (int i = 0; i < task_nums/10; ++i) {
auto time_cost = timewait([&] {
auto task = [] { /* empty task */ };
for (int i = 0; i < task_nums / 10; ++i) {
wb.submit<wsp::task::seq>(task, task, task, task, task, task, task, task, task, task);
}
wb.wait_tasks();
});
std::cout<<"threads: "<<std::left<<std::setw(2)<<thread_nums<<" | tasks: "<<task_nums<<" | time-cost: "<<time_cost<<" (s)"<<std::endl;
std::cout << "threads: " << std::left << std::setw(2) << thread_nums << " | tasks: " << task_nums
<< " | time-cost: " << time_cost << " (s)" << std::endl;
}
13 changes: 7 additions & 6 deletions benchmark/bench2.cc
@@ -9,17 +9,18 @@ int main(int argn, char** argvs) {
} else {
fprintf(stderr, "Invalid parameter! usage: [threads + tasks]\n");
return -1;
}
}
wsp::workspace spc;
for (int i = 0; i < thread_nums; ++i) {
spc.attach(new wsp::workbranch());
}
auto time_cost = timewait([&]{
auto task = []{/* empty task */};
for (int i = 0; i < task_nums/10; ++i) {
auto time_cost = timewait([&] {
auto task = [] { /* empty task */ };
for (int i = 0; i < task_nums / 10; ++i) {
spc.submit<wsp::task::seq>(task, task, task, task, task, task, task, task, task, task);
}
spc.for_each([](wsp::workbranch& each){each.wait_tasks();});
spc.for_each([](wsp::workbranch& each) { each.wait_tasks(); });
});
std::cout<<"threads: "<<std::left<<std::setw(2)<<thread_nums<<" | tasks: "<<task_nums<<" | time-cost: "<<time_cost<<" (s)"<<std::endl;
std::cout << "threads: " << std::left << std::setw(2) << thread_nums << " | tasks: " << task_nums
<< " | time-cost: " << time_cost << " (s)" << std::endl;
}
15 changes: 8 additions & 7 deletions benchmark/bench3.cc
@@ -9,17 +9,18 @@ int main(int argn, char** argvs) {
} else {
fprintf(stderr, "Invalid parameter! usage: [threads + tasks]\n");
return -1;
}
}
wsp::workspace spc;
for (int i = 0; i < thread_nums/2; ++i) {
for (int i = 0; i < thread_nums / 2; ++i) {
spc.attach(new wsp::workbranch(2));
}
auto time_cost = timewait([&]{
auto task = []{/* empty task */};
for (int i = 0; i < task_nums/10; ++i) {
auto time_cost = timewait([&] {
auto task = [] { /* empty task */ };
for (int i = 0; i < task_nums / 10; ++i) {
spc.submit<wsp::task::seq>(task, task, task, task, task, task, task, task, task, task);
}
spc.for_each([](wsp::workbranch& each){each.wait_tasks();});
spc.for_each([](wsp::workbranch& each) { each.wait_tasks(); });
});
std::cout<<"threads: "<<std::left<<std::setw(2)<<thread_nums<<" | tasks: "<<task_nums<<" | time-cost: "<<time_cost<<" (s)"<<std::endl;
std::cout << "threads: " << std::left << std::setw(2) << thread_nums << " | tasks: " << task_nums
<< " | time-cost: " << time_cost << " (s)" << std::endl;
}
81 changes: 81 additions & 0 deletions benchmark/bench4.cc
@@ -0,0 +1,81 @@
#include <chrono>
#include <iomanip>
#include <mutex>
#include <vector>

#include "workspace/workspace.hpp"
#include "timewait.h"

int main(int argn, char** argvs) {
int task_nums, thread_nums;
if (argn == 3) {
thread_nums = atoi(argvs[1]);
task_nums = atoi(argvs[2]);
} else {
fprintf(stderr, "Invalid parameter! usage: [threads + tasks]\n");
return -1;
}

for (auto strategy : {wsp::WaitStrategy::LowLatencyMode, wsp::WaitStrategy::BalancedMode, wsp::WaitStrategy::SleepMode}) {
wsp::workbranch wb(thread_nums, strategy);
std::vector<long long> latencies;
latencies.reserve(task_nums);
std::mutex latency_mutex;

auto task = [&latencies, &latency_mutex](std::chrono::steady_clock::time_point submit_time) {
auto start_time = std::chrono::steady_clock::now();
@Lwhyz (Owner) commented on Apr 15, 2025:

In a lightweight framework, fetching a high-resolution timestamp is a rather heavy call, so the time this benchmark reflects is dominated by that call rather than by the framework itself. The only sound approach is to run everything to completion and compute the statistics afterwards.

@Lwhyz (Owner) commented on Apr 17, 2025:
hanya@localhost:build$ ./bench4 4 1000000
Strategy: lowlatancy | Threads: 4 | Tasks: 1000000 | Avg Latency: 5.19 us | Min Latency: 0 us | Max Latency: 1027 us
Strategy: balance | Threads: 4 | Tasks: 1000000 | Avg Latency: 6.87 us | Min Latency: 0 us | Max Latency: 1256 us
Strategy: blocking | Threads: 4 | Tasks: 1000000 | Avg Latency: 6.14 us | Min Latency: 0 us | Max Latency: 145 us
hanya@localhost:build$ ./bench4 1 1000000
Strategy: lowlatancy | Threads: 1 | Tasks: 1000000 | Avg Latency: 56448.22 us | Min Latency: 28 us | Max Latency: 113657 us
Strategy: balance | Threads: 1 | Tasks: 1000000 | Avg Latency: 34161.14 us | Min Latency: 1 us | Max Latency: 71510 us
Strategy: blocking | Threads: 1 | Tasks: 1000000 | Avg Latency: 83461.47 us | Min Latency: 59 us | Max Latency: 152813 us

A single worker finishes with a lower total elapsed time, yet here its average latency is several orders of magnitude higher.

auto latency = std::chrono::duration_cast<std::chrono::microseconds>(start_time - submit_time).count();
{
std::lock_guard<std::mutex> lock(latency_mutex);
latencies.push_back(latency);
}
};

// record submit_time
for (int i = 0; i < task_nums; ++i) {
auto submit_time = std::chrono::steady_clock::now();
wb.submit([=]() { task(submit_time); });
@Lwhyz (Owner) commented:

`[=]` copies everything the lambda uses into the closure; be careful with lambdas here.

}

wb.wait_tasks();

// calculate latency
long long total_latency = 0;
long long max_latency = 0;
long long min_latency = latencies.empty() ? 0 : latencies[0];

for (auto latency : latencies) {
total_latency += latency;
if (latency > max_latency) {
max_latency = latency;
}
if (latency < min_latency) {
min_latency = latency;
}
}

double avg_latency = latencies.empty() ? 0.0 : static_cast<double>(total_latency) / latencies.size();

const char* strategy_name = "";
switch (strategy) {
case wsp::WaitStrategy::LowLatencyMode:
strategy_name = "LowLatencyMode";
break;
case wsp::WaitStrategy::BalancedMode:
strategy_name = "BalancedMode";
break;
case wsp::WaitStrategy::SleepMode:
strategy_name = "SleepMode";
break;
}

std::cout << "Strategy: " << std::left << std::setw(15) << strategy_name
<< " | Threads: " << std::setw(2) << thread_nums << " | Tasks: " << std::setw(8) << task_nums
<< " | Avg Latency: " << std::setw(8) << std::right << std::fixed << std::setprecision(2)
<< avg_latency << " us"
<< " | Min Latency: " << std::setw(4) << min_latency << " us"
<< " | Max Latency: " << std::setw(8) << max_latency << " us" << std::endl;
}

return 0;
}
37 changes: 35 additions & 2 deletions include/workspace/workbranch.hpp
@@ -13,12 +13,21 @@

namespace wsp {

enum class WaitStrategy {


LowLatencyMode, // LowLatencyMode mode: original busy-waiting
BalancedMode, // BalancedMode mode: adaptive waiting, sleep 1ms after exceeding max spin count
SleepMode // SleepMode mode: sleeps 1ms per loop or uses condition variables to control the loop
};

namespace details {

class workbranch {
using worker = autothread<detach>;
using worker_map = std::map<worker::id, worker>;

const int max_spin_count = 10000;
WaitStrategy wait_strategy = {};

sz_t decline = 0;
sz_t task_done_workers = 0;
bool is_waiting = false;
@@ -35,8 +44,10 @@
/**
* @brief construct function
* @param wks initial number of workers
* @param strategy wait strategy for workers (defaults to LowLatencyMode).
*/
explicit workbranch(int wks = 1) {
explicit workbranch(int wks = 1, WaitStrategy strategy = WaitStrategy::LowLatencyMode) {
wait_strategy = strategy;
for (int i = 0; i < std::max(wks, 1); ++i) {
add_worker(); // worker
}
@@ -230,6 +241,8 @@ class workbranch {
// thread's default loop
void mission() {
std::function<void()> task;
int spin_count = 0;

while (true) {
if (decline <= 0 && tq.try_pop(task)) {
task();
@@ -250,7 +263,27 @@
task_done_cv.notify_one();
thread_cv.wait(locker, [this]{return !is_waiting; });
} else {
std::this_thread::yield();

switch (wait_strategy) {
case WaitStrategy::LowLatencyMode: {
std::this_thread::yield();
break;
}
case WaitStrategy::BalancedMode: {
if (spin_count < max_spin_count) {
++spin_count;
std::this_thread::yield();
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
spin_count = tq.length() == 0 ? 0 : spin_count / 2;
}
break;
}
case WaitStrategy::SleepMode: {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
break;
}
}
}
}
}