diff --git a/src/plugins/analysis/binwalk/__init__.py b/src/plugins/analysis/binary_forensics/__init__.py similarity index 100% rename from src/plugins/analysis/binwalk/__init__.py rename to src/plugins/analysis/binary_forensics/__init__.py diff --git a/src/plugins/analysis/binwalk/code/__init__.py b/src/plugins/analysis/binary_forensics/code/__init__.py similarity index 100% rename from src/plugins/analysis/binwalk/code/__init__.py rename to src/plugins/analysis/binary_forensics/code/__init__.py diff --git a/src/plugins/analysis/binary_forensics/code/binary_forensics.py b/src/plugins/analysis/binary_forensics/code/binary_forensics.py new file mode 100644 index 000000000..e82008353 --- /dev/null +++ b/src/plugins/analysis/binary_forensics/code/binary_forensics.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +from pydantic import BaseModel +from semver import Version + +import config +from analysis.plugin import AnalysisPluginV0 +from plugins.analysis.binary_forensics.internal.binwalk import BinwalkSignatureResult, get_binwalk_signature_analysis +from plugins.analysis.binary_forensics.internal.entropy import Entropy, get_entropy_analysis +from plugins.analysis.binary_forensics.internal.unblob import UnblobResult, get_unblob_result +from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED + +if TYPE_CHECKING: + from io import FileIO + + +class AnalysisPlugin(AnalysisPluginV0): + class Schema(BaseModel): + entropy: Entropy + file_matches: list[BinwalkSignatureResult] + # unblob matches are only present if the file was unpacked with the generic carver and its output listed chunks; None otherwise + unblob_matches: list[UnblobResult] | None + + def __init__(self): + super().__init__( + metadata=AnalysisPluginV0.MetaData( + name='binary_forensics', + description='binary forensic analysis (entropy and Binwalk file signatures)', + version=Version(1, 0, 0), + Schema=self.Schema, + mime_blacklist=['audio/', 'image/', 'video/', 'text/', 
*MIME_BLACKLIST_COMPRESSED], + ), + ) + self.thresholds = { # checked in insertion order by _summarize_entropy(), so the labels go from the highest to the lowest default threshold + 'very high entropy': self._get_plugin_cfg_entry('very_high_entropy_threshold', 0.95), + 'high entropy': self._get_plugin_cfg_entry('high_entropy_threshold', 0.8), + 'medium high entropy': self._get_plugin_cfg_entry('medium_high_entropy_threshold', 0.6), + 'medium entropy': self._get_plugin_cfg_entry('medium_entropy_threshold', 0.4), + 'medium low entropy': self._get_plugin_cfg_entry('medium_low_entropy_threshold', 0.2), + 'low entropy': self._get_plugin_cfg_entry('low_entropy_threshold', 0.05), + } + + def _get_plugin_cfg_entry(self, name: str, default: float) -> float: + entry = getattr(config.backend.plugin.get(self.metadata.name, {}), name, default) + try: + return float(entry) + except (TypeError, ValueError): + logging.warning(f'Failed to parse config entry {name} of plugin {self.metadata.name} (should be float)') + return default + + def analyze(self, file_handle: FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema: + del virtual_file_path # not used by this plugin + + return self.Schema( + entropy=get_entropy_analysis(file_handle), + file_matches=get_binwalk_signature_analysis(file_handle, timeout=self.metadata.timeout), + unblob_matches=get_unblob_result(analyses['unpacker']), + ) + + def summarize(self, result: Schema) -> list: + return [*self._summarize_entropy(result.entropy), *self._summarize_binwalk_result(result.file_matches)] + + def _summarize_entropy(self, result: Entropy) -> list[str]: + for key, value in self.thresholds.items(): + if result.avg_entropy > value: + return [key] + return ['very low entropy'] # no threshold was exceeded + + @staticmethod + def _summarize_binwalk_result(binwalk_result: list[BinwalkSignatureResult]) -> list[str]: + summary = [] + for item in binwalk_result: + summary.append(item.name) + return summary diff --git a/src/plugins/analysis/binwalk/test/__init__.py b/src/plugins/analysis/binary_forensics/internal/__init__.py similarity index 100% rename from 
src/plugins/analysis/binwalk/test/__init__.py rename to src/plugins/analysis/binary_forensics/internal/__init__.py diff --git a/src/plugins/analysis/binary_forensics/internal/binwalk.py b/src/plugins/analysis/binary_forensics/internal/binwalk.py new file mode 100644 index 000000000..843701b10 --- /dev/null +++ b/src/plugins/analysis/binary_forensics/internal/binwalk.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +import json +import logging +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import TYPE_CHECKING + +from docker.errors import DockerException +from docker.types import Mount +from pydantic import BaseModel +from requests.exceptions import JSONDecodeError, ReadTimeout + +from analysis.plugin.plugin import AnalysisFailedError +from helperFunctions.docker import run_docker_container + +if TYPE_CHECKING: + from io import FileIO + + +DOCKER_IMAGE = 'refirmlabs/binwalk:latest' + + +class BinwalkSignatureResult(BaseModel): + offset: int + id: str + size: int + name: str + confidence: int + description: str + + +def get_binwalk_signature_analysis(file: FileIO, timeout: int) -> list[BinwalkSignatureResult]: + return _parse_binwalk_output(_get_docker_output(file, timeout)) + + +def _parse_binwalk_output(binwalk_output: list[dict]) -> list[BinwalkSignatureResult]: + """ + Expected result structure: (binwalk 3.1.1) + [ + { + 'Analysis': { + 'file_path': '/io/input', + 'file_map': [ + { + 'offset': , + 'id': , + 'size': , + 'name': , + 'confidence': , + 'description': , + 'always_display': , + 'extraction_declined': , + }, + ... 
+ ], + } + } + ] + The outer array has only one entry, since we analyze only one file + """ + try: + return [ + BinwalkSignatureResult( + offset=file_result['offset'], + id=file_result['id'], + size=file_result['size'], + name=file_result['name'], + confidence=file_result['confidence'], + description=file_result['description'], + ) + for file_result in binwalk_output[0]['Analysis']['file_map'] + ] + except (KeyError, IndexError, TypeError) as err: + # FixMe: sadly, there are no tags for the docker container versions, so we can't pin it at the moment + # this should not happen -- if it happens, the plugin needs to be fixed + logging.exception('Failed to parse binwalk result') + raise AnalysisFailedError('Failed to parse binwalk result') from err + + +def _get_docker_output(file: FileIO, timeout: int) -> list[dict]: + container_input_path = '/io/input' + container_output_path = '/io/output' + with NamedTemporaryFile() as temp_file: + Path(temp_file.name).touch() + try: + run_docker_container( + DOCKER_IMAGE, + combine_stderr_stdout=True, + timeout=timeout - 1, + command=f'{container_input_path} -l {container_output_path}', # the result is written to the mounted output file, which is parsed as JSON below + mounts=[ + Mount(container_input_path, file.name, type='bind', read_only=True), + Mount(container_output_path, temp_file.name, type='bind', read_only=False), + ], + logging_label='binwalk', + ) + return json.loads(Path(temp_file.name).read_text()) + except ReadTimeout as err: + raise AnalysisFailedError('Docker container timed out') from err + except (DockerException, OSError) as err: + raise AnalysisFailedError('Docker process error') from err + except (JSONDecodeError, json.JSONDecodeError) as err: # json.loads() raises json.JSONDecodeError, which the requests subclass alone does not match + raise AnalysisFailedError('Docker output JSON parsing error') from err diff --git a/src/plugins/analysis/binary_forensics/internal/entropy.py b/src/plugins/analysis/binary_forensics/internal/entropy.py new file mode 100644 index 000000000..47756363a --- /dev/null +++ b/src/plugins/analysis/binary_forensics/internal/entropy.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import math 
+from pathlib import Path +from typing import TYPE_CHECKING, List + +from entropython import metric_entropy +from pydantic import BaseModel + +if TYPE_CHECKING: + from io import FileIO + +BLOCK_SIZE_MIN = 2**10 # 1 KiB +BLOCK_SIZE_MAX = 2**20 # 1 MiB + + +class Block(BaseModel): + offset: int + entropy: float + + +class Entropy(BaseModel): + avg_entropy: float + blocks: List[Block] + blocksize: int + + +def get_entropy_analysis(file_handle: FileIO) -> Entropy: + file = Path(file_handle.name) + size = file.stat().st_size + if size == 0: # empty file: nothing to read, and math.log2(0) in _get_blocksize() would raise + return Entropy(avg_entropy=0, blocksize=0, blocks=[]) + + blocksize = _get_blocksize(size) + blocks = [] + offset = 0 + with file.open('rb') as fp: + while block := fp.read(blocksize): + blocks.append(Block(offset=offset, entropy=metric_entropy(block))) + offset += len(block) + avg_entropy = _calculate_avg_entropy(blocks, size, blocksize) + return Entropy(avg_entropy=avg_entropy, blocksize=blocksize, blocks=blocks) + + +def _get_blocksize(file_size: int) -> int: + # this will always give 32 to 64 points to plot (unless the file is smaller than 32 KiB or larger than 64 MiB, where the block size is clamped to BLOCK_SIZE_MIN or BLOCK_SIZE_MAX) + blocksize = 2 ** (math.floor(math.log2(file_size)) - 5) + return min(BLOCK_SIZE_MAX, max(blocksize, BLOCK_SIZE_MIN)) + + +def _calculate_avg_entropy(blocks: list[Block], file_size: int, blocksize: int) -> float: + avg_entropy = 0 # size-weighted sum of the block entropies; the last block may be shorter than blocksize + for block in blocks[:-1]: + avg_entropy += block.entropy * blocksize + last_block_size = file_size - blocks[-1].offset + avg_entropy += blocks[-1].entropy * last_block_size + return avg_entropy / file_size diff --git a/src/plugins/analysis/binary_forensics/internal/unblob.py b/src/plugins/analysis/binary_forensics/internal/unblob.py new file mode 100644 index 000000000..b959cffbc --- /dev/null +++ b/src/plugins/analysis/binary_forensics/internal/unblob.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import re + +from pydantic import BaseModel + +UNBLOB_REGEX = re.compile(r'start: (\d+), end: \d+, size: (\d+), type: (\w+)') 
+ + +class UnblobResult(BaseModel): + offset: int + size: int + type: str + + +def get_unblob_result(unpacking_result: dict) -> list[UnblobResult] | None: + if unpacking_result['plugin_used'] != 'generic_carver': # only the generic carver's output is searched for unblob chunk lines + return None + matches = UNBLOB_REGEX.findall(unpacking_result.get('output', '')) + if not matches: # no chunk lines in the output: report None rather than an empty list + return None + return [UnblobResult(offset=offset, size=size, type=type_) for offset, size, type_ in matches] diff --git a/src/plugins/analysis/binary_forensics/view/binary_forensics.html b/src/plugins/analysis/binary_forensics/view/binary_forensics.html new file mode 100644 index 000000000..aa7f1516b --- /dev/null +++ b/src/plugins/analysis/binary_forensics/view/binary_forensics.html @@ -0,0 +1,303 @@ +{% extends "analysis_plugins/general_information.html" %} + +{% block analysis_result_details %} + + + Average Entropy + {{ analysis_result.entropy.avg_entropy | nice_number }} + + + Entropy Plot + + + + + + Binwalk Matches + +
+
    + {% for item in analysis_result.file_matches %} +
  • + + + + + + + + + + + + + + + + + + + + + + + + + +
    offset + + {{ item.offset | nice_number }} + +
    size{{ item.size | nice_number }}
    name{{ item.name }}
    description + {% if ", " in item.description %} + {% for part in item.description.split(", ") %} + {% if loop.first %} + {{ part }} +
      + {% else %} +
    • {{ part }}
    • + {% endif %} + {% endfor %} +
    + {% else %} + {{ item.description }} + {% endif %} +
    confidence{{ item.confidence }}
    id{{ item.id }}
    +
  • + {% endfor %} +
+
+ + + {% if analysis_result.unblob_matches %} + + Unblob Matches + +
+
    + {% for item in analysis_result.unblob_matches %} +
  • + + + + + + + + + + + + + +
    offset + + {{ item.offset | nice_number }} + +
    size{{ item.size | nice_number }}
    type{{ item.type }}
    +
  • + {% endfor %} +
+
+ + + {% endif %} + + Entropy Data + + Blocksize: {{ analysis_result.entropy.blocksize | number_format }} +
+
+ +
+ + + + + + + +{% endblock %} diff --git a/src/plugins/analysis/binwalk/apt-pkgs-runtime.txt b/src/plugins/analysis/binwalk/apt-pkgs-runtime.txt deleted file mode 100644 index 8f748ce37..000000000 --- a/src/plugins/analysis/binwalk/apt-pkgs-runtime.txt +++ /dev/null @@ -1 +0,0 @@ -xvfb diff --git a/src/plugins/analysis/binwalk/code/binwalk.py b/src/plugins/analysis/binwalk/code/binwalk.py deleted file mode 100644 index 10ba5e71d..000000000 --- a/src/plugins/analysis/binwalk/code/binwalk.py +++ /dev/null @@ -1,82 +0,0 @@ -from __future__ import annotations - -from base64 import b64encode -from pathlib import Path -from tempfile import TemporaryDirectory -from typing import TYPE_CHECKING, List - -import binwalk -from pydantic import BaseModel, Field - -import config -from analysis.plugin import AnalysisPluginV0 -from analysis.plugin.plugin import AnalysisFailedError -from helperFunctions.install import OperateInDirectory -from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED - -if TYPE_CHECKING: - import io - - from binwalk.modules.entropy import Entropy - from binwalk.modules.signature import Signature - - -class SignatureScanResult(BaseModel): - offset: int - description: str - - -class AnalysisPlugin(AnalysisPluginV0): - class Schema(BaseModel): - entropy_analysis_graph: str = Field( - description='An entropy analysis graph generated by binwalk as base64 string.', - ) - signature_analysis: List[SignatureScanResult] = Field( - description='The result of the signature analysis from binwalk.', - ) - - def __init__(self): - super().__init__( - metadata=AnalysisPluginV0.MetaData( - name='binwalk', - description='binwalk signature and entropy analysis', - version='1.0.0', - Schema=self.Schema, - mime_blacklist=['audio/', 'image/', 'video/', 'text/', *MIME_BLACKLIST_COMPRESSED], - ), - ) - - def analyze(self, file_handle: io.FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema: - del virtual_file_path, analyses - - # FixMe: fix formatting 
once Python 3.8 is deprecated (2024-10-31) - with TemporaryDirectory( - prefix='fact_analysis_binwalk_', dir=config.backend.temp_dir_path - ) as tmp_dir, OperateInDirectory(tmp_dir): - output: tuple[Signature, Entropy] = binwalk.scan( - file_handle.name, - signature=True, - entropy=True, - save=True, - quiet=True, - ) - signature_result, entropy_result = output - if not entropy_result.output_file or not (pic_path := Path(entropy_result.output_file)).is_file(): - raise AnalysisFailedError('Entropy output file is missing') - return self.Schema( - entropy_analysis_graph=b64encode(pic_path.read_bytes()).decode(), - signature_analysis=[ - SignatureScanResult(offset=i.offset, description=i.description) for i in signature_result.results - ], - ) - - def summarize(self, result: Schema) -> list: - summary = [] - for item in result.signature_analysis: # type: SignatureScanResult - if 'entropy edge' in item.description: - continue - if ',' in item.description: - summary.append(item.description.split(',', maxsplit=1)[0]) - elif item.description: - summary.append(item.description) - return summary diff --git a/src/plugins/analysis/binwalk/dnf-pkgs-runtime.txt b/src/plugins/analysis/binwalk/dnf-pkgs-runtime.txt deleted file mode 100644 index 927a8a24d..000000000 --- a/src/plugins/analysis/binwalk/dnf-pkgs-runtime.txt +++ /dev/null @@ -1 +0,0 @@ -xorg-x11-server-Xvfb diff --git a/src/plugins/analysis/binwalk/install.py b/src/plugins/analysis/binwalk/install.py deleted file mode 100644 index 1ace7164c..000000000 --- a/src/plugins/analysis/binwalk/install.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python3 # noqa: EXE001 - -import logging -import urllib.request -from pathlib import Path - -try: - from helperFunctions.install import OperateInDirectory, check_distribution, is_virtualenv, run_cmd_with_logging - from plugins.installer import AbstractPluginInstaller -except ImportError: - import sys - - SRC_PATH = Path(__file__).absolute().parent.parent.parent.parent - 
sys.path.append(str(SRC_PATH)) - - from helperFunctions.install import OperateInDirectory, check_distribution, is_virtualenv, run_cmd_with_logging - from plugins.installer import AbstractPluginInstaller - - -BINWALK_VERSION = '2.4.1' - - -class BinwalkInstaller(AbstractPluginInstaller): - base_path = Path(__file__).resolve().parent - - def build(self): - url_binwalk = f'https://github.com/OSPG/binwalk/archive/refs/tags/v{BINWALK_VERSION}.tar.gz' - dest_binwalk = f'binwalk-v{BINWALK_VERSION}.tar.gz' - urllib.request.urlretrieve(url_binwalk, dest_binwalk) - - run_cmd_with_logging(f'tar -xf {dest_binwalk}') - - with OperateInDirectory(f'binwalk-{BINWALK_VERSION}'): - if is_virtualenv(): - run_cmd_with_logging('pip install -U .') - else: - run_cmd_with_logging('sudo -EH pip3 install -U .') - - -# Alias for generic use -Installer = BinwalkInstaller - -if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) - distribution = check_distribution() - installer = Installer(distribution) - installer.install() diff --git a/src/plugins/analysis/binwalk/requirements.txt b/src/plugins/analysis/binwalk/requirements.txt deleted file mode 100644 index cb82452ad..000000000 --- a/src/plugins/analysis/binwalk/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -capstone==4.0.2 -cstruct==4.0 -matplotlib==3.7.3 diff --git a/src/plugins/analysis/binwalk/test/test_plugin_binwalk.py b/src/plugins/analysis/binwalk/test/test_plugin_binwalk.py deleted file mode 100644 index 89af0d208..000000000 --- a/src/plugins/analysis/binwalk/test/test_plugin_binwalk.py +++ /dev/null @@ -1,30 +0,0 @@ -import pytest - -from test.common_helper import get_test_data_dir - -from ..code.binwalk import AnalysisPlugin - -TEST_FILE = get_test_data_dir() / 'container' / 'test.zip' - - -@pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin) -class TestPluginBinwalk: - def test_signature_analysis(self, analysis_plugin): - assert TEST_FILE.is_file(), 'test file is missing' - with TEST_FILE.open() as 
fp: - result = analysis_plugin.analyze(fp, {}, {}) - assert len(result.signature_analysis) > 0, 'no binwalk signature analysis found' - assert 'Zip archive data' in result.signature_analysis[0].description, 'no valid binwalk signature analysis' - - def test_entropy_graph(self, analysis_plugin): - assert TEST_FILE.is_file(), 'test file is missing' - with TEST_FILE.open() as fp: - result = analysis_plugin.analyze(fp, {}, {}) - assert len(result.entropy_analysis_graph) > 0, 'no binwalk entropy graph found' - - def test_summary(self, analysis_plugin): - with TEST_FILE.open() as fp: - test_result = analysis_plugin.analyze(fp, {}, {}) - summary = analysis_plugin.summarize(test_result) - for line in summary: - assert line in {'Zip archive data', 'End of Zip archive'} diff --git a/src/plugins/analysis/binwalk/view/binwalk.html b/src/plugins/analysis/binwalk/view/binwalk.html deleted file mode 100644 index f900959cd..000000000 --- a/src/plugins/analysis/binwalk/view/binwalk.html +++ /dev/null @@ -1,51 +0,0 @@ -{% extends "analysis_plugins/general_information.html" %} - - -{% block analysis_result_details %} - - Signature Analysis - - - - - - - - {%- for item in analysis_result.signature_analysis -%} - - - - - - {%- endfor -%} -
OffsetOffset (hex)Description
{{ item.offset }}{{ item.offset | hex }} - {%- if "," in item.description -%} - {%- set outer_loop = loop -%} - {%- for element in item.description.split(",") -%} - {%- if loop.first -%} - {{ element }} - -
    - {%- else -%} -
  • {{ element }}
  • - {%- endif -%} - {%- endfor -%} -
- {%- else -%} - {{ item.description }} - {%- endif -%} -
- - - - - Entropy Graph - - binwalk entropy graph - - -{% endblock %}