Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions modules/analysis/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
use bytesize::ByteSize;
use std::num::NonZeroUsize;
use trustify_common::model::BinaryByteSize;

fn parse_concurrency(s: &str) -> Result<NonZeroUsize, String> {
let value: usize = s.parse().map_err(|e| format!("Invalid number: {e}"))?;
NonZeroUsize::new(value).ok_or_else(|| "Concurrency must be greater than zero".to_string())
}

const DEFAULT_CONCURRENCY: NonZeroUsize = match NonZeroUsize::new(10) {
Some(val) => val,
None => panic!("Default concurrency must be non-zero integer"),
};

#[derive(clap::Args, Debug, Clone)]
pub struct AnalysisConfig {
#[arg(
Expand All @@ -11,12 +22,22 @@ pub struct AnalysisConfig {
help = "Maximum size of the graph cache."
)]
pub max_cache_size: BinaryByteSize,

#[arg(
long,
env = "TRUSTIFY_ANALYSIS_CONCURRENCY",
default_value = "10",
value_parser = parse_concurrency,
help = "The number of concurrent tasks for analysis (must be > 0)."
)]
pub concurrency: NonZeroUsize,
}

impl Default for AnalysisConfig {
fn default() -> Self {
Self {
max_cache_size: BinaryByteSize(ByteSize::mib(200)),
concurrency: DEFAULT_CONCURRENCY,
}
}
}
202 changes: 111 additions & 91 deletions modules/analysis/src/service/collector.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use futures::stream::{self, StreamExt};
use parking_lot::Mutex;
use std::{collections::HashMap, sync::Arc};

Expand Down Expand Up @@ -33,6 +34,7 @@ pub struct Collector<'a, C: ConnectionTrait> {
discovered: DiscoveredTracker,
relationships: &'a HashSet<Relationship>,
connection: &'a C,
concurrency: usize,
}

impl<'a, C: ConnectionTrait> Collector<'a, C> {
Expand All @@ -47,6 +49,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
discovered: self.discovered.clone(),
relationships: self.relationships,
connection: self.connection,
concurrency: self.concurrency,
}
}

Expand All @@ -61,6 +64,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
depth: u64,
relationships: &'a HashSet<Relationship>,
connection: &'a C,
concurrency: usize,
) -> Self {
Self {
graph_cache,
Expand All @@ -72,6 +76,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
discovered: Default::default(),
relationships,
connection,
concurrency,
}
}

Expand Down Expand Up @@ -100,6 +105,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
discovered: self.discovered.clone(),
relationships: self.relationships,
connection: self.connection,
concurrency: self.concurrency,
}
}

Expand Down Expand Up @@ -160,74 +166,83 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
let current_sbom_id = &current_node.sbom_id;
let current_sbom_uuid = *current_sbom_id;
let current_node_id = &current_node.node_id;
let mut resolved_external_nodes: Vec<Node> = vec![];

let find_sbom_externals = resolve_rh_external_sbom_ancestors(
current_sbom_uuid,
current_node.node_id.clone().to_string(),
self.connection,
)
.await;

