Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MCP timeouts to be recoverable #1100

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion crates/goose-cli/src/commands/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use goose_mcp::{
use mcp_server::router::RouterService;
use mcp_server::{BoundedService, ByteTransport, Server};
use tokio::io::{stdin, stdout};
use tokio::time::{timeout, Duration};

pub async fn run_server(name: &str) -> Result<()> {
// Initialize logging
Expand All @@ -29,5 +30,15 @@ pub async fn run_server(name: &str) -> Result<()> {
let transport = ByteTransport::new(stdin(), stdout());

tracing::info!("Server initialized and ready to handle requests");
Ok(server.run(transport).await?)

// Add timeout and cancellation handling
let server_future = server.run(transport);
match timeout(Duration::from_secs(30), server_future).await {
Ok(result) => result,
Err(_) => {
tracing::warn!("Timeout occurred while running the server");
// Handle cancellation logic here if needed
Ok(())
}
}
}
56 changes: 56 additions & 0 deletions crates/goose-cli/src/commands/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use goose::providers::create;
use std::path::Path;

use mcp_client::transport::Error as McpClientError;
use tokio::time::{timeout, Duration};

pub async fn build_session(
name: Option<String>,
Expand Down Expand Up @@ -176,3 +177,58 @@ fn display_session_info(resume: bool, provider: &str, model: &str, session_file:
style(session_file.display()).dim().cyan(),
);
}

// Add timeout mechanism using `tokio::time::timeout` in `agent_process_messages` function
impl<'a> Session<'a> {
async fn agent_process_messages(&mut self) {
let mut stream = match self.agent.reply(&self.messages).await {
Ok(stream) => stream,
Err(e) => {
eprintln!("Error starting reply stream: {}", e);
return;
}
};

loop {
tokio::select! {
response = timeout(Duration::from_secs(30), stream.next()) => {
match response {
Ok(Some(Ok(message))) => {
self.messages.push(message.clone());
persist_messages(&self.session_file, &self.messages).unwrap_or_else(|e| eprintln!("Failed to persist messages: {}", e));
self.prompt.hide_busy();
self.prompt.render(Box::new(message.clone()));
self.prompt.show_busy();
}
Ok(Some(Err(e))) => {
eprintln!("Error: {}", e);
drop(stream);
self.rewind_messages();
self.prompt.render(raw_message(r#"
The error above was an exception we were not able to handle.\n\n
These errors are often related to connection or authentication\n
We've removed the conversation up to the most recent user message
- depending on the error you may be able to continue"#));
break;
}
Ok(None) => break,
Err(_) => {
eprintln!("Timeout occurred while waiting for response");
drop(stream);
self.handle_interrupted_messages();
break;
}
}
}
_ = tokio::signal::ctrl_c() => {
// Kill any running processes when the client disconnects
// TODO is this used? I suspect post MCP this is on the server instead
// goose::process_store::kill_processes();
drop(stream);
self.handle_interrupted_messages();
break;
}
}
}
}
}
13 changes: 12 additions & 1 deletion crates/goose-server/src/commands/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use goose_mcp::{
use mcp_server::router::RouterService;
use mcp_server::{BoundedService, ByteTransport, Server};
use tokio::io::{stdin, stdout};
use tokio::time::{timeout, Duration};

pub async fn run(name: &str) -> Result<()> {
// Initialize logging
Expand All @@ -28,5 +29,15 @@ pub async fn run(name: &str) -> Result<()> {
let transport = ByteTransport::new(stdin(), stdout());

tracing::info!("Server initialized and ready to handle requests");
Ok(server.run(transport).await?)

// Add timeout and cancellation handling
let server_future = server.run(transport);
match timeout(Duration::from_secs(30), server_future).await {
Ok(result) => result,
Err(_) => {
tracing::warn!("Timeout occurred while running the server");
// Handle cancellation logic here if needed
Ok(())
}
}
}
10 changes: 10 additions & 0 deletions crates/mcp-client/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ impl<T: TransportHandle> McpService<T> {
inner: Arc::new(transport),
}
}

pub async fn cancel(&self, request_id: &str) -> Result<(), Error> {
let cancel_message = JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "cancel".to_string(),
params: Some(json!({ "request_id": request_id })),
});

self.inner.send(cancel_message).await.map(|_| ())
}
}

impl<T> Service<JsonRpcMessage> for McpService<T>
Expand Down