From 38b942dbefe15bea961b89ade3050a7d6f2ca3bb Mon Sep 17 00:00:00 2001 From: Luca Muscat Date: Thu, 26 Dec 2024 16:06:13 +0100 Subject: [PATCH 1/8] Proof of Concept support for multithreading --- perfsephone/plugin.py | 65 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/perfsephone/plugin.py b/perfsephone/plugin.py index 233d527..38998de 100644 --- a/perfsephone/plugin.py +++ b/perfsephone/plugin.py @@ -5,10 +5,23 @@ import inspect import json +import threading from contextlib import contextmanager from dataclasses import asdict from pathlib import Path -from typing import Any, Dict, Final, Generator, List, Optional, Sequence, Tuple, Union +from types import FrameType +from typing import ( + Any, + Dict, + Final, + Generator, + List, + Literal, + Optional, + Sequence, + Tuple, + Union, +) import pyinstrument import pytest @@ -27,6 +40,42 @@ PERFETTO_ARG_NAME: Final[str] = "perfetto_path" +class ThreadProfiler: + def __init__(self) -> None: + self.run_stack_depth: int = 0 + self.profiler: pyinstrument.Profiler = pyinstrument.Profiler(async_mode="disabled") + # self.end = threading.Event() + + def __call__( + self, + frame: FrameType, + event: Literal["call", "line", "return", "exception", "opcode"], + _args: Any, + ) -> Any: + frame.f_trace_lines = False + + if ( + event == "call" + and frame.f_code.co_name == "run" + and frame.f_code.co_filename == threading.__file__ + ): + if self.run_stack_depth == 0: + self.profiler.start() + self.run_stack_depth += 1 + return self.__call__ + + if ( + event == "return" + and frame.f_code.co_name == "run" + and frame.f_code.co_filename == threading.__file__ + ): + self.run_stack_depth -= 1 + if self.run_stack_depth == 0: + self.profiler.stop() + + print(event, frame) + + class PytestPerfettoPlugin: def __init__(self) -> None: self.events: List[SerializableEvent] = [] @@ -149,9 +198,23 @@ def pytest_runtest_logreport(self, report: pytest.TestReport) -> None: def pytest_pyfunc_call(self, pyfuncitem: pytest.Function) -> Generator[None, None, None]: is_async = inspect.iscoroutinefunction(pyfuncitem.function) + profiler = ThreadProfiler() + + # We use `threading.settrace`, as opposed to `threading.setprofile`, as + # `pyinstrument.Profiler().start()` calls `threading.setprofile` under the hood, overriding + # our profiling function. + # + # `threading.settrace` & `threading.setprofile` provides a rather convoluted mechanism of + # starting a pyinstrument profiler as soon as a thread starts executing its `run()` method, + # & stopping said profiler once the `run()` method finishes. + threading.settrace(profiler) # type: ignore + with self.__profile(root_frame_name="call", is_async=is_async) as events: yield + threading.settrace(None) # type: ignore + profiler.profiler.print() + self.events += events @pytest.hookimpl(hookwrapper=True, tryfirst=True) From 482ad956a2e68d8b3d178c47defe7cf118d70733 Mon Sep 17 00:00:00 2001 From: Luca Muscat Date: Fri, 27 Dec 2024 16:29:01 +0100 Subject: [PATCH 2/8] Allow the tid to be specified with the `render` method --- perfsephone/perfetto_renderer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/perfsephone/perfetto_renderer.py b/perfsephone/perfetto_renderer.py index 558df2d..4ce76ef 100644 --- a/perfsephone/perfetto_renderer.py +++ b/perfsephone/perfetto_renderer.py @@ -68,7 +68,7 @@ def remove_pytest_related_frames(root_frame: Frame) -> Sequence[Frame]: return [root_frame] -def render(session: Session, start_time: float) -> List[SerializableEvent]: +def render(session: Session, start_time: float, tid: int = 1) -> List[SerializableEvent]: renderer = SpeedscopeRenderer() root_frame = session.root_frame() if root_frame is None: @@ -103,13 +103,14 @@ def render_root_frame(root_frame: Frame) -> List[SerializableEvent]: cat=Category("runtime"), ts=timestamp, args={"file": file or "", "line": str(line or 0), "name": name or ""}, + tid=tid, ) ) elif ( speedscope_event.type == SpeedscopeEventType.CLOSE and name not in SYNTHETIC_LEAF_IDENTIFIERS ): - result.append(EndDurationEvent(ts=timestamp)) + result.append(EndDurationEvent(ts=timestamp, tid=tid)) return result for root in new_roots: From 33ab770364eeb06b74859618fa1784e01009501f Mon Sep 17 00:00:00 2001 From: Luca Muscat Date: Fri, 27 Dec 2024 16:29:50 +0100 Subject: [PATCH 3/8] Store the ThreadProfiler run stack depth and profiler in thread local storage --- perfsephone/plugin.py | 65 ++++++++++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/perfsephone/plugin.py b/perfsephone/plugin.py index 38998de..ac17e5d 100644 --- a/perfsephone/plugin.py +++ b/perfsephone/plugin.py @@ -8,6 +8,7 @@ import threading from contextlib import contextmanager from dataclasses import asdict +from itertools import chain from pathlib import Path from types import FrameType from typing import ( @@ -42,9 +43,8 @@ class ThreadProfiler: def __init__(self) -> None: - self.run_stack_depth: int = 0 - self.profiler: pyinstrument.Profiler = pyinstrument.Profiler(async_mode="disabled") - # self.end = threading.Event() + self.thread_local = threading.local() + self.profilers: Dict[int, pyinstrument.Profiler] = {} def __call__( self, @@ -59,9 +59,12 @@ def __call__( and frame.f_code.co_name == "run" and frame.f_code.co_filename == threading.__file__ ): - if self.run_stack_depth == 0: - self.profiler.start() - self.run_stack_depth += 1 + if getattr(self.thread_local, "run_stack_depth", 0) == 0: + profiler = pyinstrument.Profiler(async_mode="disabled") + self.thread_local.profiler = profiler + self.thread_local.run_stack_depth = 0 + profiler.start() + self.thread_local.run_stack_depth += 1 return self.__call__ if ( @@ -69,11 +72,13 @@ def __call__( and frame.f_code.co_name == "run" and frame.f_code.co_filename == threading.__file__ ): - self.run_stack_depth -= 1 - if self.run_stack_depth == 0: - self.profiler.stop() - - print(event, frame) + self.thread_local.run_stack_depth -= 1 + if self.thread_local.run_stack_depth == 0: + assert hasattr( + self.thread_local, "profiler" + ), "because a profiler must have been started" + self.thread_local.profiler.stop() + self.profilers[threading.get_ident()] = self.thread_local.profiler class PytestPerfettoPlugin: @@ -93,6 +98,17 @@ def __profile( result: List[SerializableEvent] = [] start_event = BeginDurationEvent(name=root_frame_name, cat=Category("test"), args=args) + thread_profiler = ThreadProfiler() + + # We use `threading.settrace`, as opposed to `threading.setprofile`, as + # `pyinstrument.Profiler().start()` calls `threading.setprofile` under the hood, overriding + # our profiling function. + # + # `threading.settrace` & `threading.setprofile` provides a rather convoluted mechanism of + # starting a pyinstrument profiler as soon as a thread starts executing its `run()` method, + # & stopping said profiler once the `run()` method finishes. + threading.settrace(thread_profiler) # type: ignore + result.append(start_event) profiler_async_mode = "enabled" if is_async else "disabled" with pyinstrument.Profiler(async_mode=profiler_async_mode) as profile: @@ -101,8 +117,19 @@ def __profile( start_rendering_event = BeginDurationEvent( name="[pytest-perfetto] Dumping frames", cat=Category("pytest") ) - if profile.last_session is not None: - result += render(profile.last_session, start_time=start_event.ts) + + threading.settrace(None) # type: ignore + + profiles_to_render = chain([profile], thread_profiler.profilers.values()) + + for index, profiler in enumerate(profiles_to_render, start=1): + if profiler.last_session: + result += render( + session=profiler.last_session, + start_time=profiler.last_session.start_time, + tid=index, + ) + end_rendering_event = EndDurationEvent() result += [end_event, start_rendering_event, end_rendering_event] @@ -200,21 +227,9 @@ def pytest_pyfunc_call(self, pyfuncitem: pytest.Function) -> Generator[None, Non profiler = ThreadProfiler() - # We use `threading.settrace`, as opposed to `threading.setprofile`, as - # `pyinstrument.Profiler().start()` calls `threading.setprofile` under the hood, overriding - # our profiling function. - # - # `threading.settrace` & `threading.setprofile` provides a rather convoluted mechanism of - # starting a pyinstrument profiler as soon as a thread starts executing its `run()` method, - # & stopping said profiler once the `run()` method finishes. - threading.settrace(profiler) # type: ignore - with self.__profile(root_frame_name="call", is_async=is_async) as events: yield - threading.settrace(None) # type: ignore - profiler.profiler.print() - self.events += events @pytest.hookimpl(hookwrapper=True, tryfirst=True) From bebeb013e61f0ae0490c3229f3a7ce3504853ee3 Mon Sep 17 00:00:00 2001 From: Luca Muscat Date: Fri, 27 Dec 2024 20:54:42 +0100 Subject: [PATCH 4/8] Remove useless `ThreadProfiler` in `pytest_pyfunc_call` --- perfsephone/plugin.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/perfsephone/plugin.py b/perfsephone/plugin.py index ac17e5d..8515eb0 100644 --- a/perfsephone/plugin.py +++ b/perfsephone/plugin.py @@ -225,8 +225,6 @@ def pytest_runtest_logreport(self, report: pytest.TestReport) -> None: def pytest_pyfunc_call(self, pyfuncitem: pytest.Function) -> Generator[None, None, None]: is_async = inspect.iscoroutinefunction(pyfuncitem.function) - profiler = ThreadProfiler() - with self.__profile(root_frame_name="call", is_async=is_async) as events: yield From 171a0e7cb6eac76e6af30f6c52d19655309988d3 Mon Sep 17 00:00:00 2001 From: Luca Muscat Date: Fri, 27 Dec 2024 21:01:19 +0100 Subject: [PATCH 5/8] Emit a warning log when a test ends with a an active thread --- perfsephone/plugin.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/perfsephone/plugin.py b/perfsephone/plugin.py index 8515eb0..f02afb9 100644 --- a/perfsephone/plugin.py +++ b/perfsephone/plugin.py @@ -5,6 +5,7 @@ import inspect import json +import logging import threading from contextlib import contextmanager from dataclasses import asdict @@ -39,6 +40,7 @@ from perfsephone.perfetto_renderer import render PERFETTO_ARG_NAME: Final[str] = "perfetto_path" +logger = logging.getLogger(__name__) class ThreadProfiler: @@ -120,10 +122,19 @@ def __profile( threading.settrace(None) # type: ignore - profiles_to_render = chain([profile], thread_profiler.profilers.values()) + profiles_to_render = ( + profile + for profile in chain([profile], thread_profiler.profilers.values()) + if profile.last_session + ) for index, profiler in enumerate(profiles_to_render, start=1): - if profiler.last_session: + if profiler.is_running: + logger.warning( + "There exists a run-away thread which has not been joined after the end of the" + " test.The thread's profiler will be discarded." + ) + elif profiler.last_session: result += render( session=profiler.last_session, start_time=profiler.last_session.start_time, From e9571ee0ab466987da4f9b99ef06412c9a672933 Mon Sep 17 00:00:00 2001 From: Luca Muscat Date: Fri, 27 Dec 2024 21:44:45 +0100 Subject: [PATCH 6/8] Test that multiple threads are actually recorded --- tests/test_plugin.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/test_plugin.py b/tests/test_plugin.py index 35822f5..57dc0f4 100644 --- a/tests/test_plugin.py +++ b/tests/test_plugin.py @@ -1,4 +1,6 @@ +import json from pathlib import Path +from typing import Final import pytest @@ -36,3 +38,40 @@ def test_hello(some_fixture) -> None: result = pytester.runpytest_subprocess(f"--perfetto={temp_perfetto_file_path}") result.assert_outcomes(passed=1) assert temp_perfetto_file_path.exists() + + +def test_given_multiple_threads__then_multiple_distinct_tids_are_reported( + pytester: pytest.Pytester, temp_perfetto_file_path: Path +) -> None: + pytester.makepyfile(""" + import threading + import time + + SLEEP_TIME_S = 0.002 + + def test_hello() -> None: + def foo() -> None: + def bar() -> None: + time.sleep(SLEEP_TIME_S) + thread = threading.Thread(target=bar) + thread.start() + thread.join() + + thread = threading.Thread(target=foo) + thread.start() + thread.join() + """) + pytester.runpytest_subprocess(f"--perfetto={temp_perfetto_file_path}").assert_outcomes(passed=1) + trace_file = json.load(temp_perfetto_file_path.open("r")) + EXPECTED_DISTINCT_TID_COUNT: Final[int] = 3 + + assert ( + len( + { + event["tid"] + for event in trace_file + if event.get("name", "") in ["foo", "bar", "test_hello"] + } + ) + == EXPECTED_DISTINCT_TID_COUNT + ) From d682fa12b8e593c728fccd183bed6092e969be0a Mon Sep 17 00:00:00 2001 From: Luca Muscat Date: Sat, 28 Dec 2024 11:35:55 +0100 Subject: [PATCH 7/8] Do not recalculate if the trace invocation was from `threading.Thread.run` --- perfsephone/plugin.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/perfsephone/plugin.py b/perfsephone/plugin.py index f02afb9..64e9c9b 100644 --- a/perfsephone/plugin.py +++ b/perfsephone/plugin.py @@ -56,11 +56,11 @@ def __call__( ) -> Any: frame.f_trace_lines = False - if ( - event == "call" - and frame.f_code.co_name == "run" - and frame.f_code.co_filename == threading.__file__ - ): + is_frame_from_thread_run: bool = ( + frame.f_code.co_name == "run" and frame.f_code.co_filename == threading.__file__ + ) + + if event == "call" and is_frame_from_thread_run: if getattr(self.thread_local, "run_stack_depth", 0) == 0: profiler = pyinstrument.Profiler(async_mode="disabled") self.thread_local.profiler = profiler @@ -69,11 +69,7 @@ def __call__( self.thread_local.run_stack_depth += 1 return self.__call__ - if ( - event == "return" - and frame.f_code.co_name == "run" - and frame.f_code.co_filename == threading.__file__ - ): + if event == "return" and is_frame_from_thread_run: self.thread_local.run_stack_depth -= 1 if self.thread_local.run_stack_depth == 0: assert hasattr( From f089062e68b7d38c01f79dd7133ecac092e47ec6 Mon Sep 17 00:00:00 2001 From: Luca Muscat Date: Sat, 28 Dec 2024 11:52:28 +0100 Subject: [PATCH 8/8] Document ThreadProfiler.__call__ --- perfsephone/plugin.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/perfsephone/plugin.py b/perfsephone/plugin.py index 64e9c9b..1e048a9 100644 --- a/perfsephone/plugin.py +++ b/perfsephone/plugin.py @@ -54,23 +54,31 @@ def __call__( event: Literal["call", "line", "return", "exception", "opcode"], _args: Any, ) -> Any: + """This method should only be used with `threading.settrace()`.""" frame.f_trace_lines = False is_frame_from_thread_run: bool = ( frame.f_code.co_name == "run" and frame.f_code.co_filename == threading.__file__ ) + # Detect when `Thread.run()` is called if event == "call" and is_frame_from_thread_run: + # If this is the first time `Thread.run()` is being called on this thread, start the + # profiler. if getattr(self.thread_local, "run_stack_depth", 0) == 0: profiler = pyinstrument.Profiler(async_mode="disabled") self.thread_local.profiler = profiler self.thread_local.run_stack_depth = 0 profiler.start() + # Keep track of the number of active calls of `Thread.run()`. self.thread_local.run_stack_depth += 1 return self.__call__ + # Detect when `Threading.run()` returns. if event == "return" and is_frame_from_thread_run: self.thread_local.run_stack_depth -= 1 + # When there are no more active invocations of `Thread.run()`, this implies that the + # target of the thread being profiled has finished executing. if self.thread_local.run_stack_depth == 0: assert hasattr( self.thread_local, "profiler"