Skip to content

Commit f4bc2e0

Browse files
committed
Emit a span for workflow activation handling
1 parent e59b7ed commit f4bc2e0

File tree

1 file changed

+18
-0
lines changed

1 file changed

+18
-0
lines changed

temporalio/worker/_workflow_instance.py

+18
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
cast,
4343
)
4444

45+
from google.protobuf.json_format import MessageToJson
46+
from opentelemetry import trace
4547
from typing_extensions import Self, TypeAlias, TypedDict
4648

4749
import temporalio.activity
@@ -75,6 +77,7 @@
7577
)
7678

7779
logger = logging.getLogger(__name__)
80+
tracer = trace.get_tracer(__name__)
7881

7982
# Set to true to log all cases where we're ignoring things during delete
8083
LOG_IGNORE_DURING_DELETE = False
@@ -325,6 +328,21 @@ def get_thread_id(self) -> Optional[int]:
325328

326329
def activate(
327330
self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation
331+
) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion:
332+
with tracer.start_as_current_span("HandleWorkflowActivation") as span:
333+
span.set_attribute("rpc.method", "WorkflowActivation")
334+
span.set_attribute("rpc.request.type", "WorkflowActivation")
335+
span.set_attribute("rpc.request.payload", MessageToJson(act))
336+
span.set_attribute("temporalWorkflowID", self._info.workflow_id)
337+
span.set_attribute("temporal.worker", True)
338+
completion = self._activate(act)
339+
span.set_attribute("rpc.response.type", "WorkflowActivationCompletion")
340+
span.set_attribute("rpc.response.payload", MessageToJson(completion))
341+
trace.get_tracer_provider().force_flush() # type: ignore
342+
return completion
343+
344+
def _activate(
345+
self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation
328346
) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion:
329347
# Reset current completion, time, and whether replaying
330348
self._current_completion = (

0 commit comments

Comments
 (0)