Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/analysis/plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .plugin import AnalysisPluginV0, Tag # noqa: F401
from .plugin import AnalysisFailedError, AnalysisPluginV0, Tag # noqa: F401
8 changes: 6 additions & 2 deletions src/analysis/plugin/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@


class AnalysisFailedError(Exception):
...
"""
This exception is used to cancel an analysis in a controlled way while still providing context information and
will not log an error with traceback to the terminal. It is an "expected exception" during analysis: Some
requirement is missing or the analysis input is incompatible and the analysis cannot be performend.
"""


class Tag(BaseModel):
Expand Down Expand Up @@ -114,7 +118,7 @@ def analyze(
file_handle: io.FileIO,
virtual_file_path: dict,
analyses: dict[str, pydantic.BaseModel],
) -> typing.Optional[Schema]:
) -> Schema:
"""Analyze a file.
May return None if nothing was found.

Expand Down
5 changes: 3 additions & 2 deletions src/plugins/analysis/binwalk/code/binwalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import binwalk
from pydantic import BaseModel, Field
from semver import Version

import config
from analysis.plugin import AnalysisPluginV0
Expand Down Expand Up @@ -37,10 +38,10 @@ class Schema(BaseModel):

def __init__(self):
super().__init__(
metadata=AnalysisPluginV0.MetaData(
metadata=self.MetaData(
name='binwalk',
description='binwalk signature and entropy analysis',
version='1.0.0',
version=Version(1, 0, 0),
Schema=self.Schema,
mime_blacklist=['audio/', 'image/', 'video/', 'text/', *MIME_BLACKLIST_COMPRESSED],
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ def test_metadatadetector_get_device_architecture(architecture, bitness, endiann


@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
class TestAnalysisPluginsSoftwareComponents:
class TestAnalysisPluginCpuArchitecture:
def test_analyze(self, analysis_plugin):
dependencies = {
'kernel_config': _mock_kernel_config_analysis_arm,
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/analysis/crypto_hints/code/crypto_hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import TYPE_CHECKING, List

import pydantic
from semver import Version

from analysis.plugin import AnalysisPluginV0, addons, compat

Expand All @@ -15,10 +16,10 @@ class Schema(pydantic.BaseModel):
matches: List[dict]

def __init__(self):
metadata = AnalysisPluginV0.MetaData(
metadata = self.MetaData(
name='crypto_hints',
description='find indicators of specific crypto algorithms',
version='0.2.1',
version=Version(0, 2, 1),
Schema=AnalysisPlugin.Schema,
)
super().__init__(metadata=metadata)
Expand Down
3 changes: 2 additions & 1 deletion src/plugins/analysis/crypto_material/code/crypto_material.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import TYPE_CHECKING, List, NamedTuple

from pydantic import BaseModel, Field
from semver import Version

from analysis.plugin import AnalysisPluginV0, Tag, addons, compat
from helperFunctions.hash import get_md5
Expand Down Expand Up @@ -61,7 +62,7 @@ def __init__(self):
metadata = self.MetaData(
name='crypto_material',
description='detects crypto material like SSH keys and SSL certificates',
version='1.0.0',
version=Version(1, 0, 0),
mime_blacklist=['filesystem', *MIME_BLACKLIST_COMPRESSED],
Schema=self.Schema,
)
Expand Down
8 changes: 5 additions & 3 deletions src/plugins/analysis/cwe_checker/code/cwe_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from semver import Version

import config
from analysis.plugin import AnalysisPluginV0
from analysis.plugin import AnalysisFailedError, AnalysisPluginV0
from helperFunctions.docker import run_docker_container

if TYPE_CHECKING:
Expand Down Expand Up @@ -137,14 +137,16 @@ def _do_full_analysis(self, file_path: str) -> dict:
except json.JSONDecodeError as error:
raise Exception(f'cwe_checker execution failed\nUID: {file_path}') from error

def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema | None:
def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
"""
This function handles only ELF executables. Otherwise, it returns an empty dictionary.
It calls the cwe_checker docker container.
"""
del virtual_file_path
if not self._is_supported_arch(analyses['file_type']):
return None
full_type = analyses['file_type'].full
arch = full_type.split(',')[1].strip() if full_type.startswith('ELF') else 'Unknown'
raise AnalysisFailedError(f'Unsupported architecture: {arch}')
result = self._do_full_analysis(file_handle.name)

return self.Schema(
Expand Down
6 changes: 3 additions & 3 deletions src/plugins/analysis/device_tree/code/device_tree.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Dict, Optional
from typing import TYPE_CHECKING, Dict

from semver import Version

Expand All @@ -16,7 +16,7 @@

class AnalysisPlugin(AnalysisPluginV0):
def __init__(self):
metadata = AnalysisPluginV0.MetaData(
metadata = self.MetaData(
name='device_tree',
description='get the device tree in text from the device tree blob',
version=Version(2, 0, 1),
Expand All @@ -40,7 +40,7 @@ def analyze(
file_handle: io.FileIO,
virtual_file_path: dict,
analyses: Dict[str, dict],
) -> Optional[Schema]:
) -> Schema:
del virtual_file_path, analyses

binary = file_handle.readall()
Expand Down
20 changes: 12 additions & 8 deletions src/plugins/analysis/example_plugin/code/example_plugin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import io
from pathlib import Path

import pydantic
from pydantic import Field
from semver import Version

from analysis.plugin import AnalysisPluginV0
from analysis.plugin import AnalysisFailedError, AnalysisPluginV0


class AnalysisPlugin(AnalysisPluginV0):
Expand All @@ -24,10 +26,10 @@ class Schema(pydantic.BaseModel):
dependant_analysis: dict

def __init__(self):
metadata = AnalysisPluginV0.MetaData(
metadata = self.MetaData(
name='ExamplePlugin',
description='An example description',
version='0.0.0',
version=Version(0, 0, 0),
Schema=AnalysisPlugin.Schema,
# Note that you don't have to set these fields,
# they are just here to show that you can.
Expand All @@ -43,14 +45,16 @@ def summarize(self, result):
del result
return ['big-file', 'binary']

def analyze(self, file_handle: io.FileIO, virtual_file_path: str, analyses: dict) -> Schema:
file_type_analysis = analyses['file_type']

def analyze(self, file_handle: io.FileIO, virtual_file_path: dict, analyses: dict) -> Schema:
first_byte = file_handle.read(1)
if first_byte == b'\xff':
raise AnalysisFailedError('reason for fail')
if first_byte == b'\xee':
raise Exception('Unexpected exception occurred.')
return AnalysisPlugin.Schema(
number=42,
name=file_handle.name,
name=Path(file_handle.name).name,
first_byte=first_byte.hex(),
virtual_file_path=virtual_file_path,
dependant_analysis=file_type_analysis,
dependant_analysis=analyses['file_type'].model_dump(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from docker.types import Mount
from pydantic import BaseModel, Field
from semver import Version

import config
from analysis.plugin import AnalysisPluginV0, Tag
Expand Down Expand Up @@ -130,7 +131,7 @@ def __init__(self):
description=(
'extract file system metadata (e.g. owner, group, etc.) from file system images contained in firmware'
),
version='1.2.0',
version=Version(1, 2, 0),
Schema=self.Schema,
timeout=30,
)
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/analysis/file_type/code/file_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pydantic
from pydantic import Field
from semver import Version

from analysis.plugin import AnalysisPluginV0
from helperFunctions import magic
Expand All @@ -24,10 +25,10 @@ class Schema(pydantic.BaseModel):

def __init__(self):
super().__init__(
metadata=AnalysisPluginV0.MetaData(
metadata=self.MetaData(
name='file_type',
description='identify the file type',
version='1.0.0',
version=Version(1, 0, 0),
Schema=AnalysisPlugin.Schema,
),
)
Expand Down
10 changes: 5 additions & 5 deletions src/plugins/analysis/hashlookup/code/hashlookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pydantic import BaseModel, Field, model_validator
from semver import Version

from analysis.plugin import AnalysisPluginV0
from analysis.plugin import AnalysisFailedError, AnalysisPluginV0
from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED, MIME_BLACKLIST_NON_EXECUTABLE

if TYPE_CHECKING:
Expand Down Expand Up @@ -109,19 +109,19 @@ def __init__(self):
)
)

def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema | None:
def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
del file_handle, virtual_file_path
try:
sha2_hash = analyses['file_hashes'].sha256
except (KeyError, AttributeError) as error:
raise HashLookupError('sha256 hash is missing in dependency results') from error
raise AnalysisFailedError('sha256 hash is missing in dependency results') from error

result = _look_up_hash(sha2_hash.upper())

if 'FileName' not in result:
if 'message' in result and result['message'] == 'Non existing SHA-256':
# sha256 hash unknown to hashlookup at time of analysis'
return None
raise AnalysisFailedError('No record found in circl.lu for this file.')
raise HashLookupError('Unknown error connecting to hashlookup API')
return self.Schema.model_validate(result)

Expand All @@ -134,4 +134,4 @@ def _look_up_hash(sha2_hash: str) -> dict:
url = f'https://hashlookup.circl.lu/lookup/sha256/{sha2_hash}'
return requests.get(url, headers={'accept': 'application/json'}).json()
except (requests.ConnectionError, json.JSONDecodeError) as error:
raise HashLookupError('Failed to connect to circl.lu hashlookup API') from error
raise AnalysisFailedError('Failed to connect to circl.lu hashlookup API') from error
5 changes: 3 additions & 2 deletions src/plugins/analysis/hashlookup/test/test_hashlookup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest

from analysis.plugin import AnalysisFailedError
from plugins.analysis.hash.code.hash import AnalysisPlugin as HashPlugin
from plugins.analysis.hashlookup.code.hashlookup import AnalysisPlugin, HashLookupError

Expand Down Expand Up @@ -71,8 +72,8 @@ def test_process_object_known_hash(self, analysis_plugin):

def test_process_object_unknown_hash(self, analysis_plugin):
dependencies = {'file_hashes': HashPlugin.Schema(md5='', sha256='unknown_hash')}
result = analysis_plugin.analyze(None, {}, dependencies)
assert result is None
with pytest.raises(AnalysisFailedError, match='No record found'):
analysis_plugin.analyze(None, {}, dependencies)

def test_process_object_error(self, analysis_plugin):
dependencies = {'file_hashes': HashPlugin.Schema(md5='', sha256='connection_error')}
Expand Down
Empty file.
Empty file.
3 changes: 1 addition & 2 deletions src/plugins/analysis/kernel_config/code/kernel_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from semver import Version

from analysis.plugin import AnalysisPluginV0, Tag
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.tag import TagColor
from plugins.analysis.kernel_config.internal.checksec_check_kernel import CHECKSEC_PATH, check_kernel_config
from plugins.analysis.kernel_config.internal.decomp import GZDecompressor
Expand All @@ -34,7 +33,7 @@ class CheckSec(BaseModel):
selinux: dict


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class AnalysisPlugin(AnalysisPluginV0):
class Schema(BaseModel):
is_kernel_config: bool
kernel_config: Optional[str] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class Schema(BaseModel):
vulnerabilities: List[Vulnerability]

def __init__(self):
metadata = AnalysisPluginV0.MetaData(
metadata = self.MetaData(
name='known_vulnerabilities',
description='Rule based detection of known vulnerabilities like Heartbleed',
dependencies=['file_hashes', 'software_components'],
Expand Down
5 changes: 3 additions & 2 deletions src/plugins/analysis/linter/code/source_code_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pydantic
from docker.types import Mount
from pydantic import Field
from semver import Version

from analysis.plugin import AnalysisPluginV0
from helperFunctions.docker import run_docker_container
Expand Down Expand Up @@ -62,10 +63,10 @@ class Issue(pydantic.BaseModel):

def __init__(self):
super().__init__(
metadata=AnalysisPluginV0.MetaData(
metadata=self.MetaData(
name='source_code_analysis',
description='This plugin implements static code analysis for multiple scripting languages',
version='0.7.1',
version=Version(0, 7, 1),
Schema=AnalysisPlugin.Schema,
mime_whitelist=['text/'],
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import config
from analysis.plugin import AnalysisPluginV0, Tag, addons
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.tag import TagColor
from plugins.analysis.software_components.bin import OS_LIST
from plugins.mime_blacklists import MIME_BLACKLIST_NON_EXECUTABLE
Expand Down Expand Up @@ -38,7 +37,7 @@ class MatchingString(BaseModel):
identifier: str = Field(description='Identifier of the rule that this string matched (e.g. "$a")')


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class AnalysisPlugin(AnalysisPluginV0):
class Schema(BaseModel):
software_components: List[SoftwareMatch]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
class TestAnalysisPluginsSoftwareComponents:
class TestAnalysisPluginSoftwareComponents:
def test_process_object(self, analysis_plugin):
with YARA_TEST_FILE.open('rb') as fp:
results = analysis_plugin.analyze(fp, {}, {})
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/analysis/strings/code/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def _compile_regexes(self) -> list[tuple[Pattern[bytes], str]]:
for regex, encoding in STRING_REGEXES
]

def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]):
def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
del virtual_file_path, analyses
return self.Schema(
strings=[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Schema(pydantic.BaseModel):

def __init__(self):
super().__init__(
metadata=AnalysisPluginV0.MetaData(
metadata=self.MetaData(
name='users_and_passwords',
description=(
'search for UNIX, httpd, and mosquitto password files, parse them and try to crack the passwords'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin)
class TestAnalysisPluginPasswordFileAnalyzer:
class TestAnalysisPluginUsersAndPasswords:
def test_process_object_shadow_file(self, analysis_plugin):
test_file = TEST_DATA_DIR / 'passwd_test'
with test_file.open() as fp:
Expand Down
Loading