Skip to content

Record output_bytes in EXPLAIN ANALYZE #16481

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ async fn explain_analyze_baseline_metrics() {
|| plan.as_any().downcast_ref::<physical_plan::union::UnionExec>().is_some()
|| plan.as_any().downcast_ref::<physical_plan::windows::WindowAggExec>().is_some()
}
fn expected_to_have_output_bytes(plan: &dyn ExecutionPlan) -> bool {
use datafusion::physical_plan;

if let Some(projection_exec) = plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>() {
// If the projection is empty, there are no buffers used, so we don't expect any output bytes
if projection_exec.schema().fields().len() == 0 {
return false;
}
}
true
}

// Validate that the recorded elapsed compute time was more than
// zero for all operators as well as the start/end timestamp are set
Expand All @@ -130,6 +141,14 @@ async fn explain_analyze_baseline_metrics() {
DisplayableExecutionPlan::with_metrics(plan).one_line()
);

let output_bytes = metrics.output_bytes().unwrap();

assert!(
if expected_to_have_output_bytes(plan) { output_bytes > 0 } else { output_bytes == 0 },
"plan: {}",
DisplayableExecutionPlan::with_metrics(plan).one_line()
);

let mut saw_start = false;
let mut saw_end = false;
metrics.iter().for_each(|m| match m.value() {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1829,6 +1829,7 @@ mod tests {

let metrics = merged_aggregate.metrics().unwrap();
let output_rows = metrics.output_rows().unwrap();
let output_bytes = metrics.output_bytes().unwrap();
let spill_count = metrics.spill_count().unwrap();
let spilled_bytes = metrics.spilled_bytes().unwrap();
let spilled_rows = metrics.spilled_rows().unwrap();
Expand All @@ -1837,12 +1838,14 @@ mod tests {
// When spilling, the output rows metrics become partial output size + final output size
// This is because final aggregation starts while partial aggregation is still emitting
assert_eq!(8, output_rows);
assert!(output_bytes > 0);

assert!(spill_count > 0);
assert!(spilled_bytes > 0);
assert!(spilled_rows > 0);
} else {
assert_eq!(3, output_rows);
assert!(output_bytes > 0);

assert_eq!(0, spill_count);
assert_eq!(0, spilled_bytes);
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ mod lazy_memory_tests {

// Verify metrics match actual output
assert_eq!(metrics.output_rows().unwrap(), expected_rows);
assert!(metrics.output_bytes().unwrap() > 0);
assert!(metrics.elapsed_compute().unwrap() > 0);
}

Expand Down
28 changes: 17 additions & 11 deletions datafusion/physical-plan/src/metrics/baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::task::Poll;

use arrow::record_batch::RecordBatch;

use crate::spill::get_record_batch_memory_size;

use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
use datafusion_common::Result;

Expand Down Expand Up @@ -53,6 +55,9 @@ pub struct BaselineMetrics {

/// output rows: the total output rows
output_rows: Count,

/// output bytes: the total output bytes
output_bytes: Count,
}

impl BaselineMetrics {
Expand All @@ -65,6 +70,7 @@ impl BaselineMetrics {
end_time: MetricBuilder::new(metrics).end_timestamp(partition),
elapsed_compute: MetricBuilder::new(metrics).elapsed_compute(partition),
output_rows: MetricBuilder::new(metrics).output_rows(partition),
output_bytes: MetricBuilder::new(metrics).output_bytes(partition),
}
}

Expand All @@ -78,6 +84,7 @@ impl BaselineMetrics {
end_time: Default::default(),
elapsed_compute: self.elapsed_compute.clone(),
output_rows: Default::default(),
output_bytes: Default::default(),
}
}

Expand All @@ -91,6 +98,11 @@ impl BaselineMetrics {
&self.output_rows
}

/// return the metric for the total number of output bytes produced
pub fn output_bytes(&self) -> &Count {
&self.output_bytes
}

/// Records the fact that this operator's execution is complete
/// (recording the `end_time` metric).
///
Expand All @@ -102,12 +114,13 @@ impl BaselineMetrics {
self.end_time.record()
}

/// Record that some number of rows have been produced as output
/// Record that some number of rows and bytes have been produced as output
///
/// See the [`RecordOutput`] for conveniently recording record
/// batch output for other thing
pub fn record_output(&self, num_rows: usize) {
pub fn record_output(&self, num_rows: usize, num_bytes: usize) {
self.output_rows.add(num_rows);
self.output_bytes.add(num_bytes);
}

/// If not previously recorded `done()`, record
Expand Down Expand Up @@ -178,23 +191,16 @@ pub trait RecordOutput {
fn record_output(self, bm: &BaselineMetrics) -> Self;
}

impl RecordOutput for usize {
fn record_output(self, bm: &BaselineMetrics) -> Self {
bm.record_output(self);
self
}
}

impl RecordOutput for RecordBatch {
fn record_output(self, bm: &BaselineMetrics) -> Self {
bm.record_output(self.num_rows());
bm.record_output(self.num_rows(), get_record_batch_memory_size(&self));
self
}
}

impl RecordOutput for &RecordBatch {
fn record_output(self, bm: &BaselineMetrics) -> Self {
bm.record_output(self.num_rows());
bm.record_output(self.num_rows(), get_record_batch_memory_size(self));
self
}
}
Expand Down
9 changes: 9 additions & 0 deletions datafusion/physical-plan/src/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ impl<'a> MetricBuilder<'a> {
count
}

/// Consume self and create a new counter for recording output bytes
/// triggered by an operator
pub fn output_bytes(self, partition: usize) -> Count {
let count = Count::new();
self.with_partition(partition)
.build(MetricValue::OutputBytes(count.clone()));
count
}

/// Consume self and create a new counter for recording the number of spills
/// triggered by an operator
pub fn spill_count(self, partition: usize) -> Count {
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-plan/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ impl MetricsSet {
.map(|v| v.as_usize())
}

/// Convenience: return the number of bytes produced, aggregated
/// across partitions or `None` if no metric is present
pub fn output_bytes(&self) -> Option<usize> {
self.sum(|metric| matches!(metric.value(), MetricValue::OutputBytes(_)))
.map(|v| v.as_usize())
}

/// Convenience: return the count of spills, aggregated
/// across partitions or `None` if no metric is present
pub fn spill_count(&self) -> Option<usize> {
Expand Down Expand Up @@ -257,6 +264,7 @@ impl MetricsSet {
MetricValue::Count { name, .. } => name == metric_name,
MetricValue::Time { name, .. } => name == metric_name,
MetricValue::OutputRows(_) => false,
MetricValue::OutputBytes(_) => false,
MetricValue::ElapsedCompute(_) => false,
MetricValue::SpillCount(_) => false,
MetricValue::SpilledBytes(_) => false,
Expand Down
50 changes: 37 additions & 13 deletions datafusion/physical-plan/src/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ impl Drop for ScopedTimerGuard<'_> {
pub enum MetricValue {
/// Number of output rows produced: "output_rows" metric
OutputRows(Count),
/// Number of output bytes produced: "output_bytes" metric
OutputBytes(Count),
/// Elapsed Compute Time: the wall clock time spent in "cpu
/// intensive" work.
///
Expand Down Expand Up @@ -417,6 +419,9 @@ impl PartialEq for MetricValue {
(MetricValue::OutputRows(count), MetricValue::OutputRows(other)) => {
count == other
}
(MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
count == other
}
(MetricValue::ElapsedCompute(time), MetricValue::ElapsedCompute(other)) => {
time == other
}
Expand Down Expand Up @@ -480,6 +485,7 @@ impl MetricValue {
pub fn name(&self) -> &str {
match self {
Self::OutputRows(_) => "output_rows",
Self::OutputBytes(_) => "output_bytes",
Self::SpillCount(_) => "spill_count",
Self::SpilledBytes(_) => "spilled_bytes",
Self::SpilledRows(_) => "spilled_rows",
Expand All @@ -498,6 +504,7 @@ impl MetricValue {
pub fn as_usize(&self) -> usize {
match self {
Self::OutputRows(count) => count.value(),
Self::OutputBytes(bytes) => bytes.value(),
Self::SpillCount(count) => count.value(),
Self::SpilledBytes(bytes) => bytes.value(),
Self::SpilledRows(count) => count.value(),
Expand Down Expand Up @@ -525,6 +532,7 @@ impl MetricValue {
pub fn new_empty(&self) -> Self {
match self {
Self::OutputRows(_) => Self::OutputRows(Count::new()),
Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
Self::SpillCount(_) => Self::SpillCount(Count::new()),
Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
Expand Down Expand Up @@ -563,6 +571,7 @@ impl MetricValue {
pub fn aggregate(&mut self, other: &Self) {
match (self, other) {
(Self::OutputRows(count), Self::OutputRows(other_count))
| (Self::OutputBytes(count), Self::OutputBytes(other_count))
| (Self::SpillCount(count), Self::SpillCount(other_count))
| (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
| (Self::SpilledRows(count), Self::SpilledRows(other_count))
Expand Down Expand Up @@ -615,18 +624,19 @@ impl MetricValue {
/// numbers are "more useful" (and displayed first)
pub fn display_sort_key(&self) -> u8 {
match self {
Self::OutputRows(_) => 0, // show first
Self::ElapsedCompute(_) => 1, // show second
Self::SpillCount(_) => 2,
Self::SpilledBytes(_) => 3,
Self::SpilledRows(_) => 4,
Self::CurrentMemoryUsage(_) => 5,
Self::Count { .. } => 6,
Self::Gauge { .. } => 7,
Self::Time { .. } => 8,
Self::StartTimestamp(_) => 9, // show timestamps last
Self::EndTimestamp(_) => 10,
Self::Custom { .. } => 11,
Self::OutputRows(_) => 0, // show first
Self::ElapsedCompute(_) => 1, // show second
Self::OutputBytes(_) => 2,
Self::SpillCount(_) => 3,
Self::SpilledBytes(_) => 4,
Self::SpilledRows(_) => 5,
Self::CurrentMemoryUsage(_) => 6,
Self::Count { .. } => 7,
Self::Gauge { .. } => 8,
Self::Time { .. } => 9,
Self::StartTimestamp(_) => 10, // show timestamps last
Self::EndTimestamp(_) => 11,
Self::Custom { .. } => 12,
}
}

Expand All @@ -646,7 +656,7 @@ impl Display for MetricValue {
| Self::Count { count, .. } => {
write!(f, "{count}")
}
Self::SpilledBytes(count) => {
Self::OutputBytes(count) | Self::SpilledBytes(count) => {
let readable_count = human_readable_size(count.value());
write!(f, "{readable_count}")
}
Expand Down Expand Up @@ -783,6 +793,20 @@ mod tests {
}
}

#[test]
fn test_display_output_bytes() {
let count = Count::new();
let output_byte = MetricValue::OutputBytes(count.clone());

assert_eq!("0.0 B", output_byte.to_string());

count.add((100 * MB) as usize);
assert_eq!("100.0 MB", output_byte.to_string());

count.add((0.5 * MB as f64) as usize);
assert_eq!("100.5 MB", output_byte.to_string());
}

#[test]
fn test_display_spilled_bytes() {
let count = Count::new();
Expand Down
5 changes: 4 additions & 1 deletion datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ impl ExternalSorter {

let sorted = sort_batch(&batch, &expressions, None)?;

metrics.record_output(sorted.num_rows());
metrics.record_output(sorted.num_rows(), get_record_batch_memory_size(&sorted));
drop(batch);
drop(reservation);
Ok(sorted)
Expand Down Expand Up @@ -1504,6 +1504,7 @@ mod tests {
let metrics = sort_exec.metrics().unwrap();

assert_eq!(metrics.output_rows().unwrap(), 10000);
assert_eq!(metrics.output_bytes().unwrap(), 40000);
assert!(metrics.elapsed_compute().unwrap() > 0);

let spill_count = metrics.spill_count().unwrap();
Expand Down Expand Up @@ -1623,6 +1624,8 @@ mod tests {
let metrics = sort_exec.metrics().unwrap();

assert_eq!(metrics.output_rows().unwrap(), 20000);
// FIXME: This might be double-counting across batches.
assert_eq!(metrics.output_bytes().unwrap(), 958800);
assert!(metrics.elapsed_compute().unwrap() > 0);

let spill_count = metrics.spill_count().unwrap();
Expand Down
Loading