Skip to content

feat(client): adds StreamableHttpClientTransport #144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public class McpAsyncClient {
* @param features the MCP Client supported features.
*/
McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
McpClientFeatures.Async features) {
McpClientFeatures.Async features, boolean connectOnInit) {

Assert.notNull(transport, "Transport must not be null");
Assert.notNull(requestTimeout, "Request timeout must not be null");
Expand Down Expand Up @@ -235,7 +235,9 @@ public class McpAsyncClient {
asyncLoggingNotificationHandler(loggingConsumersFinal));

this.mcpSession = new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers);

if (connectOnInit) {
this.mcpSession.openSSE();
}
}

/**
Expand Down Expand Up @@ -302,6 +304,18 @@ public Mono<Void> closeGracefully() {
return this.mcpSession.closeGracefully();
}

// ---------------------------
// open an SSE stream
// ---------------------------
/**
* The client may issue an HTTP GET to the MCP endpoint. This can be used to open an
* SSE stream, allowing the server to communicate to the client, without the client
* first sending data via HTTP POST.
*/
public void openSSE() {
this.mcpSession.openSSE();
}

// --------------------------
// Initialization
// --------------------------
Expand Down
37 changes: 32 additions & 5 deletions mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,13 @@ class SyncSpec {

private Duration requestTimeout = Duration.ofSeconds(20); // Default timeout

private boolean connectOnInit = true; // Default true, for backward compatibility

private Duration initializationTimeout = Duration.ofSeconds(20);

private ClientCapabilities capabilities;

private Implementation clientInfo = new Implementation("Java SDK MCP Client", "1.0.0");
private Implementation clientInfo = new Implementation("Java SDK MCP Sync Client", "0.10.0");

private final Map<String, Root> roots = new HashMap<>();

Expand Down Expand Up @@ -195,6 +197,17 @@ public SyncSpec requestTimeout(Duration requestTimeout) {
return this;
}

/**
* Sets whether to connect to the server during the initialization phase (open an
* SSE stream).
* @param connectOnInit true to open an SSE stream during the initialization
* @return This builder instance for method chaining
*/
public SyncSpec withConnectOnInit(final boolean connectOnInit) {
this.connectOnInit = connectOnInit;
return this;
}

/**
* @param initializationTimeout The duration to wait for the initialization
* lifecycle step to complete.
Expand Down Expand Up @@ -368,8 +381,8 @@ public McpSyncClient build() {

McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);

return new McpSyncClient(
new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures));
return new McpSyncClient(new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout,
asyncFeatures, this.connectOnInit));
}

}
Expand All @@ -396,11 +409,13 @@ class AsyncSpec {

private Duration requestTimeout = Duration.ofSeconds(20); // Default timeout

private boolean connectOnInit = true; // Default true, for backward compatibility

private Duration initializationTimeout = Duration.ofSeconds(20);

private ClientCapabilities capabilities;

private Implementation clientInfo = new Implementation("Spring AI MCP Client", "0.3.1");
private Implementation clientInfo = new Implementation("Java SDK MCP Async Client", "0.10.0");

private final Map<String, Root> roots = new HashMap<>();

Expand Down Expand Up @@ -434,6 +449,17 @@ public AsyncSpec requestTimeout(Duration requestTimeout) {
return this;
}

/**
* Sets whether to connect to the server during the initialization phase (open an
* SSE stream).
* @param connectOnInit true to open an SSE stream during the initialization
* @return This builder instance for method chaining
*/
public AsyncSpec withConnectOnInit(final boolean connectOnInit) {
this.connectOnInit = connectOnInit;
return this;
}

/**
* @param initializationTimeout The duration to wait for the initialization
* lifecycle step to complete.
Expand Down Expand Up @@ -606,7 +632,8 @@ public McpAsyncClient build() {
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
this.loggingConsumers, this.samplingHandler));
this.loggingConsumers, this.samplingHandler),
this.connectOnInit);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ public boolean closeGracefully() {
return true;
}

// ---------------------------
// open an SSE stream
// ---------------------------
/**
* The client may issue an HTTP GET to the MCP endpoint. This can be used to open an
* SSE stream, allowing the server to communicate to the client, without the client
* first sending data via HTTP POST.
*/
public void openSSE() {
this.delegate.openSSE();
}

/**
* The initialization phase MUST be the first interaction between client and server.
* During this phase, the client and server:
Expand All @@ -148,9 +160,7 @@ public boolean closeGracefully() {
* The server MUST respond with its own capabilities and information:
* {@link McpSchema.ServerCapabilities}. <br/>
* After successful initialization, the client MUST send an initialized notification
* to indicate it is ready to begin normal operations.
*
* <br/>
* to indicate it is ready to begin normal operations. <br/>
*
* <a href=
* "https://github.com/modelcontextprotocol/specification/blob/main/docs/specification/basic/lifecycle.md#initialization">Initialization
Expand Down Expand Up @@ -272,9 +282,8 @@ public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest r

/**
* Resource templates allow servers to expose parameterized resources using URI
* templates. Arguments may be auto-completed through the completion API.
*
* Request a list of resource templates the server has.
* templates. Arguments may be auto-completed through the completion API. Request a
* list of resource templates the server has.
* @param cursor the cursor
* @return the list of resource templates result.
*/
Expand All @@ -293,9 +302,7 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
/**
* Subscriptions. The protocol supports optional subscriptions to resource changes.
* Clients can subscribe to specific resources and receive notifications when they
* change.
*
* Send a resources/subscribe request.
* change. Send a resources/subscribe request.
* @param subscribeRequest the subscribe request contains the uri of the resource to
* subscribe to.
*/
Expand Down
Loading