diff --git a/deploy/metrics/grafana_dashboards/grafana-kvbm-dashboard.json b/deploy/metrics/grafana_dashboards/grafana-kvbm-dashboard.json index 58b15b9ece..42d08e64cd 100644 --- a/deploy/metrics/grafana_dashboards/grafana-kvbm-dashboard.json +++ b/deploy/metrics/grafana_dashboards/grafana-kvbm-dashboard.json @@ -118,8 +118,8 @@ "targets": [ { "disableTextWrap": false, - "editorMode": "builder", - "expr": "dynamo_component_matched_tokens{dynamo_namespace=\"kvbm_connector_leader\"}", + "editorMode": "code", + "expr": "kvbm_matched_tokens", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "__auto", @@ -227,8 +227,8 @@ "targets": [ { "disableTextWrap": false, - "editorMode": "builder", - "expr": "dynamo_component_offload_requests{dynamo_namespace=\"kvbm_connector_leader\"}", + "editorMode": "code", + "expr": "kvbm_offload_requests", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "__auto", @@ -323,8 +323,8 @@ "targets": [ { "disableTextWrap": false, - "editorMode": "builder", - "expr": "dynamo_component_offload_blocks_d2h{dynamo_namespace=\"kvbm_connector_leader\"}", + "editorMode": "code", + "expr": "kvbm_offload_blocks_d2h", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "__auto", @@ -336,102 +336,6 @@ "title": "Offload Blocks - Device to Host", "type": "timeseries" }, - { - "datasource": { - "type": "prometheus", - "uid": "P1809F7CD0C75ACF3" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green" - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 18 - }, - "id": 1, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "hideZeros": false, - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "12.0.1", - "targets": [ - { - "disableTextWrap": false, - "editorMode": "builder", - "expr": "dynamo_component_save_kv_layer_requests{dynamo_namespace=\"kvbm_connector_worker\"}", - "fullMetaSearch": false, - "includeNullMetadata": true, - "legendFormat": "__auto", - "range": true, - "refId": "A", - "useBackend": false - } - ], - "title": "Save KV Layer Requests", - "type": "timeseries" - }, { "collapsed": false, "gridPos": { @@ -528,8 +432,8 @@ "targets": [ { "disableTextWrap": false, - "editorMode": "builder", - "expr": "dynamo_component_onboard_requests{dynamo_namespace=\"kvbm_connector_leader\"}", + "editorMode": "code", + "expr": "kvbm_onboard_requests", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "__auto", @@ -624,8 +528,8 @@ "targets": [ { "disableTextWrap": false, - "editorMode": "builder", - "expr": "dynamo_component_onboard_blocks_h2d{dynamo_namespace=\"kvbm_connector_leader\"}", + "editorMode": 
"code", + "expr": "kvbm_onboard_blocks_h2d", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "__auto", @@ -720,8 +624,8 @@ "targets": [ { "disableTextWrap": false, - "editorMode": "builder", - "expr": "dynamo_component_onboard_blocks_d2d{dynamo_namespace=\"kvbm_connector_leader\"}", + "editorMode": "code", + "expr": "kvbm_onboard_blocks_d2d", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "__auto", @@ -750,4 +654,4 @@ "title": "KVBM Dashboard", "uid": "3f679257-70a5-402c-92b4-05382337b548", "version": 7 -} \ No newline at end of file +} diff --git a/deploy/metrics/prometheus.yml b/deploy/metrics/prometheus.yml index 52b2c9a44b..d4bdfb20bd 100644 --- a/deploy/metrics/prometheus.yml +++ b/deploy/metrics/prometheus.yml @@ -59,16 +59,10 @@ scrape_configs: - targets: ['host.docker.internal:9091'] # metrics aggregation service on host # KVBM leader related metrics - - job_name: 'kvbm-leader-metrics' + - job_name: 'kvbm-metrics' scrape_interval: 2s static_configs: - - targets: ['host.docker.internal:6882'] - - # KVBM worker related metrics - - job_name: 'kvbm-worker-metrics' - scrape_interval: 2s - static_configs: - - targets: ['host.docker.internal:6881'] + - targets: ['host.docker.internal:6880'] # Uncomment to see its own Prometheus metrics # - job_name: 'prometheus' diff --git a/docs/guides/run_kvbm_in_trtllm.md b/docs/guides/run_kvbm_in_trtllm.md index b60a27979d..30d453b41f 100644 --- a/docs/guides/run_kvbm_in_trtllm.md +++ b/docs/guides/run_kvbm_in_trtllm.md @@ -109,18 +109,51 @@ Follow below steps to enable metrics collection and view via Grafana dashboard: # Start the basic services (etcd & natsd), along with Prometheus and Grafana docker compose -f deploy/docker-compose.yml --profile metrics up -d -# set env var DYN_SYSTEM_ENABLED to true, DYN_SYSTEM_PORT to 6880, DYN_KVBM_SLEEP to 5, when launch via dynamo -# NOTE: Make sure port 6881 (for KVBM worker metrics) and port 6882 (for KVBM leader metrics) are available. -# NOTE: DYN_KVBM_SLEEP is needed to avoid metrics port conflict between KVBM leader and worker -DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=6880 DYN_KVBM_SLEEP=5 \ +# set env var DYN_KVBM_METRICS to true, when launch via dynamo +# Optionally set DYN_KVBM_METRICS_PORT to choose the /metrics port (default: 6880). +DYN_KVBM_METRICS=true \ python3 -m dynamo.trtllm \ --model-path deepseek-ai/DeepSeek-R1-Distill-Llama-8B \ --served-model-name deepseek-ai/DeepSeek-R1-Distill-Llama-8B \ --extra-engine-args /tmp/kvbm_llm_api_config.yaml & # optional if firewall blocks KVBM metrics ports to send prometheus metrics -sudo ufw allow 6881/tcp -sudo ufw allow 6882/tcp +sudo ufw allow 6880/tcp ``` View grafana metrics via http://localhost:3001 (default login: dynamo/dynamo) and look for KVBM Dashboard + +## Benchmark KVBM + +Once the model is loaded ready, follow below steps to use LMBenchmark to benchmark KVBM performance: +```bash +git clone https://github.com/LMCache/LMBenchmark.git + +# show case of running the synthetic multi-turn chat dataset. +# we are passing model, endpoint, output file prefix and qps to the sh script. +cd LMBenchmark/synthetic-multi-round-qa +./long_input_short_output_run.sh \ + "deepseek-ai/DeepSeek-R1-Distill-Llama-8B" \ + "http://localhost:8000" \ + "benchmark_kvbm" \ + 1 + +# Average TTFT and other perf numbers would be in the output from above cmd +``` +More details about how to use LMBenchmark could be found [here](https://github.com/LMCache/LMBenchmark). 
+ +`NOTE`: if metrics are enabled as mentioned in the above section, you can observe KV offloading and KV onboarding in the Grafana dashboard. + +To compare, you can remove the `kv_connector_config` section from the LLM API config and run `trtllm-serve` with the updated config as the baseline. +```bash +cat > "/tmp/llm_api_config.yaml" < bool { + std::env::var("DYN_KVBM_METRICS") + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(false) +} + +pub fn parse_kvbm_metrics_port() -> u16 { + match std::env::var("DYN_KVBM_METRICS_PORT") { + Ok(val) => match val.trim().parse::<u16>() { + Ok(port) => port, + Err(_) => { + tracing::warn!( + "[kvbm] Invalid DYN_KVBM_METRICS_PORT='{}', falling back to 6880", + val + ); + 6880 + } + }, + Err(_) => { + tracing::warn!( + "DYN_KVBM_METRICS_PORT not present or couldn't be interpreted, falling back to 6880" + ); + 6880 + } + } +} diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs index a7647df35c..9c267a2f95 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/recorder.rs @@ -100,11 +100,11 @@ impl KvConnectorLeaderRecorder { let drt = drt.inner().clone(); let handle: Handle = drt.runtime().primary(); - let ns = drt - .namespace(kvbm_connector::KVBM_CONNECTOR_LEADER) - .unwrap(); - - let kvbm_metrics = KvbmMetrics::new(&ns); + let kvbm_metrics = KvbmMetrics::new( + &KvbmMetricsRegistry::default(), + kvbm_metrics_endpoint_enabled(), + parse_kvbm_metrics_port(), + ); let kvbm_metrics_clone = kvbm_metrics.clone(); let token = CancellationToken::new(); diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs index bb746625b9..7f3b51d6dd 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs @@ -8,10 +8,12 @@ use crate::llm::block_manager::BlockManagerBuilder; use crate::llm::block_manager::vllm::connector::leader::slot::{ ConnectorSlotManager, SlotManager, SlotState, }; +use crate::llm::block_manager::vllm::connector::leader::{ + kvbm_metrics_endpoint_enabled, parse_kvbm_metrics_port, +}; use crate::llm::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest}; use anyhow; -use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics; -use dynamo_runtime::metrics::prometheus_names::kvbm_connector; +use dynamo_llm::block_manager::metrics_kvbm::{KvbmMetrics, KvbmMetricsRegistry}; use std::collections::HashSet; use std::sync::{Arc, OnceLock}; use tokio::runtime::Handle; @@ -76,11 +78,12 @@ impl KvConnectorLeader { let drt = drt.inner().clone(); let handle: Handle = drt.runtime().primary(); - let ns = drt - .namespace(kvbm_connector::KVBM_CONNECTOR_LEADER) - .unwrap(); + let kvbm_metrics = KvbmMetrics::new( + &KvbmMetricsRegistry::default(), + kvbm_metrics_endpoint_enabled(), + parse_kvbm_metrics_port(), + ); - let kvbm_metrics = KvbmMetrics::new(&ns); let kvbm_metrics_clone = kvbm_metrics.clone(); let slot_manager_cell = Arc::new(OnceLock::new()); diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs index 6f8f55e596..5b017db93a 100644 ---
a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs @@ -20,10 +20,8 @@ use crate::{ use anyhow; use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig}; use dynamo_llm::block_manager::layout::LayoutType; -use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics; use dynamo_llm::block_manager::storage::torch::TorchTensor; use dynamo_runtime::DistributedRuntime; -use dynamo_runtime::metrics::prometheus_names::kvbm_connector; use dynamo_runtime::utils::task::CriticalTaskExecutionHandle; pub trait Worker: Send + Sync { @@ -71,8 +69,6 @@ pub struct KvConnectorWorker { /// cuda events created by the python side layer_events: Vec, - - kvbm_metrics: KvbmMetrics, } impl KvConnectorWorker { @@ -98,11 +94,6 @@ impl KvConnectorWorker { trtllm_rank ); - let kvbm_metrics = KvbmMetrics::new( - &drt.namespace(kvbm_connector::KVBM_CONNECTOR_WORKER) - .unwrap(), - ); - Ok(Self { drt, kvbm_worker: OnceLock::new(), @@ -116,7 +107,6 @@ impl KvConnectorWorker { iteration: 0, layers_complete: 0, layer_events: Vec::new(), - kvbm_metrics, }) } } @@ -236,7 +226,6 @@ impl Worker for KvConnectorWorker { self.connector.enqueue_request(operation); } } - self.kvbm_metrics.save_kv_layer_requests.inc(); Ok(()) } diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs index f4c2bbad78..3c498a3d8e 100644 --- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs +++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs @@ -5,7 +5,6 @@ use dynamo_llm::block_manager::connector::protocol::TransferType; use dynamo_llm::block_manager::connector::scheduler::{ Scheduler, TransferSchedulerClient, WorkerSchedulerClient, }; -use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics; use std::collections::HashSet; use std::sync::{Arc, OnceLock}; @@ -16,7 +15,6 @@ use crate::{ DistributedRuntime as PyDistributedRuntime, llm::block_manager::distributed::VllmTensor, to_pyerr, }; -use dynamo_runtime::metrics::prometheus_names::kvbm_connector; use crate::llm::block_manager::distributed::PyLayoutType; use anyhow; @@ -75,8 +73,6 @@ pub struct KvConnectorWorker { /// cuda events created by the python side layer_events: Vec, - - kvbm_metrics: KvbmMetrics, } impl KvConnectorWorker { @@ -97,11 +93,6 @@ impl KvConnectorWorker { )? 
.detach(); - let kvbm_metrics = KvbmMetrics::new( - &drt.namespace(kvbm_connector::KVBM_CONNECTOR_WORKER) - .unwrap(), - ); - tracing::info!( "KvConnectorWorker initialized with worker_id: {}", vllm_worker_id @@ -120,7 +111,6 @@ impl KvConnectorWorker { layers_complete: 0, kv_cache_layers: Vec::new(), layer_events: Vec::new(), - kvbm_metrics, }) } } @@ -324,7 +314,6 @@ impl Worker for KvConnectorWorker { self.connector.enqueue_request(operation); } } - self.kvbm_metrics.save_kv_layer_requests.inc(); Ok(()) } diff --git a/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_leader.py b/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_leader.py index b3f775f97c..af1db0107a 100644 --- a/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_leader.py +++ b/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_leader.py @@ -17,17 +17,12 @@ KvConnectorLeader as RustKvConnectorLeader, ) from dynamo.llm.trtllm_integration.rust import SchedulerOutput as RustSchedulerOutput -from dynamo.llm.utils import find_and_set_available_port_from_env, maybe_sleep from dynamo.runtime import DistributedRuntime class DynamoKVBMConnectorLeader(KvCacheConnectorScheduler): def __init__(self, llm_args: TorchLlmArgs): super().__init__(llm_args) - # NOTE: this is needed in TRTLLM to avoid metrics port conflict with KVBM worker, - # since there is no startup order in TRTLLM and race condition is possible. - maybe_sleep() - find_and_set_available_port_from_env("DYN_SYSTEM_PORT") self.drt = DistributedRuntime.detached() mappings = self._llm_args.parallel_config.to_mapping() diff --git a/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_worker.py b/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_worker.py index c62bdb84d9..6b0f32b9e2 100644 --- a/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_worker.py +++ b/lib/bindings/python/src/dynamo/llm/trtllm_integration/connector/kvbm_connector_worker.py @@ -9,7 +9,6 @@ from dynamo.llm.trtllm_integration.rust import ( KvConnectorWorker as RustKvConnectorWorker, ) -from dynamo.llm.utils import find_and_set_available_port_from_env from dynamo.runtime import DistributedRuntime @@ -17,7 +16,6 @@ class DynamoKVBMConnectorWorker(KvCacheConnectorWorker): def __init__(self, llm_args: TorchLlmArgs): super().__init__(llm_args) - find_and_set_available_port_from_env("DYN_SYSTEM_PORT") self.drt = DistributedRuntime.detached() mappings = self._llm_args.parallel_config.to_mapping() diff --git a/lib/bindings/python/src/dynamo/llm/utils.py b/lib/bindings/python/src/dynamo/llm/utils.py deleted file mode 100644 index 4b22866016..0000000000 --- a/lib/bindings/python/src/dynamo/llm/utils.py +++ /dev/null @@ -1,42 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -import os -import socket -import time - - -def maybe_sleep(): - """ - Maybe sleep for the duration specified in the environment variable if it is set. - """ - sleep_duration = int(os.environ.get("DYN_KVBM_SLEEP", "0")) - if sleep_duration > 0: - print(f"Sleeping {sleep_duration} seconds to avoid metrics port conflict") - time.sleep(sleep_duration) - - -# TODO(keiven|ziqi): Auto port selection to be done in Rust -def find_and_set_available_port_from_env(env_var="DYN_SYSTEM_PORT"): - """ - Find an available port from the environment variable. 
- """ - port = int(os.environ.get(env_var, "0")) - if port == 0: - # No port specified, let system pick - pass - while True: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - # Port is available - s.bind(("127.0.0.1", port)) - s.close() - os.environ[env_var] = str(port) - print(f"Port {port} is available, setting env var {env_var} to {port}") - break - except OSError: - # Port is in use, try next - port += 1 - s.close() - except Exception as e: - raise RuntimeError(f"Error finding available port: {e}") diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py b/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py index 9f9556efef..8c69909c48 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py +++ b/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_leader.py @@ -29,7 +29,6 @@ # from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput from dynamo.llm import KvbmLeader -from dynamo.llm.utils import find_and_set_available_port_from_env from dynamo.llm.vllm_integration.rust import KvbmRequest from dynamo.llm.vllm_integration.rust import KvConnectorLeader as RustKvConnectorLeader from dynamo.llm.vllm_integration.rust import SchedulerOutput as RustSchedulerOutput @@ -54,7 +53,6 @@ class KvConnectorLeader: def __init__(self, vllm_config: "VllmConfig", engine_id: str, **kwargs): drt = kwargs.get("drt", None) if drt is None: - find_and_set_available_port_from_env("DYN_SYSTEM_PORT") self.drt = DistributedRuntime.detached() else: self.drt = drt diff --git a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py b/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py index 4a4c682884..13560ecbcb 100644 --- a/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py +++ b/lib/bindings/python/src/dynamo/llm/vllm_integration/connector_worker.py @@ -28,7 +28,6 @@ # KvConnectorWorker as RustKvConnectorWorker, # ) -from dynamo.llm.utils import find_and_set_available_port_from_env from dynamo.llm.vllm_integration.rust import KvConnectorWorker as RustKvConnectorWorker from dynamo.runtime import DistributedRuntime @@ -43,8 +42,6 @@ class KvConnectorWorker: def __init__(self, vllm_config: "VllmConfig", engine_id: str, **kwargs): drt = kwargs.get("drt", None) if drt is None: - # this is needed to avoid metrics port conflict with KVBM leader side DRT if metrics is enabled - find_and_set_available_port_from_env("DYN_SYSTEM_PORT") self.drt = DistributedRuntime.detached() else: self.drt = drt diff --git a/lib/llm/src/block_manager/metrics_kvbm.rs b/lib/llm/src/block_manager/metrics_kvbm.rs index 07fa9f1be0..4c435faf5a 100644 --- a/lib/llm/src/block_manager/metrics_kvbm.rs +++ b/lib/llm/src/block_manager/metrics_kvbm.rs @@ -1,8 +1,19 @@ // SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
// SPDX-License-Identifier: Apache-2.0 -use dynamo_runtime::metrics::MetricsRegistry; -use prometheus::IntCounter; +use axum::Router; +use dynamo_runtime::metrics::prometheus_names::{ + kvbm::{ + MATCHED_TOKENS, OFFLOAD_BLOCKS_D2H, OFFLOAD_REQUESTS, ONBOARD_BLOCKS_D2D, + ONBOARD_BLOCKS_H2D, ONBOARD_REQUESTS, + }, + sanitize_prometheus_name, +}; +use prometheus::{IntCounter, Opts, Registry}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, thread}; +use tokio::{net::TcpListener, sync::Notify}; + +use crate::http::service::{RouteDoc, metrics::router}; #[derive(Clone, Debug)] pub struct KvbmMetrics { @@ -21,60 +32,167 @@ pub struct KvbmMetrics { // number of blocks onboarded from disk to device pub onboard_blocks_d2d: IntCounter, - // number of save kv layer requests - pub save_kv_layer_requests: IntCounter, - // number of matched tokens from KVBM pub matched_tokens: IntCounter, + + shutdown_notify: Option<Arc<Notify>>, } impl KvbmMetrics { - pub fn new(mr: &dyn MetricsRegistry) -> Self { + /// Create raw metrics and (once per process) spawn an axum server exposing `/metrics` at metrics_port. + /// Non-blocking: the HTTP server runs on a background task. + pub fn new(mr: &KvbmMetricsRegistry, create_endpoint: bool, metrics_port: u16) -> Self { + // 1) register kvbm metrics let offload_requests = mr - .create_intcounter("offload_requests", "The number of offload requests", &[]) + .create_intcounter(OFFLOAD_REQUESTS, "The number of offload requests", &[]) .unwrap(); let offload_blocks_d2h = mr .create_intcounter( - "offload_blocks_d2h", + OFFLOAD_BLOCKS_D2H, "The number of offload blocks from device to host", &[], ) .unwrap(); let onboard_requests = mr - .create_intcounter("onboard_requests", "The number of onboard requests", &[]) + .create_intcounter(ONBOARD_REQUESTS, "The number of onboard requests", &[]) .unwrap(); let onboard_blocks_h2d = mr .create_intcounter( - "onboard_blocks_h2d", + ONBOARD_BLOCKS_H2D, "The number of onboard blocks from host to device", &[], ) .unwrap(); let onboard_blocks_d2d = mr .create_intcounter( - "onboard_blocks_d2d", + ONBOARD_BLOCKS_D2D, "The number of onboard blocks from disk to device", &[], ) .unwrap(); - let save_kv_layer_requests = mr - .create_intcounter( - "save_kv_layer_requests", - "The number of save kv layer requests", - &[], - ) - .unwrap(); let matched_tokens = mr - .create_intcounter("matched_tokens", "The number of matched tokens", &[]) + .create_intcounter(MATCHED_TOKENS, "The number of matched tokens", &[]) .unwrap(); + + // early return if no endpoint is needed + if !create_endpoint { + return Self { + offload_requests, + offload_blocks_d2h, + onboard_requests, + onboard_blocks_h2d, + onboard_blocks_d2d, + matched_tokens, + shutdown_notify: None, + }; + } + + // 2) start HTTP server in background with graceful shutdown via Notify + let registry = mr.inner(); // Arc<Registry> + let notify = Arc::new(Notify::new()); + let notify_for_task = notify.clone(); + + let addr = SocketAddr::from(([0, 0, 0, 0], metrics_port)); + let (_route_docs, app): (Vec<RouteDoc>, Router) = router( + (*registry).clone(), // take owned Registry (Clone) for router to wrap in Arc + None, // or Some("/metrics".to_string()) to override the path + ); + + let run_server = async move { + let listener = match TcpListener::bind(addr).await { + Ok(listener) => listener, + Err(err) => { + panic!("failed to bind metrics server to {addr}: {err}"); + } + }; + + if let Err(err) = axum::serve(listener, app) + .with_graceful_shutdown(async move { + // wait for shutdown signal +
notify_for_task.notified().await; + }) + .await + { + tracing::error!("[kvbm] metrics server error: {err}"); + } + }; + + // Spawn on existing runtime if present, otherwise start our own. + if tokio::runtime::Handle::try_current().is_ok() { + tokio::spawn(run_server); + } else { + thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("build tokio runtime"); + rt.block_on(run_server); + }); + } + Self { offload_requests, offload_blocks_d2h, onboard_requests, onboard_blocks_h2d, onboard_blocks_d2d, - save_kv_layer_requests, matched_tokens, + shutdown_notify: Some(notify), + } + } +} + +impl Drop for KvbmMetrics { + fn drop(&mut self) { + if let Some(n) = &self.shutdown_notify { + // (all KvbmMetrics clones) + 1 (held by server task) + // strong_count == 2 means this is the last metrics instance + if Arc::strong_count(n) == 2 { + n.notify_waiters(); + } } } } + +/// A raw, standalone Prometheus metrics registry implementation using the fixed prefix: `kvbm_` +#[derive(Debug, Clone)] +pub struct KvbmMetricsRegistry { + registry: Arc<Registry>, + prefix: String, +} + +impl KvbmMetricsRegistry { + pub fn new() -> Self { + Self { + registry: Arc::new(Registry::new()), + prefix: "kvbm".to_string(), + } + } + + pub fn create_intcounter( + &self, + name: &str, + description: &str, + labels: &[(&str, &str)], + ) -> anyhow::Result<IntCounter> { + let metrics_name = sanitize_prometheus_name(&format!("{}_{}", self.prefix, name))?; + let const_labels: HashMap<String, String> = labels + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + let opts = Opts::new(metrics_name, description).const_labels(const_labels); + let c = IntCounter::with_opts(opts)?; + self.registry.register(Box::new(c.clone()))?; + Ok(c) + } + + pub fn inner(&self) -> Arc<Registry> { + Arc::clone(&self.registry) + } +} + +impl Default for KvbmMetricsRegistry { + fn default() -> Self { + Self::new() + } +} diff --git a/lib/runtime/src/metrics/prometheus_names.rs b/lib/runtime/src/metrics/prometheus_names.rs index 438cd761f8..de4a786916 100644 --- a/lib/runtime/src/metrics/prometheus_names.rs +++ b/lib/runtime/src/metrics/prometheus_names.rs @@ -318,13 +318,25 @@ pub mod distributed_runtime { pub const UPTIME_SECONDS: &str = "uptime_seconds"; } -/// KVBM connector -pub mod kvbm_connector { - /// KVBM connector leader - pub const KVBM_CONNECTOR_LEADER: &str = "kvbm_connector_leader"; +/// KVBM +pub mod kvbm { + /// The number of offload requests + pub const OFFLOAD_REQUESTS: &str = "offload_requests"; - /// KVBM connector worker - pub const KVBM_CONNECTOR_WORKER: &str = "kvbm_connector_worker"; + /// The number of offload blocks from device to host + pub const OFFLOAD_BLOCKS_D2H: &str = "offload_blocks_d2h"; + + /// The number of onboard requests + pub const ONBOARD_REQUESTS: &str = "onboard_requests"; + + /// The number of onboard blocks from host to device + pub const ONBOARD_BLOCKS_H2D: &str = "onboard_blocks_h2d"; + + /// The number of onboard blocks from disk to device + pub const ONBOARD_BLOCKS_D2D: &str = "onboard_blocks_d2d"; + + /// The number of matched tokens + pub const MATCHED_TOKENS: &str = "matched_tokens"; } /// KvStats metrics from LLM workers
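As a quick sanity check of the consolidated metrics endpoint introduced above (a minimal sketch, not part of the patch, assuming a model was launched with `DYN_KVBM_METRICS=true`, the default port 6880, and the default `/metrics` path), the new `kvbm_`-prefixed counters can be inspected directly before wiring up Prometheus and Grafana:

```bash
# Query the single KVBM /metrics endpoint exposed by the leader process
# (port controlled by DYN_KVBM_METRICS_PORT, default 6880) and filter for
# the new counters such as kvbm_matched_tokens and kvbm_offload_requests.
curl -s http://localhost:6880/metrics | grep '^kvbm_'
```

If the counters appear here, the `kvbm-metrics` scrape job in `deploy/metrics/prometheus.yml` and the updated dashboard expressions should pick them up unchanged.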