Skip to content

Commit 6a7ea11

Browse files
Merge pull request #389 from samuelgruetter/idiomaticscala
Scala Adaptor Improvements
2 parents 00d7c3b + f54857a commit 6a7ea11

File tree

12 files changed

+469
-275
lines changed

12 files changed

+469
-275
lines changed

language-adaptors/rxjava-scala/README.md

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,71 @@
11
# Scala Adaptor for RxJava
22

3-
There's an old Scala adaptor ( `rx.lang.scala.RxImplicits` with test `rx.lang.scala.RxImplicitsTest` ), which is deprecated. All other classes in `rx.lang.scala` belong to the new adaptor.
3+
This adaptor allows to use RxJava in Scala with anonymous functions, e.g.
44

5-
# Binaries
5+
```scala
6+
val o = Observable.interval(200 millis).take(5)
7+
o.subscribe(n => println("n = " + n))
8+
Observable(1, 2, 3, 4).reduce(_ + _)
9+
```
10+
11+
For-comprehensions are also supported:
12+
13+
```scala
14+
val first = Observable(10, 11, 12)
15+
val second = Observable(10, 11, 12)
16+
val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2)
17+
```
18+
19+
Further, this adaptor attempts to expose an API which is as Scala-idiomatic as possible. This means that certain methods have been renamed, their signature was changed, or static methods were changed to instance methods. Some examples:
20+
21+
```scala
22+
// instead of concat:
23+
def ++[U >: T](that: Observable[U]): Observable[U]
24+
25+
// instance method instead of static:
26+
def zip[U](that: Observable[U]): Observable[(T, U)]
27+
28+
// the implicit evidence argument ensures that dematerialize can only be called on Observables of Notifications:
29+
def dematerialize[U](implicit evidence: T <:< Notification[U]): Observable[U]
30+
31+
// additional type parameter U with lower bound to get covariance right:
32+
def onErrorResumeNext[U >: T](resumeFunction: Throwable => Observable[U]): Observable[U]
33+
34+
// curried in Scala collections, so curry fold also here:
35+
def fold[R](initialValue: R)(accumulator: (R, T) => R): Observable[R]
36+
37+
// using Duration instead of (long timepan, TimeUnit duration):
38+
def sample(duration: Duration): Observable[T]
39+
40+
// called skip in Java, but drop in Scala
41+
def drop(n: Int): Observable[T]
42+
43+
// there's only mapWithIndex in Java, because Java doesn't have tuples:
44+
def zipWithIndex: Observable[(T, Int)]
45+
46+
// corresponds to Java's toList:
47+
def toSeq: Observable[Seq[T]]
48+
49+
// the implicit evidence argument ensures that switch can only be called on Observables of Observables:
50+
def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U]
51+
52+
// Java's from becomes apply, and we use Scala Range
53+
def apply(range: Range): Observable[Int]
54+
55+
// use Bottom type:
56+
def never: Observable[Nothing]
57+
```
58+
59+
Also, the Scala Observable is fully covariant in its type parameter, whereas the Java Observable only achieves partial covariance due to limitations of Java's type system (or if you can fix this, your suggestions are very welcome).
60+
61+
For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala).
62+
63+
Scala code using Rx should only import members from `rx.lang.scala` and below.
64+
65+
Work on this adaptor is still in progress, and for the moment, the best source of documentation are the comments in the source code of [`rx.lang.scala.Observable`](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala).
66+
67+
68+
## Binaries
669

