From 7577bcbf704729c5a40512ba442c00f34728a61a Mon Sep 17 00:00:00 2001 From: Zeeshan Akhter Date: Thu, 7 Dec 2023 13:46:26 +0530 Subject: [PATCH 01/10] Onboard MDE for Linux to LISA --- lisa/sut_orchestrator/azure/tools.py | 30 +++++++++++++ microsoft/testsuites/vm_extensions/mde.py | 54 +++++++++++++++++++++++ 2 files changed, 84 insertions(+) create mode 100644 microsoft/testsuites/vm_extensions/mde.py diff --git a/lisa/sut_orchestrator/azure/tools.py b/lisa/sut_orchestrator/azure/tools.py index e6a491bd0c..0957e1e029 100644 --- a/lisa/sut_orchestrator/azure/tools.py +++ b/lisa/sut_orchestrator/azure/tools.py @@ -2,6 +2,7 @@ # Licensed under the MIT license. import re +import json from pathlib import PurePath from typing import Any, Dict, List, Optional, Tuple, Type @@ -561,3 +562,32 @@ def get_pool_records(self, pool_id: int, force_run: bool = False) -> Dict[str, s records[content_split[i]] = content_split[i + 1] return records + +class mdatp(Tool): + @property + def command(self) -> str: + return "mdatp" + + @property + def can_install(self) -> bool: + return False + + def get_result( + self, + arg: str, + json_out: bool = False, + sudo: bool = False, + ) -> None: + if json_out: + arg += ' --output json' + result = self.run( + arg, + sudo=sudo, + shell=True, + ) + + result.assert_exit_code() + if json_out: + return json.loads(result.stdout) + return result.stdout.split() + diff --git a/microsoft/testsuites/vm_extensions/mde.py b/microsoft/testsuites/vm_extensions/mde.py new file mode 100644 index 0000000000..6d67aef257 --- /dev/null +++ b/microsoft/testsuites/vm_extensions/mde.py @@ -0,0 +1,54 @@ +import time +from typing import cast + +from assertpy import assert_that +from retry import retry + +from lisa import ( + Logger, + Node, + TestCaseMetadata, + TestSuite, + TestSuiteMetadata, + simple_requirement, +) +from lisa.operating_system import BSD, Posix +from lisa.sut_orchestrator.azure.common import ( + add_tag_for_vm, + add_user_assign_identity, + get_managed_service_identity_client, + get_node_context, +) +from lisa.sut_orchestrator.azure.features import AzureExtension +from lisa.sut_orchestrator.azure.platform_ import AzurePlatform +from lisa.sut_orchestrator.azure.tools import Azsecd +from lisa.sut_orchestrator.azure.tools import mdatp +from lisa.testsuite import TestResult +from lisa.tools import Cat, Journalctl, Service +from lisa.util import LisaException, UnsupportedDistroException + + +@TestSuiteMetadata( + area="vm_extension", + category="functional", + description=""" + MDE Test Suite + """, +) +class MDE(TestSuite): + @TestCaseMetadata( + description=""" + Verify if MDE is healthy + """, + priority=1, + requirement=simple_requirement( + #supported_features=[AzureExtension], unsupported_os=[BSD] + ), + ) + def verify_health(self, node: Node, log: Logger, result: TestResult) -> None: + output = node.tools[mdatp].get_result('health', json_out=True) + + log.info(output) + + assert_that(output['healthy']).is_equal_to(True) + From 61b0bbcacbf7571ac15323af31e3ba78bb7cd86c Mon Sep 17 00:00:00 2001 From: Zeeshan Akhter Date: Sat, 9 Dec 2023 11:10:11 +0530 Subject: [PATCH 02/10] Azure --- lisa/sut_orchestrator/azure/tools.py | 20 +++++++++++++++++++- microsoft/testsuites/vm_extensions/mde.py | 18 ++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/lisa/sut_orchestrator/azure/tools.py b/lisa/sut_orchestrator/azure/tools.py index 0957e1e029..dd6e94dd5a 100644 --- a/lisa/sut_orchestrator/azure/tools.py +++ b/lisa/sut_orchestrator/azure/tools.py @@ -2,6 +2,7 @@ # Licensed under the MIT license. import re +import sys import json from pathlib import PurePath from typing import Any, Dict, List, Optional, Tuple, Type @@ -21,6 +22,8 @@ get_matched_str, ) from lisa.util.process import ExecutableResult +from typing import Optional, cast +from lisa.operating_system import Posix class Waagent(Tool): @@ -566,11 +569,25 @@ def get_pool_records(self, pool_id: int, force_run: bool = False) -> Dict[str, s class mdatp(Tool): @property def command(self) -> str: + self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") return "mdatp" @property def can_install(self) -> bool: - return False + self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") + return True + + def _install(self) -> bool: + self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") + posix_os: Posix = cast(Posix, self.node.os) + posix_os.add_azure_core_repo() + if posix_os.is_package_in_repo(self.command): + posix_os.install_packages(self.command) + self._log.info(f"{self.command} is installed successfully") + else: + raise UnsupportedDistroException( + node.os, f"The distro doesn't have {package} in its repo") + return self._check_exists() def get_result( self, @@ -578,6 +595,7 @@ def get_result( json_out: bool = False, sudo: bool = False, ) -> None: + self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") if json_out: arg += ' --output json' result = self.run( diff --git a/microsoft/testsuites/vm_extensions/mde.py b/microsoft/testsuites/vm_extensions/mde.py index 6d67aef257..ef7d6dfcfb 100644 --- a/microsoft/testsuites/vm_extensions/mde.py +++ b/microsoft/testsuites/vm_extensions/mde.py @@ -46,6 +46,24 @@ class MDE(TestSuite): ), ) def verify_health(self, node: Node, log: Logger, result: TestResult) -> None: + + environment = result.environment + assert environment, "fail to get environment from testresult" + platform = environment.platform + log.info(platform) + #assert isinstance(platform, AzurePlatform) + #rm_client = platform._rm_client + #assert rm_client + #msi_client = get_managed_service_identity_client(platform) + + node_context = get_node_context(node) + resource_group_name = node_context.resource_group_name + location = node_context.location + vm_name = node_context.vm_name + + # Add resource tag for AzSecPack + tag = {"exemptPolicy": True} + add_tag_for_vm(platform, resource_group_name, vm_name, tag, log) output = node.tools[mdatp].get_result('health', json_out=True) log.info(output) From 0a001c4a15c958d26a6125b7455dab2a39aed6f0 Mon Sep 17 00:00:00 2001 From: Zeeshan Akhter Date: Sat, 9 Dec 2023 10:07:59 +0000 Subject: [PATCH 03/10] Azure Runbook with test case support --- lisa/sut_orchestrator/azure/tools.py | 22 ++++++++++- microsoft/runbook/my.yml | 57 ++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 microsoft/runbook/my.yml diff --git a/lisa/sut_orchestrator/azure/tools.py b/lisa/sut_orchestrator/azure/tools.py index dd6e94dd5a..f0f16e25ee 100644 --- a/lisa/sut_orchestrator/azure/tools.py +++ b/lisa/sut_orchestrator/azure/tools.py @@ -580,13 +580,31 @@ def can_install(self) -> bool: def _install(self) -> bool: self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") posix_os: Posix = cast(Posix, self.node.os) - posix_os.add_azure_core_repo() + cur_repo = posix_os.get_repositories(); + self._log.info(f"{cur_repo}") + + #posix_os.add_azure_core_repo(code_name="bionic") + arch_name = "arm64,armhf,amd64" + repo_url = "https://packages.microsoft.com/ubuntu/18.04/prod" + code_name = "insiders-fast" + posix_os.add_repository( + repo=(f"deb [arch={arch_name}] {repo_url} {code_name} main"), + keys_location=[ + "https://packages.microsoft.com/keys/microsoft.asc", + "https://packages.microsoft.com/keys/msopentech.asc", + ], + ) + + + cur_repo = posix_os.get_repositories(); + self._log.info(f"{cur_repo}") + if posix_os.is_package_in_repo(self.command): posix_os.install_packages(self.command) self._log.info(f"{self.command} is installed successfully") else: raise UnsupportedDistroException( - node.os, f"The distro doesn't have {package} in its repo") + self.node.os, f"The distro doesn't have {self.command} in its repo") return self._check_exists() def get_result( diff --git a/microsoft/runbook/my.yml b/microsoft/runbook/my.yml new file mode 100644 index 0000000000..0b903bfc8c --- /dev/null +++ b/microsoft/runbook/my.yml @@ -0,0 +1,57 @@ +name: azure default +include: + - path: ./debug.yml +variable: + - name: origin + value: tiers/tier.yml + - name: case + value: verify_cpu_count + - name: location + value: "westus3" + - name: keep_environment + value: "no" + - name: resource_group_name + value: "" + - name: marketplace_image + value: "" + - name: vhd + value: "" + - name: vm_size + value: "" + - name: deploy + value: true + - name: wait_delete + value: false + - name: concurrency + value: 5 + - name: admin_private_key_file + value: "" + is_secret: true + - name: admin_password + value: "" + is_secret: true + - name: case + value: verify_cpu_count +concurrency: $(concurrency) +notifier: + - type: html + - type: env_stats +platform: + - type: azure + admin_private_key_file: $(admin_private_key_file) + admin_password: $(admin_password) + keep_environment: $(keep_environment) + azure: + resource_group_name: $(resource_group_name) + deploy: $(deploy) + subscription_id: $(subscription_id) + wait_delete: $(wait_delete) + requirement: + core_count: + min: 2 + azure: + marketplace: $(marketplace_image) + vhd: $(vhd) + location: $(location) + vm_size: $(vm_size) + From 5ac45d11a132b992ebe3b2d11d5daf3f086266b1 Mon Sep 17 00:00:00 2001 From: Zeeshan Akhter Date: Thu, 21 Dec 2023 07:26:47 +0000 Subject: [PATCH 04/10] Tests --- lisa/sut_orchestrator/azure/tools.py | 36 +----- microsoft/runbook/my.yml | 3 +- microsoft/testsuites/vm_extensions/mde.py | 148 +++++++++++++++++----- 3 files changed, 121 insertions(+), 66 deletions(-) diff --git a/lisa/sut_orchestrator/azure/tools.py b/lisa/sut_orchestrator/azure/tools.py index f0f16e25ee..52abc764b0 100644 --- a/lisa/sut_orchestrator/azure/tools.py +++ b/lisa/sut_orchestrator/azure/tools.py @@ -2,9 +2,10 @@ # Licensed under the MIT license. import re +import os import sys import json -from pathlib import PurePath +from pathlib import PurePath, Path from typing import Any, Dict, List, Optional, Tuple, Type from assertpy import assert_that @@ -26,6 +27,7 @@ from lisa.operating_system import Posix + class Waagent(Tool): __version_pattern = re.compile(r"(?<=\-)([^\s]+)") @@ -575,36 +577,9 @@ def command(self) -> str: @property def can_install(self) -> bool: self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") - return True + return False def _install(self) -> bool: - self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") - posix_os: Posix = cast(Posix, self.node.os) - cur_repo = posix_os.get_repositories(); - self._log.info(f"{cur_repo}") - - #posix_os.add_azure_core_repo(code_name="bionic") - arch_name = "arm64,armhf,amd64" - repo_url = "https://packages.microsoft.com/ubuntu/18.04/prod" - code_name = "insiders-fast" - posix_os.add_repository( - repo=(f"deb [arch={arch_name}] {repo_url} {code_name} main"), - keys_location=[ - "https://packages.microsoft.com/keys/microsoft.asc", - "https://packages.microsoft.com/keys/msopentech.asc", - ], - ) - - - cur_repo = posix_os.get_repositories(); - self._log.info(f"{cur_repo}") - - if posix_os.is_package_in_repo(self.command): - posix_os.install_packages(self.command) - self._log.info(f"{self.command} is installed successfully") - else: - raise UnsupportedDistroException( - self.node.os, f"The distro doesn't have {self.command} in its repo") return self._check_exists() def get_result( @@ -612,7 +587,7 @@ def get_result( arg: str, json_out: bool = False, sudo: bool = False, - ) -> None: + ) -> Any: self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") if json_out: arg += ' --output json' @@ -620,6 +595,7 @@ def get_result( arg, sudo=sudo, shell=True, + force_run=True, ) result.assert_exit_code() diff --git a/microsoft/runbook/my.yml b/microsoft/runbook/my.yml index 0b903bfc8c..bf5e52c398 100644 --- a/microsoft/runbook/my.yml +++ b/microsoft/runbook/my.yml @@ -42,7 +42,7 @@ platform: admin_password: $(admin_password) keep_environment: $(keep_environment) azure: - resource_group_name: $(resource_group_name) + resource_group_name: "lisa-test-zakhter" deploy: $(deploy) subscription_id: $(subscription_id) wait_delete: $(wait_delete) @@ -50,6 +50,7 @@ platform: core_count: min: 2 azure: + #marketplace: "Debian:debian-10-daily:10-gen2:0.20231218.1599" #$(marketplace_image) marketplace: $(marketplace_image) vhd: $(vhd) location: $(location) diff --git a/microsoft/testsuites/vm_extensions/mde.py b/microsoft/testsuites/vm_extensions/mde.py index ef7d6dfcfb..5c3c9b9992 100644 --- a/microsoft/testsuites/vm_extensions/mde.py +++ b/microsoft/testsuites/vm_extensions/mde.py @@ -1,8 +1,11 @@ +import os import time -from typing import cast +import requests + +from typing import Any +from pathlib import Path, PurePath from assertpy import assert_that -from retry import retry from lisa import ( Logger, @@ -12,20 +15,12 @@ TestSuiteMetadata, simple_requirement, ) -from lisa.operating_system import BSD, Posix -from lisa.sut_orchestrator.azure.common import ( - add_tag_for_vm, - add_user_assign_identity, - get_managed_service_identity_client, - get_node_context, -) -from lisa.sut_orchestrator.azure.features import AzureExtension -from lisa.sut_orchestrator.azure.platform_ import AzurePlatform -from lisa.sut_orchestrator.azure.tools import Azsecd +from lisa.operating_system import BSD from lisa.sut_orchestrator.azure.tools import mdatp from lisa.testsuite import TestResult -from lisa.tools import Cat, Journalctl, Service -from lisa.util import LisaException, UnsupportedDistroException +from lisa.tools import RemoteCopy, Whoami, Curl +from lisa import CustomScriptBuilder, CustomScript +from lisa.util import LisaException @TestSuiteMetadata( @@ -36,37 +31,120 @@ """, ) class MDE(TestSuite): + + def before_case(self, log: Logger, **kwargs: Any) -> None: + response = requests.get("https://raw.githubusercontent.com/microsoft/mdatp-xplat/master/linux/installation/mde_installer.sh") + if response.ok: + script = response.text + import tempfile + _, self.mde_installer = tempfile.mkstemp(prefix='mde_installer', suffix='.sh') + with open(self.mde_installer, 'w') as writer: + writer.write(script) + self._echo_script = CustomScriptBuilder(Path(os.path.dirname(self.mde_installer)), + [os.path.basename(self.mde_installer)]) + else: + log.error('Unable to download mde_installer.sh script') + + @TestCaseMetadata( + description=""" + Verify MDE installation + """, + priority=1, + requirement=simple_requirement(min_core_count=2, + min_memory_mb=1024, + unsupported_os=[BSD]) + ) + def verify_install(self, node: Node, log: Logger, result: TestResult) -> None: + script: CustomScript = node.tools[self._echo_script] + log.info('Installing MDE') + result1 = script.run(parameters="--install", sudo=True) + log.info(result1) + + try: + output = node.tools[mdatp]._check_exists() + except LisaException as e: + log.error(e) + output = False + + assert_that(output).described_as('Unable to install MDE').is_equal_to(True) + @TestCaseMetadata( description=""" Verify if MDE is healthy """, priority=1, - requirement=simple_requirement( - #supported_features=[AzureExtension], unsupported_os=[BSD] - ), + requirement=simple_requirement(min_core_count=2, + min_memory_mb=1024, + unsupported_os=[BSD]) ) - def verify_health(self, node: Node, log: Logger, result: TestResult) -> None: + def verify_onboard(self, node: Node, log: Logger, result: TestResult) -> None: + username = node.tools[Whoami].get_username() + + remote_copy = node.tools[RemoteCopy] + remote_copy.copy_to_remote( + PurePath("/home/zakhter/projects/lab/MicrosoftDefenderATPOnboardingLinuxServer.py"), PurePath(f"/home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py")) + + script: CustomScript = node.tools[self._echo_script] + + log.info('Onboarding MDE') + result1 = script.run(parameters=f"--onboard /home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py/MicrosoftDefenderATPOnboardingLinuxServer.py", sudo=True) + log.info(result1) - environment = result.environment - assert environment, "fail to get environment from testresult" - platform = environment.platform - log.info(platform) - #assert isinstance(platform, AzurePlatform) - #rm_client = platform._rm_client - #assert rm_client - #msi_client = get_managed_service_identity_client(platform) - - node_context = get_node_context(node) - resource_group_name = node_context.resource_group_name - location = node_context.location - vm_name = node_context.vm_name - - # Add resource tag for AzSecPack - tag = {"exemptPolicy": True} - add_tag_for_vm(platform, resource_group_name, vm_name, tag, log) + output = node.tools[mdatp].get_result('health --field licensed') + + log.info(output) + + assert_that(output).is_equal_to(['true']) + + @TestCaseMetadata( + description=""" + Verify if MDE is healthy + """, + priority=1, + requirement=simple_requirement(min_core_count=2, + min_memory_mb=1024, + unsupported_os=[BSD]) + ) + def verify_health(self, node: Node, log: Logger, result: TestResult) -> None: output = node.tools[mdatp].get_result('health', json_out=True) log.info(output) assert_that(output['healthy']).is_equal_to(True) + @TestCaseMetadata( + description=""" + Verify if MDE is healthy + """, + priority=1, + requirement=simple_requirement(min_core_count=2, + min_memory_mb=1024, + unsupported_os=[BSD]) + ) + def eicar_test(self, node: Node, log: Logger, result: TestResult) -> None: + log.info('Running EICAR test') + + output = node.tools[mdatp].get_result('health --field real_time_protection_enabled') + if output == ['false']: + output = node.tools[mdatp].get_result('config real-time-protection --value enabled', sudo=True) + assert_that(' '.join(output)).is_equal_to('Configuration property updated.') + + current_threat_list= node.tools[mdatp].get_result('threat list') + log.info(current_threat_list) + + node.tools[Curl].fetch(arg="-o /tmp/eicar.com.txt", + execute_arg="", + url="https://secure.eicar.org/eicar.com.txt") + + time.sleep(5) #Wait for remediation + + new_threat_list = node.tools[mdatp].get_result('threat list') + log.info(new_threat_list) + + eicar_detect = ' '.join(new_threat_list).replace(' '.join(current_threat_list), '') + + log.info(eicar_detect) + log.info(eicar_detect.find('Name: Virus:DOS/EICAR_Test_File')) + assert_that('Name: Virus:DOS/EICAR_Test_File' in eicar_detect).is_equal_to(True) + + From 49560a630d6e92c0f63c887bd556601e78c80a7a Mon Sep 17 00:00:00 2001 From: Zeeshan Akhter Date: Thu, 21 Dec 2023 15:52:21 +0000 Subject: [PATCH 05/10] Cleanup --- lisa/sut_orchestrator/azure/tools.py | 44 +--------- lisa/tools/__init__.py | 1 + lisa/tools/mde.py | 99 +++++++++++++++++++++++ microsoft/runbook/my.yml | 4 + microsoft/testsuites/vm_extensions/mde.py | 82 +++++-------------- 5 files changed, 123 insertions(+), 107 deletions(-) create mode 100644 lisa/tools/mde.py diff --git a/lisa/sut_orchestrator/azure/tools.py b/lisa/sut_orchestrator/azure/tools.py index 52abc764b0..e6a491bd0c 100644 --- a/lisa/sut_orchestrator/azure/tools.py +++ b/lisa/sut_orchestrator/azure/tools.py @@ -2,10 +2,7 @@ # Licensed under the MIT license. import re -import os -import sys -import json -from pathlib import PurePath, Path +from pathlib import PurePath from typing import Any, Dict, List, Optional, Tuple, Type from assertpy import assert_that @@ -23,9 +20,6 @@ get_matched_str, ) from lisa.util.process import ExecutableResult -from typing import Optional, cast -from lisa.operating_system import Posix - class Waagent(Tool): @@ -567,39 +561,3 @@ def get_pool_records(self, pool_id: int, force_run: bool = False) -> Dict[str, s records[content_split[i]] = content_split[i + 1] return records - -class mdatp(Tool): - @property - def command(self) -> str: - self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") - return "mdatp" - - @property - def can_install(self) -> bool: - self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") - return False - - def _install(self) -> bool: - return self._check_exists() - - def get_result( - self, - arg: str, - json_out: bool = False, - sudo: bool = False, - ) -> Any: - self._log.info(f"Inside the func: [{sys._getframe().f_code.co_name}]") - if json_out: - arg += ' --output json' - result = self.run( - arg, - sudo=sudo, - shell=True, - force_run=True, - ) - - result.assert_exit_code() - if json_out: - return json.loads(result.stdout) - return result.stdout.split() - diff --git a/lisa/tools/__init__.py b/lisa/tools/__init__.py index 4dc2e237ce..2f63a8cfde 100644 --- a/lisa/tools/__init__.py +++ b/lisa/tools/__init__.py @@ -65,6 +65,7 @@ from .lsvmbus import Lsvmbus from .make import Make from .mdadm import Mdadm +from .mde import MDE from .mkdir import Mkdir from .mkfs import FileSystem, Mkfs, Mkfsext, Mkfsxfs from .modinfo import Modinfo diff --git a/lisa/tools/mde.py b/lisa/tools/mde.py new file mode 100644 index 0000000000..cb19aa9576 --- /dev/null +++ b/lisa/tools/mde.py @@ -0,0 +1,99 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import os +import sys +import json +import requests +from pathlib import Path, PurePath +from typing import Any + +from lisa.executable import Tool +from lisa.executable import CustomScriptBuilder, CustomScript +#from . import RemoteCopy, Whoami, Curl +from .remote_copy import RemoteCopy +from .whoami import Whoami + +class MDE(Tool): + @property + def command(self) -> str: + return "mdatp" + + @property + def can_install(self) -> bool: + return True + + def get_mde_installer(self) -> bool: + if not hasattr(self, '_mde_installer'): + response = requests.get("https://raw.githubusercontent.com/microsoft/mdatp-xplat/master/linux/installation/mde_installer.sh") + if response.ok: + script = response.text + import tempfile + _, self.mde_installer_script = tempfile.mkstemp(prefix='mde_installer', suffix='.sh') + with open(self.mde_installer_script, 'w') as writer: + writer.write(script) + self._mde_installer = CustomScriptBuilder(Path(os.path.dirname(self.mde_installer_script)), + [os.path.basename(self.mde_installer_script)]) + return True + return False + return True + + def _install(self) -> bool: + if not self.get_mde_installer(): + self._log.error("Unable to download mde_installer.sh script. MDE can't be installed") + + mde_installer: CustomScript = self.node.tools[self._mde_installer] + self._log.info('Installing MDE') + result1 = mde_installer.run(parameters="--install", sudo=True) + self._log.info(result1) + + return self._check_exists() + + + def onboard(self, onboarding_script: PurePath) -> bool: + if not self._check_exists(): + self._log.error("MDE is not installed, onboarding not possible") + return False + + username = self.node.tools[Whoami].get_username() + + remote_copy = self.node.tools[RemoteCopy] + remote_copy.copy_to_remote(onboarding_script, PurePath(f"/home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py")) + + if not self.get_mde_installer(): + self._log.error("Unable to download mde_installer.sh script. MDE can't be onboarded") + + script: CustomScript = self.node.tools[self._mde_installer] + + self._log.info('Onboarding MDE') + result1 = script.run(parameters=f"--onboard /home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py/MicrosoftDefenderATPOnboardingLinuxServer.py", sudo=True) + self._log.info(result1) + + output = self.get_result('health --field licensed') + + self._log.info(output) + + return bool(output == ['true']) + + + def get_result( + self, + arg: str, + json_out: bool = False, + sudo: bool = False, + ) -> Any: + if json_out: + arg += ' --output json' + result = self.run( + arg, + sudo=sudo, + shell=True, + force_run=True, + ) + + result.assert_exit_code(include_output=True) + if json_out: + return json.loads(result.stdout) + return result.stdout.split() + + diff --git a/microsoft/runbook/my.yml b/microsoft/runbook/my.yml index bf5e52c398..d93b904506 100644 --- a/microsoft/runbook/my.yml +++ b/microsoft/runbook/my.yml @@ -6,6 +6,10 @@ variable: value: tiers/tier.yml - name: case value: verify_cpu_count + - name: onboarding_script + is_secret: true + is_case_visible: True + value: "" - name: location value: "westus3" - name: keep_environment diff --git a/microsoft/testsuites/vm_extensions/mde.py b/microsoft/testsuites/vm_extensions/mde.py index 5c3c9b9992..16a379618a 100644 --- a/microsoft/testsuites/vm_extensions/mde.py +++ b/microsoft/testsuites/vm_extensions/mde.py @@ -1,9 +1,7 @@ -import os import time -import requests from typing import Any -from pathlib import Path, PurePath +from pathlib import PurePath from assertpy import assert_that @@ -16,12 +14,9 @@ simple_requirement, ) from lisa.operating_system import BSD -from lisa.sut_orchestrator.azure.tools import mdatp from lisa.testsuite import TestResult -from lisa.tools import RemoteCopy, Whoami, Curl -from lisa import CustomScriptBuilder, CustomScript -from lisa.util import LisaException - +from lisa.tools import Curl, MDE as mdatp +from lisa.util import LisaException, SkippedException @TestSuiteMetadata( area="vm_extension", @@ -33,17 +28,10 @@ class MDE(TestSuite): def before_case(self, log: Logger, **kwargs: Any) -> None: - response = requests.get("https://raw.githubusercontent.com/microsoft/mdatp-xplat/master/linux/installation/mde_installer.sh") - if response.ok: - script = response.text - import tempfile - _, self.mde_installer = tempfile.mkstemp(prefix='mde_installer', suffix='.sh') - with open(self.mde_installer, 'w') as writer: - writer.write(script) - self._echo_script = CustomScriptBuilder(Path(os.path.dirname(self.mde_installer)), - [os.path.basename(self.mde_installer)]) - else: - log.error('Unable to download mde_installer.sh script') + variables = kwargs["variables"] + self.onboarding_script = variables.get("onboarding_script", "") + if not self.onboarding_script: + raise SkippedException("Onboarding script is not provided.") @TestCaseMetadata( description=""" @@ -54,12 +42,9 @@ def before_case(self, log: Logger, **kwargs: Any) -> None: min_memory_mb=1024, unsupported_os=[BSD]) ) - def verify_install(self, node: Node, log: Logger, result: TestResult) -> None: - script: CustomScript = node.tools[self._echo_script] - log.info('Installing MDE') - result1 = script.run(parameters="--install", sudo=True) - log.info(result1) + def verify_mde(self, node: Node, log: Logger, result: TestResult) -> None: + #Invoking tools first time, intalls the tool. try: output = node.tools[mdatp]._check_exists() except LisaException as e: @@ -68,43 +53,22 @@ def verify_install(self, node: Node, log: Logger, result: TestResult) -> None: assert_that(output).described_as('Unable to install MDE').is_equal_to(True) - @TestCaseMetadata( - description=""" - Verify if MDE is healthy - """, - priority=1, - requirement=simple_requirement(min_core_count=2, - min_memory_mb=1024, - unsupported_os=[BSD]) - ) - def verify_onboard(self, node: Node, log: Logger, result: TestResult) -> None: - username = node.tools[Whoami].get_username() + self.verify_onboard(node, log, result) - remote_copy = node.tools[RemoteCopy] - remote_copy.copy_to_remote( - PurePath("/home/zakhter/projects/lab/MicrosoftDefenderATPOnboardingLinuxServer.py"), PurePath(f"/home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py")) + self.verify_health(node, log, result) - script: CustomScript = node.tools[self._echo_script] + self.verify_eicar_detection(node, log, result) - log.info('Onboarding MDE') - result1 = script.run(parameters=f"--onboard /home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py/MicrosoftDefenderATPOnboardingLinuxServer.py", sudo=True) - log.info(result1) + def verify_onboard(self, node: Node, log: Logger, result: TestResult) -> None: - output = node.tools[mdatp].get_result('health --field licensed') + onboarding_result = node.tools[mdatp].onboard(PurePath(self.onboarding_script)) - log.info(output) + assert_that(onboarding_result).is_equal_to(True) + + output = node.tools[mdatp].get_result('health --field licensed') assert_that(output).is_equal_to(['true']) - @TestCaseMetadata( - description=""" - Verify if MDE is healthy - """, - priority=1, - requirement=simple_requirement(min_core_count=2, - min_memory_mb=1024, - unsupported_os=[BSD]) - ) def verify_health(self, node: Node, log: Logger, result: TestResult) -> None: output = node.tools[mdatp].get_result('health', json_out=True) @@ -112,16 +76,7 @@ def verify_health(self, node: Node, log: Logger, result: TestResult) -> None: assert_that(output['healthy']).is_equal_to(True) - @TestCaseMetadata( - description=""" - Verify if MDE is healthy - """, - priority=1, - requirement=simple_requirement(min_core_count=2, - min_memory_mb=1024, - unsupported_os=[BSD]) - ) - def eicar_test(self, node: Node, log: Logger, result: TestResult) -> None: + def verify_eicar_detection(self, node: Node, log: Logger, result: TestResult) -> None: log.info('Running EICAR test') output = node.tools[mdatp].get_result('health --field real_time_protection_enabled') @@ -144,7 +99,6 @@ def eicar_test(self, node: Node, log: Logger, result: TestResult) -> None: eicar_detect = ' '.join(new_threat_list).replace(' '.join(current_threat_list), '') log.info(eicar_detect) - log.info(eicar_detect.find('Name: Virus:DOS/EICAR_Test_File')) assert_that('Name: Virus:DOS/EICAR_Test_File' in eicar_detect).is_equal_to(True) From a7d6c14321bd824004b6c8207cdbdc02672dd416 Mon Sep 17 00:00:00 2001 From: Zeeshan Akhter Date: Fri, 22 Dec 2023 06:54:43 +0000 Subject: [PATCH 06/10] Using Azure SAS Uri for getting onboarding script --- lisa/tools/mde.py | 19 +++++++++---------- microsoft/runbook/my.yml | 4 ++++ microsoft/testsuites/vm_extensions/mde.py | 8 ++++---- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/lisa/tools/mde.py b/lisa/tools/mde.py index cb19aa9576..cb4e04ca15 100644 --- a/lisa/tools/mde.py +++ b/lisa/tools/mde.py @@ -2,17 +2,14 @@ # Licensed under the MIT license. import os -import sys import json import requests -from pathlib import Path, PurePath +from pathlib import Path from typing import Any from lisa.executable import Tool from lisa.executable import CustomScriptBuilder, CustomScript -#from . import RemoteCopy, Whoami, Curl -from .remote_copy import RemoteCopy -from .whoami import Whoami +from lisa.base_tools import Wget class MDE(Tool): @property @@ -50,15 +47,17 @@ def _install(self) -> bool: return self._check_exists() - def onboard(self, onboarding_script: PurePath) -> bool: + def onboard(self, onboarding_script_sas_uri: str) -> bool: if not self._check_exists(): self._log.error("MDE is not installed, onboarding not possible") return False - username = self.node.tools[Whoami].get_username() + wget = self.node.tools[Wget] - remote_copy = self.node.tools[RemoteCopy] - remote_copy.copy_to_remote(onboarding_script, PurePath(f"/home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py")) + download_path = wget.get( + url=onboarding_script_sas_uri, + filename="MicrosoftDefenderATPOnboardingLinuxServer.py", + ) if not self.get_mde_installer(): self._log.error("Unable to download mde_installer.sh script. MDE can't be onboarded") @@ -66,7 +65,7 @@ def onboard(self, onboarding_script: PurePath) -> bool: script: CustomScript = self.node.tools[self._mde_installer] self._log.info('Onboarding MDE') - result1 = script.run(parameters=f"--onboard /home/{username}/MicrosoftDefenderATPOnboardingLinuxServer.py/MicrosoftDefenderATPOnboardingLinuxServer.py", sudo=True) + result1 = script.run(parameters=f"--onboard {download_path}", sudo=True) self._log.info(result1) output = self.get_result('health --field licensed') diff --git a/microsoft/runbook/my.yml b/microsoft/runbook/my.yml index d93b904506..e018be433c 100644 --- a/microsoft/runbook/my.yml +++ b/microsoft/runbook/my.yml @@ -10,6 +10,10 @@ variable: is_secret: true is_case_visible: True value: "" + - name: onboarding_script_sas_uri + is_secret: true + is_case_visible: True + value: "" - name: location value: "westus3" - name: keep_environment diff --git a/microsoft/testsuites/vm_extensions/mde.py b/microsoft/testsuites/vm_extensions/mde.py index 16a379618a..d0330cec0e 100644 --- a/microsoft/testsuites/vm_extensions/mde.py +++ b/microsoft/testsuites/vm_extensions/mde.py @@ -29,9 +29,9 @@ class MDE(TestSuite): def before_case(self, log: Logger, **kwargs: Any) -> None: variables = kwargs["variables"] - self.onboarding_script = variables.get("onboarding_script", "") - if not self.onboarding_script: - raise SkippedException("Onboarding script is not provided.") + self.onboarding_script_sas_uri = variables.get("onboarding_script_sas_uri", "") + if not self.onboarding_script_sas_uri: + raise SkippedException("Onboarding script SAS URI is not provided.") @TestCaseMetadata( description=""" @@ -61,7 +61,7 @@ def verify_mde(self, node: Node, log: Logger, result: TestResult) -> None: def verify_onboard(self, node: Node, log: Logger, result: TestResult) -> None: - onboarding_result = node.tools[mdatp].onboard(PurePath(self.onboarding_script)) + onboarding_result = node.tools[mdatp].onboard(self.onboarding_script_sas_uri) assert_that(onboarding_result).is_equal_to(True) From 749d8d4f41320d0ed4f384f31489211f6ea296b8 Mon Sep 17 00:00:00 2001 From: Zeeshan Akhter Date: Fri, 22 Dec 2023 08:45:42 +0000 Subject: [PATCH 07/10] Review comments --- lisa/tools/mde.py | 31 ++++++++++------------- microsoft/testsuites/vm_extensions/mde.py | 29 +++++++++++++++------ 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/lisa/tools/mde.py b/lisa/tools/mde.py index cb4e04ca15..5b8aeaee9f 100644 --- a/lisa/tools/mde.py +++ b/lisa/tools/mde.py @@ -9,6 +9,7 @@ from lisa.executable import Tool from lisa.executable import CustomScriptBuilder, CustomScript +from .chmod import Chmod from lisa.base_tools import Wget class MDE(Tool): @@ -21,27 +22,25 @@ def can_install(self) -> bool: return True def get_mde_installer(self) -> bool: - if not hasattr(self, '_mde_installer'): - response = requests.get("https://raw.githubusercontent.com/microsoft/mdatp-xplat/master/linux/installation/mde_installer.sh") - if response.ok: - script = response.text - import tempfile - _, self.mde_installer_script = tempfile.mkstemp(prefix='mde_installer', suffix='.sh') - with open(self.mde_installer_script, 'w') as writer: - writer.write(script) - self._mde_installer = CustomScriptBuilder(Path(os.path.dirname(self.mde_installer_script)), - [os.path.basename(self.mde_installer_script)]) - return True - return False + if not hasattr(self, 'mde_installer'): + wget = self.node.tools[Wget] + + download_path = wget.get( + url="https://raw.githubusercontent.com/microsoft/mdatp-xplat/master/linux/installation/mde_installer.sh", + filename="mde_installer.sh", + ) + self.mde_installer = download_path + self._log.info(self.mde_installer) + self.node.tools[Chmod].update_folder( + self.mde_installer, "777", sudo=True) return True def _install(self) -> bool: if not self.get_mde_installer(): self._log.error("Unable to download mde_installer.sh script. MDE can't be installed") - mde_installer: CustomScript = self.node.tools[self._mde_installer] self._log.info('Installing MDE') - result1 = mde_installer.run(parameters="--install", sudo=True) + result1 = self.node.execute(f"{self.mde_installer} --install", shell=True, sudo=True) self._log.info(result1) return self._check_exists() @@ -62,10 +61,8 @@ def onboard(self, onboarding_script_sas_uri: str) -> bool: if not self.get_mde_installer(): self._log.error("Unable to download mde_installer.sh script. MDE can't be onboarded") - script: CustomScript = self.node.tools[self._mde_installer] - self._log.info('Onboarding MDE') - result1 = script.run(parameters=f"--onboard {download_path}", sudo=True) + result1 = self.node.execute(f"{self.mde_installer} --onboard {download_path}", shell=True, sudo=True) self._log.info(result1) output = self.get_result('health --field licensed') diff --git a/microsoft/testsuites/vm_extensions/mde.py b/microsoft/testsuites/vm_extensions/mde.py index d0330cec0e..0b8973e744 100644 --- a/microsoft/testsuites/vm_extensions/mde.py +++ b/microsoft/testsuites/vm_extensions/mde.py @@ -22,7 +22,19 @@ area="vm_extension", category="functional", description=""" - MDE Test Suite + Verify MDE installation + Microsoft Defender for Endpoint(MDE) for Linux includes antimalware and endpoint detection and response (EDR) capabilities. + + This test suites validates if MDE can be installed, onboarded and detect an EICAR file. + + The test requires the onboarding script to be kept in Azure Storage Account and provide the SAS url for downloading + under the secret variable `onboarding_script_sas_uri`. + + The suite runs the following tests: + 1. Installation test + 2. Onboarding test + 3. Health test + 4. EICAR detection test """, ) class MDE(TestSuite): @@ -35,7 +47,7 @@ def before_case(self, log: Logger, **kwargs: Any) -> None: @TestCaseMetadata( description=""" - Verify MDE installation + Verify MDE installation, onboarding, health and EICAR detection. """, priority=1, requirement=simple_requirement(min_core_count=2, @@ -46,7 +58,8 @@ def verify_mde(self, node: Node, log: Logger, result: TestResult) -> None: #Invoking tools first time, intalls the tool. try: - output = node.tools[mdatp]._check_exists() + node.tools[mdatp] + output = True except LisaException as e: log.error(e) output = False @@ -63,18 +76,18 @@ def verify_onboard(self, node: Node, log: Logger, result: TestResult) -> None: onboarding_result = node.tools[mdatp].onboard(self.onboarding_script_sas_uri) - assert_that(onboarding_result).is_equal_to(True) + assert_that(onboarding_result).described_as('Unable to onboard MDE').is_equal_to(True) output = node.tools[mdatp].get_result('health --field licensed') - assert_that(output).is_equal_to(['true']) + assert_that(output).described_as('MDE is not licensed').is_equal_to(['true']) def verify_health(self, node: Node, log: Logger, result: TestResult) -> None: output = node.tools[mdatp].get_result('health', json_out=True) log.info(output) - assert_that(output['healthy']).is_equal_to(True) + assert_that(output['healthy']).described_as('MDE is not healthy').is_equal_to(True) def verify_eicar_detection(self, node: Node, log: Logger, result: TestResult) -> None: log.info('Running EICAR test') @@ -82,7 +95,7 @@ def verify_eicar_detection(self, node: Node, log: Logger, result: TestResult) -> output = node.tools[mdatp].get_result('health --field real_time_protection_enabled') if output == ['false']: output = node.tools[mdatp].get_result('config real-time-protection --value enabled', sudo=True) - assert_that(' '.join(output)).is_equal_to('Configuration property updated.') + assert_that(' '.join(output)).described_as('Unable to enable RTP for MDE').is_equal_to('Configuration property updated.') current_threat_list= node.tools[mdatp].get_result('threat list') log.info(current_threat_list) @@ -99,6 +112,6 @@ def verify_eicar_detection(self, node: Node, log: Logger, result: TestResult) -> eicar_detect = ' '.join(new_threat_list).replace(' '.join(current_threat_list), '') log.info(eicar_detect) - assert_that('Name: Virus:DOS/EICAR_Test_File' in eicar_detect).is_equal_to(True) + assert_that('Name: Virus:DOS/EICAR_Test_File' in eicar_detect).described_as('MDE is not able to detect EICAR file').is_equal_to(True) From dd107689fbd4a106db67b10d24035fe90786eb6c Mon Sep 17 00:00:00 2001 From: Zeeshan Akhter Date: Fri, 22 Dec 2023 09:04:55 +0000 Subject: [PATCH 08/10] Cleanup --- lisa/tools/mde.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lisa/tools/mde.py b/lisa/tools/mde.py index 5b8aeaee9f..0b6fc141f9 100644 --- a/lisa/tools/mde.py +++ b/lisa/tools/mde.py @@ -1,14 +1,10 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. -import os import json -import requests -from pathlib import Path from typing import Any from lisa.executable import Tool -from lisa.executable import CustomScriptBuilder, CustomScript from .chmod import Chmod from lisa.base_tools import Wget From bf6866738dce3af545094eede3e1713a286d962e Mon Sep 17 00:00:00 2001 From: Zeeshan Akhter Date: Fri, 22 Dec 2023 10:38:45 +0000 Subject: [PATCH 09/10] Linter --- lisa/tools/__init__.py | 1 + lisa/tools/mde.py | 44 +++++----- microsoft/testsuites/vm_extensions/mde.py | 99 +++++++++++++---------- 3 files changed, 83 insertions(+), 61 deletions(-) diff --git a/lisa/tools/__init__.py b/lisa/tools/__init__.py index 2f63a8cfde..089e9d7521 100644 --- a/lisa/tools/__init__.py +++ b/lisa/tools/__init__.py @@ -180,6 +180,7 @@ "Lsvmbus", "Make", "Mdadm", + "MDE", "Mkdir", "Mkfs", "Mkfsext", diff --git a/lisa/tools/mde.py b/lisa/tools/mde.py index 0b6fc141f9..dbd1dacb41 100644 --- a/lisa/tools/mde.py +++ b/lisa/tools/mde.py @@ -4,9 +4,11 @@ import json from typing import Any +from lisa.base_tools import Wget from lisa.executable import Tool + from .chmod import Chmod -from lisa.base_tools import Wget + class MDE(Tool): @property @@ -18,30 +20,33 @@ def can_install(self) -> bool: return True def get_mde_installer(self) -> bool: - if not hasattr(self, 'mde_installer'): + if not hasattr(self, "mde_installer"): wget = self.node.tools[Wget] download_path = wget.get( - url="https://raw.githubusercontent.com/microsoft/mdatp-xplat/master/linux/installation/mde_installer.sh", - filename="mde_installer.sh", + url="https://raw.githubusercontent.com/microsoft/mdatp-xplat/" + "master/linux/installation/mde_installer.sh", + filename="mde_installer.sh", ) self.mde_installer = download_path self._log.info(self.mde_installer) - self.node.tools[Chmod].update_folder( - self.mde_installer, "777", sudo=True) + self.node.tools[Chmod].update_folder(self.mde_installer, "777", sudo=True) return True def _install(self) -> bool: if not self.get_mde_installer(): - self._log.error("Unable to download mde_installer.sh script. MDE can't be installed") + self._log.error( + "Unable to download mde_installer.sh script. MDE can't be installed" + ) - self._log.info('Installing MDE') - result1 = self.node.execute(f"{self.mde_installer} --install", shell=True, sudo=True) + self._log.info("Installing MDE") + result1 = self.node.execute( + f"{self.mde_installer} --install", shell=True, sudo=True + ) self._log.info(result1) return self._check_exists() - def onboard(self, onboarding_script_sas_uri: str) -> bool: if not self._check_exists(): self._log.error("MDE is not installed, onboarding not possible") @@ -55,18 +60,21 @@ def onboard(self, onboarding_script_sas_uri: str) -> bool: ) if not self.get_mde_installer(): - self._log.error("Unable to download mde_installer.sh script. MDE can't be onboarded") + self._log.error( + "Unable to download mde_installer.sh script. MDE can't be onboarded" + ) - self._log.info('Onboarding MDE') - result1 = self.node.execute(f"{self.mde_installer} --onboard {download_path}", shell=True, sudo=True) + self._log.info("Onboarding MDE") + result1 = self.node.execute( + f"{self.mde_installer} --onboard {download_path}", shell=True, sudo=True + ) self._log.info(result1) - output = self.get_result('health --field licensed') + output = self.get_result("health --field licensed") self._log.info(output) - return bool(output == ['true']) - + return bool(output == ["true"]) def get_result( self, @@ -75,7 +83,7 @@ def get_result( sudo: bool = False, ) -> Any: if json_out: - arg += ' --output json' + arg += " --output json" result = self.run( arg, sudo=sudo, @@ -87,5 +95,3 @@ def get_result( if json_out: return json.loads(result.stdout) return result.stdout.split() - - diff --git a/microsoft/testsuites/vm_extensions/mde.py b/microsoft/testsuites/vm_extensions/mde.py index 0b8973e744..93a01ee235 100644 --- a/microsoft/testsuites/vm_extensions/mde.py +++ b/microsoft/testsuites/vm_extensions/mde.py @@ -1,7 +1,5 @@ import time - from typing import Any -from pathlib import PurePath from assertpy import assert_that @@ -15,20 +13,24 @@ ) from lisa.operating_system import BSD from lisa.testsuite import TestResult -from lisa.tools import Curl, MDE as mdatp +from lisa.tools import MDE, Curl from lisa.util import LisaException, SkippedException + @TestSuiteMetadata( area="vm_extension", category="functional", description=""" Verify MDE installation - Microsoft Defender for Endpoint(MDE) for Linux includes antimalware and endpoint detection and response (EDR) capabilities. + Microsoft Defender for Endpoint(MDE) for Linux includes + antimalware and endpoint detection and response (EDR) capabilities. - This test suites validates if MDE can be installed, onboarded and detect an EICAR file. + This test suites validates if MDE can be installed, onboarded + and detect an EICAR file. - The test requires the onboarding script to be kept in Azure Storage Account and provide the SAS url for downloading - under the secret variable `onboarding_script_sas_uri`. + The test requires the onboarding script to be kept in Azure Storage Account + and provide the SAS url for downloading under the + secret variable `onboarding_script_sas_uri`. The suite runs the following tests: 1. Installation test @@ -37,8 +39,7 @@ 4. EICAR detection test """, ) -class MDE(TestSuite): - +class MDETest(TestSuite): def before_case(self, log: Logger, **kwargs: Any) -> None: variables = kwargs["variables"] self.onboarding_script_sas_uri = variables.get("onboarding_script_sas_uri", "") @@ -50,21 +51,20 @@ def before_case(self, log: Logger, **kwargs: Any) -> None: Verify MDE installation, onboarding, health and EICAR detection. """, priority=1, - requirement=simple_requirement(min_core_count=2, - min_memory_mb=1024, - unsupported_os=[BSD]) + requirement=simple_requirement( + min_core_count=2, min_memory_mb=1024, unsupported_os=[BSD] + ), ) def verify_mde(self, node: Node, log: Logger, result: TestResult) -> None: - - #Invoking tools first time, intalls the tool. + # Invoking tools first time, intalls the tool. try: - node.tools[mdatp] + node.tools[MDE] output = True except LisaException as e: log.error(e) output = False - assert_that(output).described_as('Unable to install MDE').is_equal_to(True) + assert_that(output).described_as("Unable to install MDE").is_equal_to(True) self.verify_onboard(node, log, result) @@ -73,45 +73,60 @@ def verify_mde(self, node: Node, log: Logger, result: TestResult) -> None: self.verify_eicar_detection(node, log, result) def verify_onboard(self, node: Node, log: Logger, result: TestResult) -> None: + onboarding_result = node.tools[MDE].onboard(self.onboarding_script_sas_uri) - onboarding_result = node.tools[mdatp].onboard(self.onboarding_script_sas_uri) - - assert_that(onboarding_result).described_as('Unable to onboard MDE').is_equal_to(True) + assert_that(onboarding_result).described_as( + "Unable to onboard MDE" + ).is_equal_to(True) - output = node.tools[mdatp].get_result('health --field licensed') + output = node.tools[MDE].get_result("health --field licensed") - assert_that(output).described_as('MDE is not licensed').is_equal_to(['true']) + assert_that(output).described_as("MDE is not licensed").is_equal_to(["true"]) def verify_health(self, node: Node, log: Logger, result: TestResult) -> None: - output = node.tools[mdatp].get_result('health', json_out=True) + output = node.tools[MDE].get_result("health", json_out=True) log.info(output) - assert_that(output['healthy']).described_as('MDE is not healthy').is_equal_to(True) - - def verify_eicar_detection(self, node: Node, log: Logger, result: TestResult) -> None: - log.info('Running EICAR test') - - output = node.tools[mdatp].get_result('health --field real_time_protection_enabled') - if output == ['false']: - output = node.tools[mdatp].get_result('config real-time-protection --value enabled', sudo=True) - assert_that(' '.join(output)).described_as('Unable to enable RTP for MDE').is_equal_to('Configuration property updated.') - - current_threat_list= node.tools[mdatp].get_result('threat list') + assert_that(output["healthy"]).described_as("MDE is not healthy").is_equal_to( + True + ) + + def verify_eicar_detection( + self, node: Node, log: Logger, result: TestResult + ) -> None: + log.info("Running EICAR test") + + output = node.tools[MDE].get_result( + "health --field real_time_protection_enabled" + ) + if output == ["false"]: + output = node.tools[MDE].get_result( + "config real-time-protection --value enabled", sudo=True + ) + assert_that(" ".join(output)).described_as( + "Unable to enable RTP for MDE" + ).is_equal_to("Configuration property updated.") + + current_threat_list = node.tools[MDE].get_result("threat list") log.info(current_threat_list) - node.tools[Curl].fetch(arg="-o /tmp/eicar.com.txt", - execute_arg="", - url="https://secure.eicar.org/eicar.com.txt") + node.tools[Curl].fetch( + arg="-o /tmp/eicar.com.txt", + execute_arg="", + url="https://secure.eicar.org/eicar.com.txt", + ) - time.sleep(5) #Wait for remediation + time.sleep(5) # Wait for remediation - new_threat_list = node.tools[mdatp].get_result('threat list') + new_threat_list = node.tools[MDE].get_result("threat list") log.info(new_threat_list) - eicar_detect = ' '.join(new_threat_list).replace(' '.join(current_threat_list), '') + eicar_detect = " ".join(new_threat_list).replace( + " ".join(current_threat_list), "" + ) log.info(eicar_detect) - assert_that('Name: Virus:DOS/EICAR_Test_File' in eicar_detect).described_as('MDE is not able to detect EICAR file').is_equal_to(True) - - + assert_that("Name: Virus:DOS/EICAR_Test_File" in eicar_detect).described_as( + "MDE is not able to detect EICAR file" + ).is_equal_to(True) From 9e1e46fa21e8dab4cb3408dd8c7366d60c03ee9c Mon Sep 17 00:00:00 2001 From: Zeeshan Akhter Date: Fri, 22 Dec 2023 10:54:59 +0000 Subject: [PATCH 10/10] Linter --- lisa/tools/mde.py | 1 - microsoft/testsuites/vm_extensions/mde.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/lisa/tools/mde.py b/lisa/tools/mde.py index dbd1dacb41..9b9801ba5c 100644 --- a/lisa/tools/mde.py +++ b/lisa/tools/mde.py @@ -29,7 +29,6 @@ def get_mde_installer(self) -> bool: filename="mde_installer.sh", ) self.mde_installer = download_path - self._log.info(self.mde_installer) self.node.tools[Chmod].update_folder(self.mde_installer, "777", sudo=True) return True diff --git a/microsoft/testsuites/vm_extensions/mde.py b/microsoft/testsuites/vm_extensions/mde.py index 93a01ee235..77a8efce90 100644 --- a/microsoft/testsuites/vm_extensions/mde.py +++ b/microsoft/testsuites/vm_extensions/mde.py @@ -58,8 +58,7 @@ def before_case(self, log: Logger, **kwargs: Any) -> None: def verify_mde(self, node: Node, log: Logger, result: TestResult) -> None: # Invoking tools first time, intalls the tool. try: - node.tools[MDE] - output = True + output = node.tools[MDE]._check_exists() except LisaException as e: log.error(e) output = False