Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make .flatScan work seedless #737

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Prev Previous commit
made .scan and .fold seedless, incl. tests
  • Loading branch information
semmel committed Jul 15, 2019
commit 2a45c58f176a731a3adbaba658d642ee457c390f
2 changes: 1 addition & 1 deletion src/flatscan.ts
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ export function flatScanSeedless<V>(src: Observable<V>, f: Function2<V, V, Obser
isSeeded = true;
current = updated;
});
}).toProperty();
}).toProperty().withDesc(new Desc(src, "flatScan", [f]));
}

/** @hidden */
9 changes: 8 additions & 1 deletion src/fold.ts
Original file line number Diff line number Diff line change
@@ -6,11 +6,18 @@ import "./scan";
import Observable from "./observable";
import { Desc } from "./describe";
import { Accumulator } from "./scan";
import { Property } from "./observable";;
import { Property } from "./observable";

/** @hidden */
export default function fold<In, Out>(src: Observable<In>, seed: Out, f: Accumulator<In, Out>): Property<Out> {
return <any>src.scan(seed, f)
.last()
.withDesc(new Desc(src, "fold", [seed, f]));
}

/** @hidden */
export function foldSeedless<InOut>(src: Observable<InOut>, f: Accumulator<InOut, InOut>): Property<InOut> {
return <any>src.scan(f)
.last()
.withDesc(new Desc(src, "fold", [f]));
}
39 changes: 31 additions & 8 deletions src/observable.ts
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ import doLogT from "./dolog";
import doErrorT from "./doerror";
import doActionT from "./doaction";
import doEndT from "./doend";
import { Accumulator, default as scan } from "./scan";
import { Accumulator, scanSeedless, default as scan } from "./scan";
import mapEndT from "./mapend";
import mapErrorT from "./maperror";
import { SpawnerOrObservable, EventSpawner, EventOrValue } from "./flatmap_";
@@ -40,7 +40,7 @@ import { filter } from "./filter";
import { and, not, or } from "./boolean";
import flatMapFirst from "./flatmapfirst";
import addPropertyInitValueToStream from "./internal/addpropertyinitialvaluetostream";
import fold from "./fold";
import { default as fold, foldSeedless } from "./fold";
import { startWithE, startWithP } from "./startwith";
import takeUntil from "./takeuntil";
import flatMap from "./flatmap";
@@ -407,8 +407,16 @@ Works like [`scan`](#scan) but only emits the final
value, i.e. the value just before the observable ends. Returns a
[`Property`](property.html).
*/
fold<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return fold(this, seed, f)

fold<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

fold(f: Accumulator<V, V>): Property<V>

fold<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>foldSeedless(this, seed as any as Accumulator<V, V>);
}
return fold(this, seed as any as V2, f as any as Accumulator<V, V2>)
}

/**
@@ -596,8 +604,15 @@ Only applicable for observables with arrays as values.
}
/** A synonym for [scan](#scan).
*/
reduce<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return fold(this, seed, f)
reduce<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

reduce(f: Accumulator<V, V>): Property<V>

reduce<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>foldSeedless(this, seed as any as Accumulator<V, V>);
}
return fold(this, seed as any as V2, f as any as Accumulator<V, V2>)
}

/**
@@ -639,8 +654,16 @@ identically to EventStream.scan: the `seed` will be the initial value of
seed won't be output as is. Instead, the initial value of `r` will be `f(seed, x)`. This makes sense,
because there can only be 1 initial value for a Property at a time.
*/
scan<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return scan(this, seed, f)

scan<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

scan(f: Accumulator<V, V>): Property<V>

scan<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>scanSeedless(this, seed as any as Accumulator<V, V>);
}
return <any>scan(this, seed as any as V2, f as any as Accumulator<V, V2>)
}
/**
Skips the first n elements from the stream
28 changes: 27 additions & 1 deletion src/scan.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Observable from "./observable";
import { Property } from "./observable";;
import { Property } from "./observable";
import { Event, hasValue, Initial } from "./event";
import { more, noMore } from "./reply";
import { nop } from "./helpers";
@@ -58,3 +58,29 @@ export default function scan<In, Out>(src: Observable<In>, seed: Out, f: Accumul
}
return resultProperty = new Property(new Desc(src, "scan", [seed, f]), subscribe)
}

/** @hidden */
export function scanSeedless<V>(src: Observable<V>, f: Accumulator<V, V>): Property<V> {
let acc: V;
let hasAccumulatedFirstValue: Boolean = false;
const subscribe: Subscribe<V> = (sink: EventSink<V>) => {
let unsub = src.subscribeInternal(function(event: Event<V>) {
if (hasValue(event)) {
//console.log("has value: ", hasValue(event), "isInitial:", event.isInitial);
if (!hasAccumulatedFirstValue) {
acc = event.value;
hasAccumulatedFirstValue = true;
return sink(<any>event); // let the initial event pass through
}

acc = f(acc, event.value);
return sink(event.apply(acc));

} else {
return sink(<any>event);
}
});
return unsub;
}
return new Property(new Desc(src, "scan", [f]), subscribe)
}
20 changes: 14 additions & 6 deletions test/flatscan.ts
Original file line number Diff line number Diff line change
@@ -10,12 +10,6 @@ describe("EventStream.flatScan", function() {
[0, 1, 3, error(), 6])
);

