Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ working_dir = "/home/agent"
[pool]
max_sessions = 10
session_ttl_hours = 24
# Hard ceiling (sec) per prompt; see #732. Default: 1800 (30 min).
# prompt_hard_timeout_secs = 1800
# Liveness-check cadence (sec) for the recv loop; see #732. Default: 30.
# liveness_check_secs = 30

[markdown]
tables = "code" # "code" (default) | "bullets" | "off"
Expand Down
336 changes: 241 additions & 95 deletions src/acp/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use tracing::{debug, error, info, trace};

/// Pick the most permissive selectable permission option from ACP options.
fn pick_best_option(options: &[Value]) -> Option<String> {
Expand Down Expand Up @@ -149,6 +149,112 @@ fn build_agent_env(
(result, inherited)
}

/// Reader loop body: reads JSON-RPC messages from `reader`, auto-replies
/// `session/request_permission` via `writer`, resolves pending responses,
/// and forwards notifications + stale id-bearing messages to the active
/// subscriber. Extracted as a free generic function so unit tests can drive
/// it with `tokio::io::duplex()` halves instead of a real child process.
pub(crate) async fn run_reader_loop<R, W>(
reader: R,
writer: Arc<Mutex<W>>,
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcMessage>>>>,
notify_tx: Arc<Mutex<Option<mpsc::UnboundedSender<JsonRpcMessage>>>>,
) where
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {}
Err(e) => {
error!("reader error: {e}");
break;
}
}
let msg: JsonRpcMessage = match serde_json::from_str(line.trim()) {
Ok(m) => m,
Err(_) => continue,
};
debug!(line = line.trim(), "acp_recv");

// Auto-reply session/request_permission
if msg.method.as_deref() == Some("session/request_permission") {
if let Some(id) = msg.id {
let title = msg
.params
.as_ref()
.and_then(|p| p.get("toolCall"))
.and_then(|t| t.get("title"))
.and_then(|t| t.as_str())
.unwrap_or("?");

let outcome = build_permission_response(msg.params.as_ref());
info!(title, %outcome, "auto-respond permission");
let reply = JsonRpcResponse::new(id, outcome);
if let Ok(data) = serde_json::to_string(&reply) {
let mut w = writer.lock().await;
let _ = w.write_all(format!("{data}\n").as_bytes()).await;
let _ = w.flush().await;
}
}
continue;
}

// Response (has id) → resolve pending AND forward to subscriber
if let Some(id) = msg.id {
let mut map = pending.lock().await;
if let Some(tx) = map.remove(&id) {
// Forward to subscriber so they see the completion
let sub = notify_tx.lock().await;
if let Some(ntx) = sub.as_ref() {
// Clone the essential fields for the subscriber
let _ = ntx.send(JsonRpcMessage {
id: Some(id),
method: None,
result: msg.result.clone(),
error: msg.error.clone(),
params: None,
});
}
let _ = tx.send(msg);
continue;
}
// Stale id (#732): pending was already abandoned. Falls through
// to subscriber forwarding; the adapter recv loop filters by
// request_id so it can't leak into the next prompt.
trace!(request_id = id, "stale id-bearing message after abandon");
}

// Notification → forward to subscriber
let sub = notify_tx.lock().await;
if let Some(tx) = sub.as_ref() {
let _ = tx.send(msg);
}
}

// Connection closed — resolve all pending with error
let mut map = pending.lock().await;
for (_, tx) in map.drain() {
let _ = tx.send(JsonRpcMessage {
id: None,
method: None,
result: None,
error: Some(crate::acp::protocol::JsonRpcError {
code: -1,
message: "connection closed".into(),
}),
params: None,
});
}
// Close the notify channel so rx.recv() returns None
let mut sub = notify_tx.lock().await;
*sub = None;
}

