Commit dac2f51

kcons authored and billyvg committed
fix(aci): Don't propagate traces to buffer processing tasks (#93521)
We don't have any real need for parent or peer traces in these tasks, and not propagating should leave us with dramatically smaller traces that will load much more quickly and be easier to read. Note that this affects delayed_workflow and delayed_processing; delayed_workflow is the one we've had more issues loading, but I don't see it as having significant downside for either so I didn't make it conditional by task.
1 parent 4e613b1 · commit dac2f51

4 files changed: +26 −15 lines
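
For context on the mechanism: the Sentry SDK's Celery integration recognizes a `sentry-propagate-traces` header passed through `apply_async`, and setting it to `False` makes the spawned task start its own trace rather than attaching to the caller's. A minimal sketch of the before/after dispatch pattern (the task name and arguments are illustrative, not from this repo):

    from celery import shared_task


    @shared_task
    def example_task(project_id: int, batch_key: str | None = None) -> None:
        """Hypothetical stand-in for apply_delayed / process_delayed_workflows."""


    # Before: .delay() lets the Sentry Celery integration forward trace
    # headers, so every spawned task is stitched into the caller's trace.
    example_task.delay(123)

    # After: the Sentry-recognized header opts this dispatch out of trace
    # propagation, so the task records a fresh, self-contained trace.
    example_task.apply_async(
        kwargs={"project_id": 123},
        headers={"sentry-propagate-traces": False},
    )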

src/sentry/rules/processing/buffer_processing.py

Lines changed: 7 additions & 2 deletions
@@ -107,7 +107,9 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
     metrics.distribution(f"{processing_type}.event_count", event_count)
 
     if event_count < batch_size:
-        return task.delay(project_id)
+        return task.apply_async(
+            kwargs={"project_id": project_id}, headers={"sentry-propagate-traces": False}
+        )
 
     if should_emit_logs:
         logger.info(
@@ -133,7 +135,10 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
     # remove the batched items from the project alertgroup_to_event_data
     buffer.backend.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
 
-    task.delay(project_id, batch_key)
+    task.apply_async(
+        kwargs={"project_id": project_id, "batch_key": batch_key},
+        headers={"sentry-propagate-traces": False},
+    )
 
 
 def process_buffer() -> None:

tests/sentry/rules/processing/test_buffer_processing.py

Lines changed: 11 additions & 7 deletions
@@ -253,22 +253,26 @@ def setUp(self):
         self.group_three = self.create_group(self.project)
         self.rule = self.create_alert_rule()
 
-    @patch("sentry.rules.processing.delayed_processing.apply_delayed.delay")
+    @patch("sentry.rules.processing.delayed_processing.apply_delayed.apply_async")
     def test_no_redis_data(self, mock_apply_delayed):
         process_in_batches(self.project.id, "delayed_processing")
-        mock_apply_delayed.assert_called_once_with(self.project.id)
+        mock_apply_delayed.assert_called_once_with(
+            kwargs={"project_id": self.project.id}, headers={"sentry-propagate-traces": False}
+        )
 
-    @patch("sentry.rules.processing.delayed_processing.apply_delayed.delay")
+    @patch("sentry.rules.processing.delayed_processing.apply_delayed.apply_async")
     def test_basic(self, mock_apply_delayed):
         self.push_to_hash(self.project.id, self.rule.id, self.group.id)
         self.push_to_hash(self.project.id, self.rule.id, self.group_two.id)
         self.push_to_hash(self.project.id, self.rule.id, self.group_three.id)
 
         process_in_batches(self.project.id, "delayed_processing")
-        mock_apply_delayed.assert_called_once_with(self.project.id)
+        mock_apply_delayed.assert_called_once_with(
+            kwargs={"project_id": self.project.id}, headers={"sentry-propagate-traces": False}
+        )
 
     @override_options({"delayed_processing.batch_size": 2})
-    @patch("sentry.rules.processing.delayed_processing.apply_delayed.delay")
+    @patch("sentry.rules.processing.delayed_processing.apply_delayed.apply_async")
     def test_batch(self, mock_apply_delayed):
         self.push_to_hash(self.project.id, self.rule.id, self.group.id)
         self.push_to_hash(self.project.id, self.rule.id, self.group_two.id)
@@ -278,11 +282,11 @@ def test_batch(self, mock_apply_delayed):
         assert mock_apply_delayed.call_count == 2
 
         # Validate the batches are created correctly
-        batch_one_key = mock_apply_delayed.call_args_list[0][0][1]
+        batch_one_key = mock_apply_delayed.call_args_list[0][1]["kwargs"]["batch_key"]
         batch_one = buffer.backend.get_hash(
             model=Project, field={"project_id": self.project.id, "batch_key": batch_one_key}
         )
-        batch_two_key = mock_apply_delayed.call_args_list[1][0][1]
+        batch_two_key = mock_apply_delayed.call_args_list[1][1]["kwargs"]["batch_key"]
         batch_two = buffer.backend.get_hash(
             model=Project, field={"project_id": self.project.id, "batch_key": batch_two_key}
         )
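
A note on why the assertion indexing changed: `unittest.mock` records each call as an `(args, kwargs)` pair, so `call_args_list[i][1]` is the keyword-argument dict of the i-th call. With `apply_async(kwargs=..., headers=...)` everything is passed by keyword, so the batch key now lives under the `kwargs` keyword argument rather than at positional index `[0][1]`. A self-contained sketch (the mock name and values are illustrative):

    from unittest.mock import MagicMock

    mock_apply_async = MagicMock()
    mock_apply_async(
        kwargs={"project_id": 123, "batch_key": "abc123"},
        headers={"sentry-propagate-traces": False},
    )

    args, kwargs = mock_apply_async.call_args_list[0]  # each call is an (args, kwargs) pair
    assert args == ()                                  # nothing is positional anymore
    assert kwargs["kwargs"]["batch_key"] == "abc123"   # hence [i][1]["kwargs"]["batch_key"]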

tests/sentry/rules/processing/test_delayed_processing.py

Lines changed: 3 additions & 3 deletions
@@ -1407,7 +1407,7 @@ def test_cleanup_redis(self):
         assert rule_group_data == {}
 
     @override_options({"delayed_processing.batch_size": 2})
-    @patch("sentry.rules.processing.delayed_processing.apply_delayed.delay")
+    @patch("sentry.rules.processing.delayed_processing.apply_delayed.apply_async")
     def test_batched_cleanup(self, mock_apply_delayed):
         group_two = self.create_group(self.project)
         group_three = self.create_group(self.project)
@@ -1422,8 +1422,8 @@ def test_batched_cleanup(self, mock_apply_delayed):
         rules_to_groups[self.rule.id].add(group_three.id)
 
         process_in_batches(self.project.id, "delayed_processing")
-        batch_one_key = mock_apply_delayed.call_args_list[0][0][1]
-        batch_two_key = mock_apply_delayed.call_args_list[1][0][1]
+        batch_one_key = mock_apply_delayed.call_args_list[0][1]["kwargs"]["batch_key"]
+        batch_two_key = mock_apply_delayed.call_args_list[1][1]["kwargs"]["batch_key"]
 
         # Verify process_rulegroups_in_batches removed the data from the buffer
         rule_group_data = buffer.backend.get_hash(Project, {"project_id": self.project.id})

tests/sentry/workflow_engine/processors/test_delayed_workflow.py

Lines changed: 5 additions & 3 deletions
@@ -881,14 +881,16 @@ def test_cleanup_redis(self):
         assert data == {}
 
     @override_options({"delayed_processing.batch_size": 2})
-    @patch("sentry.workflow_engine.processors.delayed_workflow.process_delayed_workflows.delay")
+    @patch(
+        "sentry.workflow_engine.processors.delayed_workflow.process_delayed_workflows.apply_async"
+    )
     def test_batched_cleanup(self, mock_process_delayed):
         self._push_base_events()
         all_data = buffer.backend.get_hash(Workflow, {"project_id": self.project.id})
 
         process_in_batches(self.project.id, "delayed_workflow")
-        batch_one_key = mock_process_delayed.call_args_list[0][0][1]
-        batch_two_key = mock_process_delayed.call_args_list[1][0][1]
+        batch_one_key = mock_process_delayed.call_args_list[0][1]["kwargs"]["batch_key"]
+        batch_two_key = mock_process_delayed.call_args_list[1][1]["kwargs"]["batch_key"]
 
         # Verify we removed the data from the buffer
         data = buffer.backend.get_hash(Workflow, {"project_id": self.project.id})
