diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index 501e78e4d3..335405f00d 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -133,6 +133,7 @@ weaver_resolved_schema = { git = "https://github.com/open-telemetry/weaver.git", weaver_resolver = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.17.0"} weaver_semconv = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.17.0"} zip = "=4.2.0" +byte-unit = "5.2.0" # Azure Monnitor Exporter azure_identity = "0.30.0" @@ -230,4 +231,3 @@ inherits = "release" debug = true # Or 2 for full debug info strip = "none" # Keep symbols and debug info panic = "unwind" - diff --git a/rust/otap-dataflow/crates/config/Cargo.toml b/rust/otap-dataflow/crates/config/Cargo.toml index 37b993445b..d5e9cf8116 100644 --- a/rust/otap-dataflow/crates/config/Cargo.toml +++ b/rust/otap-dataflow/crates/config/Cargo.toml @@ -20,3 +20,4 @@ serde_yaml = { workspace = true } miette = { workspace = true } urn = { workspace = true } schemars = { workspace = true } +byte-unit = { workspace = true } diff --git a/rust/otap-dataflow/crates/config/src/byte_units.rs b/rust/otap-dataflow/crates/config/src/byte_units.rs new file mode 100644 index 0000000000..c4bed89c7f --- /dev/null +++ b/rust/otap-dataflow/crates/config/src/byte_units.rs @@ -0,0 +1,127 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Support for byte units like "KB / KiB", "MB / MiB", "GB / GiB" in configuration files. + +use byte_unit::Byte; +use serde::de::Error as DeError; +use serde::{Deserialize, Deserializer}; + +#[derive(Deserialize)] +#[serde(untagged)] +enum Value { + Number(u64), + String(String), +} + +/// Deserialize an optional byte size that can be specified either as a number (in bytes) +/// or as a string with units (e.g. "1 KB", "2 MiB"). +pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let value = Option::::deserialize(deserializer)?; + let Some(value) = value else { + return Ok(None); + }; + + let (bytes, repr) = match value { + Value::Number(value) => (value as u128, value.to_string()), + Value::String(text) => { + let parsed: Byte = text.parse().map_err(DeError::custom)?; + (parsed.as_u64() as u128, text) + } + }; + + if bytes > u32::MAX as u128 { + return Err(DeError::custom(format!( + "byte size '{}' ({} bytes) exceeds u32::MAX ({} bytes)", + repr, + bytes, + u32::MAX + ))); + } + + Ok(Some(bytes as u32)) +} + +#[cfg(test)] +mod tests { + use super::deserialize; + use serde::Deserialize; + + #[derive(Debug, Deserialize)] + struct Holder { + #[serde(default, deserialize_with = "deserialize")] + value: Option, + } + + fn de_yaml(input: &str) -> Result { + serde_yaml::from_str::(input) + } + + #[test] + fn parses_number_as_bytes() { + let cfg = de_yaml("value: 1024").expect("should parse numeric bytes"); + assert_eq!(cfg.value, Some(1024)); + } + + #[test] + fn parses_string_with_iec_units() { + // 1 KiB == 1024 bytes + let cfg = de_yaml("value: 1 KiB").expect("should parse 1 KiB"); + assert_eq!(cfg.value, Some(1024)); + + // 2 MiB == 2 * 1024 * 1024 bytes + let cfg = de_yaml("value: '2 MiB'").expect("should parse 2 MiB"); + assert_eq!(cfg.value, Some(2 * 1024 * 1024)); + } + + #[test] + fn parses_plain_string_number() { + let cfg = de_yaml("value: '2048'").expect("should parse plain numeric string"); + assert_eq!(cfg.value, Some(2048)); + } + + #[test] + fn missing_value_is_none() { + let cfg = de_yaml("{}").expect("should parse with missing field as None"); + assert_eq!(cfg.value, None); + } + + #[test] + fn overflow_is_rejected() { + // 4 GiB == 4 * 1024^3 bytes = 4_294_967_296 > u32::MAX (4_294_967_295) + let err = de_yaml("value: 4 GiB").expect_err("should error for overflow"); + let msg = err.to_string(); + assert!( + msg.contains("exceeds u32::MAX"), + "unexpected error: {}", + msg + ); + } + + #[test] + fn parses_no_space_decimal_units() { + let cfg = de_yaml("value: 1KB").expect("should parse 1KB without space"); + assert_eq!(cfg.value, Some(1000)); + + let cfg = de_yaml("value: 10MB").expect("should parse 10MB without space"); + assert_eq!(cfg.value, Some(10_000_000)); + + // Lowercase 'b' should still be treated as bytes per crate behavior + let cfg = de_yaml("value: 1kb").expect("should parse 1kb as 1000 bits => 125 bytes"); + assert_eq!(cfg.value, Some(125)); + } + + #[test] + fn parses_fractional_values_and_rounding() { + // Decimal unit with fraction + let cfg = de_yaml("value: '1.5 MB'").expect("should parse 1.5 MB"); + assert_eq!(cfg.value, Some(1_500_000)); + + // Binary unit with fraction (exact) + let cfg = de_yaml("value: '0.5 KiB'").expect("should parse 0.5 KiB to 512 bytes"); + assert_eq!(cfg.value, Some(512)); + } +} diff --git a/rust/otap-dataflow/crates/config/src/lib.rs b/rust/otap-dataflow/crates/config/src/lib.rs index d97f890724..58fed90485 100644 --- a/rust/otap-dataflow/crates/config/src/lib.rs +++ b/rust/otap-dataflow/crates/config/src/lib.rs @@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; +pub mod byte_units; pub mod engine; pub mod error; pub mod health; diff --git a/rust/otap-dataflow/crates/otap/src/compression.rs b/rust/otap-dataflow/crates/otap/src/compression.rs index 72d2df7d6d..2ac703f88f 100644 --- a/rust/otap-dataflow/crates/otap/src/compression.rs +++ b/rust/otap-dataflow/crates/otap/src/compression.rs @@ -1,15 +1,14 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -//! -//! Defines a compression enum to abstract from tonic and allows the exporter and receiver to get the respective tonic equivalent -//! +//! Defines a compression enum to abstract from tonic and allows the exporter and receiver to get +//! the respective tonic equivalent. -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use tonic::codec::CompressionEncoding; /// Enum to represent various compression methods -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum CompressionMethod { /// Fastest compression @@ -26,16 +25,73 @@ impl CompressionMethod { #[must_use] pub const fn map_to_compression_encoding(&self) -> CompressionEncoding { match *self { - CompressionMethod::Gzip => CompressionEncoding::Gzip, CompressionMethod::Zstd => CompressionEncoding::Zstd, + CompressionMethod::Gzip => CompressionEncoding::Gzip, CompressionMethod::Deflate => CompressionEncoding::Deflate, } } } +/// Default set of compression methods that are accepted when no configuration is provided. +pub const DEFAULT_COMPRESSION_METHODS: [CompressionMethod; 3] = [ + CompressionMethod::Zstd, + CompressionMethod::Gzip, + CompressionMethod::Deflate, +]; + +#[derive(Deserialize)] +#[serde(untagged)] +enum CompressionConfigValue { + Single(CompressionMethod), + List(Vec), + NoneKeyword(CompressionNone), +} + +#[derive(Deserialize)] +#[serde(rename_all = "snake_case")] +enum CompressionNone { + None, +} + +/// Deserializer that accepts either a single compression method, a list, or the string `"none"`. +/// Absence of the field keeps the default behaviour (all methods). +pub fn deserialize_compression_methods<'de, D>( + deserializer: D, +) -> Result>, D::Error> +where + D: Deserializer<'de>, +{ + let value = Option::::deserialize(deserializer)?; + let Some(value) = value else { + return Ok(None); + }; + + let methods = match value { + CompressionConfigValue::Single(method) => vec![method], + CompressionConfigValue::List(methods) => methods, + CompressionConfigValue::NoneKeyword(CompressionNone::None) => Vec::new(), + }; + + let mut deduped = Vec::with_capacity(methods.len()); + for method in methods { + if !deduped.contains(&method) { + deduped.push(method); + } + } + + Ok(Some(deduped)) +} + #[cfg(test)] mod tests { use super::*; + use serde::Deserialize; + + #[derive(Debug, Deserialize)] + struct ConfWithCompression { + #[serde(default, deserialize_with = "deserialize_compression_methods")] + methods: Option>, + } #[test] fn compression_method_accepts_snake_case_only() { @@ -52,4 +108,37 @@ mod tests { assert!(serde_json::from_str::("\"Zstd\"").is_err()); assert!(serde_json::from_str::("\"Deflate\"").is_err()); } + + #[test] + fn deserialize_supports_single_value() { + let conf: ConfWithCompression = serde_json::from_str(r#"{ "methods": "gzip" }"#).unwrap(); + assert_eq!(conf.methods, Some(vec![CompressionMethod::Gzip])); + } + + #[test] + fn deserialize_supports_list() { + let conf: ConfWithCompression = + serde_json::from_str(r#"{ "methods": ["gzip", "zstd", "gzip"] }"#).unwrap(); + assert_eq!( + conf.methods, + Some(vec![CompressionMethod::Gzip, CompressionMethod::Zstd]) + ); + } + + #[test] + fn deserialize_supports_none_keyword() { + let conf: ConfWithCompression = serde_json::from_str(r#"{ "methods": "none" }"#).unwrap(); + assert_eq!(conf.methods, Some(vec![])); + } + + #[test] + fn deserialize_supports_absence() { + #[derive(Debug, Deserialize)] + struct Conf { + #[serde(default, deserialize_with = "deserialize_compression_methods")] + methods: Option>, + } + let conf: Conf = serde_json::from_str("{}").unwrap(); + assert_eq!(conf.methods, None); + } } diff --git a/rust/otap-dataflow/crates/otap/src/otap_grpc.rs b/rust/otap-dataflow/crates/otap/src/otap_grpc.rs index a81d207a36..98135c89cc 100644 --- a/rust/otap-dataflow/crates/otap/src/otap_grpc.rs +++ b/rust/otap-dataflow/crates/otap/src/otap_grpc.rs @@ -34,6 +34,7 @@ use crate::{ pub mod middleware; pub mod otlp; +pub mod server_settings; /// Common settings for OTLP receivers. #[derive(Clone, Debug)] diff --git a/rust/otap-dataflow/crates/otap/src/otap_grpc/client_settings.rs b/rust/otap-dataflow/crates/otap/src/otap_grpc/client_settings.rs new file mode 100644 index 0000000000..8b84a50884 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/otap_grpc/client_settings.rs @@ -0,0 +1,246 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Shared configuration for gRPC-based clients. + +use crate::compression::CompressionMethod; +use otap_df_config::byte_units; +use serde::Deserialize; +use std::time::Duration; +use tonic::codec::CompressionEncoding; +use tonic::transport::Endpoint; + +/// Common configuration shared across gRPC clients. +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +pub struct GrpcClientSettings { + /// The gRPC endpoint to connect to. + pub grpc_endpoint: String, + + /// Compression method to use for outbound requests. Defaults to no compression. + #[serde(default)] + pub compression: Option, + + /// Maximum number of concurrent in-flight requests allowed by the transport stack. + #[serde(default = "default_concurrency_limit")] + pub concurrency_limit: usize, + + /// Timeout for establishing TCP connections. + #[serde(default = "default_connect_timeout", with = "humantime_serde")] + pub connect_timeout: Duration, + + /// Whether to enable `TCP_NODELAY`. + #[serde(default = "default_tcp_nodelay")] + pub tcp_nodelay: bool, + + /// TCP keepalive timeout for outbound connections. + #[serde(default = "default_tcp_keepalive", with = "humantime_serde")] + pub tcp_keepalive: Option, + + /// Interval between TCP keepalive probes once keepalive is active. + #[serde(default, with = "humantime_serde")] + pub tcp_keepalive_interval: Option, + + /// Number of TCP keepalive probes sent before a connection is declared dead. + #[serde(default)] + pub tcp_keepalive_retries: Option, + + /// Initial HTTP/2 stream window size, in bytes. + #[serde( + default = "default_initial_stream_window_size", + deserialize_with = "byte_units::deserialize" + )] + pub initial_stream_window_size: Option, + + /// Initial HTTP/2 connection window size, in bytes. + #[serde( + default = "default_initial_connection_window_size", + deserialize_with = "byte_units::deserialize" + )] + pub initial_connection_window_size: Option, + + /// Whether to rely on HTTP/2 adaptive window sizing instead of the manual values above. + #[serde(default = "default_http2_adaptive_window")] + pub http2_adaptive_window: bool, + + /// Interval between HTTP/2 keepalive pings. + #[serde(default = "default_http2_keepalive_interval", with = "humantime_serde")] + pub http2_keepalive_interval: Option, + + /// Timeout waiting for an HTTP/2 keepalive acknowledgement. + #[serde(default = "default_http2_keepalive_timeout", with = "humantime_serde")] + pub http2_keepalive_timeout: Option, + + /// Whether to send HTTP/2 keepalives while idle. + #[serde(default = "default_keep_alive_while_idle")] + pub keep_alive_while_idle: bool, + + /// Timeout for RPC requests. If not specified, no timeout is applied. + #[serde(default, with = "humantime_serde")] + pub timeout: Option, + + /// Internal Tower buffer size for the gRPC client. + #[serde(default)] + pub buffer_size: Option, +} + +impl GrpcClientSettings { + /// Returns the compression encoding to apply to requests, if any. + #[must_use] + pub fn compression_encoding(&self) -> Option { + self.compression + .map(|method| method.map_to_compression_encoding()) + } + + /// Returns a non-zero concurrency limit. + #[must_use] + pub fn effective_concurrency_limit(&self) -> usize { + self.concurrency_limit.max(1) + } + + /// Builds the configured [`Endpoint`]. + pub fn build_endpoint(&self) -> Result { + let mut endpoint = Endpoint::from_shared(self.grpc_endpoint.clone())? + .concurrency_limit(self.effective_concurrency_limit()) + .connect_timeout(self.connect_timeout) + .tcp_nodelay(self.tcp_nodelay) + .tcp_keepalive(self.tcp_keepalive) + .initial_stream_window_size(self.initial_stream_window_size) + .initial_connection_window_size(self.initial_connection_window_size) + .keep_alive_while_idle(self.keep_alive_while_idle); + + if let Some(interval) = self.http2_keepalive_interval { + endpoint = endpoint.http2_keep_alive_interval(interval); + } + if let Some(timeout) = self.http2_keepalive_timeout { + endpoint = endpoint.keep_alive_timeout(timeout); + } + if let Some(interval) = self.tcp_keepalive_interval { + endpoint = endpoint.tcp_keepalive_interval(Some(interval)); + } + if let Some(retries) = self.tcp_keepalive_retries { + endpoint = endpoint.tcp_keepalive_retries(Some(retries)); + } + if self.http2_adaptive_window { + endpoint = endpoint.http2_adaptive_window(true); + } + if let Some(buffer_size) = self.buffer_size { + endpoint = endpoint.buffer_size(buffer_size); + } + if let Some(timeout) = self.timeout { + endpoint = endpoint.timeout(timeout); + } + + Ok(endpoint) + } +} + +impl Default for GrpcClientSettings { + fn default() -> Self { + Self { + grpc_endpoint: String::new(), + compression: None, + concurrency_limit: default_concurrency_limit(), + connect_timeout: default_connect_timeout(), + tcp_nodelay: default_tcp_nodelay(), + tcp_keepalive: default_tcp_keepalive(), + tcp_keepalive_interval: None, + tcp_keepalive_retries: None, + initial_stream_window_size: default_initial_stream_window_size(), + initial_connection_window_size: default_initial_connection_window_size(), + http2_adaptive_window: default_http2_adaptive_window(), + http2_keepalive_interval: default_http2_keepalive_interval(), + http2_keepalive_timeout: default_http2_keepalive_timeout(), + keep_alive_while_idle: default_keep_alive_while_idle(), + timeout: None, + buffer_size: None, + } + } +} + +const fn default_concurrency_limit() -> usize { + 256 +} + +fn default_connect_timeout() -> Duration { + Duration::from_secs(3) +} + +const fn default_tcp_nodelay() -> bool { + true +} + +fn default_tcp_keepalive() -> Option { + Some(Duration::from_secs(45)) +} + +fn default_initial_stream_window_size() -> Option { + Some(8 * 1024 * 1024) +} + +fn default_initial_connection_window_size() -> Option { + Some(32 * 1024 * 1024) +} + +const fn default_http2_adaptive_window() -> bool { + false +} + +fn default_http2_keepalive_interval() -> Option { + Some(Duration::from_secs(30)) +} + +fn default_http2_keepalive_timeout() -> Option { + Some(Duration::from_secs(10)) +} + +const fn default_keep_alive_while_idle() -> bool { + true +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn defaults_match_previous_client_tuning() { + let settings: GrpcClientSettings = + serde_json::from_str(r#"{ "grpc_endpoint": "http://localhost:4317" }"#).unwrap(); + + assert_eq!(settings.concurrency_limit, 256); + assert_eq!(settings.connect_timeout, Duration::from_secs(3)); + assert_eq!(settings.tcp_keepalive, Some(Duration::from_secs(45))); + assert_eq!( + settings.http2_keepalive_interval, + Some(Duration::from_secs(30)) + ); + assert_eq!( + settings.http2_keepalive_timeout, + Some(Duration::from_secs(10)) + ); + assert!(settings.keep_alive_while_idle); + } + + #[test] + fn compression_encoding_is_reported() { + let settings: GrpcClientSettings = serde_json::from_str( + r#"{ "grpc_endpoint": "http://localhost:4317", "compression_method": "gzip" }"#, + ) + .unwrap(); + + assert_eq!( + settings.compression_encoding(), + Some(CompressionEncoding::Gzip) + ); + } + + #[test] + fn effective_concurrency_limit_clamps_to_one() { + let settings: GrpcClientSettings = serde_json::from_str( + r#"{ "grpc_endpoint": "http://localhost:4317", "concurrency_limit": 0 }"#, + ) + .unwrap(); + + assert_eq!(settings.effective_concurrency_limit(), 1); + } +} diff --git a/rust/otap-dataflow/crates/otap/src/otap_grpc/otlp/server.rs b/rust/otap-dataflow/crates/otap/src/otap_grpc/otlp/server.rs index 0aead23102..3eea1cf301 100644 --- a/rust/otap-dataflow/crates/otap/src/otap_grpc/otlp/server.rs +++ b/rust/otap-dataflow/crates/otap/src/otap_grpc/otlp/server.rs @@ -95,10 +95,12 @@ pub struct Settings { pub max_concurrent_requests: usize, /// Whether the receiver should wait. pub wait_for_result: bool, + /// Maximum size for inbound gRPC messages. + pub max_decoding_message_size: Option, /// Request compression allowed - pub accept_compression_encodings: EnabledCompressionEncodings, + pub request_compression_encodings: EnabledCompressionEncodings, /// Response compression used - pub send_compression_encodings: EnabledCompressionEncodings, + pub response_compression_encodings: EnabledCompressionEncodings, } /// Tonic `Codec` implementation that returns the bytes of the serialized message @@ -203,8 +205,8 @@ impl Decoder for OtlpBytesDecoder { fn new_grpc(signal: SignalType, settings: Settings) -> Grpc { let codec = OtlpBytesCodec::new(signal); Grpc::new(codec).apply_compression_config( - settings.accept_compression_encodings, - settings.send_compression_encodings, + settings.request_compression_encodings, + settings.response_compression_encodings, ) } diff --git a/rust/otap-dataflow/crates/otap/src/otap_grpc/server_settings.rs b/rust/otap-dataflow/crates/otap/src/otap_grpc/server_settings.rs new file mode 100644 index 0000000000..c90c22dd35 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/otap_grpc/server_settings.rs @@ -0,0 +1,388 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Shared configuration for gRPC-based receivers. + +use crate::compression::{self, CompressionMethod}; +use crate::otap_grpc::otlp::server::Settings; +use otap_df_config::byte_units; +use serde::Deserialize; +use std::net::SocketAddr; +use std::time::Duration; +use tokio::net::TcpListener; +use tonic::codec::EnabledCompressionEncodings; +use tonic::transport::server::TcpIncoming; + +/// Common configuration shared across gRPC receivers. +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +pub struct GrpcServerSettings { + /// The endpoint details: protocol, name, port. + pub listening_addr: SocketAddr, + + /// Compression methods accepted for requests. Omitted field defaults to accepting zstd, gzip, + /// and deflate (in that preference order). + #[serde( + default, + deserialize_with = "compression::deserialize_compression_methods" + )] + pub request_compression: Option>, + + /// Compression methods used for responses. Defaults to no compression, falling back to the + /// request list when explicitly configured via the legacy `compression_method` option. + #[serde( + default, + deserialize_with = "compression::deserialize_compression_methods" + )] + pub response_compression: Option>, + + // --- All the following settings have defaults that should be reasonable for most users --- + // ----------------------------------------------------------------------------------------- + /// Maximum number of concurrent in-flight requests. + /// Defaults to `0`, which means the receiver adopts the downstream pdata channel capacity so + /// backpressure flows upstream automatically. Any non-zero value is still clamped to that + /// capacity at runtime. + #[serde(default = "default_max_concurrent_requests")] + pub max_concurrent_requests: usize, + + /// Whether newly accepted sockets should have `TCP_NODELAY` enabled. + /// Keeping this `true` (the default) avoids Nagle's algorithm and minimizes per-export latency. + /// Disabling it trades slightly higher latency for fewer small TCP packets when workloads + /// involve very bursty, tiny messages. + #[serde(default = "default_tcp_nodelay")] + pub tcp_nodelay: bool, + + /// TCP keepalive timeout for accepted sockets. + /// The 45s default evicts dead clients in under a minute without incurring much background + /// traffic. Raise it to reduce keepalive chatter, or set to `null` to disable kernel keepalives + /// entirely (at the cost of slower leak detection on broken links). + #[serde(default = "default_tcp_keepalive", with = "humantime_serde")] + pub tcp_keepalive: Option, + + /// Interval between TCP keepalive probes once keepalive is active. + /// Defaults to 15s so the kernel confirms progress quickly after the keepalive timeout. Longer + /// intervals reduce packets, shorter intervals detect stalled peers faster. Ignored if + /// `tcp_keepalive` is `null`. + #[serde(default = "default_tcp_keepalive_interval", with = "humantime_serde")] + pub tcp_keepalive_interval: Option, + + /// Number of TCP keepalive probes sent before a connection is declared dead. + /// The default (5) balances resilience to transient loss with timely reclamation of resources. + /// Smaller values clean up faster during outages, larger values favor noisy or lossy networks. + #[serde(default = "default_tcp_keepalive_retries")] + pub tcp_keepalive_retries: Option, + + /// Per-connection concurrency limit enforced by the transport layer. + /// By default it mirrors the effective `max_concurrent_requests`, so transport- and + /// application-level backpressure remain aligned. Lower values gate connection bursts earlier, + /// while higher values only help if you also raise `max_concurrent_requests`. Set to `0` to + /// revert to the derived default. + #[serde(default)] + pub transport_concurrency_limit: Option, + + /// Whether the gRPC server should shed load immediately once concurrency limits are hit. + /// Leaving this `true` (default) results in fast `resource_exhausted` responses and protects + /// the single-threaded runtime from unbounded queues. Turning it off allows requests to queue + /// but increases memory usage and tail latency under sustained overload. + #[serde(default = "default_load_shed")] + pub load_shed: bool, + + /// Initial HTTP/2 stream window size, in bytes. + /// Accepts plain integers or suffixed strings such as `8MiB`. The default 8MiB window reduces + /// flow-control stalls for large OTLP batches; trimming it lowers per-stream memory but may + /// throttle throughput, while increasing it benefits high-bandwidth deployments at the cost of + /// larger buffers. + #[serde( + default = "default_initial_stream_window_size", + deserialize_with = "byte_units::deserialize" + )] + pub initial_stream_window_size: Option, + + /// Initial HTTP/2 connection window size, in bytes. + /// Accepts plain integers or suffixed strings such as `32MiB`. Defaults to 24MiB, giving room + /// for several simultaneous large streams; adjust using the same trade-offs as the stream + /// window but applied per connection. + #[serde( + default = "default_initial_connection_window_size", + deserialize_with = "byte_units::deserialize" + )] + pub initial_connection_window_size: Option, + + /// Whether to rely on HTTP/2 adaptive window sizing instead of the manual values above. + /// Disabled by default so the receiver uses predictable static windows. Enabling this lets tonic + /// adjust flow-control windows dynamically, which can improve throughput on high-bandwidth links + /// but makes memory usage and latency more workload dependent (and largely ignores the window + /// sizes configured above). + #[serde(default = "default_http2_adaptive_window")] + pub http2_adaptive_window: bool, + + /// Maximum HTTP/2 frame size, in bytes. + /// Accepts plain integers or suffixed strings such as `16KiB`. The 16KiB default matches the + /// current tuning: large enough to keep framing overhead low for sizeable batches yet still + /// bounded; larger values further decrease framing costs at the expense of bigger per-frame + /// buffers, while smaller values force additional fragmentation and CPU work on jumbo exports. + #[serde( + default = "default_max_frame_size", + deserialize_with = "byte_units::deserialize" + )] + pub max_frame_size: Option, + + /// Maximum size for inbound gRPC messages, in bytes. + /// Accepts plain integers or suffixed strings such as `4MiB`. Defaults to tonic's 4MiB limit. + #[serde( + default = "default_max_decoding_message_size", + deserialize_with = "byte_units::deserialize" + )] + pub max_decoding_message_size: Option, + + /// Interval between HTTP/2 keepalive pings. + /// The default 30s ping keeps intermediaries aware of idle-but-healthy connections. Shorten it + /// to detect broken links faster, lengthen it to reduce ping traffic, or set to `null` to + /// disable HTTP/2 keepalives. + #[serde(default = "default_http2_keepalive_interval", with = "humantime_serde")] + pub http2_keepalive_interval: Option, + + /// Timeout waiting for an HTTP/2 keepalive acknowledgement. + /// Defaults to 10s, balancing rapid detection of stalled peers with tolerance for transient + /// network jitter. Decrease it for quicker failover or increase it for chatty-but-latent paths. + #[serde(default = "default_http2_keepalive_timeout", with = "humantime_serde")] + pub http2_keepalive_timeout: Option, + + /// Upper bound on concurrently active HTTP/2 streams per connection. + /// By default this tracks the effective `max_concurrent_requests`, keeping logical and transport + /// concurrency aligned. Lower values improve fairness between chatty clients. Higher values + /// matter only if you also raise `max_concurrent_requests`. Set to `0` to inherit the derived + /// default. + #[serde(default)] + pub max_concurrent_streams: Option, + + /// Whether to wait for the result (default: false) + /// + /// When enabled, the receiver will not send a response until the + /// immediate downstream component has acknowledged receipt of the + /// data. This does not guarantee that data has been fully + /// processed or successfully exported to the final destination, + /// since components are able acknowledge early. + /// + /// Note when wait_for_result=false, it is impossible to + /// see a failure, errors are effectively suppressed. + #[serde(default = "default_wait_for_result")] + pub wait_for_result: bool, + + /// Timeout for RPC requests. If not specified, no timeout is applied. + /// Format: humantime format (e.g., "30s", "5m", "1h", "500ms") + #[serde(default, with = "humantime_serde")] + pub timeout: Option, +} + +impl GrpcServerSettings { + /// Returns the compression methods accepted for requests. + #[must_use] + pub fn request_compression_methods(&self) -> Vec { + match &self.request_compression { + Some(methods) => methods.clone(), + None => compression::DEFAULT_COMPRESSION_METHODS.to_vec(), + } + } + + /// Returns the compression methods configured for responses. + #[must_use] + pub fn response_compression_methods(&self) -> Vec { + match &self.response_compression { + Some(methods) => methods.clone(), + None => Vec::new(), + } + } + + /// Returns the first configured compression method for responses, if any. + #[must_use] + pub fn preferred_response_compression(&self) -> Option { + self.response_compression + .as_ref() + .and_then(|methods| methods.first().copied()) + } + + /// Builds the Tonic TCP Incoming. + #[must_use] + pub fn build_tcp_incoming(&self, tcp_listener: TcpListener) -> TcpIncoming { + TcpIncoming::from(tcp_listener) + .with_nodelay(Some(self.tcp_nodelay)) + .with_keepalive(self.tcp_keepalive) + .with_keepalive_interval(self.tcp_keepalive_interval) + .with_keepalive_retries(self.tcp_keepalive_retries) + } + + /// Returns the compression encodings to use for both requests and responses. + #[must_use] + pub fn compression_encodings( + &self, + ) -> (EnabledCompressionEncodings, EnabledCompressionEncodings) { + let mut request_compression = EnabledCompressionEncodings::default(); + for method in self.request_compression_methods() { + request_compression.enable(method.map_to_compression_encoding()); + } + + let mut response_compression = EnabledCompressionEncodings::default(); + for method in self.response_compression_methods() { + response_compression.enable(method.map_to_compression_encoding()); + } + (request_compression, response_compression) + } + + /// Builds the gRPC server settings from this configuration. + #[must_use] + pub fn build_settings(&self) -> Settings { + let (request_compression_encodings, response_compression_encodings) = + self.compression_encodings(); + + Settings { + max_concurrent_requests: self.max_concurrent_requests, + wait_for_result: self.wait_for_result, + max_decoding_message_size: self.max_decoding_message_size.map(|value| value as usize), + request_compression_encodings, + response_compression_encodings, + } + } +} + +const fn default_max_concurrent_requests() -> usize { + 0 +} + +const fn default_tcp_nodelay() -> bool { + true +} + +fn default_tcp_keepalive() -> Option { + Some(Duration::from_secs(45)) +} + +fn default_tcp_keepalive_interval() -> Option { + Some(Duration::from_secs(15)) +} + +fn default_tcp_keepalive_retries() -> Option { + Some(5) +} + +const fn default_load_shed() -> bool { + true +} + +fn default_initial_stream_window_size() -> Option { + Some(8 * 1024 * 1024) +} + +fn default_initial_connection_window_size() -> Option { + Some(24 * 1024 * 1024) +} + +fn default_max_frame_size() -> Option { + Some(16 * 1024) +} + +fn default_max_decoding_message_size() -> Option { + Some(4 * 1024 * 1024) +} + +fn default_http2_keepalive_interval() -> Option { + Some(Duration::from_secs(30)) +} + +fn default_http2_keepalive_timeout() -> Option { + Some(Duration::from_secs(10)) +} + +const fn default_http2_adaptive_window() -> bool { + false +} + +const fn default_wait_for_result() -> bool { + // See https://github.com/open-telemetry/otel-arrow/issues/1311 + // This matches the OTel Collector default for wait_for_result, presently. + false +} + +impl Default for GrpcServerSettings { + fn default() -> Self { + Self { + listening_addr: ([0, 0, 0, 0], 0).into(), + request_compression: None, + response_compression: None, + max_concurrent_requests: default_max_concurrent_requests(), + tcp_nodelay: default_tcp_nodelay(), + tcp_keepalive: default_tcp_keepalive(), + tcp_keepalive_interval: default_tcp_keepalive_interval(), + tcp_keepalive_retries: default_tcp_keepalive_retries(), + transport_concurrency_limit: None, + load_shed: default_load_shed(), + initial_stream_window_size: default_initial_stream_window_size(), + initial_connection_window_size: default_initial_connection_window_size(), + http2_adaptive_window: default_http2_adaptive_window(), + max_frame_size: default_max_frame_size(), + max_decoding_message_size: default_max_decoding_message_size(), + http2_keepalive_interval: default_http2_keepalive_interval(), + http2_keepalive_timeout: default_http2_keepalive_timeout(), + max_concurrent_streams: None, + wait_for_result: default_wait_for_result(), + timeout: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::compression::{CompressionMethod, DEFAULT_COMPRESSION_METHODS}; + use tonic::codec::CompressionEncoding; + + #[test] + fn defaults_match_expected_compression() { + let settings = GrpcServerSettings::default(); + + assert_eq!( + settings.request_compression_methods(), + DEFAULT_COMPRESSION_METHODS.to_vec() + ); + assert!(settings.response_compression_methods().is_empty()); + assert_eq!(settings.preferred_response_compression(), None); + } + + #[test] + fn response_compression_prefers_first_entry() { + let settings = GrpcServerSettings { + response_compression: Some(vec![CompressionMethod::Gzip, CompressionMethod::Zstd]), + ..Default::default() + }; + + assert_eq!( + settings.preferred_response_compression(), + Some(CompressionMethod::Gzip) + ); + assert_eq!( + settings.response_compression_methods(), + vec![CompressionMethod::Gzip, CompressionMethod::Zstd] + ); + } + + #[test] + fn build_settings_carries_core_limits_and_compression() { + let settings = GrpcServerSettings { + max_concurrent_requests: 42, + wait_for_result: true, + max_decoding_message_size: Some(8 * 1024 * 1024), + request_compression: Some(vec![CompressionMethod::Deflate]), + response_compression: Some(vec![CompressionMethod::Deflate]), + ..Default::default() + }; + + let built = settings.build_settings(); + assert_eq!(built.max_concurrent_requests, 42); + assert!(built.wait_for_result); + assert_eq!(built.max_decoding_message_size, Some(8 * 1024 * 1024)); + + let (req, resp) = settings.compression_encodings(); + assert!(req.is_enabled(CompressionEncoding::Deflate)); + assert!(resp.is_enabled(CompressionEncoding::Deflate)); + } +} diff --git a/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs b/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs index faaedf672f..28603917e5 100644 --- a/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs +++ b/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs @@ -233,8 +233,9 @@ impl shared::Receiver for OTLPReceiver { let settings = Settings { max_concurrent_requests: self.config.max_concurrent_requests, wait_for_result: self.config.wait_for_result, - accept_compression_encodings: compression, - send_compression_encodings: compression, + max_decoding_message_size: None, // [lq] will be used in a future PR + request_compression_encodings: compression, + response_compression_encodings: compression, }; let logs_server = LogsServiceServer::new(effect_handler.clone(), &settings);