Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 220 additions & 3 deletions lib/llm/src/preprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::{collections::HashMap, pin::Pin, sync::Arc};
use tracing;

// Stream recording imports
use serde::{Deserialize, Serialize};

use crate::model_card::{ModelDeploymentCard, ModelInfo};
use crate::preprocessor::prompt::OAIChatLikeRequest;
use crate::protocols::common::preprocessor::PreprocessedRequestBuilder;
Expand Down Expand Up @@ -94,12 +97,131 @@ impl LLMMetricAnnotation {
}
}

/// Stream recording entry for archiving postprocessor stream data.
///
/// One entry represents a single chunk observed on the response stream,
/// stamped with its position in the stream (`sequence`) and its timing
/// relative to stream start (`elapsed_ms`) so a recording can be replayed
/// or analyzed in order.
///
/// NOTE(review): this type is not referenced by the visible recording path
/// (which serializes `CombinedStreamContent` instead) — confirm it is
/// consumed elsewhere, or consider removing it as dead code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRecordingEntry {
    /// Request ID for correlation across log/recording artifacts.
    pub request_id: String,
    /// Zero-based sequence number of this chunk within the stream.
    pub sequence: usize,
    /// Elapsed time from stream start in milliseconds.
    pub elapsed_ms: u64,
    /// The actual stream response chunk, including any annotations.
    pub response_data: Annotated<NvCreateChatCompletionStreamResponse>,
}

/// Combined stream content for structured JSON output.
///
/// Aggregates an entire chat-completion stream for one request into a
/// single serializable record: the concatenated text/reasoning deltas and
/// extracted tool calls, plus the raw per-chunk data for detailed analysis.
/// Built via [`CombinedStreamContent::from_stream_items`] or
/// [`CombinedStreamContent::from_raw_and_processed_items`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CombinedStreamContent {
    /// Request ID for correlation across log/recording artifacts.
    pub request_id: String,
    /// All `delta.content` fragments from the stream, concatenated in order.
    pub normal_content: String,
    /// All `delta.reasoning_content` fragments, concatenated in order.
    pub reasoning_content: String,
    /// Tool calls extracted from the stream, serialized as raw JSON values.
    pub tool_calls: Vec<serde_json::Value>,
    /// Raw stream chunks archived verbatim for detailed analysis.
    pub data: Vec<Annotated<NvCreateChatCompletionStreamResponse>>,
}

impl CombinedStreamContent {
    /// Combine stream items into structured content.
    ///
    /// The same `items` slice serves as both the source for content
    /// extraction and the raw archive stored in `data`.
    pub fn from_stream_items(
        request_id: String,
        items: &[Annotated<NvCreateChatCompletionStreamResponse>],
    ) -> Self {
        // Single-source case: items are both the "processed" view and the
        // raw archive, so delegate to the two-slice constructor.
        Self::from_raw_and_processed_items(request_id, items, items)
    }

    /// Create combined content with both raw chunks and processed chunks.
    ///
    /// `processed_items` (post jail/tool-call processing) drive the combined
    /// `normal_content`/`reasoning_content`/`tool_calls` fields, while
    /// `raw_chunks` are archived verbatim in `data`.
    pub fn from_raw_and_processed_items(
        request_id: String,
        raw_chunks: &[Annotated<NvCreateChatCompletionStreamResponse>],
        processed_items: &[Annotated<NvCreateChatCompletionStreamResponse>],
    ) -> Self {
        // Extract combined content from processed items (after jail processing).
        let (normal_content, reasoning_content, tool_calls) =
            Self::extract_content(processed_items);

        Self {
            request_id,
            normal_content,
            reasoning_content,
            tool_calls,
            data: raw_chunks.to_vec(), // Use raw chunks for the data field
        }
    }

    /// Walk every choice delta in `items`, accumulating text content,
    /// reasoning content, and tool calls (serialized to JSON values).
    ///
    /// Shared by both constructors; previously this loop was duplicated
    /// verbatim in each of them.
    fn extract_content(
        items: &[Annotated<NvCreateChatCompletionStreamResponse>],
    ) -> (String, String, Vec<serde_json::Value>) {
        let mut normal_content = String::new();
        let mut reasoning_content = String::new();
        let mut tool_calls = Vec::new();

        for item in items {
            if let Some(ref response) = item.data {
                for choice in &response.choices {
                    // Combine normal content
                    if let Some(ref content) = choice.delta.content {
                        normal_content.push_str(content);
                    }

                    // Combine reasoning content
                    if let Some(ref reasoning) = choice.delta.reasoning_content {
                        reasoning_content.push_str(reasoning);
                    }

                    // Extract tool calls; chunks that fail to serialize are
                    // skipped silently (best-effort archiving), matching the
                    // original behavior.
                    if let Some(ref calls) = choice.delta.tool_calls {
                        for call in calls {
                            if let Ok(call_json) = serde_json::to_value(call) {
                                tool_calls.push(call_json);
                            }
                        }
                    }
                }
            }
        }

        (normal_content, reasoning_content, tool_calls)
    }
}

