Skip to content

Conversation

@jmacd
Copy link
Contributor

@jmacd jmacd commented Jan 22, 2026

What issue does this PR close?

Final part of #1771.

Fixes #1736.

How are these changes tested?

New tests. A new configs/internal-telemetry.yaml demonstrates the ITR configuration with the new console exporter.

Are there any user-facing changes?

Yes. See README.md update.

@jmacd jmacd requested a review from a team as a code owner January 22, 2026 07:10
@github-actions github-actions bot added the rust Pull requests that update Rust code label Jan 22, 2026
@codecov
Copy link

codecov bot commented Jan 22, 2026

Codecov Report

❌ Patch coverage is 51.17541% with 270 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.58%. Comparing base (2fae2f7) to head (2605f1e).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1861      +/-   ##
==========================================
- Coverage   84.70%   84.58%   -0.12%     
==========================================
  Files         504      505       +1     
  Lines      150323   150853     +530     
==========================================
+ Hits       127329   127599     +270     
- Misses      22460    22720     +260     
  Partials      534      534              
Components Coverage Δ
otap-dataflow 85.89% <51.17%> (-0.19%) ⬇️
query_abstraction 80.61% <ø> (ø)
query_engine 90.52% <ø> (ø)
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 53.50% <ø> (ø)
quiver 90.66% <ø> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.


// Should have the internal nodes as its main nodes
assert_eq!(internal.nodes.len(), 2);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add assertions to verify the node names against the provided config

assert_eq!(
    internal.nodes["itr"].plugin_urn.as_ref(),
    "urn:otel:otap:internal_telemetry:receiver"
);
assert_eq!(
    internal.nodes["console"].plugin_urn.as_ref(),
    "urn:otel:console:exporter"
);

Comment on lines 117 to +119
// Create the telemetry system, pass the observed state store
// as the admin reporter for console_async support.
let telemetry_system =
// The ITS logs receiver is returned if any provider uses ITS mode.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update these comments. Maybe something like:

// Create the telemetry system. The logging_evt_reporter is used for
// ConsoleAsync provider mode (logs go through the ObservedStateStore).
// The ITS logs receiver is returned if any provider uses ITS mode.

internal_telemetry: Option<InternalTelemetrySettings>,
startup_tx: std_mpsc::Sender<Result<(), Error>>,
) -> Result<Vec<()>, Error> {
// No core pinning for internal pipeline - it's lightweight
Copy link
Member

@lalitb lalitb Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about the cross-thread channel here in the context of the thread-per-core design. Since logs from pinned engine threads flow through flume::bounded to this unpinned thread, do you anticipate any cache/NUMA considerations at high log volumes?
Not a blocker - just wondering if batching per-core or optional affinity config or expose dedicated internal pipeline per NUMA domain might be worth considering in a future iteration.

Copy link
Member

@lalitb lalitb Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify my earlier point - the pipeline would be lightweight only if the log volume is low. The internal telemetry thread is unpinned, so the scheduler can run it on any core, including ones already hosting pinned pipeline threads, which can introduce contention and jitter. The actual impact depends on available idle cores and how busy the pinned threads are, but without affinity the ITS thread can end up sharing those cores.


let internal_config = match internal_config {
Some(config) => config,
None => return Ok(None), // No internal pipeline configured
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe log a message here as well indicating that internal pipeline is being skipped.

);

// Create a channel to signal startup success/failure
let (startup_tx, startup_rx) = std_mpsc::channel::<Result<(), Error>>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a bounded channel to make the intent clear about this being a oneshot channel.

Suggested change
let (startup_tx, startup_rx) = std_mpsc::channel::<Result<(), Error>>();
let (startup_tx, startup_rx) = std_mpsc::channel::<Result<(), Error>>(1);

/// 3. If any provider uses ITS mode, a dedicated logs channel for the ITR
///
/// The `admin_reporter` is required for async logging modes (`ConsoleAsync`, future `ITS`).
/// The `admin_reporter` is required for async logging modes (`ConsoleAsync`).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is a bit misleading as it suggests some relation with admin threads. Could we rename this to something more intuitive such as console_async_reporter?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

rust Pull requests that update Rust code

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

Internal logging bridge, internral logging receiver

3 participants