Skip to content

Commit

Permalink
Switch dataflow fragments to columnar (#31186)
Browse files Browse the repository at this point in the history
Switch arrange formation and the linear join preparation
phase to use columnar data on dataflow edges.

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru authored Jan 31, 2025
1 parent fd08a63 commit 7ecf0d9
Show file tree
Hide file tree
Showing 35 changed files with 212 additions and 144 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/adapter-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mz-ore = { path = "../ore" }
mz-repr = { path = "../repr" }
mz-storage-types = { path = "../storage-types" }
serde = "1.0.152"
timely = "0.17.0"
timely = "0.17.1"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

[package.metadata.cargo-udeps.ignore]
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytesize = "1.1.0"
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
dec = "0.4.8"
derivative = "2.2.0"
differential-dataflow = "0.13.4"
differential-dataflow = "0.13.5"
enum-kinds = "0.5.1"
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
Expand Down Expand Up @@ -81,7 +81,7 @@ serde_plain = "1.0.1"
sha2 = "0.10.6"
smallvec = { version = "1.10.0", features = ["union"] }
static_assertions = "1.1"
timely = "0.17.0"
timely = "0.17.1"
tokio = { version = "1.38.0", features = ["rt", "time"] }
tokio-postgres = { version = "0.7.8" }
tracing = "0.1.37"
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ bytesize = "1.1.0"
chrono = { version = "0.4.35", default-features = false, features = ["std"] }
clap = { version = "4.5.23", features = ["derive"] }
derivative = "2.2.0"
differential-dataflow = "0.13.4"
differential-dataflow = "0.13.5"
futures = "0.3.25"
ipnet = "2.5.0"
itertools = "0.12.1"
Expand Down Expand Up @@ -60,7 +60,7 @@ serde_plain = "1.0.1"
static_assertions = "1.1"
sha2 = "0.10.6"
thiserror = "1.0.37"
timely = "0.17.0"
timely = "0.17.1"
tokio = { version = "1.38.0" }
tracing = "0.1.37"
uuid = "1.2.2"
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ workspace = true
anyhow = "1.0.66"
async-trait = "0.1.83"
crossbeam-channel = "0.5.8"
differential-dataflow = "0.13.4"
differential-dataflow = "0.13.5"
futures = "0.3.25"
mz-cluster-client = { path = "../cluster-client" }
mz-ore = { path = "../ore", features = ["async", "process", "tracing_"] }
mz-persist-client = { path = "../persist-client" }
mz-service = { path = "../service" }
mz-txn-wal = { path = "../txn-wal" }
regex = "1.7.0"
timely = "0.17.0"
timely = "0.17.1"
tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
tracing = "0.1.37"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions src/compute-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async-trait = "0.1.83"
bytesize = "1.1.0"
crossbeam-channel = "0.5.8"
derivative = "2.2.0"
differential-dataflow = "0.13.4"
differential-dataflow = "0.13.5"
futures = "0.3.25"
http = "1.1.0"
mz-build-info = { path = "../build-info" }
Expand Down Expand Up @@ -43,7 +43,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] }
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.125"
thiserror = "1.0.37"
timely = "0.17.0"
timely = "0.17.1"
tokio = "1.38.0"
tokio-stream = "0.1.11"
tonic = "0.12.1"
Expand Down
4 changes: 2 additions & 2 deletions src/compute-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ workspace = true
[dependencies]
columnar = "0.2.2"
columnation = "0.1.0"
differential-dataflow = "0.13.4"
differential-dataflow = "0.13.5"
itertools = "0.12.1"
mz-dyncfg = { path = "../dyncfg" }
mz-expr = { path = "../expr" }
Expand All @@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] }
proptest-derive = { version = "0.5.1", features = ["boxed_union"] }
prost = { version = "0.13.2", features = ["no-recursion-limit"] }
serde = { version = "1.0.152", features = ["derive"] }
timely = "0.17.0"
timely = "0.17.1"
tracing = "0.1.37"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

