Skip to content

Commit 690141f

Browse files
bugaevcawesomekling
authored andcommitted
LibPthread: Reimplement semaphores
This implementation does not use locking or condition variables internally; it's purely based on atomics and futexes. Notably, concurrent sem_wait() and sem_post() calls can run *completely in parallel* without slowing each other down, as long as there are empty slots for them all to succeed without blocking. Additionally, sem_wait() never executes an atomic operation with release ordering, and sem_post() never executes an atomic operation with acquire ordering (unless you count the syscall). This means the compiler and the hardware are free to reorder code *into* the critical section.
1 parent 00d8dbe commit 690141f

File tree

2 files changed

+101
-126
lines changed

2 files changed

+101
-126
lines changed
+99-123
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,37 @@
11
/*
2-
* Copyright (c) 2021, the SerenityOS developers.
2+
* Copyright (c) 2021, Gunnar Beutner <[email protected]>
3+
* Copyright (c) 2021, Sergey Bugaev <[email protected]>
34
*
45
* SPDX-License-Identifier: BSD-2-Clause
56
*/
67

78
#include <AK/Assertions.h>
9+
#include <AK/Atomic.h>
10+
#include <AK/Types.h>
811
#include <errno.h>
912
#include <semaphore.h>
13+
#include <serenity.h>
1014

11-
int sem_close(sem_t*)
15+
// Whether sem_wait() or sem_post() is responsible for waking any sleeping
16+
// threads.
17+
static constexpr u32 POST_WAKES = 1 << 31;
18+
19+
sem_t* sem_open(const char*, int, ...)
1220
{
1321
errno = ENOSYS;
14-
return -1;
22+
return nullptr;
1523
}
1624

17-
int sem_destroy(sem_t* sem)
25+
int sem_close(sem_t*)
1826
{
19-
auto rc = pthread_mutex_destroy(&sem->mtx);
20-
if (rc != 0) {
21-
errno = rc;
22-
return -1;
23-
}
24-
25-
rc = pthread_cond_destroy(&sem->cv);
26-
if (rc != 0) {
27-
errno = rc;
28-
return -1;
29-
}
30-
31-
return 0;
27+
errno = ENOSYS;
28+
return -1;
3229
}
3330

34-
int sem_getvalue(sem_t* sem, int* sval)
31+
int sem_unlink(const char*)
3532
{
36-
auto rc = pthread_mutex_trylock(&sem->mtx);
37-
38-
if (rc == EBUSY) {
39-
*sval = 0;
40-
return 0;
41-
}
42-
43-
if (rc != 0) {
44-
errno = rc;
45-
return -1;
46-
}
47-
48-
*sval = sem->value;
49-
50-
pthread_mutex_unlock(&sem->mtx);
51-
52-
return 0;
33+
errno = ENOSYS;
34+
return -1;
5335
}
5436

5537
int sem_init(sem_t* sem, int shared, unsigned int value)
@@ -64,116 +46,110 @@ int sem_init(sem_t* sem, int shared, unsigned int value)
6446
return -1;
6547
}
6648

67-
auto rc = pthread_mutex_init(&sem->mtx, nullptr);
68-
if (rc != 0) {
69-
errno = rc;
70-
return -1;
71-
}
72-
73-
rc = pthread_cond_init(&sem->cv, nullptr);
74-
if (rc != 0) {
75-
errno = rc;
76-
return -1;
77-
}
78-
7949
sem->value = value;
80-
8150
return 0;
8251
}
8352

84-
sem_t* sem_open(const char*, int, ...)
53+
int sem_destroy(sem_t*)
8554
{
86-
errno = ENOSYS;
87-
return nullptr;
55+
return 0;
8856
}
8957

90-
int sem_post(sem_t* sem)
58+
int sem_getvalue(sem_t* sem, int* sval)
9159
{
92-
auto rc = pthread_mutex_lock(&sem->mtx);
93-
if (rc != 0) {
94-
errno = rc;
95-
return -1;
96-
}
97-
98-
if (sem->value == SEM_VALUE_MAX) {
99-
pthread_mutex_unlock(&sem->mtx);
100-
errno = EOVERFLOW;
101-
return -1;
102-
}
103-
104-
sem->value++;
105-
106-
rc = pthread_cond_signal(&sem->cv);
107-
if (rc != 0) {
108-
pthread_mutex_unlock(&sem->mtx);
109-
errno = rc;
110-
return -1;
111-
}
112-
113-
rc = pthread_mutex_unlock(&sem->mtx);
114-
if (rc != 0) {
115-
errno = rc;
116-
return -1;
117-
}
118-
60+
u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed);
61+
*sval = value & ~POST_WAKES;
11962
return 0;
12063
}
12164

