From bd5c1f2ca7a85e8c1ee7b2c0016b46d46a679709 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Fri, 25 Jul 2025 15:52:50 +0300 Subject: [PATCH 1/9] Add PyPa live importer #1953 * Add PyPa live pipeline importer to fetch advisories affecting a single PURL * Add tests for PyPa live importer Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/importers/__init__.py | 7 + .../v2_importers/pypa_live_importer.py | 150 ++++++++++++++++++ .../test_pypa_v2_live_importer_pipeline.py | 134 ++++++++++++++++ 3 files changed, 291 insertions(+) create mode 100644 vulnerabilities/pipelines/v2_importers/pypa_live_importer.py create mode 100644 vulnerabilities/tests/pipelines/test_pypa_v2_live_importer_pipeline.py diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 706ca3c07..e8b19c20e 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -55,6 +55,7 @@ from vulnerabilities.pipelines.v2_importers import oss_fuzz as oss_fuzz_v2 from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2 from vulnerabilities.pipelines.v2_importers import pypa_importer as pypa_importer_v2 +from vulnerabilities.pipelines.v2_importers import pypa_live_importer as pypa_live_importer_v2 from vulnerabilities.pipelines.v2_importers import pysec_importer as pysec_importer_v2 from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2 from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2 @@ -113,3 +114,9 @@ oss_fuzz.OSSFuzzImporter, ] ) + +LIVE_IMPORTERS_REGISTRY = create_registry( + [ + pypa_live_importer_v2.PyPaLiveImporterPipeline, + ] +) diff --git a/vulnerabilities/pipelines/v2_importers/pypa_live_importer.py b/vulnerabilities/pipelines/v2_importers/pypa_live_importer.py new file mode 100644 index 000000000..4760df2a3 --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/pypa_live_importer.py @@ 
-0,0 +1,150 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + + +from typing import Iterable + +import requests +import saneyaml +from packageurl import PackageURL +from univers.versions import PypiVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 + + +class PyPaLiveImporterPipeline(VulnerableCodeBaseImporterPipelineV2): + """ + Pypa Live Importer Pipeline + + Collect advisories from PyPA GitHub repository for a single PURL. + """ + + pipeline_id = "pypa_live_importer_v2" + supported_types = ["pypi"] + spdx_license_expression = "CC-BY-4.0" + license_url = "https://github.com/pypa/advisory-database/blob/main/LICENSE" + + @classmethod + def steps(cls): + return ( + cls.get_purl_inputs, + cls.fetch_package_advisories, + cls.collect_and_store_advisories, + ) + + def get_purl_inputs(self): + purl = self.inputs["purl"] + if not purl: + raise ValueError("PURL is required for PyPaLiveImporterPipeline") + + if isinstance(purl, str): + purl = PackageURL.from_string(purl) + + if not isinstance(purl, PackageURL): + raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance") + + if purl.type not in self.supported_types: + raise ValueError( + f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}" + ) + + if not purl.version: + raise ValueError(f"PURL: {purl!s} is expected to have a version") + + self.purl = purl + + def _is_version_affected(self, advisory_dict, version): + affected = advisory_dict.get("affected", []) + try: + v = PypiVersion(version) + except Exception: + return False + for entry in affected: + 
ranges = entry.get("ranges", []) + for r in ranges: + events = r.get("events", []) + introduced = None + fixed = None + for event in events: + if "introduced" in event: + introduced = event["introduced"] + if "fixed" in event: + fixed = event["fixed"] + try: + if introduced: + introduced_v = PypiVersion(introduced) + if v < introduced_v: + continue + if fixed: + fixed_v = PypiVersion(fixed) + if v >= fixed_v: + continue + if introduced: + introduced_v = PypiVersion(introduced) + if (not fixed or v < PypiVersion(fixed)) and v >= introduced_v: + return True + except Exception: + continue + return False + + def fetch_package_advisories(self): + if not self.purl.type in self.supported_types: + return + + search_path = f"vulns/{self.purl.name}" + + self.package_advisories = [] + + api_url = f"https://api.github.com/repos/pypa/advisory-database/contents/{search_path}" + response = requests.get(api_url) + + if response.status_code == 404: + self.log(f"No advisories found for package {self.purl.name}") + return + + if response.status_code != 200: + self.log(f"Failed to fetch advisories: {response.status_code} {response.text}") + return + + for item in response.json(): + if item["type"] == "file" and item["name"].endswith(".yaml"): + file_url = item["download_url"] + self.log("Fetching advisory file: " + item["name"]) + file_response = requests.get(file_url) + + if file_response.status_code == 200: + advisory_text = file_response.text + advisory_dict = saneyaml.load(advisory_text) + + if self.purl.version and not self._is_version_affected( + advisory_dict, self.purl.version + ): + continue + + self.package_advisories.append( + {"text": advisory_text, "dict": advisory_dict, "url": item["html_url"]} + ) + + def advisories_count(self): + return len(self.package_advisories) if hasattr(self, "package_advisories") else 0 + + def collect_advisories(self) -> Iterable[AdvisoryData]: + from vulnerabilities.importers.osv import parse_advisory_data_v2 + + if not hasattr(self, 
"package_advisories"): + return + + for advisory in self.package_advisories: + yield parse_advisory_data_v2( + raw_data=advisory["dict"], + supported_ecosystems=self.supported_types, + advisory_url=advisory["url"], + advisory_text=advisory["text"], + ) diff --git a/vulnerabilities/tests/pipelines/test_pypa_v2_live_importer_pipeline.py b/vulnerabilities/tests/pipelines/test_pypa_v2_live_importer_pipeline.py new file mode 100644 index 000000000..8b07624de --- /dev/null +++ b/vulnerabilities/tests/pipelines/test_pypa_v2_live_importer_pipeline.py @@ -0,0 +1,134 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest +import saneyaml +from packageurl import PackageURL + +from vulnerabilities.importer import AdvisoryData + + +@pytest.fixture +def mock_github_api_response(): + return { + "status_code": 200, + "json": [ + { + "type": "file", + "name": "CVE-2022-1234.yaml", + "download_url": "https://raw.githubusercontent.com/pypa/advisory-database/main/vulns/package1/CVE-2022-1234.yaml", + "html_url": "https://github.com/pypa/advisory-database/blob/main/vulns/package1/CVE-2022-1234.yaml", + }, + { + "type": "file", + "name": "CVE-2022-5678.yaml", + "download_url": "https://raw.githubusercontent.com/pypa/advisory-database/main/vulns/package1/CVE-2022-5678.yaml", + "html_url": "https://github.com/pypa/advisory-database/blob/main/vulns/package1/CVE-2022-5678.yaml", + }, + ], + } + + +@pytest.fixture +def mock_advisory_files(): + advisory1 = { + "id": "CVE-2022-1234", + "summary": "A vulnerability in package1", + "affected": [ + { + "package": {"name": "package1", 
"ecosystem": "PyPI"}, + "ranges": [ + {"type": "ECOSYSTEM", "events": [{"introduced": "1.0.0"}, {"fixed": "1.2.0"}]} + ], + } + ], + } + + advisory2 = { + "id": "CVE-2022-5678", + "summary": "Another vulnerability in package1", + "affected": [ + { + "package": {"name": "package1", "ecosystem": "PyPI"}, + "ranges": [ + {"type": "ECOSYSTEM", "events": [{"introduced": "1.5.0"}, {"fixed": "1.7.0"}]} + ], + } + ], + } + + return { + "https://raw.githubusercontent.com/pypa/advisory-database/main/vulns/package1/CVE-2022-1234.yaml": advisory1, + "https://raw.githubusercontent.com/pypa/advisory-database/main/vulns/package1/CVE-2022-5678.yaml": advisory2, + } + + +def test_package_with_version_affected(mock_github_api_response, mock_advisory_files): + from vulnerabilities.pipelines.v2_importers.pypa_live_importer import PyPaLiveImporterPipeline + + purl = PackageURL(type="pypi", name="package1", version="1.1.0") + + with patch("requests.get") as mock_get: + mock_api_response = MagicMock() + mock_api_response.status_code = mock_github_api_response["status_code"] + mock_api_response.json.return_value = mock_github_api_response["json"] + + def mock_get_side_effect(url, *args, **kwargs): + if "api.github.com" in url: + return mock_api_response + + mock_file_response = MagicMock() + mock_file_response.status_code = 200 + mock_file_response.text = saneyaml.dump(mock_advisory_files[url]) + return mock_file_response + + mock_get.side_effect = mock_get_side_effect + + with patch("vulnerabilities.importers.osv.parse_advisory_data_v2") as mock_parse: + + def side_effect(raw_data, supported_ecosystems, advisory_url, advisory_text): + return AdvisoryData( + advisory_id=raw_data["id"], + summary=raw_data["summary"], + references_v2=[{"url": advisory_url}], + affected_packages=[], + weaknesses=[], + url=advisory_url, + ) + + mock_parse.side_effect = side_effect + + pipeline = PyPaLiveImporterPipeline(selected_groups=["package_first"], purl=purl) + pipeline.get_purl_inputs() + 
pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 1 + assert advisories[0].advisory_id == "CVE-2022-1234" + + +def test_nonexistent_package(): + from vulnerabilities.pipelines.v2_importers.pypa_live_importer import PyPaLiveImporterPipeline + + purl = PackageURL(type="pypi", name="nonexistent_package", version="1.0.0") + + with patch("requests.get") as mock_get: + mock_response = MagicMock() + mock_response.status_code = 404 + mock_get.return_value = mock_response + + pipeline = PyPaLiveImporterPipeline(selected_groups=["package_first"], purl=purl) + pipeline.get_purl_inputs() + pipeline.fetch_package_advisories() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 From d1a7cc12fa1d99ec78d9fc6b5949f6a283e6f1d6 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Wed, 30 Jul 2025 15:53:40 +0300 Subject: [PATCH 2/9] Add Live Evaluation API endpoint #1902 * Add a new API endpoint to run live evaluation importers * Add tests for the live evaluation API endpoint Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/api_v2.py | 84 ++++++++++++++++++++++++++++ vulnerabilities/tests/test_api_v2.py | 65 +++++++++++++++++++++ vulnerablecode/urls.py | 2 + 3 files changed, 151 insertions(+) diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index c45dbfebe..066651ff7 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -8,6 +8,9 @@ # +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed + from django.db.models import Prefetch from django_filters import rest_framework as filters from drf_spectacular.utils import OpenApiParameter @@ -25,6 +28,7 @@ from rest_framework.reverse import reverse from rest_framework.throttling import AnonRateThrottle +from vulnerabilities.importers import LIVE_IMPORTERS_REGISTRY from vulnerabilities.models import AdvisoryReference from vulnerabilities.models import 
AdvisorySeverity from vulnerabilities.models import AdvisoryV2 @@ -1225,3 +1229,83 @@ def lookup(self, request): return Response( AdvisoryPackageV2Serializer(qs, many=True, context={"request": request}).data ) + + +class LiveEvaluationSerializer(serializers.Serializer): + purl_string = serializers.CharField(help_text="PackageURL to evaluate") + no_threading = serializers.BooleanField(required=False, default=False) + + +class LiveEvaluationViewSet(viewsets.GenericViewSet): + serializer_class = LiveEvaluationSerializer + + @extend_schema( + request=LiveEvaluationSerializer, + responses={ + 202: {"description": "Live evaluation done successfully"}, + 400: {"description": "Invalid request"}, + 500: {"description": "Internal server error"}, + }, + ) + @action(detail=False, methods=["post"]) + def evaluate(self, request): + serializer = self.get_serializer(data=request.data) + if not serializer.is_valid(): + return Response( + serializer.errors, + status=status.HTTP_400_BAD_REQUEST, + ) + + purl_string = serializer.validated_data.get("purl_string") + no_threading = serializer.validated_data.get("no_threading", False) + + try: + purl = PackageURL.from_string(purl_string) if purl_string else None + if not purl: + return Response({"error": "Invalid PackageURL"}, status=status.HTTP_400_BAD_REQUEST) + except Exception as e: + return Response( + {"error": f"Invalid PackageURL: {str(e)}"}, status=status.HTTP_400_BAD_REQUEST + ) + + importers = [ + importer + for importer in LIVE_IMPORTERS_REGISTRY.values() + if hasattr(importer, "supported_types") + and purl.type in getattr(importer, "supported_types", []) + ] + + if not importers: + return Response( + {"error": f"No live importers found for purl type '{purl.type}'"}, + status=status.HTTP_400_BAD_REQUEST, + ) + + results = [] + + def run_importer(importer): + importer_name = getattr(importer, "pipeline_id", importer.__name__) + response_data = {"importer": importer_name, "purl": purl_string, "steps_completed": []} + try: + 
pipeline_instance = importer(purl=purl) + status_code, error = pipeline_instance.execute() + if status_code != 0: + response_data["error"] = f"Importer {importer_name} failed: {error}" + else: + response_data["steps_completed"].append("import") + except Exception as e: + response_data["error"] = f"Error running importer {importer_name}: {str(e)}" + return response_data + + if not no_threading and len(importers) > 1: + with ThreadPoolExecutor(max_workers=len(importers)) as executor: + future_to_importer = { + executor.submit(run_importer, importer): importer for importer in importers + } + for future in as_completed(future_to_importer): + results.append(future.result()) + else: + for importer in importers: + results.append(run_importer(importer)) + + return Response(results, status=status.HTTP_202_ACCEPTED) diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index 432c7c10f..eab8425ec 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -905,3 +905,68 @@ def test_get_all_vulnerable_purls(self): response = self.client.get(url) assert response.status_code == 200 assert "pkg:pypi/sample@1.0.0" in response.data + + +class LiveEvaluationAPITest(APITestCase): + def setUp(self): + self.client = APIClient(enforce_csrf_checks=True) + self.url = "/api/v2/live-evaluation/evaluate" + + @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") + def test_evaluate_success(self, mock_registry): + class MockImporter: + pipeline_id = "dummy" + supported_types = ["pypi"] + + def __init__(self, purl=None): + pass + + def execute(self): + return 0, None + + mock_registry.values.return_value = [MockImporter] + data = {"purl_string": "pkg:pypi/django@3.2"} + response = self.client.post(self.url, data, format="json") + assert response.status_code == 202 + assert isinstance(response.data, list) + assert response.data[0]["importer"] == "dummy" + assert response.data[0]["purl"] == "pkg:pypi/django@3.2" + assert 
"steps_completed" in response.data[0] + assert "import" in response.data[0]["steps_completed"] + + @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") + def test_evaluate_no_importer_found(self, mock_registry): + class MockImporter: + pipeline_id = "dummy" + supported_types = ["npm"] + + mock_registry.values.return_value = [MockImporter] + data = {"purl_string": "pkg:pypi/django@3.2"} + response = self.client.post(self.url, data, format="json") + assert response.status_code == 400 + assert "No live importers found" in response.data["error"] + + def test_evaluate_invalid_purl(self): + data = {"purl_string": "not_a_valid_purl"} + response = self.client.post(self.url, data, format="json") + assert response.status_code == 400 + assert "Invalid PackageURL" in response.data["error"] + + @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") + def test_evaluate_no_threading(self, mock_registry): + class MockImporter: + pipeline_id = "dummy" + supported_types = ["pypi"] + + def __init__(self, purl=None): + pass + + def execute(self): + return 0, None + + mock_registry.values.return_value = [MockImporter] + data = {"purl_string": "pkg:pypi/django@3.2", "no_threading": True} + response = self.client.post(self.url, data, format="json") + assert response.status_code == 202 + assert isinstance(response.data, list) + assert response.data[0]["importer"] == "dummy" diff --git a/vulnerablecode/urls.py b/vulnerablecode/urls.py index 8d170678a..7140965fe 100644 --- a/vulnerablecode/urls.py +++ b/vulnerablecode/urls.py @@ -23,6 +23,7 @@ from vulnerabilities.api_v2 import AdvisoriesPackageV2ViewSet from vulnerabilities.api_v2 import CodeFixV2ViewSet from vulnerabilities.api_v2 import CodeFixViewSet +from vulnerabilities.api_v2 import LiveEvaluationViewSet from vulnerabilities.api_v2 import PackageV2ViewSet from vulnerabilities.api_v2 import PipelineScheduleV2ViewSet from vulnerabilities.api_v2 import VulnerabilityV2ViewSet @@ -69,6 +70,7 @@ def __init__(self, *args, **kwargs): 
api_v2_router.register("codefixes", CodeFixViewSet, basename="codefix") api_v2_router.register("pipelines", PipelineScheduleV2ViewSet, basename="pipelines") api_v2_router.register("advisory-codefixes", CodeFixV2ViewSet, basename="advisory-codefix") +api_v2_router.register("live-evaluation", LiveEvaluationViewSet, basename="live-evaluation") urlpatterns = [ From ba3b3c1f46aa2832298d521749063b0692cb11b9 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Mon, 25 Aug 2025 22:32:07 +0300 Subject: [PATCH 3/9] Change Live Evaluation API to use RQ #1953 Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/api_v2.py | 102 +++++++++++++----- ...vepipelinerun_pipelinerun_live_pipeline.py | 48 +++++++++ vulnerabilities/models.py | 45 +++++++- vulnerabilities/tasks.py | 41 +++++-- vulnerabilities/tests/test_api_v2.py | 45 +++----- vulnerablecode/settings.py | 7 ++ 6 files changed, 219 insertions(+), 69 deletions(-) create mode 100644 vulnerabilities/migrations/0102_livepipelinerun_pipelinerun_live_pipeline.py diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 8f6b06593..477840e25 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -44,6 +44,7 @@ from vulnerabilities.models import VulnerabilityReference from vulnerabilities.models import VulnerabilitySeverity from vulnerabilities.models import Weakness +from vulnerabilities.tasks import enqueue_ad_hoc_pipeline from vulnerabilities.throttling import PermissionBasedUserRateThrottle @@ -1300,8 +1301,7 @@ def lookup(self, request): class LiveEvaluationSerializer(serializers.Serializer): - purl_string = serializers.CharField(help_text="PackageURL to evaluate") - no_threading = serializers.BooleanField(required=False, default=False) + purl = serializers.CharField(help_text="PackageURL to evaluate") class LiveEvaluationViewSet(viewsets.GenericViewSet): @@ -1310,7 +1310,7 @@ class LiveEvaluationViewSet(viewsets.GenericViewSet): @extend_schema( request=LiveEvaluationSerializer, 
responses={ - 202: {"description": "Live evaluation done successfully"}, + 202: {"description": "Live evaluation enqueued successfully; returns Run IDs"}, 400: {"description": "Invalid request"}, 500: {"description": "Internal server error"}, }, @@ -1324,8 +1324,7 @@ def evaluate(self, request): status=status.HTTP_400_BAD_REQUEST, ) - purl_string = serializer.validated_data.get("purl_string") - no_threading = serializer.validated_data.get("no_threading", False) + purl_string = serializer.validated_data.get("purl") try: purl = PackageURL.from_string(purl_string) if purl_string else None @@ -1349,31 +1348,78 @@ def evaluate(self, request): status=status.HTTP_400_BAD_REQUEST, ) - results = [] + # Create a single LivePipelineRun to represent this evaluation + from vulnerabilities.models import LivePipelineRun - def run_importer(importer): + live_run = LivePipelineRun.objects.create(purl=purl_string) + runs = [] + for importer in importers: importer_name = getattr(importer, "pipeline_id", importer.__name__) - response_data = {"importer": importer_name, "purl": purl_string, "steps_completed": []} + run_id = enqueue_ad_hoc_pipeline(importer_name, inputs={"purl": purl}) + # Attach each PipelineRun to the LivePipelineRun + from vulnerabilities.models import PipelineRun + try: - pipeline_instance = importer(purl=purl) - status_code, error = pipeline_instance.execute() - if status_code != 0: - response_data["error"] = f"Importer {importer_name} failed: {error}" - else: - response_data["steps_completed"].append("import") - except Exception as e: - response_data["error"] = f"Error running importer {importer_name}: {str(e)}" - return response_data - - if not no_threading and len(importers) > 1: - with ThreadPoolExecutor(max_workers=len(importers)) as executor: - future_to_importer = { - executor.submit(run_importer, importer): importer for importer in importers + run_obj = PipelineRun.objects.get(run_id=run_id) + run_obj.live_pipeline = live_run + run_obj.save() + except 
PipelineRun.DoesNotExist: + pass + runs.append( + { + "importer": importer_name, + "run_id": str(run_id) if run_id else None, } - for future in as_completed(future_to_importer): - results.append(future.result()) - else: - for importer in importers: - results.append(run_importer(importer)) + ) + return Response( + {"live_run_id": str(live_run.run_id), "runs": runs}, status=status.HTTP_202_ACCEPTED + ) - return Response(results, status=status.HTTP_202_ACCEPTED) + @extend_schema( + parameters=[ + OpenApiParameter( + name="live_run_id", + description="UUID of the live run to check status for", + required=True, + type={"type": "string"}, + location=OpenApiParameter.PATH, + ) + ], + responses={200: "LivePipelineRun status and importers status"}, + ) + @action(detail=False, methods=["get"], url_path=r"status/(?P[0-9a-f\-]{36})") + def status(self, request, live_run_id=None): + from vulnerabilities.models import LivePipelineRun + from vulnerabilities.models import PipelineRun + + try: + live_run = LivePipelineRun.objects.get(run_id=live_run_id) + except LivePipelineRun.DoesNotExist: + return Response({"detail": "Live run not found."}, status=status.HTTP_404_NOT_FOUND) + + live_run.update_status() + + # Gather status for each importer run + importer_statuses = [] + for run in live_run.pipelineruns.all(): + importer_statuses.append( + { + "importer": run.pipeline.pipeline_id, + "run_id": str(run.run_id), + "status": run.status, + "run_start_date": run.run_start_date, + "run_end_date": run.run_end_date, + "run_exitcode": run.run_exitcode, + "run_output": run.run_output, + } + ) + + response = { + "live_run_id": str(live_run.run_id), + "overall_status": live_run.status, + "created_date": live_run.created_date, + "completed_date": live_run.completed_date, + "purl": live_run.purl, + "importers": importer_statuses, + } + return Response(response) diff --git a/vulnerabilities/migrations/0102_livepipelinerun_pipelinerun_live_pipeline.py 
b/vulnerabilities/migrations/0102_livepipelinerun_pipelinerun_live_pipeline.py new file mode 100644 index 000000000..3e436db34 --- /dev/null +++ b/vulnerabilities/migrations/0102_livepipelinerun_pipelinerun_live_pipeline.py @@ -0,0 +1,48 @@ +# Generated by Django 4.2.22 on 2025-08-25 18:03 + +from django.db import migrations, models +import django.db.models.deletion +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ("vulnerabilities", "0101_advisorytodov2_todorelatedadvisoryv2_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="LivePipelineRun", + fields=[ + ( + "run_id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + unique=True, + ), + ), + ("created_date", models.DateTimeField(auto_now_add=True, db_index=True)), + ("completed_date", models.DateTimeField(blank=True, editable=False, null=True)), + ("status", models.CharField(default="queued", max_length=20)), + ("purl", models.CharField(blank=True, max_length=300, null=True)), + ], + options={ + "ordering": ["-created_date"], + }, + ), + migrations.AddField( + model_name="pipelinerun", + name="live_pipeline", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="pipelineruns", + to="vulnerabilities.livepipelinerun", + ), + ), + ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index f404d7d17..5a10dd9ef 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -1972,6 +1972,35 @@ class CodeFixV2(CodeChangeV2): ) +class LivePipelineRun(models.Model): + """Represents a single live evaluation run for all compatible importers.""" + + run_id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, unique=True) + created_date = models.DateTimeField(auto_now_add=True, db_index=True) + completed_date = models.DateTimeField(blank=True, null=True, editable=False) + status = models.CharField(max_length=20, 
default="queued") + purl = models.CharField(max_length=300, blank=True, null=True) + + def is_finished(self): + return self.status == "finished" + + def update_status(self): + if not self.pipelineruns.exists(): + self.status = "queued" + elif all(run.status == PipelineRun.Status.SUCCESS for run in self.pipelineruns.all()): + self.status = "finished" + self.completed_date = timezone.now() + elif any(run.status == PipelineRun.Status.FAILURE for run in self.pipelineruns.all()): + self.status = "failed" + self.completed_date = timezone.now() + else: + self.status = "running" + self.save() + + class Meta: + ordering = ["-created_date"] + + class PipelineRun(models.Model): """The Database representation of a pipeline execution.""" @@ -1981,6 +2010,14 @@ class PipelineRun(models.Model): on_delete=models.CASCADE, ) + live_pipeline = models.ForeignKey( + "LivePipelineRun", + related_name="pipelineruns", + on_delete=models.CASCADE, + blank=True, + null=True, + ) + run_id = models.UUIDField( primary_key=True, default=uuid.uuid4, @@ -2245,8 +2282,9 @@ def append_to_log(self, message, is_multiline=False): if not is_multiline: message = message.replace("\n", "").replace("\r", "") - self.log = self.log + message + "\n" - self.save(update_fields=["log"]) + new_log = (self.log or "") + message + "\n" + type(self).objects.filter(run_id=self.run_id).update(log=new_log) + self.log = new_log def dequeue(self): from vulnerabilities.tasks import dequeue_job @@ -2342,12 +2380,15 @@ def save(self, *args, **kwargs): def pipeline_class(self): """Return the pipeline class.""" from vulnerabilities.importers import IMPORTERS_REGISTRY + from vulnerabilities.importers import LIVE_IMPORTERS_REGISTRY from vulnerabilities.improvers import IMPROVERS_REGISTRY if self.pipeline_id in IMPROVERS_REGISTRY: return IMPROVERS_REGISTRY.get(self.pipeline_id) if self.pipeline_id in IMPORTERS_REGISTRY: return IMPORTERS_REGISTRY.get(self.pipeline_id) + if self.pipeline_id in LIVE_IMPORTERS_REGISTRY: + return 
LIVE_IMPORTERS_REGISTRY.get(self.pipeline_id) @property def description(self): diff --git a/vulnerabilities/tasks.py b/vulnerabilities/tasks.py index e035985a2..b42a8fdc1 100644 --- a/vulnerabilities/tasks.py +++ b/vulnerabilities/tasks.py @@ -21,10 +21,10 @@ logger = logging.getLogger(__name__) -queue = django_rq.get_queue("default") +default_queue = django_rq.get_queue("default") -def execute_pipeline(pipeline_id, run_id): +def execute_pipeline(pipeline_id, run_id, inputs=None): from vulnerabilities.pipelines import VulnerableCodePipeline logger.info(f"Enter `execute_pipeline` {pipeline_id}") @@ -39,7 +39,8 @@ def execute_pipeline(pipeline_id, run_id): exitcode = 0 run_class = run.pipeline_class if issubclass(run_class, VulnerableCodePipeline): - pipeline_instance = run_class(run_instance=run) + inputs = inputs or {} + pipeline_instance = run_class(run_instance=run, **inputs) exitcode, output = pipeline_instance.execute() elif issubclass(run_class, Importer) or issubclass(run_class, Improver): exitcode, output = legacy_runner(run_class=run_class, run=run) @@ -121,7 +122,7 @@ def enqueue_pipeline(pipeline_id): run = models.PipelineRun.objects.create( pipeline=pipeline_schedule, ) - job = queue.enqueue( + job = default_queue.enqueue( execute_pipeline, pipeline_id, run.run_id, @@ -131,7 +132,35 @@ def enqueue_pipeline(pipeline_id): ) +def enqueue_ad_hoc_pipeline(pipeline_id, *, inputs=None): + """Enqueue a one-off execution for the given pipeline_id with optional inputs. + + Returns the created run_id or None if the pipeline cannot be enqueued. 
+ """ + try: + pipeline_schedule = models.PipelineSchedule.objects.get(pipeline_id=pipeline_id) + except models.PipelineSchedule.DoesNotExist: + pipeline_schedule = models.PipelineSchedule.objects.create( + pipeline_id=pipeline_id, + is_active=False, + ) + + run = models.PipelineRun.objects.create(pipeline=pipeline_schedule) + + live_queue = django_rq.get_queue("live") + job = live_queue.enqueue( + execute_pipeline, + pipeline_id, + run.run_id, + inputs or {}, + job_id=str(run.run_id), + on_failure=set_run_failure, + job_timeout=f"{pipeline_schedule.execution_timeout}h", + ) + return run.run_id + + def dequeue_job(job_id): """Remove a job from queue if it hasn't been executed yet.""" - if job_id in queue.jobs: - queue.remove(job_id) + if job_id in default_queue.jobs: + default_queue.remove(job_id) diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index 7fde6232f..32c70a92e 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -913,24 +913,17 @@ def setUp(self): @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") def test_evaluate_success(self, mock_registry): class MockImporter: - pipeline_id = "dummy" + pipeline_id = "pypa_live_importer_v2" supported_types = ["pypi"] - def __init__(self, purl=None): - pass - - def execute(self): - return 0, None - mock_registry.values.return_value = [MockImporter] - data = {"purl_string": "pkg:pypi/django@3.2"} + data = {"purl": "pkg:pypi/django@3.2"} response = self.client.post(self.url, data, format="json") assert response.status_code == 202 - assert isinstance(response.data, list) - assert response.data[0]["importer"] == "dummy" - assert response.data[0]["purl"] == "pkg:pypi/django@3.2" - assert "steps_completed" in response.data[0] - assert "import" in response.data[0]["steps_completed"] + assert isinstance(response.data, dict) + assert response.data["live_run_id"] is not None + assert response.data["runs"][0]["importer"] == 
"pypa_live_importer_v2" + assert response.data["runs"][0]["run_id"] is not None @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") def test_evaluate_no_importer_found(self, mock_registry): @@ -939,32 +932,18 @@ class MockImporter: supported_types = ["npm"] mock_registry.values.return_value = [MockImporter] - data = {"purl_string": "pkg:pypi/django@3.2"} + data = {"purl": "pkg:pypi/django@3.2"} response = self.client.post(self.url, data, format="json") assert response.status_code == 400 assert "No live importers found" in response.data["error"] def test_evaluate_invalid_purl(self): - data = {"purl_string": "not_a_valid_purl"} + data = {"purl": "not_a_valid_purl"} response = self.client.post(self.url, data, format="json") assert response.status_code == 400 assert "Invalid PackageURL" in response.data["error"] - @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") - def test_evaluate_no_threading(self, mock_registry): - class MockImporter: - pipeline_id = "dummy" - supported_types = ["pypi"] - - def __init__(self, purl=None): - pass - - def execute(self): - return 0, None - - mock_registry.values.return_value = [MockImporter] - data = {"purl_string": "pkg:pypi/django@3.2", "no_threading": True} - response = self.client.post(self.url, data, format="json") - assert response.status_code == 202 - assert isinstance(response.data, list) - assert response.data[0]["importer"] == "dummy" + def test_status_not_found(self): + url = "/api/v2/live-evaluation/status/00000000-0000-0000-0000-000000000000" + response = self.client.get(url) + assert response.status_code == 404 diff --git a/vulnerablecode/settings.py b/vulnerablecode/settings.py index 05a3d0fa8..dd7693037 100644 --- a/vulnerablecode/settings.py +++ b/vulnerablecode/settings.py @@ -385,3 +385,10 @@ "DEFAULT_TIMEOUT": env.int("VULNERABLECODE_REDIS_DEFAULT_TIMEOUT", default=3600), } } + +RQ_QUEUES["live"] = { + "HOST": env.str("VULNERABLECODE_REDIS_HOST", default=RQ_QUEUES["default"]["HOST"]), + "PORT": 
env.str("VULNERABLECODE_REDIS_PORT", default=RQ_QUEUES["default"]["PORT"]), + "PASSWORD": env.str("VULNERABLECODE_REDIS_PASSWORD", default=RQ_QUEUES["default"]["PASSWORD"]), + "DEFAULT_TIMEOUT": env.int("VULNERABLECODE_LIVE_REDIS_DEFAULT_TIMEOUT", default=3600), +} From a7f7041721da931256614eb9ce2a69234c2808b1 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Mon, 25 Aug 2025 23:01:33 +0300 Subject: [PATCH 4/9] Fix Live Evaluation API tests #1953 Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/tests/test_api_v2.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index 32c70a92e..5a5a2481b 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -23,6 +23,7 @@ from vulnerabilities.models import Alias from vulnerabilities.models import ApiUser from vulnerabilities.models import CodeFixV2 +from vulnerabilities.models import LivePipelineRun from vulnerabilities.models import Package from vulnerabilities.models import PackageV2 from vulnerabilities.models import PipelineRun @@ -911,12 +912,27 @@ def setUp(self): self.url = "/api/v2/live-evaluation/evaluate" @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") - def test_evaluate_success(self, mock_registry): + @patch("vulnerabilities.api_v2.enqueue_ad_hoc_pipeline") + @patch("vulnerabilities.models.PipelineRun.objects.get") + @patch("vulnerabilities.models.LivePipelineRun.objects.create") + def test_evaluate_success( + self, mock_live_create, mock_pipeline_get, mock_enqueue, mock_registry + ): class MockImporter: pipeline_id = "pypa_live_importer_v2" supported_types = ["pypi"] mock_registry.values.return_value = [MockImporter] + mock_live_run = type("MockLiveRun", (), {"run_id": "mock-live-id"})() + mock_live_create.return_value = mock_live_run + mock_enqueue.return_value = "mock-run-id" + mock_pipeline_run = type( + "MockPipelineRun", + (), + 
{"run_id": "mock-run-id", "live_pipeline": None, "save": lambda self: None}, + )() + mock_pipeline_get.return_value = mock_pipeline_run + data = {"purl": "pkg:pypi/django@3.2"} response = self.client.post(self.url, data, format="json") assert response.status_code == 202 @@ -943,7 +959,9 @@ def test_evaluate_invalid_purl(self): assert response.status_code == 400 assert "Invalid PackageURL" in response.data["error"] - def test_status_not_found(self): + @patch("vulnerabilities.models.LivePipelineRun.objects.get") + def test_status_not_found(self, mock_live_get): + mock_live_get.side_effect = LivePipelineRun.DoesNotExist() url = "/api/v2/live-evaluation/status/00000000-0000-0000-0000-000000000000" response = self.client.get(url) assert response.status_code == 404 From 6c0c6c8312d72d7e842cb204967c682b9238ae8d Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Tue, 26 Aug 2025 19:56:50 +0300 Subject: [PATCH 5/9] Add status url in live evaluation API response #1953 Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/api_v2.py | 23 ++++++++++++++++++----- vulnerabilities/tests/test_api_v2.py | 9 +++++++-- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 477840e25..1dbe0433a 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -8,10 +8,8 @@ # -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures import as_completed - from django.db.models import Prefetch +from django.urls import reverse from django_filters import rest_framework as filters from drf_spectacular.utils import OpenApiParameter from drf_spectacular.utils import extend_schema @@ -1371,8 +1369,24 @@ def evaluate(self, request): "run_id": str(run_id) if run_id else None, } ) + + request_obj = request + status_path = reverse( + "live-evaluation-status", kwargs={"live_run_id": str(live_run.run_id)} + ) + + if hasattr(request_obj, "build_absolute_uri"): + status_url = 
request_obj.build_absolute_uri(status_path) + else: + status_url = status_path + return Response( - {"live_run_id": str(live_run.run_id), "runs": runs}, status=status.HTTP_202_ACCEPTED + { + "live_run_id": str(live_run.run_id), + "runs": runs, + "status_url": status_url, + }, + status=status.HTTP_202_ACCEPTED, ) @extend_schema( @@ -1390,7 +1404,6 @@ def evaluate(self, request): @action(detail=False, methods=["get"], url_path=r"status/(?P<live_run_id>[0-9a-f\-]{36})") def status(self, request, live_run_id=None): from vulnerabilities.models import LivePipelineRun - from vulnerabilities.models import PipelineRun try: live_run = LivePipelineRun.objects.get(run_id=live_run_id) diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index 5a5a2481b..1829f095d 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -915,15 +915,17 @@ def setUp(self): @patch("vulnerabilities.api_v2.enqueue_ad_hoc_pipeline") @patch("vulnerabilities.models.PipelineRun.objects.get") @patch("vulnerabilities.models.LivePipelineRun.objects.create") + @patch("django.urls.reverse") def test_evaluate_success( - self, mock_live_create, mock_pipeline_get, mock_enqueue, mock_registry + self, mock_reverse, mock_live_create, mock_pipeline_get, mock_enqueue, mock_registry ): class MockImporter: pipeline_id = "pypa_live_importer_v2" supported_types = ["pypi"] mock_registry.values.return_value = [MockImporter] - mock_live_run = type("MockLiveRun", (), {"run_id": "mock-live-id"})() + valid_uuid = "00000000-0000-0000-0000-000000000001" + mock_live_run = type("MockLiveRun", (), {"run_id": valid_uuid})() mock_live_create.return_value = mock_live_run mock_enqueue.return_value = "mock-run-id" mock_pipeline_run = type( @@ -932,6 +934,7 @@ class MockImporter: {"run_id": "mock-run-id", "live_pipeline": None, "save": lambda self: None}, )() mock_pipeline_get.return_value = mock_pipeline_run + mock_reverse.return_value = 
f"/api/v2/live-evaluation/status/{valid_uuid}" data = {"purl": "pkg:pypi/django@3.2"} response = self.client.post(self.url, data, format="json") @@ -940,6 +943,8 @@ class MockImporter: assert response.data["live_run_id"] is not None assert response.data["runs"][0]["importer"] == "pypa_live_importer_v2" assert response.data["runs"][0]["run_id"] is not None + assert "status_url" in response.data + assert response.data["status_url"].endswith(f"/api/v2/live-evaluation/status/{valid_uuid}") @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") def test_evaluate_no_importer_found(self, mock_registry): From c1909f530e36817025b6054bdaa2df5213c015f8 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Tue, 26 Aug 2025 19:57:18 +0300 Subject: [PATCH 6/9] Add a separate live evaluation dashboard #1953 Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/views.py | 50 ++++++++++++++++++++++++++++++++++++---- vulnerablecode/urls.py | 6 +++++ 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/vulnerabilities/views.py b/vulnerabilities/views.py index f4cd99dbe..52dce9b49 100644 --- a/vulnerabilities/views.py +++ b/vulnerabilities/views.py @@ -34,6 +34,7 @@ from vulnerabilities.forms import PackageSearchForm from vulnerabilities.forms import PipelineSchedulePackageForm from vulnerabilities.forms import VulnerabilitySearchForm +from vulnerabilities.importers import LIVE_IMPORTERS_REGISTRY from vulnerabilities.models import ImpactedPackage from vulnerabilities.models import PipelineRun from vulnerabilities.models import PipelineSchedule @@ -642,18 +643,59 @@ class PipelineScheduleListView(ListView, FormMixin): form_class = PipelineSchedulePackageForm def get_queryset(self): + live_pipeline_ids = list(LIVE_IMPORTERS_REGISTRY.keys()) form = self.form_class(self.request.GET) + if form.is_valid(): - return PipelineSchedule.objects.filter( + return PipelineSchedule.objects.exclude(pipeline_id__in=live_pipeline_ids).filter( 
pipeline_id__icontains=form.cleaned_data.get("search") ) - return PipelineSchedule.objects.all() + return PipelineSchedule.objects.exclude(pipeline_id__in=live_pipeline_ids) def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) - context["active_pipeline_count"] = PipelineSchedule.objects.filter(is_active=True).count() + live_pipeline_ids = list(LIVE_IMPORTERS_REGISTRY.keys()) + + context["active_pipeline_count"] = ( + PipelineSchedule.objects.exclude(pipeline_id__in=live_pipeline_ids) + .filter(is_active=True) + .count() + ) + context["disabled_pipeline_count"] = ( + PipelineSchedule.objects.exclude(pipeline_id__in=live_pipeline_ids) + .filter(is_active=False) + .count() + ) + return context + + +class LiveEvaluationPipelineScheduleListView(ListView, FormMixin): + model = PipelineSchedule + context_object_name = "schedule_list" + template_name = "pipeline_dashboard.html" + paginate_by = 20 + form_class = PipelineSchedulePackageForm + + def get_queryset(self): + live_pipeline_ids = list(LIVE_IMPORTERS_REGISTRY.keys()) + form = self.form_class(self.request.GET) + + if form.is_valid(): + return PipelineSchedule.objects.filter( + pipeline_id__in=live_pipeline_ids, + pipeline_id__icontains=form.cleaned_data.get("search"), + ) + return PipelineSchedule.objects.filter(pipeline_id__in=live_pipeline_ids) + + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + live_pipeline_ids = list(LIVE_IMPORTERS_REGISTRY.keys()) + + context["active_pipeline_count"] = PipelineSchedule.objects.filter( + pipeline_id__in=live_pipeline_ids, is_active=True + ).count() context["disabled_pipeline_count"] = PipelineSchedule.objects.filter( - is_active=False + pipeline_id__in=live_pipeline_ids, is_active=False ).count() return context diff --git a/vulnerablecode/urls.py b/vulnerablecode/urls.py index 7140965fe..00de4ccaa 100644 --- a/vulnerablecode/urls.py +++ b/vulnerablecode/urls.py @@ -33,6 +33,7 @@ from vulnerabilities.views 
import ApiUserCreateView from vulnerabilities.views import HomePage from vulnerabilities.views import HomePageV2 +from vulnerabilities.views import LiveEvaluationPipelineScheduleListView from vulnerabilities.views import PackageDetails from vulnerabilities.views import PackageSearch from vulnerabilities.views import PackageSearchV2 @@ -90,6 +91,11 @@ def __init__(self, *args, **kwargs): PipelineScheduleListView.as_view(), name="dashboard", ), + path( + "pipelines/live-evaluation-dashboard/", + LiveEvaluationPipelineScheduleListView.as_view(), + name="live-evaluation-dashboard", + ), path( "pipelines//runs/", PipelineRunListView.as_view(), From 54a7d8b67a46df5ebe3c48ff3391811f40e27a1b Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Thu, 4 Sep 2025 16:43:26 +0300 Subject: [PATCH 7/9] Address PR Review Comments #1953 Signed-off-by: Michael Ehab Mikhail --- docker-compose.yml | 19 +++++--- docker.env | 4 +- vulnerabilities/api_v2.py | 37 +++++---------- vulnerabilities/models.py | 40 ++++++++++++----- vulnerabilities/tasks.py | 67 +++++++++++++++++++--------- vulnerabilities/tests/test_api_v2.py | 18 ++------ vulnerabilities/views.py | 9 ++++ vulnerablecode/settings.py | 19 ++++---- vulnerablecode/urls.py | 5 ++- 9 files changed, 129 insertions(+), 89 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 27bb5531d..07711f99a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,9 +23,9 @@ services: vulnerablecode: build: . 
command: /bin/sh -c " - ./manage.py migrate && - ./manage.py collectstatic --no-input --verbosity 0 --clear && - gunicorn vulnerablecode.wsgi:application -u nobody -g nogroup --bind :8000 --timeout 600 --workers 8" + ./manage.py migrate && + ./manage.py collectstatic --no-input --verbosity 0 --clear && + gunicorn vulnerablecode.wsgi:application -u nobody -g nogroup --bind :8000 --timeout 600 --workers 8" env_file: - docker.env expose: @@ -60,6 +60,17 @@ services: - db - vulnerablecode + vulnerablecode_rqworker_live: + build: . + command: wait-for-it web:8000 -- python ./manage.py rqworker live + env_file: + - docker.env + volumes: + - /etc/vulnerablecode/:/etc/vulnerablecode/ + depends_on: + - vulnerablecode_redis + - db + - vulnerablecode nginx: image: nginx @@ -75,9 +86,7 @@ services: depends_on: - vulnerablecode - volumes: db_data: static: vulnerablecode_redis_data: - diff --git a/docker.env b/docker.env index bc0c3c9d5..4188cd889 100644 --- a/docker.env +++ b/docker.env @@ -5,4 +5,6 @@ POSTGRES_PASSWORD=vulnerablecode VULNERABLECODE_DB_HOST=db VULNERABLECODE_STATIC_ROOT=/var/vulnerablecode/static/ -VULNERABLECODE_REDIS_HOST=vulnerablecode_redis \ No newline at end of file +VULNERABLECODE_REDIS_HOST=vulnerablecode_redis + +VULNERABLECODE_ENABLE_LIVE_EVALUATION_API=false \ No newline at end of file diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 1dbe0433a..43506500f 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -1304,6 +1304,7 @@ class LiveEvaluationSerializer(serializers.Serializer): class LiveEvaluationViewSet(viewsets.GenericViewSet): serializer_class = LiveEvaluationSerializer + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] @extend_schema( request=LiveEvaluationSerializer, @@ -1346,34 +1347,15 @@ def evaluate(self, request): status=status.HTTP_400_BAD_REQUEST, ) - # Create a single LivePipelineRun to represent this evaluation - from vulnerabilities.models import LivePipelineRun - - 
live_run = LivePipelineRun.objects.create(purl=purl_string) - runs = [] - for importer in importers: - importer_name = getattr(importer, "pipeline_id", importer.__name__) - run_id = enqueue_ad_hoc_pipeline(importer_name, inputs={"purl": purl}) - # Attach each PipelineRun to the LivePipelineRun - from vulnerabilities.models import PipelineRun - - try: - run_obj = PipelineRun.objects.get(run_id=run_id) - run_obj.live_pipeline = live_run - run_obj.save() - except PipelineRun.DoesNotExist: - pass - runs.append( - { - "importer": importer_name, - "run_id": str(run_id) if run_id else None, - } - ) + # Enqueue all selected importers together and link runs to a new LivePipelineRun + importer_ids = [getattr(imp, "pipeline_id", imp.__name__) for imp in importers] + live_run_id, run_ids = enqueue_ad_hoc_pipeline(importer_ids, inputs={"purl": purl}) + runs = [ + {"importer": importer_ids[idx], "run_id": str(rid)} for idx, rid in enumerate(run_ids) + ] request_obj = request - status_path = reverse( - "live-evaluation-status", kwargs={"live_run_id": str(live_run.run_id)} - ) + status_path = reverse("live-evaluation-status", kwargs={"live_run_id": str(live_run_id)}) if hasattr(request_obj, "build_absolute_uri"): status_url = request_obj.build_absolute_uri(status_path) @@ -1382,7 +1364,7 @@ def evaluate(self, request): return Response( { - "live_run_id": str(live_run.run_id), + "live_run_id": str(live_run_id), "runs": runs, "status_url": status_url, }, @@ -1431,6 +1413,7 @@ def status(self, request, live_run_id=None): "live_run_id": str(live_run.run_id), "overall_status": live_run.status, "created_date": live_run.created_date, + "started_date": getattr(live_run, "started_date", None), "completed_date": live_run.completed_date, "purl": live_run.purl, "importers": importer_statuses, diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index 5a10dd9ef..6ca9bbb5d 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -1985,17 +1985,37 @@ def 
is_finished(self): return self.status == "finished" def update_status(self): - if not self.pipelineruns.exists(): + runs = list(self.pipelineruns.all()) + if not runs: self.status = "queued" - elif all(run.status == PipelineRun.Status.SUCCESS for run in self.pipelineruns.all()): + self.completed_date = None + self.save(update_fields=["status", "completed_date"]) + return + + # Determine aggregate status + if all(r.status == PipelineRun.Status.SUCCESS for r in runs): self.status = "finished" - self.completed_date = timezone.now() - elif any(run.status == PipelineRun.Status.FAILURE for run in self.pipelineruns.all()): + elif any(r.status == PipelineRun.Status.FAILURE for r in runs): self.status = "failed" - self.completed_date = timezone.now() - else: + elif any(r.status == PipelineRun.Status.RUNNING for r in runs): self.status = "running" - self.save() + else: + # queued or mixed queued + self.status = "queued" + + end_times = [r.run_end_date for r in runs if r.run_end_date] + completed = None + if self.status in ("finished", "failed") and end_times: + completed = max(end_times) + self.completed_date = completed + self.save(update_fields=["status", "completed_date"]) + + @property + def started_date(self): + """Return earliest run_start_date among child runs, if any.""" + runs = self.pipelineruns.all() + start_times = [r.run_start_date for r in runs if r.run_start_date] + return min(start_times) if start_times else None class Meta: ordering = ["-created_date"] @@ -2281,10 +2301,8 @@ def append_to_log(self, message, is_multiline=False): message = message.strip() if not is_multiline: message = message.replace("\n", "").replace("\r", "") - - new_log = (self.log or "") + message + "\n" - type(self).objects.filter(run_id=self.run_id).update(log=new_log) - self.log = new_log + self.log = (self.log or "") + message + "\n" + self.save(update_fields=["log"]) def dequeue(self): from vulnerabilities.tasks import dequeue_job diff --git a/vulnerabilities/tasks.py 
b/vulnerabilities/tasks.py index b42a8fdc1..3871f19cb 100644 --- a/vulnerabilities/tasks.py +++ b/vulnerabilities/tasks.py @@ -22,6 +22,7 @@ logger = logging.getLogger(__name__) default_queue = django_rq.get_queue("default") +live_queue = django_rq.get_queue("live") def execute_pipeline(pipeline_id, run_id, inputs=None): @@ -132,35 +133,59 @@ def enqueue_pipeline(pipeline_id): ) -def enqueue_ad_hoc_pipeline(pipeline_id, *, inputs=None): - """Enqueue a one-off execution for the given pipeline_id with optional inputs. +def enqueue_ad_hoc_pipeline(pipeline_ids, *, inputs=None): + """Enqueue one-off executions for the given pipeline_ids with optional inputs. - Returns the created run_id or None if the pipeline cannot be enqueued. + When multiple pipeline IDs are provided, this will create a single LivePipelineRun and attach + each created PipelineRun to it. Returns a tuple of (live_run_id, run_ids). + + If a single pipeline ID (str) is provided, it will be wrapped into a list. """ + inputs = inputs or {} + # Normalize to list + if isinstance(pipeline_ids, str): + pipeline_ids = [pipeline_ids] + + # Create a LivePipelineRun to group these ad-hoc runs, if any inputs (such as purl) are given + purl_val = inputs.get("purl") try: - pipeline_schedule = models.PipelineSchedule.objects.get(pipeline_id=pipeline_id) - except models.PipelineSchedule.DoesNotExist: - pipeline_schedule = models.PipelineSchedule.objects.create( - pipeline_id=pipeline_id, - is_active=False, - ) + # accept PackageURL instance as well as string + purl_str = str(purl_val) if purl_val is not None else None + except Exception: + purl_str = None + + live_run = models.LivePipelineRun.objects.create(purl=purl_str) + + run_ids = [] + for pipeline_id in pipeline_ids: + try: + pipeline_schedule = models.PipelineSchedule.objects.get(pipeline_id=pipeline_id) + except models.PipelineSchedule.DoesNotExist: + pipeline_schedule = models.PipelineSchedule.objects.create( + pipeline_id=pipeline_id, + is_active=False, + ) 
- run = models.PipelineRun.objects.create(pipeline=pipeline_schedule) + run = models.PipelineRun.objects.create(pipeline=pipeline_schedule, live_pipeline=live_run) + + # Enqueue on the dedicated live queue + live_queue.enqueue( + execute_pipeline, + pipeline_id, + run.run_id, + inputs, + job_id=str(run.run_id), + on_failure=set_run_failure, + job_timeout=f"{pipeline_schedule.execution_timeout}h", + ) + run_ids.append(run.run_id) - live_queue = django_rq.get_queue("live") - job = live_queue.enqueue( - execute_pipeline, - pipeline_id, - run.run_id, - inputs or {}, - job_id=str(run.run_id), - on_failure=set_run_failure, - job_timeout=f"{pipeline_schedule.execution_timeout}h", - ) - return run.run_id + return live_run.run_id, run_ids def dequeue_job(job_id): """Remove a job from queue if it hasn't been executed yet.""" if job_id in default_queue.jobs: default_queue.remove(job_id) + if job_id in live_queue.jobs: + live_queue.remove(job_id) diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index 1829f095d..923549ed9 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -913,27 +913,15 @@ def setUp(self): @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") @patch("vulnerabilities.api_v2.enqueue_ad_hoc_pipeline") - @patch("vulnerabilities.models.PipelineRun.objects.get") - @patch("vulnerabilities.models.LivePipelineRun.objects.create") @patch("django.urls.reverse") - def test_evaluate_success( - self, mock_reverse, mock_live_create, mock_pipeline_get, mock_enqueue, mock_registry - ): + def test_evaluate_success(self, mock_reverse, mock_enqueue, mock_registry): class MockImporter: pipeline_id = "pypa_live_importer_v2" supported_types = ["pypi"] mock_registry.values.return_value = [MockImporter] valid_uuid = "00000000-0000-0000-0000-000000000001" - mock_live_run = type("MockLiveRun", (), {"run_id": valid_uuid})() - mock_live_create.return_value = mock_live_run - mock_enqueue.return_value = 
"mock-run-id" - mock_pipeline_run = type( - "MockPipelineRun", - (), - {"run_id": "mock-run-id", "live_pipeline": None, "save": lambda self: None}, - )() - mock_pipeline_get.return_value = mock_pipeline_run + mock_enqueue.return_value = (valid_uuid, ["mock-run-id"]) mock_reverse.return_value = f"/api/v2/live-evaluation/status/{valid_uuid}" data = {"purl": "pkg:pypi/django@3.2"} @@ -942,7 +930,7 @@ class MockImporter: assert isinstance(response.data, dict) assert response.data["live_run_id"] is not None assert response.data["runs"][0]["importer"] == "pypa_live_importer_v2" - assert response.data["runs"][0]["run_id"] is not None + assert response.data["runs"][0]["run_id"] == "mock-run-id" assert "status_url" in response.data assert response.data["status_url"].endswith(f"/api/v2/live-evaluation/status/{valid_uuid}") diff --git a/vulnerabilities/views.py b/vulnerabilities/views.py index 52dce9b49..57c28f13b 100644 --- a/vulnerabilities/views.py +++ b/vulnerabilities/views.py @@ -697,6 +697,15 @@ def get_context_data(self, **kwargs): context["disabled_pipeline_count"] = PipelineSchedule.objects.filter( pipeline_id__in=live_pipeline_ids, is_active=False ).count() + + # Update status of recent LivePipelineRun groups for freshness + try: + from vulnerabilities.models import LivePipelineRun + + for lpr in LivePipelineRun.objects.order_by("-created_date")[:50]: + lpr.update_status() + except Exception: + pass return context diff --git a/vulnerablecode/settings.py b/vulnerablecode/settings.py index dd7693037..17c7aae28 100644 --- a/vulnerablecode/settings.py +++ b/vulnerablecode/settings.py @@ -181,6 +181,10 @@ "VULNERABLECODEIO_REQUIRE_AUTHENTICATION", default=False ) +VULNERABLECODE_ENABLE_LIVE_EVALUATION_API = env.bool( + "VULNERABLECODE_ENABLE_LIVE_EVALUATION_API", default=False +) + LOGIN_REDIRECT_URL = "/" LOGOUT_REDIRECT_URL = "/" @@ -383,12 +387,11 @@ "PORT": env.str("VULNERABLECODE_REDIS_PORT", default="6379"), "PASSWORD": env.str("VULNERABLECODE_REDIS_PASSWORD", 
default=""), "DEFAULT_TIMEOUT": env.int("VULNERABLECODE_REDIS_DEFAULT_TIMEOUT", default=3600), - } -} - -RQ_QUEUES["live"] = { - "HOST": env.str("VULNERABLECODE_REDIS_HOST", default=RQ_QUEUES["default"]["HOST"]), - "PORT": env.str("VULNERABLECODE_REDIS_PORT", default=RQ_QUEUES["default"]["PORT"]), - "PASSWORD": env.str("VULNERABLECODE_REDIS_PASSWORD", default=RQ_QUEUES["default"]["PASSWORD"]), - "DEFAULT_TIMEOUT": env.int("VULNERABLECODE_LIVE_REDIS_DEFAULT_TIMEOUT", default=3600), + }, + "live": { + "HOST": env.str("VULNERABLECODE_REDIS_HOST", default="localhost"), + "PORT": env.str("VULNERABLECODE_REDIS_PORT", default="6379"), + "PASSWORD": env.str("VULNERABLECODE_REDIS_PASSWORD", default=""), + "DEFAULT_TIMEOUT": env.int("VULNERABLECODE_LIVE_REDIS_DEFAULT_TIMEOUT", default=3600), + }, } diff --git a/vulnerablecode/urls.py b/vulnerablecode/urls.py index 00de4ccaa..08085655d 100644 --- a/vulnerablecode/urls.py +++ b/vulnerablecode/urls.py @@ -46,6 +46,7 @@ from vulnerabilities.views import VulnerabilitySearch from vulnerablecode.settings import DEBUG from vulnerablecode.settings import DEBUG_TOOLBAR +from vulnerablecode.settings import VULNERABLECODE_ENABLE_LIVE_EVALUATION_API # See the comment at https://stackoverflow.com/a/46163870. 
@@ -71,7 +72,9 @@ def __init__(self, *args, **kwargs): api_v2_router.register("codefixes", CodeFixViewSet, basename="codefix") api_v2_router.register("pipelines", PipelineScheduleV2ViewSet, basename="pipelines") api_v2_router.register("advisory-codefixes", CodeFixV2ViewSet, basename="advisory-codefix") -api_v2_router.register("live-evaluation", LiveEvaluationViewSet, basename="live-evaluation") + +if VULNERABLECODE_ENABLE_LIVE_EVALUATION_API: + api_v2_router.register("live-evaluation", LiveEvaluationViewSet, basename="live-evaluation") urlpatterns = [ From 5d4312be4bf73137769aae807e8050b501c00f2f Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Thu, 4 Sep 2025 17:35:48 +0300 Subject: [PATCH 8/9] Fix Live Evaluation API tests #1953 Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/tests/test_api_v2.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index 923549ed9..7425bfcc0 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -7,6 +7,7 @@ # See https://aboutcode.org for more information about nexB OSS projects. 
# +import os from unittest.mock import patch from django.contrib.auth.models import User @@ -919,6 +920,7 @@ class MockImporter: pipeline_id = "pypa_live_importer_v2" supported_types = ["pypi"] + os.environ["VULNERABLECODE_ENABLE_LIVE_EVALUATION_API"] = "true" mock_registry.values.return_value = [MockImporter] valid_uuid = "00000000-0000-0000-0000-000000000001" mock_enqueue.return_value = (valid_uuid, ["mock-run-id"]) @@ -940,6 +942,7 @@ class MockImporter: pipeline_id = "dummy" supported_types = ["npm"] + os.environ["VULNERABLECODE_ENABLE_LIVE_EVALUATION_API"] = "true" mock_registry.values.return_value = [MockImporter] data = {"purl": "pkg:pypi/django@3.2"} response = self.client.post(self.url, data, format="json") @@ -947,6 +950,7 @@ class MockImporter: assert "No live importers found" in response.data["error"] def test_evaluate_invalid_purl(self): + os.environ["VULNERABLECODE_ENABLE_LIVE_EVALUATION_API"] = "true" data = {"purl": "not_a_valid_purl"} response = self.client.post(self.url, data, format="json") assert response.status_code == 400 @@ -954,6 +958,7 @@ def test_evaluate_invalid_purl(self): @patch("vulnerabilities.models.LivePipelineRun.objects.get") def test_status_not_found(self, mock_live_get): + os.environ["VULNERABLECODE_ENABLE_LIVE_EVALUATION_API"] = "true" mock_live_get.side_effect = LivePipelineRun.DoesNotExist() url = "/api/v2/live-evaluation/status/00000000-0000-0000-0000-000000000000" response = self.client.get(url) From 15868dee44512419697508515edd6e3c7ebfe909 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Thu, 4 Sep 2025 18:21:56 +0300 Subject: [PATCH 9/9] Fix Live Evaluation API tests #1953 Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/api_v2.py | 12 ++++++++++++ vulnerabilities/tests/test_api_v2.py | 14 ++++++++++---- vulnerablecode/urls.py | 4 +--- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 43506500f..845ab8081 100644 --- 
a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -44,6 +44,7 @@ from vulnerabilities.models import Weakness from vulnerabilities.tasks import enqueue_ad_hoc_pipeline from vulnerabilities.throttling import PermissionBasedUserRateThrottle +from vulnerablecode.settings import VULNERABLECODE_ENABLE_LIVE_EVALUATION_API class CharInFilter(filters.BaseInFilter, filters.CharFilter): @@ -1316,6 +1317,11 @@ class LiveEvaluationViewSet(viewsets.GenericViewSet): ) @action(detail=False, methods=["post"]) def evaluate(self, request): + if not VULNERABLECODE_ENABLE_LIVE_EVALUATION_API: + return Response( + {"error": "Live evaluation API is disabled."}, + status=status.HTTP_403_FORBIDDEN, + ) serializer = self.get_serializer(data=request.data) if not serializer.is_valid(): return Response( @@ -1385,6 +1391,12 @@ def evaluate(self, request): ) @action(detail=False, methods=["get"], url_path=r"status/(?P<live_run_id>[0-9a-f\-]{36})") def status(self, request, live_run_id=None): + if not VULNERABLECODE_ENABLE_LIVE_EVALUATION_API: + return Response( + {"error": "Live evaluation API is disabled."}, + status=status.HTTP_403_FORBIDDEN, + ) + from vulnerabilities.models import LivePipelineRun try: diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index 7425bfcc0..c1f9e376d 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -912,6 +912,7 @@ def setUp(self): self.client = APIClient(enforce_csrf_checks=True) self.url = "/api/v2/live-evaluation/evaluate" + @patch("vulnerabilities.api_v2.VULNERABLECODE_ENABLE_LIVE_EVALUATION_API", True) @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") @patch("vulnerabilities.api_v2.enqueue_ad_hoc_pipeline") @patch("django.urls.reverse") @@ -920,7 +921,6 @@ class MockImporter: pipeline_id = "pypa_live_importer_v2" supported_types = ["pypi"] - os.environ["VULNERABLECODE_ENABLE_LIVE_EVALUATION_API"] = "true" mock_registry.values.return_value = [MockImporter] valid_uuid = 
"00000000-0000-0000-0000-000000000001" mock_enqueue.return_value = (valid_uuid, ["mock-run-id"]) @@ -936,30 +936,36 @@ class MockImporter: assert "status_url" in response.data assert response.data["status_url"].endswith(f"/api/v2/live-evaluation/status/{valid_uuid}") + @patch("vulnerabilities.api_v2.VULNERABLECODE_ENABLE_LIVE_EVALUATION_API", True) @patch("vulnerabilities.api_v2.LIVE_IMPORTERS_REGISTRY") def test_evaluate_no_importer_found(self, mock_registry): class MockImporter: pipeline_id = "dummy" supported_types = ["npm"] - os.environ["VULNERABLECODE_ENABLE_LIVE_EVALUATION_API"] = "true" mock_registry.values.return_value = [MockImporter] data = {"purl": "pkg:pypi/django@3.2"} response = self.client.post(self.url, data, format="json") assert response.status_code == 400 assert "No live importers found" in response.data["error"] + @patch("vulnerabilities.api_v2.VULNERABLECODE_ENABLE_LIVE_EVALUATION_API", True) def test_evaluate_invalid_purl(self): - os.environ["VULNERABLECODE_ENABLE_LIVE_EVALUATION_API"] = "true" data = {"purl": "not_a_valid_purl"} response = self.client.post(self.url, data, format="json") assert response.status_code == 400 assert "Invalid PackageURL" in response.data["error"] + @patch("vulnerabilities.api_v2.VULNERABLECODE_ENABLE_LIVE_EVALUATION_API", True) @patch("vulnerabilities.models.LivePipelineRun.objects.get") def test_status_not_found(self, mock_live_get): - os.environ["VULNERABLECODE_ENABLE_LIVE_EVALUATION_API"] = "true" mock_live_get.side_effect = LivePipelineRun.DoesNotExist() url = "/api/v2/live-evaluation/status/00000000-0000-0000-0000-000000000000" response = self.client.get(url) assert response.status_code == 404 + + @patch("vulnerabilities.api_v2.VULNERABLECODE_ENABLE_LIVE_EVALUATION_API", False) + def test_evaluate_disabled_returns_403(self): + data = {"purl": "pkg:pypi/django@3.2"} + response = self.client.post(self.url, data, format="json") + assert response.status_code == 403 diff --git a/vulnerablecode/urls.py 
b/vulnerablecode/urls.py index 08085655d..76ef7965d 100644 --- a/vulnerablecode/urls.py +++ b/vulnerablecode/urls.py @@ -46,7 +46,6 @@ from vulnerabilities.views import VulnerabilitySearch from vulnerablecode.settings import DEBUG from vulnerablecode.settings import DEBUG_TOOLBAR -from vulnerablecode.settings import VULNERABLECODE_ENABLE_LIVE_EVALUATION_API # See the comment at https://stackoverflow.com/a/46163870. @@ -73,8 +72,7 @@ def __init__(self, *args, **kwargs): api_v2_router.register("pipelines", PipelineScheduleV2ViewSet, basename="pipelines") api_v2_router.register("advisory-codefixes", CodeFixV2ViewSet, basename="advisory-codefix") -if VULNERABLECODE_ENABLE_LIVE_EVALUATION_API: - api_v2_router.register("live-evaluation", LiveEvaluationViewSet, basename="live-evaluation") +api_v2_router.register("live-evaluation", LiveEvaluationViewSet, basename="live-evaluation") urlpatterns = [