diff --git a/codex-rs/core/src/chat_completions.rs b/codex-rs/core/src/chat_completions.rs index 816fc80f9b..e37b550aef 100644 --- a/codex-rs/core/src/chat_completions.rs +++ b/codex-rs/core/src/chat_completions.rs @@ -426,6 +426,12 @@ where // will never appear in a Chat Completions stream. continue; } + Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_)))) + | Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => { + // Deltas are ignored here since aggregation waits for the + // final OutputItemDone. + continue; + } } } } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 2fa182cf7f..6d54270a29 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -197,6 +197,10 @@ impl ModelClient { } } } + + pub fn streaming_enabled(&self) -> bool { + self.config.streaming_enabled + } } #[derive(Debug, Deserialize, Serialize)] @@ -205,6 +209,7 @@ struct SseEvent { kind: String, response: Option<Value>, item: Option<Value>, + delta: Option<String>, } #[derive(Debug, Deserialize)] @@ -315,7 +320,7 @@ where // duplicated `output` array embedded in the `response.completed` // payload. That produced two concrete issues: // 1. No real‑time streaming – the user only saw output after the - // entire turn had finished, which broke the “typing” UX and + // entire turn had finished, which broke the "typing" UX and // made long‑running turns look stalled. // 2. Duplicate `function_call_output` items – both the // individual *and* the completed array were forwarded, which @@ -337,6 +342,22 @@ where return; } } + "response.output_text.delta" => { + if let Some(delta) = event.delta { + let event = ResponseEvent::OutputTextDelta(delta); + if tx_event.send(Ok(event)).await.is_err() { + return; + } + } + } + "response.reasoning_summary_text.delta" => { + if let Some(delta) = event.delta { + let event = ResponseEvent::ReasoningSummaryDelta(delta); + if tx_event.send(Ok(event)).await.is_err() { + return; + } + } + } "response.created" => { if event.response.is_some() { let _ = tx_event.send(Ok(ResponseEvent::Created {})).await; @@ -360,10 +381,8 @@ where | "response.function_call_arguments.delta" | "response.in_progress" | "response.output_item.added" - | "response.output_text.delta" | "response.output_text.done" | "response.reasoning_summary_part.added" - | "response.reasoning_summary_text.delta" | "response.reasoning_summary_text.done" => { // Currently, we ignore these events, but we handle them // separately to skip the logging message in the `other` case. diff --git a/codex-rs/core/src/client_common.rs b/codex-rs/core/src/client_common.rs index f9a816a7a9..1014ac1270 100644 --- a/codex-rs/core/src/client_common.rs +++ b/codex-rs/core/src/client_common.rs @@ -53,6 +53,10 @@ impl Prompt { pub enum ResponseEvent { Created, OutputItemDone(ResponseItem), + /// Streaming text from an assistant message. + OutputTextDelta(String), + /// Streaming text from a reasoning summary. + ReasoningSummaryDelta(String), Completed { response_id: String, token_usage: Option<TokenUsage>, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 52c37c51ee..bb035e6790 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1121,19 +1121,15 @@ async fn try_run_turn( let mut stream = sess.client.clone().stream(&prompt).await?; - // Buffer all the incoming messages from the stream first, then execute them. - // If we execute a function call in the middle of handling the stream, it can time out.
- let mut input = Vec::new(); - while let Some(event) = stream.next().await { - input.push(event?); - } let mut output = Vec::new(); - for event in input { + // Patch: buffer for non-streaming mode + let mut assistant_message_buf = String::new(); + let streaming_enabled = sess.client.streaming_enabled(); + while let Some(event) = stream.next().await { + let event = event?; match event { ResponseEvent::Created => { let mut state = sess.state.lock().unwrap(); - // We successfully created a new response and ensured that all pending calls were included so we can clear the pending call ids. state.pending_call_ids.clear(); } ResponseEvent::OutputItemDone(item) => { @@ -1146,18 +1142,59 @@ async fn try_run_turn( _ => None, }; if let Some(call_id) = call_id { - // We just got a new call id so we need to make sure to respond to it in the next turn. let mut state = sess.state.lock().unwrap(); state.pending_call_ids.insert(call_id.clone()); } - let response = handle_response_item(sess, sub_id, item.clone()).await?; - + // Patch: buffer assistant message text if streaming is disabled + if !streaming_enabled { + if let ResponseItem::Message { role, content } = &item { + if role == "assistant" { + for c in content { + if let ContentItem::OutputText { text } = c { + assistant_message_buf.push_str(text); + } + } + } + } + } + let response = match &item { + ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } => None, + _ => handle_response_item(sess, sub_id, item.clone()).await?, + }; output.push(ProcessedResponseItem { item, response }); } + ResponseEvent::OutputTextDelta(text) => { + if streaming_enabled { + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::AgentMessageDelta(AgentMessageEvent { message: text }), + }; + sess.tx_event.send(event).await.ok(); + } + // When streaming is disabled the aggregated text is buffered from the + // final OutputItemDone message above, so dropping the delta here avoids + // emitting the same text twice. + } + ResponseEvent::ReasoningSummaryDelta(text) => { + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::AgentReasoningDelta(AgentReasoningEvent { text }), + }; + sess.tx_event.send(event).await.ok(); + } ResponseEvent::Completed { response_id, token_usage, } => { + // Patch: emit the full buffered message when streaming is disabled + if !streaming_enabled && !assistant_message_buf.is_empty() { + let event = Event { + id: sub_id.to_string(), + msg: EventMsg::AgentMessage(AgentMessageEvent { + message: assistant_message_buf.clone(), + }), + }; + sess.tx_event.send(event).await.ok(); + } if let Some(token_usage) = token_usage { sess.tx_event .send(Event { diff --git a/codex-rs/core/src/config.rs b/codex-rs/core/src/config.rs index d67e692fc8..200d9d5eaa 100644 --- a/codex-rs/core/src/config.rs +++ b/codex-rs/core/src/config.rs @@ -131,6 +131,13 @@ pub struct Config { /// request using the Responses API. pub model_reasoning_summary: ReasoningSummary, + /// Whether to surface live streaming delta events in front-ends. When `true` + /// (default) Codex will forward `AgentMessageDelta` / `AgentReasoningDelta` + /// events and UIs may show a typing indicator. When `false` Codex UIs should + /// ignore delta events and rely solely on the final aggregated + /// `AgentMessage`/`AgentReasoning` items (legacy behaviour). + pub streaming_enabled: bool, + /// When set to `true`, overrides the default heuristic and forces /// `model_supports_reasoning_summaries()` to return `true`. pub model_supports_reasoning_summaries: bool, @@ -321,6 +328,13 @@ pub struct ConfigToml { /// Base URL for requests to ChatGPT (as opposed to the OpenAI API).
pub chatgpt_base_url: Option<String>, + + /// Whether to surface live streaming delta events in front-ends. When `true` + /// (default) Codex will forward `AgentMessageDelta` / `AgentReasoningDelta` + /// events and UIs may show a typing indicator. When `false` Codex UIs should + /// ignore delta events and rely solely on the final aggregated + /// `AgentMessage`/`AgentReasoning` items (legacy behaviour). + pub streaming: Option<bool>, } impl ConfigToml { @@ -486,6 +500,7 @@ impl Config { .or(cfg.model_reasoning_summary) .unwrap_or_default(), + streaming_enabled: cfg.streaming.unwrap_or(true), model_supports_reasoning_summaries: cfg .model_supports_reasoning_summaries .unwrap_or(false), @@ -798,6 +813,7 @@ disable_response_storage = true hide_agent_reasoning: false, model_reasoning_effort: ReasoningEffort::High, model_reasoning_summary: ReasoningSummary::Detailed, + streaming_enabled: true, model_supports_reasoning_summaries: false, chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), }, @@ -844,6 +860,7 @@ disable_response_storage = true hide_agent_reasoning: false, model_reasoning_effort: ReasoningEffort::default(), model_reasoning_summary: ReasoningSummary::default(), + streaming_enabled: true, model_supports_reasoning_summaries: false, chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), }; @@ -905,6 +922,7 @@ disable_response_storage = true hide_agent_reasoning: false, model_reasoning_effort: ReasoningEffort::default(), model_reasoning_summary: ReasoningSummary::default(), + streaming_enabled: true, model_supports_reasoning_summaries: false, chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), }; diff --git a/codex-rs/core/src/config_types.rs b/codex-rs/core/src/config_types.rs index 83fe613c86..d78bd10eae 100644 --- a/codex-rs/core/src/config_types.rs +++ b/codex-rs/core/src/config_types.rs @@ -150,7 +150,7 @@ pub type EnvironmentVariablePattern = WildMatchPattern<'*', '?'>; /// Deriving the `env` based on this policy works as follows: /// 1. Create an initial map based on the `inherit` policy. /// 2. If `ignore_default_excludes` is false, filter the map using the default -/// exclude pattern(s), which are: `"*KEY*"` and `"*TOKEN*"`. +/// exclude pattern(s), which are: "*KEY*" and "*TOKEN*". /// 3. If `exclude` is not empty, filter the map using the provided patterns. /// 4. Insert any entries from `r#set` into the map. /// 5. If non-empty, filter the map using the `include_only` patterns. @@ -228,3 +228,10 @@ pub enum ReasoningSummary { /// Option to disable reasoning summaries. None, } + +// --------------------------------------------------------------------------- +// NOTE: The canonical ConfigToml definition lives in `crate::config`. +// Historically this file accidentally re-declared that struct, which caused +// drift and confusion. The duplicate has been removed; please use +// `codex_core::config::ConfigToml` instead. +// --------------------------------------------------------------------------- diff --git a/codex-rs/core/src/protocol.rs b/codex-rs/core/src/protocol.rs index fa25a2fe38..d3e5041233 100644 --- a/codex-rs/core/src/protocol.rs +++ b/codex-rs/core/src/protocol.rs @@ -282,9 +282,15 @@ pub enum EventMsg { /// Agent text output message AgentMessage(AgentMessageEvent), + /// Incremental assistant text delta + AgentMessageDelta(AgentMessageEvent), + /// Reasoning event from agent. AgentReasoning(AgentReasoningEvent), + /// Incremental reasoning text delta. + AgentReasoningDelta(AgentReasoningEvent), + /// Ack the client's configure message.
SessionConfigured(SessionConfiguredEvent), diff --git a/codex-rs/core/tests/cli_stream.rs b/codex-rs/core/tests/cli_stream.rs index df3fedfd48..ec9320c1c5 100644 --- a/codex-rs/core/tests/cli_stream.rs +++ b/codex-rs/core/tests/cli_stream.rs @@ -58,6 +58,8 @@ async fn chat_mode_stream_cli() { .arg(&provider_override) .arg("-c") .arg("model_provider=\"mock\"") + .arg("-c") + .arg("streaming=false") .arg("-C") .arg(env!("CARGO_MANIFEST_DIR")) .arg("hello?"); @@ -71,8 +73,8 @@ async fn chat_mode_stream_cli() { println!("Stderr:\n{}", String::from_utf8_lossy(&output.stderr)); assert!(output.status.success()); let stdout = String::from_utf8_lossy(&output.stdout); - assert!(stdout.contains("hi")); - assert_eq!(stdout.matches("hi").count(), 1); + let hi_lines = stdout.lines().filter(|line| line.trim() == "hi").count(); + assert_eq!(hi_lines, 1, "Expected exactly one line with 'hi'"); server.verify().await; } @@ -104,6 +106,8 @@ async fn responses_api_stream_cli() { .arg("--") .arg("exec") .arg("--skip-git-repo-check") + .arg("-c") + .arg("streaming=false") .arg("-C") .arg(env!("CARGO_MANIFEST_DIR")) .arg("hello?"); @@ -117,3 +121,188 @@ async fn responses_api_stream_cli() { let stdout = String::from_utf8_lossy(&output.stdout); assert!(stdout.contains("fixture hello")); } + +/// Tests chat completions with streaming enabled (streaming=true) through the CLI using a mock server. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn chat_mode_streaming_enabled_cli() { + if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { + println!( + "Skipping test because it cannot execute when network is disabled in a Codex sandbox." + ); + return; + } + + let server = MockServer::start().await; + // Simulate streaming deltas: 'h' and 'i' as separate chunks + let sse = concat!( + "data: {\"choices\":[{\"delta\":{\"content\":\"h\"}}]}\n\n", + "data: {\"choices\":[{\"delta\":{\"content\":\"i\"}}]}\n\n", + "data: {\"choices\":[{\"delta\":{}}]}\n\n", + "data: [DONE]\n\n" + ); + Mock::given(method("POST")) + .and(path("/v1/chat/completions")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("content-type", "text/event-stream") + .set_body_raw(sse, "text/event-stream"), + ) + .expect(1) + .mount(&server) + .await; + + let home = TempDir::new().unwrap(); + let provider_override = format!( + "model_providers.mock={{ name = \"mock\", base_url = \"{}/v1\", env_key = \"PATH\", wire_api = \"chat\" }}", + server.uri() + ); + let mut cmd = AssertCommand::new("cargo"); + cmd.arg("run") + .arg("-p") + .arg("codex-cli") + .arg("--quiet") + .arg("--") + .arg("exec") + .arg("--skip-git-repo-check") + .arg("-c") + .arg(&provider_override) + .arg("-c") + .arg("model_provider=\"mock\"") + .arg("-c") + .arg("streaming=true") + .arg("-C") + .arg(env!("CARGO_MANIFEST_DIR")) + .arg("hello?"); + cmd.env("CODEX_HOME", home.path()) + .env("OPENAI_API_KEY", "dummy") + .env("OPENAI_BASE_URL", format!("{}/v1", server.uri())); + + let output = cmd.output().unwrap(); + assert!(output.status.success()); + let stdout = String::from_utf8_lossy(&output.stdout); + // Assert that 'h' and 'i' are output as two separate chunks from stdout, not as a single chunk + // We split the output on 'h' and 'i' and check their order and separation + let mut chunks = Vec::new(); + let mut last = 0; + for (idx, c) in stdout.char_indices() { + if c == 'h' || c == 'i' { + if last != idx { + let chunk = &stdout[last..idx]; + if !chunk.trim().is_empty() { + chunks.push(chunk); + } + } + chunks.push(&stdout[idx..idx + 
c.len_utf8()]); + last = idx + c.len_utf8(); + } + } + if last < stdout.len() { + let chunk = &stdout[last..]; + if !chunk.trim().is_empty() { + chunks.push(chunk); + } + } + // Only keep the 'h' and 'i' chunks + let delta_chunks: Vec<&str> = chunks + .iter() + .cloned() + .filter(|s| *s == "h" || *s == "i") + .collect(); + assert_eq!( + delta_chunks, + vec!["h", "i"], + "Expected two separate delta chunks 'h' and 'i' from stdout" + ); + + server.verify().await; +} + +/// Tests responses API with streaming enabled (streaming=true) through the CLI using a local SSE fixture file. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn responses_api_streaming_enabled_cli() { + if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { + println!( + "Skipping test because it cannot execute when network is disabled in a Codex sandbox." + ); + return; + } + + // Create a fixture with two deltas: 'fixture ' and 'hello' + use std::fs; + use std::io::Write; + let fixture_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .join("tests/cli_responses_fixture_streaming.sse"); + let mut fixture_file = fs::File::create(&fixture_path).unwrap(); + writeln!(fixture_file, "event: response.created").unwrap(); + writeln!( + fixture_file, + "data: {{\"type\":\"response.created\",\"response\":{{\"id\":\"resp1\"}}}}\n" + ) + .unwrap(); + writeln!(fixture_file, "event: response.output_text.delta").unwrap(); + writeln!(fixture_file, "data: {{\"type\":\"response.output_text.delta\",\"delta\":\"fixture \",\"item_id\":\"msg1\"}}\n").unwrap(); + writeln!(fixture_file, "event: response.output_text.delta").unwrap(); + writeln!(fixture_file, "data: {{\"type\":\"response.output_text.delta\",\"delta\":\"hello\",\"item_id\":\"msg1\"}}\n").unwrap(); + writeln!(fixture_file, "event: response.output_text.done").unwrap(); + writeln!(fixture_file, "data: {{\"type\":\"response.output_text.done\",\"text\":\"fixture hello\",\"item_id\":\"msg1\"}}\n").unwrap(); + writeln!(fixture_file, "event: response.output_item.done").unwrap(); + writeln!(fixture_file, "data: {{\"type\":\"response.output_item.done\",\"item\":{{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{{\"type\":\"output_text\",\"text\":\"fixture hello\"}}]}}}}\n").unwrap(); + writeln!(fixture_file, "event: response.completed").unwrap(); + writeln!(fixture_file, "data: {{\"type\":\"response.completed\",\"response\":{{\"id\":\"resp1\",\"output\":[]}}}}\n").unwrap(); + + let home = TempDir::new().unwrap(); + let mut cmd = AssertCommand::new("cargo"); + cmd.arg("run") + .arg("-p") + .arg("codex-cli") + .arg("--quiet") + .arg("--") + .arg("exec") + .arg("--skip-git-repo-check") + .arg("-c") + .arg("streaming=true") + .arg("-C") + .arg(env!("CARGO_MANIFEST_DIR")) + .arg("hello?"); + cmd.env("CODEX_HOME", home.path()) + .env("OPENAI_API_KEY", "dummy") + .env("CODEX_RS_SSE_FIXTURE", &fixture_path) + .env("OPENAI_BASE_URL", "http://unused.local"); + + let output = cmd.output().unwrap(); + assert!(output.status.success()); + let stdout = String::from_utf8_lossy(&output.stdout); + // Assert that 'fixture ' and 'hello' are output as two separate chunks from stdout, not as a single chunk + // We split the output on the known delta substrings and check their order and separation + let mut chunks = Vec::new(); + let mut last = 0; + for pat in ["fixture ", "hello"] { + if let Some(idx) = stdout[last..].find(pat) { + if last != last + idx { + let chunk = &stdout[last..last + idx]; + if !chunk.trim().is_empty() { + chunks.push(chunk); + } + } + 
chunks.push(&stdout[last + idx..last + idx + pat.len()]); + last = last + idx + pat.len(); + } + } + if last < stdout.len() { + let chunk = &stdout[last..]; + if !chunk.trim().is_empty() { + chunks.push(chunk); + } + } + // Only keep the delta chunks + let delta_chunks: Vec<&str> = chunks + .iter() + .cloned() + .filter(|s| *s == "fixture " || *s == "hello") + .collect(); + assert_eq!( + delta_chunks, + vec!["fixture ", "hello"], + "Expected two separate delta chunks 'fixture ' and 'hello' from stdout" + ); +} diff --git a/codex-rs/exec/src/event_processor.rs b/codex-rs/exec/src/event_processor.rs index 540e014298..690d94fa3f 100644 --- a/codex-rs/exec/src/event_processor.rs +++ b/codex-rs/exec/src/event_processor.rs @@ -21,6 +21,8 @@ use owo_colors::OwoColorize; use owo_colors::Style; use shlex::try_join; use std::collections::HashMap; +use std::io::Write; +use std::io::{self}; use std::time::Instant; /// This should be configurable. When used in CI, users may not want to impose @@ -50,10 +52,20 @@ pub(crate) struct EventProcessor { /// Whether to include `AgentReasoning` events in the output. show_agent_reasoning: bool, + /// Whether to surface streaming deltas (true = print deltas + suppress final message). + streaming_enabled: bool, + /// Internal: have we already printed the `codex` header for the current streaming turn? + printed_agent_header: bool, + /// Internal: have we already printed the `thinking` header for current streaming turn? + printed_reasoning_header: bool, } impl EventProcessor { - pub(crate) fn create_with_ansi(with_ansi: bool, show_agent_reasoning: bool) -> Self { + pub(crate) fn create_with_ansi( + with_ansi: bool, + show_agent_reasoning: bool, + streaming_enabled: bool, + ) -> Self { let call_id_to_command = HashMap::new(); let call_id_to_patch = HashMap::new(); let call_id_to_tool_call = HashMap::new(); @@ -71,6 +83,9 @@ impl EventProcessor { cyan: Style::new().cyan(), call_id_to_tool_call, show_agent_reasoning, + streaming_enabled, + printed_agent_header: false, + printed_reasoning_header: false, } } else { Self { @@ -85,6 +100,9 @@ impl EventProcessor { cyan: Style::new(), call_id_to_tool_call, show_agent_reasoning, + streaming_enabled, + printed_agent_header: false, + printed_reasoning_header: false, } } } @@ -179,17 +197,46 @@ impl EventProcessor { ts_println!(self, "{}", message.style(self.dimmed)); } EventMsg::TaskStarted | EventMsg::TaskComplete(_) => { + // Reset streaming headers at start/end boundaries. + if matches!(msg, EventMsg::TaskStarted) { + self.printed_agent_header = false; + self.printed_reasoning_header = false; + } // Ignore. } EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => { ts_println!(self, "tokens used: {total_tokens}"); } EventMsg::AgentMessage(AgentMessageEvent { message }) => { - ts_println!( - self, - "{}\n{message}", - "codex".style(self.bold).style(self.magenta) - ); + if self.streaming_enabled { + // Suppress full message when streaming; final markdown not printed in CLI. + // If no deltas were seen, fall back to printing now. 
+ if !self.printed_agent_header { + ts_println!( + self, + "{}\n{message}", + "codex".style(self.bold).style(self.magenta) + ); + } + } else { + ts_println!( + self, + "{}\n{message}", + "codex".style(self.bold).style(self.magenta) + ); + } + } + EventMsg::AgentMessageDelta(AgentMessageEvent { message }) => { + // Deltas are only surfaced when streaming is enabled; otherwise ignore them. + if self.streaming_enabled { + if !self.printed_agent_header { + ts_println!(self, "{}", "codex".style(self.bold).style(self.magenta)); + self.printed_agent_header = true; + } + print!("{message}"); + let _ = io::stdout().flush(); + } + } EventMsg::ExecCommandBegin(ExecCommandBeginEvent { call_id, @@ -343,7 +390,7 @@ impl EventProcessor { ); // Pretty-print the patch summary with colored diff markers so - // it’s easy to scan in the terminal output. + // it's easy to scan in the terminal output. for (path, change) in changes.iter() { match change { FileChange::Add { content } => { @@ -441,12 +488,37 @@ } EventMsg::AgentReasoning(agent_reasoning_event) => { if self.show_agent_reasoning { - ts_println!( - self, - "{}\n{}", - "thinking".style(self.italic).style(self.magenta), - agent_reasoning_event.text - ); + if self.streaming_enabled { + if !self.printed_reasoning_header { + ts_println!( + self, + "{}\n{}", + "thinking".style(self.italic).style(self.magenta), + agent_reasoning_event.text + ); + } + } else { + ts_println!( + self, + "{}\n{}", + "thinking".style(self.italic).style(self.magenta), + agent_reasoning_event.text + ); + } + } + } + EventMsg::AgentReasoningDelta(agent_reasoning_event) => { + if self.show_agent_reasoning && self.streaming_enabled { + if !self.printed_reasoning_header { + ts_println!( + self, + "{}", + "thinking".style(self.italic).style(self.magenta) + ); + self.printed_reasoning_header = true; + } + print!("{}", agent_reasoning_event.text); + let _ = io::stdout().flush(); } } EventMsg::SessionConfigured(session_configured_event) => { diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 44dddd4d0f..49ae23437a 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -115,8 +115,11 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> { }; let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?; - let mut event_processor = - EventProcessor::create_with_ansi(stdout_with_ansi, !config.hide_agent_reasoning); + let mut event_processor = EventProcessor::create_with_ansi( + stdout_with_ansi, + !config.hide_agent_reasoning, + config.streaming_enabled, + ); // Print the effective configuration and prompt so users can see what Codex // is using.
event_processor.print_config_summary(&config, &prompt); diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 7c3b02fe5e..5a65e9e6c7 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -175,6 +175,8 @@ pub async fn run_codex_tool_session( | EventMsg::TaskStarted | EventMsg::TokenCount(_) | EventMsg::AgentReasoning(_) + | EventMsg::AgentMessageDelta(_) + | EventMsg::AgentReasoningDelta(_) | EventMsg::McpToolCallBegin(_) | EventMsg::McpToolCallEnd(_) | EventMsg::ExecCommandBegin(_) diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 865e339763..2bc13bc344 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -40,6 +40,15 @@ use crate::history_cell::PatchEventType; use crate::user_approval_widget::ApprovalRequest; use codex_file_search::FileMatch; +/// Bookkeeping for a live streaming cell. We track the `sub_id` to know when +/// a new turn has started (and thus when to start a new cell) and accumulate +/// the full text so we can re-render markdown cleanly when the turn ends. +#[derive(Default)] +struct StreamingBuf { + sub_id: Option<String>, + text: String, +} + pub(crate) struct ChatWidget<'a> { app_event_tx: AppEventSender, codex_op_tx: UnboundedSender<Op>, @@ -49,6 +58,10 @@ pub(crate) struct ChatWidget<'a> { config: Config, initial_user_message: Option<UserMessage>, token_usage: TokenUsage, + /// Accumulates assistant streaming text for the *current* turn. + streaming_agent: StreamingBuf, + /// Accumulates reasoning streaming text for the *current* turn. + streaming_reasoning: StreamingBuf, } #[derive(Clone, Copy, Eq, PartialEq)] @@ -135,6 +148,8 @@ impl ChatWidget<'_> { initial_images, ), token_usage: TokenUsage::default(), + streaming_agent: StreamingBuf::default(), + streaming_reasoning: StreamingBuf::default(), } } @@ -220,6 +235,8 @@ impl ChatWidget<'_> { pub(crate) fn handle_codex_event(&mut self, event: Event) { let Event { id, msg } = event; + // We need a copy of `id` for streaming bookkeeping because it is moved into some match arms. + let event_id = id.clone(); match msg { EventMsg::SessionConfigured(event) => { // Record session information at the top of the conversation. @@ -240,14 +257,111 @@ impl ChatWidget<'_> { self.request_redraw(); } EventMsg::AgentMessage(AgentMessageEvent { message }) => { + if self.config.streaming_enabled { + // Final full assistant message. If we have an in-flight streaming cell for this id, replace it. + let same_turn = self + .streaming_agent + .sub_id + .as_ref() + .map(|s| s == &event_id) + .unwrap_or(false); + if same_turn { + self.conversation_history + .replace_last_agent_message(&self.config, message.clone()); + self.streaming_agent.sub_id = None; + self.streaming_agent.text.clear(); + } else { + // Streaming enabled but we never saw deltas – just render normally. + self.finalize_streams_if_new_turn(&event_id); + self.conversation_history + .add_agent_message(&self.config, message.clone()); + } + } else { + // Streaming disabled -> always render final message, ignore any deltas. + self.conversation_history + .add_agent_message(&self.config, message.clone()); + } + self.request_redraw(); + } + EventMsg::AgentMessageDelta(AgentMessageEvent { message }) => { + // Streaming Assistant text. + if !self.config.streaming_enabled { + // Ignore when streaming disabled. + return; + } + // Start a new cell if this delta belongs to a new turn.
+ let is_new_stream = self + .streaming_agent + .sub_id + .as_ref() + .map(|s| s != &event_id) + .unwrap_or(true); + if is_new_stream { + // Finalise any in-flight stream from the prior turn. + self.finalize_streams_if_new_turn(&event_id); + // Start a header-only streaming cell so we don't parse partial markdown. + self.conversation_history + .add_agent_message(&self.config, String::new()); + self.streaming_agent.sub_id = Some(event_id.clone()); + self.streaming_agent.text.clear(); + } + // Accumulate full text; incremental markdown re-render happens in ConversationHistoryWidget. + self.streaming_agent.text.push_str(&message); self.conversation_history - .add_agent_message(&self.config, message); + .append_agent_message_delta(&self.config, message); self.request_redraw(); } EventMsg::AgentReasoning(AgentReasoningEvent { text }) => { if !self.config.hide_agent_reasoning { + if self.config.streaming_enabled { + // Final full reasoning summary. Replace streaming cell if same turn. + let same_turn = self + .streaming_reasoning + .sub_id + .as_ref() + .map(|s| s == &event_id) + .unwrap_or(false); + if same_turn { + self.conversation_history + .replace_last_agent_reasoning(&self.config, text.clone()); + self.streaming_reasoning.sub_id = None; + self.streaming_reasoning.text.clear(); + } else { + self.finalize_streams_if_new_turn(&event_id); + self.conversation_history + .add_agent_reasoning(&self.config, text.clone()); + } + } else { + self.conversation_history + .add_agent_reasoning(&self.config, text.clone()); + } + self.request_redraw(); + } + } + EventMsg::AgentReasoningDelta(AgentReasoningEvent { text }) => { + if !self.config.hide_agent_reasoning { + if !self.config.streaming_enabled { + // Ignore when streaming disabled. + return; + } + let is_new_stream = self + .streaming_reasoning + .sub_id + .as_ref() + .map(|s| s != &event_id) + .unwrap_or(true); + if is_new_stream { + self.finalize_streams_if_new_turn(&event_id); + // Start header-only streaming cell. + self.conversation_history + .add_agent_reasoning(&self.config, String::new()); + self.streaming_reasoning.sub_id = Some(event_id.clone()); + self.streaming_reasoning.text.clear(); + } + // Accumulate full text; incremental markdown re-render happens in ConversationHistoryWidget. + self.streaming_reasoning.text.push_str(&text); self.conversation_history - .add_agent_reasoning(&self.config, text); + .append_agent_reasoning_delta(&self.config, text); self.request_redraw(); } } @@ -259,6 +373,8 @@ impl ChatWidget<'_> { EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message: _, }) => { + // Turn has ended – ensure no lingering streaming cells remain un-finalised. + self.finalize_streams(); self.bottom_pane.set_task_running(false); self.request_redraw(); } @@ -438,6 +554,42 @@ impl ChatWidget<'_> { tracing::error!("failed to submit op: {e}"); } } + + /// Finalise (render) streaming buffers when we detect a new turn id. + fn finalize_streams_if_new_turn(&mut self, new_id: &str) { + // If the incoming id differs from the current stream id(s) we must flush. + let agent_changed = self + .streaming_agent + .sub_id + .as_ref() + .map(|s| s != new_id) + .unwrap_or(false); + let reasoning_changed = self + .streaming_reasoning + .sub_id + .as_ref() + .map(|s| s != new_id) + .unwrap_or(false); + if agent_changed || reasoning_changed { + self.finalize_streams(); + } + } + + /// Re-render any in-flight streaming cells with full markdown and clear buffers. 
+ fn finalize_streams(&mut self) { + let had_agent = self.streaming_agent.sub_id.take().is_some(); + if had_agent { + let text = std::mem::take(&mut self.streaming_agent.text); + self.conversation_history + .replace_last_agent_message(&self.config, text); + } + let had_reasoning = self.streaming_reasoning.sub_id.take().is_some(); + if had_reasoning { + let text = std::mem::take(&mut self.streaming_reasoning.text); + self.conversation_history + .replace_last_agent_reasoning(&self.config, text); + } + } } impl WidgetRef for &ChatWidget<'_> { diff --git a/codex-rs/tui/src/conversation_history_widget.rs b/codex-rs/tui/src/conversation_history_widget.rs index c0e5031d70..68c169b680 100644 --- a/codex-rs/tui/src/conversation_history_widget.rs +++ b/codex-rs/tui/src/conversation_history_widget.rs @@ -9,9 +9,9 @@ use crossterm::event::KeyCode; use crossterm::event::KeyEvent; use ratatui::prelude::*; use ratatui::style::Style; +use ratatui::text::Span; use ratatui::widgets::*; use serde_json::Value as JsonValue; -use std::cell::Cell as StdCell; use std::cell::Cell; use std::collections::HashMap; use std::path::PathBuf; @@ -26,24 +26,30 @@ pub struct ConversationHistoryWidget { entries: Vec, /// The width (in terminal cells/columns) that [`Entry::line_count`] was /// computed for. When the available width changes we recompute counts. - cached_width: StdCell<u16>, + cached_width: Cell<u16>, scroll_position: usize, /// Number of lines the last time render_ref() was called - num_rendered_lines: StdCell<usize>, + num_rendered_lines: Cell<usize>, /// The height of the viewport last time render_ref() was called - last_viewport_height: StdCell<usize>, + last_viewport_height: Cell<usize>, has_input_focus: bool, + /// Scratch buffer used while incrementally streaming an agent message so we can re-render markdown at newline boundaries. + streaming_agent_message_buf: String, + /// Scratch buffer used while incrementally streaming agent reasoning so we can re-render markdown at newline boundaries. + streaming_agent_reasoning_buf: String, } impl ConversationHistoryWidget { pub fn new() -> Self { Self { entries: Vec::new(), - cached_width: StdCell::new(0), + cached_width: Cell::new(0), scroll_position: usize::MAX, - num_rendered_lines: StdCell::new(0), - last_viewport_height: StdCell::new(0), + num_rendered_lines: Cell::new(0), + last_viewport_height: Cell::new(0), has_input_focus: false, + streaming_agent_message_buf: String::new(), + streaming_agent_reasoning_buf: String::new(), } } @@ -84,38 +90,26 @@ impl ConversationHistoryWidget { } fn scroll_up(&mut self, num_lines: u32) { - // If a user is scrolling up from the "stick to bottom" mode, we need to - // map this to a specific scroll position so we can calculate the delta. - // This requires us to care about how tall the screen is. + // Convert sticky-to-bottom sentinel into a concrete offset anchored at the bottom. if self.scroll_position == usize::MAX { - self.scroll_position = self - .num_rendered_lines - .get() - .saturating_sub(self.last_viewport_height.get()); + self.scroll_position = sticky_offset( + self.num_rendered_lines.get(), + self.last_viewport_height.get(), + ); } self.scroll_position = self.scroll_position.saturating_sub(num_lines as usize); } fn scroll_down(&mut self, num_lines: u32) { - // If we're already pinned to the bottom there's nothing to do. + // Nothing to do if we're already pinned to the bottom.
if self.scroll_position == usize::MAX { return; } - let viewport_height = self.last_viewport_height.get().max(1); - let num_rendered_lines = self.num_rendered_lines.get(); - - // Compute the maximum explicit scroll offset that still shows a full - // viewport. This mirrors the calculation in `scroll_page_down()` and - // in the render path. - let max_scroll = num_rendered_lines.saturating_sub(viewport_height); + let max_scroll = sticky_offset( + self.num_rendered_lines.get(), + self.last_viewport_height.get(), + ); let new_pos = self.scroll_position.saturating_add(num_lines as usize); if new_pos >= max_scroll { - // Reached (or passed) the bottom – switch to stick‑to‑bottom mode - // so that additional output keeps the view pinned automatically. + // Switch to sticky-bottom mode so subsequent output pins view. self.scroll_position = usize::MAX; } else { self.scroll_position = new_pos; @@ -125,44 +119,21 @@ /// Scroll up by one full viewport height (Page Up). fn scroll_page_up(&mut self) { let viewport_height = self.last_viewport_height.get().max(1); - - // If we are currently in the "stick to bottom" mode, first convert the - // implicit scroll position (`usize::MAX`) into an explicit offset that - // represents the very bottom of the scroll region. This mirrors the - // logic from `scroll_up()`. if self.scroll_position == usize::MAX { - self.scroll_position = self - .num_rendered_lines - .get() - .saturating_sub(viewport_height); + self.scroll_position = sticky_offset(self.num_rendered_lines.get(), viewport_height); } - - // Move up by a full page. self.scroll_position = self.scroll_position.saturating_sub(viewport_height); } /// Scroll down by one full viewport height (Page Down). fn scroll_page_down(&mut self) { - // Nothing to do if we're already stuck to the bottom. if self.scroll_position == usize::MAX { return; } let viewport_height = self.last_viewport_height.get().max(1); - let num_lines = self.num_rendered_lines.get(); - - // Calculate the maximum explicit scroll offset that is still within - // range. This matches the logic in `scroll_down()` and the render - // method. - let max_scroll = num_lines.saturating_sub(viewport_height); - - // Attempt to move down by a full page. + let max_scroll = sticky_offset(self.num_rendered_lines.get(), viewport_height); let new_pos = self.scroll_position.saturating_add(viewport_height); if new_pos >= max_scroll { - // We have reached (or passed) the bottom – switch back to - // automatic stick‑to‑bottom mode so that subsequent output keeps - // the viewport pinned. + // Switch back to sticky-bottom mode so subsequent output pins the view. self.scroll_position = usize::MAX; } else { self.scroll_position = new_pos; @@ -195,13 +166,107 @@ } pub fn add_agent_message(&mut self, config: &Config, message: String) { + // Reset streaming scratch because we are starting a fresh agent message. + self.streaming_agent_message_buf.clear(); + self.streaming_agent_message_buf.push_str(&message); self.add_to_history(HistoryCell::new_agent_message(config, message)); } pub fn add_agent_reasoning(&mut self, config: &Config, text: String) { + self.streaming_agent_reasoning_buf.clear(); + self.streaming_agent_reasoning_buf.push_str(&text); self.add_to_history(HistoryCell::new_agent_reasoning(config, text)); } + /// Append incremental assistant text. + /// + /// Previous heuristic: fast‑append chunks until we saw a newline, then re‑render.
+ /// This caused visible "one‑word" lines (e.g., "The" -> "The user") when models + /// streamed small token fragments and also delayed Markdown styling (headings, code fences) + /// until the first newline arrived. To improve perceived quality we now *always* re‑render + /// the accumulated markdown buffer on every incoming delta chunk. We still apply the + /// soft‑break collapsing heuristic (outside fenced code blocks) so interim layout more closely + /// matches the final message and reduces layout thrash. + pub fn append_agent_message_delta(&mut self, config: &Config, text: String) { + if text.is_empty() { + return; + } + // Accumulate full buffer. + self.streaming_agent_message_buf.push_str(&text); + + let collapsed = collapse_single_newlines_for_streaming(&self.streaming_agent_message_buf); + if let Some(idx) = last_agent_message_idx(&self.entries) { + let width = self.cached_width.get(); + let entry = &mut self.entries[idx]; + entry.cell = HistoryCell::new_agent_message(config, collapsed); + // Drop trailing blank so we can continue streaming additional tokens cleanly. + if let HistoryCell::AgentMessage { view } = &mut entry.cell { + drop_trailing_blank_line(&mut view.lines); + } + if width > 0 { + update_entry_height(entry, width); + } + } else { + // No existing cell? Start a new one. + self.add_agent_message(config, self.streaming_agent_message_buf.clone()); + } + } + + /// Append incremental reasoning text (mirrors `append_agent_message_delta`). + pub fn append_agent_reasoning_delta(&mut self, config: &Config, text: String) { + if text.is_empty() { + return; + } + self.streaming_agent_reasoning_buf.push_str(&text); + + let collapsed = collapse_single_newlines_for_streaming(&self.streaming_agent_reasoning_buf); + if let Some(idx) = last_agent_reasoning_idx(&self.entries) { + let width = self.cached_width.get(); + let entry = &mut self.entries[idx]; + entry.cell = HistoryCell::new_agent_reasoning(config, collapsed); + if let HistoryCell::AgentReasoning { view } = &mut entry.cell { + drop_trailing_blank_line(&mut view.lines); + } + if width > 0 { + update_entry_height(entry, width); + } + } else { + self.add_agent_reasoning(config, self.streaming_agent_reasoning_buf.clone()); + } + } + + /// Replace the most recent AgentMessage cell with the fully accumulated `text`. + /// This should be called once the turn is complete so we can render proper markdown. + pub fn replace_last_agent_message(&mut self, config: &Config, text: String) { + self.streaming_agent_message_buf.clear(); + if let Some(idx) = last_agent_message_idx(&self.entries) { + let width = self.cached_width.get(); + let entry = &mut self.entries[idx]; + entry.cell = HistoryCell::new_agent_message(config, text); + if width > 0 { + update_entry_height(entry, width); + } + } else { + // No existing AgentMessage (shouldn't happen) – append new. + self.add_agent_message(config, text); + } + } + + /// Replace the most recent AgentReasoning cell with the fully accumulated `text`.
+ pub fn replace_last_agent_reasoning(&mut self, config: &Config, text: String) { + self.streaming_agent_reasoning_buf.clear(); + if let Some(idx) = last_agent_reasoning_idx(&self.entries) { + let width = self.cached_width.get(); + let entry = &mut self.entries[idx]; + entry.cell = HistoryCell::new_agent_reasoning(config, text); + if width > 0 { + update_entry_height(entry, width); + } + } else { + self.add_agent_reasoning(config, text); + } + } + pub fn add_background_event(&mut self, message: String) { self.add_to_history(HistoryCell::new_background_event(message)); } @@ -279,7 +344,7 @@ // Update cached line count. if width > 0 { - entry.line_count.set(cell.height(width)); + update_entry_height(entry, width); } break; } @@ -313,7 +378,7 @@ entry.cell = completed; if width > 0 { - entry.line_count.set(entry.cell.height(width)); + update_entry_height(entry, width); } break; @@ -370,14 +435,12 @@ impl WidgetRef for ConversationHistoryWidget { self.entries.iter().map(|e| e.line_count.get()).sum() }; - // Determine the scroll position. Note the existing value of - // `self.scroll_position` could exceed the maximum scroll offset if the - // user made the window wider since the last render. - let max_scroll = num_lines.saturating_sub(viewport_height); + // Determine the scroll position (respect sticky-to-bottom sentinel and clamp). + let max_scroll = sticky_offset(num_lines, viewport_height); let scroll_pos = if self.scroll_position == usize::MAX { max_scroll } else { - self.scroll_position.min(max_scroll) + clamp_scroll_pos(self.scroll_position, max_scroll) }; // ------------------------------------------------------------------ @@ -454,7 +517,7 @@ { // Choose a thumb color that stands out only when this pane has focus so that the - // user’s attention is naturally drawn to the active viewport. When unfocused we show + // user's attention is naturally drawn to the active viewport. When unfocused we show // a low-contrast thumb so the scrollbar fades into the background without becoming // invisible. let thumb_style = if self.has_input_focus { @@ -497,3 +560,118 @@ pub(crate) const fn wrap_cfg() -> ratatui::widgets::Wrap { ratatui::widgets::Wrap { trim: false } } + +// --------------------------------------------------------------------------- +// Scrolling helpers (private) +// --------------------------------------------------------------------------- +#[inline] +fn sticky_offset(num_lines: usize, viewport_height: usize) -> usize { + num_lines.saturating_sub(viewport_height.max(1)) +} + +#[inline] +fn clamp_scroll_pos(pos: usize, max_scroll: usize) -> usize { + pos.min(max_scroll) +} + +// --------------------------------------------------------------------------- +// Streaming helpers (private) +// --------------------------------------------------------------------------- + +/// Locate the most recent `HistoryCell::AgentMessage` entry. +fn last_agent_message_idx(entries: &[Entry]) -> Option<usize> { + entries + .iter() + .rposition(|e| matches!(e.cell, HistoryCell::AgentMessage { .. })) +} + +/// Locate the most recent `HistoryCell::AgentReasoning` entry. +fn last_agent_reasoning_idx(entries: &[Entry]) -> Option<usize> { + entries + .iter() + .rposition(|e| matches!(e.cell, HistoryCell::AgentReasoning { .. })) +} + +/// True if the line is an empty spacer (single empty span).
+ fn is_blank_line(line: &Line<'_>) -> bool { + line.spans.len() == 1 && line.spans[0].content.is_empty() + } + + /// Ensure that the vector has *at least* one body line after the header. + /// A freshly-created AgentMessage/Reasoning cell always has a header + blank line, + /// but streaming cells may be created empty; this makes sure we have a target line. + #[allow(dead_code)] + fn ensure_body_line(lines: &mut Vec<Line<'static>>) { + if lines.len() < 2 { + lines.push(Line::from("")); + } + } + + /// Trim a single trailing blank spacer line (but preserve intentional paragraph breaks). + fn drop_trailing_blank_line(lines: &mut Vec<Line<'static>>) { + if let Some(last) = lines.last() { + if is_blank_line(last) { + lines.pop(); + } + } + } + + /// Append streaming text, honouring embedded newlines. + #[allow(dead_code)] + fn append_streaming_text_chunks(lines: &mut Vec<Line<'static>>, text: &str) { + // NOTE: This helper is now a fallback path only (we eagerly re-render accumulated markdown). + // Still, keep behaviour sane: drop trailing spacer, ensure a writable body line, then append. + drop_trailing_blank_line(lines); + ensure_body_line(lines); + if let Some(last_line) = lines.last_mut() { + last_line.spans.push(Span::raw(text.to_string())); + } else { + lines.push(Line::from(text.to_string())); + } + } + + /// Re-measure a mutated entry at `width` columns and update its cached height. + fn update_entry_height(entry: &Entry, width: u16) { + entry.line_count.set(entry.cell.height(width)); + } + + /// Collapse *single* newlines in a streaming buffer into spaces so that interim streaming + /// renders more closely match final Markdown layout — *except* when we detect fenced code blocks. + /// If the accumulated text contains a Markdown code fence (``` or ~~~), we preserve **all** + /// newlines verbatim so multi-line code renders correctly while streaming. + fn collapse_single_newlines_for_streaming(src: &str) -> String { + // Quick fence detection. If we see a code fence marker anywhere in the accumulated text, + // skip collapsing entirely so we do not mangle code formatting. + if src.contains("```") || src.contains("~~~") { + return src.to_string(); + } + + let mut out = String::with_capacity(src.len()); + let mut pending_newlines = 0usize; + for ch in src.chars() { + if ch == '\n' { + pending_newlines += 1; + continue; + } + if pending_newlines == 1 { + // soft break -> space + out.push(' '); + } else if pending_newlines > 1 { + // preserve paragraph breaks exactly + for _ in 0..pending_newlines { + out.push('\n'); + } + } + pending_newlines = 0; + out.push(ch); + } + // flush tail + if pending_newlines == 1 { + out.push(' '); + } else if pending_newlines > 1 { + for _ in 0..pending_newlines { + out.push('\n'); + } + } + out +}
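
Reviewer note (not part of the patch): the two pure helpers added in conversation_history_widget.rs, `collapse_single_newlines_for_streaming` and `sticky_offset`, fully determine the interim streaming render and the stick-to-bottom scroll anchor, so their contracts are easy to pin down with unit tests. The sketch below is illustrative only; it assumes a `#[cfg(test)]` module at the bottom of the same file (so the private helpers are reachable via `super::`), and the module and test names are invented here.

```rust
#[cfg(test)]
mod streaming_helper_tests {
    use super::{collapse_single_newlines_for_streaming, sticky_offset};

    #[test]
    fn soft_breaks_collapse_but_paragraph_breaks_survive() {
        // A single '\n' is a soft break and becomes a space, so interim
        // streaming layout matches the final Markdown render more closely.
        assert_eq!(collapse_single_newlines_for_streaming("The\nuser"), "The user");
        // Runs of two or more '\n' are paragraph breaks and are kept verbatim.
        assert_eq!(collapse_single_newlines_for_streaming("a\n\nb"), "a\n\nb");
        // A trailing single newline is also treated as a soft break.
        assert_eq!(collapse_single_newlines_for_streaming("a\n"), "a ");
    }

    #[test]
    fn fenced_code_disables_collapsing() {
        // Any fence marker in the accumulated buffer preserves all newlines
        // so multi-line code renders correctly while streaming.
        let src = "Example:\n~~~\nlet x = 1;\n~~~\n";
        assert_eq!(collapse_single_newlines_for_streaming(src), src);
    }

    #[test]
    fn sticky_offset_anchors_to_bottom() {
        // 100 rendered lines in a 20-line viewport: the bottom anchor is line 80.
        assert_eq!(sticky_offset(100, 20), 80);
        // Degenerate viewport heights are clamped to 1.
        assert_eq!(sticky_offset(5, 0), 4);
        // Content shorter than the viewport never scrolls.
        assert_eq!(sticky_offset(3, 20), 0);
    }
}
```

Running `cargo test -p codex-tui` would exercise these alongside the existing suite.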