diff --git a/src/plugins/analysis/cve_lookup/internal/data_parsing.py b/src/plugins/analysis/cve_lookup/internal/data_parsing.py index 15e48f181..3227a42e0 100644 --- a/src/plugins/analysis/cve_lookup/internal/data_parsing.py +++ b/src/plugins/analysis/cve_lookup/internal/data_parsing.py @@ -1,9 +1,9 @@ from __future__ import annotations +import lzma import re from pathlib import Path -from shlex import split -from subprocess import run +from typing import Iterable import ijson import requests @@ -28,15 +28,6 @@ def _retrieve_url(download_url: str, target: Path): fp.write(chunk) -def download_and_decompress_file() -> Path: - """ - Downloads data from a URL, saves it to a file, decompresses it, and returns the path. - """ - _retrieve_url(CVE_URL, OUTPUT_FILE) - run(split(f'unxz --force {OUTPUT_FILE.name}'), cwd=DB_DIR, check=True) - return DB_DIR / OUTPUT_FILE.stem # the .xz suffix was removed during extraction - - def extract_english_summary(descriptions: list) -> str: for description in descriptions: if description['lang'] == 'en': @@ -88,17 +79,14 @@ def extract_data_from_cve(cve_item: dict) -> CveEntry: return CveEntry(cve_id=cve_id, summary=summary, impact=impact, cpe_entries=cpe_entries) -def parse_data() -> list[CveEntry]: +def parse_data() -> Iterable[CveEntry]: """ Parse the data from the JSON file and return a list of CveEntry objects. """ - cve_path = download_and_decompress_file() - with cve_path.open('rb') as fp: - # the file is huge, so we use ijson to stream the data + _retrieve_url(CVE_URL, OUTPUT_FILE) + # the downloaded file is a xz archive, so we use lzma to open it: + with lzma.open(OUTPUT_FILE, 'r') as fp: + # inside the archive is a huge JSON file, so we use ijson to stream the data for cve_item in ijson.items(fp, 'cve_items.item'): yield extract_data_from_cve(cve_item) - cve_path.unlink() # remove the temporary file after we are done - - -if __name__ == '__main__': - parse_data() + OUTPUT_FILE.unlink() # remove the temporary file after we are done