Skip to content

Commit 38c48ef

Browse files
authored
fix: handle concurrent sessions without dropping session connections (#36)
* fix: handle concurrent sessions without dropping session connections Signed-off-by: JM Huibonhoa <[email protected]> * style: use existing variables Signed-off-by: JM Huibonhoa <[email protected]> --------- Signed-off-by: JM Huibonhoa <[email protected]>
1 parent b6bd74c commit 38c48ef

File tree

1 file changed

+79
-47
lines changed

1 file changed

+79
-47
lines changed

mcp/src/index.ts

Lines changed: 79 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// src/index.ts
33
import 'dotenv/config'; // Load .env file
44
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
5+
import { ListToolsRequestSchema, CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";
56
import { AzureOpenAI } from "openai";
67
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
78
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
@@ -215,6 +216,43 @@ const server = new McpServer({
215216
capabilities: {},
216217
});
217218

219+
// --- Define the MCP Tool Logic ---
220+
const queryDocumentationToolHandler = async ({ queryText, productName, version, limit }: { queryText: string; productName: string; version?: string; limit: number }) => {
221+
console.error(`Received query: text="${queryText}", product="${productName}", version="${version || 'any'}", limit=${limit}`);
222+
223+
try {
224+
const results = await queryDocumentation(queryText, productName, version, limit);
225+
226+
if (results.length === 0) {
227+
return {
228+
content: [{ type: "text" as const, text: `No relevant documentation found for "${queryText}" in product "${productName}" ${version ? `(version ${version})` : ''}.` }],
229+
};
230+
}
231+
232+
const formattedResults = results.map((r, index) =>
233+
[
234+
`Result ${index + 1}:`,
235+
` Content: ${r.content}`,
236+
` Distance: ${r.distance.toFixed(4)}`,
237+
r.url ? ` URL: ${r.url}` : null,
238+
"---"
239+
].filter(line => line !== null).join("\n")
240+
).join("\n");
241+
242+
const responseText = `Found ${results.length} relevant documentation snippets for "${queryText}" in product "${productName}" ${version ? `(version ${version})` : ''}:\n\n${formattedResults}`;
243+
console.error(`Handler finished processing. Payload size (approx): ${responseText.length} chars. Returning response object...`);
244+
245+
return {
246+
content: [{ type: "text" as const, text: responseText }],
247+
};
248+
} catch (error: any) {
249+
console.error("Error processing 'query_documentation' tool:", error);
250+
return {
251+
content: [{ type: "text" as const, text: `Error querying documentation: ${error.message}` }],
252+
};
253+
}
254+
};
255+
218256
// --- Define the MCP Tool ---
219257
server.tool(
220258
"query_documentation",
@@ -225,41 +263,7 @@ server.tool(
225263
version: z.string().optional().describe("The specific version of the product documentation (e.g., '1.2.0'). Optional."),
226264
limit: z.number().int().positive().optional().default(4).describe("Maximum number of results to return. Defaults to 4."),
227265
},
228-
async ({ queryText, productName, version, limit }: { queryText: string; productName: string; version?: string; limit: number }) => {
229-
console.error(`Received query: text="${queryText}", product="${productName}", version="${version || 'any'}", limit=${limit}`);
230-
231-
try {
232-
const results = await queryDocumentation(queryText, productName, version, limit);
233-
234-
if (results.length === 0) {
235-
return {
236-
content: [{ type: "text", text: `No relevant documentation found for "${queryText}" in product "${productName}" ${version ? `(version ${version})` : ''}.` }],
237-
};
238-
}
239-
240-
const formattedResults = results.map((r, index) =>
241-
[
242-
`Result ${index + 1}:`,
243-
` Content: ${r.content}`,
244-
` Distance: ${r.distance.toFixed(4)}`,
245-
r.url ? ` URL: ${r.url}` : null,
246-
"---"
247-
].filter(line => line !== null).join("\n")
248-
).join("\n");
249-
250-
const responseText = `Found ${results.length} relevant documentation snippets for "${queryText}" in product "${productName}" ${version ? `(version ${version})` : ''}:\n\n${formattedResults}`;
251-
console.error(`Handler finished processing. Payload size (approx): ${responseText.length} chars. Returning response object...`);
252-
253-
return {
254-
content: [{ type: "text", text: responseText }],
255-
};
256-
} catch (error: any) {
257-
console.error("Error processing 'query_documentation' tool:", error);
258-
return {
259-
content: [{ type: "text", text: `Error querying documentation: ${error.message}` }],
260-
};
261-
}
262-
}
266+
queryDocumentationToolHandler
263267
);
264268

265269
// --- Transport Setup ---
@@ -400,6 +404,7 @@ async function main() {
400404
const app = express();
401405

402406
const transports: Map<string, StreamableHTTPServerTransport> = new Map<string, StreamableHTTPServerTransport>();
407+
const servers: Map<string, McpServer> = new Map<string, McpServer>();
403408

404409
// Handle POST requests for MCP initialization and method calls
405410
app.post('/mcp', async (req: Request, res: Response) => {
@@ -413,27 +418,52 @@ async function main() {
413418
// Reuse existing transport
414419
transport = transports.get(sessionId)!;
415420
} else if (!sessionId) {
416-
// New initialization request
421+
// New initialization request - create a new server instance for this session
422+
const sessionServer = new McpServer({
423+
name: serverName,
424+
version: serverVersion,
425+
}, {
426+
capabilities: {
427+
tools: {},
428+
},
429+
});
430+
431+
// Add the query_documentation tool to this server instance using the shared handler
432+
sessionServer.tool(
433+
"query_documentation",
434+
"Query documentation stored in a sqlite-vec database using vector search.",
435+
{
436+
queryText: z.string().min(1).describe("The natural language query to search for."),
437+
productName: z.string().min(1).describe("The name of the product documentation database to search within (e.g., 'my-product'). Corresponds to the DB filename without .db."),
438+
version: z.string().optional().describe("The specific version of the product documentation (e.g., '1.2.0'). Optional."),
439+
limit: z.number().int().positive().optional().default(4).describe("Maximum number of results to return. Defaults to 4."),
440+
},
441+
queryDocumentationToolHandler
442+
);
443+
417444
transport = new StreamableHTTPServerTransport({
418445
sessionIdGenerator: () => randomUUID(),
419446
onsessioninitialized: (sessionId: string) => {
420-
// Store the transport by session ID when session is initialized
447+
// Store the transport and server by session ID when session is initialized
421448
console.error(`Session initialized with ID: ${sessionId}`);
422449
transports.set(sessionId, transport);
450+
servers.set(sessionId, sessionServer);
423451
}
424452
});
425453

426-
// Set up onclose handler to clean up transport when closed
454+
// Set up onclose handler to clean up transport and server when closed
427455
transport.onclose = async () => {
428456
const sid = transport.sessionId;
429457
if (sid && transports.has(sid)) {
430-
console.error(`Transport closed for session ${sid}, removing from transports map`);
458+
console.error(`Transport closed for session ${sid}, removing from transports and servers map`);
431459
transports.delete(sid);
460+
servers.delete(sid);
432461
}
433462
};
434463

435-
// Connect the transport to the MCP server BEFORE handling the request
436-
await server.connect(transport);
464+
// Connect the transport to the session-specific MCP s
465+
// erver BEFORE handling the request
466+
await sessionServer.connect(transport);
437467

438468
await transport.handleRequest(req, res);
439469
return; // Already handled
@@ -549,12 +579,12 @@ async function main() {
549579

550580
// Handle server shutdown with proper SIGTERM/SIGINT support and timeout
551581
const shutdownHandler = createGracefulShutdownHandler(async () => {
552-
console.error('Closing HTTP transports...');
582+
console.error('Closing HTTP transports and servers...');
553583

554-
// Close all active transports with individual timeouts
584+
// Close all active transports and servers with individual timeouts
555585
const transportClosePromises = Array.from(transports.entries()).map(async ([sessionId, transport]) => {
556586
try {
557-
console.error(`Closing transport for session ${sessionId}`);
587+
console.error(`Closing transport and server for session ${sessionId}`);
558588

559589
// Add timeout to individual transport close operations
560590
const closeTimeout = new Promise<void>((_, reject) => {
@@ -567,17 +597,19 @@ async function main() {
567597
]);
568598

569599
transports.delete(sessionId);
570-
console.error(`Transport closed for session ${sessionId}`);
600+
servers.delete(sessionId);
601+
console.error(`Transport and server closed for session ${sessionId}`);
571602
} catch (error) {
572603
console.error(`Error closing transport for session ${sessionId}:`, error);
573-
// Still remove from map even if close failed
604+
// Still remove from maps even if close failed
574605
transports.delete(sessionId);
606+
servers.delete(sessionId);
575607
}
576608
});
577609

578610
// Wait for all transports to close, but with overall timeout handled by outer function
579611
await Promise.allSettled(transportClosePromises);
580-
console.error('All transports cleanup completed');
612+
console.error('All transports and servers cleanup completed');
581613
});
582614

583615
process.on('SIGTERM', () => shutdownHandler('SIGTERM'));

0 commit comments

Comments
 (0)