Skip to content

Commit 9bb8499

Browse files
authored
fix(recipe): fix deadlock in r/w lock recipe (#650)
The lock must only consider contenders with a sequence number lower than it's own sequence number as also stated in the Zookeeper recipe description for shared locks[0]. This wasn't working correctly as the ReadLock also considered WriteLocks with a higher sequence number as contenders. This can lead to a deadlock as described in #649. [0]: https://zookeeper.apache.org/doc/r3.7.0/recipes.html#Shared+Locks Closes #649
1 parent 4042a85 commit 9bb8499

File tree

2 files changed

+71
-9
lines changed

2 files changed

+71
-9
lines changed

kazoo/recipe/lock.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -294,14 +294,20 @@ def _get_predecessor(self, node):
294294
(e.g. rlock), this and also edge cases where the lock's ephemeral node
295295
is gone.
296296
"""
297+
node_sequence = node[len(self.prefix):]
297298
children = self.client.get_children(self.path)
298299
found_self = False
299300
# Filter out the contenders using the computed regex
300301
contender_matches = []
301302
for child in children:
302303
match = self._contenders_re.search(child)
303304
if match is not None:
304-
contender_matches.append(match)
305+
contender_sequence = match.group(1)
306+
# Only consider contenders with a smaller sequence number.
307+
# A contender with a smaller sequence number has a higher
308+
# priority.
309+
if contender_sequence < node_sequence:
310+
contender_matches.append(match)
305311
if child == node:
306312
# Remember the node's match object so we can short circuit
307313
# below.
@@ -313,15 +319,13 @@ def _get_predecessor(self, node):
313319
# node was removed.
314320
raise ForceRetryError()
315321

316-
predecessor = None
317-
# Sort the contenders using the sequence number extracted by the regex,
318-
# then extract the original string.
319-
for match in sorted(contender_matches, key=lambda m: m.groups()):
320-
if match is found_self:
321-
break
322-
predecessor = match.string
322+
if not contender_matches:
323+
return None
323324

324-
return predecessor
325+
# Sort the contenders using the sequence number extracted by the regex
326+
# and return the original string of the predecessor.
327+
sorted_matches = sorted(contender_matches, key=lambda m: m.groups())
328+
return sorted_matches[-1].string
325329

326330
def _find_node(self):
327331
children = self.client.get_children(self.path)

kazoo/tests/test_lock.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,64 @@ def test_write_lock(self):
467467
gotten = lock2.acquire(blocking=False)
468468
assert gotten is False
469469

470+
def test_rw_lock(self):
471+
reader_event = self.make_event()
472+
reader_lock = self.client.ReadLock(self. lockpath, "reader")
473+
reader_thread = self.make_thread(
474+
target=self._thread_lock_acquire_til_event,
475+
args=("reader", reader_lock, reader_event)
476+
)
477+
478+
writer_event = self.make_event()
479+
writer_lock = self.client.WriteLock(self. lockpath, "writer")
480+
writer_thread = self.make_thread(
481+
target=self._thread_lock_acquire_til_event,
482+
args=("writer", writer_lock, writer_event)
483+
)
484+
485+
# acquire a write lock ourselves first to make the others line up
486+
lock = self.client.WriteLock(self.lockpath, "test")
487+
lock.acquire()
488+
489+
reader_thread.start()
490+
writer_thread.start()
491+
492+
# wait for everyone to line up on the lock
493+
wait = self.make_wait()
494+
wait(lambda: len(lock.contenders()) == 3)
495+
contenders = lock.contenders()
496+
497+
assert contenders[0] == "test"
498+
remaining = contenders[1:]
499+
500+
# release the lock and contenders should claim it in order
501+
lock.release()
502+
503+
contender_bits = {
504+
"reader": (reader_thread, reader_event),
505+
"writer": (writer_thread, writer_event),
506+
}
507+
508+
for contender in ("reader", "writer"):
509+
thread, event = contender_bits[contender]
510+
511+
with self.condition:
512+
while not self.active_thread:
513+
self.condition.wait()
514+
assert self.active_thread == contender
515+
516+
assert lock.contenders() == remaining
517+
remaining = remaining[1:]
518+
519+
event.set()
520+
521+
with self.condition:
522+
while self.active_thread:
523+
self.condition.wait()
524+
525+
reader_thread.join()
526+
writer_thread.join()
527+
470528

471529
class TestSemaphore(KazooTestCase):
472530
def __init__(self, *args, **kw):

0 commit comments

Comments
 (0)