Skip to content
Draft
Show file tree
Hide file tree
Changes from 87 commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
4f5a128
Contribute Beaubourg to OTEL
lquerel Feb 22, 2025
f0bf6a4
Merge branch 'main' into main
lquerel Feb 24, 2025
39d3b2f
Merge branch 'open-telemetry:main' into main
lquerel Mar 3, 2025
94c1c2f
Merge branch 'open-telemetry:main' into main
lquerel Apr 10, 2025
a6cc070
Merge branch 'open-telemetry:main' into main
lquerel Apr 16, 2025
2b1c910
Merge branch 'open-telemetry:main' into main
lquerel Apr 18, 2025
4646aff
Merge branch 'open-telemetry:main' into main
lquerel Jun 2, 2025
15910da
Merge branch 'open-telemetry:main' into main
lquerel Sep 3, 2025
b3264cc
Merge branch 'open-telemetry:main' into grpc-receiver-shared-conf
lquerel Nov 3, 2025
c72610a
Refactor OTLP and OTAP receiver to share a more extended gRPC configu…
lquerel Nov 4, 2025
237d194
Fix otap_exporter tests
lquerel Nov 4, 2025
1893f32
Rename GrpcServerConfig into GrpcServerSettings
lquerel Nov 4, 2025
01a68e4
Rename SharedState into AckSubscriptionState
lquerel Nov 4, 2025
7067292
Change Arc into Rc in local receiver
lquerel Nov 4, 2025
20de200
Refactor compression.rs
lquerel Nov 4, 2025
ed9ab1a
Fix documentation
lquerel Nov 4, 2025
38ed5f2
Document and refactor code for clarity
lquerel Nov 4, 2025
21f7e1a
Document and refactor code for clarity
lquerel Nov 4, 2025
a33f48d
Update rust/otap-dataflow/crates/config/src/byte_units.rs
lquerel Nov 4, 2025
7c8d897
Improve effect handler implementations
lquerel Nov 8, 2025
633bea0
Add concurrency per connection
lquerel Nov 8, 2025
12ec6e6
Try to remove Tonic for a new otap receiver.
lquerel Nov 10, 2025
5ac172a
Try to remove Tonic for a new otap receiver.
lquerel Nov 10, 2025
ec1581c
Code cleanup
lquerel Nov 10, 2025
0c1dfb6
stream_batch_statuses returns now a concrete stream implementation
lquerel Nov 11, 2025
5103f6d
Introduce an admission controller
lquerel Nov 12, 2025
10fa2fe
Reuse ZstdDecompressor
lquerel Nov 12, 2025
aee3a0f
Reuse an output buffer when encoding BatchStatus responses so most se…
lquerel Nov 12, 2025
ce68e4e
Parse grpc-encoding without allocating/lowercasing by using byte-wise…
lquerel Nov 12, 2025
d914124
Stream frame payloads without copying every H2 chunk into a single gr…
lquerel Nov 12, 2025
87d04a5
Refactor LocalFutureSet to use a FuturesUnordered instead of a Vec of…
lquerel Nov 12, 2025
16b7716
Fix clippy issues
lquerel Nov 12, 2025
6a5ec40
Merge remote-tracking branch 'upstream/main' into grpc-receiver-share…
lquerel Nov 12, 2025
e9d9d62
Merge with main
lquerel Nov 12, 2025
6c48fab
Update benchmarks to use new OTAP receiver
lquerel Nov 12, 2025
1cd16c8
Add support for gzip and deflate compression
lquerel Nov 12, 2025
2ef166d
Add support for gzip and deflate compression on both request and resp…
lquerel Nov 13, 2025
ddea999
Add support http2_keepalive.
lquerel Nov 13, 2025
051e9a9
Add support for timeout.
lquerel Nov 13, 2025
852fa49
Adapt the unit tests used for otap_receiver.rs to otel_receiver.rs
lquerel Nov 13, 2025
8b03d49
Add more unit tests for otel_receiver.rs
lquerel Nov 13, 2025
f31ef7c
Fix unit tests for otel_receiver.rs
lquerel Nov 13, 2025
576514d
Add ToDo in otel_receiver.rs
lquerel Nov 13, 2025
32d7d1b
Add more tests
lquerel Nov 13, 2025
61fd7ae
Add more unit tests
lquerel Nov 13, 2025
25eea44
Refactor code
lquerel Nov 13, 2025
e5a24f9
Refactor code
lquerel Nov 13, 2025
5dad5df
Add comments
lquerel Nov 13, 2025
8eb0a8c
Add comments
lquerel Nov 14, 2025
3f37fcf
Add comments
lquerel Nov 14, 2025
ba4186a
Remove the Send constraint on BodyStream and RequestStream
lquerel Nov 14, 2025
8fe13f7
More documentation
lquerel Nov 14, 2025
658f09f
Reorganize the tests
lquerel Nov 14, 2025
00cefa6
Add an e2e test for otel_receiver.rs
lquerel Nov 14, 2025
87ad1a3
Add otel_receiver_design.png diagram
lquerel Nov 14, 2025
aa595fd
Merge remote-tracking branch 'origin/grpc-receiver-shared-conf' into …
lquerel Nov 14, 2025
a6ba0e5
Merge remote-tracking branch 'upstream/main' into grpc-receiver-share…
lquerel Nov 14, 2025
3f7a532
Update diagram
lquerel Nov 14, 2025
17d9974
Update documentation
lquerel Nov 14, 2025
12829a6
Merge remote-tracking branch 'origin/grpc-receiver-shared-conf' into …
lquerel Nov 14, 2025
1a3f06c
Update diagram
lquerel Nov 14, 2025
5eb97d2
Merge remote-tracking branch 'origin/grpc-receiver-shared-conf' into …
lquerel Nov 14, 2025
d383878
Update documentation
lquerel Nov 14, 2025
caee87e
Merge remote-tracking branch 'origin/grpc-receiver-shared-conf' into …
lquerel Nov 14, 2025
45ee6bd
Add OTLP support in otel_receiver.rs
lquerel Nov 16, 2025
9d20625
Clean up code, define new URN for otel receiver
lquerel Nov 16, 2025
82704c4
Prepare otel_receiver.rs to become the default OTLP/OTAP receiver
lquerel Nov 17, 2025
7b9e9c6
Add a more complete OTLP test
lquerel Nov 17, 2025
8664c2f
Add more complete tests for OTLP and OTAP
lquerel Nov 17, 2025
6ecf6b4
Add new option to display on the console the signal throughput in per…
lquerel Nov 18, 2025
c921266
Replace Vec<u8> by bytes::Bytes in OtlpProtoByte
lquerel Nov 18, 2025
65036cb
Remove redundant tcp nodelay
lquerel Nov 18, 2025
d845bef
Move the JoinSet:::join_next outside of the select!
lquerel Nov 18, 2025
7f03421
Replace select! by a stream approach into handle_tcp_conn
lquerel Nov 20, 2025
7974336
Replace select! in otel_receiver and grpc
lquerel Nov 20, 2025
b0fb382
Replace spawn_local tasks per stream with FutureUnordered
lquerel Nov 20, 2025
8ce027f
Refactor code around the response encoder
lquerel Nov 20, 2025
bbff3ca
Fix unit tests
lquerel Nov 21, 2025
d106109
Improve ack management efficiency
lquerel Nov 21, 2025
f8e30f1
Use only one zstd decoder
lquerel Nov 21, 2025
8bc6512
Document otel_receiver.rs
lquerel Nov 21, 2025
ff36a6c
Fix perf issue + Documentation
lquerel Nov 21, 2025
cffa021
Refactor code for clarity
lquerel Nov 21, 2025
c9f899a
Fix unbounded gRPC frame size & decompressed size
lquerel Nov 21, 2025
594c7ef
Add timeout on HTTP2 handshake
lquerel Nov 21, 2025
a60680b
Fix ack slot leak on pipeline send failure
lquerel Nov 21, 2025
d04dc74
Fix some small corner cases
lquerel Nov 21, 2025
4cd0fc7
Add security.md for otel_receiver
lquerel Nov 22, 2025
63f375b
Add a security.md doc
lquerel Nov 22, 2025
30d2358
Add architecture description
lquerel Nov 22, 2025
5444c0a
OTLP and OTAP exporter.rs
lquerel Nov 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion rust/otap-dataflow/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,9 @@ output.json
resolved-schema.yaml

