Skip to content

Commit 54e2313

Browse files
tzulingk, keivenchang
authored and committed
feat: Add model label for vllm backend metrics (#2474)
Co-authored-by: Keiven Chang <[email protected]>
Signed-off-by: Hannah Zhang <[email protected]>
1 parent 53ff9d5 commit 54e2313

File tree

11 files changed

+121
-32
lines changed

11 files changed

+121
-32
lines changed

components/backends/vllm/src/dynamo/vllm/main.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,14 @@ async def init_prefill(runtime: DistributedRuntime, config: Config):
150150
# (temp reason): we don't support re-routing prefill requests
151151
# (long-term reason): prefill engine should pull from a global queue so there are
152152
# only a few in-flight requests that can be quickly finished
153-
generate_endpoint.serve_endpoint(handler.generate, graceful_shutdown=True),
154-
clear_endpoint.serve_endpoint(handler.clear_kv_blocks),
153+
generate_endpoint.serve_endpoint(
154+
handler.generate,
155+
graceful_shutdown=True,
156+
metrics_labels=[("model", config.model)],
157+
),
158+
clear_endpoint.serve_endpoint(
159+
handler.clear_kv_blocks, metrics_labels=[("model", config.model)]
160+
),
155161
)
156162
except Exception as e:
157163
logger.error(f"Failed to serve endpoints: {e}")
@@ -178,7 +184,11 @@ async def init(runtime: DistributedRuntime, config: Config):
178184
.client()
179185
)
180186

181-
factory = StatLoggerFactory(component, config.engine_args.data_parallel_rank or 0)
187+
factory = StatLoggerFactory(
188+
component,
189+
config.engine_args.data_parallel_rank or 0,
190+
metrics_labels=[("model", config.model)],
191+
)
182192
engine_client, vllm_config, default_sampling_params = setup_vllm_engine(
183193
config, factory
184194
)
@@ -239,8 +249,14 @@ async def init(runtime: DistributedRuntime, config: Config):
239249
await asyncio.gather(
240250
# for decode, we want to transfer the in-flight requests to other decode engines,
241251
# because waiting for them to finish can take a long time for long OSLs
242-
generate_endpoint.serve_endpoint(handler.generate, graceful_shutdown=False),
243-
clear_endpoint.serve_endpoint(handler.clear_kv_blocks),
252+
generate_endpoint.serve_endpoint(
253+
handler.generate,
254+
graceful_shutdown=False,
255+
metrics_labels=[("model", config.model)],
256+
),
257+
clear_endpoint.serve_endpoint(
258+
handler.clear_kv_blocks, metrics_labels=[("model", config.model)]
259+
),
244260
)
245261
except Exception as e:
246262
logger.error(f"Failed to serve endpoints: {e}")

components/backends/vllm/src/dynamo/vllm/publisher.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
# SPDX-License-Identifier: Apache-2.0
33

4-
from typing import Optional
4+
from typing import List, Optional, Tuple
55

66
from vllm.config import VllmConfig
77
from vllm.v1.metrics.loggers import StatLoggerBase
@@ -36,9 +36,16 @@ def log_engine_initialized(self):
3636
class DynamoStatLoggerPublisher(StatLoggerBase):
3737
"""Stat logger publisher. Wrapper for the WorkerMetricsPublisher to match the StatLoggerBase interface."""
3838

39-
def __init__(self, component: Component, dp_rank: int) -> None:
39+
def __init__(
40+
self,
41+
component: Component,
42+
dp_rank: int,
43+
metrics_labels: Optional[List[Tuple[str, str]]] = None,
44+
) -> None:
4045
self.inner = WorkerMetricsPublisher()
41-
self.inner.create_endpoint(component)
46+
# Use labels directly for the new create_endpoint signature
47+
metrics_labels = metrics_labels or []
48+
self.inner.create_endpoint(component, metrics_labels)
4249
self.dp_rank = dp_rank
4350
self.num_gpu_block = 1
4451
self.request_total_slots = 1
@@ -129,15 +136,23 @@ def log_engine_initialized(self) -> None:
129136
class StatLoggerFactory:
130137
"""Factory for creating stat logger publishers. Required by vLLM."""
131138

132-
def __init__(self, component: Component, dp_rank: int = 0) -> None:
139+
def __init__(
140+
self,
141+
component: Component,
142+
dp_rank: int = 0,
143+
metrics_labels: Optional[List[Tuple[str, str]]] = None,
144+
) -> None:
133145
self.component = component
134146
self.created_logger: Optional[DynamoStatLoggerPublisher] = None
135147
self.dp_rank = dp_rank
148+
self.metrics_labels = metrics_labels or []
136149

