Skip to content

feat(workflow_engine): Setup the Task for process_workflow_updates #93553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

saponifi3d
Copy link
Contributor

@saponifi3d saponifi3d commented Jun 13, 2025

Description

  • Create a celery task namespace for the workflow engine tasks
  • Create a task for processing an activity in the workflow_engine -- this will allow us to create a celery task to async execute process workflows when the issue platform creates an activity

In the code I mention there being a follow-up PR: #93580 is the initial PR to support the Activity / Group models.

@github-actions github-actions bot added the Scope: Backend Automatically applied to PRs that change backend components label Jun 13, 2025
Copy link

codecov bot commented Jun 13, 2025

❌ 3 Tests Failed:

Tests completed Failed Passed Skipped
26548 3 26545 222
View the top 3 failed test(s) by shortest run time
tests.sentry.tasks.test_queues_registered.CeleryQueueRegisteredTest::test
Stack Traces | 0.106s run time
#x1B[1m#x1B[.../sentry/tasks/test_queues_registered.py#x1B[0m:18: in test
    assert not missing_queue_tasks, (
#x1B[1m#x1B[31mE   AssertionError: Found tasks with queues that are undefined. These must be defined in settings.CELERY_QUEUES.#x1B[0m
#x1B[1m#x1B[31mE     Task Info:#x1B[0m
#x1B[1m#x1B[31mE      - Task: sentry.workflow_engine.processors.process_workflow_task, Queue: process_workflows.#x1B[0m
#x1B[1m#x1B[31mE   assert not [' - Task: sentry.workflow_engine.processors.process_workflow_task, Queue: process_workflows']#x1B[0m
tests.sentry.tasks.test_celery_queues.CeleryQueueRegisteredTest::test
Stack Traces | 0.14s run time
#x1B[1m#x1B[.../sentry/tasks/test_celery_queues.py#x1B[0m:19: in test
    assert not missing_queue_tasks, (
#x1B[1m#x1B[31mE   AssertionError: Found tasks with queues that are undefined. These must be defined in settings.CELERY_QUEUES.#x1B[0m
#x1B[1m#x1B[31mE     Task Info:#x1B[0m
#x1B[1m#x1B[31mE      - Task: sentry.workflow_engine.processors.process_workflow_task, Queue: process_workflows.#x1B[0m
#x1B[1m#x1B[31mE   assert not [' - Task: sentry.workflow_engine.processors.process_workflow_task, Queue: process_workflows']#x1B[0m
tests.sentry.workflow_engine.test_tasks.IssuePlatformIntegrationTests::test_handler_invoked__when_resolved
Stack Traces | 2.51s run time
#x1B[1m#x1B[.../sentry/workflow_engine/test_tasks.py#x1B[0m:37: in test_handler_invoked__when_resolved
    mock_incr.assert_called_with(
#x1B[1m#x1B[.../hostedtoolcache/Python/3.13.1.../x64/lib/python3.13/unittest/mock.py#x1B[0m:977: in assert_called_with
    raise AssertionError(_error_message()) from cause
#x1B[1m#x1B[31mE   AssertionError: expected call not found.#x1B[0m
#x1B[1m#x1B[31mE   Expected: incr('workflow_engine.process_workflow.activity_update', tags={'activity_type': 1})#x1B[0m
#x1B[1m#x1B[31mE     Actual: incr('jobs.all.dispatched', skip_internal=False)#x1B[0m

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@saponifi3d saponifi3d force-pushed the jcallender/aci/workflow-status-update-handler branch from 5001848 to 8b6fffc Compare June 13, 2025 21:52
saponifi3d added a commit that referenced this pull request Jun 13, 2025
## Description
Starting implementation for:
https://www.notion.so/sentry/Workflow-Status-Changes-1fa8b10e4b5d80a48acddb95d160da1f?source=copy_link#1fa8b10e4b5d80e6bb1aef39cab2c6dc

This will add a registry of handlers when an activity is created, we'll
use this in the [workflow engine to kick-off a
task](#93553) to execute
`process_workflows`.

This also adds a way to pass the detector_id through the
StatusMessageData all the way to a hook that we can invoke and read
directly from the StatusMessageData later. We are also passed the
`activity` so `workflow_engine` can trigger `.send_notification` (if
evaluated to do so).
Base automatically changed from jcalelnder/aci/activity-registry to master June 13, 2025 22:45
@saponifi3d saponifi3d force-pushed the jcallender/aci/workflow-status-update-handler branch from 8814284 to 62fc6a1 Compare June 13, 2025 22:46
@saponifi3d saponifi3d marked this pull request as ready for review June 13, 2025 22:47
@saponifi3d saponifi3d requested review from a team as code owners June 13, 2025 22:47
Comment on lines +18 to +27
max_retries=5,
soft_time_limit=50,
time_limit=60,
silo_mode=SiloMode.REGION,
taskworker_config=config.TaskworkerConfig(
namespace=namespaces.workflow_engine_tasks,
processing_deadline_duration=60,
retry=retry.Retry(
times=3,
delay=5,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyone have any thoughts on what these values for retry / limits should be? just using what we have been.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't any right/wrong answer here. However, if you are going to define Retry you should also define on which exceptions that retrys should be performed. Without that retries will only be automatically performed for TimeoutError or an explicit RetryError.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 thanks for the additional info -- seems like a number we can fudge around with in the future then, and we can use the same configuration as we are with issue alert processing (that's where i pulled these initial numbers from)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when would we not want to go with acks_late=True in cases where we'd rather double process than drop one?

Comment on lines +18 to +27
max_retries=5,
soft_time_limit=50,
time_limit=60,
silo_mode=SiloMode.REGION,
taskworker_config=config.TaskworkerConfig(
namespace=namespaces.workflow_engine_tasks,
processing_deadline_duration=60,
retry=retry.Retry(
times=3,
delay=5,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't any right/wrong answer here. However, if you are going to define Retry you should also define on which exceptions that retrys should be performed. Without that retries will only be automatically performed for TimeoutError or an explicit RetryError.


The task will get the Activity from the database, create a WorkflowEventData object,
and then process the data in `process_workflows`.
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any ideas on what kind of throughput tasks in the workflow_tasks namespace will have?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm - not sure about throughput, i think that it will vary pretty heavily depending on configurations / products. for example, a metric issue with anomaly detection will be more computing time than evaluating if an event is new.

In the end, the throughput would be similar to issue alerts / metric alerts / crons -- all three of those product verticals will be executing here, but they're all fairly different rates.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Scope: Backend Automatically applied to PRs that change backend components
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants