Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run tests through kafka #2005

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.akto.dao.testing.TestingRunResultDao;
import com.akto.dto.AccountSettings;
import com.akto.dto.ApiInfo;
import com.akto.dto.ApiInfo.ApiInfoKey;
import com.akto.dto.CustomAuthType;
import com.akto.dto.User;
import com.akto.dto.test_editor.Category;
Expand Down Expand Up @@ -45,6 +46,7 @@
import com.akto.store.TestingUtil;
import com.akto.test_editor.execution.VariableResolver;
import com.akto.testing.TestExecutor;
import com.akto.testing.Utils;
import com.akto.util.Constants;
import com.akto.util.enums.GlobalEnums;
import com.akto.util.enums.GlobalEnums.Severity;
Expand Down Expand Up @@ -334,7 +336,11 @@ public String runTestForGivenTemplate() {
int lastSampleIndex = sampleDataList.get(0).getSamples().size() - 1;

TestingRunConfig testingRunConfig = new TestingRunConfig();
testingRunResult = executor.runTestNew(infoKey, null, testingUtil, null, testConfig, testingRunConfig, true, testLogs);
List<String> samples = testingUtil.getSampleMessages().get(infoKey);
TestingRunResult testingRunResult = Utils.generateFailedRunResultForMessage(null, infoKey, testConfig.getInfo().getCategory().getName(), testConfig.getInfo().getSubCategory(), null,samples , null);
if(testingRunResult == null){
testingRunResult = executor.runTestNew(infoKey, null, testingUtil, null, testConfig, testingRunConfig, true, testLogs, samples.get(samples.size() - 1));
}
if (testingRunResult == null) {
testingRunResult = new TestingRunResult(
new ObjectId(), infoKey, testConfig.getInfo().getCategory().getName(), testConfig.getInfo().getSubCategory() ,Collections.singletonList(new TestResult(null, sampleDataList.get(0).getSamples().get(lastSampleIndex),
Expand Down
10 changes: 7 additions & 3 deletions apps/testing-cli/src/main/java/com/akto/testing_cli/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;

import com.akto.DaoInit;
import com.akto.dao.context.Context;
import com.akto.dao.test_editor.TestConfigYamlParser;
import com.akto.dao.test_editor.TestEditorEnums.ContextOperator;
import com.akto.dto.AccountSettings;
Expand All @@ -40,6 +41,7 @@
import com.akto.store.TestingUtil;
import com.akto.testing.ApiExecutor;
import com.akto.testing.TestExecutor;
import com.akto.testing.Utils;
import com.akto.util.ColorConstants;
import com.akto.util.VersionUtil;

Expand Down Expand Up @@ -317,11 +319,13 @@ public static void main(String[] args) {
for (String testSubCategory : testingRunConfig.getTestSubCategoryList()) {
TestConfig testConfig = testConfigMap.get(testSubCategory);
for (ApiInfo.ApiInfoKey it : apiInfoKeys) {

TestingRunResult testingRunResult = null;
try {
testingRunResult = testExecutor.runTestNew(it, null, testingUtil, null, testConfig,
testingRunConfig, false, new ArrayList<>());
List<String> samples = testingUtil.getSampleMessages().get(it);
testingRunResult = Utils.generateFailedRunResultForMessage(null, it, testConfig.getInfo().getCategory().getName(), testConfig.getInfo().getSubCategory(), null,samples , null);
if(testingRunResult == null){
testingRunResult = testExecutor.runTestNew(it, null, testingUtil, null, testConfig, null, false, new ArrayList<>(), samples.get(samples.size() - 1));
}
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
73 changes: 70 additions & 3 deletions apps/testing/src/main/java/com/akto/testing/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import com.akto.rules.RequiredConfigs;
import com.akto.task.Cluster;
import com.akto.test_editor.execution.Executor;
import com.akto.testing.testing_with_kafka.TestingConsumer;
import com.akto.testing.testing_with_kafka.TestingProducer;
import com.akto.util.AccountTask;
import com.akto.util.Constants;
import com.akto.util.DashboardMode;
Expand All @@ -59,6 +61,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.akto.testing.Utils.readJsonContentFromFile;

import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -241,6 +245,39 @@ private static void setTestingRunConfig(TestingRun testingRun, TestingRunResultS
}
}

private static BasicDBObject checkIfAlreadyTestIsRunningOnMachine(){
// this will return true if consumer is running and this the latest summary of the testing run
// and also the summary should be in running state
try {
BasicDBObject currentTestInfo = readJsonContentFromFile(Constants.TESTING_STATE_FOLDER_PATH, Constants.TESTING_STATE_FILE_NAME, BasicDBObject.class);
if(currentTestInfo == null){
return null;
}
if(!currentTestInfo.getBoolean("CONSUMER_RUNNING", false)){
return null;
}
String testingRunId = currentTestInfo.getString("testingRunId");
String testingRunSummaryId = currentTestInfo.getString("summaryId");

int accountID = currentTestInfo.getInt("accountId");
Context.accountId.set(accountID);

TestingRunResultSummary testingRunResultSummary = TestingRunResultSummariesDao.instance.findOne(Filters.eq(Constants.ID, new ObjectId(testingRunSummaryId)), Projections.include(TestingRunResultSummary.STATE));
if(testingRunResultSummary == null || testingRunResultSummary.getState() == null || testingRunResultSummary.getState() != State.RUNNING){
return null;
}

TestingRunResultSummary latestSummary = TestingRunResultSummariesDao.instance.findLatestOne(Filters.eq(TestingRunResultSummary.TESTING_RUN_ID, new ObjectId(testingRunId)));
if(latestSummary.getHexId().equals(testingRunSummaryId)){
return currentTestInfo;
}else{
return null;
}
} catch (Exception e) {
logger.error("Error in reading the testing state file: " + e.getMessage());
return null;
}
}

// Runnable task for monitoring memory
static class MemoryMonitorTask implements Runnable {
Expand Down Expand Up @@ -309,6 +346,30 @@ public static void main(String[] args) throws InterruptedException {

loggerMaker.infoAndAddToDb("Starting.......", LogDb.TESTING);

TestingProducer testingProducer = new TestingProducer();
TestingConsumer testingConsumer = new TestingConsumer();
testingConsumer.initializeConsumer();

// read from files here and then see if we want to init the Producer and run the consumer
// if producer is running, then we can skip the check and let the default testing pick up the job

BasicDBObject currentTestInfo = checkIfAlreadyTestIsRunningOnMachine();
if(currentTestInfo != null){
loggerMaker.infoAndAddToDb("Tests were already running on this machine, thus resuming the test for account: "+ Context.accountId.get(), LogDb.TESTING);
FeatureAccess featureAccess = UsageMetricUtils.getFeatureAccess(Context.accountId.get(), MetricTypes.TEST_RUNS);

String testingRunId = currentTestInfo.getString("testingRunId");
String testingRunSummaryId = currentTestInfo.getString("summaryId");
TestingRun testingRun = TestingRunDao.instance.findOne(Filters.eq(Constants.ID, new ObjectId(testingRunId)));
TestingRunConfig baseConfig = TestingRunConfigDao.instance.findOne(Constants.ID, testingRun.getTestIdConfig());
testingRun.setTestingRunConfig(baseConfig);
ObjectId summaryId = new ObjectId(testingRunSummaryId);
testingProducer.initProducer(testingRun, summaryId, featureAccess.fetchSyncLimit(), true);
int maxRunTime = testingRun.getTestRunTime() <= 0 ? 30*60 : testingRun.getTestRunTime();
testingConsumer.init(maxRunTime);
}


schedulerAccessMatrix.scheduleAtFixedRate(new Runnable() {
public void run() {
AccountTask.instance.executeTaskForNonHybridAccounts(account -> {
Expand All @@ -327,6 +388,10 @@ public void run() {
loggerMaker.infoAndAddToDb("os.arch: " + System.getProperty("os.arch"), LogDb.TESTING);
loggerMaker.infoAndAddToDb("os.version: " + System.getProperty("os.version"), LogDb.TESTING);

// create /testing-info folder in the memory from here
boolean val = Utils.createFolder(Constants.TESTING_STATE_FOLDER_PATH);
logger.info("Testing info folder status: " + val);

SingleTypeInfo.init();
while (true) {
AccountTask.instance.executeTaskForNonHybridAccounts(account -> {
Expand Down Expand Up @@ -536,7 +601,6 @@ public void run() {
summaryId = trrs.getId();
}

TestExecutor testExecutor = new TestExecutor();
if (trrs.getState() == State.SCHEDULED) {
if (trrs.getMetadata()!= null && trrs.getMetadata().containsKey("pull_request_id") && trrs.getMetadata().containsKey("commit_sha_head") ) {
//case of github status push
Expand All @@ -546,8 +610,11 @@ public void run() {
}
RequiredConfigs.initiate();
if(!maxRetriesReached){
testExecutor.init(testingRun, summaryId, syncLimit);
raiseMixpanelEvent(summaryId, testingRun, accountId);
// init producer and the consumer here
// producer for testing is currently calls init functions from test-executor
int maxRunTime = testingRun.getTestRunTime() <= 0 ? 30*60 : testingRun.getTestRunTime();
testingProducer.initProducer(testingRun, summaryId, syncLimit, false);
testingConsumer.init(maxRunTime);
}

} catch (Exception e) {
Expand Down
Loading
Loading