diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index 3557443a2e..bcb80a4527 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -33,7 +33,7 @@ use otap_df_engine::{ }; use otap_df_pdata::{OtapArrowRecords, OtapPayload}; use otap_df_query_engine::pipeline::Pipeline; -use otap_df_telemetry::metrics::MetricSet; +use otap_df_telemetry::{metrics::MetricSet, otel_debug}; use serde_json::Value; use crate::{OTAP_PROCESSOR_FACTORIES, pdata::OtapPdata}; @@ -173,21 +173,25 @@ impl Processor for TransformProcessor { } }, Message::PData(pdata) => { - self.metrics.msgs_consumed.inc(); let (context, payload) = pdata.into_parts(); + self.metrics.items_received.add(payload.num_items() as u64); let payload = if !self.should_process(&payload) { // skip handling this pdata + otel_debug!("transform.skipped"); payload } else { let mut otap_batch: OtapArrowRecords = payload.try_into()?; + let num_items = otap_batch.num_items() as u64; otap_batch.decode_transport_optimized_ids()?; match self.pipeline.execute(otap_batch).await { Ok(otap_batch) => { - self.metrics.msgs_transformed.inc(); + self.metrics + .items_transformed + .add(otap_batch.num_items() as u64); otap_batch.into() } Err(e) => { - self.metrics.msgs_transform_failed.inc(); + self.metrics.items_failed.add(num_items); return Err(EngineError::ProcessorError { processor: effect_handler.processor_id(), kind: ProcessorErrorKind::Other, @@ -198,10 +202,11 @@ impl Processor for TransformProcessor { } }; + let num_forwarded = payload.num_items() as u64; effect_handler .send_message(OtapPdata::new(context, payload)) .await - .inspect(|_| self.metrics.msgs_forwarded.inc())?; + .inspect(|_| self.metrics.items_sent.add(num_forwarded))?; } }; @@ -347,29 +352,29 @@ mod test { // Allow the collector to pull from the channel tokio::time::sleep(std::time::Duration::from_millis(50)).await; - let mut msgs_consumed = 0; - let mut msgs_forwarded = 0; - let mut msgs_transformed = 0; - let mut msgs_transform_failed = 0; + let mut items_received = 0; + let mut items_sent = 0; + let mut items_transformed = 0; + let mut items_failed = 0; registry.visit_current_metrics(|desc, _attrs, iter| { if desc.name == "transform.processor.metrics" { for (field, v) in iter { let val = v.to_u64_lossy(); match field.name { - "msgs.consumed" => msgs_consumed += val, - "msgs.forwarded" => msgs_forwarded += val, - "msgs.transformed" => msgs_transformed += val, - "msgs.transform.failed" => msgs_transform_failed += val, + "items.received" => items_received += val, + "items.sent" => items_sent += val, + "items.transformed" => items_transformed += val, + "items.failed" => items_failed += val, _ => {} } } } }); - assert_eq!(msgs_consumed, 1); - assert_eq!(msgs_forwarded, 1); - assert_eq!(msgs_transformed, 1); - assert_eq!(msgs_transform_failed, 0) + assert_eq!(items_received, 2); + assert_eq!(items_sent, 1); + assert_eq!(items_transformed, 1); + assert_eq!(items_failed, 0) }); } diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor/metrics.rs b/rust/otap-dataflow/crates/otap/src/transform_processor/metrics.rs index a5fa7576a2..b834813882 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor/metrics.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor/metrics.rs @@ -10,19 +10,19 @@ use otap_df_telemetry_macros::metric_set; #[metric_set(name = "transform.processor.metrics")] #[derive(Debug, Default, Clone)] pub struct Metrics { - /// PData messages consumed by this processor. - #[metric(unit = "{msg}")] - pub msgs_consumed: Counter, + /// Telemetry items (log records, spans, or metrics) received by this processor. + #[metric(unit = "{item}")] + pub items_received: Counter, - /// PData messages forwarded by this processor. - #[metric(unit = "{msg}")] - pub msgs_forwarded: Counter, + /// Telemetry items sent to the next processor in the pipeline. + #[metric(unit = "{item}")] + pub items_sent: Counter, - /// Number of messages successfully transformed. - #[metric(unit = "{msg}")] - pub msgs_transformed: Counter, + /// Telemetry items successfully transformed. + #[metric(unit = "{item}")] + pub items_transformed: Counter, - /// Number of failed transform attempts. - #[metric(unit = "{msg}")] - pub msgs_transform_failed: Counter, + /// Telemetry items that failed during transformation. + #[metric(unit = "{item}")] + pub items_failed: Counter, }