
Commit e916995

Faster locking (#45)
1 parent 93d0df0 commit e916995


3 files changed: 114 additions & 113 deletions

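Summary of the change: the removed helpers get_lock_file_names and any_lock_files re-listed and re-parsed the lock directory on every call, and the busy-wait loops in ReadLock and WriteLock invoked them several times per iteration. The new FileLocksSnapshot lists the directory once, parses each entry into a LockFileMeta, drops orphaned locks along the way, and precomputes the flags the wait loops check. The lock file naming scheme is unchanged: {db_name}.{id}.{time_ns}.{stage}.{mode}.lock. A minimal sketch of how a name round-trips through that scheme (illustrative values only, not part of the commit):

import os

# Hypothetical values; in the library, id is the thread's native id and
# time_ns comes from time.monotonic_ns() at lock creation.
ddb_dir = "/tmp/.ddb"
name, id_, time_ns, stage, mode = "users", "4242", "1690000000000", "need", "write"

# Compose the file name the same way LockFileMeta.path does.
lock_file = f"{name}.{id_}.{time_ns}.{stage}.{mode}.lock"
path = os.path.join(ddb_dir, lock_file)

# Parse it back the same way FileLocksSnapshot does while scanning the directory.
f_name, f_id, f_time_ns, f_stage, f_mode, _ = lock_file.split(".")
assert (f_name, f_stage, f_mode) == ("users", "need", "write")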

dictdatabase/locking.py

Lines changed: 90 additions & 85 deletions
@@ -23,55 +23,78 @@ def os_touch(path: str):
 	os.close(fd)
 
 
-def get_lock_file_names(ddb_dir: str, db_name: str, *, id: str = None, time_ns: int = None, stage: str = None, mode: str = None) -> list[str]:
-	"""
-	Returns a list of lock file names in the configured storage directory as
-	strings. The directory is not include, only the file names are returned.
-	If any of the arguments are None, they are treated as a wildcard.
+class LockFileMeta:
 
-	Do not use glob, because it is slow. Compiling the regex pattern
-	takes a long time, and it is called many times.
-	"""
-	res = []
-	for x in os.listdir(ddb_dir):
-		if not x.endswith(".lock"):
-			continue
-		f_name, f_id, f_time_ns, f_stage, f_mode, _ = x.split(".")
-		if f_name != db_name:
-			continue
-		if id is not None and f_id != id:
-			continue
-		if time_ns is not None and f_time_ns != str(time_ns):
-			continue
-		if stage is not None and f_stage != stage:
-			continue
-		if mode is not None and f_mode != mode:
-			continue
-		res.append(x)
-	return res
-
-
-def any_lock_files(ddb_dir: str, db_name: str, *, id: str = None, time_ns: int = None, stage: str = None, mode: str = None) -> bool:
-	"""
-	Like get_lock_file_names, but returns True if there is at least one
-	lock file with the given arguments.
-	"""
-	for x in os.listdir(ddb_dir):
-		if not x.endswith(".lock"):
-			continue
-		f_name, f_id, f_time_ns, f_stage, f_mode, _ = x.split(".")
-		if f_name != db_name:
-			continue
-		if id is not None and f_id != id:
-			continue
-		if time_ns is not None and f_time_ns != str(time_ns):
-			continue
-		if stage is not None and f_stage != stage:
-			continue
-		if mode is not None and f_mode != mode:
-			continue
-		return True
-	return False
+	__slots__ = ("ddb_dir", "name", "id", "time_ns", "stage", "mode")
+
+	ddb_dir: str
+	name: str
+	id: str
+	time_ns: str
+	stage: str
+	mode: str
+
+	def __init__(self, ddb_dir, name, id, time_ns, stage, mode):
+		self.ddb_dir = ddb_dir
+		self.name = name
+		self.id = id
+		self.time_ns = time_ns
+		self.stage = stage
+		self.mode = mode
+
+	@property
+	def path(self):
+		lock_file = f"{self.name}.{self.id}.{self.time_ns}.{self.stage}.{self.mode}.lock"
+		return os.path.join(self.ddb_dir, lock_file)
+
+
+class FileLocksSnapshot:
+
+	__slots__ = ("any_has_locks", "any_write_locks", "any_has_write_locks", "locks")
+
+	any_has_locks: bool
+	any_write_locks: bool
+	any_has_write_locks: bool
+	locks: list[LockFileMeta]
+
+	def __init__(self, ddb_dir, db_name, ignore_during_orphan_check):
+		self.any_has_locks = False
+		self.any_write_locks = False
+		self.any_has_write_locks = False
+		self.locks = []
+
+		for x in os.listdir(ddb_dir):
+			if not x.endswith(".lock"):
+				continue
+			f_name, f_id, f_time_ns, f_stage, f_mode, _ = x.split(".")
+			if f_name != db_name:
+				continue
+
+			lock_meta = LockFileMeta(ddb_dir, f_name, f_id, f_time_ns, f_stage, f_mode)
+
+			# Remove orphaned locks
+			if lock_meta.path != ignore_during_orphan_check:
+				lock_age = time.monotonic_ns() - int(lock_meta.time_ns)
+				if lock_age > LOCK_TIMEOUT * 1_000_000_000:
+					os.unlink(lock_meta.path)
+					print(f"Found orphaned lock ({lock_meta.path}). Remove")
+					continue
+
+			self.locks.append(lock_meta)
+
+			# Lock existence
+			if lock_meta.stage == "has":
+				self.any_has_locks = True
+				if lock_meta.mode == "write":
+					self.any_has_write_locks = True
+			if lock_meta.mode == "write":
+				self.any_write_locks = True
+
+	def lock_exists(self, id: str, stage: str, mode: str) -> bool:
+		return any(x.id == id and x.stage == stage and x.mode == mode for x in self.locks)
+
+	def get_need_locks(self) -> list[LockFileMeta]:
+		return [l for l in self.locks if l.stage == "need"]
 
 
 class AbstractLock:
@@ -80,14 +103,15 @@ class AbstractLock:
 	call super().__init__(...) and then only exit __init__ when the lock is aquired.
 	"""
 
-	__slots__ = ("id", "time_ns", "db_name", "need_path", "path", "ddb_dir")
+	__slots__ = ("id", "time_ns", "db_name", "need_path", "path", "ddb_dir", "snapshot")
 
 	id: str
 	time_ns: int
 	db_name: str
 	need_path: str
 	path: str
 	ddb_dir: str
+	snapshot: FileLocksSnapshot
 
 	def __init__(self, db_name: str):
 		"""
@@ -98,6 +122,8 @@ def __init__(self, db_name: str):
 		self.time_ns = time.monotonic_ns()
 		self.db_name = db_name.replace("/", "___").replace(".", "____")
 		self.ddb_dir = os.path.join(config.storage_directory, ".ddb")
+		if not os.path.isdir(self.ddb_dir):
+			os.mkdir(self.ddb_dir)
 
 	def _lock(self):
 		raise NotImplementedError
@@ -119,35 +145,15 @@ def __exit__(self, exc_type, exc_val, exc_tb):
 		self._unlock()
 
 	def make_lock_path(self, stage: str, mode: str) -> str:
-		if not os.path.isdir(self.ddb_dir):
-			os.mkdir(self.ddb_dir)
 		return os.path.join(self.ddb_dir, f"{self.db_name}.{self.id}.{self.time_ns}.{stage}.{mode}.lock")
 
 	def is_oldest_need_lock(self) -> bool:
-		# len(need_locks) is at least 1 since this function is only called if
-		# there is a need_lock
-		need_locks = get_lock_file_names(self.ddb_dir, self.db_name, stage="need")
-		# Get need locks id and time_ns
-		need_locks_id_time = []
-		for lock in need_locks:
-			_, l_id, l_time_ns, _, _, _ = lock.split(".")
-			need_locks_id_time.append((l_id, l_time_ns))
+		# len(need_locks) is at least 1 since this function is only called if there is a need_lock
+		need_locks = self.snapshot.get_need_locks()
 		# Sort by time_ns. If multiple, the the one with the smaller id is first
-		need_locks_id_time = sorted(need_locks_id_time, key=lambda x: int(x[0])) # Sort by id
-		need_locks_id_time = sorted(need_locks_id_time, key=lambda x: int(x[1])) # Sort by time_ns
-		return need_locks_id_time[0][0] == self.id
-
-	def remove_orphaned_locks(self):
-		for lock_name in get_lock_file_names(self.ddb_dir, self.db_name):
-			lock_path = os.path.join(self.ddb_dir, lock_name)
-			if self.need_path == lock_path:
-				continue
-			_, _, time_ns, _, _, _ = lock_name.split(".")
-			if time.monotonic_ns() - int(time_ns) > LOCK_TIMEOUT * 1_000_000_000:
-				os.unlink(lock_path)
-				print(f"Found orphaned lock ({lock_name}). Remove")
-
-
+		need_locks = sorted(need_locks, key=lambda l: int(l.id))
+		need_locks = sorted(need_locks, key=lambda l: int(l.time_ns))
+		return need_locks[0].id == self.id
 
 
 class ReadLock(AbstractLock):
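Note on ordering: is_oldest_need_lock now sorts the snapshot's need locks with two stable passes, by id and then by time_ns, so the lock with the smallest time_ns wins and the smaller id only breaks ties between equal timestamps. A small standalone sketch of that tie-breaking, using made-up (id, time_ns) pairs:

# Hypothetical need-lock metadata as (id, time_ns) string pairs.
need = [("9", "200"), ("3", "100"), ("7", "100")]

# Same two-pass stable sort as is_oldest_need_lock: id first, then time_ns.
need = sorted(need, key=lambda l: int(l[0]))
need = sorted(need, key=lambda l: int(l[1]))

# The oldest need lock wins; among equal timestamps the smaller id goes first.
assert need[0] == ("3", "100")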
@@ -157,7 +163,8 @@ def _lock(self):
 		os_touch(self.need_path)
 
 		# Except if current thread already has a read lock
-		if any_lock_files(self.ddb_dir, self.db_name, id=self.id, stage="has", mode="read"):
+		self.snapshot = FileLocksSnapshot(self.ddb_dir, self.db_name, self.need_path)
+		if self.snapshot.lock_exists(self.id, "has", "read"):
 			os.unlink(self.need_path)
 			raise RuntimeError("Thread already has a read lock. Do not try to obtain a read lock twice.")
 
@@ -166,40 +173,38 @@ def _lock(self):
 
 		# Iterate until this is the oldest need* lock and no haswrite locks exist, or no *write locks exist
 		while True:
-			self.remove_orphaned_locks()
 			# If no writing is happening, allow unlimited reading
-			if not any_lock_files(self.ddb_dir, self.db_name, mode="write"):
+			if not self.snapshot.any_write_locks:
 				os_touch(self.path)
 				os.unlink(self.need_path)
 				return
 			# A needwrite or haswrite lock exists
-			if self.is_oldest_need_lock() and not any_lock_files(self.ddb_dir, self.db_name, stage="has", mode="write"):
+			if not self.snapshot.any_has_write_locks and self.is_oldest_need_lock():
 				os_touch(self.path)
 				os.unlink(self.need_path)
 				return
 			time.sleep(SLEEP_TIMEOUT)
-
+			self.snapshot = FileLocksSnapshot(self.ddb_dir, self.db_name, self.need_path)
 
 
 class WriteLock(AbstractLock):
 	def _lock(self):
 		# Instantly signal that we need to write
+		self.path = self.make_lock_path("has", "write")
 		self.need_path = self.make_lock_path("need", "write")
 		os_touch(self.need_path)
 
 		# Except if current thread already has a write lock
-		if any_lock_files(self.ddb_dir, self.db_name, id=self.id, stage="has", mode="write"):
+		self.snapshot = FileLocksSnapshot(self.ddb_dir, self.db_name, self.need_path)
+		if self.snapshot.lock_exists(self.id, "has", "write"):
 			os.unlink(self.need_path)
 			raise RuntimeError("Thread already has a write lock. Do try to obtain a write lock twice.")
 
-		# Make path of the hyptoetical haswrite lock
-		self.path = self.make_lock_path("has", "write")
-
 		# Iterate until this is the oldest need* lock and no has* locks exist
 		while True:
-			self.remove_orphaned_locks()
-			if self.is_oldest_need_lock() and not any_lock_files(self.ddb_dir, self.db_name, stage="has"):
+			if not self.snapshot.any_has_locks and self.is_oldest_need_lock():
				os_touch(self.path)
 				os.unlink(self.need_path)
 				return
 			time.sleep(SLEEP_TIMEOUT)
+			self.snapshot = FileLocksSnapshot(self.ddb_dir, self.db_name, self.need_path)
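For context, the locks are driven in two steps: _lock() blocks until the stage transitions from need to has, and _unlock() (not shown in this diff) releases the lock again; AbstractLock.__exit__ delegates to it. A minimal usage sketch, assuming the tests' import style and that config.storage_directory points at a writable directory:

import dictdatabase as DDB
from dictdatabase import locking

DDB.config.storage_directory = "/tmp/ddb_demo"  # illustrative path

w = locking.WriteLock("users")
w._lock()    # blocks until it is safe to write, then creates users.<id>.<time_ns>.has.write.lock
# ... write the file belonging to "users" ...
w._unlock()  # releases the lock so waiting readers and writers can proceed

r = locking.ReadLock("users")
r._lock()    # proceeds immediately once no write locks remain
# ... read ...
r._unlock()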

tests/benchmark/run_parallel.py

Lines changed: 0 additions & 1 deletion
@@ -6,7 +6,6 @@
 import time
 import os
 from dataclasses import dataclass
-from pyinstrument import Profiler
 import random
 from path_dict import PathDict

tests/test_locking.py

Lines changed: 24 additions & 27 deletions
@@ -15,44 +15,41 @@ def test_double_lock_exception(use_test_dir, use_compression):
 def test_get_lock_names(use_test_dir, use_compression):
 	lock = locking.ReadLock("db")
 	lock._lock()
-	assert locking.get_lock_file_names(lock.ddb_dir, "none") == []
-	assert len(locking.get_lock_file_names(lock.ddb_dir, "db")) == 1
-	assert len(locking.get_lock_file_names(lock.ddb_dir, "db", id=str(threading.get_native_id()))) == 1
-	assert len(locking.get_lock_file_names(lock.ddb_dir, "db", id="none")) == 0
-	assert len(locking.get_lock_file_names(lock.ddb_dir, "db", time_ns=lock.time_ns)) == 1
-	assert len(locking.get_lock_file_names(lock.ddb_dir, "db", time_ns="none")) == 0
-	assert len(locking.get_lock_file_names(lock.ddb_dir, "db", stage="has")) == 1
-	assert len(locking.get_lock_file_names(lock.ddb_dir, "db", stage="none")) == 0
-	assert len(locking.get_lock_file_names(lock.ddb_dir, "db", mode="read")) == 1
-	assert len(locking.get_lock_file_names(lock.ddb_dir, "db", mode="none")) == 0
-	lock._unlock()
 
+	ls = locking.FileLocksSnapshot(lock.ddb_dir, "none", lock.need_path)
+	assert ls.locks == []
+	ls = locking.FileLocksSnapshot(lock.ddb_dir, "db", lock.need_path)
+	assert len(ls.locks) == 1
+
+	assert ls.locks[0].id == str(threading.get_native_id())
+	assert ls.locks[0].time_ns == str(lock.time_ns)
+	assert ls.locks[0].stage == "has"
+	assert ls.locks[0].mode == "read"
+
+	assert ls.any_has_locks
+	assert not ls.any_write_locks
+	assert not ls.any_has_write_locks
 
-def test_count_lock_files(use_test_dir, use_compression):
-	lock = locking.ReadLock("db")
-	lock._lock()
-	assert locking.get_lock_file_names(lock.ddb_dir, "none") == []
-	assert locking.any_lock_files(lock.ddb_dir, "db")
-	assert locking.any_lock_files(lock.ddb_dir, "db", id=str(threading.get_native_id()))
-	assert not locking.any_lock_files(lock.ddb_dir, "db", id="none")
-	assert locking.any_lock_files(lock.ddb_dir, "db", time_ns=lock.time_ns)
-	assert not locking.any_lock_files(lock.ddb_dir, "db", time_ns="none")
-	assert locking.any_lock_files(lock.ddb_dir, "db", stage="has")
-	assert not locking.any_lock_files(lock.ddb_dir, "db", stage="none")
-	assert locking.any_lock_files(lock.ddb_dir, "db", mode="read")
-	assert not locking.any_lock_files(lock.ddb_dir, "db", mode="none")
 	lock._unlock()
 
 
+
+
+
 def test_remove_orphaned_locks(use_test_dir):
 	prev_config = locking.LOCK_TIMEOUT
 	locking.LOCK_TIMEOUT = 0.1
 	lock = locking.ReadLock("db")
 	lock._lock()
+
+	ls = locking.FileLocksSnapshot(lock.ddb_dir, "db", lock.need_path)
+	assert len(ls.locks) == 1
+
 	time.sleep(0.2)
-	assert locking.any_lock_files(lock.ddb_dir, "db")
-	lock.remove_orphaned_locks()
-	assert not locking.any_lock_files(lock.ddb_dir, "db")
+	# Trigger the removal of orphaned locks
+	ls = locking.FileLocksSnapshot(lock.ddb_dir, "db", lock.need_path)
+
+	assert len(ls.locks) == 0
 	locking.LOCK_TIMEOUT = prev_config
 
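As the updated test shows, orphan cleanup is now a side effect of constructing a FileLocksSnapshot: while scanning, any lock file other than the caller's own need lock whose age exceeds LOCK_TIMEOUT is unlinked. A rough sketch of just the age check, with an illustrative timeout value (the real LOCK_TIMEOUT is defined in dictdatabase/locking.py):

import time

LOCK_TIMEOUT = 60.0  # seconds; illustrative value only

def is_orphaned(lock_time_ns: str) -> bool:
	# time_ns in the file name was taken with time.monotonic_ns() when the lock was created
	return time.monotonic_ns() - int(lock_time_ns) > LOCK_TIMEOUT * 1_000_000_000

# Example: a lock stamped 120 seconds ago counts as orphaned and would be unlinked.
stale = str(time.monotonic_ns() - 120 * 1_000_000_000)
assert is_orphaned(stale)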
