Skip to content

Commit bd1575d

Browse files
Handle waiting locks
1 parent cff5244 commit bd1575d

File tree

9 files changed

+218
-145
lines changed

9 files changed

+218
-145
lines changed

cmapi/cmapi_server/process_dispatchers/container.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from cmapi_server.constants import (
1414
IFLAG, LIBJEMALLOC_DEFAULT_PATH, MCS_INSTALL_BIN, ALL_MCS_PROGS,
1515
)
16-
from cmapi_server.process_dispatchers import utils as dispatcher_utils
16+
from cmapi_server.process_dispatchers.locks import release_shmem_locks
1717
from cmapi_server.exceptions import CMAPIBasicError
1818
from cmapi_server.process_dispatchers.base import BaseDispatcher
1919

@@ -223,7 +223,7 @@ def stop(
223223
# Run pre-stop lock reset before saving BRM
224224
# These stale locks can occur if the controllernode couldn't stop correctly
225225
# and they cause mcs-savebrm.py to hang
226-
dispatcher_utils.release_shmem_locks(logger)
226+
release_shmem_locks(logger)
227227

228228
# start mcs-savebrm.py before stopping workernode
229229
logger.debug('Waiting to save BRM.')
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import logging
2+
import re
3+
from dataclasses import dataclass
4+
from time import sleep
5+
from typing import Optional, List
6+
from cmapi_server.constants import SHMEM_LOCKS_PATH
7+
from cmapi_server.process_dispatchers.base import BaseDispatcher
8+
9+
10+
@dataclass
class LocksState:
    """Counters describing one RWLock section of mcs-shmem-locks output."""

    # Identifier of the lock this state belongs to.
    id: int
    # Optional human-readable lock name; None when the lock is unnamed.
    name: Optional[str]
    # Counts of current holders.
    readers: int = 0
    writers: int = 0
    # Counts of lockers still waiting.
    readers_waiting: int = 0
    writers_waiting: int = 0
    # Whether the lock's mutex was reported as locked.
    mutex_locked: bool = False

    def __str__(self):
        """Return a compact one-line summary, e.g. ``LS 1(VSS): 2r/1w 3rw/4ww m=1``."""
        label = f"({self.name})" if self.name else ""
        holders = f"{self.readers}r/{self.writers}w"
        waiters = f"{self.readers_waiting}rw/{self.writers_waiting}ww"
        mutex_flag = int(self.mutex_locked)
        return f"LS {self.id}{label}: {holders} {waiters} m={mutex_flag}"
24+
25+
26+
def parse_locks_detail(cmd_output: str, logger: logging.Logger) -> List[LocksState]:
    """Parse detailed per-lock state from mcs-shmem-locks output.

    A line ending in ``RWLock`` opens a new lock section: the text before the
    suffix (if any) becomes the lock name, and ids are assigned sequentially
    starting at 1 in order of appearance. Inside a section, lines of the form
    ``<field> = <value>`` for ``readers``, ``writers``, ``readers waiting``,
    ``writers waiting`` and ``mutex locked`` populate the current state.
    Blank lines, lines before the first section, and unrecognised lines are
    ignored.

    Missing fields keep their default of 0 (``False`` for the mutex flag).
    A recognised field whose value is not a plain non-negative integer is
    treated as 0 and reported through ``logger.warning``.

    Args:
        cmd_output: Raw text output of the mcs-shmem-locks command.
        logger: Logger used to emit warnings for malformed field values.

    Returns:
        One ``LocksState`` per lock section, in order of appearance.
    """
    section_suffix = 'RWLock'
    field_attrs = {
        'readers': 'readers',
        'writers': 'writers',
        'readers waiting': 'readers_waiting',
        'writers waiting': 'writers_waiting',
        'mutex locked': 'mutex_locked',
    }
    # Longer keys come first so "readers waiting" is not taken for "readers".
    field_re = re.compile(
        r'^(readers waiting|writers waiting|mutex locked|readers|writers)'
        r'\s*=\s*(.*)$'
    )
    digits_re = re.compile(r'\d+')

    states: List[LocksState] = []
    current: Optional[LocksState] = None

    for raw in cmd_output.splitlines():
        line = raw.strip()
        if not line:
            continue

        if line.endswith(section_suffix):
            name = line[:-len(section_suffix)].strip() or None
            current = LocksState(id=len(states) + 1, name=name)
            states.append(current)
            continue

        if current is None:
            # Field lines before the first lock header have no owner.
            continue

        m = field_re.match(line)
        if m is None:
            continue

        key, value = m.groups()
        attr = field_attrs[key]
        if digits_re.fullmatch(value):
            number = int(value)
        else:
            logger.warning('Failed to parse %s from line: %s', attr, raw)
            number = 0

        if attr == 'mutex_locked':
            current.mutex_locked = number != 0
        else:
            setattr(current, attr, number)

    return states
87+
88+
89+
def release_shmem_locks(logger: logging.Logger, max_iterations: int = 5) -> bool:
    """Attempt to release active shmem locks.

    Each round:

    - Inspect lock state with ``mcs-shmem-locks --lock-id 0``.
    - Unlock the writer lock (there can be only one active, but there can be
      multiple waiting).
    - Unlock each reader lock sequentially.
    - Sleep so the state can settle (2s when lockers are waiting, else 1s).

    At least one and at most ``max_iterations`` rounds are performed; while
    waiting lockers are observed the limit is raised to at least 15 so that
    waiters have time to be promoted. The state is inspected once more after
    the final round, so a release achieved by the last round is reported as
    success.

    Args:
        logger: Logger used for debug, warning and error messages.
        max_iterations: Maximum number of unlock rounds when nothing is waiting.

    Returns:
        True when no active or waiting readers/writers remain; False when
        inspection fails or locks remain after all rounds.
    """
    rounds_done = 0
    while True:
        success, out = BaseDispatcher.exec_command(f'{SHMEM_LOCKS_PATH} --lock-id 0')
        if not success or not out:
            logger.error(
                'Failed to inspect shmem locks during unlock (attempt %d)',
                rounds_done + 1
            )
            return False

        states = parse_locks_detail(out, logger=logger)
        active_total = sum_active_from_states(states)
        waiting_total = sum_waiting_from_states(states)
        if active_total == 0 and waiting_total == 0:
            return True

        # Allow more attempts when there are waiting locks; always try once.
        if waiting_total == 0:
            effective_max = max(max_iterations, 1)
        else:
            effective_max = max(max_iterations, 15)
        if rounds_done >= effective_max:
            break
        rounds_done += 1

        logger.debug(
            'Lock release attempt %d: active=%d waiting=%d; detailed=%s',
            rounds_done, active_total, waiting_total, states
        )

        for st in states:
            if st.writers > 0:
                cmd = f'{SHMEM_LOCKS_PATH} -i {st.id} -w -u'
                ok, _out = BaseDispatcher.exec_command(cmd)
                if not ok:
                    logger.warning('Failed to unlock writer for lock-id=%d', st.id)

            if st.readers > 0:
                cmd = f'{SHMEM_LOCKS_PATH} -i {st.id} -r -u'
                for _reader in range(st.readers):
                    ok, _out = BaseDispatcher.exec_command(cmd)
                    if not ok:
                        logger.warning('Failed to unlock a reader for lock-id=%d', st.id)
                        break

        # Wait for state to settle; longer if we have waiting locks
        sleep(2 if waiting_total > 0 else 1)

    logger.error('Failed to fully release shmem locks using mcs-shmem-locks after %d attempts (active/waiting remain)', rounds_done)
    return False
144+
145+
146+
def sum_active_from_states(states: List[LocksState]) -> int:
    """Return the combined count of readers and writers across all states."""
    total = 0
    for state in states:
        total += state.readers + state.writers
    return total
148+
149+
150+
def sum_waiting_from_states(states: List[LocksState]) -> int:
    """Return the combined count of waiting readers and writers across all states."""
    total = 0
    for state in states:
        total += state.readers_waiting + state.writers_waiting
    return total

cmapi/cmapi_server/process_dispatchers/systemd.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import Union, Tuple
66

77
from cmapi_server.process_dispatchers.base import BaseDispatcher
8-
from cmapi_server.process_dispatchers import utils as dispatcher_utils
8+
from cmapi_server.process_dispatchers.locks import release_shmem_locks
99

1010

1111
class SystemdDispatcher(BaseDispatcher):
@@ -168,7 +168,7 @@ def stop(
168168
# Run pre-stop lock reset before saving BRM
169169
# These stale locks can occur if the controllernode couldn't stop correctly
170170
# and they cause mcs-savebrm.py to hang
171-
dispatcher_utils.release_shmem_locks(logging.getLogger(__name__))
171+
release_shmem_locks(logging.getLogger(__name__))
172172

173173
service_name = f'{service_name}@1.service {service_name}@2.service'
174174
cls._workernode_enable(False, use_sudo)

cmapi/cmapi_server/process_dispatchers/utils.py

Lines changed: 0 additions & 104 deletions
This file was deleted.

0 commit comments

Comments
 (0)