diff --git a/vulnerabilities/importers/curl.py b/vulnerabilities/importers/curl.py index a7f5e86fa..b2723ce11 100644 --- a/vulnerabilities/importers/curl.py +++ b/vulnerabilities/importers/curl.py @@ -39,6 +39,15 @@ class CurlImporter(Importer): importer_name = "Curl Importer" api_url = "https://curl.se/docs/vuln.json" + def __init__(self, purl=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self.purl = purl + if self.purl: + if self.purl.type != "generic" or self.purl.name != "curl": + print( + f"Warning: This importer handles curl package vulnerabilities. Current PURL: {self.purl!s}" + ) + def fetch(self) -> Iterable[Mapping]: response = fetch_response(self.api_url) return response.json() @@ -48,11 +57,42 @@ def advisory_data(self) -> Iterable[AdvisoryData]: for data in raw_data: cve_id = data.get("aliases") or [] cve_id = cve_id[0] if len(cve_id) > 0 else None - if not cve_id.startswith("CVE"): - package = data.get("database_specific").get("package") + if not cve_id or not cve_id.startswith("CVE"): + package = data.get("database_specific", {}).get("package", "") logger.error(f"Invalid CVE ID: {cve_id} in package {package}") continue - yield parse_advisory_data(data) + + advisory = parse_advisory_data(data) + + if self.purl and not self._advisory_affects_purl(advisory): + continue + + yield advisory + + def _advisory_affects_purl(self, advisory: AdvisoryData) -> bool: + if not self.purl: + return True + + if self.purl.type != "generic" or self.purl.name != "curl": + return False + + for affected_package in advisory.affected_packages: + if affected_package.package.name != "curl": + continue + + if self.purl.version and affected_package.affected_version_range: + try: + purl_version = SemverVersion(self.purl.version) + + if purl_version not in affected_package.affected_version_range: + continue + except Exception as e: + logger.error(f"Error checking version {self.purl.version}: {e}") + continue + + return True + + return False def parse_advisory_data(raw_data) -> AdvisoryData: diff --git a/vulnerabilities/tests/test_curl.py b/vulnerabilities/tests/test_curl.py index 6822e9677..dd4895d3a 100644 --- a/vulnerabilities/tests/test_curl.py +++ b/vulnerabilities/tests/test_curl.py @@ -7,10 +7,16 @@ # See https://aboutcode.org for more information about nexB OSS projects. # +import json import os from unittest import TestCase from unittest.mock import patch +import pytest +from packageurl import PackageURL +from univers.versions import SemverVersion + +from vulnerabilities.importers.curl import CurlImporter from vulnerabilities.importers.curl import get_cwe_from_curl_advisory from vulnerabilities.importers.curl import parse_advisory_data from vulnerabilities.tests import util_tests @@ -71,3 +77,52 @@ def test_get_cwe_from_curl_advisory(self): for advisory in mock_advisory: mock_cwe_list.extend(get_cwe_from_curl_advisory(advisory)) assert mock_cwe_list == [311] + + +@pytest.fixture +def mock_curl_api(monkeypatch): + test_files = [ + "curl_advisory_mock1.json", + "curl_advisory_mock2.json", + "curl_advisory_mock3.json", + ] + + BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + TEST_DATA = os.path.join(BASE_DIR, "test_data/curl") + data = [] + for fname in test_files: + with open(os.path.join(TEST_DATA, fname)) as f: + data.append(json.load(f)) + + def mock_fetch(self): + return data + + monkeypatch.setattr(CurlImporter, "fetch", mock_fetch) + + +def test_curl_importer_package_first(monkeypatch, mock_curl_api): + purl = PackageURL(type="generic", namespace="curl.se", name="curl") + importer = CurlImporter(purl=purl) + advisories = list(importer.advisory_data()) + assert len(advisories) == 3 + for adv in advisories: + assert any(ap.package.name == "curl" for ap in adv.affected_packages) + + +def test_curl_importer_package_first_version(monkeypatch, mock_curl_api): + purl = PackageURL(type="generic", namespace="curl.se", name="curl", version="8.6.0") + importer = CurlImporter(purl=purl) + advisories = list(importer.advisory_data()) + + assert len(advisories) == 1 + assert advisories[0].aliases[0] == "CVE-2024-2379" + + for ap in advisories[0].affected_packages: + assert ap.affected_version_range.contains(SemverVersion("8.6.0")) + + +def test_curl_importer_package_first_version_not_affected(monkeypatch, mock_curl_api): + purl = PackageURL(type="generic", namespace="curl.se", name="curl", version="9.9.9") + importer = CurlImporter(purl=purl) + advisories = list(importer.advisory_data()) + assert advisories == []