Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch dataflow fragments to columnar #31186

Merged
merged 2 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/adapter-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mz-ore = { path = "../ore" }
mz-repr = { path = "../repr" }
mz-storage-types = { path = "../storage-types" }
serde = "1.0.152"
timely = "0.17.0"
timely = "0.17.1"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

[package.metadata.cargo-udeps.ignore]
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytesize = "1.1.0"
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
dec = "0.4.8"
derivative = "2.2.0"
differential-dataflow = "0.13.4"
differential-dataflow = "0.13.5"
enum-kinds = "0.5.1"
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
Expand Down Expand Up @@ -81,7 +81,7 @@ serde_plain = "1.0.1"
sha2 = "0.10.6"
smallvec = { version = "1.10.0", features = ["union"] }
static_assertions = "1.1"
timely = "0.17.0"
timely = "0.17.1"
tokio = { version = "1.38.0", features = ["rt", "time"] }
tokio-postgres = { version = "0.7.8" }
tracing = "0.1.37"
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytesize = "1.1.0"
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
clap = { version = "4.5.23", features = ["derive"] }
derivative = "2.2.0"
differential-dataflow = "0.13.4"
differential-dataflow = "0.13.5"
futures = "0.3.25"
ipnet = "2.5.0"
itertools = "0.12.1"
Expand Down Expand Up @@ -60,7 +60,7 @@ serde_plain = "1.0.1"
static_assertions = "1.1"
sha2 = "0.10.6"
thiserror = "1.0.37"
timely = "0.17.0"
timely = "0.17.1"
tokio = { version = "1.38.0" }
tracing = "0.1.37"
uuid = "1.2.2"
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ workspace = true
anyhow = "1.0.66"
async-trait = "0.1.83"
crossbeam-channel = "0.5.8"
differential-dataflow = "0.13.4"
differential-dataflow = "0.13.5"
futures = "0.3.25"
mz-cluster-client = { path = "../cluster-client" }
mz-ore = { path = "../ore", features = ["async", "process", "tracing_"] }
mz-persist-client = { path = "../persist-client" }
mz-service = { path = "../service" }
mz-txn-wal = { path = "../txn-wal" }
regex = "1.7.0"
timely = "0.17.0"
timely = "0.17.1"
tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
tracing = "0.1.37"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions src/compute-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async-trait = "0.1.83"
bytesize = "1.1.0"
crossbeam-channel = "0.5.8"
derivative = "2.2.0"
differential-dataflow = "0.13.4"
differential-dataflow = "0.13.5"
futures = "0.3.25"
http = "1.1.0"
mz-build-info = { path = "../build-info" }
Expand Down Expand Up @@ -43,7 +43,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] }
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.125"
thiserror = "1.0.37"
timely = "0.17.0"
timely = "0.17.1"
tokio = "1.38.0"
tokio-stream = "0.1.11"
tonic = "0.12.1"
Expand Down
4 changes: 2 additions & 2 deletions src/compute-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ workspace = true
[dependencies]
columnar = "0.2.2"
columnation = "0.1.0"
differential-dataflow = "0.13.4"
differential-dataflow = "0.13.5"
itertools = "0.12.1"
mz-dyncfg = { path = "../dyncfg" }
mz-expr = { path = "../expr" }
Expand All @@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] }
proptest-derive = { version = "0.5.1", features = ["boxed_union"] }
prost = { version = "0.13.2", features = ["no-recursion-limit"] }
serde = { version = "1.0.152", features = ["derive"] }
timely = "0.17.0"
timely = "0.17.1"
tracing = "0.1.37"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

