diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index 6630ba4ae0..a254ef58db 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -50,6 +50,8 @@ otap-df-pdata-otlp-model = { path = "./crates/pdata/src/otlp/model"} otap-df-config = { path = "crates/config" } otap-df-controller = { path = "crates/controller" } otap-df-otap = { path = "crates/otap" } +otap-df-pdata = { path = "crates/pdata" } +otap-df-telemetry = { path = "crates/telemetry" } quiver = { package = "otap-df-quiver", path = "crates/quiver" } data_engine_expressions = { path = "../experimental/query_engine/expressions" } data_engine_kql_parser = { path = "../experimental/query_engine/kql-parser" } diff --git a/rust/otap-dataflow/benchmarks/Cargo.toml b/rust/otap-dataflow/benchmarks/Cargo.toml index c88c6c1e1d..30e9ce6c33 100644 --- a/rust/otap-dataflow/benchmarks/Cargo.toml +++ b/rust/otap-dataflow/benchmarks/Cargo.toml @@ -26,6 +26,9 @@ otap-df-engine = { path = "../crates/engine"} otap-df-telemetry = { path="../crates/telemetry"} otap-df-pdata = { path="../crates/pdata", features = ["bench"]} +tracing.workspace = true +tracing-subscriber = { workspace = true, features = ["registry"] } + fluke-hpack.workspace = true futures-channel.workspace = true futures.workspace = true @@ -87,3 +90,7 @@ harness = false [[bench]] name = "otap_logs_view" harness = false + +[[bench]] +name = "self_tracing" +harness = false diff --git a/rust/otap-dataflow/benchmarks/benches/self_tracing/main.rs b/rust/otap-dataflow/benchmarks/benches/self_tracing/main.rs new file mode 100644 index 0000000000..a2ee979ffe --- /dev/null +++ b/rust/otap-dataflow/benchmarks/benches/self_tracing/main.rs @@ -0,0 +1,199 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Benchmarks for the compact log formatter. +//! +//! These benchmarks emit a single tracing event but perform N +//! encoding or encoding-and-formatting operations inside the callback +//! +//! Benchmark names follow the pattern: `group/description/N_events` +//! +//! Example: `encode/3_attrs/1000_events` = 300 µs → 300 ns per event + +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use tracing::{Event, Subscriber}; +use tracing_subscriber::layer::Layer; +use tracing_subscriber::prelude::*; +use tracing_subscriber::registry::LookupSpan; + +use otap_df_pdata::otlp::ProtoBuffer; +use otap_df_telemetry::self_tracing::{ + ConsoleWriter, DirectLogRecordEncoder, LogRecord, SavedCallsite, +}; + +#[cfg(not(windows))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(windows))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +/// The operation to perform on each event within the layer. +#[derive(Clone, Copy)] +enum BenchOp { + /// Encode the event into a LogRecord only. + Encode, + /// Encode once, then format N times. + Format, + /// Encode and format together N times. + EncodeAndFormat, + /// Encode to protobuf N times. + EncodeProto, +} + +/// A layer that performs a configurable operation N times per event. +struct BenchLayer { + iterations: usize, + op: BenchOp, +} + +impl BenchLayer { + fn new(iterations: usize, op: BenchOp) -> Self { + Self { iterations, op } + } +} + +impl Layer for BenchLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) { + match self.op { + BenchOp::Encode => { + for _ in 0..self.iterations { + let record = LogRecord::new(event); + let _ = std::hint::black_box(record); + } + } + BenchOp::Format => { + // Encode once, format N times + let record = LogRecord::new(event); + let writer = ConsoleWriter::no_color(); + let callsite = SavedCallsite::new(event.metadata()); + + for _ in 0..self.iterations { + let line = writer.format_log_record(&record, &callsite); + let _ = std::hint::black_box(line); + } + } + BenchOp::EncodeAndFormat => { + let writer = ConsoleWriter::no_color(); + + for _ in 0..self.iterations { + let record = LogRecord::new(event); + let callsite = SavedCallsite::new(event.metadata()); + let line = writer.format_log_record(&record, &callsite); + let _ = std::hint::black_box(line); + } + } + BenchOp::EncodeProto => { + let mut buf = ProtoBuffer::new(); + let mut encoder = DirectLogRecordEncoder::new(&mut buf); + let callsite = SavedCallsite::new(event.metadata()); + + for _ in 0..self.iterations { + encoder.clear(); + let size = encoder.encode_log_record(LogRecord::new(event), &callsite); + let _ = std::hint::black_box(size); + } + } + } + } +} + +/// Macro to generate benchmark functions for different attribute counts. +/// Each variant emits a consistent log statement for fair comparison. +macro_rules! emit_log { + (0) => { + tracing::info!("benchmark message") + }; + (3) => { + tracing::info!( + attr_str = "value", + attr_int = 42, + attr_bool = true, + "benchmark message" + ) + }; + (10) => { + tracing::info!( + attr_str1 = "string1", + attr_bool1 = true, + attr_str2 = "string2", + attr_float1 = 1.234, + attr_int1 = 42i64, + attr_str3 = "string3", + attr_bool2 = false, + attr_float2 = 5.678, + attr_int2 = 100u64, + attr_str4 = "string4", + "benchmark message" + ) + }; +} + +/// Run a benchmark with the given layer, invoking the log emitter. +fn run_bench(b: &mut criterion::Bencher<'_>, layer: L, emit: F) +where + L: Layer + Send + Sync + 'static, + F: Fn(), +{ + let subscriber = tracing_subscriber::registry().with(layer); + let dispatch = tracing::Dispatch::new(subscriber); + + b.iter(|| { + tracing::dispatcher::with_default(&dispatch, &emit); + std::hint::black_box(()); + }); +} + +/// Benchmark a specific operation across different iteration counts. +fn bench_op(c: &mut Criterion, group_name: &str, op: BenchOp) { + let mut group = c.benchmark_group(group_name); + + for &iterations in &[100, 1000] { + for &(attr_count, attr_label) in &[(0, "0_attrs"), (3, "3_attrs"), (10, "10_attrs")] { + let id = BenchmarkId::new(attr_label, format!("{}_events", iterations)); + + let _ = group.bench_with_input(id, &iterations, |b, &iters| { + let layer = BenchLayer::new(iters, op); + match attr_count { + 0 => run_bench(b, layer, || emit_log!(0)), + 3 => run_bench(b, layer, || emit_log!(3)), + _ => run_bench(b, layer, || emit_log!(10)), + } + }); + } + } + + group.finish(); +} + +fn bench_encode(c: &mut Criterion) { + bench_op(c, "encode", BenchOp::Encode); +} + +fn bench_format(c: &mut Criterion) { + bench_op(c, "format", BenchOp::Format); +} + +fn bench_encode_and_format(c: &mut Criterion) { + bench_op(c, "encode_and_format", BenchOp::EncodeAndFormat); +} + +fn bench_encode_proto(c: &mut Criterion) { + bench_op(c, "encode_proto", BenchOp::EncodeProto); +} + +#[allow(missing_docs)] +mod bench_entry { + use super::*; + + criterion_group!( + name = benches; + config = Criterion::default(); + targets = bench_encode, bench_format, bench_encode_and_format, bench_encode_proto + ); +} + +criterion_main!(bench_entry::benches); diff --git a/rust/otap-dataflow/crates/pdata/src/otlp/common.rs b/rust/otap-dataflow/crates/pdata/src/otlp/common.rs index e3a0859d12..dce34e9d59 100644 --- a/rust/otap-dataflow/crates/pdata/src/otlp/common.rs +++ b/rust/otap-dataflow/crates/pdata/src/otlp/common.rs @@ -30,7 +30,7 @@ use std::fmt; use std::fmt::Write; use std::sync::LazyLock; -pub(in crate::otlp) struct ResourceArrays<'a> { +pub(crate) struct ResourceArrays<'a> { pub id: Option<&'a UInt16Array>, pub dropped_attributes_count: Option<&'a UInt32Array>, pub schema_url: Option>, @@ -123,14 +123,14 @@ impl<'a> TryFrom<&'a RecordBatch> for ResourceArrays<'a> { } } -pub(in crate::otlp) struct ScopeArrays<'a> { +pub(crate) struct ScopeArrays<'a> { pub name: Option>, pub version: Option>, pub dropped_attributes_count: Option<&'a UInt32Array>, pub id: Option<&'a UInt16Array>, } -pub static SCOPE_ARRAY_DATA_TYPE: LazyLock = LazyLock::new(|| { +static SCOPE_ARRAY_DATA_TYPE: LazyLock = LazyLock::new(|| { DataType::Struct(Fields::from(vec![ Field::new( consts::NAME, @@ -491,11 +491,15 @@ macro_rules! proto_encode_len_delimited_unknown_size { }}; } -pub(crate) fn encode_len_placeholder(buf: &mut ProtoBuffer) { +/// Write a 4-byte length placeholder for later patching. +/// Do not use directly, use proto_encode_len_delimited_unknown_size. +pub fn encode_len_placeholder(buf: &mut ProtoBuffer) { buf.buffer.extend_from_slice(&[0x80, 0x80, 0x80, 0x00]); } -pub(crate) fn patch_len_placeholder( +/// Patch a previously written length placeholder with the actual length. +/// Do not use directly, use proto_encode_len_delimited_unknown_size. +pub fn patch_len_placeholder( buf: &mut ProtoBuffer, num_bytes: usize, len: usize, diff --git a/rust/otap-dataflow/crates/pdata/src/otlp/mod.rs b/rust/otap-dataflow/crates/pdata/src/otlp/mod.rs index f4f4056c5a..4e5e611410 100644 --- a/rust/otap-dataflow/crates/pdata/src/otlp/mod.rs +++ b/rust/otap-dataflow/crates/pdata/src/otlp/mod.rs @@ -9,7 +9,7 @@ use crate::{error::Result, otap::OtapArrowRecords}; use bytes::Bytes; use otap_df_config::SignalType; -pub use common::ProtoBuffer; +pub use common::{ProtoBuffer, encode_len_placeholder, patch_len_placeholder}; pub use otap_df_pdata_otlp_macros::Message; // Required for derived code pub use otap_df_pdata_otlp_macros::qualified; // Required for derived code @@ -17,6 +17,8 @@ pub use otap_df_pdata_otlp_macros::qualified; // Required for derived code pub mod attributes; /// Common methods for batching. pub mod batching; +/// Common utilities for protobuf encoding. +pub mod common; /// Common methods for OTLP/OTAP logs. pub mod logs; /// Common methods for OTLP/OTAP metrics. @@ -24,10 +26,9 @@ pub mod metrics; /// Common methods for OTLP/OTAP traces. pub mod traces; -mod common; - #[cfg(test)] mod batching_tests; + #[cfg(test)] mod tests; diff --git a/rust/otap-dataflow/crates/pdata/src/views/otlp/bytes/common.rs b/rust/otap-dataflow/crates/pdata/src/views/otlp/bytes/common.rs index 65038444c2..c352cd0ef6 100644 --- a/rust/otap-dataflow/crates/pdata/src/views/otlp/bytes/common.rs +++ b/rust/otap-dataflow/crates/pdata/src/views/otlp/bytes/common.rs @@ -37,8 +37,10 @@ pub struct RawKeyValue<'a> { } impl<'a> RawKeyValue<'a> { + /// Create a new RawKeyValue parser from a byte slice containing a KeyValue message. #[inline] - fn new(buf: &'a [u8]) -> Self { + #[must_use] + pub fn new(buf: &'a [u8]) -> Self { Self { buf, pos: Cell::new(0), diff --git a/rust/otap-dataflow/crates/pdata/src/views/otlp/bytes/logs.rs b/rust/otap-dataflow/crates/pdata/src/views/otlp/bytes/logs.rs index 8795e14658..fa63cffe36 100644 --- a/rust/otap-dataflow/crates/pdata/src/views/otlp/bytes/logs.rs +++ b/rust/otap-dataflow/crates/pdata/src/views/otlp/bytes/logs.rs @@ -153,6 +153,18 @@ pub struct RawLogRecord<'a> { bytes_parser: ProtoBytesParser<'a, LogFieldOffsets>, } +impl<'a> RawLogRecord<'a> { + /// Create a new instance of `RawLogRecord`. This is exposed + /// specifically for interpreting internally generated log records + /// which encode body and attributes as OTLP bytes. + #[must_use] + pub fn new(buf: &'a [u8]) -> Self { + Self { + bytes_parser: ProtoBytesParser::new(buf), + } + } +} + /// Known field offsets within byte buffer for fields in ResourceLogs message pub struct LogFieldOffsets { scalar_fields: [Cell>; 13], @@ -274,11 +286,7 @@ impl<'a> Iterator for LogRecordsIter<'a> { type Item = RawLogRecord<'a>; fn next(&mut self) -> Option { - let slice = self.byte_parser.next()?; - - Some(RawLogRecord { - bytes_parser: ProtoBytesParser::new(slice), - }) + Some(RawLogRecord::new(self.byte_parser.next()?)) } } diff --git a/rust/otap-dataflow/crates/telemetry/Cargo.toml b/rust/otap-dataflow/crates/telemetry/Cargo.toml index e92704a616..8be2d5a138 100644 --- a/rust/otap-dataflow/crates/telemetry/Cargo.toml +++ b/rust/otap-dataflow/crates/telemetry/Cargo.toml @@ -19,8 +19,12 @@ unchecked-index = [] unchecked-arithmetic = [] [dependencies] -axum = { workspace = true } +otap-df-pdata = { workspace = true } otap-df-config = { workspace = true } + +axum = { workspace = true } +bytes = { workspace = true } +chrono = { workspace = true } flume = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } @@ -29,6 +33,7 @@ thiserror = { workspace = true } slotmap = { workspace = true } parking_lot = { workspace = true } prometheus = { workspace = true } +prost = { workspace = true } serde = { workspace = true } tonic = { workspace = true, optional = true } opentelemetry = { workspace = true } diff --git a/rust/otap-dataflow/crates/telemetry/src/lib.rs b/rust/otap-dataflow/crates/telemetry/src/lib.rs index f28001364d..58f7a75da5 100644 --- a/rust/otap-dataflow/crates/telemetry/src/lib.rs +++ b/rust/otap-dataflow/crates/telemetry/src/lib.rs @@ -40,6 +40,7 @@ pub mod metrics; pub mod opentelemetry_client; pub mod registry; pub mod reporter; +pub mod self_tracing; pub mod semconv; // Re-export _private module from internal_events for macro usage. diff --git a/rust/otap-dataflow/crates/telemetry/src/self_tracing.rs b/rust/otap-dataflow/crates/telemetry/src/self_tracing.rs new file mode 100644 index 0000000000..9d03fd56c7 --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/self_tracing.rs @@ -0,0 +1,114 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Log encoding and formatting for Tokio tracing events. +//! +//! The intermediate representation is LogRecord, includes the +//! primitive fields and static references. The remaining data are +//! placed in a partial OTLP encoding. + +pub mod encoder; +pub mod formatter; + +use bytes::Bytes; +use encoder::DirectFieldVisitor; +use otap_df_pdata::otlp::ProtoBuffer; +use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::callsite::Identifier; +use tracing::{Event, Level, Metadata}; + +pub use encoder::DirectLogRecordEncoder; +pub use formatter::{ConsoleWriter, RawLoggingLayer}; + +/// A log record with structural metadata and pre-encoded body/attributes. +#[derive(Debug, Clone)] +pub struct LogRecord { + /// Callsite identifier used to look up cached callsite info. + pub callsite_id: Identifier, + + /// Timestamp in UNIX epoch nanoseconds. + pub timestamp_ns: u64, + + /// Pre-encoded body and attributes in OTLP bytes. These bytes + /// can be interrpreted using the otap_df_pdata::views::otlp::bytes::RawLogRecord + /// in practice and/or parsed by a crate::proto::opentelemetry::logs::v1::LogRecord + /// message object for testing. + pub body_attrs_bytes: Bytes, +} + +/// Saved callsite information. This is information that can easily be +/// populated from Metadata, for example in a `register_callsite` hook +/// for building a map by Identifier. +#[derive(Debug, Clone)] +pub struct SavedCallsite { + /// Tracing metadata. + metadata: &'static Metadata<'static>, +} + +impl SavedCallsite { + /// Construct saved callsite information from tracing Metadata. + #[must_use] + pub fn new(metadata: &'static Metadata<'static>) -> Self { + Self { metadata } + } + + /// The level. + #[must_use] + pub fn level(&self) -> &Level { + self.metadata.level() + } + + /// The filename. + #[must_use] + pub fn file(&self) -> Option<&'static str> { + self.metadata.file() + } + + /// The line number. + #[must_use] + pub fn line(&self) -> Option { + self.metadata.line() + } + + /// The target (e.g., module). + #[must_use] + pub fn target(&self) -> &'static str { + self.metadata.target() + } + + /// The event name. + #[must_use] + pub fn name(&self) -> &'static str { + self.metadata.name() + } +} + +impl LogRecord { + /// Construct a log record, partially encoding its dynamic content. + #[must_use] + pub fn new(event: &Event<'_>) -> Self { + let metadata = event.metadata(); + + // Encode body and attributes to bytes. + // Note! TODO: we could potentially avoid allocating for the intermediate + // protobuf slice with work to support a fixed-size buffer and cursor + // instead of a Vec. + let mut buf = ProtoBuffer::with_capacity(256); + let mut visitor = DirectFieldVisitor::new(&mut buf); + event.record(&mut visitor); + + Self { + callsite_id: metadata.callsite(), + timestamp_ns: Self::get_timestamp_nanos(), + body_attrs_bytes: buf.into_bytes(), + } + } + + /// Get current timestamp in UNIX epoch nanoseconds. + fn get_timestamp_nanos() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64 + } +} diff --git a/rust/otap-dataflow/crates/telemetry/src/self_tracing/README.md b/rust/otap-dataflow/crates/telemetry/src/self_tracing/README.md new file mode 100644 index 0000000000..820c5073f3 --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/self_tracing/README.md @@ -0,0 +1,29 @@ +# Internal logging handlers + +This module contains a simple encoder and formatter for use with Tokio +tracing subscribers to enable a lightweight bridge into the +OTAP-Dataflow engine. + +## OTLP bytes first + +This module currently implements encoding support for OTLP bytes, in +two forms: + +- Partial: The `LogRecord` type encodes the dynamic arguments from the event + along with a timestamp, yielding a representation that can be passed into + an internal pipeline because it is already encoded as bytes. This representation + allows sorting and filtering records before encoding full OTLP messages. +- Full: The `DirectLogRecordEncoder` type supports appending the OTLP bytes + representation for the complete LogRecord (without Scope and Resource wrappers). + +## Raw logging handler + +This package supports a `RawLoggingLayer` tracing subscriber that +prints colorized or uncolorized messages on the console. The +formatting code path in this module is safe for logging in critical +regions and to be used as a fallback inside other logging handlers. +It uses no synchronization and depends only on the console. + +Presently, the raw logging code path allocates memory, however this is +not the desired state. [See the issue with potential +improvements.](https://github.com/open-telemetry/otel-arrow/issues/1746) diff --git a/rust/otap-dataflow/crates/telemetry/src/self_tracing/encoder.rs b/rust/otap-dataflow/crates/telemetry/src/self_tracing/encoder.rs new file mode 100644 index 0000000000..688f7eb19f --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/self_tracing/encoder.rs @@ -0,0 +1,303 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Direct OTLP bytes encoder for tokio-tracing events. + +use std::fmt::Write as FmtWrite; + +use otap_df_pdata::otlp::ProtoBuffer; +use otap_df_pdata::proto::consts::{field_num::common::*, field_num::logs::*, wire_types}; +use otap_df_pdata::proto_encode_len_delimited_unknown_size; +use tracing::Level; + +use super::{LogRecord, SavedCallsite}; + +/// Direct encoder that writes a single LogRecord from a tracing Event. +pub struct DirectLogRecordEncoder<'buf> { + buf: &'buf mut ProtoBuffer, +} + +impl<'buf> DirectLogRecordEncoder<'buf> { + /// Create a new encoder that writes to the provided buffer. + #[inline] + pub fn new(buf: &'buf mut ProtoBuffer) -> Self { + Self { buf } + } + + /// Reset the underlying buffer. + pub fn clear(&mut self) { + self.buf.clear(); + } + + /// Encode a tracing Event as a complete LogRecord message. + /// + /// Returns the number of bytes written. + pub fn encode_log_record(&mut self, record: LogRecord, callsite: &SavedCallsite) -> usize { + let start_len = self.buf.len(); + + // Encode time_unix_nano (field 1, fixed64) + self.buf + .encode_field_tag(LOG_RECORD_TIME_UNIX_NANO, wire_types::FIXED64); + self.buf + .extend_from_slice(&record.timestamp_ns.to_le_bytes()); + + // Encode severity_number (field 2, varint) + let severity = level_to_severity_number(callsite.level()); + self.buf + .encode_field_tag(LOG_RECORD_SEVERITY_NUMBER, wire_types::VARINT); + self.buf.encode_varint(severity as u64); + + // Node we skip encoding severity_text (field 3, string) + + // Encode event_name (field 12, string) - format: "target::name (file:line)" + encode_event_name(self.buf, callsite); + + self.buf.extend_from_slice(&record.body_attrs_bytes); + + self.buf.len() - start_len + } +} + +/// Encode the event name from callsite metadata. +/// Format: "target::name (file:line)" or "target::name" if no file/line. +fn encode_event_name(buf: &mut ProtoBuffer, callsite: &SavedCallsite) { + proto_encode_len_delimited_unknown_size!( + LOG_RECORD_EVENT_NAME, + { + buf.extend_from_slice(callsite.target().as_bytes()); + buf.extend_from_slice(b"::"); + buf.extend_from_slice(callsite.name().as_bytes()); + if let (Some(file), Some(line)) = (callsite.file(), callsite.line()) { + let _ = write!(ProtoWriter(buf), " ({}:{})", file, line); + } + }, + buf + ); +} + +/// Wrapper that implements fmt::Write for a ProtoBuffer. +struct ProtoWriter<'a>(&'a mut ProtoBuffer); + +impl FmtWrite for ProtoWriter<'_> { + #[inline] + fn write_str(&mut self, s: &str) -> std::fmt::Result { + self.0.extend_from_slice(s.as_bytes()); + Ok(()) + } +} + +/// Visitor that directly encodes tracing fields to protobuf. +pub struct DirectFieldVisitor<'buf> { + buf: &'buf mut ProtoBuffer, +} + +impl<'buf> DirectFieldVisitor<'buf> { + /// Create a new DirectFieldVisitor that writes to the provided buffer. + pub fn new(buf: &'buf mut ProtoBuffer) -> Self { + Self { buf } + } + + /// Encode an attribute (KeyValue message) with a string value. + #[inline] + pub fn encode_string_attribute(&mut self, key: &str, value: &str) { + proto_encode_len_delimited_unknown_size!( + LOG_RECORD_ATTRIBUTES, + { + self.buf.encode_string(KEY_VALUE_KEY, key); + proto_encode_len_delimited_unknown_size!( + KEY_VALUE_VALUE, + { + self.buf.encode_string(ANY_VALUE_STRING_VALUE, value); + }, + self.buf + ); + }, + self.buf + ); + } + + /// Encode an attribute with an i64 value. + #[inline] + pub fn encode_int_attribute(&mut self, key: &str, value: i64) { + proto_encode_len_delimited_unknown_size!( + LOG_RECORD_ATTRIBUTES, + { + self.buf.encode_string(KEY_VALUE_KEY, key); + proto_encode_len_delimited_unknown_size!( + KEY_VALUE_VALUE, + { + self.buf + .encode_field_tag(ANY_VALUE_INT_VALUE, wire_types::VARINT); + self.buf.encode_varint(value as u64); + }, + self.buf + ); + }, + self.buf + ); + } + + /// Encode an attribute with a bool value. + #[inline] + pub fn encode_bool_attribute(&mut self, key: &str, value: bool) { + proto_encode_len_delimited_unknown_size!( + LOG_RECORD_ATTRIBUTES, + { + self.buf.encode_string(KEY_VALUE_KEY, key); + proto_encode_len_delimited_unknown_size!( + KEY_VALUE_VALUE, + { + self.buf + .encode_field_tag(ANY_VALUE_BOOL_VALUE, wire_types::VARINT); + self.buf.encode_varint(u64::from(value)); + }, + self.buf + ); + }, + self.buf + ); + } + + /// Encode an attribute with a double value. + #[inline] + pub fn encode_double_attribute(&mut self, key: &str, value: f64) { + proto_encode_len_delimited_unknown_size!( + LOG_RECORD_ATTRIBUTES, + { + self.buf.encode_string(KEY_VALUE_KEY, key); + proto_encode_len_delimited_unknown_size!( + KEY_VALUE_VALUE, + { + self.buf + .encode_field_tag(ANY_VALUE_DOUBLE_VALUE, wire_types::FIXED64); + self.buf.extend_from_slice(&value.to_le_bytes()); + }, + self.buf + ); + }, + self.buf + ); + } + + /// Encode the body (AnyValue message) as a string. + #[inline] + pub fn encode_body_string(&mut self, value: &str) { + proto_encode_len_delimited_unknown_size!( + LOG_RECORD_BODY, + { + self.buf.encode_string(ANY_VALUE_STRING_VALUE, value); + }, + self.buf + ); + } + + /// Encode the body (AnyValue message) from a Debug value without allocation. + #[inline] + pub fn encode_body_debug(&mut self, value: &dyn std::fmt::Debug) { + proto_encode_len_delimited_unknown_size!( + LOG_RECORD_BODY, + { + encode_debug_string(self.buf, value); + }, + self.buf + ); + } + + /// Encode an attribute with a Debug value without allocation. + #[inline] + pub fn encode_debug_attribute(&mut self, key: &str, value: &dyn std::fmt::Debug) { + proto_encode_len_delimited_unknown_size!( + LOG_RECORD_ATTRIBUTES, + { + self.buf.encode_string(KEY_VALUE_KEY, key); + proto_encode_len_delimited_unknown_size!( + KEY_VALUE_VALUE, + { + encode_debug_string(self.buf, value); + }, + self.buf + ); + }, + self.buf + ); + } +} + +/// Helper to encode a Debug value as a protobuf string field. +/// This is separate from DirectFieldVisitor to avoid borrow conflicts with the macro. +#[inline] +fn encode_debug_string(buf: &mut ProtoBuffer, value: &dyn std::fmt::Debug) { + proto_encode_len_delimited_unknown_size!( + ANY_VALUE_STRING_VALUE, + { + let _ = write!(ProtoWriter(buf), "{:?}", value); + }, + buf + ); +} + +impl tracing::field::Visit for DirectFieldVisitor<'_> { + fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { + if field.name() == "message" { + // TODO: encode f64 body + return; + } + self.encode_double_attribute(field.name(), value); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + if field.name() == "message" { + // TODO: encode i64 body + return; + } + self.encode_int_attribute(field.name(), value); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + if field.name() == "message" { + // TODO: encode u64 body + return; + } + self.encode_int_attribute(field.name(), value as i64); + } + + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + if field.name() == "message" { + // TODO: encode bool body + return; + } + self.encode_bool_attribute(field.name(), value); + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "message" { + self.encode_body_string(value); + return; + } + self.encode_string_attribute(field.name(), value); + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + // The Rust Debug type cannot be destructured, only formatted. + if field.name() == "message" { + self.encode_body_debug(value); + } else { + self.encode_debug_attribute(field.name(), value); + } + } +} + +/// Convert tracing Level to OTLP severity number. +/// +/// See: https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber +#[inline] +#[must_use] +pub fn level_to_severity_number(level: &Level) -> u8 { + match *level { + Level::TRACE => 1, + Level::DEBUG => 5, + Level::INFO => 9, + Level::WARN => 13, + Level::ERROR => 17, + } +} diff --git a/rust/otap-dataflow/crates/telemetry/src/self_tracing/formatter.rs b/rust/otap-dataflow/crates/telemetry/src/self_tracing/formatter.rs new file mode 100644 index 0000000000..2d6b4c2b79 --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/self_tracing/formatter.rs @@ -0,0 +1,651 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! An alternative to Tokio fmt::layer(). + +use super::{LogRecord, SavedCallsite}; +use bytes::Bytes; +use chrono::{DateTime, Datelike, Timelike, Utc}; +use otap_df_pdata::views::common::{AnyValueView, AttributeView, ValueType}; +use otap_df_pdata::views::logs::LogRecordView; +use otap_df_pdata::views::otlp::bytes::logs::RawLogRecord; +use std::io::{Cursor, Write}; +use tracing::{Event, Level, Subscriber}; +use tracing_subscriber::layer::{Context, Layer as TracingLayer}; +use tracing_subscriber::registry::LookupSpan; + +/// Default buffer size for log formatting. +/// +/// TODO: Append a note to the log message when truncation occurs, otherwise +/// today the log record is silently truncated. +pub const LOG_BUFFER_SIZE: usize = 4096; + +/// ANSI codes a.k.a. "Select Graphic Rendition" codes. +#[derive(Clone, Copy)] +#[repr(u8)] +enum AnsiCode { + Reset = 0, + Bold = 1, + Dim = 2, + Red = 31, + Green = 32, + Yellow = 33, + Blue = 34, + Magenta = 35, +} + +/// Color mode for console output. +#[derive(Debug, Clone, Copy)] +pub enum ColorMode { + /// Enable ANSI color codes. + Color, + /// Disable ANSI color codes. + NoColor, +} + +impl ColorMode { + /// Write an ANSI escape sequence (no-op for NoColor). + #[inline] + fn write_ansi(self, w: &mut BufWriter<'_>, code: AnsiCode) { + if let ColorMode::Color = self { + let _ = write!(w, "\x1b[{}m", code as u8); + } + } + + /// Write level with color and padding. + #[inline] + fn write_level(self, w: &mut BufWriter<'_>, level: &Level) { + self.write_ansi(w, Self::color(level)); + let _ = w.write_all(level.as_str().as_bytes()); + self.write_ansi(w, AnsiCode::Reset); + let _ = w.write_all(b" "); + } + + /// Get ANSI color code for a severity level. + #[inline] + fn color(level: &Level) -> AnsiCode { + match *level { + Level::ERROR => AnsiCode::Red, + Level::WARN => AnsiCode::Yellow, + Level::INFO => AnsiCode::Green, + Level::DEBUG => AnsiCode::Blue, + Level::TRACE => AnsiCode::Magenta, + } + } +} + +/// Console writes formatted text to stdout or stderr. +#[derive(Debug, Clone, Copy)] +pub struct ConsoleWriter { + color_mode: ColorMode, +} + +/// A minimal alternative to `tracing_subscriber::fmt::layer()`. +pub struct RawLoggingLayer { + writer: ConsoleWriter, +} + +impl RawLoggingLayer { + /// Return a new formatting layer with associated writer. + #[must_use] + pub fn new(writer: ConsoleWriter) -> Self { + Self { writer } + } +} + +/// Type alias for a cursor over a byte buffer. +/// Uses `std::io::Cursor` for position tracking with `std::io::Write`. +pub type BufWriter<'a> = Cursor<&'a mut [u8]>; + +impl ConsoleWriter { + /// Create a writer that outputs to stdout without ANSI colors. + #[must_use] + pub fn no_color() -> Self { + Self { + color_mode: ColorMode::NoColor, + } + } + + /// Create a writer that outputs to stderr with ANSI colors. + #[must_use] + pub fn color() -> Self { + Self { + color_mode: ColorMode::Color, + } + } + + /// Format a LogRecord as a human-readable string (for testing/compatibility). + /// + /// Output format: `2026-01-06T10:30:45.123Z INFO target::name (file.rs:42): body [attr=value, ...]` + pub fn format_log_record(&self, record: &LogRecord, callsite: &SavedCallsite) -> String { + let mut buf = [0u8; LOG_BUFFER_SIZE]; + let len = self.write_log_record(&mut buf, record, callsite); + // The buffer contains valid UTF-8 since we only write ASCII and valid UTF-8 strings + String::from_utf8_lossy(&buf[..len]).into_owned() + } + + /// Write a LogRecord to a byte buffer. Returns the number of bytes written. + pub fn write_log_record( + &self, + buf: &mut [u8], + record: &LogRecord, + callsite: &SavedCallsite, + ) -> usize { + let mut w = Cursor::new(buf); + let cm = self.color_mode; + + cm.write_ansi(&mut w, AnsiCode::Dim); + Self::write_timestamp(&mut w, record.timestamp_ns); + cm.write_ansi(&mut w, AnsiCode::Reset); + let _ = w.write_all(b" "); + cm.write_level(&mut w, callsite.level()); + cm.write_ansi(&mut w, AnsiCode::Bold); + Self::write_event_name(&mut w, callsite); + cm.write_ansi(&mut w, AnsiCode::Reset); + let _ = w.write_all(b": "); + Self::write_body_attrs(&mut w, &record.body_attrs_bytes); + let _ = w.write_all(b"\n"); + + w.position() as usize + } + + /// Write callsite details as event_name to buffer. + #[inline] + fn write_event_name(w: &mut BufWriter<'_>, callsite: &SavedCallsite) { + let _ = w.write_all(callsite.target().as_bytes()); + let _ = w.write_all(b"::"); + let _ = w.write_all(callsite.name().as_bytes()); + if let (Some(file), Some(line)) = (callsite.file(), callsite.line()) { + let _ = write!(w, " ({}:{})", file, line); + } + } + + /// Write nanosecond timestamp as ISO 8601 (UTC) to buffer. + #[inline] + fn write_timestamp(w: &mut BufWriter<'_>, nanos: u64) { + let secs = (nanos / 1_000_000_000) as i64; + let subsec_nanos = (nanos % 1_000_000_000) as u32; + + if let Some(dt) = DateTime::::from_timestamp(secs, subsec_nanos) { + let date = dt.date_naive(); + let time = dt.time(); + let millis = subsec_nanos / 1_000_000; + + let _ = write!( + w, + "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z", + date.year(), + date.month(), + date.day(), + time.hour(), + time.minute(), + time.second(), + millis + ); + } else { + let _ = w.write_all(b""); + } + } + + /// Write body+attrs bytes to buffer using LogRecordView. + fn write_body_attrs(w: &mut BufWriter<'_>, bytes: &Bytes) { + if bytes.is_empty() { + return; + } + + // A partial protobuf message (just body + attributes) is still a valid message. + // We can use the RawLogRecord view to access just the fields we encoded. + let record = RawLogRecord::new(bytes.as_ref()); + + // Write body if present + if let Some(body) = record.body() { + Self::write_any_value(w, &body); + } + + // Write attributes if present + let mut attrs = record.attributes().peekable(); + if attrs.peek().is_some() { + let _ = w.write_all(b" ["); + let mut first = true; + for attr in attrs { + if Self::is_full(w) { + break; + } + if !first { + let _ = w.write_all(b", "); + } + first = false; + let _ = w.write_all(attr.key()); + let _ = w.write_all(b"="); + match attr.value() { + Some(v) => Self::write_any_value(w, &v), + None => { + let _ = w.write_all(b""); + } + } + } + let _ = w.write_all(b"]"); + } + } + + /// Check if the buffer is full (position >= capacity). + #[inline] + fn is_full(w: &BufWriter<'_>) -> bool { + w.position() as usize >= w.get_ref().len() + } + + /// Write an AnyValue to buffer. + fn write_any_value<'a>(w: &mut BufWriter<'_>, value: &impl AnyValueView<'a>) { + match value.value_type() { + ValueType::String => { + if let Some(s) = value.as_string() { + let _ = w.write_all(s); + } + } + ValueType::Int64 => { + if let Some(i) = value.as_int64() { + let _ = write!(w, "{}", i); + } + } + ValueType::Bool => { + if let Some(b) = value.as_bool() { + let _ = w.write_all(if b { b"true" } else { b"false" }); + } + } + ValueType::Double => { + if let Some(d) = value.as_double() { + let _ = write!(w, "{:.6}", d); + } + } + ValueType::Bytes => { + if let Some(bytes) = value.as_bytes() { + let _ = w.write_all(b"["); + for (i, b) in bytes.iter().enumerate() { + if i > 0 { + let _ = w.write_all(b", "); + } + let _ = write!(w, "{}", b); + } + let _ = w.write_all(b"]"); + } + } + ValueType::Array => { + let _ = w.write_all(b"["); + if let Some(array_iter) = value.as_array() { + let mut first = true; + for item in array_iter { + if !first { + let _ = w.write_all(b", "); + } + first = false; + Self::write_any_value(w, &item); + } + } + let _ = w.write_all(b"]"); + } + ValueType::KeyValueList => { + let _ = w.write_all(b"{"); + if let Some(kvlist_iter) = value.as_kvlist() { + let mut first = true; + for kv in kvlist_iter { + if !first { + let _ = w.write_all(b", "); + } + first = false; + let _ = w.write_all(kv.key()); + if let Some(val) = kv.value() { + let _ = w.write_all(b"="); + Self::write_any_value(w, &val); + } + } + } + let _ = w.write_all(b"}"); + } + ValueType::Empty => {} + } + } + + /// Write a log line to stdout or stderr. + fn write_line(&self, level: &Level, data: &[u8]) { + let use_stderr = matches!(*level, Level::ERROR | Level::WARN); + let _ = if use_stderr { + std::io::stderr().write_all(data) + } else { + std::io::stdout().write_all(data) + }; + } +} + +impl TracingLayer for RawLoggingLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + // Allocates a buffer on the stack, formats the event to a LogRecord + // with partial OTLP bytes. + fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { + // TODO: there are allocations implied here that we would prefer + // to avoid, it will be an extensive change in the ProtoBuffer to + // stack-allocate this temporary. + let record = LogRecord::new(event); + let callsite = SavedCallsite::new(event.metadata()); + + let mut buf = [0u8; LOG_BUFFER_SIZE]; + let len = self.writer.write_log_record(&mut buf, &record, &callsite); + self.writer.write_line(callsite.level(), &buf[..len]); + } + + // Note! This tracing layer does not implement Span-related features + // available through LookupSpan. This is important future work and will + // require introducing a notion of context. Presently, the Tokio tracing + // Context does not pass through the OTAP dataflow engine. + // + // We are likely to issue span events as events, meaning not to build + // Span objects at runtime. +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::self_tracing::DirectLogRecordEncoder; + use crate::self_tracing::encoder::level_to_severity_number; + use bytes::Bytes; + use otap_df_pdata::otlp::ProtoBuffer; + use otap_df_pdata::proto::opentelemetry::common::v1::any_value::Value; + use otap_df_pdata::proto::opentelemetry::common::v1::{AnyValue, KeyValue}; + use otap_df_pdata::proto::opentelemetry::logs::v1::LogRecord as ProtoLogRecord; + use prost::Message; + use std::sync::{Arc, Mutex}; + use tracing_subscriber::prelude::*; + + struct CaptureLayer { + formatted: Arc>, + encoded: Arc>, + } + + impl TracingLayer for CaptureLayer + where + S: Subscriber + for<'a> LookupSpan<'a>, + { + fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { + let record = LogRecord::new(event); + let callsite = SavedCallsite::new(event.metadata()); + + // Capture formatted output + let writer = ConsoleWriter::no_color(); + *self.formatted.lock().unwrap() = writer.format_log_record(&record, &callsite); + + // Capture full OTLP encoding + let mut buf = ProtoBuffer::with_capacity(512); + let mut encoder = DirectLogRecordEncoder::new(&mut buf); + let _ = encoder.encode_log_record(record, &callsite); + *self.encoded.lock().unwrap() = buf.into_bytes(); + } + } + + fn new_capture_layer() -> (CaptureLayer, Arc>, Arc>) { + let formatted = Arc::new(Mutex::new(String::new())); + let encoded = Arc::new(Mutex::new(Bytes::new())); + let layer = CaptureLayer { + formatted: formatted.clone(), + encoded: encoded.clone(), + }; + (layer, formatted, encoded) + } + + // strip timestamp and newline + fn strip_ts(s: &str) -> (&str, &str) { + // timestamp is 24 bytes, see assertion below. + (&s[..24], s[26..].trim_end()) + } + + fn format_timestamp(nanos: u64) -> String { + let mut buf = [0u8; 32]; + let mut w = Cursor::new(buf.as_mut_slice()); + ConsoleWriter::write_timestamp(&mut w, nanos); + let len = w.position() as usize; + assert_eq!(len, 24); + String::from_utf8_lossy(&buf[..len]).into_owned() + } + + fn format_attrs(attrs: &[KeyValue]) -> String { + if attrs.is_empty() { + return String::new(); + } + let mut result = String::from(" ["); + for (i, kv) in attrs.iter().enumerate() { + if i > 0 { + result.push_str(", "); + } + result.push_str(&kv.key); + result.push('='); + if let Some(ref v) = kv.value { + if let Some(ref val) = v.value { + match val { + Value::StringValue(s) => result.push_str(s), + Value::IntValue(i) => result.push_str(&i.to_string()), + Value::BoolValue(b) => result.push_str(if *b { "true" } else { "false" }), + Value::DoubleValue(d) => result.push_str(&format!("{:.6}", d)), + _ => unreachable!(), + } + } + } + } + result.push(']'); + result + } + + fn assert_log_record( + formatted: &Arc>, + encoded: &Arc>, + expected_level: Level, + expected_body: &str, + expected_attrs: Vec, + ) { + // Decode the OTLP bytes + let bytes = encoded.lock().unwrap(); + let decoded = ProtoLogRecord::decode(bytes.as_ref()).expect("decode failed"); + + // Verify OTLP encoding + let sev_text = expected_level.as_str(); + assert_eq!( + decoded.severity_number, + level_to_severity_number(&expected_level) as i32, + "severity_number mismatch" + ); + // Severity text not coded in OTLP bytes form. + assert!(decoded.severity_text.is_empty()); + assert_eq!( + decoded.body, + Some(AnyValue::new_string(expected_body)), + "body mismatch" + ); + assert_eq!(decoded.attributes, expected_attrs, "attributes mismatch"); + + // Build expected text suffix + let attrs_text = format_attrs(&expected_attrs); + let expected_suffix = format!(": {}{}", expected_body, attrs_text); + + // Verify event_name has correct prefix. Note: file:line are not always available, not tested. + let expected_prefix = "otap_df_telemetry::self_tracing::formatter::tests::event"; + assert!( + decoded.event_name.starts_with(expected_prefix), + "event_name should start with '{}', got: {}", + expected_prefix, + decoded.event_name + ); + + // Verify text formatting + let binding = formatted.lock().unwrap(); + let (ts_str, rest) = strip_ts(&binding); + + // Verify timestamp matches OTLP value + let expected_ts = format_timestamp(decoded.time_unix_nano); + assert_eq!(ts_str, expected_ts, "timestamp mismatch"); + + assert!( + rest.starts_with(sev_text), + "expected level '{}', got: {}", + sev_text, + rest + ); + assert!( + rest.ends_with(&expected_suffix), + "expected suffix '{}', got: {}", + expected_suffix, + rest + ); + } + + #[test] + fn test_log_format() { + let (layer, formatted, encoded) = new_capture_layer(); + let subscriber = tracing_subscriber::registry().with(layer); + let dispatch = tracing::Dispatch::new(subscriber); + let _guard = tracing::dispatcher::set_default(&dispatch); + + tracing::info!("hello world"); + assert_log_record(&formatted, &encoded, Level::INFO, "hello world", vec![]); + + tracing::warn!(count = 42i64, "something odd"); + assert_log_record( + &formatted, + &encoded, + Level::WARN, + "something odd", + vec![KeyValue::new("count", AnyValue::new_int(42))], + ); + + tracing::error!(msg = "oops", "we failed"); + assert_log_record( + &formatted, + &encoded, + Level::ERROR, + "we failed", + vec![KeyValue::new("msg", AnyValue::new_string("oops"))], + ); + } + + #[test] + fn test_timestamp_format() { + let record = LogRecord { + callsite_id: tracing::callsite::Identifier(&TEST_CALLSITE), + // 2024-01-15T12:30:45.678Z + timestamp_ns: 1_705_321_845_678_000_000, + body_attrs_bytes: Bytes::new(), + }; + + let writer = ConsoleWriter::no_color(); + let output = writer.format_log_record(&record, &test_callsite()); + + // Note that the severity text is formatted using the Metadata::Level + // so the text appears, unlike the protobuf case. + assert_eq!( + output, + "2024-01-15T12:30:45.678Z INFO test_module::submodule::test_event (src/test.rs:123): \n" + ); + + let writer = ConsoleWriter::color(); + let output = writer.format_log_record(&record, &test_callsite()); + + // With ANSI codes: dim timestamp, green INFO, bold event name + assert_eq!( + output, + "\x1b[2m2024-01-15T12:30:45.678Z\x1b[0m \x1b[32mINFO\x1b[0m \x1b[1mtest_module::submodule::test_event (src/test.rs:123)\x1b[0m: \n" + ); + + // Verify full OTLP encoding with known callsite + let mut buf = ProtoBuffer::with_capacity(256); + let mut encoder = DirectLogRecordEncoder::new(&mut buf); + let _ = encoder.encode_log_record(record, &test_callsite()); + let decoded = ProtoLogRecord::decode(buf.into_bytes().as_ref()).expect("decode failed"); + + assert_eq!(decoded.time_unix_nano, 1_705_321_845_678_000_000); + assert_eq!(decoded.severity_number, 9); // INFO + assert!(decoded.severity_text.is_empty()); // Not coded + assert_eq!( + decoded.event_name, + "test_module::submodule::test_event (src/test.rs:123)" + ); + } + + #[test] + fn test_buffer_overflow() { + let mut attrs = Vec::new(); + for i in 0..200 { + attrs.push(KeyValue::new( + format!("attribute_key_{:03}", i), + AnyValue::new_string(format!("value_{:03}", i)), + )); + } + + let proto_record = ProtoLogRecord { + body: Some(AnyValue::new_string("This is the log message body")), + attributes: attrs, + ..Default::default() + }; + + let mut encoded = Vec::new(); + proto_record.encode(&mut encoded).unwrap(); + + let record = LogRecord { + callsite_id: tracing::callsite::Identifier(&TEST_CALLSITE), + timestamp_ns: 1_705_321_845_678_000_000, + body_attrs_bytes: Bytes::from(encoded), + }; + + let mut buf = [0u8; LOG_BUFFER_SIZE]; + let writer = ConsoleWriter::no_color(); + let len = writer.write_log_record(&mut buf, &record, &test_callsite()); + + // Fills exactly to capacity due to overflow. + // Note! we could append a ... or some other indicator. + assert_eq!(len, LOG_BUFFER_SIZE); + + // Verify the output starts correctly with timestamp and body + let output = std::str::from_utf8(&buf[..len]).unwrap(); + assert!( + output.starts_with("2024-01-15T12:30:45.678Z"), + "got: {}", + output + ); + assert!( + output.contains("This is the log message body"), + "got: {}", + output + ); + assert!( + output.contains("attribute_key_000=value_000"), + "got: {}", + output + ); + assert!( + output.contains("attribute_key_010=value_010"), + "got: {}", + output + ); + } + + static TEST_CALLSITE: TestCallsite = TestCallsite; + struct TestCallsite; + impl tracing::Callsite for TestCallsite { + fn set_interest(&self, _: tracing::subscriber::Interest) {} + fn metadata(&self) -> &tracing::Metadata<'_> { + &TEST_METADATA + } + } + + static TEST_METADATA: tracing::Metadata<'static> = tracing::Metadata::new( + "test_event", + "test_module::submodule", + Level::INFO, + Some("src/test.rs"), + Some(123), + Some("test_module::submodule"), + tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&TEST_CALLSITE)), + tracing::metadata::Kind::EVENT, + ); + + fn test_callsite() -> SavedCallsite { + SavedCallsite::new(&TEST_METADATA) + } +}