122-
int sem_trywait(sem_t* sem)
65+
int sem_post(sem_t* sem)
12366
{
124-
auto rc = pthread_mutex_lock(&sem->mtx);
125-
if (rc != 0) {
126-
errno = rc;
127-
return -1;
128-
}
129-
130-
if (sem->value == 0) {
131-
pthread_mutex_unlock(&sem->mtx);
132-
errno = EAGAIN;
133-
return -1;
134-
}
135-
136-
sem->value--;
137-
138-
rc = pthread_mutex_unlock(&sem->mtx);
139-
if (rc != 0) {
140-
errno = rc;
141-
return -1;
142-
}
67+
u32 value = AK::atomic_fetch_add(&sem->value, 1u, AK::memory_order_release);
68+
// Fast path: no need to wake.
69+
if (!(value & POST_WAKES)) [[likely]]
70+
return 0;
14371

72+
// Pass the responsibility for waking more threads if more slots become
73+
// available later to sem_wait() in the thread we're about to wake, as
74+
// opposed to further sem_post() calls that free up those slots.
75+
value = AK::atomic_fetch_and(&sem->value, ~POST_WAKES, AK::memory_order_relaxed);
76+
// Check if another sem_post() call has handled it already.
77+
if (!(value & POST_WAKES)) [[likely]]
78+
return 0;
79+
int rc = futex_wake(&sem->value, 1);
80+
VERIFY(rc == 0);
14481
return 0;
14582
}
14683

147-
int sem_unlink(const char*)
84+
int sem_trywait(sem_t* sem)
14885
{
149-
errno = ENOSYS;
150-
return -1;
86+
u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed);
87+
u32 count = value & ~POST_WAKES;
88+
if (count == 0)
89+
return EAGAIN;
90+
// Decrement the count without touching the flag.
91+
u32 desired = (count - 1) | (value & POST_WAKES);
92+
bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, desired, AK::memory_order_acquire);
93+
if (exchanged) [[likely]]
94+
return 0;
95+
else
96+
return EAGAIN;
15197
}
15298

15399
int sem_wait(sem_t* sem)
154100
{
155-
auto rc = pthread_mutex_lock(&sem->mtx);
156-
if (rc != 0) {
157-
errno = rc;
158-
return -1;
159-
}
101+
return sem_timedwait(sem, nullptr);
102+
}
160103

161-
while (sem->value == 0) {
162-
rc = pthread_cond_wait(&sem->cv, &sem->mtx);
163-
if (rc != 0) {
164-
pthread_mutex_unlock(&sem->mtx);
165-
errno = rc;
166-
return -1;
104+
int sem_timedwait(sem_t* sem, const struct timespec* abstime)
105+
{
106+
u32 value = AK::atomic_load(&sem->value, AK::memory_order_relaxed);
107+
bool responsible_for_waking = false;
108+
109+
while (true) {
110+
u32 count = value & ~POST_WAKES;
111+
if (count > 0) [[likely]] {
112+
// It looks like there are some free slots.
113+
u32 whether_post_wakes = value & POST_WAKES;
114+
bool going_to_wake = false;
115+
if (responsible_for_waking && !whether_post_wakes) {
116+
// If we have ourselves been woken up previously, and the
117+
// POST_WAKES flag is not set, that means some more slots might
118+
// be available now, and it's us who has to wake up additional
119+
// threads.
120+
if (count > 1) [[unlikely]]
121+
going_to_wake = true;
122+
// Pass the responsibility for waking up further threads back to
123+
// sem_post() calls. In particular, we don't want the threads
124+
// we're about to wake to try to wake anyone else.
125+
whether_post_wakes = POST_WAKES;
126+
}
127+
// Now, try to commit this.
128+
u32 desired = (count - 1) | whether_post_wakes;
129+
bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, desired, AK::memory_order_acquire);
130+
if (!exchanged) [[unlikely]]
131+
// Re-evaluate.
132+
continue;
133+
if (going_to_wake) [[unlikely]] {
134+
int rc = futex_wake(&sem->value, count - 1);
135+
VERIFY(rc >= 0);
136+
}
137+
return 0;
167138
}
139+
// We're probably going to sleep, so attempt to set the flag. We do not
140+
// commit to sleeping yet, though, as setting the flag may fail and
141+
// cause us to reevaluate what we're doing.
142+
if (value == 0) {
143+
bool exchanged = AK::atomic_compare_exchange_strong(&sem->value, value, POST_WAKES, AK::memory_order_relaxed);
144+
if (!exchanged) [[unlikely]]
145+
// Re-evaluate.
146+
continue;
147+
value = POST_WAKES;
148+
}
149+
// At this point, we're committed to sleeping.
150+
responsible_for_waking = true;
151+
futex_wait(&sem->value, value, abstime, CLOCK_REALTIME);
152+
// This is the state we will probably see upon being waked:
153+
value = 1;
168154
}
169-
170-
sem->value--;
171-
172-
rc = pthread_mutex_unlock(&sem->mtx);
173-
if (rc != 0) {
174-
errno = rc;
175-
return -1;
176-
}
177-
178-
return 0;
179155
}

Userland/Libraries/LibPthread/semaphore.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
__BEGIN_DECLS
1515

1616
typedef struct {
17-
pthread_mutex_t mtx;
18-
pthread_cond_t cv;
19-
int value;
17+
uint32_t value;
2018
} sem_t;
2119

2220
int sem_close(sem_t*);
@@ -28,6 +26,7 @@ int sem_post(sem_t*);
2826
int sem_trywait(sem_t*);
2927
int sem_unlink(const char*);
3028
int sem_wait(sem_t*);
29+
int sem_timedwait(sem_t*, const struct timespec* abstime);
3130

3231
#define SEM_VALUE_MAX INT_MAX
3332

0 commit comments

Comments
 (0)