Skip to content

Commit 1f5e488

Browse files
brettchien and claude
authored and committed
fix(acp): clean up pending + cancel agent on abandoned prompts (#732)
The flat 600s recv_timeout in adapter.rs:386 fires "Agent stopped responding" without removing pending[id] or sending session/cancel. The agent keeps running the abandoned prompt and eventually emits its final response with the original id. The reader at connection.rs:284 looks up pending[id], sees the now-stale entry, and forwards the message to the *current* notify_tx subscriber — which belongs to the next prompt. The next prompt's loop sees notification.id.is_some() and breaks immediately with empty text_buf, returning "(no response)". Each new prompt sent before the agent drains its backlog inherits the previous prompt's stale id and the cascade persists.

Fix follows the issue's recommended A+B+C:

(A) Replace flat 600s timeout with a tokio::select! loop in stream_prompt_blocks. The loop has a recv arm and a 30s liveness arm. The liveness arm checks conn.alive() (cheap, just !reader_handle.is_finished()) and a configurable hard ceiling. Default ceiling is 30 min via pool.prompt_hard_timeout_secs. Long-running tools no longer trip the timeout — only a dead reader task or the hard ceiling abandons the prompt.

(B) Add AcpConnection::abandon_request(request_id) called on every abandon path: drops pending[request_id] so a late response cannot route to a future subscriber, and best-effort writes session/cancel so the agent stops working on a request the broker has given up on.

(C) Capture request_id from session_prompt() (was discarded as `_`) and skip notifications whose id doesn't match. Defense-in-depth at the routing layer; complements (B)'s cleanup if any future abandon path forgets to call abandon_request.

No unit test for abandon_request — the connection has no test seam without spawning a real subprocess. Behavior is exercised end-to-end via the adapter loop on real ACP backends.
Refs:
- #76 (Assumption 2: prompts always complete)
- #307 (sibling: same family, different visible symptom)
- #470 (added the 600s recv timeout this issue exposes)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 7084636 commit 1f5e488

6 files changed

Lines changed: 70 additions & 11 deletions

File tree

config.toml.example

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -104,6 +104,8 @@ working_dir = "/home/agent"
104104
[pool]
105105
max_sessions = 10
106106
session_ttl_hours = 24
107+
# Hard ceiling (sec) per prompt; see #732. Default: 1800 (30 min).
108+
# prompt_hard_timeout_secs = 1800
107109

108110
[markdown]
109111
tables = "code" # "code" (default) | "bullets" | "off"

src/acp/connection.rs

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -557,6 +557,24 @@ impl AcpConnection {
557557
self.last_active = Instant::now();
558558
}
559559

560+
/// Drop the pending entry for `request_id` and best-effort send
561+
/// `session/cancel`. Errors are swallowed: the agent process may already
562+
/// be dead, in which case the stdin write fails harmlessly. See #732.
563+
pub async fn abandon_request(&self, request_id: u64) {
564+
self.pending.lock().await.remove(&request_id);
565+
let Some(session_id) = self.acp_session_id.as_deref() else {
566+
return;
567+
};
568+
let req = json!({
569+
"jsonrpc": "2.0",
570+
"method": "session/cancel",
571+
"params": {"sessionId": session_id},
572+
});
573+
if let Ok(data) = serde_json::to_string(&req) {
574+
let _ = self.send_raw(&data).await;
575+
}
576+
}
577+
560578
/// Return a clone of the stdin handle for lock-free cancel.
561579
pub fn cancel_handle(&self) -> Arc<Mutex<ChildStdin>> {
562580
Arc::clone(&self.stdin)

src/adapter.rs

Lines changed: 38 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -153,24 +153,30 @@ pub trait ChatAdapter: Send + Sync + 'static {
153153

154154
// --- AdapterRouter ---
155155

156+
/// Polling cadence for the recv-loop liveness check (#732).
157+
const LIVENESS_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
158+
156159
/// Shared logic for routing messages to ACP agents, managing sessions,
157160
/// streaming edits, and controlling reactions. Platform-independent.
158161
pub struct AdapterRouter {
159162
pool: Arc<SessionPool>,
160163
reactions_config: ReactionsConfig,
161164
table_mode: TableMode,
165+
prompt_hard_timeout: std::time::Duration,
162166
}
163167

164168
impl AdapterRouter {
165169
pub fn new(
166170
pool: Arc<SessionPool>,
167171
reactions_config: ReactionsConfig,
168172
table_mode: TableMode,
173+
prompt_hard_timeout_secs: u64,
169174
) -> Self {
170175
Self {
171176
pool,
172177
reactions_config,
173178
table_mode,
179+
prompt_hard_timeout: std::time::Duration::from_secs(prompt_hard_timeout_secs),
174180
}
175181
}
176182

@@ -335,6 +341,7 @@ impl AdapterRouter {
335341
let streaming = adapter.use_streaming(other_bot_present);
336342
let table_mode = self.table_mode;
337343
let tool_display = self.reactions_config.tool_display;
344+
let prompt_hard_timeout = self.prompt_hard_timeout;
338345

339346
self.pool
340347
.with_connection(thread_key, |conn| {
@@ -343,7 +350,7 @@ impl AdapterRouter {
343350
let reset = conn.session_reset;
344351
conn.session_reset = false;
345352

346-
let (mut rx, _) = conn.session_prompt(content_blocks).await?;
353+
let (mut rx, request_id) = conn.session_prompt(content_blocks).await?;
347354
reactions.set_thinking().await;
348355

349356
let mut text_buf = String::new();
@@ -396,20 +403,40 @@ impl AdapterRouter {
396403
(None, None)
397404
};
398405

399-
// Process ACP notifications
406+
// (#732) Liveness-aware recv loop. Filters stale id-bearing
407+
// messages and abandons cleanly on dead agent / hard ceiling
408+
// so late responses cannot leak into the next prompt.
400409
let mut response_error: Option<String> = None;
401-
let recv_timeout = std::time::Duration::from_secs(600);
410+
let prompt_start = std::time::Instant::now();
402411
loop {
403-
let notification = match tokio::time::timeout(recv_timeout, rx.recv()).await
404-
{
405-
Ok(Some(n)) => n,
406-
Ok(None) => break, // channel closed
407-
Err(_) => {
408-
response_error = Some("Agent stopped responding".into());
409-
break;
412+
let notification = tokio::select! {
413+
msg = rx.recv() => match msg {
414+
Some(n) => n,
415+
// Reader saw EOF and already drained pending; nothing to abandon.
416+
None => break,
417+
},
418+
_ = tokio::time::sleep(LIVENESS_CHECK_INTERVAL) => {
419+
if !conn.alive() {
420+
response_error = Some("Agent process died".into());
421+
conn.abandon_request(request_id).await;
422+
break;
423+
}
424+
if prompt_start.elapsed() > prompt_hard_timeout {
425+
response_error = Some(format!(
426+
"Agent exceeded hard timeout ({}m)",
427+
prompt_hard_timeout.as_secs() / 60,
428+
));
429+
conn.abandon_request(request_id).await;
430+
break;
431+
}
432+
continue;
410433
}
411434
};
412-
if notification.id.is_some() {
435+
if let Some(notification_id) = notification.id {
436+
if notification_id != request_id {
437+
// Stale response from a previously-abandoned prompt.
438+
continue;
439+
}
413440
if let Some(ref err) = notification.error {
414441
response_error = Some(format_coded_error(err.code, &err.message));
415442
}

src/config.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -313,6 +313,12 @@ pub struct PoolConfig {
313313
pub max_sessions: usize,
314314
#[serde(default = "default_ttl_hours")]
315315
pub session_ttl_hours: u64,
316+
/// Hard ceiling for a single prompt (#732). Once exceeded, the broker
317+
/// abandons the in-flight request, sends `session/cancel` to the agent,
318+
/// and clears the pending entry so late responses cannot leak into the
319+
/// next prompt's subscriber.
320+
#[serde(default = "default_prompt_hard_timeout_secs")]
321+
pub prompt_hard_timeout_secs: u64,
316322
}
317323

318324
#[derive(Debug, Clone, Deserialize)]
@@ -434,6 +440,9 @@ fn default_max_sessions() -> usize {
434440
fn default_ttl_hours() -> u64 {
435441
4
436442
}
443+
pub(crate) fn default_prompt_hard_timeout_secs() -> u64 {
444+
30 * 60
445+
}
437446
fn default_true() -> bool {
438447
true
439448
}
@@ -481,6 +490,7 @@ impl Default for PoolConfig {
481490
Self {
482491
max_sessions: default_max_sessions(),
483492
session_ttl_hours: default_ttl_hours(),
493+
prompt_hard_timeout_secs: default_prompt_hard_timeout_secs(),
484494
}
485495
}
486496
}

src/dispatch.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1072,6 +1072,7 @@ mod tests {
10721072
pool,
10731073
crate::config::ReactionsConfig::default(),
10741074
crate::markdown::TableMode::Off,
1075+
crate::config::default_prompt_hard_timeout_secs(),
10751076
));
10761077
Dispatcher::with_idle_timeout(router, 10, 24_000, grouping, DEFAULT_CONSUMER_IDLE_TIMEOUT)
10771078
}

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -147,6 +147,7 @@ async fn main() -> anyhow::Result<()> {
147147
pool.clone(),
148148
cfg.reactions,
149149
cfg.markdown.tables,
150+
cfg.pool.prompt_hard_timeout_secs,
150151
));
151152

152153
// Shutdown signal for Slack adapter

0 commit comments

Comments
 (0)