11use super :: * ;
2+ use futures:: stream:: { self , StreamExt } ;
23use parking_lot:: Mutex ;
34use std:: { collections:: HashMap , sync:: Arc } ;
45
@@ -33,6 +34,7 @@ pub struct Collector<'a, C: ConnectionTrait> {
3334 discovered : DiscoveredTracker ,
3435 relationships : & ' a HashSet < Relationship > ,
3536 connection : & ' a C ,
37+ concurrency : usize ,
3638}
3739
3840impl < ' a , C : ConnectionTrait > Collector < ' a , C > {
@@ -47,6 +49,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
4749 discovered : self . discovered . clone ( ) ,
4850 relationships : self . relationships ,
4951 connection : self . connection ,
52+ concurrency : self . concurrency ,
5053 }
5154 }
5255
@@ -61,6 +64,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
6164 depth : u64 ,
6265 relationships : & ' a HashSet < Relationship > ,
6366 connection : & ' a C ,
67+ concurrency : usize ,
6468 ) -> Self {
6569 Self {
6670 graph_cache,
@@ -72,6 +76,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
7276 discovered : Default :: default ( ) ,
7377 relationships,
7478 connection,
79+ concurrency,
7580 }
7681 }
7782
@@ -100,6 +105,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
100105 discovered : self . discovered . clone ( ) ,
101106 relationships : self . relationships ,
102107 connection : self . connection ,
108+ concurrency : self . concurrency ,
103109 }
104110 }
105111
@@ -160,74 +166,83 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
160166 let current_sbom_id = & current_node. sbom_id ;
161167 let current_sbom_uuid = * current_sbom_id;
162168 let current_node_id = & current_node. node_id ;
163- let mut resolved_external_nodes : Vec < Node > = vec ! [ ] ;
169+
164170 let find_sbom_externals = resolve_rh_external_sbom_ancestors (
165171 current_sbom_uuid,
166172 current_node. node_id . clone ( ) . to_string ( ) ,
167173 self . connection ,
168174 )
169175 . await ;
170176
171- for sbom_external_node in find_sbom_externals {
172- if & sbom_external_node. sbom_id == current_sbom_id {
173- continue ;
174- }
175- // check this is a valid external relationship
176- match sbom_external_node:: Entity :: find ( )
177- . filter (
178- sbom_external_node:: Column :: SbomId
179- . eq ( sbom_external_node. clone ( ) . sbom_id ) ,
180- )
181- . filter (
182- sbom_external_node:: Column :: ExternalNodeRef
183- . eq ( sbom_external_node. clone ( ) . node_id ) ,
184- )
185- . one ( self . connection )
186- . await
187- {
188- Ok ( Some ( matched) ) => {
189- // get the external sbom graph
190- let Some ( external_graph) =
191- self . graph_cache . clone ( ) . get ( & matched. sbom_id . to_string ( ) )
192- else {
193- log:: warn!(
194- "external sbom graph {:?} not found in graph cache" ,
195- & matched. sbom_id. to_string( )
196- ) ;
177+ let resolved_external_nodes: Vec < Node > = stream:: iter ( find_sbom_externals)
178+ . map ( |sbom_external_node| {
179+ let collector = self . clone ( ) ;
180+ async move {
181+ if & sbom_external_node. sbom_id == current_sbom_id {
197182 return None ;
198- } ;
199- // find the node in retrieved external graph
200- let Some ( external_node_index) = external_graph
201- . node_indices ( )
202- . find ( |& node| external_graph[ node] . node_id . eq ( & matched. node_id ) )
203- else {
204- log:: warn!(
205- "Node with ID {current_node_id} not found in external sbom"
206- ) ;
207- continue ;
208- } ;
209- // recurse into those external sbom nodes and save
210- resolved_external_nodes. extend (
211- Box :: pin (
212- self . clone ( )
213- . with ( external_graph. as_ref ( ) , external_node_index)
214- . collect_graph ( ) ,
183+ }
184+ // check this is a valid external relationship
185+ match sbom_external_node:: Entity :: find ( )
186+ . filter (
187+ sbom_external_node:: Column :: SbomId
188+ . eq ( sbom_external_node. clone ( ) . sbom_id ) ,
215189 )
216- . await ,
217- ) ;
218- }
219- Err ( _) => {
220- log:: warn!( "Problem looking up sbom external node" ) ;
221- continue ;
222- }
223- _ => {
224- log:: debug!(
225- "not external sbom sbom_external_node {sbom_external_node:?}"
226- ) ;
227- continue ;
190+ . filter (
191+ sbom_external_node:: Column :: ExternalNodeRef
192+ . eq ( sbom_external_node. clone ( ) . node_id ) ,
193+ )
194+ . one ( collector. connection )
195+ . await
196+ {
197+ Ok ( Some ( matched) ) => {
198+ // get the external sbom graph
199+ let Some ( external_graph) =
200+ collector. graph_cache . clone ( ) . get ( & matched. sbom_id . to_string ( ) )
201+ else {
202+ log:: warn!(
203+ "external sbom graph {:?} not found in graph cache" ,
204+ & matched. sbom_id. to_string( )
205+ ) ;
206+ return None ;
207+ } ;
208+ // find the node in retrieved external graph
209+ let Some ( external_node_index) = external_graph
210+ . node_indices ( )
211+ . find ( |& node| {
212+ external_graph[ node] . node_id . eq ( & matched. node_id )
213+ } )
214+ else {
215+ log:: warn!(
216+ "Node with ID {current_node_id} not found in external sbom"
217+ ) ;
218+ return None ;
219+ } ;
220+ // recurse into those external sbom nodes and save
221+ Some (
222+ collector
223+ . with ( external_graph. as_ref ( ) , external_node_index)
224+ . collect_graph ( )
225+ . await ,
226+ )
227+ }
228+ Err ( _) => {
229+ log:: warn!( "Problem looking up sbom external node" ) ;
230+ None
231+ }
232+ _ => {
233+ log:: debug!(
234+ "not external sbom sbom_external_node {sbom_external_node:?}"
235+ ) ;
236+ None
237+ }
238+ }
228239 }
229- }
230- }
240+ } )
241+ . buffer_unordered ( self . concurrency )
242+ . filter_map ( |nodes| async move { nodes } )
243+ . flat_map ( stream:: iter)
244+ . collect :: < Vec < _ > > ( )
245+ . await ;
231246
232247 let mut result = self . collect_graph ( ) . await ;
233248 if !resolved_external_nodes. is_empty ( ) {
@@ -240,44 +255,49 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
240255 }
241256
242257 pub async fn collect_graph ( & self ) -> Vec < Node > {
243- let mut result = vec ! [ ] ;
244258 log:: debug!( "Collecting graph for {:?}" , self . node) ;
245- for edge in self . graph . edges_directed ( self . node , self . direction ) {
246- log:: debug!( "edge {edge:?}" ) ;
247-
248- // we only recurse in one direction
249- let ( ancestor, descendent, package_node) = match self . direction {
250- Direction :: Incoming => (
251- Box :: pin ( self . continue_node ( edge. source ( ) ) . collect ( ) ) . await ,
252- None ,
253- self . graph . node_weight ( edge. source ( ) ) ,
254- ) ,
255- Direction :: Outgoing => (
256- None ,
257- Box :: pin ( self . continue_node ( edge. target ( ) ) . collect ( ) ) . await ,
258- self . graph . node_weight ( edge. target ( ) ) ,
259- ) ,
260- } ;
261259
262- let relationship = edge. weight ( ) ;
260+ stream:: iter ( self . graph . edges_directed ( self . node , self . direction ) )
261+ . map ( |edge| async move {
262+ log:: debug!( "edge {edge:?}" ) ;
263263
264- if !self . relationships . is_empty ( ) && !self . relationships . contains ( relationship) {
265- // if we have entries, and no match, continue with the next
266- continue ;
267- }
264+ // we only recurse in one direction
265+ // Depending on the direction, we collect ancestors or descendants
266+ let ( ancestor, descendent, package_node) = match self . direction {
267+ // If the direction is incoming, we are collecting ancestors.
268+ // We recursively call `collect` for the source of the edge.
269+ Direction :: Incoming => (
270+ self . continue_node ( edge. source ( ) ) . collect ( ) . await ,
271+ None ,
272+ self . graph . node_weight ( edge. source ( ) ) ,
273+ ) ,
274+ // If the direction is outgoing, we are collecting descendants.
275+ // We recursively call `collect` for the target of the edge.
276+ Direction :: Outgoing => (
277+ None ,
278+ self . continue_node ( edge. target ( ) ) . collect ( ) . await ,
279+ self . graph . node_weight ( edge. target ( ) ) ,
280+ ) ,
281+ } ;
268282
269- let Some ( package_node) = package_node else {
270- continue ;
271- } ;
283+ let relationship = edge. weight ( ) ;
272284
273- result. push ( Node {
274- base : BaseSummary :: from ( package_node) ,
275- relationship : Some ( * relationship) ,
276- ancestors : ancestor,
277- descendants : descendent,
278- } ) ;
279- }
285+ if !self . relationships . is_empty ( ) && !self . relationships . contains ( relationship) {
286+ // if we have entries, and no match, continue with the next
287+ return None ;
288+ }
280289
281- result
290+ // Create a new `Node` and add it to the result
291+ Some ( Node {
292+ base : BaseSummary :: from ( package_node?) ,
293+ relationship : Some ( * relationship) ,
294+ ancestors : ancestor,
295+ descendants : descendent,
296+ } )
297+ } )
298+ . buffer_unordered ( self . concurrency )
299+ . filter_map ( |node| async move { node } )
300+ . collect :: < Vec < _ > > ( )
301+ . await
282302 }
283303}
0 commit comments