# Ignore the temporary repo dir created by `cargo xtask history` command
history-temp-repo/
history-temp-repo/

*.patch
perf.data
perf.data.*
*.strace
19 changes: 16 additions & 3 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ thiserror.workspace = true
serde_json.workspace = true
clap.workspace = true
mimalloc-rust.workspace = true
env_logger.workspace = true

[workspace.dependencies]
otap-df-pdata-otlp-macros = { path = "./crates/pdata/src/otlp/macros"}
Expand Down Expand Up @@ -64,6 +65,7 @@ criterion = "0.7.0"
data-encoding = "2.9.0"
fluke-hpack = "0.3.1"
flume = { version = "0.11.1", default-features = false, features = ["async"] }
flate2 = "1.1.5"
futures = "0.3.31"
futures-channel = "0.3"
futures-timer = "3.0"
Expand Down Expand Up @@ -105,9 +107,9 @@ socket2 = { version = "0.6.0", features = ["all"] }
syn = { version = "2.0", features = ["full", "extra-traits"] }
tempfile = "3"
thiserror = "2.0.12"
tokio = { version = "1.46.1", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process"] }
tokio = { version = "1.48.0", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process"] }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.16" }
tokio-util = { version = "0.7.17" }
tonic = { version = "0.14", default-features = false, features = [
"channel",
"codegen",
Expand All @@ -133,10 +135,15 @@ weaver_resolved_schema = { git = "https://github.com/open-telemetry/weaver.git",
weaver_resolver = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.17.0"}
weaver_semconv = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.17.0"}
zip = "=4.2.0"
byte-unit = "5.1.6"
bytes = "1.10.1"
env_logger = "0.11"
h2 = "0.4.7"
zstd = "0.13"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dependencies above are listed in alphabetical order. Rearrange these as well?


[features]
default = []
unsafe-optimizations = ["unchecked-index", "unchecked-arithmetic"]
unsafe-optimizations = ["unchecked-index", "unchecked-arithmetic", "otap-df-otap/unsafe-optimizations"]
unchecked-index = []
unchecked-arithmetic = []
# Experimental exporters (opt-in)
Expand Down Expand Up @@ -201,6 +208,12 @@ missing_crate_level_docs = "deny"

[profile.release]
debug = "line-tables-only" # minimum required for profiling
#opt-level = 3
#lto = "thin"
#codegen-units = 1
#panic = "abort"
#incremental = false
#strip = "debuginfo"

# A more in-depth analysis is necessary to determine the optimal parameters for the release profile.
#[profile.release]
Expand Down
4 changes: 2 additions & 2 deletions rust/otap-dataflow/benchmarks/benches/exporter/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ fn bench_exporter(c: &mut Criterion) {
|b, otap_signals| {
b.to_async(&rt).iter(|| async {
// start perf exporter
let config = Config::new(1000, 0.3, true, true, true, true, true);
let config = Config::new(1000, 0.3, true, true, true, true, true, false);
let exporter_config = ExporterConfig::new("perf_exporter");
let node_config =
Arc::new(NodeUserConfig::new_exporter_config(OTAP_PERF_EXPORTER_URN));
Expand Down Expand Up @@ -473,7 +473,7 @@ fn bench_exporter(c: &mut Criterion) {
|b, otap_signals| {
b.to_async(&rt).iter(|| async {
// start perf exporter
let config = Config::new(1000, 0.3, false, false, false, false, false);
let config = Config::new(1000, 0.3, false, false, false, false, false, false);
let exporter_config = ExporterConfig::new("perf_exporter");
let node_config =
Arc::new(NodeUserConfig::new_exporter_config(OTAP_PERF_EXPORTER_URN));
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ serde_yaml = { workspace = true }
miette = { workspace = true }
urn = { workspace = true }
schemars = { workspace = true }
byte-unit = { workspace = true }
127 changes: 127 additions & 0 deletions rust/otap-dataflow/crates/config/src/byte_units.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Support for byte units like "KB / KiB", "MB / MiB", "GB / GiB" in configuration files.

use byte_unit::Byte;
use serde::de::Error as DeError;
use serde::{Deserialize, Deserializer};

#[derive(Deserialize)]
#[serde(untagged)]
enum Value {
Number(u64),
String(String),
}

/// Deserialize an optional byte size that can be specified either as a number (in bytes)
/// or as a string with units (e.g. "1 KB", "2 MiB").
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<u32>, D::Error>
where
D: Deserializer<'de>,
{
let value = Option::<Value>::deserialize(deserializer)?;
let Some(value) = value else {
return Ok(None);
};

let (bytes, repr) = match value {
Value::Number(value) => (value as u128, value.to_string()),
Value::String(text) => {
let parsed: Byte = text.parse().map_err(DeError::custom)?;
(parsed.as_u64() as u128, text)
}
};

if bytes > u32::MAX as u128 {
return Err(DeError::custom(format!(
"byte size '{}' ({} bytes) exceeds u32::MAX ({} bytes)",
repr,
bytes,
u32::MAX
)));
}

Ok(Some(bytes as u32))
}

#[cfg(test)]
mod tests {
use super::deserialize;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Holder {
#[serde(default, deserialize_with = "deserialize")]
value: Option<u32>,
}

fn de_yaml(input: &str) -> Result<Holder, serde_yaml::Error> {
serde_yaml::from_str::<Holder>(input)
}

#[test]
fn parses_number_as_bytes() {
let cfg = de_yaml("value: 1024").expect("should parse numeric bytes");
assert_eq!(cfg.value, Some(1024));
}

#[test]
fn parses_string_with_iec_units() {
// 1 KiB == 1024 bytes
let cfg = de_yaml("value: 1 KiB").expect("should parse 1 KiB");
assert_eq!(cfg.value, Some(1024));

// 2 MiB == 2 * 1024 * 1024 bytes
let cfg = de_yaml("value: '2 MiB'").expect("should parse 2 MiB");
assert_eq!(cfg.value, Some(2 * 1024 * 1024));
}

#[test]
fn parses_plain_string_number() {
let cfg = de_yaml("value: '2048'").expect("should parse plain numeric string");
assert_eq!(cfg.value, Some(2048));
}

#[test]
fn missing_value_is_none() {
let cfg = de_yaml("{}").expect("should parse with missing field as None");
assert_eq!(cfg.value, None);
}

#[test]
fn overflow_is_rejected() {
// 4 GiB == 4 * 1024^3 bytes = 4_294_967_296 > u32::MAX (4_294_967_295)
let err = de_yaml("value: 4 GiB").expect_err("should error for overflow");
let msg = err.to_string();
assert!(
msg.contains("exceeds u32::MAX"),
"unexpected error: {}",
msg
);
}

#[test]
fn parses_no_space_decimal_units() {
let cfg = de_yaml("value: 1KB").expect("should parse 1KB without space");
assert_eq!(cfg.value, Some(1000));

let cfg = de_yaml("value: 10MB").expect("should parse 10MB without space");
assert_eq!(cfg.value, Some(10_000_000));

// Lowercase 'b' should still be treated as bytes per crate behavior
let cfg = de_yaml("value: 1kb").expect("should parse 1kb as 1000 bits => 125 bytes");
assert_eq!(cfg.value, Some(125));
}

#[test]
fn parses_fractional_values_and_rounding() {
// Decimal unit with fraction
let cfg = de_yaml("value: '1.5 MB'").expect("should parse 1.5 MB");
assert_eq!(cfg.value, Some(1_500_000));

// Binary unit with fraction (exact)
let cfg = de_yaml("value: '0.5 KiB'").expect("should parse 0.5 KiB to 512 bytes");
assert_eq!(cfg.value, Some(512));
}
}
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

pub mod byte_units;
pub mod engine;
pub mod error;
pub mod health;
Expand Down
Loading