Skip to content

Modify Elixir Security importer to support package-first mode #1935

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 91 additions & 3 deletions vulnerabilities/importers/elixir_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import urllib.parse as urlparse
import logging
import os
import tempfile
from pathlib import Path
from typing import Set

import requests
from dateutil import parser as dateparser
from packageurl import PackageURL
from univers.version_constraint import VersionConstraint
from univers.version_range import HexVersionRange
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
Expand All @@ -30,7 +34,22 @@ class ElixirSecurityImporter(Importer):
spdx_license_expression = "CC0-1.0"
importer_name = "Elixir Security Importer"

def __init__(self, purl=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.purl = purl
if self.purl:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is used in multiple importers; we should extract it into a shared function.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree
We can modify the constructor of the base class instead of modifying each individual importer's constructor.
But I believe in this case we won't show the warning messages if the purl is not right for the importer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can always pass the message or the type or even the log as a parameter

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of passing the supported types, that would make it generic with less duplicate code
The warning message is the same, so it will adapt if we pass supported types

if self.purl.type != "hex":
print(
f"Warning: PURL type {self.purl.type} is not 'hex', may not match any advisories"
)

def advisory_data(self) -> Set[AdvisoryData]:
    """
    Return the advisories for this run.

    With no purl configured the batch collector is used; otherwise the
    package-first collector is used.
    """
    collect = self._package_first_advisory_data if self.purl else self._batch_advisory_data
    return collect()

def _batch_advisory_data(self) -> Set[AdvisoryData]:
try:
self.clone(self.repo_url)
base_path = Path(self.vcs_response.dest_dir)
Expand All @@ -41,8 +60,77 @@ def advisory_data(self) -> Set[AdvisoryData]:
if self.vcs_response:
self.vcs_response.delete()

def process_file(self, file, base_path):
relative_path = str(file.relative_to(base_path)).strip("/")
def _package_first_advisory_data(self) -> Set[AdvisoryData]:
    """
    Yield the advisories published for the package named by ``self.purl``.

    The package's advisory directory is listed through the GitHub contents
    API; every ``.yml`` entry is downloaded, written to a temporary file and
    parsed with ``process_file``. When the purl carries a version, only
    advisories accepted by ``_advisory_affects_version`` are yielded.

    This is a generator: despite the ``Set`` annotation, callers receive an
    iterator. Nothing is yielded for a non-"hex" purl, when the directory
    listing does not answer with HTTP 200, or after an unexpected error
    (which is logged).
    """
    # Function-scope import keeps this method self-contained.
    from urllib.parse import quote

    if self.purl.type != "hex":
        logging.warning(
            f"PURL type {self.purl.type} is not supported by Elixir Security importer"
        )
        return

    package_name = self.purl.name
    contents_api = "https://api.github.com/repos/dependabot/elixir-security-advisories/contents"
    # Without a timeout a stalled connection would block the importer forever.
    timeout = 30

    try:
        # Percent-encode the name so characters such as "/", "?" or "#"
        # cannot change which API path is requested.
        directory_url = f"{contents_api}/packages/{quote(package_name, safe='')}"
        response = requests.get(directory_url, timeout=timeout)

        if response.status_code != 200:
            logging.info(f"No advisories found for {package_name} in Elixir Security Database")
            return

        yaml_files = [
            entry["path"] for entry in response.json() if entry["name"].endswith(".yml")
        ]

        for file_path in yaml_files:
            content_url = f"{contents_api}/{quote(file_path)}"
            content_response = requests.get(
                content_url,
                headers={"Accept": "application/vnd.github.v3.raw"},
                timeout=timeout,
            )

            if content_response.status_code != 200:
                logging.warning(f"Failed to fetch file content for {file_path}")
                continue

            # process_file is handed a path, so spill the download to disk.
            # The path is captured before writing so the cleanup below also
            # runs when the write itself fails.
            temp_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
            temp_path = temp_file.name
            try:
                with temp_file:
                    temp_file.write(content_response.text)

                for advisory in self.process_file(temp_path, Path(""), file_path=file_path):
                    if self.purl.version and not self._advisory_affects_version(advisory):
                        continue

                    yield advisory
            finally:
                if os.path.exists(temp_path):
                    os.remove(temp_path)

    except Exception as e:
        # Best effort: one failing package lookup must not crash the caller.
        logging.error(f"Error fetching advisories for {self.purl}: {str(e)}")
        return

def _advisory_affects_version(self, advisory: AdvisoryData) -> bool:
    """
    Tell whether ``advisory`` applies to the version carried by ``self.purl``.

    Returns True when the purl has no version, when the version falls inside
    the affected range of any affected package, or when checking a range
    raises (a warning is logged in that case). Returns False otherwise.
    """
    wanted = self.purl.version
    if not wanted:
        return True

    for package in advisory.affected_packages:
        version_range = package.affected_version_range
        if not version_range:
            continue

        try:
            is_affected = SemverVersion(wanted) in version_range
        except Exception as error:
            # Keep the advisory rather than drop it on an unreadable version.
            logging.warning(f"Failed to parse version {wanted}: {str(error)}")
            return True

        if is_affected:
            return True

    return False

def process_file(self, file, base_path, file_path=None):
if file_path:
relative_path = file_path
else:
relative_path = str(Path(file).relative_to(base_path)).strip("/")

advisory_url = (
f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}"
)
Expand Down
133 changes: 125 additions & 8 deletions vulnerabilities/pipelines/v2_importers/elixir_security_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
import tempfile
from pathlib import Path
from typing import Iterable

