Skip to content

Commit c3e0095

Browse files
brettchienclaude
andcommitted
fix(acp): clean up pending + cancel agent on abandoned prompts (#732)
The flat 600s recv_timeout in adapter.rs:386 fires "Agent stopped responding" without removing pending[id] or sending session/cancel. The agent keeps running the abandoned prompt and eventually emits its final response with the original id. The reader at connection.rs:284 looks up pending[id], sees the now-stale entry, and forwards the message to the *current* notify_tx subscriber — which belongs to the next prompt. The next prompt's loop sees notification.id.is_some() and breaks immediately with empty text_buf, returning "(no response)". Each new prompt sent before the agent drains its backlog inherits the previous prompt's stale id and the cascade persists. Fix follows the issue's recommended A+B+C: (A) Replace flat 600s timeout with a tokio::select! loop in stream_prompt_blocks. Recv arm + 30s liveness arm. Liveness arm checks conn.alive() (cheap, just !reader_handle.is_finished()) and a configurable hard ceiling. Default ceiling is 30 min via pool.prompt_hard_timeout_secs. Long-running tools no longer trip the timeout — only a dead reader task or the hard ceiling abandon the prompt. (B) Add AcpConnection::abandon_request(request_id) called on every abandon path: drops pending[request_id] so a late response cannot route to a future subscriber, and best-effort writes session/cancel so the agent stops working on a request the broker has given up on. (C) Capture request_id from session_prompt() (was discarded as `_`) and skip notifications whose id doesn't match. Defense-in-depth at the routing layer; complements (B)'s cleanup if any future abandon path forgets to call abandon_request. No unit test for abandon_request — the connection has no test seam without spawning a real subprocess. Behavior is exercised end-to-end via the adapter loop on real ACP backends. Refs: - #76 (Assumption 2: prompts always complete) - #307 (sibling: same family, different visible symptom) - #470 (added the 600s recv timeout this issue exposes) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 4446321 commit c3e0095

6 files changed

Lines changed: 77 additions & 13 deletions

File tree

config.toml.example

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ working_dir = "/home/agent"
104104
[pool]
105105
max_sessions = 10
106106
session_ttl_hours = 24
107+
# Hard ceiling (sec) per prompt; see #732. Default: 1800 (30 min).
108+
# prompt_hard_timeout_secs = 1800
107109

108110
[markdown]
109111
tables = "code" # "code" (default) | "bullets" | "off"

src/acp/connection.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,24 @@ impl AcpConnection {
531531
self.last_active = Instant::now();
532532
}
533533

534+
/// Drop the pending entry for `request_id` and best-effort send
535+
/// `session/cancel`. Errors are swallowed: the agent process may already
536+
/// be dead, in which case the stdin write fails harmlessly. See #732.
537+
pub async fn abandon_request(&self, request_id: u64) {
538+
self.pending.lock().await.remove(&request_id);
539+
let Some(session_id) = self.acp_session_id.as_deref() else {
540+
return;
541+
};
542+
let req = json!({
543+
"jsonrpc": "2.0",
544+
"method": "session/cancel",
545+
"params": {"sessionId": session_id},
546+
});
547+
if let Ok(data) = serde_json::to_string(&req) {
548+
let _ = self.send_raw(&data).await;
549+
}
550+
}
551+
534552
/// Return a clone of the stdin handle for lock-free cancel.
535553
pub fn cancel_handle(&self) -> Arc<Mutex<ChildStdin>> {
536554
Arc::clone(&self.stdin)

src/adapter.rs

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,24 +153,30 @@ pub trait ChatAdapter: Send + Sync + 'static {
153153

154154
// --- AdapterRouter ---
155155

156+
/// Polling cadence for the recv-loop liveness check (#732).
157+
const LIVENESS_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
158+
156159
/// Shared logic for routing messages to ACP agents, managing sessions,
157160
/// streaming edits, and controlling reactions. Platform-independent.
158161
pub struct AdapterRouter {
159162
pool: Arc<SessionPool>,
160163
reactions_config: ReactionsConfig,
161164
table_mode: TableMode,
165+
prompt_hard_timeout: std::time::Duration,
162166
}
163167

164168
impl AdapterRouter {
165169
pub fn new(
166170
pool: Arc<SessionPool>,
167171
reactions_config: ReactionsConfig,
168172
table_mode: TableMode,
173+
prompt_hard_timeout_secs: u64,
169174
) -> Self {
170175
Self {
171176
pool,
172177
reactions_config,
173178
table_mode,
179+
prompt_hard_timeout: std::time::Duration::from_secs(prompt_hard_timeout_secs),
174180
}
175181
}
176182

@@ -327,6 +333,7 @@ impl AdapterRouter {
327333
let streaming = adapter.use_streaming(other_bot_present);
328334
let table_mode = self.table_mode;
329335
let tool_display = self.reactions_config.tool_display;
336+
let prompt_hard_timeout = self.prompt_hard_timeout;
330337

331338
self.pool
332339
.with_connection(thread_key, |conn| {
@@ -335,7 +342,7 @@ impl AdapterRouter {
335342
let reset = conn.session_reset;
336343
conn.session_reset = false;
337344

338-
let (mut rx, _) = conn.session_prompt(content_blocks).await?;
345+
let (mut rx, request_id) = conn.session_prompt(content_blocks).await?;
339346
reactions.set_thinking().await;
340347

341348
let mut text_buf = String::new();
@@ -388,20 +395,40 @@ impl AdapterRouter {
388395
(None, None)
389396
};
390397

391-
// Process ACP notifications
398+
// (#732) Liveness-aware recv loop. Filters stale id-bearing
399+
// messages and abandons cleanly on dead agent / hard ceiling
400+
// so late responses cannot leak into the next prompt.
392401
let mut response_error: Option<String> = None;
393-
let recv_timeout = std::time::Duration::from_secs(600);
402+
let prompt_start = std::time::Instant::now();
394403
loop {
395-
let notification = match tokio::time::timeout(recv_timeout, rx.recv()).await
396-
{
397-
Ok(Some(n)) => n,
398-
Ok(None) => break, // channel closed
399-
Err(_) => {
400-
response_error = Some("Agent stopped responding".into());
401-
break;
404+
let notification = tokio::select! {
405+
msg = rx.recv() => match msg {
406+
Some(n) => n,
407+
// Reader saw EOF and already drained pending; nothing to abandon.
408+
None => break,
409+
},
410+
_ = tokio::time::sleep(LIVENESS_CHECK_INTERVAL) => {
411+
if !conn.alive() {
412+
response_error = Some("Agent process died".into());
413+
conn.abandon_request(request_id).await;
414+
break;
415+
}
416+
if prompt_start.elapsed() > prompt_hard_timeout {
417+
response_error = Some(format!(
418+
"Agent exceeded hard timeout ({}m)",
419+
prompt_hard_timeout.as_secs() / 60,
420+
));
421+
conn.abandon_request(request_id).await;
422+
break;
423+
}
424+
continue;
402425
}
403426
};
404-
if notification.id.is_some() {
427+
if let Some(notification_id) = notification.id {
428+
if notification_id != request_id {
429+
// Stale response from a previously-abandoned prompt.
430+
continue;
431+
}
405432
if let Some(ref err) = notification.error {
406433
response_error = Some(format_coded_error(err.code, &err.message));
407434
}

src/config.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,12 @@ pub struct PoolConfig {
289289
pub max_sessions: usize,
290290
#[serde(default = "default_ttl_hours")]
291291
pub session_ttl_hours: u64,
292+
/// Hard ceiling for a single prompt (#732). Once exceeded, the broker
293+
/// abandons the in-flight request, sends `session/cancel` to the agent,
294+
/// and clears the pending entry so late responses cannot leak into the
295+
/// next prompt's subscriber.
296+
#[serde(default = "default_prompt_hard_timeout_secs")]
297+
pub prompt_hard_timeout_secs: u64,
292298
}
293299

294300
#[derive(Debug, Clone, Deserialize)]
@@ -395,6 +401,7 @@ pub struct ReactionTiming {
395401
fn default_working_dir() -> String { "/tmp".into() }
396402
fn default_max_sessions() -> usize { 10 }
397403
fn default_ttl_hours() -> u64 { 4 }
404+
pub(crate) fn default_prompt_hard_timeout_secs() -> u64 { 30 * 60 }
398405
fn default_true() -> bool { true }
399406

400407
fn emoji_queued() -> String { "👀".into() }
@@ -413,7 +420,11 @@ fn default_error_hold_ms() -> u64 { 2_500 }
413420

414421
impl Default for PoolConfig {
415422
fn default() -> Self {
416-
Self { max_sessions: default_max_sessions(), session_ttl_hours: default_ttl_hours() }
423+
Self {
424+
max_sessions: default_max_sessions(),
425+
session_ttl_hours: default_ttl_hours(),
426+
prompt_hard_timeout_secs: default_prompt_hard_timeout_secs(),
427+
}
417428
}
418429
}
419430

src/dispatch.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,6 +1045,7 @@ mod tests {
10451045
pool,
10461046
crate::config::ReactionsConfig::default(),
10471047
crate::markdown::TableMode::Off,
1048+
crate::config::default_prompt_hard_timeout_secs(),
10481049
));
10491050
Dispatcher::with_idle_timeout(router, 10, 24_000, grouping, DEFAULT_CONSUMER_IDLE_TIMEOUT)
10501051
}

src/main.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,12 @@ async fn main() -> anyhow::Result<()> {
139139
info!(model = %cfg.stt.model, base_url = %cfg.stt.base_url, "STT enabled");
140140
}
141141

142-
let router = Arc::new(AdapterRouter::new(pool.clone(), cfg.reactions, cfg.markdown.tables));
142+
let router = Arc::new(AdapterRouter::new(
143+
pool.clone(),
144+
cfg.reactions,
145+
cfg.markdown.tables,
146+
cfg.pool.prompt_hard_timeout_secs,
147+
));
143148

144149
// Shutdown signal for Slack adapter
145150
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

0 commit comments

Comments
 (0)