31
31
from vulnerabilities .utils import build_description
32
32
from vulnerabilities .utils import get_advisory_url
33
33
from vulnerabilities .utils import get_cwe_id
34
+ from vulntotal .datasources .gitlab import get_casesensitive_slug
35
+ from vulntotal .datasources .gitlab_api import fetch_gitlab_advisories_for_purl
36
+ from vulntotal .datasources .gitlab_api import get_estimated_advisories_count
34
37
35
38
36
39
class GitLabImporterPipeline (VulnerableCodeBaseImporterPipelineV2 ):
@@ -45,9 +48,22 @@ class GitLabImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
45
48
license_url = "https://gitlab.com/gitlab-org/advisories-community/-/blob/main/LICENSE"
46
49
repo_url = "git+https://gitlab.com/gitlab-org/advisories-community/"
47
50
unfurl_version_ranges = True
51
+ is_batch_run = True
52
+
53
def __init__(self, *args, purl=None, **kwargs):
    """
    Initialize the pipeline, optionally in package-first mode.

    All positional and keyword arguments other than ``purl`` are forwarded
    unchanged to the parent pipeline constructor.

    ``purl`` is an optional package URL. When it is truthy the pipeline runs
    in package-first mode; otherwise it runs as a regular batch import.
    """
    super().__init__(*args, **kwargs)
    self.purl = purl
    # ``steps()`` is a classmethod and reads ``cls.is_batch_run``, so the flag
    # has to live on the class rather than on the instance. Set it through
    # ``type(self)`` so the lookup cannot fail on a misspelled class name, and
    # set it in both directions so a previous package-first instance does not
    # leave later batch instances stuck in package-first mode.
    # NOTE(review): this is still shared class state; instances created with
    # and without a purl must not be used concurrently.
    type(self).is_batch_run = not self.purl
