diff --git a/Cargo.lock b/Cargo.lock index 2b6f8db62e51d..2693ff55694f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2381,9 +2381,9 @@ dependencies = [ [[package]] name = "differential-dataflow" -version = "0.13.4" +version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75b93af605b7f82fbf6e671a5bb0f940b385e57254a0af59ce6dfb98b8c4b302" +checksum = "62e90a3a356ef9af92925918d36f5206cb74ef834a1ec39c1ed9631d00736686" dependencies = [ "columnar", "fnv", @@ -2393,9 +2393,9 @@ dependencies = [ [[package]] name = "differential-dogs3" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ba29145a1df1bdc3da1142eeb991f0866620c79ce9d85e83a3837f29112ba0" +checksum = "6ecafb7f2d23b07b36a442274be75c4090cfd225b49918aceeba577b119780ea" dependencies = [ "differential-dataflow", "serde", @@ -10434,9 +10434,9 @@ dependencies = [ [[package]] name = "timely" -version = "0.17.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0506b6506ef04c371ea6151942df60e309e7f5e710458a6b533e364ee0b3cab3" +checksum = "1502aad02b725c0efcd64bac7f2b1b05bf402c2ada6d74cd95b6b9ac252d62ed" dependencies = [ "bincode", "byteorder", diff --git a/src/adapter-types/Cargo.toml b/src/adapter-types/Cargo.toml index d1cdc0ae56507..4c66606fcb08e 100644 --- a/src/adapter-types/Cargo.toml +++ b/src/adapter-types/Cargo.toml @@ -15,7 +15,7 @@ mz-ore = { path = "../ore" } mz-repr = { path = "../repr" } mz-storage-types = { path = "../storage-types" } serde = "1.0.152" -timely = "0.17.0" +timely = "0.17.1" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } [package.metadata.cargo-udeps.ignore] diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml index 273be043fa7a3..c58b17196ca29 100644 --- a/src/adapter/Cargo.toml +++ b/src/adapter/Cargo.toml @@ -18,7 +18,7 @@ bytesize = "1.1.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } dec = "0.4.8" derivative = "2.2.0" -differential-dataflow = "0.13.4" +differential-dataflow = "0.13.5" enum-kinds = "0.5.1" fail = { version = "0.5.1", features = ["failpoints"] } futures = "0.3.25" @@ -81,7 +81,7 @@ serde_plain = "1.0.1" sha2 = "0.10.6" smallvec = { version = "1.10.0", features = ["union"] } static_assertions = "1.1" -timely = "0.17.0" +timely = "0.17.1" tokio = { version = "1.38.0", features = ["rt", "time"] } tokio-postgres = { version = "0.7.8" } tracing = "0.1.37" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 6d1e030684df6..61be0b675f8e1 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -18,7 +18,7 @@ bytesize = "1.1.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } clap = { version = "4.5.23", features = ["derive"] } derivative = "2.2.0" -differential-dataflow = "0.13.4" +differential-dataflow = "0.13.5" futures = "0.3.25" ipnet = "2.5.0" itertools = "0.12.1" @@ -60,7 +60,7 @@ serde_plain = "1.0.1" static_assertions = "1.1" sha2 = "0.10.6" thiserror = "1.0.37" -timely = "0.17.0" +timely = "0.17.1" tokio = { version = "1.38.0" } tracing = "0.1.37" uuid = "1.2.2" diff --git a/src/cluster/Cargo.toml b/src/cluster/Cargo.toml index 43ca3a040c8c2..11832fc8b08f9 100644 --- a/src/cluster/Cargo.toml +++ b/src/cluster/Cargo.toml @@ -13,7 +13,7 @@ workspace = true anyhow = "1.0.66" async-trait = "0.1.83" crossbeam-channel = "0.5.8" -differential-dataflow = "0.13.4" +differential-dataflow = "0.13.5" futures = "0.3.25" mz-cluster-client = { path = "../cluster-client" } mz-ore = { path = "../ore", features = ["async", "process", "tracing_"] } @@ -21,7 +21,7 @@ mz-persist-client = { path = "../persist-client" } mz-service = { path = "../service" } mz-txn-wal = { path = "../txn-wal" } regex = "1.7.0" -timely = "0.17.0" +timely = "0.17.1" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] } tracing = "0.1.37" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/compute-client/Cargo.toml b/src/compute-client/Cargo.toml index c7ed7aa902e90..1b783735aeb51 100644 --- a/src/compute-client/Cargo.toml +++ b/src/compute-client/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.83" bytesize = "1.1.0" crossbeam-channel = "0.5.8" derivative = "2.2.0" -differential-dataflow = "0.13.4" +differential-dataflow = "0.13.5" futures = "0.3.25" http = "1.1.0" mz-build-info = { path = "../build-info" } @@ -43,7 +43,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.125" thiserror = "1.0.37" -timely = "0.17.0" +timely = "0.17.1" tokio = "1.38.0" tokio-stream = "0.1.11" tonic = "0.12.1" diff --git a/src/compute-types/Cargo.toml b/src/compute-types/Cargo.toml index 4e368867b8edd..78a7e2bf78f2e 100644 --- a/src/compute-types/Cargo.toml +++ b/src/compute-types/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] columnar = "0.2.2" columnation = "0.1.0" -differential-dataflow = "0.13.4" +differential-dataflow = "0.13.5" itertools = "0.12.1" mz-dyncfg = { path = "../dyncfg" } mz-expr = { path = "../expr" } @@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] } proptest-derive = { version = "0.5.1", features = ["boxed_union"] } prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } -timely = "0.17.0" +timely = "0.17.1" tracing = "0.1.37" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index e972dbb2f1782..c00ea84eedcc6 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -16,8 +16,8 @@ bytesize = "1.1.0" columnar = "0.2.2" crossbeam-channel = "0.5.8" dec = { version = "0.4.8", features = ["serde"] } -differential-dataflow = "0.13.4" -differential-dogs3 = "0.1.4" +differential-dataflow = "0.13.5" +differential-dogs3 = "0.1.5" futures = "0.3.25" itertools = "0.12.1" lgalloc = "0.4" @@ -39,7 +39,7 @@ prometheus = { version = "0.13.3", default-features = false } scopeguard = "1.1.0" serde = { version = "1.0.152", features = ["derive"] } smallvec = { version = "1.10.0", features = ["serde", "union"] } -timely = "0.17.0" +timely = "0.17.1" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["serde", "v4"] } diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 83d2cd12d1f9f..72c7953bed533 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -110,6 +110,7 @@ use std::rc::{Rc, Weak}; use std::sync::Arc; use std::task::Poll; +use columnar::Columnar; use differential_dataflow::dynamic::pointstamp::PointStamp; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; @@ -498,6 +499,7 @@ impl<'g, G, T> Context> where G: Scope, T: Refines + RenderTimestamp, + ::Container: Clone + Send, { pub(crate) fn import_index( &mut self, @@ -647,6 +649,7 @@ impl<'g, G, T> Context> where G: Scope, T: RenderTimestamp, + ::Container: Clone + Send, { pub(crate) fn export_index_iterative( &self, @@ -889,6 +892,8 @@ impl Context where G: Scope, G::Timestamp: RenderTimestamp, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, { /// Renders a non-recursive plan to a differential dataflow, producing the collection of /// results. @@ -1293,7 +1298,11 @@ where #[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have. /// A timestamp type that can be used for operations within MZ's dataflow layer. -pub trait RenderTimestamp: Timestamp + Lattice + Refines + Columnation { +pub trait RenderTimestamp: + Timestamp + Lattice + Refines + Columnation + Columnar +where + ::Container: Clone + Send, +{ /// The system timestamp component of the timestamp. /// /// This is useful for manipulating the system time, as when delaying @@ -1429,6 +1438,7 @@ impl WithStartSignal for MzArrangementImport where S: Scope, S::Timestamp: RenderTimestamp, + ::Container: Clone + Send, { fn with_start_signal(self, signal: StartSignal) -> Self { match self { @@ -1443,6 +1453,7 @@ impl WithStartSignal for Arranged where S: Scope, S::Timestamp: RenderTimestamp, + ::Container: Clone + Send, Tr: TraceReader + Clone, { fn with_start_signal(self, signal: StartSignal) -> Self { diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index cae8662236652..577afa3861439 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -14,37 +14,40 @@ use std::collections::BTreeMap; use std::rc::Weak; use std::sync::mpsc; +use columnar::Columnar; use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::cursor::IntoOwned; use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; -use differential_dataflow::{Collection, Data}; +use differential_dataflow::{AsCollection, Collection, Data}; use mz_compute_types::dataflows::DataflowDescription; use mz_compute_types::plan::{AvailableCollections, LirId}; use mz_expr::{Id, MapFilterProject, MirScalarExpr}; -use mz_repr::fixed_length::{FromDatumIter, ToDatumIter}; +use mz_repr::fixed_length::ToDatumIter; use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow}; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::errors::DataflowError; +use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder}; use mz_timely_util::operator::{CollectionExt, StreamExt}; use timely::container::columnation::Columnation; use timely::container::CapacityContainerBuilder; -use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::operators::generic::OutputHandleCore; use timely::dataflow::operators::Capability; use timely::dataflow::scopes::Child; use timely::dataflow::{Scope, ScopeParent}; use timely::progress::timestamp::Refines; use timely::progress::{Antichain, Timestamp}; +use timely::Container; use tracing::error; use crate::arrangement::manager::SpecializedTraceHandle; use crate::compute_state::{ComputeState, HydrationEvent}; -use crate::extensions::arrange::{KeyCollection, MzArrange}; +use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore}; use crate::render::errors::ErrorLogger; use crate::render::{LinearJoinSpec, RenderTimestamp}; -use crate::row_spine::{RowRowBatcher, RowRowBuilder}; +use crate::row_spine::RowRowBuilder; use crate::typedefs::{ ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine, }; @@ -919,10 +922,11 @@ where impl CollectionBundle where - T: timely::progress::Timestamp + Lattice + Columnation, + T: Timestamp + Lattice + Columnation, S: Scope, - S::Timestamp: - Refines + Lattice + timely::progress::Timestamp + crate::render::RenderTimestamp, + S::Timestamp: Refines + RenderTimestamp, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, { /// Presents `self` as a stream of updates, having been subjected to `mfp`. /// @@ -1046,7 +1050,8 @@ where .collection .clone() .expect("Collection constructed above"); - let (oks, errs_keyed) = Self::specialized_arrange(&name, oks, &key, &thinning); + let (oks, errs_keyed) = + Self::arrange_collection(&name, oks, key.clone(), thinning.clone()); let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into(); let errs = errs.mz_arrange::, ErrBuilder<_, _>, ErrSpine<_, _>>( &format!("{}-errors", name), @@ -1058,50 +1063,60 @@ where self } - /// Builds a specialized arrangement to provided types. The specialization for key and - /// value types of the arrangement is based on the bit length derived from the corresponding - /// type descriptions. - fn specialized_arrange( + /// Builds an arrangement from a collection, using the specified key and value thinning. + /// + /// The arrangement's key is based on the `key` expressions, and the value the input with + /// the `thinning` applied to it. It selects which of the input columns are included in the + /// value of the arrangement. The thinning is in support of permuting arrangements such that + /// columns in the key are not included in the value. + fn arrange_collection( name: &String, oks: Collection, - key: &Vec, - thinning: &Vec, + key: Vec, + thinning: Vec, ) -> (MzArrangement, Collection) { - // Catch-all: Just use RowRow. + // The following `unary_fallible` implements a `map_fallible`, but produces columnar updates + // for the ok stream. The `map_fallible` cannot be used here because the closure cannot + // return references, which is what we need to push into columnar streams. Instead, we use + // a bespoke operator that also optimizes reuse of allocations across individual updates. let (oks, errs) = oks - .map_fallible::, CapacityContainerBuilder<_>, _, _, _>( + .inner + .unary_fallible::, _, _, _>( + Pipeline, "FormArrangementKey", - specialized_arrangement_key(key.clone(), thinning.clone()), + move |_, _| { + Box::new(move |input, ok, err| { + let mut key_buf = Row::default(); + let mut val_buf = Row::default(); + let mut datums = DatumVec::new(); + let temp_storage = RowArena::new(); + while let Some((time, data)) = input.next() { + let mut ok_session = ok.session_with_builder(&time); + let mut err_session = err.session(&time); + for (row, time, diff) in data.iter() { + temp_storage.clear(); + let datums = datums.borrow_with(row); + let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage)); + match key_buf.packer().try_extend(key_iter) { + Ok(()) => { + let val_datum_iter = thinning.iter().map(|c| datums[*c]); + val_buf.packer().extend(val_datum_iter); + ok_session.give(((&*key_buf, &*val_buf), time, diff)); + } + Err(e) => { + err_session.give((e.into(), time.clone(), *diff)); + } + } + } + } + }) + }, ); - let oks = - oks.mz_arrange::, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name); - (MzArrangement::RowRow(oks), errs) - } -} - -/// Obtains a function that maps input rows to (key, value) pairs according to -/// the given key and thinning expressions. This function allows for specialization -/// of key and value types and is intended to use to form arrangement keys. -fn specialized_arrangement_key( - key: Vec, - thinning: Vec, -) -> impl FnMut(Row) -> Result<(K, V), DataflowError> -where - K: Columnation + Data + FromDatumIter, - V: Columnation + Data + FromDatumIter, -{ - let mut key_buf = K::default(); - let mut val_buf = V::default(); - let mut datums = DatumVec::new(); - move |row| { - // TODO: Consider reusing the `row` allocation; probably in *next* invocation. - let datums = datums.borrow_with(&row); - let temp_storage = RowArena::new(); - let val_datum_iter = thinning.iter().map(|c| datums[*c]); - Ok::<(K, V), DataflowError>(( - key_buf.try_from_datum_iter(key.iter().map(|k| k.eval(&datums, &temp_storage)))?, - val_buf.from_datum_iter(val_datum_iter), - )) + let oks = oks + .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>( + ExchangeCore::, _>::new_core(columnar_exchange::),name + ); + (MzArrangement::RowRow(oks), errs.as_collection()) } } diff --git a/src/compute/src/render/flat_map.rs b/src/compute/src/render/flat_map.rs index 566395c64ffcb..cacbd65d7d042 100644 --- a/src/compute/src/render/flat_map.rs +++ b/src/compute/src/render/flat_map.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use columnar::Columnar; use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use mz_expr::MfpPlan; use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc}; @@ -26,6 +27,7 @@ impl Context where G: Scope, G::Timestamp: crate::render::RenderTimestamp, + ::Container: Clone + Send, { /// Applies a `TableFunc` to every row, followed by an `mfp`. pub fn render_flat_map( @@ -130,6 +132,7 @@ fn drain_through_mfp( >, ) where T: crate::render::RenderTimestamp, + ::Container: Clone + Send, { let temp_storage = RowArena::new(); let binding = SharedRow::get(); diff --git a/src/compute/src/render/join/delta_join.rs b/src/compute/src/render/join/delta_join.rs index 7501e9f124e02..b68115fd772d0 100644 --- a/src/compute/src/render/join/delta_join.rs +++ b/src/compute/src/render/join/delta_join.rs @@ -15,6 +15,7 @@ use std::collections::{BTreeMap, BTreeSet}; +use columnar::Columnar; use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; @@ -45,7 +46,9 @@ use crate::typedefs::{RowRowAgent, RowRowEnter}; impl Context where G: Scope, - G::Timestamp: crate::render::RenderTimestamp, + G::Timestamp: RenderTimestamp, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, { /// Renders `MirRelationExpr:Join` using dogs^3 delta query dataflows. /// @@ -326,7 +329,9 @@ fn dispatch_build_halfjoin_local( ) where G: Scope, - G::Timestamp: crate::render::RenderTimestamp, + G::Timestamp: RenderTimestamp, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, { match trace { @@ -358,7 +363,9 @@ fn dispatch_build_halfjoin_trace( where G: Scope, T: Timestamp + Lattice + Columnation, - G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines + Columnation, + G::Timestamp: RenderTimestamp + Refines, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static, { match trace { @@ -398,7 +405,9 @@ fn build_halfjoin( ) where G: Scope, - G::Timestamp: crate::render::RenderTimestamp, + G::Timestamp: RenderTimestamp, + ::Container: Clone + Send, + for<'a> ::Ref<'a>: Ord + Copy, Tr: TraceReader