Skip to content

Commit d8634d4

Browse files
committed
perf(analysis): Parallelize graph query execution
This commit significantly improves the performance of graph analysis by applying parallelization techniques to the run_graph_query and collect_graph functions. Previously, the collection of ancestors and descendants for a given node was performed sequentially. By refactoring run_graph_query to use futures::join!, we now process both directions concurrently, reducing the overall execution time. Building on this, the collect_graph function was optimized to process all discovered nodes in parallel using join_all. This ensures that we leverage available resources more efficiently when analyzing multiple entry points in the graph. Assisted-by: Gemini
1 parent 5a4c127 commit d8634d4

File tree

4 files changed

+177
-117
lines changed

4 files changed

+177
-117
lines changed

modules/analysis/src/config.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
use bytesize::ByteSize;
2+
use std::num::NonZeroUsize;
23
use trustify_common::model::BinaryByteSize;
34

5+
fn parse_concurrency(s: &str) -> Result<NonZeroUsize, String> {
6+
let value: usize = s.parse().map_err(|e| format!("Invalid number: {e}"))?;
7+
NonZeroUsize::new(value).ok_or_else(|| "Concurrency must be greater than zero".to_string())
8+
}
9+
10+
const DEFAULT_CONCURRENCY: NonZeroUsize = match NonZeroUsize::new(10) {
11+
Some(val) => val,
12+
None => panic!("Default concurrency must be non-zero integer"),
13+
};
14+
415
#[derive(clap::Args, Debug, Clone)]
516
pub struct AnalysisConfig {
617
#[arg(
@@ -11,12 +22,22 @@ pub struct AnalysisConfig {
1122
help = "Maximum size of the graph cache."
1223
)]
1324
pub max_cache_size: BinaryByteSize,
25+
26+
#[arg(
27+
long,
28+
env = "TRUSTIFY_ANALYSIS_CONCURRENCY",
29+
default_value = "10",
30+
value_parser = parse_concurrency,
31+
help = "The number of concurrent tasks for analysis (must be > 0)."
32+
)]
33+
pub concurrency: NonZeroUsize,
1434
}
1535

1636
impl Default for AnalysisConfig {
1737
fn default() -> Self {
1838
Self {
1939
max_cache_size: BinaryByteSize(ByteSize::mib(200)),
40+
concurrency: DEFAULT_CONCURRENCY,
2041
}
2142
}
2243
}

modules/analysis/src/service/collector.rs

Lines changed: 111 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::*;
2+
use futures::stream::{self, StreamExt};
23
use parking_lot::Mutex;
34
use std::{collections::HashMap, sync::Arc};
45

@@ -33,6 +34,7 @@ pub struct Collector<'a, C: ConnectionTrait> {
3334
discovered: DiscoveredTracker,
3435
relationships: &'a HashSet<Relationship>,
3536
connection: &'a C,
37+
concurrency: usize,
3638
}
3739

3840
impl<'a, C: ConnectionTrait> Collector<'a, C> {
@@ -47,6 +49,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
4749
discovered: self.discovered.clone(),
4850
relationships: self.relationships,
4951
connection: self.connection,
52+
concurrency: self.concurrency,
5053
}
5154
}
5255

@@ -61,6 +64,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
6164
depth: u64,
6265
relationships: &'a HashSet<Relationship>,
6366
connection: &'a C,
67+
concurrency: usize,
6468
) -> Self {
6569
Self {
6670
graph_cache,
@@ -72,6 +76,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
7276
discovered: Default::default(),
7377
relationships,
7478
connection,
79+
concurrency,
7580
}
7681
}
7782

@@ -100,6 +105,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
100105
discovered: self.discovered.clone(),
101106
relationships: self.relationships,
102107
connection: self.connection,
108+
concurrency: self.concurrency,
103109
}
104110
}
105111

@@ -160,74 +166,83 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
160166
let current_sbom_id = &current_node.sbom_id;
161167
let current_sbom_uuid = *current_sbom_id;
162168
let current_node_id = &current_node.node_id;
163-
let mut resolved_external_nodes: Vec<Node> = vec![];
169+
164170
let find_sbom_externals = resolve_rh_external_sbom_ancestors(
165171
current_sbom_uuid,
166172
current_node.node_id.clone().to_string(),
167173
self.connection,
168174
)
169175
.await;
170176

