From b31793958a63137d697329aa5e0a569985e07768 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 2 Oct 2025 22:04:48 +0000 Subject: [PATCH 01/34] no errors for now --- components/src/dynamo/sglang/main.py | 9 +++++ .../multimodal_encode_worker_handler.py | 2 +- .../sglang/utils/multimodal_chat_processor.py | 2 +- lib/bindings/python/rust/lib.rs | 33 +++++++++++++++++++ lib/bindings/python/src/dynamo/_core.pyi | 6 ++++ 5 files changed, 50 insertions(+), 2 deletions(-) diff --git a/components/src/dynamo/sglang/main.py b/components/src/dynamo/sglang/main.py index d682c89339..06aec70fda 100644 --- a/components/src/dynamo/sglang/main.py +++ b/components/src/dynamo/sglang/main.py @@ -128,6 +128,9 @@ async def register_model(): health_check_payload = SglangHealthCheckPayload(engine).to_dict() + test_endpoint = component.endpoint("test") + test_endpoint.register_custom_endpoint("/test") + try: # Start endpoint immediately and register model concurrently # Requests queue until ready_event is set @@ -139,6 +142,12 @@ async def register_model(): health_check_payload=health_check_payload, ), register_model(), + test_endpoint.serve_endpoint( + handler.generate, + graceful_shutdown=True, + metrics_labels=metrics_labels, + health_check_payload=health_check_payload, + ), ) except Exception as e: logging.error(f"Failed to serve endpoints: {e}") diff --git a/components/src/dynamo/sglang/request_handlers/multimodal_encode_worker_handler.py b/components/src/dynamo/sglang/request_handlers/multimodal_encode_worker_handler.py index 5880555cc6..42b8eaec60 100644 --- a/components/src/dynamo/sglang/request_handlers/multimodal_encode_worker_handler.py +++ b/components/src/dynamo/sglang/request_handlers/multimodal_encode_worker_handler.py @@ -5,7 +5,7 @@ from typing import AsyncIterator import torch -from sglang.srt.conversation import chat_templates +from sglang.srt.parser.conversation import chat_templates from transformers import AutoImageProcessor, AutoModel, AutoTokenizer import dynamo.nixl_connect as connect diff --git a/components/src/dynamo/sglang/utils/multimodal_chat_processor.py b/components/src/dynamo/sglang/utils/multimodal_chat_processor.py index e30e460a43..d734c59b4e 100644 --- a/components/src/dynamo/sglang/utils/multimodal_chat_processor.py +++ b/components/src/dynamo/sglang/utils/multimodal_chat_processor.py @@ -3,7 +3,7 @@ import logging -from sglang.srt.conversation import chat_templates +from sglang.srt.parser.conversation import chat_templates logger = logging.getLogger(__name__) diff --git a/lib/bindings/python/rust/lib.rs b/lib/bindings/python/rust/lib.rs index 9f72bfe4fe..111808b68a 100644 --- a/lib/bindings/python/rust/lib.rs +++ b/lib/bindings/python/rust/lib.rs @@ -644,6 +644,39 @@ impl Component { #[pymethods] impl Endpoint { + #[pyo3(signature = (endpoint_path))] + fn register_custom_endpoint<'p>( + &self, + py: Python<'p>, + endpoint_path: &str, + ) -> PyResult> { + // validate that the endpoint path looks like "/" and does not end with a slash + if !endpoint_path.starts_with("/") || endpoint_path.ends_with("/") { + return Err(PyErr::new::( + "Endpoint path must start with a slash and not end with a slash", + )); + } + + let endpoint_path = endpoint_path.to_string(); + + let drt = self.inner.drt(); + let etcd_client = drt.etcd_client(); + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + if let Some(etcd_client) = etcd_client { + etcd_client + .kv_create( + endpoint_path.as_str(), + serde_json::to_vec_pretty(&serde_json::Value::Null).unwrap(), + None, + ) + .await + .map_err(to_pyerr)?; + } + Ok(()) + }) + } + #[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None, health_check_payload = None))] fn serve_endpoint<'p>( &self, diff --git a/lib/bindings/python/src/dynamo/_core.pyi b/lib/bindings/python/src/dynamo/_core.pyi index e4545f21a2..bfd2dad485 100644 --- a/lib/bindings/python/src/dynamo/_core.pyi +++ b/lib/bindings/python/src/dynamo/_core.pyi @@ -115,6 +115,12 @@ class Endpoint: ... + def register_custom_endpoint(self, endpoint_path: str) -> None: + """ + Register a custom endpoint path to our discovery plane by dynamo ingress + """ + ... + async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None, health_check_payload: Optional[Dict[str, Any]] = None) -> None: """ Serve an endpoint discoverable by all connected clients at From 6d25ddc812154f32e839dd489945d32480fa54c7 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Fri, 3 Oct 2025 22:38:39 +0000 Subject: [PATCH 02/34] bump --- lib/bindings/python/rust/lib.rs | 10 ++- lib/llm/src/http/service.rs | 1 + lib/llm/src/http/service/dynamic_endpoint.rs | 80 ++++++++++++++++++++ lib/llm/src/http/service/service_v2.rs | 1 + 4 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 lib/llm/src/http/service/dynamic_endpoint.rs diff --git a/lib/bindings/python/rust/lib.rs b/lib/bindings/python/rust/lib.rs index 111808b68a..e2388b3f2b 100644 --- a/lib/bindings/python/rust/lib.rs +++ b/lib/bindings/python/rust/lib.rs @@ -664,10 +664,16 @@ impl Endpoint { pyo3_async_runtimes::tokio::future_into_py(py, async move { if let Some(etcd_client) = etcd_client { + let key = format!( + "{}/{}", + dynamo_llm::http::service::dynamic_endpoint::DYNAMIC_ENDPOINT_PATH, + endpoint_path.trim_start_matches('/') + ); etcd_client .kv_create( - endpoint_path.as_str(), - serde_json::to_vec_pretty(&serde_json::Value::Null).unwrap(), + &key, + serde_json::to_vec_pretty(&serde_json::Value::String(endpoint_path)) + .unwrap(), None, ) .await diff --git a/lib/llm/src/http/service.rs b/lib/llm/src/http/service.rs index 7f163a200f..a6b7ab2198 100644 --- a/lib/llm/src/http/service.rs +++ b/lib/llm/src/http/service.rs @@ -21,6 +21,7 @@ mod openai; pub mod disconnect; +pub mod dynamic_endpoint; pub mod error; pub mod health; pub mod metrics; diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs new file mode 100644 index 0000000000..31df457b6c --- /dev/null +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -0,0 +1,80 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{RouteDoc, service_v2}; +use axum::{ + Json, Router, + http::{Method, StatusCode}, + response::IntoResponse, + routing::post, +}; +use std::sync::Arc; + +pub const DYNAMIC_ENDPOINT_PATH: &str = "dynamic_endpoint"; + +pub fn dynamic_endpoint_router( + state: Arc, + path: Option, +) -> (Vec, Router) { + let wildcard_path = "/{*path}"; + let path = path.unwrap_or_else(|| wildcard_path.to_string()); + + let docs: Vec = vec![RouteDoc::new(Method::POST, &path)]; + + let router = Router::new() + .route(&path, post(dynamic_endpoint_handler)) + .with_state(state); + + (docs, router) +} + +async fn dynamic_endpoint_handler( + axum::extract::State(state): axum::extract::State>, +) -> impl IntoResponse { + let mut dynamic_endpoints: Vec = Vec::new(); + + let Some(etcd_client) = state.etcd_client() else { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "message": "Failed to get etcd client" + })), + ); + }; + + let kvs = match etcd_client + .kv_get_prefix(format!("{}/", DYNAMIC_ENDPOINT_PATH)) + .await + { + Ok(kvs) => kvs, + Err(_) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "message": "Failed to get dynamic endpoints" + })), + ); + } + }; + + for kv in kvs { + match serde_json::from_slice::(kv.value()) { + Ok(path) => dynamic_endpoints.push(path), + Err(_) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "message": "Failed to parse dynamic endpoint" + })), + ); + } + } + } + + return ( + StatusCode::OK, + Json(serde_json::json!({ + "dynamic_endpoints": dynamic_endpoints + })), + ); +} diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index 21d3e8422c..531ff51f33 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -325,6 +325,7 @@ impl HttpServiceConfigBuilder { super::openai::list_models_router(state.clone(), var(HTTP_SVC_MODELS_PATH_ENV).ok()), super::health::health_check_router(state.clone(), var(HTTP_SVC_HEALTH_PATH_ENV).ok()), super::health::live_check_router(state.clone(), var(HTTP_SVC_LIVE_PATH_ENV).ok()), + super::dynamic_endpoint::dynamic_endpoint_router(state.clone(), None), ]; let endpoint_routes = From 3b492f4b6f7ad7f73523caed4edc3bb8252841a4 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Fri, 3 Oct 2025 23:22:35 +0000 Subject: [PATCH 03/34] refactor --- components/src/dynamo/sglang/main.py | 2 +- lib/bindings/python/rust/lib.rs | 46 +++----------------- lib/bindings/python/src/dynamo/_core.pyi | 8 +--- lib/llm/src/http/service/dynamic_endpoint.rs | 29 ++++-------- lib/runtime/src/component.rs | 4 +- lib/runtime/src/component/endpoint.rs | 8 ++++ 6 files changed, 27 insertions(+), 70 deletions(-) diff --git a/components/src/dynamo/sglang/main.py b/components/src/dynamo/sglang/main.py index 06aec70fda..82babf18a2 100644 --- a/components/src/dynamo/sglang/main.py +++ b/components/src/dynamo/sglang/main.py @@ -129,7 +129,6 @@ async def register_model(): health_check_payload = SglangHealthCheckPayload(engine).to_dict() test_endpoint = component.endpoint("test") - test_endpoint.register_custom_endpoint("/test") try: # Start endpoint immediately and register model concurrently @@ -147,6 +146,7 @@ async def register_model(): graceful_shutdown=True, metrics_labels=metrics_labels, health_check_payload=health_check_payload, + http_endpoint_path="/test", ), ) except Exception as e: diff --git a/lib/bindings/python/rust/lib.rs b/lib/bindings/python/rust/lib.rs index e2388b3f2b..2b750e444b 100644 --- a/lib/bindings/python/rust/lib.rs +++ b/lib/bindings/python/rust/lib.rs @@ -644,46 +644,7 @@ impl Component { #[pymethods] impl Endpoint { - #[pyo3(signature = (endpoint_path))] - fn register_custom_endpoint<'p>( - &self, - py: Python<'p>, - endpoint_path: &str, - ) -> PyResult> { - // validate that the endpoint path looks like "/" and does not end with a slash - if !endpoint_path.starts_with("/") || endpoint_path.ends_with("/") { - return Err(PyErr::new::( - "Endpoint path must start with a slash and not end with a slash", - )); - } - - let endpoint_path = endpoint_path.to_string(); - - let drt = self.inner.drt(); - let etcd_client = drt.etcd_client(); - - pyo3_async_runtimes::tokio::future_into_py(py, async move { - if let Some(etcd_client) = etcd_client { - let key = format!( - "{}/{}", - dynamo_llm::http::service::dynamic_endpoint::DYNAMIC_ENDPOINT_PATH, - endpoint_path.trim_start_matches('/') - ); - etcd_client - .kv_create( - &key, - serde_json::to_vec_pretty(&serde_json::Value::String(endpoint_path)) - .unwrap(), - None, - ) - .await - .map_err(to_pyerr)?; - } - Ok(()) - }) - } - - #[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None, health_check_payload = None))] + #[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None, health_check_payload = None, http_endpoint_path = None))] fn serve_endpoint<'p>( &self, py: Python<'p>, @@ -691,6 +652,7 @@ impl Endpoint { graceful_shutdown: Option, metrics_labels: Option>, health_check_payload: Option<&Bound<'p, PyDict>>, + http_endpoint_path: Option<&str>, ) -> PyResult> { let engine = Arc::new(engine::PythonAsyncEngine::new( generator, @@ -728,6 +690,10 @@ impl Endpoint { builder = builder.health_check_payload(payload); } + if let Some(http_endpoint_path) = http_endpoint_path { + builder = builder.http_endpoint_path(http_endpoint_path); + } + let graceful_shutdown = graceful_shutdown.unwrap_or(true); pyo3_async_runtimes::tokio::future_into_py(py, async move { builder diff --git a/lib/bindings/python/src/dynamo/_core.pyi b/lib/bindings/python/src/dynamo/_core.pyi index bfd2dad485..a1549fd67d 100644 --- a/lib/bindings/python/src/dynamo/_core.pyi +++ b/lib/bindings/python/src/dynamo/_core.pyi @@ -115,13 +115,7 @@ class Endpoint: ... - def register_custom_endpoint(self, endpoint_path: str) -> None: - """ - Register a custom endpoint path to our discovery plane by dynamo ingress - """ - ... - - async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None, health_check_payload: Optional[Dict[str, Any]] = None) -> None: + async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None, health_check_payload: Optional[Dict[str, Any]] = None, http_endpoint_path: Optional[str] = None) -> None: """ Serve an endpoint discoverable by all connected clients at `{{ namespace }}/components/{{ component_name }}/endpoints/{{ endpoint_name }}` diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 31df457b6c..31c9597d86 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -8,6 +8,7 @@ use axum::{ response::IntoResponse, routing::post, }; +use dynamo_runtime::instances::list_all_instances; use std::sync::Arc; pub const DYNAMIC_ENDPOINT_PATH: &str = "dynamic_endpoint"; @@ -31,8 +32,6 @@ pub fn dynamic_endpoint_router( async fn dynamic_endpoint_handler( axum::extract::State(state): axum::extract::State>, ) -> impl IntoResponse { - let mut dynamic_endpoints: Vec = Vec::new(); - let Some(etcd_client) = state.etcd_client() else { return ( StatusCode::INTERNAL_SERVER_ERROR, @@ -42,34 +41,22 @@ async fn dynamic_endpoint_handler( ); }; - let kvs = match etcd_client - .kv_get_prefix(format!("{}/", DYNAMIC_ENDPOINT_PATH)) - .await - { - Ok(kvs) => kvs, + let instances = match list_all_instances(etcd_client).await { + Ok(instances) => instances, Err(_) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ - "message": "Failed to get dynamic endpoints" + "message": "Failed to get instances" })), ); } }; - for kv in kvs { - match serde_json::from_slice::(kv.value()) { - Ok(path) => dynamic_endpoints.push(path), - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "message": "Failed to parse dynamic endpoint" - })), - ); - } - } - } + let dynamic_endpoints = instances + .iter() + .filter_map(|instance| instance.http_endpoint_path.clone()) + .collect::>(); return ( StatusCode::OK, diff --git a/lib/runtime/src/component.rs b/lib/runtime/src/component.rs index a5231ab51e..521ec6f358 100644 --- a/lib/runtime/src/component.rs +++ b/lib/runtime/src/component.rs @@ -101,6 +101,8 @@ pub struct Instance { pub namespace: String, pub instance_id: i64, pub transport: TransportType, + #[serde(skip_serializing_if = "Option::is_none")] + pub http_endpoint_path: Option, } impl Instance { @@ -463,7 +465,7 @@ impl Endpoint { .expect("Endpoint name and component name should be valid") } - /// The fully path of an instance in etcd + /// The full path of an instance in etcd pub fn etcd_path_with_lease_id(&self, lease_id: i64) -> String { format!("{INSTANCE_ROOT_PATH}/{}", self.unique_path(lease_id)) } diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index b79a4676cb..e3f46b9692 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -44,6 +44,10 @@ pub struct EndpointConfig { #[educe(Debug(ignore))] #[builder(default, setter(into, strip_option))] health_check_payload: Option, + + /// Expose this endpoint over HTTP at this path + #[builder(default, setter(into, strip_option))] + http_endpoint_path: Option, } impl EndpointConfigBuilder { @@ -67,6 +71,7 @@ impl EndpointConfigBuilder { metrics_labels, graceful_shutdown, health_check_payload, + http_endpoint_path, ) = self.build_internal()?.dissolve(); let lease = lease.or(endpoint.drt().primary_lease()); let lease_id = lease.as_ref().map(|l| l.id()).unwrap_or(0); @@ -128,6 +133,7 @@ impl EndpointConfigBuilder { let subject = endpoint.subject_to(lease_id); let etcd_path = endpoint.etcd_path_with_lease_id(lease_id); let etcd_client = endpoint.component.drt.etcd_client.clone(); + let http_endpoint_path = http_endpoint_path.clone(); // Register health check target in SystemHealth if provided if let Some(health_check_payload) = &health_check_payload { @@ -137,6 +143,7 @@ impl EndpointConfigBuilder { namespace: namespace_name.clone(), instance_id: lease_id, transport: TransportType::NatsTcp(subject.clone()), + http_endpoint_path: http_endpoint_path.clone(), }; tracing::debug!(subject = %subject, "Registering endpoint health check target"); let guard = system_health.lock().unwrap(); @@ -230,6 +237,7 @@ impl EndpointConfigBuilder { namespace: namespace_name, instance_id: lease_id, transport: TransportType::NatsTcp(subject), + http_endpoint_path: http_endpoint_path, }; let info = serde_json::to_vec_pretty(&info)?; From 2124d91040575741091884f8f32029650a279171 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Fri, 3 Oct 2025 23:44:54 +0000 Subject: [PATCH 04/34] found --- lib/llm/src/http/service/dynamic_endpoint.rs | 24 +++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 31c9597d86..4d4b6af96b 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -9,6 +9,7 @@ use axum::{ routing::post, }; use dynamo_runtime::instances::list_all_instances; +use dynamo_runtime::{pipeline::PushRouter, stream::StreamExt}; use std::sync::Arc; pub const DYNAMIC_ENDPOINT_PATH: &str = "dynamic_endpoint"; @@ -31,6 +32,7 @@ pub fn dynamic_endpoint_router( async fn dynamic_endpoint_handler( axum::extract::State(state): axum::extract::State>, + axum::extract::Path(path): axum::extract::Path, ) -> impl IntoResponse { let Some(etcd_client) = state.etcd_client() else { return ( @@ -58,10 +60,20 @@ async fn dynamic_endpoint_handler( .filter_map(|instance| instance.http_endpoint_path.clone()) .collect::>(); - return ( - StatusCode::OK, - Json(serde_json::json!({ - "dynamic_endpoints": dynamic_endpoints - })), - ); + let path = format!("/{}", &path); + if dynamic_endpoints.contains(&path) { + return ( + StatusCode::OK, + Json(serde_json::json!({ + "message": "Dynamic endpoint found" + })), + ); + } else { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({ + "message": "Dynamic endpoint not found" + })), + ); + } } From ac9357ed7d73ccbe85d4720f99d447114aca1b0f Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sat, 4 Oct 2025 00:58:33 +0000 Subject: [PATCH 05/34] nice --- components/src/dynamo/sglang/main.py | 2 +- .../sglang/request_handlers/decode_handler.py | 3 + lib/llm/src/http/service/dynamic_endpoint.rs | 122 ++++++++++++++++-- 3 files changed, 117 insertions(+), 10 deletions(-) diff --git a/components/src/dynamo/sglang/main.py b/components/src/dynamo/sglang/main.py index 82babf18a2..ba00bc9c79 100644 --- a/components/src/dynamo/sglang/main.py +++ b/components/src/dynamo/sglang/main.py @@ -142,7 +142,7 @@ async def register_model(): ), register_model(), test_endpoint.serve_endpoint( - handler.generate, + handler.test, graceful_shutdown=True, metrics_labels=metrics_labels, health_check_payload=health_check_payload, diff --git a/components/src/dynamo/sglang/request_handlers/decode_handler.py b/components/src/dynamo/sglang/request_handlers/decode_handler.py index 548cfb773e..3c3d58a0e3 100644 --- a/components/src/dynamo/sglang/request_handlers/decode_handler.py +++ b/components/src/dynamo/sglang/request_handlers/decode_handler.py @@ -66,6 +66,9 @@ def _build_sampling_params(self, request: dict) -> dict: return {k: v for k, v in param_mapping.items() if v is not None} + async def test(self, request: dict): + yield {"message": "Hello, world!"} + async def generate(self, request: dict): sampling_params = self._build_sampling_params(request) input_param = self._get_input_param(request) diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 4d4b6af96b..31df4eeca9 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::{RouteDoc, service_v2}; +use crate::types::Annotated; use axum::{ Json, Router, http::{Method, StatusCode}, @@ -9,6 +10,7 @@ use axum::{ routing::post, }; use dynamo_runtime::instances::list_all_instances; +use dynamo_runtime::{DistributedRuntime, Runtime, component::Client}; use dynamo_runtime::{pipeline::PushRouter, stream::StreamExt}; use std::sync::Arc; @@ -60,15 +62,8 @@ async fn dynamic_endpoint_handler( .filter_map(|instance| instance.http_endpoint_path.clone()) .collect::>(); - let path = format!("/{}", &path); - if dynamic_endpoints.contains(&path) { - return ( - StatusCode::OK, - Json(serde_json::json!({ - "message": "Dynamic endpoint found" - })), - ); - } else { + let fmt_path = format!("/{}", &path); + if !dynamic_endpoints.contains(&fmt_path) { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({ @@ -76,4 +71,113 @@ async fn dynamic_endpoint_handler( })), ); } + + let rt = match Runtime::from_current() { + Ok(rt) => rt, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "message": "Failed to get runtime" + })), + ); + } + }; + let drt = match DistributedRuntime::from_settings(rt).await { + Ok(drt) => drt, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "message": "Failed to get distributed runtime" + })), + ); + } + }; + + // grab all instances that expose this endpoint + let target_instances = instances + .iter() + .filter(|instance| instance.http_endpoint_path == Some(fmt_path.clone())) + .collect::>(); + + // use pushrouter .direct to forward the request to the filtered instances sequentially + let mut target_clients: Vec = Vec::new(); + for instance in target_instances { + let ns = match drt.namespace(instance.namespace.clone()) { + Ok(ns) => ns, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "message": "Failed to get namespace" + })), + ); + } + }; + let c = match ns.component(instance.component.clone()) { + Ok(c) => c, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "message": "Failed to get component" + })), + ); + } + }; + let ep = c.endpoint(path.clone()); + let c = match ep.client().await { + Ok(c) => c, + Err(_) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "message": "Failed to get client" + })), + ); + } + }; + target_clients.push(c.clone()); + } + + let mut all_responses = Vec::new(); + for client in target_clients { + let router = match PushRouter::<(), Annotated>::from_client( + client, + Default::default(), + ) + .await + { + Ok(router) => router, + Err(_) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "message": "Failed to get router" + })), + ); + } + }; + let mut stream = match router.round_robin(().into()).await { + Ok(s) => s, + Err(_) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"message": "Failed to route"})), + ); + } + }; + + while let Some(resp) = stream.next().await { + all_responses.push(resp); + } + } + + return ( + StatusCode::OK, + Json(serde_json::json!({ + "responses": all_responses + })), + ); } From 999c716bb38048a0617cc83ca8be44fab9c20142 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sat, 4 Oct 2025 01:18:19 +0000 Subject: [PATCH 06/34] err --- lib/llm/src/http/service/dynamic_endpoint.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 31df4eeca9..91893876c8 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -74,7 +74,7 @@ async fn dynamic_endpoint_handler( let rt = match Runtime::from_current() { Ok(rt) => rt, - Err(e) => { + Err(_) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ @@ -85,7 +85,7 @@ async fn dynamic_endpoint_handler( }; let drt = match DistributedRuntime::from_settings(rt).await { Ok(drt) => drt, - Err(e) => { + Err(_) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ @@ -117,7 +117,7 @@ async fn dynamic_endpoint_handler( }; let c = match ns.component(instance.component.clone()) { Ok(c) => c, - Err(e) => { + Err(_) => { return ( StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({ From 889f0c56b4e19c60871e473dcd78dbaa03ef56cd Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sun, 5 Oct 2025 00:03:59 +0000 Subject: [PATCH 07/34] cleanup --- lib/llm/src/http/service/dynamic_endpoint.rs | 173 ++++++------------- 1 file changed, 52 insertions(+), 121 deletions(-) diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 91893876c8..92b8dcb3ea 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -14,8 +14,6 @@ use dynamo_runtime::{DistributedRuntime, Runtime, component::Client}; use dynamo_runtime::{pipeline::PushRouter, stream::StreamExt}; use std::sync::Arc; -pub const DYNAMIC_ENDPOINT_PATH: &str = "dynamic_endpoint"; - pub fn dynamic_endpoint_router( state: Arc, path: Option, @@ -32,30 +30,17 @@ pub fn dynamic_endpoint_router( (docs, router) } -async fn dynamic_endpoint_handler( - axum::extract::State(state): axum::extract::State>, - axum::extract::Path(path): axum::extract::Path, -) -> impl IntoResponse { - let Some(etcd_client) = state.etcd_client() else { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "message": "Failed to get etcd client" - })), - ); - }; - - let instances = match list_all_instances(etcd_client).await { - Ok(instances) => instances, - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "message": "Failed to get instances" - })), - ); - } - }; +async fn inner_dynamic_endpoint_handler( + state: Arc, + path: String, +) -> Result { + let etcd_client = state + .etcd_client() + .ok_or_else(|| "Failed to get etcd client")?; + + let instances = list_all_instances(etcd_client) + .await + .map_err(|_| "Failed to get instances")?; let dynamic_endpoints = instances .iter() @@ -64,120 +49,66 @@ async fn dynamic_endpoint_handler( let fmt_path = format!("/{}", &path); if !dynamic_endpoints.contains(&fmt_path) { - return ( - StatusCode::NOT_FOUND, - Json(serde_json::json!({ - "message": "Dynamic endpoint not found" - })), - ); + return Err("Dynamic endpoint not found"); } - let rt = match Runtime::from_current() { - Ok(rt) => rt, - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "message": "Failed to get runtime" - })), - ); - } - }; - let drt = match DistributedRuntime::from_settings(rt).await { - Ok(drt) => drt, - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "message": "Failed to get distributed runtime" - })), - ); - } - }; + let rt = Runtime::from_current().map_err(|_| "Failed to get runtime")?; + let drt = DistributedRuntime::from_settings(rt) + .await + .map_err(|_| "Failed to get distributed runtime")?; - // grab all instances that expose this endpoint let target_instances = instances .iter() .filter(|instance| instance.http_endpoint_path == Some(fmt_path.clone())) .collect::>(); - // use pushrouter .direct to forward the request to the filtered instances sequentially let mut target_clients: Vec = Vec::new(); for instance in target_instances { - let ns = match drt.namespace(instance.namespace.clone()) { - Ok(ns) => ns, - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "message": "Failed to get namespace" - })), - ); - } - }; - let c = match ns.component(instance.component.clone()) { - Ok(c) => c, - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "message": "Failed to get component" - })), - ); - } - }; + let ns = drt + .namespace(instance.namespace.clone()) + .map_err(|_| "Failed to get namespace")?; + let c = ns + .component(instance.component.clone()) + .map_err(|_| "Failed to get component")?; let ep = c.endpoint(path.clone()); - let c = match ep.client().await { - Ok(c) => c, - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "message": "Failed to get client" - })), - ); - } - }; - target_clients.push(c.clone()); + let client = ep.client().await.map_err(|_| "Failed to get client")?; + target_clients.push(client); } let mut all_responses = Vec::new(); for client in target_clients { - let router = match PushRouter::<(), Annotated>::from_client( - client, - Default::default(), - ) - .await - { - Ok(router) => router, - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({ - "message": "Failed to get router" - })), - ); - } - }; - let mut stream = match router.round_robin(().into()).await { - Ok(s) => s, - Err(_) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"message": "Failed to route"})), - ); - } - }; + let router = + PushRouter::<(), Annotated>::from_client(client, Default::default()) + .await + .map_err(|_| "Failed to get router")?; + + let mut stream = router + .round_robin(().into()) + .await + .map_err(|_| "Failed to route")?; while let Some(resp) = stream.next().await { all_responses.push(resp); } } - return ( - StatusCode::OK, - Json(serde_json::json!({ - "responses": all_responses - })), - ); + Ok(Json(serde_json::json!({ + "responses": all_responses + }))) +} + +async fn dynamic_endpoint_handler( + axum::extract::State(state): axum::extract::State>, + axum::extract::Path(path): axum::extract::Path, +) -> impl IntoResponse { + inner_dynamic_endpoint_handler(state, path) + .await + .map_err(|err_string| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "message": err_string + })), + ) + }) } From 95624833294adb56708544c8025bc1717d5405a8 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sun, 5 Oct 2025 00:13:24 +0000 Subject: [PATCH 08/34] bump --- .cargo/config.toml | 1 + lib/llm/src/http/service/dynamic_endpoint.rs | 4 +--- lib/runtime/src/component/endpoint.rs | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index fb79aed4d7..a4b6a19856 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -4,3 +4,4 @@ [build] # tokio-console needs this rustflags = ["--cfg", "tokio_unstable"] +rustc-wrapper = "sccache" diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 92b8dcb3ea..bdccffc433 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -34,9 +34,7 @@ async fn inner_dynamic_endpoint_handler( state: Arc, path: String, ) -> Result { - let etcd_client = state - .etcd_client() - .ok_or_else(|| "Failed to get etcd client")?; + let etcd_client = state.etcd_client().ok_or("Failed to get etcd client")?; let instances = list_all_instances(etcd_client) .await diff --git a/lib/runtime/src/component/endpoint.rs b/lib/runtime/src/component/endpoint.rs index e3f46b9692..a3db962854 100644 --- a/lib/runtime/src/component/endpoint.rs +++ b/lib/runtime/src/component/endpoint.rs @@ -237,7 +237,7 @@ impl EndpointConfigBuilder { namespace: namespace_name, instance_id: lease_id, transport: TransportType::NatsTcp(subject), - http_endpoint_path: http_endpoint_path, + http_endpoint_path, }; let info = serde_json::to_vec_pretty(&info)?; From 48bc524978f346538f2f41e54ca1d5e899038567 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sun, 5 Oct 2025 00:19:30 +0000 Subject: [PATCH 09/34] go --- .cargo/config.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index a4b6a19856..b608842463 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -3,5 +3,4 @@ [build] # tokio-console needs this -rustflags = ["--cfg", "tokio_unstable"] -rustc-wrapper = "sccache" +rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file From c2d10ee718ba03f51a68545af2d511892aa649d3 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sun, 5 Oct 2025 01:08:06 +0000 Subject: [PATCH 10/34] working --- components/src/dynamo/sglang/main.py | 18 ++---- .../sglang/request_handlers/__init__.py | 2 + .../sglang/request_handlers/decode_handler.py | 3 - .../request_handlers/native_api_handler.py | 61 +++++++++++++++++++ 4 files changed, 69 insertions(+), 15 deletions(-) create mode 100644 components/src/dynamo/sglang/request_handlers/native_api_handler.py diff --git a/components/src/dynamo/sglang/main.py b/components/src/dynamo/sglang/main.py index ba00bc9c79..c3b9de99e4 100644 --- a/components/src/dynamo/sglang/main.py +++ b/components/src/dynamo/sglang/main.py @@ -28,6 +28,7 @@ MultimodalProcessorHandler, MultimodalWorkerHandler, PrefillWorkerHandler, + NativeApiHandler, ) configure_dynamo_logging() @@ -73,8 +74,6 @@ async def init(runtime: DistributedRuntime, config: Config): generate_endpoint = component.endpoint(dynamo_args.endpoint) - # TODO: think about implementing DisaggregationStrategy for P->D - # TODO: implement a `next` field in the config to dynamically set the next client prefill_client = None if config.serving_mode == DisaggregationMode.DECODE: logging.info("Initializing prefill client") @@ -108,6 +107,10 @@ async def init(runtime: DistributedRuntime, config: Config): component, engine, config, publisher, kv_publisher, prefill_client ) + native_api_handler = NativeApiHandler(component, engine, metrics_labels) + + native_api_tasks = await native_api_handler.init_native_apis() + async def register_model(): """Register the model and signal readiness""" registration_success = await register_llm_with_runtime_config( @@ -128,10 +131,7 @@ async def register_model(): health_check_payload = SglangHealthCheckPayload(engine).to_dict() - test_endpoint = component.endpoint("test") - try: - # Start endpoint immediately and register model concurrently # Requests queue until ready_event is set await asyncio.gather( generate_endpoint.serve_endpoint( @@ -141,13 +141,7 @@ async def register_model(): health_check_payload=health_check_payload, ), register_model(), - test_endpoint.serve_endpoint( - handler.test, - graceful_shutdown=True, - metrics_labels=metrics_labels, - health_check_payload=health_check_payload, - http_endpoint_path="/test", - ), + *native_api_tasks, ) except Exception as e: logging.error(f"Failed to serve endpoints: {e}") diff --git a/components/src/dynamo/sglang/request_handlers/__init__.py b/components/src/dynamo/sglang/request_handlers/__init__.py index 9a9e47ee69..1f24e3151f 100644 --- a/components/src/dynamo/sglang/request_handlers/__init__.py +++ b/components/src/dynamo/sglang/request_handlers/__init__.py @@ -14,6 +14,7 @@ MultimodalWorkerHandler, ) from .prefill_handler import PrefillWorkerHandler +from .native_api_handler import NativeApiHandler __all__ = [ "BaseWorkerHandler", @@ -23,4 +24,5 @@ "MultimodalEncodeWorkerHandler", "MultimodalWorkerHandler", "MultimodalPrefillWorkerHandler", + "NativeApiHandler", ] diff --git a/components/src/dynamo/sglang/request_handlers/decode_handler.py b/components/src/dynamo/sglang/request_handlers/decode_handler.py index 3c3d58a0e3..548cfb773e 100644 --- a/components/src/dynamo/sglang/request_handlers/decode_handler.py +++ b/components/src/dynamo/sglang/request_handlers/decode_handler.py @@ -66,9 +66,6 @@ def _build_sampling_params(self, request: dict) -> dict: return {k: v for k, v in param_mapping.items() if v is not None} - async def test(self, request: dict): - yield {"message": "Hello, world!"} - async def generate(self, request: dict): sampling_params = self._build_sampling_params(request) input_param = self._get_input_param(request) diff --git a/components/src/dynamo/sglang/request_handlers/native_api_handler.py b/components/src/dynamo/sglang/request_handlers/native_api_handler.py new file mode 100644 index 0000000000..f1eba63f53 --- /dev/null +++ b/components/src/dynamo/sglang/request_handlers/native_api_handler.py @@ -0,0 +1,61 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# SGLang Native APIs: https://docs.sglang.ai/basic_usage/native_api.html +# Code: https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/entrypoints/http_server.py + +import asyncio +import logging +from typing import List, Optional, Tuple + +from dynamo._core import Component +import sglang as sgl + + +class NativeApiHandler: + """Mixin to add sglang native API endpoints to worker handlers""" + + def __init__(self, component: Component, engine: sgl.Engine, metrics_labels: Optional[List[Tuple[str, str]]] = None): + self.component = component + self.engine = engine + self.metrics_labels = metrics_labels + self.native_api_tasks = [] + + async def init_native_apis( + self, + ) -> List[asyncio.Task]: + """ + Initialize and register native API endpoints. + Returns list of tasks to be gathered. + """ + logging.info("Initializing native SGLang API endpoints") + + tasks = [] + + model_info_ep = self.component.endpoint("get_model_info") + tasks.append( + model_info_ep.serve_endpoint( + self.get_model_info, + graceful_shutdown=True, + metrics_labels=self.metrics_labels, + http_endpoint_path="/get_model_info", + ) + ) + + self.native_api_tasks = tasks + logging.info(f"Registered {len(tasks)} native API endpoints") + return tasks + + async def get_model_info(self, request: dict): + """Native API: Get model information""" + + tokenizer_manager = self.engine.tokenizer_manager + + result = { + "model_path": tokenizer_manager.server_args.model_path, + "tokenizer_path": tokenizer_manager.server_args.tokenizer_path, + "preferred_sampling_params": tokenizer_manager.server_args.preferred_sampling_params, + "weight_version": tokenizer_manager.server_args.weight_version, + } + + yield {"data": [result]} From f3d74b88791e28d666f7471348f4076edfaef3cb Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sun, 5 Oct 2025 01:10:03 +0000 Subject: [PATCH 11/34] go --- .../sglang/request_handlers/native_api_handler.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/components/src/dynamo/sglang/request_handlers/native_api_handler.py b/components/src/dynamo/sglang/request_handlers/native_api_handler.py index f1eba63f53..b2238cfca8 100644 --- a/components/src/dynamo/sglang/request_handlers/native_api_handler.py +++ b/components/src/dynamo/sglang/request_handlers/native_api_handler.py @@ -30,6 +30,8 @@ async def init_native_apis( """ logging.info("Initializing native SGLang API endpoints") + self.tm = self.engine.tokenizer_manager + tasks = [] model_info_ep = self.component.endpoint("get_model_info") @@ -47,15 +49,11 @@ async def init_native_apis( return tasks async def get_model_info(self, request: dict): - """Native API: Get model information""" - - tokenizer_manager = self.engine.tokenizer_manager - result = { - "model_path": tokenizer_manager.server_args.model_path, - "tokenizer_path": tokenizer_manager.server_args.tokenizer_path, - "preferred_sampling_params": tokenizer_manager.server_args.preferred_sampling_params, - "weight_version": tokenizer_manager.server_args.weight_version, + "model_path": self.tm.server_args.model_path, + "tokenizer_path": self.tm.server_args.tokenizer_path, + "preferred_sampling_params": self.tm.server_args.preferred_sampling_params, + "weight_version": self.tm.server_args.weight_version, } yield {"data": [result]} From 3820e80f7665543e5dfbdae3774c4f5d36e590d4 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sun, 5 Oct 2025 02:05:03 +0000 Subject: [PATCH 12/34] profile --- .../request_handlers/native_api_handler.py | 57 ++++++++++++++++++- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/components/src/dynamo/sglang/request_handlers/native_api_handler.py b/components/src/dynamo/sglang/request_handlers/native_api_handler.py index b2238cfca8..1067bc5fde 100644 --- a/components/src/dynamo/sglang/request_handlers/native_api_handler.py +++ b/components/src/dynamo/sglang/request_handlers/native_api_handler.py @@ -10,6 +10,7 @@ from dynamo._core import Component import sglang as sgl +from sglang.srt.managers.io_struct import ProfileReqInput class NativeApiHandler: @@ -35,14 +36,28 @@ async def init_native_apis( tasks = [] model_info_ep = self.component.endpoint("get_model_info") - tasks.append( + start_profile_ep = self.component.endpoint("start_profile") + stop_profile_ep = self.component.endpoint("stop_profile") + tasks.extend([ model_info_ep.serve_endpoint( self.get_model_info, graceful_shutdown=True, metrics_labels=self.metrics_labels, http_endpoint_path="/get_model_info", - ) - ) + ), + start_profile_ep.serve_endpoint( + self.start_profile, + graceful_shutdown=True, + metrics_labels=self.metrics_labels, + http_endpoint_path="/start_profile", + ), + stop_profile_ep.serve_endpoint( + self.stop_profile, + graceful_shutdown=True, + metrics_labels=self.metrics_labels, + http_endpoint_path="/stop_profile", + ), + ]) self.native_api_tasks = tasks logging.info(f"Registered {len(tasks)} native API endpoints") @@ -57,3 +72,39 @@ async def get_model_info(self, request: dict): } yield {"data": [result]} + + async def start_profile(self, request: dict): + try: + obj = ProfileReqInput.model_validate(request) + except Exception: + obj = None + + if obj is None: + obj = ProfileReqInput() + + output_dir = obj.output_dir or f"profile_{self.tm.server_args.model_path}" + + await self.tm.start_profile( + output_dir=output_dir, + start_step=obj.start_step, + num_steps=obj.num_steps, + activities=obj.activities, + with_stack=obj.with_stack, + record_shapes=obj.record_shapes, + profile_by_stage=obj.profile_by_stage, + ) + + yield {"data": [{"status": "started profile"}]} + + async def stop_profile(self, request: dict): + asyncio.create_task(self.tm.stop_profile()) + yield { + "data": [ + { + "status": ( + "Stopped profile. This might take a long time to complete. " + f"Results should be available in the 'profile_{self.tm.server_args.model_path}' directory." + ) + } + ] + } From a791ca70c75cd0d0abcd409c1b630ab30744dd36 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sun, 5 Oct 2025 02:06:56 +0000 Subject: [PATCH 13/34] bump: --- components/src/dynamo/sglang/main.py | 2 +- .../sglang/request_handlers/__init__.py | 2 +- .../request_handlers/native_api_handler.py | 66 +++++++++++-------- 3 files changed, 39 insertions(+), 31 deletions(-) diff --git a/components/src/dynamo/sglang/main.py b/components/src/dynamo/sglang/main.py index c3b9de99e4..5c27b48c02 100644 --- a/components/src/dynamo/sglang/main.py +++ b/components/src/dynamo/sglang/main.py @@ -27,8 +27,8 @@ MultimodalPrefillWorkerHandler, MultimodalProcessorHandler, MultimodalWorkerHandler, - PrefillWorkerHandler, NativeApiHandler, + PrefillWorkerHandler, ) configure_dynamo_logging() diff --git a/components/src/dynamo/sglang/request_handlers/__init__.py b/components/src/dynamo/sglang/request_handlers/__init__.py index 1f24e3151f..f99db039f2 100644 --- a/components/src/dynamo/sglang/request_handlers/__init__.py +++ b/components/src/dynamo/sglang/request_handlers/__init__.py @@ -13,8 +13,8 @@ MultimodalPrefillWorkerHandler, MultimodalWorkerHandler, ) -from .prefill_handler import PrefillWorkerHandler from .native_api_handler import NativeApiHandler +from .prefill_handler import PrefillWorkerHandler __all__ = [ "BaseWorkerHandler", diff --git a/components/src/dynamo/sglang/request_handlers/native_api_handler.py b/components/src/dynamo/sglang/request_handlers/native_api_handler.py index 1067bc5fde..e03e47ba13 100644 --- a/components/src/dynamo/sglang/request_handlers/native_api_handler.py +++ b/components/src/dynamo/sglang/request_handlers/native_api_handler.py @@ -8,61 +8,69 @@ import logging from typing import List, Optional, Tuple -from dynamo._core import Component import sglang as sgl from sglang.srt.managers.io_struct import ProfileReqInput +from dynamo._core import Component + class NativeApiHandler: """Mixin to add sglang native API endpoints to worker handlers""" - - def __init__(self, component: Component, engine: sgl.Engine, metrics_labels: Optional[List[Tuple[str, str]]] = None): + + def __init__( + self, + component: Component, + engine: sgl.Engine, + metrics_labels: Optional[List[Tuple[str, str]]] = None, + ): self.component = component self.engine = engine self.metrics_labels = metrics_labels self.native_api_tasks = [] async def init_native_apis( - self, + self, ) -> List[asyncio.Task]: """ Initialize and register native API endpoints. Returns list of tasks to be gathered. """ logging.info("Initializing native SGLang API endpoints") - + self.tm = self.engine.tokenizer_manager tasks = [] - + model_info_ep = self.component.endpoint("get_model_info") start_profile_ep = self.component.endpoint("start_profile") stop_profile_ep = self.component.endpoint("stop_profile") - tasks.extend([ - model_info_ep.serve_endpoint( - self.get_model_info, - graceful_shutdown=True, - metrics_labels=self.metrics_labels, - http_endpoint_path="/get_model_info", - ), - start_profile_ep.serve_endpoint( - self.start_profile, - graceful_shutdown=True, - metrics_labels=self.metrics_labels, - http_endpoint_path="/start_profile", - ), - stop_profile_ep.serve_endpoint( - self.stop_profile, - graceful_shutdown=True, - metrics_labels=self.metrics_labels, - http_endpoint_path="/stop_profile", - ), - ]) - + tasks.extend( + [ + model_info_ep.serve_endpoint( + self.get_model_info, + graceful_shutdown=True, + metrics_labels=self.metrics_labels, + http_endpoint_path="/get_model_info", + ), + start_profile_ep.serve_endpoint( + self.start_profile, + graceful_shutdown=True, + metrics_labels=self.metrics_labels, + http_endpoint_path="/start_profile", + ), + stop_profile_ep.serve_endpoint( + self.stop_profile, + graceful_shutdown=True, + metrics_labels=self.metrics_labels, + http_endpoint_path="/stop_profile", + ), + ] + ) + self.native_api_tasks = tasks logging.info(f"Registered {len(tasks)} native API endpoints") return tasks - + async def get_model_info(self, request: dict): result = { "model_path": self.tm.server_args.model_path, @@ -70,7 +78,7 @@ async def get_model_info(self, request: dict): "preferred_sampling_params": self.tm.server_args.preferred_sampling_params, "weight_version": self.tm.server_args.weight_version, } - + yield {"data": [result]} async def start_profile(self, request: dict): From 5886ae4b969dab65624dec83e5225412c64bf355 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sun, 5 Oct 2025 16:59:08 +0000 Subject: [PATCH 14/34] redundant space --- .cargo/config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index b608842463..fb79aed4d7 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -3,4 +3,4 @@ [build] # tokio-console needs this -rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file +rustflags = ["--cfg", "tokio_unstable"] From 516663875325a6887cb798fc7e6de2103062ebcb Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Sun, 5 Oct 2025 16:59:31 +0000 Subject: [PATCH 15/34] bump --- .../src/dynamo/sglang/request_handlers/native_api_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/src/dynamo/sglang/request_handlers/native_api_handler.py b/components/src/dynamo/sglang/request_handlers/native_api_handler.py index e03e47ba13..5953010481 100644 --- a/components/src/dynamo/sglang/request_handlers/native_api_handler.py +++ b/components/src/dynamo/sglang/request_handlers/native_api_handler.py @@ -15,7 +15,7 @@ class NativeApiHandler: - """Mixin to add sglang native API endpoints to worker handlers""" + """Handler to add sglang native API endpoints to workers""" def __init__( self, From 35aa49e00cacc1709fd3caff76c3ced9d861ea28 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 6 Oct 2025 15:44:07 +0000 Subject: [PATCH 16/34] slightly logic change since only agg endpoints supported now --- components/src/dynamo/sglang/main.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/components/src/dynamo/sglang/main.py b/components/src/dynamo/sglang/main.py index 5c27b48c02..682ffdcaa8 100644 --- a/components/src/dynamo/sglang/main.py +++ b/components/src/dynamo/sglang/main.py @@ -75,6 +75,7 @@ async def init(runtime: DistributedRuntime, config: Config): generate_endpoint = component.endpoint(dynamo_args.endpoint) prefill_client = None + native_api_tasks = [] if config.serving_mode == DisaggregationMode.DECODE: logging.info("Initializing prefill client") prefill_client = ( @@ -83,6 +84,9 @@ async def init(runtime: DistributedRuntime, config: Config): .endpoint("generate") .client() ) + else: + native_api_handler = NativeApiHandler(component, engine, metrics_labels) + native_api_tasks = await native_api_handler.init_native_apis() publisher, metrics_task, metrics_labels = await setup_sgl_metrics(engine, component) @@ -107,10 +111,6 @@ async def init(runtime: DistributedRuntime, config: Config): component, engine, config, publisher, kv_publisher, prefill_client ) - native_api_handler = NativeApiHandler(component, engine, metrics_labels) - - native_api_tasks = await native_api_handler.init_native_apis() - async def register_model(): """Register the model and signal readiness""" registration_success = await register_llm_with_runtime_config( From b0ae2d4bdef428864d8efc921dcbdb39068c0990 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 6 Oct 2025 15:50:51 +0000 Subject: [PATCH 17/34] reshuffle --- components/backends/sglang/README.md | 4 +--- components/src/dynamo/sglang/main.py | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/components/backends/sglang/README.md b/components/backends/sglang/README.md index 507d847055..52c6a93a3b 100644 --- a/components/backends/sglang/README.md +++ b/components/backends/sglang/README.md @@ -117,9 +117,7 @@ uv pip install maturin cd $DYNAMO_HOME/lib/bindings/python maturin develop --uv cd $DYNAMO_HOME -# installs sglang supported version along with dynamo -# include the prerelease flag to install flashinfer rc versions -uv pip install --prerelease=allow -e .[sglang] +uv pip install -e .[sglang] ``` diff --git a/components/src/dynamo/sglang/main.py b/components/src/dynamo/sglang/main.py index 682ffdcaa8..fb14b5dde2 100644 --- a/components/src/dynamo/sglang/main.py +++ b/components/src/dynamo/sglang/main.py @@ -74,6 +74,8 @@ async def init(runtime: DistributedRuntime, config: Config): generate_endpoint = component.endpoint(dynamo_args.endpoint) + publisher, metrics_task, metrics_labels = await setup_sgl_metrics(engine, component) + prefill_client = None native_api_tasks = [] if config.serving_mode == DisaggregationMode.DECODE: @@ -88,8 +90,6 @@ async def init(runtime: DistributedRuntime, config: Config): native_api_handler = NativeApiHandler(component, engine, metrics_labels) native_api_tasks = await native_api_handler.init_native_apis() - publisher, metrics_task, metrics_labels = await setup_sgl_metrics(engine, component) - kv_publisher = None if server_args.kv_events_config: kv_events = json.loads(server_args.kv_events_config) From f50c54bf8efc8d273271c2987f18729fb8c0f13e Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 6 Oct 2025 19:36:14 +0000 Subject: [PATCH 18/34] selective pieces return a 404 --- lib/llm/src/http/service/dynamic_endpoint.rs | 40 ++++++++++++-------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index bdccffc433..e8cce5e351 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -33,12 +33,15 @@ pub fn dynamic_endpoint_router( async fn inner_dynamic_endpoint_handler( state: Arc, path: String, -) -> Result { - let etcd_client = state.etcd_client().ok_or("Failed to get etcd client")?; +) -> Result { + let etcd_client = state.etcd_client().ok_or(( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to get etcd client", + ))?; let instances = list_all_instances(etcd_client) .await - .map_err(|_| "Failed to get instances")?; + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get instances"))?; let dynamic_endpoints = instances .iter() @@ -47,13 +50,17 @@ async fn inner_dynamic_endpoint_handler( let fmt_path = format!("/{}", &path); if !dynamic_endpoints.contains(&fmt_path) { - return Err("Dynamic endpoint not found"); + return Err((StatusCode::NOT_FOUND, "Dynamic endpoint not found")); } - let rt = Runtime::from_current().map_err(|_| "Failed to get runtime")?; - let drt = DistributedRuntime::from_settings(rt) - .await - .map_err(|_| "Failed to get distributed runtime")?; + let rt = Runtime::from_current() + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get runtime"))?; + let drt = DistributedRuntime::from_settings(rt).await.map_err(|_| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to get distributed runtime", + ) + })?; let target_instances = instances .iter() @@ -64,12 +71,15 @@ async fn inner_dynamic_endpoint_handler( for instance in target_instances { let ns = drt .namespace(instance.namespace.clone()) - .map_err(|_| "Failed to get namespace")?; + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get namespace"))?; let c = ns .component(instance.component.clone()) - .map_err(|_| "Failed to get component")?; + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get component"))?; let ep = c.endpoint(path.clone()); - let client = ep.client().await.map_err(|_| "Failed to get client")?; + let client = ep + .client() + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get client"))?; target_clients.push(client); } @@ -78,12 +88,12 @@ async fn inner_dynamic_endpoint_handler( let router = PushRouter::<(), Annotated>::from_client(client, Default::default()) .await - .map_err(|_| "Failed to get router")?; + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get router"))?; let mut stream = router .round_robin(().into()) .await - .map_err(|_| "Failed to route")?; + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to route"))?; while let Some(resp) = stream.next().await { all_responses.push(resp); @@ -101,9 +111,9 @@ async fn dynamic_endpoint_handler( ) -> impl IntoResponse { inner_dynamic_endpoint_handler(state, path) .await - .map_err(|err_string| { + .map_err(|(status_code, err_string)| { ( - StatusCode::INTERNAL_SERVER_ERROR, + status_code, Json(serde_json::json!({ "message": err_string })), From 3e6bf2cefa495bb0fe2b6890af9166815b7a6538 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 6 Oct 2025 20:17:27 +0000 Subject: [PATCH 19/34] singleton drt --- Cargo.lock | 1 + components/src/dynamo/sglang/main.py | 42 +++------------- .../request_handlers/native_api_handler.py | 50 ------------------- lib/bindings/python/Cargo.lock | 1 + lib/llm/Cargo.toml | 1 + lib/llm/src/http/service/dynamic_endpoint.rs | 8 ++- lib/llm/src/http/service/service_v2.rs | 17 ++++++- 7 files changed, 30 insertions(+), 90 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83994db565..6719228b99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2141,6 +2141,7 @@ dependencies = [ "approx", "assert_matches", "async-nats", + "async-once-cell", "async-stream", "async-trait", "async_zmq", diff --git a/components/src/dynamo/sglang/main.py b/components/src/dynamo/sglang/main.py index ad32ec4fb9..90a03a6926 100644 --- a/components/src/dynamo/sglang/main.py +++ b/components/src/dynamo/sglang/main.py @@ -72,11 +72,11 @@ async def init(runtime: DistributedRuntime, config: Config): generate_endpoint = component.endpoint(dynamo_args.endpoint) -<<<<<<< HEAD - publisher, metrics_task, metrics_labels = await setup_sgl_metrics(engine, component) + # publisher instantiates the metrics and kv event publishers + publisher, metrics_task, metrics_labels = await setup_sgl_metrics( + engine, config, component, generate_endpoint + ) -======= ->>>>>>> main prefill_client = None native_api_tasks = [] if config.serving_mode == DisaggregationMode.DECODE: @@ -87,31 +87,9 @@ async def init(runtime: DistributedRuntime, config: Config): .endpoint("generate") .client() ) -<<<<<<< HEAD - else: - native_api_handler = NativeApiHandler(component, engine, metrics_labels) - native_api_tasks = await native_api_handler.init_native_apis() - - kv_publisher = None - if server_args.kv_events_config: - kv_events = json.loads(server_args.kv_events_config) - ep = kv_events.get("endpoint") - zmq_ep = ep.replace("*", get_ip()) if ep else None - - zmq_config = ZmqKvEventPublisherConfig( - worker_id=generate_endpoint.lease_id(), - kv_block_size=server_args.page_size, - zmq_endpoint=zmq_ep, - ) - logging.info(f"Setting up ZMQ kv event publisher at {zmq_ep}") - kv_publisher = ZmqKvEventPublisher(component=component, config=zmq_config) -======= - - # publisher instantiates the metrics and kv event publishers - publisher, metrics_task, metrics_labels = await setup_sgl_metrics( - engine, config, component, generate_endpoint - ) ->>>>>>> main + # TODO: implement other native APIs and come up with clean layer to apply to agg/disagg/etc + if config.serving_mode == DisaggregationMode.AGGREGATED: + native_api_tasks = await NativeApiHandler(component, engine, metrics_labels).init_native_apis() # Readiness gate: requests wait until model is registered ready_event = asyncio.Event() @@ -129,10 +107,6 @@ async def init(runtime: DistributedRuntime, config: Config): metrics_labels=metrics_labels, health_check_payload=health_check_payload, ), -<<<<<<< HEAD - register_model(), - *native_api_tasks, -======= register_llm_with_readiness_gate( engine, generate_endpoint, @@ -140,7 +114,7 @@ async def init(runtime: DistributedRuntime, config: Config): dynamo_args, readiness_gate=ready_event, ), ->>>>>>> main + *native_api_tasks, ) except Exception as e: logging.error(f"Failed to serve endpoints: {e}") diff --git a/components/src/dynamo/sglang/request_handlers/native_api_handler.py b/components/src/dynamo/sglang/request_handlers/native_api_handler.py index 5953010481..74e37b045c 100644 --- a/components/src/dynamo/sglang/request_handlers/native_api_handler.py +++ b/components/src/dynamo/sglang/request_handlers/native_api_handler.py @@ -42,8 +42,6 @@ async def init_native_apis( tasks = [] model_info_ep = self.component.endpoint("get_model_info") - start_profile_ep = self.component.endpoint("start_profile") - stop_profile_ep = self.component.endpoint("stop_profile") tasks.extend( [ model_info_ep.serve_endpoint( @@ -52,18 +50,6 @@ async def init_native_apis( metrics_labels=self.metrics_labels, http_endpoint_path="/get_model_info", ), - start_profile_ep.serve_endpoint( - self.start_profile, - graceful_shutdown=True, - metrics_labels=self.metrics_labels, - http_endpoint_path="/start_profile", - ), - stop_profile_ep.serve_endpoint( - self.stop_profile, - graceful_shutdown=True, - metrics_labels=self.metrics_labels, - http_endpoint_path="/stop_profile", - ), ] ) @@ -80,39 +66,3 @@ async def get_model_info(self, request: dict): } yield {"data": [result]} - - async def start_profile(self, request: dict): - try: - obj = ProfileReqInput.model_validate(request) - except Exception: - obj = None - - if obj is None: - obj = ProfileReqInput() - - output_dir = obj.output_dir or f"profile_{self.tm.server_args.model_path}" - - await self.tm.start_profile( - output_dir=output_dir, - start_step=obj.start_step, - num_steps=obj.num_steps, - activities=obj.activities, - with_stack=obj.with_stack, - record_shapes=obj.record_shapes, - profile_by_stage=obj.profile_by_stage, - ) - - yield {"data": [{"status": "started profile"}]} - - async def stop_profile(self, request: dict): - asyncio.create_task(self.tm.stop_profile()) - yield { - "data": [ - { - "status": ( - "Stopped profile. This might take a long time to complete. " - f"Results should be available in the 'profile_{self.tm.server_args.model_path}' directory." - ) - } - ] - } diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index c461e88c21..9ef56be6c3 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -1470,6 +1470,7 @@ dependencies = [ "akin", "anyhow", "async-nats", + "async-once-cell", "async-stream", "async-trait", "async_zmq", diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index 9d152160ae..48bb290a79 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -42,6 +42,7 @@ aho-corasick = "1.1" anyhow = { workspace = true } dynamo-async-openai = { workspace = true } dynamo-parsers = { workspace = true } +async-once-cell = { version = "0.5.4" } async-stream = { workspace = true } async-trait = { workspace = true } async-nats = { workspace = true } diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index e8cce5e351..3337279658 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -9,8 +9,8 @@ use axum::{ response::IntoResponse, routing::post, }; +use dynamo_runtime::component::Client; use dynamo_runtime::instances::list_all_instances; -use dynamo_runtime::{DistributedRuntime, Runtime, component::Client}; use dynamo_runtime::{pipeline::PushRouter, stream::StreamExt}; use std::sync::Arc; @@ -50,12 +50,10 @@ async fn inner_dynamic_endpoint_handler( let fmt_path = format!("/{}", &path); if !dynamic_endpoints.contains(&fmt_path) { - return Err((StatusCode::NOT_FOUND, "Dynamic endpoint not found")); + return Err((StatusCode::NOT_FOUND, "Endpoint not found")); } - let rt = Runtime::from_current() - .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get runtime"))?; - let drt = DistributedRuntime::from_settings(rt).await.map_err(|_| { + let drt = state.distributed_runtime().await.map_err(|_| { ( StatusCode::INTERNAL_SERVER_ERROR, "Failed to get distributed runtime", diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index 531ff51f33..778ea61160 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -16,21 +16,23 @@ use crate::discovery::ModelManager; use crate::endpoint_type::EndpointType; use crate::request_template::RequestTemplate; use anyhow::Result; +use async_once_cell::OnceCell; use axum_server::tls_rustls::RustlsConfig; use derive_builder::Builder; use dynamo_runtime::logging::make_request_span; use dynamo_runtime::transports::etcd; +use dynamo_runtime::{DistributedRuntime, Runtime}; use std::net::SocketAddr; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tower_http::trace::TraceLayer; /// HTTP service shared state -#[derive(Default)] pub struct State { metrics: Arc, manager: Arc, etcd_client: Option, + distributed_runtime: Arc>>, flags: StateFlags, } @@ -76,6 +78,7 @@ impl State { manager, metrics: Arc::new(Metrics::default()), etcd_client: None, + distributed_runtime: Arc::new(OnceCell::new()), flags: StateFlags { chat_endpoints_enabled: AtomicBool::new(false), cmpl_endpoints_enabled: AtomicBool::new(false), @@ -90,6 +93,7 @@ impl State { manager, metrics: Arc::new(Metrics::default()), etcd_client, + distributed_runtime: Arc::new(OnceCell::new()), flags: StateFlags { chat_endpoints_enabled: AtomicBool::new(false), cmpl_endpoints_enabled: AtomicBool::new(false), @@ -115,6 +119,17 @@ impl State { self.etcd_client.as_ref() } + pub async fn distributed_runtime(&self) -> Result> { + self.distributed_runtime + .get_or_try_init(async { + let rt = Runtime::from_current()?; + let drt = DistributedRuntime::from_settings(rt).await?; + Ok(Arc::new(drt)) + }) + .await + .map(|drt| drt.clone()) + } + // TODO pub fn sse_keep_alive(&self) -> Option { None From f6f7c9190204f08ac7b38cb878f1fed8b3de2c30 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 6 Oct 2025 20:19:12 +0000 Subject: [PATCH 20/34] bump --- components/src/dynamo/sglang/main.py | 4 +++- .../src/dynamo/sglang/request_handlers/native_api_handler.py | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/components/src/dynamo/sglang/main.py b/components/src/dynamo/sglang/main.py index 90a03a6926..18f161ffaf 100644 --- a/components/src/dynamo/sglang/main.py +++ b/components/src/dynamo/sglang/main.py @@ -89,7 +89,9 @@ async def init(runtime: DistributedRuntime, config: Config): ) # TODO: implement other native APIs and come up with clean layer to apply to agg/disagg/etc if config.serving_mode == DisaggregationMode.AGGREGATED: - native_api_tasks = await NativeApiHandler(component, engine, metrics_labels).init_native_apis() + native_api_tasks = await NativeApiHandler( + component, engine, metrics_labels + ).init_native_apis() # Readiness gate: requests wait until model is registered ready_event = asyncio.Event() diff --git a/components/src/dynamo/sglang/request_handlers/native_api_handler.py b/components/src/dynamo/sglang/request_handlers/native_api_handler.py index 74e37b045c..2d430899f2 100644 --- a/components/src/dynamo/sglang/request_handlers/native_api_handler.py +++ b/components/src/dynamo/sglang/request_handlers/native_api_handler.py @@ -9,7 +9,6 @@ from typing import List, Optional, Tuple import sglang as sgl -from sglang.srt.managers.io_struct import ProfileReqInput from dynamo._core import Component From e774001a8967d049e6173a99af0ff05fbca3fadc Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 6 Oct 2025 20:24:04 +0000 Subject: [PATCH 21/34] bump --- lib/llm/src/http/service/service_v2.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index 778ea61160..a48ca440ce 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -126,8 +126,7 @@ impl State { let drt = DistributedRuntime::from_settings(rt).await?; Ok(Arc::new(drt)) }) - .await - .map(|drt| drt.clone()) + .await.cloned() } // TODO From 45028103fbf160f46f613faabab293c3814876dd Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Mon, 6 Oct 2025 20:25:55 +0000 Subject: [PATCH 22/34] bump --- lib/llm/src/http/service/service_v2.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index a48ca440ce..a978306c0a 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -126,7 +126,8 @@ impl State { let drt = DistributedRuntime::from_settings(rt).await?; Ok(Arc::new(drt)) }) - .await.cloned() + .await + .cloned() } // TODO From 0cb978ac7fc340c3ebd46343ac7bfb746271cc60 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Tue, 7 Oct 2025 15:33:06 +0000 Subject: [PATCH 23/34] bump --- lib/llm/src/http/service/dynamic_endpoint.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 3337279658..44f2444c90 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -30,6 +30,13 @@ pub fn dynamic_endpoint_router( (docs, router) } +/// Dynamic endpoint handler that discovers component instances from the discovery plane and fans out +/// requests to all instances that registered the matching HTTP endpoint path. +/// +/// Example: POST to `/get_model_info` discovers all instances with `http_endpoint_path = "/get_model_info"`, +/// queries each one, and returns `{"responses": [instance1_result, instance2_result, ...]}`. +/// +/// Returns 404 if no instances have registered the endpoint. async fn inner_dynamic_endpoint_handler( state: Arc, path: String, From 95e3a4f79c7a1fe142a8115a34ee58f429db5f7f Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Tue, 7 Oct 2025 17:21:49 +0000 Subject: [PATCH 24/34] swap etcd with drt in http builder --- lib/llm/src/entrypoint/input/http.rs | 5 +-- lib/llm/src/http/service/dynamic_endpoint.rs | 14 +++---- lib/llm/src/http/service/health.rs | 7 +++- lib/llm/src/http/service/service_v2.rs | 40 ++++++-------------- 4 files changed, 24 insertions(+), 42 deletions(-) diff --git a/lib/llm/src/entrypoint/input/http.rs b/lib/llm/src/entrypoint/input/http.rs index 8dd22cee9c..a2c506c70b 100644 --- a/lib/llm/src/entrypoint/input/http.rs +++ b/lib/llm/src/entrypoint/input/http.rs @@ -55,11 +55,10 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul let http_service = match engine_config { EngineConfig::Dynamic(_) => { let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?; - let etcd_client = distributed_runtime.etcd_client(); // This allows the /health endpoint to query etcd for active instances - http_service_builder = http_service_builder.with_etcd_client(etcd_client.clone()); + http_service_builder = http_service_builder.with_drt(Some(distributed_runtime.clone())); let http_service = http_service_builder.build()?; - match etcd_client { + match distributed_runtime.etcd_client() { Some(ref etcd_client) => { let router_config = engine_config.local_model().router_config(); // Listen for models registering themselves in etcd, add them to HTTP service diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 44f2444c90..cda6ae1647 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -41,12 +41,15 @@ async fn inner_dynamic_endpoint_handler( state: Arc, path: String, ) -> Result { - let etcd_client = state.etcd_client().ok_or(( + let drt = state + .distributed_runtime() + .expect("Failed to get distributed runtime"); + let etcd_client = drt.etcd_client().ok_or(( StatusCode::INTERNAL_SERVER_ERROR, "Failed to get etcd client", ))?; - let instances = list_all_instances(etcd_client) + let instances = list_all_instances(&etcd_client) .await .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get instances"))?; @@ -60,13 +63,6 @@ async fn inner_dynamic_endpoint_handler( return Err((StatusCode::NOT_FOUND, "Endpoint not found")); } - let drt = state.distributed_runtime().await.map_err(|_| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to get distributed runtime", - ) - })?; - let target_instances = instances .iter() .filter(|instance| instance.http_endpoint_path == Some(fmt_path.clone())) diff --git a/lib/llm/src/http/service/health.rs b/lib/llm/src/http/service/health.rs index 6be55254ad..97f9cd5609 100644 --- a/lib/llm/src/http/service/health.rs +++ b/lib/llm/src/http/service/health.rs @@ -52,8 +52,11 @@ async fn live_handler( async fn health_handler( axum::extract::State(state): axum::extract::State>, ) -> impl IntoResponse { - let instances = if let Some(etcd_client) = state.etcd_client() { - match list_all_instances(etcd_client).await { + let drt = state + .distributed_runtime() + .expect("Failed to get distributed runtime"); + let instances = if let Some(etcd_client) = drt.etcd_client() { + match list_all_instances(&etcd_client).await { Ok(instances) => instances, Err(err) => { tracing::warn!("Failed to fetch instances from etcd: {}", err); diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index a978306c0a..ccd206cbbc 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -16,12 +16,10 @@ use crate::discovery::ModelManager; use crate::endpoint_type::EndpointType; use crate::request_template::RequestTemplate; use anyhow::Result; -use async_once_cell::OnceCell; use axum_server::tls_rustls::RustlsConfig; use derive_builder::Builder; +use dynamo_runtime::DistributedRuntime; use dynamo_runtime::logging::make_request_span; -use dynamo_runtime::transports::etcd; -use dynamo_runtime::{DistributedRuntime, Runtime}; use std::net::SocketAddr; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -31,8 +29,7 @@ use tower_http::trace::TraceLayer; pub struct State { metrics: Arc, manager: Arc, - etcd_client: Option, - distributed_runtime: Arc>>, + distributed_runtime: Option, flags: StateFlags, } @@ -77,8 +74,7 @@ impl State { Self { manager, metrics: Arc::new(Metrics::default()), - etcd_client: None, - distributed_runtime: Arc::new(OnceCell::new()), + distributed_runtime: None, flags: StateFlags { chat_endpoints_enabled: AtomicBool::new(false), cmpl_endpoints_enabled: AtomicBool::new(false), @@ -88,12 +84,11 @@ impl State { } } - pub fn new_with_etcd(manager: Arc, etcd_client: Option) -> Self { + pub fn new_with_drt(manager: Arc, drt: Option) -> Self { Self { manager, metrics: Arc::new(Metrics::default()), - etcd_client, - distributed_runtime: Arc::new(OnceCell::new()), + distributed_runtime: drt, flags: StateFlags { chat_endpoints_enabled: AtomicBool::new(false), cmpl_endpoints_enabled: AtomicBool::new(false), @@ -115,19 +110,8 @@ impl State { self.manager.clone() } - pub fn etcd_client(&self) -> Option<&etcd::Client> { - self.etcd_client.as_ref() - } - - pub async fn distributed_runtime(&self) -> Result> { - self.distributed_runtime - .get_or_try_init(async { - let rt = Runtime::from_current()?; - let drt = DistributedRuntime::from_settings(rt).await?; - Ok(Arc::new(drt)) - }) - .await - .cloned() + pub fn distributed_runtime(&self) -> Option<&DistributedRuntime> { + self.distributed_runtime.as_ref() } // TODO @@ -186,7 +170,7 @@ pub struct HttpServiceConfig { request_template: Option, #[builder(default = "None")] - etcd_client: Option, + distributed_runtime: Option, } impl HttpService { @@ -309,8 +293,8 @@ impl HttpServiceConfigBuilder { let config: HttpServiceConfig = self.build_internal()?; let model_manager = Arc::new(ModelManager::new()); - let etcd_client = config.etcd_client; - let state = Arc::new(State::new_with_etcd(model_manager, etcd_client)); + let drt = config.distributed_runtime; + let state = Arc::new(State::new_with_drt(model_manager, drt)); state .flags @@ -371,8 +355,8 @@ impl HttpServiceConfigBuilder { self } - pub fn with_etcd_client(mut self, etcd_client: Option) -> Self { - self.etcd_client = Some(etcd_client); + pub fn with_drt(mut self, drt: Option) -> Self { + self.distributed_runtime = Some(drt); self } From cdc4da05b425bb2724d4fd66f505c11facd28a8b Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Tue, 7 Oct 2025 17:28:17 +0000 Subject: [PATCH 25/34] gp --- Cargo.lock | 1 - lib/bindings/python/Cargo.lock | 1 - lib/llm/Cargo.toml | 1 - 3 files changed, 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6719228b99..83994db565 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2141,7 +2141,6 @@ dependencies = [ "approx", "assert_matches", "async-nats", - "async-once-cell", "async-stream", "async-trait", "async_zmq", diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock index 9ef56be6c3..c461e88c21 100644 --- a/lib/bindings/python/Cargo.lock +++ b/lib/bindings/python/Cargo.lock @@ -1470,7 +1470,6 @@ dependencies = [ "akin", "anyhow", "async-nats", - "async-once-cell", "async-stream", "async-trait", "async_zmq", diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml index 48bb290a79..9d152160ae 100644 --- a/lib/llm/Cargo.toml +++ b/lib/llm/Cargo.toml @@ -42,7 +42,6 @@ aho-corasick = "1.1" anyhow = { workspace = true } dynamo-async-openai = { workspace = true } dynamo-parsers = { workspace = true } -async-once-cell = { version = "0.5.4" } async-stream = { workspace = true } async-trait = { workspace = true } async-nats = { workspace = true } From 3d79298509900785903817b88434bc7f74bc8ef0 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Tue, 7 Oct 2025 19:10:17 +0000 Subject: [PATCH 26/34] lel --- .../request_handlers/native_api_handler.py | 1 + lib/llm/src/http/service/dynamic_endpoint.rs | 16 +++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/components/src/dynamo/sglang/request_handlers/native_api_handler.py b/components/src/dynamo/sglang/request_handlers/native_api_handler.py index 2d430899f2..0b9cc767ff 100644 --- a/components/src/dynamo/sglang/request_handlers/native_api_handler.py +++ b/components/src/dynamo/sglang/request_handlers/native_api_handler.py @@ -57,6 +57,7 @@ async def init_native_apis( return tasks async def get_model_info(self, request: dict): + _ = request result = { "model_path": self.tm.server_args.model_path, "tokenizer_path": self.tm.server_args.tokenizer_path, diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index cda6ae1647..fba46e7159 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -76,11 +76,17 @@ async fn inner_dynamic_endpoint_handler( let c = ns .component(instance.component.clone()) .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get component"))?; - let ep = c.endpoint(path.clone()); + let ep = c.endpoint(instance.endpoint.clone()); let client = ep .client() .await .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get client"))?; + client.wait_for_instances().await.map_err(|_| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to wait for instances", + ) + })?; target_clients.push(client); } @@ -91,10 +97,10 @@ async fn inner_dynamic_endpoint_handler( .await .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get router"))?; - let mut stream = router - .round_robin(().into()) - .await - .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to route"))?; + let mut stream = router.round_robin(().into()).await.map_err(|e| { + tracing::error!("Failed to route: {:?}", e); + (StatusCode::INTERNAL_SERVER_ERROR, "Failed to route") + })?; while let Some(resp) = stream.next().await { all_responses.push(resp); From 7ffe3e26b075dc3dad56e1d0edfa216a4d42901a Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Tue, 7 Oct 2025 19:30:57 +0000 Subject: [PATCH 27/34] test --- tests/serve/test_sglang.py | 7 ++++++- tests/utils/payload_builder.py | 15 ++++++++++++++- tests/utils/payloads.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/tests/serve/test_sglang.py b/tests/serve/test_sglang.py index 9d6de3dbd4..fc763c1072 100644 --- a/tests/serve/test_sglang.py +++ b/tests/serve/test_sglang.py @@ -19,6 +19,7 @@ completion_payload_default, embedding_payload, embedding_payload_default, + model_info_payload_default, ) logger = logging.getLogger(__name__) @@ -42,7 +43,11 @@ class SGLangConfig(EngineConfig): model="deepseek-ai/DeepSeek-R1-Distill-Llama-8B", env={}, models_port=8000, - request_payloads=[chat_payload_default(), completion_payload_default()], + request_payloads=[ + chat_payload_default(), + completion_payload_default(), + model_info_payload_default(), + ], ), "disaggregated": SGLangConfig( name="disaggregated", diff --git a/tests/utils/payload_builder.py b/tests/utils/payload_builder.py index 26404d3896..3737353193 100644 --- a/tests/utils/payload_builder.py +++ b/tests/utils/payload_builder.py @@ -9,6 +9,7 @@ CompletionPayload, EmbeddingPayload, MetricsPayload, + ModelInfoPayload, ) # Common default text prompt used across tests @@ -61,7 +62,6 @@ def completion_payload_default( expected_response=expected_response or ["AI"], ) - def metric_payload_default( min_num_requests: int, repeat_count: int = 1, @@ -76,6 +76,19 @@ def metric_payload_default( ) +def model_info_payload_default( + repeat_count: int = 1, + expected_response: Optional[List[str]] = None, + expected_log: Optional[List[str]] = None, +) -> ModelInfoPayload: + return ModelInfoPayload( + body={}, + repeat_count=repeat_count, + expected_log=expected_log or [], + expected_response=expected_response or ["Model:"], + ) + + def chat_payload( content: Union[str, List[Dict[str, Any]]], repeat_count: int = 1, diff --git a/tests/utils/payloads.py b/tests/utils/payloads.py index e7b547e576..e70bee86f2 100644 --- a/tests/utils/payloads.py +++ b/tests/utils/payloads.py @@ -190,6 +190,35 @@ def response_handler(self, response: Any) -> str: return EmbeddingPayload.extract_embeddings(response) +@dataclass +class ModelInfoPayload(BasePayload): + """Payload for get_model_info endpoint.""" + + endpoint: str = "/get_model_info" + + @staticmethod + def extract_model_info(response): + """ + Process get_model_info API responses. + """ + response.raise_for_status() + result = response.json() + assert "responses" in result, "Missing 'responses' in response" + assert len(result["responses"]) > 0, "Empty responses in response" + + data = result["responses"][0].get("data", {}) + assert "data" in data, "Missing 'data' in response data" + assert len(data["data"]) > 0, "Empty data in response" + + model_info = data["data"][0] + assert "model_path" in model_info, "Missing 'model_path' in model info" + + return f"Model: {model_info['model_path']}" + + def response_handler(self, response: Any) -> str: + return ModelInfoPayload.extract_model_info(response) + + @dataclass class MetricsPayload(BasePayload): endpoint: str = "/metrics" From 28e268dfef58e999f63a6a6061d86ce0bc18cfdf Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Tue, 7 Oct 2025 19:31:42 +0000 Subject: [PATCH 28/34] no panicgit add .! --- lib/llm/src/http/service/dynamic_endpoint.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index fba46e7159..2f1851ceea 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -41,9 +41,10 @@ async fn inner_dynamic_endpoint_handler( state: Arc, path: String, ) -> Result { - let drt = state - .distributed_runtime() - .expect("Failed to get distributed runtime"); + let drt = state.distributed_runtime().ok_or(( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to get distributed runtime", + ))?; let etcd_client = drt.etcd_client().ok_or(( StatusCode::INTERNAL_SERVER_ERROR, "Failed to get etcd client", From 785292029ef235b51b26516487f6d2d1e3247022 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Tue, 7 Oct 2025 19:33:17 +0000 Subject: [PATCH 29/34] bruh --- tests/utils/payload_builder.py | 1 + tests/utils/payloads.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/utils/payload_builder.py b/tests/utils/payload_builder.py index 3737353193..501b7e55c1 100644 --- a/tests/utils/payload_builder.py +++ b/tests/utils/payload_builder.py @@ -62,6 +62,7 @@ def completion_payload_default( expected_response=expected_response or ["AI"], ) + def metric_payload_default( min_num_requests: int, repeat_count: int = 1, diff --git a/tests/utils/payloads.py b/tests/utils/payloads.py index e70bee86f2..02e2b1c36b 100644 --- a/tests/utils/payloads.py +++ b/tests/utils/payloads.py @@ -205,14 +205,14 @@ def extract_model_info(response): result = response.json() assert "responses" in result, "Missing 'responses' in response" assert len(result["responses"]) > 0, "Empty responses in response" - + data = result["responses"][0].get("data", {}) assert "data" in data, "Missing 'data' in response data" assert len(data["data"]) > 0, "Empty data in response" - + model_info = data["data"][0] assert "model_path" in model_info, "Missing 'model_path' in model info" - + return f"Model: {model_info['model_path']}" def response_handler(self, response: Any) -> str: From a3e28391adea643f96929c6e8b391229a602bebb Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Tue, 7 Oct 2025 20:58:36 +0000 Subject: [PATCH 30/34] bump --- lib/llm/src/http/service/dynamic_endpoint.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 2f1851ceea..83c5e60009 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -40,6 +40,7 @@ pub fn dynamic_endpoint_router( async fn inner_dynamic_endpoint_handler( state: Arc, path: String, + body: serde_json::Value, ) -> Result { let drt = state.distributed_runtime().ok_or(( StatusCode::INTERNAL_SERVER_ERROR, @@ -93,12 +94,14 @@ async fn inner_dynamic_endpoint_handler( let mut all_responses = Vec::new(); for client in target_clients { - let router = - PushRouter::<(), Annotated>::from_client(client, Default::default()) - .await - .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get router"))?; + let router = PushRouter::>::from_client( + client, + Default::default(), + ) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get router"))?; - let mut stream = router.round_robin(().into()).await.map_err(|e| { + let mut stream = router.round_robin(body.clone().into()).await.map_err(|e| { tracing::error!("Failed to route: {:?}", e); (StatusCode::INTERNAL_SERVER_ERROR, "Failed to route") })?; @@ -116,8 +119,10 @@ async fn inner_dynamic_endpoint_handler( async fn dynamic_endpoint_handler( axum::extract::State(state): axum::extract::State>, axum::extract::Path(path): axum::extract::Path, + body: Option>, ) -> impl IntoResponse { - inner_dynamic_endpoint_handler(state, path) + let body = body.map(|Json(v)| v).unwrap_or(serde_json::json!({})); + inner_dynamic_endpoint_handler(state, path, body) .await .map_err(|(status_code, err_string)| { ( From 11e84390e7424cce9987f0fb5f02d52acc184967 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Tue, 7 Oct 2025 21:00:05 +0000 Subject: [PATCH 31/34] simple --- lib/llm/src/http/service/dynamic_endpoint.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 83c5e60009..42c49b1d6b 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -65,13 +65,11 @@ async fn inner_dynamic_endpoint_handler( return Err((StatusCode::NOT_FOUND, "Endpoint not found")); } - let target_instances = instances + let mut target_clients: Vec = Vec::new(); + for instance in instances .iter() .filter(|instance| instance.http_endpoint_path == Some(fmt_path.clone())) - .collect::>(); - - let mut target_clients: Vec = Vec::new(); - for instance in target_instances { + { let ns = drt .namespace(instance.namespace.clone()) .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get namespace"))?; From 2b7d61a77400053e9816d11b66b16dee53b17a13 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Wed, 8 Oct 2025 06:06:01 +0000 Subject: [PATCH 32/34] full refactor --- lib/llm/src/entrypoint/input/http.rs | 27 +- lib/llm/src/http/service.rs | 1 + lib/llm/src/http/service/clear_kv_blocks.rs | 248 ------------------- lib/llm/src/http/service/dynamic_endpoint.rs | 62 +---- lib/llm/src/http/service/dynamic_registry.rs | 217 ++++++++++++++++ lib/llm/src/http/service/service_v2.rs | 12 +- lib/runtime/src/protocols.rs | 2 +- 7 files changed, 265 insertions(+), 304 deletions(-) delete mode 100644 lib/llm/src/http/service/clear_kv_blocks.rs create mode 100644 lib/llm/src/http/service/dynamic_registry.rs diff --git a/lib/llm/src/entrypoint/input/http.rs b/lib/llm/src/entrypoint/input/http.rs index a2c506c70b..6e89c7217f 100644 --- a/lib/llm/src/entrypoint/input/http.rs +++ b/lib/llm/src/entrypoint/input/http.rs @@ -17,6 +17,7 @@ use crate::{ completions::{NvCreateCompletionRequest, NvCreateCompletionResponse}, }, }; +use dynamo_runtime::component::INSTANCE_ROOT_PATH; use dynamo_runtime::transports::etcd; use dynamo_runtime::{DistributedRuntime, Runtime}; use dynamo_runtime::{distributed::DistributedConfig, pipeline::RouterMode}; @@ -70,7 +71,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul } else { Some(namespace.to_string()) }; - run_watcher( + run_model_watcher( distributed_runtime, http_service.state().manager_clone(), etcd_client.clone(), @@ -83,6 +84,10 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul http_service.state().metrics_clone(), ) .await?; + + // Start dynamic HTTP endpoint watcher (instances prefix) + run_endpoint_watcher(etcd_client.clone(), Arc::new(http_service.clone())) + .await?; } None => { // Static endpoints don't need discovery @@ -220,7 +225,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul /// Spawns a task that watches for new models in etcd at network_prefix, /// and registers them with the ModelManager so that the HTTP service can use them. #[allow(clippy::too_many_arguments)] -async fn run_watcher( +async fn run_model_watcher( runtime: DistributedRuntime, model_manager: Arc, etcd_client: etcd::Client, @@ -264,6 +269,24 @@ async fn run_watcher( Ok(()) } +/// Spawns a task that watches instance records for dynamic HTTP endpoints and updates the +/// DynamicEndpointWatcher held in the HTTP service state. +async fn run_endpoint_watcher( + etcd_client: etcd::Client, + http_service: Arc, +) -> anyhow::Result<()> { + if let Some(dep_watcher) = http_service.state().dynamic_registry() { + let instances_watcher = etcd_client + .kv_get_and_watch_prefix(INSTANCE_ROOT_PATH) + .await?; + let (_prefix2, _watcher2, instances_rx) = instances_watcher.dissolve(); + tokio::spawn(async move { + dep_watcher.watch(instances_rx).await; + }); + } + Ok(()) +} + /// Updates HTTP service endpoints based on available model types fn update_http_endpoints(service: Arc, model_type: ModelUpdate) { tracing::debug!( diff --git a/lib/llm/src/http/service.rs b/lib/llm/src/http/service.rs index a6b7ab2198..6f4e9182e4 100644 --- a/lib/llm/src/http/service.rs +++ b/lib/llm/src/http/service.rs @@ -22,6 +22,7 @@ mod openai; pub mod disconnect; pub mod dynamic_endpoint; +pub mod dynamic_registry; pub mod error; pub mod health; pub mod metrics; diff --git a/lib/llm/src/http/service/clear_kv_blocks.rs b/lib/llm/src/http/service/clear_kv_blocks.rs deleted file mode 100644 index ee1cc3bc3e..0000000000 --- a/lib/llm/src/http/service/clear_kv_blocks.rs +++ /dev/null @@ -1,248 +0,0 @@ -// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -use super::{service_v2, RouteDoc}; -use axum::{http::Method, response::IntoResponse, routing::post, Json, Router}; -use serde_json::json; -use std::sync::Arc; - -use dynamo_runtime::{pipeline::PushRouter, stream::StreamExt}; - -pub const CLEAR_KV_ENDPOINT: &str = "clear_kv_blocks"; - -pub fn clear_kv_blocks_router( - state: Arc, - path: Option, -) -> (Vec, Router) { - let path = path.unwrap_or_else(|| "/clear_kv_blocks".to_string()); - - let docs: Vec = vec![RouteDoc::new(Method::POST, &path)]; - - let router = Router::new() - .route(&path, post(clear_kv_blocks_handler)) - .with_state(state); - - (docs, router) -} - -async fn clear_kv_blocks_handler( - axum::extract::State(state): axum::extract::State>, -) -> impl IntoResponse { - let model_entries = state.manager().get_model_entries(); - - // if there are no active workers - if model_entries.is_empty() { - return Json(serde_json::json!({ - "message": "No active worker groups found" - })); - } - - let distributed = match state.runtime() { - Some(runtime) => runtime, - None => { - return Json(serde_json::json!({ - "message": "Failed to create distributed runtime", - })); - } - }; - - let mut cleared_workers = Vec::new(); - let mut failed_workers = Vec::new(); - - // update cleared and failed workers - let mut add_worker_result = |success: bool, - name: String, - status: &str, - ns: &str, - comp: &str, - message: Option| { - let mut result = json!({ - "name": name, - "endpoint": format!("{}/{}/{}", ns, comp, CLEAR_KV_ENDPOINT), - "status": status, - }); - if success { - if let Some(m) = message { - result["response"] = json!(m); - } - cleared_workers.push(result); - } else { - if let Some(m) = message { - result["error"] = json!(m); - } - failed_workers.push(result); - } - }; - - // create client for each model entry - for entry in &model_entries { - let namespace = &entry.endpoint_id.namespace; - let component = &entry.endpoint_id.component; - let entry_name = entry.name.to_string(); - - tracing::debug!("Processing worker group: {}/{}", namespace, component); - - let namespace_obj = match distributed.namespace(namespace) { - Ok(ns) => ns, - Err(e) => { - add_worker_result( - false, - entry_name, - "Failed to get namespace", - namespace, - component, - Some(e.to_string()), - ); - continue; - } - }; - - let component_obj = match namespace_obj.component(component) { - Ok(comp) => comp, - Err(e) => { - add_worker_result( - false, - entry_name, - "Failed to get component", - namespace, - component, - Some(e.to_string()), - ); - continue; - } - }; - - let endpoint: dynamo_runtime::component::Endpoint = - component_obj.endpoint(CLEAR_KV_ENDPOINT); - - let client = match endpoint.client().await { - Ok(c) => c, - Err(e) => { - add_worker_result( - false, - entry_name, - "Failed to get client", - namespace, - component, - Some(e.to_string()), - ); - continue; - } - }; - - let router = match PushRouter::<(), serde_json::Value>::from_client( - client.clone(), - Default::default(), - ) - .await - { - Ok(r) => r, - Err(e) => { - add_worker_result( - false, - entry_name, - "Failed to create router", - namespace, - component, - Some(e.to_string()), - ); - continue; - } - }; - - let instances = match component_obj.list_instances().await { - Ok(instances) => instances, - Err(e) => { - add_worker_result( - false, - entry_name, - "Failed to get instances for worker group", - namespace, - component, - Some(e.to_string()), - ); - continue; - } - }; - - if instances.is_empty() { - add_worker_result( - false, - entry_name, - "No instances found for worker group", - namespace, - component, - None, - ); - continue; - } - - let instances_filtered = instances - .clone() - .into_iter() - .filter(|instance| instance.endpoint == CLEAR_KV_ENDPOINT) - .collect::>(); - - if instances_filtered.is_empty() { - let found_endpoints: Vec = instances - .iter() - .map(|instance| instance.endpoint.clone()) - .collect(); - add_worker_result( - false, - entry_name, - &format!( - "Worker group doesn't support clear_kv_blocks. Supported endpoints: {}", - found_endpoints.join(", ") - ), - namespace, - component, - None, - ); - continue; - } - - for instance in &instances_filtered { - let instance_name = format!("{}-instance-{}", entry.name, instance.id()); - match router.direct(().into(), instance.id()).await { - Ok(mut stream) => match stream.next().await { - Some(response) => { - add_worker_result( - true, - instance_name, - "Successfully cleared kv blocks for instance", - namespace, - component, - Some(response.to_string()), - ); - } - None => { - add_worker_result( - false, - instance_name, - "No response from instance", - namespace, - component, - None, - ); - } - }, - Err(e) => { - add_worker_result( - false, - instance_name, - "Failed to send request for instance", - namespace, - component, - Some(e.to_string()), - ); - } - } - } - } - - Json(serde_json::json!({ - "cleared_workers": cleared_workers, - "failed_workers": failed_workers - })) -} diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 42c49b1d6b..403f1c1933 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -9,8 +9,6 @@ use axum::{ response::IntoResponse, routing::post, }; -use dynamo_runtime::component::Client; -use dynamo_runtime::instances::list_all_instances; use dynamo_runtime::{pipeline::PushRouter, stream::StreamExt}; use std::sync::Arc; @@ -30,11 +28,8 @@ pub fn dynamic_endpoint_router( (docs, router) } -/// Dynamic endpoint handler that discovers component instances from the discovery plane and fans out -/// requests to all instances that registered the matching HTTP endpoint path. -/// -/// Example: POST to `/get_model_info` discovers all instances with `http_endpoint_path = "/get_model_info"`, -/// queries each one, and returns `{"responses": [instance1_result, instance2_result, ...]}`. +/// Dynamic endpoint handler that fans out requests to all instances that registered +/// the matching HTTP endpoint path, using the background registry. /// /// Returns 404 if no instances have registered the endpoint. async fn inner_dynamic_endpoint_handler( @@ -42,53 +37,16 @@ async fn inner_dynamic_endpoint_handler( path: String, body: serde_json::Value, ) -> Result { - let drt = state.distributed_runtime().ok_or(( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to get distributed runtime", - ))?; - let etcd_client = drt.etcd_client().ok_or(( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to get etcd client", - ))?; - - let instances = list_all_instances(&etcd_client) - .await - .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get instances"))?; - - let dynamic_endpoints = instances - .iter() - .filter_map(|instance| instance.http_endpoint_path.clone()) - .collect::>(); - let fmt_path = format!("/{}", &path); - if !dynamic_endpoints.contains(&fmt_path) { - return Err((StatusCode::NOT_FOUND, "Endpoint not found")); - } - - let mut target_clients: Vec = Vec::new(); - for instance in instances - .iter() - .filter(|instance| instance.http_endpoint_path == Some(fmt_path.clone())) + let registry = state.dynamic_registry(); + let target_clients = match registry + .expect("Dynamic registry not found") + .get_clients(&fmt_path) + .await { - let ns = drt - .namespace(instance.namespace.clone()) - .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get namespace"))?; - let c = ns - .component(instance.component.clone()) - .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get component"))?; - let ep = c.endpoint(instance.endpoint.clone()); - let client = ep - .client() - .await - .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get client"))?; - client.wait_for_instances().await.map_err(|_| { - ( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to wait for instances", - ) - })?; - target_clients.push(client); - } + Some(clients) if !clients.is_empty() => clients, + _ => return Err((StatusCode::NOT_FOUND, "Endpoint not found")), + }; let mut all_responses = Vec::new(); for client in target_clients { diff --git a/lib/llm/src/http/service/dynamic_registry.rs b/lib/llm/src/http/service/dynamic_registry.rs new file mode 100644 index 0000000000..34c44adc42 --- /dev/null +++ b/lib/llm/src/http/service/dynamic_registry.rs @@ -0,0 +1,217 @@ +// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +//! Dynamic HTTP endpoint watcher for native HTTP paths. +//! +//! This watcher maintains a small, in-memory mapping from HTTP path -> set of +//! `EndpointId` and a cache of `EndpointId` -> `Client` (one per endpoint). +//! It consumes etcd watch events for instance records and updates the mapping +//! on PUT/DELETE. The HTTP hot path performs a read-only lookup to get Clients +//! and does not touch etcd. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use tokio::sync::{RwLock, mpsc::Receiver}; + +use dynamo_runtime::DistributedRuntime; +use dynamo_runtime::component::Client; +use dynamo_runtime::component::Instance; +use dynamo_runtime::protocols::EndpointId; +use dynamo_runtime::transports::etcd::WatchEvent; + +fn normalize_path(path: &str) -> String { + if path.is_empty() { + return "/".to_string(); + } + if path.starts_with('/') { + path.to_string() + } else { + format!("/{}", path) + } +} + +#[derive(Default)] +struct RegistryInner { + // path -> set of endpoint triples registered for that path + paths: HashMap>, + // endpoint -> constructed client (one per endpoint) + endpoint_clients: HashMap, + // instance etcd key -> (path, endpoint) for delete bookkeeping + instance_index: HashMap, +} + +/// Watches instance records and exposes fast lookups from HTTP path -> Clients. +#[derive(Clone)] +pub struct DynamicEndpointWatcher { + drt: Option, + inner: Arc>, +} + +impl DynamicEndpointWatcher { + pub fn new(drt: Option) -> Self { + Self { + drt, + inner: Arc::new(RwLock::new(RegistryInner::default())), + } + } + + /// Watch loop: consume etcd WatchEvents and update local state. + pub async fn watch(&self, mut rx: Receiver) { + while let Some(evt) = rx.recv().await { + match evt { + WatchEvent::Put(kv) => { + let key = match kv.key_str() { + Ok(k) => k.to_string(), + Err(e) => { + tracing::warn!("Invalid UTF-8 in instance key: {e:?}"); + continue; + } + }; + match serde_json::from_slice::(kv.value()) { + Ok(instance) => { + if let Err(e) = self.add_instance(&key, instance).await { + tracing::warn!("Failed to process instance PUT: {e:?}"); + } + } + Err(err) => { + tracing::warn!("Failed to parse instance on PUT: {err:?}"); + } + } + } + WatchEvent::Delete(kv) => { + let key = match kv.key_str() { + Ok(k) => k.to_string(), + Err(e) => { + tracing::warn!("Invalid UTF-8 in instance key on DELETE: {e:?}"); + continue; + } + }; + self.remove_instance(&key).await; + } + } + } + } + + async fn ensure_client(&self, eid: &EndpointId) -> anyhow::Result { + if let Some(c) = self.inner.read().await.endpoint_clients.get(eid) { + return Ok(c.clone()); + } + let drt = self + .drt + .clone() + .ok_or_else(|| anyhow::anyhow!("No DistributedRuntime available"))?; + let ns = drt + .namespace(eid.namespace.clone()) + .map_err(|e| anyhow::anyhow!("namespace(): {e}"))?; + let comp = ns + .component(eid.component.clone()) + .map_err(|e| anyhow::anyhow!("component(): {e}"))?; + let ep = comp.endpoint(eid.name.clone()); + let client = ep.client().await?; + // Ensure at least one instance is observed before publishing the client + let _ = client.wait_for_instances().await?; + self.inner + .write() + .await + .endpoint_clients + .insert(eid.clone(), client.clone()); + tracing::info!( + path = %eid.as_url(), + namespace = %eid.namespace, + component = %eid.component, + endpoint = %eid.name, + "Dynamic HTTP endpoint client ready" + ); + Ok(client) + } + + async fn add_instance(&self, key: &str, instance: Instance) -> anyhow::Result<()> { + let Some(path) = instance.http_endpoint_path.as_ref() else { + // not a dynamic HTTP endpoint; ignore + return Ok(()); + }; + let path = normalize_path(path); + + let endpoint_id = EndpointId { + namespace: instance.namespace, + component: instance.component, + name: instance.endpoint, + }; + + let mut guard = self.inner.write().await; + + guard + .instance_index + .insert(key.to_string(), (path.clone(), endpoint_id.clone())); + + let set = guard.paths.entry(path.clone()).or_insert_with(HashSet::new); + let inserted_new = set.insert(endpoint_id.clone()); + let need_client = inserted_new && !guard.endpoint_clients.contains_key(&endpoint_id); + drop(guard); // release before await + + if need_client { + if let Err(e) = self.ensure_client(&endpoint_id).await { + tracing::warn!("Failed to create client for dynamic endpoint triple: {e:?}"); + } + tracing::info!( + http_path = %path, + namespace = %endpoint_id.namespace, + component = %endpoint_id.component, + endpoint = %endpoint_id.name, + "Registered dynamic HTTP endpoint path" + ); + } + + Ok(()) + } + + async fn remove_instance(&self, key: &str) { + let (_path, endpoint_id) = { + let mut guard = self.inner.write().await; + match guard.instance_index.remove(key) { + Some(v) => { + if let Some(set) = guard.paths.get_mut(&v.0) { + set.remove(&v.1); + if set.is_empty() { + guard.paths.remove(&v.0); + } + } + v + } + None => return, + } + }; + + let still_used = { + let guard = self.inner.read().await; + guard.paths.values().any(|set| set.contains(&endpoint_id)) + }; + if !still_used { + let mut guard = self.inner.write().await; + if guard.endpoint_clients.remove(&endpoint_id).is_some() { + tracing::info!( + namespace = %endpoint_id.namespace, + component = %endpoint_id.component, + endpoint = %endpoint_id.name, + "Removed dynamic HTTP endpoint client" + ); + } + } + } + + /// Get a cloned list of clients for a path. Returns None if the path is unknown. + pub async fn get_clients(&self, path: &str) -> Option> { + let path = normalize_path(path); + let guard = self.inner.read().await; + let triples: Vec = guard + .paths + .get(&path) + .map(|set| set.iter().cloned().collect())?; + let clients = triples + .into_iter() + .filter_map(|t| guard.endpoint_clients.get(&t).cloned()) + .collect::>(); + Some(clients) + } +} diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index ccd206cbbc..ebcac32174 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -11,6 +11,7 @@ use std::time::Duration; use super::Metrics; use super::RouteDoc; +use super::dynamic_registry::DynamicEndpointWatcher; use super::metrics; use crate::discovery::ModelManager; use crate::endpoint_type::EndpointType; @@ -30,6 +31,7 @@ pub struct State { metrics: Arc, manager: Arc, distributed_runtime: Option, + dynamic_registry: Option, flags: StateFlags, } @@ -75,6 +77,7 @@ impl State { manager, metrics: Arc::new(Metrics::default()), distributed_runtime: None, + dynamic_registry: None, flags: StateFlags { chat_endpoints_enabled: AtomicBool::new(false), cmpl_endpoints_enabled: AtomicBool::new(false), @@ -88,7 +91,8 @@ impl State { Self { manager, metrics: Arc::new(Metrics::default()), - distributed_runtime: drt, + distributed_runtime: drt.clone(), + dynamic_registry: Some(DynamicEndpointWatcher::new(drt)), flags: StateFlags { chat_endpoints_enabled: AtomicBool::new(false), cmpl_endpoints_enabled: AtomicBool::new(false), @@ -114,6 +118,10 @@ impl State { self.distributed_runtime.as_ref() } + pub fn dynamic_registry(&self) -> Option { + self.dynamic_registry.clone() + } + // TODO pub fn sse_keep_alive(&self) -> Option { None @@ -315,6 +323,8 @@ impl HttpServiceConfigBuilder { // Note: Metrics polling task will be started in run() method to have access to cancellation token + // Start dynamic endpoint watcher: rely on upstream to provide rx; handled in http.rs run() + let mut router = axum::Router::new(); let mut all_docs = Vec::new(); diff --git a/lib/runtime/src/protocols.rs b/lib/runtime/src/protocols.rs index 33f8efd79e..6713308709 100644 --- a/lib/runtime/src/protocols.rs +++ b/lib/runtime/src/protocols.rs @@ -36,7 +36,7 @@ pub struct Component { /// /// Example format: `"namespace/component/endpoint"` /// -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)] pub struct EndpointId { pub namespace: String, pub component: String, From 6f194ee365ef13cf4d5ec869882761cf4eef23c5 Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Wed, 8 Oct 2025 06:26:47 +0000 Subject: [PATCH 33/34] try --- lib/llm/src/entrypoint/input/http.rs | 2 +- lib/llm/src/http/service/dynamic_endpoint.rs | 27 +++++++++++--------- lib/llm/src/http/service/dynamic_registry.rs | 9 +++---- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/lib/llm/src/entrypoint/input/http.rs b/lib/llm/src/entrypoint/input/http.rs index 6e89c7217f..38057e8fca 100644 --- a/lib/llm/src/entrypoint/input/http.rs +++ b/lib/llm/src/entrypoint/input/http.rs @@ -85,7 +85,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul ) .await?; - // Start dynamic HTTP endpoint watcher (instances prefix) + // Start dynamic HTTP endpoint watcher run_endpoint_watcher(etcd_client.clone(), Arc::new(http_service.clone())) .await?; } diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 403f1c1933..684dd5b463 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -1,6 +1,10 @@ // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 +//! Dynamic endpoint handler that fans out requests to all instances that registered +//! the matching HTTP endpoint path, using the background registry. +//! Returns 404 if no instances have registered the endpoint. + use super::{RouteDoc, service_v2}; use crate::types::Annotated; use axum::{ @@ -28,10 +32,6 @@ pub fn dynamic_endpoint_router( (docs, router) } -/// Dynamic endpoint handler that fans out requests to all instances that registered -/// the matching HTTP endpoint path, using the background registry. -/// -/// Returns 404 if no instances have registered the endpoint. async fn inner_dynamic_endpoint_handler( state: Arc, path: String, @@ -48,22 +48,25 @@ async fn inner_dynamic_endpoint_handler( _ => return Err((StatusCode::NOT_FOUND, "Endpoint not found")), }; + // For now broadcast to all instances using direct routing let mut all_responses = Vec::new(); for client in target_clients { let router = PushRouter::>::from_client( - client, + client.clone(), Default::default(), ) .await .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to get router"))?; - let mut stream = router.round_robin(body.clone().into()).await.map_err(|e| { - tracing::error!("Failed to route: {:?}", e); - (StatusCode::INTERNAL_SERVER_ERROR, "Failed to route") - })?; - - while let Some(resp) = stream.next().await { - all_responses.push(resp); + let ids = client.instance_ids_avail().clone(); + for id in ids.iter() { + let mut stream = router.direct(body.clone().into(), *id).await.map_err(|e| { + tracing::error!("Failed to route (direct): {:?}", e); + (StatusCode::INTERNAL_SERVER_ERROR, "Failed to route") + })?; + while let Some(resp) = stream.next().await { + all_responses.push(resp); + } } } diff --git a/lib/llm/src/http/service/dynamic_registry.rs b/lib/llm/src/http/service/dynamic_registry.rs index 34c44adc42..0930788407 100644 --- a/lib/llm/src/http/service/dynamic_registry.rs +++ b/lib/llm/src/http/service/dynamic_registry.rs @@ -33,15 +33,13 @@ fn normalize_path(path: &str) -> String { #[derive(Default)] struct RegistryInner { - // path -> set of endpoint triples registered for that path + // Only 1 entry per EndpointId paths: HashMap>, - // endpoint -> constructed client (one per endpoint) endpoint_clients: HashMap, - // instance etcd key -> (path, endpoint) for delete bookkeeping + // Maps etcd key to its (path, endpoint) for easier deletes instance_index: HashMap, } -/// Watches instance records and exposes fast lookups from HTTP path -> Clients. #[derive(Clone)] pub struct DynamicEndpointWatcher { drt: Option, @@ -56,7 +54,6 @@ impl DynamicEndpointWatcher { } } - /// Watch loop: consume etcd WatchEvents and update local state. pub async fn watch(&self, mut rx: Receiver) { while let Some(evt) = rx.recv().await { match evt { @@ -148,7 +145,7 @@ impl DynamicEndpointWatcher { let set = guard.paths.entry(path.clone()).or_insert_with(HashSet::new); let inserted_new = set.insert(endpoint_id.clone()); let need_client = inserted_new && !guard.endpoint_clients.contains_key(&endpoint_id); - drop(guard); // release before await + drop(guard); if need_client { if let Err(e) = self.ensure_client(&endpoint_id).await { From c6c39cfc629d948cd7c91f70c249859ab264faee Mon Sep 17 00:00:00 2001 From: Ishan Dhanani Date: Wed, 8 Oct 2025 06:41:19 +0000 Subject: [PATCH 34/34] handling --- lib/llm/src/http/service/dynamic_endpoint.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/lib/llm/src/http/service/dynamic_endpoint.rs b/lib/llm/src/http/service/dynamic_endpoint.rs index 684dd5b463..9adfa6e010 100644 --- a/lib/llm/src/http/service/dynamic_endpoint.rs +++ b/lib/llm/src/http/service/dynamic_endpoint.rs @@ -39,11 +39,16 @@ async fn inner_dynamic_endpoint_handler( ) -> Result { let fmt_path = format!("/{}", &path); let registry = state.dynamic_registry(); - let target_clients = match registry - .expect("Dynamic registry not found") - .get_clients(&fmt_path) - .await - { + let registry = match registry { + Some(r) => r, + None => { + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "Dynamic registry not found", + )); + } + }; + let target_clients = match registry.get_clients(&fmt_path).await { Some(clients) if !clients.is_empty() => clients, _ => return Err((StatusCode::NOT_FOUND, "Endpoint not found")), };