770
Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-scala%22).
871

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.lang.scala
17+
18+
import java.{ lang => jlang }
19+
import rx.util.functions._
20+
21+
/**
22+
* These function conversions convert between Scala functions and Rx Funcs and Actions.
23+
* Most users RxScala won't need them, but they might be useful if one wants to use
24+
* the rx.Observable directly instead of using rx.lang.scala.Observable or if one wants
25+
* to use a Java library taking/returning Funcs and Actions.
26+
*/
27+
object ImplicitFunctionConversions {
28+
import language.implicitConversions
29+
30+
implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
31+
new rx.Observable.OnSubscribeFunc[T] {
32+
def onSubscribe(obs: Observer[_ >: T]): Subscription = {
33+
f(obs)
34+
}
35+
}
36+
37+
/**
38+
* Converts a by-name parameter to a Rx Func0
39+
*/
40+
implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] =
41+
new Func0[B] {
42+
def call(): B = param
43+
}
44+
45+
/**
46+
* Converts 0-arg function to Rx Action0
47+
*/
48+
implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 =
49+
new Action0 {
50+
def call(): Unit = f()
51+
}
52+
53+
/**
54+
* Converts 1-arg function to Rx Action1
55+
*/
56+
implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] =
57+
new Action1[A] {
58+
def call(a: A): Unit = f(a)
59+
}
60+
61+
/**
62+
* Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean]
63+
*/
64+
implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] =
65+
new Func1[A, jlang.Boolean] {
66+
def call(a: A): jlang.Boolean = f(a).booleanValue
67+
}
68+
69+
/**
70+
* Converts 2-arg predicate to Rx Func2[A, B, java.lang.Boolean]
71+
*/
72+
implicit def scalaBooleanFunction2ToRxBooleanFunc1[A, B](f: ((A, B) => Boolean)): Func2[A, B, jlang.Boolean] =
73+
new Func2[A, B, jlang.Boolean] {
74+
def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue
75+
}
76+
77+
/**
78+
* Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2
79+
*/
80+
implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] =
81+
new Func2[A, jlang.Integer, jlang.Boolean] {
82+
def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue
83+
}
84+
85+
/**
86+
* Converts a function shaped ilke compareTo into the equivalent Rx Func2
87+
*/
88+
implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] =
89+
new Func2[A, A, jlang.Integer] {
90+
def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue
91+
}
92+
93+
/**
94+
* This implicit allows Scala code to use any exception type and still work
95+
* with invariant Func1 interface
96+
*/
97+
implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] =
98+
new Func1[Exception, B] {
99+
def call(ex: Exception): B = f(ex.asInstanceOf[A])
100+
}
101+
102+
/**
103+
* The following implicits convert functions of different arities into the Rx equivalents
104+
*/
105+
implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] =
106+
new Func0[A] {
107+
def call(): A = f()
108+
}
109+
110+
implicit def scalaFunction1ToRxFunc1[A, B](f: (A => B)): Func1[A, B] =
111+
new Func1[A, B] {
112+
def call(a: A): B = f(a)
113+
}
114+
115+
implicit def scalaFunction2ToRxFunc2[A, B, C](f: (A, B) => C): Func2[A, B, C] =
116+
new Func2[A, B, C] {
117+
def call(a: A, b: B) = f(a, b)
118+
}
119+
120+
implicit def scalaFunction3ToRxFunc3[A, B, C, D](f: (A, B, C) => D): Func3[A, B, C, D] =
121+
new Func3[A, B, C, D] {
122+
def call(a: A, b: B, c: C) = f(a, b, c)
123+
}
124+
125+
implicit def scalaFunction4ToRxFunc4[A, B, C, D, E](f: (A, B, C, D) => E): Func4[A, B, C, D, E] =
126+
new Func4[A, B, C, D, E] {
127+
def call(a: A, b: B, c: C, d: D) = f(a, b, c, d)
128+
}
129+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 34 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package rx.lang.scala
2020
import org.scalatest.junit.JUnitSuite
2121
import scala.collection.Seq
2222
import rx.lang.scala.observables.BlockingObservable
23-
import rx.lang.scala.observables.ConnectableObservable
2423

2524

2625
/**
@@ -38,7 +37,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
3837
import rx.util.functions._
3938
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
4039
import rx.lang.scala.util._
41-
import rx.lang.scala.internal.ImplicitFunctionConversions._
40+
import rx.lang.scala.subjects.Subject
41+
import rx.lang.scala.ImplicitFunctionConversions._
4242

4343
/**
4444
* An {@link Observer} must call an Observable's {@code subscribe} method in order to
@@ -132,11 +132,13 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
132132
* into
133133
* @param <R>
134134
* result type
135-
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to
136-
* push results into the specified {@link Subject}
135+
* @return a pair of a start function and an {@link Observable} such that when the start function
136+
* is called, the Observable starts to push results into the specified {@link Subject}
137137
*/
138-
// public <R> ConnectableObservable<R> multicast(Subject<T, R> subject) TODO
139-
138+
def multicast[R](subject: Subject[T, R]): (() => Subscription, Observable[R]) = {
139+
val javaCO = asJava.multicast[R](subject)
140+
(() => javaCO.connect(), Observable[R](javaCO))
141+
}
140142

