1
+ from collections import defaultdict
2
+ from typing import Dict , List , Any
3
+
4
+
5
class Dedupe:
    """Group package records and merge the alerts of each group.

    All helpers are static; the class only serves as a namespace. Packages
    and alerts are plain dictionaries.
    """

    @staticmethod
    def normalize_file_path(path: str) -> str:
        """Return *path* without its first ``/``-separated segment.

        A path without any ``/`` is returned unchanged; ``None`` or an empty
        string yields ``""``.
        """
        if not path:
            return ""
        if "/" not in path:
            return path
        # maxsplit=1 keeps everything after the first separator intact.
        return path.split("/", 1)[-1]

    @staticmethod
    def alert_key(alert: dict) -> tuple:
        """Build the tuple that identifies an alert for de-duplication.

        Two alerts are considered the same when type, severity, category,
        normalized file path, start and end all match.

        Raises:
            KeyError: if ``type``, ``severity`` or ``category`` is missing.
        """
        return (
            alert["type"],
            alert["severity"],
            alert["category"],
            Dedupe.normalize_file_path(alert.get("file")),
            alert.get("start"),
            alert.get("end"),
        )

    @staticmethod
    def consolidate_and_merge_alerts(package_group: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Collapse a group of package records into a single record.

        The package-level fields are copied from the first record of the
        group. Alerts sharing the same ``alert_key`` are merged into one
        entry that keeps the first ``key`` seen and lists every release the
        alert appeared in.

        Args:
            package_group: Non-empty list of package dictionaries.

        Returns:
            A new dictionary with the consolidated package fields, the sorted
            ``releases``, the merged ``alerts`` and a ``purl`` string.

        Raises:
            IndexError: if ``package_group`` is empty.
            KeyError: if an alert lacks ``key``, ``type``, ``severity`` or
                ``category``.
        """
        alert_map: Dict[tuple, dict] = {}
        releases = set()
        for pkg in package_group:
            # Fall back to the package type when no release is recorded.
            release = pkg.get("release")
            if release is None:
                release = pkg.get("type")
            releases.add(release)

            # ``or []`` also covers an explicit ``"alerts": None``.
            for alert in pkg.get("alerts") or []:
                identity = Dedupe.alert_key(alert)
                merged = alert_map.get(identity)
                if merged is None:
                    alert_map[identity] = {
                        "key": alert["key"],  # keep the first key seen
                        "type": alert["type"],
                        "severity": alert["severity"],
                        "category": alert["category"],
                        "file": Dedupe.normalize_file_path(alert.get("file")),
                        "start": alert.get("start"),
                        "end": alert.get("end"),
                        "releases": [release],
                    }
                elif release not in merged["releases"]:
                    merged["releases"].append(release)

        base = package_group[0]
        return {
            "id": base.get("id"),
            "author": base.get("author"),
            "size": base.get("size"),
            "type": base.get("type"),
            "name": base.get("name"),
            "namespace": base.get("namespace"),
            "version": base.get("version"),
            # A release can be None (no "release" and no "type"); sorting
            # None last avoids comparing None with strings.
            "releases": sorted(releases, key=lambda item: (item is None, item)),
            "alerts": list(alert_map.values()),
            "score": base.get("score", {}),
            "license": base.get("license"),
            "licenseDetails": base.get("licenseDetails", []),
            "batchIndex": base.get("batchIndex"),
            # NOTE(review): unlike consolidate_by_order, this purl does not
            # include the namespace - confirm whether that is intended.
            "purl": f"pkg:{base.get('type', 'unknown')}/{base.get('name', 'unknown')}@{base.get('version', '0.0.0')}",
        }

    @staticmethod
    def dedupe(packages: List[Dict[str, Any]], batched: bool = True) -> List[Dict[str, Any]]:
        """Group *packages* and return one consolidated record per group.

        Args:
            packages: Package dictionaries to consolidate.
            batched: When true, group by each package's existing
                ``batchIndex``; otherwise derive groups from the order of
                the packages.
        """
        if batched:
            grouped = Dedupe.consolidate_by_batch_index(packages)
        else:
            grouped = Dedupe.consolidate_by_order(packages)
        return [Dedupe.consolidate_and_merge_alerts(group) for group in grouped.values()]

    @staticmethod
    def consolidate_by_batch_index(packages: List[Dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
        """Group packages by their ``batchIndex`` value.

        Raises:
            KeyError: if a package has no ``batchIndex``.
        """
        grouped: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
        for pkg in packages:
            grouped[pkg["batchIndex"]].append(pkg)
        return grouped

    @staticmethod
    def consolidate_by_order(packages: List[Dict[str, Any]]) -> dict[int, list[dict[str, Any]]]:
        """Group consecutive packages that share the same purl.

        A purl of the form ``pkg:<type>/[<namespace>/]<name>@<version>`` is
        built for every package; a new group starts whenever the purl differs
        from the previous package's purl. The assigned group number is also
        written to each package as ``batchIndex`` (the input dictionaries are
        mutated).

        Processing is best-effort: if any package raises (for example a
        missing ``name`` or ``version``), the error is printed and the groups
        built so far are returned.
        """
        grouped: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
        batch_index = 0
        package_purl = None
        try:
            for pkg in packages:
                name = pkg["name"]
                version = pkg["version"]
                namespace = pkg.get("namespace")
                ecosystem = pkg.get("type")
                new_purl = f"pkg:{ecosystem}/"
                if namespace:
                    new_purl += f"{namespace}/"
                new_purl += f"{name}@{version}"
                if package_purl is not None and new_purl != package_purl:
                    batch_index += 1
                # Always compare against the previous package, not the first
                # one, so that runs of identical packages stay together.
                package_purl = new_purl
                pkg["batchIndex"] = batch_index
                grouped[batch_index].append(pkg)
        except Exception as error:
            print(error)
        return grouped
0 commit comments