// Reasoning State for reasoning parsing transformation step.
//
// Bundles the upstream response stream with the optional boxed
// `ReasoningParser` used by the reasoning-parsing step.
// NOTE(review): `reasoning_parser` being `Option` suggests it may be
// absent or taken out during processing — confirm against the code
// driving this state.
struct ReasoningState {
    // Pinned, boxed source stream of annotated chat-completion chunks.
    stream: Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>>,
    // Parser applied to the stream's deltas, if one is configured.
    reasoning_parser: Option<Box<dyn ReasoningParser>>,
}

#[derive(Clone)]
pub struct OpenAIPreprocessor {
mdcsum: String,
formatter: Arc<dyn OAIPromptFormatter>,
Expand All @@ -108,6 +230,9 @@ pub struct OpenAIPreprocessor {
/// Per-model runtime configuration propagated to response generator (e.g., reasoning/tool parser)
runtime_config: crate::local_model::runtime_config::ModelRuntimeConfig,
tool_call_parser: Option<String>,
/// Stream recording configuration
stream_recording_enabled: bool,
stream_recording_path: String,
}

impl OpenAIPreprocessor {
Expand Down Expand Up @@ -137,13 +262,23 @@ impl OpenAIPreprocessor {
// // Initialize runtime config from the ModelDeploymentCard
let runtime_config = mdc.runtime_config.clone();

// Stream recording configuration - enable by default
let stream_recording_enabled = std::env::var("DYNAMO_STREAM_RECORDING_ENABLED")
.unwrap_or_else(|_| "true".to_string())
.parse()
.unwrap_or(true);
let stream_recording_path = std::env::var("DYNAMO_STREAM_RECORDING_PATH")
.unwrap_or_else(|_| "./stream_recordings".to_string());

Ok(Arc::new(Self {
formatter,
tokenizer,
model_info,
mdcsum,
runtime_config,
tool_call_parser,
stream_recording_enabled,
stream_recording_path,
}))
}
/// Encode a string to it's tokens
Expand Down Expand Up @@ -800,6 +935,19 @@ impl
context.clone(),
);

// Collect raw chunks after transform_postprocessor_stream for recording
let (stream, raw_chunks): (
Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>>,
Option<Vec<Annotated<NvCreateChatCompletionStreamResponse>>>,
) = if self.stream_recording_enabled {
use futures::stream::StreamExt;
let items: Vec<_> = stream.collect().await;
let raw_chunks = items.clone();
(Box::pin(futures::stream::iter(items)), Some(raw_chunks))
} else {
(Box::pin(stream), None)
};

// Try to parse reasoning content only if parser is configured
let should_parse_reasoning = self.runtime_config.reasoning_parser.is_some();

Expand All @@ -810,13 +958,13 @@ impl
// Future Solution:
// To address the limitation if needed in future: move this step before transform_postprocessor_stream and add new field of reasoning_content to the backend output
// Use backend_output.reasoning_content field to fill out the deltas.
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if should_parse_reasoning {
let stream = if should_parse_reasoning {
Box::pin(Self::parse_reasoning_content_from_stream(
stream,
self.runtime_config.reasoning_parser.clone().unwrap(), // Safety: We already checked that parser is some, so gtg
))
)) as Pin<Box<dyn Stream<Item = _> + Send>>
} else {
Box::pin(stream)
Box::pin(stream) as Pin<Box<dyn Stream<Item = _> + Send>>
};

// Check if tools are present and if we should apply jail
Expand All @@ -841,6 +989,75 @@ impl
Box::pin(stream)
};

// Stream Recording: Set up recording if enabled (after jail processing where tool calls are parsed)
let transformed_stream: Pin<Box<dyn Stream<Item = _> + Send>> = if self
.stream_recording_enabled
{
let request_id_clone = request_id.clone();
let base_path = self.stream_recording_path.clone();

// Collect all items from the processed stream and write to file
use futures::stream::StreamExt;
let processed_items: Vec<_> = transformed_stream.collect().await;

// Create combined content structure using both raw chunks and processed items
let combined_content = if let Some(raw_chunks) = &raw_chunks {
CombinedStreamContent::from_raw_and_processed_items(
request_id_clone.clone(),
raw_chunks,
&processed_items,
)
} else {
// Fallback if raw chunks not available
CombinedStreamContent::from_stream_items(request_id_clone.clone(), &processed_items)
};

// Write combined content to JSON file
if !processed_items.is_empty()
|| raw_chunks.as_ref().is_some_and(|chunks| !chunks.is_empty())
{
let file_path = format!(
"{}/chat_completion_stream_{}.json",
base_path, request_id_clone
);

match tokio::fs::create_dir_all(&base_path).await {
Ok(_) => match serde_json::to_string_pretty(&combined_content) {
Ok(json_content) => {
if let Err(e) = tokio::fs::write(&file_path, json_content).await {
tracing::error!(
"Failed to write stream recording to {}: {}",
file_path,
e
);
} else {
tracing::info!("Wrote combined stream content to {}", file_path);
}
}
Err(e) => {
tracing::error!(
"Failed to serialize combined stream content for request {}: {}",
request_id_clone,
e
);
}
},
Err(e) => {
tracing::error!(
"Failed to create recording directory {}: {}",
base_path,
e
);
}
}
}

// Return the collected items as a new stream
Box::pin(futures::stream::iter(processed_items))
} else {
transformed_stream
};

// Step 4: Apply audit aggregation strategy
let final_stream = if let Some(mut audit) = audit_handle {
let (stream, agg_fut) = if audit.streaming() {
Expand Down
Loading