Skip to content

Commit dd10420

Browse files
Tweak statsd and arroyo metrics config (#297)
* Tweak statsd and arroyo metrics config
* add extra tag
* from the env var
* clean up the extra namespacing
* unit tests
1 parent 828bc39 commit dd10420

File tree

7 files changed

+51
-40
lines changed

7 files changed

+51
-40
lines changed

src/launchpad/kafka.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ def create_kafka_consumer(
4747
os.environ["KAFKA_HEALTHCHECK_FILE"] = healthcheck_path
4848
logger.info(f"Using healthcheck file: {healthcheck_path}")
4949

50-
configure_metrics(DatadogMetricsBackend())
5150
config = get_kafka_config()
51+
configure_metrics(DatadogMetricsBackend(config.group_id))
5252

5353
environment = os.getenv("LAUNCHPAD_ENV")
5454
if not environment:

src/launchpad/service.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,14 @@ def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None:
136136
logger.info(f"Processing artifact: {artifact_id} (project: {project_id}, org: {organization_id})")
137137

138138
if self._statsd:
139-
self._statsd.increment("launchpad.artifact.processing.started")
139+
self._statsd.increment("artifact.processing.started")
140140

141141
self.process_artifact(artifact_id, project_id, organization_id)
142142

143143
logger.info(f"Analysis completed for artifact {artifact_id}")
144144

145145
if self._statsd:
146-
self._statsd.increment("launchpad.artifact.processing.completed")
146+
self._statsd.increment("artifact.processing.completed")
147147

148148
except Exception as e:
149149
# Log the full error for debugging
@@ -153,7 +153,7 @@ def handle_kafka_message(self, payload: PreprodArtifactEvents) -> None:
153153
)
154154

155155
if self._statsd:
156-
self._statsd.increment("launchpad.artifact.processing.failed")
156+
self._statsd.increment("artifact.processing.failed")
157157

158158
def process_artifact(self, artifact_id: str, project_id: str, organization_id: str) -> None:
159159
"""
@@ -346,7 +346,7 @@ def _update_artifact_error(
346346
# Log error to datadog with tags for better monitoring
347347
if self._statsd:
348348
self._statsd.increment(
349-
"launchpad.artifact.processing.error",
349+
"artifact.processing.error",
350350
tags=[
351351
f"error_code:{error_code.value}",
352352
f"error_type:{error_message.name}",

src/launchpad/utils/arroyo_metrics.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from typing import Optional, Union
66

77
from arroyo.utils.metrics import MetricName, Metrics, Tags
8-
from datadog.dogstatsd.base import DogStatsd
98

109
from launchpad.utils.statsd import get_statsd
1110

@@ -17,8 +16,9 @@ class DatadogMetricsBackend(Metrics):
1716
This bridges Arroyo's metrics interface with DataDog StatsD.
1817
"""
1918

20-
def __init__(self, statsd: Optional[DogStatsd] = None) -> None:
21-
self._statsd = statsd or get_statsd()
19+
def __init__(self, group_id: str) -> None:
20+
self._statsd = get_statsd("consumer")
21+
self._constant_tags = {"consumer_group": group_id}
2222

2323
def increment(
2424
self,
@@ -46,7 +46,11 @@ def timing(self, name: MetricName, value: Union[int, float], tags: Optional[Tags
4646
self._statsd.timing(name, timing_value, tags=self._format_tags(tags))
4747

4848
def _format_tags(self, tags: Optional[Tags]) -> Optional[list[str]]:
49-
"""Convert Arroyo tags format to DataDog tags format."""
50-
if not tags:
49+
"""Convert Arroyo tags format to DataDog tags format, merging with constant tags."""
50+
merged_tags = self._constant_tags.copy()
51+
if tags:
52+
merged_tags.update(tags)
53+
54+
if not merged_tags:
5155
return None
52-
return [f"{key}:{value}" for key, value in tags.items()]
56+
return [f"{key}:{value}" for key, value in merged_tags.items()]

src/launchpad/utils/statsd.py

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import os
22

3+
from typing import Literal
4+
35
from datadog.dogstatsd.base import DogStatsd
46

57
# There are a few weird issues with DataDog documented in other Sentry repos.
@@ -12,29 +14,34 @@
1214
# - not using the global initialize() and statsd instances.
1315

1416

15-
_statsd: DogStatsd | None = None
17+
_statsd_instances: dict[str, DogStatsd] = {}
18+
19+
20+
def get_statsd(environment: Literal["default", "consumer"] = "default") -> DogStatsd:
21+
global _statsd_instances
1622

23+
if environment in _statsd_instances:
24+
return _statsd_instances[environment]
1725

18-
def get_statsd() -> DogStatsd:
19-
global _statsd
26+
disable_telemetry = True
27+
origin_detection_enabled = False
2028

21-
if s := _statsd:
22-
# Type checker does not seem to be able to work out _statsd
23-
# must be set here hence the :=.
24-
return s
25-
else:
26-
disable_telemetry = True
27-
origin_detection_enabled = False
29+
host = os.getenv("STATSD_HOST", "127.0.0.1")
30+
port_str = os.getenv("STATSD_PORT", "8125")
2831

29-
host = os.getenv("STATSD_HOST", "127.0.0.1")
30-
port_str = os.getenv("STATSD_PORT", "8125")
32+
try:
33+
port = int(port_str)
34+
except ValueError:
35+
raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}")
3136

32-
try:
33-
port = int(port_str)
34-
except ValueError:
35-
raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}")
37+
# Create namespace with environment
38+
namespace = "launchpad" if environment == "default" else "launchpad_consumer"
3639

37-
_statsd = DogStatsd(
38-
host=host, port=port, disable_telemetry=disable_telemetry, origin_detection_enabled=origin_detection_enabled
39-
)
40-
return _statsd
40+
_statsd_instances[environment] = DogStatsd(
41+
host=host,
42+
port=port,
43+
namespace=namespace,
44+
disable_telemetry=disable_telemetry,
45+
origin_detection_enabled=origin_detection_enabled,
46+
)
47+
return _statsd_instances[environment]

tests/integration/test_integration.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ async def test_kafka_message_processing(self):
4848
mock_process.assert_called_once_with("ios-test-123", "test-project-ios", "test-org-123")
4949

5050
# Verify statsd metrics were sent
51-
service._statsd.increment.assert_any_call("launchpad.artifact.processing.started")
52-
service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed")
51+
service._statsd.increment.assert_any_call("artifact.processing.started")
52+
service._statsd.increment.assert_any_call("artifact.processing.completed")
5353

5454
# Reset mocks for next test
5555
mock_process.reset_mock()

tests/integration/test_service.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ def test_handle_kafka_message_ios(self, mock_process):
7272
mock_process.assert_called_once_with("ios-test-123", "test-project-ios", "test-org-123")
7373

7474
# Verify metrics were recorded
75-
service._statsd.increment.assert_any_call("launchpad.artifact.processing.started")
76-
service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed")
75+
service._statsd.increment.assert_any_call("artifact.processing.started")
76+
service._statsd.increment.assert_any_call("artifact.processing.completed")
7777

7878
@patch.object(LaunchpadService, "process_artifact")
7979
def test_handle_kafka_message_android(self, mock_process):
@@ -97,8 +97,8 @@ def test_handle_kafka_message_android(self, mock_process):
9797
mock_process.assert_called_once_with("android-test-456", "test-project-android", "test-org-456")
9898

9999
# Verify metrics were recorded
100-
service._statsd.increment.assert_any_call("launchpad.artifact.processing.started")
101-
service._statsd.increment.assert_any_call("launchpad.artifact.processing.completed")
100+
service._statsd.increment.assert_any_call("artifact.processing.started")
101+
service._statsd.increment.assert_any_call("artifact.processing.completed")
102102

103103
@patch.object(LaunchpadService, "process_artifact")
104104
def test_handle_kafka_message_error(self, mock_process):
@@ -127,5 +127,5 @@ def test_handle_kafka_message_error(self, mock_process):
127127
# Verify the metrics were called correctly
128128
calls = service._statsd.increment.call_args_list
129129
assert len(calls) == 2
130-
assert calls[0][0][0] == "launchpad.artifact.processing.started"
131-
assert calls[1][0][0] == "launchpad.artifact.processing.failed"
130+
assert calls[0][0][0] == "artifact.processing.started"
131+
assert calls[1][0][0] == "artifact.processing.failed"

tests/unit/test_service_error_handling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ def test_update_artifact_error_with_detailed_message(self, mock_sentry_client):
246246

247247
# Verify datadog logging
248248
service._statsd.increment.assert_called_once_with(
249-
"launchpad.artifact.processing.error",
249+
"artifact.processing.error",
250250
tags=[
251251
"error_code:3",
252252
"error_type:PREPROCESSING_FAILED",

0 commit comments

Comments
 (0)