diff --git a/.gitignore b/.gitignore index c587c52bfe9..be672525809 100644 --- a/.gitignore +++ b/.gitignore @@ -203,5 +203,6 @@ tests/appsec/iast/fixtures/taint_sinks/not_exists.txt *.debug *.dSYM/ -# Rust build artifacts +# Rust build artifacts and dependencies src/native/target* +src/native/Cargo.lock diff --git a/ddtrace/profiling/collector/_lock.py b/ddtrace/profiling/collector/_lock.py index 30543d1ceeb..3b4df5cca32 100644 --- a/ddtrace/profiling/collector/_lock.py +++ b/ddtrace/profiling/collector/_lock.py @@ -2,7 +2,6 @@ from __future__ import annotations import _thread -import abc import os.path import sys import time @@ -52,6 +51,7 @@ class _ProfiledLock: "init_location", "acquired_time", "name", + "is_internal", ) def __init__( @@ -60,6 +60,7 @@ def __init__( tracer: Optional[Tracer], max_nframes: int, capture_sampler: collector.CaptureSampler, + is_internal: bool = False, ) -> None: self.__wrapped__: Any = wrapped self.tracer: Optional[Tracer] = tracer @@ -71,6 +72,9 @@ def __init__( self.init_location: str = f"{os.path.basename(code.co_filename)}:{frame.f_lineno}" self.acquired_time: int = 0 self.name: Optional[str] = None + # If True, this lock is internal to another sync primitive (e.g., Lock inside Semaphore) + # and should not generate profile samples to avoid double-counting + self.is_internal: bool = is_internal ### DUNDER methods ### @@ -161,6 +165,11 @@ def _flush_sample(self, start: int, end: int, is_acquire: bool) -> None: end: End timestamp in nanoseconds is_acquire: True for acquire operations, False for release operations """ + # Skip profiling for internal locks (e.g., Lock inside Semaphore/Condition) + # to avoid double-counting when multiple collectors are active + if self.is_internal: + return + handle: ddup.SampleHandle = ddup.SampleHandle() handle.push_monotonic_ns(end) @@ -262,6 +271,8 @@ class LockCollector(collector.CaptureSamplerCollector): """Record lock usage.""" PROFILED_LOCK_CLASS: Type[Any] + PATCH_MODULE: Any # e.g., 
threading module + PATCH_ATTR_NAME: str # e.g., "Lock", "RLock", "Semaphore" def __init__( self, @@ -275,11 +286,11 @@ def __init__( self.tracer: Optional[Tracer] = tracer self._original_lock: Any = None - @abc.abstractmethod - def _get_patch_target(self) -> Callable[..., Any]: ... + def _get_patch_target(self) -> Callable[..., Any]: + return getattr(self.PATCH_MODULE, self.PATCH_ATTR_NAME) - @abc.abstractmethod - def _set_patch_target(self, value: Any) -> None: ... + def _set_patch_target(self, value: Any) -> None: + setattr(self.PATCH_MODULE, self.PATCH_ATTR_NAME, value) def _start_service(self) -> None: """Start collecting lock usage.""" @@ -297,12 +308,35 @@ def patch(self) -> None: original_lock: Any = self._original_lock # Capture non-None value def _profiled_allocate_lock(*args: Any, **kwargs: Any) -> _ProfiledLock: - """Simple wrapper that returns profiled locks.""" + """Simple wrapper that returns profiled locks. + + Detects if the lock is being created from within threading.py stdlib + (i.e., internal to Semaphore/Condition) to avoid double-counting. 
+ """ + import threading as threading_module + + # Check if caller is from threading.py (internal lock) + is_internal: bool = False + try: + # Frame 0: _profiled_allocate_lock + # Frame 1: _LockAllocatorWrapper.__call__ + # Frame 2: actual caller (threading.Lock() call site) + caller_filename = sys._getframe(2).f_code.co_filename + threading_module_file = threading_module.__file__ + if threading_module_file and caller_filename: + # Normalize paths to handle symlinks and different path formats + caller_filename_normalized = os.path.normpath(os.path.realpath(caller_filename)) + threading_file_normalized = os.path.normpath(os.path.realpath(threading_module_file)) + is_internal = caller_filename_normalized == threading_file_normalized + except (ValueError, AttributeError, OSError): + pass + return self.PROFILED_LOCK_CLASS( wrapped=original_lock(*args, **kwargs), tracer=self.tracer, max_nframes=self.nframes, capture_sampler=self._capture_sampler, + is_internal=is_internal, ) self._set_patch_target(_LockAllocatorWrapper(_profiled_allocate_lock)) diff --git a/ddtrace/profiling/collector/threading.py b/ddtrace/profiling/collector/threading.py index dc1f1404546..09a4cfba9f8 100644 --- a/ddtrace/profiling/collector/threading.py +++ b/ddtrace/profiling/collector/threading.py @@ -1,7 +1,6 @@ from __future__ import absolute_import import threading -import typing from ddtrace.internal._unpatched import _threading as ddtrace_threading from ddtrace.internal.datadog.profiling import stack_v2 @@ -18,34 +17,32 @@ class _ProfiledThreadingRLock(_lock._ProfiledLock): pass +class _ProfiledThreadingSemaphore(_lock._ProfiledLock): + pass + + class ThreadingLockCollector(_lock.LockCollector): """Record threading.Lock usage.""" PROFILED_LOCK_CLASS = _ProfiledThreadingLock - - def _get_patch_target(self) -> typing.Type[threading.Lock]: - return threading.Lock - - def _set_patch_target( - self, - value: typing.Any, - ) -> None: - threading.Lock = value + PATCH_MODULE = threading + 
PATCH_ATTR_NAME = "Lock" class ThreadingRLockCollector(_lock.LockCollector): """Record threading.RLock usage.""" PROFILED_LOCK_CLASS = _ProfiledThreadingRLock + PATCH_MODULE = threading + PATCH_ATTR_NAME = "RLock" + - def _get_patch_target(self) -> typing.Type[threading.RLock]: - return threading.RLock +class ThreadingSemaphoreCollector(_lock.LockCollector): + """Record threading.Semaphore usage.""" - def _set_patch_target( - self, - value: typing.Any, - ) -> None: - threading.RLock = value + PROFILED_LOCK_CLASS = _ProfiledThreadingSemaphore + PATCH_MODULE = threading + PATCH_ATTR_NAME = "Semaphore" # Also patch threading.Thread so echion can track thread lifetimes diff --git a/ddtrace/profiling/profiler.py b/ddtrace/profiling/profiler.py index d8695aa2ae6..dc3109db445 100644 --- a/ddtrace/profiling/profiler.py +++ b/ddtrace/profiling/profiler.py @@ -220,6 +220,7 @@ def start_collector(collector_class: Type) -> None: self._collectors_on_import = [ ("threading", lambda _: start_collector(threading.ThreadingLockCollector)), ("threading", lambda _: start_collector(threading.ThreadingRLockCollector)), + ("threading", lambda _: start_collector(threading.ThreadingSemaphoreCollector)), ("asyncio", lambda _: start_collector(asyncio.AsyncioLockCollector)), ] diff --git a/releasenotes/notes/Added-support-for-profiling-of-threading.Semaphore-objects-to-the-Python-Lock-profiler-aa46b9a5b01ccd3d.yaml b/releasenotes/notes/Added-support-for-profiling-of-threading.Semaphore-objects-to-the-Python-Lock-profiler-aa46b9a5b01ccd3d.yaml new file mode 100644 index 00000000000..10455494622 --- /dev/null +++ b/releasenotes/notes/Added-support-for-profiling-of-threading.Semaphore-objects-to-the-Python-Lock-profiler-aa46b9a5b01ccd3d.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + profiling: Add support for ``threading.Semaphore`` locking type profiling in Python. + The Lock profiler now detects and marks "internal" Lock objects, i.e. 
those that are part of the implementation of higher-level locking types. + One example of such a higher-level primitive is ``threading.Semaphore``, which is implemented with ``threading.Condition``, which itself uses ``threading.Lock`` internally. + Marking a lock as internal will prevent it from being logged, which means the sample will only be counted once. diff --git a/tests/profiling_v2/collector/test_threading.py b/tests/profiling_v2/collector/test_threading.py index c733cb174b0..f53f7f81730 100644 --- a/tests/profiling_v2/collector/test_threading.py +++ b/tests/profiling_v2/collector/test_threading.py @@ -21,6 +21,7 @@ from ddtrace.internal.datadog.profiling import ddup from ddtrace.profiling.collector.threading import ThreadingLockCollector from ddtrace.profiling.collector.threading import ThreadingRLockCollector +from ddtrace.profiling.collector.threading import ThreadingSemaphoreCollector from tests.profiling.collector import pprof_utils from tests.profiling.collector import test_collector from tests.profiling.collector.lock_utils import LineNo @@ -30,14 +31,22 @@ # Type aliases for supported classes -LockClassType = Union[Type[threading.Lock], Type[threading.RLock]] -CollectorClassType = Union[Type[ThreadingLockCollector], Type[ThreadingRLockCollector]] +LockTypeClass = Union[Type[threading.Lock], Type[threading.RLock], Type[threading.Semaphore]] # threading.Lock and threading.RLock are factory functions that return _thread types. # We reference the underlying _thread types directly to avoid creating instances at import time. -LockClassInst = Union[_thread.LockType, _thread.RLock] +# threading.Semaphore is a Python class, not a factory function. 
+LockTypeInst = Union[_thread.LockType, _thread.RLock, threading.Semaphore] + +CollectorTypeClass = Union[ + Type[ThreadingLockCollector], + Type[ThreadingRLockCollector], + Type[ThreadingSemaphoreCollector], +] +# Type alias for collector instances +CollectorTypeInst = Union[ThreadingLockCollector, ThreadingRLockCollector, ThreadingSemaphoreCollector] # Module-level globals for testing global lock profiling -_test_global_lock: LockClassInst +_test_global_lock: LockTypeInst class TestBar: ... @@ -50,8 +59,8 @@ class TestBar: ... # Helper classes for testing lock collector class Foo: - def __init__(self, lock_class: LockClassType) -> None: - self.foo_lock: LockClassInst = lock_class() # !CREATE! foolock + def __init__(self, lock_class: LockTypeClass) -> None: + self.foo_lock: LockTypeInst = lock_class() # !CREATE! foolock def foo(self) -> None: with self.foo_lock: # !RELEASE! !ACQUIRE! foolock @@ -59,7 +68,7 @@ def foo(self) -> None: class Bar: - def __init__(self, lock_class: LockClassType) -> None: + def __init__(self, lock_class: LockTypeClass) -> None: self.foo: Foo = Foo(lock_class) def bar(self) -> None: @@ -77,10 +86,14 @@ def bar(self) -> None: ThreadingRLockCollector, "ThreadingRLockCollector(status=, capture_pct=1.0, nframes=64, tracer=None)", # noqa: E501 ), + ( + ThreadingSemaphoreCollector, + "ThreadingSemaphoreCollector(status=, capture_pct=1.0, nframes=64, tracer=None)", # noqa: E501 + ), ], ) def test_collector_repr( - collector_class: CollectorClassType, + collector_class: CollectorTypeClass, expected_repr: str, ) -> None: test_collector._test_repr(collector_class, expected_repr) @@ -99,22 +112,31 @@ def test_collector_repr( "RLock", r"<_ProfiledLock\(\) at test_threading\.py:{lineno}>", # noqa: E501 ), + ( + ThreadingSemaphoreCollector, + "Semaphore", + # Multiple possible formats across Python versions: + # 1. (pre-3.10) + # 2. (some 3.10-3.11 versions) + # 3. 
(3.12+) + r"<_ProfiledLock\((|)\) at test_threading\.py:{lineno}>", # noqa: E501 + ), ], ) def test_lock_repr( - collector_class: CollectorClassType, + collector_class: CollectorTypeClass, lock_class_name: str, expected_pattern: str, ) -> None: """Test that __repr__ shows correct format with profiling info.""" import re - collector: Union[ThreadingLockCollector, ThreadingRLockCollector] = collector_class(capture_pct=100) + collector: CollectorTypeInst = collector_class(capture_pct=100) collector.start() try: # Get the lock class from threading module AFTER patching - lock_class: LockClassType = getattr(threading, lock_class_name) - lock: LockClassInst = lock_class() # !CREATE! test_lock_repr + lock_class: LockTypeClass = getattr(threading, lock_class_name) + lock: LockTypeInst = lock_class() # !CREATE! test_lock_repr finally: collector.stop() @@ -430,11 +452,11 @@ def test_all_exceptions_suppressed_by_default() -> None: class BaseThreadingLockCollectorTest: # These should be implemented by child classes @property - def collector_class(self) -> CollectorClassType: + def collector_class(self) -> CollectorTypeClass: raise NotImplementedError("Child classes must implement collector_class") @property - def lock_class(self) -> LockClassType: + def lock_class(self) -> LockTypeClass: raise NotImplementedError("Child classes must implement lock_class") # setup_method and teardown_method which will be called before and after @@ -463,34 +485,33 @@ def teardown_method(self, method: Callable[..., None]) -> None: print("Error removing file: {}".format(e)) def test_wrapper(self) -> None: - collector: ThreadingLockCollector | ThreadingRLockCollector = self.collector_class() - with collector: + with self.collector_class(): class Foobar(object): - def __init__(self, lock_class: LockClassType) -> None: - lock: LockClassInst = lock_class() + def __init__(self, lock_class: LockTypeClass) -> None: + lock: LockTypeInst = lock_class() assert lock.acquire() lock.release() - lock: 
LockClassInst = self.lock_class() + lock: LockTypeInst = self.lock_class() assert lock.acquire() lock.release() # Try this way too Foobar(self.lock_class) - def test_lock_events(self): + def test_lock_events(self) -> None: # The first argument is the recorder.Recorder which is used for the # v1 exporter. We don't need it for the v2 exporter. with self.collector_class(capture_pct=100): - lock = self.lock_class() # !CREATE! test_lock_events + lock: LockTypeInst = self.lock_class() # !CREATE! test_lock_events lock.acquire() # !ACQUIRE! test_lock_events lock.release() # !RELEASE! test_lock_events # Calling upload will trigger the exporter to write to a file ddup.upload() - profile = pprof_utils.parse_newest_profile(self.output_filename) - linenos = get_lock_linenos("test_lock_events") + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) + linenos: LineNo = get_lock_linenos("test_lock_events") pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -513,11 +534,11 @@ def test_lock_events(self): def test_lock_acquire_events_class(self) -> None: with self.collector_class(capture_pct=100): - lock_class: LockClassType = self.lock_class # Capture for inner class + lock_class: LockTypeClass = self.lock_class # Capture for inner class class Foobar(object): def lockfunc(self) -> None: - lock: LockClassInst = lock_class() # !CREATE! test_lock_acquire_events_class + lock: LockTypeInst = lock_class() # !CREATE! test_lock_acquire_events_class lock.acquire() # !ACQUIRE! test_lock_acquire_events_class Foobar().lockfunc() @@ -547,10 +568,10 @@ def test_lock_events_tracer(self, tracer: Tracer) -> None: tracer=tracer, capture_pct=100, ): - lock1: LockClassInst = self.lock_class() # !CREATE! test_lock_events_tracer_1 + lock1: LockTypeInst = self.lock_class() # !CREATE! test_lock_events_tracer_1 lock1.acquire() # !ACQUIRE! 
test_lock_events_tracer_1 with tracer.trace("test", resource=resource, span_type=span_type) as t: - lock2: LockClassInst = self.lock_class() # !CREATE! test_lock_events_tracer_2 + lock2: LockTypeInst = self.lock_class() # !CREATE! test_lock_events_tracer_2 lock2.acquire() # !ACQUIRE! test_lock_events_tracer_2 lock1.release() # !RELEASE! test_lock_events_tracer_1 span_id: int = t.span_id @@ -609,7 +630,7 @@ def test_lock_events_tracer_non_web(self, tracer: Tracer) -> None: capture_pct=100, ): with tracer.trace("test", resource=resource, span_type=span_type) as t: - lock2: LockClassInst = self.lock_class() # !CREATE! test_lock_events_tracer_non_web + lock2: LockTypeInst = self.lock_class() # !CREATE! test_lock_events_tracer_non_web lock2.acquire() # !ACQUIRE! test_lock_events_tracer_non_web span_id: int = t.span_id @@ -650,10 +671,10 @@ def test_lock_events_tracer_late_finish(self, tracer: Tracer) -> None: tracer=tracer, capture_pct=100, ): - lock1: LockClassInst = self.lock_class() # !CREATE! test_lock_events_tracer_late_finish_1 + lock1: LockTypeInst = self.lock_class() # !CREATE! test_lock_events_tracer_late_finish_1 lock1.acquire() # !ACQUIRE! test_lock_events_tracer_late_finish_1 span: Span = tracer.start_span(name="test", span_type=span_type) # pyright: ignore[reportCallIssue] - lock2: LockClassInst = self.lock_class() # !CREATE! test_lock_events_tracer_late_finish_2 + lock2: LockTypeInst = self.lock_class() # !CREATE! test_lock_events_tracer_late_finish_2 lock2.acquire() # !ACQUIRE! test_lock_events_tracer_late_finish_2 lock1.release() # !RELEASE! test_lock_events_tracer_late_finish_1 lock2.release() # !RELEASE! test_lock_events_tracer_late_finish_2 @@ -705,10 +726,10 @@ def test_resource_not_collected(self, tracer: Tracer) -> None: tracer=tracer, capture_pct=100, ): - lock1: LockClassInst = self.lock_class() # !CREATE! test_resource_not_collected_1 + lock1: LockTypeInst = self.lock_class() # !CREATE! test_resource_not_collected_1 lock1.acquire() # !ACQUIRE! 
test_resource_not_collected_1 with tracer.trace("test", resource=resource, span_type=span_type) as t: - lock2: LockClassInst = self.lock_class() # !CREATE! test_resource_not_collected_2 + lock2: LockTypeInst = self.lock_class() # !CREATE! test_resource_not_collected_2 lock2.acquire() # !ACQUIRE! test_resource_not_collected_2 lock1.release() # !RELEASE! test_resource_not_collected_1 span_id: int = t.span_id @@ -759,7 +780,7 @@ def test_resource_not_collected(self, tracer: Tracer) -> None: def test_lock_enter_exit_events(self) -> None: with self.collector_class(capture_pct=100): - th_lock: LockClassInst = self.lock_class() # !CREATE! test_lock_enter_exit_events + th_lock: LockTypeInst = self.lock_class() # !CREATE! test_lock_enter_exit_events with th_lock: # !ACQUIRE! !RELEASE! test_lock_enter_exit_events pass @@ -834,8 +855,8 @@ def test_class_member_lock(self, inspect_dir_enabled: bool) -> None: def test_private_lock(self) -> None: class Foo: - def __init__(self, lock_class: LockClassType) -> None: - self.__lock: LockClassInst = lock_class() # !CREATE! test_private_lock + def __init__(self, lock_class: LockTypeClass) -> None: + self.__lock: LockTypeInst = lock_class() # !CREATE! test_private_lock def foo(self) -> None: with self.__lock: # !RELEASE! !ACQUIRE! test_private_lock @@ -873,7 +894,7 @@ def foo(self) -> None: def test_inner_lock(self) -> None: class Bar: - def __init__(self, lock_class: LockClassType) -> None: + def __init__(self, lock_class: LockTypeClass) -> None: self.foo: Foo = Foo(lock_class) def bar(self) -> None: @@ -946,8 +967,8 @@ def test_global_locks(self) -> None: _test_global_lock = self.lock_class() # !CREATE! _test_global_lock class TestBar: - def __init__(self, lock_class: LockClassType) -> None: - self.bar_lock: LockClassInst = lock_class() # !CREATE! bar_lock + def __init__(self, lock_class: LockTypeClass) -> None: + self.bar_lock: LockTypeInst = lock_class() # !CREATE! bar_lock def bar(self) -> None: with self.bar_lock: # !ACQUIRE! 
!RELEASE! bar_lock @@ -1056,8 +1077,8 @@ def test_upload_resets_profile(self) -> None: def test_lock_hash(self) -> None: """Test that __hash__ allows profiled locks to be used in sets and dicts.""" with self.collector_class(capture_pct=100): - lock1: LockClassInst = self.lock_class() - lock2: LockClassInst = self.lock_class() + lock1: LockTypeInst = self.lock_class() + lock2: LockTypeInst = self.lock_class() # Different locks should have different hashes assert hash(lock1) != hash(lock2) @@ -1066,21 +1087,21 @@ def test_lock_hash(self) -> None: assert hash(lock1) == hash(lock1) # Should be usable in a set - lock_set: set[LockClassInst] = {lock1, lock2} + lock_set: set[LockTypeInst] = {lock1, lock2} assert len(lock_set) == 2 assert lock1 in lock_set assert lock2 in lock_set # Should be usable as dict keys - lock_dict: dict[LockClassInst, str] = {lock1: "first", lock2: "second"} + lock_dict: dict[LockTypeInst, str] = {lock1: "first", lock2: "second"} assert lock_dict[lock1] == "first" assert lock_dict[lock2] == "second" def test_lock_equality(self) -> None: """Test that __eq__ compares locks correctly.""" with self.collector_class(capture_pct=100): - lock1: LockClassInst = self.lock_class() - lock2: LockClassInst = self.lock_class() + lock1: LockTypeInst = self.lock_class() + lock2: LockTypeInst = self.lock_class() # Different locks should not be equal assert lock1 != lock2 @@ -1101,14 +1122,14 @@ def test_lock_equality(self) -> None: def test_lock_getattr_nonexistent(self) -> None: """Test that __getattr__ raises AttributeError for non-existent attributes.""" with self.collector_class(capture_pct=100): - lock: LockClassInst = self.lock_class() + lock: LockTypeInst = self.lock_class() with pytest.raises(AttributeError): _ = lock.this_attribute_does_not_exist # type: ignore[attr-defined] def test_lock_slots_enforced(self) -> None: """Test that __slots__ is defined on _ProfiledLock for memory efficiency.""" with self.collector_class(capture_pct=100): - lock: 
LockClassInst = self.lock_class() + lock: LockTypeInst = self.lock_class() from ddtrace.profiling.collector._lock import _ProfiledLock assert isinstance(lock, _ProfiledLock) @@ -1123,6 +1144,7 @@ def test_lock_slots_enforced(self) -> None: "init_location", "acquired_time", "name", + "is_internal", } assert set(_ProfiledLock.__slots__) == expected_slots @@ -1131,7 +1153,7 @@ def test_lock_profiling_overhead_reasonable(self) -> None: import time # Measure without profiling (collector stopped) - regular_lock: LockClassInst = self.lock_class() + regular_lock: LockTypeInst = self.lock_class() start: float = time.perf_counter() iterations: int = 10000 # More iterations for stable measurement for _ in range(iterations): @@ -1141,7 +1163,7 @@ def test_lock_profiling_overhead_reasonable(self) -> None: # Measure with profiling at 0% capture (should skip profiling logic) with self.collector_class(capture_pct=0): - profiled_lock: LockClassInst = self.lock_class() + profiled_lock: LockTypeInst = self.lock_class() start = time.perf_counter() for _ in range(iterations): profiled_lock.acquire() @@ -1173,7 +1195,7 @@ def lock_class(self) -> Type[threading.Lock]: def test_lock_getattr(self) -> None: """Test that __getattr__ delegates Lock-specific attributes.""" with self.collector_class(capture_pct=100): - lock: LockClassInst = self.lock_class() + lock: LockTypeInst = self.lock_class() from ddtrace.profiling.collector._lock import _ProfiledLock assert isinstance(lock, _ProfiledLock) @@ -1203,7 +1225,7 @@ def lock_class(self) -> Type[threading.RLock]: def test_lock_getattr(self) -> None: """Test that __getattr__ delegates RLock-specific attributes.""" with self.collector_class(capture_pct=100): - lock: LockClassInst = self.lock_class() + lock: LockTypeInst = self.lock_class() from ddtrace.profiling.collector._lock import _ProfiledLock assert isinstance(lock, _ProfiledLock) @@ -1223,3 +1245,126 @@ def test_lock_getattr(self) -> None: # After releasing, it should not be owned 
lock.release() assert not lock._is_owned() + + +class TestThreadingSemaphoreCollector(BaseThreadingLockCollectorTest): + """Test Semaphore profiling""" + + @property + def collector_class(self) -> Type[ThreadingSemaphoreCollector]: + return ThreadingSemaphoreCollector + + @property + def lock_class(self) -> Type[threading.Semaphore]: + return threading.Semaphore + + def test_stack_trace_points_to_user_code(self) -> None: + """ + Verify that Semaphore stack traces point to USER CODE, i.e. test_threading.py, + not threading.py internals (like Condition.__enter__). + """ + with self.collector_class(capture_pct=100): + sem: LockTypeInst = self.lock_class(2) # !CREATE! test_stack_trace_points_to_user_code + sem.acquire() # !ACQUIRE! test_stack_trace_points_to_user_code + sem.release() # !RELEASE! test_stack_trace_points_to_user_code + + ddup.upload() + + linenos: LineNo = get_lock_linenos("test_stack_trace_points_to_user_code") + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) + + # stack traces should show test_threading.py (this file), + # not threading.py (where Condition/Semaphore internals live) + pprof_utils.assert_lock_events( + profile, + expected_acquire_events=[ + pprof_utils.LockAcquireEvent( + caller_name=self.test_name, # Should be test function name + filename=os.path.basename(__file__), + linenos=linenos, + lock_name="sem", + ), + ], + expected_release_events=[ + pprof_utils.LockReleaseEvent( + caller_name=self.test_name, + filename=os.path.basename(__file__), + linenos=linenos, + lock_name="sem", + ), + ], + ) + + def test_internal_lock_marked_correctly(self) -> None: + """Verify that locks created internally by threading.py are marked as internal (`self.is_internal == True`.""" + from ddtrace.profiling.collector.threading import ThreadingLockCollector + + # Start Lock and Semaphore collectors to capture both lock types + with ThreadingLockCollector(capture_pct=100), self.collector_class(capture_pct=100): + # Create a 
regular lock from user code + regular_lock: LockTypeInst = threading.Lock() + assert hasattr(regular_lock, "is_internal"), "Lock should be wrapped with is_internal attribute" + assert not regular_lock.is_internal, f"Regular lock should NOT be internal, got: {regular_lock.is_internal}" # pyright: ignore[reportAttributeAccessIssue] + + # Create a semaphore - it should NOT be internal + sem: LockTypeInst = self.lock_class(1) + assert not sem.is_internal, f"Semaphore should NOT be internal, got: {sem.is_internal}" # pyright: ignore[reportAttributeAccessIssue] + + # Access the internal lock (Semaphore -> Condition -> Lock) + # The Condition is at sem._cond, and its lock is at sem._cond._lock + internal_lock: LockTypeInst = sem._cond._lock # pyright: ignore[reportAttributeAccessIssue] + assert hasattr(internal_lock, "is_internal"), "Internal lock should be wrapped" + assert internal_lock.is_internal, ( # pyright: ignore[reportAttributeAccessIssue] + "Lock created by threading.py (inside Condition) SHOULD be marked as internal." + ) + + def test_no_double_counting_with_lock_collector(self) -> None: + """ + Verify that we don't double-count Lock() objects with wrapped Semaphore objects, + since Semaphore internally uses Condition which uses Lock. + """ + from ddtrace.profiling.collector.threading import ThreadingLockCollector + + # Start Lock and Semaphore collectors to capture both lock types + with ThreadingLockCollector(capture_pct=100), self.collector_class(capture_pct=100): + # Internally, this creates: Semaphore -> Condition -> Lock + # Both the Semaphore AND the internal Lock are wrapped, but only Semaphore should be profiled + sem: LockTypeInst = self.lock_class(1) # !CREATE! test_no_double_counting + sem.acquire() # !ACQUIRE! test_no_double_counting + sem.release() # !RELEASE! 
test_no_double_counting + + ddup.upload() + + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) + + # Count lock events (we expect 1 and only 1 acquire / release pair of samples.) + acquire_samples_count: int = len(pprof_utils.get_samples_with_value_type(profile, "lock-acquire")) + release_samples_count: int = len(pprof_utils.get_samples_with_value_type(profile, "lock-release")) + + # Should have exactly 1 event! + # 1 event = Semaphore profiled, internal Lock skipped (correct) + # 2+ events = Both Semaphore AND internal Lock profiled (double-counting bug) + assert acquire_samples_count == 1, f"Expected 1 acquire event (Semaphore only), got {acquire_samples_count}." + assert release_samples_count == 1, f"Expected 1 release event (Semaphore only), got {release_samples_count}." + + # Verify the single event is the Semaphore (not the internal Lock) + linenos: LineNo = get_lock_linenos("test_no_double_counting") + pprof_utils.assert_lock_events( + profile, + expected_acquire_events=[ + pprof_utils.LockAcquireEvent( + caller_name=self.test_name, + filename=os.path.basename(__file__), + linenos=linenos, + lock_name="sem", + ), + ], + expected_release_events=[ + pprof_utils.LockReleaseEvent( + caller_name=self.test_name, + filename=os.path.basename(__file__), + linenos=linenos, + lock_name="sem", + ), + ], + )