From 02eea4ce69a6c97ec6abbffa3f70ca7832fe1598 Mon Sep 17 00:00:00 2001 From: SHIVAM RAWAT Date: Fri, 3 Oct 2025 09:27:34 +0530 Subject: [PATCH 1/6] fixes for ClientActor --- .../main/java/com/akto/action/DbAction.java | 44 ++++++++++++++----- .../src/main/resources/struts.xml | 14 +++++- .../java/com/akto/dao/ApiCollectionsDao.java | 8 ++++ .../main/java/com/akto/dto/ApiCollection.java | 13 +++++- .../java/com/akto/data_actor/ClientActor.java | 19 ++++++++ .../java/com/akto/data_actor/DataActor.java | 2 + 6 files changed, 87 insertions(+), 13 deletions(-) diff --git a/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java b/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java index 597ed65cd5..b21ba70133 100644 --- a/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java +++ b/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java @@ -305,6 +305,21 @@ public String updateApiCollectionNameForVxlan() { return Action.SUCCESS.toUpperCase(); } + String transportType; + public String updateTransportType() { + try { + ApiCollection apiCollection = ApiCollectionsDao.instance.getMeta(apiCollectionId); + if (apiCollection != null) { + ApiCollectionsDao.instance.updateTransportType(apiCollection, transportType); + } + } catch (Exception e) { + loggerMaker.errorAndAddToDb(e, "error in updateTransportType " + e.toString()); + return Action.ERROR.toUpperCase(); + } + return Action.SUCCESS.toUpperCase(); + } + + public String updateCidrList() { try { DbLayer.updateCidrList(cidrList); @@ -414,9 +429,9 @@ public String bulkWriteSti() { } } } - + System.out.println("filters: " + filters.toString()); - + if (isDeleteWrite) { writes.add( new DeleteOneModel<>(Filters.and(filters), new DeleteOptions()) @@ -427,7 +442,7 @@ public String bulkWriteSti() { ); } } - + DbLayer.bulkWriteSingleTypeInfo(writes); } catch (Exception e) { String err = "Error: "; @@ -466,7 +481,7 @@ public String bulkWriteSampleData() { String responseCodeStr = mObj.get("responseCode").toString(); int responseCode = Integer.valueOf(responseCodeStr); - + Bson filters = Filters.and(Filters.eq("_id.apiCollectionId", apiCollectionId), Filters.eq("_id.bucketEndEpoch", bucketEndEpoch), Filters.eq("_id.bucketStartEpoch", bucketStartEpoch), @@ -474,7 +489,7 @@ public String bulkWriteSampleData() { Filters.eq("_id.responseCode", responseCode), Filters.eq("_id.url", mObj.get("url"))); List updatePayloadList = bulkUpdate.getUpdates(); - + List updates = new ArrayList<>(); for (String payload: updatePayloadList) { Map json = gson.fromJson(payload, Map.class); @@ -836,23 +851,23 @@ public String bulkWriteOverageInfo() { Filters.eq(UningestedApiOverage.METHOD, bulkUpdate.getFilters().get(UningestedApiOverage.METHOD)), Filters.eq(UningestedApiOverage.URL, bulkUpdate.getFilters().get(UningestedApiOverage.URL)) ); - + List updatePayloadList = bulkUpdate.getUpdates(); List updates = new ArrayList<>(); - + for (String payload: updatePayloadList) { Map json = gson.fromJson(payload, Map.class); String field = (String) json.get("field"); Object val = json.get("val"); String op = (String) json.get("op"); - + if ("setOnInsert".equals(op)) { updates.add(Updates.setOnInsert(field, val)); } else if ("set".equals(op)) { updates.add(Updates.set(field, val)); } } - + if (!updates.isEmpty()) { writes.add( new UpdateOneModel<>(filters, Updates.combine(updates), new UpdateOptions().upsert(true)) @@ -1129,7 +1144,7 @@ public String findPendingTestingRun() { testingRun = DbLayer.findPendingTestingRun(delta); if 
(testingRun != null) { /* - * There is a db call involved for collectionWiseTestingEndpoints, thus this hack. + * There is a db call involved for collectionWiseTestingEndpoints, thus this hack. */ if(testingRun.getTestingEndpoints() instanceof CollectionWiseTestingEndpoints){ CollectionWiseTestingEndpoints ts = (CollectionWiseTestingEndpoints) testingRun.getTestingEndpoints(); @@ -1790,7 +1805,7 @@ public String fetchTestScript() { return Action.ERROR.toUpperCase(); } } - + public String countTestingRunResultSummaries() { count = DbLayer.countTestingRunResultSummaries(filter); return Action.SUCCESS.toUpperCase(); @@ -2773,4 +2788,11 @@ public void setTestingRunPlayground(TestingRunPlayground testingRunPlayground) { this.testingRunPlayground = testingRunPlayground; } + public String getTransportType() { + return transportType; + } + + public void setTransportType(String transportType) { + this.transportType = transportType; + } } diff --git a/apps/database-abstractor/src/main/resources/struts.xml b/apps/database-abstractor/src/main/resources/struts.xml index a5d169c331..4534a1ce0f 100644 --- a/apps/database-abstractor/src/main/resources/struts.xml +++ b/apps/database-abstractor/src/main/resources/struts.xml @@ -57,6 +57,18 @@ + + + + + + 422 + false + ^actionErrors.* + + + + @@ -1200,7 +1212,7 @@ ^actionErrors.* - + diff --git a/libs/dao/src/main/java/com/akto/dao/ApiCollectionsDao.java b/libs/dao/src/main/java/com/akto/dao/ApiCollectionsDao.java index 15635a34a5..23a04031ef 100644 --- a/libs/dao/src/main/java/com/akto/dao/ApiCollectionsDao.java +++ b/libs/dao/src/main/java/com/akto/dao/ApiCollectionsDao.java @@ -74,6 +74,14 @@ public Map getApiCollectionsMetaMap() { return apiCollectionsMap; } + public void updateTransportType(ApiCollection apiCollection, String transportType) { + Bson filter = Filters.eq(ApiCollection.ID, apiCollection.getId()); + Bson update = Updates.set(ApiCollection.MCP_TRANSPORT_TYPE, transportType); + ApiCollectionsDao.instance.updateOne(filter, update); + apiCollection.setMcpTransportType(transportType); + } + + public List getMetaAll() { return ApiCollectionsDao.instance.findAll(new BasicDBObject(), Projections.exclude("urls")); } diff --git a/libs/dao/src/main/java/com/akto/dto/ApiCollection.java b/libs/dao/src/main/java/com/akto/dto/ApiCollection.java index 58ec4fc18a..fef9769940 100644 --- a/libs/dao/src/main/java/com/akto/dto/ApiCollection.java +++ b/libs/dao/src/main/java/com/akto/dto/ApiCollection.java @@ -62,6 +62,17 @@ public class ApiCollection { String sseCallbackUrl; public static final String SSE_CALLBACK_URL = "sseCallbackUrl"; + private String mcpTransportType; + public static final String MCP_TRANSPORT_TYPE = "mcpTransportType"; + + public String getMcpTransportType() { + return mcpTransportType; + } + + public void setMcpTransportType(String mcpTransportType) { + this.mcpTransportType = mcpTransportType; + } + public enum Type { API_GROUP } @@ -91,7 +102,7 @@ public enum ENV_TYPE { "localhost", "local", "intranet", "lan", "example", "invalid", "home", "corp", "priv", "localdomain", "localnet", "network", "int", "private"); - + private static final List ENV_KEYWORDS_WITHOUT_DOT = Arrays.asList( "kubernetes", "internal" ); diff --git a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java index d147192feb..1c2ba7238b 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java +++ b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java @@ -235,6 +235,25 @@ 
public void updateModuleInfo(ModuleInfo moduleInfo) {
         }
     }
 
+    @Override
+    public void updateTransportType(int apiCollectionId, String transportType) {
+        Map<String, List<String>> headers = buildHeaders();
+        BasicDBObject obj = new BasicDBObject();
+        obj.put("apiCollectionId", apiCollectionId);
+        obj.put("transportType", transportType);
+        OriginalHttpRequest request = new OriginalHttpRequest(url + "/updateTransportType", "", "POST", obj.toString(), headers, "");
+        try {
+            OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null);
+            if (response.getStatusCode() != 200) {
+                loggerMaker.errorAndAddToDb("non 2xx response in updateTransportType", LoggerMaker.LogDb.RUNTIME);
+                return;
+            }
+        } catch (Exception e) {
+            loggerMaker.errorAndAddToDb("error updating transport type" + e + " apiCollectionId " + apiCollectionId +
+                " transportType " + transportType, LoggerMaker.LogDb.RUNTIME);
+        }
+    }
+
     public APIConfig fetchApiConfig(String configName) {
         Map<String, List<String>> headers = buildHeaders();
         String queryParams = "?configName="+configName;
diff --git a/libs/utils/src/main/java/com/akto/data_actor/DataActor.java b/libs/utils/src/main/java/com/akto/data_actor/DataActor.java
index 2e6aa13c11..165c8a7e1d 100644
--- a/libs/utils/src/main/java/com/akto/data_actor/DataActor.java
+++ b/libs/utils/src/main/java/com/akto/data_actor/DataActor.java
@@ -57,6 +57,8 @@ public abstract class DataActor {
 
     public abstract void updateApiCollectionNameForVxlan(int vxlanId, String name);
 
+    public abstract void updateTransportType(int apiCollectionId, String transportType);
+
     public abstract APIConfig fetchApiConfig(String configName);
 
     public abstract void bulkWriteSingleTypeInfo(List writesForApiInfo);

From c7ddd862677193cf28f6b66c2df06e69c9b1e9a9 Mon Sep 17 00:00:00 2001
From: SHIVAM RAWAT
Date: Fri, 3 Oct 2025 09:47:09 +0530
Subject: [PATCH 2/6] adding DbActor

---
 libs/utils/src/main/java/com/akto/data_actor/DbActor.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/libs/utils/src/main/java/com/akto/data_actor/DbActor.java b/libs/utils/src/main/java/com/akto/data_actor/DbActor.java
index d4dfd29c1b..6b4bd061bb 100644
--- a/libs/utils/src/main/java/com/akto/data_actor/DbActor.java
+++ b/libs/utils/src/main/java/com/akto/data_actor/DbActor.java
@@ -1,5 +1,6 @@
 package com.akto.data_actor;
 
+import com.akto.dao.ApiCollectionsDao;
 import com.akto.dto.*;
 import com.akto.dto.ApiInfo.ApiInfoKey;
 import com.akto.dto.billing.Organization;
@@ -70,6 +71,11 @@ public void updateApiCollectionNameForVxlan(int vxlanId, String name) {
         DbLayer.updateApiCollectionName(vxlanId, name);
     }
 
+    @Override
+    public void updateTransportType(int apiCollectionId, String transportType) {
+        ApiCollectionsDao.instance.updateTransportType(ApiCollectionsDao.instance.getMeta(apiCollectionId), transportType);
+    }
+
     public APIConfig fetchApiConfig(String configName) {
         return DbLayer.fetchApiconfig(configName);
     }

From 41baaaeb476de763bccb15d892fd04ff9326335f Mon Sep 17 00:00:00 2001
From: SHIVAM RAWAT
Date: Fri, 3 Oct 2025 09:47:09 +0530
Subject: [PATCH 3/6] add http streamable support mini-runtime

---
 .../McpToolsSyncJobExecutor.java              | 66 +++++++++++++++++--
 .../java/com/akto/testing/ApiExecutor.java    | 10 ++-
 2 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
index 644e8aba50..ef9ceb695e 100644
--- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
+++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
@@ -1,6 +1,7 @@
 package com.akto.hybrid_runtime;
 
 import com.akto.dao.context.Context;
+import com.akto.data_actor.DataActor;
 import com.akto.data_actor.DataActorFactory;
 import com.akto.dto.APIConfig;
 import com.akto.dto.AccountSettings;
@@ -32,6 +33,7 @@
 import com.akto.testing.ApiExecutor;
 import com.akto.util.Constants;
 import com.akto.util.JSONUtils;
+import com.akto.util.McpSseEndpointHelper;
 import com.akto.util.Pair;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -57,11 +59,17 @@ public class McpToolsSyncJobExecutor {
 
     private static final LoggerMaker logger = new LoggerMaker(McpToolsSyncJobExecutor.class, LogDb.RUNTIME);
     private static final ObjectMapper mapper = new ObjectMapper();
+    public static final DataActor dataActor = DataActorFactory.fetchInstance();
 
     public static final String MCP_TOOLS_LIST_REQUEST_JSON = "{\"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"" + McpSchema.METHOD_TOOLS_LIST + "\", \"params\": {}}";
     public static final String MCP_RESOURCE_LIST_REQUEST_JSON = "{\"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"" + McpSchema.METHOD_RESOURCES_LIST + "\", \"params\": {}}";
 
     public static final String LOCAL_IP = "127.0.0.1";
+
+    // MCP Transport types
+    private static final String TRANSPORT_SSE = "SSE";
+    private static final String TRANSPORT_HTTP = "HTTP";
+
     private ServerCapabilities mcpServerCapabilities = null;
 
     public static final McpToolsSyncJobExecutor INSTANCE = new McpToolsSyncJobExecutor();
@@ -310,7 +318,7 @@ public static Map generateExampleArguments(JsonSchema inputSchem
 
     public Pair getMcpMethodResponse(String host, String mcpMethod, String mcpMethodRequestJson,
         ApiCollection apiCollection) throws Exception {
         OriginalHttpRequest mcpRequest = createRequest(host, mcpMethod, mcpMethodRequestJson);
-        String jsonrpcResponse = sendRequest(mcpRequest);
+        String jsonrpcResponse = sendRequest(mcpRequest, apiCollection);
 
         JSONRPCResponse rpcResponse = (JSONRPCResponse) McpSchema.deserializeJsonRpcMessage(mapper, jsonrpcResponse);
@@ -347,13 +355,61 @@ private String buildHeaders(String host) {
         return "{\"Content-Type\":\"application/json\",\"Accept\":\"*/*\",\"host\":\"" + host + "\"}";
     }
 
-    private String sendRequest(OriginalHttpRequest request) throws Exception {
+
+    private String detectAndSetTransportType(OriginalHttpRequest request, ApiCollection apiCollection) throws Exception {
+        // Try SSE first if sseCallbackUrl is set
+        if (apiCollection.getSseCallbackUrl() != null && !apiCollection.getSseCallbackUrl().isEmpty()) {
+            try {
+                logger.info("Attempting to detect transport type for MCP server: {}", apiCollection.getHostName());
+
+                // Clone request for SSE detection to avoid modifying original
+                OriginalHttpRequest sseTestRequest = request.copy();
+                McpSseEndpointHelper.addSseEndpointHeader(sseTestRequest, apiCollection.getId());
+                ApiExecutor.sendRequestWithSse(sseTestRequest, true, null, false,
+                    new ArrayList<>(), false, true);
+
+                // If SSE works, update the collection
+                dataActor.updateTransportType(apiCollection.getId(), TRANSPORT_SSE);
+                logger.info("Detected SSE transport for MCP server: {}", apiCollection.getHostName());
+                return TRANSPORT_SSE;
+            } catch (Exception sseException) {
+                logger.info("SSE transport failed, falling back to HTTP transport: {}", sseException.getMessage());
+                // Fall back to HTTP - no need to test, just store it
+                dataActor.updateTransportType(apiCollection.getId(), TRANSPORT_SSE);
+                return TRANSPORT_HTTP;
+            }
+        }
+
+        // Default to HTTP if no sseCallbackUrl
+        logger.info("No SSE callback URL found, using HTTP transport for: {}", apiCollection.getHostName());
+        dataActor.updateTransportType(apiCollection.getId(), TRANSPORT_SSE);
+        return TRANSPORT_HTTP;
+    }
+    private String sendRequest(OriginalHttpRequest request, ApiCollection apiCollection) throws Exception {
         try {
-            OriginalHttpResponse response = ApiExecutor.sendRequestWithSse(request, true, null, false,
-                new ArrayList<>(), false, true);
+            String transportType = apiCollection.getMcpTransportType();
+
+            // If transport type is not set, try to detect it
+            if (transportType == null || transportType.isEmpty()) {
+                transportType = detectAndSetTransportType(request, apiCollection);
+            }
+
+            OriginalHttpResponse response;
+            if (TRANSPORT_HTTP.equals(transportType)) {
+                // Use standard HTTP POST for streamable responses
+                // Use sendRequestSkipSse to prevent ApiExecutor from trying SSE
+                logger.info("Using HTTP transport for MCP server: {}", apiCollection.getHostName());
+                response = ApiExecutor.sendRequestSkipSse(request, true, null, false, new ArrayList<>(), false);
+            } else {
+                // Use SSE transport
+                logger.info("Using SSE transport for MCP server: {}", apiCollection.getHostName());
+                McpSseEndpointHelper.addSseEndpointHeader(request, apiCollection.getId());
+                response = ApiExecutor.sendRequestWithSse(request, true, null, false,
+                    new ArrayList<>(), false, true);
+            }
             return response.getBody();
         } catch (Exception e) {
-            logger.error("Error while making request to MCP server.", e);
+            logger.error("Error while making request to MCP server: {}", e.getMessage(), e);
             throw e;
         }
     }
diff --git a/libs/utils/src/main/java/com/akto/testing/ApiExecutor.java b/libs/utils/src/main/java/com/akto/testing/ApiExecutor.java
index 1fffd835d3..941559e710 100644
--- a/libs/utils/src/main/java/com/akto/testing/ApiExecutor.java
+++ b/libs/utils/src/main/java/com/akto/testing/ApiExecutor.java
@@ -562,7 +562,8 @@ private static OriginalHttpResponse sendWithRequestBody(OriginalHttpRequest requ
 
         if (payload == null) payload = "";
         if (body == null) {// body not created by GRPC block yet
-            if (request.getHeaders().containsKey("charset")) {
+            // Create body with null MediaType for JSON-RPC requests to prevent OkHttp from adding charset parameter
+            if (request.getHeaders().containsKey("charset") || isJsonRpcRequest(request)) {
                 body = RequestBody.create(payload, null);
                 request.getHeaders().remove("charset");
             } else {
@@ -664,6 +665,13 @@ private static String waitForMatchingSseMessage(SseSession session, String id, l
         throw new Exception("Timeout waiting for SSE message with id=" + id);
     }
 
+
+    public static OriginalHttpResponse sendRequestSkipSse(OriginalHttpRequest request, boolean followRedirects,
+        TestingRunConfig testingRunConfig, boolean debug, List testLogs,
+        boolean skipSSRFCheck) throws Exception {
+        return sendRequest(request, followRedirects, testingRunConfig, debug, testLogs, skipSSRFCheck, true);
+    }
+
     public static OriginalHttpResponse sendRequestWithSse(OriginalHttpRequest request, boolean followRedirects,
         TestingRunConfig testingRunConfig, boolean debug, List testLogs,
         boolean skipSSRFCheck, boolean overrideMessageEndpoint) throws Exception {

From 9f410d87bd1acbefa8048af87937c0a0e39ffdd1 Mon Sep 17 00:00:00 2001
From: SHIVAM RAWAT
Date: Fri, 3 Oct 2025 13:45:13 +0530
Subject: [PATCH 4/6] fix transport type

---
 .../java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
index ef9ceb695e..8b5c6589d9 100644
--- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
+++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
@@ -375,14 +375,14 @@ private String detectAndSetTransportType(OriginalHttpRequest request, ApiCollect
             } catch (Exception sseException) {
                 logger.info("SSE transport failed, falling back to HTTP transport: {}", sseException.getMessage());
                 // Fall back to HTTP - no need to test, just store it
-                dataActor.updateTransportType(apiCollection.getId(), TRANSPORT_SSE);
+                dataActor.updateTransportType(apiCollection.getId(), TRANSPORT_HTTP);
                 return TRANSPORT_HTTP;
             }
         }
 
         // Default to HTTP if no sseCallbackUrl
         logger.info("No SSE callback URL found, using HTTP transport for: {}", apiCollection.getHostName());
-        dataActor.updateTransportType(apiCollection.getId(), TRANSPORT_SSE);
+        dataActor.updateTransportType(apiCollection.getId(), TRANSPORT_HTTP);
         return TRANSPORT_HTTP;
     }
     private String sendRequest(OriginalHttpRequest request, ApiCollection apiCollection) throws Exception {

From 45369db52a692939b24e6cb1c28e4cedc4345a22 Mon Sep 17 00:00:00 2001
From: SHIVAM RAWAT
Date: Tue, 7 Oct 2025 14:17:22 +0530
Subject: [PATCH 5/6] add mcp endpoint

---
 .../java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
index 8b5c6589d9..a284627a33 100644
--- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
+++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
@@ -317,7 +317,7 @@ public static Map generateExampleArguments(JsonSchema inputSchem
 
     public Pair getMcpMethodResponse(String host, String mcpMethod, String mcpMethodRequestJson,
         ApiCollection apiCollection) throws Exception {
-        OriginalHttpRequest mcpRequest = createRequest(host, mcpMethod, mcpMethodRequestJson);
+        OriginalHttpRequest mcpRequest = createRequest(host, apiCollection, mcpMethodRequestJson);
         String jsonrpcResponse = sendRequest(mcpRequest, apiCollection);
 
         JSONRPCResponse rpcResponse = (JSONRPCResponse) McpSchema.deserializeJsonRpcMessage(mapper, jsonrpcResponse);

From e7d7a2a111155c5faf3ecddc4bf2d3267ca385b1 Mon Sep 17 00:00:00 2001
From: SHIVAM RAWAT
Date: Tue, 7 Oct 2025 14:18:02 +0530
Subject: [PATCH 6/6] add mcp endpoint

---
 .../com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
index a284627a33..1f58c7a82d 100644
--- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
+++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/McpToolsSyncJobExecutor.java
@@ -341,8 +341,10 @@ public Pair getMcpMethodResponse(String hos
         return new Pair<>(rpcResponse, responseParams);
     }
 
-    private OriginalHttpRequest createRequest(String host, String mcpMethodRequestJson) {
-        return new OriginalHttpRequest("",
+    private OriginalHttpRequest createRequest(String host, ApiCollection apiCollection, String mcpMethodRequestJson) {
+        String mcpEndpoint = apiCollection.getSseCallbackUrl();
+
+        return new OriginalHttpRequest(mcpEndpoint,
             null, HttpMethod.POST.name(), mcpMethodRequestJson,