diff --git a/examples/hello.rs b/examples/hello.rs index b02f9c506..fb5a5498d 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -1,3 +1,4 @@ +use std::time::Instant; use rand::{Rng, SeedableRng, StdRng}; use differential_dataflow::input::Input; @@ -51,7 +52,8 @@ fn main() { // Load up graph data. Round-robin among workers. for _ in 0 .. (edges / peers) + if index < (edges % peers) { 1 } else { 0 } { - input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1) + input.update_at((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1_000_000, 1) + // input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1) } input.advance_to(1); @@ -67,7 +69,8 @@ fn main() { if index == 0 { let mut next = batch; - for round in 1 .. { + let start_time = Instant::now(); + for round in 1 .. 1_000_100 { input.advance_to(round); input.update((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1); @@ -83,6 +86,7 @@ fn main() { next += batch; } } + println!("rounds finished after {:?}", start_time.elapsed()); } } }).unwrap(); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 680aa5306..df252073c 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -1,15 +1,15 @@ -//! A general purpose `Batcher` implementation based on radix sort. +//! A general purpose `Batcher` implementation based on radix sort for TimelyStack. use std::collections::VecDeque; - +use crate::difference::Semigroup; +use crate::logging::{BatcherEvent, DifferentialEvent}; +use crate::trace::{Batcher, Builder}; use timely::communication::message::RefOrMut; use timely::logging::WorkerIdentifier; use timely::logging_core::Logger; +use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; - -use crate::difference::Semigroup; -use crate::logging::{BatcherEvent, DifferentialEvent}; -use crate::trace::{Batcher, Builder}; +use timely::PartialOrder; /// Creates batches from unordered tuples. pub struct MergeBatcher { @@ -32,7 +32,7 @@ where MergeBatcher { sorter: MergeSorter::new(logger, operator_id), frontier: Antichain::new(), - lower: Antichain::from_elem(T::minimum()), + lower: Antichain::from_elem(::minimum()), } } @@ -43,10 +43,8 @@ where RefOrMut::Ref(reference) => { // This is a moment at which we could capture the allocations backing // `batch` into a different form of region, rather than just cloning. - let mut owned: Vec<_> = self.sorter.empty(); - owned.clone_from(reference); - self.sorter.push(&mut owned); - }, + self.sorter.push(&mut reference.clone()); + } RefOrMut::Mut(reference) => { self.sorter.push(reference); } @@ -57,11 +55,9 @@ where // in `upper`. All updates must have time greater or equal to the previously used `upper`, // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. - #[inline(never)] fn seal>(&mut self, upper: Antichain) -> B::Output { - - let mut merged = Vec::new(); - self.sorter.finish_into(&mut merged); + self.frontier.clear(); + let extracted = self.sorter.extract(upper.borrow(), &mut self.frontier); // Determine the number of distinct keys, values, and updates, // and form a builder pre-sized for these numbers. 
@@ -70,75 +66,40 @@ where let mut vals = 0; let mut upds = 0; let mut prev_keyval = None; - for buffer in merged.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { - keys += 1; - vals += 1; - upds += 1; - } - prev_keyval = Some((key, val)); + for ((key, val), _time, _) in extracted.iter().flatten() { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } else if p_val != val { + vals += 1; } + } else { + keys += 1; + vals += 1; } + upds += 1; + prev_keyval = Some((key, val)); } B::with_capacity(keys, vals, upds) }; - let mut kept = Vec::new(); - let mut keep = Vec::new(); - - self.frontier.clear(); - - // TODO: Re-use buffer, rather than dropping. - for mut buffer in merged.drain(..) { - for ((key, val), time, diff) in buffer.drain(..) { - if upper.less_equal(&time) { - self.frontier.insert(time.clone()); - if keep.len() == keep.capacity() && !keep.is_empty() { - kept.push(keep); - keep = self.sorter.empty(); - } - keep.push(((key, val), time, diff)); - } - else { - builder.push(((key, val), time, diff)); - } + for mut buffer in extracted { + for datum in buffer.drain(..) { + builder.push(datum); } // Recycling buffer. - self.sorter.push(&mut buffer); + self.sorter.recycle(buffer); } - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); - } - if !kept.is_empty() { - self.sorter.push_list(kept); - } - - // Drain buffers (fast reclaimation). - // TODO : This isn't obviously the best policy, but "safe" wrt footprint. - // In particular, if we are reading serialized input data, we may - // prefer to keep these buffers around to re-fill, if possible. - let mut buffer = Vec::new(); - self.sorter.push(&mut buffer); - // We recycle buffers with allocations (capacity, and not zero-sized). - while buffer.capacity() > 0 && std::mem::size_of::<((K,V),T,D)>() > 0 { - buffer = Vec::new(); - self.sorter.push(&mut buffer); - } + // Drain buffers (fast reclamation). + self.sorter.clear_stash(); - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum())); + let seal = builder.done( + self.lower.clone(), + upper.clone(), + Antichain::from_elem(T::minimum()), + ); self.lower = upper; seal } @@ -150,19 +111,23 @@ where } struct MergeSorter { - /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions. - queue: Vec>>, + /// each power-of-two length list of least times and allocations. Do not push/pop directly but use the corresponding functions. + queue: Vec, Vec<(D, T, R)>)>>, + /// Empty, recycled allocations. Use [`MergeSorter::emtpy`] to pop an allocation. stash: Vec>, + /// Data that was pushed but not yet inserted into queue. Not necessarily sorted or compacted. + pending: Vec<(D, T, R)>, logger: Option>, operator_id: usize, } -impl MergeSorter { +impl MergeSorter { - const BUFFER_SIZE_BYTES: usize = 1 << 13; + const BUFFER_SIZE_BYTES: usize = 64 << 10; - fn buffer_size() -> usize { - let size = ::std::mem::size_of::<(D, T, R)>(); + /// Buffer size (number of elements) to use for new/empty buffers. 
+ const fn buffer_size() -> usize { + let size = std::mem::size_of::<(D, T, R)>(); if size == 0 { Self::BUFFER_SIZE_BYTES } else if size <= Self::BUFFER_SIZE_BYTES { @@ -174,76 +139,235 @@ impl MergeSorter { #[inline] fn new(logger: Option>, operator_id: usize) -> Self { + // Construct `Self` with zero capacity to avoid allocations if never used. Self { logger, operator_id, queue: Vec::new(), stash: Vec::new(), + pending: Vec::new(), } } #[inline] - pub fn empty(&mut self) -> Vec<(D, T, R)> { + fn empty(&mut self) -> Vec<(D, T, R)> { self.stash.pop().unwrap_or_else(|| Vec::with_capacity(Self::buffer_size())) } - #[inline] - pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) { - // TODO: Reason about possible unbounded stash growth. How to / should we return them? - // TODO: Reason about mis-sized vectors, from deserialized data; should probably drop. - let mut batch = if self.stash.len() > 2 { - ::std::mem::replace(batch, self.stash.pop().unwrap()) + /// Remove all elements from the stash. + fn clear_stash(&mut self) { + self.stash.clear(); + } + + /// Insert an empty buffer into the stash. Panics if the buffer is not empty. + fn recycle(&mut self, mut buffer: Vec<(D, T, R)>) { + if buffer.capacity() == Self::buffer_size() && self.stash.len() < 2 { + buffer.clear(); + self.stash.push(buffer); } - else { - ::std::mem::take(batch) - }; + } - if !batch.is_empty() { - crate::consolidation::consolidate_updates(&mut batch); - self.account([batch.len()], 1); - self.queue_push(vec![batch]); - while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); + /// Push an update into this sorter. + /// + /// We assume that the length of `batch` is shorter than the capacity of `self.pending`. + /// Otherwise, this function can get quadratic behavior. + fn push(&mut self, batch: &mut Vec<(D, T, R)>) { + // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`. + if self.pending.capacity() < Self::buffer_size() { + self.pending + .reserve(Self::buffer_size() - self.pending.capacity()); + } + + // Consolidate to avoid redundant work. + crate::consolidation::consolidate_updates(batch); + + while !batch.is_empty() { + self.pending.extend( + batch.drain( + std::cmp::min(0, batch.len().saturating_sub(self.pending.capacity() - self.pending.len())).., + ), + ); + if self.pending.len() == self.pending.capacity() { + crate::consolidation::consolidate_updates(&mut self.pending); + if self.pending.len() > self.pending.capacity() / 2 { + // Flush if `self.pending` is more than half full after consolidation. + self.flush_pending(); + } } } } - // This is awkward, because it isn't a power-of-two length any more, and we don't want - // to break it down to be so. - pub fn push_list(&mut self, list: Vec>) { - while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); + /// Move all elements in `pending` into `queue`. The data in `pending` must be compacted and + /// sorted. After this function returns, `self.pending` is empty. + fn flush_pending(&mut self) { + if !self.pending.is_empty() { + let mut block = self.empty(); + let mut least_times = Antichain::new(); + for tuple in self.pending.drain(..) 
{ + least_times.insert_ref(&tuple.1); + block.push(tuple); + } + let chain = vec![(least_times, block)]; + self.account(&chain, 1); + self.queue.push(chain); + while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() > self.queue[self.queue.len()-2].len() / 2) { + let list1 = self.queue.pop().unwrap(); + let list2 = self.queue.pop().unwrap(); + let merged = self.merge_by(list1, list2); + self.queue.push(merged); + } } - self.queue_push(list); } - #[inline(never)] - pub fn finish_into(&mut self, target: &mut Vec>) { - while self.queue.len() > 1 { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); + /// Extract all data that is not in advance of `upper`. Record the lower bound of the remaining + /// data's time in `frontier`. + fn extract( + &mut self, + upper: AntichainRef, + frontier: &mut Antichain, + ) -> Vec> { + // Flush pending data + crate::consolidation::consolidate_updates(&mut self.pending); + self.flush_pending(); + + let mut keep_buffer = self.empty(); + let mut keep_frontier = Antichain::new(); + let mut ship_buffer = self.empty(); + let mut ship_chains = Vec::default(); + + self.account(self.queue.iter().flatten(), -1); + + // Walk all chains, separate ready data from data to keep. + for mut chain in std::mem::take(&mut self.queue).drain(..) { + let mut ship_chain = Vec::default(); + let mut keep_chain = Vec::default(); + for (block_frontier, mut block) in chain.drain(..) { + // Is any data ready to be shipped? + if PartialOrder::less_equal(&upper, &block_frontier.borrow()) { + // Keep the entire block. + if !keep_buffer.is_empty() { + for t in keep_frontier.iter() { + frontier.insert_ref(t); + } + keep_chain.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep_buffer, self.empty()))); + } + for t in block_frontier.iter() { + frontier.insert_ref(t); + } + keep_chain.push((block_frontier, block)); + } else { + // Split the block: Some data may be ready. + + // Iterate block, sorting items into ship and keep + for datum in block.drain(..) { + if upper.less_equal(&datum.1) { + keep_frontier.insert_ref(&datum.1); + keep_buffer.push(datum); + if keep_buffer.capacity() == keep_buffer.len() { + // remember keep + for t in keep_frontier.iter() { + frontier.insert_ref(t); + } + keep_chain.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep_buffer, self.empty()))); + } + } else { + ship_buffer.push(datum); + if ship_buffer.capacity() == ship_buffer.len() { + // Ship is full, push in on the block list, get an empty one. + ship_chain.push((Antichain::new(), std::mem::replace(&mut ship_buffer, self.empty()))); + } + } + } + // Recycle leftovers + self.recycle(block); + } + } + + // Capture any residue left after iterating blocks. + if !ship_buffer.is_empty() { + ship_chain.push((Antichain::new(), std::mem::replace(&mut ship_buffer, self.empty()))); + } + if !keep_buffer.is_empty() { + for t in keep_frontier.iter() { + frontier.insert_ref(t); + } + keep_chain.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep_buffer, self.empty()))); + } + + // Collect finished chains + if !keep_chain.is_empty() { + if !ship_chain.is_empty() { + // Canonicalize the chain by adjacent blocks that combined fit into a single block. + let mut target: Vec<(Antichain, Vec<_>)> = Vec::with_capacity(keep_chain.len()); + for (frontier, mut block) in keep_chain.drain(..) 
{ + if let Some((last_frontier, last)) = target.last_mut().filter(|(_, last)| { + last.len() + block.len() <= Self::buffer_size() + }) { + // merge `target.last()` with `block` + for item in block.drain(..) { + last_frontier.insert_ref(&item.1); + last.push(item); + } + self.recycle(block); + } else { + target.push((frontier, block)); + } + } + keep_chain = target; + } + self.queue.push(keep_chain); + } + if !ship_chain.is_empty() { + ship_chains.push(ship_chain); + } } - if let Some(mut last) = self.queue_pop() { - ::std::mem::swap(&mut last, target); + // If we extracted some data, perform maintenance work. + if !ship_chains.is_empty() { + // Maintain the internal chain structure. Ensures that all chains are of geometrically + // increasing length. The function assumes that chains itself are well-formed, i.e., + // they contain elements in increasing order. + + // Step 1: Sort queue by chain length. Depending on how much we extracted, + // the chains might be mis-ordered. + self.queue.sort_by_key(|chain| std::cmp::Reverse(chain.len())); + + // Step 2: Merge chains that are within a power of two. + let mut index = self.queue.len().saturating_sub(1); + while index > 0 { + if self.queue[index-1].len() / 2 < self.queue[index].len() { + // Chains at `index-1` and `index` are within a factor of two, merge them. + let list1 = self.queue.remove(index-1); + let list2 = std::mem::take(&mut self.queue[index-1]); + self.queue[index-1] = self.merge_by(list1, list2); + } + index -= 1; + } + } + + self.account(self.queue.iter().flatten(), 1); + + // Merge `ship_chains` into a single element. Roll up from the smallest to the largest + // chain. + ship_chains.sort_by_key(|chain| std::cmp::Reverse(chain.len())); + + while ship_chains.len() > 1 { + let list1 = ship_chains.pop().unwrap(); + let list2 = ship_chains.pop().unwrap(); + ship_chains.push(self.merge_by(list1, list2)); } + + // Pop the last element, or return an empty chain. + ship_chains.pop().unwrap_or_default().into_iter().map(|(_, list)| list).collect() } // merges two sorted input lists into one sorted output list. - #[inline(never)] - fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { - self.account(list1.iter().chain(list2.iter()).map(Vec::len), -1); - + fn merge_by( + &mut self, + list1: Vec<(Antichain, Vec<(D, T, R)>)>, + list2: Vec<(Antichain, Vec<(D, T, R)>)>, + ) -> Vec<(Antichain, Vec<(D, T, R)>)> { use std::cmp::Ordering; - // TODO: `list1` and `list2` get dropped; would be better to reuse? let mut output = Vec::with_capacity(list1.len() + list2.len()); let mut result = self.empty(); @@ -251,9 +375,10 @@ impl MergeSorter { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); - let mut head1 = VecDeque::from(list1.next().unwrap_or_default()); - let mut head2 = VecDeque::from(list2.next().unwrap_or_default()); + let mut head1 = VecDeque::from(list1.next().map(|(_, list)| list).unwrap_or_default()); + let mut head2 = VecDeque::from(list2.next().map(|(_, list)| list).unwrap_or_default()); + let mut frontier = Antichain::new(); // while we have valid data in each input, merge. 
while !head1.is_empty() && !head2.is_empty() { @@ -265,13 +390,22 @@ impl MergeSorter { (&x.0, &x.1).cmp(&(&y.0, &y.1)) }; match cmp { - Ordering::Less => result.push(head1.pop_front().unwrap()), - Ordering::Greater => result.push(head2.pop_front().unwrap()), + Ordering::Less => { + let datum = head1.pop_front().unwrap(); + frontier.insert_ref(&datum.1); + result.push(datum); + }, + Ordering::Greater => { + let datum = head2.pop_front().unwrap(); + frontier.insert_ref(&datum.1); + result.push(datum); + }, Ordering::Equal => { let (data1, time1, mut diff1) = head1.pop_front().unwrap(); let (_data2, _time2, diff2) = head2.pop_front().unwrap(); diff1.plus_equals(&diff2); if !diff1.is_zero() { + frontier.insert_ref(&time1); result.push((data1, time1, diff1)); } } @@ -279,36 +413,44 @@ impl MergeSorter { } if result.capacity() == result.len() { - output.push(result); + let frontier = std::mem::take(&mut frontier); + output.push((frontier, result)); result = self.empty(); } if head1.is_empty() { - let done1 = Vec::from(head1); - if done1.capacity() == Self::buffer_size() { self.stash.push(done1); } - head1 = VecDeque::from(list1.next().unwrap_or_default()); + self.recycle(Vec::from(head1)); + head1 = VecDeque::from(list1.next().map(|(_, list)| list).unwrap_or_default()); } if head2.is_empty() { - let done2 = Vec::from(head2); - if done2.capacity() == Self::buffer_size() { self.stash.push(done2); } - head2 = VecDeque::from(list2.next().unwrap_or_default()); + self.recycle(Vec::from(head2)); + head2 = VecDeque::from(list2.next().map(|(_, list)| list).unwrap_or_default()); } } - if !result.is_empty() { output.push(result); } - else if result.capacity() > 0 { self.stash.push(result); } + if result.len() > 0 { + output.push((std::mem::take(&mut frontier), result)); + } else { + self.recycle(result); + } if !head1.is_empty() { let mut result = self.empty(); - for item1 in head1 { result.push(item1); } - output.push(result); + for item1 in head1 { + frontier.insert_ref(&item1.1); + result.push(item1); + } + output.push((std::mem::take(&mut frontier), result)); } output.extend(list1); if !head2.is_empty() { let mut result = self.empty(); - for item2 in head2 { result.push(item2); } - output.push(result); + for item2 in head2 { + frontier.insert_ref(&item2.1); + result.push(item2); + } + output.push((std::mem::take(&mut frontier), result)); } output.extend(list2); @@ -317,30 +459,19 @@ impl MergeSorter { } impl MergeSorter { - /// Pop a batch from `self.queue` and account size changes. - #[inline] - fn queue_pop(&mut self) -> Option>> { - let batch = self.queue.pop(); - self.account(batch.iter().flatten().map(Vec::len), -1); - batch - } - - /// Push a batch to `self.queue` and account size changes. - #[inline] - fn queue_push(&mut self, batch: Vec>) { - self.account(batch.iter().map(Vec::len), 1); - self.queue.push(batch); - } - /// Account size changes. Only performs work if a logger exists. /// /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. 
- fn account>(&self, items: I, diff: isize) { + fn account<'a, I: IntoIterator, Vec<(D, T, R)>)>>( + &self, + items: I, + diff: isize, + ) where D: 'a, T: 'a, R: 'a { if let Some(logger) = &self.logger { - let mut records= 0isize; - for len in items { - records = records.saturating_add_unsigned(len); + let mut records = 0isize; + for stack in items { + records = records.saturating_add_unsigned(stack.1.len()); } logger.log(BatcherEvent { operator: self.operator_id, @@ -355,6 +486,6 @@ impl MergeSorter { impl Drop for MergeSorter { fn drop(&mut self) { - while self.queue_pop().is_some() { } + self.account(self.queue.iter().flatten(), -1); } } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 9c2faab12..9da881d04 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -1,23 +1,23 @@ //! A general purpose `Batcher` implementation based on radix sort for TimelyStack. -use timely::Container; +use crate::difference::Semigroup; +use crate::logging::{BatcherEvent, DifferentialEvent}; +use crate::trace::{Batcher, Builder}; use timely::communication::message::RefOrMut; use timely::container::columnation::{Columnation, TimelyStack}; use timely::logging::WorkerIdentifier; use timely::logging_core::Logger; +use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; - -use crate::difference::Semigroup; -use crate::logging::{BatcherEvent, DifferentialEvent}; -use crate::trace::{Batcher, Builder}; +use timely::{Container, PartialOrder}; /// Creates batches from unordered tuples. pub struct ColumnatedMergeBatcher -where - K: Columnation + 'static, - V: Columnation + 'static, - T: Columnation + 'static, - D: Columnation + 'static, + where + K: Columnation + 'static, + V: Columnation + 'static, + T: Columnation + 'static, + D: Columnation + 'static, { sorter: MergeSorterColumnation<(K, V), T, D>, lower: Antichain, @@ -25,16 +25,19 @@ where } impl Batcher for ColumnatedMergeBatcher -where - K: Columnation + Ord + Clone + 'static, - V: Columnation + Ord + Clone + 'static, - T: Columnation + Timestamp + 'static, - D: Columnation + Semigroup + 'static, + where + K: Columnation + Ord + Clone + 'static, + V: Columnation + Ord + Clone + 'static, + T: Columnation + Timestamp + 'static, + D: Columnation + Semigroup + 'static, { - type Item = ((K,V),T,D); + type Item = ((K, V), T, D); type Time = T; - fn new(logger: Option>, operator_id: usize) -> Self { + fn new( + logger: Option>, + operator_id: usize, + ) -> Self { ColumnatedMergeBatcher { sorter: MergeSorterColumnation::new(logger, operator_id), frontier: Antichain::new(), @@ -50,7 +53,7 @@ where // This is a moment at which we could capture the allocations backing // `batch` into a different form of region, rather than just cloning. self.sorter.push(&mut reference.clone()); - }, + } RefOrMut::Mut(reference) => { self.sorter.push(reference); } @@ -62,10 +65,12 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. 
#[inline] - fn seal>(&mut self, upper: Antichain) -> B::Output { - - let mut merged = Default::default(); - self.sorter.finish_into(&mut merged); + fn seal>( + &mut self, + upper: Antichain, + ) -> B::Output { + self.frontier.clear(); + let merged = self.sorter.extract_into(upper.borrow(), &mut self.frontier); // Determine the number of distinct keys, values, and updates, // and form a builder pre-sized for these numbers. @@ -74,69 +79,40 @@ where let mut vals = 0; let mut upds = 0; let mut prev_keyval = None; - for buffer in merged.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } - else if p_val != val { - vals += 1; - } - upds += 1; - } else { - keys += 1; - vals += 1; - upds += 1; - } - prev_keyval = Some((key, val)); + for ((key, val), _time, _) in merged.iter().map(|t| t.iter()).flatten() { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } else if p_val != val { + vals += 1; } + } else { + keys += 1; + vals += 1; } + upds += 1; + prev_keyval = Some((key, val)); } B::with_capacity(keys, vals, upds) }; - let mut kept = Vec::new(); - let mut keep = TimelyStack::default(); - - self.frontier.clear(); - - for buffer in merged.drain(..) { - for datum @ ((_key, _val), time, _diff) in &buffer[..] { - if upper.less_equal(time) { - self.frontier.insert(time.clone()); - if keep.is_empty() { - if keep.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() { - keep = self.sorter.empty(); - } - } else if keep.len() == keep.capacity() { - kept.push(keep); - keep = self.sorter.empty(); - } - keep.copy(datum); - } - else { - builder.copy(datum); - } + for buffer in merged.into_iter() { + for datum in &buffer[..] { + builder.copy(datum); } // Recycling buffer. self.sorter.recycle(buffer); } - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); - } - if !kept.is_empty() { - self.sorter.push_list(kept); - } - // Drain buffers (fast reclamation). self.sorter.clear_stash(); - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum())); + let seal = builder.done( + self.lower.clone(), + upper.clone(), + Antichain::from_elem(T::minimum()), + ); self.lower = upper; seal } @@ -159,7 +135,6 @@ impl Default for TimelyStackQueue { } impl TimelyStackQueue { - fn pop(&mut self) -> &T { self.head += 1; &self.list[self.head - 1] @@ -170,25 +145,28 @@ impl TimelyStackQueue { } fn from(list: TimelyStack) -> Self { - TimelyStackQueue { - list, - head: 0, - } + TimelyStackQueue { list, head: 0 } } fn done(self) -> TimelyStack { self.list } - fn is_empty(&self) -> bool { self.head == self.list[..].len() } + fn is_empty(&self) -> bool { + self.head == self.list[..].len() + } /// Return an iterator over the remaining elements. - fn iter(&self) -> impl Iterator + Clone + ExactSizeIterator { + fn iter(&self) -> impl Iterator + Clone + ExactSizeIterator { self.list[self.head..].iter() } } -struct MergeSorterColumnation { +struct MergeSorterColumnation< + D: Columnation + 'static, + T: Columnation + 'static, + R: Columnation + 'static, +> { /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions. 
queue: Vec>>, stash: Vec>, @@ -197,8 +175,12 @@ struct MergeSorterColumnation MergeSorterColumnation { - +impl< + D: Ord + Columnation + 'static, + T: Clone + PartialOrder + Ord + Columnation + 'static, + R: Semigroup + Columnation + 'static, +> MergeSorterColumnation +{ const BUFFER_SIZE_BYTES: usize = 64 << 10; /// Buffer size (number of elements) to use for new/empty buffers. @@ -218,7 +200,10 @@ impl>, operator_id: usize) -> Self { + fn new( + logger: Option>, + operator_id: usize, + ) -> Self { Self { logger, operator_id, @@ -229,7 +214,9 @@ impl TimelyStack<(D, T, R)> { - self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size())) + self.stash + .pop() + .unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size())) } /// Remove all elements from the stash. @@ -248,11 +235,16 @@ impl) { // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`. if self.pending.capacity() < Self::pending_buffer_size() { - self.pending.reserve(Self::pending_buffer_size() - self.pending.capacity()); + self.pending + .reserve(Self::pending_buffer_size() - self.pending.capacity()); } while !batch.is_empty() { - self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()))); + self.pending.extend( + batch.drain( + ..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len()), + ), + ); if self.pending.len() == self.pending.capacity() { crate::consolidation::consolidate_updates(&mut self.pending); if self.pending.len() > self.pending.capacity() / 2 { @@ -272,45 +264,164 @@ impl 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); - } + let batch = vec![stack]; + self.account(&batch, 1); + self.queue.push(batch); + self.maintain(); } } - // This is awkward, because it isn't a power-of-two length any more, and we don't want - // to break it down to be so. - fn push_list(&mut self, list: Vec>) { - while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); + /// Maintain the internal chain structure. Ensures that: + /// * All chains are sorted by size. + /// * Within each chain, adjacent blocks are reduced, i.e., their combined length is larger than + /// the block size. + /// * All chains are of geometrically increasing length. + fn maintain(&mut self) { + self.account(self.queue.iter().flatten(), -1); + + // Step 1: Canonicalize each chain by adjacent blocks that combined fit into a single block. + for chain in &mut self.queue { + let mut target: Vec> = Vec::with_capacity(chain.len()); + for block in chain.drain(..) { + if target.last().map_or(false, |last| { + last.len() + block.len() <= Self::buffer_size() + }) { + // merge `target.last()` with `block` + let last = target.last_mut().unwrap(); + for item in block.iter() { + last.copy(item); + } + } else { + target.push(block); + } + } + *chain = target; } - self.queue_push(list); + + // Step 2: Sort queue by chain length. + self.queue.sort_by_key(|chain| chain.len()); + + // Step 3: Merge chains that are within a power of two. 
+ let mut index = 0; + while index + 1 < self.queue.len() { + if self.queue[index].len() > self.queue[index + 1].len() / 2 { + // Chains at `index` and `index+1` are within a factor of two, merge them. + let list1 = self.queue.remove(index); + let list2 = std::mem::take(&mut self.queue[index]); + self.queue[index] = self.merge_by(list1, list2); + // Ensure chains are sorted by length. + self.queue.sort_by_key(|chain| chain.len()); + } else { + index += 1; + } + } + + self.account(self.queue.iter().flatten(), 1); } - fn finish_into(&mut self, target: &mut Vec>) { + /// Extract all data that is not in advance of `upper`. Record the lower bound of the remaining + /// data's time in `frontier`. + fn extract_into( + &mut self, + upper: AntichainRef, + frontier: &mut Antichain, + ) -> Vec> { + // Flush pending data crate::consolidation::consolidate_updates(&mut self.pending); self.flush_pending(); - while self.queue.len() > 1 { - let list1 = self.queue_pop().unwrap(); - let list2 = self.queue_pop().unwrap(); - let merged = self.merge_by(list1, list2); - self.queue_push(merged); + + let mut keep = self.empty(); + let mut ship = self.empty(); + let mut ship_list = Vec::default(); + + self.account(self.queue.iter().flatten(), -1); + + // Walk all chains, separate ready data from data to keep. + for mut chain in std::mem::take(&mut self.queue).drain(..) { + let mut block_list = Vec::default(); + let mut keep_list = Vec::default(); + for block in chain.drain(..) { + // Is all data ready to be shipped? + let all = block.iter().all(|(_, t, _)| !upper.less_equal(t)); + // Is any data ready to be shipped? + let any = block.iter().any(|(_, t, _)| !upper.less_equal(t)); + + if all { + // All data is ready, push what we accumulated, stash whole block. + if !ship.is_empty() { + block_list.push(std::mem::replace(&mut ship, self.empty())); + } + block_list.push(block); + } else if any { + // Iterate block, sorting items into ship and keep + for datum in block.iter() { + if upper.less_equal(&datum.1) { + frontier.insert_ref(&datum.1); + keep.copy(datum); + if keep.capacity() == keep.len() { + // remember keep + keep_list.push(std::mem::replace(&mut keep, self.empty())); + } + } else { + ship.copy(datum); + if ship.capacity() == ship.len() { + // Ship is full, push in on the block list, get an empty one. + block_list.push(std::mem::replace(&mut ship, self.empty())); + } + } + } + // Recycle leftovers + self.recycle(block); + } else { + // Keep the entire block. + + for (_, t, _) in block.iter() { + frontier.insert_ref(t); + } + if !keep.is_empty() { + keep_list.push(std::mem::replace(&mut keep, self.empty())); + } + keep_list.push(block); + } + } + + // Capture any residue left after iterating blocks. + if !ship.is_empty() { + block_list.push(std::mem::replace(&mut ship, self.empty())); + } + if !keep.is_empty() { + keep_list.push(std::mem::replace(&mut keep, self.empty())); + } + + // Collect finished chains + if !block_list.is_empty() { + ship_list.push(block_list); + } + if !keep_list.is_empty() { + self.queue.push(keep_list); + } } - if let Some(mut last) = self.queue_pop() { - std::mem::swap(&mut last, target); + self.account(self.queue.iter().flatten(), 1); + + self.maintain(); + + while ship_list.len() > 1 { + let list1 = ship_list.pop().unwrap(); + let list2 = ship_list.pop().unwrap(); + ship_list.push(self.merge_by(list1, list2)); } + + // Pop the last element, or return an empty chain. + ship_list.pop().unwrap_or_default() } // merges two sorted input lists into one sorted output list. 
- fn merge_by(&mut self, list1: Vec>, list2: Vec>) -> Vec> { + fn merge_by( + &mut self, + list1: Vec>, + list2: Vec>, + ) -> Vec> { use std::cmp::Ordering; // TODO: `list1` and `list2` get dropped; would be better to reuse? @@ -325,18 +436,20 @@ impl 0 && !head1.is_empty() && !head2.is_empty() { - let cmp = { let x = head1.peek(); let y = head2.peek(); (&x.0, &x.1).cmp(&(&y.0, &y.1)) }; match cmp { - Ordering::Less => { result.copy(head1.pop()); } - Ordering::Greater => { result.copy(head2.pop()); } - Ordering::Equal => { + Ordering::Less => { + result.copy(head1.pop()); + } + Ordering::Greater => { + result.copy(head2.pop()); + } + Ordering::Equal => { let (data1, time1, diff1) = head1.pop(); let (_data2, _time2, diff2) = head2.pop(); let mut diff1 = diff1.clone(); @@ -372,7 +485,9 @@ impl MergeSorterColumnation { - /// Pop a batch from `self.queue` and account size changes. - #[inline] - fn queue_pop(&mut self) -> Option>> { - let batch = self.queue.pop(); - self.account(batch.iter().flatten(), -1); - batch - } - - /// Push a batch to `self.queue` and account size changes. - #[inline] - fn queue_push(&mut self, batch: Vec>) { - self.account(&batch, 1); - self.queue.push(batch); - } - +impl +MergeSorterColumnation +{ /// Account size changes. Only performs work if a logger exists. /// /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff. - fn account<'a, I: IntoIterator>>(&self, items: I, diff: isize) { + fn account<'a, I: IntoIterator>>( + &self, + items: I, + diff: isize, + ) { if let Some(logger) = &self.logger { - let (mut records, mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize); + let (mut records, mut siz, mut capacity, mut allocations) = + (0isize, 0isize, 0isize, 0isize); for stack in items { records = records.saturating_add_unsigned(stack.len()); stack.heap_size(|s, c| { @@ -429,11 +538,12 @@ impl Drop for MergeSorterColumnation { +impl Drop +for MergeSorterColumnation +{ fn drop(&mut self) { - while self.queue_pop().is_some() { } + self.account(self.queue.iter().flatten(), -1); } }
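The `seal` implementations in both batchers pre-size their builder by scanning the already-sorted updates once and counting distinct keys, distinct (key, value) pairs, and total updates. Below is a minimal standalone sketch of that counting pass, with concrete `u64` tuples standing in for the generic `K`/`V`/`T`/`D`; the function name `presize` is illustrative only.

```rust
// Count (distinct keys, distinct (key, val) pairs, total updates) in one pass over
// sorted ((key, val), time, diff) updates, so a builder can be allocated exactly once.
fn presize(updates: &[((u64, u64), u64, i64)]) -> (usize, usize, usize) {
    let (mut keys, mut vals, mut upds) = (0, 0, 0);
    let mut prev_keyval = None;
    for ((key, val), _time, _diff) in updates.iter() {
        if let Some((p_key, p_val)) = prev_keyval {
            if p_key != key {
                // New key implies a new (key, val) pair as well.
                keys += 1;
                vals += 1;
            } else if p_val != val {
                vals += 1;
            }
        } else {
            // First update seen.
            keys += 1;
            vals += 1;
        }
        upds += 1;
        prev_keyval = Some((key, val));
    }
    (keys, vals, upds)
}

fn main() {
    let updates = vec![((1, 1), 0, 1), ((1, 1), 1, 1), ((1, 2), 0, 1), ((2, 1), 0, 1)];
    assert_eq!(presize(&updates), (2, 3, 4));
}
```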
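Both sorters size their blocks from a fixed byte budget (`BUFFER_SIZE_BYTES = 64 << 10`) rather than a fixed element count. A small sketch of that rule as a free-standing `const fn`; the generic parameter stands in for the `(D, T, R)` update tuple.

```rust
// Derive a per-block element count from a byte budget and the size of one update.
const BUFFER_SIZE_BYTES: usize = 64 << 10;

const fn buffer_size<T>() -> usize {
    let size = std::mem::size_of::<T>();
    if size == 0 {
        // Zero-sized updates: fall back to a fixed element count.
        BUFFER_SIZE_BYTES
    } else if size <= BUFFER_SIZE_BYTES {
        BUFFER_SIZE_BYTES / size
    } else {
        // A single update already exceeds the budget; keep at least one per block.
        1
    }
}

fn main() {
    assert_eq!(buffer_size::<(u64, u64, i64)>(), (64 << 10) / 24);
    assert_eq!(buffer_size::<()>(), 64 << 10);
}
```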
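New updates are staged in a `pending` buffer: when it fills, it is consolidated in place, and a block is flushed into the queue only if consolidation left the buffer more than half full. A simplified stand-in sketch of that policy, using a `(String, i64)` update and a plain sort-and-sum in place of `consolidation::consolidate_updates`; the `Pending` type, its `flushed` field, and the capacity of 8 are illustrative assumptions.

```rust
// Sum diffs of equal keys and drop entries that cancel to zero.
fn consolidate(updates: &mut Vec<(String, i64)>) {
    updates.sort_by(|a, b| a.0.cmp(&b.0));
    updates.dedup_by(|later, earlier| {
        if earlier.0 == later.0 {
            earlier.1 += later.1;
            true
        } else {
            false
        }
    });
    updates.retain(|(_, diff)| *diff != 0);
}

struct Pending {
    buffer: Vec<(String, i64)>,
    flushed: Vec<Vec<(String, i64)>>, // stands in for the sorter's chain queue
}

impl Pending {
    fn new(capacity: usize) -> Self {
        Pending { buffer: Vec::with_capacity(capacity), flushed: Vec::new() }
    }

    fn push(&mut self, update: (String, i64)) {
        self.buffer.push(update);
        if self.buffer.len() == self.buffer.capacity() {
            // Consolidate in place; flush only if that freed less than half the buffer.
            consolidate(&mut self.buffer);
            if self.buffer.len() > self.buffer.capacity() / 2 {
                let capacity = self.buffer.capacity();
                self.flushed
                    .push(std::mem::replace(&mut self.buffer, Vec::with_capacity(capacity)));
            }
        }
    }
}

fn main() {
    let mut pending = Pending::new(8);
    for i in 0..32u32 {
        pending.push((format!("key{}", i % 4), 1));
    }
    // With only four distinct keys, consolidation keeps the staging buffer small,
    // so little or nothing needs to be flushed into the queue.
    println!("flushed blocks: {}, staged updates: {}", pending.flushed.len(), pending.buffer.len());
}
```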
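The non-columnated sorter now tags every block with an `Antichain` of its least times, so `extract` can keep a whole block untouched when `upper` is less or equal to that frontier, and only splits blocks that straddle `upper`. A sketch of the idea with a totally ordered `u64` time standing in for the antichain (so the "least times" collapse to a single minimum); types and the `extract` signature here are simplified stand-ins.

```rust
type Update = (String, u64, i64);
type Block = (u64 /* least time in the block */, Vec<Update>);

/// Split a chain of blocks relative to `upper`: updates with time < upper are ready
/// to ship into a batch; updates with time >= upper are kept for later.
fn extract(chain: Vec<Block>, upper: u64) -> (Vec<Update>, Vec<Block>) {
    let mut ship = Vec::new();
    let mut keep = Vec::new();
    for (least, block) in chain {
        if least >= upper {
            // Every update in the block is in advance of `upper`: keep it wholesale,
            // without inspecting its contents.
            keep.push((least, block));
        } else {
            // Mixed block: sort items into ship and keep, recomputing the kept least time.
            let mut kept = Vec::new();
            let mut kept_least = u64::MAX;
            for (data, time, diff) in block {
                if time >= upper {
                    kept_least = kept_least.min(time);
                    kept.push((data, time, diff));
                } else {
                    ship.push((data, time, diff));
                }
            }
            if !kept.is_empty() {
                keep.push((kept_least, kept));
            }
        }
    }
    (ship, keep)
}

fn main() {
    let chain = vec![
        (0, vec![("a".to_string(), 0, 1), ("b".to_string(), 3, 1)]),
        (5, vec![("c".to_string(), 5, 1)]),
    ];
    let (ship, keep) = extract(chain, 4);
    assert_eq!(ship.len(), 2); // the updates at times 0 and 3 are ready
    assert_eq!(keep.len(), 1); // the block with least time 5 is kept wholesale
}
```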
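Chains in the queue are kept geometrically increasing in length: after extraction the queue is sorted by length and adjacent chains within a factor of two of each other are merged, which keeps merge work amortized. A standalone sketch of one maintenance pass over chains of plain `u64` values; `merge` here is an ordinary sorted-vector merge standing in for `merge_by`.

```rust
// Merge two sorted vectors into one sorted vector.
fn merge(list1: Vec<u64>, list2: Vec<u64>) -> Vec<u64> {
    let mut out = Vec::with_capacity(list1.len() + list2.len());
    let (mut i, mut j) = (0, 0);
    while i < list1.len() && j < list2.len() {
        if list1[i] <= list2[j] {
            out.push(list1[i]);
            i += 1;
        } else {
            out.push(list2[j]);
            j += 1;
        }
    }
    out.extend_from_slice(&list1[i..]);
    out.extend_from_slice(&list2[j..]);
    out
}

/// Restore the invariant that chain lengths are geometrically increasing.
fn maintain(queue: &mut Vec<Vec<u64>>) {
    // Longest chain first, mirroring `sort_by_key(|chain| Reverse(chain.len()))`.
    queue.sort_by_key(|chain| std::cmp::Reverse(chain.len()));
    // Merge adjacent chains whose lengths are within a factor of two of each other.
    let mut index = queue.len().saturating_sub(1);
    while index > 0 {
        if queue[index - 1].len() / 2 < queue[index].len() {
            let shorter = queue.remove(index);
            let longer = std::mem::take(&mut queue[index - 1]);
            queue[index - 1] = merge(longer, shorter);
        }
        index -= 1;
    }
}

fn main() {
    let mut queue = vec![vec![1, 4], vec![2, 3]];
    maintain(&mut queue);
    assert_eq!(queue, vec![vec![1, 2, 3, 4]]);
}
```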
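At the core of `merge_by` is a consolidating two-way merge: two sorted runs of `(data, time, diff)` are merged, equal `(data, time)` entries have their diffs added via `plus_equals`, and entries whose diff cancels to zero are dropped. A sketch with concrete types and plain `i64` addition in place of the `Semigroup` trait.

```rust
use std::cmp::Ordering;
use std::collections::VecDeque;

type Update = (String, u64, i64);

fn merge_consolidating(list1: Vec<Update>, list2: Vec<Update>) -> Vec<Update> {
    let mut out = Vec::with_capacity(list1.len() + list2.len());
    let mut head1 = VecDeque::from(list1);
    let mut head2 = VecDeque::from(list2);
    // While both runs have data, emit the smaller (data, time) first.
    while !head1.is_empty() && !head2.is_empty() {
        let cmp = {
            let x = head1.front().unwrap();
            let y = head2.front().unwrap();
            (&x.0, &x.1).cmp(&(&y.0, &y.1))
        };
        match cmp {
            Ordering::Less => out.push(head1.pop_front().unwrap()),
            Ordering::Greater => out.push(head2.pop_front().unwrap()),
            Ordering::Equal => {
                // Same (data, time): accumulate diffs; drop the update if they cancel.
                let (data, time, diff1) = head1.pop_front().unwrap();
                let (_data, _time, diff2) = head2.pop_front().unwrap();
                if diff1 + diff2 != 0 {
                    out.push((data, time, diff1 + diff2));
                }
            }
        }
    }
    // One of the runs is exhausted; the rest of the other is already sorted.
    out.extend(head1);
    out.extend(head2);
    out
}

fn main() {
    let a = vec![("a".to_string(), 0, 1), ("b".to_string(), 1, 1)];
    let b = vec![("a".to_string(), 0, -1), ("c".to_string(), 2, 1)];
    assert_eq!(
        merge_consolidating(a, b),
        vec![("b".to_string(), 1, 1), ("c".to_string(), 2, 1)]
    );
}
```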
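Both sorters log size changes through a single `account` helper that sums block lengths into a signed counter, so the same call site can register insertions (`diff = 1`) and removals (`diff = -1`), including from `Drop`. A sketch of that bookkeeping; `SizeDelta` is an illustrative stand-in for the logged `BatcherEvent`.

```rust
#[derive(Debug, PartialEq)]
struct SizeDelta {
    operator: usize,
    records_delta: isize,
}

/// Sum the lengths of the given blocks and scale by `diff` (typically 1 or -1).
fn account<'a, T: 'a, I>(operator: usize, blocks: I, diff: isize) -> SizeDelta
where
    I: IntoIterator<Item = &'a Vec<T>>,
{
    let mut records = 0isize;
    for block in blocks {
        // Block lengths are usize; accumulate them into a signed counter without overflow.
        records = records.saturating_add_unsigned(block.len());
    }
    SizeDelta { operator, records_delta: records * diff }
}

fn main() {
    // Two chains of blocks, flattened the way `self.queue.iter().flatten()` is in the diff.
    let queue: Vec<Vec<Vec<(u64, u64, i64)>>> =
        vec![vec![vec![(0, 0, 1); 3]], vec![vec![(1, 0, 1); 2]]];
    let added = account(0, queue.iter().flatten(), 1);
    let removed = account(0, queue.iter().flatten(), -1);
    // Registering and then retiring the same chains nets out to zero records.
    assert_eq!(added.records_delta + removed.records_delta, 0);
}
```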