Skip to content

Commit f54857a

Browse files
add methods to BlockingObservable
1 parent 5efe114 commit f54857a

File tree

3 files changed

+92
-27
lines changed

3 files changed

+92
-27
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1830,7 +1830,7 @@ class UnitTestSuite extends JUnitSuite {
18301830

18311831
@Test def testTest() = {
18321832
val a: Observable[Int] = Observable()
1833-
assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.last)
1833+
assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last)
18341834
}
18351835

18361836
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.junit.{Before, Test, Ignore}
2323
import org.junit.Assert._
2424
import rx.lang.scala.concurrency.NewThreadScheduler
2525

26-
//@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
26+
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
2727
class RxScalaDemo extends JUnitSuite {
2828

2929
@Test def intervalExample() {
@@ -229,11 +229,17 @@ class RxScalaDemo extends JUnitSuite {
229229
waitFor(sharedNumbers)
230230
}
231231

232+
@Test def testSingleOption() {
233+
assertEquals(None, Observable(1, 2).toBlockingObservable.singleOption)
234+
assertEquals(Some(1), Observable(1) .toBlockingObservable.singleOption)
235+
assertEquals(None, Observable() .toBlockingObservable.singleOption)
236+
}
237+
232238
def output(s: String): Unit = println(s)
233239

234240
// blocks until obs has completed
235241
def waitFor[T](obs: Observable[T]): Unit = {
236-
obs.toBlockingObservable.last
242+
obs.toBlockingObservable.toIterable.last
237243
}
238244

239245
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,40 +22,99 @@ class BlockingObservable[+T](val asJava: rx.observables.BlockingObservable[_ <:
2222
extends AnyVal
2323
{
2424

25+
/**
26+
* Invoke a method on each item emitted by the {@link Observable}; block until the Observable
27+
* completes.
28+
* <p>
29+
* NOTE: This will block even if the Observable is asynchronous.
30+
* <p>
31+
* This is similar to {@link Observable#subscribe(Observer)}, but it blocks. Because it blocks it does
32+
* not need the {@link Observer#onCompleted()} or {@link Observer#onError(Throwable)} methods.
33+
* <p>
34+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.forEach.png">
35+
*
36+
* @param onNext
37+
* the {@link Action1} to invoke for every item emitted by the {@link Observable}
38+
* @throws RuntimeException
39+
* if an error occurs
40+
*/
2541
def foreach(f: T => Unit): Unit = {
26-
asJava.forEach(f)
42+
asJava.forEach(f);
2743
}
2844

29-
def last: T = {
30-
asJava.last() : T // useless ascription because of compiler bug
45+
// last -> use toIterable.last
46+
// lastOrDefault -> use toIterable.lastOption
47+
// first -> use toIterable.head
48+
// firstOrDefault -> use toIterable.headOption
49+
// single(predicate) -> use filter and single
50+
// singleOrDefault -> use singleOption
51+
52+
/**
53+
* Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}.
54+
* <p>
55+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.mostRecent.png">
56+
*
57+
* @param initialValue
58+
* the initial value that will be yielded by the {@link Iterable} sequence if the {@link Observable} has not yet emitted an item
59+
* @return an {@link Iterable} that on each iteration returns the item that the {@link Observable} has most recently emitted
60+
*/
61+
def mostRecent[U >: T](initialValue: U): Iterable[U] = {
62+
val asJavaU = asJava.asInstanceOf[rx.observables.BlockingObservable[U]]
63+
asJavaU.mostRecent(initialValue).asScala: Iterable[U] // useless ascription because of compiler bug
3164
}
3265

33-
// last(Func1<? super T, Boolean>)
34-
// lastOrDefault(T)
35-
// lastOrDefault(T, Func1<? super T, Boolean>)
36-
// mostRecent(T)
37-
// next()
66+
/**
67+
* Returns an {@link Iterable} that blocks until the {@link Observable} emits another item,
68+
* then returns that item.
69+
* <p>
70+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.next.png">
71+
*
72+
* @return an {@link Iterable} that blocks upon each iteration until the {@link Observable} emits a new item, whereupon the Iterable returns that item
73+
*/
74+
def next: Iterable[T] = {
75+
asJava.next().asScala: Iterable[T] // useless ascription because of compiler bug
76+
}
3877

78+
/**
79+
* If this {@link Observable} completes after emitting a single item, return that item,
80+
* otherwise throw an exception.
81+
* <p>
82+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.single.png">
83+
*
84+
* @return the single item emitted by the {@link Observable}
85+
*/
3986
def single: T = {
40-
asJava.single() : T // useless ascription because of compiler bug
87+
asJava.single(): T // useless ascription because of compiler bug
88+
}
89+
90+
/**
91+
* If this {@link Observable} completes after emitting a single item, return an Option containing
92+
* this item, otherwise return {@code None}.
93+
*/
94+
def singleOption: Option[T] = {
95+
var size: Int = 0
96+
var last: Option[T] = None
97+
for (t <- toIterable) {
98+
size += 1
99+
last = Some(t)
100+
}
101+
if (size == 1) last else None
41102
}
42-
43-
// single(Func1<? super T, Boolean>)
44-
45-
// def singleOption: Option[T] = { TODO }
46-
// corresponds to Java's
47-
// singleOrDefault(T)
48-
49-
// singleOrDefault(BlockingObservable<? extends T>, boolean, T)
50-
// singleOrDefault(T, Func1<? super T, Boolean>)
51-
// toFuture()
52-
103+
104+
// TODO toFuture()
105+
106+
/**
107+
* Returns an {@link Iterator} that iterates over all items emitted by this {@link Observable}.
108+
*/
53109
def toIterable: Iterable[T] = {
54-
asJava.toIterable().asScala : Iterable[T] // useless ascription because of compiler bug
110+
asJava.toIterable().asScala: Iterable[T] // useless ascription because of compiler bug
55111
}
56-
112+
113+
/**
114+
* Returns a {@link List} that contains all items emitted by this {@link Observable}.
115+
*/
57116
def toList: List[T] = {
58-
asJava.toIterable().asScala.toList : List[T] // useless ascription because of compiler bug
117+
asJava.toIterable().asScala.toList: List[T] // useless ascription because of compiler bug
59118
}
60119

61-
}
120+
}

0 commit comments

Comments
 (0)