Skip to content

Commit 67138e1

Browse files
Merge pull request #393 from benjchristensen/parallel-operator
Parallel Operator & ObserveOn/ScheduledObserver Fixes
2 parents 9c3d197 + e218050 commit 67138e1

File tree

13 files changed

+551
-93
lines changed

13 files changed

+551
-93
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package rx.lang.groovy
2+
3+
import org.junit.Test
4+
5+
import rx.Observable
6+
import rx.Scheduler
7+
import rx.concurrency.Schedulers
8+
import rx.util.functions.Func1
9+
10+
class TestParallel {
11+
12+
@Test
13+
public void testParallelOperator() {
14+
Observable.range(0, 100)
15+
.parallel({
16+
it.map({ return it; })
17+
})
18+
.toBlockingObservable()
19+
.forEach({ println("T: " + it + " Thread: " + Thread.currentThread()); });
20+
}
21+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
176176
* Observable, and that synchronously notifies its {@link Observer}s
177177
*/
178178
def synchronize: Observable[T] = {
179-
Observable[T](JObservable.synchronize(asJava))
179+
Observable[T](asJava.synchronize)
180180
}
181181

182182
/**

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@
3434
import rx.operators.OperationCache;
3535
import rx.operators.OperationCombineLatest;
3636
import rx.operators.OperationConcat;
37+
import rx.operators.OperationDebounce;
3738
import rx.operators.OperationDefer;
3839
import rx.operators.OperationDematerialize;
39-
import rx.operators.OperationDistinctUntilChanged;
4040
import rx.operators.OperationDistinct;
41+
import rx.operators.OperationDistinctUntilChanged;
4142
import rx.operators.OperationFilter;
4243
import rx.operators.OperationFinally;
4344
import rx.operators.OperationFirstOrDefault;
@@ -53,6 +54,7 @@
5354
import rx.operators.OperationOnErrorResumeNextViaObservable;
5455
import rx.operators.OperationOnErrorReturn;
5556
import rx.operators.OperationOnExceptionResumeNextViaObservable;
57+
import rx.operators.OperationParallel;
5658
import rx.operators.OperationRetry;
5759
import rx.operators.OperationSample;
5860
import rx.operators.OperationScan;
@@ -67,7 +69,6 @@
6769
import rx.operators.OperationTakeUntil;
6870
import rx.operators.OperationTakeWhile;
6971
import rx.operators.OperationThrottleFirst;
70-
import rx.operators.OperationDebounce;
7172
import rx.operators.OperationTimestamp;
7273
import rx.operators.OperationToObservableFuture;
7374
import rx.operators.OperationToObservableIterable;
@@ -1810,17 +1811,22 @@ public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? e
18101811
* its {@link Observer}s; it invokes {@code onCompleted} or {@code onError} only once; and it never invokes {@code onNext} after invoking either {@code onCompleted} or {@code onError}.
18111812
* {@code synchronize} enforces this, and the Observable it returns invokes {@code onNext} and {@code onCompleted} or {@code onError} synchronously.
18121813
*
1813-
* @param observable
1814-
* the source Observable
18151814
* @param <T>
18161815
* the type of item emitted by the source Observable
18171816
* @return an Observable that is a chronologically well-behaved version of the source
18181817
* Observable, and that synchronously notifies its {@link Observer}s
18191818
*/
1820-
public static <T> Observable<T> synchronize(Observable<? extends T> observable) {
1821-
return create(OperationSynchronize.synchronize(observable));
1819+
public Observable<T> synchronize() {
1820+
return create(OperationSynchronize.synchronize(this));
18221821
}
18231822

1823+
/**
1824+
* @deprecated Replaced with instance method.
1825+
*/
1826+
@Deprecated
1827+
public static <T> Observable<T> synchronize(Observable<T> source) {
1828+
return create(OperationSynchronize.synchronize(source));
1829+
}
18241830

18251831
/**
18261832
* Emits an item each time interval (containing a sequential number).
@@ -3600,6 +3606,31 @@ public Observable<T> cache() {
36003606
return create(OperationCache.cache(this));
36013607
}
36023608

3609+
/**
3610+
* Perform work in parallel by sharding an {@code Observable<T>} on a {@link Schedulers#threadPoolForComputation()} {@link Scheduler} and return an {@code Observable<R>} with the output.
3611+
*
3612+
* @param f
3613+
* a {@link Func1} that applies Observable operators to {@code Observable<T>} in parallel and returns an {@code Observable<R>}
3614+
* @return an Observable with the output of the {@link Func1} executed on a {@link Scheduler}
3615+
*/
3616+
public <R> Observable<R> parallel(Func1<Observable<T>, Observable<R>> f) {
3617+
return OperationParallel.parallel(this, f);
3618+
}
3619+
3620+
/**
3621+
* Perform work in parallel by sharding an {@code Observable<T>} on a {@link Scheduler} and return an {@code Observable<R>} with the output.
3622+
*
3623+
* @param f
3624+
* a {@link Func1} that applies Observable operators to {@code Observable<T>} in parallel and returns an {@code Observable<R>}
3625+
* @param s
3626+
* a {@link Scheduler} to perform the work on.
3627+
* @return an Observable with the output of the {@link Func1} executed on a {@link Scheduler}
3628+
*/
3629+
3630+
public <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>> f, final Scheduler s) {
3631+
return OperationParallel.parallel(this, f, s);
3632+
}
3633+
36033634
/**
36043635
* Returns a {@link ConnectableObservable}, which waits until its {@link ConnectableObservable#connect connect} method is called before it begins emitting
36053636
* items to those {@link Observer}s that have subscribed to it.

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,12 +212,23 @@ public Subscription call(Scheduler scheduler, Void state) {
212212
}
213213

214214
/**
215-
* Returns the scheduler's notion of current absolute time in milliseconds.
215+
* @return the scheduler's notion of current absolute time in milliseconds.
216216
*/
217217
public long now() {
218218
return System.currentTimeMillis();
219219
}
220220

221+
/**
222+
* Parallelism available to a Scheduler.
223+
* <p>
224+
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
225+
*
226+
* @return the scheduler's available degree of parallelism.
227+
*/
228+
public int degreeOfParallelism() {
229+
return Runtime.getRuntime().availableProcessors();
230+
}
231+
221232
public static class UnitTest {
222233
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
223234
@Test

rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
*/
1616
package rx.concurrency;
1717

18+
import java.util.concurrent.ExecutorService;
19+
import java.util.concurrent.Executors;
1820
import java.util.concurrent.ScheduledFuture;
21+
import java.util.concurrent.ThreadFactory;
1922
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicLong;
2024

2125
import rx.Scheduler;
2226
import rx.Subscription;
23-
import rx.operators.SafeObservableSubscription;
2427
import rx.subscriptions.CompositeSubscription;
2528
import rx.subscriptions.Subscriptions;
2629
import rx.util.functions.Func2;
@@ -29,27 +32,74 @@
2932
* Schedules work on a new thread.
3033
*/
3134
public class NewThreadScheduler extends Scheduler {
32-
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
35+
36+
private final static NewThreadScheduler INSTANCE = new NewThreadScheduler();
37+
private final static AtomicLong count = new AtomicLong();
3338

3439
public static NewThreadScheduler getInstance() {
3540
return INSTANCE;
3641
}
3742

38-
@Override
39-
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
40-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
41-
final Scheduler _scheduler = this;
43+
private NewThreadScheduler() {
4244

43-
Thread t = new Thread(new Runnable() {
44-
@Override
45-
public void run() {
46-
subscription.wrap(action.call(_scheduler, state));
47-
}
48-
}, "RxNewThreadScheduler");
45+
}
4946

50-
t.start();
47+
private static class EventLoopScheduler extends Scheduler {
48+
private final ExecutorService executor;
5149

52-
return subscription;
50+
private EventLoopScheduler() {
51+
executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
52+
53+
@Override
54+
public Thread newThread(Runnable r) {
55+
return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
56+
}
57+
});
58+
}
59+
60+
@Override
61+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
62+
final Scheduler _scheduler = this;
63+
return Subscriptions.from(executor.submit(new Runnable() {
64+
65+
@Override
66+
public void run() {
67+
action.call(_scheduler, state);
68+
}
69+
}));
70+
}
71+
72+
@Override
73+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, final long delayTime, final TimeUnit unit) {
74+
// we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep
75+
// we will instead schedule the event then launch the thread after the delay has passed
76+
final Scheduler _scheduler = this;
77+
final CompositeSubscription subscription = new CompositeSubscription();
78+
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {
79+
80+
@Override
81+
public void run() {
82+
if (!subscription.isUnsubscribed()) {
83+
// when the delay has passed we now do the work on the actual scheduler
84+
Subscription s = _scheduler.schedule(state, action);
85+
// add the subscription to the CompositeSubscription so it is unsubscribed
86+
subscription.add(s);
87+
}
88+
}
89+
}, delayTime, unit);
90+
91+
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
92+
subscription.add(Subscriptions.create(f));
93+
94+
return subscription;
95+
}
96+
97+
}
98+
99+
@Override
100+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
101+
EventLoopScheduler s = new EventLoopScheduler();
102+
return s.schedule(state, action);
53103
}
54104

