Skip to content

Commit 669911a

Browse files
Merge pull request #561 from samuelgruetter/scala-constructors
Creating Observables in Scala: Approach04
2 parents af81c41 + 4df4cca commit 669911a

File tree

8 files changed

+224
-143
lines changed

8 files changed

+224
-143
lines changed

language-adaptors/rxjava-scala/src/examples/java/rx/lang/scala/examples/MovieLibUsage.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import rx.lang.scala.examples.Movie;
2020
import rx.lang.scala.examples.MovieLib;
2121
import rx.util.functions.Action1;
22-
import static rx.lang.scala.ImplicitFunctionConversions.toScalaObservable;
22+
import static rx.lang.scala.JavaConversions.toScalaObservable;
2323

2424
public class MovieLibUsage {
2525

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

+22-12
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,15 @@ import scala.concurrent.duration.Duration
2121
import scala.concurrent.duration.DurationInt
2222
import scala.concurrent.duration.DurationLong
2323
import scala.language.postfixOps
24+
import scala.language.implicitConversions
2425

2526
import org.junit.Assert.assertEquals
2627
import org.junit.Assert.assertTrue
2728
import org.junit.Ignore
2829
import org.junit.Test
2930
import org.scalatest.junit.JUnitSuite
3031

31-
import rx.lang.scala.Notification
32-
import rx.lang.scala.Observable
32+
import rx.lang.scala._
3333
import rx.lang.scala.concurrency._
3434

3535
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
@@ -132,15 +132,14 @@ class RxScalaDemo extends JUnitSuite {
132132
}
133133

134134
@Test def rangeAndBufferExample() {
135-
val o = Observable(1 to 18)
135+
val o = Observable.from(1 to 18)
136136
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
137137
}
138138

139139
@Test def windowExample() {
140-
// this will be nicer once we have zipWithIndex
141-
(for ((o, i) <- Observable(1 to 18).window(5) zip Observable(0 until 4); n <- o)
142-
yield s"Observable#$i emits $n")
143-
.subscribe(output(_))
140+
(for ((o, i) <- Observable.from(1 to 18).window(5).zipWithIndex; n <- o)
141+
yield s"Observable#$i emits $n"
142+
).subscribe(output(_))
144143
}
145144

146145
@Test def testReduce() {
@@ -217,6 +216,7 @@ class RxScalaDemo extends JUnitSuite {
217216
}).flatten.toBlockingObservable.foreach(println(_))
218217
}
219218

219+
@Ignore // TODO something's bad here
220220
@Test def timingTest1() {
221221
val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3)
222222

@@ -368,13 +368,13 @@ class RxScalaDemo extends JUnitSuite {
368368

369369
@Test def parallelExample() {
370370
val t0 = System.currentTimeMillis()
371-
Observable(1 to 10).parallel(work(_)).toBlockingObservable.foreach(println(_))
371+
Observable.from(1 to 10).parallel(work(_)).toBlockingObservable.foreach(println(_))
372372
println(s"Work took ${System.currentTimeMillis()-t0} ms")
373373
}
374374

375375
@Test def exampleWithoutParallel() {
376376
val t0 = System.currentTimeMillis()
377-
work(Observable(1 to 10)).toBlockingObservable.foreach(println(_))
377+
work(Observable.from(1 to 10)).toBlockingObservable.foreach(println(_))
378378
println(s"Work took ${System.currentTimeMillis()-t0} ms")
379379
}
380380

@@ -402,11 +402,10 @@ class RxScalaDemo extends JUnitSuite {
402402
}
403403

404404
val o1 = Observable.interval(100 millis).take(3)
405-
val o2 = Observable(new IOException("Oops"))
405+
val o2 = Observable.error(new IOException("Oops"))
406406
printObservable(o1)
407-
//waitFor(o1)
408407
printObservable(o2)
409-
//waitFor(o2)
408+
Thread.sleep(500)
410409
}
411410

