From e7cba4b25c88bf61132b096760c2740b8cedb2e0 Mon Sep 17 00:00:00 2001
From: ayushag
Date: Fri, 3 Oct 2025 07:20:18 +0000
Subject: [PATCH 1/2] chore: stream to sink for tool calling

Signed-off-by: ayushag
---
 lib/llm/src/preprocessor.rs | 85 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 84 insertions(+), 1 deletion(-)

diff --git a/lib/llm/src/preprocessor.rs b/lib/llm/src/preprocessor.rs
index e786211ae3..7d5c6ac4c8 100644
--- a/lib/llm/src/preprocessor.rs
+++ b/lib/llm/src/preprocessor.rs
@@ -23,6 +23,9 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use std::{collections::HashMap, pin::Pin, sync::Arc};
 use tracing;
 
+// Stream recording imports
+use serde::{Deserialize, Serialize};
+
 use crate::model_card::{ModelDeploymentCard, ModelInfo};
 use crate::preprocessor::prompt::OAIChatLikeRequest;
 use crate::protocols::common::preprocessor::PreprocessedRequestBuilder;
@@ -94,12 +97,26 @@ impl LLMMetricAnnotation {
     }
 }
 
+/// Stream recording entry for archiving postprocessor stream data
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct StreamRecordingEntry {
+    /// Request ID for correlation
+    pub request_id: String,
+    /// Sequence number in the stream
+    pub sequence: usize,
+    /// Elapsed time from stream start in milliseconds
+    pub elapsed_ms: u64,
+    /// The actual stream response data
+    pub response_data: Annotated<NvCreateChatCompletionStreamResponse>,
+}
+
 // Reasoning State for reasoning parsing transformation step
 struct ReasoningState {
     stream: Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>>,
     reasoning_parser: Option<Box<dyn ReasoningParser>>,
 }
 
+#[derive(Clone)]
 pub struct OpenAIPreprocessor {
     mdcsum: String,
     formatter: Arc<dyn OAIPromptFormatter>,
@@ -108,6 +125,9 @@ pub struct OpenAIPreprocessor {
     /// Per-model runtime configuration propagated to response generator (e.g., reasoning/tool parser)
     runtime_config: crate::local_model::runtime_config::ModelRuntimeConfig,
     tool_call_parser: Option<String>,
+    /// Stream recording configuration
+    stream_recording_enabled: bool,
+    stream_recording_path: String,
 }
 
 impl OpenAIPreprocessor {
@@ -137,6 +157,14 @@ impl OpenAIPreprocessor {
         //
         // Initialize runtime config from the ModelDeploymentCard
         let runtime_config = mdc.runtime_config.clone();
+        // Stream recording configuration - enabled by default
+        let stream_recording_enabled = std::env::var("DYNAMO_STREAM_RECORDING_ENABLED")
+            .unwrap_or_else(|_| "true".to_string())
+            .parse()
+            .unwrap_or(true);
+        let stream_recording_path = std::env::var("DYNAMO_STREAM_RECORDING_PATH")
+            .unwrap_or_else(|_| "./stream_recordings".to_string());
+
         Ok(Arc::new(Self {
             formatter,
             tokenizer,
@@ -144,6 +172,8 @@ impl OpenAIPreprocessor {
             mdcsum,
             runtime_config,
             tool_call_parser,
+            stream_recording_enabled,
+            stream_recording_path,
         }))
     }
     /// Encode a string to its tokens
@@ -800,6 +830,59 @@ impl
             context.clone(),
         );
 
+        // Stream Recording: Set up recording if enabled
+        let stream: Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>> = if self.stream_recording_enabled {
+            let request_id_clone = request_id.clone();
+            let base_path = self.stream_recording_path.clone();
+            let start_time = std::time::Instant::now();
+
+            // Collect all items from the stream and write to file
+            use futures::stream::StreamExt;
+            let items: Vec<_> = stream.collect().await;
+
+            // Write to file
+            let mut entries = Vec::new();
+            for (sequence, item) in items.iter().enumerate() {
+                let entry = StreamRecordingEntry {
+                    request_id: request_id_clone.clone(),
+                    sequence,
+                    elapsed_ms: start_time.elapsed().as_millis() as u64,
+                    response_data: item.clone(),
+                };
+                entries.push(entry);
+            }
+
+            // Write to JSON file
+            if !entries.is_empty() {
+                let file_path =
+                    format!("{}/chat_completion_stream_{}.json", base_path, request_id_clone);
+
+                match tokio::fs::create_dir_all(&base_path).await {
+                    Ok(_) => {
+                        match serde_json::to_string_pretty(&entries) {
+                            Ok(json_content) => {
+                                if let Err(e) = tokio::fs::write(&file_path, json_content).await {
+                                    tracing::error!("Failed to write stream recording to {}: {}", file_path, e);
+                                } else {
+                                    tracing::info!("Wrote {} stream items to {}", entries.len(), file_path);
+                                }
+                            }
+                            Err(e) => {
+                                tracing::error!("Failed to serialize stream recording for request {}: {}", request_id_clone, e);
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        tracing::error!("Failed to create recording directory {}: {}", base_path, e);
+                    }
+                }
+            }
+
+            // Return the collected items as a new stream
+            Box::pin(futures::stream::iter(items))
+        } else {
+            Box::pin(stream)
+        };
+
         // Try to parse reasoning content only if parser is configured
         let should_parse_reasoning = self.runtime_config.reasoning_parser.is_some();
@@ -816,7 +899,7 @@ impl
                 self.runtime_config.reasoning_parser.clone().unwrap(), // Safety: We already checked that parser is some, so gtg
             ))
         } else {
-            Box::pin(stream)
+            stream
         };
 
         // Check if tools are present and if we should apply jail
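A note on the toggle this patch introduces: recording defaults to on, and the two DYNAMO_STREAM_RECORDING_* variables control it. Below is a minimal standalone sketch of the same fallback semantics; the helper name and the main() harness are illustrative and not part of the patch, and only std is required:

    // Mirrors the env-var handling added in PATCH 1/2 above.
    fn recording_enabled() -> bool {
        std::env::var("DYNAMO_STREAM_RECORDING_ENABLED")
            .unwrap_or_else(|_| "true".to_string())
            .parse() // parses as bool; inference comes from unwrap_or
            .unwrap_or(true)
    }

    fn main() {
        // An unset variable yields "true"; an unparsable value also falls
        // back to `true` via unwrap_or, so recording is on by default.
        let path = std::env::var("DYNAMO_STREAM_RECORDING_PATH")
            .unwrap_or_else(|_| "./stream_recordings".to_string());
        println!("recording enabled: {} (path: {})", recording_enabled(), path);
    }

Note that str::parse::<bool>() accepts only the literal strings "true" and "false"; values such as "1" or "TRUE" fail to parse and silently fall back to the default of true.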
format!("{}/chat_completion_stream_{}.json", base_path, request_id_clone); + + match tokio::fs::create_dir_all(&base_path).await { + Ok(_) => { + match serde_json::to_string_pretty(&entries) { + Ok(json_content) => { + if let Err(e) = tokio::fs::write(&file_path, json_content).await { + tracing::error!("Failed to write stream recording to {}: {}", file_path, e); + } else { + tracing::info!("Wrote {} stream items to {}", entries.len(), file_path); + } + } + Err(e) => { + tracing::error!("Failed to serialize stream recording for request {}: {}", request_id_clone, e); + } + } + } + Err(e) => { + tracing::error!("Failed to create recording directory {}: {}", base_path, e); + } + } + } + + // Return the collected items as a new stream + Box::pin(futures::stream::iter(items)) + } else { + Box::pin(stream) + }; + // Try to parse reasoning content only if parser is configured let should_parse_reasoning = self.runtime_config.reasoning_parser.is_some(); @@ -816,7 +899,7 @@ impl self.runtime_config.reasoning_parser.clone().unwrap(), // Safety: We already checked that parser is some, so gtg )) } else { - Box::pin(stream) + stream }; // Check if tools are present and if we should apply jail From 3224daeebdcef68fb1d9a8304b0ff1acf2f21ba7 Mon Sep 17 00:00:00 2001 From: ayushag Date: Fri, 3 Oct 2025 20:43:13 +0000 Subject: [PATCH 2/2] chore: added expected content in recorder Signed-off-by: ayushag --- lib/llm/src/preprocessor.rs | 236 ++++++++++++++++++++++++++++-------- 1 file changed, 185 insertions(+), 51 deletions(-) diff --git a/lib/llm/src/preprocessor.rs b/lib/llm/src/preprocessor.rs index 7d5c6ac4c8..d375304f95 100644 --- a/lib/llm/src/preprocessor.rs +++ b/lib/llm/src/preprocessor.rs @@ -110,6 +110,111 @@ pub struct StreamRecordingEntry { pub response_data: Annotated, } +/// Combined stream content for structured JSON output +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CombinedStreamContent { + /// Request ID for correlation + pub request_id: String, + /// Combined normal content from all stream chunks + pub normal_content: String, + /// Combined reasoning content from all stream chunks + pub reasoning_content: String, + /// Tool calls extracted from the stream + pub tool_calls: Vec, + /// Raw stream data chunks for detailed analysis + pub data: Vec>, +} + +impl CombinedStreamContent { + /// Combine stream items into structured content + pub fn from_stream_items( + request_id: String, + items: &[Annotated], + ) -> Self { + let mut normal_content = String::new(); + let mut reasoning_content = String::new(); + let mut tool_calls = Vec::new(); + + for item in items { + if let Some(ref response) = item.data { + for choice in &response.choices { + // Combine normal content + if let Some(ref content) = choice.delta.content { + normal_content.push_str(content); + } + + // Combine reasoning content + if let Some(ref reasoning) = choice.delta.reasoning_content { + reasoning_content.push_str(reasoning); + } + + // Extract tool calls + if let Some(ref calls) = choice.delta.tool_calls { + for call in calls { + if let Ok(call_json) = serde_json::to_value(call) { + tool_calls.push(call_json); + } + } + } + } + } + } + + Self { + request_id, + normal_content, + reasoning_content, + tool_calls, + data: items.to_vec(), + } + } + + /// Create combined content with both raw chunks and processed chunks + pub fn from_raw_and_processed_items( + request_id: String, + raw_chunks: &[Annotated], + processed_items: &[Annotated], + ) -> Self { + let mut normal_content = String::new(); + let mut 
@@ -830,57 +935,17 @@ impl
             context.clone(),
         );
 
-        // Stream Recording: Set up recording if enabled
-        let stream: Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>> = if self.stream_recording_enabled {
-            let request_id_clone = request_id.clone();
-            let base_path = self.stream_recording_path.clone();
-            let start_time = std::time::Instant::now();
-
-            // Collect all items from the stream and write to file
+        // Collect raw chunks after transform_postprocessor_stream for recording
+        let (stream, raw_chunks): (
+            Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>>,
+            Option<Vec<Annotated<NvCreateChatCompletionStreamResponse>>>,
+        ) = if self.stream_recording_enabled {
             use futures::stream::StreamExt;
             let items: Vec<_> = stream.collect().await;
-
-            // Write to file
-            let mut entries = Vec::new();
-            for (sequence, item) in items.iter().enumerate() {
-                let entry = StreamRecordingEntry {
-                    request_id: request_id_clone.clone(),
-                    sequence,
-                    elapsed_ms: start_time.elapsed().as_millis() as u64,
-                    response_data: item.clone(),
-                };
-                entries.push(entry);
-            }
-
-            // Write to JSON file
-            if !entries.is_empty() {
-                let file_path = format!("{}/chat_completion_stream_{}.json", base_path, request_id_clone);
-
-                match tokio::fs::create_dir_all(&base_path).await {
-                    Ok(_) => {
-                        match serde_json::to_string_pretty(&entries) {
-                            Ok(json_content) => {
-                                if let Err(e) = tokio::fs::write(&file_path, json_content).await {
-                                    tracing::error!("Failed to write stream recording to {}: {}", file_path, e);
-                                } else {
-                                    tracing::info!("Wrote {} stream items to {}", entries.len(), file_path);
-                                }
-                            }
-                            Err(e) => {
-                                tracing::error!("Failed to serialize stream recording for request {}: {}", request_id_clone, e);
-                            }
-                        }
-                    }
-                    Err(e) => {
-                        tracing::error!("Failed to create recording directory {}: {}", base_path, e);
-                    }
-                }
-            }
-
-            // Return the collected items as a new stream
-            Box::pin(futures::stream::iter(items))
+            let raw_chunks = items.clone();
+            (Box::pin(futures::stream::iter(items)), Some(raw_chunks))
        } else {
-            Box::pin(stream)
+            (Box::pin(stream), None)
         };
 
         // Try to parse reasoning content only if parser is configured
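The hunk above is a collect-then-replay pattern: the stream is drained so a copy of every chunk can be kept for the recorder, then the buffered chunks are replayed as a fresh stream for the next stage. A toy, self-contained illustration (tokio and futures crates assumed; string chunks stand in for the Annotated response items):

    use futures::stream::{self, StreamExt};

    #[tokio::main]
    async fn main() {
        let upstream = stream::iter(vec!["Hel", "lo", "!"]);

        // Drain the stream so every chunk is available to the recorder...
        let items: Vec<_> = upstream.collect().await;
        let recorded = items.clone(); // ...keep a copy for the recording sink...

        // ...and replay the buffered chunks as a fresh stream downstream.
        let mut replay = stream::iter(items);
        while let Some(chunk) = replay.next().await {
            print!("{chunk}");
        }
        println!("  ({} chunks recorded)", recorded.len());
    }

The trade-off is that the whole response is buffered in memory before anything flows downstream, so incremental delivery (time-to-first-token) is given up in exchange for a complete recording.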
@@ -893,13 +958,13 @@ impl
         // Future Solution:
         // To address the limitation if needed in future: move this step before transform_postprocessor_stream and add a new field of reasoning_content to the backend output
         // Use backend_output.reasoning_content field to fill out the deltas.
-        let stream: Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>> = if should_parse_reasoning {
+        let stream = if should_parse_reasoning {
             Box::pin(Self::parse_reasoning_content_from_stream(
                 stream,
                 self.runtime_config.reasoning_parser.clone().unwrap(), // Safety: We already checked that parser is some, so gtg
-            ))
+            )) as Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>>
         } else {
-            stream
+            Box::pin(stream) as Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>>
         };
 
         // Check if tools are present and if we should apply jail
@@ -924,6 +989,75 @@ impl
             Box::pin(stream)
         };
 
+        // Stream Recording: Set up recording if enabled (after jail processing where tool calls are parsed)
+        let transformed_stream: Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>> = if self
+            .stream_recording_enabled
+        {
+            let request_id_clone = request_id.clone();
+            let base_path = self.stream_recording_path.clone();
+
+            // Collect all items from the processed stream and write to file
+            use futures::stream::StreamExt;
+            let processed_items: Vec<_> = transformed_stream.collect().await;
+
+            // Create combined content structure using both raw chunks and processed items
+            let combined_content = if let Some(raw_chunks) = &raw_chunks {
+                CombinedStreamContent::from_raw_and_processed_items(
+                    request_id_clone.clone(),
+                    raw_chunks,
+                    &processed_items,
+                )
+            } else {
+                // Fallback if raw chunks not available
+                CombinedStreamContent::from_stream_items(request_id_clone.clone(), &processed_items)
+            };
+
+            // Write combined content to JSON file
+            if !processed_items.is_empty()
+                || raw_chunks.as_ref().is_some_and(|chunks| !chunks.is_empty())
+            {
+                let file_path = format!(
+                    "{}/chat_completion_stream_{}.json",
+                    base_path, request_id_clone
+                );
+
+                match tokio::fs::create_dir_all(&base_path).await {
+                    Ok(_) => match serde_json::to_string_pretty(&combined_content) {
+                        Ok(json_content) => {
+                            if let Err(e) = tokio::fs::write(&file_path, json_content).await {
+                                tracing::error!(
+                                    "Failed to write stream recording to {}: {}",
+                                    file_path,
+                                    e
+                                );
+                            } else {
+                                tracing::info!("Wrote combined stream content to {}", file_path);
+                            }
+                        }
+                        Err(e) => {
+                            tracing::error!(
+                                "Failed to serialize combined stream content for request {}: {}",
+                                request_id_clone,
+                                e
+                            );
+                        }
+                    },
+                    Err(e) => {
+                        tracing::error!(
+                            "Failed to create recording directory {}: {}",
+                            base_path,
+                            e
+                        );
+                    }
+                }
+            }
+
+            // Return the collected items as a new stream
+            Box::pin(futures::stream::iter(processed_items))
+        } else {
+            transformed_stream
+        };
+
         // Step 4: Apply audit aggregation strategy
         let final_stream = if let Some(mut audit) = audit_handle {
             let (stream, agg_fut) = if audit.streaming() {
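The series does not include a consumer for these recordings; a hypothetical read-back could look like the following. RecordedContent is a local mirror of part of CombinedStreamContent's serialized shape (serde ignores the heavyweight data field it does not list by default), and the file name is illustrative; serde and serde_json crates assumed:

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct RecordedContent {
        request_id: String,
        normal_content: String,
        reasoning_content: String,
        tool_calls: Vec<serde_json::Value>,
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Layout per the recorder:
        // {DYNAMO_STREAM_RECORDING_PATH}/chat_completion_stream_{request_id}.json
        let raw = std::fs::read_to_string(
            "./stream_recordings/chat_completion_stream_req-123.json",
        )?;
        let rec: RecordedContent = serde_json::from_str(&raw)?;
        println!(
            "request {}: {} tool call(s), {} content chars, {} reasoning chars",
            rec.request_id,
            rec.tool_calls.len(),
            rec.normal_content.len(),
            rec.reasoning_content.len()
        );
        Ok(())
    }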