From e5167cbe2004e12cedbcaa3d2ba444f13ef80273 Mon Sep 17 00:00:00 2001 From: Cliff Hall Date: Tue, 1 Jul 2025 13:20:28 -0400 Subject: [PATCH 1/7] Update streamableHttp custom fetch test --- src/client/sse.test.ts | 30 ++++++++++++++++++++++++++++++ src/client/sse.ts | 13 +++++++++++-- src/client/streamableHttp.test.ts | 29 +++++++++++++++++++++++++++++ src/client/streamableHttp.ts | 15 ++++++++++++--- 4 files changed, 82 insertions(+), 5 deletions(-) diff --git a/src/client/sse.test.ts b/src/client/sse.test.ts index 3cb4e8a3..c446b9e9 100644 --- a/src/client/sse.test.ts +++ b/src/client/sse.test.ts @@ -262,6 +262,36 @@ describe("SSEClientTransport", () => { expect(lastServerRequest.headers.authorization).toBe(authToken); }); + it("uses custom fetch implementation from options", async () => { + const authToken = "Bearer custom-token"; + + const fetchWithAuth = jest.fn((url: string | URL, init?: RequestInit) => { + const headers = new Headers(init?.headers); + headers.set("Authorization", authToken); + return fetch(url.toString(), { ...init, headers }); + }); + + transport = new SSEClientTransport(resourceBaseUrl, { + fetch: fetchWithAuth, + }); + + await transport.start(); + + expect(lastServerRequest.headers.authorization).toBe(authToken); + + // Send a message to verify fetchWithAuth used for POST as well + const message: JSONRPCMessage = { + jsonrpc: "2.0", + id: "1", + method: "test", + params: {}, + }; + + await transport.send(message); + + expect(fetchWithAuth).toHaveBeenCalled(); + }); + it("passes custom headers to fetch requests", async () => { const customHeaders = { Authorization: "Bearer test-token", diff --git a/src/client/sse.ts b/src/client/sse.ts index 2546d508..5ca724b1 100644 --- a/src/client/sse.ts +++ b/src/client/sse.ts @@ -3,6 +3,8 @@ import { Transport } from "../shared/transport.js"; import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, extractResourceMetadataUrl, OAuthClientProvider, UnauthorizedError } from "./auth.js"; +export type FetchLike = (url: string | URL, init?: RequestInit) => Promise; + export class SseError extends Error { constructor( public readonly code: number | undefined, @@ -47,6 +49,11 @@ export type SSEClientTransportOptions = { * Customizes recurring POST requests to the server. */ requestInit?: RequestInit; + + /** + * Custom fetch implementation used for all network requests. + */ + fetch?: FetchLike; }; /** @@ -62,6 +69,7 @@ export class SSEClientTransport implements Transport { private _eventSourceInit?: EventSourceInit; private _requestInit?: RequestInit; private _authProvider?: OAuthClientProvider; + private _fetch?: FetchLike; private _protocolVersion?: string; onclose?: () => void; @@ -77,6 +85,7 @@ export class SSEClientTransport implements Transport { this._eventSourceInit = opts?.eventSourceInit; this._requestInit = opts?.requestInit; this._authProvider = opts?.authProvider; + this._fetch = opts?.fetch; } private async _authThenStart(): Promise { @@ -117,7 +126,7 @@ export class SSEClientTransport implements Transport { } private _startOrAuth(): Promise { - const fetchImpl = (this?._eventSourceInit?.fetch || fetch) as typeof fetch + const fetchImpl = (this?._eventSourceInit?.fetch || this._fetch || fetch) as typeof fetch return new Promise((resolve, reject) => { this._eventSource = new EventSource( this._url.href, @@ -242,7 +251,7 @@ export class SSEClientTransport implements Transport { signal: this._abortController?.signal, }; - const response = await fetch(this._endpoint, init); + const response = await (this._fetch ?? fetch)(this._endpoint, init); if (!response.ok) { if (response.status === 401 && this._authProvider) { diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 11dfe7d4..be70b8b4 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -443,6 +443,35 @@ describe("StreamableHTTPClientTransport", () => { expect(errorSpy).toHaveBeenCalled(); }); + it("uses custom fetch implementation", async () => { + const authToken = "Bearer custom-token"; + + const fetchWithAuth = jest.fn((url: string | URL, init?: RequestInit) => { + const headers = new Headers(init?.headers); + headers.set("Authorization", authToken); + return (global.fetch as jest.Mock)(url, { ...init, headers }); + }); + + (global.fetch as jest.Mock) + .mockResolvedValueOnce( + new Response(null, { status: 200, headers: { "content-type": "text/event-stream" } }) + ) + .mockResolvedValueOnce(new Response(null, { status: 202 })); + + transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), { fetch: fetchWithAuth }); + + await transport.start(); + await (transport as unknown as { _startOrAuthSse: (opts: any) => Promise })._startOrAuthSse({}); + + await transport.send({ jsonrpc: "2.0", method: "test", params: {}, id: "1" } as JSONRPCMessage); + + expect(fetchWithAuth).toHaveBeenCalled(); + for (const call of (global.fetch as jest.Mock).mock.calls) { + const headers = call[1].headers as Headers; + expect(headers.get("Authorization")).toBe(authToken); + } + }); + it("should always send specified custom headers", async () => { const requestInit = { diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 73078442..b73f67b9 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -3,6 +3,8 @@ import { isInitializedNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPC import { auth, AuthResult, extractResourceMetadataUrl, OAuthClientProvider, UnauthorizedError } from "./auth.js"; import { EventSourceParserStream } from "eventsource-parser/stream"; +export type FetchLike = (url: string | URL, init?: RequestInit) => Promise; + // Default reconnection options for StreamableHTTP connections const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOptions = { initialReconnectionDelay: 1000, @@ -99,6 +101,11 @@ export type StreamableHTTPClientTransportOptions = { */ requestInit?: RequestInit; + /** + * Custom fetch implementation used for all network requests. + */ + fetch?: FetchLike; + /** * Options to configure the reconnection behavior. */ @@ -122,6 +129,7 @@ export class StreamableHTTPClientTransport implements Transport { private _resourceMetadataUrl?: URL; private _requestInit?: RequestInit; private _authProvider?: OAuthClientProvider; + private _fetch?: FetchLike; private _sessionId?: string; private _reconnectionOptions: StreamableHTTPReconnectionOptions; private _protocolVersion?: string; @@ -138,6 +146,7 @@ export class StreamableHTTPClientTransport implements Transport { this._resourceMetadataUrl = undefined; this._requestInit = opts?.requestInit; this._authProvider = opts?.authProvider; + this._fetch = opts?.fetch; this._sessionId = opts?.sessionId; this._reconnectionOptions = opts?.reconnectionOptions ?? DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS; } @@ -200,7 +209,7 @@ export class StreamableHTTPClientTransport implements Transport { headers.set("last-event-id", resumptionToken); } - const response = await fetch(this._url, { + const response = await (this._fetch ?? fetch)(this._url, { method: "GET", headers, signal: this._abortController?.signal, @@ -414,7 +423,7 @@ export class StreamableHTTPClientTransport implements Transport { signal: this._abortController?.signal, }; - const response = await fetch(this._url, init); + const response = await (this._fetch ?? fetch)(this._url, init); // Handle session ID received during initialization const sessionId = response.headers.get("mcp-session-id"); @@ -520,7 +529,7 @@ export class StreamableHTTPClientTransport implements Transport { signal: this._abortController?.signal, }; - const response = await fetch(this._url, init); + const response = await (this._fetch ?? fetch)(this._url, init); // We specifically handle 405 as a valid response according to the spec, // meaning the server does not support explicit session termination From 3300a43742baebc60f4b715628f49b298e33af69 Mon Sep 17 00:00:00 2001 From: Cliff Hall Date: Tue, 1 Jul 2025 13:33:31 -0400 Subject: [PATCH 2/7] Update src/client/sse.ts nullish coalescing operator ?? instead of the logical OR operator || to handle the case where this?._eventSourceInit?.fetch or this._fetch might be null or undefined but not falsy Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/client/sse.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/sse.ts b/src/client/sse.ts index 5ca724b1..808aa24b 100644 --- a/src/client/sse.ts +++ b/src/client/sse.ts @@ -126,7 +126,7 @@ export class SSEClientTransport implements Transport { } private _startOrAuth(): Promise { - const fetchImpl = (this?._eventSourceInit?.fetch || this._fetch || fetch) as typeof fetch +const fetchImpl = (this?._eventSourceInit?.fetch ?? this._fetch ?? fetch) as typeof fetch return new Promise((resolve, reject) => { this._eventSource = new EventSource( this._url.href, From a3c4e28138753696498647e9a8ed51b7b9e0045f Mon Sep 17 00:00:00 2001 From: Cliff Hall Date: Tue, 1 Jul 2025 13:33:39 -0400 Subject: [PATCH 3/7] Update src/client/streamableHttp.ts nullish coalescing operator ?? instead of the logical OR operator || to handle the case where this?._eventSourceInit?.fetch or this._fetch might be null or undefined but not falsy Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/client/streamableHttp.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index b73f67b9..af884b26 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -209,7 +209,7 @@ export class StreamableHTTPClientTransport implements Transport { headers.set("last-event-id", resumptionToken); } - const response = await (this._fetch ?? fetch)(this._url, { +const response = await (this._fetch ?? fetch)(this._url, { method: "GET", headers, signal: this._abortController?.signal, From 8e02df150e88098cf5bd63b82024933ac8ccd98f Mon Sep 17 00:00:00 2001 From: Cliff Hall Date: Tue, 1 Jul 2025 13:33:48 -0400 Subject: [PATCH 4/7] Update src/client/sse.ts nullish coalescing operator ?? instead of the logical OR operator || to handle the case where this?._eventSourceInit?.fetch or this._fetch might be null or undefined but not falsy Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/client/sse.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/sse.ts b/src/client/sse.ts index 808aa24b..af04f274 100644 --- a/src/client/sse.ts +++ b/src/client/sse.ts @@ -251,7 +251,7 @@ const fetchImpl = (this?._eventSourceInit?.fetch ?? this._fetch ?? fetch) as typ signal: this._abortController?.signal, }; - const response = await (this._fetch ?? fetch)(this._endpoint, init); +const response = await (this._fetch ?? fetch)(this._endpoint, init); if (!response.ok) { if (response.status === 401 && this._authProvider) { From 7d8c0610283f5f1c6a7132a8ed50fc6c4eafd3e6 Mon Sep 17 00:00:00 2001 From: Cliff Hall Date: Tue, 1 Jul 2025 13:33:56 -0400 Subject: [PATCH 5/7] Update src/client/streamableHttp.ts nullish coalescing operator ?? instead of the logical OR operator || to handle the case where this?._eventSourceInit?.fetch or this._fetch might be null or undefined but not falsy Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/client/streamableHttp.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index af884b26..d28eae86 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -423,7 +423,7 @@ const response = await (this._fetch ?? fetch)(this._url, { signal: this._abortController?.signal, }; - const response = await (this._fetch ?? fetch)(this._url, init); +const response = await (this._fetch ?? fetch)(this._url, init); // Handle session ID received during initialization const sessionId = response.headers.get("mcp-session-id"); From 242c8248f73078c62883e49b1dcdbc0727e95319 Mon Sep 17 00:00:00 2001 From: Cliff Hall Date: Tue, 1 Jul 2025 13:34:04 -0400 Subject: [PATCH 6/7] Update src/client/streamableHttp.ts nullish coalescing operator ?? instead of the logical OR operator || to handle the case where this?._eventSourceInit?.fetch or this._fetch might be null or undefined but not falsy Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- src/client/streamableHttp.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index d28eae86..ceaecd83 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -529,7 +529,7 @@ const response = await (this._fetch ?? fetch)(this._url, init); signal: this._abortController?.signal, }; - const response = await (this._fetch ?? fetch)(this._url, init); +const response = await (this._fetch ?? fetch)(this._url, init); // We specifically handle 405 as a valid response according to the spec, // meaning the server does not support explicit session termination From f383a9cb8cbcebd9057e0eb2ec1dc22b250c8970 Mon Sep 17 00:00:00 2001 From: cliffhall Date: Tue, 1 Jul 2025 14:00:56 -0400 Subject: [PATCH 7/7] * In sse.test.ts - More specific expectations in test "uses custom fetch implementation from options" * In sse.ts - Import FetchLike from transport.ts * In steramableHttp.ts - Import FetchLike from transport.ts - Export StartSSEOptions interface for testing * In streamableHttp.test.ts - import StartSSEOptions from streamableHttp.ts - use StartSSEOptions instead of any in test "uses custom fetch implementation" * In transport.ts - Add FetchLike function type --- src/client/sse.test.ts | 4 +++- src/client/sse.ts | 4 +--- src/client/streamableHttp.test.ts | 6 +++--- src/client/streamableHttp.ts | 12 +++++------- src/shared/transport.ts | 10 ++++++---- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/client/sse.test.ts b/src/client/sse.test.ts index c446b9e9..2d116344 100644 --- a/src/client/sse.test.ts +++ b/src/client/sse.test.ts @@ -289,7 +289,9 @@ describe("SSEClientTransport", () => { await transport.send(message); - expect(fetchWithAuth).toHaveBeenCalled(); + expect(fetchWithAuth).toHaveBeenCalledTimes(2); + expect(lastServerRequest.method).toBe("POST"); + expect(lastServerRequest.headers.authorization).toBe(authToken); }); it("passes custom headers to fetch requests", async () => { diff --git a/src/client/sse.ts b/src/client/sse.ts index af04f274..faffecc4 100644 --- a/src/client/sse.ts +++ b/src/client/sse.ts @@ -1,10 +1,8 @@ import { EventSource, type ErrorEvent, type EventSourceInit } from "eventsource"; -import { Transport } from "../shared/transport.js"; +import { Transport, FetchLike } from "../shared/transport.js"; import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, extractResourceMetadataUrl, OAuthClientProvider, UnauthorizedError } from "./auth.js"; -export type FetchLike = (url: string | URL, init?: RequestInit) => Promise; - export class SseError extends Error { constructor( public readonly code: number | undefined, diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index be70b8b4..dcd76528 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -1,4 +1,4 @@ -import { StreamableHTTPClientTransport, StreamableHTTPReconnectionOptions } from "./streamableHttp.js"; +import { StreamableHTTPClientTransport, StreamableHTTPReconnectionOptions, StartSSEOptions } from "./streamableHttp.js"; import { OAuthClientProvider, UnauthorizedError } from "./auth.js"; import { JSONRPCMessage } from "../types.js"; @@ -461,7 +461,7 @@ describe("StreamableHTTPClientTransport", () => { transport = new StreamableHTTPClientTransport(new URL("http://localhost:1234/mcp"), { fetch: fetchWithAuth }); await transport.start(); - await (transport as unknown as { _startOrAuthSse: (opts: any) => Promise })._startOrAuthSse({}); + await (transport as unknown as { _startOrAuthSse: (opts: StartSSEOptions) => Promise })._startOrAuthSse({}); await transport.send({ jsonrpc: "2.0", method: "test", params: {}, id: "1" } as JSONRPCMessage); @@ -559,7 +559,7 @@ describe("StreamableHTTPClientTransport", () => { // Second retry - should double (2^1 * 100 = 200) expect(getDelay(1)).toBe(200); - // Third retry - should double again (2^2 * 100 = 400) + // Third retry - should double again (2^2 * 100 = 400) expect(getDelay(2)).toBe(400); // Fourth retry - should double again (2^3 * 100 = 800) diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index ceaecd83..b81f1a5d 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -1,10 +1,8 @@ -import { Transport } from "../shared/transport.js"; +import { Transport, FetchLike } from "../shared/transport.js"; import { isInitializedNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js"; import { auth, AuthResult, extractResourceMetadataUrl, OAuthClientProvider, UnauthorizedError } from "./auth.js"; import { EventSourceParserStream } from "eventsource-parser/stream"; -export type FetchLike = (url: string | URL, init?: RequestInit) => Promise; - // Default reconnection options for StreamableHTTP connections const DEFAULT_STREAMABLE_HTTP_RECONNECTION_OPTIONS: StreamableHTTPReconnectionOptions = { initialReconnectionDelay: 1000, @@ -25,7 +23,7 @@ export class StreamableHTTPError extends Error { /** * Options for starting or authenticating an SSE connection */ -interface StartSSEOptions { +export interface StartSSEOptions { /** * The resumption token used to continue long-running requests that were interrupted. * @@ -260,15 +258,15 @@ const response = await (this._fetch ?? fetch)(this._url, { private _normalizeHeaders(headers: HeadersInit | undefined): Record { if (!headers) return {}; - + if (headers instanceof Headers) { return Object.fromEntries(headers.entries()); } - + if (Array.isArray(headers)) { return Object.fromEntries(headers); } - + return { ...headers as Record }; } diff --git a/src/shared/transport.ts b/src/shared/transport.ts index 96b291fa..386b6bae 100644 --- a/src/shared/transport.ts +++ b/src/shared/transport.ts @@ -1,10 +1,12 @@ import { JSONRPCMessage, MessageExtraInfo, RequestId } from "../types.js"; +export type FetchLike = (url: string | URL, init?: RequestInit) => Promise; + /** * Options for sending a JSON-RPC message. */ export type TransportSendOptions = { - /** + /** * If present, `relatedRequestId` is used to indicate to the transport which incoming request to associate this outgoing message with. */ relatedRequestId?: RequestId; @@ -38,7 +40,7 @@ export interface Transport { /** * Sends a JSON-RPC message (request or response). - * + * * If present, `relatedRequestId` is used to indicate to the transport which incoming request to associate this outgoing message with. */ send(message: JSONRPCMessage, options?: TransportSendOptions): Promise; @@ -64,9 +66,9 @@ export interface Transport { /** * Callback for when a message (request or response) is received over the connection. - * + * * Includes the requestInfo and authInfo if the transport is authenticated. - * + * * The requestInfo can be used to get the original request information (headers, etc.) */ onmessage?: (message: JSONRPCMessage, extra?: MessageExtraInfo) => void;