Skip to content

Commit 7d3674a

Browse files
Add retry_interval to FastMCP and test_reconnection to everything-server (SEP-1699)
- Add retry_interval parameter to FastMCP for SSE polling control - Add InMemoryEventStore and test_reconnection tool to everything-server - Enables SSE polling conformance test to pass (server-sse-polling scenario)
1 parent fdcd8f5 commit 7d3674a

File tree

6 files changed

+72
-11
lines changed

6 files changed

+72
-11
lines changed

examples/clients/sse-polling-client/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ uv run mcp-sse-polling-client --url http://localhost:3000/mcp --items 20 --check
2424

2525
## Options
2626

27-
- `--url`: Server URL (default: http://localhost:3000/mcp)
27+
- `--url`: Server URL (default: <http://localhost:3000/mcp>)
2828
- `--items`: Number of items to process (default: 10)
2929
- `--checkpoint-every`: Checkpoint interval (default: 3)
3030
- `--log-level`: Logging level (default: DEBUG)

examples/servers/everything-server/mcp_everything_server/server.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@
1414
from mcp.server.fastmcp import Context, FastMCP
1515
from mcp.server.fastmcp.prompts.base import UserMessage
1616
from mcp.server.session import ServerSession
17+
from mcp.server.streamable_http import EventCallback, EventMessage, EventStore
1718
from mcp.types import (
1819
AudioContent,
1920
Completion,
2021
CompletionArgument,
2122
CompletionContext,
2223
EmbeddedResource,
2324
ImageContent,
25+
JSONRPCMessage,
2426
PromptReference,
2527
ResourceTemplateReference,
2628
SamplingMessage,
@@ -31,6 +33,43 @@
3133

3234
logger = logging.getLogger(__name__)
3335

36+
# Type aliases for event store
37+
StreamId = str
38+
EventId = str
39+
40+
41+
class InMemoryEventStore(EventStore):
42+
"""Simple in-memory event store for SSE resumability testing."""
43+
44+
def __init__(self) -> None:
45+
self._events: list[tuple[StreamId, EventId, JSONRPCMessage | None]] = []
46+
self._event_id_counter = 0
47+
48+
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
49+
"""Store an event and return its ID."""
50+
self._event_id_counter += 1
51+
event_id = str(self._event_id_counter)
52+
self._events.append((stream_id, event_id, message))
53+
return event_id
54+
55+
async def replay_events_after(self, last_event_id: EventId, send_callback: EventCallback) -> StreamId | None:
56+
"""Replay events after the specified ID."""
57+
target_stream_id = None
58+
for stream_id, event_id, _ in self._events:
59+
if event_id == last_event_id:
60+
target_stream_id = stream_id
61+
break
62+
if target_stream_id is None:
63+
return None
64+
last_event_id_int = int(last_event_id)
65+
for stream_id, event_id, message in self._events:
66+
if stream_id == target_stream_id and int(event_id) > last_event_id_int:
67+
# Skip priming events (None message)
68+
if message is not None:
69+
await send_callback(EventMessage(message, event_id))
70+
return target_stream_id
71+
72+
3473
# Test data
3574
TEST_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="
3675
TEST_AUDIO_BASE64 = "UklGRiYAAABXQVZFZm10IBAAAAABAAEAQB8AAAB9AAACABAAZGF0YQIAAAA="
@@ -39,8 +78,13 @@
3978
resource_subscriptions: set[str] = set()
4079
watched_resource_content = "Watched resource content"
4180

81+
# Create event store for SSE resumability (SEP-1699)
82+
event_store = InMemoryEventStore()
83+
4284
mcp = FastMCP(
4385
name="mcp-conformance-test-server",
86+
event_store=event_store,
87+
retry_interval=100, # 100ms retry interval for SSE polling
4488
)
4589

4690

@@ -263,6 +307,19 @@ def test_error_handling() -> str:
263307
raise RuntimeError("This tool intentionally returns an error for testing")
264308

265309

310+
@mcp.tool()
311+
async def test_reconnection(ctx: Context[ServerSession, None]) -> str:
312+
"""Tests SSE polling by closing stream mid-call (SEP-1699)"""
313+
await ctx.info("Before disconnect")
314+
315+
await ctx.close_sse_stream()
316+
317+
await asyncio.sleep(0.2) # Wait for client to reconnect
318+
319+
await ctx.info("After reconnect")
320+
return "Reconnection test completed"
321+
322+
266323
# Resources
267324
@mcp.resource("test://static-text")
268325
def static_text_resource() -> str:

examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/event_store.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class EventEntry:
2424

2525
event_id: EventId
2626
stream_id: StreamId
27-
message: JSONRPCMessage
27+
message: JSONRPCMessage | None
2828

2929

3030
class InMemoryEventStore(EventStore):
@@ -48,7 +48,7 @@ def __init__(self, max_events_per_stream: int = 100):
4848
# event_id -> EventEntry for quick lookup
4949
self.event_index: dict[EventId, EventEntry] = {}
5050

51-
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage) -> EventId:
51+
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
5252
"""Stores an event with a generated event ID."""
5353
event_id = str(uuid4())
5454
event_entry = EventEntry(event_id=event_id, stream_id=stream_id, message=message)
@@ -88,7 +88,9 @@ async def replay_events_after(
8888
found_last = False
8989
for event in stream_events:
9090
if found_last:
91-
await send_callback(EventMessage(event.message, event.event_id))
91+
# Skip priming events (None message)
92+
if event.message is not None:
93+
await send_callback(EventMessage(event.message, event.event_id))
9294
elif event.event_id == last_event_id:
9395
found_last = True
9496

src/mcp/client/streamable_http.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,7 @@ async def _handle_reconnection(
403403
logger.info("Reconnected to SSE stream")
404404

405405
# Track for potential further reconnection
406-
reconnect_last_event_id: str | None = last_event_id
406+
reconnect_last_event_id: str = last_event_id
407407
reconnect_retry_ms = retry_interval_ms
408408

409409
async for sse in event_source.aiter_sse():
@@ -423,11 +423,8 @@ async def _handle_reconnection(
423423
return
424424

425425
# Stream ended again without response - reconnect again (reset attempt counter)
426-
if reconnect_last_event_id is not None:
427-
logger.info("SSE stream disconnected, reconnecting...")
428-
await self._handle_reconnection(
429-
ctx, reconnect_last_event_id, reconnect_retry_ms, 0
430-
)
426+
logger.info("SSE stream disconnected, reconnecting...")
427+
await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0)
431428
except Exception as e:
432429
logger.debug(f"Reconnection failed: {e}")
433430
# Try to reconnect again if we still have an event ID

src/mcp/server/fastmcp/server.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ def __init__( # noqa: PLR0913
153153
auth_server_provider: (OAuthAuthorizationServerProvider[Any, Any, Any] | None) = None,
154154
token_verifier: TokenVerifier | None = None,
155155
event_store: EventStore | None = None,
156+
retry_interval: int | None = None,
156157
*,
157158
tools: list[Tool] | None = None,
158159
debug: bool = False,
@@ -221,6 +222,7 @@ def __init__( # noqa: PLR0913
221222
if auth_server_provider and not token_verifier: # pragma: no cover
222223
self._token_verifier = ProviderTokenVerifier(auth_server_provider)
223224
self._event_store = event_store
225+
self._retry_interval = retry_interval
224226
self._custom_starlette_routes: list[Route] = []
225227
self.dependencies = self.settings.dependencies
226228
self._session_manager: StreamableHTTPSessionManager | None = None
@@ -940,6 +942,7 @@ def streamable_http_app(self) -> Starlette:
940942
self._session_manager = StreamableHTTPSessionManager(
941943
app=self._mcp_server,
942944
event_store=self._event_store,
945+
retry_interval=self._retry_interval,
943946
json_response=self.settings.json_response,
944947
stateless=self.settings.stateless_http, # Use the stateless setting
945948
security_settings=self.settings.transport_security,

tests/shared/test_streamable_http.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ async def replay_events_after( # pragma: no cover
110110
# Replay only events from the same stream with ID > last_event_id
111111
for stream_id, event_id, message in self._events:
112112
if stream_id == target_stream_id and int(event_id) > last_event_id_int:
113-
await send_callback(EventMessage(message, event_id))
113+
# Skip priming events (None message)
114+
if message is not None:
115+
await send_callback(EventMessage(message, event_id))
114116

115117
return target_stream_id
116118

0 commit comments

Comments
 (0)