describe("Without a seed value", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).flatScan(addAsync(1)),
[1, 3, error(), 6])
);

describe("Serializes updates even when they occur while performing previous update", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).flatScan(0, addAsync(5)),
@@ -34,6 +28,20 @@ describe("EventStream.flatScan", function() {
[0, 1, 3, error(), 6], semiunstable)
);

describe("Without a seed value", () => {
it ("accumulates values with given seed and accumulator function which returns a stream of updated values", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).flatScan(addAsync(1)),
[1, 3, error(), 6]
)
);
it("Serializes updates even when they occur while performing previous update", () =>
expectPropertyEvents(
() => series(1, [0, 1, 2, error(), 3]).flatScan(addAsync(5)),
[0, error(), 1, 3, 6], semiunstable)
);
});

return it("yields the seed value immediately", function() {
const outputs: number[] = [];
new Bacon.Bus().flatScan(0, (a, b) => <any>1).onValue(value => { outputs.push(value) });
38 changes: 34 additions & 4 deletions test/fold.ts
Original file line number Diff line number Diff line change
@@ -8,14 +8,39 @@ describe("EventStream.fold", function() {
);
describe("has reduce as synonym", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).fold(0, add),
() => series(1, [1, 2, error(), 3]).reduce(0, add),
[error(), 6])
);
describe("works with synchronous source", () =>
expectPropertyEvents(
() => fromArray([1, 2, error(), 3]).fold(0, add),
[error(), 6], unstable)
);

describe("Without seed value", function(){
it("folds stream into a single-valued Property, passes through errors", () =>
expectPropertyEvents(
() => series(1, [0, 1, 2, error(), 3]).fold(add),
[error(), 6])
);
it("has reduce as synonym", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).reduce(add),
[error(), 6])
);
it("works with synchronous source", () =>
expectPropertyEvents(
() => fromArray([0, 1, 2, error(), 3]).fold(add),
[error(), 6], unstable)
);
it("works with really large chunks too, with { eager: true }", function() {
const count = 50000;
return expectPropertyEvents(
() => series(1, range(1, count, true)).fold((x: number,y: number) => x+1),
[count]);
});
});

return describe("works with really large chunks too, with { eager: true }", function() {
const count = 50000;
return expectPropertyEvents(
@@ -24,10 +49,15 @@ describe("EventStream.fold", function() {
});
});

describe("Property.fold", () =>
describe("Property.fold", () => {
describe("Folds Property into a single-valued one", () =>
expectPropertyEvents(
() => series(1, [2,3]).toProperty(1).fold(0, add),
() => series(1, [2, 3]).toProperty(1).fold(0, add),
[6])
);
describe("Without seed value folds Property into a single-valued one", () =>
expectPropertyEvents(
() => series(1, [2, 3]).toProperty(1).fold(add),
[6])
)
);
});
110 changes: 110 additions & 0 deletions test/scan.ts
Original file line number Diff line number Diff line change
@@ -59,6 +59,69 @@ describe("EventStream.scan", function() {
expect(count).to.equal(1);
});
});

