Skip to content

Commit e6ca0ff

Browse files
dipannita08 authored and copybara-github committed
Add more context with structured logging for metric uploads to GCM Monitoring.
PiperOrigin-RevId: 805110455
1 parent 7c7cb95 commit e6ca0ff

File tree

3 files changed

+60
-10
lines changed

3 files changed

+60
-10
lines changed

ml_goodput_measurement/src/gcp_metrics.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ def create_time_series(
6161

6262
return series
6363

64-
def send_metrics(self, metrics: list[Dict[str, Any]]):
64+
def send_metrics(
65+
self, metrics: list[Dict[str, Any]], context: Dict[str, Any] | None = None
66+
):
6567
"""Sends multiple metrics to GCP Monitoring in a batch with dynamic resources.
6668
6769
Args:
@@ -75,12 +77,12 @@ def send_metrics(self, metrics: list[Dict[str, Any]]):
7577
- 'resource_type': str
7678
- 'resource_labels': dict
7779
"""
80+
time_series_list = []
7881
try:
7982
now = time.time()
8083
seconds = int(now)
8184
nanos = int((now - seconds) * 10**9)
8285

83-
time_series_list = []
8486
for metric in metrics:
8587
try:
8688
metric_labels = metric.get("metric_labels", {})
@@ -96,11 +98,29 @@ def send_metrics(self, metrics: list[Dict[str, Any]]):
9698
)
9799
time_series_list.append(series)
98100
except Exception as e: # pylint: disable=broad-exception-caught
99-
logger.error("Failed to create time series: %s", e)
101+
logger.error(
102+
"Failed to create time series for metric '%s': %s",
103+
metric.get("metric_type", "UNKNOWN"),
104+
e,
105+
)
106+
107+
if not time_series_list:
108+
logger.warning(
109+
"No valid time series were created, skipping GCM upload."
110+
)
111+
return
112+
100113
self.client.create_time_series(
101114
name=self.project_name, time_series=time_series_list
102115
)
103-
logger.info("Sent %d Goodput metrics to GCM Monitoring.", len(metrics))
116+
log_data = {
117+
"message": "Sent Goodput metrics to GCM Monitoring.",
118+
"metrics_count": len(time_series_list),
119+
}
120+
if context:
121+
log_data.update(context)
122+
123+
logger.info(log_data)
104124

105125
except GoogleAPIError as e:
106126
logger.error("Failed to send Goodput metrics to GCM Monitoring: %s", e)

ml_goodput_measurement/src/monitoring.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,13 @@ def _upload_goodput_metrics_to_gcm(
451451

452452
# Send metrics to Google Cloud Monitoring.
453453
if metrics_sender and gcm_metrics:
454-
metrics_sender.send_metrics(gcm_metrics)
454+
log_context = {
455+
'job_name': config.get('job_name', 'unknown-job'),
456+
'pid': os.getpid(),
457+
'worker': multiprocessing.current_process().name,
458+
'metrics_type': 'cumulative',
459+
}
460+
metrics_sender.send_metrics(gcm_metrics, context=log_context)
455461

456462
except Exception as e: # pylint: disable=broad-exception-caught
457463
logger.error(
@@ -502,7 +508,13 @@ def _send_step_deviation_metric_to_gcp(
502508
},
503509
}]
504510
if metrics_sender:
505-
metrics_sender.send_metrics(perf_metric)
511+
log_context = {
512+
'job_name': config.get('job_name', 'unknown-job'),
513+
'pid': os.getpid(),
514+
'worker': multiprocessing.current_process().name,
515+
'metrics_type': 'step-time-deviation',
516+
}
517+
metrics_sender.send_metrics(perf_metric, context=log_context)
506518
except Exception as e: # pylint: disable=broad-exception-caught
507519
logger.error('Error sending step deviation to GCM: %s', e)
508520

@@ -585,7 +597,14 @@ def _upload_interval_goodput_metrics_to_gcm(
585597
})
586598

587599
if metrics_sender and gcm_metrics:
588-
metrics_sender.send_metrics(gcm_metrics)
600+
log_context = {
601+
'job_name': config.get('job_name', 'unknown-job'),
602+
'pid': os.getpid(),
603+
'worker': multiprocessing.current_process().name,
604+
'metrics_type': 'rolling-window',
605+
'window_size': str(window_size),
606+
}
607+
metrics_sender.send_metrics(gcm_metrics, context=log_context)
589608

590609
except Exception as e: # pylint: disable=broad-exception-caught
591610
logger.error(

ml_goodput_measurement/tests/gcp_metrics_test.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@ def test_create_time_series(self):
7272
time_series.points[0].value.distribution_value, value
7373
)
7474

75+
@patch(f"{gcp_metrics.__name__}.logger")
7576
@patch("time.time")
76-
def test_send_metrics(self, mock_time):
77+
def test_send_metrics(self, mock_time, mock_logger):
7778
# Set a fixed return value for the mocked time.time()
7879
mock_time.return_value = 1677347200.5
7980

@@ -94,8 +95,8 @@ def test_send_metrics(self, mock_time):
9495
"resource_labels": {"loc": "eu"},
9596
},
9697
]
97-
98-
self.metrics_sender.send_metrics(metrics_to_send)
98+
log_context = {"job_name": "test-job", "pid": 12345}
99+
self.metrics_sender.send_metrics(metrics_to_send, context=log_context)
99100

100101
# Verify that create_time_series was called with the correct arguments
101102
expected_name = f"projects/{self.project_id}"
@@ -126,6 +127,16 @@ def test_send_metrics(self, mock_time):
126127
self.assertEqual(actual.resource.labels, expected.resource.labels)
127128
self.assertEqual(actual.metric.labels, expected.metric.labels)
128129

130+
mock_logger.info.assert_called_once()
131+
expected_log_payload = {
132+
"message": "Sent Goodput metrics to GCM Monitoring.",
133+
"metrics_count": 2,
134+
"job_name": "test-job",
135+
"pid": 12345,
136+
}
137+
actual_log_payload = mock_logger.info.call_args.args[0]
138+
self.assertEqual(actual_log_payload, expected_log_payload)
139+
129140
@patch("cloud_goodput.ml_goodput_measurement.src.gcp_metrics.logger.error")
130141
def test_send_metrics_failure(self, mock_logging_error):
131142

0 commit comments

Comments
 (0)