Skip to content

Commit

Permalink
Record frontier per block
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Feb 27, 2024
1 parent fae3a94 commit 2f6fa05
Showing 1 changed file with 59 additions and 37 deletions.
96 changes: 59 additions & 37 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl<K, V, T, D> Batcher for MergeBatcher<K, V, T, D>

struct MergeSorter<D, T, R> {
/// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions.
queue: Vec<Vec<Vec<(D, T, R)>>>,
queue: Vec<Vec<(Antichain<T>, Vec<(D, T, R)>)>>,
stash: Vec<Vec<(D, T, R)>>,
pending: Vec<(D, T, R)>,
logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
Expand Down Expand Up @@ -210,10 +210,12 @@ impl<
fn flush_pending(&mut self) {
if !self.pending.is_empty() {
let mut stack = self.empty();
let mut frontier = Antichain::new();
for tuple in self.pending.drain(..) {
frontier.insert_ref(&tuple.1);
stack.push(tuple);
}
let batch = vec![stack];
let batch = vec![(frontier, stack)];
self.account(&batch, 1);
self.queue.push(batch);
self.maintain();
Expand All @@ -230,18 +232,19 @@ impl<

// Step 1: Canonicalize each chain by adjacent blocks that combined fit into a single block.
for chain in &mut self.queue {
let mut target: Vec<Vec<_>> = Vec::with_capacity(chain.len());
for block in chain.drain(..) {
if target.last().map_or(false, |last| {
let mut target: Vec<(Antichain<T>, Vec<_>)> = Vec::with_capacity(chain.len());
for (frontier, block) in chain.drain(..) {
if target.last().map_or(false, |(_, last)| {
last.len() + block.len() <= Self::buffer_size()
}) {
// merge `target.last()` with `block`
let last = target.last_mut().unwrap();
let (last_frontier, last) = target.last_mut().unwrap();
for item in block.into_iter() {
last_frontier.insert_ref(&item.1);
last.push(item);
}
} else {
target.push(block);
target.push((frontier, block));
}
}
*chain = target;
Expand Down Expand Up @@ -280,6 +283,7 @@ impl<
self.flush_pending();

let mut keep = self.empty();
let mut keep_frontier = Antichain::new();
let mut ship = self.empty();
let mut ship_list = Vec::default();

Expand All @@ -289,33 +293,34 @@ impl<
for mut chain in std::mem::take(&mut self.queue).drain(..) {
let mut block_list = Vec::default();
let mut keep_list = Vec::default();
for mut block in chain.drain(..) {
// Is all data ready to be shipped?
let all = block.iter().all(|(_, t, _)| !upper.less_equal(t));
for (block_frontier, mut block) in chain.drain(..) {
// Is any data ready to be shipped?
let any = block.iter().any(|(_, t, _)| !upper.less_equal(t));
let any = !PartialOrder::less_equal(&upper, &block_frontier.borrow());
// Is all data ready to be shipped?
let all = any && block.iter().all(|(_, t, _)| !upper.less_equal(t));

if all {
// All data is ready, push what we accumulated, stash whole block.
if !ship.is_empty() {
block_list.push(std::mem::replace(&mut ship, self.empty()));
block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty())));
}
block_list.push(block);
block_list.push((Antichain::new(), block));
} else if any {
// Iterate block, sorting items into ship and keep
for datum in block.drain(..) {
if upper.less_equal(&datum.1) {
frontier.insert_ref(&datum.1);
keep_frontier.insert_ref(&datum.1);
keep.push(datum);
if keep.capacity() == keep.len() {
// remember keep
keep_list.push(std::mem::replace(&mut keep, self.empty()));
keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty())));
}
} else {
ship.push(datum);
if ship.capacity() == ship.len() {
// Ship is full, push in on the block list, get an empty one.
block_list.push(std::mem::replace(&mut ship, self.empty()));
block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty())));
}
}
}
Expand All @@ -328,18 +333,18 @@ impl<
frontier.insert_ref(t);
}
if !keep.is_empty() {
keep_list.push(std::mem::replace(&mut keep, self.empty()));
keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty())));
}
keep_list.push(block);
keep_list.push((block_frontier, block));
}
}

// Capture any residue left after iterating blocks.
if !ship.is_empty() {
block_list.push(std::mem::replace(&mut ship, self.empty()));
block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty())));
}
if !keep.is_empty() {
keep_list.push(std::mem::replace(&mut keep, self.empty()));
keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty())));
}

// Collect finished chains
Expand All @@ -362,15 +367,15 @@ impl<
}

