diff --git a/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java b/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java index 83203ddef8..dcf77f75d1 100644 --- a/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java +++ b/apps/api-runtime/src/main/java/com/akto/runtime/APICatalogSync.java @@ -8,6 +8,9 @@ import com.akto.dao.context.Context; import com.akto.dao.monitoring.FilterYamlTemplateDao; import com.akto.dao.runtime_filters.AdvancedTrafficFiltersDao; +import com.akto.data_actor.DataActor; +import com.akto.data_actor.DataActorFactory; +import com.akto.data_actor.DbActor; import com.akto.dao.filter.MergedUrlsDao; import com.akto.dto.*; import com.akto.dto.billing.SyncLimit; @@ -27,6 +30,8 @@ import com.akto.log.LoggerMaker; import com.akto.log.LoggerMaker.LogDb; import com.akto.util.filter.DictionaryFilter; +import com.akto.runtime.APICatalogSync.ApiMergerResult; +import com.akto.runtime.APICatalogSync.DbUpdateReturn; import com.akto.runtime.merge.MergeOnHostOnly; import com.akto.runtime.policies.AktoPolicyNew; import com.akto.task.Cluster; @@ -66,6 +71,8 @@ public class APICatalogSync { public Map dbState; public Map delta; public AktoPolicyNew aktoPolicyNew; + public SvcToSvcGraphManager svcToSvcGraphManager = null; + public DataActor dataActor = DataActorFactory.fetchInstance(); public Map sensitiveParamInfoBooleanMap; public static boolean mergeAsyncOutside = true; public BloomFilter existingAPIsInDb = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 1_000_000, 0.001 ); @@ -88,6 +95,7 @@ public APICatalogSync(String userIdentifier, int thresh, boolean fetchAllSTI, bo mergedUrls = new HashSet<>(); if (buildFromDb) { buildFromDB(false, fetchAllSTI); + this.svcToSvcGraphManager = SvcToSvcGraphManager.createFromEdgesAndNodes(dataActor); } } @@ -214,6 +222,9 @@ public void computeDelta(URLAggregator origAggregator, boolean triggerTemplateGe for (HttpResponseParams responseParams: value) { try { aktoPolicyNew.process(responseParams); + if (svcToSvcGraphManager != null) { + svcToSvcGraphManager.processRecord(responseParams.getSvcToSvcGraphParams()); + } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); @@ -1890,7 +1901,14 @@ public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI, SyncLimit s } loggerMaker.infoAndAddToDb("starting build from db inside syncWithDb", LogDb.RUNTIME); + buildFromDB(true, fetchAllSTI); + if (svcToSvcGraphManager != null){ + svcToSvcGraphManager.updateWithNewDataAndReturnDelta(dataActor); + if (svcToSvcGraphManager.getLastFetchFromDb() < Context.now() - 6 * 60 * 60) { + svcToSvcGraphManager = SvcToSvcGraphManager.createFromEdgesAndNodes(dataActor); + } + } loggerMaker.infoAndAddToDb("Finished syncing with db", LogDb.RUNTIME); } diff --git a/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java b/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java index d41cc860f2..ac2c2418bd 100644 --- a/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java +++ b/apps/database-abstractor/src/main/java/com/akto/action/DbAction.java @@ -11,6 +11,9 @@ import com.akto.dto.bulk_updates.BulkUpdates; import com.akto.dto.bulk_updates.UpdatePayload; import com.akto.dto.filter.MergedUrls; +import com.akto.dto.graph.SvcToSvcGraph; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; import com.akto.dto.runtime_filters.RuntimeFilter; import com.akto.dto.settings.DataControlSettings; import com.akto.dto.test_editor.TestingRunPlayground; @@ -1767,6 +1770,54 @@ public String findLatestTestingRunResultSummary(){ return Action.SUCCESS.toUpperCase(); } + public List svcToSvcGraphEdges; + public List svcToSvcGraphNodes; + + public String findSvcToSvcGraphNodes() { + try { + this.svcToSvcGraphNodes = DbLayer.findSvcToSvcGraphNodes(startTimestamp, endTimestamp, skip, limit); + } catch (Exception e) { + System.out.println("Error in findSvcToSvcGraphNodes " + e.toString()); + return Action.ERROR.toUpperCase(); + } + + return Action.SUCCESS.toUpperCase(); + + } + + public String findSvcToSvcGraphEdges() { + try { + this.svcToSvcGraphEdges = DbLayer.findSvcToSvcGraphEdges(startTimestamp, endTimestamp, skip, limit); + } catch (Exception e) { + System.out.println("Error in findSvcToSvcGraphEdges " + e.toString()); + return Action.ERROR.toUpperCase(); + } + + return Action.SUCCESS.toUpperCase(); + } + + public String updateSvcToSvcGraphEdges() { + try { + DbLayer.updateSvcToSvcGraphEdges(this.svcToSvcGraphEdges); + } catch (Exception e) { + System.out.println("Error in updateSvcToSvcGraphEdges " + e.toString()); + return Action.ERROR.toUpperCase(); + } + + return Action.SUCCESS.toUpperCase(); + } + + public String updateSvcToSvcGraphNodes() { + try { + DbLayer.updateSvcToSvcGraphNodes(this.svcToSvcGraphNodes); + } catch (Exception e) { + System.out.println("Error in updateSvcToSvcGraphNodes " + e.toString()); + return Action.ERROR.toUpperCase(); + } + + return Action.SUCCESS.toUpperCase(); + } + public List getCustomDataTypes() { return customDataTypes; } diff --git a/apps/database-abstractor/src/main/resources/struts.xml b/apps/database-abstractor/src/main/resources/struts.xml index fb74b1d094..374f1cd651 100644 --- a/apps/database-abstractor/src/main/resources/struts.xml +++ b/apps/database-abstractor/src/main/resources/struts.xml @@ -1233,6 +1233,50 @@ ^actionErrors.* + + + + + + 422 + false + ^actionErrors.* + + + + + + + + + 422 + false + ^actionErrors.* + + + + + + + + + 422 + false + ^actionErrors.* + + + + + + + + + 422 + false + ^actionErrors.* + + + diff --git a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java index 332511b76f..345754256b 100644 --- a/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java +++ b/apps/mini-runtime/src/main/java/com/akto/hybrid_runtime/APICatalogSync.java @@ -32,6 +32,7 @@ import com.akto.log.LoggerMaker; import com.akto.log.LoggerMaker.LogDb; import com.akto.metrics.AllMetrics; +import com.akto.runtime.SvcToSvcGraphManager; import com.akto.runtime.utils.Utils; import com.akto.data_actor.DataActor; import com.akto.data_actor.DataActorFactory; @@ -65,6 +66,7 @@ public class APICatalogSync { public Map dbState; public Map delta; public AktoPolicyNew aktoPolicyNew; + public SvcToSvcGraphManager svcToSvcGraphManager = null; public Map sensitiveParamInfoBooleanMap; public static boolean mergeAsyncOutside = true; public int lastStiFetchTs = 0; @@ -104,6 +106,7 @@ public APICatalogSync(String userIdentifier, int thresh, boolean fetchAllSTI, bo this.mergedUrls = new HashSet<>(); if (buildFromDb) { buildFromDB(false, fetchAllSTI); + this.svcToSvcGraphManager = SvcToSvcGraphManager.createFromEdgesAndNodes(dataActor); } } @@ -230,6 +233,9 @@ public void computeDelta(URLAggregator origAggregator, boolean triggerTemplateGe for (HttpResponseParams responseParams: value) { try { aktoPolicyNew.process(responseParams); + if (svcToSvcGraphManager != null) { + svcToSvcGraphManager.processRecord(responseParams.getSvcToSvcGraphParams()); + } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); @@ -1565,6 +1571,12 @@ public void syncWithDB(boolean syncImmediately, boolean fetchAllSTI, SyncLimit s now = Context.now(); loggerMaker.infoAndAddToDb("Finished syncing with db at : " + now, LogDb.RUNTIME); lastBuildFromDb = now; + if (svcToSvcGraphManager != null){ + svcToSvcGraphManager.updateWithNewDataAndReturnDelta(dataActor); + if (svcToSvcGraphManager.getLastFetchFromDb() < Context.now() - 6 * 60 * 60) { + svcToSvcGraphManager = SvcToSvcGraphManager.createFromEdgesAndNodes(dataActor); + } + } } } diff --git a/libs/dao/pom.xml b/libs/dao/pom.xml index b01cd1c927..0a66ff73ab 100644 --- a/libs/dao/pom.xml +++ b/libs/dao/pom.xml @@ -182,6 +182,11 @@ fastjson2 2.0.51 + + org.projectlombok + lombok + 1.18.36 + diff --git a/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphEdgesDao.java b/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphEdgesDao.java new file mode 100644 index 0000000000..10f7e88c09 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphEdgesDao.java @@ -0,0 +1,22 @@ +package com.akto.dao.graph; + +import com.akto.dao.AccountsContextDao; +import com.akto.dto.graph.SvcToSvcGraphEdge; + +public class SvcToSvcGraphEdgesDao extends AccountsContextDao { + + public static final SvcToSvcGraphEdgesDao instance = new SvcToSvcGraphEdgesDao(); + + private SvcToSvcGraphEdgesDao() {} + + @Override + public String getCollName() { + return "svc_to_svc_graph_edges"; + } + + @Override + public Class getClassT() { + return SvcToSvcGraphEdge.class; + } + +} diff --git a/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphNodesDao.java b/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphNodesDao.java new file mode 100644 index 0000000000..26dfdd73f9 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dao/graph/SvcToSvcGraphNodesDao.java @@ -0,0 +1,22 @@ +package com.akto.dao.graph; + +import com.akto.dao.AccountsContextDao; +import com.akto.dto.graph.SvcToSvcGraphNode; + +public class SvcToSvcGraphNodesDao extends AccountsContextDao { + + public static final SvcToSvcGraphNodesDao instance = new SvcToSvcGraphNodesDao(); + + private SvcToSvcGraphNodesDao() {} + + @Override + public String getCollName() { + return "svc_to_svc_graph_nodes"; + } + + @Override + public Class getClassT() { + return SvcToSvcGraphNode.class; + } + +} diff --git a/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java b/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java index 2f3c95262b..d74c056b0a 100644 --- a/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java +++ b/libs/dao/src/main/java/com/akto/dto/HttpResponseParams.java @@ -2,6 +2,7 @@ import com.akto.dao.context.Context; +import com.akto.dto.graph.SvcToSvcGraphParams; import java.util.HashMap; import java.util.List; @@ -28,6 +29,8 @@ public enum Source { String sourceIP; String destIP; String direction; + SvcToSvcGraphParams svcToSvcGraphParams; + // K8 pod tags in JSON string String tags; @@ -37,12 +40,12 @@ public HttpResponseParams(String type, int statusCode, String status, Map> headers, String payload, HttpRequestParams requestParams, int time, String accountId, boolean isPending, Source source, - String orig, String sourceIP, String destIP, String direction, String tags) { + String orig, String sourceIP, String destIP, String direction, SvcToSvcGraphParams svcToSvcGraphParams, String tags) { this.type = type; this.statusCode = statusCode; this.status = status; @@ -57,27 +60,10 @@ public HttpResponseParams(String type, int statusCode, String status, Map> headers, String payload, - HttpRequestParams requestParams, int time, String accountId, boolean isPending, Source source, - String orig, String sourceIP, String destIP, String direction) { - this.type = type; - this.statusCode = statusCode; - this.status = status; - this.headers = headers; - this.payload = payload; - this.requestParams = requestParams; - this.time = time; - this.accountId = accountId; - this.isPending = isPending; - this.source = source; - this.orig = orig; - this.sourceIP = sourceIP; - this.destIP = destIP; - this.direction = direction; - } public static boolean validHttpResponseCode(int statusCode) { return statusCode >= 200 && (statusCode < 300 || statusCode == 302); @@ -96,7 +82,11 @@ public HttpResponseParams copy() { this.isPending, this.source, this.orig, - this.sourceIP + this.sourceIP, + this.destIP, + this.direction, + this.svcToSvcGraphParams, + this.tags ); } @@ -185,6 +175,13 @@ public void setRequestParams(HttpRequestParams requestParams) { this.requestParams = requestParams; } + public SvcToSvcGraphParams getSvcToSvcGraphParams() { + return svcToSvcGraphParams; + } + + public void setSvcToSvcGraphParams(SvcToSvcGraphParams svcToSvcGraphParams) { + this.svcToSvcGraphParams = svcToSvcGraphParams; + } public String getTags() { return tags; } diff --git a/libs/dao/src/main/java/com/akto/dto/graph/K8sDaemonsetGraphParams.java b/libs/dao/src/main/java/com/akto/dto/graph/K8sDaemonsetGraphParams.java new file mode 100644 index 0000000000..098911d314 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dto/graph/K8sDaemonsetGraphParams.java @@ -0,0 +1,26 @@ +package com.akto.dto.graph; + +import lombok.*; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@ToString +@EqualsAndHashCode(callSuper = true) +public class K8sDaemonsetGraphParams extends SvcToSvcGraphParams { + + public static final String DIRECTION_OUTGOING = "2"; + public static final String DIRECTION_INCOMING = "1"; + + String hostInApiRequest; + String processId; + String socketId; + String daemonsetId; + String direction; + + @Override + public Type getType() { + return Type.K8S; + } +} diff --git a/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraph.java b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraph.java new file mode 100644 index 0000000000..15a5c33aaa --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraph.java @@ -0,0 +1,14 @@ +package com.akto.dto.graph; + +import java.util.List; + +import lombok.*; + +@Getter +@Setter +@AllArgsConstructor +public class SvcToSvcGraph { + private List edges; + private List nodes; + private int lastFetchFromDb; +} diff --git a/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphEdge.java b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphEdge.java new file mode 100644 index 0000000000..753c0ff284 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphEdge.java @@ -0,0 +1,36 @@ +package com.akto.dto.graph; + +import org.bson.codecs.pojo.annotations.BsonId; + +import com.akto.dao.context.Context; + +import lombok.*; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@ToString +@EqualsAndHashCode +public class SvcToSvcGraphEdge { + @BsonId + private String id; + + private String source; + + private String target; + + public static final String CREATTION_EPOCH = "creationEpoch"; + private int creationEpoch; + + private SvcToSvcGraphParams.Type type; + + private int lastSeenEpoch; + + private int counter; + + public static SvcToSvcGraphEdge createFromK8s(String source, String target) { + int ts = Context.now(); + return new SvcToSvcGraphEdge(source + "_" + target, source, target, ts, SvcToSvcGraphParams.Type.K8S, ts, 0); + } +} diff --git a/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphNode.java b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphNode.java new file mode 100644 index 0000000000..2f1ae7bb82 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphNode.java @@ -0,0 +1,32 @@ +package com.akto.dto.graph; + +import org.bson.codecs.pojo.annotations.BsonId; + +import com.akto.dao.context.Context; + +import lombok.*; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +@ToString +@EqualsAndHashCode + +public class SvcToSvcGraphNode { + @BsonId + private String id; + + private int creationEpoch; + + private SvcToSvcGraphParams.Type type; + + private int lastSeenEpoch; + + private int counter; + + public static SvcToSvcGraphNode createFromK8s(String name) { + int ts = Context.now(); + return new SvcToSvcGraphNode(name, ts, SvcToSvcGraphParams.Type.K8S, ts, 0); + } +} diff --git a/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphParams.java b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphParams.java new file mode 100644 index 0000000000..4ae64032e1 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dto/graph/SvcToSvcGraphParams.java @@ -0,0 +1,10 @@ +package com.akto.dto.graph; + +public abstract class SvcToSvcGraphParams { + + public enum Type { + K8S + } + + public abstract Type getType(); +} diff --git a/libs/utils/pom.xml b/libs/utils/pom.xml index a36c242a08..6a6b290364 100644 --- a/libs/utils/pom.xml +++ b/libs/utils/pom.xml @@ -117,6 +117,11 @@ graphql-java 20.0 + + org.projectlombok + lombok + 1.18.36 + diff --git a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java index 4c133805cc..814ca05c0e 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java +++ b/libs/utils/src/main/java/com/akto/data_actor/ClientActor.java @@ -3,6 +3,8 @@ import com.akto.DaoInit; import com.akto.dao.context.Context; import com.akto.dto.filter.MergedUrls; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; import com.akto.dto.jobs.Job; import com.akto.dto.jobs.JobExecutorType; import com.akto.dto.jobs.JobParams; @@ -64,6 +66,7 @@ import com.mongodb.BasicDBList; import com.mongodb.BasicDBObject; +import org.apache.kafka.common.protocol.types.Field.Str; import java.io.PrintWriter; import java.io.StringWriter; import okhttp3.MediaType; @@ -3852,6 +3855,118 @@ public List findTestSubCategoriesByTestSuiteId(List testSuiteId) return new ArrayList<>(); } } + @Override + public List findSvcToSvcGraphEdges(int startTs, int endTs, int skip, int limit) { + Map> headers = buildHeaders(); + BasicDBObject obj = new BasicDBObject(); + obj.put("startTimestamp", startTs); + obj.put("endTimestamp", endTs); + obj.put("skip", skip); + obj.put("limit", limit); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/findSvcToSvcGraphEdges", "", "POST", obj.toString(), headers, ""); + try { + OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); + String responsePayload = response.getBody(); + if (response.getStatusCode() != 200 || responsePayload == null) { + loggerMaker.errorAndAddToDb("non 2xx response in findSvcToSvcGraphEdges", LoggerMaker.LogDb.RUNTIME); + return null; + } + BasicDBObject payloadObj; + try { + payloadObj = BasicDBObject.parse(responsePayload); + BasicDBList edges = (BasicDBList) payloadObj.get("svcToSvcGraphEdges"); + List edgesList = new ArrayList<>(); + for (Object edge: edges) { + BasicDBObject edgeObj = (BasicDBObject) edge; + SvcToSvcGraphEdge svcToSvcGraphEdge = objectMapper.readValue(edgeObj.toJson(), SvcToSvcGraphEdge.class); + edgesList.add(svcToSvcGraphEdge); + } + return edgesList; + } catch(Exception e) { + return null; + } + } catch (Exception e) { + loggerMaker.errorAndAddToDb("error in findSvcToSvcGraphEdges" + e, LoggerMaker.LogDb.RUNTIME); + return null; + } + } + + public List findSvcToSvcGraphNodes(int startTs, int endTs, int skip, int limit) { + Map> headers = buildHeaders(); + BasicDBObject obj = new BasicDBObject(); + obj.put("startTimestamp", startTs); + obj.put("endTimestamp", endTs); + obj.put("skip", skip); + obj.put("limit", limit); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/findSvcToSvcGraphNodes", "", "POST", obj.toString(), headers, ""); + + try { + OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); + String responsePayload = response.getBody(); + if (response.getStatusCode() != 200 || responsePayload == null) { + loggerMaker.errorAndAddToDb("non 2xx response in findSvcToSvcGraphNodes", LoggerMaker.LogDb.RUNTIME); + return null; + } + BasicDBObject payloadObj; + try { + + payloadObj = BasicDBObject.parse(responsePayload); + BasicDBList nodes = (BasicDBList) payloadObj.get("svcToSvcGraphNodes"); + List nodesList = new ArrayList<>(); + for (Object node: nodes) { + BasicDBObject nodeObj = (BasicDBObject) node; + SvcToSvcGraphNode svcToSvcGraphNode = objectMapper.readValue(nodeObj.toJson(), SvcToSvcGraphNode.class); + nodesList.add(svcToSvcGraphNode); + } + return nodesList; + } catch(Exception e) { + return null; + } + } catch (Exception e) { + loggerMaker.errorAndAddToDb("error in findSvcToSvcGraphNodes" + e, LoggerMaker.LogDb.RUNTIME); + return null; + } + } + + @Override + public void updateSvcToSvcGraphEdges(List edges) { + Map> headers = buildHeaders(); + BasicDBObject obj = new BasicDBObject(); + obj.put("svcToSvcGraphEdges", edges); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/updateSvcToSvcGraphEdges", "", "POST", gson.toJson(obj), headers, ""); + try { + OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); + String responsePayload = response.getBody(); + if (response.getStatusCode() != 200 || responsePayload == null) { + loggerMaker.errorAndAddToDb("non 2xx response in updateSvcToSvcGraphEdges", LoggerMaker.LogDb.RUNTIME); + return; + } + } catch (Exception e) { + loggerMaker.errorAndAddToDb("error in updateSvcToSvcGraphEdges" + e, LoggerMaker.LogDb.RUNTIME); + return; + } + } + + @Override + public void updateSvcToSvcGraphNodes(List nodes) { + Map> headers = buildHeaders(); + BasicDBObject obj = new BasicDBObject(); + obj.put("svcToSvcGraphNodes", nodes); + OriginalHttpRequest request = new OriginalHttpRequest(url + "/updateSvcToSvcGraphNodes", "", "POST", gson.toJson(obj), headers, ""); + + try { + OriginalHttpResponse response = ApiExecutor.sendRequest(request, true, null, false, null); + String responsePayload = response.getBody(); + if (response.getStatusCode() != 200 || responsePayload == null) { + loggerMaker.errorAndAddToDb("non 2xx response in updateSvcToSvcGraphNodes", LoggerMaker.LogDb.RUNTIME); + return; + } + } catch (Exception e) { + loggerMaker.errorAndAddToDb("error in updateSvcToSvcGraphNodes" + e, LoggerMaker.LogDb.RUNTIME); + return; + } + } + public TestingRunPlayground getCurrentTestingRunDetailsFromEditor(int timestamp){ BasicDBObject obj = new BasicDBObject(); obj.put("ts", timestamp); diff --git a/libs/utils/src/main/java/com/akto/data_actor/DataActor.java b/libs/utils/src/main/java/com/akto/data_actor/DataActor.java index 8c22f13e50..2092e05aaf 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/DataActor.java +++ b/libs/utils/src/main/java/com/akto/data_actor/DataActor.java @@ -5,6 +5,8 @@ import com.akto.dto.billing.Tokens; import com.akto.dto.dependency_flow.Node; import com.akto.dto.filter.MergedUrls; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; import com.akto.dto.jobs.JobExecutorType; import com.akto.dto.jobs.JobParams; import com.akto.dto.metrics.MetricData; @@ -36,6 +38,7 @@ import com.akto.dto.usage.MetricTypes; import com.mongodb.BasicDBObject; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -295,5 +298,66 @@ public abstract class DataActor { public abstract void scheduleAutoCreateTicketsJob(int accountId, JobParams params, JobExecutorType jobExecutorType); + + public List findAllSvcToSvcGraphEdges(int startTs, int endTs) { + List ret = new ArrayList<>(); + int skip = 0; + int limit = 1000; + while (true) { + List newList = findSvcToSvcGraphEdges(startTs, endTs, skip, limit); + + ret.addAll(newList); + skip += limit; + if (newList.size() < limit) { + return ret; + } + } + } + + protected abstract List findSvcToSvcGraphEdges(int startTs, int endTs, int skip, int limit); + + public List findAllSvcToSvcGraphNodes(int startTs, int endTs) { + List ret = new ArrayList<>(); + int skip = 0; + int limit = 1000; + while (true) { + List newList = findSvcToSvcGraphNodes(startTs, endTs, skip, limit); + + ret.addAll(newList); + skip += limit; + if (newList.size() < limit) { + return ret; + } + } + } + + protected abstract List findSvcToSvcGraphNodes(int startTs, int endTs, int skip, int limit); + + public abstract void updateSvcToSvcGraphEdges(List edges); + public abstract void updateSvcToSvcGraphNodes(List nodes); + + public void updateNewEdgesInBatches(List updateEdges) { + if (updateEdges.isEmpty()) return; + + int start = 0; + + do { + updateSvcToSvcGraphEdges(updateEdges.subList(start, Math.min(start + 1000, updateEdges.size()))); + start += 1000; + } while (start < updateEdges.size()); + } + + + public void updateNewNodesInBatches(List updateNodes) { + if (updateNodes.isEmpty()) return; + + int start = 0; + + do { + updateSvcToSvcGraphNodes(updateNodes.subList(start, Math.min(start + 1000, updateNodes.size()))); + start += 1000; + } while (start < updateNodes.size()); + } + public abstract String getLLMPromptResponse(JSONObject promptPayload); } diff --git a/libs/utils/src/main/java/com/akto/data_actor/DbActor.java b/libs/utils/src/main/java/com/akto/data_actor/DbActor.java index fe8cdca23e..1789c24c7b 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/DbActor.java +++ b/libs/utils/src/main/java/com/akto/data_actor/DbActor.java @@ -6,6 +6,8 @@ import com.akto.dto.billing.Tokens; import com.akto.dto.dependency_flow.Node; import com.akto.dto.filter.MergedUrls; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; import com.akto.dto.jobs.JobExecutorType; import com.akto.dto.jobs.JobParams; import com.akto.dto.metrics.MetricData; @@ -600,6 +602,26 @@ public TestingRunResultSummary findLatestTestingRunResultSummary(Bson filter){ return DbLayer.findLatestTestingRunResultSummary(filter); } + @Override + protected List findSvcToSvcGraphEdges(int startTs, int endTs, int skip, int limit) { + return DbLayer.findSvcToSvcGraphEdges(startTs, endTs, skip, limit); + } + + @Override + protected List findSvcToSvcGraphNodes(int startTs, int endTs, int skip, int limit) { + return DbLayer.findSvcToSvcGraphNodes(startTs, endTs, skip, limit); + } + + @Override + public void updateSvcToSvcGraphEdges(List edges) { + DbLayer.updateSvcToSvcGraphEdges(edges); + } + + @Override + public void updateSvcToSvcGraphNodes(List nodes) { + DbLayer.updateSvcToSvcGraphNodes(nodes); + } + public TestingRunPlayground getCurrentTestingRunDetailsFromEditor(int timestamp){ return DbLayer.getCurrentTestingRunDetailsFromEditor(timestamp); } diff --git a/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java b/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java index 4001e5e2a6..354a4c4171 100644 --- a/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java +++ b/libs/utils/src/main/java/com/akto/data_actor/DbLayer.java @@ -13,12 +13,16 @@ import com.akto.bulk_update_util.ApiInfoBulkUpdate; import com.akto.dao.*; import com.akto.dao.filter.MergedUrlsDao; +import com.akto.dao.graph.SvcToSvcGraphEdgesDao; +import com.akto.dao.graph.SvcToSvcGraphNodesDao; import com.akto.dao.metrics.MetricDataDao; import com.akto.dao.settings.DataControlSettingsDao; import com.akto.dao.testing.config.TestSuiteDao; import com.akto.dependency_analyser.DependencyAnalyserUtils; import com.akto.dto.*; import com.akto.dto.filter.MergedUrls; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; import com.akto.dto.metrics.MetricData; import com.akto.dto.settings.DataControlSettings; import com.akto.dto.testing.config.TestSuites; @@ -334,10 +338,9 @@ public static List fetchEndpointsInCollection() { int apiCollectionId = -1; List pipeline = new ArrayList<>(); BasicDBObject groupedId = - new BasicDBObject("apiCollectionId", "$apiCollectionId") - .append("url", "$url") - .append("method", "$method"); - + new BasicDBObject("apiCollectionId", "$apiCollectionId") + .append("url", "$url") + .append("method", "$method"); if (apiCollectionId != -1) { pipeline.add(Aggregates.match(Filters.eq("apiCollectionId", apiCollectionId))); } @@ -1155,6 +1158,49 @@ public static TestingRunResultSummary findLatestTestingRunResultSummary(Bson fil return TestingRunResultSummariesDao.instance.findLatestOne(filter); } + public static List findSvcToSvcGraphEdges(int startTs, int endTs, int skip, int limit) { + return SvcToSvcGraphEdgesDao.instance.findAll(Filters.and( + Filters.gte(SvcToSvcGraphEdge.CREATTION_EPOCH, startTs), + Filters.lte(SvcToSvcGraphEdge.CREATTION_EPOCH, endTs) + ), skip, limit, Sorts.ascending(SvcToSvcGraphEdge.CREATTION_EPOCH)); + } + + public static List findSvcToSvcGraphNodes(int startTs, int endTs, int skip, int limit) { + return SvcToSvcGraphNodesDao.instance.findAll(Filters.and( + Filters.gte(SvcToSvcGraphEdge.CREATTION_EPOCH, startTs), + Filters.lte(SvcToSvcGraphEdge.CREATTION_EPOCH, endTs) + ), skip, limit, Sorts.ascending(SvcToSvcGraphEdge.CREATTION_EPOCH)); + } + + public static void updateSvcToSvcGraphEdges(List edges) { + if (edges == null || edges.isEmpty()) { + return; + } + + BulkWriteOptions options = new BulkWriteOptions().ordered(false).bypassDocumentValidation(true); + List> bulkList = new ArrayList<>(); + for(SvcToSvcGraphEdge edge: edges) { + bulkList.add(new InsertOneModel(edge)); + } + + SvcToSvcGraphEdgesDao.instance.bulkWrite(bulkList, options); + } + + public static void updateSvcToSvcGraphNodes(List nodes) { + + if (nodes == null || nodes.isEmpty()) { + return; + } + + BulkWriteOptions options = new BulkWriteOptions().ordered(false).bypassDocumentValidation(true); + List> bulkList = new ArrayList<>(); + for(SvcToSvcGraphNode node: nodes) { + bulkList.add(new InsertOneModel(node)); + } + + SvcToSvcGraphNodesDao.instance.bulkWrite(bulkList, options); + } + public static TestingRunPlayground getCurrentTestingRunDetailsFromEditor(int timestamp){ return TestingRunPlaygroundDao.instance.findOne( Filters.and( diff --git a/libs/utils/src/main/java/com/akto/runtime/SvcToSvcGraphManager.java b/libs/utils/src/main/java/com/akto/runtime/SvcToSvcGraphManager.java new file mode 100644 index 0000000000..753a0caf38 --- /dev/null +++ b/libs/utils/src/main/java/com/akto/runtime/SvcToSvcGraphManager.java @@ -0,0 +1,229 @@ +package com.akto.runtime; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.akto.dao.context.Context; +import com.akto.data_actor.DataActor; +import com.akto.dto.graph.K8sDaemonsetGraphParams; +import com.akto.dto.graph.SvcToSvcGraph; +import com.akto.dto.graph.SvcToSvcGraphEdge; +import com.akto.dto.graph.SvcToSvcGraphNode; +import com.akto.dto.graph.SvcToSvcGraphParams; + +import lombok.*; + +@Setter +@Getter +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public class SvcToSvcGraphManager { + + Map> edges; + Set nodes; + + Map> newEdges; + Set newNodes; + + Map mapCompleteProcessIdToServiceNames; + int lastFetchFromDb; + + public static SvcToSvcGraphManager createFromEdgesAndNodes(DataActor dataActor) { + int now = Context.now(); + List edges = dataActor.findAllSvcToSvcGraphEdges(0, now); + List nodes = dataActor.findAllSvcToSvcGraphNodes(0, now); + SvcToSvcGraphManager instance = new SvcToSvcGraphManager(createEdgesMap(edges), createNodesSet(nodes), new HashMap<>(), new HashSet<>(), new HashMap<>(), now); + return instance; + + } + + private static Map> createEdgesMap(List edges) { + Map> ret = new HashMap<>(); + for (SvcToSvcGraphEdge svcToSvcGraphEdge : edges) { + String source = svcToSvcGraphEdge.getSource(); + String target = svcToSvcGraphEdge.getTarget(); + Set currEdges = ret.get(source); + + if (currEdges == null) { + currEdges = new HashSet<>(); + ret.put(source, currEdges); + } + + currEdges.add(target); + } + + return ret; + + } + + private static Set createNodesSet(List nodes) { + Set ret = new HashSet<>(); + for (SvcToSvcGraphNode svcToSvcGraphNode : nodes) { + + ret.add(svcToSvcGraphNode.getId()); + } + + return ret; + + } + + private boolean addToMap(Map> edges, String source, String target) { + Set currEdges = edges.get(source); + + if (currEdges == null) { + currEdges = new HashSet<>(); + edges.put(source, currEdges); + } + + if (currEdges.contains(target)) return false; + + currEdges.add(target); + + return true; + } + + public void processRecord(SvcToSvcGraphParams svcToSvcGraphParams) { + if (svcToSvcGraphParams == null) { + return; + } + + switch (svcToSvcGraphParams.getType()) { + case K8S: + K8sDaemonsetGraphParams k8sDaemonsetGraphParams = (K8sDaemonsetGraphParams) svcToSvcGraphParams; + + String completeProcessId = k8sDaemonsetGraphParams.getDaemonsetId() + "_" + k8sDaemonsetGraphParams.getProcessId(); + String serviceName = k8sDaemonsetGraphParams.getHostInApiRequest(); + + addNode(serviceName); + + switch (k8sDaemonsetGraphParams.getDirection()) { + case K8sDaemonsetGraphParams.DIRECTION_INCOMING: + addCompleteProcessId(completeProcessId, serviceName); + // addEdge (from other tools or from other services using ip address) + break; + case K8sDaemonsetGraphParams.DIRECTION_OUTGOING: + addEdge(completeProcessId, serviceName); + break; + default: + break; + } + + break; + default: + throw new RuntimeException("Unknown type: " + svcToSvcGraphParams.getType()); + } + } + + private void addCompleteProcessId(String completeProcessId, String serviceName) { + if (mapCompleteProcessIdToServiceNames.containsKey(completeProcessId)) { + String existingServiceName = mapCompleteProcessIdToServiceNames.get(completeProcessId); + if (existingServiceName.equalsIgnoreCase(serviceName)) { + return; + } else { + // this is strange. Process id is same but service name is different + } + } + + mapCompleteProcessIdToServiceNames.put(completeProcessId, serviceName); + + } + + private void addEdge(String completeProcessId, String serviceName) { + String source = mapCompleteProcessIdToServiceNames.get(completeProcessId); + if (source == null) return; + + String target = serviceName; + + boolean isNewEdge = addToMap(edges, source, target); + + if (isNewEdge) { + addToMap(newEdges, source, target); + } + } + + private boolean addNode(String serviceName) { + boolean isNewNode = nodes.add(serviceName); + + if (isNewNode) { + newNodes.add(serviceName); + return true; + } + + return false; + } + + private List updateWithNewEdgesAndReturnDelta(List incrEdges) { + List updates = new ArrayList<>(); + for (SvcToSvcGraphEdge svcToSvcGraphEdge : incrEdges) { + String newSrc = svcToSvcGraphEdge.getSource(); + String newTgt = svcToSvcGraphEdge.getTarget(); + boolean isNewEdge = addToMap(edges, newSrc, newTgt); + + if (!isNewEdge) { + continue; + } + + if (newEdges.containsKey(newSrc)) { + newEdges.get(newSrc).remove(newTgt); + continue; + } + + } + + for (String src: newEdges.keySet()) { + Set targets = newEdges.get(src); + for (String tgt: targets) { + SvcToSvcGraphEdge svcToSvcGraphEdge = SvcToSvcGraphEdge.createFromK8s(src, tgt); + updates.add(svcToSvcGraphEdge); + } + } + + return updates; + } + + private List updateWithNewNodesAndReturnDelta(List incrNodes) { + List updates = new ArrayList<>(); + for (String node: newNodes) { + boolean isNewNode = addNode(node); + + if (!isNewNode) { + continue; + } + + newNodes.remove(node); + + } + + for (String node: newNodes) { + SvcToSvcGraphNode svcToSvcGraphNode = SvcToSvcGraphNode.createFromK8s(node); + updates.add(svcToSvcGraphNode); + } + + return updates; + } + + private void resetWithNewTs(int fetchFromDbTs) { + newEdges.clear(); + newNodes.clear(); + this.lastFetchFromDb = fetchFromDbTs; + } + + public SvcToSvcGraph updateWithNewDataAndReturnDelta(DataActor dataActor) { + int now = Context.now(); + + + List updateEdges = updateWithNewEdgesAndReturnDelta(dataActor.findAllSvcToSvcGraphEdges(lastFetchFromDb, now)); + List updateNodes = updateWithNewNodesAndReturnDelta(dataActor.findAllSvcToSvcGraphNodes(lastFetchFromDb, now)); + resetWithNewTs(now); + + dataActor.updateNewEdgesInBatches(updateEdges); + dataActor.updateNewNodesInBatches(updateNodes); + + return new SvcToSvcGraph(updateEdges, updateNodes, now); + } + +} diff --git a/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java b/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java index de44909b9e..13e42670b0 100644 --- a/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java +++ b/libs/utils/src/main/java/com/akto/runtime/parser/SampleParser.java @@ -12,6 +12,8 @@ import com.akto.dto.HttpRequestParams; import com.akto.dto.HttpResponseParams; import com.akto.dto.OriginalHttpRequest; +import com.akto.dto.graph.K8sDaemonsetGraphParams; +import com.akto.dto.graph.SvcToSvcGraphParams; import com.akto.dto.TrafficProducerLog; import com.akto.log.LoggerMaker; import com.akto.log.LoggerMaker.LogDb; @@ -77,6 +79,22 @@ public static HttpResponseParams parseSampleMessage(String message) throws Excep boolean isPending = !isPendingStr.toLowerCase().equals("false"); String sourceStr = (String) json.getOrDefault("source", HttpResponseParams.Source.OTHER.name()); HttpResponseParams.Source source = HttpResponseParams.Source.valueOf(sourceStr); + + String enableGraph = (String) json.getOrDefault("enable_graph", "false"); + SvcToSvcGraphParams graphParams = null; + if (enableGraph.equals("true")) { + List hostNameList = requestHeaders.getOrDefault("host", requestHeaders.getOrDefault(":authority", new ArrayList<>())); + if (hostNameList != null && hostNameList.size()>0) { + String processId = (String) json.get("process_id"); + String socketId = (String) json.get("socket_id"); + String daemonsetId = (String) json.get("daemonset_id"); + String hostname = hostNameList.get(0); + if (hostname.charAt(0) >= 'a' && hostname.charAt(0) <= 'z') { + graphParams = new K8sDaemonsetGraphParams(hostNameList.get(0), processId, socketId, daemonsetId, direction); + } + } + + } // JSON string of K8 POD tags String tags = (String) json.getOrDefault("tag", ""); @@ -86,7 +104,7 @@ public static HttpResponseParams parseSampleMessage(String message) throws Excep } return new HttpResponseParams( - type,statusCode, status, responseHeaders, payload, requestParams, time, accountId, isPending, source, message, sourceIP, destIP, direction, tags + type,statusCode, status, responseHeaders, payload, requestParams, time, accountId, isPending, source, message, sourceIP, destIP, direction, graphParams, tags ); } diff --git a/libs/utils/src/test/java/com/akto/runtime/TestSvcToSvcGraph.java b/libs/utils/src/test/java/com/akto/runtime/TestSvcToSvcGraph.java new file mode 100644 index 0000000000..c25032cfaa --- /dev/null +++ b/libs/utils/src/test/java/com/akto/runtime/TestSvcToSvcGraph.java @@ -0,0 +1,53 @@ +package com.akto.runtime; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.Test; + +import com.akto.MongoBasedTest; +import com.akto.data_actor.DataActor; +import com.akto.data_actor.DataActorFactory; +import com.akto.dto.graph.K8sDaemonsetGraphParams; +import com.akto.dto.graph.SvcToSvcGraph; +import com.akto.dto.graph.SvcToSvcGraphParams; + +public class TestSvcToSvcGraph extends MongoBasedTest { + + + private SvcToSvcGraphParams in(String serviceName) { + return new K8sDaemonsetGraphParams(serviceName, serviceName, "1324", "some_id", "1"); + } + + private SvcToSvcGraphParams out(String sourceServiceName, String targetServiceName) { + return new K8sDaemonsetGraphParams(targetServiceName, sourceServiceName, "1324", "some_id", "2"); + } + + @Test + public void testAlgo() { + DataActor dataActor = DataActorFactory.fetchInstance(); + SvcToSvcGraphManager svcToSvcGraphManager = SvcToSvcGraphManager.createFromEdgesAndNodes(dataActor); + + + svcToSvcGraphManager.processRecord(in("api.gateway.com")); + svcToSvcGraphManager.processRecord(in("api.details.com")); + svcToSvcGraphManager.processRecord(in("api.reviews.com")); + svcToSvcGraphManager.processRecord(in("api.google.com")); + svcToSvcGraphManager.processRecord(out("api.gateway.com", "api.details.com")); + svcToSvcGraphManager.processRecord(out("api.gateway.com", "api.reviews.com")); + svcToSvcGraphManager.processRecord(out("api.reviews.com", "api.google.com")); + + SvcToSvcGraph changes = svcToSvcGraphManager.updateWithNewDataAndReturnDelta(dataActor); + + changes.getNodes().forEach(node -> { + System.out.println("Node: " + node.getId()); + }); + + changes.getEdges().forEach(edge -> { + System.out.println("Edge: " + edge.getSource() + " -> " + edge.getTarget()); + }); + + assertEquals(4, changes.getNodes().size()); + assertEquals(3, changes.getEdges().size()); + } + +}