diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java index f577b493..ff59d8bc 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java @@ -242,6 +242,11 @@ public Mono sendRequest(String method, Object requestParams, TypeReferenc this.pendingResponses.remove(requestId); sink.error(error); }); + sink.onDispose(() -> { + if (this.pendingResponses.remove(requestId) != null) { + logger.debug("Request {} disposed (e.g., timeout, cancellation), removed from client pendingResponses.", requestId); + } + }); })).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> { if (jsonRpcResponse.error() != null) { logger.error("Error handling request: {}", jsonRpcResponse.error()); diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java index 86906d85..f7f7fab7 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java @@ -121,6 +121,11 @@ public Mono sendRequest(String method, Object requestParams, TypeReferenc this.pendingResponses.remove(requestId); sink.error(error); }); + sink.onDispose(() -> { + if (this.pendingResponses.remove(requestId) != null) { + logger.debug("Request {} disposed (e.g., timeout, cancellation), removed from client pendingResponses.", requestId); + } + }); }).timeout(requestTimeout).handle((jsonRpcResponse, sink) -> { if (jsonRpcResponse.error() != null) { sink.error(new McpError(jsonRpcResponse.error()));