Skip to content

Commit 5f7ff7c

Browse files
committed
Switch KeyValSpine to flatcontainer
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent: 92b64c4; commit: 5f7ff7c

File tree

18 files changed, with 826 additions and 203 deletions.

Cargo.lock

Lines changed: 11 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -176,15 +176,22 @@ debug = 2
176176
# tend to get rewritten or disappear (e.g., because a PR is force pushed or gets
177177
# merged), after which point it becomes impossible to build that historical
178178
# version of Materialize.
179+
[patch."https://github.com/TimelyDataflow/timely-dataflow"]
180+
# Projects that do not reliably release to crates.io.
181+
timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
182+
timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
183+
timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
184+
timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
185+
timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
179186
[patch.crates-io]
180187
# Projects that do not reliably release to crates.io.
181188
timely = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
182189
timely_bytes = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
183190
timely_communication = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
184191
timely_container = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
185192
timely_logging = { git = "https://github.com/MaterializeInc/timely-dataflow.git" }
186-
differential-dataflow = { git = "https://github.com/MaterializeInc/differential-dataflow.git" }
187-
dogsdogsdogs = { git = "https://github.com/MaterializeInc/differential-dataflow.git" }
193+
differential-dataflow = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "consolidate_layout_merger_chunk" }
194+
dogsdogsdogs = { git = "https://github.com/antiguru/differential-dataflow.git", branch = "consolidate_layout_merger_chunk" }
188195

189196
# Waiting on https://github.com/sfackler/rust-postgres/pull/752.
190197
postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }

misc/cargo-vet/audits.toml

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
21
# cargo-vet audits file
32

43
[criteria.maintained-and-necessary]
@@ -281,7 +280,7 @@ version = "23.5.26"
281280
[[audits.flatcontainer]]
282281
who = "Moritz Hoffmann <[email protected]>"
283282
criteria = "safe-to-deploy"
284-
version = "0.4.1"
283+
version = "0.5.0"
285284

286285
[[audits.fluent-uri]]
287286
who = "Nikhil Benesch <[email protected]>"

src/cluster/src/communication.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
use std::any::Any;
3737
use std::cmp::Ordering;
3838
use std::fmt::Display;
39+
use std::sync::Arc;
3940
use std::time::Duration;
4041

4142
use anyhow::Context;
@@ -109,7 +110,7 @@ where
109110
}
110111
}
111112

