Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Seedless versions of .fold (aka .reduce) and .scan #743

Open
wants to merge 2 commits into
base: seedless-flatscan
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/flatscan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export function flatScanSeedless<V>(src: Observable<V>, f: Function2<V, V, Obser
isSeeded = true;
current = updated;
});
}).toProperty();
}).toProperty().withDesc(new Desc(src, "flatScan", [f]));
}

/** @hidden */
Expand Down
9 changes: 8 additions & 1 deletion src/fold.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ import "./scan";
import Observable from "./observable";
import { Desc } from "./describe";
import { Accumulator } from "./scan";
import { Property } from "./observable";;
import { Property } from "./observable";

/** @hidden */
export default function fold<In, Out>(src: Observable<In>, seed: Out, f: Accumulator<In, Out>): Property<Out> {
return <any>src.scan(seed, f)
.last()
.withDesc(new Desc(src, "fold", [seed, f]));
}

/** @hidden */
export function foldSeedless<InOut>(src: Observable<InOut>, f: Accumulator<InOut, InOut>): Property<InOut> {
return <any>src.scan(f)
.last()
.withDesc(new Desc(src, "fold", [f]));
}
39 changes: 31 additions & 8 deletions src/observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import doLogT from "./dolog";
import doErrorT from "./doerror";
import doActionT from "./doaction";
import doEndT from "./doend";
import { Accumulator, default as scan } from "./scan";
import { Accumulator, scanSeedless, default as scan } from "./scan";
import mapEndT from "./mapend";
import mapErrorT from "./maperror";
import { SpawnerOrObservable, EventSpawner, EventOrValue } from "./flatmap_";
Expand All @@ -40,7 +40,7 @@ import { filter } from "./filter";
import { and, not, or } from "./boolean";
import flatMapFirst from "./flatmapfirst";
import addPropertyInitValueToStream from "./internal/addpropertyinitialvaluetostream";
import fold from "./fold";
import { default as fold, foldSeedless } from "./fold";
import { startWithE, startWithP } from "./startwith";
import takeUntil from "./takeuntil";
import flatMap from "./flatmap";
Expand Down Expand Up @@ -407,8 +407,16 @@ Works like [`scan`](#scan) but only emits the final
value, i.e. the value just before the observable ends. Returns a
[`Property`](property.html).
*/
fold<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return fold(this, seed, f)

fold<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

fold(f: Accumulator<V, V>): Property<V>

fold<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>foldSeedless(this, seed as any as Accumulator<V, V>);
}
return fold(this, seed as any as V2, f as any as Accumulator<V, V2>)
}

/**
Expand Down Expand Up @@ -596,8 +604,15 @@ Only applicable for observables with arrays as values.
}
/** A synonym for [scan](#scan).
*/
reduce<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return fold(this, seed, f)
reduce<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

reduce(f: Accumulator<V, V>): Property<V>

reduce<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>foldSeedless(this, seed as any as Accumulator<V, V>);
}
return fold(this, seed as any as V2, f as any as Accumulator<V, V2>)
}

/**
Expand Down Expand Up @@ -639,8 +654,16 @@ identically to EventStream.scan: the `seed` will be the initial value of
seed won't be output as is. Instead, the initial value of `r` will be `f(seed, x)`. This makes sense,
because there can only be 1 initial value for a Property at a time.
*/
scan<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return scan(this, seed, f)

scan<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

scan(f: Accumulator<V, V>): Property<V>

scan<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>scanSeedless(this, seed as any as Accumulator<V, V>);
}
return <any>scan(this, seed as any as V2, f as any as Accumulator<V, V2>)
}
/**
Skips the first n elements from the stream
Expand Down
28 changes: 27 additions & 1 deletion src/scan.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Observable from "./observable";
import { Property } from "./observable";;
import { Property } from "./observable";
import { Event, hasValue, Initial } from "./event";
import { more, noMore } from "./reply";
import { nop } from "./helpers";
Expand Down Expand Up @@ -58,3 +58,29 @@ export default function scan<In, Out>(src: Observable<In>, seed: Out, f: Accumul
}
return resultProperty = new Property(new Desc(src, "scan", [seed, f]), subscribe)
}

/** @hidden */
export function scanSeedless<V>(src: Observable<V>, f: Accumulator<V, V>): Property<V> {
let acc: V;
let hasAccumulatedFirstValue: Boolean = false;
const subscribe: Subscribe<V> = (sink: EventSink<V>) => {
let unsub = src.subscribeInternal(function(event: Event<V>) {
if (hasValue(event)) {
//console.log("has value: ", hasValue(event), "isInitial:", event.isInitial);
if (!hasAccumulatedFirstValue) {
acc = event.value;
hasAccumulatedFirstValue = true;
return sink(<any>event); // let the initial event pass through
}

acc = f(acc, event.value);
return sink(event.apply(acc));

} else {
return sink(<any>event);
}
});
return unsub;
}
return new Property(new Desc(src, "scan", [f]), subscribe)
}
20 changes: 14 additions & 6 deletions test/flatscan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ describe("EventStream.flatScan", function() {
[0, 1, 3, error(), 6])
);

