From 1d6a5e562339ae0acc3ad9488817725dea2a0a46 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 27 Feb 2024 13:09:08 -0500 Subject: [PATCH] Extract ready times from existing chains Teaches the merge batcher to extract ready times from the existing chains, and maintaining the chain invariant after extracting data. This reduces the effort to maintain data that is not yet ready, by maintaining a frontier per chain block that allows us to efficiently decide that a block needs to be inspected or not. Signed-off-by: Moritz Hoffmann --- examples/hello.rs | 2 +- src/trace/implementations/merge_batcher.rs | 439 +++++++++++------- .../implementations/merge_batcher_col.rs | 386 +++++++++------ 3 files changed, 531 insertions(+), 296 deletions(-) diff --git a/examples/hello.rs b/examples/hello.rs index b02f9c506..419bbef2f 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -51,7 +51,7 @@ fn main() { // Load up graph data. Round-robin among workers. for _ in 0 .. (edges / peers) + if index < (edges % peers) { 1 } else { 0 } { - input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1) + input.update_at((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1_000_000, 1) } input.advance_to(1); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 680aa5306..f5ebb56c2 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -1,15 +1,15 @@ -//! A general purpose `Batcher` implementation based on radix sort. +//! A general purpose `Batcher` implementation based on radix sort for TimelyStack. use std::collections::VecDeque; - +use crate::difference::Semigroup; +use crate::logging::{BatcherEvent, DifferentialEvent}; +use crate::trace::{Batcher, Builder}; use timely::communication::message::RefOrMut; use timely::logging::WorkerIdentifier; use timely::logging_core::Logger; +use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; - -use crate::difference::Semigroup; -use crate::logging::{BatcherEvent, DifferentialEvent}; -use crate::trace::{Batcher, Builder}; +use timely::PartialOrder; /// Creates batches from unordered tuples. pub struct MergeBatcher { @@ -32,7 +32,7 @@ where MergeBatcher { sorter: MergeSorter::new(logger, operator_id), frontier: Antichain::new(), - lower: Antichain::from_elem(T::minimum()), + lower: Antichain::from_elem(::minimum()), } } @@ -43,10 +43,8 @@ where RefOrMut::Ref(reference) => { // This is a moment at which we could capture the allocations backing // `batch` into a different form of region, rather than just cloning. - let mut owned: Vec<_> = self.sorter.empty(); - owned.clone_from(reference); - self.sorter.push(&mut owned); - }, + self.sorter.push(&mut reference.clone()); + } RefOrMut::Mut(reference) => { self.sorter.push(reference); } @@ -57,11 +55,10 @@ where // in `upper`. All updates must have time greater or equal to the previously used `upper`, // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. - #[inline(never)] + #[inline] fn seal>(&mut self, upper: Antichain) -> B::Output { - - let mut merged = Vec::new(); - self.sorter.finish_into(&mut merged); + self.frontier.clear(); + let merged = self.sorter.extract_into(upper.borrow(), &mut self.frontier); // Determine the number of distinct keys, values, and updates, // and form a builder pre-sized for these numbers. @@ -70,75 +67,40 @@ where let mut vals = 0; let mut upds = 0; let mut prev_keyval = None; - for buffer in merged.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { - keys += 1; - vals += 1; - upds += 1; - } - prev_keyval = Some((key, val)); + for ((key, val), _time, _) in merged.iter().map(|t| t.iter()).flatten() { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } else if p_val != val { + vals += 1; } + } else { + keys += 1; + vals += 1; } + upds += 1; + prev_keyval = Some((key, val)); } B::with_capacity(keys, vals, upds) }; - let mut kept = Vec::new(); - let mut keep = Vec::new(); - - self.frontier.clear(); - - // TODO: Re-use buffer, rather than dropping. - for mut buffer in merged.drain(..) { - for ((key, val), time, diff) in buffer.drain(..) { - if upper.less_equal(&time) { - self.frontier.insert(time.clone()); - if keep.len() == keep.capacity() && !keep.is_empty() { - kept.push(keep); - keep = self.sorter.empty(); - } - keep.push(((key, val), time, diff)); - } - else { - builder.push(((key, val), time, diff)); - } + for buffer in merged.into_iter() { + for datum in &buffer[..] { + builder.copy(datum); } // Recycling buffer. - self.sorter.push(&mut buffer); - } - - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); - } - if !kept.is_empty() { - self.sorter.push_list(kept); + self.sorter.recycle(buffer); } - // Drain buffers (fast reclaimation). - // TODO : This isn't obviously the best policy, but "safe" wrt footprint. - // In particular, if we are reading serialized input data, we may - // prefer to keep these buffers around to re-fill, if possible. - let mut buffer = Vec::new(); - self.sorter.push(&mut buffer); - // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((K,V),T,D)>() > 0 { - buffer = Vec::new(); - self.sorter.push(&mut buffer); - } + // Drain buffers (fast reclamation). + self.sorter.clear_stash(); - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum())); + let seal = builder.done( + self.lower.clone(), + upper.clone(), + Antichain::from_elem(T::minimum()), + ); self.lower = upper; seal } @@ -151,18 +113,20 @@ where struct MergeSorter { /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions. - queue: Vec>>, + queue: Vec, Vec<(D, T, R)>)>>, stash: Vec>, + pending: Vec<(D, T, R)>, logger: Option>, operator_id: usize, } -impl MergeSorter { +impl MergeSorter { - const BUFFER_SIZE_BYTES: usize = 1 << 13; + const BUFFER_SIZE_BYTES: usize = 64 << 10; - fn buffer_size() -> usize { - let size = ::std::mem::size_of::<(D, T, R)>(); + /// Buffer size (number of elements) to use for new/empty buffers. + const fn buffer_size() -> usize { + let size = std::mem::size_of::<(D, T, R)>(); if size == 0 { Self::BUFFER_SIZE_BYTES } else if size <= Self::BUFFER_SIZE_BYTES { @@ -179,71 +143,224 @@ impl MergeSorter { operator_id, queue: Vec::new(), stash: Vec::new(), + pending: Vec::new(), } } #[inline] - pub fn empty(&mut self) -> Vec<(D, T, R)> { + fn empty(&mut self) -> Vec<(D, T, R)> { self.stash.pop().unwrap_or_else(|| Vec::with_capacity(Self::buffer_size())) } - #[inline] - pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) { - // TODO: Reason about possible unbounded stash growth. How to / should we return them? - // TODO: Reason about mis-sized vectors, from deserialized data; should probably drop. - let mut batch = if self.stash.len() > 2 { - ::std::mem::replace(batch, self.stash.pop().unwrap()) + /// Remove all elements from the stash. + fn clear_stash(&mut self) { + self.stash.clear(); + } + + /// Insert an empty buffer into the stash. Panics if the buffer is not empty. + fn recycle(&mut self, mut buffer: Vec<(D, T, R)>) { + if buffer.capacity() == Self::buffer_size() && self.stash.len() < 2 { + buffer.clear(); + self.stash.push(buffer); } - else { - ::std::mem::take(batch) - }; + } + + fn push(&mut self, batch: &mut Vec<(D, T, R)>) { + // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`. + if self.pending.capacity() < Self::buffer_size() { + self.pending + .reserve(Self::buffer_size() - self.pending.capacity()); + } + + while !batch.is_empty() { + self.pending.extend( + batch.drain( + ..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()), + ), + ); + if self.pending.len() == self.pending.capacity() { + crate::consolidation::consolidate_updates(&mut self.pending); + if self.pending.len() > self.pending.capacity() / 2 { + // Flush if `self.pending` is more than half full after consolidation. + self.flush_pending(); + } + } + } + } - if !batch.is_empty() { - crate::consolidation::consolidate_updates(&mut batch); - self.account([batch.len()], 1); - self.queue_push(vec![batch]); - while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); + /// Move all elements in `pending` into `queue`. The data in `pending` must be compacted and + /// sorted. After this function returns, `self.pending` is empty. + fn flush_pending(&mut self) { + if !self.pending.is_empty() { + let mut stack = self.empty(); + let mut frontier = Antichain::new(); + for tuple in self.pending.drain(..) { + frontier.insert_ref(&tuple.1); + stack.push(tuple); + } + let batch = vec![(frontier, stack)]; + self.account(&batch, 1); + self.queue.push(batch); + while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() > self.queue[self.queue.len()-2].len() / 2) { + let list1 = self.queue.pop().unwrap(); + let list2 = self.queue.pop().unwrap(); let merged = self.merge_by(list1, list2); - self.queue_push(merged); + self.queue.push(merged); } } } - // This is awkward, because it isn't a power-of-two length any more, and we don't want - // to break it down to be so. - pub fn push_list(&mut self, list: Vec>) { - while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); + /// Maintain the internal chain structure. Ensures that: + /// * All chains are sorted by size. + /// * Within each chain, adjacent blocks are reduced, i.e., their combined length is larger than + /// the block size. + /// * All chains are of geometrically increasing length. + fn maintain(&mut self) { + self.account(self.queue.iter().flatten(), -1); + + // Step 1: Canonicalize each chain by adjacent blocks that combined fit into a single block. + for chain in &mut self.queue { + let mut target: Vec<(Antichain, Vec<_>)> = Vec::with_capacity(chain.len()); + for (frontier, block) in chain.drain(..) { + if target.last().map_or(false, |(_, last)| { + last.len() + block.len() <= Self::buffer_size() + }) { + // merge `target.last()` with `block` + let (last_frontier, last) = target.last_mut().unwrap(); + for item in block.into_iter() { + last_frontier.insert_ref(&item.1); + last.push(item); + } + } else { + target.push((frontier, block)); + } + } + *chain = target; + } + + // Step 2: Sort queue by chain length. Depending on how much we extracted, + // the chains might be mis-ordered. + self.queue.sort_by_key(|chain| std::cmp::Reverse(chain.len())); + + // Step 3: Merge chains that are within a power of two. + let mut index = self.queue.len().saturating_sub(1); + while index > 0 { + if self.queue[index-1].len() / 2 < self.queue[index].len() { + // Chains at `index-1` and `index` are within a factor of two, merge them. + let list1 = self.queue.remove(index-1); + let list2 = std::mem::take(&mut self.queue[index-1]); + self.queue[index-1] = self.merge_by(list1, list2); + } + index -= 1; } - self.queue_push(list); + + self.account(self.queue.iter().flatten(), 1); } - #[inline(never)] - pub fn finish_into(&mut self, target: &mut Vec>) { - while self.queue.len() > 1 { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); + /// Extract all data that is not in advance of `upper`. Record the lower bound of the remaining + /// data's time in `frontier`. + fn extract_into( + &mut self, + upper: AntichainRef, + frontier: &mut Antichain, + ) -> Vec> { + // Flush pending data + crate::consolidation::consolidate_updates(&mut self.pending); + self.flush_pending(); + + let mut keep = self.empty(); + let mut keep_frontier = Antichain::new(); + let mut ship = self.empty(); + let mut ship_list = Vec::default(); + + self.account(self.queue.iter().flatten(), -1); + + // Walk all chains, separate ready data from data to keep. + for mut chain in std::mem::take(&mut self.queue).drain(..) { + let mut block_list = Vec::default(); + let mut keep_list = Vec::default(); + for (block_frontier, mut block) in chain.drain(..) { + // Is any data ready to be shipped? + let any = !PartialOrder::less_equal(&upper, &block_frontier.borrow()); + // Is all data ready to be shipped? + let all = any && block.iter().all(|(_, t, _)| !upper.less_equal(t)); + + if all { + // All data is ready, push what we accumulated, stash whole block. + if !ship.is_empty() { + block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty()))); + } + block_list.push((Antichain::new(), block)); + } else if any { + // Iterate block, sorting items into ship and keep + for datum in block.drain(..) { + if upper.less_equal(&datum.1) { + frontier.insert_ref(&datum.1); + keep_frontier.insert_ref(&datum.1); + keep.push(datum); + if keep.capacity() == keep.len() { + // remember keep + keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty()))); + } + } else { + ship.push(datum); + if ship.capacity() == ship.len() { + // Ship is full, push in on the block list, get an empty one. + block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty()))); + } + } + } + // Recycle leftovers + self.recycle(block); + } else { + // Keep the entire block. + if !keep.is_empty() { + keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty()))); + } + keep_list.push((block_frontier, block)); + } + } + + // Capture any residue left after iterating blocks. + if !ship.is_empty() { + block_list.push((Antichain::new(), std::mem::replace(&mut ship, self.empty()))); + } + if !keep.is_empty() { + keep_list.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep, self.empty()))); + } + + // Collect finished chains + if !block_list.is_empty() { + ship_list.push(block_list); + } + if !keep_list.is_empty() { + self.queue.push(keep_list); + } + } + + self.account(self.queue.iter().flatten(), 1); + + if ship_list.len() > 0 { + self.maintain(); } - if let Some(mut last) = self.queue_pop() { - ::std::mem::swap(&mut last, target); + while ship_list.len() > 1 { + let list1 = ship_list.pop().unwrap(); + let list2 = ship_list.pop().unwrap(); + ship_list.push(self.merge_by(list1, list2)); } + + // Pop the last element, or return an empty chain. + ship_list.pop().unwrap_or_default().into_iter().map(|(_, list)| list).collect() } // merges two sorted input lists into one sorted output list. - #[inline(never)] - fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { - self.account(list1.iter().chain(list2.iter()).map(Vec::len), -1); - + fn merge_by( + &mut self, + list1: Vec<(Antichain, Vec<(D, T, R)>)>, + list2: Vec<(Antichain, Vec<(D, T, R)>)>, + ) -> Vec<(Antichain, Vec<(D, T, R)>)> { use std::cmp::Ordering; - // TODO: `list1` and `list2` get dropped; would be better to reuse? let mut output = Vec::with_capacity(list1.len() + list2.len()); let mut result = self.empty(); @@ -251,9 +368,10 @@ impl MergeSorter { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); - let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); - let mut head2 = VecDeque::from(list2.next().unwrap_or_default()); + let mut head1 = VecDeque::from(list1.next().map(|(_, list)| list).unwrap_or_default()); + let mut head2 = VecDeque::from(list2.next().map(|(_, list)| list).unwrap_or_default()); + let mut frontier = Antichain::new(); // while we have valid data in each input, merge. while !head1.is_empty() && !head2.is_empty() { @@ -265,13 +383,22 @@ impl MergeSorter { (&x.0, &x.1).cmp(&(&y.0, &y.1)) }; match cmp { - Ordering::Less => result.push(head1.pop_front().unwrap()), - Ordering::Greater => result.push(head2.pop_front().unwrap()), + Ordering::Less => { + let datum = head1.pop_front().unwrap(); + frontier.insert_ref(&datum.1); + result.push(datum); + }, + Ordering::Greater => { + let datum = head2.pop_front().unwrap(); + frontier.insert_ref(&datum.1); + result.push(datum); + }, Ordering::Equal => { let (data1, time1, mut diff1) = head1.pop_front().unwrap(); let (_data2, _time2, diff2) = head2.pop_front().unwrap(); diff1.plus_equals(&diff2); if !diff1.is_zero() { + frontier.insert_ref(&time1); result.push((data1, time1, diff1)); } } @@ -279,36 +406,44 @@ impl MergeSorter { } if result.capacity() == result.len() { - output.push(result); + let frontier = std::mem::take(&mut frontier); + output.push((frontier, result)); result = self.empty(); } if head1.is_empty() { - let done1 = Vec::from(head1); - if done1.capacity() == Self::buffer_size() { self.stash.push(done1); } - head1 = VecDeque::from(list1.next().unwrap_or_default()); + self.recycle(Vec::from(head1)); + head1 = VecDeque::from(list1.next().map(|(_, list)| list).unwrap_or_default()); } if head2.is_empty() { - let done2 = Vec::from(head2); - if done2.capacity() == Self::buffer_size() { self.stash.push(done2); } - head2 = VecDeque::from(list2.next().unwrap_or_default()); + self.recycle(Vec::from(head2)); + head2 = VecDeque::from(list2.next().map(|(_, list)| list).unwrap_or_default()); } } - if !result.is_empty() { output.push(result); } - else if result.capacity() > 0 { self.stash.push(result); } + if result.len() > 0 { + output.push((std::mem::take(&mut frontier), result)); + } else { + self.recycle(result); + } if !head1.is_empty() { let mut result = self.empty(); - for item1 in head1 { result.push(item1); } - output.push(result); + for item1 in head1 { + frontier.insert_ref(&item1.1); + result.push(item1); + } + output.push((std::mem::take(&mut frontier), result)); } output.extend(list1); if !head2.is_empty() { let mut result = self.empty(); - for item2 in head2 { result.push(item2); } - output.push(result); + for item2 in head2 { + frontier.insert_ref(&item2.1); + result.push(item2); + } + output.push((std::mem::take(&mut frontier), result)); } output.extend(list2); @@ -316,31 +451,21 @@ impl MergeSorter { } } -impl MergeSorter { - /// Pop a batch from `self.queue` and account size changes. - #[inline] - fn queue_pop(&mut self) -> Option>> { - let batch = self.queue.pop(); - self.account(batch.iter().flatten().map(Vec::len), -1); - batch - } - - /// Push a batch to `self.queue` and account size changes. - #[inline] - fn queue_push(&mut self, batch: Vec>) { - self.account(batch.iter().map(Vec::len), 1); - self.queue.push(batch); - } - +impl MergeSorter +{ /// Account size changes. Only performs work if a logger exists. /// /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. - fn account>(&self, items: I, diff: isize) { + fn account<'a, I: IntoIterator, Vec<(D, T, R)>)>>( + &self, + items: I, + diff: isize, + ) where D: 'a, T: 'a, R: 'a { if let Some(logger) = &self.logger { - let mut records= 0isize; - for len in items { - records = records.saturating_add_unsigned(len); + let mut records = 0isize; + for stack in items { + records = records.saturating_add_unsigned(stack.1.len()); } logger.log(BatcherEvent { operator: self.operator_id, @@ -355,6 +480,6 @@ impl MergeSorter { impl Drop for MergeSorter { fn drop(&mut self) { - while self.queue_pop().is_some() { } + self.account(self.queue.iter().flatten(), -1); } } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 9c2faab12..9da881d04 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -1,23 +1,23 @@ //! A general purpose `Batcher` implementation based on radix sort for TimelyStack. -use timely::Container; +use crate::difference::Semigroup; +use crate::logging::{BatcherEvent, DifferentialEvent}; +use crate::trace::{Batcher, Builder}; use timely::communication::message::RefOrMut; use timely::container::columnation::{Columnation, TimelyStack}; use timely::logging::WorkerIdentifier; use timely::logging_core::Logger; +use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; - -use crate::difference::Semigroup; -use crate::logging::{BatcherEvent, DifferentialEvent}; -use crate::trace::{Batcher, Builder}; +use timely::{Container, PartialOrder}; /// Creates batches from unordered tuples. pub struct ColumnatedMergeBatcher -where - K: Columnation + 'static, - V: Columnation + 'static, - T: Columnation + 'static, - D: Columnation + 'static, + where + K: Columnation + 'static, + V: Columnation + 'static, + T: Columnation + 'static, + D: Columnation + 'static, { sorter: MergeSorterColumnation<(K, V), T, D>, lower: Antichain, @@ -25,16 +25,19 @@ where } impl Batcher for ColumnatedMergeBatcher -where - K: Columnation + Ord + Clone + 'static, - V: Columnation + Ord + Clone + 'static, - T: Columnation + Timestamp + 'static, - D: Columnation + Semigroup + 'static, + where + K: Columnation + Ord + Clone + 'static, + V: Columnation + Ord + Clone + 'static, + T: Columnation + Timestamp + 'static, + D: Columnation + Semigroup + 'static, { - type Item = ((K,V),T,D); + type Item = ((K, V), T, D); type Time = T; - fn new(logger: Option>, operator_id: usize) -> Self { + fn new( + logger: Option>, + operator_id: usize, + ) -> Self { ColumnatedMergeBatcher { sorter: MergeSorterColumnation::new(logger, operator_id), frontier: Antichain::new(), @@ -50,7 +53,7 @@ where // This is a moment at which we could capture the allocations backing // `batch` into a different form of region, rather than just cloning. self.sorter.push(&mut reference.clone()); - }, + } RefOrMut::Mut(reference) => { self.sorter.push(reference); } @@ -62,10 +65,12 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline] - fn seal>(&mut self, upper: Antichain) -> B::Output { - - let mut merged = Default::default(); - self.sorter.finish_into(&mut merged); + fn seal>( + &mut self, + upper: Antichain, + ) -> B::Output { + self.frontier.clear(); + let merged = self.sorter.extract_into(upper.borrow(), &mut self.frontier); // Determine the number of distinct keys, values, and updates, // and form a builder pre-sized for these numbers. @@ -74,69 +79,40 @@ where let mut vals = 0; let mut upds = 0; let mut prev_keyval = None; - for buffer in merged.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { - keys += 1; - vals += 1; - upds += 1; - } - prev_keyval = Some((key, val)); + for ((key, val), _time, _) in merged.iter().map(|t| t.iter()).flatten() { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } else if p_val != val { + vals += 1; } + } else { + keys += 1; + vals += 1; } + upds += 1; + prev_keyval = Some((key, val)); } B::with_capacity(keys, vals, upds) }; - let mut kept = Vec::new(); - let mut keep = TimelyStack::default(); - - self.frontier.clear(); - - for buffer in merged.drain(..) { - for datum @ ((_key, _val), time, _diff) in &buffer[..] { - if upper.less_equal(time) { - self.frontier.insert(time.clone()); - if keep.is_empty() { - if keep.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() { - keep = self.sorter.empty(); - } - } else if keep.len() == keep.capacity() { - kept.push(keep); - keep = self.sorter.empty(); - } - keep.copy(datum); - } - else { - builder.copy(datum); - } + for buffer in merged.into_iter() { + for datum in &buffer[..] { + builder.copy(datum); } // Recycling buffer. self.sorter.recycle(buffer); } - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); - } - if !kept.is_empty() { - self.sorter.push_list(kept); - } - // Drain buffers (fast reclamation). self.sorter.clear_stash(); - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum())); + let seal = builder.done( + self.lower.clone(), + upper.clone(), + Antichain::from_elem(T::minimum()), + ); self.lower = upper; seal } @@ -159,7 +135,6 @@ impl Default for TimelyStackQueue { } impl TimelyStackQueue { - fn pop(&mut self) -> &T { self.head += 1; &self.list[self.head - 1] @@ -170,25 +145,28 @@ impl TimelyStackQueue { } fn from(list: TimelyStack) -> Self { - TimelyStackQueue { - list, - head: 0, - } + TimelyStackQueue { list, head: 0 } } fn done(self) -> TimelyStack { self.list } - fn is_empty(&self) -> bool { self.head == self.list[..].len() } + fn is_empty(&self) -> bool { + self.head == self.list[..].len() + } /// Return an iterator over the remaining elements. - fn iter(&self) -> impl Iterator + Clone + ExactSizeIterator { + fn iter(&self) -> impl Iterator + Clone + ExactSizeIterator { self.list[self.head..].iter() } } -struct MergeSorterColumnation { +struct MergeSorterColumnation< + D: Columnation + 'static, + T: Columnation + 'static, + R: Columnation + 'static, +> { /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions. queue: Vec>>, stash: Vec>, @@ -197,8 +175,12 @@ struct MergeSorterColumnation MergeSorterColumnation { - +impl< + D: Ord + Columnation + 'static, + T: Clone + PartialOrder + Ord + Columnation + 'static, + R: Semigroup + Columnation + 'static, +> MergeSorterColumnation +{ const BUFFER_SIZE_BYTES: usize = 64 << 10; /// Buffer size (number of elements) to use for new/empty buffers. @@ -218,7 +200,10 @@ impl>, operator_id: usize) -> Self { + fn new( + logger: Option>, + operator_id: usize, + ) -> Self { Self { logger, operator_id, @@ -229,7 +214,9 @@ impl TimelyStack<(D, T, R)> { - self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size())) + self.stash + .pop() + .unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size())) } /// Remove all elements from the stash. @@ -248,11 +235,16 @@ impl) { // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`. if self.pending.capacity() < Self::pending_buffer_size() { - self.pending.reserve(Self::pending_buffer_size() - self.pending.capacity()); + self.pending + .reserve(Self::pending_buffer_size() - self.pending.capacity()); } while !batch.is_empty() { - self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()))); + self.pending.extend( + batch.drain( + ..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()), + ), + ); if self.pending.len() == self.pending.capacity() { crate::consolidation::consolidate_updates(&mut self.pending); if self.pending.len() > self.pending.capacity() / 2 { @@ -272,45 +264,164 @@ impl 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); - } + let batch = vec![stack]; + self.account(&batch, 1); + self.queue.push(batch); + self.maintain(); } } - // This is awkward, because it isn't a power-of-two length any more, and we don't want - // to break it down to be so. - fn push_list(&mut self, list: Vec>) { - while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); + /// Maintain the internal chain structure. Ensures that: + /// * All chains are sorted by size. + /// * Within each chain, adjacent blocks are reduced, i.e., their combined length is larger than + /// the block size. + /// * All chains are of geometrically increasing length. + fn maintain(&mut self) { + self.account(self.queue.iter().flatten(), -1); + + // Step 1: Canonicalize each chain by adjacent blocks that combined fit into a single block. + for chain in &mut self.queue { + let mut target: Vec> = Vec::with_capacity(chain.len()); + for block in chain.drain(..) { + if target.last().map_or(false, |last| { + last.len() + block.len() <= Self::buffer_size() + }) { + // merge `target.last()` with `block` + let last = target.last_mut().unwrap(); + for item in block.iter() { + last.copy(item); + } + } else { + target.push(block); + } + } + *chain = target; } - self.queue_push(list); + + // Step 2: Sort queue by chain length. + self.queue.sort_by_key(|chain| chain.len()); + + // Step 3: Merge chains that are within a power of two. + let mut index = 0; + while index + 1 < self.queue.len() { + if self.queue[index].len() > self.queue[index + 1].len() / 2 { + // Chains at `index` and `index+1` are within a factor of two, merge them. + let list1 = self.queue.remove(index); + let list2 = std::mem::take(&mut self.queue[index]); + self.queue[index] = self.merge_by(list1, list2); + // Ensure chains are sorted by length. + self.queue.sort_by_key(|chain| chain.len()); + } else { + index += 1; + } + } + + self.account(self.queue.iter().flatten(), 1); } - fn finish_into(&mut self, target: &mut Vec>) { + /// Extract all data that is not in advance of `upper`. Record the lower bound of the remaining + /// data's time in `frontier`. + fn extract_into( + &mut self, + upper: AntichainRef, + frontier: &mut Antichain, + ) -> Vec> { + // Flush pending data crate::consolidation::consolidate_updates(&mut self.pending); self.flush_pending(); - while self.queue.len() > 1 { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); + + let mut keep = self.empty(); + let mut ship = self.empty(); + let mut ship_list = Vec::default(); + + self.account(self.queue.iter().flatten(), -1); + + // Walk all chains, separate ready data from data to keep. + for mut chain in std::mem::take(&mut self.queue).drain(..) { + let mut block_list = Vec::default(); + let mut keep_list = Vec::default(); + for block in chain.drain(..) { + // Is all data ready to be shipped? + let all = block.iter().all(|(_, t, _)| !upper.less_equal(t)); + // Is any data ready to be shipped? + let any = block.iter().any(|(_, t, _)| !upper.less_equal(t)); + + if all { + // All data is ready, push what we accumulated, stash whole block. + if !ship.is_empty() { + block_list.push(std::mem::replace(&mut ship, self.empty())); + } + block_list.push(block); + } else if any { + // Iterate block, sorting items into ship and keep + for datum in block.iter() { + if upper.less_equal(&datum.1) { + frontier.insert_ref(&datum.1); + keep.copy(datum); + if keep.capacity() == keep.len() { + // remember keep + keep_list.push(std::mem::replace(&mut keep, self.empty())); + } + } else { + ship.copy(datum); + if ship.capacity() == ship.len() { + // Ship is full, push in on the block list, get an empty one. + block_list.push(std::mem::replace(&mut ship, self.empty())); + } + } + } + // Recycle leftovers + self.recycle(block); + } else { + // Keep the entire block. + + for (_, t, _) in block.iter() { + frontier.insert_ref(t); + } + if !keep.is_empty() { + keep_list.push(std::mem::replace(&mut keep, self.empty())); + } + keep_list.push(block); + } + } + + // Capture any residue left after iterating blocks. + if !ship.is_empty() { + block_list.push(std::mem::replace(&mut ship, self.empty())); + } + if !keep.is_empty() { + keep_list.push(std::mem::replace(&mut keep, self.empty())); + } + + // Collect finished chains + if !block_list.is_empty() { + ship_list.push(block_list); + } + if !keep_list.is_empty() { + self.queue.push(keep_list); + } } - if let Some(mut last) = self.queue_pop() { - std::mem::swap(&mut last, target); + self.account(self.queue.iter().flatten(), 1); + + self.maintain(); + + while ship_list.len() > 1 { + let list1 = ship_list.pop().unwrap(); + let list2 = ship_list.pop().unwrap(); + ship_list.push(self.merge_by(list1, list2)); } + + // Pop the last element, or return an empty chain. + ship_list.pop().unwrap_or_default() } // merges two sorted input lists into one sorted output list. - fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { + fn merge_by( + &mut self, + list1: Vec>, + list2: Vec>, + ) -> Vec> { use std::cmp::Ordering; // TODO: `list1` and `list2` get dropped; would be better to reuse? @@ -325,18 +436,20 @@ impl 0 && !head1.is_empty() && !head2.is_empty() { - let cmp = { let x = head1.peek(); let y = head2.peek(); (&x.0, &x.1).cmp(&(&y.0, &y.1)) }; match cmp { - Ordering::Less => { result.copy(head1.pop()); } - Ordering::Greater => { result.copy(head2.pop()); } - Ordering::Equal => { + Ordering::Less => { + result.copy(head1.pop()); + } + Ordering::Greater => { + result.copy(head2.pop()); + } + Ordering::Equal => { let (data1, time1, diff1) = head1.pop(); let (_data2, _time2, diff2) = head2.pop(); let mut diff1 = diff1.clone(); @@ -372,7 +485,9 @@ impl MergeSorterColumnation { - /// Pop a batch from `self.queue` and account size changes. - #[inline] - fn queue_pop(&mut self) -> Option>> { - let batch = self.queue.pop(); - self.account(batch.iter().flatten(), -1); - batch - } - - /// Push a batch to `self.queue` and account size changes. - #[inline] - fn queue_push(&mut self, batch: Vec>) { - self.account(&batch, 1); - self.queue.push(batch); - } - +impl +MergeSorterColumnation +{ /// Account size changes. Only performs work if a logger exists. /// /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. - fn account<'a, I: IntoIterator>>(&self, items: I, diff: isize) { + fn account<'a, I: IntoIterator>>( + &self, + items: I, + diff: isize, + ) { if let Some(logger) = &self.logger { - let (mut records, mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize); + let (mut records, mut siz, mut capacity, mut allocations) = + (0isize, 0isize, 0isize, 0isize); for stack in items { records = records.saturating_add_unsigned(stack.len()); stack.heap_size(|s, c| { @@ -429,11 +538,12 @@ impl Drop for MergeSorterColumnation { +impl Drop +for MergeSorterColumnation +{ fn drop(&mut self) { - while self.queue_pop().is_some() { } + self.account(self.queue.iter().flatten(), -1); } }