Skip to content

Commit 41baaae

Browse files
add http streamable support mini-runtime
1 parent c7ddd86 commit 41baaae

File tree

2 files changed

+70
-6
lines changed

2 files changed

+70
-6
lines changed

apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.akto.hybrid_runtime;
22

33
import com.akto.dao.context.Context;
4+
import com.akto.data_actor.DataActor;
45
import com.akto.data_actor.DataActorFactory;
56
import com.akto.dto.APIConfig;
67
import com.akto.dto.AccountSettings;
@@ -32,6 +33,7 @@
3233
import com.akto.testing.ApiExecutor;
3334
import com.akto.util.Constants;
3435
import com.akto.util.JSONUtils;
36+
import com.akto.util.McpSseEndpointHelper;
3537
import com.akto.util.Pair;
3638
import com.fasterxml.jackson.databind.ObjectMapper;
3739
import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -57,11 +59,17 @@ public class McpToolsSyncJobExecutor {
5759

5860
private static final LoggerMaker logger = new LoggerMaker(McpToolsSyncJobExecutor.class, LogDb.RUNTIME);
5961
private static final ObjectMapper mapper = new ObjectMapper();
62+
public static final DataActor dataActor = DataActorFactory.fetchInstance();
6063
public static final String MCP_TOOLS_LIST_REQUEST_JSON =
6164
"{\"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"" + McpSchema.METHOD_TOOLS_LIST + "\", \"params\": {}}";
6265
public static final String MCP_RESOURCE_LIST_REQUEST_JSON =
6366
"{\"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"" + McpSchema.METHOD_RESOURCES_LIST + "\", \"params\": {}}";
6467
public static final String LOCAL_IP = "127.0.0.1";
68+
69+
// MCP Transport types
70+
private static final String TRANSPORT_SSE = "SSE";
71+
private static final String TRANSPORT_HTTP = "HTTP";
72+
6573
private ServerCapabilities mcpServerCapabilities = null;
6674

6775
public static final McpToolsSyncJobExecutor INSTANCE = new McpToolsSyncJobExecutor();
@@ -310,7 +318,7 @@ public static Map<String, Object> generateExampleArguments(JsonSchema inputSchem
310318
public Pair<JSONRPCResponse, HttpResponseParams> getMcpMethodResponse(String host, String mcpMethod,
311319
String mcpMethodRequestJson, ApiCollection apiCollection) throws Exception {
312320
OriginalHttpRequest mcpRequest = createRequest(host, mcpMethod, mcpMethodRequestJson);
313-
String jsonrpcResponse = sendRequest(mcpRequest);
321+
String jsonrpcResponse = sendRequest(mcpRequest, apiCollection);
314322

315323
JSONRPCResponse rpcResponse = (JSONRPCResponse) McpSchema.deserializeJsonRpcMessage(mapper, jsonrpcResponse);
316324

@@ -347,13 +355,61 @@ private String buildHeaders(String host) {
347355
return "{\"Content-Type\":\"application/json\",\"Accept\":\"*/*\",\"host\":\"" + host + "\"}";
348356
}
349357

350-
private String sendRequest(OriginalHttpRequest request) throws Exception {
358+
359+
private String detectAndSetTransportType(OriginalHttpRequest request, ApiCollection apiCollection) throws Exception {
360+
// Try SSE first if sseCallbackUrl is set
361+
if (apiCollection.getSseCallbackUrl() != null && !apiCollection.getSseCallbackUrl().isEmpty()) {
362+
try {
363+
logger.info("Attempting to detect transport type for MCP server: {}", apiCollection.getHostName());
364+
365+
// Clone request for SSE detection to avoid modifying original
366+
OriginalHttpRequest sseTestRequest = request.copy();
367+
McpSseEndpointHelper.addSseEndpointHeader(sseTestRequest, apiCollection.getId());
368+
ApiExecutor.sendRequestWithSse(sseTestRequest, true, null, false,
369+
new ArrayList<>(), false, true);
370+
371+
// If SSE works, update the collection
372+
dataActor.updateTransportType(apiCollection.getId(), TRANSPORT_SSE);
373+
logger.info("Detected SSE transport for MCP server: {}", apiCollection.getHostName());
374+
return TRANSPORT_SSE;
375+
} catch (Exception sseException) {
376+
logger.info("SSE transport failed, falling back to HTTP transport: {}", sseException.getMessage());
377+
// Fall back to HTTP - no need to test, just store it
378+
dataActor.updateTransportType(apiCollection.getId(), TRANSPORT_SSE);
379+
return TRANSPORT_HTTP;
380+
}
381+
}
382+
383+
// Default to HTTP if no sseCallbackUrl
384+
logger.info("No SSE callback URL found, using HTTP transport for: {}", apiCollection.getHostName());
385+
dataActor.updateTransportType(apiCollection.getId(), TRANSPORT_SSE);
386+
return TRANSPORT_HTTP;
387+
}
388+
private String sendRequest(OriginalHttpRequest request, ApiCollection apiCollection) throws Exception {
351389
try {
352-
OriginalHttpResponse response = ApiExecutor.sendRequestWithSse(request, true, null, false,
353-
new ArrayList<>(), false, true);
390+
String transportType = apiCollection.getMcpTransportType();
391+
392+
// If transport type is not set, try to detect it
393+
if (transportType == null || transportType.isEmpty()) {
394+
transportType = detectAndSetTransportType(request, apiCollection);
395+
}
396+
397+
OriginalHttpResponse response;
398+
if (TRANSPORT_HTTP.equals(transportType)) {
399+
// Use standard HTTP POST for streamable responses
400+
// Use sendRequestSkipSse to prevent ApiExecutor from trying SSE
401+
logger.info("Using HTTP transport for MCP server: {}", apiCollection.getHostName());
402+
response = ApiExecutor.sendRequestSkipSse(request, true, null, false, new ArrayList<>(), false);
403+
} else {
404+
// Use SSE transport
405+
logger.info("Using SSE transport for MCP server: {}", apiCollection.getHostName());
406+
McpSseEndpointHelper.addSseEndpointHeader(request, apiCollection.getId());
407+
response = ApiExecutor.sendRequestWithSse(request, true, null, false,
408+
new ArrayList<>(), false, true);
409+
}
354410
return response.getBody();
355411
} catch (Exception e) {
356-
logger.error("Error while making request to MCP server.", e);
412+
logger.error("Error while making request to MCP server: {}", e.getMessage(), e);
357413
throw e;
358414
}
359415
}

libs/utils/src/main/java/com/akto/testing/ApiExecutor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,8 @@ private static OriginalHttpResponse sendWithRequestBody(OriginalHttpRequest requ
562562

563563
if (payload == null) payload = "";
564564
if (body == null) {// body not created by GRPC block yet
565-
if (request.getHeaders().containsKey("charset")) {
565+
// Create body with null MediaType for JSON-RPC requests to prevent OkHttp from adding charset parameter
566+
if (request.getHeaders().containsKey("charset") || isJsonRpcRequest(request)) {
566567
body = RequestBody.create(payload, null);
567568
request.getHeaders().remove("charset");
568569
} else {
@@ -664,6 +665,13 @@ private static String waitForMatchingSseMessage(SseSession session, String id, l
664665
throw new Exception("Timeout waiting for SSE message with id=" + id);
665666
}
666667

668+
669+
public static OriginalHttpResponse sendRequestSkipSse(OriginalHttpRequest request, boolean followRedirects,
670+
TestingRunConfig testingRunConfig, boolean debug, List<TestingRunResult.TestLog> testLogs,
671+
boolean skipSSRFCheck) throws Exception {
672+
return sendRequest(request, followRedirects, testingRunConfig, debug, testLogs, skipSSRFCheck, true);
673+
}
674+
667675
public static OriginalHttpResponse sendRequestWithSse(OriginalHttpRequest request, boolean followRedirects,
668676
TestingRunConfig testingRunConfig, boolean debug, List<TestingRunResult.TestLog> testLogs,
669677
boolean skipSSRFCheck, boolean overrideMessageEndpoint) throws Exception {

0 commit comments

Comments
 (0)