diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala
index 2c432adf36..e0e92c83eb 100644
--- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala
+++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala
@@ -19,6 +19,7 @@ import java.io.IOException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
+import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.DurationLong
@@ -47,6 +48,29 @@ import rx.lang.scala.schedulers._
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
class RxScalaDemo extends JUnitSuite {
+ @Test def subscribeExample() {
+ val o = Observable.items(1, 2, 3)
+
+ // Generally, we have two methods, `subscribe` and `foreach`, to listen to the messages from an Observable.
+ // `foreach` is just an alias to `subscribe`.
+ o.subscribe(
+ n => println(n),
+ e => e.printStackTrace(),
+ () => println("done")
+ )
+
+ o.foreach(
+ n => println(n),
+ e => e.printStackTrace(),
+ () => println("done")
+ )
+
+ // For-comprehension is also an alternative, if you are only interested in `onNext`
+ for (i <- o) {
+ println(i)
+ }
+ }
+
@Test def intervalExample() {
val o = Observable.interval(200 millis).take(5)
o.subscribe(n => println("n = " + n))
@@ -130,7 +154,7 @@ class RxScalaDemo extends JUnitSuite {
o.flatten.takeUntil(stopper).toBlockingObservable.foreach(println(_))
}
- @Test def fattenSomeExample() {
+ @Test def flattenSomeExample() {
// To merge some observables which are all known already:
List(
Observable.interval(200 millis),
@@ -139,6 +163,24 @@ class RxScalaDemo extends JUnitSuite {
).toObservable.flatten.take(12).toBlockingObservable.foreach(println(_))
}
+ @Test def flattenExample() {
+ List(
+ Observable.interval(200 millis).map(_ => 1).take(5),
+ Observable.interval(200 millis).map(_ => 2).take(5),
+ Observable.interval(200 millis).map(_ => 3).take(5),
+ Observable.interval(200 millis).map(_ => 4).take(5)
+ ).toObservable.flatten.toBlocking.foreach(println(_))
+ }
+
+ @Test def flattenExample2() {
+ List(
+ Observable.interval(200 millis).map(_ => 1).take(5),
+ Observable.interval(200 millis).map(_ => 2).take(5),
+ Observable.interval(200 millis).map(_ => 3).take(5),
+ Observable.interval(200 millis).map(_ => 4).take(5)
+ ).toObservable.flatten(2).toBlocking.foreach(println(_))
+ }
+
@Test def rangeAndBufferExample() {
val o = Observable.from(1 to 18)
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
@@ -276,6 +318,13 @@ class RxScalaDemo extends JUnitSuite {
sequenced.subscribe(x => println(s"Emitted group: $x"))
}
+ @Test def groupByUntilExample2() {
+ val numbers = Observable.interval(250 millis).take(14)
+ val grouped = numbers.groupByUntil[Long, Long](x => x % 2, x => x * 10, {case (key, obs) => Observable.interval(2 seconds)})
+ val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
+ sequenced.toBlocking.foreach(x => println(s"Emitted group: $x"))
+ }
+
@Test def combineLatestExample() {
val firstCounter = Observable.interval(250 millis)
val secondCounter = Observable.interval(550 millis)
@@ -286,6 +335,17 @@ class RxScalaDemo extends JUnitSuite {
waitFor(combinedCounter)
}
+ @Test def combineLatestExample2() {
+ val firstCounter = Observable.interval(250 millis)
+ val secondCounter = Observable.interval(550 millis)
+ val thirdCounter = Observable.interval(850 millis)
+ val sources = Seq(firstCounter, secondCounter, thirdCounter)
+ val combinedCounter = Observable.combineLatest(sources, (items: Seq[Long]) => items.toList).take(10)
+
+ combinedCounter subscribe {x => println(s"Emitted group: $x")}
+ waitFor(combinedCounter)
+ }
+
@Test def olympicsExampleWithoutPublish() {
val medals = Olympics.mountainBikeMedals.doOnEach(_ => println("onNext"))
medals.subscribe(println(_)) // triggers an execution of medals Observable
@@ -829,6 +889,40 @@ class RxScalaDemo extends JUnitSuite {
println(m.toBlockingObservable.single)
}
+ @Test def toMultimapExample1(): Unit = {
+ val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
+ val keySelector = (s: String) => s.head
+ val m = o.toMultimap(keySelector)
+ println(m.toBlocking.single)
+ }
+
+ @Test def toMultimapExample2(): Unit = {
+ val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
+ val keySelector = (s: String) => s.head
+ val valueSelector = (s: String) => s.tail
+ val m = o.toMultimap(keySelector, valueSelector)
+ println(m.toBlocking.single)
+ }
+
+ @Test def toMultimapExample3(): Unit = {
+ val o: Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
+ val keySelector = (s: String) => s.head
+ val valueSelector = (s: String) => s.tail
+ val mapFactory = () => mutable.Map('d' -> mutable.Buffer("oug"))
+ val m = o.toMultimap(keySelector, valueSelector, mapFactory)
+ println(m.toBlocking.single.mapValues(_.toList))
+ }
+
+ @Test def toMultimapExample4(): Unit = {
+ val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
+ val keySelector = (s: String) => s.head
+ val valueSelector = (s: String) => s.tail
+ val mapFactory = () => mutable.Map('d' -> mutable.ListBuffer("oug"))
+ val bufferFactory = (k: Char) => mutable.ListBuffer[String]()
+ val m = o.toMultimap(keySelector, valueSelector, mapFactory, bufferFactory)
+ println(m.toBlocking.single)
+ }
+
@Test def containsExample(): Unit = {
val o1 = List(1, 2, 3).toObservable.contains(2)
assertTrue(o1.toBlockingObservable.single)
@@ -857,6 +951,28 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(List("alice", "bob", "carol"), o.retry(3).toBlockingObservable.toList)
}
+ @Test def retryExample3(): Unit = {
+ var isFirst = true
+ val o = Observable {
+ (subscriber: Subscriber[String]) =>
+ if (isFirst) {
+ subscriber.onNext("alice")
+ subscriber.onError(new IOException("Oops"))
+ isFirst = false
+ }
+ else {
+ subscriber.onNext("bob")
+ subscriber.onError(new RuntimeException("Oops"))
+ }
+ }
+ o.retry {
+ (times, e) => e match {
+ case e: IOException => times <= 3
+ case _ => false
+ }
+ }.subscribe(s => println(s), e => e.printStackTrace())
+ }
+
@Test def liftExample1(): Unit = {
// Add "No. " in front of each item
val o = List(1, 2, 3).toObservable.lift {
@@ -1176,4 +1292,82 @@ class RxScalaDemo extends JUnitSuite {
.take(20)
.toBlocking.foreach(println)
}
+
+ @Test def onErrorResumeNextExample() {
+ val o = Observable {
+ (subscriber: Subscriber[Int]) =>
+ subscriber.onNext(1)
+ subscriber.onNext(2)
+ subscriber.onError(new IOException("Oops"))
+ subscriber.onNext(3)
+ subscriber.onNext(4)
+ }
+ o.onErrorResumeNext(_ => Observable.items(10, 11, 12)).subscribe(println(_))
+ }
+
+ @Test def onErrorFlatMapExample() {
+ val o = Observable {
+ (subscriber: Subscriber[Int]) =>
+ subscriber.onNext(1)
+ subscriber.onNext(2)
+ subscriber.onError(new IOException("Oops"))
+ subscriber.onNext(3)
+ subscriber.onNext(4)
+ }
+ o.onErrorFlatMap((_, _) => Observable.items(10, 11, 12)).subscribe(println(_))
+ }
+
+ @Test def onErrorFlatMapExample2() {
+ val o = Observable.items(4, 2, 0).map(16 / _).onErrorFlatMap {
+ (e, op) => op match {
+ case Some(v) if v == 0 => Observable.items(Int.MinValue)
+ case _ => Observable.empty
+ }
+ }
+ o.subscribe(println(_))
+ }
+
+ @Test def switchMapExample() {
+ val o = Observable.interval(300 millis).take(5).switchMap[String] {
+ n => Observable.interval(50 millis).take(10).map(i => s"Seq ${n}: ${i}")
+ }
+ o.toBlocking.foreach(println)
+ }
+
+ @Test def joinExample() {
+ val o1 = Observable.interval(500 millis).map(n => "1: " + n)
+ val o2 = Observable.interval(100 millis).map(n => "2: " + n)
+ val o = o1.join(o2,
+ (_: String) => Observable.timer(300 millis),
+ (_: String) => Observable.timer(200 millis),
+ (t1: String, t2: String) => (t1, t2))
+ o.take(10).toBlocking.foreach(println)
+ }
+
+ @Test def groupJoinExample() {
+ val o1 = Observable.interval(500 millis).map(n => "1: " + n)
+ val o2 = Observable.interval(100 millis).map(n => "2: " + n)
+ val o = o1.groupJoin(o2,
+ (_: String) => Observable.timer(300 millis),
+ (_: String) => Observable.timer(200 millis),
+ (t1: String, t2: Observable[String]) => (t1, t2.toSeq.toBlocking.single))
+ o.take(3).toBlocking.foreach(println)
+ }
+
+ @Test def pivotExample() {
+ val o1 = (1 to 20).toObservable.groupBy(i => if (i <= 10) "x" else "y").map {
+ case (t: String, o: Observable[Int]) => (t, o.groupBy(i => i % 2 == 0))
+ }
+ println("o1:")
+ (for ((k1, o) <- o1;
+ (k2, vs) <- o;
+ v <- vs
+ ) yield (k1, k2, v)).subscribe(println(_))
+ val o2 = o1.pivot
+ println("o2:")
+ (for ((k1, o) <- o2;
+ (k2, vs) <- o;
+ v <- vs
+ ) yield (k1, k2, v)).subscribe(println(_))
+ }
}
diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
index d3c0621341..0d8ddcda20 100644
--- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
+++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
@@ -102,6 +102,7 @@ trait Observable[+T]
import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.concurrent.duration.{Duration, TimeUnit, MILLISECONDS}
+ import scala.collection.mutable
import rx.functions._
import rx.lang.scala.observables.BlockingObservable
import ImplicitFunctionConversions._
@@ -1208,6 +1209,30 @@ trait Observable[+T]
toScalaObservable[U](asJavaObservable.onErrorReturn(f2))
}
+ /**
+ * Intercepts `onError` notifications from the source Observable and replaces them with the
+ * `onNext` emissions of an Observable returned by a specified function. This allows the source
+ * sequence to continue even if it issues multiple `onError` notifications.
+ *
+ *
+ *
+ * @param resumeFunction a function that accepts an `Throwable` and an `Option` associated with this error representing
+ * the Throwable issued by the source Observable, and returns an Observable that emits items
+ * that will be emitted in place of the error. If no value is associated with the error, the value
+ * will be `None`.
+ * @return the original Observable, with appropriately modified behavior
+ */
+ def onErrorFlatMap[U >: T](resumeFunction: (Throwable, Option[Any]) => Observable[U]): Observable[U] = {
+ val f = new Func1[rx.exceptions.OnErrorThrowable, rx.Observable[_ <: U]] {
+ override def call(t: rx.exceptions.OnErrorThrowable): rx.Observable[_ <: U] = {
+ val v = if (t.isValueNull) Some(t.getValue) else None
+ resumeFunction(t.getCause, v).asJavaObservable
+ }
+ }
+ val thisJava = asJavaObservable.asInstanceOf[rx.Observable[U]]
+ toScalaObservable[U](thisJava.onErrorFlatMap(f))
+ }
+
/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
@@ -2169,6 +2194,35 @@ trait Observable[+T]
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
}
+ /**
+ * Groups the items emitted by an [[Observable]] (transformed by a selector) according to a specified key selector function
+ * until the duration Observable expires for the key.
+ *
+ *
+ *
+ * Note: The `Observable` in the pair `(K, Observable[V])` will cache the items it is to emit until such time as it
+ * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those `Observable` that
+ * do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like `take(0)` to them.
+ *
+ * @param keySelector a function to extract the key for each item
+ * @param valueSelector a function to map each item emitted by the source [[Observable]] to an item emitted by one
+ * of the resulting `Observable[V]`s
+ * @param closings a function to signal the expiration of a group
+ * @return an [[Observable]] that emits pairs of key and `Observable[V]`, each of which corresponds to a key
+ * value and each of which emits all items emitted by the source [[Observable]] during that
+ * key's duration that share that same key value, transformed by the value selector
+ */
+ def groupByUntil[K, V](keySelector: T => K, valueSelector: T => V, closings: (K, Observable[V]) => Observable[Any]): Observable[(K, Observable[V])] = {
+ val jKeySelector: Func1[_ >: T, _ <: K] = keySelector
+ val jValueSelector: Func1[_ >: T, _ <: V] = valueSelector
+ val jDurationSelector = new Func1[rx.observables.GroupedObservable[_ <: K, _ <: V], rx.Observable[_ <: Any]] {
+ override def call(jgo: rx.observables.GroupedObservable[_ <: K, _ <: V]): rx.Observable[_ <: Any] = closings(jgo.getKey, toScalaObservable[V](jgo))
+ }
+ val f = (o: rx.observables.GroupedObservable[K, _ <: V]) => (o.getKey, toScalaObservable[V](o))
+ val jo = asJavaObservable.groupByUntil[K, V, Any](jKeySelector, jValueSelector, jDurationSelector).map[(K, Observable[V])](f)
+ toScalaObservable[(K, Observable[V])](jo)
+ }
+
/**
* Correlates the items emitted by two Observables based on overlapping durations.
*
@@ -2213,6 +2267,53 @@ trait Observable[+T]
)
}
+ /**
+ * Returns an Observable that correlates two Observables when they overlap in time and groups the results.
+ *
+ *
+ *
+ * @param other the other Observable to correlate items from the source Observable with
+ * @param leftDuration a function that returns an Observable whose emissions indicate the duration of the values of
+ * the source Observable
+ * @param rightDuration a function that returns an Observable whose emissions indicate the duration of the values of
+ * the `other` Observable
+ * @param resultSelector a function that takes an item emitted by each Observable and returns the value to be emitted
+ * by the resulting Observable
+ * @return an Observable that emits items based on combining those items emitted by the source Observables
+ * whose durations overlap
+ */
+ def groupJoin[S, R](other: Observable[S], leftDuration: T => Observable[Any], rightDuration: S => Observable[Any], resultSelector: (T, Observable[S]) => R): Observable[R] = {
+ val outer: rx.Observable[_ <: T] = this.asJavaObservable
+ val inner: rx.Observable[_ <: S] = other.asJavaObservable
+ val left: Func1[_ >: T, _ <: rx.Observable[_ <: Any]] = (t: T) => leftDuration(t).asJavaObservable
+ val right: Func1[_ >: S, _ <: rx.Observable[_ <: Any]] = (s: S) => rightDuration(s).asJavaObservable
+ val f: Func2[_ >: T, _ >: rx.Observable[S], _ <: R] = (t: T, o: rx.Observable[S]) => resultSelector(t, toScalaObservable[S](o))
+ toScalaObservable[R](
+ outer.asInstanceOf[rx.Observable[T]].groupJoin[S, Any, Any, R](
+ inner.asInstanceOf[rx.Observable[S]],
+ left.asInstanceOf[Func1[T, rx.Observable[Any]]],
+ right.asInstanceOf[Func1[S, rx.Observable[Any]]],
+ f)
+ )
+ }
+
+ /**
+ * Returns a new Observable by applying a function that you supply to each item emitted by the source
+ * Observable that returns an Observable, and then emitting the items emitted by the most recently emitted
+ * of these Observables.
+ *
+ *
+ *
+ * @param f a function that, when applied to an item emitted by the source Observable, returns an Observable
+ * @return an Observable that emits the items emitted by the Observable returned from applying a function to
+ * the most recently emitted item emitted by the source Observable
+ */
+ def switchMap[R](f: T => Observable[R]): Observable[R] = {
+ toScalaObservable[R](asJavaObservable.switchMap[R](new Func1[T, rx.Observable[_ <: R]] {
+ def call(t: T): rx.Observable[_ <: R] = f(t).asJavaObservable
+ }))
+ }
+
/**
* Given an Observable that emits Observables, creates a single Observable that
* emits the items emitted by the most recently published of those Observables.
@@ -2305,6 +2406,28 @@ trait Observable[+T]
toScalaObservable[U](o5)
}
+ /**
+ * Flattens an Observable that emits Observables into a single Observable that emits the items emitted by
+ * those Observables, without any transformation, while limiting the maximum number of concurrent
+ * subscriptions to these Observables.
+ *
+ *
+ *
+ * You can combine the items emitted by multiple Observables so that they appear as a single Observable, by
+ * using the `flatten` method.
+ *
+ * @param maxConcurrent the maximum number of Observables that may be subscribed to concurrently
+ * @return an Observable that emits items that are the result of flattening the Observables emitted by the `source` Observable
+ * @throws IllegalArgumentException if `maxConcurrent` is less than or equal to 0
+ */
+ def flatten[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
+ val o2: Observable[Observable[U]] = this
+ val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
+ val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
+ val o5 = rx.Observable.merge[U](o4, maxConcurrent)
+ toScalaObservable[U](o5)
+ }
+
/**
* This behaves like `flatten` except that if any of the merged Observables
* notify of an error via [[rx.lang.scala.Observer.onError onError]], this method will
@@ -2912,6 +3035,26 @@ trait Observable[+T]
}
}
+ /**
+ * Returns an Observable that emits the items emitted by the source Observable or a specified default item
+ * if the source Observable is empty.
+ *
+ *
+ *
+ * @param default the item to emit if the source Observable emits no items. This is a by-name parameter, so it is
+ * only evaluated if the source Observable doesn't emit anything.
+ * @return an Observable that emits either the specified default item if the source Observable emits no
+ * items, or the items emitted by the source Observable
+ */
+ def orElse[U >: T](default: => U): Observable[U] = {
+ val jObservableOption = map(Some(_)).asJavaObservable.asInstanceOf[rx.Observable[Option[T]]]
+ val o = toScalaObservable[Option[T]](jObservableOption.defaultIfEmpty(None))
+ o map {
+ case Some(element) => element
+ case None => default
+ }
+ }
+
/**
* Returns an Observable that forwards all sequentially distinct items emitted from the source Observable.
*
@@ -3023,6 +3166,22 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.retry())
}
+ /**
+ * Returns an Observable that mirrors the source Observable, resubscribing to it if it calls `onError`
+ * and the predicate returns true for that specific exception and retry count.
+ *
+ *
+ *
+ * @param predicate the predicate that determines if a resubscription may happen in case of a specific exception and retry count
+ * @return the source Observable modified with retry logic
+ */
+ def retry(predicate: (Int, Throwable) => Boolean): Observable[T] = {
+ val f = new Func2[java.lang.Integer, Throwable, java.lang.Boolean] {
+ def call(times: java.lang.Integer, e: Throwable): java.lang.Boolean = predicate(times, e)
+ }
+ toScalaObservable[T](asJavaObservable.retry(f))
+ }
+
/**
* Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
*
@@ -3602,6 +3761,214 @@ trait Observable[+T]
def lift[R](operator: Subscriber[R] => Subscriber[T]): Observable[R] = {
toScalaObservable(asJavaObservable.lift(toJavaOperator[T, R](operator)))
}
+
+ /**
+ * Converts the source `Observable[T]` into an `Observable[Observable[T]]` that emits the source Observable as its single emission.
+ *
+ *
+ *
+ * @return an Observable that emits a single item: the source Observable
+ */
+ def nest: Observable[Observable[T]] = {
+ toScalaObservable(asJavaObservable.nest).map(toScalaObservable[T](_))
+ }
+
+ /**
+ * Subscribes to the [[Observable]] and receives notifications for each element.
+ *
+ * Alias to `subscribe(T => Unit)`.
+ *
+ * @param onNext function to execute for each item.
+ * @throws IllegalArgumentException if `onNext` is null
+ * @since 0.19
+ */
+ def foreach(onNext: T => Unit): Unit = {
+ asJavaObservable.subscribe(onNext)
+ }
+
+ /**
+ * Subscribes to the [[Observable]] and receives notifications for each element and error events.
+ *
+ * Alias to `subscribe(T => Unit, Throwable => Unit)`.
+ *
+ * @param onNext function to execute for each item.
+ * @param onError function to execute when an error is emitted.
+ * @throws IllegalArgumentException if `onNext` is null, or if `onError` is null
+ * @since 0.19
+ */
+ def foreach(onNext: T => Unit, onError: Throwable => Unit): Unit = {
+ asJavaObservable.subscribe(onNext, onError)
+ }
+
+ /**
+ * Subscribes to the [[Observable]] and receives notifications for each element and the terminal events.
+ *
+ * Alias to `subscribe(T => Unit, Throwable => Unit, () => Unit)`.
+ *
+ * @param onNext function to execute for each item.
+ * @param onError function to execute when an error is emitted.
+ * @param onComplete function to execute when completion is signalled.
+ * @throws IllegalArgumentException if `onNext` is null, or if `onError` is null, or if `onComplete` is null
+ * @since 0.19
+ */
+ def foreach(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit = {
+ asJavaObservable.subscribe(onNext, onError, onComplete)
+ }
+
+ /**
+ * Pivots a sequence of `(K1, Observable[(K2, Observable[U])])`s emitted by an `Observable` so as to swap the group
+ * and and the set on which their items are grouped.
+ *
+ *
+ *
+ * For example an `Observable` such as `this = Observable[(String, Observable[(Boolean, Observable[Integer])])`:
+ *
+ *
+ *
+ * Note: A `(K, Observable[_])` will cache the items it is to emit until such time as it
+ * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
+ * `(K, Observable[_])`s that do not concern you. Instead, you can signal to them that they may
+ * discard their buffers by applying an operator like `take(0)` to them.
+ *
+ * @return an `Observable`containing a stream of nested `(K1, Observable[(K2, Observable[U])])`s with swapped
+ * inner-outer keys.
+ */
+ def pivot[U, K1, K2](implicit evidence: Observable[T] <:< Observable[(K1, Observable[(K2, Observable[U])])]): Observable[(K2, Observable[(K1, Observable[U])])] = {
+ import rx.observables.{GroupedObservable => JGroupedObservable}
+ val f1 = new Func1[(K1, Observable[(K2, Observable[U])]), JGroupedObservable[K1, JGroupedObservable[K2, U]]]() {
+ override def call(t1: (K1, Observable[(K2, Observable[U])])): JGroupedObservable[K1, JGroupedObservable[K2, U]] = {
+ val jo = t1._2.asJavaObservable.asInstanceOf[rx.Observable[(K2, Observable[U])]].map[JGroupedObservable[K2, U]](new Func1[(K2, Observable[U]), JGroupedObservable[K2, U]]() {
+ override def call(t2: (K2, Observable[U])): JGroupedObservable[K2, U] = {
+ JGroupedObservable.from(t2._1, t2._2.asJavaObservable.asInstanceOf[rx.Observable[U]])
+ }
+ })
+ JGroupedObservable.from(t1._1, jo)
+ }
+ }
+ val o1: Observable[(K1, Observable[(K2, Observable[U])])] = this
+ val o2 = toScalaObservable[JGroupedObservable[K2, JGroupedObservable[K1, U]]](rx.Observable.pivot(o1.asJavaObservable.map(f1)))
+ o2.map {
+ (jgo1: JGroupedObservable[K2, JGroupedObservable[K1, U]]) => {
+ val jo = jgo1.map[(K1, Observable[U])](new Func1[JGroupedObservable[K1, U], (K1, Observable[U])]() {
+ override def call(jgo2: JGroupedObservable[K1, U]): (K1, Observable[U]) = (jgo2.getKey, toScalaObservable[U](jgo2))
+ })
+ (jgo1.getKey, toScalaObservable[(K1, Observable[U])](jo))
+ }
+ }
+ }
+
+ /**
+ * Returns an Observable that counts the total number of items emitted by the source Observable and emits this count as a 64-bit Long.
+ *
+ *
+ *
+ * @return an Observable that emits a single item: the number of items emitted by the source Observable as a 64-bit Long item
+ */
+ def longCount: Observable[Long] = {
+ toScalaObservable[java.lang.Long](asJavaObservable.longCount()).map(_.longValue())
+ }
+
+ /**
+ * Returns an Observable that emits a single `Map` that contains an `Seq` of items emitted by the
+ * source Observable keyed by a specified keySelector` function.
+ *
+ *
+ *
+ * @param keySelector the function that extracts the key from the source items to be used as key in the HashMap
+ * @return an Observable that emits a single item: a `Map` that contains an `Seq` of items mapped from
+ * the source Observable
+ */
+ def toMultimap[K](keySelector: T => K): Observable[scala.collection.Map[K, Seq[T]]] = {
+ toMultimap(keySelector, k => k)
+ }
+
+ /**
+ * Returns an Observable that emits a single `Map` that contains an `Seq` of values extracted by a
+ * specified `valueSelector` function from items emitted by the source Observable, keyed by a
+ * specified `keySelector` function.
+ *
+ *
+ *
+ * @param keySelector the function that extracts a key from the source items to be used as key in the HashMap
+ * @param valueSelector the function that extracts a value from the source items to be used as value in the HashMap
+ * @return an Observable that emits a single item: a `Map` that contains an `Seq` of items mapped from
+ * the source Observable
+ */
+ def toMultimap[K, V](keySelector: T => K, valueSelector: T => V): Observable[scala.collection.Map[K, Seq[V]]] = {
+ toMultimap(keySelector, valueSelector, () => mutable.Map[K, mutable.Buffer[V]]())
+ }
+
+ /**
+ * Returns an Observable that emits a single `mutable.Map[K, mutable.Buffer[V]]`, returned by a specified `mapFactory` function, that
+ * contains values, extracted by a specified `valueSelector` function from items emitted by the source Observable and
+ * keyed by the `keySelector` function. `mutable.Map[K, B]` is the same instance create by `mapFactory`.
+ *
+ *
+ *
+ * @param keySelector the function that extracts a key from the source items to be used as the key in the Map
+ * @param valueSelector the function that extracts a value from the source items to be used as the value in the Map
+ * @param mapFactory he function that returns a `mutable.Map[K, mutable.Buffer[V]]` instance to be used
+ * @return an Observable that emits a single item: a `mutable.Map[K, mutable.Buffer[V]]` that contains items mapped
+ * from the source Observable
+ */
+ def toMultimap[K, V, M <: mutable.Map[K, mutable.Buffer[V]]](keySelector: T => K, valueSelector: T => V, mapFactory: () => M): Observable[M] = {
+ toMultimap[K, V, mutable.Buffer[V], M](keySelector, valueSelector, mapFactory, k => mutable.Buffer[V]())
+ }
+
+ /**
+ * Returns an Observable that emits a single `mutable.Map[K, B]`, returned by a specified `mapFactory` function, that
+ * contains values extracted by a specified `valueSelector` function from items emitted by the source Observable, and
+ * keyed by the `keySelector` function. `mutable.Map[K, B]` is the same instance create by `mapFactory`.
+ *
+ *
+ *
+ * @param keySelector the function that extracts a key from the source items to be used as the key in the Map
+ * @param valueSelector the function that extracts a value from the source items to be used as the value in the Map
+ * @param mapFactory the function that returns a Map instance to be used
+ * @param bufferFactory the function that returns a `mutable.Buffer[V]` instance for a particular key to be used in the Map
+ * @return an Observable that emits a single item: a `mutable.Map[K, B]` that contains mapped items from the source Observable.
+ */
+ def toMultimap[K, V, B <: mutable.Buffer[V], M <: mutable.Map[K, B]](keySelector: T => K, valueSelector: T => V, mapFactory: () => M, bufferFactory: K => B): Observable[M] = {
+ // It's complicated to convert `mutable.Map[K, mutable.Buffer[V]]` to `java.util.Map[K, java.util.Collection[V]]`,
+ // so RxScala implements `toMultimap` directly.
+ // Choosing `mutable.Buffer/Map` is because `append/update` is necessary to implement an efficient `toMultimap`.
+ lift {
+ (subscriber: Subscriber[M]) => {
+ val map = mapFactory()
+ Subscriber[T](
+ subscriber,
+ (t: T) => {
+ val key = keySelector(t)
+ val values = map.get(key) match {
+ case Some(v) => v
+ case None => bufferFactory(key)
+ }
+ values += valueSelector(t)
+ map += key -> values: Unit
+ },
+ e => subscriber.onError(e),
+ () => {
+ subscriber.onNext(map)
+ subscriber.onCompleted()
+ }
+ )
+ }
+ }
+ }
}
/**
@@ -3721,6 +4088,21 @@ object Observable {
toScalaObservable[T](rx.Observable.error(exception))
}
+ /**
+ * Returns an Observable that invokes an `Observer`'s `onError` method on the
+ * specified Scheduler.
+ *
+ *
+ *
+ * @param exception the particular Throwable to pass to `onError`
+ * @param scheduler the Scheduler on which to call `onError`
+ * @tparam T the type of the items (ostensibly) emitted by the Observable
+ * @return an Observable that invokes the `Observer`'s `onError` method, on the specified Scheduler
+ */
+ def error[T](exception: Throwable, scheduler: Scheduler): Observable[T] = {
+ toScalaObservable[T](rx.Observable.error(exception, scheduler))
+ }
+
/**
* Returns an Observable that emits no data to the [[rx.lang.scala.Observer]] and
* immediately invokes its [[rx.lang.scala.Observer#onCompleted onCompleted]] method
@@ -4038,6 +4420,26 @@ object Observable {
def amb[T](sources: Observable[T]*): Observable[T] = {
toScalaObservable[T](rx.Observable.amb[T](sources.map(_.asJavaObservable).asJava))
}
+
+ /**
+ * Combines a list of source Observables by emitting an item that aggregates the latest values of each of
+ * the source Observables each time an item is received from any of the source Observables, where this
+ * aggregation is defined by a specified function.
+ *
+ * @tparam T the common base type of source values
+ * @tparam R the result type
+ * @param sources the list of source Observables
+ * @param combineFunction the aggregation function used to combine the items emitted by the source Observables
+ * @return an Observable that emits items that are the result of combining the items emitted by the source
+ * Observables by means of the given aggregation function
+ */
+ def combineLatest[T, R](sources: Seq[Observable[T]], combineFunction: Seq[T] => R): Observable[R] = {
+ val jSources = new java.util.ArrayList[rx.Observable[_ <: T]](sources.map(_.asJavaObservable).asJava)
+ val jCombineFunction = new rx.functions.FuncN[R] {
+ override def call(args: java.lang.Object*): R = combineFunction(args.map(_.asInstanceOf[T]))
+ }
+ toScalaObservable[R](rx.Observable.combineLatest[T, R](jSources, jCombineFunction))
+ }
}
diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala
index d8a9528daf..f3482f5376 100644
--- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala
+++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala
@@ -70,25 +70,36 @@ class CompletenessTest extends JUnitSuite {
"aggregate(Func2[T, T, T])" -> "reduce((U, U) => U)",
"aggregate(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
"all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)",
+ "asObservable()" -> unnecessary,
"buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)",
"buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)",
"buffer(Func0[_ <: Observable[_ <: TClosing]])" -> "buffer(=> Observable[Any])",
"buffer(Observable[B])" -> "buffer(=> Observable[Any])",
"buffer(Observable[B], Int)" -> "buffer(Observable[Any], Int)",
"buffer(Observable[_ <: TOpening], Func1[_ >: TOpening, _ <: Observable[_ <: TClosing]])" -> "buffer(Observable[Opening], Opening => Observable[Any])",
+ "cast(Class[R])" -> "[RxJava needs this one because `rx.Observable` is invariant. `Observable` in RxScala is covariant and does not need this operator.]",
+ "collect(R, Action2[R, _ >: T])" -> "foldLeft(R)((R, T) => R)",
"contains(Any)" -> "contains(U)",
"count()" -> "length",
"debounce(Func1[_ >: T, _ <: Observable[U]])" -> "debounce(T => Observable[Any])",
+ "defaultIfEmpty(T)" -> "orElse(=> U)",
"delay(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]])" -> "delay(() => Observable[Any], T => Observable[Any])",
"delay(Func1[_ >: T, _ <: Observable[U]])" -> "delay(T => Observable[Any])",
"dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])",
+ "doOnEach(Action1[Notification[_ >: T]])" -> "[use `doOnEach(T => Unit, Throwable => Unit, () => Unit)`]",
"elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)",
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"firstOrDefault(T)" -> "firstOrElse(=> U)",
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
+ "forEach(Action1[_ >: T])" -> "foreach(T => Unit)",
+ "forEach(Action1[_ >: T], Action1[Throwable])" -> "foreach(T => Unit, Throwable => Unit)",
+ "forEach(Action1[_ >: T], Action1[Throwable], Action0)" -> "foreach(T => Unit, Throwable => Unit, () => Unit)",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
+ "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: T, _ <: TValue], Func1[_ >: GroupedObservable[TKey, TValue], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, T => V, (K, Observable[V]) => Observable[Any])",
+ "groupJoin(Observable[T2], Func1[_ >: T, _ <: Observable[D1]], Func1[_ >: T2, _ <: Observable[D2]], Func2[_ >: T, _ >: Observable[T2], _ <: R])" -> "groupJoin(Observable[S], T => Observable[Any], S => Observable[Any], (T, Observable[S]) => R)",
"ignoreElements()" -> "[use `filter(_ => false)`]",
+ "join(Observable[TRight], Func1[T, Observable[TLeftDuration]], Func1[TRight, Observable[TRightDuration]], Func2[T, TRight, R])" -> "join(Observable[S], T => Observable[Any], S => Observable[Any], (T, S) => R)",
"last(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).last`]",
"lastOrDefault(T)" -> "lastOrElse(=> U)",
"lastOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `filter(predicate).lastOrElse(default)`]",
@@ -102,10 +113,12 @@ class CompletenessTest extends JUnitSuite {
"mergeMapIterable(Func1[_ >: T, _ <: Iterable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapIterable(T => Iterable[U], (T, U) => R)",
"multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])",
"multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])",
+ "ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])",
"onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])",
"onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)",
"onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])",
+ "onErrorFlatMap(Func1[OnErrorThrowable, _ <: Observable[_ <: T]])" -> "onErrorFlatMap((Throwable, Option[Any]) => Observable[U])",
"parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])",
"parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)",
"publish(T)" -> "publish(U)",
@@ -122,6 +135,7 @@ class CompletenessTest extends JUnitSuite {
"replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit)" -> "replay(Observable[U] => Observable[R], Duration)",
"replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit, Scheduler)" -> "replay(Observable[U] => Observable[R], Duration, Scheduler)",
"replay(Func1[_ >: Observable[T], _ <: Observable[R]], Scheduler)" -> "replay(Observable[U] => Observable[R], Scheduler)",
+ "retry(Func2[Integer, Throwable, Boolean])" -> "retry((Int, Throwable) => Boolean)",
"sample(Observable[U])" -> "sample(Observable[Any])",
"scan(Func2[T, T, T])" -> unnecessary,
"scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",
@@ -163,6 +177,8 @@ class CompletenessTest extends JUnitSuite {
"timer(Long, Long, TimeUnit)" -> "timer(Duration, Duration)",
"timer(Long, Long, TimeUnit, Scheduler)" -> "timer(Duration, Duration, Scheduler)",
"toList()" -> "toSeq",
+ "toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]])" -> "toMultimap(T => K, T => V, () => M)",
+ "toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> "toMultimap(T => K, T => V, () => M, K => B)",
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"window(Observable[U])" -> "window(=> Observable[Any])",
@@ -172,6 +188,7 @@ class CompletenessTest extends JUnitSuite {
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",
// manually added entries for Java static methods
+ "amb(Iterable[_ <: Observable[_ <: T]])" -> "amb(Observable[T]*)",
"average(Observable[Integer])" -> averageProblem,
"averageDoubles(Observable[Double])" -> averageProblem,
"averageFloats(Observable[Float])" -> averageProblem,
@@ -179,9 +196,12 @@ class CompletenessTest extends JUnitSuite {
"create(OnSubscribeFunc[T])" -> "create(Observer[T] => Subscription)",
"create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)",
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
+ "combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Seq[Observable[T]], Seq[T] => R)",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
"defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])",
- "from(Array[T])" -> "[use `items(T*)`]",
+ "from(Array[T])" -> "items(T*)",
+ "from(