describe("Without a seed value", () => {
it("accumulates values and lets errors pass", () =>
expectPropertyEvents(
() => series(1, [1, 2, 3, error(), 4]).scan(add),
[1, 3, 6, error(), 10],
unstable
)
);
it("yields null seed value", () =>
expectPropertyEvents(
() => series(1, [null, 1]).scan(() => 1),
[null, 1], unstable)
);
it("works with synchronous streams", () =>
expectPropertyEvents(
() => fromArray([0, 1, 2, 3]).scan((x, y) => x + y),
[0, 1, 3, 6], unstable)
);
it("works with merged synchronous streams", () =>
expectPropertyEvents(
() => Bacon.mergeAll(once(0), once(1), once(2)).scan((a, b) => a + b),
[0, 1, 3], unstable)
);
it("works with functions as values", () =>
expectPropertyEvents(
() => series(1, [(() => 0), (() => 1), (() => 2)]).scan((a, b) => b).map(f => f()),
[0, 1, 2], unstable)
);
describe("calls accumulator function once per value", function () {
describe("(simple case)", function () {
let count = 0;
expectPropertyEvents(
() => series(2, [0, 1, 2, 3]).scan(function (x, y) {
count++;
return x + y;
}),
[0, 1, 3, 6],
{
extraCheck() {
return it("calls accumulator once per value", () => expect(count).to.equal(3));
},
unstable
}
);
});
it("(when pushing to Bus in accumulator)", function () {
let count = 0;
const someBus = new Bacon.Bus<null>();
someBus.onValue(function () {});
const src = new Bacon.Bus<number>();
const result = src.scan(function (_, __) {
someBus.push(null);
return count++;
});
result.onValue();
result.onValue();
src.push(0);
src.push(1);
expect(count).to.equal(1);
});
});
});
});

describe("Property.scan", function() {
@@ -97,4 +160,51 @@ describe("Property.scan", function() {
[1])
);
});
describe("without Seed value", function() {
it("with Init value, starts with init, f(init, xs[0])", () =>
expectPropertyEvents(
() => series(1, [1,2,3]).toProperty(0).scan(add),
[0, 1, 3, 6],
unstable
)
);
it("without Init value, starts with seed", () =>
expectPropertyEvents(
() => series(1, [0, 2,3]).toProperty().scan(add),
[0, 2, 5],
unstable
)
);
it("treats null init value like any other value", function() {
expectPropertyEvents(
() => series(1, [null as any, 1]).toProperty().scan(add),
[null, 1],
unstable
);
expectPropertyEvents(
() => series(1, [null as any, 2]).toProperty(1).scan(add),
[1, 1, 3],
unstable
);
});
describe("for synchronous source", function() {
it("with Init value, starts with f(seed, init)", () =>
expectPropertyEvents(
() => fromArray([0,2,3]).toProperty(1).scan(add),
[1, 1, 3, 6], unstable)
);
it("without Init value, starts with seed", () =>
expectPropertyEvents(
() => fromArray([0,2,3]).toProperty().scan(add),
[0, 2, 5], unstable)
);
it("works with synchronously responding empty source", () =>
expectPropertyEvents(
() => Bacon.never<number>().toProperty(1).scan(add),
[1],
unstable
)
);
});
});
});
2 changes: 2 additions & 0 deletions types/fold.d.ts
Original file line number Diff line number Diff line change
@@ -7,3 +7,5 @@ import { Accumulator } from "./scan";
import { Property } from "./observable";
/** @hidden */
export default function fold<In, Out>(src: Observable<In>, seed: Out, f: Accumulator<In, Out>): Property<Out>;
/** @hidden */
export declare function foldSeedless<InOut>(src: Observable<InOut>, f: Accumulator<InOut, InOut>): Property<InOut>;
3 changes: 3 additions & 0 deletions types/observable.d.ts
Original file line number Diff line number Diff line change
@@ -293,6 +293,7 @@ export declare abstract class Observable<V> {
[`Property`](property.html).
*/
fold<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>;
fold(f: Accumulator<V, V>): Property<V>;
/**
An alias for [onValue](#onvalue).

@@ -439,6 +440,7 @@ export declare abstract class Observable<V> {
/** A synonym for [scan](#scan).
*/
reduce<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>;
reduce(f: Accumulator<V, V>): Property<V>;
/**
Creates an EventStream by sampling this
stream/property value at each event from the `sampler` stream. The result
@@ -479,6 +481,7 @@ export declare abstract class Observable<V> {
because there can only be 1 initial value for a Property at a time.
*/
scan<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>;
scan(f: Accumulator<V, V>): Property<V>;
/**
Skips the first n elements from the stream
*/
2 changes: 2 additions & 0 deletions types/scan.d.ts
Original file line number Diff line number Diff line change
@@ -3,3 +3,5 @@ import { Property } from "./observable";
export declare type Accumulator<In, Out> = (acc: Out, value: In) => Out;
/** @hidden */
export default function scan<In, Out>(src: Observable<In>, seed: Out, f: Accumulator<In, Out>): Property<Out>;
/** @hidden */
export declare function scanSeedless<V>(src: Observable<V>, f: Accumulator<V, V>): Property<V>;