322 changes: 228 additions & 94 deletions rust/otap-dataflow/crates/config/src/pipeline.rs
@@ -46,7 +46,14 @@ pub struct PipelineConfig {
///
/// Note: We use `Arc<NodeUserConfig>` to allow sharing the same pipeline configuration
/// across multiple cores/threads without cloning the entire configuration.
nodes: HashMap<NodeId, Arc<NodeUserConfig>>,
#[serde(default)]
nodes: PipelineNodes,
@utpilla (Contributor), Jan 15, 2026:

Let's update the doc comments above so they no longer mention `Arc<NodeUserConfig>`. We should instead mention that on `pub struct PipelineNodes(HashMap<NodeId, Arc<NodeUserConfig>>);`.
Contributor (Author) reply:

Done
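For reference, one possible placement of that note, per the suggestion above (a sketch, not necessarily the exact wording committed in the PR):

```rust
/// A collection of nodes forming a pipeline graph.
///
/// Note: nodes are stored as `Arc<NodeUserConfig>` so the same pipeline
/// configuration can be shared across multiple cores/threads without
/// cloning the entire configuration.
pub struct PipelineNodes(HashMap<NodeId, Arc<NodeUserConfig>>);
```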


/// Internal telemetry pipeline nodes. These have the same structure
/// as `nodes` but are independent and isolated to a separate internal
/// telemetry runtime.
#[serde(default, skip_serializing_if = "PipelineNodes::is_empty")]
internal: PipelineNodes,
Contributor, commenting on lines +49 to +53:

I have an issue with this approach. Even though, as things stand, our engine only supports a single pipeline configuration, it is designed to support multiple pipelines (with different configs) running in parallel within the same process. In the configuration crate, we have a logical grouping called PipelineGroup whose long-term goal is to group multiple pipelines that belong together; for example, a group could be mapped to a tenant.

All of this is to say that we will have multiple pipeline configurations running at the same time, and therefore a single internal telemetry system (ITS) receiving internal telemetry from different pipeline configurations. Because of that, it does not seem ideal to start tying the ITS configuration to a specific pipeline configuration. In the long run, I agree that we will need a way to define pipeline-specific processing for our internal telemetry, but I propose that we address that in a second phase.

Contributor (Author) reply:

I have no strong opinion on how we organize the graph or where the internal telemetry definition belongs.


/// Service-level telemetry configuration.
#[serde(default)]
@@ -68,6 +75,183 @@ pub enum PipelineType {
/// OpenTelemetry with Apache Arrow Protocol (OTAP) pipeline.
Otap,
}

/// A collection of nodes forming a pipeline graph.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
#[serde(transparent)]
pub struct PipelineNodes(HashMap<NodeId, Arc<NodeUserConfig>>);
Contributor:

Same comment as in the other PR — nit: we could do something more "structured" using named fields instead of a tuple struct.

Contributor (Author) reply:

I tried to make this type transparent to serde, which (IIUC) works for tuple/newtype structs.
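For background on that reply: `#[serde(transparent)]` makes a single-field newtype (de)serialize exactly like its inner field, so `PipelineNodes` keeps accepting the plain map syntax the previous `HashMap` field used. A minimal, self-contained sketch — simplified stand-in types rather than the crate's `NodeUserConfig`, and it assumes the `serde` and `serde_json` crates:

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Stand-in for `PipelineNodes`: the wrapper is invisible in the wire format.
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(transparent)]
struct Nodes(HashMap<String, String>);

fn main() -> Result<(), serde_json::Error> {
    // Deserializes from a plain map, exactly as the un-wrapped HashMap would.
    let nodes: Nodes = serde_json::from_str(r#"{"receiver1": "otlp_receiver"}"#)?;
    assert_eq!(
        nodes.0.get("receiver1").map(String::as_str),
        Some("otlp_receiver")
    );

    // ...and serializes back to a plain map, with no visible wrapper.
    assert_eq!(
        serde_json::to_string(&nodes)?,
        r#"{"receiver1":"otlp_receiver"}"#
    );
    Ok(())
}
```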


impl PipelineNodes {
/// Returns true if the node collection is empty.
#[must_use]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// Returns the number of nodes.
#[must_use]
pub fn len(&self) -> usize {
self.0.len()
}

/// Returns a reference to the node with the given ID, if it exists.
#[must_use]
pub fn get(&self, id: &str) -> Option<&Arc<NodeUserConfig>> {
self.0.get(id)
}

/// Returns true if a node with the given ID exists.
#[must_use]
pub fn contains_key(&self, id: &str) -> bool {
self.0.contains_key(id)
}

/// Returns an iterator visiting all nodes.
pub fn iter(&self) -> impl Iterator<Item = (&NodeId, &Arc<NodeUserConfig>)> {
self.0.iter()
}

/// Returns an iterator over node IDs.
pub fn keys(&self) -> impl Iterator<Item = &NodeId> {
self.0.keys()
}

/// Validate the node graph structure.
///
/// Checks for:
/// - Invalid hyper-edges (missing target nodes)
/// - Cycles in the DAG
pub fn validate(
&self,
pipeline_group_id: &PipelineGroupId,
pipeline_id: &PipelineId,
errors: &mut Vec<Error>,
) {
self.validate_hyper_edges(pipeline_group_id, pipeline_id, errors);

// Only check for cycles if no hyper-edge errors
if errors.is_empty() {
for cycle in self.detect_cycles() {
errors.push(Error::CycleDetected {
context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()),
nodes: cycle,
});
}
}
}

/// Validate hyper-edges (check that all destination nodes exist).
fn validate_hyper_edges(
&self,
pipeline_group_id: &PipelineGroupId,
pipeline_id: &PipelineId,
errors: &mut Vec<Error>,
) {
for (node_id, node) in self.0.iter() {
for edge in node.out_ports.values() {
let missing_targets: Vec<_> = edge
.destinations
.iter()
.filter(|target| !self.0.contains_key(*target))
.cloned()
.collect();

if !missing_targets.is_empty() {
errors.push(Error::InvalidHyperEdgeSpec {
context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()),
source_node: node_id.clone(),
missing_source: false,
details: Box::new(HyperEdgeSpecDetails {
target_nodes: edge.destinations.iter().cloned().collect(),
dispatch_strategy: edge.dispatch_strategy.clone(),
missing_targets,
}),
});
}
}
}
}

/// Detect cycles in the node graph.
fn detect_cycles(&self) -> Vec<Vec<NodeId>> {
fn visit(
node: &NodeId,
nodes: &HashMap<NodeId, Arc<NodeUserConfig>>,
visiting: &mut HashSet<NodeId>,
visited: &mut HashSet<NodeId>,
current_path: &mut Vec<NodeId>,
cycles: &mut Vec<Vec<NodeId>>,
) {
if visited.contains(node) {
return;
}
if visiting.contains(node) {
if let Some(pos) = current_path.iter().position(|n| n == node) {
cycles.push(current_path[pos..].to_vec());
}
return;
}
_ = visiting.insert(node.clone());
current_path.push(node.clone());

if let Some(n) = nodes.get(node) {
for edge in n.out_ports.values() {
for tgt in &edge.destinations {
visit(tgt, nodes, visiting, visited, current_path, cycles);
}
}
}

_ = visiting.remove(node);
_ = visited.insert(node.clone());
_ = current_path.pop();
}

let mut visiting = HashSet::new();
let mut current_path = Vec::new();
let mut visited = HashSet::new();
let mut cycles = Vec::new();

for node in self.0.keys() {
if !visited.contains(node) {
visit(
node,
&self.0,
&mut visiting,
&mut visited,
&mut current_path,
&mut cycles,
);
}
}

cycles
}
}
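As a standalone illustration of the DFS used by `detect_cycles` above — the same visiting/visited/current-path bookkeeping, applied to a plain adjacency map with hypothetical string node IDs instead of the crate's `NodeId`/`NodeUserConfig` types:

```rust
use std::collections::{HashMap, HashSet};

/// Same idea as `detect_cycles`: `visiting` marks nodes on the current DFS
/// stack, `visited` marks fully explored nodes, and a back-edge into the
/// current path yields the cycle slice starting at the revisited node.
fn detect_cycles(graph: &HashMap<String, Vec<String>>) -> Vec<Vec<String>> {
    fn visit(
        node: &String,
        graph: &HashMap<String, Vec<String>>,
        visiting: &mut HashSet<String>,
        visited: &mut HashSet<String>,
        path: &mut Vec<String>,
        cycles: &mut Vec<Vec<String>>,
    ) {
        if visited.contains(node) {
            return;
        }
        if visiting.contains(node) {
            if let Some(pos) = path.iter().position(|n| n == node) {
                cycles.push(path[pos..].to_vec());
            }
            return;
        }
        let _ = visiting.insert(node.clone());
        path.push(node.clone());
        for tgt in graph.get(node).into_iter().flatten() {
            visit(tgt, graph, visiting, visited, path, cycles);
        }
        let _ = visiting.remove(node);
        let _ = visited.insert(node.clone());
        let _ = path.pop();
    }

    let (mut visiting, mut visited) = (HashSet::new(), HashSet::new());
    let (mut path, mut cycles) = (Vec::new(), Vec::new());
    for node in graph.keys() {
        if !visited.contains(node) {
            visit(node, graph, &mut visiting, &mut visited, &mut path, &mut cycles);
        }
    }
    cycles
}

fn main() {
    // receiver -> processor -> exporter is acyclic;
    // processor -> processor2 -> processor closes a cycle.
    let graph = HashMap::from([
        ("receiver".to_string(), vec!["processor".to_string()]),
        (
            "processor".to_string(),
            vec!["processor2".to_string(), "exporter".to_string()],
        ),
        ("processor2".to_string(), vec!["processor".to_string()]),
        ("exporter".to_string(), vec![]),
    ]);
    let cycles = detect_cycles(&graph);
    assert_eq!(cycles.len(), 1); // the processor <-> processor2 loop
    println!("{cycles:?}");
}
```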

impl std::ops::Index<&str> for PipelineNodes {
type Output = Arc<NodeUserConfig>;

fn index(&self, id: &str) -> &Self::Output {
&self.0[id]
}
}

impl IntoIterator for PipelineNodes {
type Item = (NodeId, Arc<NodeUserConfig>);
type IntoIter = std::collections::hash_map::IntoIter<NodeId, Arc<NodeUserConfig>>;

fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}

impl FromIterator<(NodeId, Arc<NodeUserConfig>)> for PipelineNodes {
fn from_iter<T: IntoIterator<Item = (NodeId, Arc<NodeUserConfig>)>>(iter: T) -> Self {
Self(iter.into_iter().collect())
}
}

/// A configuration for a pipeline.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct PipelineSettings {
@@ -262,6 +446,12 @@ impl PipelineConfig {
&self.settings
}

/// Returns a reference to the main pipeline nodes.
#[must_use]
pub fn nodes(&self) -> &PipelineNodes {
&self.nodes
}

/// Returns an iterator visiting all nodes in the pipeline.
pub fn node_iter(&self) -> impl Iterator<Item = (&NodeId, &Arc<NodeUserConfig>)> {
self.nodes.iter()
@@ -278,55 +468,44 @@ impl PipelineConfig {
&self.service
}

/// Returns true if the internal telemetry pipeline is configured.
#[must_use]
pub fn has_internal_pipeline(&self) -> bool {
!self.internal.is_empty()
}

/// Returns a reference to the internal pipeline nodes.
#[must_use]
pub fn internal_nodes(&self) -> &PipelineNodes {
&self.internal
}

/// Returns an iterator visiting all nodes in the internal telemetry pipeline.
pub fn internal_node_iter(&self) -> impl Iterator<Item = (&NodeId, &Arc<NodeUserConfig>)> {
self.internal.iter()
}

/// Validate the pipeline specification.
///
/// This method checks for:
/// - Duplicate node IDs
/// - Duplicate out-ports (same source node + port name)
/// - Invalid hyper-edges (missing source or target nodes)
/// - Cycles in the DAG
pub fn validate(
&self,
pipeline_group_id: &PipelineGroupId,
pipeline_id: &PipelineId,
) -> Result<(), Error> {
let mut errors = Vec::new();

// Check for invalid hyper-edges (references to non-existent nodes)
for (node_id, node) in self.nodes.iter() {
for edge in node.out_ports.values() {
let mut missing_targets = Vec::new();
// Validate main pipeline
self.nodes
.validate(pipeline_group_id, pipeline_id, &mut errors);

for target in &edge.destinations {
if !self.nodes.contains_key(target) {
missing_targets.push(target.clone());
}
}

if !missing_targets.is_empty() {
errors.push(Error::InvalidHyperEdgeSpec {
context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()),
source_node: node_id.clone(),
missing_source: false, // source exists since we're iterating over nodes
details: Box::new(HyperEdgeSpecDetails {
target_nodes: edge.destinations.iter().cloned().collect(),
dispatch_strategy: edge.dispatch_strategy.clone(),
missing_targets,
}),
});
}
}
}

// Check for cycles if no errors so far
if errors.is_empty() {
let cycles = self.detect_cycles();
for cycle in cycles {
errors.push(Error::CycleDetected {
context: Context::new(pipeline_group_id.clone(), pipeline_id.clone()),
nodes: cycle,
});
}
// Validate internal pipeline if present
if !self.internal.is_empty() {
self.internal
.validate(pipeline_group_id, pipeline_id, &mut errors);
Contributor:

This could be inconvenient for some validation errors. For example, if the main pipeline and the internal pipeline both have a node with the same name and there is a validation error, it wouldn't be clear whether the error belongs to the main pipeline section or the internal pipeline section.

Maybe we can add a bool field `is_internal` to `Context`, or just append "internal" to the `pipeline_id` when validating the internal pipeline?

Contributor (Author) reply:

I appended _internal.
}

if !errors.is_empty() {
@@ -335,65 +514,13 @@ impl PipelineConfig {
Ok(())
}
}

fn detect_cycles(&self) -> Vec<Vec<NodeId>> {
fn visit(
node: &NodeId,
nodes: &HashMap<NodeId, Arc<NodeUserConfig>>,
visiting: &mut HashSet<NodeId>,
visited: &mut HashSet<NodeId>,
current_path: &mut Vec<NodeId>,
cycles: &mut Vec<Vec<NodeId>>,
) {
if visited.contains(node) {
return;
}
if visiting.contains(node) {
// Cycle found
if let Some(pos) = current_path.iter().position(|n| n == node) {
cycles.push(current_path[pos..].to_vec());
}
return;
}
_ = visiting.insert(node.clone());
current_path.push(node.clone());

if let Some(n) = nodes.get(node) {
for edge in n.out_ports.values() {
for tgt in &edge.destinations {
visit(tgt, nodes, visiting, visited, current_path, cycles);
}
}
}

_ = visiting.remove(node);
_ = visited.insert(node.clone());
_ = current_path.pop();
}

let mut visiting = HashSet::new();
let mut current_path = Vec::new();
let mut visited = HashSet::new();
let mut cycles = Vec::new();

for node in self.nodes.keys() {
if !visited.contains(node) {
visit(
node,
&self.nodes,
&mut visiting,
&mut visited,
&mut current_path,
&mut cycles,
);
}
}

cycles
}
}

/// A builder for constructing a [`PipelineConfig`].
/// A builder for constructing a [`PipelineConfig`]. This type is used
/// for easy testing of the PipelineNodes logic.
///
/// Note: does not support building the internal pipeline, because its
/// build path is identical to the main pipeline's.
pub struct PipelineConfigBuilder {
description: Option<Description>,
nodes: HashMap<NodeId, NodeUserConfig>,
@@ -656,6 +783,7 @@ impl PipelineConfigBuilder {
.into_iter()
.map(|(id, node)| (id, Arc::new(node)))
.collect(),
internal: PipelineNodes(HashMap::new()),
settings: PipelineSettings::default(),
r#type: pipeline_type,
service: ServiceConfig::default(),
@@ -1052,13 +1180,19 @@ mod tests {
file_path,
);

assert!(result.is_ok());
assert!(result.is_ok(), "failed parsing {}", result.unwrap_err());
let config = result.unwrap();
assert_eq!(config.nodes.len(), 3);
assert!(config.nodes.contains_key("receiver1"));
assert!(config.nodes.contains_key("processor1"));
assert!(config.nodes.contains_key("exporter1"));

assert_eq!(config.internal.len(), 4);
assert!(config.internal.contains_key("receiver1"));
assert!(config.internal.contains_key("processor1"));
assert!(config.internal.contains_key("processor2"));
assert!(config.internal.contains_key("exporter1"));

let telemetry_config = &config.service().telemetry;
let reporting_interval = telemetry_config.reporting_interval;
assert_eq!(reporting_interval.as_secs(), 5);