112-
match initialize_networking_from_sockets(sockets, process, workers, Box::new(|_| None)) {
113+
match initialize_networking_from_sockets(sockets, process, workers, Arc::new(|_| None)) {
113114
Ok((stuff, guard)) => {
114115
info!(process = process, "successfully initialized network");
115116
Ok((

src/compute/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@ mz-compute-types = { path = "../compute-types" }
3030
mz-dyncfg = { path = "../dyncfg" }
3131
mz-dyncfgs = { path = "../dyncfgs" }
3232
mz-expr = { path = "../expr" }
33-
mz-ore = { path = "../ore", features = ["async", "flatcontainer", "process", "tracing_"] }
33+
mz-ore = { path = "../ore", features = ["async", "differential", "flatcontainer", "process", "tracing_"] }
3434
mz-persist-client = { path = "../persist-client" }
3535
mz-persist-types = { path = "../persist-types" }
3636
mz-repr = { path = "../repr" }

src/compute/src/extensions/arrange.rs

Lines changed: 7 additions & 58 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ use timely::progress::Timestamp;
2323
use timely::Container;
2424

2525
use crate::logging::compute::ComputeEvent;
26-
use crate::typedefs::{KeyAgent, KeyValAgent, RowAgent, RowRowAgent, RowValAgent};
26+
use crate::typedefs::{KeyAgent, RowAgent, RowRowAgent, RowValAgent};
2727

2828
/// Extension trait to arrange data.
2929
pub trait MzArrange: MzArrangeCore
@@ -270,36 +270,6 @@ where
270270
}
271271
}
272272

273-
impl<G, K, V, T, R> ArrangementSize for Arranged<G, KeyValAgent<K, V, T, R>>
274-
where
275-
G: Scope<Timestamp = T>,
276-
G::Timestamp: Lattice + Ord + Columnation,
277-
K: Data + Columnation,
278-
V: Data + Columnation,
279-
T: Lattice + Timestamp,
280-
R: Semigroup + Ord + Columnation + 'static,
281-
{
282-
fn log_arrangement_size(self) -> Self {
283-
log_arrangement_size_inner(self, |trace| {
284-
let (mut size, mut capacity, mut allocations) = (0, 0, 0);
285-
let mut callback = |siz, cap| {
286-
size += siz;
287-
capacity += cap;
288-
allocations += usize::from(cap > 0);
289-
};
290-
trace.map_batches(|batch| {
291-
batch.storage.keys.heap_size(&mut callback);
292-
batch.storage.keys_offs.heap_size(&mut callback);
293-
batch.storage.vals.heap_size(&mut callback);
294-
batch.storage.vals_offs.heap_size(&mut callback);
295-
batch.storage.times.heap_size(&mut callback);
296-
batch.storage.diffs.heap_size(&mut callback);
297-
});
298-
(size, capacity, allocations)
299-
})
300-
}
301-
}
302-
303273
impl<G, K, T, R> ArrangementSize for Arranged<G, KeyAgent<K, T, R>>
304274
where
305275
G: Scope<Timestamp = T>,
@@ -415,8 +385,8 @@ mod flatcontainer {
415385
use differential_dataflow::lattice::Lattice;
416386
use differential_dataflow::operators::arrange::Arranged;
417387
use differential_dataflow::trace::TraceReader;
418-
use mz_ore::flatcontainer::MzRegionPreference;
419-
use timely::container::flatcontainer::{IntoOwned, Push, Region, ReserveItems};
388+
use mz_ore::flatcontainer::{MzRegion, MzRegionPreference};
389+
use timely::container::flatcontainer::{IntoOwned, Region};
420390
use timely::dataflow::Scope;
421391
use timely::progress::Timestamp;
422392
use timely::PartialOrder;
@@ -429,31 +399,10 @@ mod flatcontainer {
429399
Self: Clone,
430400
G: Scope<Timestamp = T::Owned>,
431401
G::Timestamp: Lattice + Ord + MzRegionPreference,
432-
K: Region
433-
+ Clone
434-
+ Push<<K as Region>::Owned>
435-
+ for<'a> Push<<K as Region>::ReadItem<'a>>
436-
+ for<'a> ReserveItems<<K as Region>::ReadItem<'a>>
437-
+ 'static,
438-
V: Region
439-
+ Clone
440-
+ Push<<V as Region>::Owned>
441-
+ for<'a> Push<<V as Region>::ReadItem<'a>>
442-
+ for<'a> ReserveItems<<V as Region>::ReadItem<'a>>
443-
+ 'static,
444-
T: Region
445-
+ Clone
446-
+ Push<<T as Region>::Owned>
447-
+ for<'a> Push<<T as Region>::ReadItem<'a>>
448-
+ for<'a> ReserveItems<<T as Region>::ReadItem<'a>>
449-
+ 'static,
450-
R: Region
451-
+ Clone
452-
+ Push<<R as Region>::Owned>
453-
+ for<'a> Push<&'a <R as Region>::Owned>
454-
+ for<'a> Push<<R as Region>::ReadItem<'a>>
455-
+ for<'a> ReserveItems<<R as Region>::ReadItem<'a>>
456-
+ 'static,
402+
K: MzRegion,
403+
V: MzRegion,
404+
T: MzRegion,
405+
R: MzRegion,
457406
K::Owned: Clone + Ord,
458407
V::Owned: Clone + Ord,
459408
T::Owned: Lattice + for<'a> PartialOrder<<T as Region>::ReadItem<'a>> + Timestamp,

src/compute/src/logging/differential.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -130,7 +130,7 @@ pub(super) fn construct<A: Allocate>(
130130
let stream_to_collection = |input: Stream<_, ((usize, ()), Timestamp, Diff)>, log, name| {
131131
let mut packer = PermutedRowPacker::new(log);
132132
input
133-
.mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(
133+
.mz_arrange_core::<_, KeyValSpine<usize, (), Timestamp, Diff, _>>(
134134
Pipeline,
135135
&format!("PreArrange Differential {name}"),
136136
)

src/compute/src/logging/initialize.rs

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ use mz_compute_client::logging::{LogVariant, LoggingConfig};
1616
use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion};
1717
use mz_repr::{Diff, Timestamp};
1818
use mz_storage_types::errors::DataflowError;
19+
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
1920
use mz_timely_util::operator::CollectionExt;
2021
use timely::communication::Allocate;
2122
use timely::container::flatcontainer::FlatStack;
@@ -184,10 +185,8 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
184185

185186
fn reachability_logger(&self) -> Logger<TrackerEvent> {
186187
let event_queue = self.r_event_queue.clone();
187-
let mut logger = BatchLogger::<
188-
CapacityContainerBuilder<FlatStack<ReachabilityEventRegion>>,
189-
_,
190-
>::new(event_queue.link, self.interval_ms);
188+
type CB = PreallocatingCapacityContainerBuilder<FlatStack<ReachabilityEventRegion>>;
189+
let mut logger = BatchLogger::<CB, _>::new(event_queue.link, self.interval_ms);
191190
Logger::new(
192191
self.now,
193192
self.start_offset,

src/compute/src/logging/reachability.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,10 +19,10 @@ use mz_ore::cast::CastFrom;
1919
use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion};
2020
use mz_ore::iter::IteratorExt;
2121
use mz_repr::{Datum, Diff, RowArena, SharedRow, Timestamp};
22+
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
2223
use mz_timely_util::replay::MzReplay;
2324
use timely::communication::Allocate;
2425
use timely::container::flatcontainer::FlatStack;
25-
use timely::container::CapacityContainerBuilder;
2626
use timely::dataflow::channels::pact::Pipeline;
2727

2828
use crate::extensions::arrange::{MzArrange, MzArrangeCore};
@@ -57,7 +57,7 @@ pub(super) fn construct<A: Allocate>(
5757
);
5858
type UpdatesRegion = <((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region;
5959

60-
type CB = CapacityContainerBuilder<FlatStack<UpdatesRegion>>;
60+
type CB = PreallocatingCapacityContainerBuilder<FlatStack<UpdatesRegion>>;
6161
let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>(
6262
scope,
6363
"reachability logs",
@@ -102,7 +102,7 @@ pub(super) fn construct<A: Allocate>(
102102
);
103103

104104
let updates =
105-
updates.as_collection(move |(update_type, addr, source, port, ts), _| {
105+
updates.as_collection(move |(&update_type, addr, &source, &port, ts), _| {
106106
let row_arena = RowArena::default();
107107
let update_type = if update_type { "source" } else { "target" };
108108
let binding = SharedRow::get();
@@ -118,7 +118,7 @@ pub(super) fn construct<A: Allocate>(
118118
Datum::UInt64(u64::cast_from(port)),
119119
Datum::UInt64(u64::cast_from(worker_index)),
120120
Datum::String(update_type),
121-
Datum::from(ts.clone()),
121+
Datum::from(ts.copied()),
122122
];
123123
row_builder.packer().extend(key.iter().map(|k| datums[*k]));
124124
let key_row = row_builder.clone();

0 commit comments

Comments (0)