diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index af864bffb..ec5c97a67 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -460,3 +460,91 @@ pub mod batcher { } } } + +mod differential { + use columnar::Columnar; + use differential_dataflow::difference::Abelian; + use differential_dataflow::operators::enter::EnterTime; + use differential_dataflow::operators::Negate; + use timely::dataflow::channels::pact::Pipeline; + use timely::dataflow::operators::{Enter, Operator}; + use timely::dataflow::scopes::Child; + use timely::dataflow::{Scope, StreamCore}; + use timely::progress::timestamp::Refines; + use timely::{Container, Data}; + + use crate::builder::ColumnBuilder; + use crate::Column; + + impl EnterTime, G, TInner> + for StreamCore> + where + G: Scope, + G::Timestamp: Columnar, + ::Container: Data, + D: Columnar, + D::Container: Data, + R: Columnar, + R::Container: Data, + TInner: Columnar + Refines, + TInner::Container: Data, + { + type Container = Column<(D, TInner, R)>; + + fn enter_time<'a>( + &self, + child: &Child<'a, G, TInner>, + ) -> StreamCore, Column<(D, TInner, R)>> { + self.enter(child).unary::, _, _, _>( + Pipeline, + "EnterTimeColumn", + |_capability, _info| { + move |input, output| { + while let Some((time, data)) = input.next() { + let mut session = output.session_with_builder(&time); + for (data, time, diff) in data.iter() { + // TODO: This isn't optimal if `into_owned` needs to allocate. We could + // specialize over concrete timestamp types to avoid this, and work + // through columns at a time. + let time_owned = Columnar::into_owned(time); + session.give((data, &TInner::to_inner(time_owned), diff)); + } + } + } + }, + ) + } + } + + impl Negate, G> + for StreamCore> + where + G: Scope, + G::Timestamp: Columnar, + ::Container: Data, + D: Columnar, + D::Container: Data, + R: Columnar + Abelian, + R::Container: Data, + { + fn negate(&self) -> Self { + self.unary::, _, _, _>( + Pipeline, + "NegateColumn", + |_capability, _info| { + move |input, output| { + let mut diff_owned = R::zero(); + while let Some((time, data)) = input.next() { + let mut session = output.session_with_builder(&time); + for (data, time, diff) in data.iter() { + R::copy_from(&mut diff_owned, diff); + diff_owned.negate(); + session.give((data, time, &diff_owned)); + } + } + } + }, + ) + } + } +} diff --git a/differential-dataflow/src/collection.rs b/differential-dataflow/src/collection.rs index dd173bcf8..d841967fd 100644 --- a/differential-dataflow/src/collection.rs +++ b/differential-dataflow/src/collection.rs @@ -22,6 +22,7 @@ use timely::dataflow::StreamCore; use crate::difference::{Semigroup, Abelian, Multiply}; use crate::lattice::Lattice; use crate::hashable::Hashable; +use crate::operators::enter::EnterTime; /// A mutable collection of values of type `D` /// @@ -173,6 +174,37 @@ impl Collection { self.inner.scope() } + /// Brings a Collection into a nested scope. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::Scope; + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let result = scope.region(|child| { + /// data.enter(child) + /// .leave() + /// }); + /// + /// data.assert_eq(&result); + /// }); + /// ``` + pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection, D, R, as EnterTime>::Container> + where + T: Refines, + StreamCore: EnterTime, + { + self.inner + .enter_time(child) + .as_collection() + } + + /// Creates a new collection whose counts are the negation of those in the input. /// /// This method is most commonly used with `concat` to get those element in one collection but not another. @@ -198,7 +230,7 @@ impl Collection { /// ``` // TODO: Removing this function is possible, but breaks existing callers of `negate` who expect // an inherent method on `Collection`. - pub fn negate(&self) -> Collection where StreamCore: crate::operators::Negate { + pub fn negate(&self) -> Collection where StreamCore: crate::operators::Negate { crate::operators::Negate::negate(&self.inner).as_collection() } } @@ -362,36 +394,6 @@ impl Collection { .as_collection() } - /// Brings a Collection into a nested scope. - /// - /// # Examples - /// - /// ``` - /// use timely::dataflow::Scope; - /// use differential_dataflow::input::Input; - /// - /// ::timely::example(|scope| { - /// - /// let data = scope.new_collection_from(1 .. 10).1; - /// - /// let result = scope.region(|child| { - /// data.enter(child) - /// .leave() - /// }); - /// - /// data.assert_eq(&result); - /// }); - /// ``` - pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection, D, R> - where - T: Refines<::Timestamp>, - { - self.inner - .enter(child) - .map(|(data, time, diff)| (data, T::to_inner(time), diff)) - .as_collection() - } - /// Brings a Collection into a nested scope, at varying times. /// /// The `initial` function indicates the time at which each element of the Collection should appear. diff --git a/differential-dataflow/src/operators/enter.rs b/differential-dataflow/src/operators/enter.rs new file mode 100644 index 000000000..361b0fdc9 --- /dev/null +++ b/differential-dataflow/src/operators/enter.rs @@ -0,0 +1,54 @@ +//! Enter a collection into a scope. + +use timely::Data; +use timely::dataflow::{Scope, ScopeParent, Stream, StreamCore}; +use timely::dataflow::operators::core::{Enter, Map}; +use timely::dataflow::scopes::Child; +use timely::progress::timestamp::Refines; + +/// Extension trait for streams. +pub trait EnterTime +where + G: ScopeParent, + TInner: Refines, +{ + /// The containers in the output stream. + type Container: Clone; + + /// Brings a stream into a nested scope. + /// + /// # Examples + /// + /// ``` + /// use timely::dataflow::Scope; + /// use differential_dataflow::input::Input; + /// + /// ::timely::example(|scope| { + /// + /// let data = scope.new_collection_from(1 .. 10).1; + /// + /// let result = scope.region(|child| { + /// data.enter(child) + /// .leave() + /// }); + /// + /// data.assert_eq(&result); + /// }); + /// ``` + fn enter_time<'a>(&self, child: &Child<'a, G, TInner>) -> StreamCore, Self::Container>; +} + +impl EnterTime, G, TInner> for Stream +where + G: Scope, + D: Data, + R: Data, + TInner: Refines, +{ + type Container = Vec<(D, TInner, R)>; + + fn enter_time<'a>(&self, child: &Child<'a, G, TInner>) -> Stream, (D, TInner, R)> { + self.enter(child) + .map(|(data, time, diff)| (data, TInner::to_inner(time), diff)) + } +} diff --git a/differential-dataflow/src/operators/iterate.rs b/differential-dataflow/src/operators/iterate.rs index 6056ad8cd..36caed7c7 100644 --- a/differential-dataflow/src/operators/iterate.rs +++ b/differential-dataflow/src/operators/iterate.rs @@ -169,7 +169,7 @@ where impl Variable where G::Timestamp: Lattice, - StreamCore: crate::operators::Negate + ResultsIn, + StreamCore: crate::operators::Negate + ResultsIn, { /// Creates a new initially empty `Variable`. /// diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 615cfa399..1d625b2ee 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -12,12 +12,13 @@ pub use self::count::CountTotal; pub use self::threshold::ThresholdTotal; pub mod arrange; -pub mod negate; -pub mod reduce; pub mod consolidate; +pub mod count; +pub mod enter; pub mod iterate; pub mod join; -pub mod count; +pub mod negate; +pub mod reduce; pub mod threshold; use crate::lattice::Lattice; diff --git a/differential-dataflow/src/operators/negate.rs b/differential-dataflow/src/operators/negate.rs index b354c95ca..2388f17c4 100644 --- a/differential-dataflow/src/operators/negate.rs +++ b/differential-dataflow/src/operators/negate.rs @@ -35,18 +35,18 @@ pub trait Negate { fn negate(&self) -> Self; } -impl Negate for Collection +impl Negate for Collection where G: Scope, C: Clone, - StreamCore: Negate, + StreamCore: Negate, { fn negate(&self) -> Self { self.inner.negate().as_collection() } } -impl Negate> for Stream { +impl Negate, G> for Stream { fn negate(&self) -> Self { self.map_in_place(|x| x.2.negate()) }