diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 135128efe..b538e3503 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -281,7 +281,7 @@ pub trait ReduceCore where G::Timestam if !input.is_empty() { logic(key, input, change); } - change.extend(output.drain(..).map(|(x,d)| (x, d.negate()))); + change.extend(output.drain(..).map(|(x,d)| (x.clone(), d.negate()))); crate::consolidation::consolidate(change); }) } @@ -298,7 +298,7 @@ pub trait ReduceCore where G::Timestam T2::R: Semigroup, T2::Batch: Batch, T2::Cursor: Cursor, - L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static + L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static ; } @@ -317,7 +317,7 @@ where T2: Trace+TraceReader+'static, T2::Batch: Batch, T2::Cursor: Cursor, - L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static + L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { self.arrange_by_key_named(&format!("Arrange: {}", name)) .reduce_core(name, logic) @@ -338,7 +338,7 @@ where T2::R: Semigroup, T2::Batch: Batch, T2::Cursor: Cursor, - L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { + L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static { let mut result_trace = None; @@ -676,13 +676,14 @@ where C1: Cursor, C2: Cursor, C3: Cursor, - L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>); + L: FnMut(&K, &[(&V1, R1)], &mut Vec<(&V2, R2)>, &mut Vec<(V2, R2)>); } /// Implementation based on replaying historical and new updates together. 
mod history_replay { + use std::mem::ManuallyDrop; use ::difference::Semigroup; use lattice::Lattice; use trace::Cursor; @@ -691,6 +692,13 @@ mod history_replay { use super::{PerKeyCompute, sort_dedup}; + /// Clears and type erases a vector + fn vec_to_parts(v: Vec) -> (*mut (), usize) { + let mut v = ManuallyDrop::new(v); + v.clear(); + (v.as_mut_ptr() as *mut (), v.capacity()) + } + /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in /// time order, maintaining consolidated representations of updates with respect to future interesting times. pub struct HistoryReplayer<'a, V1, V2, T, R1, R2> @@ -705,7 +713,11 @@ mod history_replay { input_history: ValueHistory<'a, V1, T, R1>, output_history: ValueHistory<'a, V2, T, R2>, input_buffer: Vec<(&'a V1, R1)>, - output_buffer: Vec<(V2, R2)>, + // A type erased pointer and capacity for the temporary output buffer passed to `logic`. + // During `compute` the vector contains references to self which will get invalid as soon + // as compute returns. 
For this reason the temporary vector is always cleared before + // decomposing it back into its type erased parts + output_buffer_parts: (*mut (), usize), update_buffer: Vec<(V2, R2)>, output_produced: Vec<((V2, T), R2)>, synth_times: Vec, @@ -728,7 +740,7 @@ mod history_replay { input_history: ValueHistory::new(), output_history: ValueHistory::new(), input_buffer: Vec::new(), - output_buffer: Vec::new(), + output_buffer_parts: vec_to_parts(Vec::<(&V2, R2)>::new()), update_buffer: Vec::new(), output_produced: Vec::new(), synth_times: Vec::new(), @@ -754,7 +766,7 @@ mod history_replay { C1: Cursor, C2: Cursor, C3: Cursor, - L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>) + L: FnMut(&K, &[(&V1, R1)], &mut Vec<(&V2, R2)>, &mut Vec<(V2, R2)>) { // The work we need to perform is at times defined principally by the contents of `batch_cursor` @@ -913,10 +925,20 @@ mod history_replay { } crate::consolidation::consolidate(&mut self.input_buffer); + let (ptr, cap) = self.output_buffer_parts; + // SAFETY: + // * `ptr` is valid because it has been previously allocated by a Vec + // constructor parameterized with the same type argument + // * `len` and `cap` are valid because the vector is converted to parts + // only through `vec_to_parts` which clears the vector and gets its + // capacity + let mut output_buffer = unsafe { + Vec::from_raw_parts(ptr as *mut (&V2, R2), 0, cap) + }; meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet)); - for &((ref value, ref time), ref diff) in output_replay.buffer().iter() { + for &((value, ref time), ref diff) in output_replay.buffer().iter() { if time.less_equal(&next_time) { - self.output_buffer.push(((*value).clone(), diff.clone())); + output_buffer.push((value, diff.clone())); } else { self.temporary.push(next_time.join(time)); @@ -924,20 +946,20 @@ mod history_replay { } for &((ref value, ref time), ref diff) in self.output_produced.iter() { if time.less_equal(&next_time) { -
self.output_buffer.push(((*value).clone(), diff.clone())); + output_buffer.push((value, diff.clone())); } else { self.temporary.push(next_time.join(&time)); } } - crate::consolidation::consolidate(&mut self.output_buffer); + crate::consolidation::consolidate(&mut output_buffer); // Apply user logic if non-empty input and see what happens! - if self.input_buffer.len() > 0 || self.output_buffer.len() > 0 { - logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer); + if self.input_buffer.len() > 0 || output_buffer.len() > 0 { + logic(key, &self.input_buffer[..], &mut output_buffer, &mut self.update_buffer); self.input_buffer.clear(); - self.output_buffer.clear(); } + self.output_buffer_parts = vec_to_parts(output_buffer); // output_replay.advance_buffer_by(&meet); // for &((ref value, ref time), diff) in output_replay.buffer().iter() { @@ -1076,6 +1098,28 @@ mod history_replay { } } + impl<'a, V1, V2, T, R1, R2> Drop for HistoryReplayer<'a, V1, V2, T, R1, R2> + where + V1: Ord+Clone+'a, + V2: Ord+Clone+'a, + T: Lattice+Ord+Clone, + R1: Semigroup, + R2: Semigroup, + { + fn drop(&mut self) { + let (ptr, cap) = self.output_buffer_parts; + // SAFETY: + // * `ptr` is valid because it has been previously allocated by a Vec + // constructor parameterized with the same type argument + // * `len` and `cap` are valid because the vector is converted to parts + // only through `vec_to_parts` which clears the vector and gets its + // capacity + unsafe { + Vec::from_raw_parts(ptr as *mut (&V2, R2), 0, cap); + } + } + } + /// Updates an optional meet by an optional time. fn update_meet(meet: &mut Option, other: Option<&T>) { if let Some(time) = other {