Skip to content

Commit 2c231df

Browse files
committed
Merge branch 'refs/heads/main' into fix/non-blocking-context
# Conflicts: # mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java # mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseIntegrationTests.java # mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java # mcp/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java # mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java # mcp/src/test/java/io/modelcontextprotocol/client/HttpSseMcpAsyncClientTests.java # mcp/src/test/java/io/modelcontextprotocol/client/HttpSseMcpSyncClientTests.java # mcp/src/test/java/io/modelcontextprotocol/client/StdioMcpAsyncClientTests.java # mcp/src/test/java/io/modelcontextprotocol/client/StdioMcpSyncClientTests.java # mcp/src/test/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransportTests.java # mcp/src/test/java/io/modelcontextprotocol/spec/McpClientSessionTests.java
2 parents 5efe8e6 + fab434c commit 2c231df

26 files changed

+988
-179
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java

+61-20
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,16 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
8282
*/
8383
public static final String DEFAULT_SSE_ENDPOINT = "/sse";
8484

85+
public static final String DEFAULT_BASE_URL = "";
86+
8587
private final ObjectMapper objectMapper;
8688

89+
/**
90+
* Base URL for the message endpoint. This is used to construct the full URL for
91+
* clients to send their JSON-RPC messages.
92+
*/
93+
private final String baseUrl;
94+
8795
private final String messageEndpoint;
8896

8997
private final String sseEndpoint;
@@ -102,6 +110,20 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
102110
*/
103111
private volatile boolean isClosing = false;
104112

113+
/**
114+
* Constructs a new WebFlux SSE server transport provider instance with the default
115+
* SSE endpoint.
116+
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
117+
* of MCP messages. Must not be null.
118+
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
119+
* messages. This endpoint will be communicated to clients during SSE connection
120+
* setup. Must not be null.
121+
* @throws IllegalArgumentException if either parameter is null
122+
*/
123+
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
124+
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
125+
}
126+
105127
/**
106128
* Constructs a new WebFlux SSE server transport provider instance.
107129
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
@@ -112,11 +134,28 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
112134
* @throws IllegalArgumentException if either parameter is null
113135
*/
114136
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
137+
this(objectMapper, DEFAULT_BASE_URL, messageEndpoint, sseEndpoint);
138+
}
139+
140+
/**
141+
* Constructs a new WebFlux SSE server transport provider instance.
142+
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
143+
* of MCP messages. Must not be null.
144+
* @param baseUrl webflux messag base path
145+
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
146+
* messages. This endpoint will be communicated to clients during SSE connection
147+
* setup. Must not be null.
148+
* @throws IllegalArgumentException if either parameter is null
149+
*/
150+
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
151+
String sseEndpoint) {
115152
Assert.notNull(objectMapper, "ObjectMapper must not be null");
153+
Assert.notNull(baseUrl, "Message base path must not be null");
116154
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
117155
Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
118156

119157
this.objectMapper = objectMapper;
158+
this.baseUrl = baseUrl;
120159
this.messageEndpoint = messageEndpoint;
121160
this.sseEndpoint = sseEndpoint;
122161
this.routerFunction = RouterFunctions.route()
@@ -125,20 +164,6 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
125164
.build();
126165
}
127166

