@jmacd commented Jan 12, 2026

Too large for review!

Complete draft configuration from #1741. This establishes a default logging provider configuration that supports 4 choices in 3 different contexts. Choices:

  • Noop
  • Raw (synchronous to console)
  • Immediate (asynchronous to a channel)
  • OpenTelemetry

Contexts:

  • Global tokio subscriber
  • Engine threads of main pipeline
  • Internal telemetry pipeline threads
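
As a rough sketch of how these choices and contexts might map onto the new configuration types (hedged: this is not the actual code in crates/config/src/service/telemetry/logs.rs; the derives, serde attributes, and the variant name used for OpenTelemetry are assumptions):

```
use serde::{Deserialize, Serialize};

/// Sketch of the per-context provider choice. The lowercase serde names are
/// assumed to match the example config values ("noop", "raw", "immediate");
/// the name of the OpenTelemetry variant is a guess.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ProviderMode {
    Noop,
    Raw,
    Immediate,
    Otel,
}

/// Sketch of LoggingProviders: one provider mode per context, following the
/// three contexts above and the "providers:" block shown later in this PR.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoggingProviders {
    pub global: ProviderMode,
    pub engine: ProviderMode,
    pub internal: ProviderMode,
}
```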

There are major caveats. Two critical new configuration fields named "providers" and "output" are added in the current service::telemetry::logs area. This area is off-spec for the OpenTelemetry declarative configuration, so these fields belong elsewhere. However, the code needs substantial refactoring, and doing more refactoring now would get in the way of this intermediate step.

The major pieces of the changeset are largely independent:

  • The two new configuration fields "providers" and "output", the associated LoggingProviders struct, and the ProviderMode and OutputMode enums (crates/config/src/service/telemetry/logs.rs)
  • Add pub struct PipelineNodes(HashMap<NodeId, Arc<NodeUserConfig>>); add a second node set for the internal pipeline: nodes (existing) and internal (new); refactoring to re-use the validation logic (crates/config/src/pipeline.rs)
  • Two new internal logging setups in the controller: "direct" for simple console printing and "internal" to use the internal telemetry pipeline; each spawns a new thread, either to run simple console printing or to run_forever() the internal pipeline
  • Consolidation of the Tokio tracing setup formerly part of opentelemetry_client.rs into a new telemetry_runtime.rs; the 4 provider modes are supported here, with a helper to install Tokio subscribers for certain threads (crates/telemetry/src/logs.rs); definition of a flume bounded channel parallel to the observed state store (TODO: future consolidation with observed_state); "direct" collector support for asynchronous console logging. A minimal sketch of the channel follows this list.
  • Various scattered eprintln!, log::info!, log::error!, println!, and other diagnostics changed to exercise the new logging paths
  • Internal telemetry receiver component: when the output mode is "internal", this collects log records from the channel; special setup is required to inject the flume channel into this component as the internal telemetry pipeline is built
  • [Unrelated debugging]: the otel-arrow-rust batching logic panic (#1334) makes it easy to produce panics; setting the internal telemetry pipeline to use OTAP batching created a nice test of what happens when the internal logging path panics or fails. This needs separate investigation; setting the internal provider to "raw" helpfully shows what happens
  • OTAP console exporter: re-uses the raw logging support to print OTLP logs data, including resource and scope data; new support for encoding OTLP bytes directly from the HashMap of resource values in service::telemetry::resource
  • raw_error! macro that is safe to use everywhere, including inside Tokio subscribers, for emergency logging about internal telemetry failures
  • Set up a logger early in main.rs so that df_engine prints a consistent log format
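
The flume channel mentioned in the telemetry_runtime.rs bullet above can be pictured roughly as follows (a minimal sketch: the LogRecord type, the channel capacity, and the error handling are placeholders, not the PR's actual code):

```
// Hypothetical sketch of a bounded flume channel carrying log records from
// engine threads to the internal telemetry pipeline.
struct LogRecord {
    body: String,
}

fn main() {
    let (tx, rx) = flume::bounded::<LogRecord>(1024);

    // Producer side (e.g., a tracing layer). If the internal pipeline has
    // died, the send fails; the PR reports that failure through raw_error!.
    if let Err(e) = tx.try_send(LogRecord { body: "pipeline.build.complete".into() }) {
        eprintln!("failed to send log: {e}");
    }

    // Consumer side (the internal telemetry receiver) drains the channel.
    while let Ok(record) = rx.try_recv() {
        println!("{}", record.body);
    }
}
```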

Console logs look like:

```
2026-01-15T18:11:39.467Z  RESOURCE  v1.Resource:  [service.name=test, service.id=1234]
2026-01-15T18:11:39.467Z  │ SCOPE    v1.InstrumentationScope:
2026-01-15T18:11:39.467Z  │ └─ DEBUG otap-df-engine::pipeline.build.complete (crates/engine/src/lib.rs:532):  [name=pipeline.build.complete, pipeline_group_id=default_pipeline_group, pipeline_id=default_pipeline, core_id=0]
```

(We expect to fill in scope attributes soon.) I tested with batching format=otap to see whether the OTAP representation would naturally group single statements into the resource value. In this configuration, it now prints:

```
2026-01-15T18:18:10.138Z  ERROR otap-df-otap::Processor.BatchingError (crates/otap/src/batch_processor.rs:844):  [name=Processor.BatchingError, signal=Logs, error=Failed to batch OTAP data: Schema error: target schema is not superset of current schema target=Field { "parent_id": UInt16, metadata: {"encoding": "plain"} }, Field { "key": Dictionary(UInt8, Utf8) }, Field { "type": UInt8 }, Field { "str": nullable Dictionary(UInt16, Utf8) }, Field { "int": nullable Dictionary(UInt16, Int64) }, Field { "body": nullable Struct("type": UInt8) }, Field { "resource": nullable Struct("id": nullable UInt16, metadata: {"encoding": "plain"}) }, Field { "scope": nullable Struct("id": nullable UInt16, metadata: {"encoding": "plain"}) } current=Field { "parent_id": UInt16, metadata: {"encoding": "plain"} }, Field { "key": Dictionary(UInt8, Utf8) }, Field { "type": UInt8 }, Field { "str": nullable Dictionary(UInt16, Utf8) }, Field { "body": nullable Struct("type": UInt8) }, Field { "resource": nullable Struct("id": nullable UInt16, metadata: {"encoding": "plain"}) }, Field { "scope": nullable Struct("id": nullable UInt16, metadata: {"encoding": "plain"}) }, Field { "int": nullable Dictionary(UInt16, Int64) }]
```

This is a raw error printed by the internal telemetry pipeline.

That's the error after briefly modifying batch_processor.rs in this PR; otherwise it panics. Since I've modified the batch processor to return an error rather than panic, we can no longer see the situation where the internal telemetry pipeline panics. When producers write to the channel after a panic, the send fails, resulting in a raw_error! that tells the user on the console when the logging service dies.

```
2026-01-15T18:23:03.953Z  ERROR otap_df_telemetry::logs::failed to send log (crates/telemetry/src/logs.rs:105):  [err=sending on a closed channel]
```

config:

```
service:
  telemetry:
```
A reviewer commented:

This is more for the existing SDK configuration, in alignment with the otel collector style.
I would configure it in /telemetry/logs instead if required.

Comment on lines +60 to +62
```
global: immediate
engine: immediate
output: direct
```
A reviewer asked:

What does this mean?


```
# Internal telemetry pipeline - separate from main pipeline
# Uses hardcoded settings: single thread, no admin server
internal:
```
@andborja commented Jan 15, 2026:

Suggestion: We can make it multi-graph with the same effort and risk.
Something like:

```
graphs: # array of graphs
  - internal: # name of the graph
    # other graph level properties, including the logger type
    nodes: # copied from the original graph structure
      telemetry: # same block that is configured here
        ...
```

We can keep the existing single-graph configuration as is; this would be more of an extension of the configuration.

```
providers:
  global: immediate
  engine: immediate
  internal: raw # Avoid feedback in internal pipeline
```
A reviewer commented:

This should be configured at the "internal" block, not really here

```
/// these is the main pipeline, and one is the internal telemetry pipeline.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
#[serde(transparent)]
pub struct PipelineNodes(HashMap<NodeId, Arc<NodeUserConfig>>);
```
A reviewer commented:

nit: We can do something more "structured" using named attributes instead of a tuple
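
A hedged sketch of what that suggestion could look like (illustration only: the field name nodes is assumed, and NodeId, NodeUserConfig, and the derives come from the surrounding crate):

```
// Same data as the newtype above, but behind a named field. serde(transparent)
// still applies, since the struct has exactly one field.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
#[serde(transparent)]
pub struct PipelineNodes {
    nodes: HashMap<NodeId, Arc<NodeUserConfig>>,
}
```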

```
let span = otel_info_span!("internal_pipeline_thread", core.id = core_id.id);
let _guard = span.enter();

// No core pinning for internal pipeline - it's lightweight
```
A reviewer commented:

I propose changing this comment and turning it into a TODO. In the long run, we will also introduce thread pinning for this internal pipeline, since an instance of our Internal Telemetry System will be deployed on each NUMA region. We will therefore need one thread per internal telemetry pipeline for each NUMA node, with thread pinning to one of the cores in the corresponding region.
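
If that TODO lands, pinning the internal-pipeline thread could look roughly like this (a sketch assuming the core_affinity crate; choosing the core per NUMA region is out of scope here and is not part of this PR):

```
// Hypothetical sketch: pin the current thread to a specific core before
// running the internal telemetry pipeline.
fn pin_internal_pipeline_thread(target_core: usize) {
    if let Some(core_ids) = core_affinity::get_core_ids() {
        if let Some(core_id) = core_ids.into_iter().find(|c| c.id == target_core) {
            // set_for_current returns false if the platform rejects the request.
            let _pinned = core_affinity::set_for_current(core_id);
        }
    }
}
```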

github-merge-queue bot pushed a commit that referenced this pull request Jan 16, 2026
Part of #1771.

Part of #1736.

Follows #1741.

This moves the HashMap<_, _> of nodes into a struct `PipelineNodes` and
re-uses it to parse an identical graph of `internal` nodes. This
internal graph will be used when an internal logging provider is
configured to output to an internal pipeline.
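
A hedged sketch of how the two node sets might sit side by side after this change (the wrapper struct name here is invented; PipelineNodes is the struct shown earlier in this thread, and the real config struct in crates/config/src/pipeline.rs certainly has more fields):

```
// Hypothetical sketch only: the existing main-pipeline node set plus the new
// internal node set, each parsed with the same PipelineNodes type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfigSketch {
    /// The main pipeline graph (existing field).
    nodes: PipelineNodes,
    /// The internal telemetry pipeline graph (new field).
    #[serde(default)]
    internal: PipelineNodes,
}
```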
github-merge-queue bot pushed a commit that referenced this pull request Jan 16, 2026
Part of #1771.

Part of #1736.

Follows #1741.

This `raw_error!` macro is different from the others in
`internal_events.rs` in two ways:

1. Supports the complete Tokio `tracing` syntax, including display and
debug formatters
2. Bypasses the Tokio global dispatch and subscriber, calling into the
raw logging layer

The use of `tracing`'s `valueset!` macro is key to supporting the full syntax of the other `otel_XXX!` macros.

Test log statement prints:

```
2026-01-15T20:59:42.100Z  ERROR  otap_df_telemetry::internal_events::tests::raw error message (crates/telemetry/src/internal_events.rs:171):  [error=ConfigurationError("bad config")]
```
github-merge-queue bot pushed a commit that referenced this pull request Jan 16, 2026
… setup (#1795)

Part of #1771.

Part of #1736.

As documented in #1741.

~Updates that document to match this change reflecting the prototype in
#1771.~

Revised relative to #1771.

Adds LoggingProviders (choice of default logging provider for global, engine, and internal-telemetry threads). Adds ProviderMode with names to select instrumentation behavior, with `its` referring to the internal telemetry system.

Note: These settings are not ideally placed; they could also belong with the top-level settings, or with the observed_state settings. However, since logging is configured with resource and level, which are presently part of the service::telemetry config area, we use that structure. After the bulk of #1736 is finished we can restructure.
github-merge-queue bot pushed a commit that referenced this pull request Jan 17, 2026
…#1808)

Part of #1771.

Part of #1736.

This is a non-functional refactoring of `opentelemetry_client.rs` into
other places. This will make it clearer what changes in #1771 and what
is just moving around.

Moves runtime elements into the InternalTelemetrySystem, simplifies
setup for the controller where logs/metrics were separated.

Moves OTel-SDK specific pieces into `otel_sdk` module, separates the
Tokio `tracing` setup.

---------

Co-authored-by: Utkarsh Umesan Pillai <[email protected]>