diff --git a/vulnerabilities/importers/apache_kafka.py b/vulnerabilities/importers/apache_kafka.py index 27c244b2a..477ccbbe0 100644 --- a/vulnerabilities/importers/apache_kafka.py +++ b/vulnerabilities/importers/apache_kafka.py @@ -15,6 +15,8 @@ from bs4 import BeautifulSoup from dateutil.parser import parse from packageurl import PackageURL +from univers.version_range import VersionRange +from univers.versions import SemverVersion from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackage @@ -97,16 +99,65 @@ class ApacheKafkaImporter(Importer): license_url = "https://www.apache.org/licenses/" importer_name = "Apache Kafka Importer" + def __init__(self, purl=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self.purl = purl + if self.purl: + if self.purl.type != "apache" or self.purl.name != "kafka": + print( + f"Warning: This importer handles Apache Kafka packages. Current PURL: {self.purl!s}" + ) + @staticmethod def fetch_advisory_page(self): page = requests.get(self.GH_PAGE_URL) return page.content def advisory_data(self): - advisory_page = self.fetch_advisory_page(self) + advisory_page = self.fetch_advisory_page() + + all_advisories = self.to_advisory(advisory_page) + + if self.purl: + return self._filter_advisories_for_purl(all_advisories) + + return all_advisories + + def _filter_advisories_for_purl(self, advisories): + if not self.purl: + return advisories + + filtered_advisories = [] + for advisory in advisories: + if self._advisory_affects_purl(advisory): + filtered_advisories.append(advisory) + + return filtered_advisories + + def _advisory_affects_purl(self, advisory): + if not self.purl: + return True + + if self.purl.type != "apache" or self.purl.name != "kafka": + return False + + if not self.purl.version: + return True + + for affected_package in advisory.affected_packages: + + if affected_package.affected_version_range: + try: + purl_version = SemverVersion(self.purl.version) + if purl_version in VersionRange.from_string( + affected_package.affected_version_range + ): + return True + except Exception as e: + logger.error(f"Error checking version {self.purl.version}: {e}") + return True - parsed_data = self.to_advisory(advisory_page) - return parsed_data + return False def to_advisory(self, advisory_page): advisories = [] diff --git a/vulnerabilities/tests/test_apache_kafka.py b/vulnerabilities/tests/test_apache_kafka.py index 92c76c7b1..a4f7ac14e 100644 --- a/vulnerabilities/tests/test_apache_kafka.py +++ b/vulnerabilities/tests/test_apache_kafka.py @@ -12,6 +12,7 @@ from unittest.mock import patch import pytest +from packageurl import PackageURL from vulnerabilities.importer import AdvisoryData from vulnerabilities.importers.apache_kafka import ApacheKafkaImporter @@ -79,3 +80,51 @@ def to_advisory_changed_fixed_versions(): with open(os.path.join(TEST_DATA, "cve-list-changed-fixed-versions.html")) as f: raw_data = f.read() advisories = ApacheKafkaImporter().to_advisory(raw_data) + + +@pytest.fixture +def mock_apache_kafka_advisory_page(monkeypatch): + test_file = os.path.join(TEST_DATA, "cve-list-2022-12-06.html") + with open(test_file, "rb") as f: + html_content = f.read() + + def mock_fetch_advisory_page(self): + return html_content + + monkeypatch.setattr(ApacheKafkaImporter, "fetch_advisory_page", mock_fetch_advisory_page) + + +def test_package_first_mode_all_advisories(monkeypatch, mock_apache_kafka_advisory_page): + purl = PackageURL(type="apache", name="kafka") + importer = ApacheKafkaImporter(purl=purl) + advisories = list(importer.advisory_data()) + + assert len(advisories) == 6 + + +def test_package_first_mode_version_affected(monkeypatch, mock_apache_kafka_advisory_page): + purl = PackageURL(type="apache", name="kafka", version="2.8.0") + importer = ApacheKafkaImporter(purl=purl) + advisories = list(importer.advisory_data()) + + assert any( + any( + ap.affected_version_range and "2.8.0" in ap.affected_version_range + for ap in adv.affected_packages + ) + for adv in advisories + ) + + +def test_package_first_mode_version_not_affected(monkeypatch, mock_apache_kafka_advisory_page): + purl = PackageURL(type="apache", name="kafka", version="3.3.0") + importer = ApacheKafkaImporter(purl=purl) + advisories = list(importer.advisory_data()) + assert advisories == [] + + +def test_package_first_mode_wrong_package(monkeypatch, mock_apache_kafka_advisory_page): + purl = PackageURL(type="apache", name="notkafka") + importer = ApacheKafkaImporter(purl=purl) + advisories = list(importer.advisory_data()) + assert advisories == []