Sort results on replica, merge on envd #30558

Merged
merged 9 commits into from
Nov 20, 2024


@antiguru (Member) commented Nov 19, 2024

Sort results on replica, merge on environmentd.

Previously, we sorted data only on environmentd, which caused it to consume more CPU than necessary. This change moves some of the sorting to clusterd and leaves only the last merge step on environmentd.

The PR takes a minimal approach and leaves most of the code related to result finishing untouched. It introduces an invariant: peek results must always be sorted according to the finishing, and anything else leads to undefined results. However, nothing enforces that all results are sorted with the same ordering, which is potentially bad. Inside environmentd, it uses a simple heap to combine $k$ sorted runs into a single permutation map.
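A minimal sketch of such a heap-based merge, for illustration only. It assumes rows are plain `i64` values stored back to back in one slice, with `ends[i]` marking the exclusive end of run `i`; `merge_runs` is a hypothetical name, and the real code compares encoded rows according to the finishing's column order rather than integers.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: merges the sorted runs stored back to back in `rows`
/// into one permutation of indices. `ends[i]` is the exclusive end of run `i`.
pub fn merge_runs(rows: &[i64], ends: &[usize]) -> Vec<usize> {
    // Entries are (current value, position, end of run). `Reverse` turns
    // Rust's max-heap into a min-heap; equal values pop in position order.
    let mut heap = BinaryHeap::with_capacity(ends.len());
    let mut start = 0;
    for &end in ends {
        if start < end {
            heap.push(Reverse((rows[start], start, end)));
        }
        start = end;
    }
    let mut view = Vec::with_capacity(rows.len());
    while let Some(Reverse((_, pos, end))) = heap.pop() {
        view.push(pos);
        // Advance this run's finger and re-insert it unless the run is exhausted.
        if pos + 1 < end {
            heap.push(Reverse((rows[pos + 1], pos + 1, end)));
        }
    }
    view
}
```

Each output position costs one pop and at most one push on a heap of at most $k$ entries, which is where the $n\cdot\log k$ cost of the final merge comes from.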

The interfaces to RowCollection (new, sorted_view) now take a &[ColumnOrder], and internally the implementation picks the right comparison function. If the column order slice is empty, it'll skip decoding the rows and directly defer to the tiebreaker.
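The comparator selection could look roughly like the sketch below. It is not the mz-expr code: `ColumnOrder` is simplified to a column index plus a `desc` flag (the real type carries more), rows are assumed to be encoded as consecutive little-endian `i64` columns, and `pick_comparator`, `by_columns`, and `tiebreak` are hypothetical names. The point it shows is that an empty order yields a comparator that never decodes the rows.

```rust
use std::cmp::Ordering;

/// Simplified, hypothetical stand-in for `ColumnOrder`.
#[derive(Clone, Copy, Debug)]
pub struct ColumnOrder {
    pub column: usize,
    pub desc: bool,
}

/// Signature shared by both comparison strategies.
pub type RowCmp = fn(&[ColumnOrder], &[u8], &[u8]) -> Ordering;

/// Encodes a row as consecutive little-endian `i64` columns.
pub fn encode(cols: &[i64]) -> Vec<u8> {
    cols.iter().flat_map(|c| c.to_le_bytes()).collect()
}

/// Decodes a row produced by `encode`.
fn decode(row: &[u8]) -> Vec<i64> {
    row.chunks_exact(8)
        .map(|chunk| {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(chunk);
            i64::from_le_bytes(buf)
        })
        .collect()
}

/// Tiebreaker: compares the encoded bytes directly, without decoding.
fn tiebreak(_order: &[ColumnOrder], a: &[u8], b: &[u8]) -> Ordering {
    a.cmp(b)
}

/// Decodes both rows, compares the listed columns, then falls back to the tiebreaker.
fn by_columns(order: &[ColumnOrder], a: &[u8], b: &[u8]) -> Ordering {
    let (da, db) = (decode(a), decode(b));
    for o in order {
        let ord = da[o.column].cmp(&db[o.column]);
        let ord = if o.desc { ord.reverse() } else { ord };
        if ord != Ordering::Equal {
            return ord;
        }
    }
    tiebreak(order, a, b)
}

/// Picks the comparison function once, up front, based on the order.
pub fn pick_comparator(order: &[ColumnOrder]) -> RowCmp {
    if order.is_empty() { tiebreak } else { by_columns }
}
```

Choosing the function once, rather than branching inside every comparison, keeps the emptiness check out of the sort's inner loop in this sketch.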

The PR moves the RowCollection type into mz-expr, which isn't ideal. This is required because the ColumnOrder type is defined in mz-expr, and we'd like to pass it to the type's constructor. An alternative would be a function in mz-expr that passes the correct comparison function to RowCollection, but that seems strictly worse than moving the type.
I considered moving the type to compute-types, which seems a better fit, but not all uses of RowCollection depend on compute-types. If this is upsetting, I can think about alternatives.

The complexity of sorting on the cluster is roughly $\frac{n}{k}\cdot\log \frac{n}{k}$ per worker, where $n$ is the total number of result records and $k$ the number of workers. The last merge step then has a time complexity of $n\cdot\log k$ to combine the $k$ sorted runs into one.
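To make these bounds concrete, here is a back-of-the-envelope count with illustrative numbers that are not from the PR: $n = 10^6$ result records, $k = 16$ workers, logarithms base 2, and constant factors ignored. Before this change, environmentd sorted all $n$ records itself.

$$
\begin{aligned}
\text{per worker: } \tfrac{n}{k}\log_2\tfrac{n}{k} &= 62\,500 \cdot \log_2 62\,500 \approx 62\,500 \cdot 15.93 \approx 9.96\times 10^{5} \\
\text{final merge on environmentd: } n\log_2 k &= 10^{6}\cdot 4 = 4\times 10^{6} \\
\text{previous sort on environmentd: } n\log_2 n &\approx 10^{6}\cdot 19.93 \approx 1.99\times 10^{7}
\end{aligned}
$$

Under these assumptions, the comparison work left on environmentd shrinks by roughly $\log_2 n / \log_2 k \approx 5\times$.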

Follow-up items include:

  • Avoid a single Bytes allocation for all rows, and instead keep the individual allocations.
  • Assert that all RowCollections are sorted equally.
  • Move the binary heap into an iterator to avoid materializing the sorted view permutation.
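The last follow-up could take roughly the following shape: an iterator that owns the binary heap and yields values in merged order, so no permutation vector is materialized. This is a sketch under simplifying assumptions: runs are plain `&[i64]` slices rather than encoded rows, and `MergeIter` is a hypothetical name.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical streaming k-way merge over already sorted runs.
pub struct MergeIter<'a> {
    runs: Vec<&'a [i64]>,
    /// Min-heap of (next value, run index, position within that run).
    heap: BinaryHeap<Reverse<(i64, usize, usize)>>,
}

impl<'a> MergeIter<'a> {
    pub fn new(runs: Vec<&'a [i64]>) -> Self {
        let mut heap = BinaryHeap::with_capacity(runs.len());
        // Seed the heap with the head of every non-empty run.
        for (i, run) in runs.iter().enumerate() {
            if let Some(&first) = run.first() {
                heap.push(Reverse((first, i, 0)));
            }
        }
        Self { runs, heap }
    }
}

impl<'a> Iterator for MergeIter<'a> {
    type Item = i64;

    fn next(&mut self) -> Option<i64> {
        let Reverse((value, run, pos)) = self.heap.pop()?;
        // Refill from the run we just consumed, if it has more elements.
        if let Some(&next) = self.runs[run].get(pos + 1) {
            self.heap.push(Reverse((next, run, pos + 1)));
        }
        Some(value)
    }
}
```

Each call to `next` does one pop and at most one push, so a full pass costs $n\log k$. Unlike a materialized view, a second pass redoes the merge, which is the trade-off discussed later in the review.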

Tips for the reviewer

Don't look at individual commits.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

@antiguru antiguru requested review from a team as code owners November 19, 2024 13:06
@antiguru antiguru marked this pull request as draft November 19, 2024 13:08
@ParkMyCar (Member) left a comment

Woohoo! Love to see this happening. IMO the biggest feedback I have is enforcing the invariant that peek results must be sorted at the type level, and maybe at the same time reducing the repetitiveness of creating DatumVecs and calling .sort_by(...).

It seems like everywhere we currently sort a Vec<Row> we're immediately passing the results into RowCollection::new. What if we push the sorting into RowCollection::new? i.e.

impl RowCollection {
  pub fn new(mut rows: Vec<Row>, finishing: &RowSetFinishing) -> Self { ... }
}

At which point RowCollection is sorted so what's the point of SortedRowCollection? It kind of feels like RowCollection could naturally become a SortedRowRun and then SortedRowCollection becomes a collection of SortedRowRuns? e.g.

struct SortedRowRun {
    encoded: Bytes,
    metadata: Arc<[EncodedRowMetadata]>,
}

struct SortedRowCollection {
    runs: Vec<SortedRowRun>,
}

This is a much larger change, and I think only part one (pushing the sort into RowCollection) is enough to get this across the line because it mostly solves the invariant that a RowCollection must be sorted. But I think we can still do part 2 without having to touch the code related to result finishing since most of that should use a Box<dyn RowIterator> IIRC.
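A minimal sketch of the type-level idea, assuming rows are `Vec<i64>` and the ordering is a list of column indices (a simplification of `RowSetFinishing` and `ColumnOrder`). The type names follow the suggestion, but the code is illustrative: because the `rows` field is private and `new` always sorts, no caller can construct an unsorted `SortedRowRun`.

```rust
use std::cmp::Ordering;

/// Hypothetical sorted run. The field is private and `new` always sorts,
/// so every `SortedRowRun` is sorted by construction.
pub struct SortedRowRun {
    rows: Vec<Vec<i64>>,
}

impl SortedRowRun {
    /// Sorts `rows` by the listed column indices, using the whole row as the
    /// final tiebreaker, and wraps the result.
    pub fn new(mut rows: Vec<Vec<i64>>, order_by: &[usize]) -> Self {
        rows.sort_by(|a, b| {
            order_by
                .iter()
                .map(|&c| a[c].cmp(&b[c]))
                .find(|o| *o != Ordering::Equal)
                .unwrap_or_else(|| a.cmp(b))
        });
        Self { rows }
    }

    /// Read-only access: callers can iterate but not reorder.
    pub fn rows(&self) -> &[Vec<i64>] {
        &self.rows
    }
}

/// A collection of independently sorted runs, e.g. one per worker.
pub struct SortedRowCollection {
    pub runs: Vec<SortedRowRun>,
}
```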

Comment on lines 178 to 207
while let Some(Reverse(mut finger)) = heap.pop() {
    view.push(finger.start);
    finger.start += 1;
    if finger.start < finger.end {
        heap.push(Reverse(finger));
    }
}
Member

It feels like there is a great opportunity to push this logic into SortedRowCollection, or maybe SortedRowCollectionIter? That is, we would do this streaming merge sort as folks iterate through a row collection.

Contributor

Agreed! I assume we're going to iterate through the result rows only once when we send them over the wire, so we can avoid having the extra view buffer around and decoding the rows twice.

Member Author

Unfortunately, we need to iterate multiple times to determine the size of the whole result. I agree we shouldn't (and there is no deep reason we have to), but it requires more changes.

src/repr/src/row/collection.proto (outdated)
@@ -33,6 +35,8 @@ pub struct RowCollection {
    encoded: Bytes,
    /// Metadata about an individual Row in the blob.
    metadata: Vec<EncodedRowMetadata>,
    /// Start of sorted runs of rows in rows.
    fingers: Vec<usize>,
Contributor

The documentation here confused me. This field actually stores the indexes of the ends of sorted runs, right? Is there a reason for that? It does feel like storing the start indexes would be more natural.

I know you said the PR lacks documentation, so if you still planned to adjust it here then nvm!
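To illustrate the reviewer's reading of `fingers`: if the vector stores the exclusive end of each sorted run, the start of run `i` is the end of run `i - 1` (or `0` for the first run), so either representation needs only one `usize` per run. The helper below is hypothetical and only shows how end offsets map back to run ranges.

```rust
use std::ops::Range;

/// Hypothetical helper: turns the exclusive end offset of each sorted run
/// into that run's index range. Run `i` starts where run `i - 1` ended.
pub fn run_ranges(ends: &[usize]) -> Vec<Range<usize>> {
    let mut start = 0;
    ends.iter()
        .map(|&end| {
            let range = start..end;
            start = end; // the next run begins where this one stops
            range
        })
        .collect()
}
```

With contiguous runs, the last end offset equals the total number of rows, whereas a list of start offsets always begins with `0`.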

@teskje (Contributor) commented Nov 20, 2024

We discussed this a bit offline already, and my understanding is that this PR is meant to stop the bleeding with an as-small-as-possible diff and do improvements as a follow-up. That plan is fine with me.

@ParkMyCar wrote: "IMO the biggest feedback I have is enforcing the invariant that peek results must be sorted at the type level"

I agree with that. One concern is that we add some place where we create PeekResponse::Rows but forget that the contained rows are expected to be sorted runs. Currently there is an assert in the merging, but we don't want to keep that in prod, so it'd be easy to end up returning incorrect results to the user. So PeekResponse::Rows should contain a type that ensures the sorting invariants we need.

Another reason for wanting such a type is that we sometimes don't have to sort! Specifically, if the order_by is empty, we can return the data in any order, I think. I initially thought that wasn't the case because results from different workers can cancel out, but since their diffs are NonZeroU64, they can only add up but never cancel. Don't trust me on this, I'm probably missing something about how peek finishing works.
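The non-cancellation argument can be illustrated with the standard library's `NonZeroU64`: adding two non-zero unsigned counts yields a non-zero count or an overflow, never zero. The sketch below is hypothetical (`combine_counts` treats one row's per-worker multiplicities as a slice) and does not verify the rest of the reasoning about peek finishing, which the comment itself flags as uncertain.

```rust
use std::num::NonZeroU64;

/// Hypothetical: sums the multiplicities that different workers report for
/// the same row. The result is `None` for an empty slice or on overflow, and
/// otherwise a non-zero count, because unsigned non-zero values cannot cancel.
pub fn combine_counts(counts: &[NonZeroU64]) -> Option<NonZeroU64> {
    let (first, rest) = counts.split_first()?;
    rest.iter().try_fold(*first, |acc, c| acc.checked_add(c.get()))
}
```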

But if it's true that we don't need to sort if the order_by is empty, then we want a type that knows about the order_by and does the right thing (sorting or not) depending on it. For example:

struct RowRuns {
    runs: Vec<RowCollection>,
    order_by: Vec<ColumnOrder>,
}

impl RowRuns {
    fn push(&mut self, mut rows: Vec<(Row, NonZeroU64)>) {
        if !self.order_by.is_empty() {
            sort(&mut rows, &self.order_by);
        }
        self.runs.push(RowCollection::new(&rows));
    }
}
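A self-contained version of the `RowRuns` idea, runnable under simplifying assumptions: rows are `Vec<i64>` paired with a `NonZeroU64` count, a run is a plain `Vec` instead of a `RowCollection`, and `order_by` is a list of column indices instead of `Vec<ColumnOrder>`. The full row serves as the tiebreaker when the listed columns are equal.

```rust
use std::cmp::Ordering;
use std::num::NonZeroU64;

/// One run: rows paired with their multiplicities.
pub type Run = Vec<(Vec<i64>, NonZeroU64)>;

pub struct RowRuns {
    pub runs: Vec<Run>,
    /// Column indices to order by; empty means any order is acceptable.
    pub order_by: Vec<usize>,
}

impl RowRuns {
    pub fn push(&mut self, mut rows: Run) {
        // Sort only when the finishing asks for an order.
        if !self.order_by.is_empty() {
            let order_by = &self.order_by;
            rows.sort_by(|(a, _), (b, _)| {
                order_by
                    .iter()
                    .map(|&c| a[c].cmp(&b[c]))
                    .find(|o| *o != Ordering::Equal)
                    // Fall back to the whole row so the order is total.
                    .unwrap_or_else(|| a.cmp(b))
            });
        }
        self.runs.push(rows);
    }
}
```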

@antiguru (Member Author)

@teskje wrote: "Another reason for wanting such a type is that we sometimes don't have to sort!"

I agree that strictly speaking there are cases where we don't have to sort, but I'm not comfortable changing the invariant as part of this PR. We might have downstream code that relies on a certain row order, as well as our tests, so I'd like to separate this from the current effort.

@antiguru (Member Author)

@ParkMyCar wrote: "It kind of feels like RowCollection could naturally become a SortedRowRun and then SortedRowCollection becomes a collection of SortedRowRuns?"

I agree it could! It's a non-trivial departure from what we currently have: at the moment, we allow indexing into a RowCollection and the sorted variants, which we use primarily to iterate. If we want to avoid the index lookup, we could change the iterator to sit on the binary heap, but then we'd need to be careful not to clone the iterator, because the cost of iterating would be $n\log k$ instead of $n$.

@teskje (Contributor) commented Nov 20, 2024

@antiguru wrote: "I agree that strictly speaking there are cases where we don't have to sort, but I'm not comfortable changing the invariant as part of this PR. We might have downstream code that relies on a certain row order, as well as our tests, so I'd like to separate this from the current effort."

Yes, I'm very much in favor of taking small steps! Just wanted to record my thoughts for follow-ups we can/should do. Also partly to check my thinking around whether or not sorting is necessary.

@antiguru antiguru marked this pull request as ready for review November 20, 2024 13:14
@antiguru antiguru requested a review from a team as a code owner November 20, 2024 13:14
@antiguru antiguru requested review from ParkMyCar and teskje November 20, 2024 13:14

shepherdlybot bot commented Nov 20, 2024

Risk Score: 80 / 100, Bug Hotspots: 0, Resilience Coverage: 16%

Mitigations

Completing required mitigations increases Resilience Coverage.

  • (Required) Code Review 🔍 Detected
  • (Required) Feature Flag
  • (Required) Integration Test
  • (Required) Observability
  • (Required) QA Review
  • (Required) Run Nightly Tests
  • Unit Test
Risk Summary:

The pull request has a high risk score of 80, driven by predictors such as the "Sum Bug Reports Of Files" and the "Delta of Executable Lines." Historically, PRs with these predictors are 116% more likely to cause a bug than the repository baseline. The observed bug trend in the repository is steady.

Note: The risk score is not based on semantic analysis but on historical predictors of bug occurrence in the repository. The attributes above were deemed the strongest predictors based on that history. Predictors and the score may change as the PR evolves in code, time, and review activity.

@antiguru (Member Author)

@teskje (Contributor) left a comment

LGTM!

src/expr/src/row/collection.rs (outdated)
Signed-off-by: Moritz Hoffmann <[email protected]>
@ParkMyCar (Member) left a comment

LGTM!

Sorry, I didn't realize pushing the sort into RowCollection would require moving the struct to the expr crate 🙈 Thanks for making that change!

@antiguru antiguru enabled auto-merge (squash) November 20, 2024 16:22
@antiguru (Member Author)

Thanks for the reviews!

@antiguru antiguru merged commit 11593f4 into MaterializeInc:main Nov 20, 2024
81 checks passed
Labels
None yet
Projects
None yet
3 participants