@@ -12,29 +12,48 @@ use crate::*;
const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION");
const OTLP_BUFFER_INITIAL_CAPACITY: usize = 1024 * 64;

static EXPRESSIONS: LazyLock<RwLock<Vec<(ParserOptions, PipelineExpression)>>> =
static EXPRESSIONS: LazyLock<RwLock<Vec<BridgePipeline>>> =
LazyLock::new(|| RwLock::new(Vec::new()));

#[derive(Debug)]
pub struct BridgePipeline {
attributes_schema: Option<ParserMapSchema>,
pipeline: PipelineExpression,
}

impl BridgePipeline {
pub fn get_pipeline(&self) -> &PipelineExpression {
&self.pipeline
}
}

pub fn parse_kql_query_into_pipeline(
query: &str,
options: Option<BridgeOptions>,
) -> Result<PipelineExpression, Vec<ParserError>> {
let result =
KqlParser::parse_with_options(query, build_parser_options(options).map_err(|e| vec![e])?)?;
Ok(result.pipeline)
) -> Result<BridgePipeline, Vec<ParserError>> {
let options = build_parser_options(options).map_err(|e| vec![e])?;
let attributes_schema = match options
.get_source_map_schema()
.and_then(|s| s.get_schema_for_key("Attributes"))
{
Some(ParserMapKeySchema::Map(Some(attributes_schema))) => Some(attributes_schema.clone()),
_ => None,
};
let result = KqlParser::parse_with_options(query, options)?;
Ok(BridgePipeline {
attributes_schema,
pipeline: result.pipeline,
})
}
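
A rough usage sketch of the new entry point; the BridgeOptions shape mirrors the test near the end of this file, and the error handling here is illustrative rather than part of the diff:

let pipeline = parse_kql_query_into_pipeline(
    "source | where gettype(TimeGenerated) == 'datetime'",
    Some(
        BridgeOptions::new().with_attributes_schema(
            ParserMapSchema::new()
                .with_key_definition("TimeGenerated", ParserMapKeySchema::DateTime),
        ),
    ),
)
.expect("query should parse");

// The wrapped PipelineExpression stays reachable for engine calls,
// while the cached Attributes schema rides along inside BridgePipeline.
let expression: &PipelineExpression = pipeline.get_pipeline();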

pub fn register_pipeline_for_kql_query(
query: &str,
options: Option<BridgeOptions>,
) -> Result<usize, Vec<ParserError>> {
let options = build_parser_options(options).map_err(|e| vec![e])?;

let result = KqlParser::parse_with_options(query, options.clone())?;
let pipeline = result.pipeline;
let pipeline = parse_kql_query_into_pipeline(query, options)?;

let mut expressions = EXPRESSIONS.write().unwrap();
expressions.push((options, pipeline));
expressions.push(pipeline);
Ok(expressions.len() - 1)
}
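
Registration parses the query once, stores the BridgePipeline in the process-wide EXPRESSIONS table, and hands back its index. A hedged round-trip sketch follows; the registered-pipeline entry point's full signature is truncated in the hunk header below, so its name and argument order are assumed here, and request_bytes stands in for a protobuf-encoded ExportLogsServiceRequest:

let id = register_pipeline_for_kql_query(
    "source | where gettype(TimeGenerated) == 'datetime'",
    None, // schema options are optional; see the test near the end of this file
)
.expect("query should parse");

// Name and argument order assumed from the visible body of the hunk below.
let (included_otlp, dropped_otlp) =
    process_protobuf_otlp_export_logs_service_request_using_registered_pipeline(
        id,
        RecordSetEngineDiagnosticLevel::Verbose,
        &request_bytes,
    )
    .expect("valid OTLP payload");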

@@ -47,24 +66,32 @@ pub fn process_protobuf_otlp_export_logs_service_request_using_registered_pipeli

let pipeline_registration = expressions.get(pipeline);

let (options, pipeline) = match pipeline_registration {
let pipeline = match pipeline_registration {
Some(v) => v,
None => return Err(BridgeError::PipelineNotFound(pipeline)),
};

process_protobuf_otlp_export_logs_service_request_using_pipeline(
pipeline,
log_level,
export_logs_service_request_protobuf_data,
)
}

pub fn process_protobuf_otlp_export_logs_service_request_using_pipeline(
pipeline: &BridgePipeline,
log_level: RecordSetEngineDiagnosticLevel,
export_logs_service_request_protobuf_data: &[u8],
) -> Result<(Vec<u8>, Vec<u8>), BridgeError> {
let request =
ExportLogsServiceRequest::from_protobuf(export_logs_service_request_protobuf_data);

if let Err(e) = request {
return Err(BridgeError::OtlpProtobufReadError(e));
}

match process_export_logs_service_request_using_pipeline(
Some(options),
pipeline,
log_level,
request.unwrap(),
) {
match process_export_logs_service_request_using_pipeline(pipeline, log_level, request.unwrap())
{
Ok((included, dropped)) => {
let mut included_records_otlp_response = Vec::new();
if let Some(ref included) = included {
@@ -101,8 +128,7 @@ pub fn process_protobuf_otlp_export_logs_service_request_using_registered_pipeli
}

pub fn process_export_logs_service_request_using_pipeline(
options: Option<&ParserOptions>,
pipeline: &PipelineExpression,
pipeline: &BridgePipeline,
log_level: RecordSetEngineDiagnosticLevel,
mut export_logs_service_request: ExportLogsServiceRequest,
) -> Result<
@@ -117,14 +143,10 @@ pub fn process_export_logs_service_request_using_pipeline(
);

let mut batch = engine
.begin_batch(pipeline)
.begin_batch(&pipeline.pipeline)
.map_err(|e| BridgeError::PipelineInitializationError(e.to_string()))?;

if let Some(options) = options
&& let Some(ParserMapKeySchema::Map(Some(attributes_schema))) = options
.get_source_map_schema()
.and_then(|s| s.get_schema_for_key("Attributes"))
{
if let Some(attributes_schema) = &pipeline.attributes_schema {
// Note: This block is a not-so-elegant fix to OTLP not supporting
// roundtrip of extended types. What we do is if we know something is a
// DateTime via the schema and the value is a string we will try to
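
The comment above is cut off by the collapsed hunk, but the gist — re-typing string values whose keys the schema declares as DateTime — can be sketched with a hypothetical helper built on the real ParserMapKeySchema enum (this is not the PR's actual code):

// A value is only re-typed when the schema explicitly marks its key as DateTime.
fn schema_wants_datetime(key_schema: Option<&ParserMapKeySchema>) -> bool {
    matches!(key_schema, Some(ParserMapKeySchema::DateTime))
}

// The fix then amounts to: for each attribute whose value arrived as a string and
// whose key satisfies schema_wants_datetime, parse the string back into the
// engine's DateTime value before the records are pushed into the batch.
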
@@ -589,7 +611,6 @@ mod tests {

let (included_records, dropped_records) =
process_export_logs_service_request_using_pipeline(
None,
&pipeline,
RecordSetEngineDiagnosticLevel::Verbose,
request,
@@ -610,7 +631,6 @@

let (included_records, dropped_records) =
process_export_logs_service_request_using_pipeline(
None,
&pipeline,
RecordSetEngineDiagnosticLevel::Verbose,
request,
@@ -632,7 +652,6 @@

let (included_records, dropped_records) =
process_export_logs_service_request_using_pipeline(
None,
&pipeline,
RecordSetEngineDiagnosticLevel::Verbose,
request,
@@ -721,24 +740,19 @@ mod tests {
)),
);

let options = build_parser_options(Some(
BridgeOptions::new().with_attributes_schema(
ParserMapSchema::new()
.with_key_definition("TimeGenerated", ParserMapKeySchema::DateTime),
),
))
.unwrap();

let pipeline = KqlParser::parse_with_options(
let pipeline = parse_kql_query_into_pipeline(
"source | where gettype(TimeGenerated) == 'datetime'",
options.clone(),
Some(
BridgeOptions::new().with_attributes_schema(
ParserMapSchema::new()
.with_key_definition("TimeGenerated", ParserMapKeySchema::DateTime),
),
),
)
.unwrap()
.pipeline;
.unwrap();

let (included_records, dropped_records) =
process_export_logs_service_request_using_pipeline(
Some(&options),
&pipeline,
RecordSetEngineDiagnosticLevel::Verbose,
request,
@@ -1,11 +1,11 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

use data_engine_expressions::*;
use data_engine_recordset::*;
use data_engine_recordset_otlp_bridge::BridgePipeline;

pub(crate) fn process_records<'a, TRecords, TRecord>(
pipeline: &'a PipelineExpression,
pipeline: &'a BridgePipeline,
engine: &RecordSetEngine,
records: &mut TRecords,
) -> RecordSetEngineResults<'a, TRecord>
Expand All @@ -16,9 +16,9 @@ where
println!("Request:");
println!("{records:?}");

println!("{pipeline}");
println!("{}", pipeline.get_pipeline());

let mut batch = engine.begin_batch(pipeline).unwrap();
let mut batch = engine.begin_batch(pipeline.get_pipeline()).unwrap();

let dropped_records = batch.push_records(records);

7 changes: 6 additions & 1 deletion rust/otap-dataflow/Cargo.toml
@@ -171,7 +171,11 @@ zip = "=4.2.0"
byte-unit = "5.2.0"
cpu-time = "1.0.0"

# Azure Monnitor Exporter
# Recordset KQL OTLP query_engine Processor
data_engine_recordset_otlp_bridge = { path = "../experimental/query_engine/engine-recordset-otlp-bridge" }
data_engine_recordset = { path = "../experimental/query_engine/engine-recordset" }

# Azure Monitor Exporter
azure_core = {version = "0.30.1", default-features = false }
azure_identity = {version = "0.30.0", default-features = false }
flate2 = "1.1.5"
@@ -199,6 +203,7 @@ azure-monitor-exporter = ["otap-df-otap/azure-monitor-exporter"]
# Experimental processors (opt-in)
experimental-processors = ["otap-df-otap/experimental-processors"]
condense-attributes-processor = ["otap-df-otap/condense-attributes-processor"]
recordset-kql-processor = ["otap-df-otap/recordset-kql-processor"]

[workspace.lints.rust]
# General compatibility lints
63 changes: 63 additions & 0 deletions rust/otap-dataflow/configs/fake-kql-debug-noop.yaml
@@ -0,0 +1,63 @@
nodes:
  receiver:
    kind: receiver
    plugin_urn: "urn:otel:otap:fake_data_generator:receiver"
    out_ports:
      out_port:
        destinations:
          - kql
        dispatch_strategy: round_robin
    config:
      traffic_config:
        max_signal_count: 100
        max_batch_size: 50
        signals_per_second: 10
        log_weight: 100
        metric_weight: 0
        trace_weight: 0
        registry_path: https://github.com/open-telemetry/semantic-conventions.git[model]

  kql:
    kind: processor
    plugin_urn: "urn:otel:recordset_kql:processor"
    out_ports:
      out_port:
        destinations:
          - debug
        dispatch_strategy: round_robin
    config:
      query: |
        source
        | extend Body = case(
            EventName == "azure.resource.log",
            strcat(Body, "\n\nTroubleshooting resource errors: https://docs.example.com/exporter-errors#kql-processor"),
            Body
          )

  debug:
    kind: processor
    plugin_urn: "urn:otel:debug:processor"
    out_ports:
      out_port:
        destinations:
          - noop
        dispatch_strategy: round_robin
    config:
      verbosity: detailed
      mode: signal

  noop:
    kind: exporter
    plugin_urn: "urn:otel:noop:exporter"
    config:

service:
  telemetry:
    logs:
      level: "error"
      processors:
        - batch:
            exporter:
              console:
    resource:
      service.name: "kql-processor-demo"
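
The kql node above is expected to drive the same bridge API introduced in the first file. A hedged sketch of the equivalent direct call outside the dataflow runtime, using a shortened variant of the config's query (request_bytes again stands in for a protobuf-encoded ExportLogsServiceRequest):

let pipeline = parse_kql_query_into_pipeline(
    "source | extend Body = case(EventName == 'azure.resource.log', strcat(Body, ' (see troubleshooting link)'), Body)",
    None,
)
.expect("query should parse");

let (included_otlp, dropped_otlp) =
    process_protobuf_otlp_export_logs_service_request_using_pipeline(
        &pipeline,
        RecordSetEngineDiagnosticLevel::Verbose,
        &request_bytes,
    )
    .expect("valid OTLP payload");
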
9 changes: 9 additions & 0 deletions rust/otap-dataflow/crates/otap/Cargo.toml
@@ -69,6 +69,10 @@ tower = { workspace = true }
# Geneva exporter dependencies
geneva-uploader = { version = "0.3.0", optional = true }

# Recordset KQL OTLP query_engine Processor dependencies
data_engine_recordset_otlp_bridge = { workspace = true, optional = true }
data_engine_recordset = { workspace = true, optional = true }

# Azure Monitor Exporter dependencies
azure_identity = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
@@ -105,6 +109,11 @@ azure = ["object_store/azure", "object_store/cloud", "dep:azure_identity", "dep:
# Experimental processors
experimental-processors = []
condense-attributes-processor = ["experimental-processors"]
recordset-kql-processor = [
"experimental-processors",
"dep:data_engine_recordset_otlp_bridge",
"dep:data_engine_recordset"
]

[dev-dependencies]
flume.workspace = true
4 changes: 4 additions & 0 deletions rust/otap-dataflow/crates/otap/src/experimental/mod.rs
@@ -18,3 +18,7 @@ pub mod azure_monitor_exporter;
/// Condense Attributes processor
#[cfg(feature = "condense-attributes-processor")]
pub mod condense_attributes_processor;

/// Recordset KQL OTLP Query Engine processor
#[cfg(feature = "recordset-kql-processor")]
pub mod recordset_kql_processor;