Skip to content
2 changes: 1 addition & 1 deletion rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ weaver_resolved_schema = { git = "https://github.com/open-telemetry/weaver.git",
weaver_resolver = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.17.0"}
weaver_semconv = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.17.0"}
zip = "=4.2.0"
byte-unit = "5.2.0"

# Azure Monnitor Exporter
azure_identity = "0.30.0"
Expand Down Expand Up @@ -230,4 +231,3 @@ inherits = "release"
debug = true # Or 2 for full debug info
strip = "none" # Keep symbols and debug info
panic = "unwind"

1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ serde_yaml = { workspace = true }
miette = { workspace = true }
urn = { workspace = true }
schemars = { workspace = true }
byte-unit = { workspace = true }
127 changes: 127 additions & 0 deletions rust/otap-dataflow/crates/config/src/byte_units.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Support for byte units like "KB / KiB", "MB / MiB", "GB / GiB" in configuration files.

use byte_unit::Byte;
use serde::de::Error as DeError;
use serde::{Deserialize, Deserializer};

#[derive(Deserialize)]
#[serde(untagged)]
enum Value {
Number(u64),
String(String),
}

/// Deserialize an optional byte size that can be specified either as a number (in bytes)
/// or as a string with units (e.g. "1 KB", "2 MiB").
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<u32>, D::Error>
where
D: Deserializer<'de>,
{
let value = Option::<Value>::deserialize(deserializer)?;
let Some(value) = value else {
return Ok(None);
};

let (bytes, repr) = match value {
Value::Number(value) => (value as u128, value.to_string()),
Value::String(text) => {
let parsed: Byte = text.parse().map_err(DeError::custom)?;
(parsed.as_u64() as u128, text)
}
};

if bytes > u32::MAX as u128 {
return Err(DeError::custom(format!(
"byte size '{}' ({} bytes) exceeds u32::MAX ({} bytes)",
repr,
bytes,
u32::MAX
)));
}

Ok(Some(bytes as u32))
}

#[cfg(test)]
mod tests {
use super::deserialize;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Holder {
#[serde(default, deserialize_with = "deserialize")]
value: Option<u32>,
}

fn de_yaml(input: &str) -> Result<Holder, serde_yaml::Error> {
serde_yaml::from_str::<Holder>(input)
}

#[test]
fn parses_number_as_bytes() {
let cfg = de_yaml("value: 1024").expect("should parse numeric bytes");
assert_eq!(cfg.value, Some(1024));
}

#[test]
fn parses_string_with_iec_units() {
// 1 KiB == 1024 bytes
let cfg = de_yaml("value: 1 KiB").expect("should parse 1 KiB");
assert_eq!(cfg.value, Some(1024));

// 2 MiB == 2 * 1024 * 1024 bytes
let cfg = de_yaml("value: '2 MiB'").expect("should parse 2 MiB");
assert_eq!(cfg.value, Some(2 * 1024 * 1024));
}

#[test]
fn parses_plain_string_number() {
let cfg = de_yaml("value: '2048'").expect("should parse plain numeric string");
assert_eq!(cfg.value, Some(2048));
}

#[test]
fn missing_value_is_none() {
let cfg = de_yaml("{}").expect("should parse with missing field as None");
assert_eq!(cfg.value, None);
}

#[test]
fn overflow_is_rejected() {
// 4 GiB == 4 * 1024^3 bytes = 4_294_967_296 > u32::MAX (4_294_967_295)
let err = de_yaml("value: 4 GiB").expect_err("should error for overflow");
let msg = err.to_string();
assert!(
msg.contains("exceeds u32::MAX"),
"unexpected error: {}",
msg
);
}

#[test]
fn parses_no_space_decimal_units() {
let cfg = de_yaml("value: 1KB").expect("should parse 1KB without space");
assert_eq!(cfg.value, Some(1000));

let cfg = de_yaml("value: 10MB").expect("should parse 10MB without space");
assert_eq!(cfg.value, Some(10_000_000));

// Lowercase 'b' should still be treated as bytes per crate behavior
let cfg = de_yaml("value: 1kb").expect("should parse 1kb as 1000 bits => 125 bytes");
assert_eq!(cfg.value, Some(125));
}

#[test]
fn parses_fractional_values_and_rounding() {
// Decimal unit with fraction
let cfg = de_yaml("value: '1.5 MB'").expect("should parse 1.5 MB");
assert_eq!(cfg.value, Some(1_500_000));

// Binary unit with fraction (exact)
let cfg = de_yaml("value: '0.5 KiB'").expect("should parse 0.5 KiB to 512 bytes");
assert_eq!(cfg.value, Some(512));
}
}
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

pub mod byte_units;
pub mod engine;
pub mod error;
pub mod health;
Expand Down
101 changes: 95 additions & 6 deletions rust/otap-dataflow/crates/otap/src/compression.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//!
//! Defines a compression enum to abstract from tonic and allows the exporter and receiver to get the respective tonic equivalent
//!
//! Defines a compression enum to abstract from tonic and allows the exporter and receiver to get
//! the respective tonic equivalent.

use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use tonic::codec::CompressionEncoding;

/// Enum to represent various compression methods
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum CompressionMethod {
/// Fastest compression
Expand All @@ -26,16 +25,73 @@ impl CompressionMethod {
#[must_use]
pub const fn map_to_compression_encoding(&self) -> CompressionEncoding {
match *self {
CompressionMethod::Gzip => CompressionEncoding::Gzip,
CompressionMethod::Zstd => CompressionEncoding::Zstd,
CompressionMethod::Gzip => CompressionEncoding::Gzip,
CompressionMethod::Deflate => CompressionEncoding::Deflate,
}
}
}

/// Default set of compression methods that are accepted when no configuration is provided.
pub const DEFAULT_COMPRESSION_METHODS: [CompressionMethod; 3] = [
CompressionMethod::Zstd,
CompressionMethod::Gzip,
CompressionMethod::Deflate,
];

#[derive(Deserialize)]
#[serde(untagged)]
enum CompressionConfigValue {
Single(CompressionMethod),
List(Vec<CompressionMethod>),
NoneKeyword(CompressionNone),
}

#[derive(Deserialize)]
#[serde(rename_all = "snake_case")]
enum CompressionNone {
None,
}

/// Deserializer that accepts either a single compression method, a list, or the string `"none"`.
/// Absence of the field keeps the default behaviour (all methods).
pub fn deserialize_compression_methods<'de, D>(
deserializer: D,
) -> Result<Option<Vec<CompressionMethod>>, D::Error>
where
D: Deserializer<'de>,
{
let value = Option::<CompressionConfigValue>::deserialize(deserializer)?;
let Some(value) = value else {
return Ok(None);
};

let methods = match value {
CompressionConfigValue::Single(method) => vec![method],
CompressionConfigValue::List(methods) => methods,
CompressionConfigValue::NoneKeyword(CompressionNone::None) => Vec::new(),
};

let mut deduped = Vec::with_capacity(methods.len());
for method in methods {
if !deduped.contains(&method) {
deduped.push(method);
}
}
Comment on lines +75 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is only needed for CompressionConfigValue::List. We could move it inside the match statement to make the code simpler.


Ok(Some(deduped))
}

#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ConfWithCompression {
#[serde(default, deserialize_with = "deserialize_compression_methods")]
methods: Option<Vec<CompressionMethod>>,
}

