File tree Expand file tree Collapse file tree 1 file changed +4
-4
lines changed
rxjava-core/src/main/java/rx/operators Expand file tree Collapse file tree 1 file changed +4
-4
lines changed Original file line number Diff line number Diff line change 32
32
private final Scheduler scheduler ;
33
33
private final CompositeSubscription parentSubscription ;
34
34
private final EventLoop eventLoop = new EventLoop ();
35
-
36
- private final ConcurrentLinkedQueue <Notification <? extends T >> queue = new ConcurrentLinkedQueue <Notification <? extends T >>();
35
+ final AtomicInteger counter = new AtomicInteger ();
37
36
private final AtomicBoolean started = new AtomicBoolean ();
38
37
38
+ private final ConcurrentLinkedQueue <Notification <? extends T >> queue = new ConcurrentLinkedQueue <Notification <? extends T >>();
39
+
40
+
39
41
public ScheduledObserver (CompositeSubscription s , Observer <? super T > underlying , Scheduler scheduler ) {
40
42
this .parentSubscription = s ;
41
43
this .underlying = underlying ;
@@ -57,8 +59,6 @@ public void onNext(final T args) {
57
59
enqueue (new Notification <T >(args ));
58
60
}
59
61
60
- final AtomicInteger counter = new AtomicInteger ();
61
-
62
62
private void enqueue (Notification <? extends T > notification ) {
63
63
// this must happen before synchronization between threads
64
64
queue .offer (notification );
You can’t perform that action at this time.
0 commit comments