Skip to content

Enable streaming response deltas #1574

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
6 changes: 6 additions & 0 deletions codex-rs/core/src/chat_completions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,12 @@ where
// will never appear in a Chat Completions stream.
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_))))
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
// Deltas are ignored here since aggregation waits for the
// final OutputItemDone.
continue;
}
}
}
}
Expand Down
25 changes: 22 additions & 3 deletions codex-rs/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ impl ModelClient {
}
}
}

/// Returns whether live streaming delta events are enabled for this client.
///
/// Mirrors the `streaming_enabled` flag on the loaded [`Config`]: when
/// `true` (the default), per-token `OutputTextDelta` /
/// `ReasoningSummaryDelta` events are forwarded to front-ends; when
/// `false`, callers buffer deltas and emit only the final aggregated
/// message.
pub fn streaming_enabled(&self) -> bool {
    self.config.streaming_enabled
}
}

#[derive(Debug, Deserialize, Serialize)]
Expand All @@ -205,6 +209,7 @@ struct SseEvent {
kind: String,
response: Option<Value>,
item: Option<Value>,
delta: Option<String>,
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -315,7 +320,7 @@ where
// duplicated `output` array embedded in the `response.completed`
// payload. That produced two concrete issues:
// 1. No real‑time streaming – the user only saw output after the
// entire turn had finished, which broke the typing UX and
// entire turn had finished, which broke the "typing" UX and
// made long‑running turns look stalled.
// 2. Duplicate `function_call_output` items – both the
// individual *and* the completed array were forwarded, which
Expand All @@ -337,6 +342,22 @@ where
return;
}
}
"response.output_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::OutputTextDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
}
"response.reasoning_summary_text.delta" => {
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningSummaryDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
}
"response.created" => {
if event.response.is_some() {
let _ = tx_event.send(Ok(ResponseEvent::Created {})).await;
Expand All @@ -360,10 +381,8 @@ where
| "response.function_call_arguments.delta"
| "response.in_progress"
| "response.output_item.added"
| "response.output_text.delta"
| "response.output_text.done"
| "response.reasoning_summary_part.added"
| "response.reasoning_summary_text.delta"
| "response.reasoning_summary_text.done" => {
// Currently, we ignore these events, but we handle them
// separately to skip the logging message in the `other` case.
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/core/src/client_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ impl Prompt {
pub enum ResponseEvent {
Created,
OutputItemDone(ResponseItem),
/// Streaming text from an assistant message.
OutputTextDelta(String),
/// Streaming text from a reasoning summary.
ReasoningSummaryDelta(String),
Completed {
response_id: String,
token_usage: Option<TokenUsage>,
Expand Down
61 changes: 49 additions & 12 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1121,19 +1121,15 @@ async fn try_run_turn(

let mut stream = sess.client.clone().stream(&prompt).await?;

// Buffer all the incoming messages from the stream first, then execute them.
// If we execute a function call in the middle of handling the stream, it can time out.
let mut input = Vec::new();
while let Some(event) = stream.next().await {
input.push(event?);
}

let mut output = Vec::new();
for event in input {
// Patch: buffer for non-streaming mode
let mut assistant_message_buf = String::new();
let streaming_enabled = sess.client.streaming_enabled();
while let Some(event) = stream.next().await {
let event = event?;
match event {
ResponseEvent::Created => {
let mut state = sess.state.lock().unwrap();
// We successfully created a new response and ensured that all pending calls were included so we can clear the pending call ids.
state.pending_call_ids.clear();
}
ResponseEvent::OutputItemDone(item) => {
Expand All @@ -1146,18 +1142,59 @@ async fn try_run_turn(
_ => None,
};
if let Some(call_id) = call_id {
// We just got a new call id so we need to make sure to respond to it in the next turn.
let mut state = sess.state.lock().unwrap();
state.pending_call_ids.insert(call_id.clone());
}
let response = handle_response_item(sess, sub_id, item.clone()).await?;

// Patch: buffer assistant message text if streaming is disabled
if !streaming_enabled {
if let ResponseItem::Message { role, content } = &item {
if role == "assistant" {
for c in content {
if let ContentItem::OutputText { text } = c {
assistant_message_buf.push_str(text);
}
}
}
}
}
let response = match &item {
ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } => None,
_ => handle_response_item(sess, sub_id, item.clone()).await?,
};
output.push(ProcessedResponseItem { item, response });
}
ResponseEvent::OutputTextDelta(text) => {
if streaming_enabled {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageEvent { message: text }),
};
sess.tx_event.send(event).await.ok();
} else {
assistant_message_buf.push_str(&text);
}
}
ResponseEvent::ReasoningSummaryDelta(text) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningEvent { text }),
};
sess.tx_event.send(event).await.ok();
}
ResponseEvent::Completed {
response_id,
token_usage,
} => {
// Patch: emit full message if we buffered deltas
if !streaming_enabled && !assistant_message_buf.is_empty() {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: assistant_message_buf.clone(),
}),
};
sess.tx_event.send(event).await.ok();
}
if let Some(token_usage) = token_usage {
sess.tx_event
.send(Event {
Expand Down
18 changes: 18 additions & 0 deletions codex-rs/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ pub struct Config {
/// request using the Responses API.
pub model_reasoning_summary: ReasoningSummary,

/// Whether to surface live streaming delta events in front-ends. When `true`
/// (default) Codex will forward `AgentMessageDelta` / `AgentReasoningDelta`
/// events and UIs may show a typing indicator. When `false` Codex UIs should
/// ignore delta events and rely solely on the final aggregated
/// `AgentMessage`/`AgentReasoning` items (legacy behaviour).
pub streaming_enabled: bool,

/// When set to `true`, overrides the default heuristic and forces
/// `model_supports_reasoning_summaries()` to return `true`.
pub model_supports_reasoning_summaries: bool,
Expand Down Expand Up @@ -321,6 +328,13 @@ pub struct ConfigToml {

/// Base URL for requests to ChatGPT (as opposed to the OpenAI API).
pub chatgpt_base_url: Option<String>,

/// Whether to surface live streaming delta events in front-ends. When `true`
/// (default) Codex will forward `AgentMessageDelta` / `AgentReasoningDelta`
/// events and UIs may show a typing indicator. When `false` Codex UIs should
/// ignore delta events and rely solely on the final aggregated
/// `AgentMessage`/`AgentReasoning` items (legacy behaviour).
pub streaming: Option<bool>,
}

impl ConfigToml {
Expand Down Expand Up @@ -486,6 +500,7 @@ impl Config {
.or(cfg.model_reasoning_summary)
.unwrap_or_default(),

streaming_enabled: cfg.streaming.unwrap_or(true),
model_supports_reasoning_summaries: cfg
.model_supports_reasoning_summaries
.unwrap_or(false),
Expand Down Expand Up @@ -798,6 +813,7 @@ disable_response_storage = true
hide_agent_reasoning: false,
model_reasoning_effort: ReasoningEffort::High,
model_reasoning_summary: ReasoningSummary::Detailed,
streaming_enabled: true,
model_supports_reasoning_summaries: false,
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
},
Expand Down Expand Up @@ -844,6 +860,7 @@ disable_response_storage = true
hide_agent_reasoning: false,
model_reasoning_effort: ReasoningEffort::default(),
model_reasoning_summary: ReasoningSummary::default(),
streaming_enabled: true,
model_supports_reasoning_summaries: false,
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
};
Expand Down Expand Up @@ -905,6 +922,7 @@ disable_response_storage = true
hide_agent_reasoning: false,
model_reasoning_effort: ReasoningEffort::default(),
model_reasoning_summary: ReasoningSummary::default(),
streaming_enabled: true,
model_supports_reasoning_summaries: false,
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
};
Expand Down
9 changes: 8 additions & 1 deletion codex-rs/core/src/config_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@
/// Deriving the `env` based on this policy works as follows:
/// 1. Create an initial map based on the `inherit` policy.
/// 2. If `ignore_default_excludes` is false, filter the map using the default
/// exclude pattern(s), which are: `"*KEY*"` and `"*TOKEN*"`.
/// exclude pattern(s), which are: "*KEY*" and "*TOKEN*".
/// 3. If `exclude` is not empty, filter the map using the provided patterns.
/// 4. Insert any entries from `r#set` into the map.
/// 5. If non-empty, filter the map using the `include_only` patterns.
Expand Down Expand Up @@ -228,3 +228,10 @@
/// Option to disable reasoning summaries.
None,
}

// ---------------------------------------------------------------------------
// NOTE: The canonical ConfigToml definition lives in `crate::config`.
// Historically this file accidentally redeclared that struct, which caused

Check failure on line 234 in codex-rs/core/src/config_types.rs

View workflow job for this annotation

GitHub Actions / Check for spelling errors

re-declared ==> redeclared
// drift and confusion. The duplicate has been removed; please use
// `codex_core::config::ConfigToml` instead.
// ---------------------------------------------------------------------------
6 changes: 6 additions & 0 deletions codex-rs/core/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,15 @@ pub enum EventMsg {
/// Agent text output message
AgentMessage(AgentMessageEvent),

/// Incremental assistant text delta
AgentMessageDelta(AgentMessageEvent),

/// Reasoning event from agent.
AgentReasoning(AgentReasoningEvent),

/// Incremental reasoning text delta.
AgentReasoningDelta(AgentReasoningEvent),

/// Ack the client's configure message.
SessionConfigured(SessionConfiguredEvent),

Expand Down
Loading
Loading