Add an `output_bytes` baseline metric alongside `output_rows`.

What this patch does:
  * metrics/value.rs: new `MetricValue::OutputBytes(Count)` variant, wired into
    `PartialEq`, `name`, `as_usize`, `new_empty`, `aggregate`,
    `display_sort_key` and `Display` (rendered with `human_readable_size`,
    like `SpilledBytes`).
  * metrics/builder.rs, metrics/mod.rs: `MetricBuilder::output_bytes` and
    `MetricsSet::output_bytes`.
  * metrics/baseline.rs: `BaselineMetrics` gains an `output_bytes` counter;
    `RecordOutput` for `RecordBatch` / `&RecordBatch` records
    `get_record_batch_memory_size` next to the row count.
  * sorts/sort.rs and the tests: record / assert the new metric.

NOTE(review): `BaselineMetrics::record_output` is `pub` and now takes a second
argument, and `impl RecordOutput for usize` is removed, so callers outside this
patch have to be updated.
NOTE(review): the second sort test pins `output_bytes` to 958800 under a FIXME
about possible double counting; confirm the value before relying on it.
NOTE(review): the two `downcast_ref::()` context lines in the first hunk lost
their generic arguments before this patch text was captured; they are kept
verbatim because the original types are not recoverable from the patch itself.

diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 852b350b27df..094f34170a30 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -106,6 +106,17 @@ async fn explain_analyze_baseline_metrics() {
             || plan.as_any().downcast_ref::().is_some()
             || plan.as_any().downcast_ref::().is_some()
     }
+    fn expected_to_have_output_bytes(plan: &dyn ExecutionPlan) -> bool {
+        use datafusion::physical_plan;
+
+        if let Some(projection_exec) = plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>() {
+            // If the projection is empty, there are no buffers used, so we don't expect any output bytes
+            if projection_exec.schema().fields().is_empty() {
+                return false;
+            }
+        }
+        true
+    }
 
     // Validate that the recorded elapsed compute time was more than
     // zero for all operators as well as the start/end timestamp are set
@@ -130,6 +141,14 @@ async fn explain_analyze_baseline_metrics() {
                 DisplayableExecutionPlan::with_metrics(plan).one_line()
             );
 
