Skip to content

Introduce EnterTime, fix Negate #587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,91 @@ pub mod batcher {
}
}
}

mod differential {
use columnar::Columnar;
use differential_dataflow::difference::Abelian;
use differential_dataflow::operators::enter::EnterTime;
use differential_dataflow::operators::Negate;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::{Enter, Operator};
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, StreamCore};
use timely::progress::timestamp::Refines;
use timely::{Container, Data};

use crate::builder::ColumnBuilder;
use crate::Column;

/// Brings a columnar stream of `(data, time, diff)` updates into a nested scope.
///
/// Every update keeps its data and diff; only the timestamp is rewritten, by
/// converting the outer `G::Timestamp` into the child's `TInner` through
/// `Refines::to_inner`.
impl<G, D, R, TInner> EnterTime<Column<(D, G::Timestamp, R)>, G, TInner>
    for StreamCore<G, Column<(D, G::Timestamp, R)>>
where
    G: Scope,
    G::Timestamp: Columnar,
    <G::Timestamp as Columnar>::Container: Data,
    D: Columnar,
    D::Container: Data,
    R: Columnar,
    R::Container: Data,
    TInner: Columnar + Refines<G::Timestamp>,
    TInner::Container: Data,
{
    /// Columnar container of updates carrying `TInner` timestamps.
    type Container = Column<(D, TInner, R)>;

    /// Enters `child` and re-emits each update with its timestamp mapped into `TInner`.
    ///
    /// The work happens in a unary operator named `"EnterTimeColumn"` that is fed
    /// through a `Pipeline` pact and writes its output with a `ColumnBuilder`.
    fn enter_time<'a>(
        &self,
        child: &Child<'a, G, TInner>,
    ) -> StreamCore<Child<'a, G, TInner>, Column<(D, TInner, R)>> {
        let entered = self.enter(child);
        entered.unary::<ColumnBuilder<_>, _, _, _>(
            Pipeline,
            "EnterTimeColumn",
            |_capability, _info| {
                move |input, output| {
                    while let Some((cap, batch)) = input.next() {
                        // One output session per input batch, opened at the batch's capability.
                        let mut session = output.session_with_builder(&cap);
                        for (datum, outer_time, diff) in batch.iter() {
                            // TODO: This isn't optimal if `into_owned` needs to allocate. We could
                            // specialize over concrete timestamp types to avoid this, and work
                            // through columns at a time.
                            let outer_owned = Columnar::into_owned(outer_time);
                            let inner_time = TInner::to_inner(outer_owned);
                            session.give((datum, &inner_time, diff));
                        }
                    }
                }
            },
        )
    }
}

/// Negates the diff of every update in a columnar stream.
///
/// Data and timestamps are forwarded as they arrive; only the diff component is
/// replaced by its negation.
impl<G, D, R> Negate<Column<(D, G::Timestamp, R)>, G>
    for StreamCore<G, Column<(D, G::Timestamp, R)>>
where
    G: Scope,
    G::Timestamp: Columnar,
    <G::Timestamp as Columnar>::Container: Data,
    D: Columnar,
    D::Container: Data,
    R: Columnar + Abelian,
    R::Container: Data,
{
    /// Builds a unary operator named `"NegateColumn"` that emits each input update
    /// with a negated diff.
    fn negate(&self) -> Self {
        self.unary::<ColumnBuilder<_>, _, _, _>(
            Pipeline,
            "NegateColumn",
            |_capability, _info| {
                move |input, output| {
                    // Owned scratch value: each borrowed diff is copied in here so it
                    // can be negated in place before being handed to the session.
                    let mut scratch = R::zero();
                    while let Some((cap, batch)) = input.next() {
                        let mut session = output.session_with_builder(&cap);
                        for (datum, stamp, diff) in batch.iter() {
                            R::copy_from(&mut scratch, diff);
                            scratch.negate();
                            session.give((datum, stamp, &scratch));
                        }
                    }
                }
            },
        )
    }
}
}
64 changes: 33 additions & 31 deletions differential-dataflow/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use timely::dataflow::StreamCore;
use crate::difference::{Semigroup, Abelian, Multiply};
use crate::lattice::Lattice;
use crate::hashable::Hashable;
use crate::operators::enter::EnterTime;

/// A mutable collection of values of type `D`
///
Expand Down Expand Up @@ -173,6 +174,37 @@ impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
self.inner.scope()
}

/// Brings a Collection into a nested scope.
///
/// The underlying stream is entered through its [`EnterTime`] implementation,
/// whose associated `Container` type becomes the container of the returned
/// collection.
///
/// # Examples
///
/// ```
/// use timely::dataflow::Scope;
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
///     let data = scope.new_collection_from(1 .. 10).1;
///
///     let result = scope.region(|child| {
///         data.enter(child)
///             .leave()
///     });
///
///     data.assert_eq(&result);
/// });
/// ```
pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R, <StreamCore<G, C> as EnterTime<C, G, T>>::Container>
where
    T: Refines<G::Timestamp>,
    StreamCore<G, C>: EnterTime<C, G, T>,
{
    // Enter the raw stream first, then wrap the result back up as a collection.
    let entered = self.inner.enter_time(child);
    entered.as_collection()
}


/// Creates a new collection whose counts are the negation of those in the input.
///
/// This method is most commonly used with `concat` to get those elements in one collection but not another.
Expand All @@ -198,7 +230,7 @@ impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
/// ```
// TODO: Removing this function is possible, but breaks existing callers of `negate` who expect
// an inherent method on `Collection`.
pub fn negate(&self) -> Collection<G, D, R, C> where StreamCore<G, C>: crate::operators::Negate<G, C> {
pub fn negate(&self) -> Collection<G, D, R, C> where StreamCore<G, C>: crate::operators::Negate<C, G> {
crate::operators::Negate::negate(&self.inner).as_collection()
}
}
Expand Down Expand Up @@ -362,36 +394,6 @@ impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
.as_collection()
}

/// Brings a Collection into a nested scope.
///
/// Each update's timestamp is converted to the child scope's timestamp type
/// with `T::to_inner`; data and diff are left untouched.
///
/// # Examples
///
/// ```
/// use timely::dataflow::Scope;
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
///     let data = scope.new_collection_from(1 .. 10).1;
///
///     let result = scope.region(|child| {
///         data.enter(child)
///             .leave()
///     });
///
///     data.assert_eq(&result);
/// });
/// ```
pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
where
    T: Refines<<G as ScopeParent>::Timestamp>,
{
    let entered = self.inner.enter(child);
    entered
        .map(|(datum, outer_time, diff)| {
            let inner_time = T::to_inner(outer_time);
            (datum, inner_time, diff)
        })
        .as_collection()
}

/// Brings a Collection into a nested scope, at varying times.
///
/// The `initial` function indicates the time at which each element of the Collection should appear.
Expand Down
54 changes: 54 additions & 0 deletions differential-dataflow/src/operators/enter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//! Enter a collection into a scope.

use timely::Data;
use timely::dataflow::{Scope, ScopeParent, Stream, StreamCore};
use timely::dataflow::operators::core::{Enter, Map};
use timely::dataflow::scopes::Child;
use timely::progress::timestamp::Refines;

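/// Extension trait for streams of updates that can be brought into a nested scope whose timestamp type `TInner` refines the outer scope's timestamp.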
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, but doccomment could use some love.

pub trait EnterTime<C, G, TInner>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with the Negate comment, I think G, TInner, C is the more common order; certainly the scope comes first in just about all timely traits iirc (both Map and Operator, that I just checked). Timely's Enter trait is Enter<G, T, C> fwiw.

where
G: ScopeParent,
TInner: Refines<G::Timestamp>,
{
/// The type of container used to represent updates with `TInner` timestamps.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar nit: probably something better to say here. Perhaps

/// The type of container used to represent updates with `TInner` timestamps

type Container: Clone;

/// Brings a stream into a nested scope.
///
/// # Examples
///
/// ```
/// use timely::dataflow::Scope;
/// use differential_dataflow::input::Input;
///
/// ::timely::example(|scope| {
///
/// let data = scope.new_collection_from(1 .. 10).1;
///
/// let result = scope.region(|child| {
/// data.enter(child)
/// .leave()
/// });
///
/// data.assert_eq(&result);
/// });
/// ```
fn enter_time<'a>(&self, child: &Child<'a, G, TInner>) -> StreamCore<Child<'a, G, TInner>, Self::Container>;
}

/// Brings a vector-backed stream of `(data, time, diff)` updates into a nested scope.
///
/// The timestamp of each update is converted with `TInner::to_inner`; data and
/// diff pass through unchanged.
impl<G, D, R, TInner> EnterTime<Vec<(D, G::Timestamp, R)>, G, TInner> for Stream<G, (D, G::Timestamp, R)>
where
    G: Scope,
    D: Data,
    R: Data,
    TInner: Refines<G::Timestamp>,
{
    /// Vector of updates carrying `TInner` timestamps.
    type Container = Vec<(D, TInner, R)>;

    /// Enters `child`, then maps every update's timestamp into `TInner`.
    fn enter_time<'a>(&self, child: &Child<'a, G, TInner>) -> Stream<Child<'a, G, TInner>, (D, TInner, R)> {
        let entered = self.enter(child);
        entered.map(|(datum, outer_time, diff)| {
            let inner_time = TInner::to_inner(outer_time);
            (datum, inner_time, diff)
        })
    }
}
2 changes: 1 addition & 1 deletion differential-dataflow/src/operators/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ where
impl<G: Scope, D: Data, R: Abelian, C: Container + Clone + 'static> Variable<G, D, R, C>
where
G::Timestamp: Lattice,
StreamCore<G, C>: crate::operators::Negate<G, C> + ResultsIn<G, C>,
StreamCore<G, C>: crate::operators::Negate<C, G> + ResultsIn<G, C>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think conventionally, we usually have scope parameters first and containers later (e.g. timely's Map trait, as the first one I found). I'm looking at my local DD repo, and it has Negate<G, C> in it, where G: Scope. I'm not sure I understand the flipping that is going on here.

{
/// Creates a new initially empty `Variable`.
///
Expand Down
7 changes: 4 additions & 3 deletions differential-dataflow/src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ pub use self::count::CountTotal;
pub use self::threshold::ThresholdTotal;

pub mod arrange;
pub mod negate;
pub mod reduce;
pub mod consolidate;
pub mod count;
pub mod enter;
pub mod iterate;
pub mod join;
pub mod count;
pub mod negate;
pub mod reduce;
pub mod threshold;

use crate::lattice::Lattice;
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/src/operators/negate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ pub trait Negate<G, C> {
fn negate(&self) -> Self;
}

impl<G, D, R, C> Negate<G, C> for Collection<G, D, R, C>
impl<G, D, R, C> Negate<C, G> for Collection<G, D, R, C>
where
G: Scope,
C: Clone,
StreamCore<G, C>: Negate<G, C>,
StreamCore<G, C>: Negate<C, G>,
{
fn negate(&self) -> Self {
self.inner.negate().as_collection()
}
}

impl<G: Scope, D: Data, T: Data, R: Data + Abelian> Negate<G, Vec<(D, T, R)>> for Stream<G, (D, T, R)> {
impl<G: Scope, D: Data, T: Data, R: Data + Abelian> Negate<Vec<(D, T, R)>, G> for Stream<G, (D, T, R)> {
fn negate(&self) -> Self {
self.map_in_place(|x| x.2.negate())
}
Expand Down