Skip to content

Commit a677cb1

Browse files
committed
makes possible to use StreamableHttpClientTransport in McpClient
1 parent 67b2881 commit a677cb1

File tree

6 files changed

+85
-47
lines changed

6 files changed

+85
-47
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public class McpAsyncClient {
159159
* @param features the MCP Client supported features.
160160
*/
161161
McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
162-
McpClientFeatures.Async features) {
162+
McpClientFeatures.Async features, boolean connectOnInit) {
163163

164164
Assert.notNull(transport, "Transport must not be null");
165165
Assert.notNull(requestTimeout, "Request timeout must not be null");
@@ -234,7 +234,9 @@ public class McpAsyncClient {
234234
asyncLoggingNotificationHandler(loggingConsumersFinal));
235235

236236
this.mcpSession = new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers);
237-
237+
if (connectOnInit) {
238+
this.mcpSession.openSSE();
239+
}
238240
}
239241

240242
/**
@@ -301,6 +303,18 @@ public Mono<Void> closeGracefully() {
301303
return this.mcpSession.closeGracefully();
302304
}
303305

306+
// ---------------------------
307+
// open an SSE stream
308+
// ---------------------------
309+
/**
310+
* The client may issue an HTTP GET to the MCP endpoint. This can be used to open an
311+
* SSE stream, allowing the server to communicate to the client, without the client
312+
* first sending data via HTTP POST.
313+
*/
314+
public void openSSE() {
315+
this.mcpSession.openSSE();
316+
}
317+
304318
// --------------------------
305319
// Initialization
306320
// --------------------------

mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java

