Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion packages/sdk/src/__tests__/ids.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { describe, expect, it } from "vitest";
import { generateEventId, generateSessionId, generateTraceId } from "../core/ids.js";
import {
deriveSessionId,
generateEventId,
generateSessionId,
generateTraceId,
} from "../core/ids.js";

describe("ID generation", () => {
describe("generateSessionId", () => {
Expand All @@ -17,6 +22,35 @@ describe("ID generation", () => {
});
});

describe("deriveSessionId", () => {
it("starts with ses_ prefix", () => {
expect(deriveSessionId("mcp-session-123")).toMatch(/^ses_/);
});

it("has correct length (ses_ + 21 chars)", () => {
expect(deriveSessionId("mcp-session-123")).toHaveLength(4 + 21);
});

it("is deterministic — same input produces same output", () => {
const id1 = deriveSessionId("mcp-session-abc");
const id2 = deriveSessionId("mcp-session-abc");
expect(id1).toBe(id2);
});

it("produces different IDs for different inputs", () => {
const id1 = deriveSessionId("mcp-session-alpha");
const id2 = deriveSessionId("mcp-session-beta");
expect(id1).not.toBe(id2);
});

it("uses only URL-safe characters", () => {
// base64url alphabet: A-Z, a-z, 0-9, -, _
const id = deriveSessionId("test-input");
const suffix = id.slice(4); // strip ses_ prefix
expect(suffix).toMatch(/^[A-Za-z0-9_-]+$/);
});
});

describe("generateTraceId", () => {
it("starts with tr_ prefix", () => {
expect(generateTraceId()).toMatch(/^tr_/);
Expand Down
4 changes: 2 additions & 2 deletions packages/sdk/src/__tests__/integration/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest";
import type { YavioConfig } from "../../core/types.js";
import { createYavioContext } from "../../server/context.js";
import { _resetSessionMap, createProxy } from "../../server/proxy.js";
import { _resetGlobalState, createProxy } from "../../server/proxy.js";
import { HttpTransport } from "../../transport/http.js";

interface ReceivedBatch {
Expand Down Expand Up @@ -68,7 +68,7 @@ describe("End-to-end: proxy → tool call → HTTP transport → mock ingest", (
});

beforeEach(() => {
_resetSessionMap();
_resetGlobalState();
});

it("sends tool_call events to the ingest API on tool invocation", async () => {
Expand Down
57 changes: 28 additions & 29 deletions packages/sdk/src/__tests__/proxy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { BaseEvent } from "@yavio/shared/events";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { z } from "zod";
import type { CaptureConfig, YavioConfig } from "../core/types.js";
import { _resetSessionMap, createProxy } from "../server/proxy.js";
import { _resetGlobalState, createProxy } from "../server/proxy.js";
import { mintWidgetToken } from "../server/token.js";
import type { Transport } from "../transport/types.js";

Expand Down Expand Up @@ -311,7 +311,7 @@ describe("createProxy — tool_discovery emission", () => {
beforeEach(() => {
mockedMint.mockReset();
mockedMint.mockResolvedValue(null);
_resetSessionMap();
_resetGlobalState();
});

it("emits tool_discovery event when tool() is called", () => {
Expand Down Expand Up @@ -435,7 +435,7 @@ describe("createProxy — session reuse", () => {
beforeEach(() => {
mockedMint.mockReset();
mockedMint.mockResolvedValue(null);
_resetSessionMap();
_resetGlobalState();
});

function makeExtra(overrides?: Record<string, unknown>) {
Expand Down Expand Up @@ -682,33 +682,32 @@ describe("createProxy — session reuse", () => {
expect(firstSessionId).not.toBe(secondSessionId);
});

it("caps session map size at MAX_SESSION_MAP_SIZE", async () => {
it("produces stable session IDs across independent proxy instances", async () => {
const yavioTransport = createMockTransport();

// Create 1001 connections with unique MCP session IDs via extra.sessionId
for (let i = 0; i < 1001; i++) {
const { server, proxy } = createServerAndProxy(yavioTransport);
const mcpTransport = { start: vi.fn(), close: vi.fn(), send: vi.fn() };
await proxy.connect(mcpTransport as never);
const tool = getRegisteredTool(server, "tool_a");
await tool?.handler(makeExtra({ sessionId: `mcp-session-${i}`, requestId: `req-cap-${i}` }));
}

// Capture the original Yavio session ID for mcp-session-0
// Events: [connection_0, tool_call_0, connection_1, tool_call_1, ...]
// tool_call_0 is at index 1
const originalSessionId = (yavioTransport.sent[1] as unknown as BaseEvent[])[0].session_id;

// Reconnect with the very first MCP session ID — it should have been evicted
const { server, proxy } = createServerAndProxy(yavioTransport);
const mcpTransport = { start: vi.fn(), close: vi.fn(), send: vi.fn() };
await proxy.connect(mcpTransport as never);
const tool = getRegisteredTool(server, "tool_a");
await tool?.handler(makeExtra({ sessionId: "mcp-session-0", requestId: "req-cap-final" }));

const finalSessionId = (yavioTransport.sent.at(-1) as unknown as BaseEvent[])[0].session_id;

// mcp-session-0 was evicted, so a new Yavio session was created
expect(finalSessionId).not.toBe(originalSessionId);
// Two completely independent proxy instances (simulating different server instances)
// with the same MCP session key should produce the same Yavio session ID.
const { server: server1, proxy: proxy1 } = createServerAndProxy(yavioTransport);
const mcpTransport1 = { start: vi.fn(), close: vi.fn(), send: vi.fn() };
await proxy1.connect(mcpTransport1 as never);
const tool1 = getRegisteredTool(server1, "tool_a");
await tool1?.handler(makeExtra({ sessionId: "mcp-session-stable" }));

const firstSessionId = (yavioTransport.sent.at(-1) as unknown as BaseEvent[])[0].session_id;

// Reset global state to simulate a completely separate process
_resetGlobalState();

const { server: server2, proxy: proxy2 } = createServerAndProxy(yavioTransport);
const mcpTransport2 = { start: vi.fn(), close: vi.fn(), send: vi.fn() };
await proxy2.connect(mcpTransport2 as never);
const tool2 = getRegisteredTool(server2, "tool_a");
await tool2?.handler(makeExtra({ sessionId: "mcp-session-stable", requestId: "req-2" }));

const secondSessionId = (yavioTransport.sent.at(-1) as unknown as BaseEvent[])[0].session_id;

// Deterministic derivation — no shared state needed
expect(firstSessionId).toBe(secondSessionId);
expect(firstSessionId).toMatch(/^ses_/);
});
});
15 changes: 15 additions & 0 deletions packages/sdk/src/core/ids.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
import { createHash } from "node:crypto";
import { nanoid } from "nanoid";

export function generateSessionId(): string {
return `ses_${nanoid(21)}`;
}

/**
* Derive a deterministic Yavio session ID from an external session key
* (MCP session ID, OpenAI session, or transport session ID).
*
* Uses SHA-256 hash truncated to 21 base64url chars, matching the format
* of randomly generated session IDs. This ensures the same external key
* produces the same Yavio session ID across all server instances — no
* shared state needed for cross-instance session correlation.
*/
export function deriveSessionId(externalId: string): string {
const hash = createHash("sha256").update(externalId).digest("base64url");
return `ses_${hash.substring(0, 21)}`;
}

export function generateTraceId(): string {
return `tr_${nanoid(21)}`;
}
Expand Down
67 changes: 22 additions & 45 deletions packages/sdk/src/server/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
buildToolCallEvent,
buildToolDiscoveryEvent,
} from "../core/events.js";
import { generateSessionId, generateTraceId } from "../core/ids.js";
import { deriveSessionId, generateSessionId, generateTraceId } from "../core/ids.js";
import { detectPlatform } from "../core/platform.js";
import type { SessionState, YavioConfig } from "../core/types.js";
import type { Transport } from "../transport/types.js";
Expand Down Expand Up @@ -56,7 +56,7 @@ async function getWidgetToken(
function wrapToolCallback(
originalCb: (...cbArgs: unknown[]) => unknown,
toolName: string,
getSession: (mcpSessionId?: string) => SessionState,
resolveSession: (sessionKey?: string) => SessionState,
server: McpServer,
config: YavioConfig,
transport: Transport,
Expand Down Expand Up @@ -85,7 +85,7 @@ function wrapToolCallback(
: typeof openaiSessionId === "string"
? openaiSessionId
: undefined;
const session = getSession(sessionKey);
const session = resolveSession(sessionKey);

// Emit deferred connection event on the first tool call for this session.
// Deferred from connect() so that OpenAI's per-tool-call reconnects don't
Expand Down Expand Up @@ -208,31 +208,14 @@ function wrapToolCallback(
};
}

/** Maximum number of MCP sessions to cache for session reuse. */
const MAX_SESSION_MAP_SIZE = 1000;

/**
* Module-level session map: MCP session ID → Yavio SessionState.
*
* Shared across all proxy instances so that the common pattern of creating a
* new McpServer + withYavio() per HTTP connection still correlates tool calls
* belonging to the same MCP session.
*
* Correlation uses extra.sessionId (from RequestHandlerExtra, set via the
* Mcp-Session-Id header) as the primary key, with transport.sessionId as
* fallback. Both are checked lazily at tool-call time via getSession().
*/
const globalSessionMap = new Map<string, SessionState>();

/** Tracks session IDs that have already emitted a connection event. */
const emittedConnections = new Set<string>();

/** Tracks tool names that have already emitted a tool_discovery event (global dedup). */
const emittedToolDiscoveries = new Set<string>();

/** @internal Reset the global session map — exposed for testing only. */
export function _resetSessionMap(): void {
globalSessionMap.clear();
/** @internal Reset global dedup state — exposed for testing only. */
export function _resetGlobalState(): void {
emittedConnections.clear();
emittedToolDiscoveries.clear();
}
Expand Down Expand Up @@ -262,30 +245,22 @@ export function createProxy<T extends McpServer>(
// Reference to the current MCP transport for lazy sessionId lookup
let currentMcpTransport: Record<string, unknown> | null = null;

const getSession = (extraSessionId?: string) => {
// Prefer extra.sessionId (from MCP request context), fall back to transport.sessionId
const mcpSessionId =
extraSessionId ??
/**
* Resolve the session for the current tool call.
*
* If a session key is available (MCP session ID, OpenAI session, or transport
* session ID), derives a deterministic Yavio session ID from it so that any
* server instance processing the same key produces the same ID.
*/
const resolveSession = (sessionKey?: string) => {
const key =
sessionKey ??
(currentMcpTransport && typeof currentMcpTransport.sessionId === "string"
? (currentMcpTransport.sessionId as string)
: undefined);

if (mcpSessionId) {
const existing = globalSessionMap.get(mcpSessionId);
if (existing && existing !== session) {
// Reuse the existing session — preserves userId, userTraits, stepSequence
session = existing;
} else if (!existing) {
// First time seeing this MCP session — register current session
if (globalSessionMap.size >= MAX_SESSION_MAP_SIZE) {
// Evict oldest entry (first key in insertion order)
const firstKey = globalSessionMap.keys().next().value as string;
const evicted = globalSessionMap.get(firstKey);
if (evicted) emittedConnections.delete(evicted.sessionId);
globalSessionMap.delete(firstKey);
}
globalSessionMap.set(mcpSessionId, session);
}
if (key) {
session.sessionId = deriveSessionId(key);
}
return session;
};
Expand Down Expand Up @@ -327,7 +302,7 @@ export function createProxy<T extends McpServer>(
args[cbIndex] = wrapToolCallback(
originalCb,
toolName,
getSession,
resolveSession,
server,
config,
transport,
Expand Down Expand Up @@ -376,7 +351,7 @@ export function createProxy<T extends McpServer>(
args[2] = wrapToolCallback(
originalCb,
toolName,
getSession,
resolveSession,
server,
config,
transport,
Expand Down Expand Up @@ -419,7 +394,9 @@ export function createProxy<T extends McpServer>(
: undefined;

session = {
sessionId: sessionIdFromTransport ?? generateSessionId(),
sessionId: sessionIdFromTransport
? deriveSessionId(sessionIdFromTransport)
: generateSessionId(),
userId: null,
userTraits: {},
platform: detectPlatform({
Expand Down
Loading