
Commit d89ce0d

qimcisayushag-nv authored and committed
feat: align OpenAI response IDs with distributed trace IDs (#2496)
Signed-off-by: ayushag <[email protected]>
1 parent 000d91b commit d89ce0d
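The change in one sentence: OpenAI-style response IDs ("chatcmpl-…", "cmpl-…") are now built from the per-request context ID, which is aligned with the distributed trace ID, instead of a freshly minted UUID. A minimal sketch of the resulting ID shapes (illustrative only; the helper functions and the example value below are not part of the commit):

    // Sketch: how response IDs are formed after this change.
    fn chat_response_id(request_id: &str) -> String {
        // previously: format!("chatcmpl-{}", uuid::Uuid::new_v4())
        format!("chatcmpl-{request_id}")
    }

    fn completion_response_id(request_id: &str) -> String {
        // previously: format!("cmpl-{}", uuid::Uuid::new_v4())
        format!("cmpl-{request_id}")
    }

    fn main() {
        let request_id = "trace-1234"; // stands in for ctx.id().to_string()
        println!("{}", chat_response_id(request_id));       // chatcmpl-trace-1234
        println!("{}", completion_response_id(request_id)); // cmpl-trace-1234
    }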

File tree

7 files changed, +60 -93 lines changed


lib/engines/mistralrs/src/lib.rs

Lines changed: 19 additions & 18 deletions
@@ -212,9 +212,9 @@ impl MistralRsEngine {
 
         // Perform warmup request
         let (tx, mut rx) = channel(1);
-        let request_id = engine.mistralrs.next_request_id();
+        let mistralrs_request_id = engine.mistralrs.next_request_id();
         let warmup_request = Request::Normal(Box::new(NormalRequest {
-            id: request_id,
+            id: mistralrs_request_id,
             model_id: Some(display_name.to_string()),
             messages: RequestMessage::Chat {
                 messages: vec![IndexMap::from([
@@ -246,10 +246,10 @@ impl MistralRsEngine {
         {
             match response.as_result() {
                 Ok(r) => {
-                    tracing::debug!(request_id, "Warmup response: {r:?}");
+                    tracing::debug!(mistralrs_request_id, "Warmup response: {r:?}");
                 }
                 Err(err) => {
-                    tracing::error!(request_id, %err, "Failed converting response to result.");
+                    tracing::error!(mistralrs_request_id, %err, "Failed converting response to result.");
                 }
             }
         }
@@ -272,6 +272,7 @@ impl
     ) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
         let (request, context) = request.transfer(());
         let ctx = context.context();
+        let request_id = ctx.id().to_string();
         let (tx, mut rx) = channel(10_000);
 
         let mut messages = vec![];
@@ -338,9 +339,9 @@ impl
             n_choices: 1,
             dry_params: det.dry_params,
         };
-        let request_id = self.mistralrs.next_request_id();
+        let mistralrs_request_id = self.mistralrs.next_request_id();
         let mistralrs_request = Request::Normal(Box::new(NormalRequest {
-            id: request_id,
+            id: mistralrs_request_id,
             model_id: Some(self.display_name.clone()),
             messages: RequestMessage::Chat {
                 messages,
@@ -369,14 +370,14 @@ impl
                 let response = match response.as_result() {
                     Ok(r) => r,
                     Err(err) => {
-                        tracing::error!(request_id, %err, "Failed converting mistralrs channel response to result.");
+                        tracing::error!(mistralrs_request_id, %err, "Failed converting mistralrs channel response to result.");
                         break;
                     }
                 };
                 match response {
                     ResponseOk::Chunk(c) => {
                         let Some(from_assistant) = c.choices[0].delta.content.clone() else {
-                            tracing::warn!(request_id, "No content from mistralrs. Abandoning request.");
+                            tracing::warn!(mistralrs_request_id, "No content from mistralrs. Abandoning request.");
                             break;
                         };
                         let finish_reason = match &c.choices[0].finish_reason.as_deref() {
@@ -387,7 +388,7 @@ impl
                                 Some(FinishReason::Length)
                             }
                             Some(s) => {
-                                tracing::warn!(request_id, stop_reason = s, "Unknow stop reason");
+                                tracing::warn!(mistralrs_request_id, stop_reason = s, "Unknow stop reason");
                                 Some(FinishReason::Stop)
                             }
                             None => None,
@@ -396,7 +397,7 @@ impl
 
                         #[allow(deprecated)]
                         let delta = NvCreateChatCompletionStreamResponse {
-                            id: c.id,
+                            id: format!("chatcmpl-{request_id}"),
                             choices: vec![dynamo_async_openai::types::ChatChoiceStream{
                                 index: 0,
                                 delta: dynamo_async_openai::types::ChatCompletionStreamResponseDelta{
@@ -427,11 +428,11 @@ impl
                         yield ann;
 
                         if finish_reason.is_some() {
-                            //tracing::trace!(request_id, "Finish reason: {finish_reason:?}");
+                            //tracing::trace!(mistralrs_request_id, "Finish reason: {finish_reason:?}");
                            break;
                        }
                    },
-                    x => tracing::error!(request_id, "Unhandled. {x:?}"),
+                    x => tracing::error!(mistralrs_request_id, "Unhandled. {x:?}"),
                 }
             }
         };
@@ -485,7 +486,7 @@ impl
         let (request, context) = request.transfer(());
         let ctx = context.context();
         let (tx, mut rx) = channel(10_000);
-        let response_generator = request.response_generator();
+        let response_generator = request.response_generator(ctx.id().to_string());
 
         let messages = RequestMessage::Completion {
             text: prompt_to_string(&request.inner.prompt),
@@ -539,9 +540,9 @@ impl
             dry_params: det.dry_params,
         };
 
-        let request_id = self.mistralrs.next_request_id();
+        let mistralrs_request_id = self.mistralrs.next_request_id();
         let mistralrs_request = Request::Normal(Box::new(NormalRequest {
-            id: request_id,
+            id: mistralrs_request_id,
             model_id: Some(self.display_name.clone()),
             messages,
             sampling_params,
@@ -567,7 +568,7 @@ impl
                 let response = match response.as_result() {
                     Ok(r) => r,
                     Err(err) => {
-                        tracing::error!(request_id, %err, "Failed converting mistralrs channel response to result.");
+                        tracing::error!(mistralrs_request_id, %err, "Failed converting mistralrs channel response to result.");
                         break;
                     }
                 };
@@ -583,7 +584,7 @@ impl
                                 Some(FinishReason::Length)
                             }
                             Some(s) => {
-                                tracing::warn!(request_id, stop_reason = s, "Unknow stop reason");
+                                tracing::warn!(mistralrs_request_id, stop_reason = s, "Unknow stop reason");
                                 Some(FinishReason::Stop)
                             }
                             None => None,
@@ -602,7 +603,7 @@ impl
                             break;
                         }
                     },
-                    x => tracing::error!(request_id, "Unhandled. {x:?}"),
+                    x => tracing::error!(mistralrs_request_id, "Unhandled. {x:?}"),
                 }
             }
         };
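Two IDs now coexist in this engine, which is what the rename is about: mistralrs_request_id is the engine's internal ID from next_request_id() and is only used for logging and for the NormalRequest handed to mistralrs, while request_id is the pipeline context ID (ctx.id()) and is what the OpenAI stream chunks report. A stubbed sketch of that split (plain values stand in for the mistralrs and Dynamo types, so this is not the project code):

    fn main() {
        // Stand-in for engine.mistralrs.next_request_id(): engine-internal, log-only.
        let mistralrs_request_id: usize = 42;
        // Stand-in for ctx.id().to_string(): the trace-aligned request ID.
        let request_id = String::from("trace-1234");

        // The stream chunk ID now uses the context ID instead of the engine-provided c.id.
        let stream_chunk_id = format!("chatcmpl-{request_id}");
        println!("log with {mistralrs_request_id}, respond with {stream_chunk_id}");
    }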

lib/llm/src/engines.rs

Lines changed: 2 additions & 3 deletions
@@ -14,7 +14,6 @@ use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
 use dynamo_runtime::protocols::annotated::Annotated;
 
 use crate::backend::ExecutionContext;
-use crate::local_model::runtime_config;
 use crate::preprocessor::PreprocessedRequest;
 use crate::protocols::common::llm_backend::LLMEngineOutput;
 use crate::protocols::openai::{
@@ -184,8 +183,8 @@ impl
         incoming_request: SingleIn<NvCreateChatCompletionRequest>,
     ) -> Result<ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>, Error> {
         let (request, context) = incoming_request.transfer(());
-        let mut deltas = request.response_generator(runtime_config::ModelRuntimeConfig::default());
         let ctx = context.context();
+        let mut deltas = request.response_generator(ctx.id().to_string());
         let req = request.inner.messages.into_iter().next_back().unwrap();
 
         let prompt = match req {
@@ -231,8 +230,8 @@ impl
         incoming_request: SingleIn<NvCreateCompletionRequest>,
     ) -> Result<ManyOut<Annotated<NvCreateCompletionResponse>>, Error> {
         let (request, context) = incoming_request.transfer(());
-        let deltas = request.response_generator();
         let ctx = context.context();
+        let deltas = request.response_generator(ctx.id().to_string());
         let chars_string = prompt_to_string(&request.inner.prompt);
         let output = stream! {
             let mut id = 1;

lib/llm/src/http/service/openai.rs

Lines changed: 5 additions & 8 deletions
@@ -253,8 +253,7 @@ async fn completions(
     // return a 503 if the service is not ready
     check_ready(&state)?;
 
-    // todo - extract distributed tracing id and context id from headers
-    let request_id = uuid::Uuid::new_v4().to_string();
+    let request_id = request.id().to_string();
 
     // todo - decide on default
     let streaming = request.inner.stream.unwrap_or(false);
@@ -354,13 +353,15 @@ async fn completions(
 #[tracing::instrument(skip_all)]
 async fn embeddings(
     State(state): State<Arc<service_v2::State>>,
+    headers: HeaderMap,
     Json(request): Json<NvCreateEmbeddingRequest>,
 ) -> Result<Response, ErrorResponse> {
     // return a 503 if the service is not ready
     check_ready(&state)?;
 
-    // todo - extract distributed tracing id and context id from headers
-    let request_id = uuid::Uuid::new_v4().to_string();
+    let request_id = get_or_create_request_id(request.inner.user.as_deref(), &headers);
+    let request = Context::with_id(request, request_id);
+    let request_id = request.id().to_string();
 
     // Embeddings are typically not streamed, so we default to non-streaming
     let streaming = false;
@@ -381,10 +382,6 @@
         .metrics_clone()
         .create_inflight_guard(model, Endpoint::Embeddings, streaming);
 
-    // setup context
-    // todo - inherit request_id from distributed trace details
-    let request = Context::with_id(request, request_id.clone());
-
     // issue the generate call on the engine
     let stream = engine
         .generate(request)
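For /v1/completions the handler now simply reuses the ID already attached to the request, and for /v1/embeddings it resolves an ID from the incoming headers via get_or_create_request_id before wrapping the request in a Context, so metrics, logging, and the engine call all see the same ID. The helper itself is not shown in this diff; the sketch below is only a guess at its shape, and the header name and UUID fallback are assumptions rather than details taken from the commit.

    use axum::http::HeaderMap;

    // Hypothetical stand-in for get_or_create_request_id(user, &headers):
    // prefer an ID supplied by the caller/trace, otherwise mint a fresh UUID.
    fn get_or_create_request_id_sketch(_user: Option<&str>, headers: &HeaderMap) -> String {
        headers
            .get("x-request-id") // assumed header name
            .and_then(|value| value.to_str().ok())
            .map(str::to_owned)
            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string())
    }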

lib/llm/src/preprocessor.rs

Lines changed: 2 additions & 7 deletions
@@ -22,7 +22,6 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use std::{collections::HashMap, sync::Arc};
 use tracing;
 
-use crate::local_model::runtime_config::ModelRuntimeConfig;
 use crate::model_card::{ModelDeploymentCard, ModelInfo, TokenizerKind};
 use crate::preprocessor::prompt::OAIChatLikeRequest;
 use crate::tokenizers::Encoding;
@@ -95,7 +94,6 @@ pub struct OpenAIPreprocessor {
     formatter: Arc<dyn OAIPromptFormatter>,
     tokenizer: Arc<dyn Tokenizer>,
     model_info: Arc<dyn ModelInfo>,
-    runtime_config: ModelRuntimeConfig,
 }
 
 impl OpenAIPreprocessor {
@@ -123,14 +121,11 @@ impl OpenAIPreprocessor {
         };
         let model_info = model_info.get_model_info().await?;
 
-        let runtime_config = mdc.runtime_config.clone();
-
         Ok(Arc::new(Self {
             formatter,
             tokenizer,
             model_info,
             mdcsum,
-            runtime_config,
         }))
     }
 
@@ -499,7 +494,7 @@ impl
         let (request, context) = request.into_parts();
 
         // create a response generator
-        let response_generator = request.response_generator(self.runtime_config.clone());
+        let response_generator = request.response_generator(context.id().to_string());
        let mut response_generator = Box::new(response_generator);
 
         // convert the chat completion request to a common completion request
@@ -553,7 +548,7 @@ impl
         let (request, context) = request.into_parts();
 
         // create a response generator
-        let response_generator = request.response_generator();
+        let response_generator = request.response_generator(context.id().to_string());
         let mut response_generator = Box::new(response_generator);
         // convert the chat completion request to a common completion request
         let (common_request, annotations) = self.preprocess_request(&request)?;

lib/llm/src/protocols/openai/chat_completions/delta.rs

Lines changed: 14 additions & 12 deletions
@@ -1,33 +1,32 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0
 
-use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};
-
 use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse};
 use crate::{
-    local_model::runtime_config,
+    local_model::runtime_config::ModelRuntimeConfig,
     protocols::common::{self},
     types::TokenIdType,
 };
+use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};
 
 /// Provides a method for generating a [`DeltaGenerator`] from a chat completion request.
 impl NvCreateChatCompletionRequest {
     /// Creates a [`DeltaGenerator`] instance based on the chat completion request.
     ///
+    /// # Arguments
+    /// * `request_id` - The request ID to use for the chat completion response ID.
+    ///
     /// # Returns
     /// * [`DeltaGenerator`] configured with model name and response options.
-    pub fn response_generator(
-        &self,
-        runtime_config: runtime_config::ModelRuntimeConfig,
-    ) -> DeltaGenerator {
+    pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
         let options = DeltaGeneratorOptions {
             enable_usage: true,
             enable_logprobs: self.inner.logprobs.unwrap_or(false)
                 || self.inner.top_logprobs.unwrap_or(0) > 0,
-            runtime_config,
+            runtime_config: ModelRuntimeConfig::default(),
         };
 
-        DeltaGenerator::new(self.inner.model.clone(), options)
+        DeltaGenerator::new(self.inner.model.clone(), options, request_id)
     }
 }
 
@@ -39,7 +38,7 @@ pub struct DeltaGeneratorOptions {
     /// Determines whether log probabilities should be included in the response.
     pub enable_logprobs: bool,
 
-    pub runtime_config: runtime_config::ModelRuntimeConfig,
+    pub runtime_config: ModelRuntimeConfig,
 }
 
 /// Generates incremental chat completion responses in a streaming fashion.
@@ -74,10 +73,11 @@ impl DeltaGenerator {
     /// # Arguments
     /// * `model` - The model name used for response generation.
     /// * `options` - Configuration options for enabling usage and log probabilities.
+    /// * `request_id` - The request ID to use for the chat completion response.
     ///
     /// # Returns
     /// * A new instance of [`DeltaGenerator`].
-    pub fn new(model: String, options: DeltaGeneratorOptions) -> Self {
+    pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
         let now = std::time::SystemTime::now()
             .duration_since(std::time::UNIX_EPOCH)
             .unwrap()
@@ -108,8 +108,10 @@ impl DeltaGenerator {
                 .unwrap_or("basic"),
         );
 
+        let chatcmpl_id = format!("chatcmpl-{request_id}");
+
         Self {
-            id: format!("chatcmpl-{}", uuid::Uuid::new_v4()),
+            id: chatcmpl_id,
             object: "chat.completion.chunk".to_string(),
             created: now,
             model,
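DeltaGenerator::new gains a third argument and response_generator now takes the request ID instead of a runtime config (the config is defaulted internally). An illustrative call site under the new signatures follows; the import paths are a guess, and the option fields are exactly the three shown in this diff:

    use crate::local_model::runtime_config::ModelRuntimeConfig;
    use crate::protocols::openai::chat_completions::delta::{DeltaGenerator, DeltaGeneratorOptions};

    fn make_generator(request_id: String) -> DeltaGenerator {
        let options = DeltaGeneratorOptions {
            enable_usage: true,
            enable_logprobs: false,
            // response_generator() now fills this with ModelRuntimeConfig::default();
            // constructing the options directly lets a caller supply a real config.
            runtime_config: ModelRuntimeConfig::default(),
        };
        // The new third argument is the context/trace-derived request ID; the generator's
        // `id` field becomes "chatcmpl-<request_id>".
        DeltaGenerator::new("my-model".to_string(), options, request_id)
    }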

lib/llm/src/protocols/openai/completions/delta.rs

Lines changed: 6 additions & 16 deletions
@@ -1,31 +1,19 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
 
 use super::{NvCreateCompletionRequest, NvCreateCompletionResponse};
 use crate::{protocols::common, types::TokenIdType};
 
 impl NvCreateCompletionRequest {
     // put this method on the request
     // inspect the request to extract options
-    pub fn response_generator(&self) -> DeltaGenerator {
+    pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
         let options = DeltaGeneratorOptions {
             enable_usage: true,
             enable_logprobs: self.inner.logprobs.unwrap_or(0) > 0,
         };
 
-        DeltaGenerator::new(self.inner.model.clone(), options)
+        DeltaGenerator::new(self.inner.model.clone(), options, request_id)
     }
 }
 
@@ -47,7 +35,7 @@ pub struct DeltaGenerator {
 }
 
 impl DeltaGenerator {
-    pub fn new(model: String, options: DeltaGeneratorOptions) -> Self {
+    pub fn new(model: String, options: DeltaGeneratorOptions, request_id: String) -> Self {
         let now = std::time::SystemTime::now()
             .duration_since(std::time::UNIX_EPOCH)
             .unwrap()
@@ -67,8 +55,10 @@ impl DeltaGenerator {
             prompt_tokens_details: None,
        };
 
+        let completion_id = format!("cmpl-{request_id}");
+
         Self {
-            id: format!("cmpl-{}", uuid::Uuid::new_v4()),
+            id: completion_id,
             object: "text_completion".to_string(),
             created: now,
             model,
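With the completion path updated the same way, both response ID families are now trace-aligned, which makes the reverse lookup trivial. A convenience sketch (not part of the codebase) for recovering the trace/request ID from a response ID:

    // Hypothetical helper: strip the OpenAI-style prefix to get back the trace/request ID.
    fn trace_id_from_response_id(response_id: &str) -> Option<&str> {
        response_id
            .strip_prefix("chatcmpl-")
            .or_else(|| response_id.strip_prefix("cmpl-"))
    }

    fn main() {
        assert_eq!(trace_id_from_response_id("chatcmpl-trace-1234"), Some("trace-1234"));
        assert_eq!(trace_id_from_response_id("cmpl-trace-1234"), Some("trace-1234"));
    }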