// Pop the last element, or return an empty chain.
ship_list.pop().unwrap_or_default()
ship_list.pop().unwrap_or_default().into_iter().map(|(_, list)| list).collect()
}

// merges two sorted input lists into one sorted output list.
fn merge_by(
&mut self,
list1: Vec<Vec<(D, T, R)>>,
list2: Vec<Vec<(D, T, R)>>,
) -> Vec<Vec<(D, T, R)>> {
list1: Vec<(Antichain<T>, Vec<(D, T, R)>)>,
list2: Vec<(Antichain<T>, Vec<(D, T, R)>)>,
) -> Vec<(Antichain<T>, Vec<(D, T, R)>)> {
use std::cmp::Ordering;

// TODO: `list1` and `list2` get dropped; would be better to reuse?
Expand All @@ -380,9 +385,10 @@ impl<
let mut list1 = list1.into_iter();
let mut list2 = list2.into_iter();

let mut head1 = VecDeque::from(list1.next().unwrap_or_default());
let mut head2 = VecDeque::from(list2.next().unwrap_or_default());
let mut head1 = VecDeque::from(list1.next().map(|(_, list)| list).unwrap_or_default());
let mut head2 = VecDeque::from(list2.next().map(|(_, list)| list).unwrap_or_default());

let mut frontier = Antichain::new();
// while we have valid data in each input, merge.
while !head1.is_empty() && !head2.is_empty() {

Expand All @@ -394,51 +400,67 @@ impl<
(&x.0, &x.1).cmp(&(&y.0, &y.1))
};
match cmp {
Ordering::Less => result.push(head1.pop_front().unwrap()),
Ordering::Greater => result.push(head2.pop_front().unwrap()),
Ordering::Less => {
let datum = head1.pop_front().unwrap();
frontier.insert_ref(&datum.1);
result.push(datum);
},
Ordering::Greater => {
let datum = head2.pop_front().unwrap();
frontier.insert_ref(&datum.1);
result.push(datum);
},
Ordering::Equal => {
let (data1, time1, mut diff1) = head1.pop_front().unwrap();
let (_data2, _time2, diff2) = head2.pop_front().unwrap();
diff1.plus_equals(&diff2);
if !diff1.is_zero() {
frontier.insert_ref(&time1);
result.push((data1, time1, diff1));
}
}
}
}

if result.capacity() == result.len() {
output.push(result);
let frontier = std::mem::take(&mut frontier);
output.push((frontier, result));
result = self.empty();
}

if head1.is_empty() {
self.recycle(Vec::from(head1));
head1 = VecDeque::from(list1.next().unwrap_or_default());
head1 = VecDeque::from(list1.next().map(|(_, list)| list).unwrap_or_default());
}
if head2.is_empty() {
self.recycle(Vec::from(head2));
head2 = VecDeque::from(list2.next().unwrap_or_default());
head2 = VecDeque::from(list2.next().map(|(_, list)| list).unwrap_or_default());
}
}

if result.len() > 0 {
output.push(result);
output.push((std::mem::take(&mut frontier), result));
} else {
self.recycle(result);
}

if !head1.is_empty() {
let mut result = self.empty();
for item1 in head1 { result.push(item1); }
output.push(result);
for item1 in head1 {
frontier.insert_ref(&item1.1);
result.push(item1);
}
output.push((std::mem::take(&mut frontier), result));
}
output.extend(list1);

if !head2.is_empty() {
let mut result = self.empty();
for item2 in head2 { result.push(item2); }
output.push(result);
for item2 in head2 {
frontier.insert_ref(&item2.1);
result.push(item2);
}
output.push((std::mem::take(&mut frontier), result));
}
output.extend(list2);

Expand All @@ -452,7 +474,7 @@ impl<D, T, R> MergeSorter<D, T, R>
///
/// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute
/// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
fn account<'a, I: IntoIterator<Item = &'a Vec<(D, T, R)>>>(
fn account<'a, I: IntoIterator<Item = &'a (Antichain<T>, Vec<(D, T, R)>)>>(
&self,
items: I,
diff: isize,
Expand All @@ -461,7 +483,7 @@ impl<D, T, R> MergeSorter<D, T, R>
let (mut records, mut siz, mut capacity, mut allocations) =
(0isize, 0isize, 0isize, 0isize);
for stack in items {
records = records.saturating_add_unsigned(stack.len());
records = records.saturating_add_unsigned(stack.1.len());
// stack.heap_size(|s, c| {
// siz = siz.saturating_add_unsigned(s);
// capacity = capacity.saturating_add_unsigned(c);
Expand Down

0 comments on commit 2f6fa05

Please sign in to comment.