diff --git a/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/README.md b/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/README.md
index a1958aa98d..7377e979c5 100644
--- a/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/README.md
+++ b/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/README.md
@@ -61,7 +61,6 @@ nodes:
             "span_id": "SpanId"
           "attributes":
             "message": "ParsedMessage"
-      disable_schema_mapping: false
 
       # Authentication configuration (uses Azure SDK defaults)
       auth:
diff --git a/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/config.rs b/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/config.rs
index f1a63b56c9..dbfe6dfdcd 100644
--- a/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/config.rs
+++ b/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/config.rs
@@ -93,10 +93,6 @@ pub struct SchemaConfig {
     /// Log record field mappings
     #[serde(default)]
     pub log_record_mapping: HashMap<String, Value>,
-
-    /// Disable automatic schema mapping
-    #[serde(default)]
-    pub disable_schema_mapping: bool,
 }
 
 impl Config {
diff --git a/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/exporter.rs b/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/exporter.rs
index 431914274a..138f206822 100644
--- a/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/exporter.rs
+++ b/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/exporter.rs
@@ -563,7 +563,6 @@ mod tests {
             stream_name: "stream".to_string(),
             dcr: "dcr-id".to_string(),
             schema: SchemaConfig {
-                disable_schema_mapping: false,
                 resource_mapping: HashMap::new(),
                 scope_mapping: HashMap::new(),
                 log_record_mapping: HashMap::new(),
diff --git a/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/otlp-ame.yaml b/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/otlp-ame.yaml
index 7e71435b91..de32fc7dce 100644
--- a/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/otlp-ame.yaml
+++ b/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/otlp-ame.yaml
@@ -55,7 +55,6 @@ nodes:
           "attributes": "Properties"
           "trace_id": "TraceId"
           "span_id": "SpanId"
-      disable_schema_mapping: false
 
       # Authentication configuration for AKS
       auth:
diff --git a/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/transformer.rs b/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/transformer.rs
index 4f70abe869..8ac665e91e 100644
--- a/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/transformer.rs
+++ b/rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/transformer.rs
@@ -8,101 +8,183 @@ use otap_df_pdata::views::{
     resource::ResourceView,
 };
 use serde::Serialize;
-use serde_json::{Value, json};
+use serde_json::Value;
+use std::borrow::Cow;
+use std::collections::HashMap;
 
 use super::config::{Config, SchemaConfig};
 
-const ATTRIBUTES_FIELD: &str = "attributes";
 const HEX_CHARS: &[u8; 16] = b"0123456789abcdef";
 
-/// Flat view of final log line: resource + scope + record
-#[derive(Serialize)]
-struct LogEntry<'a> {
-    #[serde(flatten)]
-    resource: &'a serde_json::Map<String, Value>,
+/// Pre-parsed field mapping for a log record field
+#[derive(Debug, Clone)]
+struct FieldMapping {
+    /// The source field name (e.g., "time_unix_nano", "body")
+    source: LogRecordField,
+    /// The destination field name in the output JSON
+    dest: String,
+}
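+
+// Illustrative only (not part of this change): a schema entry such as
+//   log_record_mapping: { "severity_text": "Severity" }
+// is pre-parsed once into
+//   FieldMapping { source: LogRecordField::SeverityText, dest: "Severity".into() }
+// so the per-record hot path matches on an enum instead of comparing strings.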
+
+/// Enum representing known log record fields for fast matching
+#[derive(Debug, Clone, Copy)]
+enum LogRecordField {
+    TimeUnixNano,
+    ObservedTimeUnixNano,
+    TraceId,
+    SpanId,
+    Flags,
+    SeverityNumber,
+    SeverityText,
+    Body,
+    EventName,
+}
+
+impl LogRecordField {
+    /// Parse a field name string into a LogRecordField enum
+    fn from_str(s: &str) -> Option<Self> {
+        if s.eq_ignore_ascii_case("time_unix_nano") {
+            Some(Self::TimeUnixNano)
+        } else if s.eq_ignore_ascii_case("observed_time_unix_nano") {
+            Some(Self::ObservedTimeUnixNano)
+        } else if s.eq_ignore_ascii_case("trace_id") {
+            Some(Self::TraceId)
+        } else if s.eq_ignore_ascii_case("span_id") {
+            Some(Self::SpanId)
+        } else if s.eq_ignore_ascii_case("flags") {
+            Some(Self::Flags)
+        } else if s.eq_ignore_ascii_case("severity_number") {
+            Some(Self::SeverityNumber)
+        } else if s.eq_ignore_ascii_case("severity_text") {
+            Some(Self::SeverityText)
+        } else if s.eq_ignore_ascii_case("body") {
+            Some(Self::Body)
+        } else if s.eq_ignore_ascii_case("event_name") {
+            Some(Self::EventName)
+        } else {
+            None
+        }
+    }
+}
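+
+// Illustrative only: field names are matched ASCII-case-insensitively, so
+// LogRecordField::from_str("SEVERITY_TEXT") and from_str("severity_text") both
+// return Some(Self::SeverityText); unknown names return None and are rejected
+// when the schema is parsed at construction time.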
+
+/// Pre-parsed schema configuration for faster runtime processing
+#[derive(Debug, Clone)]
+struct ParsedSchema {
+    /// Resource attribute mappings (source -> dest)
+    resource_mapping: HashMap<String, String>,
+    /// Scope attribute mappings (source -> dest)
+    scope_mapping: HashMap<String, String>,
+    /// Pre-parsed log record field mappings
+    field_mappings: Vec<FieldMapping>,
+    /// Pre-parsed attribute mappings (source attr -> dest field)
+    attribute_mapping: HashMap<String, String>,
+}
 
-    #[serde(flatten)]
-    scope: &'a serde_json::Map<String, Value>,
 
+impl ParsedSchema {
+    fn from_config(schema: &SchemaConfig) -> Result<Self, String> {
+        let mut field_mappings = Vec::new();
+        let mut attribute_mapping = HashMap::new();
+
+        for (key, value) in &schema.log_record_mapping {
+            if key.eq_ignore_ascii_case("attributes") {
+                // Parse attribute mappings
+                if let Some(attr_map) = value.as_object() {
+                    for (attr_key, attr_dest) in attr_map {
+                        let dest = attr_dest
+                            .as_str()
+                            .map(String::from)
+                            .unwrap_or_else(|| attr_dest.to_string());
+                        _ = attribute_mapping.insert(attr_key.clone(), dest);
+                    }
+                }
+            } else {
+                // Parse field mapping
+                let source = LogRecordField::from_str(key)
+                    .ok_or_else(|| format!("Unknown log record field: {key}"))?;
+                let dest = value
+                    .as_str()
+                    .ok_or_else(|| format!("Field mapping for '{key}' must be a string"))?
+                    .to_string();
+                field_mappings.push(FieldMapping { source, dest });
+            }
+        }
 
-    #[serde(flatten)]
-    record: serde_json::Map<String, Value>,
+        Ok(Self {
+            resource_mapping: schema.resource_mapping.clone(),
+            scope_mapping: schema.scope_mapping.clone(),
+            field_mappings,
+            attribute_mapping,
+        })
+    }
 }
 
-// TODO: Performance review and improvements
-// TODO: Make sure mapping of all fields are covered
 /// Converts OTLP logs to Azure Log Analytics format
+#[derive(Debug)]
 pub struct Transformer {
-    schema: SchemaConfig,
+    schema: ParsedSchema,
 }
 
 #[allow(clippy::print_stdout)]
 impl Transformer {
     /// Create a new Transformer with the given configuration
+    ///
+    /// # Panics
+    /// Panics if the schema configuration contains invalid field names
     #[must_use]
     pub fn new(config: &Config) -> Self {
         Self {
-            schema: config.api.schema.clone(),
+            schema: ParsedSchema::from_config(&config.api.schema)
+                .expect("Invalid schema configuration"),
         }
     }
 
+    /// Create a new Transformer, returning an error if the configuration is invalid
+    pub fn try_new(config: &Config) -> Result<Self, String> {
+        Ok(Self {
+            schema: ParsedSchema::from_config(&config.api.schema)?,
+        })
+    }
+
     /// High-perf, single-threaded: one reusable BytesMut, grows to max size, no extra copies.
     /// Now accepts any type implementing `LogsDataView` (both `OtapLogsView` and `RawLogsData`).
     #[must_use]
     pub fn convert_to_log_analytics<T: LogsDataView>(&self, logs_view: &T) -> Vec<Bytes> {
-        let mut results = Vec::new();
-        let mut buf = BytesMut::with_capacity(512);
+        let mut results = Vec::with_capacity(1024);
+        let mut buf = BytesMut::with_capacity(2048);
+        let mut record_map = serde_json::Map::new();
 
         for resource_logs in logs_view.resources() {
-            let resource_attrs = if self.schema.disable_schema_mapping {
-                serde_json::Map::new()
-            } else {
-                resource_logs
-                    .resource()
-                    .map(|r| self.apply_resource_mapping(&r))
-                    .unwrap_or_default()
-            };
-
             for scope_logs in resource_logs.scopes() {
-                let scope_attrs = if self.schema.disable_schema_mapping {
-                    serde_json::Map::new()
-                } else {
-                    scope_logs
-                        .scope()
-                        .map(|s| self.apply_scope_mapping(&s))
-                        .unwrap_or_default()
-                };
-
                 for log_record in scope_logs.log_records() {
-                    let record_map = if self.schema.disable_schema_mapping {
-                        Self::legacy_transform_to_map(&log_record)
-                    } else {
-                        match self.transform_log_record_to_map(&log_record) {
-                            Ok(m) => m,
-                            Err(e) => {
-                                println!(
-                                    "[AzureMonitorExporter] Skipping log record due to transformation error: {e}"
-                                );
-                                continue;
-                            }
-                        }
-                    };
+                    record_map.clear();
+
+                    // Config should normally be validated so that destination
+                    // keys are unique, but if duplicates ever slip through
+                    // (e.g., due to a bug), the logic below ensures that
+                    // lower-level fields override higher-level ones.
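+                    // For example (illustrative): if resource_mapping sends
+                    // "service.name" to "Name" and log_record_mapping also
+                    // targets "Name", the log-record value is written last and
+                    // wins.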
+
+                    // apply resource mapping first to allow scope to override if needed
+                    if let Some(r) = resource_logs.resource() {
+                        self.apply_resource_mapping(&r, &mut record_map);
+                    }
+
+                    // apply scope mapping next to allow log record to override if needed
+                    if let Some(s) = scope_logs.scope() {
+                        self.apply_scope_mapping(&s, &mut record_map);
+                    }
 
-                    let entry = LogEntry {
-                        resource: &resource_attrs,
-                        scope: &scope_attrs,
-                        record: record_map,
-                    };
+                    self.transform_log_record_to_map(&log_record, &mut record_map);
 
                     buf.clear();
                     {
                         let writer = (&mut buf).writer();
                         let mut ser = serde_json::Serializer::new(writer);
-                        if let Err(e) = entry.serialize(&mut ser) {
+                        if let Err(e) = record_map.serialize(&mut ser) {
                             println!("Failed to serialize log entry: {e}");
                             continue;
                         }
                     }
-                    results.push(buf.clone().freeze());
+
+                    results.push(buf.split().freeze());
                 }
             }
         }
@@ -110,142 +192,95 @@ impl Transformer {
         results
     }
 
-    /// Legacy transform when schema mapping is disabled
-    fn legacy_transform_to_map<R: LogRecordView>(log_record: &R) -> serde_json::Map<String, Value> {
-        let mut map = serde_json::Map::new();
-
-        let timestamp = log_record
-            .time_unix_nano()
-            .filter(|&ts| ts != 0)
-            .or_else(|| log_record.observed_time_unix_nano())
-            .unwrap_or(0);
-
-        _ = map.insert(
-            "TimeGenerated".into(),
-            json!(Self::format_timestamp(timestamp)),
-        );
-
-        if let Some(body) = log_record.body() {
-            _ = map.insert("RawData".into(), json!(Self::extract_string_value(&body)));
-        }
-
-        map
-    }
-
     /// Apply resource mapping based on configuration
     fn apply_resource_mapping<R: ResourceView>(
         &self,
         resource: &R,
-    ) -> serde_json::Map<String, Value> {
-        let mut attrs = serde_json::Map::new();
-
+        map: &mut serde_json::Map<String, Value>,
+    ) {
        for attr in resource.attributes() {
-            let key = Self::str_to_string(attr.key());
-            if let Some(mapped_name) = self.schema.resource_mapping.get(&key) {
+            let key: Cow<'_, str> = String::from_utf8_lossy(attr.key());
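+            // Illustrative note: from_utf8_lossy returns Cow::Borrowed for
+            // valid UTF-8 keys (no allocation), so the lookup below via
+            // key.as_ref() is allocation-free on the hot path; invalid UTF-8
+            // falls back to an owned, lossily converted String.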
+            if let Some(mapped_name) = self.schema.resource_mapping.get(key.as_ref()) {
                 if let Some(value) = attr.value() {
-                    _ = attrs.insert(mapped_name.clone(), Self::convert_any_value(&value));
+                    _ = map.insert(mapped_name.clone(), Self::convert_any_value(&value));
                 }
             }
         }
-
-        attrs
     }
 
     /// Apply scope mapping based on configuration
     fn apply_scope_mapping<S: InstrumentationScopeView>(
         &self,
         scope: &S,
-    ) -> serde_json::Map<String, Value> {
-        let mut attrs = serde_json::Map::new();
-
+        map: &mut serde_json::Map<String, Value>,
+    ) {
         for attr in scope.attributes() {
-            let key = Self::str_to_string(attr.key());
-            if let Some(mapped_name) = self.schema.scope_mapping.get(&key) {
+            let key: Cow<'_, str> = String::from_utf8_lossy(attr.key());
+            if let Some(mapped_name) = self.schema.scope_mapping.get(key.as_ref()) {
                 if let Some(value) = attr.value() {
-                    _ = attrs.insert(mapped_name.clone(), Self::convert_any_value(&value));
+                    _ = map.insert(mapped_name.clone(), Self::convert_any_value(&value));
                 }
             }
         }
-
-        attrs
     }
 
-    /// Transform log record fields into a Map
+    /// Transform log record fields into a Map (validation is done at construction, so this cannot fail)
     fn transform_log_record_to_map<R: LogRecordView>(
         &self,
         log_record: &R,
-    ) -> Result<serde_json::Map<String, Value>, String> {
-        let mut map = serde_json::Map::new();
-
-        for (key, value) in &self.schema.log_record_mapping {
-            if key == ATTRIBUTES_FIELD {
-                self.map_attributes(&mut map, log_record, value)?;
-            } else {
-                let field_name = value
-                    .as_str()
-                    .ok_or("Field mapping value must be a string")?;
-                let extracted = Self::extract_field_value(key, log_record)?;
-                _ = map.insert(field_name.into(), extracted);
-            }
+        map: &mut serde_json::Map<String, Value>,
+    ) {
+        // Process pre-parsed field mappings
+        for field_mapping in &self.schema.field_mappings {
+            let value = Self::extract_field_value(field_mapping.source, log_record);
+            _ = map.insert(field_mapping.dest.clone(), value);
         }
 
-        Ok(map)
-    }
-
-    /// Map log record attributes based on attr_mapping config
-    fn map_attributes<R: LogRecordView>(
-        &self,
-        dest: &mut serde_json::Map<String, Value>,
-        log_record: &R,
-        attr_mapping_value: &Value,
-    ) -> Result<(), String> {
-        let Some(attr_mapping) = attr_mapping_value.as_object() else {
-            return Ok(());
-        };
-
-        for attr in log_record.attributes() {
-            let attr_key = Self::str_to_string(attr.key());
-            if let Some(mapped_field) = attr_mapping.get(&attr_key) {
-                if let Some(val) = attr.value() {
-                    let field_name = mapped_field
-                        .as_str()
-                        .map(String::from)
-                        .unwrap_or_else(|| mapped_field.to_string());
-                    _ = dest.insert(field_name, Self::convert_any_value(&val));
+        // Process attribute mappings
+        if !self.schema.attribute_mapping.is_empty() {
+            for attr in log_record.attributes() {
+                let attr_key: Cow<'_, str> = String::from_utf8_lossy(attr.key());
+                if let Some(dest_field) = self.schema.attribute_mapping.get(attr_key.as_ref()) {
+                    if let Some(val) = attr.value() {
+                        _ = map.insert(dest_field.clone(), Self::convert_any_value(&val));
+                    }
                 }
             }
         }
-
-        Ok(())
     }
 
-    /// Extract value from log record by field name
-    fn extract_field_value<R: LogRecordView>(key: &str, log_record: &R) -> Result<Value, String> {
-        match key.to_lowercase().as_str() {
-            "time_unix_nano" => Ok(json!(Self::format_timestamp(
-                log_record.time_unix_nano().unwrap_or(0)
-            ))),
-            "observed_time_unix_nano" => Ok(json!(Self::format_timestamp(
-                log_record.observed_time_unix_nano().unwrap_or(0)
-            ))),
-            "trace_id" => Ok(log_record
+    /// Extract value from log record by pre-parsed field enum (no string comparison needed)
+    #[inline]
+    fn extract_field_value<R: LogRecordView>(field: LogRecordField, log_record: &R) -> Value {
+        match field {
+            LogRecordField::TimeUnixNano => Value::String(Self::format_timestamp(
+                log_record.time_unix_nano().unwrap_or(0),
+            )),
+            LogRecordField::ObservedTimeUnixNano => Value::String(Self::format_timestamp(
+                log_record.observed_time_unix_nano().unwrap_or(0),
+            )),
+            LogRecordField::TraceId => log_record
                 .trace_id()
-                .map(|id| json!(Self::bytes_to_hex(id)))
-                .unwrap_or(Value::Null)),
-            "span_id" => Ok(log_record
+                .map(|id| Value::String(Self::bytes_to_hex(id)))
+                .unwrap_or(Value::Null),
+            LogRecordField::SpanId => log_record
                 .span_id()
-                .map(|id| json!(Self::bytes_to_hex(id)))
-                .unwrap_or(Value::Null)),
-            "flags" => Ok(json!(log_record.flags().unwrap_or(0))),
-            "severity_number" => Ok(json!(log_record.severity_number().unwrap_or(0) as i64)),
-            "severity_text" => Ok(json!(Self::str_to_string(
-                log_record.severity_text().unwrap_or(b"")
-            ))),
-            "body" => Ok(log_record
+                .map(|id| Value::String(Self::bytes_to_hex(id)))
+                .unwrap_or(Value::Null),
+            LogRecordField::Flags => Value::Number(log_record.flags().unwrap_or(0).into()),
+            LogRecordField::SeverityNumber => {
+                Value::Number((log_record.severity_number().unwrap_or(0) as i64).into())
+            }
+            LogRecordField::SeverityText => Value::String(Self::str_to_string(
+                log_record.severity_text().unwrap_or(b""),
+            )),
+            LogRecordField::Body => log_record
                 .body()
-                .map(|b| json!(Self::extract_string_value(&b)))
-                .unwrap_or(Value::Null)),
-            _ => Err(format!("Unknown field name: {key}")),
+                .map(|b| Value::String(Self::extract_string_value(&b)))
+                .unwrap_or(Value::Null),
+            LogRecordField::EventName => {
+                Value::String(Self::str_to_string(log_record.event_name().unwrap_or(b"")))
+            }
         }
     }
 
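+    // Illustrative example (not part of this change): with mappings of
+    //   trace_id -> "TraceId" and severity_text -> "Severity",
+    // a record whose trace id bytes are [0x01, 0x02] and whose severity text
+    // is "INFO" serializes as {"TraceId":"0102","Severity":"INFO"}
+    // (field order may vary).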
@@ -282,20 +317,19 @@ impl Transformer {
     /// Convert AnyValueView to serde_json::Value
     fn convert_any_value<'a, V: AnyValueView<'a>>(value: &V) -> Value {
         match value.value_type() {
-            ValueType::String => json!(Self::str_to_string(value.as_string().unwrap_or(b""))),
-            ValueType::Int64 => json!(value.as_int64().unwrap_or(0)),
-            ValueType::Double => json!(value.as_double().unwrap_or(0.0)),
-            ValueType::Bool => json!(value.as_bool().unwrap_or(false)),
-            ValueType::Bytes => json!(Self::bytes_to_hex(value.as_bytes().unwrap_or(&[]))),
+            ValueType::String => {
+                Value::String(Self::str_to_string(value.as_string().unwrap_or(b"")))
+            }
+            ValueType::Int64 => Value::Number(value.as_int64().unwrap_or(0).into()),
+            ValueType::Double => serde_json::Number::from_f64(value.as_double().unwrap_or(0.0))
+                .map(Value::Number)
+                .unwrap_or(Value::Null),
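+            // Note: JSON has no NaN or infinities; Number::from_f64 returns
+            // None for those values, so such doubles serialize as null (see
+            // test_double_nan_becomes_null below).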
+            ValueType::Bool => Value::Bool(value.as_bool().unwrap_or(false)),
+            ValueType::Bytes => Value::String(Self::bytes_to_hex(value.as_bytes().unwrap_or(&[]))),
             ValueType::Array => value
                 .as_array()
-                .map(|iter| {
-                    json!(
-                        iter.map(|v| Self::convert_any_value(&v))
-                            .collect::<Vec<_>>()
-                    )
-                })
-                .unwrap_or(json!([])),
+                .map(|iter| Value::Array(iter.map(|v| Self::convert_any_value(&v)).collect()))
+                .unwrap_or_else(|| Value::Array(Vec::new())),
             ValueType::KeyValueList => value
                 .as_kvlist()
                 .map(|iter| {
@@ -308,7 +342,7 @@ impl Transformer {
                         .collect();
                     Value::Object(map)
                 })
-                .unwrap_or(json!({})),
+                .unwrap_or_else(|| Value::Object(serde_json::Map::new())),
             ValueType::Empty => Value::Null,
         }
     }
@@ -348,9 +382,10 @@ mod tests {
     };
     use otap_df_pdata::views::otlp::bytes::logs::RawLogsData;
     use prost::Message;
+    use serde_json::json;
     use std::collections::HashMap;
 
-    fn create_test_config(disable_mapping: bool) -> Config {
+    fn create_test_config() -> Config {
         use super::super::config::{ApiConfig, AuthConfig, SchemaConfig};
 
         Config {
@@ -359,7 +394,6 @@ mod tests {
                 stream_name: "test-stream".into(),
                 dcr: "test-dcr".into(),
                 schema: SchemaConfig {
-                    disable_schema_mapping: disable_mapping,
                     resource_mapping: HashMap::from([(
                         "service.name".into(),
                         "ServiceName".into(),
@@ -376,43 +410,9 @@ mod tests {
         }
     }
 
-    #[test]
-    fn test_legacy_transform() {
-        let config = create_test_config(true);
-        let transformer = Transformer::new(&config);
-
-        let request = ExportLogsServiceRequest {
-            resource_logs: vec![ResourceLogs {
-                resource: None,
-                scope_logs: vec![ScopeLogs {
-                    scope: None,
-                    log_records: vec![LogRecord {
-                        time_unix_nano: 1_000_000_000,
-                        observed_time_unix_nano: 2_000_000_000,
-                        body: Some(AnyValue {
-                            value: Some(OtelAnyValueEnum::StringValue("test body".into())),
-                        }),
-                        ..Default::default()
-                    }],
-                    schema_url: String::new(),
-                }],
-                schema_url: String::new(),
-            }],
-        };
-
-        let bytes = request.encode_to_vec();
-        let logs_view = RawLogsData::new(&bytes);
-        let result = transformer.convert_to_log_analytics(&logs_view);
-
-        assert_eq!(result.len(), 1);
-        let json: Value = serde_json::from_slice(&result[0]).unwrap();
-        assert!(json["TimeGenerated"].as_str().is_some());
-        assert_eq!(json["RawData"], "test body");
-    }
-
     #[test]
     fn test_schema_mapping() {
-        let config = create_test_config(false);
+        let config = create_test_config();
         let transformer = Transformer::new(&config);
 
         let request = ExportLogsServiceRequest {
@@ -473,7 +473,7 @@ mod tests {
 
     #[test]
     fn test_all_log_record_fields() {
-        let mut config = create_test_config(false);
+        let mut config = create_test_config();
         config.api.schema.log_record_mapping = HashMap::from([
             ("time_unix_nano".to_string(), json!("Time")),
             ("observed_time_unix_nano".to_string(), json!("ObservedTime")),
@@ -526,7 +526,7 @@ mod tests {
 
     #[test]
     fn test_any_value_types() {
-        let config = create_test_config(false);
+        let config = create_test_config();
         let transformer = Transformer::new(&config);
 
         let request = ExportLogsServiceRequest {
@@ -564,7 +564,7 @@ mod tests {
 
     #[test]
     fn test_kvlist_value() {
-        let config = create_test_config(false);
+        let config = create_test_config();
         let transformer = Transformer::new(&config);
 
         let request = ExportLogsServiceRequest {
@@ -600,7 +600,7 @@ mod tests {
 
     #[test]
     fn test_empty_values() {
-        let mut config = create_test_config(false);
+        let mut config = create_test_config();
         _ = config
             .api
             .schema
@@ -646,19 +646,79 @@ mod tests {
     }
 
     #[test]
-    fn test_invalid_mapping_error() {
-        let mut config = create_test_config(false);
+    fn test_zero_timestamp() {
+        let timestamp = Transformer::format_timestamp(0);
+        assert!(timestamp.contains('T'));
+    }
+
+    #[test]
+    fn test_try_new_success() {
+        let config = create_test_config();
+        let result = Transformer::try_new(&config);
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_try_new_invalid_field() {
+        let mut config = create_test_config();
         _ = config
             .api
             .schema
             .log_record_mapping
             .insert("invalid_field".into(), json!("Invalid"));
 
+        let result = Transformer::try_new(&config);
+        assert!(result.is_err());
+        assert!(result.unwrap_err().contains("Unknown log record field"));
+    }
+
+    #[test]
+    fn test_try_new_invalid_mapping_type() {
+        let mut config = create_test_config();
+        // Field mapping value must be a string, not an object
+        _ = config
+            .api
+            .schema
+            .log_record_mapping
+            .insert("body".into(), json!({"nested": "object"}));
+
+        let result = Transformer::try_new(&config);
+        assert!(result.is_err());
+        assert!(result.unwrap_err().contains("must be a string"));
+    }
+
+    #[test]
+    fn test_empty_schema_mappings() {
+        use super::super::config::{ApiConfig, AuthConfig, SchemaConfig};
+
+        let config = Config {
+            api: ApiConfig {
+                dcr_endpoint: "https://test.com".into(),
+                stream_name: "test-stream".into(),
+                dcr: "test-dcr".into(),
+                schema: SchemaConfig {
+                    resource_mapping: HashMap::new(),
+                    scope_mapping: HashMap::new(),
+                    log_record_mapping: HashMap::new(),
+                },
+            },
+            auth: AuthConfig::default(),
+        };
+
         let transformer = Transformer::new(&config);
 
         let request = ExportLogsServiceRequest {
             resource_logs: vec![ResourceLogs {
-                resource: None,
+                resource: Some(Resource {
+                    attributes: vec![KeyValue {
+                        key: "unmapped.attr".into(),
+                        value: Some(AnyValue {
+                            value: Some(OtelAnyValueEnum::StringValue("value".into())),
+                        }),
+                    }],
+                    dropped_attributes_count: 0,
+                    entity_refs: vec![],
+                }),
                 scope_logs: vec![ScopeLogs {
                     scope: None,
                     log_records: vec![LogRecord::default()],
@@ -671,18 +731,162 @@ mod tests {
         let bytes = request.encode_to_vec();
         let logs_view = RawLogsData::new(&bytes);
         let result = transformer.convert_to_log_analytics(&logs_view);
-        assert_eq!(result.len(), 0);
+
+        assert_eq!(result.len(), 1);
+        let json: Value = serde_json::from_slice(&result[0]).unwrap();
+        // Should be an empty object since no mappings are configured
+        assert_eq!(json, json!({}));
     }
 
     #[test]
-    fn test_zero_timestamp() {
-        let timestamp = Transformer::format_timestamp(0);
-        assert!(timestamp.contains('T'));
+    fn test_attribute_with_no_value() {
+        let config = create_test_config();
+        let transformer = Transformer::new(&config);
+
+        let request = ExportLogsServiceRequest {
+            resource_logs: vec![ResourceLogs {
+                resource: Some(Resource {
+                    attributes: vec![KeyValue {
+                        key: "service.name".into(),
+                        value: None, // No value
+                    }],
+                    dropped_attributes_count: 0,
+                }),
+                scope_logs: vec![ScopeLogs {
+                    scope: Some(InstrumentationScope {
+                        name: "test".into(),
+                        version: String::new(),
+                        attributes: vec![KeyValue {
+                            key: "scope.name".into(),
+                            value: None, // No value
+                        }],
+                        dropped_attributes_count: 0,
+                    }),
+                    log_records: vec![LogRecord {
+                        attributes: vec![KeyValue {
+                            key: "test.attr".into(),
+                            value: None, // No value
+                        }],
+                        ..Default::default()
+                    }],
+                    schema_url: String::new(),
+                }],
+                schema_url: String::new(),
+            }],
+        };
+
+        let bytes = request.encode_to_vec();
+        let logs_view = RawLogsData::new(&bytes);
+        let result = transformer.convert_to_log_analytics(&logs_view);
+
+        assert_eq!(result.len(), 1);
+        let json: Value = serde_json::from_slice(&result[0]).unwrap();
+        // Attributes with no value should be skipped
+        assert!(json.get("ServiceName").is_none());
+        assert!(json.get("ScopeName").is_none());
+        assert!(json.get("TestAttr").is_none());
+    }
+
+    #[test]
+    fn test_multiple_log_records() {
+        let config = create_test_config();
+        let transformer = Transformer::new(&config);
+
+        let request = ExportLogsServiceRequest {
+            resource_logs: vec![ResourceLogs {
+                resource: None,
+                scope_logs: vec![ScopeLogs {
+                    scope: None,
+                    log_records: vec![
+                        LogRecord {
+                            body: Some(AnyValue {
+                                value: Some(OtelAnyValueEnum::StringValue("first".into())),
+                            }),
+                            severity_text: "INFO".into(),
+                            ..Default::default()
+                        },
+                        LogRecord {
+                            body: Some(AnyValue {
+                                value: Some(OtelAnyValueEnum::StringValue("second".into())),
+                            }),
+                            severity_text: "ERROR".into(),
+                            ..Default::default()
+                        },
+                    ],
+                    schema_url: String::new(),
+                }],
+                schema_url: String::new(),
+            }],
+        };
+
+        let bytes = request.encode_to_vec();
+        let logs_view = RawLogsData::new(&bytes);
+        let result = transformer.convert_to_log_analytics(&logs_view);
+
+        assert_eq!(result.len(), 2);
+
+        let json1: Value = serde_json::from_slice(&result[0]).unwrap();
+        assert_eq!(json1["Body"], "first");
+        assert_eq!(json1["Severity"], "INFO");
+
+        let json2: Value = serde_json::from_slice(&result[1]).unwrap();
+        assert_eq!(json2["Body"], "second");
+        assert_eq!(json2["Severity"], "ERROR");
+    }
+
+    #[test]
+    fn test_empty_body_string() {
+        let mut config = create_test_config();
+        config.api.schema.log_record_mapping = HashMap::from([("body".into(), json!("Body"))]);
+
+        let transformer = Transformer::new(&config);
+
+        let request = ExportLogsServiceRequest {
+            resource_logs: vec![ResourceLogs {
+                resource: None,
+                scope_logs: vec![ScopeLogs {
+                    scope: None,
+                    log_records: vec![LogRecord {
+                        body: Some(AnyValue {
+                            value: Some(OtelAnyValueEnum::StringValue(String::new())),
+                        }),
+                        ..Default::default()
+                    }],
+                    schema_url: String::new(),
+                }],
+                schema_url: String::new(),
+            }],
+        };
+
+        let bytes = request.encode_to_vec();
+        let logs_view = RawLogsData::new(&bytes);
+        let result = transformer.convert_to_log_analytics(&logs_view);
+
+        let json: Value = serde_json::from_slice(&result[0]).unwrap();
+        assert_eq!(json["Body"], "");
+    }
+
+    #[test]
+    fn test_bytes_to_hex() {
+        assert_eq!(Transformer::bytes_to_hex(&[]), "");
+        assert_eq!(Transformer::bytes_to_hex(&[0x00]), "00");
+        assert_eq!(Transformer::bytes_to_hex(&[0xff]), "ff");
+        assert_eq!(
+            Transformer::bytes_to_hex(&[0x12, 0x34, 0xab, 0xcd]),
+            "1234abcd"
+        );
     }
 
     #[test]
-    fn test_observed_time_fallback() {
-        let config = create_test_config(true);
+    fn test_case_insensitive_field_names() {
+        let mut config = create_test_config();
+        config.api.schema.log_record_mapping = HashMap::from([
+            ("TIME_UNIX_NANO".into(), json!("Time")),
+            ("Body".into(), json!("Body")),
+            ("SEVERITY_TEXT".into(), json!("Severity")),
+        ]);
+
         let transformer = Transformer::new(&config);
 
         let request = ExportLogsServiceRequest {
@@ -691,8 +895,11 @@ mod tests {
                 scope_logs: vec![ScopeLogs {
                     scope: None,
                     log_records: vec![LogRecord {
-                        time_unix_nano: 0,
-                        observed_time_unix_nano: 3_000_000_000,
+                        time_unix_nano: 1_000_000_000,
+                        body: Some(AnyValue {
+                            value: Some(OtelAnyValueEnum::StringValue("test".into())),
+                        }),
+                        severity_text: "WARN".into(),
                         ..Default::default()
                     }],
                     schema_url: String::new(),
@@ -704,7 +911,93 @@ mod tests {
         let bytes = request.encode_to_vec();
         let logs_view = RawLogsData::new(&bytes);
         let result = transformer.convert_to_log_analytics(&logs_view);
+
+        let json: Value = serde_json::from_slice(&result[0]).unwrap();
+        assert!(json["Time"].as_str().is_some());
+        assert_eq!(json["Body"], "test");
+        assert_eq!(json["Severity"], "WARN");
+    }
+
+    #[test]
+    fn test_double_nan_becomes_null() {
+        let config = create_test_config();
+        let transformer = Transformer::new(&config);
+
+        let request = ExportLogsServiceRequest {
+            resource_logs: vec![ResourceLogs {
+                resource: Some(Resource {
+                    attributes: vec![KeyValue {
+                        key: "service.name".into(),
+                        value: Some(AnyValue {
+                            value: Some(OtelAnyValueEnum::DoubleValue(f64::NAN)),
+                        }),
+                    }],
+                    dropped_attributes_count: 0,
+                    entity_refs: vec![],
+                }),
+                scope_logs: vec![ScopeLogs {
+                    scope: None,
+                    log_records: vec![LogRecord::default()],
+                    schema_url: String::new(),
+                }],
+                schema_url: String::new(),
+            }],
+        };
+
+        let bytes = request.encode_to_vec();
+        let logs_view = RawLogsData::new(&bytes);
+        let result = transformer.convert_to_log_analytics(&logs_view);
+
         let json: Value = serde_json::from_slice(&result[0]).unwrap();
-        assert!(json["TimeGenerated"].as_str().unwrap().contains("1970"));
+        // NaN cannot be represented in JSON, so it becomes null
+        assert_eq!(json["ServiceName"], json!(null));
+    }
+
+    #[test]
+    fn test_event_name_field() {
+        let mut config = create_test_config();
+        config.api.schema.log_record_mapping = HashMap::from([
+            ("event_name".into(), json!("EventName")),
+            ("severity_text".into(), json!("Severity")),
+        ]);
+
+        let transformer = Transformer::new(&config);
+
+        let request = ExportLogsServiceRequest {
+            resource_logs: vec![ResourceLogs {
+                resource: None,
+                scope_logs: vec![ScopeLogs {
+                    scope: None,
+                    log_records: vec![
+                        LogRecord {
+                            event_name: "user.login".into(),
+                            severity_text: "INFO".into(),
+                            ..Default::default()
+                        },
+                        LogRecord {
+                            event_name: String::new(),
+                            severity_text: String::new(),
+                            ..Default::default()
+                        },
+                    ],
+                    schema_url: String::new(),
+                }],
+                schema_url: String::new(),
+            }],
+        };
+
+        let bytes = request.encode_to_vec();
+        let logs_view = RawLogsData::new(&bytes);
+        let result = transformer.convert_to_log_analytics(&logs_view);
+
+        assert_eq!(result.len(), 2);
+
+        let json1: Value = serde_json::from_slice(&result[0]).unwrap();
+        assert_eq!(json1["EventName"], "user.login");
+        assert_eq!(json1["Severity"], "INFO");
+
+        let json2: Value = serde_json::from_slice(&result[1]).unwrap();
+        assert_eq!(json2["EventName"], "");
+        assert_eq!(json2["Severity"], "");
     }
 }