//================================================================================================
/// @file thread_synchronization.hpp
///
/// @brief A single header file to automatically include the correct thread synchronization
/// @author Daan Steenbergen
///
/// @copyright 2024 The Open-Agriculture Developers
//================================================================================================
#ifndef THREAD_SYNCHRONIZATION_HPP
#define THREAD_SYNCHRONIZATION_HPP
#if defined CAN_STACK_DISABLE_THREADS || defined ARDUINO
#include <queue>
namespace isobus
{
	/// @brief A dummy mutex class when threading is disabled.
	class Mutex
	{
	};

	/// @brief A dummy recursive mutex class when threading is disabled.
	class RecursiveMutex
	{
	};
}
/// @brief Disabled LOCK_GUARD macro since threads are disabled.
#define LOCK_GUARD(type, x)
/// @brief A template class for a queue; since threads are disabled, this is a simple wrapper around std::queue.
/// @tparam T The item type for the queue.
template<typename T>
class LockFreeQueue
{
public:
	/// @brief Constructor for the lock free queue.
	explicit LockFreeQueue(std::size_t) {}

	/// @brief Push an item to the queue.
	/// @param item The item to push to the queue.
	/// @return Always returns true, since this version of the queue is not limited in size.
	bool push(const T &item)
	{
		queue.push(item);
		return true;
	}

	/// @brief Peek at the next item in the queue.
	/// @param item Set to the item at the front of the queue, if any.
	/// @return True if an item was peeked at, false if the queue is empty.
	bool peek(T &item)
	{
		if (queue.empty())
		{
			return false;
		}
		item = queue.front();
		return true;
	}

	/// @brief Pop an item from the queue.
	/// @return True if an item was popped from the queue, false if the queue is empty.
	bool pop()
	{
		if (queue.empty())
		{
			return false;
		}
		queue.pop();
		return true;
	}

	/// @brief Check if the queue is full.
	/// @return Always returns false, since this version of the queue is not limited in size.
	bool is_full() const
	{
		return false;
	}

	/// @brief Clear the queue.
	void clear()
	{
		queue = {};
	}

private:
	std::queue<T> queue; ///< The underlying queue
};
#else
#include <atomic>
#include <cassert>
#include <mutex>
#include <vector>
namespace isobus
{
	using Mutex = std::mutex;
	using RecursiveMutex = std::recursive_mutex;
}
/// @brief A macro to automatically lock a mutex and unlock it when the scope ends.
/// @param type The type of the mutex.
/// @param x The mutex to lock.
#define LOCK_GUARD(type, x) const std::lock_guard<type> x##Lock(x)
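// A minimal usage sketch for LOCK_GUARD, kept as a comment so it does not affect compilation.
// The mutex and function names below are hypothetical, not part of this header:
//
//   isobus::Mutex dataMutex;
//
//   void update_shared_state()
//   {
//       LOCK_GUARD(isobus::Mutex, dataMutex); // declares a local `dataMutexLock` std::lock_guard
//       // ... access the data guarded by dataMutex; the lock is released at scope exit ...
//   }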
/// @brief A template class for a lock free queue.
/// @tparam T The item type for the queue.
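/// @note As written, this behaves as a single-producer/single-consumer ring buffer: push() is expected
/// to be called from one thread and peek()/pop() from another. One slot is always kept empty to
/// distinguish "full" from "empty", so a queue constructed with size N holds at most N - 1 items.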
template<typename T>
class LockFreeQueue
{
public:
	/// @brief Constructor for the lock free queue.
	explicit LockFreeQueue(std::size_t size) :
	  buffer(size > 0 ? size : 1), capacity(size > 0 ? size : 1)
	{
		// Validate the size of the queue; if assertions are disabled, the capacity falls back to 1.
		assert(size > 0 && "The size of the queue must be greater than 0.");
	}

	/// @brief Push an item to the queue.
	/// @param item The item to push to the queue.
	/// @return True if the item was pushed to the queue, false if the queue is full.
	bool push(const T &item)
	{
		const auto currentWriteIndex = writeIndex.load(std::memory_order_relaxed);
		const auto nextWriteIndex = nextIndex(currentWriteIndex);

		if (nextWriteIndex == readIndex.load(std::memory_order_acquire))
		{
			// The buffer is full.
			return false;
		}

		buffer[currentWriteIndex] = item;
		writeIndex.store(nextWriteIndex, std::memory_order_release);
		return true;
	}

	/// @brief Peek at the next item in the queue.
	/// @param item Set to the item at the front of the queue, if any.
	/// @return True if an item was peeked at, false if the queue is empty.
	bool peek(T &item)
	{
		const auto currentReadIndex = readIndex.load(std::memory_order_relaxed);
		if (currentReadIndex == writeIndex.load(std::memory_order_acquire))
		{
			// The buffer is empty.
			return false;
		}

		item = buffer[currentReadIndex];
		return true;
	}

	/// @brief Pop an item from the queue.
	/// @return True if an item was popped from the queue, false if the queue is empty.
	bool pop()
	{
		const auto currentReadIndex = readIndex.load(std::memory_order_relaxed);
		if (currentReadIndex == writeIndex.load(std::memory_order_acquire))
		{
			// The buffer is empty.
			return false;
		}

		readIndex.store(nextIndex(currentReadIndex), std::memory_order_release);
		return true;
	}

	/// @brief Check if the queue is full.
	/// @return True if the queue is full, false otherwise.
	bool is_full() const
	{
		return nextIndex(writeIndex.load(std::memory_order_acquire)) == readIndex.load(std::memory_order_acquire);
	}

	/// @brief Clear the queue.
	void clear()
	{
		// Simply move the read index up to the write index, discarding everything in between.
		readIndex.store(writeIndex.load(std::memory_order_acquire), std::memory_order_release);
	}

private:
	std::vector<T> buffer; ///< The storage for the circular buffer.
	std::atomic<std::size_t> readIndex = { 0 }; ///< The read index for the circular buffer.
	std::atomic<std::size_t> writeIndex = { 0 }; ///< The write index for the circular buffer.
	const std::size_t capacity; ///< The capacity of the circular buffer.

	/// @brief Get the next index in the circular buffer.
	/// @param current The current index.
	/// @return The next index in the circular buffer.
	std::size_t nextIndex(std::size_t current) const
	{
		return (current + 1) % capacity;
	}
};
#endif
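// A minimal usage sketch for LockFreeQueue, kept as a comment so it does not affect compilation.
// The queue, item type, and functions below are hypothetical, not part of this header; in the
// threaded build the design appears to assume one producer thread pushing and one consumer
// thread peeking/popping:
//
//   LockFreeQueue<int> rxQueue(32); // holds at most 31 items at a time in the threaded build
//
//   void producer_thread()
//   {
//       if (!rxQueue.push(42))
//       {
//           // The queue was full; drop or retry depending on the caller's policy.
//       }
//   }
//
//   void consumer_thread()
//   {
//       int value;
//       while (rxQueue.peek(value))
//       {
//           // ... handle value ...
//           rxQueue.pop(); // remove it only after it has been handled
//       }
//   }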
#endif // THREAD_SYNCHRONIZATION_HPP