diff --git a/src/storage/db_interface_common.py b/src/storage/db_interface_common.py index 2f6228223..000e7500a 100644 --- a/src/storage/db_interface_common.py +++ b/src/storage/db_interface_common.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging from operator import or_ from typing import TYPE_CHECKING, Dict, Iterable, List @@ -35,7 +34,7 @@ 'software_components', 'users_and_passwords', ] -Summary = Dict[str, List[str]] +Summary = Dict[str, List[str]] # structure: {summary_entry: [file_UID_1, ...]} class DbInterfaceCommon(ReadOnlyDbInterface): @@ -320,31 +319,74 @@ def get_complete_object_including_all_summaries(self, uid: str) -> FileObject: if fo is None: raise Exception(f'UID not found: {uid}') fo.list_of_all_included_files = self.get_list_of_all_included_files(fo) + summary = self.get_all_summaries(fo) for plugin, analysis_result in fo.processed_analysis.items(): - analysis_result['summary'] = self.get_summary(fo, plugin) + analysis_result['summary'] = summary.get(plugin, {}) return fo + def get_all_summaries(self, fo: FileObject) -> dict[str, Summary]: + if isinstance(fo, Firmware): + summary = self.get_fw_summary(fo.uid) + # In the "all summaries" case we include the summary of the FW object itself. + # Adding it from the FO is faster than the otherwise necessary outerjoin with or_(). + for plugin, analysis_result in fo.processed_analysis.items(): + for entry in analysis_result.get('summary', []): + summary.setdefault(plugin, {}).setdefault(entry, []).append(fo.uid) + return summary + return self.get_fo_summary(fo) + def get_summary(self, fo: FileObject, selected_analysis: str) -> Summary | None: - if selected_analysis not in fo.processed_analysis: - logging.warning(f'Analysis {selected_analysis} not available on {fo.uid}') - return None - if 'summary' not in fo.processed_analysis[selected_analysis]: - return None - if not isinstance(fo, Firmware): - included_files = fo.list_of_all_included_files or self.get_list_of_all_included_files(fo) - else: - included_files = self.get_all_files_in_fw(fo.uid).union({fo.uid}) - return self._collect_summary_for_uid_list(included_files, selected_analysis) + if isinstance(fo, Firmware): + summary = self.get_fw_summary_for_plugin(fo.uid, selected_analysis) + # merge with existing summary of FW + for entry in fo.processed_analysis.get(selected_analysis, {}).get('summary', []): + summary.setdefault(entry, []).append(fo.uid) + return summary + return self.get_fo_summary_for_plugin(fo, selected_analysis) + + def get_fo_summary(self, fo: FileObject, analysis_filter: str | None = None) -> dict[str, Summary]: + if not fo.list_of_all_included_files: + fo.list_of_all_included_files = self.get_list_of_all_included_files(fo) + return self._get_summary_for_uid_list(fo.list_of_all_included_files, analysis_filter) + + def get_fo_summary_for_plugin(self, fo: FileObject, plugin: str) -> Summary: + return self.get_fo_summary(fo, plugin).get(plugin, {}) + + def get_fw_summary(self, uid: str, analysis_filter: str | None = None) -> dict[str, Summary]: + """ + Retrieve the summaries of all plugins for all included files. + This does *not* include the summary of the root object (a.k.a. the firmware container) itself! + """ + with self.get_read_only_session() as session: + query = ( + select(AnalysisEntry.uid, AnalysisEntry.summary, AnalysisEntry.plugin) + .join(fw_files_table, fw_files_table.c.file_uid == AnalysisEntry.uid) + .filter(fw_files_table.c.root_uid == uid) + ) + if analysis_filter is not None: + query = query.filter(AnalysisEntry.plugin == analysis_filter) + return self._collect_summary_result(query, session) - def _collect_summary_for_uid_list(self, uid_list: set[str] | list[str], plugin: str) -> Summary: + def get_fw_summary_for_plugin(self, root_uid: str, plugin: str) -> Summary: + return self.get_fw_summary(root_uid, plugin).get(plugin, {}) + + def _get_summary_for_uid_list( + self, uid_list: set[str] | list[str], analysis_filter: str | None = None + ) -> dict[str, Summary]: with self.get_read_only_session() as session: - query = select(AnalysisEntry.uid, AnalysisEntry.summary).filter( - AnalysisEntry.plugin == plugin, AnalysisEntry.uid.in_(uid_list) + query = select(AnalysisEntry.uid, AnalysisEntry.summary, AnalysisEntry.plugin).filter( + AnalysisEntry.uid.in_(uid_list) ) - summary = {} - for uid, summary_list in session.execute(query): # type: str, list[str] - for item in set(summary_list or []): - summary.setdefault(item, []).append(uid) + if analysis_filter is not None: + query = query.filter(AnalysisEntry.plugin == analysis_filter) + return self._collect_summary_result(query, session) + + @staticmethod + def _collect_summary_result(query: Select, session) -> dict[str, Summary]: + summary = {} + for uid, summary_list, plugin in session.execute(query): # type: str, list[str], str + for item in set(summary_list or []): + summary.setdefault(plugin, {}).setdefault(item, []).append(uid) return summary # ===== tags ===== diff --git a/src/test/integration/storage/test_db_interface_common.py b/src/test/integration/storage/test_db_interface_common.py index 2e360e5dd..157519e48 100644 --- a/src/test/integration/storage/test_db_interface_common.py +++ b/src/test/integration/storage/test_db_interface_common.py @@ -149,6 +149,32 @@ def test_get_complete_object(backend_db, common_db): assert isinstance(result, FileObject) expected_summary = {'entry1': [parent_fo.uid], 'entry2': [parent_fo.uid, child_fo.uid], 'entry3': [child_fo.uid]} _summary_is_equal(expected_summary, result.processed_analysis['test_plugin']['summary']) + _summary_is_equal(expected_summary, common_db.get_fo_summary_for_plugin(parent_fo, 'test_plugin')) + + +def test_get_summary(backend_db, common_db): + fw, parent_fo, child_fo = create_fw_with_parent_and_child() + fw.processed_analysis['test_plugin'] = generate_analysis_entry(summary=['entry0']) + parent_fo.processed_analysis['test_plugin'] = generate_analysis_entry(summary=['entry1', 'entry2']) + child_fo.processed_analysis['test_plugin'] = generate_analysis_entry(summary=['entry2', 'entry3']) + backend_db.insert_multiple_objects(fw, parent_fo, child_fo) + + expected_summary = { + 'entry0': [fw.uid], + 'entry1': [parent_fo.uid], + 'entry2': [parent_fo.uid, child_fo.uid], + 'entry3': [child_fo.uid], + } + fw_summary = common_db.get_all_summaries(fw)['test_plugin'] + _summary_is_equal(expected_summary, fw_summary) + fw_plugin_summary = common_db.get_summary(fw, 'test_plugin') + _summary_is_equal(expected_summary, fw_plugin_summary) + + expected_summary.pop('entry0') + fo_summary = common_db.get_all_summaries(parent_fo)['test_plugin'] + _summary_is_equal(expected_summary, fo_summary) + fo_plugin_summary = common_db.get_summary(parent_fo, 'test_plugin') + _summary_is_equal(expected_summary, fo_plugin_summary) def _summary_is_equal(expected_summary, summary):