+32-5
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,13 @@ class SyncSpec {
157157

158158
private Duration requestTimeout = Duration.ofSeconds(20); // Default timeout
159159

160+
private boolean connectOnInit = true; // Default true, for backward compatibility
161+
160162
private Duration initializationTimeout = Duration.ofSeconds(20);
161163

162164
private ClientCapabilities capabilities;
163165

164-
private Implementation clientInfo = new Implementation("Java SDK MCP Client", "1.0.0");
166+
private Implementation clientInfo = new Implementation("Java SDK MCP Sync Client", "0.10.0");
165167

166168
private final Map<String, Root> roots = new HashMap<>();
167169

@@ -195,6 +197,17 @@ public SyncSpec requestTimeout(Duration requestTimeout) {
195197
return this;
196198
}
197199

200+
/**
201+
* Sets whether to connect to the server during the initialization phase (open an
202+
* SSE stream).
203+
* @param connectOnInit true to open an SSE stream during the initialization
204+
* @return This builder instance for method chaining
205+
*/
206+
public SyncSpec withConnectOnInit(final boolean connectOnInit) {
207+
this.connectOnInit = connectOnInit;
208+
return this;
209+
}
210+
198211
/**
199212
* @param initializationTimeout The duration to wait for the initialization
200213
* lifecycle step to complete.
@@ -368,8 +381,8 @@ public McpSyncClient build() {
368381

369382
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
370383

371-
return new McpSyncClient(
372-
new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures));
384+
return new McpSyncClient(new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout,
385+
asyncFeatures, this.connectOnInit));
373386
}
374387

375388
}
@@ -396,11 +409,13 @@ class AsyncSpec {
396409

397410
private Duration requestTimeout = Duration.ofSeconds(20); // Default timeout
398411

412+
private boolean connectOnInit = true; // Default true, for backward compatibility
413+
399414
private Duration initializationTimeout = Duration.ofSeconds(20);
400415

401416
private ClientCapabilities capabilities;
402417

403-
private Implementation clientInfo = new Implementation("Spring AI MCP Client", "0.3.1");
418+
private Implementation clientInfo = new Implementation("Java SDK MCP Async Client", "0.10.0");
404419

405420
private final Map<String, Root> roots = new HashMap<>();
406421

@@ -434,6 +449,17 @@ public AsyncSpec requestTimeout(Duration requestTimeout) {
434449
return this;
435450
}
436451

452+
/**
453+
* Sets whether to connect to the server during the initialization phase (open an
454+
* SSE stream).
455+
* @param connectOnInit true to open an SSE stream during the initialization
456+
* @return This builder instance for method chaining
457+
*/
458+
public AsyncSpec withConnectOnInit(final boolean connectOnInit) {
459+
this.connectOnInit = connectOnInit;
460+
return this;
461+
}
462+
437463
/**
438464
* @param initializationTimeout The duration to wait for the initialization
439465
* lifecycle step to complete.
@@ -606,7 +632,8 @@ public McpAsyncClient build() {
606632
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
607633
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
608634
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
609-
this.loggingConsumers, this.samplingHandler));
635+
this.loggingConsumers, this.samplingHandler),
636+
this.connectOnInit);
610637
}
611638

612639
}

mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

+16-9
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,18 @@ public boolean closeGracefully() {
128128
return true;
129129
}
130130

131+
// ---------------------------
132+
// open an SSE stream
133+
// ---------------------------
134+
/**
135+
* The client may issue an HTTP GET to the MCP endpoint. This can be used to open an
136+
* SSE stream, allowing the server to communicate to the client, without the client
137+
* first sending data via HTTP POST.
138+
*/
139+
public void openSSE() {
140+
this.delegate.openSSE();
141+
}
142+
131143
/**
132144
* The initialization phase MUST be the first interaction between client and server.
133145
* During this phase, the client and server:
@@ -147,9 +159,7 @@ public boolean closeGracefully() {
147159
* The server MUST respond with its own capabilities and information:
148160
* {@link McpSchema.ServerCapabilities}. <br/>
149161
* After successful initialization, the client MUST send an initialized notification
150-
* to indicate it is ready to begin normal operations.
151-
*
152-
* <br/>
162+
* to indicate it is ready to begin normal operations. <br/>
153163
*
154164
* <a href=
155165
* "https://github.com/modelcontextprotocol/specification/blob/main/docs/specification/basic/lifecycle.md#initialization">Initialization
@@ -271,9 +281,8 @@ public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest r
271281

272282
/**
273283
* Resource templates allow servers to expose parameterized resources using URI
274-
* templates. Arguments may be auto-completed through the completion API.
275-
*
276-
* Request a list of resource templates the server has.
284+
* templates. Arguments may be auto-completed through the completion API. Request a
285+
* list of resource templates the server has.
277286
* @param cursor the cursor
278287
* @return the list of resource templates result.
279288
*/
@@ -292,9 +301,7 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
292301
/**
293302
* Subscriptions. The protocol supports optional subscriptions to resource changes.
294303
* Clients can subscribe to specific resources and receive notifications when they
295-
* change.
296-
*
297-
* Send a resources/subscribe request.
304+
* change. Send a resources/subscribe request.
298305
* @param subscribeRequest the subscribe request contains the uri of the resource to
299306
* subscribe to.
300307
*/

mcp/src/main/java/io/modelcontextprotocol/client/transport/StreamableHttpClientTransport.java

+3-28
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,6 @@ public class StreamableHttpClientTransport implements McpClientTransport {
7272

7373
private final URI uri;
7474

75-
private final AtomicReference<TransportState> state = new AtomicReference<>(TransportState.DISCONNECTED);
76-
7775
private final AtomicReference<String> lastEventId = new AtomicReference<>();
7876

7977
private final AtomicReference<String> mcpSessionId = new AtomicReference<>();
@@ -99,15 +97,6 @@ public static Builder builder(final String uri) {
9997
return new Builder().withBaseUri(uri);
10098
}
10199

102-
/**
103-
* The state of the Transport connection.
104-
*/
105-
public enum TransportState {
106-
107-
DISCONNECTED, CONNECTING, CONNECTED, CLOSED
108-
109-
}
110-
111100
/**
112101
* A builder for creating instances of WebSocketClientTransport.
113102
*/
@@ -188,10 +177,6 @@ public Mono<Void> connect(final Function<Mono<McpSchema.JSONRPCMessage>, Mono<Mc
188177
return sseClientTransport.connect(handler);
189178
}
190179

191-
if (!state.compareAndSet(TransportState.DISCONNECTED, TransportState.CONNECTING)) {
192-
return Mono.error(new IllegalStateException("Already connected or connecting"));
193-
}
194-
195180
return Mono.defer(() -> Mono.fromFuture(() -> {
196181
final HttpRequest.Builder request = requestBuilder.copy().GET().header(ACCEPT, TEXT_EVENT_STREAM).uri(uri);
197182
final String lastId = lastEventId.get();
@@ -219,13 +204,10 @@ public Mono<Void> connect(final Function<Mono<McpSchema.JSONRPCMessage>, Mono<Mc
219204
return handleStreamingResponse(response, handler);
220205
})
221206
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(err -> err instanceof IllegalStateException))
222-
.doOnSuccess(v -> state.set(TransportState.CONNECTED))
223-
.doOnTerminate(() -> state.set(TransportState.CLOSED))
224207
.onErrorResume(e -> {
225208
LOGGER.error("Streamable transport connection error", e);
226-
state.set(TransportState.DISCONNECTED);
227209
return Mono.error(e);
228-
}));
210+
})).doOnTerminate(this::closeGracefully);
229211
}
230212

