From 89779db5de5671f8fa3af34b2e1cdf706f2e7322 Mon Sep 17 00:00:00 2001 From: Dejan Bosanac Date: Fri, 1 Aug 2025 11:55:40 +0200 Subject: [PATCH] perf(analysis): Parallelize graph query execution This commit significantly improves the performance of graph analysis by applying parallelization techniques to the run_graph_query and collect_graph functions. Previously, the collection of ancestors and descendants for a given node was performed sequentially. By refactoring run_graph_query to use futures::join!, we now process both directions concurrently, reducing the overall execution time. Building on this, the collect_graph function was optimized to process all discovered nodes in parallel using join_all. This ensures that we leverage available resources more efficiently when analyzing multiple entry points in the graph. Assisted-by: Gemini (cherry picked from commit 92cb5b6986d1c0e18d8b2f1012bba7a1759be07d) --- modules/analysis/src/config.rs | 21 +++ modules/analysis/src/service/collector.rs | 202 ++++++++++++---------- modules/analysis/src/service/mod.rs | 70 +++++--- modules/analysis/src/service/test.rs | 1 + 4 files changed, 177 insertions(+), 117 deletions(-) diff --git a/modules/analysis/src/config.rs b/modules/analysis/src/config.rs index 38311dd30..70eebcbbc 100644 --- a/modules/analysis/src/config.rs +++ b/modules/analysis/src/config.rs @@ -1,6 +1,17 @@ use bytesize::ByteSize; +use std::num::NonZeroUsize; use trustify_common::model::BinaryByteSize; +fn parse_concurrency(s: &str) -> Result<NonZeroUsize, String> { + let value: usize = s.parse().map_err(|e| format!("Invalid number: {e}"))?; + NonZeroUsize::new(value).ok_or_else(|| "Concurrency must be greater than zero".to_string()) +} + +const DEFAULT_CONCURRENCY: NonZeroUsize = match NonZeroUsize::new(10) { + Some(val) => val, + None => panic!("Default concurrency must be non-zero integer"), +}; + #[derive(clap::Args, Debug, Clone)] pub struct AnalysisConfig { #[arg( @@ -11,12 +22,22 @@ pub struct AnalysisConfig { help = "Maximum
size of the graph cache." )] pub max_cache_size: BinaryByteSize, + + #[arg( + long, + env = "TRUSTIFY_ANALYSIS_CONCURRENCY", + default_value = "10", + value_parser = parse_concurrency, + help = "The number of concurrent tasks for analysis (must be > 0)." + )] + pub concurrency: NonZeroUsize, } impl Default for AnalysisConfig { fn default() -> Self { Self { max_cache_size: BinaryByteSize(ByteSize::mib(200)), + concurrency: DEFAULT_CONCURRENCY, } } } diff --git a/modules/analysis/src/service/collector.rs b/modules/analysis/src/service/collector.rs index 49e4c9ef7..54c47762f 100644 --- a/modules/analysis/src/service/collector.rs +++ b/modules/analysis/src/service/collector.rs @@ -1,4 +1,5 @@ use super::*; +use futures::stream::{self, StreamExt}; use parking_lot::Mutex; use std::{collections::HashMap, sync::Arc}; @@ -33,6 +34,7 @@ pub struct Collector<'a, C: ConnectionTrait> { discovered: DiscoveredTracker, relationships: &'a HashSet, connection: &'a C, + concurrency: usize, } impl<'a, C: ConnectionTrait> Collector<'a, C> { @@ -47,6 +49,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> { discovered: self.discovered.clone(), relationships: self.relationships, connection: self.connection, + concurrency: self.concurrency, } } @@ -61,6 +64,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> { depth: u64, relationships: &'a HashSet, connection: &'a C, + concurrency: usize, ) -> Self { Self { graph_cache, @@ -72,6 +76,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> { discovered: Default::default(), relationships, connection, + concurrency, } } @@ -100,6 +105,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> { discovered: self.discovered.clone(), relationships: self.relationships, connection: self.connection, + concurrency: self.concurrency, } } @@ -160,7 +166,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> { let current_sbom_id = &current_node.sbom_id; let current_sbom_uuid = *current_sbom_id; let current_node_id = &current_node.node_id; - let mut resolved_external_nodes: Vec<Node> =
vec![]; + let find_sbom_externals = resolve_rh_external_sbom_ancestors( current_sbom_uuid, current_node.node_id.clone().to_string(), @@ -168,66 +174,75 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> { ) .await; - for sbom_external_node in find_sbom_externals { - if &sbom_external_node.sbom_id == current_sbom_id { - continue; - } - // check this is a valid external relationship - match sbom_external_node::Entity::find() - .filter( - sbom_external_node::Column::SbomId - .eq(sbom_external_node.clone().sbom_id), - ) - .filter( - sbom_external_node::Column::ExternalNodeRef - .eq(sbom_external_node.clone().node_id), - ) - .one(self.connection) - .await - { - Ok(Some(matched)) => { - // get the external sbom graph - let Some(external_graph) = - self.graph_cache.clone().get(&matched.sbom_id.to_string()) - else { - log::warn!( - "external sbom graph {:?} not found in graph cache", - &matched.sbom_id.to_string() - ); + let resolved_external_nodes: Vec<Node> = stream::iter(find_sbom_externals) + .map(|sbom_external_node| { + let collector = self.clone(); + async move { + if &sbom_external_node.sbom_id == current_sbom_id { return None; - }; - // find the node in retrieved external graph - let Some(external_node_index) = external_graph - .node_indices() - .find(|&node| external_graph[node].node_id.eq(&matched.node_id)) - else { - log::warn!( - "Node with ID {current_node_id} not found in external sbom" - ); - continue; - }; - // recurse into those external sbom nodes and save - resolved_external_nodes.extend( - Box::pin( - self.clone() - .with(external_graph.as_ref(), external_node_index) - .collect_graph(), + } + // check this is a valid external relationship + match sbom_external_node::Entity::find() + .filter( + sbom_external_node::Column::SbomId + .eq(sbom_external_node.clone().sbom_id), ) - .await, - ); - } - Err(_) => { - log::warn!("Problem looking up sbom external node"); - continue; - } - _ => { - log::debug!( - "not external sbom sbom_external_node {sbom_external_node:?}"
- ); - continue; + .filter( + sbom_external_node::Column::ExternalNodeRef + .eq(sbom_external_node.clone().node_id), + ) + .one(collector.connection) + .await + { + Ok(Some(matched)) => { + // get the external sbom graph + let Some(external_graph) = + collector.graph_cache.clone().get(&matched.sbom_id.to_string()) + else { + log::warn!( + "external sbom graph {:?} not found in graph cache", + &matched.sbom_id.to_string() + ); + return None; + }; + // find the node in retrieved external graph + let Some(external_node_index) = external_graph + .node_indices() + .find(|&node| { + external_graph[node].node_id.eq(&matched.node_id) + }) + else { + log::warn!( + "Node with ID {current_node_id} not found in external sbom" + ); + return None; + }; + // recurse into those external sbom nodes and save + Some( + collector + .with(external_graph.as_ref(), external_node_index) + .collect_graph() + .await, + ) + } + Err(_) => { + log::warn!("Problem looking up sbom external node"); + None + } + _ => { + log::debug!( + "not external sbom sbom_external_node {sbom_external_node:?}" + ); + None + } + } } - } - } + }) + .buffer_unordered(self.concurrency) + .filter_map(|nodes| async move { nodes }) + .flat_map(stream::iter) + .collect::<Vec<_>>() + .await; let mut result = self.collect_graph().await; if !resolved_external_nodes.is_empty() { @@ -240,44 +255,49 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> { } pub async fn collect_graph(&self) -> Vec<Node> { - let mut result = vec![]; log::debug!("Collecting graph for {:?}", self.node); - for edge in self.graph.edges_directed(self.node, self.direction) { - log::debug!("edge {edge:?}"); - - // we only recurse in one direction - let (ancestor, descendent, package_node) = match self.direction { - Direction::Incoming => ( - Box::pin(self.continue_node(edge.source()).collect()).await, - None, - self.graph.node_weight(edge.source()), - ), - Direction::Outgoing => ( - None, - Box::pin(self.continue_node(edge.target()).collect()).await, -
self.graph.node_weight(edge.target()), - ), - }; - let relationship = edge.weight(); + stream::iter(self.graph.edges_directed(self.node, self.direction)) + .map(|edge| async move { + log::debug!("edge {edge:?}"); - if !self.relationships.is_empty() && !self.relationships.contains(relationship) { - // if we have entries, and no match, continue with the next - continue; - } + // we only recurse in one direction + // Depending on the direction, we collect ancestors or descendants + let (ancestor, descendent, package_node) = match self.direction { + // If the direction is incoming, we are collecting ancestors. + // We recursively call `collect` for the source of the edge. + Direction::Incoming => ( + self.continue_node(edge.source()).collect().await, + None, + self.graph.node_weight(edge.source()), + ), + // If the direction is outgoing, we are collecting descendants. + // We recursively call `collect` for the target of the edge. + Direction::Outgoing => ( + None, + self.continue_node(edge.target()).collect().await, + self.graph.node_weight(edge.target()), + ), + }; - let Some(package_node) = package_node else { - continue; - }; + let relationship = edge.weight(); - result.push(Node { - base: BaseSummary::from(package_node), - relationship: Some(*relationship), - ancestors: ancestor, - descendants: descendent, - }); - } + if !self.relationships.is_empty() && !self.relationships.contains(relationship) { + // if we have entries, and no match, continue with the next + return None; + } - result + // Create a new `Node` and add it to the result + Some(Node { + base: BaseSummary::from(package_node?), + relationship: Some(*relationship), + ancestors: ancestor, + descendants: descendent, + }) + }) + .buffer_unordered(self.concurrency) + .filter_map(|node| async move { node }) + .collect::<Vec<_>>() + .await } } diff --git a/modules/analysis/src/service/mod.rs b/modules/analysis/src/service/mod.rs index adcb2354b..b0937d380 100644 --- a/modules/analysis/src/service/mod.rs +++
b/modules/analysis/src/service/mod.rs @@ -88,6 +88,7 @@ pub struct AnalysisService { inner: InnerService, _loader: Arc>, tx: mpsc::UnboundedSender, + concurrency: usize, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -325,6 +326,7 @@ impl AnalysisService { inner, _loader: loader, tx, + concurrency: config.concurrency.get(), } } @@ -414,14 +416,16 @@ impl AnalysisService { /// Collect nodes from the graph #[instrument(skip(self, create, graphs))] - async fn collect_graph<'a, C>( + async fn collect_graph<'a, 'g, F, Fut>( &self, query: impl Into> + Debug, - graphs: &[(String, Arc)], - create: C, + graphs: &'g [(String, Arc)], + concurrency: usize, + create: F, ) -> Vec<Node> where - C: AsyncFn(&Graph, NodeIndex, &graph::Node) -> Node, + F: Fn(&'g Graph, NodeIndex, &'g graph::Node) -> Fut + Clone, + Fut: Future<Output = Node>, { let query = query.into(); @@ -431,14 +435,16 @@ impl AnalysisService { .filter(|(sbom_id, graph)| acyclic(sbom_id, graph)), ) .flat_map(|(_, graph)| { + let create = create.clone(); stream::iter( graph .node_indices() .filter(|&i| Self::filter(graph, &query, i)) .filter_map(|i| graph.node_weight(i).map(|w| (i, w))), ) - .then(|(node_index, package_node)| create(graph, node_index, package_node)) + .map(move |(node_index, package_node)| create(graph, node_index, package_node)) }) + .buffer_unordered(concurrency) .collect::<Vec<_>>() .await } @@ -455,17 +461,21 @@ impl AnalysisService { let relationships = options.relationships; log::debug!("relations: {:?}", relationships); - self.collect_graph(query, graphs, async |graph, node_index, node| { - log::debug!( - "Discovered node - sbom: {}, node: {}", - node.sbom_id, - node.node_id - ); - Node { - base: node.into(), - relationship: None, - ancestors: Box::pin( - Collector::new( + self.collect_graph( + query, + graphs, + self.concurrency, + |graph, node_index, node| { + let graph_cache = graph_cache.clone(); + let relationships = relationships.clone(); + async move { + log::debug!( + "Discovered node - sbom: {}, node: {}", +
node.sbom_id, + node.node_id + ); + + let ancestors = Collector::new( &graph_cache, graphs, graph, @@ -474,12 +484,11 @@ impl AnalysisService { options.ancestors, &relationships, connection, + self.concurrency, ) - .collect(), - ) - .await, - descendants: Box::pin( - Collector::new( + .collect(); + + let descendants = Collector::new( &graph_cache, graphs, graph, @@ -488,12 +497,21 @@ impl AnalysisService { options.descendants, &relationships, connection, + self.concurrency, ) - .collect(), - ) - .await, - } - }) + .collect(); + + let (ancestors, descendants) = futures::join!(ancestors, descendants); + + Node { + base: node.into(), + relationship: None, + ancestors, + descendants, + } + } + }, + ) .await } diff --git a/modules/analysis/src/service/test.rs b/modules/analysis/src/service/test.rs index 8621a1768..01fbbab72 100644 --- a/modules/analysis/src/service/test.rs +++ b/modules/analysis/src/service/test.rs @@ -390,6 +390,7 @@ async fn test_cache_size_used(ctx: &TrustifyContext) -> Result<(), anyhow::Error let service = AnalysisService::new( AnalysisConfig { max_cache_size: BinaryByteSize::from(small_sbom_size * 2), + ..Default::default() }, ctx.db.clone(), );