From d80b5ee61d71cd3704557923b500c8ab58ee343c Mon Sep 17 00:00:00 2001 From: Divyank Jain Date: Tue, 27 Aug 2024 16:15:49 -0400 Subject: [PATCH] code additions --- temporalio/worker/_workflow.py | 100 +++++++++++++++++- temporalio/worker/_workflow_instance.py | 18 ++++ temporalio/worker/workflow_sandbox/_runner.py | 10 +- tests/worker/test_workflow.py | 36 ++++++- 4 files changed, 157 insertions(+), 7 deletions(-) diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 9d9754193..079c77527 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -6,8 +6,22 @@ import concurrent.futures import logging import os +import sys from datetime import timezone -from typing import Callable, Dict, List, MutableMapping, Optional, Sequence, Set, Type +from threading import get_ident +from types import TracebackType +from typing import ( + Callable, + Dict, + List, + Literal, + MutableMapping, + Optional, + Sequence, + Set, + Tuple, + Type, +) import temporalio.activity import temporalio.api.common.v1 @@ -250,9 +264,10 @@ async def _handle_activation( activate_task, self._deadlock_timeout_seconds ) except asyncio.TimeoutError: - raise RuntimeError( - f"[TMPRL1101] Potential deadlock detected, workflow didn't yield within {self._deadlock_timeout_seconds} second(s)" - ) + raise _DeadlockError.from_deadlocked_workflow( + workflow, self._deadlock_timeout_seconds + ) from None + except Exception as err: # We cannot fail a cache eviction, we must just log and not complete # the activation (failed or otherwise). 
This should only happen in @@ -268,6 +283,9 @@ async def _handle_activation( self._could_not_evict_count += 1 return + if isinstance(err, _DeadlockError): + err.swap_traceback() + logger.exception( "Failed handling activation on workflow with run ID %s", act.run_id ) @@ -421,3 +439,77 @@ def nondeterminism_as_workflow_fail_for_types(self) -> Set[str]: for typ in v.failure_exception_types ) ) + + +class _DeadlockError(Exception): + """Exception class for deadlocks. Contains functionality to swap the default traceback for another.""" + + def __init__(self, message: str, replacement_tb: Optional[TracebackType] = None): + """Create a new _DeadlockError, with message `message` and optionally a traceback `replacement_tb` to be swapped in later. + + Args: + message: Message to be presented through exception. + replacement_tb: Optional TracebackType to be swapped later. + """ + super().__init__(message) + self._new_tb = replacement_tb + + def swap_traceback(self) -> None: + """Swap the current traceback for the replacement passed during construction. Used to work around Python adding the current frame to the stack trace. + + Returns: + None + """ + if self._new_tb: + self.__traceback__ = self._new_tb + self._new_tb = None + + @classmethod + def from_deadlocked_workflow( + cls, workflow: WorkflowInstance, timeout: Optional[int] + ): + msg = f"[TMPRL1101] Potential deadlock detected: workflow didn't yield within {timeout} second(s)." + tid = workflow.get_thread_id() + if not tid: + return cls(msg) + + try: + tb = cls._gen_tb_helper(tid) + if tb: + return cls(msg, tb) + return cls(f"{msg} (no frames available)") + except Exception as err: + return cls(f"{msg} (failed getting frames: {err})") + + @staticmethod + def _gen_tb_helper( + tid: int, + ) -> Optional[TracebackType]: + """Take a thread id and construct a stack trace. + + Returns: + the traceback that was constructed, None if the thread could not be found. 
+ """ + frame = sys._current_frames().get(tid) + if not frame: + return None + + # not using traceback.extract_stack() because it obfuscates the frame objects (specifically f_lasti) + thread_frames = [frame] + while frame.f_back: + frame = frame.f_back + thread_frames.append(frame) + + thread_frames.reverse() + + size = 0 + tb = None + for frm in thread_frames: + tb = TracebackType(tb, frm, frm.f_lasti, frm.f_lineno) + size += sys.getsizeof(tb) + + while size > 200000 and tb: + size -= sys.getsizeof(tb) + tb = tb.tb_next + + return tb diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index a5e5b63ed..de54849c6 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -10,6 +10,7 @@ import logging import random import sys +import threading import traceback import warnings from abc import ABC, abstractmethod @@ -158,6 +159,16 @@ def activate( """ raise NotImplementedError + def get_thread_id(self) -> Optional[int]: + """Return the thread identifier that this workflow is running on. + + Not an abstractmethod because it is not mandatory to implement. Used primarily for getting the frames of a deadlocked thread. + + Returns: + Thread ID if the workflow is running, None if not. + """ + return None + class UnsandboxedWorkflowRunner(WorkflowRunner): """Workflow runner that does not do any sandboxing.""" @@ -300,6 +311,12 @@ def __init__(self, det: WorkflowInstanceDetails) -> None: # We only create the metric meter lazily self._metric_meter: Optional[_ReplaySafeMetricMeter] = None + # For tracking the thread this workflow is running on (primarily for deadlock situations) + self._current_thread_id: Optional[int] = None + + def get_thread_id(self) -> Optional[int]: + return self._current_thread_id + #### Activation functions #### # These are in alphabetical order and besides "activate", all other calls # are "_apply_" + the job field name. 
@@ -320,6 +337,7 @@ def activate( self._time_ns = act.timestamp.ToNanoseconds() self._is_replaying = act.is_replaying + self._current_thread_id = threading.get_ident() activation_err: Optional[Exception] = None try: # Split into job sets with patches, then signals + updates, then diff --git a/temporalio/worker/workflow_sandbox/_runner.py b/temporalio/worker/workflow_sandbox/_runner.py index 87dbab2cd..7dc82d83c 100644 --- a/temporalio/worker/workflow_sandbox/_runner.py +++ b/temporalio/worker/workflow_sandbox/_runner.py @@ -6,8 +6,9 @@ from __future__ import annotations +import threading from datetime import datetime, timedelta, timezone -from typing import Any, Sequence, Type +from typing import Any, Optional, Sequence, Type import temporalio.bridge.proto.workflow_activation import temporalio.bridge.proto.workflow_completion @@ -112,6 +113,8 @@ def __init__( self.runner_class = runner_class self.importer = Importer(restrictions, RestrictionContext()) + self._current_thread_id: Optional[int] = None + # Create the instance self.globals_and_locals = { "__file__": "workflow_sandbox.py", @@ -169,8 +172,13 @@ def _run_code(self, code: str, **extra_globals: Any) -> None: self.globals_and_locals[k] = v try: temporalio.workflow.unsafe._set_in_sandbox(True) + self._current_thread_id = threading.get_ident() exec(code, self.globals_and_locals, self.globals_and_locals) finally: temporalio.workflow.unsafe._set_in_sandbox(False) + self._current_thread_id = None for k, v in extra_globals.items(): self.globals_and_locals.pop(k, None) + + def get_thread_id(self) -> Optional[int]: + return self._current_thread_id diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index d922bafa5..9f007a220 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -2108,6 +2108,38 @@ async def status() -> str: async def test_workflow_enhanced_stack_trace(client: Client): + """Expected format of __enhanced_stack_trace: + + EnhancedStackTrace : { + + sdk 
(StackTraceSDKInfo) : { + name: string, + version: string + }, + + sources (map) : { + filename: (StackTraceFileSlice) { + line_offset: int, + content: string + }, + ... + }, + + stacks (StackTrace[]) : [ + (StackTraceFileLocation) { + file_path: string, + line: int, + column: int, + function_name: string, + internal_code: bool + }, + ... + ] + } + + More details available in API repository: temporal/api/sdk/v1/enhanced_stack_trace.proto + """ + async with new_worker( client, StackTraceWorkflow, LongSleepWorkflow, activities=[wait_cancel] ) as worker: @@ -2570,7 +2602,7 @@ async def last_history_task_failure() -> str: try: await assert_eq_eventually( - "[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)", + "[TMPRL1101] Potential deadlock detected: workflow didn't yield within 1 second(s).", last_history_task_failure, timeout=timedelta(seconds=5), interval=timedelta(seconds=1), @@ -2627,7 +2659,7 @@ async def last_history_task_failure() -> str: return "" await assert_eq_eventually( - "[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)", + "[TMPRL1101] Potential deadlock detected: workflow didn't yield within 1 second(s).", last_history_task_failure, timeout=timedelta(seconds=5), interval=timedelta(seconds=1),