128-
/**
129-
* Constructs a new WebFlux SSE server transport provider instance with the default
130-
* SSE endpoint.
131-
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
132-
* of MCP messages. Must not be null.
133-
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
134-
* messages. This endpoint will be communicated to clients during SSE connection
135-
* setup. Must not be null.
136-
* @throws IllegalArgumentException if either parameter is null
137-
*/
138-
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
139-
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
140-
}
141-
142167
@Override
143168
public void setSessionFactory(McpServerSession.Factory sessionFactory) {
144169
this.sessionFactory = sessionFactory;
@@ -171,15 +196,16 @@ public Mono<Void> notifyClients(String method, Map<String, Object> params) {
171196

172197
logger.debug("Attempting to broadcast message to {} active sessions", sessions.size());
173198

174-
return Flux.fromStream(sessions.values().stream())
199+
return Flux.fromIterable(sessions.values())
175200
.flatMap(session -> session.sendNotification(method, params)
176-
.doOnError(e -> logger.error("Failed to " + "send message to session " + "{}: {}", session.getId(),
177-
e.getMessage()))
201+
.doOnError(
202+
e -> logger.error("Failed to send message to session {}: {}", session.getId(), e.getMessage()))
178203
.onErrorComplete())
179204
.then();
180205
}
181206

182-
// FIXME: This javadoc makes claims about using isClosing flag but it's not actually
207+
// FIXME: This javadoc makes claims about using isClosing flag but it's not
208+
// actually
183209
// doing that.
184210
/**
185211
* Initiates a graceful shutdown of all the sessions. This method ensures all active
@@ -245,7 +271,7 @@ private Mono<ServerResponse> handleSseConnection(ServerRequest request) {
245271
logger.debug("Sending initial endpoint event to session: {}", sessionId);
246272
sink.next(ServerSentEvent.builder()
247273
.event(ENDPOINT_EVENT_TYPE)
248-
.data(messageEndpoint + "?sessionId=" + sessionId)
274+
.data(this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId)
249275
.build());
250276
sink.onCancel(() -> {
251277
logger.debug("Session {} cancelled", sessionId);
@@ -360,6 +386,8 @@ public static class Builder {
360386

361387
private ObjectMapper objectMapper;
362388

389+
private String baseUrl = DEFAULT_BASE_URL;
390+
363391
private String messageEndpoint;
364392

365393
private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
@@ -377,6 +405,19 @@ public Builder objectMapper(ObjectMapper objectMapper) {
377405
return this;
378406
}
379407

408+
/**
409+
* Sets the project basePath as endpoint prefix where clients should send their
410+
* JSON-RPC messages
411+
* @param baseUrl the message basePath . Must not be null.
412+
* @return this builder instance
413+
* @throws IllegalArgumentException if basePath is null
414+
*/
415+
public Builder basePath(String baseUrl) {
416+
Assert.notNull(baseUrl, "basePath must not be null");
417+
this.baseUrl = baseUrl;
418+
return this;
419+
}
420+
380421
/**
381422
* Sets the endpoint URI where clients should send their JSON-RPC messages.
382423
* @param messageEndpoint The message endpoint URI. Must not be null.
@@ -411,7 +452,7 @@ public WebFluxSseServerTransportProvider build() {
411452
Assert.notNull(objectMapper, "ObjectMapper must be set");
412453
Assert.notNull(messageEndpoint, "Message endpoint must be set");
413454

414-
return new WebFluxSseServerTransportProvider(objectMapper, messageEndpoint, sseEndpoint);
455+
return new WebFluxSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint);
415456
}
416457

417458
}

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java

+14-16
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ public class WebFluxSseIntegrationTests {
5252

5353
private static final int PORT = 8182;
5454

55-
// private static final String MESSAGE_ENDPOINT = "/mcp/message";
56-
5755
private static final String CUSTOM_SSE_ENDPOINT = "/somePath/sse";
5856

5957
private static final String CUSTOM_MESSAGE_ENDPOINT = "/otherPath/mcp/message";
@@ -62,7 +60,7 @@ public class WebFluxSseIntegrationTests {
6260

6361
private WebFluxSseServerTransportProvider mcpServerTransportProvider;
6462

65-
ConcurrentHashMap<String, McpClient.SyncSpec> clientBulders = new ConcurrentHashMap<>();
63+
ConcurrentHashMap<String, McpClient.SyncSpec> clientBuilders = new ConcurrentHashMap<>();
6664

6765
@BeforeEach
6866
public void before() {
@@ -103,7 +101,7 @@ public void after() {
103101
@ValueSource(strings = { "httpclient", "webflux" })
104102
void testCreateMessageWithoutSamplingCapabilities(String clientType) {
105103

106-
var clientBuilder = clientBulders.get(clientType);
104+
var clientBuilder = clientBuilders.get(clientType);
107105

108106
McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
109107
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
@@ -134,7 +132,7 @@ void testCreateMessageWithoutSamplingCapabilities(String clientType) {
134132
void testCreateMessageSuccess(String clientType) throws InterruptedException {
135133

136134
// Client
137-
var clientBuilder = clientBulders.get(clientType);
135+
var clientBuilder = clientBuilders.get(clientType);
138136

139137
Function<CreateMessageRequest, CreateMessageResult> samplingHandler = request -> {
140138
assertThat(request.messages()).hasSize(1);
@@ -203,7 +201,7 @@ void testCreateMessageSuccess(String clientType) throws InterruptedException {
203201
@ParameterizedTest(name = "{0} : {displayName} ")
204202
@ValueSource(strings = { "httpclient", "webflux" })
205203
void testRootsSuccess(String clientType) {
206-
var clientBuilder = clientBulders.get(clientType);
204+
var clientBuilder = clientBuilders.get(clientType);
207205

208206
List<Root> roots = List.of(new Root("uri1://", "root1"), new Root("uri2://", "root2"));
209207

@@ -250,7 +248,7 @@ void testRootsSuccess(String clientType) {
250248
@ValueSource(strings = { "httpclient", "webflux" })
251249
void testRootsWithoutCapability(String clientType) {
252250

253-
var clientBuilder = clientBulders.get(clientType);
251+
var clientBuilder = clientBuilders.get(clientType);
254252

255253
McpServerFeatures.SyncToolSpecification tool = new McpServerFeatures.SyncToolSpecification(
256254
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
@@ -284,7 +282,7 @@ void testRootsWithoutCapability(String clientType) {
284282
@ParameterizedTest(name = "{0} : {displayName} ")
285283
@ValueSource(strings = { "httpclient", "webflux" })
286284
void testRootsNotifciationWithEmptyRootsList(String clientType) {
287-
var clientBuilder = clientBulders.get(clientType);
285+
var clientBuilder = clientBuilders.get(clientType);
288286

289287
AtomicReference<List<Root>> rootsRef = new AtomicReference<>();
290288
var mcpServer = McpServer.sync(mcpServerTransportProvider)
@@ -311,7 +309,7 @@ void testRootsNotifciationWithEmptyRootsList(String clientType) {
311309
@ParameterizedTest(name = "{0} : {displayName} ")
312310
@ValueSource(strings = { "httpclient", "webflux" })
313311
void testRootsWithMultipleHandlers(String clientType) {
314-
var clientBuilder = clientBulders.get(clientType);
312+
var clientBuilder = clientBuilders.get(clientType);
315313

316314
List<Root> roots = List.of(new Root("uri1://", "root1"));
317315

@@ -345,7 +343,7 @@ void testRootsWithMultipleHandlers(String clientType) {
345343
@ValueSource(strings = { "httpclient", "webflux" })
346344
void testRootsServerCloseWithActiveSubscription(String clientType) {
347345

348-
var clientBuilder = clientBulders.get(clientType);
346+
var clientBuilder = clientBuilders.get(clientType);
349347

350348
List<Root> roots = List.of(new Root("uri1://", "root1"));
351349

@@ -390,15 +388,15 @@ void testRootsServerCloseWithActiveSubscription(String clientType) {
390388
@ValueSource(strings = { "httpclient", "webflux" })
391389
void testToolCallSuccess(String clientType) {
392390

393-
var clientBuilder = clientBulders.get(clientType);
391+
var clientBuilder = clientBuilders.get(clientType);
394392

395393
var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null);
396394
McpServerFeatures.SyncToolSpecification tool1 = new McpServerFeatures.SyncToolSpecification(
397395
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
398396
// perform a blocking call to a remote service
399397
String response = RestClient.create()
400398
.get()
401-
.uri("https://github.com/modelcontextprotocol/specification/blob/main/README.md")
399+
.uri("https://raw.githubusercontent.com/modelcontextprotocol/java-sdk/refs/heads/main/README.md")
402400
.retrieve()
403401
.body(String.class);
404402
assertThat(response).isNotBlank();
@@ -430,15 +428,15 @@ void testToolCallSuccess(String clientType) {
430428
@ValueSource(strings = { "httpclient", "webflux" })
431429
void testToolListChangeHandlingSuccess(String clientType) {
432430

433-
var clientBuilder = clientBulders.get(clientType);
431+
var clientBuilder = clientBuilders.get(clientType);
434432

435433
var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null);
436434
McpServerFeatures.SyncToolSpecification tool1 = new McpServerFeatures.SyncToolSpecification(
437435
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
438436
// perform a blocking call to a remote service
439437
String response = RestClient.create()
440438
.get()
441-
.uri("https://github.com/modelcontextprotocol/specification/blob/main/README.md")
439+
.uri("https://raw.githubusercontent.com/modelcontextprotocol/java-sdk/refs/heads/main/README.md")
442440
.retrieve()
443441
.body(String.class);
444442
assertThat(response).isNotBlank();
@@ -455,7 +453,7 @@ void testToolListChangeHandlingSuccess(String clientType) {
455453
// perform a blocking call to a remote service
456454
String response = RestClient.create()
457455
.get()
458-
.uri("https://github.com/modelcontextprotocol/specification/blob/main/README.md")
456+
.uri("https://raw.githubusercontent.com/modelcontextprotocol/java-sdk/refs/heads/main/README.md")
459457
.retrieve()
460458
.body(String.class);
461459
assertThat(response).isNotBlank();
@@ -500,7 +498,7 @@ void testToolListChangeHandlingSuccess(String clientType) {
500498
@ValueSource(strings = { "httpclient", "webflux" })
501499
void testInitialize(String clientType) {
502500

503-
var clientBuilder = clientBulders.get(clientType);
501+
var clientBuilder = clientBuilders.get(clientType);
504502

505503
var mcpServer = McpServer.sync(mcpServerTransportProvider).build();
506504

mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java

+36-15
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public class WebMvcSseServerTransportProvider implements McpServerTransportProvi
9191

9292
private final String sseEndpoint;
9393

94+
private final String baseUrl;
95+
9496
private final RouterFunction<ServerResponse> routerFunction;
9597

9698
private McpServerSession.Factory sessionFactory;
@@ -105,6 +107,20 @@ public class WebMvcSseServerTransportProvider implements McpServerTransportProvi
105107
*/
106108
private volatile boolean isClosing = false;
107109

110+
/**
111+
* Constructs a new WebMvcSseServerTransportProvider instance with the default SSE
112+
* endpoint.
113+
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
114+
* of messages.
115+
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
116+
* messages via HTTP POST. This endpoint will be communicated to clients through the
117+
* SSE connection's initial endpoint event.
118+
* @throws IllegalArgumentException if either objectMapper or messageEndpoint is null
119+
*/
120+
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
121+
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
122+
}
123+
108124
/**
109125
* Constructs a new WebMvcSseServerTransportProvider instance.
110126
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
@@ -116,11 +132,30 @@ public class WebMvcSseServerTransportProvider implements McpServerTransportProvi
116132
* @throws IllegalArgumentException if any parameter is null
117133
*/
118134
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
135+
this(objectMapper, "", messageEndpoint, sseEndpoint);
136+
}
137+
138+
/**
139+
* Constructs a new WebMvcSseServerTransportProvider instance.
140+
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
141+
* of messages.
142+
* @param baseUrl The base URL for the message endpoint, used to construct the full
143+
* endpoint URL for clients.
144+
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
145+
* messages via HTTP POST. This endpoint will be communicated to clients through the
146+
* SSE connection's initial endpoint event.
147+
* @param sseEndpoint The endpoint URI where clients establish their SSE connections.
148+
* @throws IllegalArgumentException if any parameter is null
149+
*/
150+
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
151+
String sseEndpoint) {
119152
Assert.notNull(objectMapper, "ObjectMapper must not be null");
153+
Assert.notNull(baseUrl, "Message base URL must not be null");
120154
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
121155
Assert.notNull(sseEndpoint, "SSE endpoint must not be null");
122156

123157
this.objectMapper = objectMapper;
158+
this.baseUrl = baseUrl;
124159
this.messageEndpoint = messageEndpoint;
125160
this.sseEndpoint = sseEndpoint;
126161
this.routerFunction = RouterFunctions.route()
@@ -129,20 +164,6 @@ public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messag
129164
.build();
130165
}
131166

132-
/**
133-
* Constructs a new WebMvcSseServerTransportProvider instance with the default SSE
134-
* endpoint.
135-
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
136-
* of messages.
137-
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
138-
* messages via HTTP POST. This endpoint will be communicated to clients through the
139-
* SSE connection's initial endpoint event.
140-
* @throws IllegalArgumentException if either objectMapper or messageEndpoint is null
141-
*/
142-
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
143-
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
144-
}
145-
146167
@Override
147168
public void setSessionFactory(McpServerSession.Factory sessionFactory) {
148169
this.sessionFactory = sessionFactory;
@@ -248,7 +269,7 @@ private ServerResponse handleSseConnection(ServerRequest request) {
248269
try {
249270
sseBuilder.id(sessionId)
250271
.event(ENDPOINT_EVENT_TYPE)
251-
.data(messageEndpoint + "?sessionId=" + sessionId);
272+
.data(this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId);
252273
}
253274
catch (Exception e) {
254275
logger.error("Failed to send initial endpoint event: {}", e.getMessage());

0 commit comments

Comments
 (0)