Expand Down
6 changes: 3 additions & 3 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ bytesize = "1.1.0"
columnar = "0.2.2"
crossbeam-channel = "0.5.8"
dec = { version = "0.4.8", features = ["serde"] }
differential-dataflow = "0.13.4"
differential-dogs3 = "0.1.4"
differential-dataflow = "0.13.5"
differential-dogs3 = "0.1.5"
futures = "0.3.25"
itertools = "0.12.1"
lgalloc = "0.4"
Expand All @@ -39,7 +39,7 @@ prometheus = { version = "0.13.3", default-features = false }
scopeguard = "1.1.0"
serde = { version = "1.0.152", features = ["derive"] }
smallvec = { version = "1.10.0", features = ["serde", "union"] }
timely = "0.17.0"
timely = "0.17.1"
tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
tracing = "0.1.37"
uuid = { version = "1.7.0", features = ["serde", "v4"] }
Expand Down
13 changes: 12 additions & 1 deletion src/compute/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use columnar::Columnar;
use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
Expand Down Expand Up @@ -498,6 +499,7 @@ impl<'g, G, T> Context<Child<'g, G, T>>
where
G: Scope<Timestamp = mz_repr::Timestamp>,
T: Refines<G::Timestamp> + RenderTimestamp,
<T as Columnar>::Container: Clone + Send,
{
pub(crate) fn import_index(
&mut self,
Expand Down Expand Up @@ -647,6 +649,7 @@ impl<'g, G, T> Context<Child<'g, G, T>>
where
G: Scope<Timestamp = mz_repr::Timestamp>,
T: RenderTimestamp,
<T as Columnar>::Container: Clone + Send,
{
pub(crate) fn export_index_iterative(
&self,
Expand Down Expand Up @@ -889,6 +892,8 @@ impl<G> Context<G>
where
G: Scope,
G::Timestamp: RenderTimestamp,
<G::Timestamp as Columnar>::Container: Clone + Send,
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
{
/// Renders a non-recursive plan to a differential dataflow, producing the collection of
/// results.
Expand Down Expand Up @@ -1293,7 +1298,11 @@ where

#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
/// A timestamp type that can be used for operations within MZ's dataflow layer.
pub trait RenderTimestamp: Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation {
pub trait RenderTimestamp:
Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation + Columnar
where
<Self as Columnar>::Container: Clone + Send,
{
/// The system timestamp component of the timestamp.
///
/// This is useful for manipulating the system time, as when delaying
Expand Down Expand Up @@ -1429,6 +1438,7 @@ impl<S> WithStartSignal for MzArrangementImport<S>
where
S: Scope,
S::Timestamp: RenderTimestamp,
<S::Timestamp as Columnar>::Container: Clone + Send,
{
fn with_start_signal(self, signal: StartSignal) -> Self {
match self {
Expand All @@ -1443,6 +1453,7 @@ impl<S, Tr> WithStartSignal for Arranged<S, Tr>
where
S: Scope,
S::Timestamp: RenderTimestamp,
<S::Timestamp as Columnar>::Container: Clone + Send,
Tr: TraceReader + Clone,
{
fn with_start_signal(self, signal: StartSignal) -> Self {
Expand Down
109 changes: 62 additions & 47 deletions src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,40 @@ use std::collections::BTreeMap;
use std::rc::Weak;
use std::sync::mpsc;

use columnar::Columnar;
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::cursor::IntoOwned;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{Collection, Data};
use differential_dataflow::{AsCollection, Collection, Data};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::{AvailableCollections, LirId};
use mz_expr::{Id, MapFilterProject, MirScalarExpr};
use mz_repr::fixed_length::{FromDatumIter, ToDatumIter};
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder};
use mz_timely_util::operator::{CollectionExt, StreamExt};
use timely::container::columnation::Columnation;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::generic::OutputHandleCore;
use timely::dataflow::operators::Capability;
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, ScopeParent};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::Container;
use tracing::error;

use crate::arrangement::manager::SpecializedTraceHandle;
use crate::compute_state::{ComputeState, HydrationEvent};
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
use crate::render::errors::ErrorLogger;
use crate::render::{LinearJoinSpec, RenderTimestamp};
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::{
ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
};
Expand Down Expand Up @@ -919,10 +922,11 @@ where

impl<S, T> CollectionBundle<S, T>
where
T: timely::progress::Timestamp + Lattice + Columnation,
T: Timestamp + Lattice + Columnation,
S: Scope,
S::Timestamp:
Refines<T> + Lattice + timely::progress::Timestamp + crate::render::RenderTimestamp,
S::Timestamp: Refines<T> + RenderTimestamp,
<S::Timestamp as Columnar>::Container: Clone + Send,
for<'a> <S::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
{
/// Presents `self` as a stream of updates, having been subjected to `mfp`.
///
Expand Down Expand Up @@ -1046,7 +1050,8 @@ where
.collection
.clone()
.expect("Collection constructed above");
let (oks, errs_keyed) = Self::specialized_arrange(&name, oks, &key, &thinning);
let (oks, errs_keyed) =
Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
&format!("{}-errors", name),
Expand All @@ -1058,50 +1063,60 @@ where
self
}

/// Builds a specialized arrangement to provided types. The specialization for key and
/// value types of the arrangement is based on the bit length derived from the corresponding
/// type descriptions.
fn specialized_arrange(
/// Builds an arrangement from a collection, using the specified key and value thinning.
///
/// The arrangement's key is based on the `key` expressions, and its value is the input with
/// the `thinning` applied to it. The `thinning` selects which of the input columns are
/// included in the value of the arrangement. It is in support of permuting arrangements such
/// that columns in the key are not included in the value.
fn arrange_collection(
name: &String,
oks: Collection<S, Row, i64>,
key: &Vec<MirScalarExpr>,
thinning: &Vec<usize>,
key: Vec<MirScalarExpr>,
thinning: Vec<usize>,
) -> (MzArrangement<S>, Collection<S, DataflowError, i64>) {
// Catch-all: Just use RowRow.
// The following `unary_fallible` implements a `map_fallible`, but produces columnar updates
// for the ok stream. The `map_fallible` cannot be used here because the closure cannot
// return references, which is what we need to push into columnar streams. Instead, we use
// a bespoke operator that also optimizes reuse of allocations across individual updates.
let (oks, errs) = oks
.map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
.inner
.unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, i64)>, _, _, _>(
Pipeline,
"FormArrangementKey",
specialized_arrangement_key(key.clone(), thinning.clone()),
move |_, _| {
Box::new(move |input, ok, err| {
let mut key_buf = Row::default();
let mut val_buf = Row::default();
let mut datums = DatumVec::new();
let temp_storage = RowArena::new();
while let Some((time, data)) = input.next() {
let mut ok_session = ok.session_with_builder(&time);
let mut err_session = err.session(&time);
for (row, time, diff) in data.iter() {
temp_storage.clear();
let datums = datums.borrow_with(row);
let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
match key_buf.packer().try_extend(key_iter) {
Ok(()) => {
let val_datum_iter = thinning.iter().map(|c| datums[*c]);
val_buf.packer().extend(val_datum_iter);
ok_session.give(((&*key_buf, &*val_buf), time, diff));
}
Err(e) => {
err_session.give((e.into(), time.clone(), *diff));
}
}
}
}
})
},
);
let oks =
oks.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name);
(MzArrangement::RowRow(oks), errs)
}
}

/// Obtains a function that maps input rows to (key, value) pairs according to
/// the given key and thinning expressions. This function allows for specialization
/// of key and value types and is intended to be used to form arrangement keys.
fn specialized_arrangement_key<K, V>(
key: Vec<MirScalarExpr>,
thinning: Vec<usize>,
) -> impl FnMut(Row) -> Result<(K, V), DataflowError>
where
K: Columnation + Data + FromDatumIter,
V: Columnation + Data + FromDatumIter,
{
let mut key_buf = K::default();
let mut val_buf = V::default();
let mut datums = DatumVec::new();
move |row| {
// TODO: Consider reusing the `row` allocation; probably in *next* invocation.
let datums = datums.borrow_with(&row);
let temp_storage = RowArena::new();
let val_datum_iter = thinning.iter().map(|c| datums[*c]);
Ok::<(K, V), DataflowError>((
key_buf.try_from_datum_iter(key.iter().map(|k| k.eval(&datums, &temp_storage)))?,
val_buf.from_datum_iter(val_datum_iter),
))
let oks = oks
.mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
);
(MzArrangement::RowRow(oks), errs.as_collection())
}
}

Expand Down
Loading