Skip to content

Commit 8b7db2b

Browse files
Merge pull request #426 from benjchristensen/publish-subject-resubscribe
PublishSubject ReSubscribe for publish().refCount() Behavior
2 parents 2c4b2c5 + f035770 commit 8b7db2b

File tree

3 files changed

+184
-242
lines changed

3 files changed

+184
-242
lines changed

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 93 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,7 +51,7 @@
5151
* <p>
5252
* <pre> {@code
5353
54-
PublishSubject<Object> subject = PublishSubject.create();
54+
* ublishSubject<Object> subject = PublishSubject.create();
5555
// observer1 will receive all onNext and onCompleted events
5656
subject.subscribe(observer1);
5757
subject.onNext("one");
@@ -62,21 +62,16 @@
6262
subject.onCompleted();
6363
6464
} </pre>
65-
*
65+
*
6666
* @param <T>
6767
*/
6868
public class PublishSubject<T> extends Subject<T, T> {
6969
public static <T> PublishSubject<T> create() {
7070
final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
71-
final AtomicReference<Notification<? extends T>> terminalState = new AtomicReference<Notification<? extends T>>();
7271

7372
OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() {
7473
@Override
7574
public Subscription onSubscribe(Observer<? super T> observer) {
76-
// shortcut check if terminal state exists already
77-
Subscription s = checkTerminalState(observer);
78-
if(s != null) return s;
79-
8075
final SafeObservableSubscription subscription = new SafeObservableSubscription();
8176

8277
subscription.wrap(new Subscription() {
@@ -87,67 +82,26 @@ public void unsubscribe() {
8782
}
8883
});
8984

90-
/**
91-
* NOTE: We are synchronizing to avoid a race condition between terminalState being set and
92-
* a new observer being added to observers.
93-
*
94-
* The synchronization only occurs on subscription and terminal states, it does not affect onNext calls
95-
* so a high-volume hot-observable will not pay this cost for emitting data.
96-
*
97-
* Due to the restricted impact of blocking synchronization here I have not pursued more complicated
98-
* approaches to try and stay completely non-blocking.
99-
*/
100-
synchronized (terminalState) {
101-
// check terminal state again
102-
s = checkTerminalState(observer);
103-
if (s != null)
104-
return s;
105-
106-
// on subscribe add it to the map of outbound observers to notify
107-
observers.put(subscription, observer);
108-
109-
return subscription;
110-
}
111-
}
85+
// on subscribe add it to the map of outbound observers to notify
86+
observers.put(subscription, observer);
11287

113-
private Subscription checkTerminalState(Observer<? super T> observer) {
114-
Notification<? extends T> n = terminalState.get();
115-
if (n != null) {
116-
// we are terminated to immediately emit and don't continue with subscription
117-
if (n.isOnCompleted()) {
118-
observer.onCompleted();
119-
} else {
120-
observer.onError(n.getThrowable());
121-
}
122-
return Subscriptions.empty();
123-
} else {
124-
return null;
125-
}
88+
return subscription;
12689
}
90+
12791
};
12892

129-
return new PublishSubject<T>(onSubscribe, observers, terminalState);
93+
return new PublishSubject<T>(onSubscribe, observers);
13094
}
13195

13296
private final ConcurrentHashMap<Subscription, Observer<? super T>> observers;
133-
private final AtomicReference<Notification<? extends T>> terminalState;
13497

135-
protected PublishSubject(OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers, AtomicReference<Notification<? extends T>> terminalState) {
98+
protected PublishSubject(OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers) {
13699
super(onSubscribe);
137100
this.observers = observers;
138-
this.terminalState = terminalState;
139101
}
140102

141103
@Override
142104
public void onCompleted() {
143-
/**
144-
* Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
145-
* Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
146-
* onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
147-
*/
148-
synchronized (terminalState) {
149-
terminalState.set(new Notification<T>());
150-
}
151105
for (Observer<? super T> observer : snapshotOfValues()) {
152106
observer.onCompleted();
153107
}
@@ -156,14 +110,6 @@ public void onCompleted() {
156110

157111
@Override
158112
public void onError(Throwable e) {
159-
/**
160-
* Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
161-
* Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
162-
* onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
163-
*/
164-
synchronized (terminalState) {
165-
terminalState.set(new Notification<T>(e));
166-
}
167113
for (Observer<? super T> observer : snapshotOfValues()) {
168114
observer.onError(e);
169115
}
@@ -179,12 +125,12 @@ public void onNext(T args) {
179125

180126
/**
181127
* Current snapshot of 'values()' so that concurrent modifications aren't included.
182-
*
128+
*
183129
* This makes it behave deterministically in a single-threaded execution when nesting subscribes.
184-
*
130+
*
185131
* In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead
186132
* of possibly being included in the current onNext iteration.
187-
*
133+
*
188134
* @return List<Observer<T>>
189135
*/
190136
private Collection<Observer<? super T>> snapshotOfValues() {
@@ -378,75 +324,6 @@ private void assertObservedUntilTwo(Observer<String> aObserver)
378324
verify(aObserver, Mockito.never()).onCompleted();
379325
}
380326

381-
/**
382-
* Test that subscribing after onError/onCompleted immediately terminates instead of causing it to hang.
383-
*
384-
* Nothing is mentioned in Rx Guidelines for what to do in this case so I'm doing what seems to make sense
385-
* which is:
386-
*
387-
* - cache terminal state (onError/onCompleted)
388-
* - any subsequent subscriptions will immediately receive the terminal state rather than start a new subscription
389-
*
390-
*/
391-
@Test
392-
public void testUnsubscribeAfterOnCompleted() {
393-
PublishSubject<String> subject = PublishSubject.create();
394-
395-
@SuppressWarnings("unchecked")
396-
Observer<String> anObserver = mock(Observer.class);
397-
subject.subscribe(anObserver);
398-
399-
subject.onNext("one");
400-
subject.onNext("two");
401-
subject.onCompleted();
402-
403-
InOrder inOrder = inOrder(anObserver);
404-
inOrder.verify(anObserver, times(1)).onNext("one");
405-
inOrder.verify(anObserver, times(1)).onNext("two");
406-
inOrder.verify(anObserver, times(1)).onCompleted();
407-
inOrder.verify(anObserver, Mockito.never()).onError(any(Throwable.class));
408-
409-
@SuppressWarnings("unchecked")
410-
Observer<String> anotherObserver = mock(Observer.class);
411-
subject.subscribe(anotherObserver);
412-
413-
inOrder = inOrder(anotherObserver);
414-
inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
415-
inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
416-
inOrder.verify(anotherObserver, times(1)).onCompleted();
417-
inOrder.verify(anotherObserver, Mockito.never()).onError(any(Throwable.class));
418-
}
419-
420-
@Test
421-
public void testUnsubscribeAfterOnError() {
422-
PublishSubject<String> subject = PublishSubject.create();
423-
RuntimeException exception = new RuntimeException("failure");
424-
425-
@SuppressWarnings("unchecked")
426-
Observer<String> anObserver = mock(Observer.class);
427-
subject.subscribe(anObserver);
428-
429-
subject.onNext("one");
430-
subject.onNext("two");
431-
subject.onError(exception);
432-
433-
InOrder inOrder = inOrder(anObserver);
434-
inOrder.verify(anObserver, times(1)).onNext("one");
435-
inOrder.verify(anObserver, times(1)).onNext("two");
436-
inOrder.verify(anObserver, times(1)).onError(exception);
437-
inOrder.verify(anObserver, Mockito.never()).onCompleted();
438-
439-
@SuppressWarnings("unchecked")
440-
Observer<String> anotherObserver = mock(Observer.class);
441-
subject.subscribe(anotherObserver);
442-
443-
inOrder = inOrder(anotherObserver);
444-
inOrder.verify(anotherObserver, Mockito.never()).onNext("one");
445-
inOrder.verify(anotherObserver, Mockito.never()).onNext("two");
446-
inOrder.verify(anotherObserver, times(1)).onError(exception);
447-
inOrder.verify(anotherObserver, Mockito.never()).onCompleted();
448-
}
449-
450327
@Test
451328
public void testUnsubscribe()
452329
{
@@ -519,8 +396,7 @@ public void call(String v) {
519396

520397
});
521398

522-
523-
for(int i=0; i<10; i++) {
399+
for (int i = 0; i < 10; i++) {
524400
s.onNext(i);
525401
}
526402
s.onCompleted();
@@ -533,5 +409,83 @@ public void call(String v) {
533409
assertEquals(45, list.size());
534410
}
535411

412+
/**
413+
* Should be able to unsubscribe all Observers, have it stop emitting, then subscribe new ones and it start emitting again.
414+
*/
415+
@Test
416+
public void testReSubscribe() {
417+
final PublishSubject<Integer> ps = PublishSubject.create();
418+
419+
Observer<Integer> o1 = mock(Observer.class);
420+
Subscription s1 = ps.subscribe(o1);
421+
422+
// emit
423+
ps.onNext(1);
424+
425+
// validate we got it
426+
InOrder inOrder1 = inOrder(o1);
427+
inOrder1.verify(o1, times(1)).onNext(1);
428+
inOrder1.verifyNoMoreInteractions();
429+
430+
// unsubscribe
431+
s1.unsubscribe();
432+
433+
// emit again but nothing will be there to receive it
434+
ps.onNext(2);
435+
436+
Observer<Integer> o2 = mock(Observer.class);
437+
Subscription s2 = ps.subscribe(o2);
438+
439+
// emit
440+
ps.onNext(3);
441+
442+
// validate we got it
443+
InOrder inOrder2 = inOrder(o2);
444+
inOrder2.verify(o2, times(1)).onNext(3);
445+
inOrder2.verifyNoMoreInteractions();
446+
447+
s2.unsubscribe();
448+
}
449+
450+
/**
451+
* Even if subject received an onError/onCompleted, new subscriptions should be able to restart it.
452+
*/
453+
@Test
454+
public void testReSubscribeAfterTerminalState() {
455+
final PublishSubject<Integer> ps = PublishSubject.create();
456+
457+
Observer<Integer> o1 = mock(Observer.class);
458+
Subscription s1 = ps.subscribe(o1);
459+
460+
// emit
461+
ps.onNext(1);
462+
463+
// validate we got it
464+
InOrder inOrder1 = inOrder(o1);
465+
inOrder1.verify(o1, times(1)).onNext(1);
466+
inOrder1.verifyNoMoreInteractions();
467+
468+
// unsubscribe
469+
s1.unsubscribe();
470+
471+
ps.onCompleted();
472+
473+
// emit again but nothing will be there to receive it
474+
ps.onNext(2);
475+
476+
Observer<Integer> o2 = mock(Observer.class);
477+
Subscription s2 = ps.subscribe(o2);
478+
479+
// emit
480+
ps.onNext(3);
481+
482+
// validate we got it
483+
InOrder inOrder2 = inOrder(o2);
484+
inOrder2.verify(o2, times(1)).onNext(3);
485+
inOrder2.verifyNoMoreInteractions();
486+
487+
s2.unsubscribe();
488+
}
489+
536490
}
537491
}

0 commit comments

Comments
 (0)