Skip to content

Commit fb69b01

Browse files
authored
feat(workflow_engine): Setup the Task for process_workflow_updates (#93553)
## Description - Create a celery task namespace for the workflow engine tasks - Create a task for processing an activity in the workflow_engine -- this will allow us to create a celery task to async execute process workflows when the issue platform creates an activity In the code I mention there being a follow-up PR: #93580 is the initial PR to support the `Activity` / `Group` models.
1 parent 2e20d1c commit fb69b01

File tree

6 files changed

+155
-5
lines changed

6 files changed

+155
-5
lines changed

src/sentry/conf/server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
10131013
Queue("demo_mode", routing_key="demo_mode"),
10141014
Queue("release_registry", routing_key="release_registry"),
10151015
Queue("seer.seer_automation", routing_key="seer.seer_automation"),
1016+
Queue("workflow_engine.process_workflows", routing_key="workflow_engine.process_workflows"),
10161017
]
10171018

10181019
from celery.schedules import crontab

src/sentry/issues/status_change_consumer.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,14 @@ def update_status(group: Group, status_change: StatusChangeMessageData) -> None:
148148
149149
This is used to trigger the `workflow_engine` processing status changes.
150150
"""
151-
latest_activity = Activity.objects.filter(
152-
group_id=group.id, type=activity_type.value
153-
).order_by("-datetime")
154-
for handler in group_status_update_registry.registrations.values():
155-
handler(group, status_change, latest_activity[0])
151+
latest_activity = (
152+
Activity.objects.filter(group_id=group.id, type=activity_type.value)
153+
.order_by("-datetime")
154+
.first()
155+
)
156+
if latest_activity is not None:
157+
for handler in group_status_update_registry.registrations.values():
158+
handler(group, status_change, latest_activity)
156159

157160

158161
def get_group_from_fingerprint(project_id: int, fingerprint: Sequence[str]) -> Group | None:

src/sentry/taskworker/namespaces.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@
131131

132132
uptime_tasks = taskregistry.create_namespace("uptime", app_feature="crons")
133133

134+
workflow_engine_tasks = taskregistry.create_namespace(
135+
"workflow_engine", app_feature="workflow_engine"
136+
)
137+
134138

135139
# Namespaces for testing taskworker tasks
136140
exampletasks = taskregistry.create_namespace(name="examples")

src/sentry/workflow_engine/processors/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
"process_workflows",
55
"process_data_packet",
66
"process_delayed_workflows",
7+
"process_workflows",
78
"DelayedWorkflow",
89
]
910

1011
from .data_source import process_data_sources
1112
from .delayed_workflow import DelayedWorkflow, process_delayed_workflows
1213
from .detector import process_detectors
14+
from .workflow import process_workflows

src/sentry/workflow_engine/tasks.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
from sentry.issues.status_change_consumer import group_status_update_registry
2+
from sentry.issues.status_change_message import StatusChangeMessageData
3+
from sentry.models.activity import Activity
4+
from sentry.models.group import Group
5+
from sentry.silo.base import SiloMode
6+
from sentry.tasks.base import instrumented_task
7+
from sentry.taskworker import config, namespaces, retry
8+
from sentry.types.activity import ActivityType
9+
from sentry.utils import metrics
10+
11+
SUPPORTED_ACTIVITIES = [ActivityType.SET_RESOLVED.value]
12+
13+
14+
@instrumented_task(
15+
name="sentry.workflow_engine.tasks.process_workflow_activity",
16+
queue="workflow_engine.process_workflows",
17+
acks_late=True,
18+
default_retry_delay=5,
19+
max_retries=3,
20+
soft_time_limit=50,
21+
time_limit=60,
22+
silo_mode=SiloMode.REGION,
23+
taskworker_config=config.TaskworkerConfig(
24+
namespace=namespaces.workflow_engine_tasks,
25+
processing_deadline_duration=60,
26+
retry=retry.Retry(
27+
times=3,
28+
delay=5,
29+
),
30+
),
31+
)
32+
def process_workflow_activity(activity_id: int, detector_id: int) -> None:
33+
"""
34+
Process a workflow task identified by the given Activity ID and Detector ID.
35+
36+
The task will get the Activity from the database, create a WorkflowEventData object,
37+
and then process the data in `process_workflows`.
38+
"""
39+
# TODO - @saponifi3d - implement this in a follow-up PR. This update will require WorkflowEventData
40+
# to allow for an activity in the `event` attribute. That refactor is a bit noisy
41+
# and will be done in a subsequent pr.
42+
pass
43+
44+
45+
@group_status_update_registry.register("workflow_status_update")
46+
def workflow_status_update_handler(
47+
group: Group, status_change_message: StatusChangeMessageData, activity: Activity
48+
) -> None:
49+
"""
50+
Hook the process_workflow_task into the activity creation registry.
51+
52+
Since this handler is called in process for the activity, we want
53+
to queue a task to process workflows asynchronously.
54+
"""
55+
if activity.type not in SUPPORTED_ACTIVITIES:
56+
# If the activity type is not supported, we do not need to process it.
57+
return
58+
59+
detector_id = status_change_message.get("detector_id")
60+
61+
if detector_id is None:
62+
# We should not hit this case, it's should only occur if there is a bug
63+
# passing it from the workflow_engine to the issue platform.
64+
metrics.incr("workflow_engine.error.tasks.no_detector_id")
65+
return
66+
67+
# TODO - implement in follow-up PR for now, just track a metric that we are seeing the activities.
68+
# process_workflow_task.delay(activity.id, detector_id)
69+
metrics.incr(
70+
"workflow_engine.process_workflow.activity_update", tags={"activity_type": activity.type}
71+
)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
from unittest import mock
2+
3+
from sentry.issues.status_change_consumer import update_status
4+
from sentry.issues.status_change_message import StatusChangeMessageData
5+
from sentry.models.activity import Activity
6+
from sentry.models.group import GroupStatus
7+
from sentry.testutils.cases import TestCase
8+
from sentry.types.activity import ActivityType
9+
from sentry.types.group import GroupSubStatus
10+
from sentry.workflow_engine.tasks import workflow_status_update_handler
11+
12+
13+
class IssuePlatformIntegrationTests(TestCase):
14+
def test_handler_invoked__when_resolved(self):
15+
"""
16+
Integration test to ensure the `update_status` method
17+
will correctly invoke the `workflow_state_update_handler`
18+
and increment the metric.
19+
"""
20+
detector = self.create_detector()
21+
group = self.create_group(
22+
project=self.project,
23+
status=GroupStatus.UNRESOLVED,
24+
substatus=GroupSubStatus.ESCALATING,
25+
)
26+
27+
message = StatusChangeMessageData(
28+
id="test_message_id",
29+
project_id=self.project.id,
30+
new_status=GroupStatus.RESOLVED,
31+
new_substatus=None,
32+
fingerprint=["test_fingerprint"],
33+
detector_id=detector.id,
34+
)
35+
36+
with mock.patch("sentry.workflow_engine.tasks.metrics.incr") as mock_incr:
37+
update_status(group, message)
38+
mock_incr.assert_called_with(
39+
"workflow_engine.process_workflow.activity_update",
40+
tags={"activity_type": ActivityType.SET_RESOLVED.value},
41+
)
42+
43+
44+
class WorkflowStatusUpdateHandlerTests(TestCase):
45+
def test__no_detector_id(self):
46+
"""
47+
Test that the workflow_status_update_handler does not crash
48+
when no detector_id is provided in the status change message.
49+
"""
50+
group = self.create_group(project=self.project)
51+
activity = Activity(
52+
project=self.project,
53+
group=group,
54+
type=ActivityType.SET_RESOLVED.value,
55+
data={"fingerprint": ["test_fingerprint"]},
56+
)
57+
58+
message = StatusChangeMessageData(
59+
id="test_message_id",
60+
project_id=self.project.id,
61+
new_status=GroupStatus.RESOLVED,
62+
new_substatus=None,
63+
fingerprint=["test_fingerprint"],
64+
detector_id=None, # No detector_id provided
65+
)
66+
67+
with mock.patch("sentry.workflow_engine.tasks.metrics.incr") as mock_incr:
68+
workflow_status_update_handler(group, message, activity)
69+
mock_incr.assert_called_with("workflow_engine.error.tasks.no_detector_id")

0 commit comments

Comments
 (0)