+            let output_bytes = metrics.output_bytes().unwrap();
+
+            assert!(
+                if expected_to_have_output_bytes(plan) { output_bytes > 0 } else { output_bytes == 0 },
+                "plan: {}",
+                DisplayableExecutionPlan::with_metrics(plan).one_line()
+            );
+
             let mut saw_start = false;
             let mut saw_end = false;
             metrics.iter().for_each(|m| match m.value() {
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 14b2d0a932c2..9f8113c4e6f1 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1829,6 +1829,7 @@ mod tests {
 
         let metrics = merged_aggregate.metrics().unwrap();
         let output_rows = metrics.output_rows().unwrap();
+        let output_bytes = metrics.output_bytes().unwrap();
         let spill_count = metrics.spill_count().unwrap();
         let spilled_bytes = metrics.spilled_bytes().unwrap();
         let spilled_rows = metrics.spilled_rows().unwrap();
@@ -1837,12 +1838,14 @@ mod tests {
             // When spilling, the output rows metrics become partial output size + final output size
             // This is because final aggregation starts while partial aggregation is still emitting
             assert_eq!(8, output_rows);
+            assert!(output_bytes > 0);
 
             assert!(spill_count > 0);
             assert!(spilled_bytes > 0);
             assert!(spilled_rows > 0);
         } else {
             assert_eq!(3, output_rows);
+            assert!(output_bytes > 0);
 
             assert_eq!(0, spill_count);
             assert_eq!(0, spilled_bytes);
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index 91af03bf46df..b2be60f2b09b 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -492,6 +492,7 @@ mod lazy_memory_tests {
 
             // Verify metrics match actual output
             assert_eq!(metrics.output_rows().unwrap(), expected_rows);
+            assert!(metrics.output_bytes().unwrap() > 0);
             assert!(metrics.elapsed_compute().unwrap() > 0);
         }
 
diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs
index de436d0e4f5c..3bb0e3024a33 100644
--- a/datafusion/physical-plan/src/metrics/baseline.rs
+++ b/datafusion/physical-plan/src/metrics/baseline.rs
@@ -21,6 +21,8 @@ use std::task::Poll;
 
 use arrow::record_batch::RecordBatch;
 
+use crate::spill::get_record_batch_memory_size;
+
 use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
 use datafusion_common::Result;
 
@@ -53,6 +55,9 @@ pub struct BaselineMetrics {
 
     /// output rows: the total output rows
     output_rows: Count,
+
+    /// output bytes: the total output bytes
+    output_bytes: Count,
 }
 
 impl BaselineMetrics {
@@ -65,6 +70,7 @@ impl BaselineMetrics {
             end_time: MetricBuilder::new(metrics).end_timestamp(partition),
             elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
             output_rows: MetricBuilder::new(metrics).output_rows(partition),
+            output_bytes: MetricBuilder::new(metrics).output_bytes(partition),
         }
     }
 
@@ -78,6 +84,7 @@ impl BaselineMetrics {
             end_time: Default::default(),
             elapsed_compute: self.elapsed_compute.clone(),
             output_rows: Default::default(),
+            output_bytes: Default::default(),
         }
     }
 
@@ -91,6 +98,11 @@ impl BaselineMetrics {
         &self.output_rows
     }
 
+    /// return the metric for the total number of output bytes produced
+    pub fn output_bytes(&self) -> &Count {
+        &self.output_bytes
+    }
+
     /// Records the fact that this operator's execution is complete
     /// (recording the `end_time` metric).
     ///
@@ -102,12 +114,13 @@ impl BaselineMetrics {
         self.end_time.record()
     }
 
-    /// Record that some number of rows have been produced as output
+    /// Record that some number of rows and bytes have been produced as output
     ///
     /// See the [`RecordOutput`] for conveniently recording record
     /// batch output for other thing
-    pub fn record_output(&self, num_rows: usize) {
+    pub fn record_output(&self, num_rows: usize, num_bytes: usize) {
         self.output_rows.add(num_rows);
+        self.output_bytes.add(num_bytes);
     }
 
     /// If not previously recorded `done()`, record
@@ -178,23 +191,16 @@ pub trait RecordOutput {
     fn record_output(self, bm: &BaselineMetrics) -> Self;
 }
 
-impl RecordOutput for usize {
-    fn record_output(self, bm: &BaselineMetrics) -> Self {
-        bm.record_output(self);
-        self
-    }
-}
-
 impl RecordOutput for RecordBatch {
     fn record_output(self, bm: &BaselineMetrics) -> Self {
-        bm.record_output(self.num_rows());
+        bm.record_output(self.num_rows(), get_record_batch_memory_size(&self));
         self
     }
 }
 
 impl RecordOutput for &RecordBatch {
     fn record_output(self, bm: &BaselineMetrics) -> Self {
-        bm.record_output(self.num_rows());
+        bm.record_output(self.num_rows(), get_record_batch_memory_size(self));
         self
     }
 }
diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs
index dbda0a310ce5..d8f59a7bd942 100644
--- a/datafusion/physical-plan/src/metrics/builder.rs
+++ b/datafusion/physical-plan/src/metrics/builder.rs
@@ -105,6 +105,15 @@ impl<'a> MetricBuilder<'a> {
         count
     }
 
+    /// Consume self and create a new counter for recording the number of
+    /// output bytes produced by an operator
+    pub fn output_bytes(self, partition: usize) -> Count {
+        let count = Count::new();
+        self.with_partition(partition)
+            .build(MetricValue::OutputBytes(count.clone()));
+        count
+    }
+
     /// Consume self and create a new counter for recording the number of spills
     /// triggered by an operator
     pub fn spill_count(self, partition: usize) -> Count {
diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs
index 87783eada8b0..71c0e16be863 100644
--- a/datafusion/physical-plan/src/metrics/mod.rs
+++ b/datafusion/physical-plan/src/metrics/mod.rs
@@ -197,6 +197,13 @@ impl MetricsSet {
             .map(|v| v.as_usize())
     }
 
+    /// Convenience: return the number of bytes produced, aggregated
+    /// across partitions or `None` if no metric is present
+    pub fn output_bytes(&self) -> Option<usize> {
+        self.sum(|metric| matches!(metric.value(), MetricValue::OutputBytes(_)))
+            .map(|v| v.as_usize())
+    }
+
     /// Convenience: return the count of spills, aggregated
     /// across partitions or `None` if no metric is present
     pub fn spill_count(&self) -> Option<usize> {
@@ -257,6 +264,7 @@ impl MetricsSet {
             MetricValue::Count { name, .. } => name == metric_name,
             MetricValue::Time { name, .. } => name == metric_name,
             MetricValue::OutputRows(_) => false,
+            MetricValue::OutputBytes(_) => false,
             MetricValue::ElapsedCompute(_) => false,
             MetricValue::SpillCount(_) => false,
             MetricValue::SpilledBytes(_) => false,
diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs
index 1cc4a4fbcb05..8794288f7081 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -348,6 +348,8 @@ impl Drop for ScopedTimerGuard<'_> {
 pub enum MetricValue {
     /// Number of output rows produced: "output_rows" metric
     OutputRows(Count),
+    /// Number of output bytes produced: "output_bytes" metric
+    OutputBytes(Count),
     /// Elapsed Compute Time: the wall clock time spent in "cpu
     /// intensive" work.
     ///
@@ -417,6 +419,9 @@ impl PartialEq for MetricValue {
             (MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
                 count == other
             }
+            (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
+                count == other
+            }
             (MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
                 time == other
             }
@@ -480,6 +485,7 @@ impl MetricValue {
     pub fn name(&self) -> &str {
         match self {
             Self::OutputRows(_) => "output_rows",
+            Self::OutputBytes(_) => "output_bytes",
             Self::SpillCount(_) => "spill_count",
             Self::SpilledBytes(_) => "spilled_bytes",
             Self::SpilledRows(_) => "spilled_rows",
@@ -498,6 +504,7 @@ impl MetricValue {
     pub fn as_usize(&self) -> usize {
         match self {
             Self::OutputRows(count) => count.value(),
+            Self::OutputBytes(bytes) => bytes.value(),
             Self::SpillCount(count) => count.value(),
             Self::SpilledBytes(bytes) => bytes.value(),
             Self::SpilledRows(count) => count.value(),
@@ -525,6 +532,7 @@ impl MetricValue {
     pub fn new_empty(&self) -> Self {
         match self {
             Self::OutputRows(_) => Self::OutputRows(Count::new()),
+            Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
             Self::SpillCount(_) => Self::SpillCount(Count::new()),
             Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
             Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
@@ -563,6 +571,7 @@ impl MetricValue {
     pub fn aggregate(&mut self, other: &Self) {
         match (self, other) {
             (Self::OutputRows(count), Self::OutputRows(other_count))
+            | (Self::OutputBytes(count), Self::OutputBytes(other_count))
             | (Self::SpillCount(count), Self::SpillCount(other_count))
             | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
             | (Self::SpilledRows(count), Self::SpilledRows(other_count))
@@ -615,18 +624,19 @@ impl MetricValue {
     /// numbers are "more useful" (and displayed first)
     pub fn display_sort_key(&self) -> u8 {
         match self {
-            Self::OutputRows(_) => 0,     // show first
-            Self::ElapsedCompute(_) => 1, // show second
-            Self::SpillCount(_) => 2,
-            Self::SpilledBytes(_) => 3,
-            Self::SpilledRows(_) => 4,
-            Self::CurrentMemoryUsage(_) => 5,
-            Self::Count { .. } => 6,
-            Self::Gauge { .. } => 7,
-            Self::Time { .. } => 8,
-            Self::StartTimestamp(_) => 9, // show timestamps last
-            Self::EndTimestamp(_) => 10,
-            Self::Custom { .. } => 11,
+            Self::OutputRows(_) => 0,     // show first
+            Self::ElapsedCompute(_) => 1, // show second
+            Self::OutputBytes(_) => 2,
+            Self::SpillCount(_) => 3,
+            Self::SpilledBytes(_) => 4,
+            Self::SpilledRows(_) => 5,
+            Self::CurrentMemoryUsage(_) => 6,
+            Self::Count { .. } => 7,
+            Self::Gauge { .. } => 8,
+            Self::Time { .. } => 9,
+            Self::StartTimestamp(_) => 10, // show timestamps last
+            Self::EndTimestamp(_) => 11,
+            Self::Custom { .. } => 12,
         }
     }
 
@@ -646,7 +656,7 @@ impl Display for MetricValue {
             | Self::Count { count, .. } => {
                 write!(f, "{count}")
             }
-            Self::SpilledBytes(count) => {
+            Self::OutputBytes(count) | Self::SpilledBytes(count) => {
                 let readable_count = human_readable_size(count.value());
                 write!(f, "{readable_count}")
             }
@@ -783,6 +793,20 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_display_output_bytes() {
+        let count = Count::new();
+        let output_byte = MetricValue::OutputBytes(count.clone());
+
+        assert_eq!("0.0 B", output_byte.to_string());
+
+        count.add((100 * MB) as usize);
+        assert_eq!("100.0 MB", output_byte.to_string());
+
+        count.add((0.5 * MB as f64) as usize);
+        assert_eq!("100.5 MB", output_byte.to_string());
+    }
+
     #[test]
     fn test_display_spilled_bytes() {
         let count = Count::new();
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index f941827dd036..4b6ac4ab53a1 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -711,7 +711,7 @@ impl ExternalSorter {
 
             let sorted = sort_batch(&batch, &expressions, None)?;
 
-            metrics.record_output(sorted.num_rows());
+            metrics.record_output(sorted.num_rows(), get_record_batch_memory_size(&sorted));
             drop(batch);
             drop(reservation);
             Ok(sorted)
@@ -1504,6 +1504,7 @@ mod tests {
         let metrics = sort_exec.metrics().unwrap();
 
         assert_eq!(metrics.output_rows().unwrap(), 10000);
+        assert_eq!(metrics.output_bytes().unwrap(), 40000);
         assert!(metrics.elapsed_compute().unwrap() > 0);
 
         let spill_count = metrics.spill_count().unwrap();
@@ -1623,6 +1624,8 @@ mod tests {
         let metrics = sort_exec.metrics().unwrap();
 
         assert_eq!(metrics.output_rows().unwrap(), 20000);
+        // FIXME: This might be double-counting across batches.
+        assert_eq!(metrics.output_bytes().unwrap(), 958800);
         assert!(metrics.elapsed_compute().unwrap() > 0);
 
         let spill_count = metrics.spill_count().unwrap();