Skip to content

Commit 6a0b224

Browse files
feature(profiling): add support for direct profiling of threading.Semaphore objects
1 parent deb7cf8 commit 6a0b224

File tree

2 files changed

+243
-3
lines changed

2 files changed

+243
-3
lines changed

ddtrace/profiling/collector/threading.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ class _ProfiledThreadingRLock(_lock._ProfiledLock):
1818
pass
1919

2020

21+
class _ProfiledThreadingSemaphore(_lock._ProfiledLock):
22+
pass
23+
24+
2125
class ThreadingLockCollector(_lock.LockCollector):
2226
"""Record threading.Lock usage."""
2327

@@ -48,6 +52,21 @@ def _set_patch_target(
4852
threading.RLock = value
4953

5054

55+
class ThreadingSemaphoreCollector(_lock.LockCollector):
56+
"""Record threading.Semaphore usage."""
57+
58+
PROFILED_LOCK_CLASS = _ProfiledThreadingSemaphore
59+
60+
def _get_patch_target(self) -> typing.Type[threading.Semaphore]:
61+
return threading.Semaphore
62+
63+
def _set_patch_target(
64+
self,
65+
value: typing.Any,
66+
) -> None:
67+
threading.Semaphore = value
68+
69+
5170
# Also patch threading.Thread so echion can track thread lifetimes
5271
def init_stack_v2() -> None:
5372
if config.stack.enabled and stack_v2.is_available:

tests/profiling_v2/collector/test_threading.py

Lines changed: 224 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from ddtrace.internal.datadog.profiling import ddup
2222
from ddtrace.profiling.collector.threading import ThreadingLockCollector
2323
from ddtrace.profiling.collector.threading import ThreadingRLockCollector
24+
from ddtrace.profiling.collector.threading import ThreadingSemaphoreCollector
2425
from tests.profiling.collector import pprof_utils
2526
from tests.profiling.collector import test_collector
2627
from tests.profiling.collector.lock_utils import LineNo
@@ -30,11 +31,12 @@
3031

3132

3233
# Type aliases for supported classes
33-
LockClassType = Union[Type[threading.Lock], Type[threading.RLock]]
34-
CollectorClassType = Union[Type[ThreadingLockCollector], Type[ThreadingRLockCollector]]
34+
LockClassType = Union[Type[threading.Lock], Type[threading.RLock], Type[threading.Semaphore]]
35+
CollectorClassType = Union[Type[ThreadingLockCollector], Type[ThreadingRLockCollector], Type[ThreadingSemaphoreCollector]]
3536
# threading.Lock and threading.RLock are factory functions that return _thread types.
3637
# We reference the underlying _thread types directly to avoid creating instances at import time.
37-
LockClassInst = Union[_thread.LockType, _thread.RLock]
38+
# threading.Semaphore is a Python class, not a factory function.
39+
LockClassInst = Union[_thread.LockType, _thread.RLock, threading.Semaphore]
3840

3941
# Module-level globals for testing global lock profiling
4042
_test_global_lock: LockClassInst
@@ -1223,3 +1225,222 @@ def test_lock_getattr(self) -> None:
12231225
# After releasing, it should not be owned
12241226
lock.release()
12251227
assert not lock._is_owned()
1228+
1229+
1230+
class TestThreadingSemaphoreCollector(BaseThreadingLockCollectorTest):
1231+
"""Test Semaphore profiling"""
1232+
1233+
@property
1234+
def collector_class(self) -> Type[ThreadingSemaphoreCollector]:
1235+
return ThreadingSemaphoreCollector
1236+
1237+
@property
1238+
def lock_class(self) -> Type[threading.Semaphore]:
1239+
return threading.Semaphore
1240+
1241+
def test_semaphore_with_value(self) -> None:
1242+
"""Test that Semaphore works with different initial values."""
1243+
with self.collector_class(capture_pct=100):
1244+
from ddtrace.profiling.collector._lock import _ProfiledLock
1245+
1246+
# Test with value=1
1247+
sem1 = self.lock_class(1)
1248+
assert isinstance(sem1, _ProfiledLock)
1249+
assert sem1.acquire(timeout=1)
1250+
assert not sem1.acquire(timeout=0.01) # Should block
1251+
sem1.release()
1252+
1253+
# Test with value=3
1254+
sem3 = self.lock_class(3)
1255+
assert isinstance(sem3, _ProfiledLock)
1256+
for i in range(3):
1257+
assert sem3.acquire(timeout=1), f"Acquire {i+1} failed"
1258+
assert not sem3.acquire(timeout=0.01) # Should block at 4th
1259+
for i in range(3):
1260+
sem3.release()
1261+
1262+
# Test with default value (1)
1263+
sem_default = self.lock_class()
1264+
assert isinstance(sem_default, _ProfiledLock)
1265+
assert sem_default.acquire(timeout=1)
1266+
assert not sem_default.acquire(timeout=0.01) # Should block
1267+
sem_default.release()
1268+
1269+
def test_semaphore_multiple_acquires(self) -> None:
1270+
"""Test that Semaphore correctly handles multiple acquires."""
1271+
with self.collector_class(capture_pct=100):
1272+
from ddtrace.profiling.collector._lock import _ProfiledLock
1273+
1274+
sem = self.lock_class(2)
1275+
assert isinstance(sem, _ProfiledLock)
1276+
1277+
# Should be able to acquire twice
1278+
assert sem.acquire(timeout=1)
1279+
assert sem.acquire(timeout=1)
1280+
1281+
# Third acquire should fail (timeout)
1282+
assert not sem.acquire(timeout=0.01)
1283+
1284+
# Release one and try again
1285+
sem.release()
1286+
assert sem.acquire(timeout=1)
1287+
1288+
# Clean up
1289+
sem.release()
1290+
sem.release()
1291+
1292+
def test_semaphore_non_blocking_acquire(self) -> None:
1293+
"""Test non-blocking acquire behavior."""
1294+
with self.collector_class(capture_pct=100):
1295+
from ddtrace.profiling.collector._lock import _ProfiledLock
1296+
1297+
sem = self.lock_class(2)
1298+
assert isinstance(sem, _ProfiledLock)
1299+
1300+
# Non-blocking acquires should succeed immediately
1301+
assert sem.acquire(blocking=False)
1302+
assert sem.acquire(blocking=False)
1303+
1304+
# Third should fail immediately (not timeout)
1305+
assert not sem.acquire(blocking=False)
1306+
1307+
# Clean up
1308+
sem.release()
1309+
sem.release()
1310+
1311+
def test_semaphore_concurrent_threads(self) -> None:
1312+
"""Test that multiple threads can hold semaphore simultaneously."""
1313+
import time
1314+
1315+
with self.collector_class(capture_pct=100):
1316+
from ddtrace.profiling.collector._lock import _ProfiledLock
1317+
1318+
sem = self.lock_class(3) # Allow 3 threads
1319+
assert isinstance(sem, _ProfiledLock)
1320+
1321+
results = []
1322+
threads_holding = []
1323+
1324+
def worker(worker_id):
1325+
# Acquire the semaphore
1326+
if sem.acquire(timeout=2):
1327+
threads_holding.append(worker_id)
1328+
results.append(f"worker-{worker_id}-acquired")
1329+
time.sleep(0.05) # Hold it briefly
1330+
results.append(f"worker-{worker_id}-releasing")
1331+
sem.release()
1332+
else:
1333+
results.append(f"worker-{worker_id}-timeout")
1334+
1335+
# Start 5 threads, but only 3 can hold semaphore at once
1336+
threads = []
1337+
for i in range(5):
1338+
t = threading.Thread(target=worker, args=(i,))
1339+
threads.append(t)
1340+
t.start()
1341+
1342+
# Wait for all threads
1343+
for t in threads:
1344+
t.join(timeout=5)
1345+
1346+
# All workers should have acquired and released
1347+
assert len([r for r in results if "acquired" in r]) == 5
1348+
assert len([r for r in results if "releasing" in r]) == 5
1349+
1350+
# At most 3 threads should have been holding at once
1351+
# (This is approximate since we're just checking they all got through)
1352+
1353+
def test_semaphore_blocking_contention(self) -> None:
1354+
"""Test that threads block when semaphore is at capacity."""
1355+
import time
1356+
1357+
with self.collector_class(capture_pct=100):
1358+
from ddtrace.profiling.collector._lock import _ProfiledLock
1359+
1360+
sem = self.lock_class(1) # Only 1 thread allowed
1361+
assert isinstance(sem, _ProfiledLock)
1362+
1363+
acquired_times = []
1364+
released_times = []
1365+
1366+
def holder():
1367+
"""Hold the semaphore for a while"""
1368+
sem.acquire()
1369+
acquired_times.append(time.time())
1370+
time.sleep(0.1) # Hold for 100ms
1371+
released_times.append(time.time())
1372+
sem.release()
1373+
1374+
def waiter():
1375+
"""Try to acquire - should wait"""
1376+
start = time.time()
1377+
sem.acquire(timeout=1)
1378+
acquired_times.append(time.time())
1379+
wait_time = time.time() - start
1380+
sem.release()
1381+
return wait_time
1382+
1383+
# Start holder thread
1384+
holder_thread = threading.Thread(target=holder)
1385+
holder_thread.start()
1386+
time.sleep(0.01) # Ensure holder gets it first
1387+
1388+
# Start waiter thread - should block
1389+
waiter_result = []
1390+
waiter_thread = threading.Thread(target=lambda: waiter_result.append(waiter()))
1391+
waiter_thread.start()
1392+
1393+
# Wait for both
1394+
holder_thread.join(timeout=2)
1395+
waiter_thread.join(timeout=2)
1396+
1397+
# Waiter should have waited for holder to release
1398+
assert len(waiter_result) == 1
1399+
assert waiter_result[0] >= 0.08 # Should have waited ~100ms
1400+
1401+
def test_semaphore_zero_value(self) -> None:
1402+
"""Test semaphore with value=0 (initially blocking)."""
1403+
import time
1404+
1405+
with self.collector_class(capture_pct=100):
1406+
from ddtrace.profiling.collector._lock import _ProfiledLock
1407+
1408+
sem = self.lock_class(0) # No permits available
1409+
assert isinstance(sem, _ProfiledLock)
1410+
1411+
# Immediate acquire should fail
1412+
assert not sem.acquire(blocking=False)
1413+
assert not sem.acquire(timeout=0.01)
1414+
1415+
# Release to make one available
1416+
sem.release()
1417+
1418+
# Now acquire should work
1419+
assert sem.acquire(timeout=1)
1420+
sem.release()
1421+
1422+
def test_semaphore_bounded_semaphore(self) -> None:
1423+
"""Test that BoundedSemaphore gets wrapped and prevents over-release."""
1424+
with self.collector_class(capture_pct=100):
1425+
from ddtrace.profiling.collector._lock import _ProfiledLock
1426+
1427+
# BoundedSemaphore might be wrapped if it inherits from Semaphore patching
1428+
bounded_sem = threading.BoundedSemaphore(2)
1429+
1430+
# Check if it's wrapped (may not be if BoundedSemaphore needs separate collector)
1431+
if isinstance(bounded_sem, _ProfiledLock):
1432+
# Test basic functionality
1433+
assert bounded_sem.acquire(timeout=1)
1434+
bounded_sem.release()
1435+
1436+
# BoundedSemaphore should raise ValueError on over-release
1437+
bounded_sem.acquire()
1438+
bounded_sem.release()
1439+
try:
1440+
bounded_sem.release() # Over-release
1441+
assert False, "Should have raised ValueError"
1442+
except ValueError:
1443+
pass # Expected
1444+
else:
1445+
# Document that BoundedSemaphore is not currently wrapped
1446+
print("Note: BoundedSemaphore not wrapped, needs separate collector")

0 commit comments

Comments
 (0)