412411
@Test def materializeExample2() {
@@ -431,6 +430,17 @@ class RxScalaDemo extends JUnitSuite {
431430
val condition = true
432431
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
433432
}
433+
434+
@Test def createExample() {
435+
val o = Observable.create[String](observer => {
436+
// this is bad because you cannot unsubscribe!
437+
observer.onNext("a")
438+
observer.onNext("b")
439+
observer.onCompleted()
440+
Subscription {}
441+
})
442+
o.subscribe(println(_))
443+
}
434444

435445
def output(s: String): Unit = println(s)
436446

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala

+6-18
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,22 @@ package rx.lang.scala
1818
import java.lang.Exception
1919
import java.{ lang => jlang }
2020

21-
import scala.collection.Seq
2221
import scala.language.implicitConversions
22+
import scala.collection.Seq
2323

2424
import rx.util.functions._
25+
import rx.lang.scala.JavaConversions._
26+
2527

2628
/**
2729
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
2830
* Most RxScala users won't need them, but they might be useful if one wants to use
2931
* the `rx.Observable` directly instead of using `rx.lang.scala.Observable` or if one wants
3032
* to use a Java library taking/returning `Func`s and `Action`s.
33+
* This object only contains conversions between functions. For conversions between types,
34+
* use [[rx.lang.scala.JavaConversions]].
3135
*/
3236
object ImplicitFunctionConversions {
33-
import language.implicitConversions
3437

3538
// implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription): Func2[rx.Scheduler, T, rx.Subscription] with Object {def call(s: rx.Scheduler, t: T): rx.Subscription} =
3639
// new Func2[rx.Scheduler, T, rx.Subscription] {
@@ -46,25 +49,10 @@ object ImplicitFunctionConversions {
4649
}
4750
}
4851

49-
implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = s.asJava
50-
implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): Notification[T] = Notification(s)
51-
52-
implicit def toJavaSubscription(s: Subscription): rx.Subscription = s.asJavaSubscription
53-
implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s)
54-
55-
implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler
56-
implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)
57-
58-
implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = s.asJavaObserver
59-
implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = Observer(s)
60-
61-
implicit def toJavaObservable[T](s: Observable[T]): rx.Observable[_ <: T] = s.asJavaObservable
62-
implicit def toScalaObservable[T](s: rx.Observable[_ <: T]): Observable[T] = Observable(s)
63-
6452
implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
6553
new rx.Observable.OnSubscribeFunc[T] {
6654
def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
67-
f(Observer(obs))
55+
f(obs)
6856
}
6957
}
7058

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package rx.lang.scala
2+
3+
/**
4+
* These functions convert between RxScala types RxJava types.
5+
* Pure Scala projects won't need them, but they will be useful for polyglot projects.
6+
* This object only contains conversions between types. For conversions between functions,
7+
* use [[rx.lang.scala.ImplicitFunctionConversions]].
8+
*/
9+
object JavaConversions {
10+
import language.implicitConversions
11+
12+
implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = s.asJava
13+
14+
implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): Notification[T] = Notification(s)
15+
16+
implicit def toJavaSubscription(s: Subscription): rx.Subscription = s.asJavaSubscription
17+
18+
implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s)
19+
20+
implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler
21+
22+
implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)
23+
24+
implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = s.asJavaObserver
25+
26+
implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = Observer(s)
27+
28+
implicit def toJavaObservable[T](s: Observable[T]): rx.Observable[_ <: T] = s.asJavaObservable
29+
30+
implicit def toScalaObservable[T](observable: rx.Observable[_ <: T]): Observable[T] = {
31+
new Observable[T]{
32+
def asJavaObservable = observable
33+
}
34+
}
35+
36+
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala

+10-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ package rx.lang.scala
2020
*/
2121
sealed trait Notification[+T] {
2222
def asJava: rx.Notification[_ <: T]
23+
override def equals(that: Any): Boolean = that match {
24+
case other: Notification[_] => asJava.equals(other.asJava)
25+
case _ => false
26+
}
27+
override def hashCode(): Int = asJava.hashCode()
2328
}
2429

2530
/**
@@ -48,6 +53,7 @@ object Notification {
4853

4954
class OnNext[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {
5055
def value: T = asJava.getValue
56+
override def toString = s"OnNext($value)"
5157
}
5258

5359
object OnNext {
@@ -64,6 +70,7 @@ object Notification {
6470

6571
class OnError[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {
6672
def error: Throwable = asJava.getThrowable
73+
override def toString = s"OnError($error)"
6774
}
6875

6976
object OnError {
@@ -78,7 +85,9 @@ object Notification {
7885
}
7986
}
8087

81-
class OnCompleted[T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {}
88+
class OnCompleted[T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {
89+
override def toString = "OnCompleted()"
90+
}
8291

8392
object OnCompleted {
8493

0 commit comments

Comments
 (0)