import requests
from dateutil import parser as dateparser
from fetchcode.vcs import fetch_via_vcs
from packageurl import PackageURL
from univers.version_constraint import VersionConstraint
from univers.version_range import HexVersionRange
from univers.versions import SemverVersion

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
Expand All @@ -37,25 +41,65 @@ class ElixirSecurityImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
repo_url = "git+https://github.com/dependabot/elixir-security-advisories"
unfurl_version_ranges = True

is_batch_run = True

def __init__(self, *args, purl=None, **kwargs):
    """
    Set up the pipeline, optionally restricted to a single package.

    ``purl`` selects package-first mode when given; without it the pipeline
    runs in batch mode. A purl whose type is not "hex" is accepted but a
    warning is logged.
    """
    super().__init__(*args, **kwargs)
    self.purl = purl
    # Record the mode on the instance only. Assigning to the class attribute
    # would switch every other pipeline instance in this process, existing or
    # future, out of batch mode. ``clone`` and ``clean_downloads`` check this
    # flag themselves, so a package-first run still skips the repository
    # checkout.
    self.is_batch_run = not self.purl
    if self.purl and self.purl.type != "hex":
        self.log(
            f"Warning: PURL type {self.purl.type} is not 'hex', may not match any advisories"
        )

Comment on lines +44 to +55
Copy link
Member

@keshav-space keshav-space Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not add purl as a primary argument to Pipeline. Primary arguments to the pipeline are exclusively reserved for managing pipeline execution. Instead, you can pass purl to the pipeline like this: ElixirSecurityImporterPipeline(purl="pkg:hex/coherence") and then access it inside a pipeline step using self.inputs["purl"].

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that the constructor has arguments for managing pipeline execution (in BasePipelineRun class).
How do I access the purl argument without overriding the constructor or modifying BasePipelineRun constructor?
The way I approached this was to call the parent constructor with the arguments so that the pipeline functions properly, and then take the PURL, which is what matters at the importer level.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And based on @TG1999's comment, we can unify the purl handling in package-first mode by overriding the VulnerableCodeBaseImporterPipeline constructor for example to do something like this.

def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.purl = kwargs.get("purl")
    self.supported_types = kwargs.get("supported_types")

Then we can do the package-first checks and warnings in one place, unified for all importers.

@classmethod
def steps(cls):
    """
    Return the pipeline steps to run.

    Batch mode clones the repository, collects and stores advisories, then
    cleans up the checkout; otherwise only collection and storage run.
    """
    if cls.is_batch_run:
        return (cls.clone, cls.collect_and_store_advisories, cls.clean_downloads)
    return (cls.collect_and_store_advisories,)

def clean_downloads(self):
    """
    Delete the cloned advisory repository, if this run created one.

    Does nothing outside batch mode, or when no clone result is available.
    """
    if not self.is_batch_run:
        return

    # getattr covers a run in which ``clone`` never assigned the attribute.
    # NOTE(review): confirm whether the base class pre-declares vcs_response.
    vcs_response = getattr(self, "vcs_response", None)
    if vcs_response:
        self.log("Removing cloned repository")
        vcs_response.delete()

def clone(self):
    """
    Clone the advisory repository for a batch run.

    The result is stored in ``self.vcs_response``. Outside batch mode
    nothing is logged or fetched.
    """
    if not self.is_batch_run:
        return

    self.log(f"Cloning `{self.repo_url}`")
    self.vcs_response = fetch_via_vcs(self.repo_url)

def advisories_count(self) -> int:
    """
    Return how many advisory files this run will process.

    In batch mode this counts the ``.yml`` files found recursively under the
    ``packages`` directory of the cloned repository; otherwise the count
    comes from ``_count_package_advisories``.
    """
    if self.is_batch_run:
        packages_dir = Path(self.vcs_response.dest_dir) / "packages"
        return sum(1 for _ in packages_dir.glob("**/*.yml"))

    return self._count_package_advisories()

def _count_package_advisories(self) -> int:
    """
    Return the number of ``.yml`` advisory files listed for ``self.purl``.

    The package's advisory directory is listed through the GitHub contents
    API. Returns 0 for a non-"hex" purl, when the listing does not answer
    with HTTP 200, or when the request or its decoding fails (the failure is
    logged).
    """
    # Function-scope import keeps this method self-contained.
    from urllib.parse import quote

    if self.purl.type != "hex":
        return 0

    try:
        # Percent-encode the name so characters such as "/", "?" or "#"
        # cannot change which API path is requested.
        directory_url = (
            "https://api.github.com/repos/dependabot/elixir-security-advisories"
            f"/contents/packages/{quote(self.purl.name, safe='')}"
        )
        # Without a timeout a stalled connection would block the pipeline.
        response = requests.get(directory_url, timeout=30)

        if response.status_code != 200:
            return 0

        return sum(1 for entry in response.json() if entry["name"].endswith(".yml"))
    except Exception as e:
        # The count is best effort, but a failure should still be visible.
        self.log(f"Error counting advisories for {self.purl}: {str(e)}")
        return 0