55105
@Override

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.concurrent.CountDownLatch;
2222
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicReference;
2324

2425
import org.junit.Test;
2526
import org.mockito.InOrder;
@@ -33,6 +34,8 @@
3334
import rx.Subscription;
3435
import rx.concurrency.ImmediateScheduler;
3536
import rx.concurrency.Schedulers;
37+
import rx.subscriptions.CompositeSubscription;
38+
import rx.util.functions.Func2;
3639

3740
/**
3841
* Asynchronously notify Observers on the specified Scheduler.
@@ -60,7 +63,9 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
6063
// do nothing if we request ImmediateScheduler so we don't invoke overhead
6164
return source.subscribe(observer);
6265
} else {
63-
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
66+
CompositeSubscription s = new CompositeSubscription();
67+
s.add(source.subscribe(new ScheduledObserver<T>(s, observer, scheduler)));
68+
return s;
6469
}
6570
}
6671
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
import org.junit.Test;
23+
24+
import rx.Observable;
25+
import rx.Scheduler;
26+
import rx.concurrency.Schedulers;
27+
import rx.observables.GroupedObservable;
28+
import rx.util.functions.Action1;
29+
import rx.util.functions.Func0;
30+
import rx.util.functions.Func1;
31+
32+
/**
33+
* Identifies unit of work that can be executed in parallel on a given Scheduler.
34+
*/
35+
public final class OperationParallel<T> {
36+
37+
public static <T, R> Observable<R> parallel(Observable<T> source, Func1<Observable<T>, Observable<R>> f) {
38+
return parallel(source, f, Schedulers.threadPoolForComputation());
39+
}
40+
41+
public static <T, R> Observable<R> parallel(final Observable<T> source, final Func1<Observable<T>, Observable<R>> f, final Scheduler s) {
42+
return Observable.defer(new Func0<Observable<R>>() {
43+
44+
@Override
45+
public Observable<R> call() {
46+
final AtomicInteger i = new AtomicInteger(0);
47+
return source.groupBy(new Func1<T, Integer>() {
48+
49+
@Override
50+
public Integer call(T t) {
51+
return i.incrementAndGet() % s.degreeOfParallelism();
52+
}
53+
54+
}).flatMap(new Func1<GroupedObservable<Integer, T>, Observable<R>>() {
55+
56+
@Override
57+
public Observable<R> call(GroupedObservable<Integer, T> group) {
58+
return f.call(group.observeOn(s));
59+
}
60+
}).synchronize();
61+
}
62+
});
63+
}
64+
65+
public static class UnitTest {
66+
67+
@Test
68+
public void testParallel() {
69+
int NUM = 1000;
70+
final AtomicInteger count = new AtomicInteger();
71+
Observable.range(1, NUM).parallel(
72+
new Func1<Observable<Integer>, Observable<Integer[]>>() {
73+
74+
@Override
75+
public Observable<Integer[]> call(Observable<Integer> o) {
76+
return o.map(new Func1<Integer, Integer[]>() {
77+
78+
@Override
79+
public Integer[] call(Integer t) {
80+
return new Integer[] { t, t * 99 };
81+
}
82+
83+
});
84+
}
85+
}).toBlockingObservable().forEach(new Action1<Integer[]>() {
86+
87+
@Override
88+
public void call(Integer[] v) {
89+
count.incrementAndGet();
90+
System.out.println("V: " + v[0] + " R: " + v[1] + " Thread: " + Thread.currentThread());
91+
}
92+
93+
});
94+
95+
// just making sure we finish and get the number we expect
96+
assertEquals(NUM, count.get());
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)