From 903672719721f035fceb87d785a903ee81e18e59 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 19 Nov 2024 13:00:27 +0100 Subject: [PATCH 1/9] Sort results on replica, merge on envd Signed-off-by: Moritz Hoffmann --- src/compute/src/compute_state.rs | 26 ++++- src/expr/src/relation.rs | 7 +- src/repr/src/row/collection.proto | 1 + src/repr/src/row/collection.rs | 184 +++++++++++++++++++++++------- 4 files changed, 172 insertions(+), 46 deletions(-) diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index 5e852862a5d3a..7a2b8642c97cc 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -1079,6 +1079,8 @@ impl PendingPeek { .unwrap_or(usize::MAX) + peek.finishing.offset; + let order_by = peek.finishing.order_by.clone(); + let task_handle = mz_ore::task::spawn(|| "persist::peek", async move { let start = Instant::now(); let result = if active_worker { @@ -1095,7 +1097,16 @@ impl PendingPeek { Ok(vec![]) }; let result = match result { - Ok(rows) => PeekResponse::Rows(RowCollection::new(&rows)), + Ok(mut rows) => { + // Sort results according to finishing. + let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); + rows.sort_by(|(row1, _diff1), (row2, _diff2)| { + let borrow1 = datum_vec1.borrow_with(row1); + let borrow2 = datum_vec2.borrow_with(row2); + mz_expr::compare_columns(&order_by, &borrow1, &borrow2, || row1.cmp(row2)) + }); + PeekResponse::Rows(RowCollection::new(&rows)) + } Err(e) => PeekResponse::Error(e.to_string()), }; match result_tx.send((result, start.elapsed())) { @@ -1297,7 +1308,18 @@ impl IndexPeek { } let response = match self.collect_finished_data(max_result_size) { - Ok(rows) => PeekResponse::Rows(RowCollection::new(&rows)), + Ok(mut rows) => { + // Sort results according to finishing. + let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); + let order_by = &self.peek.finishing.order_by; + rows.sort_by(|(row1, _diff1), (row2, _diff2)| { + let borrow1 = datum_vec1.borrow_with(row1); + let borrow2 = datum_vec2.borrow_with(row2); + mz_expr::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2)) + }); + + PeekResponse::Rows(RowCollection::new(&rows)) + } Err(text) => PeekResponse::Error(text), }; Some(response) diff --git a/src/expr/src/relation.rs b/src/expr/src/relation.rs index 10c8f7a791e7e..cb1c46b1283f9 100644 --- a/src/expr/src/relation.rs +++ b/src/expr/src/relation.rs @@ -9,6 +9,7 @@ #![warn(missing_docs)] +use std::cell::RefCell; use std::cmp::{max, Ordering}; use std::collections::{BTreeMap, BTreeSet}; use std::fmt; @@ -3555,10 +3556,12 @@ impl RowSetFinishing { return Err(format!("result exceeds max size of {max_bytes}",)); } - let mut left_datum_vec = mz_repr::DatumVec::new(); - let mut right_datum_vec = mz_repr::DatumVec::new(); + let left_datum_vec = RefCell::new(mz_repr::DatumVec::new()); + let right_datum_vec = RefCell::new(mz_repr::DatumVec::new()); let sort_by = |left: &RowRef, right: &RowRef| { + let (mut left_datum_vec, mut right_datum_vec) = + (left_datum_vec.borrow_mut(), right_datum_vec.borrow_mut()); let left_datums = left_datum_vec.borrow_with(left); let right_datums = right_datum_vec.borrow_with(right); compare_columns(&self.order_by, &left_datums, &right_datums, || { diff --git a/src/repr/src/row/collection.proto b/src/repr/src/row/collection.proto index 4e905ee2c38bc..b1063d72adb08 100644 --- a/src/repr/src/row/collection.proto +++ b/src/repr/src/row/collection.proto @@ -14,6 +14,7 @@ package mz_repr.row.collection; message ProtoRowCollection { bytes encoded = 1; repeated ProtoEncodedRowMetadata metadata = 2; + repeated uint64 fingers = 3; } message ProtoEncodedRowMetadata { diff --git a/src/repr/src/row/collection.rs b/src/repr/src/row/collection.rs index 290c82d13709a..ba57b310f4e07 100644 --- a/src/repr/src/row/collection.rs +++ b/src/repr/src/row/collection.rs @@ -9,6 +9,8 @@ //! Defines types for working with collections of [`Row`]. +use std::cmp::Reverse; +use std::collections::BinaryHeap; use std::num::NonZeroUsize; use std::sync::Arc; @@ -33,6 +35,8 @@ pub struct RowCollection { encoded: Bytes, /// Metadata about an individual Row in the blob. metadata: Vec, + /// Start of sorted runs of rows in rows. + fingers: Vec, } impl RowCollection { @@ -47,6 +51,7 @@ impl RowCollection { let mut encoded = Vec::::with_capacity(encoded_size); let mut metadata = Vec::::with_capacity(rows.len()); + let fingers = vec![rows.len()]; for (row, diff) in rows { encoded.extend(row.data()); @@ -59,6 +64,7 @@ impl RowCollection { RowCollection { encoded: Bytes::from(encoded), metadata, + fingers, } } @@ -80,6 +86,7 @@ impl RowCollection { self.metadata.extend(mapped_metas); self.encoded = Bytes::from(new_bytes); + self.fingers.push(self.metadata.len()); } /// Total count of [`Row`]s represented by this collection, considering a @@ -132,16 +139,37 @@ impl RowCollection { } /// "Sorts" the [`RowCollection`] by returning a sorted view over the collection. - pub fn sorted_view(self, mut f: F) -> SortedRowCollection + pub fn sorted_view(self, cmp: F) -> SortedRowCollection where - F: FnMut(&RowRef, &RowRef) -> std::cmp::Ordering, + F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering, { - let mut view: Vec<_> = (0..self.metadata.len()).collect(); - view.sort_by(|a, b| { - let (a_row, _) = self.get(*a).expect("index invalid?"); - let (b_row, _) = self.get(*b).expect("index invalid?"); - f(a_row, b_row) - }); + let mut heap = BinaryHeap::new(); + + for index in 0..self.fingers.len() { + let start = (index == 0) + .then_some(0) + .unwrap_or_else(|| self.fingers[index - 1]); + let end = self.fingers[index]; + + heap.push(Reverse(Finger { + collection: &self, + cmp: &cmp, + start, + end, + })); + } + + let mut view = Vec::with_capacity(self.metadata.len()); + + while let Some(Reverse(mut finger)) = heap.pop() { + view.push(finger.start); + finger.start += 1; + if finger.start < finger.end { + heap.push(Reverse(finger)); + } + } + + assert_eq!(view.len(), self.metadata.len()); SortedRowCollection { collection: self, @@ -159,13 +187,15 @@ impl<'a, T: IntoIterator> From for RowCollection { encoded.extend(row.data()); metadata.push(EncodedRowMetadata { offset: encoded.len(), - diff: unsafe { NonZeroUsize::new_unchecked(1) }, + diff: NonZeroUsize::MIN, }); } + let fingers = vec![metadata.len()]; RowCollection { encoded: Bytes::from(encoded), metadata, + fingers, } } } @@ -179,6 +209,7 @@ impl RustType for RowCollection { .iter() .map(EncodedRowMetadata::into_proto) .collect(), + fingers: self.fingers.iter().copied().map(u64::cast_from).collect(), } } @@ -190,6 +221,7 @@ impl RustType for RowCollection { .into_iter() .map(EncodedRowMetadata::from_proto) .collect::>()?, + fingers: proto.fingers.into_iter().map(usize::cast_from).collect(), }) } } @@ -337,12 +369,12 @@ impl SortedRowCollectionIter { /// /// Projects columns for the provided `row`. fn project<'a>( - projection: Option<&'a Vec>, + projection: Option<&[usize]>, row: &'a RowRef, datum_buf: &'a mut DatumVec, row_buf: &'a mut Row, - ) -> Option<&'a RowRef> { - if let Some(projection) = projection.as_ref() { + ) -> &'a RowRef { + if let Some(projection) = projection { // Copy the required columns into our reusable buffer. { let datums = datum_buf.borrow_with(row); @@ -351,9 +383,9 @@ impl SortedRowCollectionIter { .extend(projection.iter().map(|i| &datums[*i])); } - Some(row_buf) + row_buf } else { - Some(row) + row } } } @@ -377,9 +409,12 @@ impl RowIterator for SortedRowCollectionIter { // Project away and/or re-order any columns. let (datum_buf, row_buf) = &mut self.projection_buf; - let row = Self::project(self.projection.as_ref(), row, datum_buf, row_buf)?; - - Some(row) + Some(Self::project( + self.projection.as_deref(), + row, + datum_buf, + row_buf, + )) } fn peek(&mut self) -> Option<&RowRef> { @@ -395,9 +430,12 @@ impl RowIterator for SortedRowCollectionIter { // Project away and/or re-order any columns. let (datum_buf, row_buf) = &mut self.projection_buf; - let row = Self::project(self.projection.as_ref(), row, datum_buf, row_buf)?; - - Some(row) + Some(Self::project( + self.projection.as_deref(), + row, + datum_buf, + row_buf, + )) } fn count(&self) -> usize { @@ -422,6 +460,47 @@ impl IntoRowIterator for SortedRowCollection { } } +struct Finger<'a, F> { + collection: &'a RowCollection, + cmp: &'a F, + start: usize, + end: usize, +} + +impl<'a, F> PartialOrd for Finger<'a, F> +where + F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering, +{ + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl<'a, F> Ord for Finger<'a, F> +where + F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering, +{ + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (self.cmp)( + self.collection.get(self.start).unwrap().0, + self.collection.get(other.start).unwrap().0, + ) + } +} + +impl<'a, F> PartialEq for Finger<'a, F> +where + F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering, +{ + fn eq(&self, other: &Self) -> bool { + self.partial_cmp(other) + .map(|ordering| ordering == std::cmp::Ordering::Equal) + .unwrap_or(false) + } +} + +impl<'a, F> Eq for Finger<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering {} + #[cfg(test)] mod tests { use std::borrow::Borrow; @@ -466,9 +545,20 @@ mod tests { fn test_sort() { let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]); let b = Row::pack_slice(&[Datum::MzTimestamp(crate::Timestamp::new(10))]); - - let col = RowCollection::from([&a, &b]); - let mut rows = [a, b]; + let c = Row::pack_slice(&[Datum::True, Datum::String("hello world"), Datum::Int16(42)]); + let d = Row::pack_slice(&[Datum::MzTimestamp(crate::Timestamp::new(9))]); + + let col = { + let mut part = [&a, &b]; + part.sort_by(|a, b| a.cmp(b)); + let mut part1 = RowCollection::from(part); + let mut part = [&c, &d]; + part.sort_by(|a, b| a.cmp(b)); + let part2 = RowCollection::from(part); + part1.merge(&part2); + part1 + }; + let mut rows = [a, b, c, d]; let sorted_view = col.sorted_view(|a, b| a.cmp(b)); rows.sort_by(|a, b| a.cmp(b)); @@ -485,10 +575,11 @@ mod tests { fn test_sorted_iter() { let a = Row::pack_slice(&[Datum::String("hello world")]); let b = Row::pack_slice(&[Datum::UInt32(42)]); - let col = RowCollection::new(&[ - (a.clone(), NonZeroUsize::new(3).unwrap()), - (b.clone(), NonZeroUsize::new(2).unwrap()), - ]); + let mut col = RowCollection::new(&[(a.clone(), NonZeroUsize::new(3).unwrap())]); + col.merge(&RowCollection::new(&[( + b.clone(), + NonZeroUsize::new(2).unwrap(), + )])); let col = col.sorted_view(|a, b| a.cmp(b)); let mut iter = col.into_row_iter(); @@ -511,10 +602,11 @@ mod tests { fn test_sorted_iter_offset() { let a = Row::pack_slice(&[Datum::String("hello world")]); let b = Row::pack_slice(&[Datum::UInt32(42)]); - let col = RowCollection::new(&[ - (a.clone(), NonZeroUsize::new(3).unwrap()), - (b.clone(), NonZeroUsize::new(2).unwrap()), - ]); + let mut col = RowCollection::new(&[(a.clone(), NonZeroUsize::new(3).unwrap())]); + col.merge(&RowCollection::new(&[( + b.clone(), + NonZeroUsize::new(2).unwrap(), + )])); let col = col.sorted_view(|a, b| a.cmp(b)); // Test with a reasonable offset that does not span rows. @@ -552,10 +644,11 @@ mod tests { fn test_sorted_iter_limit() { let a = Row::pack_slice(&[Datum::String("hello world")]); let b = Row::pack_slice(&[Datum::UInt32(42)]); - let col = RowCollection::new(&[ - (a.clone(), NonZeroUsize::new(3).unwrap()), - (b.clone(), NonZeroUsize::new(2).unwrap()), - ]); + let mut col = RowCollection::new(&[(a.clone(), NonZeroUsize::new(3).unwrap())]); + col.merge(&RowCollection::new(&[( + b.clone(), + NonZeroUsize::new(2).unwrap(), + )])); let col = col.sorted_view(|a, b| a.cmp(b)); // Test with a limit that spans only the first row. @@ -669,10 +762,12 @@ mod tests { fn test_count_respects_limit_and_offset() { let a = Row::pack_slice(&[Datum::String("hello world")]); let b = Row::pack_slice(&[Datum::UInt32(42)]); - let col = RowCollection::new(&[ + let mut array = [ (a.clone(), NonZeroUsize::new(3).unwrap()), (b.clone(), NonZeroUsize::new(2).unwrap()), - ]); + ]; + array.sort_by(|a, b| a.cmp(b)); + let col = RowCollection::new(&array); let col = col.sorted_view(|a, b| a.cmp(b)); // How many total rows there are. @@ -763,10 +858,15 @@ mod tests { #[mz_ore::test] #[cfg_attr(miri, ignore)] // too slow fn proptest_sort() { - fn row_collection_sort(mut a: Vec) { - let a_col = RowCollection::from(&a); + fn row_collection_sort(mut a: Vec, mut b: Vec) { + a.sort_by(|a, b| a.cmp(b)); + b.sort_by(|a, b| a.cmp(b)); + let mut col = RowCollection::from(&a); + col.merge(&RowCollection::from(&b)); - let sorted_view = a_col.sorted_view(|a, b| a.cmp(b)); + let sorted_view = col.sorted_view(|a, b| a.cmp(b)); + + a.append(&mut b); a.sort_by(|a, b| a.cmp(b)); for i in 0..a.len() { @@ -780,9 +880,9 @@ mod tests { // This test is slow, so we limit the default number of test cases. proptest!( Config { cases: 10, ..Default::default() }, - |(a in any::>())| { + |(a in any::>(), b in any::>())| { // The proptest! macro interferes with rustfmt. - row_collection_sort(a) + row_collection_sort(a, b) } ); } From 2376d704bb89f4c834db5a8e2a483716f7cbb78c Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 19 Nov 2024 15:28:56 +0100 Subject: [PATCH 2/9] Fix finger merging Signed-off-by: Moritz Hoffmann --- src/adapter/src/coord/peek.rs | 10 +++++++++- src/adapter/src/coord/sequencer.rs | 8 +++++++- src/repr/src/row/collection.rs | 15 ++++++++++++++- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index db21cd014e57b..82e7e9499a1cd 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -37,7 +37,9 @@ use mz_ore::str::{separated, StrExt}; use mz_ore::tracing::OpenTelemetryContext; use mz_repr::explain::text::DisplayText; use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes}; -use mz_repr::{Diff, GlobalId, IntoRowIterator, RelationType, Row, RowCollection, RowIterator}; +use mz_repr::{ + DatumVec, Diff, GlobalId, IntoRowIterator, RelationType, Row, RowCollection, RowIterator, +}; use serde::{Deserialize, Serialize}; use timely::progress::Timestamp; use uuid::Uuid; @@ -481,6 +483,12 @@ impl crate::coord::Coordinator { )); } } + let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); + results.sort_by(|(row1, _diff1), (row2, _diff2)| { + let borrow1 = datum_vec1.borrow_with(row1); + let borrow2 = datum_vec2.borrow_with(row2); + mz_expr::compare_columns(&finishing.order_by, &borrow1, &borrow2, || row1.cmp(row2)) + }); let row_collection = RowCollection::new(&results); let duration_histogram = self.metrics.row_set_finishing_seconds(); diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index c3b42579c0202..3804d7be732f3 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -17,7 +17,7 @@ use futures::FutureExt; use inner::return_if_err; use mz_expr::{MirRelationExpr, RowSetFinishing}; use mz_ore::tracing::OpenTelemetryContext; -use mz_repr::{CatalogItemId, Diff, GlobalId, RowCollection}; +use mz_repr::{CatalogItemId, DatumVec, Diff, GlobalId, RowCollection}; use mz_sql::catalog::CatalogError; use mz_sql::names::ResolvedIds; use mz_sql::plan::{ @@ -837,6 +837,12 @@ impl Coordinator { let max_returned_query_size = session.vars().max_query_result_size(); let duration_histogram = session.metrics().row_set_finishing_seconds(); + let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); + plan.returning.sort_by(|(row1, _diff1), (row2, _diff2)| { + let borrow1 = datum_vec1.borrow_with(row1); + let borrow2 = datum_vec2.borrow_with(row2); + mz_expr::compare_columns(&finishing.order_by, &borrow1, &borrow2, || row1.cmp(row2)) + }); return match finishing.finish( RowCollection::new(&plan.returning), plan.max_result_size, diff --git a/src/repr/src/row/collection.rs b/src/repr/src/row/collection.rs index ba57b310f4e07..1434bbc2766e7 100644 --- a/src/repr/src/row/collection.rs +++ b/src/repr/src/row/collection.rs @@ -74,6 +74,8 @@ impl RowCollection { return; } + let self_len = self.metadata.len(); + // TODO(parkmycar): Using SegmentedBytes here would be nice. let mut new_bytes = vec![0; self.encoded.len() + other.encoded.len()]; new_bytes[..self.encoded.len()].copy_from_slice(&self.encoded[..]); @@ -86,7 +88,8 @@ impl RowCollection { self.metadata.extend(mapped_metas); self.encoded = Bytes::from(new_bytes); - self.fingers.push(self.metadata.len()); + self.fingers + .extend(other.fingers.iter().map(|f| f + self_len)); } /// Total count of [`Row`]s represented by this collection, considering a @@ -151,6 +154,16 @@ impl RowCollection { .unwrap_or_else(|| self.fingers[index - 1]); let end = self.fingers[index]; + // TODO: Remove + for j in start..end { + if j + 1 < end { + assert_ne!( + cmp(self.get(j).unwrap().0, self.get(j + 1).unwrap().0), + std::cmp::Ordering::Greater + ); + } + } + heap.push(Reverse(Finger { collection: &self, cmp: &cmp, From b8cd991a8e6af7d9dfdbf02eeecf028a927c443f Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 19 Nov 2024 18:44:37 +0100 Subject: [PATCH 3/9] Fix bug! Signed-off-by: Moritz Hoffmann --- src/repr/src/row/collection.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/repr/src/row/collection.rs b/src/repr/src/row/collection.rs index 1434bbc2766e7..a279d24c8e430 100644 --- a/src/repr/src/row/collection.rs +++ b/src/repr/src/row/collection.rs @@ -163,13 +163,14 @@ impl RowCollection { ); } } - - heap.push(Reverse(Finger { - collection: &self, - cmp: &cmp, - start, - end, - })); + if start < end { + heap.push(Reverse(Finger { + collection: &self, + cmp: &cmp, + start, + end, + })); + } } let mut view = Vec::with_capacity(self.metadata.len()); From 693160bd6e71f4c2b07e215b854c052f0dc737fd Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 20 Nov 2024 10:55:06 +0100 Subject: [PATCH 4/9] Move RowCollection to expr, sort in ::new Signed-off-by: Moritz Hoffmann --- src/adapter/src/coord/peek.rs | 13 +- src/adapter/src/coord/sequencer.rs | 5 +- src/compute-client/build.rs | 2 +- .../src/protocol/response.proto | 4 +- src/compute-client/src/protocol/response.rs | 5 +- src/compute-client/src/service.rs | 3 +- src/compute/src/compute_state.rs | 27 +--- src/expr/build.rs | 2 + src/expr/src/lib.rs | 1 + src/expr/src/relation.rs | 5 +- src/expr/src/row.rs | 12 ++ src/{repr => expr}/src/row/collection.proto | 2 +- src/{repr => expr}/src/row/collection.rs | 123 +++++++++--------- src/repr/build.rs | 3 +- src/repr/src/lib.rs | 1 - src/repr/src/row.rs | 6 +- 16 files changed, 108 insertions(+), 106 deletions(-) create mode 100644 src/expr/src/row.rs rename src/{repr => expr}/src/row/collection.proto (94%) rename src/{repr => expr}/src/row/collection.rs (90%) diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index 82e7e9499a1cd..4b3e1a2c212ea 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -28,6 +28,7 @@ use mz_compute_types::dataflows::{DataflowDescription, IndexImport}; use mz_compute_types::ComputeInstanceId; use mz_controller_types::ClusterId; use mz_expr::explain::{fmt_text_constant_rows, HumanizedExplain, HumanizerMode}; +use mz_expr::row::RowCollection; use mz_expr::{ permutation_for_arrangement, EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing, @@ -37,9 +38,7 @@ use mz_ore::str::{separated, StrExt}; use mz_ore::tracing::OpenTelemetryContext; use mz_repr::explain::text::DisplayText; use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes}; -use mz_repr::{ - DatumVec, Diff, GlobalId, IntoRowIterator, RelationType, Row, RowCollection, RowIterator, -}; +use mz_repr::{Diff, GlobalId, IntoRowIterator, RelationType, Row, RowIterator}; use serde::{Deserialize, Serialize}; use timely::progress::Timestamp; use uuid::Uuid; @@ -483,13 +482,7 @@ impl crate::coord::Coordinator { )); } } - let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); - results.sort_by(|(row1, _diff1), (row2, _diff2)| { - let borrow1 = datum_vec1.borrow_with(row1); - let borrow2 = datum_vec2.borrow_with(row2); - mz_expr::compare_columns(&finishing.order_by, &borrow1, &borrow2, || row1.cmp(row2)) - }); - let row_collection = RowCollection::new(&results); + let row_collection = RowCollection::new(results, &finishing.order_by); let duration_histogram = self.metrics.row_set_finishing_seconds(); let (ret, reason) = match finishing.finish( diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 3804d7be732f3..6d2ded064e4ef 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -15,9 +15,10 @@ use futures::future::LocalBoxFuture; use futures::FutureExt; use inner::return_if_err; +use mz_expr::row::RowCollection; use mz_expr::{MirRelationExpr, RowSetFinishing}; use mz_ore::tracing::OpenTelemetryContext; -use mz_repr::{CatalogItemId, DatumVec, Diff, GlobalId, RowCollection}; +use mz_repr::{CatalogItemId, DatumVec, Diff, GlobalId}; use mz_sql::catalog::CatalogError; use mz_sql::names::ResolvedIds; use mz_sql::plan::{ @@ -844,7 +845,7 @@ impl Coordinator { mz_expr::compare_columns(&finishing.order_by, &borrow1, &borrow2, || row1.cmp(row2)) }); return match finishing.finish( - RowCollection::new(&plan.returning), + RowCollection::new(plan.returning, &finishing.order_by), plan.max_result_size, Some(max_returned_query_size), duration_histogram, diff --git a/src/compute-client/build.rs b/src/compute-client/build.rs index 02012d17af134..4608f6cbf7bb8 100644 --- a/src/compute-client/build.rs +++ b/src/compute-client/build.rs @@ -28,6 +28,7 @@ fn main() { .extern_path(".mz_expr.id", "::mz_expr") .extern_path(".mz_expr.linear", "::mz_expr") .extern_path(".mz_expr.relation", "::mz_expr") + .extern_path(".mz_expr.row.collection", "::mz_expr::row") .extern_path(".mz_expr.scalar", "::mz_expr") .extern_path(".mz_kafka_util.addr", "::mz_kafka_util") .extern_path(".mz_persist_client", "::mz_persist_client") @@ -38,7 +39,6 @@ fn main() { .extern_path(".mz_repr.global_id", "::mz_repr::global_id") .extern_path(".mz_repr.relation_and_scalar", "::mz_repr") .extern_path(".mz_repr.row", "::mz_repr") - .extern_path(".mz_repr.row.collection", "::mz_repr") .extern_path(".mz_repr.url", "::mz_repr::url") .extern_path(".mz_compute_types", "::mz_compute_types") .extern_path(".mz_cluster_client", "::mz_cluster_client") diff --git a/src/compute-client/src/protocol/response.proto b/src/compute-client/src/protocol/response.proto index b4c98cd5ffdb0..cd73d62bc29b8 100644 --- a/src/compute-client/src/protocol/response.proto +++ b/src/compute-client/src/protocol/response.proto @@ -13,12 +13,12 @@ syntax = "proto3"; package mz_compute_client.protocol.response; +import "expr/src/row/collection.proto"; import "google/protobuf/empty.proto"; import "proto/src/proto.proto"; import "repr/src/antichain.proto"; import "repr/src/global_id.proto"; import "repr/src/row.proto"; -import "repr/src/row/collection.proto"; message ProtoComputeResponse { message ProtoFrontiersKind { @@ -59,7 +59,7 @@ message ProtoFrontiersResponse { message ProtoPeekResponse { oneof kind { - mz_repr.row.collection.ProtoRowCollection rows = 1; + mz_expr.row.collection.ProtoRowCollection rows = 1; string error = 2; google.protobuf.Empty canceled = 3; } diff --git a/src/compute-client/src/protocol/response.rs b/src/compute-client/src/protocol/response.rs index fd8cd96b6a6f6..b08ea181718db 100644 --- a/src/compute-client/src/protocol/response.rs +++ b/src/compute-client/src/protocol/response.rs @@ -12,10 +12,11 @@ use std::num::NonZeroUsize; use mz_compute_types::plan::LirId; +use mz_expr::row::RowCollection; use mz_ore::cast::CastFrom; use mz_ore::tracing::OpenTelemetryContext; use mz_proto::{any_uuid, IntoRustIfSome, ProtoType, RustType, TryFromProtoError}; -use mz_repr::{Diff, GlobalId, Row, RowCollection}; +use mz_repr::{Diff, GlobalId, Row}; use mz_timely_util::progress::any_antichain; use proptest::prelude::{any, Arbitrary}; use proptest::strategy::{BoxedStrategy, Just, Strategy, Union}; @@ -365,7 +366,7 @@ impl Arbitrary for PeekResponse { ), 1..11, ) - .prop_map(|rows| PeekResponse::Rows(RowCollection::new(&rows))) + .prop_map(|rows| PeekResponse::Rows(RowCollection::new(rows, &[]))) .boxed(), ".*".prop_map(PeekResponse::Error).boxed(), Just(PeekResponse::Canceled).boxed(), diff --git a/src/compute-client/src/service.rs b/src/compute-client/src/service.rs index db9dc2019f00c..3d1163c533bd9 100644 --- a/src/compute-client/src/service.rs +++ b/src/compute-client/src/service.rs @@ -19,9 +19,10 @@ use async_trait::async_trait; use bytesize::ByteSize; use differential_dataflow::consolidation::consolidate_updates; use differential_dataflow::lattice::Lattice; +use mz_expr::row::RowCollection; use mz_ore::assert_none; use mz_ore::cast::CastFrom; -use mz_repr::{Diff, GlobalId, Row, RowCollection}; +use mz_repr::{Diff, GlobalId, Row}; use mz_service::client::{GenericClient, Partitionable, PartitionedState}; use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream}; use timely::progress::frontier::{Antichain, MutableAntichain}; diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index 7a2b8642c97cc..9b11ea527a661 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -32,6 +32,7 @@ use mz_compute_types::dataflows::DataflowDescription; use mz_compute_types::plan::flat_plan::FlatPlan; use mz_compute_types::plan::LirId; use mz_dyncfg::ConfigSet; +use mz_expr::row::RowCollection; use mz_expr::SafeMfpPlan; use mz_ore::cast::CastFrom; use mz_ore::metrics::UIntGauge; @@ -44,7 +45,7 @@ use mz_persist_client::read::ReadHandle; use mz_persist_client::Diagnostics; use mz_persist_types::codec_impls::UnitSchema; use mz_repr::fixed_length::ToDatumIter; -use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, RowCollection, Timestamp}; +use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp}; use mz_storage_operators::stats::StatsCursor; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::sources::SourceData; @@ -1097,16 +1098,7 @@ impl PendingPeek { Ok(vec![]) }; let result = match result { - Ok(mut rows) => { - // Sort results according to finishing. - let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); - rows.sort_by(|(row1, _diff1), (row2, _diff2)| { - let borrow1 = datum_vec1.borrow_with(row1); - let borrow2 = datum_vec2.borrow_with(row2); - mz_expr::compare_columns(&order_by, &borrow1, &borrow2, || row1.cmp(row2)) - }); - PeekResponse::Rows(RowCollection::new(&rows)) - } + Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)), Err(e) => PeekResponse::Error(e.to_string()), }; match result_tx.send((result, start.elapsed())) { @@ -1308,18 +1300,7 @@ impl IndexPeek { } let response = match self.collect_finished_data(max_result_size) { - Ok(mut rows) => { - // Sort results according to finishing. - let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); - let order_by = &self.peek.finishing.order_by; - rows.sort_by(|(row1, _diff1), (row2, _diff2)| { - let borrow1 = datum_vec1.borrow_with(row1); - let borrow2 = datum_vec2.borrow_with(row2); - mz_expr::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2)) - }); - - PeekResponse::Rows(RowCollection::new(&rows)) - } + Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &self.peek.finishing.order_by)), Err(text) => PeekResponse::Error(text), }; Some(response) diff --git a/src/expr/build.rs b/src/expr/build.rs index 8bb647f34659c..f2e2f78b34c2b 100644 --- a/src/expr/build.rs +++ b/src/expr/build.rs @@ -28,12 +28,14 @@ fn main() { .extern_path(".mz_repr.strconv", "::mz_repr::strconv") .type_attribute(".", "#[allow(missing_docs)]") .btree_map(["."]) + .bytes([".mz_expr.row.collection"]) .compile_protos( &[ "expr/src/id.proto", "expr/src/linear.proto", "expr/src/relation.proto", "expr/src/relation/func.proto", + "expr/src/row/collection.proto", "expr/src/scalar.proto", "expr/src/scalar/func/format.proto", "expr/src/scalar/like_pattern.proto", diff --git a/src/expr/src/lib.rs b/src/expr/src/lib.rs index d0859b7ebe682..f733a037def97 100644 --- a/src/expr/src/lib.rs +++ b/src/expr/src/lib.rs @@ -25,6 +25,7 @@ mod relation; mod scalar; pub mod explain; +pub mod row; pub mod virtual_syntax; pub mod visit; diff --git a/src/expr/src/relation.rs b/src/expr/src/relation.rs index cb1c46b1283f9..8223723e2171e 100644 --- a/src/expr/src/relation.rs +++ b/src/expr/src/relation.rs @@ -35,8 +35,8 @@ use mz_repr::explain::{ DummyHumanizer, ExplainConfig, ExprHumanizer, IndexUsageType, PlanRenderingContext, }; use mz_repr::{ - ColumnName, ColumnType, Datum, Diff, GlobalId, IntoRowIterator, RelationType, Row, - RowCollection, RowIterator, RowRef, ScalarType, SortedRowCollectionIter, + ColumnName, ColumnType, Datum, Diff, GlobalId, IntoRowIterator, RelationType, Row, RowIterator, + RowRef, ScalarType, }; use proptest::prelude::{any, Arbitrary, BoxedStrategy}; use proptest::strategy::{Strategy, Union}; @@ -46,6 +46,7 @@ use timely::container::columnation::{Columnation, CopyRegion}; use crate::explain::{HumanizedExpr, HumanizerMode}; use crate::relation::func::{AggregateFunc, LagLeadType, TableFunc}; +use crate::row::{RowCollection, SortedRowCollectionIter}; use crate::visit::{Visit, VisitChildren}; use crate::Id::Local; use crate::{ diff --git a/src/expr/src/row.rs b/src/expr/src/row.rs new file mode 100644 index 0000000000000..5930964fc25bc --- /dev/null +++ b/src/expr/src/row.rs @@ -0,0 +1,12 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub use crate::row::collection::*; + +mod collection; diff --git a/src/repr/src/row/collection.proto b/src/expr/src/row/collection.proto similarity index 94% rename from src/repr/src/row/collection.proto rename to src/expr/src/row/collection.proto index b1063d72adb08..bf7bfac1fd471 100644 --- a/src/repr/src/row/collection.proto +++ b/src/expr/src/row/collection.proto @@ -9,7 +9,7 @@ syntax = "proto3"; -package mz_repr.row.collection; +package mz_expr.row.collection; message ProtoRowCollection { bytes encoded = 1; diff --git a/src/repr/src/row/collection.rs b/src/expr/src/row/collection.rs similarity index 90% rename from src/repr/src/row/collection.rs rename to src/expr/src/row/collection.rs index a279d24c8e430..726b84b6528f0 100644 --- a/src/repr/src/row/collection.rs +++ b/src/expr/src/row/collection.rs @@ -17,13 +17,12 @@ use std::sync::Arc; use bytes::Bytes; use mz_ore::cast::CastFrom; use mz_proto::RustType; +use mz_repr::{DatumVec, IntoRowIterator, Row, RowIterator, RowRef}; use serde::{Deserialize, Serialize}; -use crate::row::iter::{IntoRowIterator, RowIterator}; -use crate::row::{Row, RowRef}; -use crate::DatumVec; +use crate::ColumnOrder; -include!(concat!(env!("OUT_DIR"), "/mz_repr.row.collection.rs")); +include!(concat!(env!("OUT_DIR"), "/mz_expr.row.collection.rs")); /// Collection of [`Row`]s represented as a single blob. /// @@ -40,14 +39,21 @@ pub struct RowCollection { } impl RowCollection { - /// Create a new [`RowCollection`] from a collection of [`Row`]s. - pub fn new(rows: &[(Row, NonZeroUsize)]) -> Self { + /// Create a new [`RowCollection`] from a collection of [`Row`]s. Sorts data by `order_by`. + pub fn new(mut rows: Vec<(Row, NonZeroUsize)>, order_by: &[ColumnOrder]) -> Self { + // Sort data to maintain sortedness invariants. + let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); + rows.sort_by(|(row1, _diff1), (row2, _diff2)| { + let borrow1 = datum_vec1.borrow_with(row1); + let borrow2 = datum_vec2.borrow_with(row2); + crate::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2)) + }); // Pre-sizing our buffer should allow us to make just 1 allocation, and // use the perfect amount of memory. // // Note(parkmycar): I didn't do any benchmarking to determine if this // is faster, so feel free to change this if you'd like. - let encoded_size = rows.iter().map(|(row, _diff)| row.data.len()).sum(); + let encoded_size = rows.iter().map(|(row, _diff)| row.data_len()).sum(); let mut encoded = Vec::::with_capacity(encoded_size); let mut metadata = Vec::::with_capacity(rows.len()); @@ -57,7 +63,7 @@ impl RowCollection { encoded.extend(row.data()); metadata.push(EncodedRowMetadata { offset: encoded.len(), - diff: *diff, + diff: diff, }); } @@ -192,28 +198,6 @@ impl RowCollection { } } -impl<'a, T: IntoIterator> From for RowCollection { - fn from(rows: T) -> Self { - let mut encoded = Vec::::new(); - let mut metadata = Vec::::new(); - - for row in rows { - encoded.extend(row.data()); - metadata.push(EncodedRowMetadata { - offset: encoded.len(), - diff: NonZeroUsize::MIN, - }); - } - let fingers = vec![metadata.len()]; - - RowCollection { - encoded: Bytes::from(encoded), - metadata, - fingers, - } - } -} - impl RustType for RowCollection { fn into_proto(&self) -> ProtoRowCollection { ProtoRowCollection { @@ -520,16 +504,38 @@ mod tests { use std::borrow::Borrow; use mz_ore::assert_none; + use mz_repr::Datum; use proptest::prelude::*; use proptest::test_runner::Config; use super::*; - use crate::Datum; + + impl<'a, T: IntoIterator> From for RowCollection { + fn from(rows: T) -> Self { + let mut encoded = Vec::::new(); + let mut metadata = Vec::::new(); + + for row in rows { + encoded.extend(row.data()); + metadata.push(EncodedRowMetadata { + offset: encoded.len(), + diff: NonZeroUsize::MIN, + }); + } + let fingers = vec![metadata.len()]; + + RowCollection { + encoded: Bytes::from(encoded), + metadata, + fingers, + } + } + } #[mz_ore::test] fn test_row_collection() { let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]); - let b = Row::pack_slice(&[Datum::MzTimestamp(crate::Timestamp::new(10))]); + let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]); let collection = RowCollection::from([&a, &b]); @@ -543,7 +549,7 @@ mod tests { #[mz_ore::test] fn test_merge() { let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]); - let b = Row::pack_slice(&[Datum::MzTimestamp(crate::Timestamp::new(10))]); + let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]); let mut a_col = RowCollection::from([&a]); let b_col = RowCollection::from([&b]); @@ -558,9 +564,9 @@ mod tests { #[mz_ore::test] fn test_sort() { let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]); - let b = Row::pack_slice(&[Datum::MzTimestamp(crate::Timestamp::new(10))]); + let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]); let c = Row::pack_slice(&[Datum::True, Datum::String("hello world"), Datum::Int16(42)]); - let d = Row::pack_slice(&[Datum::MzTimestamp(crate::Timestamp::new(9))]); + let d = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(9))]); let col = { let mut part = [&a, &b]; @@ -589,11 +595,11 @@ mod tests { fn test_sorted_iter() { let a = Row::pack_slice(&[Datum::String("hello world")]); let b = Row::pack_slice(&[Datum::UInt32(42)]); - let mut col = RowCollection::new(&[(a.clone(), NonZeroUsize::new(3).unwrap())]); - col.merge(&RowCollection::new(&[( - b.clone(), - NonZeroUsize::new(2).unwrap(), - )])); + let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]); + col.merge(&RowCollection::new( + vec![(b.clone(), NonZeroUsize::new(2).unwrap())], + &[], + )); let col = col.sorted_view(|a, b| a.cmp(b)); let mut iter = col.into_row_iter(); @@ -616,11 +622,11 @@ mod tests { fn test_sorted_iter_offset() { let a = Row::pack_slice(&[Datum::String("hello world")]); let b = Row::pack_slice(&[Datum::UInt32(42)]); - let mut col = RowCollection::new(&[(a.clone(), NonZeroUsize::new(3).unwrap())]); - col.merge(&RowCollection::new(&[( - b.clone(), - NonZeroUsize::new(2).unwrap(), - )])); + let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]); + col.merge(&RowCollection::new( + vec![(b.clone(), NonZeroUsize::new(2).unwrap())], + &[], + )); let col = col.sorted_view(|a, b| a.cmp(b)); // Test with a reasonable offset that does not span rows. @@ -658,11 +664,11 @@ mod tests { fn test_sorted_iter_limit() { let a = Row::pack_slice(&[Datum::String("hello world")]); let b = Row::pack_slice(&[Datum::UInt32(42)]); - let mut col = RowCollection::new(&[(a.clone(), NonZeroUsize::new(3).unwrap())]); - col.merge(&RowCollection::new(&[( - b.clone(), - NonZeroUsize::new(2).unwrap(), - )])); + let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]); + col.merge(&RowCollection::new( + vec![(b.clone(), NonZeroUsize::new(2).unwrap())], + &[], + )); let col = col.sorted_view(|a, b| a.cmp(b)); // Test with a limit that spans only the first row. @@ -710,7 +716,7 @@ mod tests { #[mz_ore::test] fn test_mapped_row_iterator() { let a = Row::pack_slice(&[Datum::String("hello world")]); - let col = RowCollection::new(&[(a.clone(), NonZeroUsize::new(3).unwrap())]); + let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]); let col = col.sorted_view(|a, b| a.cmp(b)); // Make sure we can call `.map` on a `dyn RowIterator`. @@ -727,7 +733,7 @@ mod tests { #[mz_ore::test] fn test_projected_row_iterator() { let a = Row::pack_slice(&[Datum::String("hello world"), Datum::Int16(42)]); - let col = RowCollection::new(&[(a.clone(), NonZeroUsize::new(2).unwrap())]); + let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(2).unwrap())], &[]); let col = col.sorted_view(|a, b| a.cmp(b)); // Project away the first column. @@ -776,12 +782,13 @@ mod tests { fn test_count_respects_limit_and_offset() { let a = Row::pack_slice(&[Datum::String("hello world")]); let b = Row::pack_slice(&[Datum::UInt32(42)]); - let mut array = [ - (a.clone(), NonZeroUsize::new(3).unwrap()), - (b.clone(), NonZeroUsize::new(2).unwrap()), - ]; - array.sort_by(|a, b| a.cmp(b)); - let col = RowCollection::new(&array); + let col = RowCollection::new( + vec![ + (a.clone(), NonZeroUsize::new(3).unwrap()), + (b.clone(), NonZeroUsize::new(2).unwrap()), + ], + &[], + ); let col = col.sorted_view(|a, b| a.cmp(b)); // How many total rows there are. diff --git a/src/repr/build.rs b/src/repr/build.rs index 761d3ed7a9aad..b5db5a2051ffd 100644 --- a/src/repr/build.rs +++ b/src/repr/build.rs @@ -14,7 +14,7 @@ fn main() { .protoc_executable(mz_build_tools::protoc()) .btree_map(["."]) .extern_path(".mz_proto", "::mz_proto") - .bytes([".mz_repr.row.ProtoDatum.bytes", ".mz_repr.row.collection"]) + .bytes([".mz_repr.row.ProtoDatum.bytes"]) .compile_protos( &[ "repr/src/adt/array.proto", @@ -35,7 +35,6 @@ fn main() { "repr/src/relation_and_scalar.proto", "repr/src/role_id.proto", "repr/src/row.proto", - "repr/src/row/collection.proto", "repr/src/strconv.proto", "repr/src/timestamp.proto", "repr/src/url.proto", diff --git a/src/repr/src/lib.rs b/src/repr/src/lib.rs index 4be5579eefe79..3311395cff298 100644 --- a/src/repr/src/lib.rs +++ b/src/repr/src/lib.rs @@ -58,7 +58,6 @@ pub use crate::relation::{ RelationDesc, RelationDescBuilder, RelationType, RelationVersion, RelationVersionSelector, VersionedRelationDesc, }; -pub use crate::row::collection::{ProtoRowCollection, RowCollection, SortedRowCollectionIter}; pub use crate::row::encode::{preserves_order, RowColumnarDecoder, RowColumnarEncoder}; pub use crate::row::iter::{IntoRowIterator, RowIterator}; pub use crate::row::{ diff --git a/src/repr/src/row.rs b/src/repr/src/row.rs index dba6dc590a543..fc130574d92d9 100644 --- a/src/repr/src/row.rs +++ b/src/repr/src/row.rs @@ -45,7 +45,6 @@ use crate::adt::timestamp::CheckedTimestamp; use crate::scalar::{arb_datum, DatumKind}; use crate::{Datum, RelationDesc, Timestamp}; -pub mod collection; pub(crate) mod encode; pub mod iter; @@ -245,6 +244,11 @@ impl Row { inline_size.saturating_add(heap_size) } + /// The length of the encoded row in bytes. Does not include the size of the `Row` struct itself. + pub fn data_len(&self) -> usize { + self.data.len() + } + /// Returns the total capacity in bytes used by this row. pub fn byte_capacity(&self) -> usize { self.data.capacity() From 9eb60d90b7576ac84c56b5f2e17f6bfc7ba650d6 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 20 Nov 2024 11:05:13 +0100 Subject: [PATCH 5/9] s/fingers/runs/g Signed-off-by: Moritz Hoffmann --- src/expr/src/row/collection.proto | 2 +- src/expr/src/row/collection.rs | 57 ++++++++++++++++++------------- 2 files changed, 35 insertions(+), 24 deletions(-) diff --git a/src/expr/src/row/collection.proto b/src/expr/src/row/collection.proto index bf7bfac1fd471..7dcc7a8e81eeb 100644 --- a/src/expr/src/row/collection.proto +++ b/src/expr/src/row/collection.proto @@ -14,7 +14,7 @@ package mz_expr.row.collection; message ProtoRowCollection { bytes encoded = 1; repeated ProtoEncodedRowMetadata metadata = 2; - repeated uint64 fingers = 3; + repeated uint64 runs = 3; } message ProtoEncodedRowMetadata { diff --git a/src/expr/src/row/collection.rs b/src/expr/src/row/collection.rs index 726b84b6528f0..a89dea9bb7ce5 100644 --- a/src/expr/src/row/collection.rs +++ b/src/expr/src/row/collection.rs @@ -34,8 +34,8 @@ pub struct RowCollection { encoded: Bytes, /// Metadata about an individual Row in the blob. metadata: Vec, - /// Start of sorted runs of rows in rows. - fingers: Vec, + /// End of non-empty, sorted runs of rows in number of rows. + runs: Vec, } impl RowCollection { @@ -48,6 +48,7 @@ impl RowCollection { let borrow2 = datum_vec2.borrow_with(row2); crate::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2)) }); + // Pre-sizing our buffer should allow us to make just 1 allocation, and // use the perfect amount of memory. // @@ -57,26 +58,33 @@ impl RowCollection { let mut encoded = Vec::::with_capacity(encoded_size); let mut metadata = Vec::::with_capacity(rows.len()); - let fingers = vec![rows.len()]; + let runs = if rows.is_empty() { + vec![] + } else { + vec![rows.len()] + }; for (row, diff) in rows { encoded.extend(row.data()); metadata.push(EncodedRowMetadata { offset: encoded.len(), - diff: diff, + diff, }); } RowCollection { encoded: Bytes::from(encoded), metadata, - fingers, + runs, } } /// Merge another [`RowCollection`] into `self`. pub fn merge(&mut self, other: &RowCollection) { - if other.count(0, None) == 0 { + if other.is_empty() { + return; + } else if self.is_empty() { + *self = other.clone(); return; } @@ -94,8 +102,7 @@ impl RowCollection { self.metadata.extend(mapped_metas); self.encoded = Bytes::from(new_bytes); - self.fingers - .extend(other.fingers.iter().map(|f| f + self_len)); + self.runs.extend(other.runs.iter().map(|f| f + self_len)); } /// Total count of [`Row`]s represented by this collection, considering a @@ -114,6 +121,12 @@ impl RowCollection { total } + /// Returns true iff this collection is empty. Not `pub` because it doesn't take offset/limit + /// into account. + fn is_empty(&self) -> bool { + self.metadata.is_empty() + } + /// Total count of ([`Row`], `EncodedRowMetadata`) pairs in this collection. pub fn entries(&self) -> usize { self.metadata.len() @@ -154,11 +167,11 @@ impl RowCollection { { let mut heap = BinaryHeap::new(); - for index in 0..self.fingers.len() { + for index in 0..self.runs.len() { let start = (index == 0) .then_some(0) - .unwrap_or_else(|| self.fingers[index - 1]); - let end = self.fingers[index]; + .unwrap_or_else(|| self.runs[index - 1]); + let end = self.runs[index]; // TODO: Remove for j in start..end { @@ -169,14 +182,12 @@ impl RowCollection { ); } } - if start < end { - heap.push(Reverse(Finger { - collection: &self, - cmp: &cmp, - start, - end, - })); - } + heap.push(Reverse(Finger { + collection: &self, + cmp: &cmp, + start, + end, + })); } let mut view = Vec::with_capacity(self.metadata.len()); @@ -207,7 +218,7 @@ impl RustType for RowCollection { .iter() .map(EncodedRowMetadata::into_proto) .collect(), - fingers: self.fingers.iter().copied().map(u64::cast_from).collect(), + runs: self.runs.iter().copied().map(u64::cast_from).collect(), } } @@ -219,7 +230,7 @@ impl RustType for RowCollection { .into_iter() .map(EncodedRowMetadata::from_proto) .collect::>()?, - fingers: proto.fingers.into_iter().map(usize::cast_from).collect(), + runs: proto.runs.into_iter().map(usize::cast_from).collect(), }) } } @@ -522,12 +533,12 @@ mod tests { diff: NonZeroUsize::MIN, }); } - let fingers = vec![metadata.len()]; + let runs = vec![metadata.len()]; RowCollection { encoded: Bytes::from(encoded), metadata, - fingers, + runs, } } } From a90242cd8dc8aa21d2bee1cedb174642f440e19e Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 20 Nov 2024 11:08:24 +0100 Subject: [PATCH 6/9] Fix build Signed-off-by: Moritz Hoffmann --- src/expr/BUILD.bazel | 1 + src/repr/BUILD.bazel | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/expr/BUILD.bazel b/src/expr/BUILD.bazel index a0a0d1212225e..caab64ca5172f 100644 --- a/src/expr/BUILD.bazel +++ b/src/expr/BUILD.bazel @@ -117,6 +117,7 @@ filegroup( "src/linear.proto", "src/relation.proto", "src/relation/func.proto", + "src/row/collection.proto", "src/scalar.proto", "src/scalar/func/format.proto", "src/scalar/like_pattern.proto", diff --git a/src/repr/BUILD.bazel b/src/repr/BUILD.bazel index 2336a9b764701..f3644126f3232 100644 --- a/src/repr/BUILD.bazel +++ b/src/repr/BUILD.bazel @@ -127,7 +127,6 @@ filegroup( "src/relation_and_scalar.proto", "src/role_id.proto", "src/row.proto", - "src/row/collection.proto", "src/strconv.proto", "src/timestamp.proto", "src/url.proto", From 41efec37920718f1c541961b1dd3db6ec1f69d66 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 20 Nov 2024 12:00:27 +0100 Subject: [PATCH 7/9] Renaming Signed-off-by: Moritz Hoffmann --- src/adapter/src/coord/sequencer.rs | 6 ----- src/expr/src/row/collection.rs | 37 +++++++++++++----------------- 2 files changed, 16 insertions(+), 27 deletions(-) diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 6d2ded064e4ef..cc3ac2ce4af0e 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -838,12 +838,6 @@ impl Coordinator { let max_returned_query_size = session.vars().max_query_result_size(); let duration_histogram = session.metrics().row_set_finishing_seconds(); - let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); - plan.returning.sort_by(|(row1, _diff1), (row2, _diff2)| { - let borrow1 = datum_vec1.borrow_with(row1); - let borrow2 = datum_vec2.borrow_with(row2); - mz_expr::compare_columns(&finishing.order_by, &borrow1, &borrow2, || row1.cmp(row2)) - }); return match finishing.finish( RowCollection::new(plan.returning, &finishing.order_by), plan.max_result_size, diff --git a/src/expr/src/row/collection.rs b/src/expr/src/row/collection.rs index a89dea9bb7ce5..09b4e206576f8 100644 --- a/src/expr/src/row/collection.rs +++ b/src/expr/src/row/collection.rs @@ -24,7 +24,7 @@ use crate::ColumnOrder; include!(concat!(env!("OUT_DIR"), "/mz_expr.row.collection.rs")); -/// Collection of [`Row`]s represented as a single blob. +/// Collection of sorted [`Row`]s represented as a single blob. /// /// Note: the encoding format we use to represent [`Row`]s in this struct is /// not stable, and thus should never be persisted durably. @@ -40,6 +40,10 @@ pub struct RowCollection { impl RowCollection { /// Create a new [`RowCollection`] from a collection of [`Row`]s. Sorts data by `order_by`. + /// + /// Note that all row collections must be constructed with the same `order_by` to ensure that + /// the sort order is consistent. Anything else is undefined behavior. + // TODO: Remember the `order_by` and assert that it is the same for all collections. pub fn new(mut rows: Vec<(Row, NonZeroUsize)>, order_by: &[ColumnOrder]) -> Self { // Sort data to maintain sortedness invariants. let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); @@ -173,16 +177,7 @@ impl RowCollection { .unwrap_or_else(|| self.runs[index - 1]); let end = self.runs[index]; - // TODO: Remove - for j in start..end { - if j + 1 < end { - assert_ne!( - cmp(self.get(j).unwrap().0, self.get(j + 1).unwrap().0), - std::cmp::Ordering::Greater - ); - } - } - heap.push(Reverse(Finger { + heap.push(Reverse(RunIter { collection: &self, cmp: &cmp, start, @@ -192,11 +187,11 @@ impl RowCollection { let mut view = Vec::with_capacity(self.metadata.len()); - while let Some(Reverse(mut finger)) = heap.pop() { - view.push(finger.start); - finger.start += 1; - if finger.start < finger.end { - heap.push(Reverse(finger)); + while let Some(Reverse(mut run)) = heap.pop() { + view.push(run.start); + run.start += 1; + if run.start < run.end { + heap.push(Reverse(run)); } } @@ -469,14 +464,14 @@ impl IntoRowIterator for SortedRowCollection { } } -struct Finger<'a, F> { +struct RunIter<'a, F> { collection: &'a RowCollection, cmp: &'a F, start: usize, end: usize, } -impl<'a, F> PartialOrd for Finger<'a, F> +impl<'a, F> PartialOrd for RunIter<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering, { @@ -485,7 +480,7 @@ where } } -impl<'a, F> Ord for Finger<'a, F> +impl<'a, F> Ord for RunIter<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering, { @@ -497,7 +492,7 @@ where } } -impl<'a, F> PartialEq for Finger<'a, F> +impl<'a, F> PartialEq for RunIter<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering, { @@ -508,7 +503,7 @@ where } } -impl<'a, F> Eq for Finger<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering {} +impl<'a, F> Eq for RunIter<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering {} #[cfg(test)] mod tests { From 537c6ea6e4f6cb276b73da8e492d39fb9c9b2927 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 20 Nov 2024 14:04:35 +0100 Subject: [PATCH 8/9] Cleanup Signed-off-by: Moritz Hoffmann --- src/adapter/src/coord/sequencer.rs | 2 +- src/compute/src/compute_state.rs | 1 - src/expr/src/relation.rs | 17 +---- src/expr/src/row/collection.rs | 118 +++++++++++++++-------------- 4 files changed, 66 insertions(+), 72 deletions(-) diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index cc3ac2ce4af0e..26b16e31f822b 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -18,7 +18,7 @@ use inner::return_if_err; use mz_expr::row::RowCollection; use mz_expr::{MirRelationExpr, RowSetFinishing}; use mz_ore::tracing::OpenTelemetryContext; -use mz_repr::{CatalogItemId, DatumVec, Diff, GlobalId}; +use mz_repr::{CatalogItemId, Diff, GlobalId}; use mz_sql::catalog::CatalogError; use mz_sql::names::ResolvedIds; use mz_sql::plan::{ diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index 9b11ea527a661..e82205a0e7de0 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -1079,7 +1079,6 @@ impl PendingPeek { .map(|l| usize::cast_from(u64::from(l))) .unwrap_or(usize::MAX) + peek.finishing.offset; - let order_by = peek.finishing.order_by.clone(); let task_handle = mz_ore::task::spawn(|| "persist::peek", async move { diff --git a/src/expr/src/relation.rs b/src/expr/src/relation.rs index 8223723e2171e..ff00dc1083ef7 100644 --- a/src/expr/src/relation.rs +++ b/src/expr/src/relation.rs @@ -9,7 +9,6 @@ #![warn(missing_docs)] -use std::cell::RefCell; use std::cmp::{max, Ordering}; use std::collections::{BTreeMap, BTreeSet}; use std::fmt; @@ -36,7 +35,7 @@ use mz_repr::explain::{ }; use mz_repr::{ ColumnName, ColumnType, Datum, Diff, GlobalId, IntoRowIterator, RelationType, Row, RowIterator, - RowRef, ScalarType, + ScalarType, }; use proptest::prelude::{any, Arbitrary, BoxedStrategy}; use proptest::strategy::{Strategy, Union}; @@ -3557,19 +3556,7 @@ impl RowSetFinishing { return Err(format!("result exceeds max size of {max_bytes}",)); } - let left_datum_vec = RefCell::new(mz_repr::DatumVec::new()); - let right_datum_vec = RefCell::new(mz_repr::DatumVec::new()); - - let sort_by = |left: &RowRef, right: &RowRef| { - let (mut left_datum_vec, mut right_datum_vec) = - (left_datum_vec.borrow_mut(), right_datum_vec.borrow_mut()); - let left_datums = left_datum_vec.borrow_with(left); - let right_datums = right_datum_vec.borrow_with(right); - compare_columns(&self.order_by, &left_datums, &right_datums, || { - left.cmp(right) - }) - }; - let sorted_view = rows.sorted_view(sort_by); + let sorted_view = rows.sorted_view(&self.order_by); let mut iter = sorted_view .into_row_iter() .apply_offset(self.offset) diff --git a/src/expr/src/row/collection.rs b/src/expr/src/row/collection.rs index 09b4e206576f8..9a70f36a8c206 100644 --- a/src/expr/src/row/collection.rs +++ b/src/expr/src/row/collection.rs @@ -9,6 +9,7 @@ //! Defines types for working with collections of [`Row`]. +use std::cell::RefCell; use std::cmp::Reverse; use std::collections::BinaryHeap; use std::num::NonZeroUsize; @@ -34,24 +35,29 @@ pub struct RowCollection { encoded: Bytes, /// Metadata about an individual Row in the blob. metadata: Vec, - /// End of non-empty, sorted runs of rows in number of rows. + /// End of non-empty, sorted runs of rows in index into `metadata`. runs: Vec, } impl RowCollection { /// Create a new [`RowCollection`] from a collection of [`Row`]s. Sorts data by `order_by`. /// - /// Note that all row collections must be constructed with the same `order_by` to ensure that - /// the sort order is consistent. Anything else is undefined behavior. + /// Note that all row collections to be merged must be constructed with the same `order_by` + /// to ensure a consistent sort order. Anything else is undefined behavior. // TODO: Remember the `order_by` and assert that it is the same for all collections. pub fn new(mut rows: Vec<(Row, NonZeroUsize)>, order_by: &[ColumnOrder]) -> Self { // Sort data to maintain sortedness invariants. - let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); - rows.sort_by(|(row1, _diff1), (row2, _diff2)| { - let borrow1 = datum_vec1.borrow_with(row1); - let borrow2 = datum_vec2.borrow_with(row2); - crate::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2)) - }); + if order_by.is_empty() { + // Skip row decoding if not required. + rows.sort(); + } else { + let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new()); + rows.sort_by(|(row1, _diff1), (row2, _diff2)| { + let borrow1 = datum_vec1.borrow_with(row1); + let borrow2 = datum_vec2.borrow_with(row2); + crate::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2)) + }); + } // Pre-sizing our buffer should allow us to make just 1 allocation, and // use the perfect amount of memory. @@ -62,11 +68,9 @@ impl RowCollection { let mut encoded = Vec::::with_capacity(encoded_size); let mut metadata = Vec::::with_capacity(rows.len()); - let runs = if rows.is_empty() { - vec![] - } else { - vec![rows.len()] - }; + let runs = (!rows.is_empty()) + .then(|| vec![rows.len()]) + .unwrap_or_default(); for (row, diff) in rows { encoded.extend(row.data()); @@ -85,15 +89,10 @@ impl RowCollection { /// Merge another [`RowCollection`] into `self`. pub fn merge(&mut self, other: &RowCollection) { - if other.is_empty() { - return; - } else if self.is_empty() { - *self = other.clone(); + if other.count(0, None) == 0 { return; } - let self_len = self.metadata.len(); - // TODO(parkmycar): Using SegmentedBytes here would be nice. let mut new_bytes = vec![0; self.encoded.len() + other.encoded.len()]; new_bytes[..self.encoded.len()].copy_from_slice(&self.encoded[..]); @@ -103,6 +102,7 @@ impl RowCollection { offset: meta.offset + self.encoded.len(), diff: meta.diff, }); + let self_len = self.metadata.len(); self.metadata.extend(mapped_metas); self.encoded = Bytes::from(new_bytes); @@ -125,12 +125,6 @@ impl RowCollection { total } - /// Returns true iff this collection is empty. Not `pub` because it doesn't take offset/limit - /// into account. - fn is_empty(&self) -> bool { - self.metadata.is_empty() - } - /// Total count of ([`Row`], `EncodedRowMetadata`) pairs in this collection. pub fn entries(&self) -> usize { self.metadata.len() @@ -164,39 +158,54 @@ impl RowCollection { Some((row, upper)) } - /// "Sorts" the [`RowCollection`] by returning a sorted view over the collection. - pub fn sorted_view(self, cmp: F) -> SortedRowCollection + /// "Sorts" the [`RowCollection`] by the column order in `order_by`. Returns a sorted view over + /// the collection. + pub fn sorted_view(self, order_by: &[ColumnOrder]) -> SortedRowCollection { + if order_by.is_empty() { + self.sorted_view_inner(&Ord::cmp) + } else { + let left_datum_vec = RefCell::new(mz_repr::DatumVec::new()); + let right_datum_vec = RefCell::new(mz_repr::DatumVec::new()); + + let cmp = &|left: &RowRef, right: &RowRef| { + let (mut left_datum_vec, mut right_datum_vec) = + (left_datum_vec.borrow_mut(), right_datum_vec.borrow_mut()); + let left_datums = left_datum_vec.borrow_with(left); + let right_datums = right_datum_vec.borrow_with(right); + crate::compare_columns(order_by, &left_datums, &right_datums, || left.cmp(right)) + }; + self.sorted_view_inner(cmp) + } + } + + fn sorted_view_inner(self, cmp: &F) -> SortedRowCollection where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering, { - let mut heap = BinaryHeap::new(); + let mut heap = BinaryHeap::with_capacity(self.runs.len()); for index in 0..self.runs.len() { - let start = (index == 0) - .then_some(0) - .unwrap_or_else(|| self.runs[index - 1]); + let start = (index > 0).then(|| self.runs[index - 1]).unwrap_or(0); let end = self.runs[index]; heap.push(Reverse(RunIter { collection: &self, - cmp: &cmp, - start, - end, + cmp, + range: start..end, })); } let mut view = Vec::with_capacity(self.metadata.len()); while let Some(Reverse(mut run)) = heap.pop() { - view.push(run.start); - run.start += 1; - if run.start < run.end { - heap.push(Reverse(run)); + if let Some(next) = run.range.next() { + view.push(next); + if !run.range.is_empty() { + heap.push(Reverse(run)); + } } } - assert_eq!(view.len(), self.metadata.len()); - SortedRowCollection { collection: self, sorted_view: view.into(), @@ -464,11 +473,11 @@ impl IntoRowIterator for SortedRowCollection { } } +/// Iterator-like struct to help with extracting rows in sorted order from `RowCollection`. struct RunIter<'a, F> { collection: &'a RowCollection, cmp: &'a F, - start: usize, - end: usize, + range: std::ops::Range, } impl<'a, F> PartialOrd for RunIter<'a, F> @@ -485,10 +494,9 @@ where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering, { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - (self.cmp)( - self.collection.get(self.start).unwrap().0, - self.collection.get(other.start).unwrap().0, - ) + let left = self.collection.get(self.range.start).unwrap().0; + let right = self.collection.get(other.range.start).unwrap().0; + (self.cmp)(left, right) } } @@ -586,7 +594,7 @@ mod tests { }; let mut rows = [a, b, c, d]; - let sorted_view = col.sorted_view(|a, b| a.cmp(b)); + let sorted_view = col.sorted_view(&[]); rows.sort_by(|a, b| a.cmp(b)); for i in 0..rows.len() { @@ -606,7 +614,7 @@ mod tests { vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[], )); - let col = col.sorted_view(|a, b| a.cmp(b)); + let col = col.sorted_view(&[]); let mut iter = col.into_row_iter(); // Peek shouldn't advance the iterator. @@ -633,7 +641,7 @@ mod tests { vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[], )); - let col = col.sorted_view(|a, b| a.cmp(b)); + let col = col.sorted_view(&[]); // Test with a reasonable offset that does not span rows. let mut iter = col.into_row_iter().apply_offset(1); @@ -675,7 +683,7 @@ mod tests { vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[], )); - let col = col.sorted_view(|a, b| a.cmp(b)); + let col = col.sorted_view(&[]); // Test with a limit that spans only the first row. let mut iter = col.into_row_iter().with_limit(1); @@ -723,7 +731,7 @@ mod tests { fn test_mapped_row_iterator() { let a = Row::pack_slice(&[Datum::String("hello world")]); let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]); - let col = col.sorted_view(|a, b| a.cmp(b)); + let col = col.sorted_view(&[]); // Make sure we can call `.map` on a `dyn RowIterator`. let iter: Box = Box::new(col.into_row_iter()); @@ -740,7 +748,7 @@ mod tests { fn test_projected_row_iterator() { let a = Row::pack_slice(&[Datum::String("hello world"), Datum::Int16(42)]); let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(2).unwrap())], &[]); - let col = col.sorted_view(|a, b| a.cmp(b)); + let col = col.sorted_view(&[]); // Project away the first column. let mut iter = col.into_row_iter().with_projection(vec![1]); @@ -795,7 +803,7 @@ mod tests { ], &[], ); - let col = col.sorted_view(|a, b| a.cmp(b)); + let col = col.sorted_view(&[]); // How many total rows there are. let iter = col.into_row_iter(); @@ -891,7 +899,7 @@ mod tests { let mut col = RowCollection::from(&a); col.merge(&RowCollection::from(&b)); - let sorted_view = col.sorted_view(|a, b| a.cmp(b)); + let sorted_view = col.sorted_view(&[]); a.append(&mut b); a.sort_by(|a, b| a.cmp(b)); From 562c3d3d04d00faeee036c58619654a80f14ca59 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 20 Nov 2024 16:33:36 +0100 Subject: [PATCH 9/9] Feedback Signed-off-by: Moritz Hoffmann --- src/expr/src/row/collection.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/expr/src/row/collection.rs b/src/expr/src/row/collection.rs index 9a70f36a8c206..2b090fa232e7d 100644 --- a/src/expr/src/row/collection.rs +++ b/src/expr/src/row/collection.rs @@ -25,7 +25,7 @@ use crate::ColumnOrder; include!(concat!(env!("OUT_DIR"), "/mz_expr.row.collection.rs")); -/// Collection of sorted [`Row`]s represented as a single blob. +/// Collection of runs of sorted [`Row`]s represented as a single blob. /// /// Note: the encoding format we use to represent [`Row`]s in this struct is /// not stable, and thus should never be persisted durably. @@ -35,7 +35,7 @@ pub struct RowCollection { encoded: Bytes, /// Metadata about an individual Row in the blob. metadata: Vec, - /// End of non-empty, sorted runs of rows in index into `metadata`. + /// Ends of non-empty, sorted runs of rows in index into `metadata`. runs: Vec, } @@ -505,9 +505,7 @@ where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering, { fn eq(&self, other: &Self) -> bool { - self.partial_cmp(other) - .map(|ordering| ordering == std::cmp::Ordering::Equal) - .unwrap_or(false) + self.cmp(other) == std::cmp::Ordering::Equal } }