77 changes: 77 additions & 0 deletions src/plugins/analysis/binary_forensics/code/binary_forensics.py
@@ -0,0 +1,77 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from pydantic import BaseModel
from semver import Version

import config
from analysis.plugin import AnalysisPluginV0
from plugins.analysis.binary_forensics.internal.binwalk import BinwalkSignatureResult, get_binwalk_signature_analysis
from plugins.analysis.binary_forensics.internal.entropy import Entropy, get_entropy_analysis
from plugins.analysis.binary_forensics.internal.unblob import UnblobResult, get_unblob_result
from plugins.mime_blacklists import MIME_BLACKLIST_COMPRESSED

if TYPE_CHECKING:
from io import FileIO


class AnalysisPlugin(AnalysisPluginV0):
class Schema(BaseModel):
entropy: Entropy
file_matches: list[BinwalkSignatureResult]
        # unblob matches only exist if the file was unpacked using the generic carver
unblob_matches: list[UnblobResult] | None

def __init__(self):
super().__init__(
metadata=AnalysisPluginV0.MetaData(
name='binary_forensics',
                description='binary forensic analysis (entropy, Binwalk file signatures and unblob carving results)',
version=Version(1, 0, 0),
Schema=self.Schema,
mime_blacklist=['audio/', 'image/', 'video/', 'text/', *MIME_BLACKLIST_COMPRESSED],
),
)
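        # summary thresholds for the average entropy, ordered from strictest to loosest;
        # each value can be overridden via this plugin's backend configuration (see _get_plugin_cfg_entry())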
self.thresholds = {
'very high entropy': self._get_plugin_cfg_entry('very_high_entropy_threshold', 0.95),
'high entropy': self._get_plugin_cfg_entry('high_entropy_threshold', 0.8),
'medium high entropy': self._get_plugin_cfg_entry('medium_high_entropy_threshold', 0.6),
'medium entropy': self._get_plugin_cfg_entry('medium_entropy_threshold', 0.4),
'medium low entropy': self._get_plugin_cfg_entry('medium_low_entropy_threshold', 0.2),
'low entropy': self._get_plugin_cfg_entry('low_entropy_threshold', 0.05),
}

def _get_plugin_cfg_entry(self, name: str, default: float) -> float:
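        """Read a float entry from this plugin's backend config, falling back to the default if it is missing or not parsable."""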
entry = getattr(config.backend.plugin.get(self.metadata.name, {}), name, default)
try:
return float(entry)
except (TypeError, ValueError):
logging.warning(f'Failed to parse config entry {name} of plugin {self.metadata.name} (should be float)')
return default

def analyze(self, file_handle: FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema:
del virtual_file_path
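        # entropy and binwalk signatures are computed directly from the file; the unblob matches
        # are derived from the already existing output of the unpacker plugin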

return self.Schema(
entropy=get_entropy_analysis(file_handle),
file_matches=get_binwalk_signature_analysis(file_handle, timeout=self.metadata.timeout),
unblob_matches=get_unblob_result(analyses['unpacker']),
)

def summarize(self, result: Schema) -> list:
return [*self._summarize_entropy(result.entropy), *self._summarize_binwalk_result(result.file_matches)]

def _summarize_entropy(self, result: Entropy) -> list[str]:
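        # the thresholds dict is ordered from highest to lowest value, so the first
        # threshold that is exceeded determines the summary entry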
for key, value in self.thresholds.items():
if result.avg_entropy > value:
return [key]
return ['very low entropy']

@staticmethod
def _summarize_binwalk_result(binwalk_result: list[BinwalkSignatureResult]) -> list[str]:
        return [item.name for item in binwalk_result]
104 changes: 104 additions & 0 deletions src/plugins/analysis/binary_forensics/internal/binwalk.py
@@ -0,0 +1,104 @@
from __future__ import annotations

import json
import logging
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING

from docker.errors import DockerException
from docker.types import Mount
from pydantic import BaseModel
from requests.exceptions import ReadTimeout

from analysis.plugin.plugin import AnalysisFailedError
from helperFunctions.docker import run_docker_container

if TYPE_CHECKING:
from io import FileIO


DOCKER_IMAGE = 'refirmlabs/binwalk:latest'


class BinwalkSignatureResult(BaseModel):
offset: int
id: str
size: int
name: str
confidence: int
description: str


def get_binwalk_signature_analysis(file: FileIO, timeout: int) -> list[BinwalkSignatureResult]:
return _parse_binwalk_output(_get_docker_output(file, timeout))


def _parse_binwalk_output(binwalk_output: list[dict]) -> list[BinwalkSignatureResult]:
"""
Expected result structure: (binwalk 3.1.1)
[
{
'Analysis': {
'file_path': '/io/input',
'file_map': [
{
'offset': <int>,
'id': <str>,
'size': <int>,
'name': <str>,
'confidence': <int>,
'description': <str>,
'always_display': <bool>,
'extraction_declined': <bool>,
},
...
],
}
}
]
The outer array has only one entry, since we analyze only one file
"""
try:
return [
BinwalkSignatureResult(
offset=file_result['offset'],
id=file_result['id'],
size=file_result['size'],
name=file_result['name'],
confidence=file_result['confidence'],
description=file_result['description'],
)
for file_result in binwalk_output[0]['Analysis']['file_map']
]
except (KeyError, IndexError) as err:
# FixMe: sadly, there are no tags for the docker container versions, so we can't pin it at the moment
# this should not happen -- if it happens, the plugin needs to be fixed
        logging.exception('Failed to parse binwalk result')
        raise AnalysisFailedError('Failed to parse binwalk result') from err


def _get_docker_output(file: FileIO, timeout: int) -> list[dict]:
container_input_path = '/io/input'
container_output_path = '/io/output'
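    # the analyzed file is mounted read-only into the container; binwalk writes its JSON
    # result to the mounted output file via the -l flag, which is then parsed below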
with NamedTemporaryFile() as temp_file:
Path(temp_file.name).touch()
try:
run_docker_container(
DOCKER_IMAGE,
combine_stderr_stdout=True,
timeout=timeout - 1,
command=f'{container_input_path} -l {container_output_path}',
mounts=[
Mount(container_input_path, file.name, type='bind', read_only=True),
Mount(container_output_path, temp_file.name, type='bind', read_only=False),
],
logging_label='binwalk',
)
return json.loads(Path(temp_file.name).read_text())
except ReadTimeout as err:
raise AnalysisFailedError('Docker container timed out') from err
except (DockerException, OSError) as err:
raise AnalysisFailedError('Docker process error') from err
        except json.JSONDecodeError as err:
raise AnalysisFailedError('Docker output JSON parsing error') from err
57 changes: 57 additions & 0 deletions src/plugins/analysis/binary_forensics/internal/entropy.py
@@ -0,0 +1,57 @@
from __future__ import annotations

import math
from pathlib import Path
from typing import TYPE_CHECKING

from entropython import metric_entropy
from pydantic import BaseModel

if TYPE_CHECKING:
from io import FileIO

BLOCK_SIZE_MIN = 2**10 # 1 KiB
BLOCK_SIZE_MAX = 2**20 # 1 MiB


class Block(BaseModel):
offset: int
entropy: float


class Entropy(BaseModel):
avg_entropy: float
    blocks: list[Block]
blocksize: int


def get_entropy_analysis(file_handle: FileIO) -> Entropy:
file = Path(file_handle.name)
size = file.stat().st_size
if size == 0:
return Entropy(avg_entropy=0, blocksize=0, blocks=[])

blocksize = _get_blocksize(size)
blocks = []
offset = 0
with file.open('rb') as fp:
while block := fp.read(blocksize):
blocks.append(Block(offset=offset, entropy=metric_entropy(block)))
offset += len(block)
avg_entropy = _calculate_avg_entropy(blocks, size, blocksize)
return Entropy(avg_entropy=avg_entropy, blocksize=blocksize, blocks=blocks)


def _get_blocksize(file_size: int) -> int:
    # this will always give 32 to 64 points to plot (unless the file is smaller than 32 KiB or larger than 64 MiB)
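    # e.g. a 10 MiB file (floor(log2(size)) = 23) gets a block size of 2**18 = 256 KiB => 40 blocks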
blocksize = 2 ** (math.floor(math.log2(file_size)) - 5)
return min(BLOCK_SIZE_MAX, max(blocksize, BLOCK_SIZE_MIN))


def _calculate_avg_entropy(blocks: list[Block], file_size: int, blocksize: int) -> float:
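    # weighted average: all blocks except the last one contain exactly `blocksize` bytes;
    # the last block may be smaller and is therefore weighted with its actual size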
avg_entropy = 0
for block in blocks[:-1]:
avg_entropy += block.entropy * blocksize
last_block_size = file_size - blocks[-1].offset
avg_entropy += blocks[-1].entropy * last_block_size
return avg_entropy / file_size
22 changes: 22 additions & 0 deletions src/plugins/analysis/binary_forensics/internal/unblob.py
@@ -0,0 +1,22 @@
from __future__ import annotations

import re

from pydantic import BaseModel

UNBLOB_REGEX = re.compile(r'start: (\d+), end: \d+, size: (\d+), type: (\w+)')
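# the regex above matches entries of the generic carver (unblob) output,
# e.g. a line like "start: 0, end: 4096, size: 4096, type: ZIP" (type value here is only illustrative)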


class UnblobResult(BaseModel):
offset: int
size: int
type: str


def get_unblob_result(unpacking_result: dict) -> list[UnblobResult] | None:
if unpacking_result['plugin_used'] != 'generic_carver':
return None
matches = UNBLOB_REGEX.findall(unpacking_result.get('output', ''))
if not matches:
return None
return [UnblobResult(offset=offset, size=size, type=type_) for offset, size, type_ in matches]