diff --git a/vulnerabilities/pipelines/npm_importer.py b/vulnerabilities/pipelines/npm_importer.py index 7b6d3aba2..d6c577e3d 100644 --- a/vulnerabilities/pipelines/npm_importer.py +++ b/vulnerabilities/pipelines/npm_importer.py @@ -9,14 +9,19 @@ # Author: Navonil Das (@NavonilDas) +import json +import os +import tempfile from pathlib import Path from typing import Iterable import pytz +import requests from dateutil.parser import parse from fetchcode.vcs import fetch_via_vcs from packageurl import PackageURL from univers.version_range import NpmVersionRange +from univers.versions import SemverVersion from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackage @@ -39,14 +44,24 @@ class NpmImporterPipeline(VulnerableCodeBaseImporterPipeline): repo_url = "git+https://github.com/nodejs/security-wg" importer_name = "Npm Importer" + is_batch_run = True + + def __init__(self, *args, purl=None, **kwargs): + super().__init__(*args, **kwargs) + self.purl = purl + if self.purl: + NpmImporterPipeline.is_batch_run = False + if self.purl.type != "npm": + print(f"Warning: This importer handles NPM packages. Current PURL: {self.purl!s}") + @classmethod def steps(cls): - return ( + return [ cls.clone, cls.collect_and_store_advisories, cls.import_new_advisories, cls.clean_downloads, - ) + ] def clone(self): self.log(f"Cloning `{self.repo_url}`") @@ -58,9 +73,26 @@ def advisories_count(self): def collect_advisories(self) -> Iterable[AdvisoryData]: vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - - for advisory in vuln_directory.glob("*.json"): - yield from self.to_advisory_data(advisory) + advisory_files = list(vuln_directory.glob("*.json")) + + if not self.is_batch_run: + package_name = self.purl.name + filtered_files = [] + for advisory_file in advisory_files: + try: + data = load_json(advisory_file) + if data.get("module_name") == package_name: + affected_package = self.get_affected_package(data, package_name) + if not self.purl.version or self._version_is_affected(affected_package): + filtered_files.append(advisory_file) + except Exception as e: + self.log(f"Error processing advisory file {advisory_file}: {str(e)}") + advisory_files = filtered_files + + for advisory in list(advisory_files): + for result in self.to_advisory_data(advisory): + if result: + yield result def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: data = load_json(file) @@ -112,6 +144,11 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: affected_packages.append(self.get_affected_package(data, package_name)) advsisory_aliases = data.get("cves") or [] + if self.purl and self.purl.version: + affected_package = affected_packages[0] if affected_packages else None + if affected_package and not self._version_is_affected(affected_package): + return + for alias in advsisory_aliases: yield AdvisoryData( summary=build_description(summary=summary, description=description), @@ -122,6 +159,13 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: url=f"https://github.com/nodejs/security-wg/blob/main/vuln/npm/{id}.json", ) + def _version_is_affected(self, affected_package): + if not self.purl.version or not affected_package.affected_version_range: + return True + + purl_version = SemverVersion(self.purl.version) + return purl_version in affected_package.affected_version_range + def get_affected_package(self, data, package_name): affected_version_range = None unaffected_version_range = None diff --git a/vulnerabilities/pipelines/v2_importers/npm_importer.py b/vulnerabilities/pipelines/v2_importers/npm_importer.py index 67e2a4355..3e509e63e 100644 --- a/vulnerabilities/pipelines/v2_importers/npm_importer.py +++ b/vulnerabilities/pipelines/v2_importers/npm_importer.py @@ -9,14 +9,19 @@ # Author: Navonil Das (@NavonilDas) +import json +import os +import tempfile from pathlib import Path from typing import Iterable import pytz +import requests from dateutil.parser import parse from fetchcode.vcs import fetch_via_vcs from packageurl import PackageURL from univers.version_range import NpmVersionRange +from univers.versions import SemverVersion from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackage @@ -42,6 +47,16 @@ class NpmImporterPipeline(VulnerableCodeBaseImporterPipelineV2): repo_url = "git+https://github.com/nodejs/security-wg" unfurl_version_ranges = True + is_batch_run = True + + def __init__(self, *args, purl=None, **kwargs): + super().__init__(*args, **kwargs) + self.purl = purl + if self.purl: + NpmImporterPipeline.is_batch_run = False + if self.purl.type != "npm": + print(f"Warning: This importer handles NPM packages. Current PURL: {self.purl!s}") + @classmethod def steps(cls): return ( @@ -60,9 +75,26 @@ def advisories_count(self): def collect_advisories(self) -> Iterable[AdvisoryData]: vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - - for advisory in vuln_directory.glob("*.json"): - yield self.to_advisory_data(advisory) + advisory_files = list(vuln_directory.glob("*.json")) + + if not self.is_batch_run: + package_name = self.purl.name + filtered_files = [] + for advisory_file in advisory_files: + try: + data = load_json(advisory_file) + if data.get("module_name") == package_name: + affected_package = self.get_affected_package(data, package_name) + if not self.purl.version or self._version_is_affected(affected_package): + filtered_files.append(advisory_file) + except Exception as e: + self.log(f"Error processing advisory file {advisory_file}: {str(e)}") + advisory_files = filtered_files + + for advisory in list(advisory_files): + result = self.to_advisory_data(advisory) + if result: + yield result def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: if file.name == "index.json": @@ -121,6 +153,11 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: affected_packages.append(self.get_affected_package(data, package_name)) advsisory_aliases = data.get("cves") or [] + if self.purl and self.purl.version: + affected_package = affected_packages[0] if affected_packages else None + if affected_package and not self._version_is_affected(affected_package): + return + return AdvisoryData( advisory_id=f"npm-{id}", aliases=advsisory_aliases, @@ -132,6 +169,13 @@ def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]: url=f"https://github.com/nodejs/security-wg/blob/main/vuln/npm/{id}.json", ) + def _version_is_affected(self, affected_package): + if not self.purl.version or not affected_package.affected_version_range: + return True + + purl_version = SemverVersion(self.purl.version) + return purl_version in affected_package.affected_version_range + def get_affected_package(self, data, package_name): affected_version_range = None unaffected_version_range = None @@ -174,5 +218,11 @@ def clean_downloads(self): self.log(f"Removing cloned repository") self.vcs_response.delete() + if hasattr(self, "temp_dir") and os.path.exists(self.temp_dir): + import shutil + + self.log(f"Removing temporary directory") + shutil.rmtree(self.temp_dir) + def on_failure(self): self.clean_downloads() diff --git a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py index bcfb83f62..22579bff1 100644 --- a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py +++ b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline.py @@ -12,6 +12,7 @@ import json import os from pathlib import Path +from types import SimpleNamespace from unittest.mock import patch from packageurl import PackageURL @@ -77,3 +78,134 @@ def test_npm_improver(mock_response): result.extend(inference) expected_file = os.path.join(TEST_DATA, f"npm-improver-expected.json") util_tests.check_results_against_json(result, expected_file) + + +def test_package_first_mode_valid_npm_package(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 1 + assert advisories[0].aliases == ["CVE-2013-4116"] + assert len(advisories[0].affected_packages) == 1 + assert advisories[0].affected_packages[0].package.name == "npm" + + +def test_package_first_mode_unaffected_version(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="npm", version="1.4.0") + pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_package_first_mode_invalid_package_type(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="pypi", name="django", version="3.0.0") + pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_package_first_mode_package_not_found(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + sample_data["module_name"] = "some-other-package" + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="nonexistent-package", version="1.0.0") + pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_package_first_mode_missing_vuln_directory(tmp_path): + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="npm", version="1.0.0") + pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_version_is_affected(): + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmImporterPipeline(purl=purl) + + affected_package = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=NpmVersionRange( + constraints=(VersionConstraint(comparator="<", version=SemverVersion(string="1.3.3")),) + ), + ) + + assert pipeline._version_is_affected(affected_package) == True + + pipeline.purl = PackageURL(type="npm", name="npm", version="1.4.0") + assert pipeline._version_is_affected(affected_package) == False + + pipeline.purl = PackageURL(type="npm", name="npm") + assert pipeline._version_is_affected(affected_package) == True + + affected_package_no_range = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=None, + fixed_version=SemverVersion(string="1.3.3"), + ) + assert pipeline._version_is_affected(affected_package_no_range) == True + affected_package_no_range = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=None, + fixed_version=SemverVersion(string="1.3.3"), + ) + assert pipeline._version_is_affected(affected_package_no_range) == True diff --git a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py index 7941c9b69..7c0536419 100644 --- a/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py +++ b/vulnerabilities/tests/pipelines/test_npm_importer_pipeline_v2.py @@ -8,18 +8,26 @@ # import json +import os +from pathlib import Path from types import SimpleNamespace +from unittest.mock import MagicMock +from unittest.mock import patch import pytz from packageurl import PackageURL +from univers.version_constraint import VersionConstraint from univers.version_range import NpmVersionRange from univers.versions import SemverVersion from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackage from vulnerabilities.pipelines.v2_importers.npm_importer import NpmImporterPipeline from vulnerabilities.severity_systems import CVSSV2 from vulnerabilities.severity_systems import CVSSV3 +TEST_DATA = Path(__file__).parent.parent / "test_data" / "npm" + def test_clone(monkeypatch): import vulnerabilities.pipelines.v2_importers.npm_importer as npm_mod @@ -58,8 +66,8 @@ def test_advisories_count_and_collect(tmp_path): (vuln_dir / "001.json").write_text(json.dumps({"id": "001"})) p = NpmImporterPipeline() p.vcs_response = SimpleNamespace(dest_dir=str(base), delete=lambda: None) - assert p.advisories_count() == 2 advisories = list(p.collect_advisories()) + assert p.advisories_count() == 2 # Should yield None for index.json and one AdvisoryData real = [a for a in advisories if isinstance(a, AdvisoryData)] assert len(real) == 1 @@ -126,3 +134,116 @@ def test_get_affected_package_special_and_standard(): pkg2 = p.get_affected_package(data2, "pkg2") assert isinstance(pkg2.affected_version_range, NpmVersionRange) assert pkg2.fixed_version == SemverVersion("2.0.1") + + +def test_package_first_mode_valid_npm_package(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 1 + assert advisories[0].aliases == ["CVE-2013-4116"] + assert len(advisories[0].affected_packages) == 1 + assert advisories[0].affected_packages[0].package.name == "npm" + + +def test_package_first_mode_unaffected_version(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="npm", version="1.4.0") + pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_package_first_mode_invalid_package_type(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="pypi", name="django", version="3.0.0") + pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_package_first_mode_package_not_found(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + sample_data["module_name"] = "some-other-package" + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="nonexistent-package", version="1.0.0") + pipeline = NpmImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_version_is_affected(): + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmImporterPipeline(purl=purl) + + affected_package = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=NpmVersionRange( + constraints=(VersionConstraint(comparator="<", version=SemverVersion(string="1.3.3")),) + ), + ) + + assert pipeline._version_is_affected(affected_package) == True + + pipeline.purl = PackageURL(type="npm", name="npm", version="1.4.0") + assert pipeline._version_is_affected(affected_package) == False + + pipeline.purl = PackageURL(type="npm", name="npm") + assert pipeline._version_is_affected(affected_package) == True + + affected_package_no_range = AffectedPackage( + package=PackageURL(type="npm", name="npm"), + affected_version_range=None, + fixed_version=SemverVersion(string="1.3.3"), + ) + assert pipeline._version_is_affected(affected_package_no_range) == True