Skip to content

Conversation

@jmacd
Copy link
Contributor

@jmacd jmacd commented Jan 15, 2026

Part of #1771.

Part of #1736.

Follows #1741.

This moves the HashMap<_, _> of nodes into a struct PipelineNodes and re-uses it to parse an identical graph of internal nodes. This internal graph will be use when an internal logging provider is configured to output to an internal pipeline.

@jmacd jmacd requested a review from a team as a code owner January 15, 2026 19:18
@github-actions github-actions bot added the rust Pull requests that update Rust code label Jan 15, 2026
@codecov
Copy link

codecov bot commented Jan 15, 2026

Codecov Report

❌ Patch coverage is 75.51020% with 36 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.32%. Comparing base (d7abcd3) to head (e852279).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1794      +/-   ##
==========================================
- Coverage   84.33%   84.32%   -0.01%     
==========================================
  Files         495      495              
  Lines      144350   144422      +72     
==========================================
+ Hits       121740   121789      +49     
- Misses      22076    22099      +23     
  Partials      534      534              
Components Coverage Δ
otap-dataflow 85.58% <75.51%> (-0.02%) ⬇️
query_abstraction 80.61% <ø> (ø)
query_engine 90.52% <ø> (ø)
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 53.50% <ø> (ø)
quiver 89.81% <ø> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

/// A collection of nodes forming a pipeline graph.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
#[serde(transparent)]
pub struct PipelineNodes(HashMap<NodeId, Arc<NodeUserConfig>>);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as in the other PR:
nit: We can do something more "structured" using named attributes instead of a tuple

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to make this type transparent to serde, which works (IIUC) for tuples.


# Note the internal and nodes graphs are separate.
# They do not share a namespace.
internal:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also copied from the other PR:

Suggestion: We can make it multi-graph with the same effort and risk.
Something like:

graphs: # array of graphs
   - internal: # name of the graph
     # other graph level properties, including the logger type
     nodes: # copied from the original graph structure
       telemetry:  # same block that is configured here
        ...

We can keep the existing configuration of single graph as is. This would be more an extension of the configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I will defer to Laurent who has in mind a use-case for pipeline groups.

/// across multiple cores/threads without cloning the entire configuration.
nodes: HashMap<NodeId, Arc<NodeUserConfig>>,
#[serde(default)]
nodes: PipelineNodes,
Copy link
Contributor

@utpilla utpilla Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's update the doc comments on the lines above to not mention Arc<NodeUserConfig> now. We should mention that for pub struct PipelineNodes(HashMap<NodeId, Arc<NodeUserConfig>>);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +52 to +56
/// Internal telemetry pipeline nodes. These have the same structure
/// as `nodes` but are independent and isolated to a separate internal
/// telemetry runtime.
#[serde(default, skip_serializing_if = "PipelineNodes::is_empty")]
internal: PipelineNodes,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an issue with this approach. Even though, as things stand, our engine only supports a single pipeline configuration, it is designed to support multiple pipelines (with different config) running in parallel within the same process. In the configuration crate, we have this logical grouping called PipelineGroup, whose long term goal is to group multiple pipelines that belong to the same group, for example a group could be mapped to a tenant.

All of this is to say that we will have multiple pipeline configurations running at the same time, and therefore a single internal telemetry system (ITS) receiving internal telemetry from different pipeline configurations. Because of that, it does not seem ideal to start tying the ITS configuration to a specific pipeline configuration. In the long run, I agree that we will need a way to define pipeline specific processing for our internal telemetry, but I propose that we address this in a second phase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no strong opinion on how we organize the graph as far as where the internal telemetry definition belongs.

// Validate internal pipeline if present
if !self.internal.is_empty() {
self.internal
.validate(pipeline_group_id, pipeline_id, &mut errors);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could lead to some inconvenience in some cases of validation errors. For example, if the main pipeline and the internal pipeline have nodes with the same name and there is a validation error, it wouldn't be clear if the error belongs to the main pipeline section or the internal pipeline section.

Maybe we can add a bool field is_internal to Context or just append "internal" to the pipeline_id when validating internal pipeline?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appended _internal.

Copy link
Contributor

@utpilla utpilla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a suggestion about ambiguity related to error source.

Copy link
Contributor

@lquerel lquerel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before to merge this PR, I'd like to discuss this comment.
#1794 (comment)

Copy link
Contributor

@lquerel lquerel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussing with @jmacd , we decided to merge this PR as is. I will work on another PR to reposition the engine's general configuration so that it is independent from the pipeline configurations themselves.

@jmacd jmacd added this pull request to the merge queue Jan 16, 2026
Merged via the queue into open-telemetry:main with commit 632c9be Jan 16, 2026
42 of 43 checks passed
@jmacd jmacd deleted the jmacd/internal_pipeline branch January 16, 2026 04:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

rust Pull requests that update Rust code

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

4 participants