From 8c17188ac1ddd5b6a3115f1ba12de1f2222ec5d0 Mon Sep 17 00:00:00 2001
From: Jacek N
Date: Fri, 12 Sep 2025 13:48:59 +0100
Subject: [PATCH] Add Prometheus metrics endpoint to job monitor

This PR adds a new /prometheus endpoint to the job monitor, exposing the
queue, worker, and job-duration metrics in the Prometheus text format. For
convenience, I also renamed a few status fields.

One future improvement could be to make the histogram buckets configurable
at runtime, but for v1 I'd like to keep it simple if possible.
---
 .../Dockerfile.jobmonitor                 |  4 +-
 src/MissionParallelCatchup/job_monitor.py | 74 +++++++++++++++++--
 2 files changed, 69 insertions(+), 9 deletions(-)

diff --git a/src/MissionParallelCatchup/Dockerfile.jobmonitor b/src/MissionParallelCatchup/Dockerfile.jobmonitor
index 35fbdd49..415e1806 100644
--- a/src/MissionParallelCatchup/Dockerfile.jobmonitor
+++ b/src/MissionParallelCatchup/Dockerfile.jobmonitor
@@ -14,8 +14,8 @@ RUN apt-get update && \
 COPY ./job_monitor.py /app
 
 RUN python3 -m pip install --upgrade pip && \
-    python3 -m pip install --no-cache-dir redis requests
+    python3 -m pip install --no-cache-dir redis requests prometheus_client
 
 EXPOSE 8080
 
-CMD ["/usr/bin/python3", "job_monitor.py"]
\ No newline at end of file
+CMD ["/usr/bin/python3", "job_monitor.py"]
diff --git a/src/MissionParallelCatchup/job_monitor.py b/src/MissionParallelCatchup/job_monitor.py
index 9dceea9a..44da5953 100644
--- a/src/MissionParallelCatchup/job_monitor.py
+++ b/src/MissionParallelCatchup/job_monitor.py
@@ -7,9 +7,14 @@ import threading
 import time
 from http.server import BaseHTTPRequestHandler, HTTPServer
+from prometheus_client import Gauge, Histogram, generate_latest, REGISTRY, CONTENT_TYPE_LATEST
 from datetime import datetime, timezone
 
 
 # Configuration
+
+# Histogram buckets
+# 5m   15m  30m   1h    1.5h  2h
+metric_buckets = (300, 900, 1800, 3600, 5400, 7200, float("inf"))
 REDIS_HOST = os.getenv('REDIS_HOST', 'redis')
 REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
 JOB_QUEUE = os.getenv('JOB_QUEUE', 'ranges')
@@ -51,10 +56,16 @@ def get_logging_level():
 # In-memory status data structure and threading lock
 status = {
     'num_remain': 1,  # initialize the job remaining to non-zero to indicate something is running, just the status hasn't been updated yet
-    'num_succeeded': 0,
+    'queue_remain_count': 0,
+    'queue_succeeded_count': 0,
+    'queue_failed_count': 0,
+    'queue_in_progress_count': 0,
     'jobs_failed': [],
     'jobs_in_progress': [],
-    'workers': []
+    'workers': [],
+    'workers_up': 0,
+    'workers_down': 0,
+    'workers_refresh_duration': 0,
 }
 status_lock = threading.Lock()
 
@@ -63,6 +74,16 @@ def get_logging_level():
 }
 metrics_lock = threading.Lock()
 
+# Create metrics
+metric_catchup_queues = Gauge('ssc_parallel_catchup_queues', 'Exposes the size of each job queue', ["queue"])
+metric_workers = Gauge('ssc_parallel_catchup_workers', 'Exposes catch-up worker status', ["status"])
+metric_refresh_duration = Gauge('ssc_parallel_catchup_workers_refresh_duration_seconds', 'Time it took to refresh the status of all workers')
+metric_full_duration = Histogram('ssc_parallel_catchup_job_full_duration_seconds', 'Exposes full job duration as a histogram', buckets=metric_buckets)
+metric_tx_apply_duration = Histogram('ssc_parallel_catchup_job_tx_apply_duration_seconds', 'Exposes job TX apply duration as a histogram', buckets=metric_buckets)
+metric_mission_duration = Gauge('ssc_parallel_catchup_mission_duration_seconds', 'Number of seconds since the mission started')
+metric_retries = Gauge('ssc_parallel_catchup_job_retried_count', 'Number of jobs that were retried')
+
+
 class RequestHandler(BaseHTTPRequestHandler):
     def do_GET(self):
         if self.path == '/status':
@@ -71,6 +92,11 @@ def do_GET(self):
             self.end_headers()
             with status_lock:
                 self.wfile.write(json.dumps(status).encode())
+        elif self.path == '/prometheus':
+            self.send_response(200)
+            self.send_header('Content-type', CONTENT_TYPE_LATEST)
+            self.end_headers()
+            self.wfile.write(generate_latest(REGISTRY))
         elif self.path == '/metrics':
             self.send_response(200)
             self.send_header('Content-type', 'application/json')
@@ -84,25 +110,35 @@ def do_GET(self):
 def retry_jobs_in_progress():
     while redis_client.llen(PROGRESS_QUEUE) > 0:
         job = redis_client.lmove(PROGRESS_QUEUE, JOB_QUEUE, "RIGHT", "LEFT")
+        metric_retries.inc()
         logger.info("moved job %s from %s to %s", job, PROGRESS_QUEUE, JOB_QUEUE)
 
 def update_status_and_metrics():
     global status
+    mission_start_time = time.time()
     while True:
         try:
             # Ping each worker status
             worker_statuses = []
             all_workers_down = True
+            workers_up = 0
+            workers_down = 0
+            logger.info("Starting worker liveness check")
+            workers_refresh_start_time = time.time()
             for i in range(WORKER_COUNT):
                 worker_name = f"{WORKER_PREFIX}-{i}.{WORKER_PREFIX}.{NAMESPACE}.svc.cluster.local"
                 try:
                     response = requests.get(f"http://{worker_name}:11626/info")
                     logger.debug("Worker %s is running, status code %d, response: %s", worker_name, response.status_code, response.json())
                     worker_statuses.append({'worker_id': i, 'status': 'running', 'info': response.json()['info']['status']})
+                    workers_up += 1
                     all_workers_down = False
                 except requests.exceptions.RequestException:
                     logger.debug("Worker %s is down", worker_name)
                     worker_statuses.append({'worker_id': i, 'status': 'down'})
+                    workers_down += 1
+            workers_refresh_duration = time.time() - workers_refresh_start_time
+            logger.info("Finished worker liveness check")
             # Retry stuck jobs
             if all_workers_down and redis_client.llen(PROGRESS_QUEUE) > 0:
                 logger.info("all workers are down but some jobs are stuck in progress")
@@ -111,21 +147,40 @@ def update_status_and_metrics():
 
             # Check the queue status
             # For remaining and successful jobs, we just print their count, do not care what they are and who owns it
-            num_remain = redis_client.llen(JOB_QUEUE)
-            num_succeeded = redis_client.llen(SUCCESS_QUEUE)
+            queue_remain_count = redis_client.llen(JOB_QUEUE)
+            queue_succeeded_count = redis_client.llen(SUCCESS_QUEUE)
             # For failed and in-progress jobs, we retrieve their full content
             jobs_failed = redis_client.lrange(FAILED_QUEUE, 0, -1)
             jobs_in_progress = redis_client.lrange(PROGRESS_QUEUE, 0, -1)
+            queue_failed_count = len(jobs_failed)
+            queue_in_progress_count = len(jobs_in_progress)
+            # Get run duration
+            mission_duration = time.time() - mission_start_time
 
             # update the status
             with status_lock:
                 status = {
-                    'num_remain': num_remain,
-                    'num_succeeded': num_succeeded,
+                    'num_remain': queue_remain_count,  # Needed for backwards compatibility with the rest of the code
+                    'queue_remain_count': queue_remain_count,
+                    'queue_succeeded_count': queue_succeeded_count,
+                    'queue_failed_count': queue_failed_count,
+                    'queue_in_progress_count': queue_in_progress_count,
                     'jobs_failed': jobs_failed,
                     'jobs_in_progress': jobs_in_progress,
-                    'workers': worker_statuses
+                    'workers': worker_statuses,
+                    'workers_up': workers_up,
+                    'workers_down': workers_down,
+                    'workers_refresh_duration': workers_refresh_duration,
+                    'mission_duration': mission_duration,
                 }
+            metric_catchup_queues.labels(queue="remain").set(queue_remain_count)
+            metric_catchup_queues.labels(queue="succeeded").set(queue_succeeded_count)
+            metric_catchup_queues.labels(queue="failed").set(queue_failed_count)
+            metric_catchup_queues.labels(queue="in_progress").set(queue_in_progress_count)
+            metric_workers.labels(status="up").set(workers_up)
+            metric_workers.labels(status="down").set(workers_down)
+            metric_refresh_duration.set(workers_refresh_duration)
+            metric_mission_duration.set(mission_duration)
             logger.info("Status: %s", json.dumps(status))
 
             # update the metrics
@@ -133,6 +188,11 @@ def update_status_and_metrics():
             if len(new_metrics) > 0:
                 with metrics_lock:
                     metrics['metrics'].extend(new_metrics)
+                for timing in new_metrics:
+                    # Example: 36024000/8320|213|1.47073e+06ms|2965s
+                    _, _, tx_apply, full_duration = timing.split('|')
+                    metric_full_duration.observe(float(full_duration.rstrip('s')))
+                    metric_tx_apply_duration.observe(float(tx_apply.rstrip('ms'))/1000)
             logger.info("Metrics: %s", json.dumps(metrics))
 
         except Exception as e:
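
A minimal sketch (not part of the diff) of how the new endpoint can be checked by hand, assuming the monitor's port 8080 from the Dockerfile is reachable on localhost, e.g. via a port-forward; the metric names come from the registrations above, everything else is illustrative:

import requests
from prometheus_client.parser import text_string_to_metric_families

# Fetch the Prometheus text exposition from the job monitor.
resp = requests.get("http://localhost:8080/prometheus", timeout=5)
resp.raise_for_status()

# Index the exposed samples as {sample_name: [(labels, value), ...]}.
samples = {}
for family in text_string_to_metric_families(resp.text):
    for s in family.samples:
        samples.setdefault(s.name, []).append((s.labels, s.value))

# The gauges defined in this patch should all be present once the
# update thread has completed its first status refresh.
for name in ("ssc_parallel_catchup_queues",
             "ssc_parallel_catchup_workers",
             "ssc_parallel_catchup_mission_duration_seconds"):
    assert name in samples, f"missing metric: {name}"

print(samples["ssc_parallel_catchup_queues"])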
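The timing strings consumed in the last hunk pack several '|'-separated fields, of which only the last two (TX apply time in ms, full duration in s) feed the histograms. A standalone sketch of that conversion, useful for checking which bucket a job lands in; the helper name is made up, and the only real sample value is the one from the in-code comment:

def parse_timing(timing):
    """Return (tx_apply_seconds, full_duration_seconds) from a worker timing string."""
    # Same split as in update_status_and_metrics(); the first two fields
    # are ignored here, as they are in the patch.
    _, _, tx_apply, full_duration = timing.split('|')
    tx_apply_seconds = float(tx_apply.rstrip('ms')) / 1000
    full_duration_seconds = float(full_duration.rstrip('s'))
    return tx_apply_seconds, full_duration_seconds

# Example string from the code comment in the patch:
print(parse_timing("36024000/8320|213|1.47073e+06ms|2965s"))
# -> (1470.73, 2965.0)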
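On the "configurable buckets" follow-up mentioned in the description: if it is ever picked up, an environment-variable override along these lines would likely suffice. Purely a sketch; the METRIC_BUCKETS variable name and the parsing are hypothetical, only the default tuple comes from the patch:

import os

# Defaults as hard-coded in this patch: 5m, 15m, 30m, 1h, 1.5h, 2h, +Inf.
DEFAULT_BUCKETS = (300, 900, 1800, 3600, 5400, 7200, float("inf"))

def buckets_from_env(var_name="METRIC_BUCKETS"):
    """Parse a comma-separated list of seconds, e.g. METRIC_BUCKETS=300,900,1800."""
    raw = os.getenv(var_name)
    if not raw:
        return DEFAULT_BUCKETS
    buckets = tuple(float(v) for v in raw.split(','))
    # prometheus_client appends the +Inf bucket itself if it is missing, but
    # being explicit keeps the tuple shaped like the default above.
    return buckets if buckets[-1] == float("inf") else buckets + (float("inf"),)

# Usage: Histogram(..., buckets=buckets_from_env())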