diff --git a/apps/dashboard/src/main/java/com/akto/action/test_editor/SaveTestEditorAction.java b/apps/dashboard/src/main/java/com/akto/action/test_editor/SaveTestEditorAction.java
index 5a00679603..2e5739a776 100644
--- a/apps/dashboard/src/main/java/com/akto/action/test_editor/SaveTestEditorAction.java
+++ b/apps/dashboard/src/main/java/com/akto/action/test_editor/SaveTestEditorAction.java
@@ -16,6 +16,7 @@
 import com.akto.dao.testing.TestingRunResultDao;
 import com.akto.dto.AccountSettings;
 import com.akto.dto.ApiInfo;
+import com.akto.dto.ApiInfo.ApiInfoKey;
 import com.akto.dto.CustomAuthType;
 import com.akto.dto.User;
 import com.akto.dto.test_editor.Category;
@@ -45,6 +46,7 @@
 import com.akto.store.TestingUtil;
 import com.akto.test_editor.execution.VariableResolver;
 import com.akto.testing.TestExecutor;
+import com.akto.testing.Utils;
 import com.akto.util.Constants;
 import com.akto.util.enums.GlobalEnums;
 import com.akto.util.enums.GlobalEnums.Severity;
@@ -334,7 +336,11 @@ public String runTestForGivenTemplate() {
         int lastSampleIndex = sampleDataList.get(0).getSamples().size() - 1;
         TestingRunConfig testingRunConfig = new TestingRunConfig();
-        testingRunResult = executor.runTestNew(infoKey, null, testingUtil, null, testConfig, testingRunConfig, true, testLogs);
+        List<String> samples = testingUtil.getSampleMessages().get(infoKey);
+        TestingRunResult testingRunResult = Utils.generateFailedRunResultForMessage(null, infoKey, testConfig.getInfo().getCategory().getName(), testConfig.getInfo().getSubCategory(), null, samples, null);
+        if(testingRunResult == null){
+            testingRunResult = executor.runTestNew(infoKey, null, testingUtil, null, testConfig, testingRunConfig, true, testLogs, samples.get(samples.size() - 1));
+        }
         if (testingRunResult == null) {
             testingRunResult = new TestingRunResult(
                 new ObjectId(), infoKey, testConfig.getInfo().getCategory().getName(), testConfig.getInfo().getSubCategory(), Collections.singletonList(new TestResult(null, sampleDataList.get(0).getSamples().get(lastSampleIndex),
@@ -355,6 +361,7 @@ public String runTestForGivenTemplate() {
         subCategoryMap.put(testConfig.getId(), infoObj);
         List runResults = new ArrayList<>();
+        this.testingRunResult = testingRunResult;
         for (GenericTestResult testResult: this.testingRunResult.getTestResults()) {
             if (testResult instanceof TestResult) {
diff --git a/apps/dashboard/web/polaris_web/web/src/apps/dashboard/components/layouts/header/Headers.js b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/components/layouts/header/Headers.js
index bf25f6f4a8..9cb53d2392 100644
--- a/apps/dashboard/web/polaris_web/web/src/apps/dashboard/components/layouts/header/Headers.js
+++ b/apps/dashboard/web/polaris_web/web/src/apps/dashboard/components/layouts/header/Headers.js
@@ -213,7 +213,7 @@ export default function Header() {
         {(Object.keys(currentTestsObj).length > 0 && currentTestsObj?.testRunsArr?.length !== 0 && currentTestsObj?.totalTestsCompleted > 0) ?
Test run status
diff --git a/apps/testing-cli/src/main/java/com/akto/testing_cli/Main.java b/apps/testing-cli/src/main/java/com/akto/testing_cli/Main.java
index 22d6c442d1..e382cf5649 100644
--- a/apps/testing-cli/src/main/java/com/akto/testing_cli/Main.java
+++ b/apps/testing-cli/src/main/java/com/akto/testing_cli/Main.java
@@ -22,6 +22,7 @@
 import org.slf4j.LoggerFactory;
 import com.akto.DaoInit;
+import com.akto.dao.context.Context;
 import com.akto.dao.test_editor.TestConfigYamlParser;
 import com.akto.dao.test_editor.TestEditorEnums.ContextOperator;
 import com.akto.dto.AccountSettings;
@@ -40,6 +41,7 @@
 import com.akto.store.TestingUtil;
 import com.akto.testing.ApiExecutor;
 import com.akto.testing.TestExecutor;
+import com.akto.testing.Utils;
 import com.akto.util.ColorConstants;
 import com.akto.util.VersionUtil;
@@ -317,11 +319,13 @@ public static void main(String[] args) {
         for (String testSubCategory : testingRunConfig.getTestSubCategoryList()) {
             TestConfig testConfig = testConfigMap.get(testSubCategory);
             for (ApiInfo.ApiInfoKey it : apiInfoKeys) {
                 TestingRunResult testingRunResult = null;
                 try {
-                    testingRunResult = testExecutor.runTestNew(it, null, testingUtil, null, testConfig,
-                            testingRunConfig, false, new ArrayList<>());
+                    List<String> samples = testingUtil.getSampleMessages().get(it);
+                    testingRunResult = Utils.generateFailedRunResultForMessage(null, it, testConfig.getInfo().getCategory().getName(), testConfig.getInfo().getSubCategory(), null, samples, null);
+                    if(testingRunResult == null){
+                        testingRunResult = testExecutor.runTestNew(it, null, testingUtil, null, testConfig, null, false, new ArrayList<>(), samples.get(samples.size() - 1));
+                    }
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
diff --git a/apps/testing/src/main/java/com/akto/testing/Main.java b/apps/testing/src/main/java/com/akto/testing/Main.java
index 601778e03f..6414c4e31c 100644
--- a/apps/testing/src/main/java/com/akto/testing/Main.java
+++ b/apps/testing/src/main/java/com/akto/testing/Main.java
@@ -4,7 +4,6 @@
 import com.akto.billing.UsageMetricUtils;
 import com.akto.crons.GetRunningTestsStatus;
 import com.akto.dao.*;
-import com.akto.dao.billing.OrganizationsDao;
 import com.akto.dao.context.Context;
 import com.akto.dao.testing.TestingRunConfigDao;
 import com.akto.dao.testing.TestingRunDao;
@@ -16,7 +15,6 @@
 import com.akto.dto.billing.SyncLimit;
 import com.akto.dto.test_run_findings.TestingRunIssues;
 import com.akto.dto.*;
-import com.akto.dto.billing.Organization;
 import com.akto.dto.testing.*;
 import com.akto.dto.testing.TestingEndpoints.Operator;
 import com.akto.dto.testing.TestingRun.State;
@@ -29,14 +27,16 @@
 import com.akto.log.LoggerMaker;
 import com.akto.log.LoggerMaker.LogDb;
 import com.akto.mixpanel.AktoMixpanel;
-import com.akto.usage.UsageMetricHandler;
 import com.akto.notifications.slack.APITestStatusAlert;
+import com.akto.notifications.slack.CustomTextAlert;
 import com.akto.notifications.slack.NewIssuesModel;
 import com.akto.notifications.slack.SlackAlerts;
 import com.akto.notifications.slack.SlackSender;
 import com.akto.rules.RequiredConfigs;
 import com.akto.task.Cluster;
 import com.akto.test_editor.execution.Executor;
+import com.akto.testing.kafka_utils.ConsumerUtil;
+import com.akto.testing.kafka_utils.Producer;
 import com.akto.util.AccountTask;
 import com.akto.util.Constants;
 import com.akto.util.DashboardMode;
@@ -51,6 +51,7 @@
 import com.mongodb.WriteConcern;
 import com.mongodb.client.model.*;
 import com.mongodb.client.result.DeleteResult;
+import com.slack.api.Slack;
 import org.bson.Document;
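All the new call sites above share one guard: ask Utils.generateFailedRunResultForMessage for a pre-built failure result first, and only call runTestNew with the latest sample message when it returns null. The helper's body is not part of this diff; a minimal sketch of its assumed contract, reconstructed from the startTestNew logic removed later in TestExecutor.java (illustrative, not the actual implementation):

    // Sketch: returns a ready-made failed TestingRunResult when the test cannot run
    // (no sample messages, or an explicit failMessage such as USAGE_EXCEEDED),
    // and null when the caller should go ahead and execute the test for real.
    public static TestingRunResult generateFailedRunResultForMessage(ObjectId testRunId, ApiInfo.ApiInfoKey apiInfoKey,
            String testSuperType, String testSubType, ObjectId summaryId, List<String> messages, String failMessage) {
        if (failMessage == null && (messages == null || messages.isEmpty())) {
            failMessage = TestError.NO_PATH.getMessage();
        }
        if (failMessage == null) return null; // nothing blocks this test
        List<GenericTestResult> testResults = Collections.singletonList(
                new TestResult(null, null, Collections.singletonList(failMessage), 0, false, Confidence.HIGH, null));
        return new TestingRunResult(testRunId, apiInfoKey, testSuperType, testSubType, testResults,
                false, new ArrayList<>(), 100, Context.now(), Context.now(), summaryId, null,
                Collections.singletonList(new TestingRunResult.TestLog(TestingRunResult.TestLogType.INFO, failMessage)));
    }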
import org.bson.conversions.Bson;
@@ -58,6 +59,9 @@
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+
+import static com.akto.testing.Utils.readJsonContentFromFile;
 import java.util.*;
 import java.util.concurrent.Executors;
@@ -74,12 +78,12 @@ public class Main {
     public static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
     public static final ScheduledExecutorService deleteScheduler = Executors.newScheduledThreadPool(1);
-    public static final ScheduledExecutorService testTelemetryScheduler = Executors.newScheduledThreadPool(2);
-
     public static final ScheduledExecutorService schedulerAccessMatrix = Executors.newScheduledThreadPool(2);
     public static boolean SKIP_SSRF_CHECK = ("true".equalsIgnoreCase(System.getenv("SKIP_SSRF_CHECK")) || !DashboardMode.isSaasDeployment());
     public static final boolean IS_SAAS = "true".equalsIgnoreCase(System.getenv("IS_SAAS"));
+    public static final String AKTO_SLACK_WEBHOOK = System.getenv("AKTO_SLACK_WEBHOOK");
+    public static final Slack SLACK_INSTANCE = Slack.getInstance();
     private static Map emptyCountIssuesMap = new HashMap<>();
@@ -241,6 +245,39 @@ private static void setTestingRunConfig(TestingRun testingRun, TestingRunResultS
         }
     }
+    private static BasicDBObject checkIfAlreadyTestIsRunningOnMachine(){
+        // returns the saved test info only if the consumer is running, the saved summary is the
+        // latest summary of that testing run, and the summary is still in RUNNING state; otherwise returns null
+        try {
+            BasicDBObject currentTestInfo = readJsonContentFromFile(Constants.TESTING_STATE_FOLDER_PATH, Constants.TESTING_STATE_FILE_NAME, BasicDBObject.class);
+            if(currentTestInfo == null){
+                return null;
+            }
+            if(!currentTestInfo.getBoolean("CONSUMER_RUNNING", false)){
+                return null;
+            }
+            String testingRunId = currentTestInfo.getString("testingRunId");
+            String testingRunSummaryId = currentTestInfo.getString("summaryId");
+
+            int accountID = currentTestInfo.getInt("accountId");
+            Context.accountId.set(accountID);
+
+            TestingRunResultSummary testingRunResultSummary = TestingRunResultSummariesDao.instance.findOne(Filters.eq(Constants.ID, new ObjectId(testingRunSummaryId)), Projections.include(TestingRunResultSummary.STATE));
+            if(testingRunResultSummary == null || testingRunResultSummary.getState() == null || testingRunResultSummary.getState() != State.RUNNING){
+                return null;
+            }
+
+            TestingRunResultSummary latestSummary = TestingRunResultSummariesDao.instance.findLatestOne(Filters.eq(TestingRunResultSummary.TESTING_RUN_ID, new ObjectId(testingRunId)));
+            if(latestSummary != null && latestSummary.getHexId().equals(testingRunSummaryId)){
+                return currentTestInfo;
+            }else{
+                return null;
+            }
+        } catch (Exception e) {
+            logger.error("Error in reading the testing state file: " + e.getMessage());
+            return null;
+        }
+    }
     // Runnable task for monitoring memory
     static class MemoryMonitorTask implements Runnable {
@@ -309,6 +346,66 @@ public static void main(String[] args) throws InterruptedException {
         loggerMaker.infoAndAddToDb("Starting.......", LogDb.TESTING);
+        Producer testingProducer = new Producer();
+        ConsumerUtil testingConsumer = new ConsumerUtil();
+        TestCompletion testCompletion = new TestCompletion();
+        if(Constants.IS_NEW_TESTING_ENABLED){
+            testingConsumer.initializeConsumer();
+        }
+
+        // read the saved state from file and decide whether to init the producer and run the consumer;
+        // if the producer was still running, skip the resume and let the default testing flow pick up the job
+
+        BasicDBObject currentTestInfo = null;
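For context, the resume check above reads the state that TestExecutor.apiWiseInit (further down in this diff) writes to testing-state.json; a sketch of the document it expects, with field names taken from this diff and all concrete values illustrative:

    // State file written by the producer at run start and flipped once all records are in Kafka.
    BasicDBObject state = new BasicDBObject();
    state.put("PRODUCER_RUNNING", false);    // true only while records are still being pushed
    state.put("CONSUMER_RUNNING", true);     // resume proceeds only when the consumer phase was active
    state.put("accountId", 1000000);         // example account id
    state.put("testingRunId", "65f1b9...");  // hex ObjectId of the TestingRun (shortened placeholder)
    state.put("summaryId", "65f1c2...");     // hex ObjectId of the latest running summary (shortened placeholder)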
+ if(Constants.IS_NEW_TESTING_ENABLED){ + currentTestInfo = checkIfAlreadyTestIsRunningOnMachine(); + } + + if(currentTestInfo != null){ + try { + int accountId = Context.accountId.get(); + loggerMaker.infoAndAddToDb("Tests were already running on this machine, thus resuming the test for account: "+ accountId, LogDb.TESTING); + FeatureAccess featureAccess = UsageMetricUtils.getFeatureAccess(accountId, MetricTypes.TEST_RUNS); + + + String testingRunId = currentTestInfo.getString("testingRunId"); + String testingRunSummaryId = currentTestInfo.getString("summaryId"); + TestingRun testingRun = TestingRunDao.instance.findOne(Filters.eq(Constants.ID, new ObjectId(testingRunId))); + TestingRunConfig baseConfig = TestingRunConfigDao.instance.findOne(Constants.ID, testingRun.getTestIdConfig()); + testingRun.setTestingRunConfig(baseConfig); + ObjectId summaryId = new ObjectId(testingRunSummaryId); + testingProducer.initProducer(testingRun, summaryId, featureAccess.fetchSyncLimit(), true); + int maxRunTime = testingRun.getTestRunTime() <= 0 ? 30*60 : testingRun.getTestRunTime(); + testingConsumer.init(maxRunTime); + + // mark the test completed here + testCompletion.markTestAsCompleteAndRunFunctions(testingRun, summaryId); + + if (StringUtils.hasLength(AKTO_SLACK_WEBHOOK) ) { + try { + CustomTextAlert customTextAlert = new CustomTextAlert("Test completed for accountId=" + accountId + " testingRun=" + testingRun.getHexId() + " summaryId=" + summaryId.toHexString() + " : @Arjun you are up now. Make your time worth it. :)"); + SLACK_INSTANCE.send(AKTO_SLACK_WEBHOOK, customTextAlert.toJson()); + } catch (Exception e) { + logger.error("Error sending slack alert for completion of test", e); + } + + } + + deleteScheduler.execute(() -> { + Context.accountId.set(accountId); + try { + deleteNonVulnerableResults(); + + } catch (Exception e) { + loggerMaker.errorAndAddToDb(e, "Error in deleting testing run results"); + } + }); + } catch (Exception e) { + logger.error("Error in running failed tests from file.", e); + } + } + + schedulerAccessMatrix.scheduleAtFixedRate(new Runnable() { public void run() { AccountTask.instance.executeTaskForNonHybridAccounts(account -> { @@ -327,6 +424,12 @@ public void run() { loggerMaker.infoAndAddToDb("os.arch: " + System.getProperty("os.arch"), LogDb.TESTING); loggerMaker.infoAndAddToDb("os.version: " + System.getProperty("os.version"), LogDb.TESTING); + // create /testing-info folder in the memory from here + if(Constants.IS_NEW_TESTING_ENABLED){ + boolean val = Utils.createFolder(Constants.TESTING_STATE_FOLDER_PATH); + logger.info("Testing info folder status: " + val); + } + SingleTypeInfo.init(); while (true) { AccountTask.instance.executeTaskForNonHybridAccounts(account -> { @@ -394,9 +497,6 @@ public void run() { } SyncLimit syncLimit = featureAccess.fetchSyncLimit(); - // saving the initial usageLeft, to calc delta later. - int usageLeft = syncLimit.getUsageLeft(); - /* * Since the role cache is static * so to prevent it from being shared across accounts. @@ -536,7 +636,6 @@ public void run() { summaryId = trrs.getId(); } - TestExecutor testExecutor = new TestExecutor(); if (trrs.getState() == State.SCHEDULED) { if (trrs.getMetadata()!= null && trrs.getMetadata().containsKey("pull_request_id") && trrs.getMetadata().containsKey("commit_sha_head") ) { //case of github status push @@ -545,46 +644,38 @@ public void run() { } } RequiredConfigs.initiate(); + int maxRunTime = testingRun.getTestRunTime() <= 0 ? 
30*60 : testingRun.getTestRunTime(); + + if (StringUtils.hasLength(AKTO_SLACK_WEBHOOK) ) { + CustomTextAlert customTextAlert = new CustomTextAlert("Test started: accountId=" + Context.accountId.get() + " testingRun=" + testingRun.getHexId() + " summaryId=" + summaryId.toHexString() + " time=" + maxRunTime); + SLACK_INSTANCE.send(AKTO_SLACK_WEBHOOK, customTextAlert.toJson()); + } + if(!maxRetriesReached){ - testExecutor.init(testingRun, summaryId, syncLimit); - raiseMixpanelEvent(summaryId, testingRun, accountId); + // init producer and the consumer here + // producer for testing is currently calls init functions from test-executor + if(Constants.IS_NEW_TESTING_ENABLED){ + testingProducer.initProducer(testingRun, summaryId, syncLimit, false); + testingConsumer.init(maxRunTime); + }else{ + TestExecutor testExecutor = new TestExecutor(); + testExecutor.init(testingRun, summaryId, syncLimit, false); + } } } catch (Exception e) { loggerMaker.errorAndAddToDb(e, "Error in init " + e); } - Bson completedUpdate = Updates.combine( - Updates.set(TestingRun.STATE, TestingRun.State.COMPLETED), - Updates.set(TestingRun.END_TIMESTAMP, Context.now()) - ); - - if (testingRun.getPeriodInSeconds() > 0 ) { - completedUpdate = Updates.combine( - Updates.set(TestingRun.STATE, TestingRun.State.SCHEDULED), - Updates.set(TestingRun.END_TIMESTAMP, Context.now()), - Updates.set(TestingRun.SCHEDULE_TIMESTAMP, testingRun.getScheduleTimestamp() + testingRun.getPeriodInSeconds()) - ); - } else if (testingRun.getPeriodInSeconds() == -1) { - completedUpdate = Updates.combine( - Updates.set(TestingRun.STATE, TestingRun.State.SCHEDULED), - Updates.set(TestingRun.END_TIMESTAMP, Context.now()), - Updates.set(TestingRun.SCHEDULE_TIMESTAMP, testingRun.getScheduleTimestamp() + 5 * 60) - ); - } - - if(GetRunningTestsStatus.getRunningTests().isTestRunning(testingRun.getId())){ - loggerMaker.infoAndAddToDb("Updating status of running test to Completed."); - TestingRunDao.instance.getMCollection().withWriteConcern(writeConcern).findOneAndUpdate( - Filters.eq("_id", testingRun.getId()), completedUpdate - ); - } - - if(summaryId != null && testingRun.getTestIdConfig() != 1){ - TestExecutor.updateTestSummary(summaryId); + testCompletion.markTestAsCompleteAndRunFunctions(testingRun, summaryId); + if (StringUtils.hasLength(AKTO_SLACK_WEBHOOK) ) { + try { + CustomTextAlert customTextAlert = new CustomTextAlert("Test completed for accountId=" + accountId + " testingRun=" + testingRun.getHexId() + " summaryId=" + summaryId.toHexString() + " : @Arjun you are up now. Make your time worth it. 
:)"); + SLACK_INSTANCE.send(AKTO_SLACK_WEBHOOK, customTextAlert.toJson()); + } catch (Exception e) { + logger.error("Error sending slack alert for completion of test", e); + } + } - - loggerMaker.infoAndAddToDb("Tests completed in " + (Context.now() - start) + " seconds for account: " + accountId, LogDb.TESTING); - /* * In case the testing run results start overflowing * due to being a capped collection, @@ -602,32 +693,6 @@ public void run() { } }); - Organization organization = OrganizationsDao.instance.findOne( - Filters.in(Organization.ACCOUNTS, Context.accountId.get())); - - if(organization != null && organization.getTestTelemetryEnabled()){ - loggerMaker.infoAndAddToDb("Test telemetry enabled for account: " + accountId + ", sending results", LogDb.TESTING); - ObjectId finalSummaryId = summaryId; - testTelemetryScheduler.execute(() -> { - Context.accountId.set(accountId); - try { - com.akto.onprem.Constants.sendTestResults(finalSummaryId, organization); - loggerMaker.infoAndAddToDb("Test telemetry sent for account: " + accountId, LogDb.TESTING); - } catch (Exception e) { - loggerMaker.errorAndAddToDb(e, "Error in sending test telemetry for account: " + accountId); - } - }); - } else { - loggerMaker.infoAndAddToDb("Test telemetry disabled for account: " + accountId, LogDb.TESTING); - } - - // update usage after test is completed. - int deltaUsage = 0; - if(syncLimit.checkLimit){ - deltaUsage = usageLeft - syncLimit.getUsageLeft(); - } - - UsageMetricHandler.calcAndFetchFeatureAccessUsingDeltaUsage(MetricTypes.TEST_RUNS, accountId, deltaUsage); }, "testing"); Thread.sleep(1000); diff --git a/apps/testing/src/main/java/com/akto/testing/TestCompletion.java b/apps/testing/src/main/java/com/akto/testing/TestCompletion.java new file mode 100644 index 0000000000..ba64e19eb8 --- /dev/null +++ b/apps/testing/src/main/java/com/akto/testing/TestCompletion.java @@ -0,0 +1,94 @@ +package com.akto.testing; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.bson.conversions.Bson; +import org.bson.types.ObjectId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.akto.billing.UsageMetricUtils; +import com.akto.crons.GetRunningTestsStatus; +import com.akto.dao.billing.OrganizationsDao; +import com.akto.dao.context.Context; +import com.akto.dao.testing.TestingRunDao; +import com.akto.dto.billing.FeatureAccess; +import com.akto.dto.billing.Organization; +import com.akto.dto.billing.SyncLimit; +import com.akto.dto.testing.TestingRun; +import com.akto.dto.usage.MetricTypes; +import com.akto.usage.UsageMetricHandler; +import com.mongodb.WriteConcern; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.Updates; + +public class TestCompletion { + + private static final Logger logger = LoggerFactory.getLogger(TestCompletion.class); + public static final ScheduledExecutorService testTelemetryScheduler = Executors.newScheduledThreadPool(2); + + public void markTestAsCompleteAndRunFunctions(TestingRun testingRun, ObjectId summaryId){ + Bson completedUpdate = Updates.combine( + Updates.set(TestingRun.STATE, TestingRun.State.COMPLETED), + Updates.set(TestingRun.END_TIMESTAMP, Context.now()) + ); + + if (testingRun.getPeriodInSeconds() > 0 ) { + completedUpdate = Updates.combine( + Updates.set(TestingRun.STATE, TestingRun.State.SCHEDULED), + Updates.set(TestingRun.END_TIMESTAMP, Context.now()), + Updates.set(TestingRun.SCHEDULE_TIMESTAMP, testingRun.getScheduleTimestamp() + testingRun.getPeriodInSeconds()) + ); + 
} else if (testingRun.getPeriodInSeconds() == -1) { + completedUpdate = Updates.combine( + Updates.set(TestingRun.STATE, TestingRun.State.SCHEDULED), + Updates.set(TestingRun.END_TIMESTAMP, Context.now()), + Updates.set(TestingRun.SCHEDULE_TIMESTAMP, testingRun.getScheduleTimestamp() + 5 * 60) + ); + } + + if(GetRunningTestsStatus.getRunningTests().isTestRunning(testingRun.getId())){ + TestingRunDao.instance.getMCollection().withWriteConcern(WriteConcern.W1).findOneAndUpdate( + Filters.eq("_id", testingRun.getId()), completedUpdate + ); + } + + if(summaryId != null && testingRun.getTestIdConfig() != 1){ + TestExecutor.updateTestSummary(summaryId); + } + + int accountId = Context.accountId.get(); + + Organization organization = OrganizationsDao.instance.findOne( + Filters.in(Organization.ACCOUNTS, accountId)); + + FeatureAccess featureAccess = UsageMetricUtils.getFeatureAccess(accountId, MetricTypes.TEST_RUNS); + SyncLimit syncLimit = featureAccess.fetchSyncLimit(); + int usageLeft = syncLimit.getUsageLeft(); + + if(organization != null && organization.getTestTelemetryEnabled()){ + logger.info("Test telemetry enabled for account: " + accountId + ", sending results"); + ObjectId finalSummaryId = summaryId; + testTelemetryScheduler.execute(() -> { + Context.accountId.set(accountId); + try { + com.akto.onprem.Constants.sendTestResults(finalSummaryId, organization); + logger.info("Test telemetry sent for account: " + accountId); + } catch (Exception e) { + logger.error("Error in sending test telemetry for account: " + accountId + " " + e.getMessage()); + } + }); + } else { + logger.info("Test telemetry disabled for account: " + accountId); + } + + // update usage after test is completed. + int deltaUsage = 0; + if(syncLimit.checkLimit){ + deltaUsage = usageLeft - syncLimit.getUsageLeft(); + } + + UsageMetricHandler.calcAndFetchFeatureAccessUsingDeltaUsage(MetricTypes.TEST_RUNS, accountId, deltaUsage); + } +} diff --git a/apps/testing/src/main/java/com/akto/testing/TestExecutor.java b/apps/testing/src/main/java/com/akto/testing/TestExecutor.java index 687186c55f..399867d9ae 100644 --- a/apps/testing/src/main/java/com/akto/testing/TestExecutor.java +++ b/apps/testing/src/main/java/com/akto/testing/TestExecutor.java @@ -31,6 +31,7 @@ import com.akto.dto.testing.TestResult.Confidence; import com.akto.dto.testing.TestResult.TestError; import com.akto.dto.testing.TestingRun.State; +import com.akto.dto.testing.info.SingleTestPayload; import com.akto.dto.type.RequestTemplate; import com.akto.dto.type.SingleTypeInfo; import com.akto.dto.type.URLMethods; @@ -45,6 +46,8 @@ import com.akto.test_editor.execution.Executor; import com.akto.test_editor.execution.VariableResolver; import com.akto.test_editor.filter.data_operands_impl.ValidationResult; +import com.akto.testing.kafka_utils.TestingConfigurations; +import com.akto.testing.kafka_utils.Producer; import com.akto.testing.yaml_tests.YamlTestTemplate; import com.akto.testing_issues.TestingIssuesHandler; import com.akto.usage.UsageMetricCalculator; @@ -67,11 +70,13 @@ import org.slf4j.LoggerFactory; import static com.akto.test_editor.execution.Build.modifyRequest; +import static com.akto.testing.Utils.writeJsonContentInFile; import java.net.URI; import java.net.URISyntaxException; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; public class TestExecutor { @@ -81,7 +86,6 @@ public class TestExecutor { public static long acceptableSizeInBytes = 5_000_000; private static final Gson gson = new Gson(); - 
private static Map> requestRestrictionMap = new ConcurrentHashMap<>(); public static final String REQUEST_HOUR = "requestHour"; public static final String COUNT = "count"; public static final int ALLOWED_REQUEST_PER_HOUR = 100; @@ -92,9 +96,9 @@ public static synchronized void setExpiryTimeOfAuthToken(int newExpiryTime) { expiryTimeOfAuthToken = newExpiryTime; } - public void init(TestingRun testingRun, ObjectId summaryId, SyncLimit syncLimit) { + public void init(TestingRun testingRun, ObjectId summaryId, SyncLimit syncLimit, boolean shouldInitOnly) { if (testingRun.getTestIdConfig() != 1) { - apiWiseInit(testingRun, summaryId, false, new ArrayList<>(), syncLimit); + apiWiseInit(testingRun, summaryId, false, new ArrayList<>(), syncLimit, shouldInitOnly); } else { workflowInit(testingRun, summaryId, false, new ArrayList<>()); } @@ -147,10 +151,21 @@ public void workflowInit (TestingRun testingRun, ObjectId summaryId, boolean deb ); } - public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug, List testLogs, SyncLimit syncLimit) { + public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug, List testLogs, SyncLimit syncLimit, boolean shouldInitOnly) { + + // write producer running here as producer has been initiated now int accountId = Context.accountId.get(); - int now = Context.now(); - int maxConcurrentRequests = testingRun.getMaxConcurrentRequests() > 0 ? Math.min( testingRun.getMaxConcurrentRequests(), 100) : 10; + + BasicDBObject dbObject = new BasicDBObject(); + if(!shouldInitOnly && Constants.IS_NEW_TESTING_ENABLED){ + dbObject.put("PRODUCER_RUNNING", true); + dbObject.put("CONSUMER_RUNNING", false); + dbObject.put("accountId", accountId); + dbObject.put("summaryId", summaryId.toHexString()); + dbObject.put("testingRunId", testingRun.getId().toHexString()); + writeJsonContentInFile(Constants.TESTING_STATE_FOLDER_PATH, Constants.TESTING_STATE_FILE_NAME, dbObject); + } + TestingEndpoints testingEndpoints = testingRun.getTestingEndpoints(); if (testingRun.getTestingRunConfig() != null) { @@ -236,17 +251,8 @@ public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug loggerMaker.infoAndAddToDb("StatusCodeAnalyser result = " + StatusCodeAnalyser.result, LogDb.TESTING); loggerMaker.infoAndAddToDb("StatusCodeAnalyser defaultPayloadsMap = " + StatusCodeAnalyser.defaultPayloadsMap, LogDb.TESTING); - CountDownLatch latch = new CountDownLatch(apiInfoKeyList.size()); - ExecutorService threadPool = Executors.newFixedThreadPool(maxConcurrentRequests); - List> futureTestingRunResults = new ArrayList<>(); - Map hostsToApiCollectionMap = new HashMap<>(); - ConcurrentHashMap subCategoryEndpointMap = new ConcurrentHashMap<>(); Map apiInfoKeyToHostMap = new HashMap<>(); - String hostName; - - loggerMaker.infoAndAddToDb("Started filling hostname map with categories at :" + Context.now()); - int timeNow = Context.now(); // for (String testSubCategory: testingRunSubCategories) { // TestConfig testConfig = testConfigMap.get(testSubCategory); // if (testConfig == null || testConfig.getStrategy() == null || testConfig.getStrategy().getRunOnce() == null) { @@ -268,46 +274,156 @@ public void apiWiseInit(TestingRun testingRun, ObjectId summaryId, boolean debug // } // } // } - loggerMaker.infoAndAddToDb("Completed filling hostname map with categories in :" + (Context.now() - timeNow)); - final int maxRunTime = testingRun.getTestRunTime() <= 0 ? 
30*60 : testingRun.getTestRunTime(); // if nothing specified wait for 30 minutes
+        // init the singleton config class here
+        TestingConfigurations.getInstance().init(testingUtil, testingRun.getTestingRunConfig(), debug, testConfigMap);
-        for (ApiInfo.ApiInfoKey apiInfoKey: apiInfoKeyList) {
-            try {
-                Future future = threadPool.submit(
-                        () -> startWithLatch(apiInfoKey,
-                                testingRun.getTestIdConfig(),
-                                testingRun.getId(), testingRun.getTestingRunConfig(), testingUtil, summaryId,
-                                accountId, latch, now, maxRunTime, testConfigMap, testingRun, subCategoryEndpointMap,
-                                apiInfoKeyToHostMap, debug, testLogs, syncLimit, authMechanism));
-                futureTestingRunResults.add(future);
-            } catch (Exception e) {
-                loggerMaker.errorAndAddToDb("Error in API " + apiInfoKey + " : " + e.getMessage(), LogDb.TESTING);
+        if(!shouldInitOnly){
+            int maxThreads = Math.min(testingRunSubCategories.size(), 1000);
+            if(maxThreads == 0){
+                loggerMaker.infoAndAddToDb("Subcategories list is empty");
+                return;
             }
-        }
-        loggerMaker.infoAndAddToDb("hostsToApiCollectionMap : " + hostsToApiCollectionMap.keySet(), LogDb.TESTING);
-        loggerMaker.infoAndAddToDb("Waiting...", LogDb.TESTING);
+
+            if(!Constants.IS_NEW_TESTING_ENABLED){
+                maxThreads = Math.min(100, Math.max(10, testingRun.getMaxConcurrentRequests()));
+            }
+
+            List<Future<Void>> testingRecords = new ArrayList<>();
+            ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads);
+
+            // count-down latch so we know when all Kafka records have been inserted
+            CountDownLatch latch = new CountDownLatch(apiInfoKeyList.size());
+            int tempRunTime = 10 * 60;
+            if(!Constants.IS_NEW_TESTING_ENABLED){
+                tempRunTime = testingRun.getTestRunTime() <= 0 ? 30*60 : testingRun.getTestRunTime();
+            }
-        try {
-            boolean awaitResult = latch.await(maxRunTime, TimeUnit.SECONDS);
-            loggerMaker.infoAndAddToDb("Await result: " + awaitResult, LogDb.TESTING);
+            final int maxRunTime = tempRunTime;
+            for (ApiInfo.ApiInfoKey apiInfoKey: apiInfoKeyList) {
+                List<String> messages = testingUtil.getSampleMessages().get(apiInfoKey);
+                if(Constants.IS_NEW_TESTING_ENABLED){
+                    for (String testSubCategory: testingRunSubCategories) {
+                        Future<Void> future = threadPool.submit(() -> insertRecordInKafka(accountId, testSubCategory, apiInfoKey, messages, summaryId, syncLimit, apiInfoKeyToHostMap, subCategoryEndpointMap, testConfigMap, testLogs, testingRun, new AtomicBoolean(false)));
+                        testingRecords.add(future);
+                    }
+                    latch.countDown();
+                }
+                else{
+                    Future<Void> future = threadPool.submit(() -> startWithLatch(testingRunSubCategories, accountId, apiInfoKey, messages, summaryId, syncLimit, apiInfoKeyToHostMap, subCategoryEndpointMap, testConfigMap, testLogs, testingRun, latch));
+                    testingRecords.add(future);
+                }
+            }
+
+            try {
+                boolean awaitResult = latch.await(maxRunTime, TimeUnit.SECONDS);
+
+                if (!awaitResult) { // latch countdown didn't reach 0
+                    for (Future<Void> future : testingRecords) {
+                        future.cancel(true);
+                    }
+                    loggerMaker.infoAndAddToDb("Canceled all running future tasks due to timeout.", LogDb.TESTING);
+                }
+
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            if(!shouldInitOnly && Constants.IS_NEW_TESTING_ENABLED){
+                dbObject.put("PRODUCER_RUNNING", false);
+                dbObject.put("CONSUMER_RUNNING", true);
+                writeJsonContentInFile(Constants.TESTING_STATE_FOLDER_PATH, Constants.TESTING_STATE_FILE_NAME, dbObject);
+                loggerMaker.infoAndAddToDb("Finished inserting records in kafka", LogDb.TESTING);
+            }
+        }
+
+    }
-            if (!awaitResult) { // latch countdown didn't reach 0
-                for (Future future : futureTestingRunResults) {
-                    future.cancel(true);
-                }
-                loggerMaker.infoAndAddToDb("Canceled all running future tasks due to timeout.", LogDb.TESTING);
-            }
-
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-
-        loggerMaker.infoAndAddToDb("Finished testing", LogDb.TESTING);
-
-    }
+    private Void startWithLatch(List<String> testingRunSubCategories, int accountId, ApiInfo.ApiInfoKey apiInfoKey,
+        List<String> messages, ObjectId summaryId, SyncLimit syncLimit, Map apiInfoKeyToHostMap,
+        ConcurrentHashMap subCategoryEndpointMap, Map<String, TestConfig> testConfigMap,
+        List<TestingRunResult.TestLog> testLogs, TestingRun testingRun, CountDownLatch latch){
+
+        Context.accountId.set(accountId);
+        AtomicBoolean isApiInfoTested = new AtomicBoolean(false);
+        for (String testSubCategory: testingRunSubCategories) {
+            loggerMaker.infoAndAddToDb("Trying to run test for category: " + testSubCategory + " with summary state: " + GetRunningTestsStatus.getRunningTests().getCurrentState(summaryId));
+            if(GetRunningTestsStatus.getRunningTests().isTestRunning(summaryId, true)){
+                insertRecordInKafka(accountId, testSubCategory, apiInfoKey, messages, summaryId, syncLimit, apiInfoKeyToHostMap, subCategoryEndpointMap, testConfigMap, testLogs, testingRun, isApiInfoTested);
+            }else{
+                logger.info("Test stopped for id: " + testingRun.getHexId());
+                break;
+            }
+        }
+        if(isApiInfoTested.get()){
+            logger.info("Api: " + apiInfoKey.toString() + " has been successfully tested");
+            ApiInfoDao.instance.updateLastTestedField(apiInfoKey);
+        }
+        latch.countDown();
+        return null;
+    }
+
+    private Void insertRecordInKafka(int accountId, String testSubCategory, ApiInfo.ApiInfoKey apiInfoKey,
+        List<String> messages, ObjectId summaryId, SyncLimit syncLimit, Map apiInfoKeyToHostMap,
+        ConcurrentHashMap subCategoryEndpointMap, Map<String, TestConfig> testConfigMap,
+        List<TestingRunResult.TestLog> testLogs, TestingRun testingRun, AtomicBoolean isApiInfoTested) {
+        Context.accountId.set(accountId);
+        TestConfig testConfig = testConfigMap.get(testSubCategory);
+
+        if (testConfig == null) {
+            loggerMaker.infoAndAddToDb("Testing config is null for: " + apiInfoKey.toString() + " : " + testSubCategory);
+            return null;
+        }
+
+        if (!applyRunOnceCheck(apiInfoKey, testConfig, subCategoryEndpointMap, apiInfoKeyToHostMap, testSubCategory)) {
+            return null;
+        }
+
+        String failMessage = null;
+        if (!demoCollections.contains(apiInfoKey.getApiCollectionId()) &&
+            syncLimit.updateUsageLeftAndCheckSkip()) {
+            failMessage = TestError.USAGE_EXCEEDED.getMessage();
+        }
+
+        String testSuperType = testConfig.getInfo().getCategory().getName();
+        String testSubType = testConfig.getInfo().getSubCategory();
+        TestingRunResult testingRunResult = Utils.generateFailedRunResultForMessage(testingRun.getId(), apiInfoKey, testSuperType, testSubType, summaryId, messages, failMessage);
+        if(testingRunResult != null){
+            loggerMaker.infoAndAddToDb("Skipping test from producers because: " + failMessage + " apiinfo: " + apiInfoKey.toString(), LogDb.TESTING);
+        }else if (Constants.IS_NEW_TESTING_ENABLED){
+            // push the payload to Kafka; the consumer side invokes runTestNew for it
+            SingleTestPayload singleTestPayload = new SingleTestPayload(
+                testingRun.getId(), summaryId, apiInfoKey, testSubType, testLogs, accountId
+            );
+            logger.info("Inserting record for apiInfoKey: " + apiInfoKey.toString() + " subcategory: " + testSubType);
+            try {
+                Producer.pushMessagesToKafka(Arrays.asList(singleTestPayload));
+            } catch (Exception e) {
+                e.printStackTrace();
+                return null;
+            }
+
+        }else{
+            if(GetRunningTestsStatus.getRunningTests().isTestRunning(summaryId, true)){
+                TestingConfigurations instance = TestingConfigurations.getInstance();
+                String sampleMessage = messages.get(messages.size() - 1);
+                testingRunResult = runTestNew(apiInfoKey, testingRun.getId(), instance.getTestingUtil(), summaryId, testConfig, instance.getTestingRunConfig(), instance.isDebug(), testLogs, sampleMessage);
+                if (testingRunResult != null) {
+                    List<String> errorList = testingRunResult.getErrorsList();
+                    if (errorList == null || !errorList.contains(TestResult.API_CALL_FAILED_ERROR_STRING)) {
+                        isApiInfoTested.set(true);
+                    }
+                    insertResultsAndMakeIssues(Collections.singletonList(testingRunResult), summaryId);
+                }
+            }
+
+        }
+        return null;
+    }
+
     public static void updateTestSummary(ObjectId summaryId){
         loggerMaker.infoAndAddToDb("Finished updating results count", LogDb.TESTING);
@@ -503,26 +619,6 @@ public Map generateResponseMap(String payloadStr, Map
-    private Void startWithLatch(ApiInfo.ApiInfoKey apiInfoKey, int testIdConfig, ObjectId testRunId,
-        TestingRunConfig testingRunConfig, TestingUtil testingUtil, ObjectId testRunResultSummaryId,
-        int accountId, CountDownLatch latch, int startTime, int maxRunTime, Map testConfigMap, TestingRun testingRun,
-        ConcurrentHashMap subCategoryEndpointMap, Map apiInfoKeyToHostMap,
-        boolean debug, List testLogs, SyncLimit syncLimit, AuthMechanism authMechanism) {
-
-        Context.accountId.set(accountId);
-        loggerMaker.infoAndAddToDb("Starting test for " + apiInfoKey, LogDb.TESTING);
-
-        try {
-            startTestNew(apiInfoKey, testRunId, testingRunConfig, testingUtil, testRunResultSummaryId, testConfigMap, subCategoryEndpointMap, apiInfoKeyToHostMap, debug, testLogs, startTime, maxRunTime, syncLimit, authMechanism);
-        } catch (Exception e) {
-            loggerMaker.errorAndAddToDb(e, "error while running tests: " + e);
-        }
-
-        latch.countDown();
-        return null;
-    }
-
     public static void trim(TestingRunResult testingRunResult) {
         List<GenericTestResult> testResults = testingRunResult.getTestResults();
         int endIdx = testResults.size();
@@ -598,101 +694,12 @@ public void insertResultsAndMakeIssues(List testingRunResults,
         TestingIssuesHandler handler = new TestingIssuesHandler();
         boolean triggeredByTestEditor = false;
         handler.handleIssuesCreationFromTestingRunResults(testingRunResults, triggeredByTestEditor);
-        testingRunResults.clear();
     }
 }
 Set<Integer> deactivatedCollections = UsageMetricCalculator.getDeactivated();
 Set<Integer> demoCollections = UsageMetricCalculator.getDemos();
-    public void startTestNew(ApiInfo.ApiInfoKey apiInfoKey, ObjectId testRunId,
-        TestingRunConfig testingRunConfig, TestingUtil testingUtil,
-        ObjectId testRunResultSummaryId, Map testConfigMap,
-        ConcurrentHashMap subCategoryEndpointMap, Map apiInfoKeyToHostMap,
-        boolean debug, List testLogs, int startTime, int timeToKill, SyncLimit syncLimit, AuthMechanism authMechanism) {
-
-        List<String> testSubCategories = testingRunConfig == null ?
new ArrayList<>() : testingRunConfig.getTestSubCategoryList(); - - int countSuccessfulTests = 0; - for (String testSubCategory: testSubCategories) { - loggerMaker.infoAndAddToDb("Trying to run test for category: " + testSubCategory + " with summary state: " + GetRunningTestsStatus.getRunningTests().getCurrentState(testRunResultSummaryId) ); - if(GetRunningTestsStatus.getRunningTests().isTestRunning(testRunResultSummaryId, true)){ - loggerMaker.infoAndAddToDb("Entered tests for api: " + apiInfoKey.toString() + " : " + testSubCategory); - if (Context.now() - startTime > timeToKill) { - loggerMaker.infoAndAddToDb("Timed out in " + (Context.now()-startTime) + "seconds"); - return; - } - List testingRunResults = new ArrayList<>(); - - TestConfig testConfig = testConfigMap.get(testSubCategory); - - if (testConfig == null) { - loggerMaker.infoAndAddToDb("Found testing config null: " + apiInfoKey.toString() + " : " + testSubCategory); - continue; - } - TestingRunResult testingRunResult = null; - if (!applyRunOnceCheck(apiInfoKey, testConfig, subCategoryEndpointMap, apiInfoKeyToHostMap, testSubCategory)) { - continue; - } - String failMessage = null; - if (deactivatedCollections.contains(apiInfoKey.getApiCollectionId())) { - failMessage = TestError.DEACTIVATED_ENDPOINT.getMessage(); - } else if (!demoCollections.contains(apiInfoKey.getApiCollectionId()) && - syncLimit.updateUsageLeftAndCheckSkip()) { - failMessage = TestError.USAGE_EXCEEDED.getMessage(); - } - - if (failMessage != null) { - List testResults = new ArrayList<>(); - String testSuperType = testConfig.getInfo().getCategory().getName(); - String testSubType = testConfig.getInfo().getSubCategory(); - testResults.add(new TestResult(null, null, Collections.singletonList(failMessage), 0, false, Confidence.HIGH, null)); - loggerMaker.infoAndAddToDb("Skipping test, " + failMessage, LogDb.TESTING); - testingRunResult = new TestingRunResult( - testRunId, apiInfoKey, testSuperType, testSubType, testResults, - false, new ArrayList<>(), 100, Context.now(), - Context.now(), testRunResultSummaryId, null, Collections.singletonList(new TestingRunResult.TestLog(TestingRunResult.TestLogType.INFO, "No samples messages found"))); - } - - try { - if(testingRunResult==null){ - - // check and update automated auth token here - int diffTimeInMinutes = (Context.now() - startTime)/60; - if(diffTimeInMinutes != 0 && (diffTimeInMinutes % 10) == 0){ - // check for expiry in every 10 minutes - checkAndUpdateAuthMechanism(Context.now(), authMechanism); - } - - testingRunResult = runTestNew(apiInfoKey,testRunId,testingUtil,testRunResultSummaryId, testConfig, testingRunConfig, debug, testLogs); - } - } catch (Exception e) { - loggerMaker.errorAndAddToDb("Error while running tests for " + testSubCategory + ": " + e.getMessage(), LogDb.TESTING); - e.printStackTrace(); - } - if (testingRunResult != null) { - List errorList = testingRunResult.getErrorsList(); - testingRunResults.add(testingRunResult); - if (errorList == null || !errorList.contains(TestResult.API_CALL_FAILED_ERROR_STRING)) { - countSuccessfulTests++; - } - } - - insertResultsAndMakeIssues(testingRunResults, testRunResultSummaryId); - - }else{ - if(GetRunningTestsStatus.getRunningTests().getCurrentState(testRunId) != null && GetRunningTestsStatus.getRunningTests().getCurrentState(testRunId).equals(TestingRun.State.STOPPED)){ - logger.info("Test stopped for id: " + testRunId.toString()); - } - return; - } - } - if(countSuccessfulTests > 0){ - ApiInfoDao.instance.updateLastTestedField(apiInfoKey); - } - - } - 
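With startTestNew removed above, the per-(API, subcategory) unit of work is now either run inline (old path) or serialized to Kafka as a SingleTestPayload (new path; the class appears later in this diff). A sketch of the producer-side push and the JSON each record carries; all concrete values here are illustrative:

    // One Kafka record per (apiInfoKey, test subcategory) pair.
    SingleTestPayload payload = new SingleTestPayload(
            testingRun.getId(),   // testingRunId
            summaryId,            // testingRunResultSummaryId
            apiInfoKey,           // serialized via ApiInfoKey.toString()
            testSubType,          // test subcategory id, e.g. "REMOVE_TOKENS"
            testLogs,
            accountId);
    Producer.pushMessagesToKafka(Arrays.asList(payload));
    // Record value (illustrative):
    // {"testingRunId":"65f1...","testingRunResultSummaryId":"65f2...",
    //  "apiInfoKey":"1111 https://api.example.com/v1/users POST",
    //  "subcategory":"REMOVE_TOKENS","testLogs":[],"accountId":1000000}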
private Map> cleanUpTestArtifacts(List testingRunResults, ApiInfoKey apiInfoKey, TestingUtil testingUtil, TestingRunConfig testingRunConfig) { Map> cleanedUpRequests = new HashMap<>(); @@ -842,35 +849,11 @@ public boolean applyRunOnceCheck(ApiInfoKey apiInfoKey, TestConfig testConfig, C } public TestingRunResult runTestNew(ApiInfo.ApiInfoKey apiInfoKey, ObjectId testRunId, TestingUtil testingUtil, - ObjectId testRunResultSummaryId, TestConfig testConfig, TestingRunConfig testingRunConfig, boolean debug, List testLogs) { + ObjectId testRunResultSummaryId, TestConfig testConfig, TestingRunConfig testingRunConfig, boolean debug, List testLogs, String message) { String testSuperType = testConfig.getInfo().getCategory().getName(); String testSubType = testConfig.getInfo().getSubCategory(); - if (deactivatedCollections.contains(apiInfoKey.getApiCollectionId())) { - List testResults = new ArrayList<>(); - testResults.add(new TestResult(null, null, Collections.singletonList(TestError.DEACTIVATED_ENDPOINT.getMessage()),0, false, Confidence.HIGH, null)); - return new TestingRunResult( - testRunId, apiInfoKey, testSuperType, testSubType ,testResults, - false,new ArrayList<>(),100,Context.now(), - Context.now(), testRunResultSummaryId, null, Collections.singletonList(new TestingRunResult.TestLog(TestingRunResult.TestLogType.INFO, "Deactivated endpoint")) - ); - } - - List messages = testingUtil.getSampleMessages().get(apiInfoKey); - if (messages == null || messages.isEmpty()){ - List testResults = new ArrayList<>(); - loggerMaker.infoAndAddToDb("Skipping test, messages empty: " + apiInfoKey.toString(), LogDb.TESTING); - testResults.add(new TestResult(null, null, Collections.singletonList(TestError.NO_PATH.getMessage()),0, false, Confidence.HIGH, null)); - return new TestingRunResult( - testRunId, apiInfoKey, testSuperType, testSubType ,testResults, - false,new ArrayList<>(),100,Context.now(), - Context.now(), testRunResultSummaryId, null, Collections.singletonList(new TestingRunResult.TestLog(TestingRunResult.TestLogType.INFO, "No samples messages found")) - ); - } - - String message = messages.get(messages.size() - 1); - RawApi rawApi = RawApi.buildFromMessage(message, true); int startTime = Context.now(); diff --git a/apps/testing/src/main/java/com/akto/testing/kafka_utils/ConsumerUtil.java b/apps/testing/src/main/java/com/akto/testing/kafka_utils/ConsumerUtil.java new file mode 100644 index 0000000000..cadaca26b6 --- /dev/null +++ b/apps/testing/src/main/java/com/akto/testing/kafka_utils/ConsumerUtil.java @@ -0,0 +1,185 @@ +package com.akto.testing.kafka_utils; +import static com.akto.testing.Utils.readJsonContentFromFile; +import static com.akto.testing.Utils.writeJsonContentInFile; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.consumer.*; +import org.bson.types.ObjectId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +import com.akto.DaoInit; +import com.akto.crons.GetRunningTestsStatus; +import com.akto.dao.context.Context; +import com.akto.dto.ApiInfo; +import com.akto.dto.ApiInfo.ApiInfoKey; +import com.akto.dto.test_editor.TestConfig; +import com.akto.dto.testing.TestingRunResult; +import 
com.akto.dto.testing.info.SingleTestPayload;
+import com.akto.notifications.slack.CustomTextAlert;
+import com.akto.testing.Main;
+import com.akto.testing.TestExecutor;
+import com.akto.util.Constants;
+import com.akto.util.DashboardMode;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.mongodb.BasicDBObject;
+import com.mongodb.ConnectionString;
+import com.mongodb.ReadPreference;
+import com.mongodb.WriteConcern;
+
+import io.confluent.parallelconsumer.ParallelConsumerOptions;
+import io.confluent.parallelconsumer.ParallelStreamProcessor;
+
+public class ConsumerUtil {
+
+    static Properties properties = com.akto.runtime.utils.Utils.configProperties(Constants.LOCAL_KAFKA_BROKER_URL, Constants.AKTO_KAFKA_GROUP_ID_CONFIG, Constants.AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG);
+    static{
+        properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
+    }
+    private static Consumer<String, String> consumer = Constants.IS_NEW_TESTING_ENABLED ? new KafkaConsumer<>(properties) : null;
+    private static final Logger logger = LoggerFactory.getLogger(ConsumerUtil.class);
+    public static ExecutorService executor = Executors.newFixedThreadPool(100);
+
+    public void initializeConsumer() {
+        String mongoURI = System.getenv("AKTO_MONGO_CONN");
+        ReadPreference readPreference = ReadPreference.secondary();
+        if(DashboardMode.isOnPremDeployment()){
+            readPreference = ReadPreference.primary();
+        }
+        WriteConcern writeConcern = WriteConcern.W1;
+        DaoInit.init(new ConnectionString(mongoURI), readPreference, writeConcern);
+    }
+
+    public static SingleTestPayload parseTestMessage(String message) {
+        JSONObject jsonObject = JSON.parseObject(message);
+        ObjectId testingRunId = new ObjectId(jsonObject.getString("testingRunId"));
+        ObjectId testingRunResultSummaryId = new ObjectId(jsonObject.getString("testingRunResultSummaryId"));
+        ApiInfo.ApiInfoKey apiInfoKey = ApiInfo.getApiInfoKeyFromString(jsonObject.getString("apiInfoKey"));
+        String subcategory = jsonObject.getString("subcategory");
+        List<TestingRunResult.TestLog> testLogs = JSON.parseArray(jsonObject.getString("testLogs"), TestingRunResult.TestLog.class);
+        int accountId = jsonObject.getInteger("accountId");
+        return new SingleTestPayload(testingRunId, testingRunResultSummaryId, apiInfoKey, subcategory, testLogs, accountId);
+    }
+
+    public void runTestFromMessage(String message){
+        SingleTestPayload singleTestPayload = parseTestMessage(message);
+        Context.accountId.set(singleTestPayload.getAccountId());
+        TestExecutor testExecutor = new TestExecutor();
+
+        TestingConfigurations instance = TestingConfigurations.getInstance();
+        String subCategory = singleTestPayload.getSubcategory();
+        TestConfig testConfig = instance.getTestConfigMap().get(subCategory);
+        ApiInfoKey apiInfoKey = singleTestPayload.getApiInfoKey();
+
+        List<String> messagesList = instance.getTestingUtil().getSampleMessages().get(apiInfoKey);
+        if(messagesList == null || messagesList.isEmpty()){
+            return; // nothing to test for this API
+        }
+        String sample = messagesList.get(messagesList.size() - 1);
+        logger.info("Running test for: " + apiInfoKey + " with subcategory: " + subCategory);
+        TestingRunResult runResult = testExecutor.runTestNew(apiInfoKey, singleTestPayload.getTestingRunId(), instance.getTestingUtil(), singleTestPayload.getTestingRunResultSummaryId(), testConfig, instance.getTestingRunConfig(), instance.isDebug(), singleTestPayload.getTestLogs(), sample);
+        testExecutor.insertResultsAndMakeIssues(Collections.singletonList(runResult), singleTestPayload.getTestingRunResultSummaryId());
+    }
+
+    public void init(int maxRunTimeInSeconds) {
+        executor = Executors.newFixedThreadPool(100);
+        BasicDBObject currentTestInfo = readJsonContentFromFile(Constants.TESTING_STATE_FOLDER_PATH, Constants.TESTING_STATE_FILE_NAME, BasicDBObject.class);
+        if(currentTestInfo == null){ // guard before reading fields from the state file
+            return;
+        }
+        final String summaryIdForTest = currentTestInfo.getString("summaryId");
+        final ObjectId summaryObjectId = new ObjectId(summaryIdForTest);
+        final int startTime = Context.now();
+        AtomicInteger lastRecordRead = new AtomicInteger(Context.now());
+        boolean isConsumerRunning = currentTestInfo.getBoolean("CONSUMER_RUNNING", false);
+
+        ParallelStreamProcessor<String, String> parallelConsumer = null;
+
+        if(isConsumerRunning){
+            String topicName = Constants.TEST_RESULTS_TOPIC_NAME;
+            consumer = new KafkaConsumer<>(properties);
+
+            ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
+                    .consumer(consumer)
+                    .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED) // unordered for parallelism
+                    .maxConcurrency(100) // number of threads for parallel processing
+                    .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC) // commit offsets synchronously
+                    .batchSize(1) // number of records to process in each poll
+                    .maxFailureHistory(3)
+                    .build();
+
+            parallelConsumer = ParallelStreamProcessor.createEosStreamProcessor(options);
+            parallelConsumer.subscribe(Arrays.asList(topicName));
+            if (StringUtils.hasLength(Main.AKTO_SLACK_WEBHOOK)) {
+                try {
+                    CustomTextAlert customTextAlert = new CustomTextAlert("Tests being picked for execution accountId=" + currentTestInfo.getInt("accountId") + " summaryId=" + summaryIdForTest);
+                    Main.SLACK_INSTANCE.send(Main.AKTO_SLACK_WEBHOOK, customTextAlert.toJson());
+                } catch (Exception e) {
+                    logger.error("Error sending slack alert for picking up of tests", e);
+                }
+            }
+        } else {
+            logger.info("Consumer was not marked running in the state file, skipping consumer init.");
+            return;
+        }
+
+        try {
+            parallelConsumer.poll(record -> {
+                String threadName = Thread.currentThread().getName();
+                String message = record.value();
+                logger.info("Thread [" + threadName + "] picked up record: " + message);
+                try {
+                    lastRecordRead.set(Context.now());
+                    Future<?> future = executor.submit(() -> runTestFromMessage(message));
+                    try {
+                        future.get(4, TimeUnit.MINUTES);
+                    } catch (InterruptedException | java.util.concurrent.TimeoutException e) { // future.get throws TimeoutException on timeout
+                        logger.error("Task timed out: " + message);
+                        future.cancel(true);
+                    } catch (Exception e) {
+                        logger.error("Error in task execution: " + message, e);
+                    }
+                } finally {
+                    logger.info("Thread [" + threadName + "] finished processing record: " + message);
+                }
+            });
+
+            while (parallelConsumer != null) {
+                if(!GetRunningTestsStatus.getRunningTests().isTestRunning(summaryObjectId, true)){
+                    logger.info("Tests have been marked stopped.");
+                    executor.shutdownNow();
+                    break;
+                }
+                else if (Context.now() - startTime > maxRunTimeInSeconds) {
+                    logger.info("Max run time reached. Stopping consumer.");
+                    executor.shutdownNow();
+                    break;
+                }else if(Context.now() - lastRecordRead.get() > 10){
+                    logger.info("Records are empty now, thus executing final tests");
+                    executor.shutdown();
+                    executor.awaitTermination(maxRunTimeInSeconds, TimeUnit.SECONDS);
+                    break;
+                }
+                Thread.sleep(100);
+            }
+
+        } catch (Exception e) {
+            logger.error("Error in polling records", e);
+        } finally {
+            logger.info("Closing consumer as all results have been executed.");
+            if(parallelConsumer != null){ // guard: poll may have failed before the processor was assigned
+                parallelConsumer.closeDrainFirst();
+                parallelConsumer = null;
+            }
+            consumer.close();
+            writeJsonContentInFile(Constants.TESTING_STATE_FOLDER_PATH, Constants.TESTING_STATE_FILE_NAME, null);
+        }
+    }
+}
diff --git a/apps/testing/src/main/java/com/akto/testing/kafka_utils/Producer.java b/apps/testing/src/main/java/com/akto/testing/kafka_utils/Producer.java
new file mode 100644
index 0000000000..5d6c620f74
--- /dev/null
+++ b/apps/testing/src/main/java/com/akto/testing/kafka_utils/Producer.java
@@ -0,0 +1,92 @@
+package com.akto.testing.kafka_utils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.bson.types.ObjectId;
+
+import com.akto.dto.billing.SyncLimit;
+import com.akto.dto.testing.TestingRun;
+import com.akto.dto.testing.info.SingleTestPayload;
+import com.akto.kafka.Kafka;
+import com.akto.testing.TestExecutor;
+import com.akto.util.Constants;
+
+public class Producer {
+
+    public static final Kafka producer = Constants.IS_NEW_TESTING_ENABLED ?
new Kafka(Constants.LOCAL_KAFKA_BROKER_URL, 500, 1000) : null; + public static Void pushMessagesToKafka(List messages){ + for(SingleTestPayload singleTestPayload: messages){ + String messageString = singleTestPayload.toString(); + producer.send(messageString, Constants.TEST_RESULTS_TOPIC_NAME); + } + return null; + } + + private static void deleteAllMessagesFromTopic(String bootstrapServers, String topicName) + throws ExecutionException, InterruptedException { + + // 1) Build minimal properties for AdminClient + Properties adminProps = new Properties(); + adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + // 2) Create AdminClient + try (AdminClient adminClient = AdminClient.create(adminProps)) { + + // 3) Describe the topic to get partition info + DescribeTopicsResult describeTopicsResult = adminClient + .describeTopics(Collections.singletonList(topicName)); + TopicDescription topicDescription = describeTopicsResult.values() + .get(topicName).get(); // may throw if topic doesn’t exist + + // 4) Collect partitions for the topic + List topicPartitions = topicDescription.partitions().stream() + .map(info -> new TopicPartition(topicName, info.partition())) + .collect(Collectors.toList()); + + // 5) Request the latest offsets for each partition + Map latestOffsetsRequest = + new HashMap<>(); + for (TopicPartition tp : topicPartitions) { + latestOffsetsRequest.put(tp, org.apache.kafka.clients.admin.OffsetSpec.latest()); + } + ListOffsetsResult listOffsetsResult = adminClient.listOffsets(latestOffsetsRequest); + + // 6) Build the map for deleteRecords (partition -> RecordsToDelete) + Map partitionRecordsToDelete = new HashMap<>(); + for (TopicPartition tp : topicPartitions) { + long latestOffset = listOffsetsResult.partitionResult(tp).get().offset(); + partitionRecordsToDelete.put(tp, RecordsToDelete.beforeOffset(latestOffset)); + } + + // 7) Delete all records up to the latest offset + adminClient.deleteRecords(partitionRecordsToDelete).all().get(); + + System.out.println("All existing messages in topic \"" + topicName + "\" have been deleted."); + } + } + + public void initProducer(TestingRun testingRun, ObjectId summaryId, SyncLimit syncLimit, boolean doInitOnly){ + TestExecutor executor = new TestExecutor(); + if(!doInitOnly){ + try { + deleteAllMessagesFromTopic(Constants.LOCAL_KAFKA_BROKER_URL, Constants.TEST_RESULTS_TOPIC_NAME); + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + } + } + executor.init(testingRun, summaryId, syncLimit, doInitOnly); + } +} diff --git a/apps/testing/src/main/java/com/akto/testing/kafka_utils/TestingConfigurations.java b/apps/testing/src/main/java/com/akto/testing/kafka_utils/TestingConfigurations.java new file mode 100644 index 0000000000..af1671959b --- /dev/null +++ b/apps/testing/src/main/java/com/akto/testing/kafka_utils/TestingConfigurations.java @@ -0,0 +1,49 @@ +package com.akto.testing.kafka_utils; + + +import java.util.Map; + +import com.akto.dto.test_editor.TestConfig; +import com.akto.dto.testing.TestingRunConfig; +import com.akto.store.TestingUtil; + +public class TestingConfigurations { + + private static final TestingConfigurations instance = new TestingConfigurations(); + + private TestingUtil testingUtil; + private TestingRunConfig testingRunConfig; + private boolean debug; + Map testConfigMap; + + private TestingConfigurations() { + } + + public static TestingConfigurations getInstance() { + return instance; + } + + public synchronized void init(TestingUtil testingUtil, 
TestingRunConfig testingRunConfig, boolean debug, Map testConfigMap) { + this.testingUtil = testingUtil; + this.testingRunConfig = testingRunConfig; + this.debug = debug; + this.testConfigMap = testConfigMap; + } + + public boolean isDebug() { + return debug; + } + + public TestingRunConfig getTestingRunConfig() { + return testingRunConfig; + } + + public TestingUtil getTestingUtil() { + return testingUtil; + } + + public Map getTestConfigMap() { + return testConfigMap; + } + +} diff --git a/apps/testing/src/main/java/com/akto/testing/workflow_node_executor/YamlNodeExecutor.java b/apps/testing/src/main/java/com/akto/testing/workflow_node_executor/YamlNodeExecutor.java index ded450dcda..c2f55e1726 100644 --- a/apps/testing/src/main/java/com/akto/testing/workflow_node_executor/YamlNodeExecutor.java +++ b/apps/testing/src/main/java/com/akto/testing/workflow_node_executor/YamlNodeExecutor.java @@ -10,6 +10,7 @@ import com.akto.dto.type.SingleTypeInfo; import com.akto.dto.*; +import com.akto.dto.ApiInfo.ApiInfoKey; import com.akto.dto.type.URLMethods; import com.akto.test_editor.execution.Memory; import org.json.JSONObject; @@ -41,6 +42,7 @@ import com.akto.test_editor.execution.ExecutorAlgorithm; import com.akto.testing.ApiExecutor; import com.akto.testing.TestExecutor; +import com.akto.testing.Utils; import com.akto.util.Constants; import static com.akto.runtime.utils.Utils.convertOriginalReqRespToString; import com.google.gson.Gson; @@ -316,7 +318,12 @@ public WorkflowTestResult.NodeResult processYamlNode(Node node, Map customAuthTypes = yamlNodeDetails.getCustomAuthTypes(); TestingUtil testingUtil = new TestingUtil(authMechanism, messageStore, null, null, customAuthTypes); TestExecutor executor = new TestExecutor(); - TestingRunResult testingRunResult = executor.runTestNew(yamlNodeDetails.getApiInfoKey(), null, testingUtil, null, testConfig, null, debug, testLogs); + ApiInfoKey infoKey = yamlNodeDetails.getApiInfoKey(); + List samples = testingUtil.getSampleMessages().get(infoKey); + TestingRunResult testingRunResult = Utils.generateFailedRunResultForMessage(null, infoKey, testConfig.getInfo().getCategory().getName(), testConfig.getInfo().getSubCategory(), null,samples , null); + if(testingRunResult == null){ + testingRunResult = executor.runTestNew(infoKey, null, testingUtil, null, testConfig, null, debug, testLogs, samples.get(samples.size() - 1)); + } List errors = new ArrayList<>(); List messages = new ArrayList<>(); diff --git a/libs/dao/src/main/java/com/akto/dto/ApiInfo.java b/libs/dao/src/main/java/com/akto/dto/ApiInfo.java index 9351f4e3d3..a8cda92ac4 100644 --- a/libs/dao/src/main/java/com/akto/dto/ApiInfo.java +++ b/libs/dao/src/main/java/com/akto/dto/ApiInfo.java @@ -315,6 +315,11 @@ public String findSeverity() { return null; } + public static ApiInfoKey getApiInfoKeyFromString(String key) { + String[] parts = key.split(" "); + return new ApiInfoKey(Integer.parseInt(parts[0]), parts[1], URLMethods.Method.valueOf(parts[2])); + } + @Override public String toString() { return "{" + diff --git a/libs/dao/src/main/java/com/akto/dto/testing/info/SingleTestPayload.java b/libs/dao/src/main/java/com/akto/dto/testing/info/SingleTestPayload.java new file mode 100644 index 0000000000..9f78522d73 --- /dev/null +++ b/libs/dao/src/main/java/com/akto/dto/testing/info/SingleTestPayload.java @@ -0,0 +1,82 @@ +package com.akto.dto.testing.info; + +import java.util.List; + +import org.bson.types.ObjectId; + +import com.akto.dto.ApiInfo.ApiInfoKey; +import com.akto.dto.testing.TestingRunResult; + 
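The split(" ") parser above assumes the space-separated form that ApiInfoKey.toString() is taken to emit ("<apiCollectionId> <url> <method>"), which is how the Kafka payload below carries the key; a usage sketch with illustrative values:

    ApiInfo.ApiInfoKey key = new ApiInfo.ApiInfoKey(1111, "https://api.example.com/v1/users", URLMethods.Method.POST);
    String serialized = key.toString();  // assumed form: "1111 https://api.example.com/v1/users POST"
    ApiInfo.ApiInfoKey parsed = ApiInfo.getApiInfoKeyFromString(serialized);
    // Caveat: split(" ") requires that neither the url nor the method contain spaces.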
diff --git a/libs/dao/src/main/java/com/akto/dto/testing/info/SingleTestPayload.java b/libs/dao/src/main/java/com/akto/dto/testing/info/SingleTestPayload.java
new file mode 100644
index 0000000000..9f78522d73
--- /dev/null
+++ b/libs/dao/src/main/java/com/akto/dto/testing/info/SingleTestPayload.java
@@ -0,0 +1,82 @@
+package com.akto.dto.testing.info;
+
+import java.util.List;
+
+import org.bson.types.ObjectId;
+
+import com.akto.dto.ApiInfo.ApiInfoKey;
+import com.akto.dto.testing.TestingRunResult;
+
+public class SingleTestPayload {
+
+    private ObjectId testingRunId;
+    private ObjectId testingRunResultSummaryId;
+    private ApiInfoKey apiInfoKey;
+    private String subcategory;
+    private List<TestingRunResult.TestLog> testLogs;
+    private int accountId;
+
+    public SingleTestPayload(ObjectId testingRunId, ObjectId testingRunResultSummaryId, ApiInfoKey apiInfoKey, String subcategory, List<TestingRunResult.TestLog> testLogs, int accountId){
+        this.testingRunId = testingRunId;
+        this.testingRunResultSummaryId = testingRunResultSummaryId;
+        this.apiInfoKey = apiInfoKey;
+        this.subcategory = subcategory;
+        this.testLogs = testLogs;
+        this.accountId = accountId;
+    }
+
+    public ObjectId getTestingRunId() {
+        return testingRunId;
+    }
+    public void setTestingRunId(ObjectId testingRunId) {
+        this.testingRunId = testingRunId;
+    }
+
+    public ObjectId getTestingRunResultSummaryId() {
+        return testingRunResultSummaryId;
+    }
+    public void setTestingRunResultSummaryId(ObjectId testingRunResultSummaryId) {
+        this.testingRunResultSummaryId = testingRunResultSummaryId;
+    }
+
+    public ApiInfoKey getApiInfoKey() {
+        return apiInfoKey;
+    }
+    public void setApiInfoKey(ApiInfoKey apiInfoKey) {
+        this.apiInfoKey = apiInfoKey;
+    }
+
+    public String getSubcategory() {
+        return subcategory;
+    }
+    public void setSubcategory(String subcategory) {
+        this.subcategory = subcategory;
+    }
+
+    public List<TestingRunResult.TestLog> getTestLogs() {
+        return testLogs;
+    }
+    public void setTestLogs(List<TestingRunResult.TestLog> testLogs) {
+        this.testLogs = testLogs;
+    }
+
+    public int getAccountId() {
+        return accountId;
+    }
+    public void setAccountId(int accountId) {
+        this.accountId = accountId;
+    }
+
+    @Override
+    public String toString() {
+        // note: no trailing comma after accountId, so the output stays valid JSON
+        return "{" +
+                "\"testingRunId\":\"" + (testingRunId != null ? testingRunId.toHexString() : null) + "\"," +
+                "\"testingRunResultSummaryId\":\"" + (testingRunResultSummaryId != null ? testingRunResultSummaryId.toHexString() : null) + "\"," +
+                "\"apiInfoKey\":" + (apiInfoKey != null ? "\"" + apiInfoKey.toString() + "\"" : null) + "," +
+                "\"subcategory\":\"" + (subcategory != null ? subcategory : null) + "\"," +
+                "\"testLogs\":" + (testLogs != null ? testLogs.toString() : null) + "," +
+                "\"accountId\":" + accountId +
+                "}";
+    }
+}
diff --git a/libs/dao/src/main/java/com/akto/util/Constants.java b/libs/dao/src/main/java/com/akto/util/Constants.java
index 55fe285c16..1b4693409c 100644
--- a/libs/dao/src/main/java/com/akto/util/Constants.java
+++ b/libs/dao/src/main/java/com/akto/util/Constants.java
@@ -1,5 +1,7 @@
 package com.akto.util;
 
+import org.springframework.util.StringUtils;
+
 public class Constants {
 
     private Constants() {}
@@ -23,6 +25,14 @@ private Constants() {}
     public static final String AKTO_NODE_ID = "x-akto-node";
     public static final String AKTO_REMOVE_AUTH= "x-akto-remove-auth";
 
+    public static final String LOCAL_KAFKA_BROKER_URL = System.getenv("KAFKA_BROKER_URL") != null ? System.getenv("KAFKA_BROKER_URL") : "localhost:29092"; // run kafka process with name kafka1 in docker
+    public static final String TEST_RESULTS_TOPIC_NAME = "akto.test.messages";
+    public static final String AKTO_KAFKA_GROUP_ID_CONFIG = "testing-group";
+    public static final int AKTO_KAFKA_MAX_POLL_RECORDS_CONFIG = 1; // read one message at a time
+    public static final String TESTING_STATE_FOLDER_PATH = System.getenv("TESTING_STATE_FOLDER_PATH") != null ? System.getenv("TESTING_STATE_FOLDER_PATH") : "testing-info";
+    public static final String TESTING_STATE_FILE_NAME = "testing-state.json";
+    public static final boolean IS_NEW_TESTING_ENABLED = StringUtils.hasLength(System.getenv("NEW_TESTING_ENABLED"));
+
     public static final String UNDERSCORE = "_";
 
     public final static String _AKTO = "AKTO";
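// ---- Reviewer note: illustrative sketch, not part of the patch ----
// SingleTestPayload.toString() above emits JSON, so a message for the
// hypothetical key "123 /api/books GET" looks roughly like:
//   {"testingRunId":"65f0...","testingRunResultSummaryId":"65f1...",
//    "apiInfoKey":"123 /api/books GET","subcategory":"REMOVE_TOKENS",
//    "testLogs":[],"accountId":1000000}
// A consumer could decode it with the BasicDBObject parser already used in this
// codebase and rebuild the key with the ApiInfo helper added above:
static void decode(String messageString) {
    BasicDBObject json = BasicDBObject.parse(messageString);
    ApiInfo.ApiInfoKey infoKey = ApiInfo.getApiInfoKeyFromString(json.getString("apiInfoKey"));
    int accountId = json.getInt("accountId");
    // getApiInfoKeyFromString splits on spaces, so it assumes URLs contain none
}
// ---- end note ----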
System.getenv("TESTING_STATE_FOLDER_PATH") : "testing-info"; + public static final String TESTING_STATE_FILE_NAME = "testing-state.json"; + public static final boolean IS_NEW_TESTING_ENABLED = StringUtils.hasLength(System.getenv("NEW_TESTING_ENABLED")); + public static final String UNDERSCORE = "_"; public final static String _AKTO = "AKTO"; diff --git a/libs/utils/pom.xml b/libs/utils/pom.xml index 38084f0cbd..45f58345ce 100644 --- a/libs/utils/pom.xml +++ b/libs/utils/pom.xml @@ -99,6 +99,11 @@ kafka-clients 3.0.0 + + io.confluent.parallelconsumer + parallel-consumer-core + 0.5.2.4 + org.kohsuke diff --git a/libs/utils/src/main/java/com/akto/notifications/slack/CustomTextAlert.java b/libs/utils/src/main/java/com/akto/notifications/slack/CustomTextAlert.java new file mode 100644 index 0000000000..2aa964e8a4 --- /dev/null +++ b/libs/utils/src/main/java/com/akto/notifications/slack/CustomTextAlert.java @@ -0,0 +1,30 @@ +package com.akto.notifications.slack; + +import com.mongodb.BasicDBList; +import com.mongodb.BasicDBObject; + +import static com.akto.notifications.slack.SlackAlertType.CUSTOM_TEXT_ALERT; + +public class CustomTextAlert extends SlackAlerts { + private final String text; + + public CustomTextAlert(String text) { + super(CUSTOM_TEXT_ALERT); + this.text = text; + } + + + @Override + public String toJson() { + long unixTime = System.currentTimeMillis() / 1000L; + String dateText = ""; // Get time from unix timestamp or just get fallback text + + BasicDBList blocksList = new BasicDBList(); + blocksList.add(createTextContext(text)); + blocksList.add(createTextContext(dateText)); + + BasicDBObject blockObj = new BasicDBObject("blocks", blocksList); + + return toAttachment(blockObj).toJson(); + } +} diff --git a/libs/utils/src/main/java/com/akto/notifications/slack/SlackAlertType.java b/libs/utils/src/main/java/com/akto/notifications/slack/SlackAlertType.java index e4f7dba48c..f3e335cfe7 100644 --- a/libs/utils/src/main/java/com/akto/notifications/slack/SlackAlertType.java +++ b/libs/utils/src/main/java/com/akto/notifications/slack/SlackAlertType.java @@ -4,5 +4,6 @@ public enum SlackAlertType { ACCOUNT_LIMIT_WARNING_ALERT, API_TEST_STATUS_ALERT, NEW_SERVICE_STATUS_ALERT, - NEW_USER_JOINING_ALERT + NEW_USER_JOINING_ALERT, + CUSTOM_TEXT_ALERT } diff --git a/libs/utils/src/main/java/com/akto/testing/Utils.java b/libs/utils/src/main/java/com/akto/testing/Utils.java index 8aef6d7950..c91fc351f4 100644 --- a/libs/utils/src/main/java/com/akto/testing/Utils.java +++ b/libs/utils/src/main/java/com/akto/testing/Utils.java @@ -1,5 +1,6 @@ package com.akto.testing; +import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -13,6 +14,8 @@ import org.bson.conversions.Bson; import org.bson.types.ObjectId; + +import com.akto.dao.context.Context; import com.akto.dao.testing.TestingRunResultDao; import com.akto.dao.testing.VulnerableTestingRunResultDao; import com.akto.dao.testing_run_findings.TestingRunIssuesDao; @@ -25,6 +28,10 @@ import com.akto.dto.test_editor.Util; import com.akto.dto.test_run_findings.TestingIssuesId; import com.akto.dto.test_run_findings.TestingRunIssues; +import com.akto.dto.testing.GenericTestResult; +import com.akto.dto.testing.TestResult; +import com.akto.dto.testing.TestResult.Confidence; +import com.akto.dto.testing.TestResult.TestError; import com.akto.dto.testing.TestingRunResult; import com.akto.dto.testing.WorkflowUpdatedSampleData; import com.akto.dto.type.RequestTemplate; @@ -33,13 +40,14 @@ import 
diff --git a/libs/utils/src/main/java/com/akto/testing/Utils.java b/libs/utils/src/main/java/com/akto/testing/Utils.java
index 8aef6d7950..c91fc351f4 100644
--- a/libs/utils/src/main/java/com/akto/testing/Utils.java
+++ b/libs/utils/src/main/java/com/akto/testing/Utils.java
@@ -1,5 +1,6 @@
 package com.akto.testing;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -13,6 +14,8 @@
 import org.bson.conversions.Bson;
 import org.bson.types.ObjectId;
 
+
+import com.akto.dao.context.Context;
 import com.akto.dao.testing.TestingRunResultDao;
 import com.akto.dao.testing.VulnerableTestingRunResultDao;
 import com.akto.dao.testing_run_findings.TestingRunIssuesDao;
@@ -25,6 +28,10 @@
 import com.akto.dto.test_editor.Util;
 import com.akto.dto.test_run_findings.TestingIssuesId;
 import com.akto.dto.test_run_findings.TestingRunIssues;
+import com.akto.dto.testing.GenericTestResult;
+import com.akto.dto.testing.TestResult;
+import com.akto.dto.testing.TestResult.Confidence;
+import com.akto.dto.testing.TestResult.TestError;
 import com.akto.dto.testing.TestingRunResult;
 import com.akto.dto.testing.WorkflowUpdatedSampleData;
 import com.akto.dto.type.RequestTemplate;
@@ -33,13 +40,14 @@
 import com.akto.test_editor.filter.Filter;
 import com.akto.test_editor.filter.data_operands_impl.ValidationResult;
 import com.akto.testing_utils.TestingUtils;
+import com.akto.usage.UsageMetricCalculator;
 import com.akto.util.Constants;
 import com.akto.util.JSONUtils;
 import com.akto.util.enums.GlobalEnums;
 import com.akto.util.enums.GlobalEnums.Severity;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.mongodb.BasicDBList;
 import com.mongodb.BasicDBObject;
-import com.mongodb.ConnectionString;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.model.Accumulators;
 import com.mongodb.client.model.Aggregates;
@@ -473,6 +481,7 @@ public static void modifyQueryOperations(OriginalHttpRequest httpRequest, List
 
     public static Map<String, Integer> finalCountIssuesMap(ObjectId testingRunResultSummaryId){
         Map<String, Integer> countIssuesMap = new HashMap<>();
+        countIssuesMap.put(Severity.CRITICAL.toString(), 0);
         countIssuesMap.put(Severity.HIGH.toString(), 0);
         countIssuesMap.put(Severity.MEDIUM.toString(), 0);
         countIssuesMap.put(Severity.LOW.toString(), 0);
@@ -488,7 +497,7 @@ public static Map<String, Integer> finalCountIssuesMap(ObjectId testingRunResultSummaryId){
             allVulResults = TestingRunResultDao.instance.findAll(filterQ, projection);
         }else{
             allVulResults = VulnerableTestingRunResultDao.instance.findAll(
-                Filters.eq(TestingRunResult.TEST_RUN_RESULT_SUMMARY_ID, testingRunResultSummaryId)
+                Filters.eq(TestingRunResult.TEST_RUN_RESULT_SUMMARY_ID, testingRunResultSummaryId), projection
             );
         }
@@ -536,5 +545,67 @@ public static List<TestingRunResult> fetchLatestTestingRunResult(Bson filter){
 
         return resultsFromNonVulCollection;
     }
 
+    public static TestingRunResult generateFailedRunResultForMessage(ObjectId testingRunId, ApiInfoKey apiInfoKey, String testSuperType,
+            String testSubType, ObjectId testRunResultSummaryId, List<String> messages, String errorMessage) {
+
+        TestingRunResult testingRunResult = null;
+        Set<Integer> deactivatedCollections = UsageMetricCalculator.getDeactivated();
+        List<GenericTestResult> testResults = new ArrayList<>();
+        String failMessage = errorMessage;
+
+        if(deactivatedCollections.contains(apiInfoKey.getApiCollectionId())){
+            failMessage = TestError.DEACTIVATED_ENDPOINT.getMessage();
+        }else if(messages == null || messages.isEmpty()){
+            failMessage = TestError.NO_PATH.getMessage();
+        }
+
+        if(failMessage != null){
+            testResults.add(new TestResult(null, null, Collections.singletonList(failMessage), 0, false, Confidence.HIGH, null));
+            testingRunResult = new TestingRunResult(
+                    testingRunId, apiInfoKey, testSuperType, testSubType, testResults,
+                    false, new ArrayList<>(), 100, Context.now(),
+                    Context.now(), testRunResultSummaryId, null,
+                    Collections.singletonList(new TestingRunResult.TestLog(TestingRunResult.TestLogType.INFO, failMessage)));
+        }
+        return testingRunResult;
+    }
+
+    public static boolean createFolder(String folderName){
+        File statusDir = new File(folderName);
+
+        if (!statusDir.exists()) {
+            boolean created = statusDir.mkdirs();
+            if (!created) {
+                System.err.println("Failed to create directory: " + folderName);
+                return false;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    public static void writeJsonContentInFile(String folderName, String fileName, Object content){
+        try {
+            File file = new File(folderName, fileName);
+            ObjectMapper objectMapper = new ObjectMapper();
+            objectMapper.writerWithDefaultPrettyPrinter().writeValue(file, content);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static <T> T readJsonContentFromFile(String folderName, String fileName, Class<T> valueType) {
+        T result = null;
+        try {
+            File file = new File(folderName, fileName);
+            ObjectMapper objectMapper = new ObjectMapper();
+            result = objectMapper.readValue(file, valueType);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return result;
+    }
+
 }
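// ---- Reviewer note: illustrative sketch, not part of the patch ----
// Round trip for the file-backed testing state introduced in this patch. The
// TestingState POJO is hypothetical; any Jackson-serializable class works.
import com.akto.testing.Utils;
import com.akto.util.Constants;

public class StateSketch {
    public static class TestingState {
        public String testingRunId;    // hex ObjectId of the interrupted run
        public int lastCompletedIndex; // cursor into the scheduled tests
    }

    public static void main(String[] args) {
        TestingState state = new TestingState();
        state.testingRunId = "65f0c1d2e3a4b5c6d7e8f901";
        state.lastCompletedIndex = 42;

        // createFolder is a no-op (returns false) when the directory already exists
        Utils.createFolder(Constants.TESTING_STATE_FOLDER_PATH);
        Utils.writeJsonContentInFile(Constants.TESTING_STATE_FOLDER_PATH,
                Constants.TESTING_STATE_FILE_NAME, state);

        // After a crash or restart, the consumer can resume from the persisted state:
        TestingState restored = Utils.readJsonContentFromFile(
                Constants.TESTING_STATE_FOLDER_PATH,
                Constants.TESTING_STATE_FILE_NAME, TestingState.class);
    }
}
// ---- end note ----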