Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
b317939
no errors for now
Oct 2, 2025
6d25ddc
bump
ishandhanani Oct 3, 2025
f90eb88
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 3, 2025
3b492f4
refactor
ishandhanani Oct 3, 2025
2124d91
found
ishandhanani Oct 3, 2025
ac9357e
nice
ishandhanani Oct 4, 2025
999c716
err
ishandhanani Oct 4, 2025
889f0c5
cleanup
ishandhanani Oct 5, 2025
9562483
bump
ishandhanani Oct 5, 2025
48bc524
go
ishandhanani Oct 5, 2025
ad934a4
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 5, 2025
c2d10ee
working
ishandhanani Oct 5, 2025
f3d74b8
go
ishandhanani Oct 5, 2025
3820e80
profile
ishandhanani Oct 5, 2025
a791ca7
bump:
ishandhanani Oct 5, 2025
5886ae4
redundant space
ishandhanani Oct 5, 2025
5166638
bump
ishandhanani Oct 5, 2025
35aa49e
slightly logic change since only agg endpoints supported now
ishandhanani Oct 6, 2025
b0ae2d4
reshuffle
ishandhanani Oct 6, 2025
c5307e3
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 6, 2025
f50c54b
selective pieces return a 404
ishandhanani Oct 6, 2025
3e6bf2c
singleton drt
ishandhanani Oct 6, 2025
f6f7c91
bump
ishandhanani Oct 6, 2025
e774001
bump
ishandhanani Oct 6, 2025
4502810
bump
ishandhanani Oct 6, 2025
18f590d
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 7, 2025
0cb978a
bump
ishandhanani Oct 7, 2025
145fb06
Merge branch 'main' into ishan/late-registration-ye
ishandhanani Oct 7, 2025
95e3a4f
swap etcd with drt in http builder
ishandhanani Oct 7, 2025
cdc4da0
gp
ishandhanani Oct 7, 2025
3d79298
lel
ishandhanani Oct 7, 2025
7ffe3e2
test
ishandhanani Oct 7, 2025
28e268d
no panicgit add .!
ishandhanani Oct 7, 2025
7852920
bruh
ishandhanani Oct 7, 2025
a3e2839
bump
ishandhanani Oct 7, 2025
11e8439
simple
ishandhanani Oct 7, 2025
2b7d61a
full refactor
ishandhanani Oct 8, 2025
6f194ee
try
ishandhanani Oct 8, 2025
c6c39cf
handling
ishandhanani Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions components/backends/sglang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ uv pip install maturin
cd $DYNAMO_HOME/lib/bindings/python
maturin develop --uv
cd $DYNAMO_HOME
# installs sglang supported version along with dynamo
# include the prerelease flag to install flashinfer rc versions
uv pip install --prerelease=allow -e .[sglang]
uv pip install -e .[sglang]
```

</details>
Expand Down
19 changes: 13 additions & 6 deletions components/src/dynamo/sglang/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
MultimodalPrefillWorkerHandler,
MultimodalProcessorHandler,
MultimodalWorkerHandler,
NativeApiHandler,
PrefillWorkerHandler,
)

Expand Down Expand Up @@ -74,7 +75,13 @@ async def init(runtime: DistributedRuntime, config: Config):

generate_endpoint = component.endpoint(dynamo_args.endpoint)

# publisher instantiates the metrics and kv event publishers
publisher, metrics_task, metrics_labels = await setup_sgl_metrics(
engine, config, component, generate_endpoint
)

prefill_client = None
native_api_tasks = []
if config.serving_mode == DisaggregationMode.DECODE:
logging.info("Initializing prefill client")
prefill_client = (
Expand All @@ -83,11 +90,11 @@ async def init(runtime: DistributedRuntime, config: Config):
.endpoint("generate")
.client()
)

# publisher instantiates the metrics and kv event publishers
publisher, metrics_task, metrics_labels = await setup_sgl_metrics(
engine, config, component, generate_endpoint
)
# TODO: implement other native APIs and come up with clean layer to apply to agg/disagg/etc
if config.serving_mode == DisaggregationMode.AGGREGATED:
native_api_tasks = await NativeApiHandler(
component, engine, metrics_labels
).init_native_apis()

# Readiness gate: requests wait until model is registered
ready_event = asyncio.Event()
Expand All @@ -97,7 +104,6 @@ async def init(runtime: DistributedRuntime, config: Config):
health_check_payload = SglangHealthCheckPayload(engine).to_dict()

try:
# Start endpoint immediately and register model concurrently
# Requests queue until ready_event is set
await asyncio.gather(
generate_endpoint.serve_endpoint(
Expand All @@ -113,6 +119,7 @@ async def init(runtime: DistributedRuntime, config: Config):
dynamo_args,
readiness_gate=ready_event,
),
*native_api_tasks,
)
except Exception as e:
logging.error(f"Failed to serve endpoints: {e}")
Expand Down
2 changes: 2 additions & 0 deletions components/src/dynamo/sglang/request_handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
MultimodalProcessorHandler,
MultimodalWorkerHandler,
)
from .native_api_handler import NativeApiHandler

__all__ = [
"BaseWorkerHandler",
Expand All @@ -28,6 +29,7 @@
# Multimodal handlers
"MultimodalEncodeWorkerHandler",
"MultimodalPrefillWorkerHandler",
"NativeApiHandler",
"MultimodalProcessorHandler",
"MultimodalWorkerHandler",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# SGLang Native APIs: https://docs.sglang.ai/basic_usage/native_api.html
# Code: https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/entrypoints/http_server.py

import asyncio
import logging
from typing import List, Optional, Tuple

import sglang as sgl

from dynamo._core import Component


class NativeApiHandler:
"""Handler to add sglang native API endpoints to workers"""

def __init__(
self,
component: Component,
engine: sgl.Engine,
metrics_labels: Optional[List[Tuple[str, str]]] = None,
):
self.component = component
self.engine = engine
self.metrics_labels = metrics_labels
self.native_api_tasks = []

async def init_native_apis(
self,
) -> List[asyncio.Task]:
"""
Initialize and register native API endpoints.
Returns list of tasks to be gathered.
"""
logging.info("Initializing native SGLang API endpoints")

self.tm = self.engine.tokenizer_manager

tasks = []

model_info_ep = self.component.endpoint("get_model_info")
tasks.extend(
[
model_info_ep.serve_endpoint(
self.get_model_info,
graceful_shutdown=True,
metrics_labels=self.metrics_labels,
http_endpoint_path="/get_model_info",
),
]
)

self.native_api_tasks = tasks
logging.info(f"Registered {len(tasks)} native API endpoints")
return tasks

async def get_model_info(self, request: dict):
_ = request
result = {
"model_path": self.tm.server_args.model_path,
"tokenizer_path": self.tm.server_args.tokenizer_path,
"preferred_sampling_params": self.tm.server_args.preferred_sampling_params,
"weight_version": self.tm.server_args.weight_version,
}

yield {"data": [result]}
7 changes: 6 additions & 1 deletion lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,14 +643,15 @@ impl Component {

#[pymethods]
impl Endpoint {
#[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None, health_check_payload = None))]
#[pyo3(signature = (generator, graceful_shutdown = true, metrics_labels = None, health_check_payload = None, http_endpoint_path = None))]
fn serve_endpoint<'p>(
&self,
py: Python<'p>,
generator: PyObject,
graceful_shutdown: Option<bool>,
metrics_labels: Option<Vec<(String, String)>>,
health_check_payload: Option<&Bound<'p, PyDict>>,
http_endpoint_path: Option<&str>,
) -> PyResult<Bound<'p, PyAny>> {
let engine = Arc::new(engine::PythonAsyncEngine::new(
generator,
Expand Down Expand Up @@ -688,6 +689,10 @@ impl Endpoint {
builder = builder.health_check_payload(payload);
}

if let Some(http_endpoint_path) = http_endpoint_path {
builder = builder.http_endpoint_path(http_endpoint_path);
}

let graceful_shutdown = graceful_shutdown.unwrap_or(true);
pyo3_async_runtimes::tokio::future_into_py(py, async move {
builder
Expand Down
2 changes: 1 addition & 1 deletion lib/bindings/python/src/dynamo/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class Endpoint:

...

async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None, health_check_payload: Optional[Dict[str, Any]] = None) -> None:
async def serve_endpoint(self, handler: RequestHandler, graceful_shutdown: bool = True, metrics_labels: Optional[List[Tuple[str, str]]] = None, health_check_payload: Optional[Dict[str, Any]] = None, http_endpoint_path: Optional[str] = None) -> None:
"""
Serve an endpoint discoverable by all connected clients at
`{{ namespace }}/components/{{ component_name }}/endpoints/{{ endpoint_name }}`
Expand Down
32 changes: 27 additions & 5 deletions lib/llm/src/entrypoint/input/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
},
};
use dynamo_runtime::component::INSTANCE_ROOT_PATH;
use dynamo_runtime::transports::etcd;
use dynamo_runtime::{DistributedRuntime, Runtime};
use dynamo_runtime::{distributed::DistributedConfig, pipeline::RouterMode};
Expand Down Expand Up @@ -55,11 +56,10 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
let http_service = match engine_config {
EngineConfig::Dynamic(_) => {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let etcd_client = distributed_runtime.etcd_client();
// This allows the /health endpoint to query etcd for active instances
http_service_builder = http_service_builder.with_etcd_client(etcd_client.clone());
http_service_builder = http_service_builder.with_drt(Some(distributed_runtime.clone()));
let http_service = http_service_builder.build()?;
match etcd_client {
match distributed_runtime.etcd_client() {
Some(ref etcd_client) => {
let router_config = engine_config.local_model().router_config();
// Listen for models registering themselves in etcd, add them to HTTP service
Expand All @@ -71,7 +71,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
} else {
Some(namespace.to_string())
};
run_watcher(
run_model_watcher(
distributed_runtime,
http_service.state().manager_clone(),
etcd_client.clone(),
Expand All @@ -84,6 +84,10 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
http_service.state().metrics_clone(),
)
.await?;

// Start dynamic HTTP endpoint watcher
run_endpoint_watcher(etcd_client.clone(), Arc::new(http_service.clone()))
.await?;
}
None => {
// Static endpoints don't need discovery
Expand Down Expand Up @@ -221,7 +225,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
/// Spawns a task that watches for new models in etcd at network_prefix,
/// and registers them with the ModelManager so that the HTTP service can use them.
#[allow(clippy::too_many_arguments)]
async fn run_watcher(
async fn run_model_watcher(
runtime: DistributedRuntime,
model_manager: Arc<ModelManager>,
etcd_client: etcd::Client,
Expand Down Expand Up @@ -265,6 +269,24 @@ async fn run_watcher(
Ok(())
}

/// Spawns a task that watches instance records for dynamic HTTP endpoints and updates the
/// DynamicEndpointWatcher held in the HTTP service state.
async fn run_endpoint_watcher(
etcd_client: etcd::Client,
http_service: Arc<HttpService>,
) -> anyhow::Result<()> {
if let Some(dep_watcher) = http_service.state().dynamic_registry() {
let instances_watcher = etcd_client
.kv_get_and_watch_prefix(INSTANCE_ROOT_PATH)
.await?;
let (_prefix2, _watcher2, instances_rx) = instances_watcher.dissolve();
tokio::spawn(async move {
dep_watcher.watch(instances_rx).await;
});
}
Ok(())
}

/// Updates HTTP service endpoints based on available model types
fn update_http_endpoints(service: Arc<HttpService>, model_type: ModelUpdate) {
tracing::debug!(
Expand Down
2 changes: 2 additions & 0 deletions lib/llm/src/http/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
mod openai;

pub mod disconnect;
pub mod dynamic_endpoint;
pub mod dynamic_registry;
pub mod error;
pub mod health;
pub mod metrics;
Expand Down
Loading
Loading