diff --git a/language-adaptors/rxjava-scala/src/examples/java/rx/lang/scala/examples/MovieLibUsage.java b/language-adaptors/rxjava-scala/src/examples/java/rx/lang/scala/examples/MovieLibUsage.java index fde62e0dd8..e9b3f6be64 100644 --- a/language-adaptors/rxjava-scala/src/examples/java/rx/lang/scala/examples/MovieLibUsage.java +++ b/language-adaptors/rxjava-scala/src/examples/java/rx/lang/scala/examples/MovieLibUsage.java @@ -19,7 +19,7 @@ import rx.lang.scala.examples.Movie; import rx.lang.scala.examples.MovieLib; import rx.util.functions.Action1; -import static rx.lang.scala.ImplicitFunctionConversions.toScalaObservable; +import static rx.lang.scala.JavaConversions.toScalaObservable; public class MovieLibUsage { diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index be7a30e9d6..3dbd9461fd 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -21,6 +21,7 @@ import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationLong import scala.language.postfixOps +import scala.language.implicitConversions import org.junit.Assert.assertEquals import org.junit.Assert.assertTrue @@ -28,8 +29,7 @@ import org.junit.Ignore import org.junit.Test import org.scalatest.junit.JUnitSuite -import rx.lang.scala.Notification -import rx.lang.scala.Observable +import rx.lang.scala._ import rx.lang.scala.concurrency._ @Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily @@ -132,15 +132,14 @@ class RxScalaDemo extends JUnitSuite { } @Test def rangeAndBufferExample() { - val o = Observable(1 to 18) + val o = Observable.from(1 to 18) o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]"))) } @Test def windowExample() { - // this will be nicer once we have zipWithIndex - (for ((o, i) <- Observable(1 to 18).window(5) zip Observable(0 until 4); n <- o) - yield s"Observable#$i emits $n") - .subscribe(output(_)) + (for ((o, i) <- Observable.from(1 to 18).window(5).zipWithIndex; n <- o) + yield s"Observable#$i emits $n" + ).subscribe(output(_)) } @Test def testReduce() { @@ -217,6 +216,7 @@ class RxScalaDemo extends JUnitSuite { }).flatten.toBlockingObservable.foreach(println(_)) } + @Ignore // TODO something's bad here @Test def timingTest1() { val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3) @@ -368,13 +368,13 @@ class RxScalaDemo extends JUnitSuite { @Test def parallelExample() { val t0 = System.currentTimeMillis() - Observable(1 to 10).parallel(work(_)).toBlockingObservable.foreach(println(_)) + Observable.from(1 to 10).parallel(work(_)).toBlockingObservable.foreach(println(_)) println(s"Work took ${System.currentTimeMillis()-t0} ms") } @Test def exampleWithoutParallel() { val t0 = System.currentTimeMillis() - work(Observable(1 to 10)).toBlockingObservable.foreach(println(_)) + work(Observable.from(1 to 10)).toBlockingObservable.foreach(println(_)) println(s"Work took ${System.currentTimeMillis()-t0} ms") } @@ -402,11 +402,10 @@ class RxScalaDemo extends JUnitSuite { } val o1 = Observable.interval(100 millis).take(3) - val o2 = Observable(new IOException("Oops")) + val o2 = Observable.error(new IOException("Oops")) printObservable(o1) - //waitFor(o1) printObservable(o2) - //waitFor(o2) + Thread.sleep(500) } @Test def materializeExample2() { @@ -431,6 +430,17 @@ class RxScalaDemo extends JUnitSuite { val condition = true Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1) } + + @Test def createExample() { + val o = Observable.create[String](observer => { + // this is bad because you cannot unsubscribe! + observer.onNext("a") + observer.onNext("b") + observer.onCompleted() + Subscription {} + }) + o.subscribe(println(_)) + } def output(s: String): Unit = println(s) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala index a932eca936..77cb577277 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala @@ -18,19 +18,22 @@ package rx.lang.scala import java.lang.Exception import java.{ lang => jlang } -import scala.collection.Seq import scala.language.implicitConversions +import scala.collection.Seq import rx.util.functions._ +import rx.lang.scala.JavaConversions._ + /** * These function conversions convert between Scala functions and Rx `Func`s and `Action`s. * Most RxScala users won't need them, but they might be useful if one wants to use * the `rx.Observable` directly instead of using `rx.lang.scala.Observable` or if one wants * to use a Java library taking/returning `Func`s and `Action`s. + * This object only contains conversions between functions. For conversions between types, + * use [[rx.lang.scala.JavaConversions]]. */ object ImplicitFunctionConversions { - import language.implicitConversions // implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription): Func2[rx.Scheduler, T, rx.Subscription] with Object {def call(s: rx.Scheduler, t: T): rx.Subscription} = // new Func2[rx.Scheduler, T, rx.Subscription] { @@ -46,25 +49,10 @@ object ImplicitFunctionConversions { } } - implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = s.asJava - implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): Notification[T] = Notification(s) - - implicit def toJavaSubscription(s: Subscription): rx.Subscription = s.asJavaSubscription - implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s) - - implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler - implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s) - - implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = s.asJavaObserver - implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = Observer(s) - - implicit def toJavaObservable[T](s: Observable[T]): rx.Observable[_ <: T] = s.asJavaObservable - implicit def toScalaObservable[T](s: rx.Observable[_ <: T]): Observable[T] = Observable(s) - implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) = new rx.Observable.OnSubscribeFunc[T] { def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = { - f(Observer(obs)) + f(obs) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala new file mode 100644 index 0000000000..276322f935 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala @@ -0,0 +1,36 @@ +package rx.lang.scala + +/** + * These functions convert between RxScala types RxJava types. + * Pure Scala projects won't need them, but they will be useful for polyglot projects. + * This object only contains conversions between types. For conversions between functions, + * use [[rx.lang.scala.ImplicitFunctionConversions]]. + */ +object JavaConversions { + import language.implicitConversions + + implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = s.asJava + + implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): Notification[T] = Notification(s) + + implicit def toJavaSubscription(s: Subscription): rx.Subscription = s.asJavaSubscription + + implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s) + + implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler + + implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s) + + implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = s.asJavaObserver + + implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = Observer(s) + + implicit def toJavaObservable[T](s: Observable[T]): rx.Observable[_ <: T] = s.asJavaObservable + + implicit def toScalaObservable[T](observable: rx.Observable[_ <: T]): Observable[T] = { + new Observable[T]{ + def asJavaObservable = observable + } + } + +} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala index 21491bc96b..8a254af08c 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala @@ -20,6 +20,11 @@ package rx.lang.scala */ sealed trait Notification[+T] { def asJava: rx.Notification[_ <: T] + override def equals(that: Any): Boolean = that match { + case other: Notification[_] => asJava.equals(other.asJava) + case _ => false + } + override def hashCode(): Int = asJava.hashCode() } /** @@ -48,6 +53,7 @@ object Notification { class OnNext[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] { def value: T = asJava.getValue + override def toString = s"OnNext($value)" } object OnNext { @@ -64,6 +70,7 @@ object Notification { class OnError[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] { def error: Throwable = asJava.getThrowable + override def toString = s"OnError($error)" } object OnError { @@ -78,7 +85,9 @@ object Notification { } } - class OnCompleted[T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {} + class OnCompleted[T](val asJava: rx.Notification[_ <: T]) extends Notification[T] { + override def toString = "OnCompleted()" + } object OnCompleted { diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index f031a2730c..9999ed9751 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -77,7 +77,8 @@ trait Observable[+T] import rx.util.functions._ import rx.lang.scala.util._ import rx.lang.scala.observables.BlockingObservable - import rx.lang.scala.ImplicitFunctionConversions._ + import ImplicitFunctionConversions._ + import JavaConversions._ def asJavaObservable: rx.Observable[_ <: T] @@ -197,7 +198,7 @@ trait Observable[+T] */ def multicast[R](subject: rx.lang.scala.Subject[T, R]): (() => Subscription, Observable[R]) = { val javaCO = asJavaObservable.multicast[R](subject.asJavaSubject) - (() => javaCO.connect(), Observable[R](javaCO)) + (() => javaCO.connect(), toScalaObservable[R](javaCO)) } /** @@ -214,7 +215,7 @@ trait Observable[+T] def ++[U >: T](that: Observable[U]): Observable[U] = { val o1: rx.Observable[_ <: U] = this.asJavaObservable val o2: rx.Observable[_ <: U] = that.asJavaObservable - Observable(rx.Observable.concat(o1, o2)) + toScalaObservable(rx.Observable.concat(o1, o2)) } /** @@ -231,7 +232,7 @@ trait Observable[+T] val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable val o5 = rx.Observable.concat[U](o4) - Observable[U](o5) + toScalaObservable[U](o5) } /** @@ -248,7 +249,7 @@ trait Observable[+T] * Observable, and that synchronously notifies its [[rx.lang.scala.Observer]]s */ def synchronize: Observable[T] = { - Observable[T](asJavaObservable.synchronize) + toScalaObservable[T](asJavaObservable.synchronize) } /** @@ -259,7 +260,7 @@ trait Observable[+T] * @return an Observable that emits timestamped items from the source Observable */ def timestamp: Observable[(Long, T)] = { - Observable[rx.util.Timestamped[_ <: T]](asJavaObservable.timestamp()) + toScalaObservable[rx.util.Timestamped[_ <: T]](asJavaObservable.timestamp()) .map((t: rx.util.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue)) } @@ -282,7 +283,7 @@ trait Observable[+T] * Note that this function is private because Scala collections don't have such a function. */ private def zip[U, R](that: Observable[U], selector: (T,U) => R): Observable[R] = { - Observable[R](rx.Observable.zip[T, U, R](this.asJavaObservable, that.asJavaObservable, selector)) + toScalaObservable[R](rx.Observable.zip[T, U, R](this.asJavaObservable, that.asJavaObservable, selector)) } /** @@ -294,7 +295,7 @@ trait Observable[+T] def zipWithIndex: Observable[(T, Int)] = { val fScala: (T, Integer) => (T, Int) = (elem: T, index: Integer) => (elem, index) val fJava : Func2[_ >: T, Integer, _ <: (T, Int)] = fScala - Observable[(T, Int)](asJavaObservable.mapWithIndex[(T, Int)](fJava)) + toScalaObservable[(T, Int)](asJavaObservable.mapWithIndex[(T, Int)](fJava)) } /** @@ -526,9 +527,9 @@ trait Observable[+T] def window(closings: () => Observable[Closing]): Observable[Observable[T]] = { val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window(func) - val o2 = Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => { + val o2 = toScalaObservable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => { val x2 = x.asInstanceOf[rx.Observable[_ <: T]] - Observable[T](x2) + toScalaObservable[T](x2) }) o2 } @@ -724,7 +725,7 @@ trait Observable[+T] * evaluates as `true` */ def filter(predicate: T => Boolean): Observable[T] = { - Observable[T](asJavaObservable.filter(predicate)) + toScalaObservable[T](asJavaObservable.filter(predicate)) } /** @@ -737,7 +738,7 @@ trait Observable[+T] * @return an Observable that emits the same items as the source Observable, then invokes the function */ def finallyDo(action: () => Unit): Observable[T] = { - Observable[T](asJavaObservable.finallyDo(action)) + toScalaObservable[T](asJavaObservable.finallyDo(action)) } /** @@ -755,7 +756,7 @@ trait Observable[+T] * obtained from this transformation. */ def flatMap[R](f: T => Observable[R]): Observable[R] = { - Observable[R](asJavaObservable.flatMap[R](new Func1[T, rx.Observable[_ <: R]]{ + toScalaObservable[R](asJavaObservable.flatMap[R](new Func1[T, rx.Observable[_ <: R]]{ def call(t1: T): rx.Observable[_ <: R] = { f(t1).asJavaObservable } })) } @@ -772,7 +773,7 @@ trait Observable[+T] * given function */ def map[R](func: T => R): Observable[R] = { - Observable[R](asJavaObservable.map[R](new Func1[T,R] { + toScalaObservable[R](asJavaObservable.map[R](new Func1[T,R] { def call(t1: T): R = func(t1) })) } @@ -787,7 +788,7 @@ trait Observable[+T] * notifications of the source Observable */ def materialize: Observable[Notification[T]] = { - Observable[rx.Notification[_ <: T]](asJavaObservable.materialize()).map(Notification(_)) + toScalaObservable[rx.Notification[_ <: T]](asJavaObservable.materialize()).map(Notification(_)) } /** @@ -801,7 +802,7 @@ trait Observable[+T] * on the specified [[rx.lang.scala.Scheduler]] */ def subscribeOn(scheduler: Scheduler): Observable[T] = { - Observable[T](asJavaObservable.subscribeOn(scheduler)) + toScalaObservable[T](asJavaObservable.subscribeOn(scheduler)) } /** @@ -815,7 +816,7 @@ trait Observable[+T] * specified [[rx.lang.scala.Scheduler]] */ def observeOn(scheduler: Scheduler): Observable[T] = { - Observable[T](asJavaObservable.observeOn(scheduler)) + toScalaObservable[T](asJavaObservable.observeOn(scheduler)) } /** @@ -839,7 +840,7 @@ trait Observable[+T] val o1: Observable[Notification[U]] = this val o2: Observable[rx.Notification[_ <: U]] = o1.map(_.asJava) val o3 = o2.asJavaObservable.dematerialize[U]() - Observable[U](o3) + toScalaObservable[U](o3) } /** @@ -870,7 +871,7 @@ trait Observable[+T] def onErrorResumeNext[U >: T](resumeFunction: Throwable => Observable[U]): Observable[U] = { val f: Func1[Throwable, rx.Observable[_ <: U]] = (t: Throwable) => resumeFunction(t).asJavaObservable val f2 = f.asInstanceOf[Func1[Throwable, rx.Observable[Nothing]]] - Observable[U](asJavaObservable.onErrorResumeNext(f2)) + toScalaObservable[U](asJavaObservable.onErrorResumeNext(f2)) } /** @@ -901,7 +902,7 @@ trait Observable[+T] def onErrorResumeNext[U >: T](resumeSequence: Observable[U]): Observable[U] = { val rSeq1: rx.Observable[_ <: U] = resumeSequence.asJavaObservable val rSeq2: rx.Observable[Nothing] = rSeq1.asInstanceOf[rx.Observable[Nothing]] - Observable[U](asJavaObservable.onErrorResumeNext(rSeq2)) + toScalaObservable[U](asJavaObservable.onErrorResumeNext(rSeq2)) } /** @@ -934,7 +935,7 @@ trait Observable[+T] def onExceptionResumeNext[U >: T](resumeSequence: Observable[U]): Observable[U] = { val rSeq1: rx.Observable[_ <: U] = resumeSequence.asJavaObservable val rSeq2: rx.Observable[Nothing] = rSeq1.asInstanceOf[rx.Observable[Nothing]] - Observable[U](asJavaObservable.onExceptionResumeNext(rSeq2)) + toScalaObservable[U](asJavaObservable.onExceptionResumeNext(rSeq2)) } /** @@ -963,7 +964,7 @@ trait Observable[+T] def onErrorReturn[U >: T](resumeFunction: Throwable => U): Observable[U] = { val f1: Func1[Throwable, _ <: U] = resumeFunction val f2 = f1.asInstanceOf[Func1[Throwable, Nothing]] - Observable[U](asJavaObservable.onErrorReturn(f2)) + toScalaObservable[U](asJavaObservable.onErrorReturn(f2)) } /** @@ -988,7 +989,7 @@ trait Observable[+T] def reduce[U >: T](accumulator: (U, U) => U): Observable[U] = { val func: Func2[_ >: U, _ >: U, _ <: U] = accumulator val func2 = func.asInstanceOf[Func2[T, T, T]] - Observable[U](asJavaObservable.asInstanceOf[rx.Observable[T]].reduce(func2)) + toScalaObservable[U](asJavaObservable.asInstanceOf[rx.Observable[T]].reduce(func2)) } /** @@ -1002,7 +1003,7 @@ trait Observable[+T] */ def replay: (() => Subscription, Observable[T]) = { val javaCO = asJavaObservable.replay() - (() => javaCO.connect(), Observable[T](javaCO)) + (() => javaCO.connect(), toScalaObservable[T](javaCO)) } /** @@ -1022,7 +1023,7 @@ trait Observable[+T] * the benefit of subsequent subscribers. */ def cache: Observable[T] = { - Observable[T](asJavaObservable.cache()) + toScalaObservable[T](asJavaObservable.cache()) } /** @@ -1036,7 +1037,7 @@ trait Observable[+T] */ def publish: (() => Subscription, Observable[T]) = { val javaCO = asJavaObservable.publish() - (() => javaCO.connect(), Observable[T](javaCO)) + (() => javaCO.connect(), toScalaObservable[T](javaCO)) } // TODO add Scala-like aggregate function @@ -1063,7 +1064,7 @@ trait Observable[+T] * from the items emitted by the source Observable */ def foldLeft[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = { - Observable[R](asJavaObservable.reduce(initialValue, new Func2[R,T,R]{ + toScalaObservable[R](asJavaObservable.reduce(initialValue, new Func2[R,T,R]{ def call(t1: R, t2: T): R = accumulator(t1,t2) })) } @@ -1079,7 +1080,7 @@ trait Observable[+T] * Observable at the specified time interval */ def sample(duration: Duration): Observable[T] = { - Observable[T](asJavaObservable.sample(duration.length, duration.unit)) + toScalaObservable[T](asJavaObservable.sample(duration.length, duration.unit)) } /** @@ -1095,7 +1096,7 @@ trait Observable[+T] * Observable at the specified time interval */ def sample(duration: Duration, scheduler: Scheduler): Observable[T] = { - Observable[T](asJavaObservable.sample(duration.length, duration.unit, scheduler)) + toScalaObservable[T](asJavaObservable.sample(duration.length, duration.unit, scheduler)) } /** @@ -1119,7 +1120,7 @@ trait Observable[+T] * @return an Observable that emits the results of each call to the accumulator function */ def scan[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = { - Observable[R](asJavaObservable.scan(initialValue, new Func2[R,T,R]{ + toScalaObservable[R](asJavaObservable.scan(initialValue, new Func2[R,T,R]{ def call(t1: R, t2: T): R = accumulator(t1,t2) })) } @@ -1154,7 +1155,7 @@ trait Observable[+T] * emit the first `num` items that the source emits */ def drop(n: Int): Observable[T] = { - Observable[T](asJavaObservable.skip(n)) + toScalaObservable[T](asJavaObservable.skip(n)) } /** @@ -1169,7 +1170,7 @@ trait Observable[+T] * becomes false. */ def dropWhile(predicate: T => Boolean): Observable[T] = { - Observable(asJavaObservable.skipWhile(predicate)) + toScalaObservable(asJavaObservable.skipWhile(predicate)) } /** @@ -1189,7 +1190,7 @@ trait Observable[+T] * fewer than `num` items */ def take(n: Int): Observable[T] = { - Observable[T](asJavaObservable.take(n)) + toScalaObservable[T](asJavaObservable.take(n)) } /** @@ -1205,7 +1206,7 @@ trait Observable[+T] * satisfies the condition defined by `predicate` */ def takeWhile(predicate: T => Boolean): Observable[T] = { - Observable[T](asJavaObservable.takeWhile(predicate)) + toScalaObservable[T](asJavaObservable.takeWhile(predicate)) } /** @@ -1221,7 +1222,7 @@ trait Observable[+T] * Observable */ def takeRight(count: Int): Observable[T] = { - Observable[T](asJavaObservable.takeLast(count)) + toScalaObservable[T](asJavaObservable.takeLast(count)) } /** @@ -1239,7 +1240,7 @@ trait Observable[+T] * `other` emits its first item */ def takeUntil[E](that: Observable[E]): Observable[T] = { - Observable[T](asJavaObservable.takeUntil(that.asJavaObservable)) + toScalaObservable[T](asJavaObservable.takeUntil(that.asJavaObservable)) } /** @@ -1277,8 +1278,8 @@ trait Observable[+T] */ def groupBy[K](f: T => K): Observable[(K, Observable[T])] = { val o1 = asJavaObservable.groupBy[K](f) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]] - val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, Observable[T](o)) - Observable[(K, Observable[T])](o1.map[(K, Observable[T])](func)) + val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o)) + toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func)) } /** @@ -1301,7 +1302,7 @@ trait Observable[+T] val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable val o5 = rx.Observable.switchOnNext[U](o4) - Observable[U](o5) + toScalaObservable[U](o5) } // Naming: We follow C# (switch), not Java (switchOnNext), because Java just had to avoid clash with keyword @@ -1321,7 +1322,7 @@ trait Observable[+T] def merge[U >: T](that: Observable[U]): Observable[U] = { val thisJava: rx.Observable[_ <: U] = this.asJavaObservable val thatJava: rx.Observable[_ <: U] = that.asJavaObservable - Observable[U](rx.Observable.merge(thisJava, thatJava)) + toScalaObservable[U](rx.Observable.merge(thisJava, thatJava)) } /** @@ -1344,7 +1345,7 @@ trait Observable[+T] * `this` and `that` */ def mergeDelayError[U >: T](that: Observable[U]): Observable[U] = { - Observable[U](rx.Observable.mergeDelayError[U](this.asJavaObservable, that.asJavaObservable)) + toScalaObservable[U](rx.Observable.mergeDelayError[U](this.asJavaObservable, that.asJavaObservable)) } /** @@ -1370,7 +1371,7 @@ trait Observable[+T] val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable val o5 = rx.Observable.merge[U](o4) - Observable[U](o5) + toScalaObservable[U](o5) } /** @@ -1401,7 +1402,7 @@ trait Observable[+T] val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable val o5 = rx.Observable.mergeDelayError[U](o4) - Observable[U](o5) + toScalaObservable[U](o5) } /** @@ -1415,7 +1416,7 @@ trait Observable[+T] */ def combineLatest[U](that: Observable[U]): Observable[(T, U)] = { val f: Func2[_ >: T, _ >: U, _ <: (T, U)] = (t: T, u: U) => (t, u) - Observable[(T, U)](rx.Observable.combineLatest[T, U, (T, U)](this.asJavaObservable, that.asJavaObservable, f)) + toScalaObservable[(T, U)](rx.Observable.combineLatest[T, U, (T, U)](this.asJavaObservable, that.asJavaObservable, f)) } /** @@ -1434,7 +1435,7 @@ trait Observable[+T] * @see `Observable.debounce` */ def throttleWithTimeout(timeout: Duration): Observable[T] = { - Observable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit)) + toScalaObservable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit)) } /** @@ -1453,7 +1454,7 @@ trait Observable[+T] * @see `Observable.throttleWithTimeout` */ def debounce(timeout: Duration): Observable[T] = { - Observable[T](asJavaObservable.debounce(timeout.length, timeout.unit)) + toScalaObservable[T](asJavaObservable.debounce(timeout.length, timeout.unit)) } /** @@ -1473,7 +1474,7 @@ trait Observable[+T] * @see `Observable.throttleWithTimeout` */ def debounce(timeout: Duration, scheduler: Scheduler): Observable[T] = { - Observable[T](asJavaObservable.debounce(timeout.length, timeout.unit, scheduler)) + toScalaObservable[T](asJavaObservable.debounce(timeout.length, timeout.unit, scheduler)) } /** @@ -1491,7 +1492,7 @@ trait Observable[+T] * @see `Observable.debounce` */ def throttleWithTimeout(timeout: Duration, scheduler: Scheduler): Observable[T] = { - Observable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit, scheduler)) + toScalaObservable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit, scheduler)) } /** @@ -1508,7 +1509,7 @@ trait Observable[+T] * @return Observable which performs the throttle operation. */ def throttleFirst(skipDuration: Duration, scheduler: Scheduler): Observable[T] = { - Observable[T](asJavaObservable.throttleFirst(skipDuration.length, skipDuration.unit, scheduler)) + toScalaObservable[T](asJavaObservable.throttleFirst(skipDuration.length, skipDuration.unit, scheduler)) } /** @@ -1523,7 +1524,7 @@ trait Observable[+T] * @return Observable which performs the throttle operation. */ def throttleFirst(skipDuration: Duration): Observable[T] = { - Observable[T](asJavaObservable.throttleFirst(skipDuration.length, skipDuration.unit)) + toScalaObservable[T](asJavaObservable.throttleFirst(skipDuration.length, skipDuration.unit)) } /** @@ -1538,7 +1539,7 @@ trait Observable[+T] * @return Observable which performs the throttle operation. */ def throttleLast(intervalDuration: Duration): Observable[T] = { - Observable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit)) + toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit)) } /** @@ -1553,7 +1554,7 @@ trait Observable[+T] * @return Observable which performs the throttle operation. */ def throttleLast(intervalDuration: Duration, scheduler: Scheduler): Observable[T] = { - Observable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler)) + toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler)) } /** @@ -1661,7 +1662,7 @@ trait Observable[+T] * @return an Observable of sequentially distinct items */ def distinctUntilChanged: Observable[T] = { - Observable[T](asJavaObservable.distinctUntilChanged) + toScalaObservable[T](asJavaObservable.distinctUntilChanged) } /** @@ -1676,7 +1677,7 @@ trait Observable[+T] * @return an Observable of sequentially distinct items */ def distinctUntilChanged[U](keySelector: T => U): Observable[T] = { - Observable[T](asJavaObservable.distinctUntilChanged[U](keySelector)) + toScalaObservable[T](asJavaObservable.distinctUntilChanged[U](keySelector)) } /** @@ -1687,7 +1688,7 @@ trait Observable[+T] * @return an Observable of distinct items */ def distinct: Observable[T] = { - Observable[T](asJavaObservable.distinct()) + toScalaObservable[T](asJavaObservable.distinct()) } /** @@ -1702,7 +1703,7 @@ trait Observable[+T] * @return an Observable of distinct items */ def distinct[U](keySelector: T => U): Observable[T] = { - Observable[T](asJavaObservable.distinct[U](keySelector)) + toScalaObservable[T](asJavaObservable.distinct[U](keySelector)) } /** @@ -1714,7 +1715,7 @@ trait Observable[+T] * as its single item. */ def length: Observable[Int] = { - Observable[Integer](asJavaObservable.count()).map(_.intValue()) + toScalaObservable[Integer](asJavaObservable.count()).map(_.intValue()) } /** @@ -1744,7 +1745,7 @@ trait Observable[+T] * @return Observable with retry logic. */ def retry(retryCount: Int): Observable[T] = { - Observable[T](asJavaObservable.retry(retryCount)) + toScalaObservable[T](asJavaObservable.retry(retryCount)) } /** @@ -1761,7 +1762,7 @@ trait Observable[+T] * @return Observable with retry logic. */ def retry: Observable[T] = { - Observable[T](asJavaObservable.retry()) + toScalaObservable[T](asJavaObservable.retry()) } /** @@ -1785,8 +1786,8 @@ trait Observable[+T] */ def parallel[R](f: Observable[T] => Observable[R]): Observable[R] = { val fJava: Func1[rx.Observable[T], rx.Observable[R]] = - (jo: rx.Observable[T]) => f(Observable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] - Observable[R](asJavaObservable.asInstanceOf[rx.Observable[T]].parallel[R](fJava)) + (jo: rx.Observable[T]) => f(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable(asJavaObservable.asInstanceOf[rx.Observable[T]].parallel[R](fJava)) } /** @@ -1800,8 +1801,8 @@ trait Observable[+T] */ def parallel[R](f: Observable[T] => Observable[R], scheduler: Scheduler): Observable[R] = { val fJava: Func1[rx.Observable[T], rx.Observable[R]] = - (jo: rx.Observable[T]) => f(Observable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] - Observable[R](asJavaObservable.asInstanceOf[rx.Observable[T]].parallel[R](fJava, scheduler)) + (jo: rx.Observable[T]) => f(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable(asJavaObservable.asInstanceOf[rx.Observable[T]].parallel[R](fJava, scheduler)) } /** Tests whether a predicate holds for some of the elements of this `Observable`. @@ -1811,7 +1812,7 @@ trait Observable[+T] * holds for some of the elements of this Observable, and `false` otherwise. */ def exists(p: T => Boolean): Observable[Boolean] = { - Observable[java.lang.Boolean](asJavaObservable.exists(p)).map(_.booleanValue()) + toScalaObservable[java.lang.Boolean](asJavaObservable.exists(p)).map(_.booleanValue()) } /** Tests whether this `Observable` emits no elements. @@ -1820,7 +1821,7 @@ trait Observable[+T] * emits no elements, and `false` otherwise. */ def isEmpty: Observable[Boolean] = { - Observable[java.lang.Boolean](asJavaObservable.isEmpty).map(_.booleanValue()) + toScalaObservable[java.lang.Boolean](asJavaObservable.isEmpty()).map(_.booleanValue()) } def withFilter(p: T => Boolean): WithFilter[T] = { @@ -1836,7 +1837,7 @@ trait Observable[+T] * @return an Observable with the side-effecting behavior applied. */ def doOnEach(observer: Observer[T]): Observable[T] = { - Observable[T](asJavaObservable.doOnEach(observer.asJavaObserver)) + toScalaObservable[T](asJavaObservable.doOnEach(observer.asJavaObserver)) } /** @@ -1848,7 +1849,7 @@ trait Observable[+T] * @return an Observable with the side-effecting behavior applied. */ def doOnEach(onNext: T => Unit): Observable[T] = { - Observable[T](asJavaObservable.doOnEach( + toScalaObservable[T](asJavaObservable.doOnEach( onNext )) } @@ -1863,7 +1864,7 @@ trait Observable[+T] * @return an Observable with the side-effecting behavior applied. */ def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = { - Observable[T](asJavaObservable.doOnEach( + toScalaObservable[T](asJavaObservable.doOnEach( onNext, onError )) @@ -1880,7 +1881,7 @@ trait Observable[+T] * @return an Observable with the side-effecting behavior applied. */ def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = { - Observable[T](asJavaObservable.doOnEach( + toScalaObservable[T](asJavaObservable.doOnEach( onNext, onError, onCompleted @@ -1895,7 +1896,11 @@ object Observable { import scala.collection.JavaConverters._ import scala.collection.immutable.Range import scala.concurrent.duration.Duration + import scala.concurrent.{Future, ExecutionContext} + import scala.util.{Success, Failure} import ImplicitFunctionConversions._ + import JavaConversions._ + import rx.lang.scala.subjects.AsyncSubject private[scala] def jObsOfListToScObsOfSeq[T](jObs: rx.Observable[_ <: java.util.List[T]]): Observable[Seq[T]] = { @@ -1906,16 +1911,7 @@ object Observable { private[scala] def jObsOfJObsToScObsOfScObs[T](jObs: rx.Observable[_ <: rx.Observable[_ <: T]]): Observable[Observable[T]] = { val oScala1: Observable[rx.Observable[_ <: T]] = new Observable[rx.Observable[_ <: T]]{ def asJavaObservable = jObs } - oScala1.map((oJava: rx.Observable[_ <: T]) => new Observable[T]{ def asJavaObservable = oJava}) - } - - /** - * Creates a new Scala Observable from a given Java Observable. - */ - private [scala] def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = { - new Observable[T]{ - def asJavaObservable = observable - } + oScala1.map((oJava: rx.Observable[_ <: T]) => oJava) } /** @@ -1943,16 +1939,8 @@ object Observable { * @return * an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function. */ - def apply[T](func: Observer[T] => Subscription): Observable[T] = { - Observable[T](rx.Observable.create(new OnSubscribeFunc[T] { - def onSubscribe(t1: rx.Observer[_ >: T]): rx.Subscription = { - func(Observer(t1)) - } - })) - } - def create[T](func: Observer[T] => Subscription): Observable[T] = { - Observable[T](rx.Observable.create(new OnSubscribeFunc[T] { + toScalaObservable[T](rx.Observable.create(new OnSubscribeFunc[T] { def onSubscribe(t1: rx.Observer[_ >: T]): rx.Subscription = { func(Observer(t1)) } @@ -1972,8 +1960,8 @@ object Observable { * @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] * method when the Observer subscribes to it */ - def apply[T](exception: Throwable): Observable[T] = { - Observable[T](rx.Observable.error(exception)) + def error[T](exception: Throwable): Observable[T] = { + toScalaObservable[T](rx.Observable.error(exception)) } /** @@ -1993,23 +1981,45 @@ object Observable { * @return an Observable that emits each item in the source Array */ def apply[T](items: T*): Observable[T] = { - Observable[T](rx.Observable.from(items.toIterable.asJava)) + toScalaObservable[T](rx.Observable.from(items.toIterable.asJava)) + } + + /** Returns an Observable emitting the value produced by the Future as its single item. + * If the future fails, the Observable will fail as well. + * + * @param f Future whose value ends up in the resulting Observable + * @return an Observable completed after producing the value of the future, or with an exception + */ + def from[T](f: Future[T])(implicit execContext: ExecutionContext): Observable[T] = { + val s = AsyncSubject[T]() + f.onComplete { + case Failure(e) => + s.onError(e) + case Success(c) => + s.onNext(c) + s.onCompleted() + } + s } /** - * Generates an Observable that emits a sequence of integers within a specified range. + * Converts an `Iterable` into an Observable. * - * + * * - * Implementation note: the entire range will be immediately emitted each time an [[rx.lang.scala.Observer]] subscribes. - * Since this occurs before the [[rx.lang.scala.Subscription]] is returned, - * it in not possible to unsubscribe from the sequence before it completes. + * Note: the entire iterable sequence is immediately emitted each time an + * Observer subscribes. Since this occurs before the + * `Subscription` is returned, it is not possible to unsubscribe from + * the sequence before it completes. * - * @param range the range - * @return an Observable that emits a range of sequential integers + * @param iterable the source `Iterable` sequence + * @param the type of items in the `Iterable` sequence and the + * type of items to be emitted by the resulting Observable + * @return an Observable that emits each item in the source `Iterable` + * sequence */ - def apply(range: Range): Observable[Int] = { - Observable[Int](rx.Observable.from(range.toIterable.asJava)) + def from[T](iterable: Iterable[T]): Observable[T] = { + toScalaObservable(rx.Observable.from(iterable.asJava)) } /** @@ -2032,7 +2042,7 @@ object Observable { * factory function */ def defer[T](observable: => Observable[T]): Observable[T] = { - Observable[T](rx.Observable.defer[T](() => observable.asJavaObservable)) + toScalaObservable[T](rx.Observable.defer[T](() => observable.asJavaObservable)) } /** @@ -2045,7 +2055,7 @@ object Observable { * @return an Observable that never sends any items or notifications to an [[rx.lang.scala.Observer]] */ def never: Observable[Nothing] = { - Observable[Nothing](rx.Observable.never()) + toScalaObservable[Nothing](rx.Observable.never()) } /** @@ -2056,7 +2066,7 @@ object Observable { * @return an Observable that emits the zipped Observables */ def zip[A, B, C](obA: Observable[A], obB: Observable[B], obC: Observable[C]): Observable[(A, B, C)] = { - Observable[(A, B, C)](rx.Observable.zip[A, B, C, (A, B, C)](obA.asJavaObservable, obB.asJavaObservable, obC.asJavaObservable, (a: A, b: B, c: C) => (a, b, c))) + toScalaObservable[(A, B, C)](rx.Observable.zip[A, B, C, (A, B, C)](obA.asJavaObservable, obB.asJavaObservable, obC.asJavaObservable, (a: A, b: B, c: C) => (a, b, c))) } /** @@ -2067,7 +2077,7 @@ object Observable { * @return an Observable that emits the zipped Observables */ def zip[A, B, C, D](obA: Observable[A], obB: Observable[B], obC: Observable[C], obD: Observable[D]): Observable[(A, B, C, D)] = { - Observable[(A, B, C, D)](rx.Observable.zip[A, B, C, D, (A, B, C, D)](obA.asJavaObservable, obB.asJavaObservable, obC.asJavaObservable, obD.asJavaObservable, (a: A, b: B, c: C, d: D) => (a, b, c, d))) + toScalaObservable[(A, B, C, D)](rx.Observable.zip[A, B, C, D, (A, B, C, D)](obA.asJavaObservable, obB.asJavaObservable, obC.asJavaObservable, obD.asJavaObservable, (a: A, b: B, c: C, d: D) => (a, b, c, d))) } /** @@ -2090,7 +2100,7 @@ object Observable { } val list = observables.map(_.asJavaObservable).asJavaObservable val o = rx.Observable.zip(list, f) - Observable[Seq[T]](o) + toScalaObservable[Seq[T]](o) } /** @@ -2103,7 +2113,7 @@ object Observable { * @return An Observable that emits a number each time interval. */ def interval(duration: Duration): Observable[Long] = { - Observable[java.lang.Long](rx.Observable.interval(duration.length, duration.unit)).map(_.longValue()) + toScalaObservable[java.lang.Long](rx.Observable.interval(duration.length, duration.unit)).map(_.longValue()) /*XXX*/ } @@ -2119,7 +2129,7 @@ object Observable { * @return An Observable that emits a number each time interval. */ def interval(duration: Duration, scheduler: Scheduler): Observable[Long] = { - Observable[java.lang.Long](rx.Observable.interval(duration.length, duration.unit, scheduler)).map(_.longValue()) + toScalaObservable[java.lang.Long](rx.Observable.interval(duration.length, duration.unit, scheduler)).map(_.longValue()) /*XXX*/ } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/WithFilter.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/WithFilter.scala index 729450d2a5..88fd326e2f 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/WithFilter.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/WithFilter.scala @@ -24,17 +24,18 @@ import ImplicitFunctionConversions.scalaFunction1ToRxFunc1 private[scala] class WithFilter[+T] (p: T => Boolean, asJava: rx.Observable[_ <: T]) { import ImplicitFunctionConversions._ + import JavaConversions._ def map[B](f: T => B): Observable[B] = { - Observable[B](asJava.filter(p).map[B](f)) + toScalaObservable[B](asJava.filter(p).map[B](f)) } def flatMap[B](f: T => Observable[B]): Observable[B] = { - Observable[B](asJava.filter(p).flatMap[B]((x: T) => f(x).asJavaObservable)) + toScalaObservable[B](asJava.filter(p).flatMap[B]((x: T) => f(x).asJavaObservable)) } def withFilter(q: T => Boolean): Observable[T] = { - Observable[T](asJava.filter((x: T) => p(x) && q(x))) + toScalaObservable[T](asJava.filter((x: T) => p(x) && q(x))) } // there is no foreach here, that's only available on BlockingObservable diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index 21f272d03b..48c4423373 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -1,5 +1,8 @@ package rx.lang.scala +import scala.concurrent.{Future, Await} +import scala.concurrent.duration.Duration +import scala.concurrent.ExecutionContext.Implicits.global import org.junit.Assert._ import org.junit.{ Ignore, Test } import org.scalatest.junit.JUnitSuite @@ -60,13 +63,37 @@ class ObservableTests extends JUnitSuite { val msg = "msg6251" var receivedMsg = "none" try { - Observable[Int](new Exception(msg)).firstOrElse(10).toBlockingObservable.single + Observable.error[Int](new Exception(msg)).firstOrElse(10).toBlockingObservable.single } catch { case e: Exception => receivedMsg = e.getCause.getMessage } assertEquals(receivedMsg, msg) } + @Test def testFromFuture() { + val o = Observable from Future { 5 } + assertEquals(5, o.toBlockingObservable.single) + } + + @Test def testFromFutureWithDelay() { + val o = Observable from Future { Thread.sleep(200); 42 } + assertEquals(42, o.toBlockingObservable.single) + } + + @Test def testFromFutureWithError() { + val err = new Exception("ooops42") + val o: Observable[Int] = Observable from Future { Thread.sleep(200); throw err } + assertEquals(List(Notification.OnError(err)), o.materialize.toBlockingObservable.toList) + } + + @Test def testFromFutureWithSubscribeOnlyAfterCompletion() { + val f = Future { Thread.sleep(200); 6 } + val o = Observable from f + val res = Await.result(f, Duration.Inf) + assertEquals(6, res) + assertEquals(6, o.toBlockingObservable.single) + } + /* @Test def testHead() { val observer = mock(classOf[Observer[Int]])