diff --git a/docs/operators.rst b/docs/operators.rst index 63001ebf3..d15eb3c0a 100644 --- a/docs/operators.rst +++ b/docs/operators.rst @@ -26,16 +26,18 @@ Operator Description Transforming Observables ------------------------ -================================================ ================================================ -Operator Description -================================================ ================================================ -:func:`buffer ` Periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time. -:func:`flat_map ` Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable. -:func:`group_by ` Divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key. -:func:`map ` Transform the items emitted by an Observable by applying a function to each item. -:func:`scan ` Apply a function to each item emitted by an Observable, sequentially, and emit each successive value. -:func:`window ` Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time. -================================================ ================================================ +================================================ ================================================ +Operator Description +================================================ ================================================ +:func:`buffer ` Periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time. +:func:`flat_map ` Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable. +:func:`concat_map ` Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next. +:func:`switch_map ` Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable. +:func:`group_by ` Divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key. +:func:`map ` Transform the items emitted by an Observable by applying a function to each item. +:func:`scan ` Apply a function to each item emitted by an Observable, sequentially, and emit each successive value. +:func:`window ` Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time. +================================================ ================================================ Filtering Observables ---------------------- diff --git a/docs/testing.rst b/docs/testing.rst index a4c27e890..f51bb72a3 100644 --- a/docs/testing.rst +++ b/docs/testing.rst @@ -16,18 +16,13 @@ Basic example from reactivex.testing import ReactiveTest, TestScheduler from reactivex import operators - # setting up aliases for more concise code - on_next = ReactiveTest.on_next - on_error = ReactiveTest.on_error - on_completed = ReactiveTest.on_completed - def test_double(): # Create a scheduler scheduler = TestScheduler() # Define one or more source source = scheduler.create_hot_observable( - on_next(250, 3), - on_next(350, 5), + ReactiveTest.on_next(250, 3), + ReactiveTest.on_next(350, 5), ) # Define how the observable/operator is used on the source @@ -39,8 +34,8 @@ Basic example # check the messages and potentially subscriptions assert results.messages == [ - on_next(250, 6), - on_next(350, 10), + ReactiveTest.on_next(250, 6), + ReactiveTest.on_next(350, 10), ] @@ -53,6 +48,10 @@ or with full control, you can easily test various situations and combinations .. _in_sequence_or_throw: .. code:: python + # setting up aliases for more concise code + on_next = ReactiveTest.on_next + on_error = ReactiveTest.on_error + on_completed = ReactiveTest.on_completed def test_operator(): # Code to test; takes a sequence of integers and passes through, @@ -72,9 +71,8 @@ or with full control, you can easily test various situations and combinations source = scheduler.create_cold_observable( on_next(300, 1), on_next(400, 2), on_next(500, 3), on_completed(600) ) - # Here is another way to create the same observable, - # as long as we set the correct scheduler - source = reactivex.from_marbles('------1-2-3-|', timespan=50, scheduler=scheduler) + # Here is another way to create the same observable + source = reactivex.from_marbles('------1-2-3-|', timespan=50) # You can shorten the "create" function from the basic example to a lambda with no arguments result = scheduler.start(lambda: source.pipe( in_sequence_or_throw(), @@ -91,20 +89,20 @@ Timeline When ``scheduler.start`` is called, the test scheduler starts moving its virtual clock forward. Some important timestamps are however hidden as defaults, as listed below. -These values can be modified using kwargs in the ``scheduler.start(...)`` call: +These values can be modified using `kwargs` in the ``scheduler.start(...)`` call: 1. ``created`` [100]: When is the observable created. - That is when the ``create`` function seen in the basic example. + That is when the ``create`` function seen in the basic example is called. 2. ``subscribed`` [200]: When does the subscription occur. This explains the above emission timestamps: consider the first emission @500; given that we are using a cold observable, - and subscribe to it at 200, the "source"'s timeline starts at 200 and only 300 ticks later, it emits. + and subscribe to it at 200, the `source`'s timeline starts at 200 and only 300 ticks later, it emits. 3. ``disposed`` [1000]: When the subscription is disposed -Keep the following in mind when modifying these values: +Gotchas when modifying these values: -1. Do not use `0` as values since the code ignores that -2. If you change ``subscribed`` to be lower than 100, you need to change ``created`` as well +1. Do not use `0` as values for created/subscribed since the code would ignore it. +2. If you change ``subscribed`` to be lower than 100, you need to change ``created`` as well, otherwise nothing will happen. An alternative using marbles @@ -134,13 +132,17 @@ There is a simplified flow available in `reactivex.testing.marbles` and here's a assert results == outcome This method makes for very quick to write, and easy to read, tests. +At this moment however, it does not allow for testing subscriptions. Testing an observable factory ............................. -An observable created from `Observable(subscribe)` can be just as easily tested. -Let's use this example to additionally test a Disposable case. +An observable created directly from :class:`Observable ` +can be just as easily tested. + +In this example, we will additionally test a case where a +:class:`Disposable ` is used. .. code:: python @@ -163,7 +165,7 @@ Let's use this example to additionally test a Disposable case. on_next(220, 0), on_completed(220) ] - assert a == 43 + assert a == 43 # shows that our Disposable's action was as expected Testing errors @@ -188,20 +190,20 @@ Let's remedy that below. # At times it's better not to test the exact exception, # maybe its message changes with time or other reasons # We can test a specific notification's details as follows: - message, err = result.messages - assert message.time == 130 - assert err.time == 230 - assert message.value.kind == 'N' # Notification - assert err.value.kind == 'E' # E for errors - assert message.value.value == 1 - assert type(err.value.exception) == ValueError # look at .exception for errors + first_notification, error_notification = result.messages + assert first_notification.time == 130 + assert error_notification.time == 230 + assert first_notification.value.kind == 'N' # Notification + assert error_notification.value.kind == 'E' # E for errors + assert first_notification.value.value == 1 + assert type(error_notification.value.exception) == ValueError # look at .exception for errors Testing subscriptions, multiple observables, hot observables ............................................................ ``scheduler.start`` only allows for a single subscription. -Some cases like e.g. `operators.partition` require more. +Some cases like e.g. ``operators.partition`` require more. The examples below showcase some less commonly needed testing tools. .. code:: python @@ -218,7 +220,9 @@ The examples below showcase some less commonly needed testing tools. even.subscribe(steven) odd.subscribe(todd) - # Note! Since it's not "start" which creates the subscription, they actually occur at t=0 + # Note! Since the subscription is not created within + # `scheduler.start` below, the usual `subscribed` delay of t=200 + # is not in effect. The subscriptions therefore occur at t=0 scheduler.start() assert steven.messages == [ @@ -242,20 +246,23 @@ The examples below showcase some less commonly needed testing tools. shared = source.pipe( operators.share() ) - """first sub""" + # Creating our story: + # first sub is set to occur at t=200; this creates a sub on source scheduler.schedule_relative(200, lambda *_: subs.append(shared.subscribe(scheduler=scheduler))) - # second sub, should not sub to source itself + # second sub does not create a new sub on source, due to the `share` operator scheduler.schedule_relative(300, lambda *_: subs.append(shared.subscribe(scheduler=scheduler))) + # second sub ends scheduler.schedule_relative(500, lambda *_: subs[1].dispose()) + # first sub ends… and since there is no sub remaining, the only sub on source should be disposed too scheduler.schedule_relative(600, lambda *_: subs[0].dispose()) - """end first sub""" - # no existing sub should sub again onto source - we never dispose of it + # no existing sub on source, therefore this will create a new one + # we never dispose of it; we will test that infinite sub in the assertions scheduler.schedule_relative(900, lambda *_: subs.append(shared.subscribe(scheduler=scheduler))) scheduler.start() # Check that the submissions on the source are as expected assert source.subscriptions == [ - Subscription(200, 600), + Subscription(200, 600), # only one sub from 200 to 600 Subscription(900), # represents an infinite subscription ] @@ -279,9 +286,9 @@ The examples below showcase some less commonly needed testing tools. # the subscription starts at 200; # since `source` is a hot observable, the notification @190 will not be caught # the next notification is at 300 ticks, - # which, on our subscription, will show at 100 ticks (300-200 from subscribed) - # or 5 "-" each representing 20 ticks (timespan=20 in to_marbles) - # then the 42 is received - # and then nothing for another 200 ticks, so 10 "-" before complete + # which, on our subscription, will show at 100 ticks (300-200 from subscription delay) + # or 5 "-" each representing 20 ticks (timespan=20 in `to_marbles`). + # Then the "42" notification is received + # and then nothing for another 200 ticks, which is equal to 10 "-", before complete assert message.value.value == '-----(42)----------|' diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index cbf9b551f..76bd13c0c 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -3362,6 +3362,85 @@ def switch_latest() -> Callable[ return switch_latest_() +def switch_map( + project: Optional[Mapper[_T1, Observable[_T2]]] = None +) -> Callable[[Observable[_T1]], Observable[_T2]]: + """Projects each source value to an Observable which is merged in + the output Observable, emitting values only from the most recently + projected Observable. + + + .. marble:: + :alt: switch_map + + ---a----------b-------c---------| + [ switch_map(x: x---x---x|) ] + ---a---a---a--b---b---c---c---c-| + + Examples: + >>> op = switch_map(lambda x: reactivex.timer(1.0).pipe(map(lambda x: x))) + >>> op = switch_map() + + Args: + project: Projecting function which takes the outer observable value + and the emission index and emits the inner observable; defaults to `identity` + + Returns: + An operator function that maps each value to the inner observable + and emits its values in order, emitting values only from the + most recently projected Observable. + + + If an inner observable complete, the resulting sequence does *not* + complete. + If an inner observable errors, the resulting sequence errors as well. + If the outer observable completes/errors, the resulting sequence + completes/errors. + + """ + + return compose(map(project), switch_latest()) + + +def switch_map_indexed( + project: Optional[MapperIndexed[_T1, Observable[_T2]]] = None +) -> Callable[[Observable[_T1]], Observable[_T2]]: + """Projects each source value to an Observable which is merged in + the output Observable, emitting values only from the most recently + projected Observable. + + + .. marble:: + :alt: switch_map + + ---a----------b-------c---------------------| + [ switch_map_indexed(x,i: x*i---x*i---x*i|) ] + ---a---a---a--bb---bb-ccc---ccc---ccc-------| + + Examples: + >>> op = switch_map_indexed(lambda x, i: reactivex.timer(1.0).pipe(map(x*i))) + + Args: + project: Projecting function which takes the outer observable value + and the emission index and emits the inner observable + + Returns: + An operator function that maps each value to the inner observable + and emits its values in order, emitting values only from the + most recently projected Observable. + + + If an inner observable complete, the resulting sequence does *not* + complete. + If an inner observable errors, the resulting sequence errors as well. + If the outer observable completes/errors, the resulting sequence + completes/errors. + + """ + + return compose(map_indexed(project), switch_latest()) + + def take(count: int) -> Callable[[Observable[_T]], Observable[_T]]: """Returns a specified number of contiguous elements from the start of an observable sequence. diff --git a/tests/test_observable/test_switchmap.py b/tests/test_observable/test_switchmap.py new file mode 100644 index 000000000..363b818a0 --- /dev/null +++ b/tests/test_observable/test_switchmap.py @@ -0,0 +1,185 @@ +import unittest + +from reactivex import interval +from reactivex import operators as ops +from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.marbles import marbles_testing +from reactivex.testing.subscription import Subscription + +on_next = ReactiveTest.on_next +on_completed = ReactiveTest.on_completed +on_error = ReactiveTest.on_error +subscribe = ReactiveTest.subscribe +subscribed = ReactiveTest.subscribed +disposed = ReactiveTest.disposed +created = ReactiveTest.created + + +class TestSwitchMap(unittest.TestCase): + def test_switch_map(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(300, "a"), + on_next(400, "b"), + on_next(500, "c"), + ) + + def create_inner(x: str): + def create_changing(j: int): + return (j, x) + + return interval(20).pipe(ops.map(create_changing)) + + def create(): + return xs.pipe(ops.switch_map(project=create_inner)) + + results = scheduler.start(create, disposed=580) + # (i, j, x): i is the index of the outer emit; + # j is the value of the inner interval; + # x is the value of the outer emission + assert results.messages == [ + on_next(320, (0, "a")), + on_next(340, (1, "a")), + on_next(360, (2, "a")), + on_next(380, (3, "a")), + on_next(420, (0, "b")), + on_next(440, (1, "b")), + on_next(460, (2, "b")), + on_next(480, (3, "b")), + on_next(520, (0, "c")), + on_next(540, (1, "c")), + on_next(560, (2, "c")), + ] + assert xs.subscriptions == [Subscription(200, 580)] + + def test_switch_map_inner_throws(self): + """Inner throwing causes outer to throw""" + ex = "ex" + scheduler = TestScheduler() + sources = [ + scheduler.create_cold_observable(on_next(100, "a"), on_next(300, "aa")), + scheduler.create_cold_observable(on_next(50, "b"), on_error(120, ex)), + scheduler.create_cold_observable( + on_next(50, "wont happen"), on_error(120, "no") + ), + ] + xs = scheduler.create_hot_observable( + on_next( + 250, + 0, + ), + on_next(400, 1), + on_next( + 550, + 2, + ), + ) + + def create_inner(x: int): + return sources[x] + + def create(): + return xs.pipe(ops.switch_map(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(350, "a"), + on_next(450, "b"), + on_error(520, ex), + ] + assert sources[0].subscriptions == [Subscription(250, 400)] + assert sources[1].subscriptions == [Subscription(400, 520)] + assert sources[2].subscriptions == [] + + def test_switch_map_outer_throws(self): + """Outer throwing unsubscribes from all""" + ex = "ABC" + scheduler = TestScheduler() + sources = [ + scheduler.create_cold_observable(on_next(100, "a"), on_next(300, "aa")), + scheduler.create_cold_observable(on_next(50, "b"), on_error(120, ex)), + scheduler.create_cold_observable( + on_next(50, "wont happen"), on_error(120, "no") + ), + ] + xs = scheduler.create_hot_observable( + on_next( + 250, + 0, + ), + on_next(400, 1), + on_error(430, ex), + ) + + def create_inner(x: int): + return sources[x] + + def create(): + return xs.pipe(ops.switch_map(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(350, "a"), + on_error(430, ex), + ] + assert sources[0].subscriptions == [Subscription(250, 400)] + assert sources[1].subscriptions == [Subscription(400, 430)] + assert sources[2].subscriptions == [] + + def test_switch_map_no_inner(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable(on_completed(500)) + # Fake inner which should never be subscribed to + sources = [scheduler.create_cold_observable(on_next(20, 2))] + + def create_inner(_x: int): + return sources.pop(0) + + def create(): + return xs.pipe(ops.switch_map(create_inner)) + + results = scheduler.start(create) + assert results.messages == [on_completed(500)] + assert xs.subscriptions == [Subscription(200, 500)] + assert sources[0].subscriptions == [] + + def test_switch_map_inner_completes(self): + """Inner completions do not affect outer""" + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(300, "d"), + on_next(330, "f"), + on_completed(540), + ) + + def create_inner(x: str): + """An observable which will complete after 40 ticks""" + return interval(20).pipe(ops.map(lambda j: (j, x)), ops.take(2)) + + def create(): + return xs.pipe(ops.switch_map(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(320, (0, "d")), + on_next(350, (0, "f")), + on_next( + 370, (1, "f") + ), # here the current inner is unsubscribed but not the outer + on_completed(540), # only outer completion affects + ] + + def test_switch_map_default_mapper(self): + with marbles_testing(timespan=10) as (start, cold, hot, exp): + xs = hot( + " ---a---b------c-----", + { + "a": cold(" --1--2", None, None), + "b": cold(" --1-2-3-4-5|", None, None), + "c": cold(" --1--2", None, None), + }, + None, + ) + expected = exp(" -----1---1-2-3--1--2", None, None) + result = start(xs.pipe(ops.switch_map())) + assert result == expected diff --git a/tests/test_observable/test_switchmapindexed.py b/tests/test_observable/test_switchmapindexed.py new file mode 100644 index 000000000..5d5b3d15e --- /dev/null +++ b/tests/test_observable/test_switchmapindexed.py @@ -0,0 +1,185 @@ +import unittest + +from reactivex import interval +from reactivex import operators as ops +from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.marbles import marbles_testing +from reactivex.testing.subscription import Subscription + +on_next = ReactiveTest.on_next +on_completed = ReactiveTest.on_completed +on_error = ReactiveTest.on_error +subscribe = ReactiveTest.subscribe +subscribed = ReactiveTest.subscribed +disposed = ReactiveTest.disposed +created = ReactiveTest.created + + +class TestSwitchMapIndex(unittest.TestCase): + def test_switch_map_indexed_uses_index(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(300, "a"), + on_next(400, "b"), + on_next(500, "c"), + ) + + def create_inner(x: str, i: int): + def create_changing(j: int): + return (i, j, x) + + return interval(20).pipe(ops.map(create_changing)) + + def create(): + return xs.pipe(ops.switch_map_indexed(project=create_inner)) + + results = scheduler.start(create, disposed=580) + # (i, j, x): i is the index of the outer emit; + # j is the value of the inner interval; + # x is the value of the outer emission + assert results.messages == [ + on_next(320, (0, 0, "a")), + on_next(340, (0, 1, "a")), + on_next(360, (0, 2, "a")), + on_next(380, (0, 3, "a")), + on_next(420, (1, 0, "b")), + on_next(440, (1, 1, "b")), + on_next(460, (1, 2, "b")), + on_next(480, (1, 3, "b")), + on_next(520, (2, 0, "c")), + on_next(540, (2, 1, "c")), + on_next(560, (2, 2, "c")), + ] + assert xs.subscriptions == [Subscription(200, 580)] + + def test_switch_map_indexed_inner_throws(self): + """Inner throwing causes outer to throw""" + ex = "ex" + scheduler = TestScheduler() + sources = [ + scheduler.create_cold_observable(on_next(100, "a"), on_next(300, "aa")), + scheduler.create_cold_observable(on_next(50, "b"), on_error(120, ex)), + scheduler.create_cold_observable( + on_next(50, "wont happen"), on_error(120, "no") + ), + ] + xs = scheduler.create_hot_observable( + on_next( + 250, + 0, + ), + on_next(400, 1), + on_next( + 550, + 2, + ), + ) + + def create_inner(x: int, _i: int): + return sources[x] + + def create(): + return xs.pipe(ops.switch_map_indexed(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(350, "a"), + on_next(450, "b"), + on_error(520, ex), + ] + assert sources[0].subscriptions == [Subscription(250, 400)] + assert sources[1].subscriptions == [Subscription(400, 520)] + assert sources[2].subscriptions == [] + + def test_switch_map_indexed_outer_throws(self): + """Outer throwing unsubscribes from all""" + ex = "ABC" + scheduler = TestScheduler() + sources = [ + scheduler.create_cold_observable(on_next(100, "a"), on_next(300, "aa")), + scheduler.create_cold_observable(on_next(50, "b"), on_error(120, ex)), + scheduler.create_cold_observable( + on_next(50, "wont happen"), on_error(120, "no") + ), + ] + xs = scheduler.create_hot_observable( + on_next( + 250, + 0, + ), + on_next(400, 1), + on_error(430, ex), + ) + + def create_inner(x: int, _i: int): + return sources[x] + + def create(): + return xs.pipe(ops.switch_map_indexed(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(350, "a"), + on_error(430, ex), + ] + assert sources[0].subscriptions == [Subscription(250, 400)] + assert sources[1].subscriptions == [Subscription(400, 430)] + assert sources[2].subscriptions == [] + + def test_switch_map_indexed_no_inner(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable(on_completed(500)) + # Fake inner which should never be subscribed to + sources = [scheduler.create_cold_observable(on_next(20, 2))] + + def create_inner(_x: int, i: int): + return sources[i] + + def create(): + return xs.pipe(ops.switch_map_indexed(create_inner)) + + results = scheduler.start(create) + assert results.messages == [on_completed(500)] + assert xs.subscriptions == [Subscription(200, 500)] + assert sources[0].subscriptions == [] + + def test_switch_map_indexed_inner_completes(self): + """Inner completions do not affect outer""" + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(300, "d"), + on_next(330, "f"), + on_completed(540), + ) + + def create_inner(x: str, i: int): + """An observable which will complete after 40 ticks""" + return interval(20).pipe(ops.map(lambda j: (i, j, x)), ops.take(2)) + + def create(): + return xs.pipe(ops.switch_map_indexed(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(320, (0, 0, "d")), + on_next(350, (1, 0, "f")), + on_next( + 370, (1, 1, "f") + ), # here the current inner is unsubscribed but not the outer + on_completed(540), # only outer completion affects + ] + + def test_switch_map_default_mapper(self): + with marbles_testing(timespan=10) as (start, cold, hot, exp): + xs = hot( + " ---a---b------c-----", + { + "a": cold(" --1--2", None, None), + "b": cold(" --1-2-3-4-5|", None, None), + "c": cold(" --1--2", None, None), + }, + None, + ) + expected = exp(" -----1---1-2-3--1--2", None, None) + result = start(xs.pipe(ops.switch_map_indexed())) + assert result == expected