Skip to content

Commit a92944f

Browse files
committed
- Create a task namespace for workflow engine
- Create a task for handling activity updates
1 parent 030acf2 commit a92944f

File tree

3 files changed

+54
-0
lines changed

3 files changed

+54
-0
lines changed

src/sentry/taskworker/namespaces.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@
131131

132132
uptime_tasks = taskregistry.create_namespace("uptime", app_feature="crons")
133133

134+
workflow_engine_tasks = taskregistry.create_namespace(
135+
"workflow_engine", app_feature="workflow_engine"
136+
)
137+
134138

135139
# Namespaces for testing taskworker tasks
136140
exampletasks = taskregistry.create_namespace(name="examples")

src/sentry/workflow_engine/processors/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
"process_workflows",
55
"process_data_packet",
66
"process_delayed_workflows",
7+
"process_workflows",
78
"DelayedWorkflow",
89
]
910

1011
from .data_source import process_data_sources
1112
from .delayed_workflow import DelayedWorkflow, process_delayed_workflows
1213
from .detector import process_detectors
14+
from .workflow import process_workflows

src/sentry/workflow_engine/tasks.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from sentry.models.activity import Activity, activity_creation_registry
2+
from sentry.silo.base import SiloMode
3+
from sentry.tasks.base import instrumented_task
4+
from sentry.taskworker import config, namespaces, retry
5+
6+
7+
@instrumented_task(
8+
name="sentry.workflow_engine.processors.process_workflow_task",
9+
queue="process_workflows",
10+
default_retry_delay=5,
11+
max_retries=5,
12+
soft_time_limit=50,
13+
time_limit=60,
14+
silo_mode=SiloMode.REGION,
15+
taskworker_config=config.TaskworkerConfig(
16+
namespace=namespaces.workflow_engine_tasks,
17+
processing_deadline_duration=60,
18+
retry=retry.Retry(
19+
times=3,
20+
delay=5,
21+
),
22+
),
23+
)
24+
def process_workflow_task(activity_id: int) -> None:
25+
"""
26+
Process a workflow task identified by the given activity ID.
27+
28+
This task will retry up to 3 times with a delay of 5 seconds between attempts.
29+
It has a soft time limit of 50 seconds and a hard time limit of 60 seconds.
30+
31+
The task will get the Activity from the database, create a WorkflowEventData object,
32+
and then process the data in `process_workflows`.
33+
"""
34+
# TODO implement this in a follow-up PR. This update will require a lot of updates...
35+
pass
36+
37+
38+
@activity_creation_registry.register("workflow_status_update")
39+
def workflow_status_update_handler(activity: Activity) -> None:
40+
"""
41+
Hook the process_workflow_task into the activity creation registry.
42+
43+
Since this handler is called in process for the activity, we want
44+
to queue a task to process workflows asynchronously.
45+
"""
46+
# TODO implement this in a follow-up PR.
47+
# process_workflow_task.delay(activity.id)
48+
pass

0 commit comments

Comments
 (0)