describe("Without a seed value", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).flatScan(addAsync(1)),
[1, 3, error(), 6])
);

describe("Serializes updates even when they occur while performing previous update", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).flatScan(0, addAsync(5)),
Expand All @@ -34,6 +28,20 @@ describe("EventStream.flatScan", function() {
[0, 1, 3, error(), 6], semiunstable)
);

describe("Without a seed value", () => {
it ("accumulates values with given seed and accumulator function which returns a stream of updated values", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).flatScan(addAsync(1)),
[1, 3, error(), 6]
)
);
it("Serializes updates even when they occur while performing previous update", () =>
expectPropertyEvents(
() => series(1, [0, 1, 2, error(), 3]).flatScan(addAsync(5)),
[0, error(), 1, 3, 6], semiunstable)
);
});

return it("yields the seed value immediately", function() {
const outputs: number[] = [];
new Bacon.Bus().flatScan(0, (a, b) => <any>1).onValue(value => { outputs.push(value) });
Expand Down
38 changes: 34 additions & 4 deletions test/fold.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,39 @@ describe("EventStream.fold", function() {
);
describe("has reduce as synonym", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).fold(0, add),
() => series(1, [1, 2, error(), 3]).reduce(0, add),
[error(), 6])
);
describe("works with synchronous source", () =>
expectPropertyEvents(
() => fromArray([1, 2, error(), 3]).fold(0, add),
[error(), 6], unstable)
);

describe("Without seed value", function(){
it("folds stream into a single-valued Property, passes through errors", () =>
expectPropertyEvents(
() => series(1, [0, 1, 2, error(), 3]).fold(add),
[error(), 6])
);
it("has reduce as synonym", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).reduce(add),
[error(), 6])
);
it("works with synchronous source", () =>
expectPropertyEvents(
() => fromArray([0, 1, 2, error(), 3]).fold(add),
[error(), 6], unstable)
);
it("works with really large chunks too, with { eager: true }", function() {
const count = 50000;
return expectPropertyEvents(
() => series(1, range(1, count, true)).fold((x: number,y: number) => x+1),
[count]);
});
});

return describe("works with really large chunks too, with { eager: true }", function() {
const count = 50000;
return expectPropertyEvents(
Expand All @@ -24,10 +49,15 @@ describe("EventStream.fold", function() {
});
});

describe("Property.fold", () =>
describe("Property.fold", () => {
describe("Folds Property into a single-valued one", () =>
expectPropertyEvents(
() => series(1, [2,3]).toProperty(1).fold(0, add),
() => series(1, [2, 3]).toProperty(1).fold(0, add),
[6])
);
describe("Without seed value folds Property into a single-valued one", () =>
expectPropertyEvents(
() => series(1, [2, 3]).toProperty(1).fold(add),
[6])
)
);
});
110 changes: 110 additions & 0 deletions test/scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,69 @@ describe("EventStream.scan", function() {
expect(count).to.equal(1);
});
});