141143
/**
142144
* Returns an Observable that first emits the items emitted by this, and then the items emitted
@@ -904,11 +906,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
904906
* <p>
905907
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.png">
906908
*
907-
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to
908-
* emit items to its {@link Observer}s
909+
* @return a pair of a start function and an {@link Observable} such that when the start function
910+
* is called, the Observable starts to emit items to its {@link Observer}s
909911
*/
910-
def replay(): ConnectableObservable[T] = {
911-
new ConnectableObservable[T](asJava.replay())
912+
def replay(): (() => Subscription, Observable[T]) = {
913+
val javaCO = asJava.replay()
914+
(() => javaCO.connect(), Observable[T](javaCO))
912915
}
913916

914917
/**
@@ -937,11 +940,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
937940
* <p>
938941
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/publishConnect.png">
939942
*
940-
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to
941-
* emit items to its {@link Observer}s
943+
* @return a pair of a start function and an {@link Observable} such that when the start function
944+
* is called, the Observable starts to emit items to its {@link Observer}s
942945
*/
943-
def publish: ConnectableObservable[T] = {
944-
new ConnectableObservable[T](asJava.publish())
946+
def publish: (() => Subscription, Observable[T]) = {
947+
val javaCO = asJava.publish()
948+
(() => javaCO.connect(), Observable[T](javaCO))
945949
}
946950

947951
// There is no aggregate function with signature
@@ -1215,51 +1219,25 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
12151219
// because we can just use ++ instead
12161220

12171221
/**
1218-
* Groups the items emitted by an Observable according to a specified criterion, and emits these
1219-
* grouped items as {@link GroupedObservable}s, one GroupedObservable per group.
1220-
* <p>
1221-
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/groupBy.png">
1222+
* Groups the items emitted by this Observable according to a specified discriminator function.
12221223
*
1223-
* @param keySelector
1224+
* @param f
12241225
* a function that extracts the key from an item
1225-
* @param elementSelector
1226-
* a function to map a source item to an item in a {@link GroupedObservable}
1227-
* @param <K>
1228-
* the key type
1229-
* @param <R>
1230-
* the type of items emitted by the resulting {@link GroupedObservable}s
1231-
* @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a
1232-
* unique key value and emits items representing items from the source Observable that
1233-
* share that key value
1234-
*/
1235-
/* TODO make a Scala GroupedObservable and groupBy
1236-
def groupBy[K,R](keySelector: T => K, elementSelector: T => R ): Observable[GroupedObservable[K,R]] = {
1237-
???
1238-
}
1239-
*/
1240-
// public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector)
1241-
1242-
/**
1243-
* Groups the items emitted by an Observable according to a specified criterion, and emits these
1244-
* grouped items as {@link GroupedObservable}s, one GroupedObservable per group.
1245-
* <p>
1246-
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/groupBy.png">
1247-
*
1248-
* @param keySelector
1249-
* a function that extracts the key for each item
12501226
* @param <K>
1251-
* the key type
1252-
* @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a
1253-
* unique key value and emits items representing items from the source Observable that
1254-
* share that key value
1227+
* the type of keys returned by the discriminator function.
1228+
* @return an Observable that emits {@code (key, observable)} pairs, where {@code observable}
1229+
* contains all items for which {@code f} returned {@code key}.
12551230
*/
1256-
/* TODO
1257-
def groupBy[K](keySelector: T => K ): Observable[GroupedObservable[K,T]] = {
1258-
???
1231+
def groupBy[K](f: T => K): Observable[(K, Observable[T])] = {
1232+
val o1 = asJava.groupBy[K](f) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
1233+
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey(), Observable[T](o))
1234+
Observable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
12591235
}
1260-
*/
1261-
// public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector)
12621236

1237+
// There's no method corresponding to
1238+
// public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector)
1239+
// because this can be obtained by combining groupBy and map (as in Scala)
1240+
12631241
/**
12641242
* Given an Observable that emits Observables, creates a single Observable that
12651243
* emits the items emitted by the most recently published of those Observables.
@@ -1482,7 +1460,7 @@ object Observable {
14821460
import rx.{Observable => JObservable}
14831461
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
14841462
import rx.lang.scala.util._
1485-
import rx.lang.scala.internal.ImplicitFunctionConversions._
1463+
import rx.lang.scala.ImplicitFunctionConversions._
14861464

14871465
private[scala]
14881466
def jObsOfListToScObsOfSeq[T](jObs: rx.Observable[_ <: java.util.List[T]]): Observable[Seq[T]] = {
@@ -1800,7 +1778,7 @@ object Observable {
18001778
// "implementation restriction: nested class is not allowed in value class.
18011779
// This restriction is planned to be removed in subsequent releases."
18021780
class WithFilter[+T] private[scala] (p: T => Boolean, asJava: rx.Observable[_ <: T]) {
1803-
import rx.lang.scala.internal.ImplicitFunctionConversions._
1781+
import rx.lang.scala.ImplicitFunctionConversions._
18041782

18051783
def map[B](f: T => B): Observable[B] = {
18061784
Observable[B](asJava.filter(p).map[B](f))
@@ -1852,7 +1830,7 @@ class UnitTestSuite extends JUnitSuite {
18521830

18531831
@Test def testTest() = {
18541832
val a: Observable[Int] = Observable()
1855-
assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.last)
1833+
assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last)
18561834
}
18571835

18581836
}

0 commit comments

Comments
 (0)