Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid cloning values during reduce #347

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 59 additions & 15 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
if !input.is_empty() {
logic(key, input, change);
}
change.extend(output.drain(..).map(|(x,d)| (x, d.negate())));
change.extend(output.drain(..).map(|(x,d)| (x.clone(), d.negate())));
crate::consolidation::consolidate(change);
})
}
Expand All @@ -298,7 +298,7 @@ pub trait ReduceCore<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestam
T2::R: Semigroup,
T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static
L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val,T2::R)>)+'static
;
}

Expand All @@ -317,7 +317,7 @@ where
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
.reduce_core(name, logic)
Expand All @@ -338,7 +338,7 @@ where
T2::R: Semigroup,
T2::Batch: Batch<K, T2::Val, G::Timestamp, T2::R>,
T2::Cursor: Cursor<K, T2::Val, G::Timestamp, T2::R>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {
L: FnMut(&K, &[(&V, R)], &mut Vec<(&T2::Val,T2::R)>, &mut Vec<(T2::Val, T2::R)>)+'static {

let mut result_trace = None;

Expand Down Expand Up @@ -676,13 +676,14 @@ where
C1: Cursor<K, V1, T, R1>,
C2: Cursor<K, V2, T, R2>,
C3: Cursor<K, V1, T, R1>,
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>);
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(&V2, R2)>, &mut Vec<(V2, R2)>);
}


/// Implementation based on replaying historical and new updates together.
mod history_replay {

use std::mem::ManuallyDrop;
use ::difference::Semigroup;
use lattice::Lattice;
use trace::Cursor;
Expand All @@ -691,6 +692,13 @@ mod history_replay {

use super::{PerKeyCompute, sort_dedup};

/// Clears and type erases a vector
fn vec_to_parts<T>(v: Vec<T>) -> (*mut (), usize) {
let mut v = ManuallyDrop::new(v);
v.clear();
(v.as_mut_ptr() as *mut (), v.capacity())
}

/// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
/// time order, maintaining consolidated representations of updates with respect to future interesting times.
pub struct HistoryReplayer<'a, V1, V2, T, R1, R2>
Expand All @@ -705,7 +713,11 @@ mod history_replay {
input_history: ValueHistory<'a, V1, T, R1>,
output_history: ValueHistory<'a, V2, T, R2>,
input_buffer: Vec<(&'a V1, R1)>,
output_buffer: Vec<(V2, R2)>,
// A type erased pointer and capacity for the temporary output buffer passed to `logic`.
// During `compute` the vector contains references to self which will get invalid as soon
// as compute returns. For this reason the temporary vector is always cleared before
// decomposing it back into its type erased parts
output_buffer_parts: (*mut (), usize),
update_buffer: Vec<(V2, R2)>,
output_produced: Vec<((V2, T), R2)>,
synth_times: Vec<T>,
Expand All @@ -728,7 +740,7 @@ mod history_replay {
input_history: ValueHistory::new(),
output_history: ValueHistory::new(),
input_buffer: Vec::new(),
output_buffer: Vec::new(),
output_buffer_parts: vec_to_parts(Vec::<(&V2, R2)>::new()),
update_buffer: Vec::new(),
output_produced: Vec::new(),
synth_times: Vec::new(),
Expand All @@ -754,7 +766,7 @@ mod history_replay {
C1: Cursor<K, V1, T, R1>,
C2: Cursor<K, V2, T, R2>,
C3: Cursor<K, V1, T, R1>,
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(V2, R2)>, &mut Vec<(V2, R2)>)
L: FnMut(&K, &[(&V1, R1)], &mut Vec<(&V2, R2)>, &mut Vec<(V2, R2)>)
{

// The work we need to perform is at times defined principally by the contents of `batch_cursor`
Expand Down Expand Up @@ -913,31 +925,41 @@ mod history_replay {
}
crate::consolidation::consolidate(&mut self.input_buffer);

let (ptr, cap) = self.output_buffer_parts;
// SAFETY:
// * `ptr` is valid because is has been previously allocated by a Vec
// constructor parameterized with the same type argument
// * `len` and `cap` are valid because the vector is converted to parts
// only through `vec_to_parts` which clears the vector and gets its
// capacity
let mut output_buffer = unsafe {
Vec::from_raw_parts(ptr as *mut (&V2, R2), 0, cap)
};
meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet));
for &((ref value, ref time), ref diff) in output_replay.buffer().iter() {
for &((value, ref time), ref diff) in output_replay.buffer().iter() {
if time.less_equal(&next_time) {
self.output_buffer.push(((*value).clone(), diff.clone()));
output_buffer.push((value, diff.clone()));
}
else {
self.temporary.push(next_time.join(time));
}
}
for &((ref value, ref time), ref diff) in self.output_produced.iter() {
if time.less_equal(&next_time) {
self.output_buffer.push(((*value).clone(), diff.clone()));
output_buffer.push((value, diff.clone()));
}
else {
self.temporary.push(next_time.join(&time));
}
}
crate::consolidation::consolidate(&mut self.output_buffer);
crate::consolidation::consolidate(&mut output_buffer);

// Apply user logic if non-empty input and see what happens!
if self.input_buffer.len() > 0 || self.output_buffer.len() > 0 {
logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
if self.input_buffer.len() > 0 || output_buffer.len() > 0 {
logic(key, &self.input_buffer[..], &mut output_buffer, &mut self.update_buffer);
self.input_buffer.clear();
self.output_buffer.clear();
}
self.output_buffer_parts = vec_to_parts(output_buffer);

// output_replay.advance_buffer_by(&meet);
// for &((ref value, ref time), diff) in output_replay.buffer().iter() {
Expand Down Expand Up @@ -1076,6 +1098,28 @@ mod history_replay {
}
}

impl<'a, V1, V2, T, R1, R2> Drop for HistoryReplayer<'a, V1, V2, T, R1, R2>
where
V1: Ord+Clone+'a,
V2: Ord+Clone+'a,
T: Lattice+Ord+Clone,
R1: Semigroup,
R2: Semigroup,
{
fn drop(&mut self) {
let (ptr, cap) = self.output_buffer_parts;
// SAFETY:
// * `ptr` is valid because is has been previously allocated by a Vec
// constructor parameterized with the same type argument
// * `len` and `cap` are valid because the vector is converted to parts
// only through `vec_to_parts` which clears the vector and gets its
// capacity
unsafe {
Vec::from_raw_parts(ptr as *mut (&V2, R2), 0, cap);
}
}
}

/// Updates an optional meet by an optional time.
fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
if let Some(time) = other {
Expand Down