Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/analysis/plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .plugin import AnalysisPluginV0, Tag # noqa: F401
from .plugin import AnalysisFailedError, AnalysisPluginV0, Tag # noqa: F401
8 changes: 6 additions & 2 deletions src/analysis/plugin/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@


class AnalysisFailedError(Exception):
...
"""
This exception is used to cancel an analysis in a controlled way while still providing context information and
will not log an error with traceback to the terminal. It is an "expected exception" during analysis: Some
requirement is missing or the analysis input is incompatible and the analysis cannot be performed.
"""


class Tag(BaseModel):
Expand Down Expand Up @@ -112,7 +116,7 @@ def analyze(
file_handle: io.FileIO,
virtual_file_path: dict,
analyses: dict[str, pydantic.BaseModel],
) -> typing.Optional[Schema]:
) -> Schema:
"""Analyze a file.
Raises AnalysisFailedError if the analysis cannot be performed.

Expand Down
5 changes: 3 additions & 2 deletions src/plugins/analysis/binwalk/code/binwalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import binwalk
from pydantic import BaseModel, Field
from semver import Version

import config
from analysis.plugin import AnalysisPluginV0
Expand Down Expand Up @@ -37,10 +38,10 @@ class Schema(BaseModel):

def __init__(self):
super().__init__(
metadata=AnalysisPluginV0.MetaData(
metadata=self.MetaData(
name='binwalk',
description='binwalk signature and entropy analysis',
version='1.0.0',
version=Version(1, 0, 0),
Schema=self.Schema,
mime_blacklist=['audio/', 'image/', 'video/', 'text/', *MIME_BLACKLIST_COMPRESSED],
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def test_metadatadetector_get_device_architecture(architecture, bitness, endiann


@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
class TestAnalysisPluginsSoftwareComponents:
class TestAnalysisPluginCpuArchitecture:
def test_analyze(self, analysis_plugin):
dependencies = {
'kernel_config': _mock_kernel_config_analysis_arm,
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/analysis/crypto_hints/code/crypto_hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import TYPE_CHECKING, List

import pydantic
from semver import Version

from analysis.plugin import AnalysisPluginV0, addons, compat

Expand All @@ -15,10 +16,10 @@ class Schema(pydantic.BaseModel):
matches: List[dict]

def __init__(self):
metadata = AnalysisPluginV0.MetaData(
metadata = self.MetaData(
name='crypto_hints',
description='find indicators of specific crypto algorithms',
version='0.2.1',
version=Version(0, 2, 1),
Schema=AnalysisPlugin.Schema,
)
super().__init__(metadata=metadata)
Expand Down
3 changes: 2 additions & 1 deletion src/plugins/analysis/crypto_material/code/crypto_material.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import TYPE_CHECKING, List, NamedTuple

from pydantic import BaseModel, Field
from semver import Version

from analysis.plugin import AnalysisPluginV0, Tag, addons, compat
from helperFunctions.hash import get_md5
Expand Down Expand Up @@ -61,7 +62,7 @@ def __init__(self):
metadata = self.MetaData(
name='crypto_material',
description='detects crypto material like SSH keys and SSL certificates',
version='1.0.0',
version=Version(1, 0, 0),
mime_blacklist=['filesystem', *MIME_BLACKLIST_COMPRESSED],
Schema=self.Schema,
)
Expand Down
20 changes: 13 additions & 7 deletions src/plugins/analysis/cwe_checker/code/cwe_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@

from docker.types import Mount
from pydantic import BaseModel
from requests import RequestException
from semver import Version

import config
from analysis.plugin import AnalysisPluginV0
from analysis.plugin import AnalysisFailedError, AnalysisPluginV0
from helperFunctions.docker import run_docker_container

if TYPE_CHECKING:
Expand Down Expand Up @@ -61,7 +62,7 @@ def __init__(self):
'application/x-pie-executable',
'application/x-sharedlib',
],
version=Version(1, 0, 0),
version=Version(1, 0, 1),
Schema=self.Schema,
)
)
Expand Down Expand Up @@ -129,22 +130,27 @@ def _is_supported_arch(file_type_analysis: BaseModel) -> bool:
return any(supported_arch in arch_type for supported_arch in SUPPORTED_ARCHS)

def _do_full_analysis(self, file_path: str) -> dict:
output = self._run_cwe_checker_in_docker(file_path)
try:
output = self._run_cwe_checker_in_docker(file_path)
except RequestException as e:
raise AnalysisFailedError('No response from cwe_checker Docker container (possible timeout)') from e
if output is None:
raise Exception(f'Timeout or error during cwe_checker execution.\nUID: {file_path}')
raise AnalysisFailedError('cwe_checker output is missing (timeout or error during execution)')
try:
return self._parse_cwe_checker_output(output)
except json.JSONDecodeError as error:
raise Exception(f'cwe_checker execution failed\nUID: {file_path}') from error
raise AnalysisFailedError('cwe_checker execution failed: Could not parse output') from error

def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema | None:
def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
"""
This function handles only ELF executables; for unsupported architectures it raises AnalysisFailedError.
It calls the cwe_checker docker container.
"""
del virtual_file_path
if not self._is_supported_arch(analyses['file_type']):
return None
full_type = analyses['file_type'].full
arch = full_type.split(',')[1].strip() if full_type.startswith('ELF') else 'Unknown'
raise AnalysisFailedError(f'Unsupported architecture: {arch}')
result = self._do_full_analysis(file_handle.name)

return self.Schema(
Expand Down
6 changes: 3 additions & 3 deletions src/plugins/analysis/device_tree/code/device_tree.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Dict, Optional
from typing import TYPE_CHECKING, Dict

from semver import Version

Expand All @@ -16,7 +16,7 @@

class AnalysisPlugin(AnalysisPluginV0):
def __init__(self):
metadata = AnalysisPluginV0.MetaData(
metadata = self.MetaData(
name='device_tree',
description='get the device tree in text from the device tree blob',
version=Version(2, 0, 1),
Expand All @@ -40,7 +40,7 @@ def analyze(
file_handle: io.FileIO,
virtual_file_path: dict,
analyses: Dict[str, dict],
) -> Optional[Schema]:
) -> Schema:
del virtual_file_path, analyses

binary = file_handle.readall()
Expand Down
16 changes: 11 additions & 5 deletions src/plugins/analysis/example_plugin/code/example_plugin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import io
from pathlib import Path

import pydantic
from pydantic import Field
from semver import Version

from analysis.plugin import AnalysisPluginV0
from analysis.plugin.plugin import AnalysisFailedError, AnalysisPluginV0


class AnalysisPlugin(AnalysisPluginV0):
Expand All @@ -24,10 +26,10 @@ class Schema(pydantic.BaseModel):
dependant_analysis: dict

def __init__(self):
metadata = AnalysisPluginV0.MetaData(
metadata = self.MetaData(
name='ExamplePlugin',
description='An example description',
version='0.0.0',
version=Version(0, 0, 0),
Schema=AnalysisPlugin.Schema,
# Note that you don't have to set these fields,
# they are just here to show that you can.
Expand All @@ -43,11 +45,15 @@ def summarize(self, result):
del result
return ['big-file', 'binary']

def analyze(self, file_handle: io.FileIO, virtual_file_path: str, analyses: dict) -> Schema:
def analyze(self, file_handle: io.FileIO, virtual_file_path: dict, analyses: dict) -> Schema:
first_byte = file_handle.read(1)
if first_byte == b'\xff':
raise AnalysisFailedError('reason for fail')
if first_byte == b'\xee':
raise Exception('Unexpected exception occurred.')
return AnalysisPlugin.Schema(
number=42,
name=file_handle.name,
name=Path(file_handle.name).name,
first_byte=first_byte.hex(),
virtual_file_path=virtual_file_path,
dependant_analysis=analyses['file_type'].model_dump(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

from docker.types import Mount
from pydantic import BaseModel, Field
from semver import Version

import config
from analysis.plugin import AnalysisPluginV0, Tag
from analysis.plugin import AnalysisFailedError, AnalysisPluginV0, Tag
from helperFunctions.docker import run_docker_container
from helperFunctions.tag import TagColor

Expand Down Expand Up @@ -130,7 +131,7 @@ def __init__(self):
description=(
'extract file system metadata (e.g. owner, group, etc.) from file system images contained in firmware'
),
version='1.2.0',
version=Version(1, 2, 0),
Schema=self.Schema,
timeout=30,
)
Expand Down Expand Up @@ -196,7 +197,7 @@ def _extract_metadata_from_file_system(self, file_handle: FileIO) -> list[FileMe
return _analyze_metadata_of_mounted_dir(json.loads(output_file.read_bytes()))
message = 'Mounting the file system failed'
logging.warning(f'{message} for {file_handle.name}:\n{output}')
raise RuntimeError(message)
raise AnalysisFailedError(message)

def _mount_in_docker(self, input_dir: str) -> str:
result = run_docker_container(
Expand Down Expand Up @@ -278,7 +279,7 @@ def _extract_metadata_from_tar(file_handle: FileIO) -> list[FileMetadata]:
except EOFError:
logging.warning(f'File {file_handle.name} ended unexpectedly')
except (tarfile.TarError, zlib.error, tarfile.ReadError) as error:
raise RuntimeError('Could not open tar archive') from error
raise AnalysisFailedError(f'Could not open tar archive: {error}') from error
return result


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import pytest

from analysis.plugin import AnalysisFailedError

from ..code.file_system_metadata import (
SGID_BIT,
STICKY_BIT,
Expand Down Expand Up @@ -100,7 +102,7 @@ def test_extract_metadata_from_file_system(self, analysis_plugin):
assert result[testfile_sticky_key].modification_time == 1518167842.0

def test_extract_metadata_from_file_system__unmountable(self, analysis_plugin):
with pytest.raises(RuntimeError, match='Mounting the file system failed'):
with pytest.raises(AnalysisFailedError, match='Mounting the file system failed'):
analysis_plugin._extract_metadata_from_file_system(FileIO(self.test_file_tar))

def test_extract_metadata_from_tar(self):
Expand Down Expand Up @@ -173,7 +175,7 @@ def test_extract_metadata_from_tar__packed_tar_bz(self):
)

def test_extract_metadata_tar_unreadable(self):
with pytest.raises(RuntimeError):
with pytest.raises(AnalysisFailedError):
_extract_metadata_from_tar(FileIO(TEST_DATA_DIR / 'squashfs.img'))

def test_extract_metadata_from_tar__eof_error(self):
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/analysis/file_type/code/file_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pydantic
from pydantic import Field
from semver import Version

from analysis.plugin import AnalysisPluginV0
from helperFunctions import magic
Expand All @@ -24,10 +25,10 @@ class Schema(pydantic.BaseModel):

def __init__(self):
super().__init__(
metadata=AnalysisPluginV0.MetaData(
metadata=self.MetaData(
name='file_type',
description='identify the file type',
version='1.0.0',
version=Version(1, 0, 0),
Schema=AnalysisPlugin.Schema,
),
)
Expand Down
10 changes: 5 additions & 5 deletions src/plugins/analysis/hashlookup/code/hashlookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pydantic import BaseModel, Field, model_validator
from semver import Version

from analysis.plugin import AnalysisPluginV0
from analysis.plugin import AnalysisFailedError, AnalysisPluginV0
from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED, MIME_BLACKLIST_NON_EXECUTABLE

if TYPE_CHECKING:
Expand Down Expand Up @@ -109,19 +109,19 @@ def __init__(self):
)
)

def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema | None:
def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
del file_handle, virtual_file_path
try:
sha2_hash = analyses['file_hashes'].sha256
except (KeyError, AttributeError) as error:
raise HashLookupError('sha256 hash is missing in dependency results') from error
raise AnalysisFailedError('sha256 hash is missing in dependency results') from error

result = _look_up_hash(sha2_hash.upper())

if 'FileName' not in result:
if 'message' in result and result['message'] == 'Non existing SHA-256':
# sha256 hash unknown to hashlookup at time of analysis
return None
raise AnalysisFailedError('No record found in circl.lu for this file.')
raise HashLookupError('Unknown error connecting to hashlookup API')
return self.Schema.model_validate(result)

Expand All @@ -134,4 +134,4 @@ def _look_up_hash(sha2_hash: str) -> dict:
url = f'https://hashlookup.circl.lu/lookup/sha256/{sha2_hash}'
return requests.get(url, headers={'accept': 'application/json'}).json()
except (requests.ConnectionError, json.JSONDecodeError) as error:
raise HashLookupError('Failed to connect to circl.lu hashlookup API') from error
raise AnalysisFailedError('Failed to connect to circl.lu hashlookup API') from error
5 changes: 3 additions & 2 deletions src/plugins/analysis/hashlookup/test/test_hashlookup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest

from analysis.plugin import AnalysisFailedError
from plugins.analysis.hash.code.hash import AnalysisPlugin as HashPlugin
from plugins.analysis.hashlookup.code.hashlookup import AnalysisPlugin, HashLookupError

Expand Down Expand Up @@ -71,8 +72,8 @@ def test_process_object_known_hash(self, analysis_plugin):

def test_process_object_unknown_hash(self, analysis_plugin):
dependencies = {'file_hashes': HashPlugin.Schema(md5='', sha256='unknown_hash')}
result = analysis_plugin.analyze(None, {}, dependencies)
assert result is None
with pytest.raises(AnalysisFailedError, match='No record found'):
analysis_plugin.analyze(None, {}, dependencies)

def test_process_object_error(self, analysis_plugin):
dependencies = {'file_hashes': HashPlugin.Schema(md5='', sha256='connection_error')}
Expand Down
Loading