#[test]
fn compression_method_accepts_snake_case_only() {
Expand All @@ -52,4 +108,37 @@ mod tests {
assert!(serde_json::from_str::<CompressionMethod>("\"Zstd\"").is_err());
assert!(serde_json::from_str::<CompressionMethod>("\"Deflate\"").is_err());
}

#[test]
fn deserialize_supports_single_value() {
let conf: ConfWithCompression = serde_json::from_str(r#"{ "methods": "gzip" }"#).unwrap();
assert_eq!(conf.methods, Some(vec![CompressionMethod::Gzip]));
}

#[test]
fn deserialize_supports_list() {
let conf: ConfWithCompression =
serde_json::from_str(r#"{ "methods": ["gzip", "zstd", "gzip"] }"#).unwrap();
assert_eq!(
conf.methods,
Some(vec![CompressionMethod::Gzip, CompressionMethod::Zstd])
);
}

#[test]
fn deserialize_supports_none_keyword() {
let conf: ConfWithCompression = serde_json::from_str(r#"{ "methods": "none" }"#).unwrap();
assert_eq!(conf.methods, Some(vec![]));
}

#[test]
fn deserialize_supports_absence() {
#[derive(Debug, Deserialize)]
struct Conf {
#[serde(default, deserialize_with = "deserialize_compression_methods")]
methods: Option<Vec<CompressionMethod>>,
}
let conf: Conf = serde_json::from_str("{}").unwrap();
assert_eq!(conf.methods, None);
}
}
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/otap/src/otap_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{

pub mod middleware;
pub mod otlp;
pub mod server_settings;

/// Common settings for OTLP receivers.
#[derive(Clone, Debug)]
Expand Down
Loading
Loading