231213
@Override
@@ -239,10 +221,6 @@ public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message,
239221
return fallbackToSse(message);
240222
}
241223

242-
if (state.get() == TransportState.CLOSED) {
243-
return Mono.empty();
244-
}
245-
246224
return serializeJson(message).flatMap(json -> {
247225
final HttpRequest.Builder request = requestBuilder.copy()
248226
.POST(HttpRequest.BodyPublishers.ofString(json))
@@ -427,7 +405,8 @@ else if (node.isObject()) {
427405

428406
@Override
429407
public Mono<Void> closeGracefully() {
430-
state.set(TransportState.CLOSED);
408+
mcpSessionId.set(null);
409+
lastEventId.set(null);
431410
if (fallbackToSse.get()) {
432411
return sseClientTransport.closeGracefully();
433412
}
@@ -439,8 +418,4 @@ public <T> T unmarshalFrom(final Object data, final TypeReference<T> typeRef) {
439418
return objectMapper.convertValue(data, typeRef);
440419
}
441420

442-
public TransportState getState() {
443-
return state.get();
444-
}
445-
446421
}

mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class McpClientSession implements McpSession {
6161
/** Atomic counter for generating unique request IDs */
6262
private final AtomicLong requestCounter = new AtomicLong(0);
6363

64-
private final Disposable connection;
64+
private Disposable connection;
6565

6666
/**
6767
* Functional interface for handling incoming JSON-RPC requests. Implementations
@@ -116,6 +116,17 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
116116
this.transport = transport;
117117
this.requestHandlers.putAll(requestHandlers);
118118
this.notificationHandlers.putAll(notificationHandlers);
119+
}
120+
121+
/**
122+
* The client may issue an HTTP GET to the MCP endpoint. This can be used to open an
123+
* SSE stream, allowing the server to communicate to the client, without the client
124+
* first sending data via HTTP POST.
125+
*/
126+
public void openSSE() {
127+
if (this.connection != null && !this.connection.isDisposed()) {
128+
return; // already connected and still active
129+
}
119130

120131
// TODO: consider mono.transformDeferredContextual where the Context contains
121132
// the
@@ -281,7 +292,9 @@ public Mono<Void> closeGracefully() {
281292
*/
282293
@Override
283294
public void close() {
284-
this.connection.dispose();
295+
if (this.connection != null) {
296+
this.connection.dispose();
297+
}
285298
transport.close();
286299
}
287300

mcp/src/test/java/io/modelcontextprotocol/spec/McpClientSessionTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ void setUp() {
4848
transport = new MockMcpClientTransport();
4949
session = new McpClientSession(TIMEOUT, transport, Map.of(),
5050
Map.of(TEST_NOTIFICATION, params -> Mono.fromRunnable(() -> logger.info("Status update: " + params))));
51+
session.openSSE();
5152
}
5253

5354
@AfterEach
@@ -141,6 +142,7 @@ void testRequestHandling() {
141142
params -> Mono.just(params));
142143
transport = new MockMcpClientTransport();
143144
session = new McpClientSession(TIMEOUT, transport, requestHandlers, Map.of());
145+
session.openSSE();
144146

145147
// Simulate incoming request
146148
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, ECHO_METHOD,
@@ -162,7 +164,7 @@ void testNotificationHandling() {
162164
transport = new MockMcpClientTransport();
163165
session = new McpClientSession(TIMEOUT, transport, Map.of(),
164166
Map.of(TEST_NOTIFICATION, params -> Mono.fromRunnable(() -> receivedParams.tryEmitValue(params))));
165-
167+
session.openSSE();
166168
// Simulate incoming notification from the server
167169
Map<String, Object> notificationParams = Map.of("status", "ready");
168170

0 commit comments

Comments
 (0)