137150
def create_stat_logger(self, dp_rank: int) -> StatLoggerBase:
138151
if self.dp_rank != dp_rank:
139152
return NullStatLogger()
140-
logger = DynamoStatLoggerPublisher(self.component, dp_rank)
153+
logger = DynamoStatLoggerPublisher(
154+
self.component, dp_rank, metrics_labels=self.metrics_labels
155+
)
141156
self.created_logger = logger
142157

143158
return logger

examples/multimodal/components/worker.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ def setup_vllm_engine(self, component: Component, endpoint: Endpoint):
140140

141141
# Create vLLM engine with metrics logger and KV event publisher attached
142142
self.stats_logger = StatLoggerFactory(
143-
component, self.engine_args.data_parallel_rank or 0
143+
component,
144+
self.engine_args.data_parallel_rank or 0,
145+
metrics_labels=[("model", self.engine_args.model)],
144146
)
145147
self.engine_client = AsyncLLM.from_vllm_config(
146148
vllm_config=vllm_config,
@@ -353,7 +355,9 @@ async def generate(self, request: vLLMMultimodalRequest):
353355
extra_args.pop("serialized_request", None)
354356
decode_request.sampling_params.extra_args = extra_args
355357
logger.debug("Decode request: %s", decode_request)
356-
async for decode_response in await self.decode_worker_client.round_robin(
358+
async for (
359+
decode_response
360+
) in await self.decode_worker_client.round_robin(
357361
decode_request.model_dump_json()
358362
):
359363
output = MyRequestOutput.model_validate_json(decode_response.data())

lib/bindings/python/rust/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,19 +513,24 @@ impl Component {
513513

514514
#[pymethods]
515515
impl Endpoint {
516-
#[pyo3(signature = (generator, graceful_shutdown = true))]
516+
#[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None))]
517517
fn serve_endpoint<'p>(
518518
&self,
519519
py: Python<'p>,
520520
generator: PyObject,
521521
graceful_shutdown: Option<bool>,
522+
metrics_labels: Option<Vec<(String, String)>>,
522523
) -> PyResult<Bound<'p, PyAny>> {
523524
let engine = Arc::new(engine::PythonAsyncEngine::new(
524525
generator,
525526
self.event_loop.clone(),
526527
)?);
527528
let ingress = JsonServerStreamingIngress::for_engine(engine).map_err(to_pyerr)?;
528-
let builder = self.inner.endpoint_builder().handler(ingress);
529+
let builder = self
530+
.inner
531+
.endpoint_builder()
532+
.metrics_labels(metrics_labels)
533+
.handler(ingress);
529534
let graceful_shutdown = graceful_shutdown.unwrap_or(true);
530535
pyo3_async_runtimes::tokio::future_into_py(py, async move {
531536
builder

lib/bindings/python/rust/llm/kv.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,35 @@ impl WorkerMetricsPublisher {
5555
})
5656
}
5757

58-
#[pyo3(signature = (component))]
58+
#[pyo3(signature = (component, metrics_labels = None))]
5959
fn create_endpoint<'p>(
6060
&self,
6161
py: Python<'p>,
6262
component: Component,
63+
metrics_labels: Option<Vec<(String, String)>>,
6364
) -> PyResult<Bound<'p, PyAny>> {
6465
let rs_publisher = self.inner.clone();
6566
let rs_component = component.inner.clone();
6667
pyo3_async_runtimes::tokio::future_into_py(py, async move {
68+
// Convert Python labels to Option<&[(&str, &str)]> expected by Rust API
69+
let metrics_labels_ref: Option<Vec<(&str, &str)>> =
70+
if let Some(metrics_labels) = metrics_labels.as_ref() {
71+
if metrics_labels.is_empty() {
72+
None
73+
} else {
74+
Some(
75+
metrics_labels
76+
.iter()
77+
.map(|(k, v)| (k.as_str(), v.as_str()))
78+
.collect(),
79+
)
80+
}
81+
} else {
82+
None
83+
};
84+
6785
rs_publisher
68-
.create_endpoint(rs_component)
86+
.create_endpoint(rs_component, metrics_labels_ref.as_deref())
6987
.await
7088
.map_err(to_pyerr)?;
7189
Ok(())

lib/llm/src/kv_router/publisher.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,11 @@ impl WorkerMetricsPublisher {
498498
self.tx.send(metrics)
499499
}
500500

501-
pub async fn create_endpoint(&self, component: Component) -> Result<()> {
501+
pub async fn create_endpoint(
502+
&self,
503+
component: Component,
504+
metrics_labels: Option<&[(&str, &str)]>,
505+
) -> Result<()> {
502506
let mut metrics_rx = self.rx.clone();
503507
let handler = Arc::new(KvLoadEndpointHandler::new(metrics_rx.clone()));
504508
let handler = Ingress::for_engine(handler)?;
@@ -514,13 +518,20 @@ impl WorkerMetricsPublisher {
514518

515519
self.start_nats_metrics_publishing(component.namespace().clone(), worker_id);
516520

521+
let metrics_labels = metrics_labels.map(|v| {
522+
v.iter()
523+
.map(|(k, v)| (k.to_string(), v.to_string()))
524+
.collect::<Vec<_>>()
525+
});
526+
517527
component
518528
.endpoint(KV_METRICS_ENDPOINT)
519529
.endpoint_builder()
520530
.stats_handler(move |_| {
521531
let metrics = metrics_rx.borrow_and_update().clone();
522532
serde_json::to_value(&*metrics).unwrap()
523533
})
534+
.metrics_labels(metrics_labels)
524535
.handler(handler)
525536
.start()
526537
.await

lib/llm/src/mocker/engine.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ impl MockVllmEngine {
189189
tokio::spawn({
190190
let publisher = metrics_publisher.clone();
191191
async move {
192-
if let Err(e) = publisher.create_endpoint(comp.clone()).await {
192+
if let Err(e) = publisher.create_endpoint(comp.clone(), None).await {
193193
tracing::error!("Metrics endpoint failed: {e}");
194194
}
195195
}

lib/runtime/examples/system_metrics/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use std::sync::Arc;
1616
pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace";
1717
pub const DEFAULT_COMPONENT: &str = "dyn_example_component";
1818
pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint";
19-
pub const DEFAULT_MODEL_NAME: &str = "dyn_example_model";
2019

2120
/// Stats structure returned by the endpoint's stats handler
2221
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]

lib/runtime/src/component/endpoint.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ pub struct EndpointConfig {
4141
#[builder(default, private)]
4242
_stats_handler: Option<EndpointStatsHandler>,
4343

44+
/// Additional labels for metrics
45+
#[builder(default, setter(into))]
46+
metrics_labels: Option<Vec<(String, String)>>,
47+
4448
/// Whether to wait for inflight requests to complete during shutdown
4549
#[builder(default = "true")]
4650
graceful_shutdown: bool,
@@ -59,7 +63,7 @@ impl EndpointConfigBuilder {
5963
}
6064

6165
pub async fn start(self) -> Result<()> {
62-
let (endpoint, lease, handler, stats_handler, graceful_shutdown) =
66+
let (endpoint, lease, handler, stats_handler, metrics_labels, graceful_shutdown) =
6367
self.build_internal()?.dissolve();
6468
let lease = lease.or(endpoint.drt().primary_lease());
6569
let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0);
@@ -74,8 +78,11 @@ impl EndpointConfigBuilder {
7478
// acquire the registry lock
7579
let registry = endpoint.drt().component_registry.inner.lock().await;
7680

81+
let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
82+
.as_ref()
83+
.map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
7784
// Add metrics to the handler. The endpoint provides additional information to the handler.
78-
handler.add_metrics(&endpoint)?;
85+
handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
7986

8087
// get the group
8188
let group = registry

lib/runtime/src/pipeline/network.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,12 @@ impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
300300
.map_err(|_| anyhow::anyhow!("Segment already set"))
301301
}
302302

303-
pub fn add_metrics(&self, endpoint: &crate::component::Endpoint) -> Result<()> {
304-
let metrics = WorkHandlerMetrics::from_endpoint(endpoint)
303+
pub fn add_metrics(
304+
&self,
305+
endpoint: &crate::component::Endpoint,
306+
metrics_labels: Option<&[(&str, &str)]>,
307+
) -> Result<()> {
308+
let metrics = WorkHandlerMetrics::from_endpoint(endpoint, metrics_labels)
305309
.map_err(|e| anyhow::anyhow!("Failed to create work handler metrics: {}", e))?;
306310

307311
self.metrics
@@ -345,7 +349,11 @@ pub trait PushWorkHandler: Send + Sync {
345349
async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError>;
346350

347351
/// Add metrics to the handler
348-
fn add_metrics(&self, endpoint: &crate::component::Endpoint) -> Result<()>;
352+
fn add_metrics(
353+
&self,
354+
endpoint: &crate::component::Endpoint,
355+
metrics_labels: Option<&[(&str, &str)]>,
356+
) -> Result<()>;
349357
}
350358

351359
/*

0 commit comments

Comments
 (0)