Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort results on replica, merge on envd #30558

Merged
merged 9 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/adapter/src/coord/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
use mz_compute_types::ComputeInstanceId;
use mz_controller_types::ClusterId;
use mz_expr::explain::{fmt_text_constant_rows, HumanizedExplain, HumanizerMode};
use mz_expr::row::RowCollection;
use mz_expr::{
permutation_for_arrangement, EvalError, Id, MirRelationExpr, MirScalarExpr,
OptimizedMirRelationExpr, RowSetFinishing,
Expand All @@ -37,7 +38,7 @@ use mz_ore::str::{separated, StrExt};
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::explain::text::DisplayText;
use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
use mz_repr::{Diff, GlobalId, IntoRowIterator, RelationType, Row, RowCollection, RowIterator};
use mz_repr::{Diff, GlobalId, IntoRowIterator, RelationType, Row, RowIterator};
use serde::{Deserialize, Serialize};
use timely::progress::Timestamp;
use uuid::Uuid;
Expand Down Expand Up @@ -481,7 +482,7 @@ impl crate::coord::Coordinator {
));
}
}
let row_collection = RowCollection::new(&results);
let row_collection = RowCollection::new(results, &finishing.order_by);
let duration_histogram = self.metrics.row_set_finishing_seconds();

let (ret, reason) = match finishing.finish(
Expand Down
5 changes: 3 additions & 2 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
use futures::future::LocalBoxFuture;
use futures::FutureExt;
use inner::return_if_err;
use mz_expr::row::RowCollection;
use mz_expr::{MirRelationExpr, RowSetFinishing};
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::{CatalogItemId, Diff, GlobalId, RowCollection};
use mz_repr::{CatalogItemId, Diff, GlobalId};
use mz_sql::catalog::CatalogError;
use mz_sql::names::ResolvedIds;
use mz_sql::plan::{
Expand Down Expand Up @@ -838,7 +839,7 @@ impl Coordinator {
let duration_histogram = session.metrics().row_set_finishing_seconds();

return match finishing.finish(
RowCollection::new(&plan.returning),
RowCollection::new(plan.returning, &finishing.order_by),
plan.max_result_size,
Some(max_returned_query_size),
duration_histogram,
Expand Down
2 changes: 1 addition & 1 deletion src/compute-client/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ fn main() {
.extern_path(".mz_expr.id", "::mz_expr")
.extern_path(".mz_expr.linear", "::mz_expr")
.extern_path(".mz_expr.relation", "::mz_expr")
.extern_path(".mz_expr.row.collection", "::mz_expr::row")
.extern_path(".mz_expr.scalar", "::mz_expr")
.extern_path(".mz_kafka_util.addr", "::mz_kafka_util")
.extern_path(".mz_persist_client", "::mz_persist_client")
Expand All @@ -38,7 +39,6 @@ fn main() {
.extern_path(".mz_repr.global_id", "::mz_repr::global_id")
.extern_path(".mz_repr.relation_and_scalar", "::mz_repr")
.extern_path(".mz_repr.row", "::mz_repr")
.extern_path(".mz_repr.row.collection", "::mz_repr")
.extern_path(".mz_repr.url", "::mz_repr::url")
.extern_path(".mz_compute_types", "::mz_compute_types")
.extern_path(".mz_cluster_client", "::mz_cluster_client")
Expand Down
4 changes: 2 additions & 2 deletions src/compute-client/src/protocol/response.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ syntax = "proto3";

package mz_compute_client.protocol.response;

import "expr/src/row/collection.proto";
import "google/protobuf/empty.proto";
import "proto/src/proto.proto";
import "repr/src/antichain.proto";
import "repr/src/global_id.proto";
import "repr/src/row.proto";
import "repr/src/row/collection.proto";

message ProtoComputeResponse {
message ProtoFrontiersKind {
Expand Down Expand Up @@ -59,7 +59,7 @@ message ProtoFrontiersResponse {

message ProtoPeekResponse {
oneof kind {
mz_repr.row.collection.ProtoRowCollection rows = 1;
mz_expr.row.collection.ProtoRowCollection rows = 1;
string error = 2;
google.protobuf.Empty canceled = 3;
}
Expand Down
5 changes: 3 additions & 2 deletions src/compute-client/src/protocol/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
use std::num::NonZeroUsize;

use mz_compute_types::plan::LirId;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::tracing::OpenTelemetryContext;
use mz_proto::{any_uuid, IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{Diff, GlobalId, Row, RowCollection};
use mz_repr::{Diff, GlobalId, Row};
use mz_timely_util::progress::any_antichain;
use proptest::prelude::{any, Arbitrary};
use proptest::strategy::{BoxedStrategy, Just, Strategy, Union};
Expand Down Expand Up @@ -365,7 +366,7 @@ impl Arbitrary for PeekResponse {
),
1..11,
)
.prop_map(|rows| PeekResponse::Rows(RowCollection::new(&rows)))
.prop_map(|rows| PeekResponse::Rows(RowCollection::new(rows, &[])))
.boxed(),
".*".prop_map(PeekResponse::Error).boxed(),
Just(PeekResponse::Canceled).boxed(),
Expand Down
3 changes: 2 additions & 1 deletion src/compute-client/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use async_trait::async_trait;
use bytesize::ByteSize;
use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::lattice::Lattice;
use mz_expr::row::RowCollection;
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_repr::{Diff, GlobalId, Row, RowCollection};
use mz_repr::{Diff, GlobalId, Row};
use mz_service::client::{GenericClient, Partitionable, PartitionedState};
use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
use timely::progress::frontier::{Antichain, MutableAntichain};
Expand Down
8 changes: 5 additions & 3 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::flat_plan::FlatPlan;
use mz_compute_types::plan::LirId;
use mz_dyncfg::ConfigSet;
use mz_expr::row::RowCollection;
use mz_expr::SafeMfpPlan;
use mz_ore::cast::CastFrom;
use mz_ore::metrics::UIntGauge;
Expand All @@ -44,7 +45,7 @@ use mz_persist_client::read::ReadHandle;
use mz_persist_client::Diagnostics;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, RowCollection, Timestamp};
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sources::SourceData;
Expand Down Expand Up @@ -1078,6 +1079,7 @@ impl PendingPeek {
.map(|l| usize::cast_from(u64::from(l)))
.unwrap_or(usize::MAX)
+ peek.finishing.offset;
let order_by = peek.finishing.order_by.clone();

let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
let start = Instant::now();
Expand All @@ -1095,7 +1097,7 @@ impl PendingPeek {
Ok(vec![])
};
let result = match result {
Ok(rows) => PeekResponse::Rows(RowCollection::new(&rows)),
Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
Err(e) => PeekResponse::Error(e.to_string()),
};
match result_tx.send((result, start.elapsed())) {
Expand Down Expand Up @@ -1297,7 +1299,7 @@ impl IndexPeek {
}

let response = match self.collect_finished_data(max_result_size) {
Ok(rows) => PeekResponse::Rows(RowCollection::new(&rows)),
Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &self.peek.finishing.order_by)),
Err(text) => PeekResponse::Error(text),
};
Some(response)
Expand Down
1 change: 1 addition & 0 deletions src/expr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ filegroup(
"src/linear.proto",
"src/relation.proto",
"src/relation/func.proto",
"src/row/collection.proto",
"src/scalar.proto",
"src/scalar/func/format.proto",
"src/scalar/like_pattern.proto",
Expand Down
2 changes: 2 additions & 0 deletions src/expr/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ fn main() {
.extern_path(".mz_repr.strconv", "::mz_repr::strconv")
.type_attribute(".", "#[allow(missing_docs)]")
.btree_map(["."])
.bytes([".mz_expr.row.collection"])
.compile_protos(
&[
"expr/src/id.proto",
"expr/src/linear.proto",
"expr/src/relation.proto",
"expr/src/relation/func.proto",
"expr/src/row/collection.proto",
"expr/src/scalar.proto",
"expr/src/scalar/func/format.proto",
"expr/src/scalar/like_pattern.proto",
Expand Down
1 change: 1 addition & 0 deletions src/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod relation;
mod scalar;

pub mod explain;
pub mod row;
pub mod virtual_syntax;
pub mod visit;

Expand Down
17 changes: 4 additions & 13 deletions src/expr/src/relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use mz_repr::explain::{
DummyHumanizer, ExplainConfig, ExprHumanizer, IndexUsageType, PlanRenderingContext,
};
use mz_repr::{
ColumnName, ColumnType, Datum, Diff, GlobalId, IntoRowIterator, RelationType, Row,
RowCollection, RowIterator, RowRef, ScalarType, SortedRowCollectionIter,
ColumnName, ColumnType, Datum, Diff, GlobalId, IntoRowIterator, RelationType, Row, RowIterator,
ScalarType,
};
use proptest::prelude::{any, Arbitrary, BoxedStrategy};
use proptest::strategy::{Strategy, Union};
Expand All @@ -45,6 +45,7 @@ use timely::container::columnation::{Columnation, CopyRegion};

use crate::explain::{HumanizedExpr, HumanizerMode};
use crate::relation::func::{AggregateFunc, LagLeadType, TableFunc};
use crate::row::{RowCollection, SortedRowCollectionIter};
use crate::visit::{Visit, VisitChildren};
use crate::Id::Local;
use crate::{
Expand Down Expand Up @@ -3555,17 +3556,7 @@ impl RowSetFinishing {
return Err(format!("result exceeds max size of {max_bytes}",));
}

let mut left_datum_vec = mz_repr::DatumVec::new();
let mut right_datum_vec = mz_repr::DatumVec::new();

let sort_by = |left: &RowRef, right: &RowRef| {
let left_datums = left_datum_vec.borrow_with(left);
let right_datums = right_datum_vec.borrow_with(right);
compare_columns(&self.order_by, &left_datums, &right_datums, || {
left.cmp(right)
})
};
let sorted_view = rows.sorted_view(sort_by);
let sorted_view = rows.sorted_view(&self.order_by);
let mut iter = sorted_view
.into_row_iter()
.apply_offset(self.offset)
Expand Down
12 changes: 12 additions & 0 deletions src/expr/src/row.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub use crate::row::collection::*;

mod collection;
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@

syntax = "proto3";

package mz_repr.row.collection;
package mz_expr.row.collection;

message ProtoRowCollection {
bytes encoded = 1;
repeated ProtoEncodedRowMetadata metadata = 2;
repeated uint64 runs = 3;
}

message ProtoEncodedRowMetadata {
Expand Down
Loading
Loading