Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
7d91076
save
jmacd Jan 5, 2026
51e6559
save with view
jmacd Jan 6, 2026
00cdb25
emulate stateful encoder
jmacd Jan 6, 2026
0aaad0b
typo
jmacd Jan 6, 2026
d8c7ac2
benchmarrked
jmacd Jan 6, 2026
0c44470
compact
jmacd Jan 6, 2026
58e988b
compact bench
jmacd Jan 6, 2026
52a9610
workr out
jmacd Jan 7, 2026
7cb9db8
builds
jmacd Jan 7, 2026
b374a47
crates/engine/src/pipeline_metrics.rs
jmacd Jan 7, 2026
cd80b0e
option
jmacd Jan 7, 2026
c7a7d0b
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Jan 7, 2026
ce6a95b
remove dead code
jmacd Jan 7, 2026
165c940
rename
jmacd Jan 7, 2026
686cd9e
remove dead
jmacd Jan 7, 2026
21961df
cleanup
jmacd Jan 7, 2026
8f8595c
refactor bench
jmacd Jan 7, 2026
f9a65a5
use proto encoder macro
jmacd Jan 7, 2026
b95aea2
ftb
jmacd Jan 7, 2026
82f5cbe
nicer
jmacd Jan 7, 2026
cd4baee
use view
jmacd Jan 7, 2026
f8f0eb7
ansi cleanup
jmacd Jan 7, 2026
92a9337
handy
jmacd Jan 7, 2026
d80edc6
nice
jmacd Jan 7, 2026
4e4b37f
refactor a test
jmacd Jan 7, 2026
657524d
ansi cleanup
jmacd Jan 7, 2026
cbb1000
fmt
jmacd Jan 7, 2026
2f02db6
new config
jmacd Jan 7, 2026
ba09b56
clippyu
jmacd Jan 7, 2026
c172e5d
comment
jmacd Jan 7, 2026
60fea89
comments
jmacd Jan 7, 2026
b9e8958
cleanup docs
jmacd Jan 7, 2026
a6b6f46
lint
jmacd Jan 7, 2026
983518f
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Jan 7, 2026
241158e
fix win test
jmacd Jan 7, 2026
93c118e
RawLoggingLayer
jmacd Jan 7, 2026
e235e85
use module basename
jmacd Jan 7, 2026
54d6bf4
ws
jmacd Jan 7, 2026
e1e52c8
write to console on global subscriber error
jmacd Jan 7, 2026
7a1baed
store &Metadata, make accessors
jmacd Jan 8, 2026
9e86542
TODO about truncation
jmacd Jan 8, 2026
f277f09
cpyright
jmacd Jan 8, 2026
9789407
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Jan 9, 2026
953e38a
readme
jmacd Jan 9, 2026
1f37dc1
Do not code the severity text in the full OTLP bytes encoding (does n…
jmacd Jan 9, 2026
77e6aed
lint
jmacd Jan 9, 2026
2ed0f38
Update rust/otap-dataflow/crates/telemetry/src/self_tracing/README.md
jmacd Jan 9, 2026
7fe247d
Update rust/otap-dataflow/crates/telemetry/src/self_tracing/README.md
jmacd Jan 9, 2026
725e303
lint
jmacd Jan 9, 2026
1e4b657
Merge branch 'jmacd/tracing_to_otlp_bytes' of github.com:jmacd/otel-a…
jmacd Jan 9, 2026
b148d98
lint
jmacd Jan 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ otap-df-pdata-otlp-model = { path = "./crates/pdata/src/otlp/model"}
otap-df-config = { path = "crates/config" }
otap-df-controller = { path = "crates/controller" }
otap-df-otap = { path = "crates/otap" }
otap-df-pdata = { path = "crates/pdata" }
otap-df-telemetry = { path = "crates/telemetry" }
quiver = { package = "otap-df-quiver", path = "crates/quiver" }
data_engine_expressions = { path = "../experimental/query_engine/expressions" }
data_engine_kql_parser = { path = "../experimental/query_engine/kql-parser" }
Expand Down
7 changes: 7 additions & 0 deletions rust/otap-dataflow/benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ otap-df-engine = { path = "../crates/engine"}
otap-df-telemetry = { path="../crates/telemetry"}
otap-df-pdata = { path="../crates/pdata", features = ["bench"]}

tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["registry"] }

fluke-hpack.workspace = true
futures-channel.workspace = true
futures.workspace = true
Expand Down Expand Up @@ -87,3 +90,7 @@ harness = false
[[bench]]
name = "otap_logs_view"
harness = false

[[bench]]
name = "self_tracing"
harness = false
199 changes: 199 additions & 0 deletions rust/otap-dataflow/benchmarks/benches/self_tracing/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Benchmarks for the compact log formatter.
//!
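//! These benchmarks emit a single tracing event but perform N
//! encoding, formatting, or encoding-and-formatting operations inside the
//! layer callback, so that per-event subscriber overhead is amortized over
//! the N operations instead of dominating the measurement.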
//!
//! Benchmark names follow the pattern: `group/description/N_events`
//!
//! Example: `encode/3_attrs/1000_events` = 300 µs → 300 ns per event

use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use tracing::{Event, Subscriber};
use tracing_subscriber::layer::Layer;
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::LookupSpan;

use otap_df_pdata::otlp::ProtoBuffer;
use otap_df_telemetry::self_tracing::{
ConsoleWriter, DirectLogRecordEncoder, LogRecord, SavedCallsite,
};

#[cfg(not(windows))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(windows))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

/// The operation to perform on each event within the layer.
///
/// Every variant is repeated `BenchLayer::iterations` times per event; the
/// variants differ in which steps are inside the repeated loop.
#[derive(Clone, Copy)]
enum BenchOp {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/open-telemetry/opentelemetry-rust/pull/3307/changes

I was adding similar benches in OTel Rust to measure the cost of conversions!

/// Build a `LogRecord` from the event N times, without formatting.
Encode,
/// Encode once, then format N times.
Format,
/// Encode and format together N times.
EncodeAndFormat,
/// Encode to protobuf N times.
EncodeProto,
}

/// Tracing layer used by the benchmarks: for every event it receives, it
/// repeats the selected [`BenchOp`] a fixed number of times.
struct BenchLayer {
    /// How many times the operation is repeated per event.
    iterations: usize,
    /// Which operation is repeated.
    op: BenchOp,
}

impl BenchLayer {
    /// Builds a layer that repeats `op` `iterations` times for each event.
    fn new(iterations: usize, op: BenchOp) -> Self {
        BenchLayer { iterations, op }
    }
}

/// `Layer` implementation that does the benchmarked work inside `on_event`.
impl<S> Layer<S> for BenchLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
/// Runs the configured `BenchOp` `self.iterations` times for the given event.
///
/// The layer context is not used. Each loop body passes its result through
/// `std::hint::black_box` so the optimizer cannot discard the work.
fn on_event(&self, event: &Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
match self.op {
BenchOp::Encode => {
// Only `LogRecord` construction is repeated.
for _ in 0..self.iterations {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any advantage doing the iteration ourselves, than doing just once, and let criterion do the repeated execution itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this I suppose to be certain that overhead from the subscriber isn't being picked up. I'm pleased when I see 100 executions at 8.6xxx and 1000 executions at 85.xxx.

let record = LogRecord::new(event);
let _ = std::hint::black_box(record);
}
}
BenchOp::Format => {
// Encode once, format N times
// The record, writer and callsite are all built once, outside the loop,
// so only `format_log_record` is repeated.
let record = LogRecord::new(event);
let writer = ConsoleWriter::no_color();
let callsite = SavedCallsite::new(event.metadata());

for _ in 0..self.iterations {
let line = writer.format_log_record(&record, &callsite);
let _ = std::hint::black_box(line);
}
}
BenchOp::EncodeAndFormat => {
// Only the writer is shared across iterations; the record and the
// callsite are rebuilt every time before formatting.
let writer = ConsoleWriter::no_color();

for _ in 0..self.iterations {
let record = LogRecord::new(event);
let callsite = SavedCallsite::new(event.metadata());
let line = writer.format_log_record(&record, &callsite);
let _ = std::hint::black_box(line);
}
}
BenchOp::EncodeProto => {
// The buffer, the encoder and the callsite are created once and reused.
let mut buf = ProtoBuffer::new();
let mut encoder = DirectLogRecordEncoder::new(&mut buf);
let callsite = SavedCallsite::new(event.metadata());

for _ in 0..self.iterations {
// Clear the encoder before each encode, then encode a freshly
// built `LogRecord`.
encoder.clear();
let size = encoder.encode_log_record(LogRecord::new(event), &callsite);
let _ = std::hint::black_box(size);
}
}
}
}
}

/// Emits one `info`-level tracing event with a fixed number of attributes.
///
/// The literal argument selects the variant: `0`, `3`, or `10` attributes.
/// Every variant uses the same message text, `"benchmark message"`, so that
/// results can be compared across attribute counts.
macro_rules! emit_log {
// Message only, no attributes.
(0) => {
tracing::info!("benchmark message")
};
// One string, one integer and one boolean attribute.
(3) => {
tracing::info!(
attr_str = "value",
attr_int = 42,
attr_bool = true,
"benchmark message"
)
};
// Ten attributes mixing string, boolean, float and integer values.
(10) => {
tracing::info!(
attr_str1 = "string1",
attr_bool1 = true,
attr_str2 = "string2",
attr_float1 = 1.234,
attr_int1 = 42i64,
attr_str3 = "string3",
attr_bool2 = false,
attr_float2 = 5.678,
attr_int2 = 100u64,
attr_str4 = "string4",
"benchmark message"
)
};
}

/// Drives one Criterion measurement for `layer`.
///
/// A registry carrying `layer` is wrapped in a `Dispatch` once, up front.
/// Each timed iteration then calls `emit` with that dispatch installed as
/// the default for the duration of the call.
fn run_bench<L, F>(b: &mut criterion::Bencher<'_>, layer: L, emit: F)
where
    L: Layer<tracing_subscriber::Registry> + Send + Sync + 'static,
    F: Fn(),
{
    // Constructed outside `b.iter` so subscriber setup is not part of the timing.
    let dispatch = tracing::Dispatch::new(tracing_subscriber::registry().with(layer));

    b.iter(|| {
        tracing::dispatcher::with_default(&dispatch, &emit);
        std::hint::black_box(());
    });
}

/// Registers one benchmark group for `op`.
///
/// The group contains one benchmark per combination of per-event repetition
/// count (100 or 1000) and attribute count (0, 3 or 10). Each is identified
/// as `<attrs>_attrs/<count>_events` within `group_name`.
fn bench_op(c: &mut Criterion, group_name: &str, op: BenchOp) {
    const EVENT_COUNTS: [usize; 2] = [100, 1000];
    const ATTR_VARIANTS: [(u8, &str); 3] = [(0, "0_attrs"), (3, "3_attrs"), (10, "10_attrs")];

    let mut group = c.benchmark_group(group_name);

    for events in EVENT_COUNTS {
        for (attrs, label) in ATTR_VARIANTS {
            let id = BenchmarkId::new(label, format!("{events}_events"));

            let _ = group.bench_with_input(id, &events, |bencher, &n| {
                let layer = BenchLayer::new(n, op);
                // `emit_log!` only accepts literal arguments, so each attribute
                // count needs its own call site.
                match attrs {
                    0 => run_bench(bencher, layer, || emit_log!(0)),
                    3 => run_bench(bencher, layer, || emit_log!(3)),
                    _ => run_bench(bencher, layer, || emit_log!(10)),
                }
            });
        }
    }

    group.finish();
}

/// Criterion target for the `encode` group (`BenchOp::Encode`).
fn bench_encode(c: &mut Criterion) {
bench_op(c, "encode", BenchOp::Encode);
}

/// Criterion target for the `format` group (`BenchOp::Format`).
fn bench_format(c: &mut Criterion) {
bench_op(c, "format", BenchOp::Format);
}

/// Criterion target for the `encode_and_format` group (`BenchOp::EncodeAndFormat`).
fn bench_encode_and_format(c: &mut Criterion) {
bench_op(c, "encode_and_format", BenchOp::EncodeAndFormat);
}

/// Criterion target for the `encode_proto` group (`BenchOp::EncodeProto`).
fn bench_encode_proto(c: &mut Criterion) {
bench_op(c, "encode_proto", BenchOp::EncodeProto);
}

// The `criterion_group!` invocation lives in its own module so that the
// `missing_docs` allowance is scoped to what the macro generates.
// NOTE(review): presumably the macro expands to an undocumented public
// `benches` function — confirm against the criterion version in use.
#[allow(missing_docs)]
mod bench_entry {
use super::*;

criterion_group!(
name = benches;
config = Criterion::default();
targets = bench_encode, bench_format, bench_encode_and_format, bench_encode_proto
);
}

criterion_main!(bench_entry::benches);
14 changes: 9 additions & 5 deletions rust/otap-dataflow/crates/pdata/src/otlp/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::fmt;
use std::fmt::Write;
use std::sync::LazyLock;

pub(in crate::otlp) struct ResourceArrays<'a> {
pub(crate) struct ResourceArrays<'a> {
pub id: Option<&'a UInt16Array>,
pub dropped_attributes_count: Option<&'a UInt32Array>,
pub schema_url: Option<StringArrayAccessor<'a>>,
Expand Down Expand Up @@ -123,14 +123,14 @@ impl<'a> TryFrom<&'a RecordBatch> for ResourceArrays<'a> {
}
}

pub(in crate::otlp) struct ScopeArrays<'a> {
pub(crate) struct ScopeArrays<'a> {
pub name: Option<StringArrayAccessor<'a>>,
pub version: Option<StringArrayAccessor<'a>>,
pub dropped_attributes_count: Option<&'a UInt32Array>,
pub id: Option<&'a UInt16Array>,
}

pub static SCOPE_ARRAY_DATA_TYPE: LazyLock<DataType> = LazyLock::new(|| {
static SCOPE_ARRAY_DATA_TYPE: LazyLock<DataType> = LazyLock::new(|| {
DataType::Struct(Fields::from(vec![
Field::new(
consts::NAME,
Expand Down Expand Up @@ -491,11 +491,15 @@ macro_rules! proto_encode_len_delimited_unknown_size {
}};
}

pub(crate) fn encode_len_placeholder(buf: &mut ProtoBuffer) {
/// Write a 4-byte length placeholder for later patching.
/// Do not use directly, use proto_encode_len_delimited_unknown_size.
pub fn encode_len_placeholder(buf: &mut ProtoBuffer) {
buf.buffer.extend_from_slice(&[0x80, 0x80, 0x80, 0x00]);
}

pub(crate) fn patch_len_placeholder(
/// Patch a previously written length placeholder with the actual length.
/// Do not use directly, use proto_encode_len_delimited_unknown_size.
pub fn patch_len_placeholder(
buf: &mut ProtoBuffer,
num_bytes: usize,
len: usize,
Expand Down
7 changes: 4 additions & 3 deletions rust/otap-dataflow/crates/pdata/src/otlp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,26 @@ use crate::{error::Result, otap::OtapArrowRecords};
use bytes::Bytes;
use otap_df_config::SignalType;

pub use common::ProtoBuffer;
pub use common::{ProtoBuffer, encode_len_placeholder, patch_len_placeholder};
pub use otap_df_pdata_otlp_macros::Message; // Required for derived code
pub use otap_df_pdata_otlp_macros::qualified; // Required for derived code

/// Common methods for OTLP/OTAP attributes.
pub mod attributes;
/// Common methods for batching.
pub mod batching;
/// Common utilities for protobuf encoding.
pub mod common;
/// Common methods for OTLP/OTAP logs.
pub mod logs;
/// Common methods for OTLP/OTAP metrics.
pub mod metrics;
/// Common methods for OTLP/OTAP traces.
pub mod traces;

mod common;

#[cfg(test)]
mod batching_tests;

#[cfg(test)]
mod tests;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ pub struct RawKeyValue<'a> {
}

impl<'a> RawKeyValue<'a> {
/// Create a new RawKeyValue parser from a byte slice containing a KeyValue message.
#[inline]
fn new(buf: &'a [u8]) -> Self {
#[must_use]
pub fn new(buf: &'a [u8]) -> Self {
Self {
buf,
pos: Cell::new(0),
Expand Down
18 changes: 13 additions & 5 deletions rust/otap-dataflow/crates/pdata/src/views/otlp/bytes/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ pub struct RawLogRecord<'a> {
bytes_parser: ProtoBytesParser<'a, LogFieldOffsets>,
}

impl<'a> RawLogRecord<'a> {
/// Create a new instance of `RawLogRecord`. This is exposed
/// specifically for interpreting internally generated log records
/// which encode body and attributes as OTLP bytes.
///
/// `buf` is passed unchanged to `ProtoBytesParser::new`. It is presumably
/// the serialized bytes of a single OTLP `LogRecord` message (TODO: confirm
/// against callers).
#[must_use]
pub fn new(buf: &'a [u8]) -> Self {
Self {
bytes_parser: ProtoBytesParser::new(buf),
}
}
}

/// Known field offsets within byte buffer for fields in ResourceLogs message
pub struct LogFieldOffsets {
scalar_fields: [Cell<Option<(NonZeroUsize, NonZeroUsize)>>; 13],
Expand Down Expand Up @@ -274,11 +286,7 @@ impl<'a> Iterator for LogRecordsIter<'a> {
type Item = RawLogRecord<'a>;

fn next(&mut self) -> Option<Self::Item> {
let slice = self.byte_parser.next()?;

Some(RawLogRecord {
bytes_parser: ProtoBytesParser::new(slice),
})
Some(RawLogRecord::new(self.byte_parser.next()?))
}
}

Expand Down
7 changes: 6 additions & 1 deletion rust/otap-dataflow/crates/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ unchecked-index = []
unchecked-arithmetic = []

[dependencies]
axum = { workspace = true }
otap-df-pdata = { workspace = true }
otap-df-config = { workspace = true }

axum = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
flume = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
Expand All @@ -29,6 +33,7 @@ thiserror = { workspace = true }
slotmap = { workspace = true }
parking_lot = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
tonic = { workspace = true, optional = true }
opentelemetry = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod metrics;
pub mod opentelemetry_client;
pub mod registry;
pub mod reporter;
pub mod self_tracing;
pub mod semconv;

// Re-export _private module from internal_events for macro usage.
Expand Down
Loading
Loading