describe("Without a seed value", () => {
it("accumulates values and lets errors pass", () =>
expectPropertyEvents(
() => series(1, [1, 2, 3, error(), 4]).scan(add),
[1, 3, 6, error(), 10],
unstable
)
);
it("yields null seed value", () =>
expectPropertyEvents(
() => series(1, [null, 1]).scan(() => 1),
[null, 1], unstable)
);
it("works with synchronous streams", () =>
expectPropertyEvents(
() => fromArray([0, 1, 2, 3]).scan((x, y) => x + y),
[0, 1, 3, 6], unstable)
);
it("works with merged synchronous streams", () =>
expectPropertyEvents(
() => Bacon.mergeAll(once(0), once(1), once(2)).scan((a, b) => a + b),
[0, 1, 3], unstable)
);
it("works with functions as values", () =>
expectPropertyEvents(
() => series(1, [(() => 0), (() => 1), (() => 2)]).scan((a, b) => b).map(f => f()),
[0, 1, 2], unstable)
);
describe("calls accumulator function once per value", function () {
describe("(simple case)", function () {
let count = 0;
expectPropertyEvents(
() => series(2, [0, 1, 2, 3]).scan(function (x, y) {
count++;
return x + y;
}),
[0, 1, 3, 6],
{
extraCheck() {
return it("calls accumulator once per value", () => expect(count).to.equal(3));
},
unstable
}
);
});
it("(when pushing to Bus in accumulator)", function () {
let count = 0;
const someBus = new Bacon.Bus<null>();
someBus.onValue(function () {});
const src = new Bacon.Bus<number>();
const result = src.scan(function (_, __) {
someBus.push(null);
return count++;
});
result.onValue();
result.onValue();
src.push(0);
src.push(1);
expect(count).to.equal(1);
});
});
});
});

describe("Property.scan", function() {
Expand Down Expand Up @@ -97,4 +160,51 @@ describe("Property.scan", function() {
[1])
);
});
describe("without Seed value", function() {
it("with Init value, starts with init, f(init, xs[0])", () =>
expectPropertyEvents(
() => series(1, [1,2,3]).toProperty(0).scan(add),
[0, 1, 3, 6],
unstable
)
);
it("without Init value, starts with seed", () =>
expectPropertyEvents(
() => series(1, [0, 2,3]).toProperty().scan(add),
[0, 2, 5],
unstable
)
);
it("treats null init value like any other value", function() {
expectPropertyEvents(
() => series(1, [null as any, 1]).toProperty().scan(add),
[null, 1],
unstable
);
expectPropertyEvents(
() => series(1, [null as any, 2]).toProperty(1).scan(add),
[1, 1, 3],
unstable
);
});
describe("for synchronous source", function() {
it("with Init value, starts with f(seed, init)", () =>
expectPropertyEvents(
() => fromArray([0,2,3]).toProperty(1).scan(add),
[1, 1, 3, 6], unstable)
);
it("without Init value, starts with seed", () =>
expectPropertyEvents(
() => fromArray([0,2,3]).toProperty().scan(add),
[0, 2, 5], unstable)
);
it("works with synchronously responding empty source", () =>
expectPropertyEvents(
() => Bacon.never<number>().toProperty(1).scan(add),
[1],
unstable
)
);
});
});
});
2 changes: 2 additions & 0 deletions types/fold.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ import { Accumulator } from "./scan";
import { Property } from "./observable";
/** @hidden */
export default function fold<In, Out>(src: Observable<In>, seed: Out, f: Accumulator<In, Out>): Property<Out>;
/** @hidden */
export declare function foldSeedless<InOut>(src: Observable<InOut>, f: Accumulator<InOut, InOut>): Property<InOut>;
3 changes: 3 additions & 0 deletions types/observable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ export declare abstract class Observable<V> {
[`Property`](property.html).
*/
fold<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>;
fold(f: Accumulator<V, V>): Property<V>;
/**
An alias for [onValue](#onvalue).

Expand Down Expand Up @@ -439,6 +440,7 @@ export declare abstract class Observable<V> {
/** A synonym for [scan](#scan).
*/
reduce<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>;
reduce(f: Accumulator<V, V>): Property<V>;
/**
Creates an EventStream by sampling this
stream/property value at each event from the `sampler` stream. The result
Expand Down Expand Up @@ -479,6 +481,7 @@ export declare abstract class Observable<V> {
because there can only be 1 initial value for a Property at a time.
*/
scan<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>;
scan(f: Accumulator<V, V>): Property<V>;
/**
Skips the first n elements from the stream
*/
Expand Down
2 changes: 2 additions & 0 deletions types/scan.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ import { Property } from "./observable";
export declare type Accumulator<In, Out> = (acc: Out, value: In) => Out;
/** @hidden */
export default function scan<In, Out>(src: Observable<In>, seed: Out, f: Accumulator<In, Out>): Property<Out>;
/** @hidden */
export declare function scanSeedless<V>(src: Observable<V>, f: Accumulator<V, V>): Property<V>;