def collect_advisories(self) -> Iterable[AdvisoryData]:
    """
    Return the advisories for this run.

    Batch mode delegates to ``_collect_batch_advisories``; otherwise
    ``_collect_package_advisories`` is used.
    """
    if self.is_batch_run:
        return self._collect_batch_advisories()

    return self._collect_package_advisories()

def _collect_batch_advisories(self) -> Iterable[AdvisoryData]:
try:
base_path = Path(self.vcs_response.dest_dir)
vuln = base_path / "packages"
Expand All @@ -65,11 +109,84 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
if self.vcs_response:
self.vcs_response.delete()

def process_file(self, file, base_path) -> Iterable[AdvisoryData]:
relative_path = str(file.relative_to(base_path)).strip("/")
path_segments = str(file).split("/")
# use the last two segments as the advisory ID
advisory_id = "/".join(path_segments[-2:]).replace(".yml", "")
def _collect_package_advisories(self) -> Iterable[AdvisoryData]:
    """
    Yield the advisories published for the package named by ``self.purl``.

    The package's advisory directory is listed through the GitHub contents
    API; every ``.yml`` entry is downloaded, written to a temporary file and
    parsed with ``process_file``. When the purl carries a version, only
    advisories accepted by ``_advisory_affects_version`` are yielded.

    Nothing is yielded for a non-"hex" purl, when the directory listing does
    not answer with HTTP 200, or after an unexpected error (which is logged).
    """
    # Function-scope import keeps this method self-contained.
    from urllib.parse import quote

    if self.purl.type != "hex":
        self.log(f"PURL type {self.purl.type} is not supported by Elixir Security importer")
        return

    package_name = self.purl.name
    contents_api = "https://api.github.com/repos/dependabot/elixir-security-advisories/contents"
    # Without a timeout a stalled connection would block the pipeline forever.
    timeout = 30

    try:
        # Percent-encode the name so characters such as "/", "?" or "#"
        # cannot change which API path is requested.
        directory_url = f"{contents_api}/packages/{quote(package_name, safe='')}"
        response = requests.get(directory_url, timeout=timeout)

        if response.status_code != 200:
            self.log(f"No advisories found for {package_name} in Elixir Security Database")
            return

        yaml_files = [
            entry["path"] for entry in response.json() if entry["name"].endswith(".yml")
        ]

        for file_path in yaml_files:
            content_url = f"{contents_api}/{quote(file_path)}"
            content_response = requests.get(
                content_url,
                headers={"Accept": "application/vnd.github.v3.raw"},
                timeout=timeout,
            )

            if content_response.status_code != 200:
                self.log(f"Failed to fetch file content for {file_path}")
                continue

            # process_file is handed a path, so spill the download to disk.
            # The path is captured before writing so the cleanup below also
            # runs when the write itself fails.
            temp_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
            temp_path = temp_file.name
            try:
                with temp_file:
                    temp_file.write(content_response.text)

                for advisory in self.process_file(
                    Path(temp_path), Path(""), file_path=file_path
                ):
                    if self.purl.version and not self._advisory_affects_version(advisory):
                        continue

                    yield advisory
            finally:
                if os.path.exists(temp_path):
                    os.remove(temp_path)

    except Exception as e:
        # Best effort: one failing package lookup must not crash the pipeline.
        self.log(f"Error fetching advisories for {self.purl}: {str(e)}")
        return

def _advisory_affects_version(self, advisory: AdvisoryData) -> bool:
    """
    Tell whether ``advisory`` applies to the version carried by ``self.purl``.

    Returns True when the purl has no version, when the version falls inside
    the affected range of any affected package, or when checking a range
    raises (the failure is logged). Returns False otherwise.
    """
    wanted = self.purl.version
    if not wanted:
        return True

    for package in advisory.affected_packages:
        version_range = package.affected_version_range
        if not version_range:
            continue

        try:
            is_affected = SemverVersion(wanted) in version_range
        except Exception as error:
            # Keep the advisory rather than drop it on an unreadable version.
            self.log(f"Failed to parse version {wanted}: {str(error)}")
            return True

        if is_affected:
            return True

    return False

def process_file(self, file, base_path, file_path=None) -> Iterable[AdvisoryData]:
if file_path:
relative_path = file_path
advisory_id = (
file_path.replace(".yml", "").split("/")[-2]
+ "/"
+ file_path.replace(".yml", "").split("/")[-1]
)
else:
relative_path = str(file.relative_to(base_path)).strip("/")
path_segments = str(file).split("/")
advisory_id = "/".join(path_segments[-2:]).replace(".yml", "")

advisory_url = (
f"https://github.com/dependabot/elixir-security-advisories/blob/master/{relative_path}"
)
Expand Down
Loading