Expand Down
6 changes: 3 additions & 3 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ bytesize = "1.1.0"
columnar = "0.2.2"
crossbeam-channel = "0.5.8"
dec = { version = "0.4.8", features = ["serde"] }
differential-dataflow = "0.13.4"
differential-dogs3 = "0.1.4"
differential-dataflow = "0.13.5"
differential-dogs3 = "0.1.5"
futures = "0.3.25"
itertools = "0.12.1"
lgalloc = "0.4"
Expand All @@ -39,7 +39,7 @@ prometheus = { version = "0.13.3", default-features = false }
scopeguard = "1.1.0"
serde = { version = "1.0.152", features = ["derive"] }
smallvec = { version = "1.10.0", features = ["serde", "union"] }
timely = "0.17.0"
timely = "0.17.1"
tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] }
tracing = "0.1.37"
uuid = { version = "1.7.0", features = ["serde", "v4"] }
Expand Down
13 changes: 12 additions & 1 deletion src/compute/src/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::task::Poll;

use columnar::Columnar;
use differential_dataflow::dynamic::pointstamp::PointStamp;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
Expand Down Expand Up @@ -498,6 +499,7 @@ impl<'g, G, T> Context<Child<'g, G, T>>
where
G: Scope<Timestamp = mz_repr::Timestamp>,
T: Refines<G::Timestamp> + RenderTimestamp,
<T as Columnar>::Container: Clone + Send,
{
pub(crate) fn import_index(
&mut self,
Expand Down Expand Up @@ -647,6 +649,7 @@ impl<'g, G, T> Context<Child<'g, G, T>>
where
G: Scope<Timestamp = mz_repr::Timestamp>,
T: RenderTimestamp,
<T as Columnar>::Container: Clone + Send,
{
pub(crate) fn export_index_iterative(
&self,
Expand Down Expand Up @@ -889,6 +892,8 @@ impl<G> Context<G>
where
G: Scope,
G::Timestamp: RenderTimestamp,
<G::Timestamp as Columnar>::Container: Clone + Send,
for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
{
/// Renders a non-recursive plan to a differential dataflow, producing the collection of
/// results.
Expand Down Expand Up @@ -1293,7 +1298,11 @@ where

#[allow(dead_code)] // Some of the methods on this trait are unused, but useful to have.
/// A timestamp type that can be used for operations within MZ's dataflow layer.
pub trait RenderTimestamp: Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation {
pub trait RenderTimestamp:
Timestamp + Lattice + Refines<mz_repr::Timestamp> + Columnation + Columnar
where
<Self as Columnar>::Container: Clone + Send,
{
/// The system timestamp component of the timestamp.
///
/// This is useful for manipulating the system time, as when delaying
Expand Down Expand Up @@ -1429,6 +1438,7 @@ impl<S> WithStartSignal for MzArrangementImport<S>
where
S: Scope,
S::Timestamp: RenderTimestamp,
<S::Timestamp as Columnar>::Container: Clone + Send,
{
fn with_start_signal(self, signal: StartSignal) -> Self {
match self {
Expand All @@ -1443,6 +1453,7 @@ impl<S, Tr> WithStartSignal for Arranged<S, Tr>
where
S: Scope,
S::Timestamp: RenderTimestamp,
<S::Timestamp as Columnar>::Container: Clone + Send,
Tr: TraceReader + Clone,
{
fn with_start_signal(self, signal: StartSignal) -> Self {
Expand Down
109 changes: 62 additions & 47 deletions src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,40 @@ use std::collections::BTreeMap;
use std::rc::Weak;
use std::sync::mpsc;

use columnar::Columnar;
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::cursor::IntoOwned;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{Collection, Data};
use differential_dataflow::{AsCollection, Collection, Data};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::{AvailableCollections, LirId};
use mz_expr::{Id, MapFilterProject, MirScalarExpr};
use mz_repr::fixed_length::{FromDatumIter, ToDatumIter};
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder};
use mz_timely_util::operator::{CollectionExt, StreamExt};
use timely::container::columnation::Columnation;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::generic::OutputHandleCore;
use timely::dataflow::operators::Capability;
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, ScopeParent};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use timely::Container;
use tracing::error;

use crate::arrangement::manager::SpecializedTraceHandle;
use crate::compute_state::{ComputeState, HydrationEvent};
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
use crate::render::errors::ErrorLogger;
use crate::render::{LinearJoinSpec, RenderTimestamp};
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::{
ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
};
Expand Down Expand Up @@ -919,10 +922,11 @@ where

impl<S, T> CollectionBundle<S, T>
where
T: timely::progress::Timestamp + Lattice + Columnation,
T: Timestamp + Lattice + Columnation,
S: Scope,
S::Timestamp:
Refines<T> + Lattice + timely::progress::Timestamp + crate::render::RenderTimestamp,
S::Timestamp: Refines<T> + RenderTimestamp,
<S::Timestamp as Columnar>::Container: Clone + Send,
for<'a> <S::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
{
/// Presents `self` as a stream of updates, having been subjected to `mfp`.
///
Expand Down Expand Up @@ -1046,7 +1050,8 @@ where
.collection
.clone()
.expect("Collection constructed above");
let (oks, errs_keyed) = Self::specialized_arrange(&name, oks, &key, &thinning);
let (oks, errs_keyed) =
Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
&format!("{}-errors", name),
Expand All @@ -1058,50 +1063,60 @@ where
self
}

/// Builds a specialized arrangement to provided types. The specialization for key and
/// value types of the arrangement is based on the bit length derived from the corresponding
/// type descriptions.
fn specialized_arrange(
/// Builds an arrangement from a collection, using the specified key and value thinning.
///
/// The arrangement's key is based on the `key` expressions, and the value the input with
/// the `thinning` applied to it. It selects which of the input columns are included in the
/// value of the arrangement. The thinning is in support of permuting arrangements such that
/// columns in the key are not included in the value.
fn arrange_collection(
name: &String,
oks: Collection<S, Row, i64>,
key: &Vec<MirScalarExpr>,
thinning: &Vec<usize>,
key: Vec<MirScalarExpr>,
thinning: Vec<usize>,
) -> (MzArrangement<S>, Collection<S, DataflowError, i64>) {
// Catch-all: Just use RowRow.
// The following `unary_fallible` implements a `map_fallible`, but produces columnar updates
// for the ok stream. The `map_fallible` cannot be used here because the closure cannot
// return references, which is what we need to push into columnar streams. Instead, we use
// a bespoke operator that also optimizes reuse of allocations across individual updates.
let (oks, errs) = oks
.map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
.inner
.unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, i64)>, _, _, _>(
Pipeline,
"FormArrangementKey",
specialized_arrangement_key(key.clone(), thinning.clone()),
move |_, _| {
Box::new(move |input, ok, err| {
let mut key_buf = Row::default();
let mut val_buf = Row::default();
let mut datums = DatumVec::new();
let temp_storage = RowArena::new();
while let Some((time, data)) = input.next() {
let mut ok_session = ok.session_with_builder(&time);
let mut err_session = err.session(&time);
for (row, time, diff) in data.iter() {
temp_storage.clear();
let datums = datums.borrow_with(row);
let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
match key_buf.packer().try_extend(key_iter) {
Ok(()) => {
let val_datum_iter = thinning.iter().map(|c| datums[*c]);
val_buf.packer().extend(val_datum_iter);
ok_session.give(((&*key_buf, &*val_buf), time, diff));
}
Err(e) => {
err_session.give((e.into(), time.clone(), *diff));
}
}
}
}
})
},
);
let oks =
oks.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(name);
(MzArrangement::RowRow(oks), errs)
}
}

/// Obtains a function that maps input rows to (key, value) pairs according to
/// the given key and thinning expressions. This function allows for specialization
/// of key and value types and is intended to use to form arrangement keys.
fn specialized_arrangement_key<K, V>(
key: Vec<MirScalarExpr>,
thinning: Vec<usize>,
) -> impl FnMut(Row) -> Result<(K, V), DataflowError>
where
K: Columnation + Data + FromDatumIter,
V: Columnation + Data + FromDatumIter,
{
let mut key_buf = K::default();
let mut val_buf = V::default();
let mut datums = DatumVec::new();
move |row| {
// TODO: Consider reusing the `row` allocation; probably in *next* invocation.
let datums = datums.borrow_with(&row);
let temp_storage = RowArena::new();
let val_datum_iter = thinning.iter().map(|c| datums[*c]);
Ok::<(K, V), DataflowError>((
key_buf.try_from_datum_iter(key.iter().map(|k| k.eval(&datums, &temp_storage)))?,
val_buf.from_datum_iter(val_datum_iter),
))
let oks = oks
.mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
);
(MzArrangement::RowRow(oks), errs.as_collection())
}
}

Expand Down
Loading

0 comments on commit 7ecf0d9

Please sign in to comment.