Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 62 additions & 20 deletions src/storage/db_interface_common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import logging
from operator import or_
from typing import TYPE_CHECKING, Dict, Iterable, List

Expand Down Expand Up @@ -35,7 +34,7 @@
'software_components',
'users_and_passwords',
]
Summary = Dict[str, List[str]]
Summary = Dict[str, List[str]] # structure: {summary_entry: [file_UID_1, ...]}


class DbInterfaceCommon(ReadOnlyDbInterface):
Expand Down Expand Up @@ -320,31 +319,74 @@ def get_complete_object_including_all_summaries(self, uid: str) -> FileObject:
if fo is None:
raise Exception(f'UID not found: {uid}')
fo.list_of_all_included_files = self.get_list_of_all_included_files(fo)
summary = self.get_all_summaries(fo)
for plugin, analysis_result in fo.processed_analysis.items():
analysis_result['summary'] = self.get_summary(fo, plugin)
analysis_result['summary'] = summary.get(plugin, {})
return fo

def get_all_summaries(self, fo: FileObject) -> dict[str, Summary]:
if isinstance(fo, Firmware):
summary = self.get_fw_summary(fo.uid)
# In the "all summaries" case we include the summary of the FW object itself.
# Adding it from the FO is faster than the otherwise necessary outerjoin with or_().
for plugin, analysis_result in fo.processed_analysis.items():
for entry in analysis_result.get('summary', []):
summary.setdefault(plugin, {}).setdefault(entry, []).append(fo.uid)
return summary
return self.get_fo_summary(fo)

def get_summary(self, fo: FileObject, selected_analysis: str) -> Summary | None:
if selected_analysis not in fo.processed_analysis:
logging.warning(f'Analysis {selected_analysis} not available on {fo.uid}')
return None
if 'summary' not in fo.processed_analysis[selected_analysis]:
return None
if not isinstance(fo, Firmware):
included_files = fo.list_of_all_included_files or self.get_list_of_all_included_files(fo)
else:
included_files = self.get_all_files_in_fw(fo.uid).union({fo.uid})
return self._collect_summary_for_uid_list(included_files, selected_analysis)
if isinstance(fo, Firmware):
summary = self.get_fw_summary_for_plugin(fo.uid, selected_analysis)
# merge with existing summary of FW
for entry in fo.processed_analysis.get(selected_analysis, {}).get('summary', []):
summary.setdefault(entry, []).append(fo.uid)
return summary
return self.get_fo_summary_for_plugin(fo, selected_analysis)

def get_fo_summary(self, fo: FileObject, analysis_filter: str | None = None) -> dict[str, Summary]:
if not fo.list_of_all_included_files:
fo.list_of_all_included_files = self.get_list_of_all_included_files(fo)
return self._get_summary_for_uid_list(fo.list_of_all_included_files, analysis_filter)

def get_fo_summary_for_plugin(self, fo: FileObject, plugin: str) -> Summary:
return self.get_fo_summary(fo, plugin).get(plugin, {})

def get_fw_summary(self, uid: str, analysis_filter: str | None = None) -> dict[str, Summary]:
"""
Retrieve the summaries of all plugins for all included files.
This does *not* include the summary of the root object (a.k.a. the firmware container) itself!
"""
with self.get_read_only_session() as session:
query = (
select(AnalysisEntry.uid, AnalysisEntry.summary, AnalysisEntry.plugin)
.join(fw_files_table, fw_files_table.c.file_uid == AnalysisEntry.uid)
.filter(fw_files_table.c.root_uid == uid)
)
if analysis_filter is not None:
query = query.filter(AnalysisEntry.plugin == analysis_filter)
return self._collect_summary_result(query, session)

def _collect_summary_for_uid_list(self, uid_list: set[str] | list[str], plugin: str) -> Summary:
def get_fw_summary_for_plugin(self, root_uid: str, plugin: str) -> Summary:
return self.get_fw_summary(root_uid, plugin).get(plugin, {})

def _get_summary_for_uid_list(
self, uid_list: set[str] | list[str], analysis_filter: str | None = None
) -> dict[str, Summary]:
with self.get_read_only_session() as session:
query = select(AnalysisEntry.uid, AnalysisEntry.summary).filter(
AnalysisEntry.plugin == plugin, AnalysisEntry.uid.in_(uid_list)
query = select(AnalysisEntry.uid, AnalysisEntry.summary, AnalysisEntry.plugin).filter(
AnalysisEntry.uid.in_(uid_list)
)
summary = {}
for uid, summary_list in session.execute(query): # type: str, list[str]
for item in set(summary_list or []):
summary.setdefault(item, []).append(uid)
if analysis_filter is not None:
query = query.filter(AnalysisEntry.plugin == analysis_filter)
return self._collect_summary_result(query, session)

@staticmethod
def _collect_summary_result(query: Select, session) -> dict[str, Summary]:
summary = {}
for uid, summary_list, plugin in session.execute(query): # type: str, list[str], str
for item in set(summary_list or []):
summary.setdefault(plugin, {}).setdefault(item, []).append(uid)
return summary

# ===== tags =====
Expand Down
26 changes: 26 additions & 0 deletions src/test/integration/storage/test_db_interface_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,32 @@ def test_get_complete_object(backend_db, common_db):
assert isinstance(result, FileObject)
expected_summary = {'entry1': [parent_fo.uid], 'entry2': [parent_fo.uid, child_fo.uid], 'entry3': [child_fo.uid]}
_summary_is_equal(expected_summary, result.processed_analysis['test_plugin']['summary'])
_summary_is_equal(expected_summary, common_db.get_fo_summary_for_plugin(parent_fo, 'test_plugin'))


def test_get_summary(backend_db, common_db):
fw, parent_fo, child_fo = create_fw_with_parent_and_child()
fw.processed_analysis['test_plugin'] = generate_analysis_entry(summary=['entry0'])
parent_fo.processed_analysis['test_plugin'] = generate_analysis_entry(summary=['entry1', 'entry2'])
child_fo.processed_analysis['test_plugin'] = generate_analysis_entry(summary=['entry2', 'entry3'])
backend_db.insert_multiple_objects(fw, parent_fo, child_fo)

expected_summary = {
'entry0': [fw.uid],
'entry1': [parent_fo.uid],
'entry2': [parent_fo.uid, child_fo.uid],
'entry3': [child_fo.uid],
}
fw_summary = common_db.get_all_summaries(fw)['test_plugin']
_summary_is_equal(expected_summary, fw_summary)
fw_plugin_summary = common_db.get_summary(fw, 'test_plugin')
_summary_is_equal(expected_summary, fw_plugin_summary)

expected_summary.pop('entry0')
fo_summary = common_db.get_all_summaries(parent_fo)['test_plugin']
_summary_is_equal(expected_summary, fo_summary)
fo_plugin_summary = common_db.get_summary(parent_fo, 'test_plugin')
_summary_is_equal(expected_summary, fo_plugin_summary)


def _summary_is_equal(expected_summary, summary):
Expand Down