Skip to content

Commit 08111dd

Browse files
committed
Make it work
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent da574a1 commit 08111dd

File tree

6 files changed

+58
-53
lines changed

6 files changed

+58
-53
lines changed

Cargo.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/compute/src/logging/initialize.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::time::{Duration, Instant};
1313
use differential_dataflow::logging::DifferentialEvent;
1414
use differential_dataflow::Collection;
1515
use mz_compute_client::logging::{LogVariant, LoggingConfig};
16-
use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion};
16+
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
1717
use mz_repr::{Diff, Timestamp};
1818
use mz_storage_types::errors::DataflowError;
1919
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
@@ -97,7 +97,7 @@ struct LoggingContext<'a, A: Allocate> {
9797
now: Instant,
9898
start_offset: Duration,
9999
t_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, TimelyEvent)>>,
100-
r_event_queue: EventQueue<FlatStack<ReachabilityEventRegion>>,
100+
r_event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzOffsetOptimized>>,
101101
d_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, DifferentialEvent)>>,
102102
c_event_queue: EventQueue<Vec<(Duration, WorkerIdentifier, ComputeEvent)>>,
103103
shared_state: Rc<RefCell<SharedLoggingState>>,
@@ -185,7 +185,9 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
185185

186186
fn reachability_logger(&self) -> Logger<TrackerEvent> {
187187
let event_queue = self.r_event_queue.clone();
188-
type CB = PreallocatingCapacityContainerBuilder<FlatStack<ReachabilityEventRegion>>;
188+
type CB = PreallocatingCapacityContainerBuilder<
189+
FlatStack<ReachabilityEventRegion, MzOffsetOptimized>,
190+
>;
189191
let mut logger = BatchLogger::<CB, _>::new(event_queue.link, self.interval_ms);
190192
Logger::new(
191193
self.now,

src/compute/src/logging/reachability.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::rc::Rc;
1616
use mz_compute_client::logging::LoggingConfig;
1717
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
1818
use mz_ore::cast::CastFrom;
19-
use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion};
19+
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
2020
use mz_ore::iter::IteratorExt;
2121
use mz_repr::{Datum, Diff, RowArena, SharedRow, Timestamp};
2222
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
@@ -39,7 +39,7 @@ use crate::typedefs::{FlatKeyValSpineDefault, RowRowSpine};
3939
pub(super) fn construct<A: Allocate>(
4040
worker: &mut timely::worker::Worker<A>,
4141
config: &LoggingConfig,
42-
event_queue: EventQueue<FlatStack<ReachabilityEventRegion>>,
42+
event_queue: EventQueue<FlatStack<ReachabilityEventRegion, MzOffsetOptimized>>,
4343
) -> BTreeMap<LogVariant, LogCollection> {
4444
let interval_ms = std::cmp::max(1, config.interval.as_millis());
4545
let worker_index = worker.index();
@@ -57,7 +57,8 @@ pub(super) fn construct<A: Allocate>(
5757
);
5858
type UpdatesRegion = <((UpdatesKey, ()), Timestamp, Diff) as MzRegionPreference>::Region;
5959

60-
type CB = PreallocatingCapacityContainerBuilder<FlatStack<UpdatesRegion>>;
60+
type CB =
61+
PreallocatingCapacityContainerBuilder<FlatStack<UpdatesRegion, MzOffsetOptimized>>;
6162
let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>(
6263
scope,
6364
"reachability logs",

src/compute/src/logging/timely.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@ use std::collections::BTreeMap;
1414
use std::rc::Rc;
1515
use std::time::Duration;
1616

17-
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
1817
use mz_compute_client::logging::LoggingConfig;
1918
use mz_ore::cast::CastFrom;
20-
use mz_ore::flatcontainer::{MzRegionPreference, OwnedRegionOpinion};
19+
use mz_ore::flatcontainer::{MzOffsetOptimized, MzRegionPreference, OwnedRegionOpinion};
2120
use mz_ore::region::LgAllocVec;
2221
use mz_repr::{Datum, Diff, Timestamp};
22+
use mz_timely_util::containers::PreallocatingCapacityContainerBuilder;
2323
use mz_timely_util::replay::MzReplay;
2424
use serde::{Deserialize, Serialize};
2525
use timely::communication::Allocate;
26-
use timely::container::columnation::{Columnation, CopyRegion};
26+
use timely::container::flatcontainer::FlatStack;
2727
use timely::container::CapacityContainerBuilder;
2828
use timely::dataflow::channels::pact::Pipeline;
2929
use timely::dataflow::channels::pushers::buffer::Session;
@@ -361,10 +361,12 @@ struct MessageCount {
361361
records: i64,
362362
}
363363

364-
type Pusher<D> =
365-
Counter<Timestamp, Vec<(D, Timestamp, Diff)>, Tee<Timestamp, Vec<(D, Timestamp, Diff)>>>;
364+
type FlatStackFor<D> =
365+
FlatStack<<(D, Timestamp, Diff) as MzRegionPreference>::Region, MzOffsetOptimized>;
366+
367+
type Pusher<D> = Counter<Timestamp, FlatStackFor<D>, Tee<Timestamp, FlatStackFor<D>>>;
366368
type OutputSession<'a, D> =
367-
Session<'a, Timestamp, ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>, Pusher<D>>;
369+
Session<'a, Timestamp, PreallocatingCapacityContainerBuilder<FlatStackFor<D>>, Pusher<D>>;
368370

369371
/// Bundled output buffers used by the demux operator.
370372
//
@@ -374,7 +376,7 @@ type OutputSession<'a, D> =
374376
struct DemuxOutput<'a> {
375377
operates: OutputSession<'a, (usize, String)>,
376378
channels: OutputSession<'a, (ChannelDatum, ())>,
377-
addresses: OutputSession<'a, (usize, Vec<usize>)>,
379+
addresses: OutputSession<'a, (usize, OwnedRegionOpinion<Vec<usize>>)>,
378380
parks: OutputSession<'a, (ParkDatum, ())>,
379381
batches_sent: OutputSession<'a, (MessageDatum, ())>,
380382
batches_received: OutputSession<'a, (MessageDatum, ())>,

src/ore/src/flatcontainer.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -911,13 +911,13 @@ mod lgallocvec {
911911
assert_eq!(region.index(index), &42);
912912

913913
let mut region = LgAllocVec::<u32>::default();
914-
region.push(42);
915-
region.push(43);
916-
region.push(44);
914+
let i0 = <_ as Push<_>>::push(&mut region, 42);
915+
let i1 = <_ as Push<_>>::push(&mut region, 43);
916+
let i2 = <_ as Push<_>>::push(&mut region, 44);
917917
region.reserve_items([1, 2, 3].iter());
918-
assert_eq!(region.index(0), &42);
919-
assert_eq!(region.index(1), &43);
920-
assert_eq!(region.index(2), &44);
918+
assert_eq!(region.index(i0), &42);
919+
assert_eq!(region.index(i1), &43);
920+
assert_eq!(region.index(i2), &44);
921921
}
922922
}
923923
}

src/timely-util/src/containers.rs

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
1212
use std::collections::VecDeque;
1313

14+
use timely::container::flatcontainer::impls::index::IndexContainer;
1415
use timely::container::flatcontainer::{FlatStack, Push, Region};
15-
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
16+
use timely::container::{CapacityContainer, ContainerBuilder, PushInto};
1617
use timely::Container;
1718

1819
pub mod array;
@@ -31,50 +32,50 @@ pub mod stack;
3132
#[derive(Default, Debug)]
3233
pub struct PreallocatingCapacityContainerBuilder<C> {
3334
/// Container that we're writing to.
34-
current: C,
35+
current: Option<C>,
3536
/// Emtpy allocation.
3637
empty: Option<C>,
3738
/// Completed containers pending to be sent.
3839
pending: VecDeque<C>,
3940
}
4041

41-
impl<T, R> PushInto<T> for PreallocatingCapacityContainerBuilder<FlatStack<R>>
42+
impl<T, R, S> PushInto<T> for PreallocatingCapacityContainerBuilder<FlatStack<R, S>>
4243
where
4344
R: Region + Push<T> + Clone + 'static,
45+
S: IndexContainer<R::Index> + Clone + 'static,
46+
FlatStack<R, S>: CapacityContainer,
4447
{
4548
#[inline]
4649
fn push_into(&mut self, item: T) {
47-
if self.current.capacity() == 0 {
48-
self.current = self.empty.take().unwrap_or_default();
49-
// Protect against non-emptied containers.
50-
self.current.clear();
51-
}
52-
// Ensure capacity
53-
let preferred_capacity = FlatStack::<R>::preferred_capacity();
54-
if self.current.capacity() < preferred_capacity {
55-
self.current
56-
.reserve(preferred_capacity - self.current.len());
50+
if self.current.is_none() {
51+
let mut empty = self.empty.take().unwrap_or_default();
52+
empty.clear();
53+
self.current = Some(empty);
5754
}
5855

56+
let current = self.current.as_mut().unwrap();
57+
58+
// Ensure capacity
59+
current.ensure_preferred_capacity();
5960
// Push item
60-
self.current.push(item);
61+
current.push(item);
6162

6263
// Maybe flush
63-
if self.current.len() == self.current.capacity() {
64-
let pending = std::mem::take(&mut self.current);
65-
self.current = FlatStack::merge_capacity(std::iter::once(&pending));
66-
self.current
67-
.reserve(preferred_capacity.saturating_sub(self.current.len()));
64+
if current.len() >= FlatStack::<R, S>::preferred_capacity() {
65+
let pending = std::mem::take(current);
66+
*current = FlatStack::merge_capacity(std::iter::once(&pending));
6867
self.pending.push_back(pending);
6968
}
7069
}
7170
}
7271

73-
impl<R> ContainerBuilder for PreallocatingCapacityContainerBuilder<FlatStack<R>>
72+
impl<R, S> ContainerBuilder for PreallocatingCapacityContainerBuilder<FlatStack<R, S>>
7473
where
7574
R: Region + Clone + 'static,
75+
S: IndexContainer<R::Index> + Clone + 'static,
76+
FlatStack<R, S>: CapacityContainer,
7677
{
77-
type Container = FlatStack<R>;
78+
type Container = FlatStack<R, S>;
7879

7980
#[inline]
8081
fn extract(&mut self) -> Option<&mut Self::Container> {
@@ -84,12 +85,11 @@ where
8485

8586
#[inline]
8687
fn finish(&mut self) -> Option<&mut Self::Container> {
87-
if !self.current.is_empty() {
88-
let pending = std::mem::take(&mut self.current);
89-
self.current = FlatStack::merge_capacity(std::iter::once(&pending));
90-
let preferred_capacity = FlatStack::<R>::preferred_capacity();
91-
self.current
92-
.reserve(preferred_capacity.saturating_sub(self.current.len()));
88+
let current = self.current.as_mut();
89+
if current.as_ref().map_or(false, |c| !c.is_empty()) {
90+
let current = current.unwrap();
91+
let pending = std::mem::take(current);
92+
*current = FlatStack::merge_capacity(std::iter::once(&pending));
9393
self.pending.push_back(pending);
9494
}
9595
self.empty = self.pending.pop_front();

0 commit comments

Comments
 (0)