diff --git a/rust/experimental/query_engine/engine-recordset-otlp-bridge/src/bridge.rs b/rust/experimental/query_engine/engine-recordset-otlp-bridge/src/bridge.rs index ae549ec9b9..82e3df27a7 100644 --- a/rust/experimental/query_engine/engine-recordset-otlp-bridge/src/bridge.rs +++ b/rust/experimental/query_engine/engine-recordset-otlp-bridge/src/bridge.rs @@ -12,29 +12,48 @@ use crate::*; const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION"); const OTLP_BUFFER_INITIAL_CAPACITY: usize = 1024 * 64; -static EXPRESSIONS: LazyLock>> = +static EXPRESSIONS: LazyLock>> = LazyLock::new(|| RwLock::new(Vec::new())); +#[derive(Debug)] +pub struct BridgePipeline { + attributes_schema: Option, + pipeline: PipelineExpression, +} + +impl BridgePipeline { + pub fn get_pipeline(&self) -> &PipelineExpression { + &self.pipeline + } +} + pub fn parse_kql_query_into_pipeline( query: &str, options: Option, -) -> Result> { - let result = - KqlParser::parse_with_options(query, build_parser_options(options).map_err(|e| vec![e])?)?; - Ok(result.pipeline) +) -> Result> { + let options = build_parser_options(options).map_err(|e| vec![e])?; + let attributes_schema = match options + .get_source_map_schema() + .and_then(|s| s.get_schema_for_key("Attributes")) + { + Some(ParserMapKeySchema::Map(Some(attributes_schema))) => Some(attributes_schema.clone()), + _ => None, + }; + let result = KqlParser::parse_with_options(query, options)?; + Ok(BridgePipeline { + attributes_schema, + pipeline: result.pipeline, + }) } pub fn register_pipeline_for_kql_query( query: &str, options: Option, ) -> Result> { - let options = build_parser_options(options).map_err(|e| vec![e])?; - - let result = KqlParser::parse_with_options(query, options.clone())?; - let pipeline = result.pipeline; + let pipeline = parse_kql_query_into_pipeline(query, options)?; let mut expressions = EXPRESSIONS.write().unwrap(); - expressions.push((options, pipeline)); + expressions.push(pipeline); Ok(expressions.len() - 1) } @@ -47,11 +66,23 @@ pub fn process_protobuf_otlp_export_logs_service_request_using_registered_pipeli let pipeline_registration = expressions.get(pipeline); - let (options, pipeline) = match pipeline_registration { + let pipeline = match pipeline_registration { Some(v) => v, None => return Err(BridgeError::PipelineNotFound(pipeline)), }; + process_protobuf_otlp_export_logs_service_request_using_pipeline( + pipeline, + log_level, + export_logs_service_request_protobuf_data, + ) +} + +pub fn process_protobuf_otlp_export_logs_service_request_using_pipeline( + pipeline: &BridgePipeline, + log_level: RecordSetEngineDiagnosticLevel, + export_logs_service_request_protobuf_data: &[u8], +) -> Result<(Vec, Vec), BridgeError> { let request = ExportLogsServiceRequest::from_protobuf(export_logs_service_request_protobuf_data); @@ -59,12 +90,8 @@ pub fn process_protobuf_otlp_export_logs_service_request_using_registered_pipeli return Err(BridgeError::OtlpProtobufReadError(e)); } - match process_export_logs_service_request_using_pipeline( - Some(options), - pipeline, - log_level, - request.unwrap(), - ) { + match process_export_logs_service_request_using_pipeline(pipeline, log_level, request.unwrap()) + { Ok((included, dropped)) => { let mut included_records_otlp_response = Vec::new(); if let Some(ref included) = included { @@ -101,8 +128,7 @@ pub fn process_protobuf_otlp_export_logs_service_request_using_registered_pipeli } pub fn process_export_logs_service_request_using_pipeline( - options: Option<&ParserOptions>, - pipeline: &PipelineExpression, + pipeline: 
&BridgePipeline, log_level: RecordSetEngineDiagnosticLevel, mut export_logs_service_request: ExportLogsServiceRequest, ) -> Result< @@ -117,14 +143,10 @@ pub fn process_export_logs_service_request_using_pipeline( ); let mut batch = engine - .begin_batch(pipeline) + .begin_batch(&pipeline.pipeline) .map_err(|e| BridgeError::PipelineInitializationError(e.to_string()))?; - if let Some(options) = options - && let Some(ParserMapKeySchema::Map(Some(attributes_schema))) = options - .get_source_map_schema() - .and_then(|s| s.get_schema_for_key("Attributes")) - { + if let Some(attributes_schema) = &pipeline.attributes_schema { // Note: This block is a not-so-elegant fix to OTLP not supporting // roundtrip of extended types. What we do is if we know something is a // DateTime via the schema and the value is a string we will try to @@ -589,7 +611,6 @@ mod tests { let (included_records, dropped_records) = process_export_logs_service_request_using_pipeline( - None, &pipeline, RecordSetEngineDiagnosticLevel::Verbose, request, @@ -610,7 +631,6 @@ mod tests { let (included_records, dropped_records) = process_export_logs_service_request_using_pipeline( - None, &pipeline, RecordSetEngineDiagnosticLevel::Verbose, request, @@ -632,7 +652,6 @@ mod tests { let (included_records, dropped_records) = process_export_logs_service_request_using_pipeline( - None, &pipeline, RecordSetEngineDiagnosticLevel::Verbose, request, @@ -721,24 +740,19 @@ mod tests { )), ); - let options = build_parser_options(Some( - BridgeOptions::new().with_attributes_schema( - ParserMapSchema::new() - .with_key_definition("TimeGenerated", ParserMapKeySchema::DateTime), - ), - )) - .unwrap(); - - let pipeline = KqlParser::parse_with_options( + let pipeline = parse_kql_query_into_pipeline( "source | where gettype(TimeGenerated) == 'datetime'", - options.clone(), + Some( + BridgeOptions::new().with_attributes_schema( + ParserMapSchema::new() + .with_key_definition("TimeGenerated", ParserMapKeySchema::DateTime), + ), + ), ) - .unwrap() - .pipeline; + .unwrap(); let (included_records, dropped_records) = process_export_logs_service_request_using_pipeline( - Some(&options), &pipeline, RecordSetEngineDiagnosticLevel::Verbose, request, diff --git a/rust/experimental/query_engine/engine-recordset-otlp-bridge/tests/common/mod.rs b/rust/experimental/query_engine/engine-recordset-otlp-bridge/tests/common/mod.rs index 74a00036e7..4cfb13079b 100644 --- a/rust/experimental/query_engine/engine-recordset-otlp-bridge/tests/common/mod.rs +++ b/rust/experimental/query_engine/engine-recordset-otlp-bridge/tests/common/mod.rs @@ -1,11 +1,11 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -use data_engine_expressions::*; use data_engine_recordset::*; +use data_engine_recordset_otlp_bridge::BridgePipeline; pub(crate) fn process_records<'a, TRecords, TRecord>( - pipeline: &'a PipelineExpression, + pipeline: &'a BridgePipeline, engine: &RecordSetEngine, records: &mut TRecords, ) -> RecordSetEngineResults<'a, TRecord> @@ -16,9 +16,9 @@ where println!("Request:"); println!("{records:?}"); - println!("{pipeline}"); + println!("{}", pipeline.get_pipeline()); - let mut batch = engine.begin_batch(pipeline).unwrap(); + let mut batch = engine.begin_batch(pipeline.get_pipeline()).unwrap(); let dropped_records = batch.push_records(records); diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index c62f44cc02..70e4da3265 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -171,7 +171,11 @@ zip = 
"=4.2.0" byte-unit = "5.2.0" cpu-time = "1.0.0" -# Azure Monnitor Exporter +# Recordset KQL OTLP query_engine Processor +data_engine_recordset_otlp_bridge = { path = "../experimental/query_engine/engine-recordset-otlp-bridge" } +data_engine_recordset = { path = "../experimental/query_engine/engine-recordset" } + +# Azure Monitor Exporter azure_core = {version = "0.30.1", default-features = false } azure_identity = {version = "0.30.0", default-features = false } flate2 = "1.1.5" @@ -199,6 +203,7 @@ azure-monitor-exporter = ["otap-df-otap/azure-monitor-exporter"] # Experimental processors (opt-in) experimental-processors = ["otap-df-otap/experimental-processors"] condense-attributes-processor = ["otap-df-otap/condense-attributes-processor"] +recordset-kql-processor = ["otap-df-otap/recordset-kql-processor"] [workspace.lints.rust] # General compatibility lints diff --git a/rust/otap-dataflow/configs/fake-kql-debug-noop.yaml b/rust/otap-dataflow/configs/fake-kql-debug-noop.yaml new file mode 100644 index 0000000000..bf1b1f8ed6 --- /dev/null +++ b/rust/otap-dataflow/configs/fake-kql-debug-noop.yaml @@ -0,0 +1,63 @@ +nodes: + receiver: + kind: receiver + plugin_urn: "urn:otel:otap:fake_data_generator:receiver" + out_ports: + out_port: + destinations: + - kql + dispatch_strategy: round_robin + config: + traffic_config: + max_signal_count: 100 + max_batch_size: 50 + signals_per_second: 10 + log_weight: 100 + metric_weight: 0 + trace_weight: 0 + registry_path: https://github.com/open-telemetry/semantic-conventions.git[model] + + kql: + kind: processor + plugin_urn: "urn:otel:recordset_kql:processor" + out_ports: + out_port: + destinations: + - debug + dispatch_strategy: round_robin + config: + query: | + source + | extend Body = case( + EventName == "azure.resource.log", + strcat(Body, "\n\nTroubleshooting resource errors: https://docs.example.com/exporter-errors#kql-processor"), + Body + ) + + debug: + kind: processor + plugin_urn: "urn:otel:debug:processor" + out_ports: + out_port: + destinations: + - noop + dispatch_strategy: round_robin + config: + verbosity: detailed + mode: signal + + noop: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: + +service: + telemetry: + logs: + level: "error" + processors: + - batch: + exporter: + console: + resource: + service.name: "kql-processor-demo" diff --git a/rust/otap-dataflow/crates/otap/Cargo.toml b/rust/otap-dataflow/crates/otap/Cargo.toml index ed8ebbd269..9e0c08a92a 100644 --- a/rust/otap-dataflow/crates/otap/Cargo.toml +++ b/rust/otap-dataflow/crates/otap/Cargo.toml @@ -69,6 +69,10 @@ tower = { workspace = true } # Geneva exporter dependencies geneva-uploader = { version = "0.3.0", optional = true } +# Recordset KQL OTLP query_engine Processor dependencies +data_engine_recordset_otlp_bridge = { workspace = true, optional = true } +data_engine_recordset = { workspace = true, optional = true } + # Azure Monitor Exporter dependencies azure_identity = { workspace = true, optional = true } flate2 = { workspace = true, optional = true } @@ -105,6 +109,11 @@ azure = ["object_store/azure", "object_store/cloud", "dep:azure_identity", "dep: # Experimental processors experimental-processors = [] condense-attributes-processor = ["experimental-processors"] +recordset-kql-processor = [ + "experimental-processors", + "dep:data_engine_recordset_otlp_bridge", + "dep:data_engine_recordset" +] [dev-dependencies] flume.workspace = true diff --git a/rust/otap-dataflow/crates/otap/src/experimental/mod.rs 
b/rust/otap-dataflow/crates/otap/src/experimental/mod.rs
index 31b7d1ea3a..e6224643c2 100644
--- a/rust/otap-dataflow/crates/otap/src/experimental/mod.rs
+++ b/rust/otap-dataflow/crates/otap/src/experimental/mod.rs
@@ -18,3 +18,7 @@ pub mod azure_monitor_exporter;
 /// Condense Attributes processor
 #[cfg(feature = "condense-attributes-processor")]
 pub mod condense_attributes_processor;
+
+/// Recordset KQL OTLP Query Engine processor
+#[cfg(feature = "recordset-kql-processor")]
+pub mod recordset_kql_processor;
diff --git a/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/README.md b/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/README.md
new file mode 100644
index 0000000000..a78a1f4f5d
--- /dev/null
+++ b/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/README.md
@@ -0,0 +1,176 @@
+# EXPERIMENTAL KQL "RecordSet" Processor
+
+An OTAP-Dataflow processor that filters and transforms OpenTelemetry data in an
+opinionated way using Kusto Query Language (KQL) expressions.
+
+## Caveats and notes
+
+The underlying engine does not use Apache Arrow or DataFusion. This is not an
+optimized code path in the OTAP Dataflow engine, and performance (throughput)
+will be lower.
+
+## Overview
+
+This processor integrates the experimental KQL "recordset" engine from
+`rust/experimental/query_engine` to enable powerful data transformations within
+OTAP pipelines. It was developed as a prototype while we prepare for a direct
+column-oriented implementation; it is functional and production quality, but it
+is not an optimized implementation.
+
+## Features
+
+- **Filters**: Use KQL `where` clauses to filter logs
+- **Transformations**: Apply KQL `extend` and `project` operations to modify
+  data
+- **Aggregations**: Perform KQL `summarize` operations for data aggregation
+
+## Configuration
+
+```yaml
+processors:
+  kql:
+    query: "source | where SeverityText == 'ERROR' | extend processed_time = now()"
+```
+
+## LogRecord structure and accessing data
+
+Log record structure follows the [OpenTelemetry
+Specification](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#logs-data-model).
+
+|Field            | Type        |
+|-----------------|-------------|
+|Timestamp        |DateTime(UTC)|
+|ObservedTimestamp|DateTime(UTC)|
+|TraceId          |Byte[]       |
+|SpanId           |Byte[]       |
+|TraceFlags       |Integer      |
+|SeverityText     |String       |
+|SeverityNumber   |Integer      |
+|Body             |Any          |
+|Attributes       |Map          |
+|EventName        |String       |
+
+## Associated data
+
+Each log record is associated with a
+[Resource](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/README.md)
+and an [Instrumentation
+Scope](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/instrumentation-scope.md).
+
+### Resource
+
+|Field            | Type        |
+|-----------------|-------------|
+|Attributes       |Map          |
+
+### Instrumentation Scope
+
+|Field            | Type        |
+|-----------------|-------------|
+|Attributes       |Map          |
+|Name             |String       |
+|Version          |String       |
+
+## Accessing data
+
+When querying data on a log record, use the `source` identifier.
+
+The following query will remove the `TraceId` and `SpanId` fields:
+
+```yaml
+query: "source | project-away TraceId, SpanId"
+```
+
+Top-level fields can be cleared and modified, but new top-level fields cannot
+be introduced. To support the addition of data, a special top-level field named
+`Attributes` is used. 
`Attributes` is a map type which contains values tied to a +string key. + +The following query will add `TimeProcessed` key/value to `Attributes`: + +```yaml +query: "source | extend Attributes['TimeProcessed'] = now()" +``` + +To simplify accessing custom data the query engine will automatically resolve +anything unknown using the `Attributes` map. + +This is equivalent to the previous query: + +```yaml +query: "source | extend TimeProcessed = now()" +``` + +Nested data may also be accessed. + +The following query will add the value of the `Body.name` field as +`Attributes.Name`. `Body.name` will then be removed. Note: If `Body.name` cannot +be found a `null` value will be set. + +```yaml +query: | + source + | extend Name = Body.name + | project-away Body.name +``` + +To access `Resource` and/or `Instrumentation Scope` data the `resource` and/or +`scope` identifiers may be used. + +```yaml +query: | + source + | extend + ProcessName = resource.Attributes['process.name'], + Category = scope.name +``` + +## Examples + +### Filter logs by severity + +```yaml +query: "source | where SeverityNumber >= 17" # ERROR and above +``` + +### Add computed fields + +```yaml +query: "source | extend hour = bin(Timestamp, 1h)" +``` + +### Keep only specific fields + +```yaml +query: "source | project-keep Body, SeverityText, Timestamp" +``` + +### Aggregate logs + +```yaml +query: "source | summarize Count = count() by SeverityText" +``` + +## Building + +Enable the `recordset-kql-processor` feature flag: + +```bash +cargo build --features recordset-kql-processor +``` + +## Running the Demo + +A complete demo configuration is available at +`configs/fake-kql-debug-noop.yaml`. Run it with: + +```bash +cargo run --features recordset-kql-processor --bin df_engine -- --pipeline ./configs/fake-kql-debug-noop.yaml --num-cores 1 +``` + +This demonstrates: + +- Fake log data generation +- KQL-based Body field enrichment with vendor-specific URLs +- Debug processor output showing modified logs diff --git a/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/config.rs b/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/config.rs new file mode 100644 index 0000000000..1c0c79b40e --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/config.rs @@ -0,0 +1,276 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use serde::{Deserialize, Serialize}; + +/// Configuration for the KQL processor +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RecordsetKqlProcessorConfig { + /// KQL query to apply to the data + pub query: String, + /// Optional bridge options for KQL processing + #[serde(default)] + pub bridge_options: Option, +} + +#[cfg(test)] +mod tests { + use crate::experimental::recordset_kql_processor::processor::RecordsetKqlProcessor; + + use super::*; + use otap_df_engine::context::{ControllerContext, PipelineContext}; + use otap_df_telemetry::registry::MetricsRegistryHandle; + use pretty_assertions::assert_eq; + use serde_json::json; + + fn create_test_pipeline_context() -> PipelineContext { + let metrics_registry = MetricsRegistryHandle::new(); + let controller_ctx = ControllerContext::new(metrics_registry); + controller_ctx.pipeline_context_with("test_grp".into(), "test_pipeline".into(), 0, 0) + } + + #[test] + fn test_config_parsing() { + let config_json = json!({ + "query": "source | where true" + }); + + let config: RecordsetKqlProcessorConfig = 
serde_json::from_value(config_json).unwrap(); + assert_eq!(config.query, "source | where true"); + assert_eq!(config.bridge_options, None); + } + + #[test] + fn test_config_parsing_with_bridge_options() { + let config_json = json!({ + "query": "source | where true", + "bridge_options": { + "attributes_schema": { + "double_key": "Double", + "string_key": "String" + } + } + }); + + let config: RecordsetKqlProcessorConfig = serde_json::from_value(config_json).unwrap(); + assert_eq!(config.query, "source | where true"); + assert!(config.bridge_options.is_some()); + + // Verify bridge options can be parsed + let bridge_options = RecordsetKqlProcessor::parse_bridge_options(&config.bridge_options); + assert!(bridge_options.is_ok()); + assert!(bridge_options.unwrap().is_some()); + } + + #[test] + fn test_processor_creation() { + let pipeline_ctx = create_test_pipeline_context(); + let config = RecordsetKqlProcessorConfig { + query: "source | where true".to_string(), + bridge_options: None, + }; + + let processor = RecordsetKqlProcessor::with_pipeline_ctx(pipeline_ctx, config); + assert!(processor.is_ok()); + } + + #[test] + fn test_processor_creation_with_bridge_options() { + let pipeline_ctx = create_test_pipeline_context(); + let bridge_options_json = json!({ + "attributes_schema": { + "custom_field": "String", + "metric_value": "Double" + } + }); + + let config = RecordsetKqlProcessorConfig { + query: "source | where true".to_string(), + bridge_options: Some(bridge_options_json), + }; + + let processor = RecordsetKqlProcessor::with_pipeline_ctx(pipeline_ctx, config); + assert!(processor.is_ok()); + } + + #[test] + fn test_invalid_query() { + let pipeline_ctx = create_test_pipeline_context(); + let config = RecordsetKqlProcessorConfig { + query: "invalid kql syntax <<<".to_string(), + bridge_options: None, + }; + + let processor = RecordsetKqlProcessor::with_pipeline_ctx(pipeline_ctx, config); + assert!(processor.is_err()); + } + + #[test] + fn test_conditional_extend_vendor_info() { + // Test demonstrating vendor-specific enrichment based on EventName or Body matching + let pipeline_ctx = create_test_pipeline_context(); + + // Query that matches on EventName or Body and extends with vendor info + let query = r#" + source + | extend vendor = case( + EventName == "payment.completed" or Body has "payment", "payment-processor", + EventName == "user.login" or Body has "login", "auth-service", + EventName == "metric.emitted" or Body has "metric", "observability", + "unknown" + ) + | extend priority = case( + EventName has "error" or SeverityNumber >= 17, "high", + EventName has "warning" or SeverityNumber >= 13, "medium", + "low" + ) + | extend processed_at = now() + "#; + + let config = RecordsetKqlProcessorConfig { + query: query.to_string(), + bridge_options: None, + }; + + let processor = RecordsetKqlProcessor::with_pipeline_ctx(pipeline_ctx, config); + assert!(processor.is_ok(), "Query should parse successfully"); + } + + #[test] + fn test_multiple_conditions_with_or() { + let pipeline_ctx = create_test_pipeline_context(); + + // Query demonstrating OR conditions and statement-by-statement enrichment + let query = r#" + source + | where EventName == "critical.alert" or Body has "CRITICAL" or SeverityNumber >= 21 + | extend alert_type = case( + Body has "OutOfMemory" or EventName has "oom", "memory", + Body has "DiskFull" or EventName has "disk", "storage", + Body has "NetworkTimeout" or EventName has "timeout", "network", + "generic" + ) + | extend team = case( + alert_type == "memory", 
"infrastructure", + alert_type == "storage", "storage-ops", + alert_type == "network", "network-ops", + "on-call" + ) + | extend escalation_minutes = case( + alert_type == "memory", 5, + alert_type == "storage", 15, + 30 + ) + "#; + + let config = RecordsetKqlProcessorConfig { + query: query.to_string(), + bridge_options: None, + }; + + let processor = RecordsetKqlProcessor::with_pipeline_ctx(pipeline_ctx, config); + assert!(processor.is_ok(), "Complex OR and case query should parse"); + } + + #[test] + fn test_pattern_matching_extend() { + let pipeline_ctx = create_test_pipeline_context(); + + // Query using pattern matching with 'has' operator + let query = r#" + source + | extend service_type = case( + EventName has "http" or Body has "HTTP", "web-service", + EventName has "grpc" or Body has "gRPC", "rpc-service", + EventName has "kafka" or Body has "Kafka", "messaging", + EventName has "postgres" or Body has "PostgreSQL", "database", + "other" + ) + | extend vendor_category = case( + service_type == "web-service", "frontend", + service_type == "rpc-service", "backend", + service_type == "messaging", "integration", + service_type == "database", "data-layer", + "uncategorized" + ) + | project-keep EventName, Body, service_type, vendor_category, SeverityText + "#; + + let config = RecordsetKqlProcessorConfig { + query: query.to_string(), + bridge_options: None, + }; + + let processor = RecordsetKqlProcessor::with_pipeline_ctx(pipeline_ctx, config); + assert!( + processor.is_ok(), + "Pattern matching query should parse successfully" + ); + } + + #[test] + fn test_body_modification_with_strcat() { + let pipeline_ctx = create_test_pipeline_context(); + + // Query that appends supplemental information to the Body + let query = r#" + source + | extend Body = case( + EventName == "payment.failed" or Body has "payment failed", + strcat(Body, "\n\nTroubleshooting: https://docs.example.com/payment-errors\nSupport: https://support.example.com/ticket"), + EventName == "auth.error" or Body has "authentication", + strcat(Body, "\n\nAuth Guide: https://docs.example.com/auth\nReset Password: https://portal.example.com/reset"), + EventName has "error" or SeverityNumber >= 17, + strcat(Body, "\n\nError Guide: https://docs.example.com/errors"), + Body + ) + "#; + + let config = RecordsetKqlProcessorConfig { + query: query.to_string(), + bridge_options: None, + }; + + let processor = RecordsetKqlProcessor::with_pipeline_ctx(pipeline_ctx, config); + assert!( + processor.is_ok(), + "Body modification with strcat should parse successfully" + ); + } + + #[test] + fn test_body_enrichment_with_vendor_links() { + let pipeline_ctx = create_test_pipeline_context(); + + // Query that adds vendor-specific links and guidelines based on content + let query = r#" + source + | extend vendor_type = case( + Body has "AWS" or Body has "Amazon", "aws", + Body has "Azure" or Body has "Microsoft", "azure", + Body has "GCP" or Body has "Google", "gcp", + "other" + ) + | extend Body = case( + vendor_type == "aws", + strcat(Body, "\n\n[AWS Guidelines]\nDocs: https://docs.aws.amazon.com\nSupport: https://console.aws.amazon.com/support\nStatus: https://health.aws.amazon.com"), + vendor_type == "azure", + strcat(Body, "\n\n[Azure Guidelines]\nDocs: https://learn.microsoft.com/azure\nSupport: https://portal.azure.com/#blade/Microsoft_Azure_Support\nStatus: https://status.azure.com"), + vendor_type == "gcp", + strcat(Body, "\n\n[GCP Guidelines]\nDocs: https://cloud.google.com/docs\nSupport: 
https://console.cloud.google.com/support\nStatus: https://status.cloud.google.com"), + Body + ) + "#; + + let config = RecordsetKqlProcessorConfig { + query: query.to_string(), + bridge_options: None, + }; + + let processor = RecordsetKqlProcessor::with_pipeline_ctx(pipeline_ctx, config); + assert!( + processor.is_ok(), + "Body enrichment with vendor links should parse successfully" + ); + } +} diff --git a/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/mod.rs b/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/mod.rs new file mode 100644 index 0000000000..8cfe214008 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/mod.rs @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +pub(crate) mod config; +pub(crate) mod processor; + +use crate::experimental::recordset_kql_processor::config::RecordsetKqlProcessorConfig; +use crate::experimental::recordset_kql_processor::processor::RecordsetKqlProcessor; +use crate::pdata::OtapPdata; + +use otap_df_config::{error::Error as ConfigError, node::NodeUserConfig}; +use otap_df_engine::{ + config::ProcessorConfig, context::PipelineContext, node::NodeId, processor::ProcessorWrapper, +}; +use std::sync::Arc; + +// TODO metrics + +/// Factory function to create a KQL processor +pub fn create_recordset_kql_processor( + pipeline_ctx: PipelineContext, + node: NodeId, + node_config: Arc, + processor_config: &ProcessorConfig, +) -> Result, ConfigError> { + let config: RecordsetKqlProcessorConfig = serde_json::from_value(node_config.config.clone()) + .map_err(|e| ConfigError::InvalidUserConfig { + error: format!("Failed to parse KQL configuration: {e}"), + })?; + + let processor = RecordsetKqlProcessor::with_pipeline_ctx(pipeline_ctx, config)?; + + Ok(ProcessorWrapper::local( + processor, + node, + node_config, + processor_config, + )) +} diff --git a/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/processor.rs b/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/processor.rs new file mode 100644 index 0000000000..1800857a59 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/processor.rs @@ -0,0 +1,1070 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use crate::experimental::recordset_kql_processor::config::RecordsetKqlProcessorConfig; +use crate::experimental::recordset_kql_processor::create_recordset_kql_processor; +use crate::pdata::OtapPdata; + +use async_trait::async_trait; +use data_engine_recordset::RecordSetEngineDiagnosticLevel; +use data_engine_recordset_otlp_bridge::{ + BridgeError, BridgeOptions, BridgePipeline, parse_kql_query_into_pipeline, + process_protobuf_otlp_export_logs_service_request_using_pipeline, +}; +use linkme::distributed_slice; +use otap_df_config::SignalType; +use otap_df_config::error::Error as ConfigError; +use otap_df_engine::{ + ConsumerEffectHandlerExtension, ProcessorFactory, + context::PipelineContext, + control::NackMsg, + error::Error, + local::processor::{EffectHandler, Processor}, + message::Message, +}; +use otap_df_pdata::{OtapPayload, OtlpProtoBytes}; + +/// URN identifier for the processor +pub const RECORDSET_KQL_PROCESSOR_URN: &str = "urn:otel:recordset_kql:processor"; + +/// OTAP KQL Processor +#[allow(unsafe_code)] +#[distributed_slice(crate::OTAP_PROCESSOR_FACTORIES)] +pub static RECORDSET_KQL_PROCESSOR_FACTORY: ProcessorFactory = ProcessorFactory { + name: 
RECORDSET_KQL_PROCESSOR_URN, + create: create_recordset_kql_processor, +}; + +/// KQL processor that applies KQL queries to telemetry data +pub struct RecordsetKqlProcessor { + config: RecordsetKqlProcessorConfig, + pipeline: BridgePipeline, +} + +impl RecordsetKqlProcessor { + /// Creates a new KQL processor + pub fn with_pipeline_ctx( + _pipeline_ctx: PipelineContext, + config: RecordsetKqlProcessorConfig, + ) -> Result { + let bridge_options = Self::parse_bridge_options(&config.bridge_options)?; + let pipeline = + parse_kql_query_into_pipeline(&config.query, bridge_options).map_err(|errors| { + ConfigError::InvalidUserConfig { + error: format!("Failed to parse KQL query: {:?}", errors), + } + })?; + + otap_df_telemetry::otel_info!( + "Processor.Ready", + processor = "kql", + message = "KQL processor initialized successfully" + ); + + Ok(Self { config, pipeline }) + } + + /// Parse bridge options from JSON value + pub fn parse_bridge_options( + bridge_options_json: &Option, + ) -> Result, ConfigError> { + if let Some(json_value) = bridge_options_json { + let json_str = + serde_json::to_string(json_value).map_err(|e| ConfigError::InvalidUserConfig { + error: format!("Failed to serialize bridge options: {}", e), + })?; + let bridge_options = BridgeOptions::from_json(&json_str).map_err(|e| { + ConfigError::InvalidUserConfig { + error: format!("Failed to parse bridge options: {}", e), + } + })?; + Ok(Some(bridge_options)) + } else { + Ok(None) + } + } + + async fn process_data( + &mut self, + data: OtapPdata, + effect_handler: &mut EffectHandler, + ) -> Result<(), Error> { + let signal = data.signal_type(); + let input_items = data.num_items() as u64; + + // Extract context and payload, convert to OTLP bytes + let (ctx, payload) = data.into_parts(); + let otlp_bytes: OtlpProtoBytes = payload.try_into()?; + + // Process based on signal type + let result = match otlp_bytes { + OtlpProtoBytes::ExportLogsRequest(bytes) => { + otap_df_telemetry::otel_debug!( + "Processor.ProcessingLogs", + processor = "recordset_kql", + message = "Processing KQL query", + input_items = input_items, + ); + self.process_logs(bytes, signal) + } + OtlpProtoBytes::ExportMetricsRequest(_bytes) => Err(Error::InternalError { + message: "Metrics processing not yet implemented in KQL bridge".to_string(), + }), + OtlpProtoBytes::ExportTracesRequest(_bytes) => Err(Error::InternalError { + message: "Traces processing not yet implemented in KQL bridge".to_string(), + }), + }; + + match result { + Ok(processed_bytes) => { + // Convert back to OtapPayload and reconstruct OtapPdata + let payload: OtapPayload = processed_bytes.into(); + // Note! we are recomputing the number of matched items, which + // the engine could tell us. 
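+                // (output_items below is recomputed from the re-encoded payload;
+                // should the recordset engine expose a matched-record count in the
+                // future, that value could be used here instead.)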
+ let output_items = payload.num_items() as u64; + + otap_df_telemetry::otel_debug!( + "Processor.Success", + processor = "recordset_kql", + input_items = input_items, + output_items = output_items, + ); + + let processed_data = OtapPdata::new(ctx, payload); + + effect_handler.send_message(processed_data).await?; + Ok(()) + } + Err(e) => { + let message = e.to_string(); + otap_df_telemetry::otel_debug!( + "Processor.Failure", + processor = "recordset_kql", + input_items = input_items, + message = message, + ); + + effect_handler + .notify_nack(NackMsg::new( + message, + OtapPdata::new(ctx, OtapPayload::empty(SignalType::Logs)), + )) + .await?; + Err(e) + } + } + } + + fn process_logs( + &mut self, + bytes: bytes::Bytes, + signal: SignalType, + ) -> Result { + let (included_records, _) = + process_protobuf_otlp_export_logs_service_request_using_pipeline( + &self.pipeline, + RecordSetEngineDiagnosticLevel::Warn, + &bytes, + ) + .map_err(|e| Self::map_bridge_error(e, signal))?; + + Ok(OtlpProtoBytes::ExportLogsRequest(included_records.into())) + } + + fn map_bridge_error(error: BridgeError, signal: SignalType) -> Error { + Error::InternalError { + message: format!("KQL bridge error for {:?}: {}", signal, error), + } + } +} + +#[async_trait(?Send)] +impl Processor for RecordsetKqlProcessor { + async fn process( + &mut self, + msg: Message, + effect_handler: &mut EffectHandler, + ) -> Result<(), Error> { + match msg { + Message::PData(data) => self.process_data(data, effect_handler).await, + Message::Control(control_msg) => { + use otap_df_engine::control::NodeControlMsg; + match control_msg { + NodeControlMsg::Config { config } => { + if let Ok(new_config) = + serde_json::from_value::(config) + { + // Re-parse the pipeline (if changed) + if new_config.query != self.config.query + || new_config.bridge_options != self.config.bridge_options + { + let bridge_options = + match Self::parse_bridge_options(&new_config.bridge_options) { + Err(e) => { + otap_df_telemetry::otel_warn!( + "Processor.ReconfigureError", + processor = "kql", + message = format!("{e}") + ); + None + } + Ok(v) => v, + }; + + match parse_kql_query_into_pipeline( + &new_config.query, + bridge_options, + ) { + Ok(pipeline) => { + otap_df_telemetry::otel_info!( + "Processor.Reconfigured", + processor = "kql", + ); + + self.pipeline = pipeline; + self.config = new_config; + } + Err(errors) => { + let message = + format!("Failed to parse updated query: {:?}", errors); + otap_df_telemetry::otel_error!( + "Processor.ReconfigureError", + processor = "kql", + message = message, + ); + } + } + } else { + self.config = new_config; + } + } + Ok(()) + } + NodeControlMsg::Shutdown { .. 
} => Ok(()), + _ => Ok(()), + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use crate::pdata::OtapPdata; + use bytes::BytesMut; + use otap_df_config::node::NodeUserConfig; + use otap_df_engine::context::ControllerContext; + use otap_df_engine::message::Message; + use otap_df_engine::testing::{node::test_node, processor::TestRuntime}; + use otap_df_pdata::OtlpProtoBytes; + use otap_df_pdata::proto::opentelemetry::{ + collector::logs::v1::ExportLogsServiceRequest, + common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value::Value::*}, + logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber}, + resource::v1::Resource, + }; + use otap_df_telemetry::registry::MetricsRegistryHandle; + use prost::Message as _; + use serde_json::json; + + // Helper functions for test setup and common operations + + /// Builds a log request with the given log records + fn build_log_request(log_records: Vec) -> ExportLogsServiceRequest { + ExportLogsServiceRequest::new(vec![ResourceLogs::new( + Resource { + ..Default::default() + }, + vec![ScopeLogs::new( + InstrumentationScope { + ..Default::default() + }, + log_records, + )], + )]) + } + + /// Builds a single log record with the given attributes + fn build_log_with_attrs(log_attrs: Vec) -> ExportLogsServiceRequest { + build_log_request(vec![ + LogRecord::build() + .time_unix_nano(1u64) + .severity_number(SeverityNumber::Info) + .event_name("") + .attributes(log_attrs) + .finish(), + ]) + } + + /// Runs a KQL processor test with custom validation + fn run_kql_test(input: ExportLogsServiceRequest, query: &str, validate: F) + where + F: FnOnce(ExportLogsServiceRequest) + 'static, + { + run_kql_test_with_bridge_options(input, query, None, validate); + } + + /// Runs a KQL processor test with bridge options and custom validation + fn run_kql_test_with_bridge_options( + input: ExportLogsServiceRequest, + query: &str, + bridge_options: Option, + validate: F, + ) where + F: FnOnce(ExportLogsServiceRequest) + 'static, + { + let metrics_registry_handle = MetricsRegistryHandle::new(); + let controller_ctx = ControllerContext::new(metrics_registry_handle); + let pipeline_ctx = + controller_ctx.pipeline_context_with("grp".into(), "pipeline".into(), 0, 0); + + let node = test_node("recordset-kql-processor-test"); + let rt: TestRuntime = TestRuntime::new(); + let mut node_config = NodeUserConfig::new_processor_config(RECORDSET_KQL_PROCESSOR_URN); + + node_config.config = if let Some(opts) = bridge_options { + json!({ "query": query, "bridge_options": opts }) + } else { + json!({ "query": query }) + }; + + let proc = + create_recordset_kql_processor(pipeline_ctx, node, Arc::new(node_config), rt.config()) + .expect("create processor"); + let phase = rt.set_processor(proc); + + phase + .run_test(|mut ctx| async move { + let mut bytes = BytesMut::new(); + input.encode(&mut bytes).expect("encode"); + let bytes = bytes.freeze(); + let pdata_in = + OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into()); + ctx.process(Message::PData(pdata_in)) + .await + .expect("process"); + + let out = ctx.drain_pdata().await; + let first = out.into_iter().next().expect("one output").payload(); + + let otlp_bytes: OtlpProtoBytes = first.try_into().expect("convert to otlp"); + let bytes = match otlp_bytes { + OtlpProtoBytes::ExportLogsRequest(b) => b, + _ => panic!("unexpected otlp variant"), + }; + let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode"); + + validate(decoded); + }) + .validate(|_| async 
move {}); + } + + /// Tests a KQL processor with expected log record count + fn test_kql_with_count(input: ExportLogsServiceRequest, query: &str, expected_count: usize) { + run_kql_test(input, query, move |decoded| { + let total_count: usize = decoded + .resource_logs + .iter() + .flat_map(|rl| &rl.scope_logs) + .map(|sl| sl.log_records.len()) + .sum(); + assert_eq!( + total_count, expected_count, + "Expected {} log records, got {}", + expected_count, total_count + ); + }); + } + + #[test] + fn test_extend_operators() { + // Test basic extend with literals, conditionals (iif, case), and expressions + let input = build_log_request(vec![ + LogRecord::build() + .time_unix_nano(1u64) + .attributes(vec![ + KeyValue::new("Level", AnyValue::new_string("error")), + KeyValue::new("Priority", AnyValue::new_int(9)), + ]) + .finish(), + LogRecord::build() + .time_unix_nano(2u64) + .attributes(vec![ + KeyValue::new("Level", AnyValue::new_string("warning")), + KeyValue::new("Priority", AnyValue::new_int(5)), + ]) + .finish(), + LogRecord::build() + .time_unix_nano(3u64) + .attributes(vec![ + KeyValue::new("Level", AnyValue::new_string("info")), + KeyValue::new("Priority", AnyValue::new_int(2)), + ]) + .finish(), + ]); + + run_kql_test( + input, + r#"source + | extend Literal = 'constant' + | extend Body = 'transformed' + | extend IsHighPriority = (Priority > 7) + | extend AlertLevel = iif(Level == 'error', 'HIGH', 'NORMAL') + | extend Severity = case(Level == 'error', 'critical', Level == 'warning', 'medium', 'low') + "#, + |decoded| { + let log_records = &decoded.resource_logs[0].scope_logs[0].log_records; + assert_eq!(log_records.len(), 3); + + // Verify first record (error, priority 9) + let rec1 = &log_records[0]; + assert_eq!( + rec1.attributes + .iter() + .find(|kv| kv.key == "Literal") + .unwrap() + .value + .as_ref() + .unwrap() + .value + .as_ref() + .unwrap(), + &StringValue("constant".to_string()) + ); + assert_eq!( + rec1.body.as_ref().unwrap().value.as_ref().unwrap(), + &StringValue("transformed".to_string()) + ); + assert_eq!( + rec1.attributes + .iter() + .find(|kv| kv.key == "IsHighPriority") + .unwrap() + .value + .as_ref() + .unwrap() + .value + .as_ref() + .unwrap(), + &BoolValue(true) + ); + assert_eq!( + rec1.attributes + .iter() + .find(|kv| kv.key == "AlertLevel") + .unwrap() + .value + .as_ref() + .unwrap() + .value + .as_ref() + .unwrap(), + &StringValue("HIGH".to_string()) + ); + assert_eq!( + rec1.attributes + .iter() + .find(|kv| kv.key == "Severity") + .unwrap() + .value + .as_ref() + .unwrap() + .value + .as_ref() + .unwrap(), + &StringValue("critical".to_string()) + ); + + // Verify second record (warning, priority 5) + let rec2 = &log_records[1]; + assert_eq!( + rec2.attributes + .iter() + .find(|kv| kv.key == "IsHighPriority") + .unwrap() + .value + .as_ref() + .unwrap() + .value + .as_ref() + .unwrap(), + &BoolValue(false) + ); + assert_eq!( + rec2.attributes + .iter() + .find(|kv| kv.key == "AlertLevel") + .unwrap() + .value + .as_ref() + .unwrap() + .value + .as_ref() + .unwrap(), + &StringValue("NORMAL".to_string()) + ); + assert_eq!( + rec2.attributes + .iter() + .find(|kv| kv.key == "Severity") + .unwrap() + .value + .as_ref() + .unwrap() + .value + .as_ref() + .unwrap(), + &StringValue("medium".to_string()) + ); + + // Verify third record (info, priority 2) + let rec3 = &log_records[2]; + assert_eq!( + rec3.attributes + .iter() + .find(|kv| kv.key == "Severity") + .unwrap() + .value + .as_ref() + .unwrap() + .value + .as_ref() + .unwrap(), + 
&StringValue("low".to_string()) + ); + }, + ); + } + + #[test] + fn test_where_operators() { + // Test string operators: has, contains, in, !=, =~ + let string_input = build_log_request(vec![ + LogRecord::build() + .time_unix_nano(1u64) + .body(AnyValue::new_string("ERROR in payment system")) + .attributes(vec![KeyValue::new("Status", AnyValue::new_string("error"))]) + .finish(), + LogRecord::build() + .time_unix_nano(2u64) + .body(AnyValue::new_string("warning: high load")) + .attributes(vec![KeyValue::new( + "Status", + AnyValue::new_string("warning"), + )]) + .finish(), + LogRecord::build() + .time_unix_nano(3u64) + .body(AnyValue::new_string("info: normal operation")) + .attributes(vec![KeyValue::new("Status", AnyValue::new_string("info"))]) + .finish(), + ]); + test_kql_with_count(string_input.clone(), "source | where Body has 'payment'", 1); + test_kql_with_count( + string_input.clone(), + "source | where Body contains 'ERROR'", + 1, + ); + test_kql_with_count( + string_input.clone(), + "source | where Status in ('error', 'warning')", + 2, + ); + test_kql_with_count(string_input.clone(), "source | where Status != 'info'", 2); + test_kql_with_count(string_input, "source | where Status =~ 'ERROR'", 1); + + // Test comparison operators: >, >=, <, <=, == + let numeric_input = build_log_request(vec![ + LogRecord::build() + .time_unix_nano(1u64) + .attributes(vec![KeyValue::new("Priority", AnyValue::new_int(8))]) + .finish(), + LogRecord::build() + .time_unix_nano(2u64) + .attributes(vec![KeyValue::new("Priority", AnyValue::new_int(5))]) + .finish(), + LogRecord::build() + .time_unix_nano(3u64) + .attributes(vec![KeyValue::new("Priority", AnyValue::new_int(3))]) + .finish(), + ]); + test_kql_with_count(numeric_input.clone(), "source | where Priority > 5", 1); + test_kql_with_count(numeric_input.clone(), "source | where Priority >= 5", 2); + test_kql_with_count(numeric_input.clone(), "source | where Priority < 5", 1); + test_kql_with_count(numeric_input.clone(), "source | where Priority <= 5", 2); + test_kql_with_count(numeric_input, "source | where Priority == 5", 1); + + // Test logical operators: and, or + let logical_input = build_log_request(vec![ + LogRecord::build() + .time_unix_nano(1u64) + .attributes(vec![ + KeyValue::new("Level", AnyValue::new_string("error")), + KeyValue::new("Priority", AnyValue::new_int(10)), + ]) + .finish(), + LogRecord::build() + .time_unix_nano(2u64) + .attributes(vec![ + KeyValue::new("Level", AnyValue::new_string("error")), + KeyValue::new("Priority", AnyValue::new_int(3)), + ]) + .finish(), + LogRecord::build() + .time_unix_nano(3u64) + .attributes(vec![ + KeyValue::new("Level", AnyValue::new_string("info")), + KeyValue::new("Priority", AnyValue::new_int(9)), + ]) + .finish(), + ]); + test_kql_with_count( + logical_input.clone(), + "source | where Level == 'error' and Priority > 5", + 1, + ); + test_kql_with_count( + logical_input.clone(), + "source | where Level == 'error' or Priority >= 9", + 3, + ); + test_kql_with_count(logical_input, "source | where false", 0); + } + + #[test] + fn test_project_operators() { + // Test project (select and transform columns) + let project_input = build_log_with_attrs(vec![ + KeyValue::new("Foo", AnyValue::new_string("hello")), + KeyValue::new("Bar", AnyValue::new_string("world")), + KeyValue::new("Baz", AnyValue::new_string("test")), + ]); + run_kql_test( + project_input, + "source | project Foo, NewBar = strcat(Bar, '!')", + |decoded| { + let log_record = &decoded.resource_logs[0].scope_logs[0].log_records[0]; + let 
foo_attr = log_record + .attributes + .iter() + .find(|kv| kv.key == "Foo") + .expect("Foo not found"); + assert_eq!( + foo_attr.value.as_ref().unwrap().value.as_ref().unwrap(), + &StringValue("hello".to_string()) + ); + let new_bar_attr = log_record + .attributes + .iter() + .find(|kv| kv.key == "NewBar") + .expect("NewBar not found"); + assert_eq!( + new_bar_attr.value.as_ref().unwrap().value.as_ref().unwrap(), + &StringValue("world!".to_string()) + ); + assert!( + !log_record.attributes.iter().any(|kv| kv.key == "Bar"), + "Bar should not exist" + ); + assert!( + !log_record.attributes.iter().any(|kv| kv.key == "Baz"), + "Baz should not exist" + ); + }, + ); + + // Test project-away (remove specified columns) + let project_away_input = build_log_with_attrs(vec![ + KeyValue::new("Foo", AnyValue::new_string("hello")), + KeyValue::new("Bar", AnyValue::new_string("world")), + KeyValue::new("Baz", AnyValue::new_string("test")), + ]); + run_kql_test( + project_away_input, + "source | project-away Bar, Baz", + |decoded| { + let log_record = &decoded.resource_logs[0].scope_logs[0].log_records[0]; + let foo_attr = log_record + .attributes + .iter() + .find(|kv| kv.key == "Foo") + .expect("Foo should exist"); + assert_eq!( + foo_attr.value.as_ref().unwrap().value.as_ref().unwrap(), + &StringValue("hello".to_string()) + ); + assert!( + !log_record.attributes.iter().any(|kv| kv.key == "Bar"), + "Bar should be removed" + ); + assert!( + !log_record.attributes.iter().any(|kv| kv.key == "Baz"), + "Baz should be removed" + ); + }, + ); + + // Test project-rename (rename columns) + let project_rename_input = build_log_with_attrs(vec![ + KeyValue::new("Foo", AnyValue::new_string("hello")), + KeyValue::new("Bar", AnyValue::new_string("world")), + ]); + run_kql_test( + project_rename_input, + "source | project-rename NewFoo = Foo, NewBar = Bar", + |decoded| { + let log_record = &decoded.resource_logs[0].scope_logs[0].log_records[0]; + let new_foo_attr = log_record + .attributes + .iter() + .find(|kv| kv.key == "NewFoo") + .expect("NewFoo should exist"); + assert_eq!( + new_foo_attr.value.as_ref().unwrap().value.as_ref().unwrap(), + &StringValue("hello".to_string()) + ); + let new_bar_attr = log_record + .attributes + .iter() + .find(|kv| kv.key == "NewBar") + .expect("NewBar should exist"); + assert_eq!( + new_bar_attr.value.as_ref().unwrap().value.as_ref().unwrap(), + &StringValue("world".to_string()) + ); + assert!( + !log_record.attributes.iter().any(|kv| kv.key == "Foo"), + "Foo should be renamed" + ); + assert!( + !log_record.attributes.iter().any(|kv| kv.key == "Bar"), + "Bar should be renamed" + ); + }, + ); + } + + #[test] + fn test_attributes_syntax() { + // Test that Attributes['key'] syntax is identical to just 'key' + let input = build_log_request(vec![ + LogRecord::build() + .time_unix_nano(1u64) + .attributes(vec![ + KeyValue::new("Status", AnyValue::new_string("error")), + KeyValue::new("Priority", AnyValue::new_int(10)), + ]) + .finish(), + LogRecord::build() + .time_unix_nano(2u64) + .attributes(vec![ + KeyValue::new("Status", AnyValue::new_string("warning")), + KeyValue::new("Priority", AnyValue::new_int(5)), + ]) + .finish(), + LogRecord::build() + .time_unix_nano(3u64) + .attributes(vec![ + KeyValue::new("Status", AnyValue::new_string("info")), + KeyValue::new("Priority", AnyValue::new_int(3)), + ]) + .finish(), + ]); + + // Test where clause: direct reference vs Attributes[] syntax + test_kql_with_count(input.clone(), "source | where Status == 'error'", 1); + test_kql_with_count( + 
input.clone(), + "source | where Attributes['Status'] == 'error'", + 1, + ); + + test_kql_with_count(input.clone(), "source | where Priority > 5", 1); + test_kql_with_count( + input.clone(), + "source | where Attributes['Priority'] > 5", + 1, + ); + + // Test extend: both syntaxes should work identically + run_kql_test( + input.clone(), + "source | extend NewStatus = Status | project NewStatus", + |decoded| { + let log_record = &decoded.resource_logs[0].scope_logs[0].log_records[0]; + let attr = log_record + .attributes + .iter() + .find(|kv| kv.key == "NewStatus") + .expect("NewStatus not found"); + assert_eq!( + attr.value.as_ref().unwrap().value.as_ref().unwrap(), + &StringValue("error".to_string()) + ); + }, + ); + + run_kql_test( + input.clone(), + "source | extend NewStatus = Attributes['Status'] | project NewStatus", + |decoded| { + let log_record = &decoded.resource_logs[0].scope_logs[0].log_records[0]; + let attr = log_record + .attributes + .iter() + .find(|kv| kv.key == "NewStatus") + .expect("NewStatus not found"); + assert_eq!( + attr.value.as_ref().unwrap().value.as_ref().unwrap(), + &StringValue("error".to_string()) + ); + }, + ); + + // Test summarize: grouping by direct vs Attributes[] syntax + run_kql_test( + input.clone(), + "source | summarize Count = count() by Status", + |decoded| { + let summary_records = &decoded.resource_logs[1].scope_logs[0].log_records; + assert_eq!(summary_records.len(), 3); + }, + ); + + run_kql_test( + input, + "source | summarize Count = count() by Attributes['Status']", + |decoded| { + let summary_records = &decoded.resource_logs[1].scope_logs[0].log_records; + assert_eq!(summary_records.len(), 3); + }, + ); + } + + #[test] + fn test_summarize_basic() { + // Test basic aggregation with count() and group by + let input = build_log_request(vec![ + LogRecord::build() + .time_unix_nano(1u64) + .attributes(vec![ + KeyValue::new("Level", AnyValue::new_string("error")), + KeyValue::new("Priority", AnyValue::new_int(8)), + ]) + .finish(), + LogRecord::build() + .time_unix_nano(2u64) + .attributes(vec![ + KeyValue::new("Level", AnyValue::new_string("info")), + KeyValue::new("Priority", AnyValue::new_int(3)), + ]) + .finish(), + LogRecord::build() + .time_unix_nano(3u64) + .attributes(vec![ + KeyValue::new("Level", AnyValue::new_string("info")), + KeyValue::new("Priority", AnyValue::new_int(5)), + ]) + .finish(), + ]); + + run_kql_test( + input, + "source | summarize Count = count(), MaxPriority = max(Priority) by Level | where Count > 1", + |decoded| { + assert_eq!(decoded.resource_logs.len(), 2); + assert_eq!(decoded.resource_logs[0].scope_logs[0].log_records.len(), 0); + + let summary_records = &decoded.resource_logs[1].scope_logs[0].log_records; + assert_eq!( + summary_records.len(), + 1, + "Should only have info level (Count > 1)" + ); + + let record = &summary_records[0]; + let level_attr = record + .attributes + .iter() + .find(|kv| kv.key == "Level") + .expect("Level not found"); + assert_eq!( + level_attr.value.as_ref().unwrap().value.as_ref().unwrap(), + &StringValue("info".to_string()) + ); + + let count_attr = record + .attributes + .iter() + .find(|kv| kv.key == "Count") + .expect("Count not found"); + assert_eq!( + count_attr.value.as_ref().unwrap().value.as_ref().unwrap(), + &IntValue(2) + ); + + let max_priority_attr = record + .attributes + .iter() + .find(|kv| kv.key == "MaxPriority") + .expect("MaxPriority not found"); + assert_eq!( + max_priority_attr + .value + .as_ref() + .unwrap() + .value + .as_ref() + .unwrap(), + &IntValue(5) 
+ ); + }, + ); + } + + #[test] + fn test_summarize_bin_timestamp() { + use std::time::{SystemTime, UNIX_EPOCH}; + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() as u64; + let minute_nanos = 60_000_000_000u64; + let base_time = (now / minute_nanos) * minute_nanos; + + let input = build_log_request(vec![ + LogRecord::build() + .time_unix_nano(base_time) + .attributes(vec![KeyValue::new("Priority", AnyValue::new_int(5))]) + .finish(), + LogRecord::build() + .time_unix_nano(base_time + 1000) + .attributes(vec![KeyValue::new("Priority", AnyValue::new_int(8))]) + .finish(), + LogRecord::build() + .time_unix_nano(base_time + minute_nanos) + .attributes(vec![KeyValue::new("Priority", AnyValue::new_int(3))]) + .finish(), + ]); + + run_kql_test( + input, + "source | summarize MaxPriority = max(Priority) by bin(Timestamp, 1m)", + |decoded| { + assert_eq!(decoded.resource_logs.len(), 2); + assert_eq!(decoded.resource_logs[0].scope_logs[0].log_records.len(), 0); + + let summary_records = &decoded.resource_logs[1].scope_logs[0].log_records; + assert_eq!(summary_records.len(), 2); + + let priorities: Vec = summary_records + .iter() + .map(|r| { + let attr = r + .attributes + .iter() + .find(|kv| kv.key == "MaxPriority") + .expect("MaxPriority not found"); + match attr.value.as_ref().unwrap().value.as_ref().unwrap() { + IntValue(v) => *v, + _ => panic!("MaxPriority should be int"), + } + }) + .collect(); + + assert!(priorities.contains(&8), "Missing first minute (priority 8)"); + assert!( + priorities.contains(&3), + "Missing second minute (priority 3)" + ); + }, + ); + } + + #[test] + fn test_summarize_bin_custom_timestamp() { + use std::time::{SystemTime, UNIX_EPOCH}; + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + let base_time_secs = (now / 60) * 60; + let base_time_rfc3339 = chrono::DateTime::from_timestamp(base_time_secs as i64, 0) + .unwrap() + .to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + let next_minute_rfc3339 = chrono::DateTime::from_timestamp((base_time_secs + 60) as i64, 0) + .unwrap() + .to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + + let input = build_log_request(vec![ + LogRecord::build() + .time_unix_nano(1u64) + .attributes(vec![KeyValue::new( + "CustomTimestamp", + AnyValue::new_string(&base_time_rfc3339), + )]) + .finish(), + LogRecord::build() + .time_unix_nano(2u64) + .attributes(vec![KeyValue::new( + "CustomTimestamp", + AnyValue::new_string(&base_time_rfc3339), + )]) + .finish(), + LogRecord::build() + .time_unix_nano(3u64) + .attributes(vec![KeyValue::new( + "CustomTimestamp", + AnyValue::new_string(&next_minute_rfc3339), + )]) + .finish(), + ]); + + // This test hinges on the ability of engine to recognize CustomTimestamp as a DateTime + // Because of the input schema, query doesn't need to use todatetime() wrapper + let bridge_options = json!({ + "attributes_schema": { + "schema": { + "CustomTimestamp": "DateTime" + }, + "options": { + "allow_undefined_keys": true + } + } + }); + + run_kql_test_with_bridge_options( + input, + "source | summarize EventCount = count() by bin(CustomTimestamp, 1m)", + Some(bridge_options), + move |decoded| { + assert_eq!(decoded.resource_logs.len(), 2); + let summary_records = &decoded.resource_logs[1].scope_logs[0].log_records; + assert_eq!(summary_records.len(), 2); + + let counts: Vec = summary_records + .iter() + .map(|r| { + let attr = r + .attributes + .iter() + .find(|kv| kv.key == "EventCount") + .expect("EventCount not found"); + match 
attr.value.as_ref().unwrap().value.as_ref().unwrap() { + IntValue(v) => *v, + _ => panic!("EventCount should be int"), + } + }) + .collect(); + + assert!(counts.contains(&2), "Should have a bin with count=2"); + assert!(counts.contains(&1), "Should have a bin with count=1"); + }, + ); + } +}