Skip to content

Modify Apache Kafka importer to support package-first mode #1924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions vulnerabilities/importers/apache_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from bs4 import BeautifulSoup
from dateutil.parser import parse
from packageurl import PackageURL
from univers.version_range import VersionRange
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
Expand Down Expand Up @@ -97,16 +99,65 @@ class ApacheKafkaImporter(Importer):
license_url = "https://www.apache.org/licenses/"
importer_name = "Apache Kafka Importer"

def __init__(self, purl=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.purl = purl
if self.purl:
if self.purl.type != "apache" or self.purl.name != "kafka":
print(
f"Warning: This importer handles Apache Kafka packages. Current PURL: {self.purl!s}"
)

@staticmethod
def fetch_advisory_page(self):
page = requests.get(self.GH_PAGE_URL)
return page.content

def advisory_data(self):
advisory_page = self.fetch_advisory_page(self)
advisory_page = self.fetch_advisory_page()

all_advisories = self.to_advisory(advisory_page)

if self.purl:
return self._filter_advisories_for_purl(all_advisories)

return all_advisories

def _filter_advisories_for_purl(self, advisories):
if not self.purl:
return advisories

filtered_advisories = []
for advisory in advisories:
if self._advisory_affects_purl(advisory):
filtered_advisories.append(advisory)

return filtered_advisories

def _advisory_affects_purl(self, advisory):
if not self.purl:
return True

if self.purl.type != "apache" or self.purl.name != "kafka":
return False

if not self.purl.version:
return True

for affected_package in advisory.affected_packages:

if affected_package.affected_version_range:
try:
purl_version = SemverVersion(self.purl.version)
if purl_version in VersionRange.from_string(
affected_package.affected_version_range
):
return True
except Exception as e:
logger.error(f"Error checking version {self.purl.version}: {e}")
return True

parsed_data = self.to_advisory(advisory_page)
return parsed_data
return False

def to_advisory(self, advisory_page):
advisories = []
Expand Down
49 changes: 49 additions & 0 deletions vulnerabilities/tests/test_apache_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from unittest.mock import patch

import pytest
from packageurl import PackageURL

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importers.apache_kafka import ApacheKafkaImporter
Expand Down Expand Up @@ -79,3 +80,51 @@ def to_advisory_changed_fixed_versions():
with open(os.path.join(TEST_DATA, "cve-list-changed-fixed-versions.html")) as f:
raw_data = f.read()
advisories = ApacheKafkaImporter().to_advisory(raw_data)


@pytest.fixture
def mock_apache_kafka_advisory_page(monkeypatch):
test_file = os.path.join(TEST_DATA, "cve-list-2022-12-06.html")
with open(test_file, "rb") as f:
html_content = f.read()

def mock_fetch_advisory_page(self):
return html_content

monkeypatch.setattr(ApacheKafkaImporter, "fetch_advisory_page", mock_fetch_advisory_page)


def test_package_first_mode_all_advisories(monkeypatch, mock_apache_kafka_advisory_page):
purl = PackageURL(type="apache", name="kafka")
importer = ApacheKafkaImporter(purl=purl)
advisories = list(importer.advisory_data())

assert len(advisories) == 6


def test_package_first_mode_version_affected(monkeypatch, mock_apache_kafka_advisory_page):
purl = PackageURL(type="apache", name="kafka", version="2.8.0")
importer = ApacheKafkaImporter(purl=purl)
advisories = list(importer.advisory_data())

assert any(
any(
ap.affected_version_range and "2.8.0" in ap.affected_version_range
for ap in adv.affected_packages
)
for adv in advisories
)


def test_package_first_mode_version_not_affected(monkeypatch, mock_apache_kafka_advisory_page):
purl = PackageURL(type="apache", name="kafka", version="3.3.0")
importer = ApacheKafkaImporter(purl=purl)
advisories = list(importer.advisory_data())
assert advisories == []


def test_package_first_mode_wrong_package(monkeypatch, mock_apache_kafka_advisory_page):
purl = PackageURL(type="apache", name="notkafka")
importer = ApacheKafkaImporter(purl=purl)
advisories = list(importer.advisory_data())
assert advisories == []