1
- from sentry .models .activity import Activity , activity_creation_registry
1
+ from typing import TYPE_CHECKING
2
+
3
+ from sentry .issues .status_change_consumer import group_status_update_registry
2
4
from sentry .silo .base import SiloMode
3
5
from sentry .tasks .base import instrumented_task
4
6
from sentry .taskworker import config , namespaces , retry
5
7
from sentry .utils import metrics
6
8
9
+ if TYPE_CHECKING :
10
+ from sentry .issues .status_change_message import StatusChangeMessageData
11
+ from sentry .models .activity import Activity
12
+ from sentry .models .group import Group
13
+ from sentry .workflow_engine .models import Detector
14
+
7
15
8
16
@instrumented_task (
9
17
name = "sentry.workflow_engine.processors.process_workflow_task" ,
22
30
),
23
31
),
24
32
)
25
- def process_workflow_task (activity_id : int ) -> None :
33
+ def process_workflow_task (activity_id : Activity . id , detector_id : Detector . id ) -> None :
26
34
"""
27
- Process a workflow task identified by the given activity ID.
35
+ Process a workflow task identified by the given Activity ID and Detector ID.
28
36
29
37
This task will retry up to 3 times with a delay of 5 seconds between attempts.
30
38
It has a soft time limit of 50 seconds and a hard time limit of 60 seconds.
@@ -38,14 +46,18 @@ def process_workflow_task(activity_id: int) -> None:
38
46
pass
39
47
40
48
41
- @activity_creation_registry .register ("workflow_status_update" )
42
- def workflow_status_update_handler (activity : Activity , detector_id : int | None ) -> None :
49
+ @group_status_update_registry .register ("workflow_status_update" )
50
+ def workflow_status_update_handler (
51
+ group : Group , status_change_message : StatusChangeMessageData , activity : Activity
52
+ ) -> None :
43
53
"""
44
54
Hook the process_workflow_task into the activity creation registry.
45
55
46
56
Since this handler is called in process for the activity, we want
47
57
to queue a task to process workflows asynchronously.
48
58
"""
59
+ detector_id = status_change_message .get ("detector_id" )
60
+
49
61
if detector_id is None :
50
62
# We should not hit this case, it's should only occur if there is a bug
51
63
# passing it from the workflow_engine to the issue platform.
@@ -56,4 +68,5 @@ def workflow_status_update_handler(activity: Activity, detector_id: int | None)
56
68
metrics .incr (
57
69
"workflow_engine.process_workflow.activity_update" , tags = {"activity_type" : activity .type }
58
70
)
59
- # process_workflow_task.delay(activity.id)
71
+ # TODO - should this also set the group id so we can set it on WorkflowEventData at the top level? :thinking:
72
+ # process_workflow_task.delay(activity.id, detector_id)
0 commit comments