From f2e0782d790808a427a84668c5a2b386145fd87c Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 24 Jan 2025 20:22:39 +0100 Subject: [PATCH] Switch some dataflow fragments to columnar Change the dataflow fragment for `specialized_arrange` and in the linear join preparation phase to use columnar data on edges. Signed-off-by: Moritz Hoffmann --- Cargo.lock | 12 +-- src/adapter-types/Cargo.toml | 2 +- src/adapter/Cargo.toml | 4 +- src/catalog/Cargo.toml | 4 +- src/cluster/Cargo.toml | 4 +- src/compute-client/Cargo.toml | 4 +- src/compute-types/Cargo.toml | 4 +- src/compute/Cargo.toml | 6 +- src/compute/src/render.rs | 13 ++- src/compute/src/render/context.rs | 99 ++++++++++++---------- src/compute/src/render/flat_map.rs | 3 + src/compute/src/render/join/delta_join.rs | 29 +++++-- src/compute/src/render/join/linear_join.rs | 80 ++++++++++------- src/compute/src/render/sinks.rs | 10 ++- src/compute/src/render/top_k.rs | 9 +- src/controller/Cargo.toml | 2 +- src/durable-cache/Cargo.toml | 4 +- src/environmentd/Cargo.toml | 2 +- src/expr/Cargo.toml | 2 +- src/interchange/Cargo.toml | 4 +- src/persist-cli/Cargo.toml | 4 +- src/persist-client/Cargo.toml | 4 +- src/persist-types/Cargo.toml | 2 +- src/persist/Cargo.toml | 4 +- src/repr/Cargo.toml | 4 +- src/repr/src/row.rs | 5 ++ src/service/Cargo.toml | 2 +- src/storage-client/Cargo.toml | 4 +- src/storage-controller/Cargo.toml | 4 +- src/storage-operators/Cargo.toml | 4 +- src/storage-types/Cargo.toml | 4 +- src/storage/Cargo.toml | 4 +- src/timely-util/Cargo.toml | 4 +- src/transform/Cargo.toml | 2 +- src/txn-wal/Cargo.toml | 4 +- 35 files changed, 209 insertions(+), 143 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6797f59041cc5..305db5218993d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2369,9 +2369,9 @@ dependencies = [ [[package]] name = "differential-dataflow" -version = "0.13.4" +version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75b93af605b7f82fbf6e671a5bb0f940b385e57254a0af59ce6dfb98b8c4b302" +checksum = "62e90a3a356ef9af92925918d36f5206cb74ef834a1ec39c1ed9631d00736686" dependencies = [ "columnar", "fnv", @@ -2381,9 +2381,9 @@ dependencies = [ [[package]] name = "differential-dogs3" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ba29145a1df1bdc3da1142eeb991f0866620c79ce9d85e83a3837f29112ba0" +checksum = "6ecafb7f2d23b07b36a442274be75c4090cfd225b49918aceeba577b119780ea" dependencies = [ "differential-dataflow", "serde", @@ -10406,9 +10406,9 @@ dependencies = [ [[package]] name = "timely" -version = "0.17.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0506b6506ef04c371ea6151942df60e309e7f5e710458a6b533e364ee0b3cab3" +checksum = "1502aad02b725c0efcd64bac7f2b1b05bf402c2ada6d74cd95b6b9ac252d62ed" dependencies = [ "bincode", "byteorder", diff --git a/src/adapter-types/Cargo.toml b/src/adapter-types/Cargo.toml index d1cdc0ae56507..4c66606fcb08e 100644 --- a/src/adapter-types/Cargo.toml +++ b/src/adapter-types/Cargo.toml @@ -15,7 +15,7 @@ mz-ore = { path = "../ore" } mz-repr = { path = "../repr" } mz-storage-types = { path = "../storage-types" } serde = "1.0.152" -timely = "0.17.0" +timely = "0.17.1" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } [package.metadata.cargo-udeps.ignore] diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml index 92c489f39a4e5..051b3bc47e9d4 100644 --- a/src/adapter/Cargo.toml +++ b/src/adapter/Cargo.toml @@ -18,7 +18,7 @@ bytesize = "1.1.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } dec = "0.4.8" derivative = "2.2.0" -differential-dataflow = "0.13.4" +differential-dataflow = "0.13.5" enum-kinds = "0.5.1" fail = { version = "0.5.1", features = ["failpoints"] } futures = "0.3.25" @@ -78,7 +78,7 @@ serde_plain = "1.0.1" sha2 = "0.10.6" smallvec = { version = "1.10.0", features = ["union"] } static_assertions = "1.1" -timely = "0.17.0" +timely = "0.17.1" tokio = { version = "1.38.0", features = ["rt", "time"] } tokio-postgres = { version = "0.7.8" } tracing = "0.1.37" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 6d1e030684df6..61be0b675f8e1 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -18,7 +18,7 @@ bytesize = "1.1.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } clap = { version = "4.5.23", features = ["derive"] } derivative = "2.2.0" -differential-dataflow = "0.13.4" +differential-dataflow = "0.13.5" futures = "0.3.25" ipnet = "2.5.0" itertools = "0.12.1" @@ -60,7 +60,7 @@ serde_plain = "1.0.1" static_assertions = "1.1" sha2 = "0.10.6" thiserror = "1.0.37" -timely = "0.17.0" +timely = "0.17.1" tokio = { version = "1.38.0" } tracing = "0.1.37" uuid = "1.2.2" diff --git a/src/cluster/Cargo.toml b/src/cluster/Cargo.toml index 43ca3a040c8c2..11832fc8b08f9 100644 --- a/src/cluster/Cargo.toml +++ b/src/cluster/Cargo.toml @@ -13,7 +13,7 @@ workspace = true anyhow = "1.0.66" async-trait = "0.1.83" crossbeam-channel = "0.5.8" -differential-dataflow = "0.13.4" +differential-dataflow = "0.13.5" futures = "0.3.25" mz-cluster-client = { path = "../cluster-client" } mz-ore = { path = "../ore", features = ["async", "process", "tracing_"] } @@ -21,7 +21,7 @@ mz-persist-client = { path = "../persist-client" } mz-service = { path = "../service" } mz-txn-wal = { path = "../txn-wal" } regex = "1.7.0" -timely = "0.17.0" +timely = "0.17.1" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] } tracing = "0.1.37" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/compute-client/Cargo.toml b/src/compute-client/Cargo.toml index c7ed7aa902e90..1b783735aeb51 100644 --- a/src/compute-client/Cargo.toml +++ b/src/compute-client/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.83" bytesize = "1.1.0" crossbeam-channel = "0.5.8" derivative = "2.2.0" -differential-dataflow = "0.13.4" +differential-dataflow = "0.13.5" futures = "0.3.25" http = "1.1.0" mz-build-info = { path = "../build-info" } @@ -43,7 +43,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.125" thiserror = "1.0.37" -timely = "0.17.0" +timely = "0.17.1" tokio = "1.38.0" tokio-stream = "0.1.11" tonic = "0.12.1" diff --git a/src/compute-types/Cargo.toml b/src/compute-types/Cargo.toml index 4e368867b8edd..78a7e2bf78f2e 100644 --- a/src/compute-types/Cargo.toml +++ b/src/compute-types/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] columnar = "0.2.2" columnation = "0.1.0" -differential-dataflow = "0.13.4" +differential-dataflow = "0.13.5" itertools = "0.12.1" mz-dyncfg = { path = "../dyncfg" } mz-expr = { path = "../expr" } @@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] } proptest-derive = { version = "0.5.1", features = ["boxed_union"] } prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } -timely = "0.17.0" +timely = "0.17.1" tracing = "0.1.37" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index e972dbb2f1782..c00ea84eedcc6 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -16,8 +16,8 @@ bytesize = "1.1.0" columnar = "0.2.2" crossbeam-channel = "0.5.8" dec = { version = "0.4.8", features = ["serde"] } -differential-dataflow = "0.13.4" -differential-dogs3 = "0.1.4" +differential-dataflow = "0.13.5" +differential-dogs3 = "0.1.5" futures = "0.3.25" itertools = "0.12.1" lgalloc = "0.4" @@ -39,7 +39,7 @@ prometheus = { version = "0.13.3", default-features = false } scopeguard = "1.1.0" serde = { version = "1.0.152", features = ["derive"] } smallvec = { version = "1.10.0", features = ["serde", "union"] } -timely = "0.17.0" +timely = "0.17.1" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["serde", "v4"] } diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 83d2cd12d1f9f..72c7953bed533 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -110,6 +110,7 @@ use std::rc::{Rc, Weak}; use std::sync::Arc; use std::task::Poll; +use columnar::Columnar; use differential_dataflow::dynamic::pointstamp::PointStamp; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; @@ -498,6 +499,7 @@ impl<'g, G, T> Context> where G: Scope, T: Refines + RenderTimestamp, + ::Container: Clone + Send, { pub(crate) fn import_index( &mut self, @@ -647,6 +649,7 @@ impl<'g, G, T> Context> where G: Scope, T: RenderTimestamp, + ::Container: Clone + Send, { pub(crate) fn export_index_iterative( &self, @@ -889,6 +892,8 @@ impl Context where G: Scope, G::Timestamp: RenderTimestamp, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, { /// Renders a non-recursive plan to a differential dataflow, producing the collection of /// results. @@ -1293,7 +1298,11 @@ where #[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have. /// A timestamp type that can be used for operations within MZ's dataflow layer. -pub trait RenderTimestamp: Timestamp + Lattice + Refines + Columnation { +pub trait RenderTimestamp: + Timestamp + Lattice + Refines + Columnation + Columnar +where + ::Container: Clone + Send, +{ /// The system timestamp component of the timestamp. /// /// This is useful for manipulating the system time, as when delaying @@ -1429,6 +1438,7 @@ impl WithStartSignal for MzArrangementImport where S: Scope, S::Timestamp: RenderTimestamp, + ::Container: Clone + Send, { fn with_start_signal(self, signal: StartSignal) -> Self { match self { @@ -1443,6 +1453,7 @@ impl WithStartSignal for Arranged where S: Scope, S::Timestamp: RenderTimestamp, + ::Container: Clone + Send, Tr: TraceReader + Clone, { fn with_start_signal(self, signal: StartSignal) -> Self { diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index cae8662236652..af3e62605e494 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -14,37 +14,40 @@ use std::collections::BTreeMap; use std::rc::Weak; use std::sync::mpsc; +use columnar::Columnar; use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::cursor::IntoOwned; use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; -use differential_dataflow::{Collection, Data}; +use differential_dataflow::{AsCollection, Collection, Data}; use mz_compute_types::dataflows::DataflowDescription; use mz_compute_types::plan::{AvailableCollections, LirId}; use mz_expr::{Id, MapFilterProject, MirScalarExpr}; -use mz_repr::fixed_length::{FromDatumIter, ToDatumIter}; +use mz_repr::fixed_length::ToDatumIter; use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow}; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::errors::DataflowError; +use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder}; use mz_timely_util::operator::{CollectionExt, StreamExt}; use timely::container::columnation::Columnation; use timely::container::CapacityContainerBuilder; -use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::operators::generic::OutputHandleCore; use timely::dataflow::operators::Capability; use timely::dataflow::scopes::Child; use timely::dataflow::{Scope, ScopeParent}; use timely::progress::timestamp::Refines; use timely::progress::{Antichain, Timestamp}; +use timely::Container; use tracing::error; use crate::arrangement::manager::SpecializedTraceHandle; use crate::compute_state::{ComputeState, HydrationEvent}; -use crate::extensions::arrange::{KeyCollection, MzArrange}; +use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore}; use crate::render::errors::ErrorLogger; use crate::render::{LinearJoinSpec, RenderTimestamp}; -use crate::row_spine::{RowRowBatcher, RowRowBuilder}; +use crate::row_spine::RowRowBuilder; use crate::typedefs::{ ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine, }; @@ -919,10 +922,11 @@ where impl CollectionBundle where - T: timely::progress::Timestamp + Lattice + Columnation, + T: Timestamp + Lattice + Columnation, S: Scope, - S::Timestamp: - Refines + Lattice + timely::progress::Timestamp + crate::render::RenderTimestamp, + S::Timestamp: Refines + RenderTimestamp, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, { /// Presents `self` as a stream of updates, having been subjected to `mfp`. /// @@ -1046,7 +1050,8 @@ where .collection .clone() .expect("Collection constructed above"); - let (oks, errs_keyed) = Self::specialized_arrange(&name, oks, &key, &thinning); + let (oks, errs_keyed) = + Self::specialized_arrange(&name, oks, key.clone(), thinning.clone()); let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into(); let errs = errs.mz_arrange::, ErrBuilder<_, _>, ErrSpine<_, _>>( &format!("{}-errors", name), @@ -1064,44 +1069,48 @@ where fn specialized_arrange( name: &String, oks: Collection, - key: &Vec, - thinning: &Vec, + key: Vec, + thinning: Vec, ) -> (MzArrangement, Collection) { - // Catch-all: Just use RowRow. - let (oks, errs) = oks - .map_fallible::, CapacityContainerBuilder<_>, _, _, _>( - "FormArrangementKey", - specialized_arrangement_key(key.clone(), thinning.clone()), + let (oks, errs) = + oks.inner + .unary_fallible::, _, _, _>( + Pipeline, + "FormArrangementKey", + move |_, _| { + Box::new(move |input, ok, err| { + let mut key_buf = Row::default(); + let mut val_buf = Row::default(); + let mut datums = DatumVec::new(); + let temp_storage = RowArena::new(); + while let Some((time, data)) = input.next() { + let mut ok_session = ok.session_with_builder(&time); + let mut err_session = err.session(&time); + for (row, time, diff) in data.iter() { + temp_storage.clear(); + let datums = datums.borrow_with(row); + let val_datum_iter = thinning.iter().map(|c| datums[*c]); + match key_buf.packer().try_extend( + key.iter().map(|k| k.eval(&datums, &temp_storage)), + ) { + Ok(()) => { + val_buf.packer().extend(val_datum_iter); + ok_session.give(((&*key_buf, &*val_buf), time, diff)); + } + Err(e) => { + err_session.give((e.into(), time.clone(), *diff)); + } + } + } + } + }) + }, + ); + let oks = oks + .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>( + ExchangeCore::, _>::new_core(columnar_exchange::),name ); - let oks = - oks.mz_arrange::, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name); - (MzArrangement::RowRow(oks), errs) - } -} - -/// Obtains a function that maps input rows to (key, value) pairs according to -/// the given key and thinning expressions. This function allows for specialization -/// of key and value types and is intended to use to form arrangement keys. -fn specialized_arrangement_key( - key: Vec, - thinning: Vec, -) -> impl FnMut(Row) -> Result<(K, V), DataflowError> -where - K: Columnation + Data + FromDatumIter, - V: Columnation + Data + FromDatumIter, -{ - let mut key_buf = K::default(); - let mut val_buf = V::default(); - let mut datums = DatumVec::new(); - move |row| { - // TODO: Consider reusing the `row` allocation; probably in *next* invocation. - let datums = datums.borrow_with(&row); - let temp_storage = RowArena::new(); - let val_datum_iter = thinning.iter().map(|c| datums[*c]); - Ok::<(K, V), DataflowError>(( - key_buf.try_from_datum_iter(key.iter().map(|k| k.eval(&datums, &temp_storage)))?, - val_buf.from_datum_iter(val_datum_iter), - )) + (MzArrangement::RowRow(oks), errs.as_collection()) } } diff --git a/src/compute/src/render/flat_map.rs b/src/compute/src/render/flat_map.rs index 566395c64ffcb..cacbd65d7d042 100644 --- a/src/compute/src/render/flat_map.rs +++ b/src/compute/src/render/flat_map.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use columnar::Columnar; use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use mz_expr::MfpPlan; use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc}; @@ -26,6 +27,7 @@ impl Context where G: Scope, G::Timestamp: crate::render::RenderTimestamp, + ::Container: Clone + Send, { /// Applies a `TableFunc` to every row, followed by an `mfp`. pub fn render_flat_map( @@ -130,6 +132,7 @@ fn drain_through_mfp( >, ) where T: crate::render::RenderTimestamp, + ::Container: Clone + Send, { let temp_storage = RowArena::new(); let binding = SharedRow::get(); diff --git a/src/compute/src/render/join/delta_join.rs b/src/compute/src/render/join/delta_join.rs index 7501e9f124e02..b68115fd772d0 100644 --- a/src/compute/src/render/join/delta_join.rs +++ b/src/compute/src/render/join/delta_join.rs @@ -15,6 +15,7 @@ use std::collections::{BTreeMap, BTreeSet}; +use columnar::Columnar; use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; @@ -45,7 +46,9 @@ use crate::typedefs::{RowRowAgent, RowRowEnter}; impl Context where G: Scope, - G::Timestamp: crate::render::RenderTimestamp, + G::Timestamp: RenderTimestamp, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, { /// Renders `MirRelationExpr:Join` using dogs^3 delta query dataflows. /// @@ -326,7 +329,9 @@ fn dispatch_build_halfjoin_local( ) where G: Scope, - G::Timestamp: crate::render::RenderTimestamp, + G::Timestamp: RenderTimestamp, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, { match trace { @@ -358,7 +363,9 @@ fn dispatch_build_halfjoin_trace( where G: Scope, T: Timestamp + Lattice + Columnation, - G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines + Columnation, + G::Timestamp: RenderTimestamp + Refines, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, { match trace { @@ -398,7 +405,9 @@ fn build_halfjoin( ) where G: Scope, - G::Timestamp: crate::render::RenderTimestamp, + G::Timestamp: RenderTimestamp, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, Tr: TraceReader