-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdispatch_queue.cpp
176 lines (141 loc) · 5.2 KB
/
dispatch_queue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#include "dispatch_queue.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
using time_point = std::chrono::steady_clock::time_point;

// One unit of work handled by the dispatch queue: the callable to run plus
// the bookkeeping that distinguishes timer-originated entries from plain ones.
struct work_entry {
  // Plain entry: no expiry (default-constructed time point), not from a timer.
  explicit work_entry(std::function<void()> func_)
      : func{std::move(func_)} {}

  // Timer entry: carries the time point it is due at and is flagged as
  // originating from a timer.
  work_entry(std::function<void()> func_, time_point expiry_)
      : func{std::move(func_)}, expiry{expiry_}, from_timer{true} {}

  std::function<void()> func;  // callable to execute
  time_point expiry{};         // due time; default value for plain entries
  bool from_timer{false};      // true only for entries built with an expiry
};
// Orders work entries by expiry only: `lhs > rhs` when lhs is due later than
// rhs. Used through std::greater to keep the earliest timer on top of the heap.
bool operator>(work_entry const &lhs, work_entry const &rhs) {
  return rhs.expiry < lhs.expiry;
}
// Private state of dispatch_queue: a work queue drained by one dispatch
// thread, and a timer heap drained by one timer thread that moves expired
// timers into the work queue.
struct dispatch_queue::impl {
// Starts both threads and returns once each has set its "started" flag.
impl();
// Thread body: runs queued work entries until `quit` is observed.
static void dispatch_thread_proc(impl *self);
// Thread body: waits for timers to expire and forwards them to the work queue.
static void timer_thread_proc(impl *self);
// Guards `work_queue`; `work_queue_cond` is waited on with this mutex.
std::mutex work_queue_mtx;
std::condition_variable work_queue_cond;
// Producers push at the front; the dispatch thread pops from the back.
std::deque<work_entry> work_queue;
// Guards `timers`; `timer_cond` is waited on with this mutex.
std::mutex timer_mtx;
std::condition_variable timer_cond;
// Min-heap on expiry (std::greater uses operator> on work_entry), so top()
// is the entry with the earliest expiry.
std::priority_queue<work_entry, std::vector<work_entry>,
std::greater<work_entry> >
timers;
std::thread work_queue_thread;
std::thread timer_thread;
// Set to true by a task queued from the dispatch_queue destructor; both
// thread loops exit once they observe it.
std::atomic<bool> quit;
// Set by the respective thread at start-up; the constructor waits on them.
std::atomic<bool> work_queue_thread_started;
std::atomic<bool> timer_thread_started;
// Lock types for the two mutexes above.
using work_queue_lock = std::unique_lock<decltype(work_queue_mtx)>;
using timer_lock = std::unique_lock<decltype(timer_mtx)>;
};
// Body of the dispatch thread.
//
// Signals start-up to the constructor, then repeatedly waits for work and
// runs every queued entry, oldest (back of the deque) first. The loop ends
// once `quit` is observed after the queue has been drained.
//
// self: the owning impl; must outlive the thread.
void dispatch_queue::impl::dispatch_thread_proc(dispatch_queue::impl *self) {
  work_queue_lock lock(self->work_queue_mtx);

  // Start-up handshake: the flag and the notification are both issued while
  // the mutex is held, so the waiter only sees them once this thread blocks
  // in wait() below.
  self->work_queue_thread_started = true;
  self->work_queue_cond.notify_one();

  while (!self->quit) {
    self->work_queue_cond.wait(lock,
                               [&] { return !self->work_queue.empty(); });

    while (!self->work_queue.empty()) {
      // Move the entry out instead of copying it: copying a std::function
      // copies its captured state.
      work_entry work = std::move(self->work_queue.back());
      self->work_queue.pop_back();

      // Run the callable without holding the queue mutex so that it may
      // enqueue further work.
      lock.unlock();
      work.func();
      // Release the callable's captured state while the mutex is still
      // unlocked; a capture whose destructor enqueues work would otherwise
      // try to re-lock the mutex this thread already holds.
      work.func = nullptr;
      lock.lock();
    }
  }
}
// Body of the timer thread.
//
// Signals start-up to the constructor, then sleeps until the earliest timer
// is due and moves it into the work queue, where it is placed ahead of all
// pending non-timer work but behind timer work that is already waiting at
// the consuming end. The loop ends when `quit` is observed; timers still
// pending at that point are not forwarded.
//
// self: the owning impl; must outlive the thread.
void dispatch_queue::impl::timer_thread_proc(dispatch_queue::impl *self) {
  timer_lock lock(self->timer_mtx);

  // Start-up handshake, issued while the mutex is held.
  self->timer_thread_started = true;
  self->timer_cond.notify_one();

  while (!self->quit) {
    if (self->timers.empty()) {
      self->timer_cond.wait(
          lock, [&] { return self->quit || !self->timers.empty(); });
      continue;
    }

    // Copy the deadline by value. The wait below releases the mutex, and a
    // concurrent push may reallocate or reorder the heap's storage, so no
    // reference into `timers` may be held across it.
    const time_point deadline = self->timers.top().expiry;

    // Wake early on shutdown or when a timer with an earlier deadline has
    // been scheduled. Only this thread pops, so `timers` is never empty here.
    self->timer_cond.wait_until(lock, deadline, [&] {
      return self->quit.load() || self->timers.top().expiry < deadline;
    });

    if (self->quit) {
      break;
    }
    if (self->timers.top().expiry > std::chrono::steady_clock::now()) {
      // Woken for a new, earlier timer that is not due yet: wait again with
      // the updated deadline.
      continue;
    }

    {
      work_queue_lock queue_lock(self->work_queue_mtx);
      // Scan from the consuming end (back) for the first non-timer entry and
      // insert just behind it, i.e. after already queued timer work and
      // before all plain work.
      auto where = std::find_if(
          self->work_queue.rbegin(), self->work_queue.rend(),
          [](work_entry const &w) { return !w.from_timer; });
      self->work_queue.insert(where.base(), self->timers.top());
      self->timers.pop();
      self->work_queue_cond.notify_one();
    }
  }
}
// Starts the dispatch and timer threads and blocks until both have set their
// "started" flags.
dispatch_queue::impl::impl()
: quit(false),
work_queue_thread_started(false),
timer_thread_started(false) {
// Both mutexes are taken before the threads are created. Each thread begins
// by locking its own mutex, so it cannot proceed until this constructor
// releases that mutex inside the matching wait() below.
work_queue_lock work_queue_lock(work_queue_mtx);
timer_lock timer_lock(timer_mtx);
work_queue_thread = std::thread(dispatch_thread_proc, this);
timer_thread = std::thread(timer_thread_proc, this);
// Wait for each thread's start-up flag; the predicates guard against
// spurious wake-ups.
work_queue_cond.wait(work_queue_lock,
[this] { return work_queue_thread_started.load(); });
timer_cond.wait(timer_lock, [this] { return timer_thread_started.load(); });
}
// Creates the private implementation; impl's constructor starts the dispatch
// and timer threads and returns once both report that they have started.
dispatch_queue::dispatch_queue() : m(new impl) {}
// Shuts both threads down and joins them.
dispatch_queue::~dispatch_queue() {
  // The quit flag is raised by a task that goes through the queue itself, so
  // work queued ahead of it is executed before the dispatch thread leaves
  // its loop.
  dispatch_async([this] { m->quit = true; });
  m->work_queue_thread.join();

  // The timer thread may be asleep waiting for a timer; wake it so that it
  // re-checks `quit`. The notification is sent while holding the timer mutex.
  {
    std::lock_guard<std::mutex> guard(m->timer_mtx);
    m->timer_cond.notify_one();
  }
  m->timer_thread.join();
}
// Queues `func` to be run on the dispatch thread and returns without waiting
// for it to execute.
//
// func: callable to run; taken by value and moved into the queue.
void dispatch_queue::dispatch_async(std::function<void()> func) {
  impl::work_queue_lock lock(m->work_queue_mtx);
  // `func` is a by-value sink parameter: move it into the entry rather than
  // copying the callable (and its captured state) a second time.
  m->work_queue.push_front(work_entry(std::move(func)));
  m->work_queue_cond.notify_one();
}
// Queues `func` on the dispatch thread and blocks the caller until a
// completion marker queued directly after it has run.
//
// func: callable to run; taken by value and moved into the queue.
//
// NOTE: the caller blocks until the dispatch thread has processed the
// completion marker, so calling this from a task that is itself running on
// the dispatch thread would never return.
void dispatch_queue::dispatch_sync(std::function<void()> func) {
  std::mutex sync_mtx;
  std::condition_variable sync_cond;
  // Only read and written while `sync_mtx` is held, so a plain bool suffices.
  bool completed = false;

  // Hold the local mutex before anything is queued: the completion marker
  // then cannot signal until this thread is inside wait() below.
  std::unique_lock<std::mutex> sync_lock(sync_mtx);

  {
    impl::work_queue_lock queue_lock(m->work_queue_mtx);
    // Both entries are queued under one lock acquisition so that no other
    // dispatch_async/dispatch_sync entry can land between them.
    m->work_queue.push_front(work_entry(std::move(func)));
    m->work_queue.push_front(work_entry([&] {
      std::unique_lock<std::mutex> sync_cb_lock(sync_mtx);
      completed = true;
      sync_cond.notify_one();
    }));
    m->work_queue_cond.notify_one();
  }

  sync_cond.wait(sync_lock, [&] { return completed; });
}
void dispatch_queue::dispatch_after(int msec, std::function<void()> func) {
impl::timer_lock _(m->timer_mtx);
m->timers.push(work_entry(func, std::chrono::steady_clock::now() +
std::chrono::milliseconds(msec)));
m->timer_cond.notify_one();
}
// Blocks until the dispatch thread has processed an empty task submitted
// through dispatch_sync; plain work queued before this call is consumed
// ahead of that task.
void dispatch_queue::dispatch_flush() {
dispatch_sync([] {});
}