for sbom_external_node in find_sbom_externals {
if &sbom_external_node.sbom_id == current_sbom_id {
continue;
}
// check this is a valid external relationship
match sbom_external_node::Entity::find()
.filter(
sbom_external_node::Column::SbomId
.eq(sbom_external_node.clone().sbom_id),
)
.filter(
sbom_external_node::Column::ExternalNodeRef
.eq(sbom_external_node.clone().node_id),
)
.one(self.connection)
.await
{
Ok(Some(matched)) => {
// get the external sbom graph
let Some(external_graph) =
self.graph_cache.clone().get(&matched.sbom_id.to_string())
else {
log::warn!(
"external sbom graph {:?} not found in graph cache",
&matched.sbom_id.to_string()
);
let resolved_external_nodes: Vec<Node> = stream::iter(find_sbom_externals)
.map(|sbom_external_node| {
let collector = self.clone();
async move {
if &sbom_external_node.sbom_id == current_sbom_id {
return None;
};
// find the node in retrieved external graph
let Some(external_node_index) = external_graph
.node_indices()
.find(|&node| external_graph[node].node_id.eq(&matched.node_id))
else {
log::warn!(
"Node with ID {current_node_id} not found in external sbom"
);
continue;
};
// recurse into those external sbom nodes and save
resolved_external_nodes.extend(
Box::pin(
self.clone()
.with(external_graph.as_ref(), external_node_index)
.collect_graph(),
}
// check this is a valid external relationship
match sbom_external_node::Entity::find()
.filter(
sbom_external_node::Column::SbomId
.eq(sbom_external_node.clone().sbom_id),
)
.await,
);
}
Err(_) => {
log::warn!("Problem looking up sbom external node");
continue;
}
_ => {
log::debug!(
"not external sbom sbom_external_node {sbom_external_node:?}"
);
continue;
.filter(
sbom_external_node::Column::ExternalNodeRef
.eq(sbom_external_node.clone().node_id),
)
.one(collector.connection)
.await
{
Ok(Some(matched)) => {
// get the external sbom graph
let Some(external_graph) =
collector.graph_cache.clone().get(&matched.sbom_id.to_string())
else {
log::warn!(
"external sbom graph {:?} not found in graph cache",
&matched.sbom_id.to_string()
);
return None;
};
// find the node in retrieved external graph
let Some(external_node_index) = external_graph
.node_indices()
.find(|&node| {
external_graph[node].node_id.eq(&matched.node_id)
})
else {
log::warn!(
"Node with ID {current_node_id} not found in external sbom"
);
return None;
};
// recurse into those external sbom nodes and save
Some(
collector
.with(external_graph.as_ref(), external_node_index)
.collect_graph()
.await,
)
}
Err(_) => {
log::warn!("Problem looking up sbom external node");
None
}
_ => {
log::debug!(
"not external sbom sbom_external_node {sbom_external_node:?}"
);
None
}
}
}
}
}
})
.buffer_unordered(self.concurrency)
.filter_map(|nodes| async move { nodes })
.flat_map(stream::iter)
.collect::<Vec<_>>()
.await;

let mut result = self.collect_graph().await;
if !resolved_external_nodes.is_empty() {
Expand All @@ -240,44 +255,49 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
}

pub async fn collect_graph(&self) -> Vec<Node> {
let mut result = vec![];
log::debug!("Collecting graph for {:?}", self.node);
for edge in self.graph.edges_directed(self.node, self.direction) {
log::debug!("edge {edge:?}");

// we only recurse in one direction
let (ancestor, descendent, package_node) = match self.direction {
Direction::Incoming => (
Box::pin(self.continue_node(edge.source()).collect()).await,
None,
self.graph.node_weight(edge.source()),
),
Direction::Outgoing => (
None,
Box::pin(self.continue_node(edge.target()).collect()).await,
self.graph.node_weight(edge.target()),
),
};

let relationship = edge.weight();
stream::iter(self.graph.edges_directed(self.node, self.direction))
.map(|edge| async move {
log::debug!("edge {edge:?}");

if !self.relationships.is_empty() && !self.relationships.contains(relationship) {
// if we have entries, and no match, continue with the next
continue;
}
// we only recurse in one direction
// Depending on the direction, we collect ancestors or descendants
let (ancestor, descendent, package_node) = match self.direction {
// If the direction is incoming, we are collecting ancestors.
// We recursively call `collect` for the source of the edge.
Direction::Incoming => (
self.continue_node(edge.source()).collect().await,
None,
self.graph.node_weight(edge.source()),
),
// If the direction is outgoing, we are collecting descendants.
// We recursively call `collect` for the target of the edge.
Direction::Outgoing => (
None,
self.continue_node(edge.target()).collect().await,
self.graph.node_weight(edge.target()),
),
};

let Some(package_node) = package_node else {
continue;
};
let relationship = edge.weight();

result.push(Node {
base: BaseSummary::from(package_node),
relationship: Some(*relationship),
ancestors: ancestor,
descendants: descendent,
});
}
if !self.relationships.is_empty() && !self.relationships.contains(relationship) {
// if we have entries, and no match, continue with the next
return None;
}

result
// Create a new `Node` and add it to the result
Some(Node {
base: BaseSummary::from(package_node?),
relationship: Some(*relationship),
ancestors: ancestor,
descendants: descendent,
})
})
.buffer_unordered(self.concurrency)
.filter_map(|node| async move { node })
.collect::<Vec<_>>()
.await
}
}
Loading