impl AcpConnection {
pub async fn spawn(
command: &str,
Expand Down Expand Up @@ -254,99 +360,12 @@ impl AcpConnection {
let notify_tx: Arc<Mutex<Option<mpsc::UnboundedSender<JsonRpcMessage>>>> =
Arc::new(Mutex::new(None));

let reader_handle = {
let pending = pending.clone();
let notify_tx = notify_tx.clone();
let stdin_clone = stdin.clone();
tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => break, // EOF
Ok(_) => {}
Err(e) => {
error!("reader error: {e}");
break;
}
}
let msg: JsonRpcMessage = match serde_json::from_str(line.trim()) {
Ok(m) => m,
Err(_) => continue,
};
debug!(line = line.trim(), "acp_recv");

// Auto-reply session/request_permission
if msg.method.as_deref() == Some("session/request_permission") {
if let Some(id) = msg.id {
let title = msg
.params
.as_ref()
.and_then(|p| p.get("toolCall"))
.and_then(|t| t.get("title"))
.and_then(|t| t.as_str())
.unwrap_or("?");

let outcome = build_permission_response(msg.params.as_ref());
info!(title, %outcome, "auto-respond permission");
let reply = JsonRpcResponse::new(id, outcome);
if let Ok(data) = serde_json::to_string(&reply) {
let mut w = stdin_clone.lock().await;
let _ = w.write_all(format!("{data}\n").as_bytes()).await;
let _ = w.flush().await;
}
}
continue;
}

// Response (has id) → resolve pending AND forward to subscriber
if let Some(id) = msg.id {
let mut map = pending.lock().await;
if let Some(tx) = map.remove(&id) {
// Forward to subscriber so they see the completion
let sub = notify_tx.lock().await;
if let Some(ntx) = sub.as_ref() {
// Clone the essential fields for the subscriber
let _ = ntx.send(JsonRpcMessage {
id: Some(id),
method: None,
result: msg.result.clone(),
error: msg.error.clone(),
params: None,
});
}
let _ = tx.send(msg);
continue;
}
}

// Notification → forward to subscriber
let sub = notify_tx.lock().await;
if let Some(tx) = sub.as_ref() {
let _ = tx.send(msg);
}
}

// Connection closed — resolve all pending with error
let mut map = pending.lock().await;
for (_, tx) in map.drain() {
let _ = tx.send(JsonRpcMessage {
id: None,
method: None,
result: None,
error: Some(crate::acp::protocol::JsonRpcError {
code: -1,
message: "connection closed".into(),
}),
params: None,
});
}
// Close the notify channel so rx.recv() returns None
let mut sub = notify_tx.lock().await;
*sub = None;
})
};
let reader_handle = tokio::spawn(run_reader_loop(
stdout,
stdin.clone(),
pending.clone(),
notify_tx.clone(),
));

Ok(Self {
_proc: proc,
Expand Down Expand Up @@ -557,6 +576,31 @@ impl AcpConnection {
self.last_active = Instant::now();
}

/// Drop the pending entry for `request_id` and best-effort send
/// `session/cancel`. Errors are swallowed: the agent process may already
/// be dead, in which case the stdin write fails harmlessly. See #732.
///
/// `session/cancel` carries a fresh JSON-RPC id but is not registered in
/// `pending`, so the agent's reply lands in the stale-id branch of
/// `run_reader_loop` and only emits a `trace!`. That is intentional: we
/// never wait on the cancel response, and the adapter recv loop's
/// request_id filter prevents leakage into the next prompt.
pub async fn abandon_request(&self, request_id: u64) {
self.pending.lock().await.remove(&request_id);
let Some(session_id) = self.acp_session_id.as_deref() else {
return;
};
let req = json!({
"jsonrpc": "2.0",
"id": self.next_id(),
"method": "session/cancel",
"params": {"sessionId": session_id},
});
if let Ok(data) = serde_json::to_string(&req) {
let _ = self.send_raw(&data).await;
}
}

/// Return a clone of the stdin handle for lock-free cancel.
pub fn cancel_handle(&self) -> Arc<Mutex<ChildStdin>> {
Arc::clone(&self.stdin)
Expand Down Expand Up @@ -758,3 +802,105 @@ mod tests {
assert!(inherited.is_empty());
}
}

#[cfg(test)]
mod reader_loop_tests {
use super::*;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{duplex, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot, Mutex};

/// #732 stale-id path: when a response arrives for an id the broker has
/// already abandoned, the reader must (a) not crash, (b) leave `pending`
/// untouched, and (c) still forward the message to whoever is currently
/// subscribed — the adapter recv loop is responsible for filtering by
/// request_id so the stray response never leaks into the next prompt.
#[tokio::test]
async fn stale_id_response_is_forwarded_without_pending_entry() {
let (mut agent_stdout_writer, agent_stdout_reader) = duplex(8 * 1024);
let (agent_stdin_writer, _agent_stdin_reader) = duplex(8 * 1024);

let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcMessage>>>> =
Arc::new(Mutex::new(HashMap::new()));
let notify_tx: Arc<Mutex<Option<mpsc::UnboundedSender<JsonRpcMessage>>>> =
Arc::new(Mutex::new(None));

let (sub_tx, mut sub_rx) = mpsc::unbounded_channel();
*notify_tx.lock().await = Some(sub_tx);

let writer = Arc::new(Mutex::new(agent_stdin_writer));
let handle = tokio::spawn(run_reader_loop(
agent_stdout_reader,
writer,
pending.clone(),
notify_tx.clone(),
));

let stale = b"{\"jsonrpc\":\"2.0\",\"id\":42,\"result\":{\"stopReason\":\"ok\"}}\n";
agent_stdout_writer.write_all(stale).await.unwrap();
agent_stdout_writer.flush().await.unwrap();

let forwarded = tokio::time::timeout(
std::time::Duration::from_secs(2),
sub_rx.recv(),
)
.await
.expect("subscriber should receive stale message before timeout")
.expect("subscriber channel should not be closed");
assert_eq!(forwarded.id, Some(42));
assert!(pending.lock().await.is_empty());

drop(agent_stdout_writer);
handle.await.unwrap();
}

/// Matched-id path: when a response's id is in `pending`, the loop must
/// resolve the oneshot AND forward a copy to the subscriber so the
/// adapter's recv loop sees the completion. Guards against regressions
/// that would suppress the forward branch while keeping resolve.
#[tokio::test]
async fn matched_id_response_resolves_pending_and_forwards() {
let (mut agent_stdout_writer, agent_stdout_reader) = duplex(8 * 1024);
let (agent_stdin_writer, _agent_stdin_reader) = duplex(8 * 1024);

let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<JsonRpcMessage>>>> =
Arc::new(Mutex::new(HashMap::new()));
let notify_tx: Arc<Mutex<Option<mpsc::UnboundedSender<JsonRpcMessage>>>> =
Arc::new(Mutex::new(None));

let (resp_tx, resp_rx) = oneshot::channel();
pending.lock().await.insert(7, resp_tx);

let (sub_tx, mut sub_rx) = mpsc::unbounded_channel();
*notify_tx.lock().await = Some(sub_tx);

let writer = Arc::new(Mutex::new(agent_stdin_writer));
let handle = tokio::spawn(run_reader_loop(
agent_stdout_reader,
writer,
pending.clone(),
notify_tx.clone(),
));

let payload = b"{\"jsonrpc\":\"2.0\",\"id\":7,\"result\":{\"stopReason\":\"end_turn\"}}\n";
agent_stdout_writer.write_all(payload).await.unwrap();
agent_stdout_writer.flush().await.unwrap();

let resolved = tokio::time::timeout(std::time::Duration::from_secs(2), resp_rx)
.await
.expect("oneshot should resolve")
.expect("oneshot should not be cancelled");
assert_eq!(resolved.id, Some(7));

let forwarded = tokio::time::timeout(std::time::Duration::from_secs(2), sub_rx.recv())
.await
.expect("subscriber should receive forwarded copy")
.expect("subscriber channel should not be closed");
assert_eq!(forwarded.id, Some(7));
assert!(pending.lock().await.is_empty());

drop(agent_stdout_writer);
handle.await.unwrap();
}
}
Loading
Loading