perf(analysis): Parallelize graph query execution #1911
Conversation
Reviewer's Guide

This PR refactors the graph analysis pipeline to exploit async concurrency: `collect_graph` now batches node-processing futures and uses `join_all`, per-node ancestor/descendant queries leverage `futures::join!`, `Collector.collect_graph` processes child edges in parallel, and `DiscoveredTracker` switches to an async `Mutex` for efficient locking.

Sequence diagram for parallelized node processing in `collect_graph`:

```mermaid
sequenceDiagram
    participant AnalysisService
    participant Graphs
    participant NodeFutures
    participant join_all
    participant Node
    AnalysisService->>Graphs: Iterate over graphs
    Graphs->>NodeFutures: For each node, create future (async closure)
    NodeFutures->>join_all: Batch all node futures
    join_all->>Node: Execute all node futures in parallel
    Node-->>join_all: Return processed Node
    join_all-->>AnalysisService: Return Vec<Node> (all nodes processed in parallel)
```
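As a rough illustration of the batching pattern in the diagram (all names here are placeholders, not the crate's actual types):

```rust
use futures::future::join_all;

// One future per discovered node, all driven concurrently on the same task.
async fn process_node(id: u32) -> u32 {
    // ... per-node ancestor/descendant queries would run here ...
    id
}

async fn collect_graph(node_ids: Vec<u32>) -> Vec<u32> {
    let futures = node_ids.into_iter().map(process_node);
    join_all(futures).await
}
```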
Updated class diagram for `DiscoveredTracker` and `Collector` with async changes:

```mermaid
classDiagram
    class DiscoveredTracker {
        +Arc<tokio::sync::Mutex<HashMap<*const NodeGraph, FixedBitSet>>> cache
        +async fn visit(&self, graph: &NodeGraph, node: NodeIndex) -> bool
    }
    class Collector {
        +async fn collect_graph(&self) -> Vec<Node>
    }
    DiscoveredTracker <.. Collector : used by
```
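A minimal sketch of what that tracker could look like, assuming the types from the diagram (the `usize` key standing in for the raw graph pointer is a liberty taken here to keep the sketch `Send`):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use fixedbitset::FixedBitSet;
use petgraph::graph::NodeIndex;
use tokio::sync::Mutex;

// Stand-in for the crate's graph alias.
type NodeGraph = petgraph::Graph<(), ()>;

#[derive(Default, Clone)]
struct DiscoveredTracker {
    // Keyed by the graph's address (the diagram stores a raw `*const NodeGraph`).
    cache: Arc<Mutex<HashMap<usize, FixedBitSet>>>,
}

impl DiscoveredTracker {
    /// Marks `node` as discovered; returns `true` the first time, `false` after.
    async fn visit(&self, graph: &NodeGraph, node: NodeIndex) -> bool {
        let mut cache = self.cache.lock().await;
        let seen = cache
            .entry(graph as *const NodeGraph as usize)
            .or_insert_with(|| FixedBitSet::with_capacity(graph.node_count()));
        !seen.put(node.index())
    }
}
```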
Hey @dejanb - I've reviewed your changes - here's some feedback:
- Consider bounding the number of concurrent tasks in collect_graph (for example with buffer_unordered or a semaphore) to avoid spawning too many futures at once on large graphs.
- Switching DiscoveredTracker to tokio::sync::Mutex may introduce contention under high parallelism; you might explore lock-free data structures or an RwLock to reduce contention.
- Rather than collecting all edge futures into a Vec and then join_all, using FuturesUnordered would let you process results as they complete and reduce peak memory usage.
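For reference, a minimal sketch of the first suggestion (function and type names are illustrative, not from the PR):

```rust
use futures::stream::{self, StreamExt};
use std::future::Future;

// Bounded fan-out: at most `concurrency` node futures are in flight at once,
// instead of eagerly creating one future per node and joining them all.
async fn process_bounded<T, F, Fut>(items: Vec<T>, concurrency: usize, f: F) -> Vec<Fut::Output>
where
    F: Fn(T) -> Fut,
    Fut: Future,
{
    stream::iter(items)
        .map(f)
        .buffer_unordered(concurrency)
        .collect::<Vec<_>>()
        .await
}
```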
/scale-test

🛠️ Scale test has started! Follow the progress here: Workflow Run
Codecov Report

❌ Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #1911      +/-   ##
==========================================
+ Coverage   68.14%   68.29%   +0.14%
==========================================
  Files         365      367       +2
  Lines       23123    23247     +124
  Branches    23123    23247     +124
==========================================
+ Hits        15757    15876     +119
- Misses       6485     6488       +3
- Partials      881      883       +2
```

☔ View full report in Codecov by Sentry.
```diff
 ) -> Vec<Node>
 where
-    C: AsyncFn(&Graph<graph::Node, Relationship>, NodeIndex, &graph::Node) -> Node,
+    F: Fn(&'g Graph<graph::Node, Relationship>, NodeIndex, &'g graph::Node) -> Fut + Clone,
```
Can't we use AsyncFn here? Instead of the old style stuff?
The signature changed because the function is now moved into the closure, so it needed more lifetimes specified. After wrestling with it for a while, this is the best I could come up with.
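A reduced illustration of the constraint (generic names are mine, not the actual code): `AsyncFn` bounds currently give no way to name the returned future and tie it to the graph's lifetime, so the signature falls back to a `Fn` returning an explicit `Fut`, plus `Clone` because the callback is moved into each per-node future.

```rust
use futures::future::join_all;
use petgraph::{graph::NodeIndex, Graph};
use std::future::Future;

async fn for_each_node<'g, N, E, T, F, Fut>(g: &'g Graph<N, E>, f: F) -> Vec<T>
where
    F: Fn(&'g Graph<N, E>, NodeIndex, &'g N) -> Fut + Clone,
    Fut: Future<Output = T>,
{
    let futures = g.node_indices().map(|ix| {
        let f = f.clone(); // moved into the per-node future
        async move { f(g, ix, &g[ix]).await }
    });
    join_all(futures).await
}
```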
The changes in this commit improved performance of the analysis graph tenfold for large SBOMs. In particular, this call on the load-test dataset went from … to …

For smaller SBOMs the difference is not that significant when the SBOM is not in the cache, but it's also noticeable when it's already in memory: from … to …
I'll go next and examine sourcery's (and ctron's :) ) reviews.
Goose Report — Goose Attack Report (Plan Overview; Request, Response Time, Status Code, Transaction, and Scenario Metrics)

📄 Full Report (go to "Artifacts" and download the report)
My concern with this change (I know I always have concerns) is that there's an unbounded fork of tasks. Sure, that will improve performance, but it might also spike resource consumption. The question is: can we somehow limit this? And make this limit configurable?
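One way to impose such a configurable limit would be a semaphore gating task spawn (a sketch only; the PR ultimately went with `buffer_unordered` and a `concurrency` option):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn spawn_bounded(limit: usize) {
    let semaphore = Arc::new(Semaphore::new(limit));
    let mut handles = Vec::new();
    for node in 0..1000u32 {
        // Waits here once `limit` tasks are already running.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // released when the task finishes
            // ... process `node` ...
            node
        }));
    }
    for handle in handles {
        let _ = handle.await;
    }
}
```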
cool - a few observations: …
Force-pushed from 1f675a9 to a5a3d21 (Compare)
/scale-test

🛠️ Scale test has started! Follow the progress here: Workflow Run
The reworked implementation now uses … The test with … against the load-testing dataset.

So, I picked … It'd be great if we could periodically run memory profiling during load/scaling tests in the future.
@JimFuller-RedHat I tested both the latest and non-latest variants. The behaviour is the same, as loading the SBOMs is not the issue; it's querying the graphs once loaded. Since your comment I have tested exclusively with the latest API for consistency.
JimFuller-RedHat
left a comment
Very nice work! LGTM
Goose Report — Goose Attack Report (Plan Overview; Request, Response Time, Status Code, Transaction, and Scenario Metrics)

📄 Full Report (go to "Artifacts" and download the report)
ctron
left a comment
Looks good to me. Thanks for wrestling with this. Two small ideas, but not blockers.
```rust
    })
    .buffer_unordered(self.concurrency)
    .filter_map(|nodes| async move { nodes })
    .collect::<Vec<_>>()
```
Could you flatten the stream before collecting it, to save one collect?
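Sketched out, the suggestion amounts to something like this (simplified element types; the real stream yields optional batches of nodes):

```rust
use futures::stream::{self, StreamExt};

async fn collect_flat(batches: Vec<Option<Vec<u32>>>, concurrency: usize) -> Vec<u32> {
    stream::iter(batches)
        .map(|batch| async move { batch })
        .buffer_unordered(concurrency)
        .filter_map(|batch| async move { batch }) // drop the `None`s
        .map(stream::iter)  // turn each batch into a stream...
        .flatten()          // ...and splice it in, so only one collect is needed
        .collect::<Vec<_>>()
        .await
}
```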
modules/analysis/src/config.rs (outdated):

```rust
    default_value = "10",
    help = "The number of concurrent tasks for analysis."
)]
pub concurrency: usize,
```
I wonder if we should guard against zero. Maybe using https://doc.rust-lang.org/std/num/type.NonZeroUsize.html, or just a manual check.
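A sketch of the `NonZeroUsize` variant, assuming the config uses clap's derive API (the attribute values mirror the diff above; the surrounding struct is hypothetical):

```rust
use clap::Parser;
use std::num::NonZeroUsize;

#[derive(Parser, Debug)]
struct AnalysisConfig {
    /// Zero is rejected at argument-parsing time instead of misbehaving later.
    #[arg(
        long,
        default_value = "10",
        help = "The number of concurrent tasks for analysis."
    )]
    concurrency: NonZeroUsize,
}

fn main() {
    let config = AnalysisConfig::parse();
    // `buffer_unordered` expects a plain `usize`:
    let limit = config.concurrency.get();
    println!("concurrency = {limit}");
}
```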
Force-pushed from 3b5bc59 to cb780d4 (Compare)
This commit significantly improves the performance of graph analysis by applying parallelization techniques to the run_graph_query and collect_graph functions.

Previously, the collection of ancestors and descendants for a given node was performed sequentially. By refactoring run_graph_query to use futures::join!, we now process both directions concurrently, reducing the overall execution time.

Building on this, the collect_graph function was optimized to process all discovered nodes in parallel using join_all. This ensures that we leverage available resources more efficiently when analyzing multiple entry points in the graph.

Assisted-by: Gemini
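The `futures::join!` part of that change reduces to this shape (function names follow the description; the bodies are placeholders):

```rust
use futures::join;

async fn ancestors(node: u32) -> Vec<u32> {
    vec![node] // placeholder for the upward traversal
}

async fn descendants(node: u32) -> Vec<u32> {
    vec![node] // placeholder for the downward traversal
}

async fn run_graph_query(node: u32) -> (Vec<u32>, Vec<u32>) {
    // Previously: `ancestors(node).await` followed by `descendants(node).await`.
    // `join!` polls both futures concurrently on the current task.
    join!(ancestors(node), descendants(node))
}
```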
Force-pushed from cb780d4 to d8634d4 (Compare)
@ctron Thanks for the suggestions. Flattening the stream definitely makes sense, and I think it additionally impacts memory usage in a positive way. I also implemented the concurrency config with NonZeroUsize. I'm not super happy with the usage pattern, but I think it's good for now. There are also a few more places in the code where this could be applied. Can you give it another quick look before merging?
/backport

Successfully created backport PR for …
Summary by Sourcery
Optimize graph analysis performance by refactoring query and collection routines to execute traversals and node processing in parallel using futures::join! and join_all.
Enhancements: