Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/mcp/shared/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,9 @@ async def _receive_loop(self) -> None:
await self._in_flight[cancelled_id].cancel()
else:
await self._received_notification(notification)
await self._incoming_message_stream_writer.send(
notification
)
# await self._incoming_message_stream_writer.send(
# notification
# )
Comment on lines +327 to +329
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems to be a debugging left over?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the lines that are present in shared/session that I believe cause the deadlock. If you uncomment the lines and run the end-to-end test, it will not complete (it will hang). If you comment the lines out, the test runs to completion.

except Exception as e:
# For other validation errors, log and continue
logging.warning(
Expand Down
33 changes: 33 additions & 0 deletions tests/example_mcp_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Small Demo server using FastMCP and illustrating debugging and notification streams
#

import logging
from mcp.server.fastmcp import FastMCP, Context
import time
import asyncio

mcp = FastMCP("MCP EXAMPLE SERVER", debug=True, log_level="DEBUG")

logger = logging.getLogger(__name__)

logger.debug(f"MCP STARTING EXAMPLE SERVER")

@mcp.resource("config://app")
def get_config() -> str:
"""Static configuration data"""
return "Test Server 2024-02-25"

@mcp.tool()
async def simple_tool(x:float, y:float, ctx:Context) -> str:
logger.debug("IN SIMPLE_TOOL")
await ctx.report_progress(1, 2)
return x*y

@mcp.tool()
async def simple_tool_with_logging(x:float, y:float, ctx:Context) -> str:
await ctx.info(f"Processing Simple Tool")
logger.debug("IN SIMPLE_TOOL")
await ctx.report_progress(1, 2)
return x*y

136 changes: 136 additions & 0 deletions tests/mcp_stdio_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from mcp import ClientSession, ListToolsResult, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.types import CallToolResult
from mcp import Tool as MCPTool

from contextlib import AsyncExitStack
from typing import Any
import asyncio
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We generally don't use asyncio but anyio.



import logging
logger = logging.getLogger(__name__)



class NotificationLoggingClientSession(ClientSession):

def __init__(self, read_stream, write_stream):
print(f"NOTIFICATION LOGGING CLIENT SESSION")
super().__init__(read_stream, write_stream)

# override base session to log incoming notifications
async def _received_notification(self, notification):
print(f"NOTIFICATION:{notification}")
print(f"NOTIFICATION-END")

async def send_progress_notification(self, progress_token, progress, total):
print(f"PROGRESS:{progress_token}")
print(f"PROGRESS-END")


# adapted from mcp-python-sdk/examples/clients/simple-chatbot/mcp_simple_chatbot/main.py
class MCPClient:
"""Manages MCP server connections and tool execution."""

def __init__(self, name, server_params: StdioServerParameters, errlog=None):
self.name = name
self.server_params = server_params
self.errlog = errlog
self.stdio_context: Any | None = None
self.session: ClientSession | None = None
self._cleanup_lock: asyncio.Lock = asyncio.Lock()
self.exit_stack: AsyncExitStack = AsyncExitStack()

async def initialize(self) -> None:
"""Initialize the server connection."""

try:
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(self.server_params)
)
read, write = stdio_transport
session = await self.exit_stack.enter_async_context(
# ClientSession(read, write)
NotificationLoggingClientSession(read, write)
)
await session.initialize()
self.session = session
except Exception as e:
logging.error(f"Error initializing server: {e}")
await self.cleanup()
raise

async def get_available_tools(self) -> list[MCPTool]:
"""List available tools from the server.

Returns:
A list of available tools.

Raises:
RuntimeError: If the server is not initialized.
"""
if not self.session:
raise RuntimeError(f"Server {self.name} not initialized")

tools_response = await self.session.list_tools()

# Let's just ignore pagination for now
return tools_response.tools

async def call_tool(
self,
tool_name: str,
arguments: dict[str, Any],
retries: int = 2,
delay: float = 1.0,
) -> Any:
"""Execute a tool with retry mechanism.

Args:
tool_name: Name of the tool to execute.
arguments: Tool arguments.
retries: Number of retry attempts.
delay: Delay between retries in seconds.

Returns:
Tool execution result.

Raises:
RuntimeError: If server is not initialized.
Exception: If tool execution fails after all retries.
"""
if not self.session:
raise RuntimeError(f"Server {self.name} not initialized")

attempt = 0
while attempt < retries:
try:
logging.info(f"Executing {tool_name}...")
result = await self.session.call_tool(tool_name, arguments)

return result

except Exception as e:
attempt += 1
logging.warning(
f"Error executing tool: {e}. Attempt {attempt} of {retries}."
)
if attempt < retries:
logging.info(f"Retrying in {delay} seconds...")
await asyncio.sleep(delay)
else:
logging.error("Max retries reached. Failing.")
raise

async def cleanup(self) -> None:
"""Clean up server resources."""
async with self._cleanup_lock:
try:
await self.exit_stack.aclose()
self.session = None
self.stdio_context = None
except Exception as e:
logging.error(f"Error during cleanup of server {self.name}: {e}")


83 changes: 83 additions & 0 deletions tests/test_mcp_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import pytest

import os
import sys
from .mcp_stdio_client import MCPClient

from mcp import StdioServerParameters

# locate the exmaple MCP server co-located in this directory

mcp_server_dir = os.path.dirname(os.path.abspath(__file__))
mcp_server_file = os.path.join(mcp_server_dir, "example_mcp_server.py")

# mcpServers config in same syntax used by reference MCP

servers_config = {
"mcpServers": {

"testMcpServer": {
"command": "mcp", # be sure to . .venv/bin/activate so that mcp command is found
"args": [
"run",
mcp_server_file
]
}

}
}


# @pytest.mark.asyncio
@pytest.mark.anyio
async def test_mcp():

servers = servers_config.get("mcpServers")

server0 = "testMcpServer"
config0 = servers[server0]

client = MCPClient(
server0,
StdioServerParameters.model_validate(config0)
)
await client.initialize()
tools = await client.get_available_tools()

print(f"TOOLS:{tools}")
mcp_tool = tools[0]

res = await client.call_tool("simple_tool", {"x":5, "y":7})

print(f"RES:{res}")

# clients must be destroyed in reverse order
await client.cleanup()


# @pytest.mark.asyncio
@pytest.mark.anyio
async def test_mcp_with_logging():

servers = servers_config.get("mcpServers")

server0 = "testMcpServer"
config0 = servers[server0]

client = MCPClient(
server0,
StdioServerParameters.model_validate(config0)
)
await client.initialize()
tools = await client.get_available_tools()

print(f"TOOLS:{tools}")
mcp_tool = tools[0]

res = await client.call_tool("simple_tool_with_logging", {"x":5, "y":7})

print(f"RES:{res}")

# clients must be destroyed in reverse order
await client.cleanup()

Loading