171-
for sbom_external_node in find_sbom_externals {
172-
if &sbom_external_node.sbom_id == current_sbom_id {
173-
continue;
174-
}
175-
// check this is a valid external relationship
176-
match sbom_external_node::Entity::find()
177-
.filter(
178-
sbom_external_node::Column::SbomId
179-
.eq(sbom_external_node.clone().sbom_id),
180-
)
181-
.filter(
182-
sbom_external_node::Column::ExternalNodeRef
183-
.eq(sbom_external_node.clone().node_id),
184-
)
185-
.one(self.connection)
186-
.await
187-
{
188-
Ok(Some(matched)) => {
189-
// get the external sbom graph
190-
let Some(external_graph) =
191-
self.graph_cache.clone().get(&matched.sbom_id.to_string())
192-
else {
193-
log::warn!(
194-
"external sbom graph {:?} not found in graph cache",
195-
&matched.sbom_id.to_string()
196-
);
177+
let resolved_external_nodes: Vec<Node> = stream::iter(find_sbom_externals)
178+
.map(|sbom_external_node| {
179+
let collector = self.clone();
180+
async move {
181+
if &sbom_external_node.sbom_id == current_sbom_id {
197182
return None;
198-
};
199-
// find the node in retrieved external graph
200-
let Some(external_node_index) = external_graph
201-
.node_indices()
202-
.find(|&node| external_graph[node].node_id.eq(&matched.node_id))
203-
else {
204-
log::warn!(
205-
"Node with ID {current_node_id} not found in external sbom"
206-
);
207-
continue;
208-
};
209-
// recurse into those external sbom nodes and save
210-
resolved_external_nodes.extend(
211-
Box::pin(
212-
self.clone()
213-
.with(external_graph.as_ref(), external_node_index)
214-
.collect_graph(),
183+
}
184+
// check this is a valid external relationship
185+
match sbom_external_node::Entity::find()
186+
.filter(
187+
sbom_external_node::Column::SbomId
188+
.eq(sbom_external_node.clone().sbom_id),
215189
)
216-
.await,
217-
);
218-
}
219-
Err(_) => {
220-
log::warn!("Problem looking up sbom external node");
221-
continue;
222-
}
223-
_ => {
224-
log::debug!(
225-
"not external sbom sbom_external_node {sbom_external_node:?}"
226-
);
227-
continue;
190+
.filter(
191+
sbom_external_node::Column::ExternalNodeRef
192+
.eq(sbom_external_node.clone().node_id),
193+
)
194+
.one(collector.connection)
195+
.await
196+
{
197+
Ok(Some(matched)) => {
198+
// get the external sbom graph
199+
let Some(external_graph) =
200+
collector.graph_cache.clone().get(&matched.sbom_id.to_string())
201+
else {
202+
log::warn!(
203+
"external sbom graph {:?} not found in graph cache",
204+
&matched.sbom_id.to_string()
205+
);
206+
return None;
207+
};
208+
// find the node in retrieved external graph
209+
let Some(external_node_index) = external_graph
210+
.node_indices()
211+
.find(|&node| {
212+
external_graph[node].node_id.eq(&matched.node_id)
213+
})
214+
else {
215+
log::warn!(
216+
"Node with ID {current_node_id} not found in external sbom"
217+
);
218+
return None;
219+
};
220+
// recurse into those external sbom nodes and save
221+
Some(
222+
collector
223+
.with(external_graph.as_ref(), external_node_index)
224+
.collect_graph()
225+
.await,
226+
)
227+
}
228+
Err(_) => {
229+
log::warn!("Problem looking up sbom external node");
230+
None
231+
}
232+
_ => {
233+
log::debug!(
234+
"not external sbom sbom_external_node {sbom_external_node:?}"
235+
);
236+
None
237+
}
238+
}
228239
}
229-
}
230-
}
240+
})
241+
.buffer_unordered(self.concurrency)
242+
.filter_map(|nodes| async move { nodes })
243+
.flat_map(stream::iter)
244+
.collect::<Vec<_>>()
245+
.await;
231246

232247
let mut result = self.collect_graph().await;
233248
if !resolved_external_nodes.is_empty() {
@@ -240,44 +255,49 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
240255
}
241256

242257
pub async fn collect_graph(&self) -> Vec<Node> {
243-
let mut result = vec![];
244258
log::debug!("Collecting graph for {:?}", self.node);
245-
for edge in self.graph.edges_directed(self.node, self.direction) {
246-
log::debug!("edge {edge:?}");
247-
248-
// we only recurse in one direction
249-
let (ancestor, descendent, package_node) = match self.direction {
250-
Direction::Incoming => (
251-
Box::pin(self.continue_node(edge.source()).collect()).await,
252-
None,
253-
self.graph.node_weight(edge.source()),
254-
),
255-
Direction::Outgoing => (
256-
None,
257-
Box::pin(self.continue_node(edge.target()).collect()).await,
258-
self.graph.node_weight(edge.target()),
259-
),
260-
};
261259

262-
let relationship = edge.weight();
260+
stream::iter(self.graph.edges_directed(self.node, self.direction))
261+
.map(|edge| async move {
262+
log::debug!("edge {edge:?}");
263263

264-
if !self.relationships.is_empty() && !self.relationships.contains(relationship) {
265-
// if we have entries, and no match, continue with the next
266-
continue;
267-
}
264+
// we only recurse in one direction
265+
// Depending on the direction, we collect ancestors or descendants
266+
let (ancestor, descendent, package_node) = match self.direction {
267+
// If the direction is incoming, we are collecting ancestors.
268+
// We recursively call `collect` for the source of the edge.
269+
Direction::Incoming => (
270+
self.continue_node(edge.source()).collect().await,
271+
None,
272+
self.graph.node_weight(edge.source()),
273+
),
274+
// If the direction is outgoing, we are collecting descendants.
275+
// We recursively call `collect` for the target of the edge.
276+
Direction::Outgoing => (
277+
None,
278+
self.continue_node(edge.target()).collect().await,
279+
self.graph.node_weight(edge.target()),
280+
),
281+
};
268282

269-
let Some(package_node) = package_node else {
270-
continue;
271-
};
283+
let relationship = edge.weight();
272284

273-
result.push(Node {
274-
base: BaseSummary::from(package_node),
275-
relationship: Some(*relationship),
276-
ancestors: ancestor,
277-
descendants: descendent,
278-
});
279-
}
285+
if !self.relationships.is_empty() && !self.relationships.contains(relationship) {
286+
// if we have entries, and no match, continue with the next
287+
return None;
288+
}
280289

281-
result
290+
// Create a new `Node` and add it to the result
291+
Some(Node {
292+
base: BaseSummary::from(package_node?),
293+
relationship: Some(*relationship),
294+
ancestors: ancestor,
295+
descendants: descendent,
296+
})
297+
})
298+
.buffer_unordered(self.concurrency)
299+
.filter_map(|node| async move { node })
300+
.collect::<Vec<_>>()
301+
.await
282302
}
283303
}

0 commit comments

Comments
 (0)