diff --git a/src/analysis/PluginBase.py b/src/analysis/PluginBase.py deleted file mode 100644 index d3a62ef76..000000000 --- a/src/analysis/PluginBase.py +++ /dev/null @@ -1,259 +0,0 @@ -from __future__ import annotations # noqa: N999 - -import ctypes -import logging -import os -from multiprocessing import Array, Manager, Queue, Value -from queue import Empty -from time import time -from typing import TYPE_CHECKING - -from packaging.version import InvalidVersion -from packaging.version import parse as parse_version - -import config -from helperFunctions.process import ( - ExceptionSafeProcess, - check_worker_exceptions, - start_single_worker, - stop_processes, - terminate_process_and_children, -) -from helperFunctions.tag import TagColor -from plugins.base import BasePlugin - -if TYPE_CHECKING: - from objects.file import FileObject - -META_KEYS = { - 'tags', - 'summary', - 'analysis_date', - 'plugin_version', - 'system_version', - 'file_system_flag', - 'result', -} - - -def sanitize_processed_analysis(processed_analysis_entry: dict) -> dict: - # Old analysis plugins (before AnalysisPluginV0) could write anything they want to processed_analysis. - # We put everything the plugin wrote into a separate dict so that it matches the behavior of AnalysisPluginV0 - result = {} - for key in list(processed_analysis_entry): - if key in META_KEYS: - continue - - result[key] = processed_analysis_entry.pop(key) - - processed_analysis_entry['result'] = result - - return processed_analysis_entry - - -class PluginInitException(Exception): # noqa: N818 - def __init__(self, *args, plugin: AnalysisBasePlugin): - self.plugin: AnalysisBasePlugin = plugin - super().__init__(*args) - - -class AnalysisBasePlugin(BasePlugin): - """ - This is the base plugin. All analysis plugins should be a subclass of this class. - """ - - # must be set by the plugin: - FILE = None - NAME = None - DESCRIPTION = None - VERSION = None - - # can be set by the plugin: - RECURSIVE = True # If `True` (default) recursively analyze included files - TIMEOUT = 300 - SYSTEM_VERSION = None - MIME_BLACKLIST = [] # noqa: RUF012 - MIME_WHITELIST = [] # noqa: RUF012 - - ANALYSIS_STATS_LIMIT = 1000 - - def __init__(self, no_multithread=False, view_updater=None): - super().__init__(plugin_path=self.FILE, view_updater=view_updater) - self._check_plugin_attributes() - self.additional_setup() - self.in_queue = Queue() - self.out_queue = Queue() - self.stop_condition = Value('i', 0) - self.workers = [] - self.thread_count = 1 if no_multithread else self._get_thread_count() - self.active = [Value('i', 0) for _ in range(self.thread_count)] - self.manager = Manager() - self.analysis_stats = Array(ctypes.c_float, self.ANALYSIS_STATS_LIMIT) - self.analysis_stats_count = Value('i', 0) - self.analysis_stats_index = Value('i', 0) - - def _get_thread_count(self): - """ - Get the thread count from the config. If there is no configuration for this plugin use the default value. 
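A minimal, runnable sketch of what the removed sanitize_processed_analysis helper does: every key that a legacy plugin wrote outside the reserved META_KEYS set is moved under 'result', so the entry matches the layout produced by AnalysisPluginV0. The example entry and its values are made up for illustration.

META_KEYS = {'tags', 'summary', 'analysis_date', 'plugin_version', 'system_version', 'file_system_flag', 'result'}


def sanitize_processed_analysis(entry: dict) -> dict:
    # move everything that is not a meta key into the nested 'result' dict
    entry['result'] = {key: entry.pop(key) for key in list(entry) if key not in META_KEYS}
    return entry


legacy_entry = {'plugin_version': '0.0', 'summary': ['foo'], 'foo': 'bar'}
assert sanitize_processed_analysis(legacy_entry) == {
    'plugin_version': '0.0',
    'summary': ['foo'],
    'result': {'foo': 'bar'},
}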
- """ - default_process_count = config.backend.plugin_defaults.processes - plugin_config = config.backend.plugin.get(self.NAME, None) - return getattr(plugin_config, 'processes', default_process_count) - - def additional_setup(self): - """ - This function can be implemented by the plugin to do initialization - """ - - def start(self): - """Starts the plugin workers.""" - for process_index in range(self.thread_count): - self.workers.append(start_single_worker(process_index, 'Analysis', self.worker)) - logging.debug(f'{self.NAME}: {len(self.workers)} worker threads started') - - def shutdown(self): - """ - This function can be called to shut down all working threads - """ - logging.debug('Shutting down...') - self.stop_condition.value = 1 - self.in_queue.close() - stop_processes(self.workers, timeout=10.0) # give running analyses some time to finish - self.out_queue.close() - self.manager.shutdown() - - def _check_plugin_attributes(self): - for attribute in ['FILE', 'NAME', 'VERSION']: - if getattr(self, attribute, None) is None: - raise PluginInitException(f'Plugin {self.NAME} is missing {attribute} in configuration', plugin=self) - self._check_version(self.VERSION) - if self.SYSTEM_VERSION: - self._check_version(self.SYSTEM_VERSION, label='System version') - - def _check_version(self, version: str, label: str = 'Version'): - try: - parse_version(version) - except InvalidVersion: - raise PluginInitException( # noqa: B904 - f'{label} "{version}" of plugin {self.NAME} is invalid', plugin=self - ) - - def add_job(self, fw_object: FileObject): - if self._analysis_depth_not_reached_yet(fw_object): - self.in_queue.put(fw_object) - else: - self.out_queue.put(fw_object) - - def _analysis_depth_not_reached_yet(self, fo): - return self.RECURSIVE or fo.depth == 0 - - def process_object(self, file_object): - """ - This function must be implemented by the plugin - """ - return file_object - - def analyze_file(self, file_object): - fo = self.process_object(file_object) - return self._add_plugin_version_and_timestamp_to_analysis_result(fo) - - def _add_plugin_version_and_timestamp_to_analysis_result(self, fo): - fo.processed_analysis[self.NAME].update(self.init_dict()) - return fo - - # ---- internal functions ---- - - def add_analysis_tag(self, file_object, tag_name, value, color=TagColor.LIGHT_BLUE, propagate=False): - new_tag = { - tag_name: { - 'value': value, - 'color': color, - 'propagate': propagate, - }, - 'root_uid': file_object.root_uid, - } - if 'tags' not in file_object.processed_analysis[self.NAME]: - file_object.processed_analysis[self.NAME]['tags'] = new_tag - else: - file_object.processed_analysis[self.NAME]['tags'].update(new_tag) - - def init_dict(self) -> dict: - result_update = { - 'analysis_date': time(), - 'plugin_version': self.VERSION, - 'result': {}, - } - if self.SYSTEM_VERSION: - result_update.update({'system_version': self.SYSTEM_VERSION}) - return result_update - - def process_next_object(self, task, result): - task.processed_analysis.update({self.NAME: {}}) - finished_task = self.analyze_file(task) - result.append(finished_task) - - @staticmethod - def timeout_happened(process): - return process.is_alive() - - def worker_processing_with_timeout(self, worker_id, next_task: FileObject): - result = self.manager.list() - process = ExceptionSafeProcess(target=self.process_next_object, args=(next_task, result), reraise=False) - start = time() - process.start() - process.join(timeout=self.TIMEOUT) - duration = time() - start - if duration > 120: # noqa: PLR2004 - 
logging.info(f'Analysis {self.NAME} on {next_task.uid} is slow: took {duration:.1f} seconds') - self._update_duration_stats(duration) - - if self.timeout_happened(process): - result_fo = self._handle_failed_analysis(next_task, process, worker_id, 'Timeout') - elif process.exception: - _, trace = process.exception - result_fo = self._handle_failed_analysis(next_task, process, worker_id, 'Exception', trace=trace) - else: - result_fo = result.pop() - logging.debug(f'Worker {worker_id}: Finished {self.NAME} analysis on {next_task.uid}') - - processed_analysis_entry = result_fo.processed_analysis.pop(self.NAME) - result_fo.processed_analysis[self.NAME] = sanitize_processed_analysis(processed_analysis_entry) - self.out_queue.put(result_fo) - - def _update_duration_stats(self, duration): - with self.analysis_stats.get_lock(): - self.analysis_stats[self.analysis_stats_index.value] = duration - self.analysis_stats_index.value += 1 - if self.analysis_stats_index.value >= self.ANALYSIS_STATS_LIMIT: - # if the stats array is full, overwrite the oldest result - self.analysis_stats_index.value = 0 - if self.analysis_stats_count.value < self.ANALYSIS_STATS_LIMIT: - self.analysis_stats_count.value += 1 - - def _handle_failed_analysis(self, fw_object, process, worker_id, cause: str, trace: str | None = None): - terminate_process_and_children(process) - fw_object.analysis_exception = (self.NAME, f'{cause} occurred during analysis') - message = f'Worker {worker_id}: {cause} during analysis {self.NAME} on {fw_object.uid}' - if trace: - message += f':\n{trace}' - logging.error(message) - - return fw_object - - def worker(self, worker_id): - logging.debug(f'started {self.NAME} worker {worker_id} (pid={os.getpid()})') - while self.stop_condition.value == 0: - try: - next_task = self.in_queue.get(timeout=float(config.backend.block_delay)) - logging.debug(f'Worker {worker_id}: Begin {self.NAME} analysis on {next_task.uid}') - except Empty: - self.active[worker_id].value = 0 - else: - self.active[worker_id].value = 1 - next_task.processed_analysis.update({self.NAME: {}}) - self.worker_processing_with_timeout(worker_id, next_task) - - logging.debug(f'worker {worker_id} stopped') - - def check_exceptions(self): - return check_worker_exceptions(self.workers, 'Analysis', self.worker) diff --git a/src/analysis/YaraPluginBase.py b/src/analysis/YaraPluginBase.py deleted file mode 100644 index 65881fc06..000000000 --- a/src/analysis/YaraPluginBase.py +++ /dev/null @@ -1,119 +0,0 @@ -from __future__ import annotations # noqa: N999 - -import logging -import re -import subprocess -from pathlib import Path - -import yaml -from yaml.parser import ParserError - -from analysis.PluginBase import AnalysisBasePlugin, PluginInitException -from helperFunctions.fileSystem import get_src_dir - - -class YaraBasePlugin(AnalysisBasePlugin): - """ - This should be the base for all YARA based analysis plugins - """ - - NAME = 'Yara_Base_Plugin' - DESCRIPTION = 'this is a Yara plugin' - VERSION = '0.0' - FILE = None - - def __init__(self, view_updater=None): - """ - recursive flag: If True recursively analyze included files - propagate flag: If True add analysis result of child to parent object - """ - self.signature_path = self._get_signature_file(self.FILE) if self.FILE else None - if self.signature_path and not Path(self.signature_path).exists(): - raise PluginInitException( - f'Signature file {self.signature_path} not found. 
Did you run "compile_yara_signatures.py"?', - plugin=self, - ) - self.SYSTEM_VERSION = self.get_yara_system_version() - super().__init__(view_updater=view_updater) - - def get_yara_system_version(self): - with subprocess.Popen(['yara', '--version'], stdout=subprocess.PIPE) as process: - yara_version = process.stdout.readline().decode().strip() - - access_time = int(Path(self.signature_path).stat().st_mtime) - return f'{yara_version}-{access_time}' - - def process_object(self, file_object): - if self.signature_path is not None: - compiled_flag = '-C' if Path(self.signature_path).read_bytes().startswith(b'YARA') else '' - command = f'yara {compiled_flag} --print-meta --print-strings {self.signature_path} {file_object.file_path}' - with subprocess.Popen(command, shell=True, stdout=subprocess.PIPE) as process: - output = process.stdout.read().decode() - try: - result = self._parse_yara_output(output) - file_object.processed_analysis[self.NAME] = result - file_object.processed_analysis[self.NAME]['summary'] = list(result.keys()) - except (ValueError, TypeError): - file_object.processed_analysis[self.NAME] = {'failed': 'Processing corrupted. Likely bad call to yara.'} - else: - file_object.processed_analysis[self.NAME] = {'failed': 'Signature path not set'} - return file_object - - @staticmethod - def _get_signature_file_name(plugin_path): - return plugin_path.split('/')[-3] + '.yc' - - def _get_signature_file(self, plugin_path): - sig_file_name = self._get_signature_file_name(plugin_path) - return str(Path(get_src_dir()) / 'analysis/signatures' / sig_file_name) - - @staticmethod - def _parse_yara_output(output): - resulting_matches = {} - - match_blocks, rules = _split_output_in_rules_and_matches(output) - - matches_regex = re.compile(r'((0x[a-f0-9]*):(\$[a-zA-Z0-9_]+):\s(.+))+') - for index, rule in enumerate(rules): - for match in matches_regex.findall(match_blocks[index]): - _append_match_to_result(match, resulting_matches, rule) - - return resulting_matches - - -def _split_output_in_rules_and_matches(output): - split_regex = re.compile(r'\n*.*\[.*\]\s/.+\n*') - match_blocks = split_regex.split(output) - while '' in match_blocks: - match_blocks.remove('') - - rule_regex = re.compile(r'(\w*)\s\[(.*)\]\s([.]{0,2}/)(.+)') - rules = rule_regex.findall(output) - - if not len(match_blocks) == len(rules): - raise ValueError() - return match_blocks, rules - - -def _append_match_to_result(match, resulting_matches: dict[str, dict], rule): - rule_name, meta_string, _, _ = rule - _, offset, matched_tag, matched_string = match - resulting_matches.setdefault( - rule_name, {'rule': rule_name, 'matches': True, 'strings': [], 'meta': _parse_meta_data(meta_string)} - ) - resulting_matches[rule_name]['strings'].append((int(offset, 16), matched_tag, matched_string)) - - -def _parse_meta_data(meta_data_string: str) -> dict[str, str | bool | int]: - """ - Will be of form 'item0=lowercaseboolean0,item1="str1",item2=int2,...' 
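An illustrative, runnable version of the YAML trick used by the removed _parse_meta_data helper: '=' is swapped for ': ', the string is wrapped in braces, and yaml.safe_load then yields typed values (booleans, ints, strings). Error handling is omitted here, and the example meta string is made up.

import yaml


def parse_meta_data(meta_data_string: str) -> dict:
    # YARA escapes single quotes in its meta output; undo that before parsing
    meta_data_string = meta_data_string.replace(r"\'", "'")
    # 'key=value,key2="str"' becomes the YAML flow mapping '{key: value,key2: "str"}'
    return yaml.safe_load(f'{{{meta_data_string.replace("=", ": ")}}}')


assert parse_meta_data('open_source=false,software_name="Test Software",weight=3') == {
    'open_source': False,
    'software_name': 'Test Software',
    'weight': 3,
}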
- """ - try: - # YARA insert backslashes before single quotes in the meta output and the YAML parser doesn't like that - meta_data_string = meta_data_string.replace(r'\'', "'") - meta_data = yaml.safe_load(f'{{{meta_data_string.replace("=", ": ")}}}') - assert isinstance(meta_data, dict) - return meta_data - except (ParserError, AssertionError): - logging.warning(f"Malformed meta string '{meta_data_string}'") - return {} diff --git a/src/analysis/plugin/compat.py b/src/analysis/plugin/compat.py index e624f9615..e2d9a5dd0 100644 --- a/src/analysis/plugin/compat.py +++ b/src/analysis/plugin/compat.py @@ -1,60 +1,5 @@ import yara -from statistic.analysis_stats import ANALYSIS_STATS_LIMIT - - -class AnalysisBasePluginAdapterMixin: - """A mixin that makes AnalysisPluginV0 compatible to AnalysisBasePlugin""" - - def start(self): - # This is a no-op - pass - - @property - def NAME(self): # noqa: N802 - return self.metadata.name - - @property - def DESCRIPTION(self): # noqa: N802 - return self.metadata.description - - @property - def DEPENDENCIES(self): # noqa: N802 - return self.metadata.dependencies - - @property - def VERSION(self): # noqa: N802 - return str(self.metadata.version) - - @property - def RECURSIVE(self): # noqa: N802 - return False - - @property - def TIMEOUT(self): # noqa: N802 - return self.metadata.timeout - - @property - def SYSTEM_VERSION(self): # noqa: N802 - return self.metadata.system_version - - @property - def MIME_BLACKLIST(self): # noqa: N802 - return self.metadata.mime_blacklist - - @property - def MIME_WHITELIST(self): # noqa: N802 - return self.metadata.mime_whitelist - - @property - def ANALYSIS_STATS_LIMIT(self): # noqa: N802 - # Since no plugin sets this, we just use the default from AnalysisBasePlugin here - return ANALYSIS_STATS_LIMIT - - def shutdown(self): - # The shutdown of plugin workers is handled by the PluginRunner - pass - def yara_match_to_dict(match: yara.Match) -> dict: """Converts a ``yara.Match`` to the format that :py:class:`analysis.YaraPluginBase` would return.""" diff --git a/src/analysis/plugin/plugin.py b/src/analysis/plugin/plugin.py index d804ccd77..80819f041 100644 --- a/src/analysis/plugin/plugin.py +++ b/src/analysis/plugin/plugin.py @@ -8,8 +8,6 @@ import semver from pydantic import BaseModel, ConfigDict, field_validator -from . import compat - if typing.TYPE_CHECKING: import io @@ -35,7 +33,7 @@ class Tag(BaseModel): propagate: bool = False -class AnalysisPluginV0(compat.AnalysisBasePluginAdapterMixin, metaclass=abc.ABCMeta): +class AnalysisPluginV0(metaclass=abc.ABCMeta): """An abstract class that all analysis plugins must inherit from. Analysis plugins should not depend on FACT_core code where they mustn't. diff --git a/src/conftest.py b/src/conftest.py index 528ea9891..c2f7e46c9 100644 --- a/src/conftest.py +++ b/src/conftest.py @@ -5,7 +5,7 @@ import os from pathlib import Path from tempfile import TemporaryDirectory -from typing import Type, Union +from typing import Type import pytest from pydantic import BaseModel, ConfigDict, Field @@ -13,8 +13,6 @@ import config from analysis.plugin import AnalysisPluginV0 -from analysis.PluginBase import AnalysisBasePlugin -from test.common_helper import CommonDatabaseMock from test.conftest import merge_markers @@ -186,7 +184,7 @@ class AnalysisPluginTestConfig(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) #: The class of the plugin to be tested. It will most probably be called ``AnalysisPlugin``. 
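With the fixture now supporting only AnalysisPluginV0, a plugin test selects its class via the marker alone; init_kwargs and start_processes are gone. A short usage sketch, assuming the example plugin from this diff (import path and plugin name are taken from it but should be treated as illustrative):

import pytest

from plugins.analysis.example_plugin.code.example_plugin import AnalysisPlugin


@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
def test_example_plugin(analysis_plugin):
    # the fixture simply instantiates plugin_class(); no workers are started and no shutdown call is needed
    assert analysis_plugin.metadata.name == 'ExamplePlugin'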
- plugin_class: Union[Type[AnalysisBasePlugin], Type[AnalysisPluginV0]] = AnalysisBasePlugin + plugin_class: Type[AnalysisPluginV0] = AnalysisPluginV0 #: Whether or not to start the workers (see ``AnalysisPlugin.start``). #: Not supported for AnalysisPluginV0 start_processes: bool = False @@ -240,26 +238,11 @@ def my_fancy_test(analysis_plugin, monkeypatch): # FIXME now with AnalysisPluginV0 analysis plugins became way simpler # We might want to delete everything from AnalysisPluginTestConfig in the future PluginClass = test_config.plugin_class # noqa: N806 - if issubclass(PluginClass, AnalysisPluginV0): - assert ( - test_config.init_kwargs == {} - ), 'AnalysisPluginTestConfig.init_kwargs must be empty for AnalysisPluginV0 instances' - assert ( - not test_config.start_processes - ), 'AnalysisPluginTestConfig.start_processes cannot be True for AnalysisPluginV0 instances' - - yield PluginClass() - - elif issubclass(PluginClass, AnalysisBasePlugin): - plugin_instance = PluginClass( - view_updater=CommonDatabaseMock(), - **test_config.init_kwargs, - ) - - # We don't want to actually start workers when testing, except for some special cases - if test_config.start_processes: - plugin_instance.start() - - yield plugin_instance - - plugin_instance.shutdown() + assert ( + test_config.init_kwargs == {} + ), 'AnalysisPluginTestConfig.init_kwargs must be empty for AnalysisPluginV0 instances' + assert ( + not test_config.start_processes + ), 'AnalysisPluginTestConfig.start_processes cannot be True for AnalysisPluginV0 instances' + + return PluginClass() diff --git a/src/plugins/analysis/cve_lookup/code/cve_lookup.py b/src/plugins/analysis/cve_lookup/code/cve_lookup.py index b5d5883d6..1c0d0cced 100644 --- a/src/plugins/analysis/cve_lookup/code/cve_lookup.py +++ b/src/plugins/analysis/cve_lookup/code/cve_lookup.py @@ -52,8 +52,8 @@ def __init__(self): ) ) ) - self.min_crit_score = getattr(config.backend.plugin.get(self.NAME, {}), 'min-critical-score', 9.0) - self.match_any = getattr(config.backend.plugin.get(self.NAME, {}), 'match-any', False) + self.min_crit_score = getattr(config.backend.plugin.get(self.metadata.name, {}), 'min-critical-score', 9.0) + self.match_any = getattr(config.backend.plugin.get(self.metadata.name, {}), 'match-any', False) def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema: """ diff --git a/src/plugins/analysis/cwe_checker/code/cwe_checker.py b/src/plugins/analysis/cwe_checker/code/cwe_checker.py index 34df3b48d..d7a45c765 100644 --- a/src/plugins/analysis/cwe_checker/code/cwe_checker.py +++ b/src/plugins/analysis/cwe_checker/code/cwe_checker.py @@ -67,8 +67,8 @@ def __init__(self): ) ) self._log_version_string() - self.memory_limit = getattr(config.backend.plugin.get(self.NAME, None), 'memory_limit', '4G') - self.swap_limit = getattr(config.backend.plugin.get(self.NAME, None), 'memswap_limit', '4G') + self.memory_limit = getattr(config.backend.plugin.get(self.metadata.name, None), 'memory_limit', '4G') + self.swap_limit = getattr(config.backend.plugin.get(self.metadata.name, None), 'memswap_limit', '4G') def _log_version_string(self): output = self._run_cwe_checker_to_get_version_string() @@ -92,7 +92,7 @@ def _run_cwe_checker_in_docker(self, file_path: str) -> bytes: result = run_docker_container( DOCKER_IMAGE, combine_stderr_stdout=True, - timeout=self.TIMEOUT - 30, + timeout=self.metadata.timeout - 30, command='/input --json --quiet', mounts=[ Mount('/input', file_path, type='bind'), diff --git 
a/src/plugins/analysis/dummy/code/__init__.py b/src/plugins/analysis/dummy/code/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/plugins/analysis/dummy/code/dummy.py b/src/plugins/analysis/dummy/code/dummy.py deleted file mode 100644 index ad7bcf5f2..000000000 --- a/src/plugins/analysis/dummy/code/dummy.py +++ /dev/null @@ -1,22 +0,0 @@ -from analysis.PluginBase import AnalysisBasePlugin - - -class AnalysisPlugin(AnalysisBasePlugin): - """ - This is a mock for testing - """ - - NAME = 'dummy_plugin_for_testing_only' - DEPENDENCIES = [] # noqa: RUF012 - VERSION = '0.0' - DESCRIPTION = 'this is a dummy plugin' - FILE = __file__ - - def process_object(self, file_object): - """ - This function must be implemented by the plugin. - Analysis result must be a list stored in file_object.processed_analysis[self.NAME] - """ - file_object.processed_analysis[self.NAME] = {'1': 'first result', '2': 'second result'} - file_object.processed_analysis[self.NAME]['summary'] = ['first result', 'second result'] - return file_object diff --git a/src/plugins/analysis/dummy/routes/__init__.py b/src/plugins/analysis/dummy/routes/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/plugins/analysis/example_plugin/code/example_plugin.py b/src/plugins/analysis/example_plugin/code/example_plugin.py index 5f2de832a..dce6adfce 100644 --- a/src/plugins/analysis/example_plugin/code/example_plugin.py +++ b/src/plugins/analysis/example_plugin/code/example_plugin.py @@ -44,13 +44,11 @@ def summarize(self, result): return ['big-file', 'binary'] def analyze(self, file_handle: io.FileIO, virtual_file_path: str, analyses: dict) -> Schema: - file_type_analysis = analyses['file_type'] - first_byte = file_handle.read(1) return AnalysisPlugin.Schema( number=42, name=file_handle.name, first_byte=first_byte.hex(), virtual_file_path=virtual_file_path, - dependant_analysis=file_type_analysis, + dependant_analysis=analyses['file_type'].model_dump(), ) diff --git a/src/plugins/analysis/dummy/__init__.py b/src/plugins/analysis/example_plugin/routes/__init__.py similarity index 100% rename from src/plugins/analysis/dummy/__init__.py rename to src/plugins/analysis/example_plugin/routes/__init__.py diff --git a/src/plugins/analysis/dummy/routes/routes.py b/src/plugins/analysis/example_plugin/routes/routes.py similarity index 100% rename from src/plugins/analysis/dummy/routes/routes.py rename to src/plugins/analysis/example_plugin/routes/routes.py diff --git a/src/plugins/analysis/file_system_metadata/code/file_system_metadata.py b/src/plugins/analysis/file_system_metadata/code/file_system_metadata.py index 902d069e2..c6f64b27f 100644 --- a/src/plugins/analysis/file_system_metadata/code/file_system_metadata.py +++ b/src/plugins/analysis/file_system_metadata/code/file_system_metadata.py @@ -206,7 +206,7 @@ def _mount_in_docker(self, input_dir: str) -> str: mounts=[ Mount('/work', input_dir, type='bind'), ], - timeout=int(self.TIMEOUT * 0.8), # docker call gets 80% of the analysis time before it times out + timeout=int(self.metadata.timeout * 0.8), # docker call gets 80% of the analysis time before it times out privileged=True, ) return result.stdout diff --git a/src/plugins/analysis/hash/code/hash.py b/src/plugins/analysis/hash/code/hash.py index 2ab2e05a4..9b13529b1 100644 --- a/src/plugins/analysis/hash/code/hash.py +++ b/src/plugins/analysis/hash/code/hash.py @@ -64,7 +64,7 @@ def __init__(self): Schema=self.Schema, ), ) - configured_hashes = 
getattr(config.backend.plugin.get(self.NAME, None), 'hashes', []) + configured_hashes = getattr(config.backend.plugin.get(self.metadata.name, None), 'hashes', []) self.hashes_to_create = set(configured_hashes).union({'sha256', 'md5'}) def analyze(self, file_handle: FileIO, virtual_file_path: str, analyses: dict) -> Schema: diff --git a/src/plugins/analysis/input_vectors/code/input_vectors.py b/src/plugins/analysis/input_vectors/code/input_vectors.py index fd5675a63..38d6a53ef 100644 --- a/src/plugins/analysis/input_vectors/code/input_vectors.py +++ b/src/plugins/analysis/input_vectors/code/input_vectors.py @@ -82,7 +82,7 @@ def _run_docker(self, file_handle: FileIO) -> dict: DOCKER_IMAGE, # We explicitly don't want stderr to ignore "Cannot analyse at [...]" combine_stderr_stdout=False, - logging_label=self.NAME, + logging_label=self.metadata.name, timeout=TIMEOUT_IN_SECONDS, command=CONTAINER_TARGET_PATH, mounts=[ diff --git a/src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py b/src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py index 46abc7969..59245be9b 100644 --- a/src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py +++ b/src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py @@ -100,7 +100,7 @@ def find_geo_location(self, ip_address: str) -> Location | None: ValueError, InvalidDatabaseError, ) as exception: - logging.debug(f'Error during {self.NAME} analysis: {exception!s}', exc_info=True) + logging.debug(f'Error during {self.metadata.name} analysis: {exception!s}', exc_info=True) return None def summarize(self, result: Schema) -> list: diff --git a/src/plugins/analysis/ipc/code/ipc_analyzer.py b/src/plugins/analysis/ipc/code/ipc_analyzer.py index ed2f71840..7ba84cb8d 100644 --- a/src/plugins/analysis/ipc/code/ipc_analyzer.py +++ b/src/plugins/analysis/ipc/code/ipc_analyzer.py @@ -83,7 +83,7 @@ def _run_ipc_analyzer_in_docker(self, file_handle: FileIO) -> dict: run_docker_container( DOCKER_IMAGE, combine_stderr_stdout=True, - timeout=self.TIMEOUT, + timeout=self.metadata.timeout, command=f'{mount} /results/', mounts=[ Mount('/results/', str(folder.resolve()), type='bind'), diff --git a/src/plugins/analysis/kernel_config/code/kernel_config.py b/src/plugins/analysis/kernel_config/code/kernel_config.py index 1a5d77dc3..9caa578b3 100644 --- a/src/plugins/analysis/kernel_config/code/kernel_config.py +++ b/src/plugins/analysis/kernel_config/code/kernel_config.py @@ -8,7 +8,6 @@ from semver import Version from analysis.plugin import AnalysisPluginV0, Tag -from analysis.plugin.compat import AnalysisBasePluginAdapterMixin from helperFunctions.tag import TagColor from plugins.analysis.kernel_config.internal.checksec_check_kernel import CHECKSEC_PATH, check_kernel_config from plugins.analysis.kernel_config.internal.decomp import GZDecompressor @@ -34,7 +33,7 @@ class CheckSec(BaseModel): selinux: dict -class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin): +class AnalysisPlugin(AnalysisPluginV0): class Schema(BaseModel): is_kernel_config: bool kernel_config: Optional[str] = None diff --git a/src/plugins/analysis/known_vulnerabilities/code/known_vulnerabilities.py b/src/plugins/analysis/known_vulnerabilities/code/known_vulnerabilities.py index 10e6c45ad..a52e8e052 100644 --- a/src/plugins/analysis/known_vulnerabilities/code/known_vulnerabilities.py +++ b/src/plugins/analysis/known_vulnerabilities/code/known_vulnerabilities.py @@ -132,7 +132,7 @@ def _check_netusb_vulnerability(self, file_path: str) -> list[Vulnerability]: with 
suppress(DockerException, TimeoutError): run_docker_container( 'fact/known-vulnerabilities', - logging_label=self.NAME, + logging_label=self.metadata.name, timeout=60, mounts=[ Mount('/io', tmp_dir, type='bind'), diff --git a/src/plugins/analysis/software_components/code/software_components.py b/src/plugins/analysis/software_components/code/software_components.py index cefa760dc..ee78c3aa1 100644 --- a/src/plugins/analysis/software_components/code/software_components.py +++ b/src/plugins/analysis/software_components/code/software_components.py @@ -9,7 +9,6 @@ import config from analysis.plugin import AnalysisPluginV0, Tag, addons -from analysis.plugin.compat import AnalysisBasePluginAdapterMixin from helperFunctions.tag import TagColor from plugins.analysis.software_components.bin import OS_LIST from plugins.mime_blacklists import MIME_BLACKLIST_NON_EXECUTABLE @@ -38,7 +37,7 @@ class MatchingString(BaseModel): identifier: str = Field(description='Identifier of the rule that this string matched (e.g. "$a")') -class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin): +class AnalysisPlugin(AnalysisPluginV0): class Schema(BaseModel): software_components: List[SoftwareMatch] diff --git a/src/scheduler/analysis/plugin.py b/src/scheduler/analysis/plugin.py index 3c21839ec..f14a5857b 100644 --- a/src/scheduler/analysis/plugin.py +++ b/src/scheduler/analysis/plugin.py @@ -102,9 +102,8 @@ def start(self): def shutdown(self): for worker in self._workers: - if not worker.is_alive(): - continue - worker.terminate() + if worker.is_alive(): + worker.terminate() def queue_analysis(self, file_object: FileObject): """Queues the analysis of ``file_object`` with ``self._plugin``. diff --git a/src/scheduler/analysis/scheduler.py b/src/scheduler/analysis/scheduler.py index 3e866b00d..ce6140688 100644 --- a/src/scheduler/analysis/scheduler.py +++ b/src/scheduler/analysis/scheduler.py @@ -3,20 +3,19 @@ import logging import os import time -from concurrent.futures import ThreadPoolExecutor from multiprocessing import Lock, Queue, Value from pathlib import Path from queue import Empty from time import sleep -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING from packaging.version import InvalidVersion from packaging.version import parse as parse_version from pydantic import ValidationError +from semver import Version import config from analysis.plugin import AnalysisPluginV0 -from analysis.PluginBase import AnalysisBasePlugin from helperFunctions.compare_sets import substring_is_in_list from helperFunctions.logging import TerminalColors, color_string from helperFunctions.plugin import discover_analysis_plugins @@ -103,12 +102,12 @@ class AnalysisScheduler: def __init__( self, - post_analysis: Optional[Callable[[str, str, dict], None]] = None, + post_analysis: Callable[[str, str, dict], None] | None = None, db_interface=None, unpacking_locks: UnpackingLockManager | None = None, ): - self.analysis_plugins: dict[str, AnalysisPluginV0 | AnalysisBasePlugin] = {} - self._plugin_runners = {} + self.analysis_plugins: dict[str, AnalysisPluginV0] = {} + self._plugin_runners: dict[str, PluginRunner] = {} self._load_plugins() self.stop_condition = Value('i', 0) @@ -127,9 +126,9 @@ def __init__( def start(self): self.status.start() - self._start_runner_processes() + self._start_scheduling_processes() self._start_result_collector() - self._start_plugins() + self._start_plugin_runners() logging.info('Analysis scheduler online') logging.info(f'Analysis plugins available: 
{self._format_available_plugins()}') @@ -140,7 +139,6 @@ def shutdown(self): """ logging.debug('Shutting down analysis scheduler') self.stop_condition.value = 1 - futures = [] # first shut down scheduling, then analysis plugins and lastly the result collector stop_processes(self.schedule_processes, config.backend.block_delay + 1) @@ -149,15 +147,9 @@ def shutdown(self): for runner in self._plugin_runners.values(): for worker in runner._workers: - if not worker.is_alive(): - continue - worker.join(Worker.SIGTERM_TIMEOUT + 1) - - with ThreadPoolExecutor() as pool: - for plugin in self.analysis_plugins.values(): - futures.append(pool.submit(plugin.shutdown)) - for future in futures: - future.result() # call result() to make sure all threads are finished and there are no exceptions + if worker.is_alive(): + worker.join(Worker.SIGTERM_TIMEOUT + 1) + stop_processes(self.result_collector_processes, config.backend.block_delay + 1) self.process_queue.close() self.status.shutdown() @@ -209,7 +201,7 @@ def _get_list_of_available_plugins(self) -> list[str]: def _format_available_plugins(self) -> str: plugins = [] for plugin_name in sorted(self.analysis_plugins, key=str.lower): - plugins.append(f'{plugin_name} {self.analysis_plugins[plugin_name].VERSION}') + plugins.append(f'{plugin_name} {self.analysis_plugins[plugin_name].metadata.version}') return ', '.join(plugins) def cancel_analysis(self, root_uid: str): @@ -229,14 +221,10 @@ def _load_plugins(self): for plugin_module in discover_analysis_plugins(): try: PluginClass = plugin_module.AnalysisPlugin # noqa: N806 - if issubclass(PluginClass, AnalysisPluginV0): - plugin: AnalysisPluginV0 = PluginClass() - self.analysis_plugins[plugin.metadata.name] = plugin - schemata[plugin.metadata.name] = plugin.metadata.Schema - _sync_view(plugin_module, plugin.metadata.name) - elif issubclass(PluginClass, AnalysisBasePlugin): - self.analysis_plugins[PluginClass.NAME] = PluginClass() - schemata[PluginClass.NAME] = dict + plugin: AnalysisPluginV0 = PluginClass() + self.analysis_plugins[plugin.metadata.name] = plugin + schemata[plugin.metadata.name] = plugin.metadata.Schema + _sync_view(plugin_module, plugin.metadata.name) except Exception: logging.error(f'Could not import analysis plugin {plugin_module.AnalysisPlugin.NAME}', exc_info=True) @@ -299,11 +287,11 @@ def get_plugin_dict(self) -> dict: thread_count = config.backend.plugin_defaults.processes # TODO this should not be a tuple but rather a dictionary/class result[plugin] = ( - self.analysis_plugins[plugin].DESCRIPTION, + self.analysis_plugins[plugin].metadata.description, mandatory_flag, dict(current_plugin_plugin_sets), - self.analysis_plugins[plugin].VERSION, - self.analysis_plugins[plugin].DEPENDENCIES, + str(self.analysis_plugins[plugin].metadata.version), + self.analysis_plugins[plugin].metadata.dependencies, blacklist, whitelist, thread_count, @@ -311,16 +299,13 @@ def get_plugin_dict(self) -> dict: result['unpacker'] = ('Additional information provided by the unpacker', True, False) return result - def _start_plugins(self): - for plugin in self.analysis_plugins.values(): - plugin.start() - + def _start_plugin_runners(self): for runner in self._plugin_runners.values(): runner.start() # ---- task runner functions ---- - def _start_runner_processes(self): + def _start_scheduling_processes(self): self.schedule_processes = [ ExceptionSafeProcess(target=self._task_runner, args=(i,)) for i in range(config.backend.scheduling_worker_count) @@ -374,17 +359,14 @@ def _start_or_skip_analysis(self, analysis_to_do: 
str, file_object: FileObject): else: if file_object.binary is None: self._set_binary(file_object) - if isinstance(plugin, AnalysisPluginV0): - runner = self._plugin_runners[plugin.metadata.name] - try: - runner.queue_analysis(file_object) - except ValidationError as err: - analysis_result = self._get_skipped_analysis_result( - analysis_to_do, - f'error during dependency collection: could not apply schema to dependency results: {err!s}', - ) - elif isinstance(plugin, AnalysisBasePlugin): - plugin.add_job(file_object) + runner = self._plugin_runners[plugin.metadata.name] + try: + runner.queue_analysis(file_object) + except ValidationError as err: + analysis_result = self._get_skipped_analysis_result( + analysis_to_do, + f'error during dependency collection: could not apply schema to dependency results: {err!s}', + ) if analysis_result is not None: file_object.processed_analysis[analysis_to_do] = analysis_result @@ -418,17 +400,16 @@ def _analysis_is_already_in_db_and_up_to_date(self, analysis_to_do: str, uid: st return False return self._analysis_is_up_to_date(db_entry, self.analysis_plugins[analysis_to_do], uid) - def _analysis_is_up_to_date(self, db_entry: dict, analysis_plugin: AnalysisBasePlugin, uid: str) -> bool: - try: - current_system_version = analysis_plugin.SYSTEM_VERSION - except AttributeError: - current_system_version = None - + def _analysis_is_up_to_date(self, db_entry: dict, analysis_plugin: AnalysisPluginV0, uid: str) -> bool: try: - if self._current_version_is_newer(analysis_plugin.VERSION, current_system_version, db_entry): + if self._current_version_is_newer( + analysis_plugin.metadata.version, + analysis_plugin.metadata.system_version, + db_entry, + ): return False - except TypeError: - logging.error(f'Plugin or system version of "{analysis_plugin.NAME}" plugin is or was invalid!') + except (TypeError, ValueError): + logging.error(f'Plugin or system version of "{analysis_plugin.metadata.name}" plugin is or was invalid!') return False except InvalidVersion as error: logging.exception(f'Error while parsing plugin version: {error}') @@ -438,21 +419,21 @@ def _analysis_is_up_to_date(self, db_entry: dict, analysis_plugin: AnalysisBaseP @staticmethod def _current_version_is_newer( - current_plugin_version: str, current_system_version: str | None, db_entry: dict[str, str | None] + current_plugin_version: Version, current_system_version: str | None, db_entry: dict[str, str | None] ) -> bool: - plugin_version_is_newer = parse_version(current_plugin_version) > parse_version(db_entry['plugin_version']) + plugin_version_is_newer = current_plugin_version > Version.parse(db_entry['plugin_version']) system_version_is_newer = parse_version(_fix_system_version(current_system_version)) > parse_version( _fix_system_version(db_entry.get('system_version')) ) return plugin_version_is_newer or system_version_is_newer - def _dependencies_are_up_to_date(self, db_entry: dict, analysis_plugin: AnalysisBasePlugin, uid: str) -> bool: + def _dependencies_are_up_to_date(self, db_entry: dict, analysis_plugin: AnalysisPluginV0, uid: str) -> bool: """ If an analysis result of a dependency was updated (i.e., it is newer than this analysis), it could have changed and in turn change the outcome of this analysis. Therefore, this analysis should also run again. 
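A standalone sketch mirroring the scheduler's updated freshness check: the plugin version is now a semver.Version compared against the stored string, while the system version still goes through packaging after the underscore fix-up. The example versions are made up.

from __future__ import annotations

from packaging.version import parse as parse_version
from semver import Version


def fix_system_version(system_version: str | None) -> str:
    # mirrors _fix_system_version: underscores break packaging's parser, a missing value counts as '0'
    return system_version.replace('_', '-') if system_version else '0'


def current_version_is_newer(plugin_version: Version, system_version: str | None, db_entry: dict) -> bool:
    plugin_version_is_newer = plugin_version > Version.parse(db_entry['plugin_version'])
    system_version_is_newer = parse_version(fix_system_version(system_version)) > parse_version(
        fix_system_version(db_entry.get('system_version'))
    )
    return plugin_version_is_newer or system_version_is_newer


# the plugin was bumped from 0.1.0 to 0.2.0, so the stored result counts as outdated
assert current_version_is_newer(Version(0, 2, 0), None, {'plugin_version': '0.1.0', 'system_version': None})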
""" - for dependency in analysis_plugin.DEPENDENCIES: + for dependency in analysis_plugin.metadata.dependencies: dependency_entry = self.db_backend_service.get_analysis(uid, dependency) if dependency_entry is None or db_entry['analysis_date'] < dependency_entry['analysis_date']: return False @@ -468,7 +449,7 @@ def _get_skipped_analysis_result(self, analysis_to_do: str, reason: str) -> dict return { 'summary': [], 'analysis_date': time.time(), - 'plugin_version': self.analysis_plugins[analysis_to_do].VERSION, + 'plugin_version': self.analysis_plugins[analysis_to_do].metadata.version, 'result': { 'skipped': reason, }, @@ -507,12 +488,12 @@ def _get_blacklist_and_whitelist_from_config(analysis_plugin: str) -> tuple[list def _get_blacklist_and_whitelist_from_plugin(self, analysis_plugin: str) -> tuple[list, list]: try: - blacklist = self.analysis_plugins[analysis_plugin].MIME_BLACKLIST + blacklist = self.analysis_plugins[analysis_plugin].metadata.mime_blacklist except AttributeError: blacklist = [] try: - whitelist = self.analysis_plugins[analysis_plugin].MIME_WHITELIST + whitelist = self.analysis_plugins[analysis_plugin].metadata.mime_whitelist except AttributeError: whitelist = [] @@ -534,11 +515,8 @@ def _result_collector(self, index: int = 0): while self.stop_condition.value == 0: nop = True for plugin_name, plugin in self.analysis_plugins.items(): - if isinstance(plugin, AnalysisPluginV0): - runner = self._plugin_runners[plugin.metadata.name] - out_queue = runner.out_queue - elif isinstance(plugin, AnalysisBasePlugin): - out_queue = plugin.out_queue + runner = self._plugin_runners[plugin.metadata.name] + out_queue = runner.out_queue try: fw = out_queue.get_nowait() @@ -587,11 +565,8 @@ def _do_callback(fw_object: FileObject): # ---- miscellaneous functions ---- def get_combined_analysis_workload(self): - plugin_queues = [ - plugin.in_queue for plugin in self.analysis_plugins.values() if isinstance(plugin, AnalysisBasePlugin) - ] runner_queue_sum = sum([runner.get_queue_len() for runner in self._plugin_runners.values()]) - return self.process_queue.qsize() + sum(queue.qsize() for queue in plugin_queues) + runner_queue_sum + return self.process_queue.qsize() + runner_queue_sum def get_scheduled_workload(self) -> dict: """ @@ -620,22 +595,14 @@ def get_scheduled_workload(self) -> dict: 'analysis_main_scheduler': self.process_queue.qsize(), 'plugins': {}, } - for plugin_name, plugin in self.analysis_plugins.items(): - if isinstance(plugin, AnalysisPluginV0): - runner = self._plugin_runners[plugin_name] - workload['plugins'][plugin_name] = { - 'queue': runner.get_queue_len(), - 'out_queue': runner.out_queue.qsize(), - 'active': runner.get_active_worker_count(), - 'stats': get_plugin_stats(runner.stats, runner.stats_count), - } - elif isinstance(plugin, AnalysisBasePlugin): - workload['plugins'][plugin_name] = { - 'queue': plugin.in_queue.qsize(), - 'out_queue': plugin.out_queue.qsize(), - 'active': (sum(plugin.active[i].value for i in range(plugin.thread_count))), - 'stats': get_plugin_stats(plugin.analysis_stats, plugin.analysis_stats_count), - } + for plugin_name in self.analysis_plugins: + runner = self._plugin_runners[plugin_name] + workload['plugins'][plugin_name] = { + 'queue': runner.get_queue_len(), + 'out_queue': runner.out_queue.qsize(), + 'active': runner.get_active_worker_count(), + 'stats': get_plugin_stats(runner.stats, runner.stats_count), + } return workload def check_exceptions(self) -> bool: @@ -645,11 +612,6 @@ def check_exceptions(self) -> bool: :return: Boolean value 
stating if any attached process ran into an exception """ - for plugin in self.analysis_plugins.values(): - if isinstance(plugin, AnalysisPluginV0): - continue - if plugin.check_exceptions(): - return True return check_worker_exceptions(self.schedule_processes + self.result_collector_processes, 'Scheduler') @@ -659,9 +621,7 @@ def _fix_system_version(system_version: str | None) -> str: return system_version.replace('_', '-') if system_version else '0' -def _dependencies_are_unfulfilled(plugin: AnalysisPluginV0 | AnalysisBasePlugin, fw_object: FileObject): - # FIXME: update when old base class gets removed - dependencies = plugin.metadata.dependencies if isinstance(plugin, AnalysisPluginV0) else plugin.DEPENDENCIES +def _dependencies_are_unfulfilled(plugin: AnalysisPluginV0, fw_object: FileObject): return any( dep not in fw_object.processed_analysis or 'result' not in fw_object.processed_analysis[dep] @@ -670,7 +630,7 @@ def _dependencies_are_unfulfilled(plugin: AnalysisPluginV0 | AnalysisBasePlugin, for k in ('skipped', 'failed') if (result := fw_object.processed_analysis[dep]['result']) is not None ) - for dep in dependencies + for dep in plugin.metadata.dependencies ) diff --git a/src/scheduler/task_scheduler.py b/src/scheduler/task_scheduler.py index b2caeb9bd..8c178c4e2 100644 --- a/src/scheduler/task_scheduler.py +++ b/src/scheduler/task_scheduler.py @@ -3,11 +3,12 @@ import logging from copy import copy from time import time -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Iterable from helperFunctions.merge_generators import shuffled if TYPE_CHECKING: + from analysis.plugin import AnalysisPluginV0 from objects.file import FileObject from objects.firmware import Firmware @@ -16,7 +17,7 @@ class AnalysisTaskScheduler: def __init__(self, plugins): - self.plugins = plugins + self.plugins: dict[str, AnalysisPluginV0] = plugins def schedule_analysis_tasks(self, fo, scheduled_analysis, mandatory=False): scheduled_analysis = self._add_dependencies_recursively(copy(scheduled_analysis) or []) @@ -51,7 +52,7 @@ def _get_plugins_with_met_dependencies( return [ plugin for plugin in remaining_plugins - if all(dependency in met_dependencies for dependency in self.plugins[plugin].DEPENDENCIES) + if all(dependency in met_dependencies for dependency in self.plugins[plugin].metadata.dependencies) ] def _add_dependencies_recursively(self, scheduled_analyses: list[str]) -> list[str]: @@ -63,16 +64,16 @@ def _add_dependencies_recursively(self, scheduled_analyses: list[str]) -> list[s scheduled_analyses_set.update(new_dependencies) return list(scheduled_analyses_set) - def get_cumulative_remaining_dependencies(self, scheduled_analyses: set[str]) -> set[str]: + def get_cumulative_remaining_dependencies(self, scheduled_analyses: Iterable[str]) -> set[str]: return { - dependency for plugin in scheduled_analyses for dependency in self.plugins[plugin].DEPENDENCIES + dependency for plugin in scheduled_analyses for dependency in self.plugins[plugin].metadata.dependencies }.difference(scheduled_analyses) def reschedule_failed_analysis_task(self, fw_object: Firmware | FileObject): failed_plugin, cause = fw_object.analysis_exception fw_object.processed_analysis[failed_plugin] = self._get_failed_analysis_result(cause, failed_plugin) for plugin in fw_object.scheduled_analysis[:]: - if failed_plugin in self.plugins[plugin].DEPENDENCIES: + if failed_plugin in self.plugins[plugin].metadata.dependencies: fw_object.scheduled_analysis.remove(plugin) logging.warning( f'Unscheduled analysis {plugin} for 
{fw_object.uid} because dependency {failed_plugin} failed' @@ -85,6 +86,6 @@ def reschedule_failed_analysis_task(self, fw_object: Firmware | FileObject): def _get_failed_analysis_result(self, cause: str, plugin: str) -> dict: return { 'result': {'failed': cause}, - 'plugin_version': self.plugins[plugin].VERSION, + 'plugin_version': self.plugins[plugin].metadata.version, 'analysis_date': time(), } diff --git a/src/start_fact_backend.py b/src/start_fact_backend.py index 77f2324f4..b65951b5d 100755 --- a/src/start_fact_backend.py +++ b/src/start_fact_backend.py @@ -27,7 +27,6 @@ from fact_base import FactBase import config -from analysis.PluginBase import PluginInitException from helperFunctions.process import complete_shutdown from intercom.back_end_binding import InterComBackEndBinding from scheduler.analysis import AnalysisScheduler @@ -49,11 +48,7 @@ def __init__(self): self._create_docker_base_dir() _check_ulimit() - try: - self.analysis_service = AnalysisScheduler(unpacking_locks=self.unpacking_lock_manager) - except PluginInitException as error: - logging.critical(f'Error during initialization of plugin {error.plugin.NAME}: {error}.') - complete_shutdown() + self.analysis_service = AnalysisScheduler(unpacking_locks=self.unpacking_lock_manager) self.unpacking_service = UnpackingScheduler( post_unpack=self.analysis_service.start_analysis_of_object, analysis_workload=self.analysis_service.get_combined_analysis_workload, diff --git a/src/storage/db_interface_backend.py b/src/storage/db_interface_backend.py index 7820ba205..af1fd8cab 100644 --- a/src/storage/db_interface_backend.py +++ b/src/storage/db_interface_backend.py @@ -4,6 +4,7 @@ from contextlib import suppress from typing import TYPE_CHECKING +from semver import Version from sqlalchemy import select from sqlalchemy.exc import IntegrityError @@ -115,7 +116,7 @@ def insert_analysis(self, uid: str, plugin: str, analysis_dict: dict): analysis = AnalysisEntry( uid=uid, plugin=plugin, - plugin_version=analysis_dict['plugin_version'], + plugin_version=self._sanitize_plugin_version(analysis_dict['plugin_version']), system_version=analysis_dict.get('system_version'), analysis_date=analysis_dict['analysis_date'], summary=analysis_dict.get('summary'), @@ -125,6 +126,12 @@ def insert_analysis(self, uid: str, plugin: str, analysis_dict: dict): ) session.add(analysis) + @staticmethod + def _sanitize_plugin_version(plugin_version: str | Version) -> str: + if isinstance(plugin_version, Version): + return str(plugin_version) + return plugin_version + def add_vfp(self, parent_uid: str, child_uid: str, paths: list[str]): """Adds a new "virtual file path" for file `child_uid` with path `path` in `parent_uid`""" with self.get_read_write_session() as session: @@ -196,7 +203,7 @@ def _update_virtual_file_path(file_object: FileObject, session: Session): def update_analysis(self, uid: str, plugin: str, analysis_data: dict): with self.get_read_write_session() as session: entry = session.get(AnalysisEntry, (uid, plugin)) - entry.plugin_version = analysis_data['plugin_version'] + entry.plugin_version = self._sanitize_plugin_version(analysis_data['plugin_version']) entry.system_version = analysis_data.get('system_version') entry.analysis_date = analysis_data['analysis_date'] entry.summary = analysis_data.get('summary') diff --git a/src/test/unit/analysis/test_addons_yara.py b/src/test/unit/analysis/test_addons_yara.py index bca6bc20f..fb853e75c 100644 --- a/src/test/unit/analysis/test_addons_yara.py +++ b/src/test/unit/analysis/test_addons_yara.py @@ -1,55 
+1,8 @@ -from io import FileIO from pathlib import Path import yara -from analysis.plugin.addons import Yara -from analysis.plugin.compat import yara_match_to_dict -from analysis.YaraPluginBase import YaraBasePlugin from helperFunctions.fileSystem import get_src_dir -from test.common_helper import create_test_file_object - -signature_file = str(Path(get_src_dir()) / 'test/unit/analysis/test.yara') -test_target = str(Path(get_src_dir()) / 'test/data/files/get_files_test/testfile1') - -EXPECTED_RESULT = { - 'matches': True, - 'meta': { - 'description': 'Generic Software', - 'open_source': False, - 'software_name': 'Test Software', - 'website': 'http://www.fkie.fraunhofer.de', - }, - 'rule': 'testRule', - 'strings': [(0, '$a', 'test'), (22, '$a', 'Test')], -} - - -class MockYaraPlugin(YaraBasePlugin): - def __init__(self): - self.signature_path = signature_file - self.NAME = 'test_plugin' - - -class MockYaraAddonPlugin(Yara): - def __init__(self): - self._rules = yara.compile(signature_file) - - -def test_output_is_compatible(): - fo = create_test_file_object(test_target) - plugin = MockYaraPlugin() - plugin.process_object(fo) - assert fo.processed_analysis['test_plugin']['testRule'] == EXPECTED_RESULT - - yara_addon_plugin = MockYaraAddonPlugin() - file = FileIO(test_target) - yara_matches = yara_addon_plugin.match(file) - assert all(isinstance(m, yara.Match) for m in yara_matches) - converted_match = yara_match_to_dict(yara_matches[0]) - assert converted_match['strings'] == EXPECTED_RESULT['strings'] - for key, value in EXPECTED_RESULT['meta'].items(): - assert converted_match['meta'][key] == value def test_compile(): diff --git a/src/test/unit/analysis/test_analysis_plugin.py b/src/test/unit/analysis/test_analysis_plugin.py new file mode 100644 index 000000000..53d056add --- /dev/null +++ b/src/test/unit/analysis/test_analysis_plugin.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from pydantic import BaseModel +from semver import Version + +from analysis.plugin import AnalysisPluginV0, Tag +from helperFunctions.tag import TagColor + + +class DummyPlugin(AnalysisPluginV0): + class Schema(BaseModel): + foo: str + + def __init__(self): + metadata = self.MetaData(name='DummyPlugin', description='', Schema=self.Schema, version=Version(0, 1, 0)) + super().__init__(metadata) + + def analyze(self, file_handle, virtual_file_path, analyses): + return self.Schema(foo='foo') + + +class ExtendedDummyPlugin(DummyPlugin): + def summarize(self, result): + return [result.foo] + + def get_tags(self, result, summary): + return [Tag(name=result.foo, value=result.foo, color=TagColor.GREEN)] + + +def test_get_analysis(): + plugin = DummyPlugin() + + result = plugin.get_analysis(None, {}, {}) + expected_keys = ['analysis_date', 'plugin_version', 'result', 'summary', 'system_version', 'tags'] + assert all(k in result for k in expected_keys) + assert 'foo' in result['result'] + assert result['summary'] == [] + assert result['tags'] == {} + + +def test_get_analysis_extended(): + plugin = ExtendedDummyPlugin() + + result = plugin.get_analysis(None, {}, {}) + expected_keys = ['analysis_date', 'plugin_version', 'result', 'summary', 'system_version', 'tags'] + assert all(k in result for k in expected_keys) + assert 'foo' in result['result'] + assert result['summary'] == ['foo'] + assert len(result['tags']) == 1 + assert result['tags']['foo'] == {'color': TagColor.GREEN, 'propagate': False, 'value': 'foo'} + + +def test_summarize(): + plugin = DummyPlugin() + extended_plugin = ExtendedDummyPlugin() + + 
assert ( + plugin.summarize(plugin.Schema(foo='foo')) == [] + ), 'if the plugin does not implement summarize, it should return an empty list' + assert extended_plugin.summarize(plugin.Schema(foo='foo')) == ['foo'] diff --git a/src/test/unit/analysis/test_plugin_base.py b/src/test/unit/analysis/test_plugin_base.py deleted file mode 100644 index 18d530f52..000000000 --- a/src/test/unit/analysis/test_plugin_base.py +++ /dev/null @@ -1,150 +0,0 @@ -from pathlib import Path - -import pytest - -from analysis.PluginBase import AnalysisBasePlugin, PluginInitException -from helperFunctions.fileSystem import get_src_dir -from objects.file import FileObject -from plugins.analysis.dummy.code.dummy import AnalysisPlugin as DummyPlugin - -PLUGIN_PATH = Path(get_src_dir()) / 'plugins' / 'analysis' - - -@pytest.mark.backend_config_overwrite( - { - 'plugin': { - 'dummy_plugin_for_testing_only': { - 'name': 'dummy_plugin_for_testing_only', - 'processes': 2, - } - }, - 'block_delay': 0.1, - } -) -@pytest.mark.AnalysisPluginTestConfig( - plugin_class=DummyPlugin, - start_processes=True, -) -class TestPluginBaseCore: - def test_object_processing_no_children(self, analysis_plugin): - root_object = FileObject(binary=b'root_file') - analysis_plugin.in_queue.put(root_object) - processed_object = analysis_plugin.out_queue.get() - assert processed_object.uid == root_object.uid, 'uid changed' - assert 'dummy_plugin_for_testing_only' in processed_object.processed_analysis, 'object not processed' - assert ( - processed_object.processed_analysis['dummy_plugin_for_testing_only']['plugin_version'] == '0.0' - ), 'plugin version missing in results' - assert ( - processed_object.processed_analysis['dummy_plugin_for_testing_only']['analysis_date'] > 1 - ), 'analysis date missing in results' - - def test_object_processing_one_child(self, analysis_plugin): - root_object = FileObject(binary=b'root_file') - child_object = FileObject(binary=b'first_child_object') - root_object.add_included_file(child_object) - analysis_plugin.in_queue.put(root_object) - processed_object = analysis_plugin.out_queue.get() - assert processed_object.uid == root_object.uid, 'uid changed' - assert child_object.uid in root_object.files_included, 'child object not in processed file' - - -@pytest.mark.AnalysisPluginTestConfig(plugin_class=DummyPlugin) -class TestPluginBaseAddJob: - def test_analysis_depth_not_reached_yet(self, analysis_plugin): - fo = FileObject(binary=b'test', scheduled_analysis=[]) - - fo.depth = 1 - analysis_plugin.RECURSIVE = False - assert not analysis_plugin._analysis_depth_not_reached_yet(fo), 'positive but not root object' - - fo.depth = 0 - analysis_plugin.RECURSIVE = False - assert analysis_plugin._analysis_depth_not_reached_yet(fo) - - fo.depth = 1 - analysis_plugin.RECURSIVE = True - assert analysis_plugin._analysis_depth_not_reached_yet(fo) - - fo.depth = 0 - analysis_plugin.RECURSIVE = True - assert analysis_plugin._analysis_depth_not_reached_yet(fo) - - @pytest.mark.AnalysisPluginTestConfig(start_processes=True) - def test__add_job__recursive_is_set(self, analysis_plugin): - fo = FileObject(binary=b'test', scheduled_analysis=[]) - fo.depth = 1 - analysis_plugin.recursive = False - analysis_plugin.add_job(fo) - out_fo = analysis_plugin.out_queue.get(timeout=5) - assert isinstance(out_fo, FileObject), 'not added to out_queue' - analysis_plugin.recursive = True - assert analysis_plugin._analysis_depth_not_reached_yet(fo), 'not positive but recursive' - - -class TestPluginBaseOffline: - def test_get_view_file_path(self): - 
code_path = PLUGIN_PATH / 'file_type' / 'code' / 'file_type.py' - expected_view_path = PLUGIN_PATH / 'file_type' / 'view' / 'file_type.html' - - assert AnalysisBasePlugin._get_view_file_path(str(code_path)) == expected_view_path - - without_view = PLUGIN_PATH / 'dummy' / 'code' / 'dummy.py' - assert AnalysisBasePlugin._get_view_file_path(str(without_view)) is None - - -class TestPluginNotRunning: - def multithread_config_test(self, multithread_flag, threads_wanted): - self.p_base = DummyPlugin(no_multithread=multithread_flag) - assert self.p_base.thread_count == threads_wanted, 'number of threads not correct' - self.p_base.shutdown() - - @pytest.mark.backend_config_overwrite( - { - 'plugin': { - 'dummy_plugin_for_testing_only': { - 'name': 'dummy_plugin_for_testing_only', - 'processes': 4, - } - } - } - ) - def test_no_multithread(self): - self.multithread_config_test(True, 1) - - @pytest.mark.backend_config_overwrite( - { - 'plugin': { - 'dummy_plugin_for_testing_only': { - 'name': 'dummy_plugin_for_testing_only', - 'processes': 2, - } - } - } - ) - def test_normal_multithread(self): - self.multithread_config_test(False, 2) - - def test_init_result_dict(self): - self.p_base = DummyPlugin() - resultdict = self.p_base.init_dict() - assert 'analysis_date' in resultdict, 'analysis date missing' - assert resultdict['plugin_version'] == '0.0', 'plugin version field not correct' - self.p_base.shutdown() - - -@pytest.mark.AnalysisPluginTestConfig(plugin_class=DummyPlugin) -def test_timeout(analysis_plugin, monkeypatch): # noqa: ARG001 - analysis_plugin.TIMEOUT = 0 - analysis_plugin.start() - - fo_in = FileObject(binary=b'test', scheduled_analysis=[]) - analysis_plugin.add_job(fo_in) - fo_out = analysis_plugin.out_queue.get(timeout=5) - - assert 'summary' not in fo_out.processed_analysis['dummy_plugin_for_testing_only'] - - -def test_attribute_check(): - with pytest.raises(PluginInitException): - AnalysisBasePlugin() diff --git a/src/test/unit/analysis/test_yara_plugin_base.py b/src/test/unit/analysis/test_yara_plugin_base.py deleted file mode 100644 index a34af385d..000000000 --- a/src/test/unit/analysis/test_yara_plugin_base.py +++ /dev/null @@ -1,103 +0,0 @@ -from __future__ import annotations - -import logging -import os -import re -from pathlib import Path - -import pytest - -from analysis.YaraPluginBase import YaraBasePlugin, _parse_meta_data, _split_output_in_rules_and_matches -from helperFunctions.fileSystem import get_src_dir -from objects.file import FileObject -from test.common_helper import get_test_data_dir - -YARA_TEST_OUTPUT = Path(get_test_data_dir(), 'yara_matches').read_text() - - -class YaraPlugin(YaraBasePlugin): - FILE = '/foo/bar/Yara_Base_Plugin/code/test.py' - - -@pytest.mark.AnalysisPluginTestConfig(plugin_class=YaraPlugin) -class TestAnalysisYaraBasePlugin: - def test_get_signature_paths(self, analysis_plugin): - intended_signature_path = os.path.join( # noqa: PTH118 - get_src_dir(), 'analysis/signatures', analysis_plugin.NAME - ) - assert isinstance(analysis_plugin.signature_path, str), 'incorrect type' - assert f'{intended_signature_path.rstrip("/")}.yc' == analysis_plugin.signature_path, 'signature path is wrong' - - def test_process_object(self, analysis_plugin): - test_file = FileObject(file_path=str(get_test_data_dir() / 'yara_test_file')) - test_file.processed_analysis.update({analysis_plugin.NAME: []}) - processed_file = analysis_plugin.process_object(test_file) - results = processed_file.processed_analysis[analysis_plugin.NAME] - assert len(results) == 2, 'not 
all matches found' - assert 'testRule' in results, 'testRule match not found' - assert results['summary'] == ['testRule'] - - def test_process_object_nothing_found(self, analysis_plugin): - test_file = FileObject(file_path=str(get_test_data_dir() / 'zero_byte')) - test_file.processed_analysis.update({analysis_plugin.NAME: []}) - processed_file = analysis_plugin.process_object(test_file) - assert len(processed_file.processed_analysis[analysis_plugin.NAME]) == 1, 'result present but should not' - assert processed_file.processed_analysis[analysis_plugin.NAME]['summary'] == [], 'summary not empty' - - -def test_parse_yara_output(): - matches = YaraBasePlugin._parse_yara_output(YARA_TEST_OUTPUT) - - assert isinstance(matches, dict), 'matches should be dict' - assert 'PgpPublicKeyBlock' in matches, 'Pgp block should have been matched' - assert matches['PgpPublicKeyBlock']['strings'][0][0] == 0, 'first block should start at 0x0' - assert 'r_libjpeg8_8d12b1_0' in matches - assert matches['r_libjpeg8_8d12b1_0']['meta']['description'] == 'foo [bar]' - assert len(matches) == 7, 'not all matches found' - - -def test_get_signature_file_name(): - assert YaraBasePlugin._get_signature_file_name('/foo/bar/plugin_name/code/test.py') == 'plugin_name.yc' - - -def test_parse_meta_data_error(caplog): - with caplog.at_level(logging.WARNING): - _parse_meta_data('illegal=meta=entry') - assert 'Malformed meta' in caplog.messages[0] - - -YARA_RULE_META_REGEX = re.compile(r'rule (\w+)\W*?meta:([\w\W]+?)strings:') - - -def _find_rule_files() -> list[Path]: - signature_files = [] - for dir_ in Path(get_src_dir()).glob('plugins/*/*/signatures'): - for file in dir_.iterdir(): - if any(file.name.endswith(suffix) for suffix in ('yara', 'yar')) and file.name != '00_meta_filter.yara': - signature_files.append(file) - return signature_files - - -@pytest.mark.parametrize('signature_file', _find_rule_files()) -def test_rule_metadata_can_be_parsed(caplog, signature_file): - rules = YARA_RULE_META_REGEX.findall(signature_file.read_text()) - assert rules, f'no rules found in {signature_file}' - - for rule_name, meta_data in rules: - if rule_name == 'SHORT_NAME_OF_SOFTWARE': # ignore demo rule - continue - yara_output_form = ','.join( - meta_data.replace(' ', '').replace('\t', '').replace(' = ', '=').replace("'", "\\'").splitlines() - ).strip(',') - with caplog.at_level(logging.WARNING): - output = _parse_meta_data(yara_output_form) - assert all( - 'Malformed meta' not in m for m in caplog.messages - ), f'meta of rule {rule_name} cannot be parsed: {caplog.messages[-1]}' - assert any(key in output for key in ('description', 'desc', 'author')), f'wrong output: {output}' - - -def test_split_output_uneven(): - uneven_yara_output = 'rule1 [meta=0,data=1] /path\n0x0:$a1: AA BB \nrule2 [meta=0,data=1] /path\n' - with pytest.raises(ValueError): # noqa: PT011 - _split_output_in_rules_and_matches(uneven_yara_output) diff --git a/src/test/unit/scheduler/test_analysis.py b/src/test/unit/scheduler/test_analysis.py index e7854d392..9ddfec50c 100644 --- a/src/test/unit/scheduler/test_analysis.py +++ b/src/test/unit/scheduler/test_analysis.py @@ -1,10 +1,13 @@ +from __future__ import annotations + +from dataclasses import KW_ONLY, dataclass, field from multiprocessing import Queue from time import sleep from unittest import mock import pytest +from semver import Version -from analysis.PluginBase import AnalysisBasePlugin from objects.firmware import Firmware from scheduler.analysis import AnalysisScheduler from scheduler.task_scheduler import 
MANDATORY_PLUGINS @@ -34,7 +37,7 @@ def get_analysis(self, *_): ) class TestScheduleInitialAnalysis: def test_plugin_registration(self, analysis_scheduler): - assert 'dummy_plugin_for_testing_only' in analysis_scheduler.analysis_plugins, 'Dummy plugin not found' + assert 'ExamplePlugin' in analysis_scheduler.analysis_plugins, 'Example Plugin not found' @pytest.mark.SchedulerTestConfig(start_processes=False) def test_schedule_firmware_init_no_analysis_selected(self, analysis_scheduler): @@ -49,18 +52,14 @@ def test_schedule_firmware_init_no_analysis_selected(self, analysis_scheduler): @pytest.mark.SchedulerTestConfig(start_processes=True) def test_whole_run_analysis_selected(self, analysis_scheduler, post_analysis_queue): test_fw = Firmware(file_path=get_test_data_dir() / 'get_files_test/testfile1') - test_fw.scheduled_analysis = ['dummy_plugin_for_testing_only'] + test_fw.scheduled_analysis = ['ExamplePlugin'] analysis_scheduler.start_analysis_of_object(test_fw) analysis_results = [post_analysis_queue.get(timeout=10) for _ in range(3)] - analysis_results = [ - {'uid': uid, 'plugin': plugin, 'result': result} for uid, plugin, result in analysis_results - ] + analysis_results = {plugin: result for uid, plugin, result in analysis_results} assert len(analysis_results) == 3, 'analysis not done' - assert analysis_results[0]['plugin'] == 'file_type' - assert analysis_results[1]['plugin'] == 'dummy_plugin_for_testing_only' - assert analysis_results[2]['plugin'] == 'file_hashes' - assert analysis_results[1]['result']['result']['1'] == 'first result', 'result not correct' - assert analysis_results[1]['result']['summary'] == ['first result', 'second result'] + assert set(analysis_results) == {'file_type', 'ExamplePlugin', 'file_hashes'} + assert analysis_results['ExamplePlugin']['result']['first_byte'] == '74' + assert analysis_results['ExamplePlugin']['summary'] == ['big-file', 'binary'] def test_expected_plugins_are_found(self, analysis_scheduler): result = analysis_scheduler.get_plugin_dict() @@ -82,7 +81,7 @@ def test_remove_example_plugins(self, analysis_scheduler): def test_get_plugin_dict_description(self, analysis_scheduler): result = analysis_scheduler.get_plugin_dict() assert ( - result['file_type'][0] == analysis_scheduler.analysis_plugins['file_type'].DESCRIPTION + result['file_type'][0] == analysis_scheduler.analysis_plugins['file_type'].metadata.description ), 'description not correct' @pytest.mark.backend_config_overwrite( @@ -106,9 +105,11 @@ def test_get_plugin_dict_flags(self, analysis_scheduler): def test_get_plugin_dict_version(self, analysis_scheduler): result = analysis_scheduler.get_plugin_dict() - assert result['file_type'][3] == analysis_scheduler.analysis_plugins['file_type'].VERSION, 'version not correct' assert ( - result['file_hashes'][3] == analysis_scheduler.analysis_plugins['file_hashes'].VERSION + result['file_type'][3] == analysis_scheduler.analysis_plugins['file_type'].metadata.version + ), 'version not correct' + assert ( + result['file_hashes'][3] == analysis_scheduler.analysis_plugins['file_hashes'].metadata.version ), 'version not correct' def test_process_next_analysis_unknown_plugin(self, analysis_scheduler): @@ -122,8 +123,8 @@ def test_process_next_analysis_unknown_plugin(self, analysis_scheduler): @pytest.mark.backend_config_overwrite( { 'plugin': { - 'dummy_plugin_for_testing_only': { - 'name': 'dummy_plugin_for_testing_only', + 'ExamplePlugin': { + 'name': 'ExamplePlugin', 'mime_whitelist': ['foo', 'bar'], }, } @@ -133,9 +134,9 @@ def 
test_skip_analysis_because_whitelist(self, analysis_scheduler, post_analysis test_fw = Firmware(file_path=get_test_data_dir() / 'get_files_test/testfile1') test_fw.scheduled_analysis = ['file_hashes'] test_fw.processed_analysis['file_type'] = {'result': {'mime': 'text/plain'}} - analysis_scheduler._start_or_skip_analysis('dummy_plugin_for_testing_only', test_fw) + analysis_scheduler._start_or_skip_analysis('ExamplePlugin', test_fw) uid, plugin, analysis_result = post_analysis_queue.get(timeout=10) - assert plugin == 'dummy_plugin_for_testing_only' + assert plugin == 'ExamplePlugin' assert 'skipped' in analysis_result['result'] @@ -144,13 +145,12 @@ class TestAnalysisSchedulerBlacklist: file_object = MockFileObject() class PluginMock: - DEPENDENCIES = [] # noqa: RUF012 - def __init__(self, blacklist=None, whitelist=None): + self.metadata = MetaDataMock() if blacklist: - self.MIME_BLACKLIST = blacklist + self.metadata.mime_blacklist = blacklist if whitelist: - self.MIME_WHITELIST = whitelist + self.metadata.mime_whitelist = whitelist def shutdown(self): pass @@ -263,13 +263,10 @@ def add_file_type_mock(_, fo): class TestAnalysisSkipping: class PluginMock: - DEPENDENCIES = [] # noqa: RUF012 - def __init__(self, version, system_version): - self.VERSION = version - self.NAME = 'test plug-in' + self.metadata = MetaDataMock(version=version, name='test plug-in') if system_version: - self.SYSTEM_VERSION = system_version + self.metadata.system_version = system_version class BackendMock: def __init__(self, analysis_result): @@ -299,14 +296,14 @@ def setup_class(cls): 'expected_output', ), [ - ('1.0', None, '1.0', None, True), - ('1.1', None, '1.0', None, False), - ('1.0', None, '1.1', None, True), - ('1.0', '2.0', '1.0', '2.0', True), - ('1.0', '2.0', '1.0', '2.1', True), - ('1.0', '2.1', '1.0', '2.0', False), - ('1.0', '2.0', '1.0', None, False), - (' 1.0', '1.1', '1.1', '1.0', False), # invalid version string + (Version(1, 0), None, '1.0.0', None, True), + (Version(1, 1), None, '1.0.0', None, False), + (Version(1, 0), None, '1.1.0', None, True), + (Version(1, 0), '2.0', '1.0.0', '2.0', True), + (Version(1, 0), '2.0', '1.0.0', '2.1', True), + (Version(1, 0), '2.1', '1.0.0', '2.0', False), + (Version(1, 0), '2.0', '1.0.0', None, False), + ('foo', '1.1', '1.1.0', '1.0', False), # invalid version string ], ) def test_analysis_is_already_in_db_and_up_to_date( @@ -357,12 +354,10 @@ def test_is_forced_update(self): class TestAnalysisShouldReanalyse: class PluginMock: - DEPENDENCIES = ['plugin_dep'] # noqa: RUF012 - NAME = 'plugin_root' - def __init__(self, plugin_version, system_version): - self.VERSION = plugin_version - self.SYSTEM_VERSION = system_version + self.metadata = MetaDataMock(version=plugin_version, system_version=system_version) + self.metadata.dependencies = ['plugin_dep'] + self.metadata.name = 'plugin_root' class BackendMock: def __init__(self, dependency_analysis_date, system_version=None): @@ -392,13 +387,13 @@ def setup_class(cls): 'expected_result', ), [ - (10, 20, '1.0', None, '1.0', None, False), # analysis date < dependency date => not up to date - (20, 10, '1.0', None, '1.0', None, True), # analysis date > dependency date => up to date - (20, 10, '1.1', None, '1.0', None, False), # plugin version > db version => not up to date - (20, 10, '1.0', None, '1.1', None, True), # plugin version < db version => up to date - (20, 10, '1.0', '1.1', '1.0', '1.0', False), # system version > db system version => not up to date - (20, 10, '1.0', '1.0', '1.0', '1.1', True), # system version < db 
system version => up to date - (20, 10, '1.0', '1.0', '1.0', None, False), # system version did not exist in db => not up to date + (10, 20, Version(1, 0), None, '1.0.0', None, False), # analysis date < dependency date => not up to date + (20, 10, Version(1, 0), None, '1.0.0', None, True), # analysis date > dependency date => up to date + (20, 10, Version(1, 1), None, '1.0.0', None, False), # plugin version > db version => not up to date + (20, 10, Version(1, 0), None, '1.1.0', None, True), # plugin version < db version => up to date + (20, 10, Version(1, 0), '1.1', '1.0.0', '1.0', False), # system version > db sys version => not up to date + (20, 10, Version(1, 0), '1.0', '1.0.0', '1.1', True), # system version < db sys version => up to date + (20, 10, Version(1, 0), '1.0', '1.0.0', None, False), # system version didn't exist in db => not up to date (20, 10, 'foo', '1.0', '1.0', None, False), # invalid version => not up to date ], ) @@ -422,9 +417,21 @@ def test_analysis_is_up_to_date( # noqa: PLR0913 assert self.scheduler._analysis_is_up_to_date(analysis_db_entry, plugin, 'uid') == expected_result -class PluginMock(AnalysisBasePlugin): - def __init__(self, dependencies): - self.DEPENDENCIES = dependencies +@dataclass +class MetaDataMock: + _: KW_ONLY + name: str = 'mock_plugin' + dependencies: list[str] = field(default_factory=list) + version: Version = field(default=Version(0, 1, 0)) + system_version: str | None = None + + +class MockRunner: + def __init__(self): + self._in_queue = Queue() + + def get_queue_len(self): + return self._in_queue.qsize() def test_combined_analysis_workload(monkeypatch): @@ -433,16 +440,15 @@ def test_combined_analysis_workload(monkeypatch): scheduler.analysis_plugins = {} scheduler._plugin_runners = {} - dummy_plugin = scheduler.analysis_plugins['dummy_plugin'] = PluginMock([]) - dummy_plugin.in_queue = Queue() + dummy_runner = scheduler._plugin_runners['dummy_plugin'] = MockRunner() scheduler.process_queue = Queue() try: assert scheduler.get_combined_analysis_workload() == 0 scheduler.process_queue.put({}) for _ in range(2): - dummy_plugin.in_queue.put({}) + dummy_runner._in_queue.put({}) assert scheduler.get_combined_analysis_workload() == 3 finally: sleep(0.1) # let the queue finish internally to not cause "Broken pipe" scheduler.process_queue.close() - dummy_plugin.in_queue.close() + dummy_runner._in_queue.close() diff --git a/src/test/unit/scheduler/test_analysis_status.py b/src/test/unit/scheduler/test_analysis_status.py index ad54b2c22..753e5fc0b 100644 --- a/src/test/unit/scheduler/test_analysis_status.py +++ b/src/test/unit/scheduler/test_analysis_status.py @@ -10,9 +10,18 @@ ROOT_UID = 'root_uid' +class MockRedis: + def __init__(self): + self.analysis_status = None + + def set_analysis_status(self, status): + self.analysis_status = status + + class TestAnalysisStatus: def setup_method(self): self.status = AnalysisStatus() + self.status._worker.redis = MockRedis() def test_add_firmware_to_current_analyses(self): fw = Firmware(binary=b'foo') @@ -207,3 +216,73 @@ def test_cancel_unknown_uid(self): self.status._worker._update_status() assert ROOT_UID in self.status._worker.currently_running + + def test_add_update(self): + fw = Firmware(binary=b'foo') + fw.files_included = ['foo', 'bar'] + self.status.add_update(fw, fw.files_included) + # _update_status is called twice, because add_object is called first and then add_update + self.status._worker._update_status() + self.status._worker._update_status() + + assert fw.uid in 
self.status._worker.currently_running + result = self.status._worker.currently_running[fw.uid] + assert result.files_to_unpack == set() + assert result.completed_files == set() + assert result.unpacked_files_count == result.total_files_count + assert result.analyzed_files_count == 0 + assert result.files_to_analyze == {fw.uid, *fw.files_included} + assert result.total_files_count == len(fw.files_included) + 1 + + def test_add_analysis(self): + self.status._worker.currently_running = { + ROOT_UID: FwAnalysisStatus( + files_to_unpack=set(), + files_to_analyze={'foo', 'bar'}, + analysis_plugins={}, + hid='', + total_files_count=3, + ) + } + self.status._currently_analyzed[ROOT_UID] = True + fo = FileObject(binary=b'foo') + fo.root_uid = ROOT_UID + fo.uid = 'foo' + + result = self.status._worker.currently_running[ROOT_UID] + assert result.analysis_plugins == {} + + self.status.add_analysis(fo, 'some_plugin') + self.status._worker._update_status() + + assert result.analysis_plugins == {'some_plugin': 1} + + def test_store_status(self): + self.status._worker.currently_running = { + ROOT_UID: FwAnalysisStatus( + files_to_unpack={'bar'}, + files_to_analyze={'foo', 'bar'}, + analysis_plugins={}, + hid='', + unpacked_files_count=2, + total_files_count=3, + ) + } + recently_finished = { + 'other_UID': { + 'duration': 13.37, + 'total_files_count': 1337, + 'time_finished': 1337, + 'hid': 'analysis_status.hid', + } + } + self.status._worker.recently_finished = recently_finished + self.status._worker._store_status() + + status = self.status._worker.redis.analysis_status + assert ROOT_UID in status['current_analyses'] + assert status['current_analyses'][ROOT_UID]['unpacked_count'] == 2 + assert status['current_analyses'][ROOT_UID]['analyzed_count'] == 0 + assert status['current_analyses'][ROOT_UID]['total_count'] == 3 + assert status['recently_canceled_analyses'] == {} + assert status['recently_finished_analyses'] == recently_finished diff --git a/src/test/unit/scheduler/test_task_scheduler.py b/src/test/unit/scheduler/test_task_scheduler.py index d342b050c..39402147c 100644 --- a/src/test/unit/scheduler/test_task_scheduler.py +++ b/src/test/unit/scheduler/test_task_scheduler.py @@ -1,14 +1,24 @@ +from __future__ import annotations + +from dataclasses import dataclass + import pytest +from semver import Version from objects.firmware import Firmware from scheduler.task_scheduler import AnalysisTaskScheduler +@dataclass +class MetadataMock: + dependencies: list[str] + version: Version + + class TestAnalysisScheduling: class PluginMock: def __init__(self, dependencies): - self.DEPENDENCIES = dependencies - self.VERSION = 1 + self.metadata = MetadataMock(dependencies, Version(1, 0, 0)) def setup_class(self): self.analysis_plugins = {} diff --git a/src/test/unit/statistic/test_analysis_stats.py b/src/test/unit/statistic/test_analysis_stats.py index f0ba0351e..3daedfe4a 100644 --- a/src/test/unit/statistic/test_analysis_stats.py +++ b/src/test/unit/statistic/test_analysis_stats.py @@ -1,34 +1,63 @@ +from __future__ import annotations + +from dataclasses import dataclass + import pytest +from pydantic import BaseModel -from analysis.PluginBase import AnalysisBasePlugin +from scheduler.analysis import plugin from statistic.analysis_stats import get_plugin_stats from test.common_helper import create_test_firmware -class MockPlugin(AnalysisBasePlugin): - NAME = 'mock_plugin' - FILE = __file__ - VERSION = '0.0' - ANALYSIS_STATS_LIMIT = 5 +@dataclass +class MockMetadata: + name: str + dependencies: list[str] + + 
+class MockDependency: + metadata = MockMetadata('dependency', []) + + class Schema(BaseModel): + a: int + b: str - def _add_plugin_version_and_timestamp_to_analysis_result(self, fo): - return fo + +@dataclass +class MockPlugin: + metadata: MockMetadata + + def get_analysis(self, *_, **__): + return 1 + + +class MockFSOrganizer: + def generate_path(self, fw): + return fw.file_path @pytest.fixture -def mock_plugin(): - plugin = MockPlugin() - yield plugin - plugin.shutdown() +def mock_runner(): + runner_config = plugin.PluginRunner.Config(process_count=1, timeout=5) + metadata = MockMetadata(name='test', dependencies=[MockDependency.metadata.name]) + runner = plugin.PluginRunner( + MockPlugin(metadata), + runner_config, + {MockDependency.metadata.name: MockDependency.Schema}, + ) + runner._fsorganizer = MockFSOrganizer() + yield runner + runner.shutdown() -def test_get_plugin_stats(mock_plugin): - mock_plugin.analysis_stats[0] = 1.0 - mock_plugin.analysis_stats[1] = 2.0 - mock_plugin.analysis_stats[2] = 3.0 - mock_plugin.analysis_stats_count.value = 3 +def test_get_plugin_stats(mock_runner): + mock_runner.stats[0] = 1.0 + mock_runner.stats[1] = 2.0 + mock_runner.stats[2] = 3.0 + mock_runner.stats_count.value = 3 - result = get_plugin_stats(mock_plugin.analysis_stats, mock_plugin.analysis_stats_count) + result = get_plugin_stats(mock_runner.stats, mock_runner.stats_count) assert result == { 'count': '3', 'max': '3.00', @@ -40,17 +69,19 @@ def test_get_plugin_stats(mock_plugin): @pytest.mark.flaky(reruns=3) # test occasionally fails on the CI -def test_update_duration_stats(mock_plugin): - mock_plugin.start() - assert mock_plugin.analysis_stats_count.value == mock_plugin.analysis_stats_index.value == 0 +def test_update_duration_stats(mock_runner): + plugin.ANALYSIS_STATS_LIMIT = 5 + mock_runner.start() + assert mock_runner.stats_count.value == mock_runner._stats_idx.value == 0 fw = create_test_firmware() + fw.processed_analysis[MockDependency.metadata.name] = {'result': {'a': 1, 'b': '2'}} for _ in range(4): - mock_plugin.add_job(fw) - mock_plugin.out_queue.get(timeout=5) - assert mock_plugin.analysis_stats_count.value == mock_plugin.analysis_stats_index.value == 4 - mock_plugin.add_job(fw) - mock_plugin.out_queue.get(timeout=5) - assert mock_plugin.analysis_stats_count.value == 5 - assert mock_plugin.analysis_stats_index.value == 0, 'index should start at 0 when max count is reached' - - assert get_plugin_stats(mock_plugin.analysis_stats, mock_plugin.analysis_stats_count) is not None + mock_runner.queue_analysis(fw) + mock_runner.out_queue.get(timeout=5) + assert mock_runner.stats_count.value == mock_runner._stats_idx.value == 4 + mock_runner.queue_analysis(fw) + mock_runner.out_queue.get(timeout=5) + assert mock_runner.stats_count.value == 5 + assert mock_runner._stats_idx.value == 0, 'index should start at 0 when max count is reached' + + assert get_plugin_stats(mock_runner.stats, mock_runner.stats_count) is not None diff --git a/src/test/unit/web_interface/test_plugin_routes.py b/src/test/unit/web_interface/test_plugin_routes.py index 5232aaabf..b05fb196c 100644 --- a/src/test/unit/web_interface/test_plugin_routes.py +++ b/src/test/unit/web_interface/test_plugin_routes.py @@ -39,13 +39,13 @@ def test_get_modules_in_path(self): def test_find_plugins(self): result = _find_plugins() categories, plugins = zip(*result) - plugins = chain(*plugins) + plugins = set(chain(*plugins)) assert all(c in categories for c in PLUGIN_CATEGORIES) - assert 'dummy' in plugins + assert 'file_type' in plugins 
assert 'file_coverage' in plugins def test_module_has_routes(self): - assert _module_has_routes('dummy', 'analysis') is True + assert _module_has_routes('example_plugin', 'analysis') is True assert _module_has_routes('file_type', 'analysis') is False def test_import_module_routes(self): @@ -54,7 +54,7 @@ def test_import_module_routes(self): assert dummy_endpoint not in self._get_app_endpoints(self.app) - plugin_routes._import_module_routes('dummy', 'analysis') + plugin_routes._import_module_routes('example_plugin', 'analysis') assert dummy_endpoint in self._get_app_endpoints(self.app) test_client = self.app.test_client() @@ -67,7 +67,7 @@ def test_import_module_routes__rest(self): assert dummy_endpoint not in self._get_app_endpoints(self.app) - plugin_routes._import_module_routes('dummy', 'analysis') + plugin_routes._import_module_routes('example_plugin', 'analysis') test_client = self.app.test_client() result = test_client.get(dummy_endpoint).json diff --git a/src/unpacker/unpack.py b/src/unpacker/unpack.py index bd013315a..083737194 100644 --- a/src/unpacker/unpack.py +++ b/src/unpacker/unpack.py @@ -7,7 +7,6 @@ from typing import TYPE_CHECKING, Optional import config -from analysis.PluginBase import sanitize_processed_analysis from helperFunctions import magic from helperFunctions.fileSystem import file_is_empty, get_relative_object_path from helperFunctions.tag import TagColor @@ -111,3 +110,26 @@ def _check_path(self, file_object: FileObject): error = ExtractionError('File not found') self._store_unpacking_error_skip_info(file_object, error=error) raise error + + +def sanitize_processed_analysis(processed_analysis_entry: dict) -> dict: + # Old analysis plugins (before AnalysisPluginV0) could write anything they want to processed_analysis. + # We put everything the plugin wrote into a separate dict so that it matches the behavior of AnalysisPluginV0 + result = {} + for key in list(processed_analysis_entry): + if key in { + 'tags', + 'summary', + 'analysis_date', + 'plugin_version', + 'system_version', + 'file_system_flag', + 'result', + }: + continue + + result[key] = processed_analysis_entry.pop(key) + + processed_analysis_entry['result'] = result + + return processed_analysis_entry diff --git a/src/web_interface/rest/rest_base.py b/src/web_interface/rest/rest_base.py index 82a371ccf..d971bac0e 100644 --- a/src/web_interface/rest/rest_base.py +++ b/src/web_interface/rest/rest_base.py @@ -48,7 +48,10 @@ def __init__(self, app=None, db=None, intercom=None, status=None): def _wrap_response(api): @api.representation('application/json') def output_json(data, code, headers=None): - output_data = json.dumps(data, cls=ReportEncoder, sort_keys=True) + try: + output_data = json.dumps(data, cls=ReportEncoder, sort_keys=True) + except TypeError as error: + raise Exception(f'Could not encode JSON: {data!r}') from error resp = make_response(output_data, code) resp.headers.extend(headers if headers else {}) return resp
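
The `sanitize_processed_analysis` helper moves unchanged from the deleted `analysis/PluginBase.py` into `src/unpacker/unpack.py`, with the former `META_KEYS` constant inlined as a literal set. A minimal sketch of the normalization it performs, assuming a hypothetical legacy result entry (the dict below is illustrative and not taken from the test data):

    # entry written by an old-style (pre-AnalysisPluginV0) plugin
    entry = {
        'summary': ['matched'],
        'plugin_version': '0.1',
        'mime': 'text/plain',       # plugin-specific key, not a meta key
        'strings': ['foo', 'bar'],  # plugin-specific key, not a meta key
    }

    sanitize_processed_analysis(entry)
    # The dict is modified in place and returned: meta keys stay at the top level,
    # everything else is popped into a nested 'result' dict, matching the shape
    # produced by AnalysisPluginV0 plugins:
    # {'summary': ['matched'], 'plugin_version': '0.1',
    #  'result': {'mime': 'text/plain', 'strings': ['foo', 'bar']}}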