48
59
49
60
@classmethod
50
61
def steps (cls ):
62
+ if not cls .is_batch_run :
63
+ return (
64
+ cls .collect_and_store_advisories ,
65
+ cls .clean_downloads ,
66
+ )
51
67
return (
52
68
cls .clone ,
53
69
cls .collect_and_store_advisories ,
@@ -69,14 +85,50 @@ def steps(cls):
69
85
gitlab_scheme_by_purl_type = {v : k for k , v in purl_type_by_gitlab_scheme .items ()}
70
86
71
87
def clone(self):
    """
    Fetch ``self.repo_url`` and keep the response on ``self.vcs_response``.

    Does nothing when the pipeline is not a batch run.
    """
    if not self.is_batch_run:
        return

    self.log(f"Cloning `{self.repo_url}`")
    self.vcs_response = fetch_via_vcs(self.repo_url)
74
91
75
92
def advisories_count(self):
    """
    Return the number of advisories this run is expected to process.

    For a batch run this is the number of ``*.yml`` files found recursively
    under the fetched repository directory. Otherwise it is whatever
    ``get_estimated_advisories_count`` reports for ``self.purl``.
    """
    if not self.is_batch_run:
        return get_estimated_advisories_count(
            self.purl, self.supported_ecosystem(), get_casesensitive_slug
        )

    checkout_dir = Path(self.vcs_response.dest_dir)
    yaml_files = checkout_dir.rglob("*.yml")
    return sum(1 for _ in yaml_files)
78
100
79
101
def collect_advisories (self ) -> Iterable [AdvisoryData ]:
102
+ if not self .is_batch_run :
103
+ advisories = fetch_gitlab_advisories_for_purl (
104
+ self .purl , self .supported_ecosystem (), get_casesensitive_slug
105
+ )
106
+
107
+ input_version = self .purl .version
108
+ vrc = RANGE_CLASS_BY_SCHEMES [self .purl .type ]
109
+ version_obj = vrc .version_class (input_version ) if input_version else None
110
+
111
+ for advisory in advisories :
112
+ advisory_data = self ._advisory_dict_to_advisory_data (advisory )
113
+ # If purl has version, we need to check if advisory affects the version
114
+ if input_version :
115
+ affected = False
116
+ for affected_package in advisory_data .affected_packages :
117
+ vrange = affected_package .affected_version_range
118
+ fixed_version = affected_package .fixed_version
119
+ if vrange and version_obj in vrange :
120
+ if fixed_version :
121
+ fixed_version_obj = vrc .version_class (str (fixed_version ))
122
+ if version_obj >= fixed_version_obj :
123
+ continue
124
+ affected = True
125
+ break
126
+ if affected :
127
+ yield advisory_data
128
+ else :
129
+ yield advisory_data
130
+ return
131
+
80
132
base_path = Path (self .vcs_response .dest_dir )
81
133
82
134
for file_path in base_path .rglob ("*.yml" ):
@@ -113,13 +165,22 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
113
165
yield advisory
114
166
115
167
def clean_downloads(self):
    """
    Delete the fetched repository, if there is one.

    Nothing happens when the pipeline is not a batch run, or when
    ``vcs_response`` was never set or is falsy.
    """
    if not self.is_batch_run:
        return

    response = getattr(self, "vcs_response", None)
    if not response:
        return

    self.log("Removing cloned repository")
    response.delete()
119
171
120
172
def on_failure(self):
    """Failure hook: run the same cleanup as ``clean_downloads``."""
    self.clean_downloads()
122
174
175
def _advisory_dict_to_advisory_data(self, advisory):
    """
    Convert one advisory dict with the module-level converter.

    The scheme mappings, the logger and the purl are taken from this
    pipeline instance.
    """
    converter_options = {
        "purl_type_by_gitlab_scheme": self.purl_type_by_gitlab_scheme,
        "gitlab_scheme_by_purl_type": self.gitlab_scheme_by_purl_type,
        "logger": self.log,
        "purl": self.purl,
    }
    return advisory_dict_to_advisory_data(advisory=advisory, **converter_options)
183
+
123
184
124
185
def parse_advisory_path (base_path : Path , file_path : Path ) -> Tuple [str , str , str ]:
125
186
"""
@@ -326,3 +387,109 @@ def parse_gitlab_advisory(
326
387
weaknesses = cwe_list ,
327
388
url = advisory_url ,
328
389
)
390
+
391
+
392
def advisory_dict_to_advisory_data(
    advisory: dict,
    purl_type_by_gitlab_scheme,
    gitlab_scheme_by_purl_type,
    logger,
    purl=None,
    advisory_url=None,
):
    """
    Convert a GitLab advisory dict to AdvisoryData.

    The following keys of ``advisory`` are read: ``identifiers``, ``title``,
    ``description``, ``urls``, ``cwe_ids``, ``pubdate``, ``package_slug``,
    ``fixed_versions`` and ``affected_range``. Keys that are absent or null
    are treated as empty.

    When ``purl`` is not given it is derived from ``package_slug`` with
    ``get_purl``. If no purl can be determined, an error is logged and an
    AdvisoryData without affected packages or weaknesses is returned.

    ``logger`` is called as ``logger(message, level=...)``. Unparsable
    affected ranges and fixed versions are logged at ERROR level and skipped
    rather than raised.
    """
    # NOTE(review): advisory["identifier"] is not forwarded to AdvisoryData;
    # confirm whether it should be (e.g. as the advisory id).
    aliases = advisory.get("identifiers") or []
    summary = build_description(advisory.get("title"), advisory.get("description"))
    # ``or []`` also covers keys that are present with a null value.
    references = [ReferenceV2.from_url(url) for url in advisory.get("urls") or []]
    cwe_list = [get_cwe_id(cwe_id) for cwe_id in advisory.get("cwe_ids") or []]

    # A missing publication date leaves ``date_published`` unset instead of
    # crashing the date parser on ``None``.
    date_published = None
    pubdate = advisory.get("pubdate")
    if pubdate:
        parsed_date = dateparser.parse(pubdate)
        if parsed_date is not None:
            date_published = parsed_date.replace(tzinfo=pytz.UTC)

    package_slug = advisory.get("package_slug")

    # Determine purl if not provided
    if not purl:
        purl = get_purl(
            package_slug=package_slug,
            purl_type_by_gitlab_scheme=purl_type_by_gitlab_scheme,
            logger=logger,
        )

    if not purl:
        logger(
            f"advisory_dict_to_advisory_data: purl is not valid: {package_slug!r}",
            level=logging.ERROR,
        )
        return AdvisoryData(
            aliases=aliases,
            summary=summary,
            references_v2=references,
            date_published=date_published,
            url=advisory_url,
        )

    vrc: VersionRange = RANGE_CLASS_BY_SCHEMES[purl.type]
    gitlab_scheme = gitlab_scheme_by_purl_type[purl.type]
    # Schemes whose ranges go through ``from_gitlab_native``; every other
    # scheme uses the range class's own ``from_native``.
    gitlab_native_schemes = {"pypi", "gem", "npm", "go", "packagist", "conan"}

    affected_version_range = None
    affected_range = advisory.get("affected_range")
    if affected_range:
        try:
            if gitlab_scheme in gitlab_native_schemes:
                affected_version_range = from_gitlab_native(
                    gitlab_scheme=gitlab_scheme, string=affected_range
                )
            else:
                affected_version_range = vrc.from_native(affected_range)
        except Exception as e:
            # Best effort: an unparsable range is logged and the advisory is
            # still returned without it.
            logger(
                f"advisory_dict_to_advisory_data: affected_range is not parsable: {affected_range!r} for: {purl!s} error: {e!r}\n {traceback.format_exc()}",
                level=logging.ERROR,
            )

    parsed_fixed_versions = []
    for fixed_version in advisory.get("fixed_versions") or []:
        try:
            parsed_fixed_versions.append(vrc.version_class(fixed_version))
        except Exception as e:
            logger(
                f"advisory_dict_to_advisory_data: fixed_version is not parsable`: {fixed_version!r} error: {e!r}\n {traceback.format_exc()}",
                level=logging.ERROR,
            )

    if parsed_fixed_versions:
        affected_packages = list(
            extract_affected_packages(
                affected_version_range=affected_version_range,
                fixed_versions=parsed_fixed_versions,
                purl=purl,
            )
        )
    elif affected_version_range:
        affected_packages = [
            AffectedPackage(
                package=purl,
                affected_version_range=affected_version_range,
            )
        ]
    else:
        affected_packages = []

    return AdvisoryData(
        aliases=aliases,
        summary=summary,
        references_v2=references,
        date_published=date_published,
        affected_packages=affected_packages,
        weaknesses=cwe_list,
        url=advisory_url,
    )
0 commit comments