Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creating Observables in Scala: Approach04 #561

Merged
merged 5 commits into from
Dec 5, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import rx.lang.scala.examples.Movie;
import rx.lang.scala.examples.MovieLib;
import rx.util.functions.Action1;
import static rx.lang.scala.ImplicitFunctionConversions.toScalaObservable;
import static rx.lang.scala.JavaConversions.toScalaObservable;

public class MovieLibUsage {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.DurationLong
import scala.language.postfixOps
import scala.language.implicitConversions

import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Ignore
import org.junit.Test
import org.scalatest.junit.JUnitSuite

import rx.lang.scala.Notification
import rx.lang.scala.Observable
import rx.lang.scala._
import rx.lang.scala.concurrency._

@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
Expand Down Expand Up @@ -132,15 +132,14 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def rangeAndBufferExample() {
val o = Observable(1 to 18)
val o = Observable.from(1 to 18)
o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]")))
}

@Test def windowExample() {
// this will be nicer once we have zipWithIndex
(for ((o, i) <- Observable(1 to 18).window(5) zip Observable(0 until 4); n <- o)
yield s"Observable#$i emits $n")
.subscribe(output(_))
(for ((o, i) <- Observable.from(1 to 18).window(5).zipWithIndex; n <- o)
yield s"Observable#$i emits $n"
).subscribe(output(_))
}

@Test def testReduce() {
Expand Down Expand Up @@ -217,6 +216,7 @@ class RxScalaDemo extends JUnitSuite {
}).flatten.toBlockingObservable.foreach(println(_))
}

@Ignore // TODO something's bad here
@Test def timingTest1() {
val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3)

Expand Down Expand Up @@ -368,13 +368,13 @@ class RxScalaDemo extends JUnitSuite {

@Test def parallelExample() {
val t0 = System.currentTimeMillis()
Observable(1 to 10).parallel(work(_)).toBlockingObservable.foreach(println(_))
Observable.from(1 to 10).parallel(work(_)).toBlockingObservable.foreach(println(_))
println(s"Work took ${System.currentTimeMillis()-t0} ms")
}

@Test def exampleWithoutParallel() {
val t0 = System.currentTimeMillis()
work(Observable(1 to 10)).toBlockingObservable.foreach(println(_))
work(Observable.from(1 to 10)).toBlockingObservable.foreach(println(_))
println(s"Work took ${System.currentTimeMillis()-t0} ms")
}

Expand Down Expand Up @@ -402,11 +402,10 @@ class RxScalaDemo extends JUnitSuite {
}

val o1 = Observable.interval(100 millis).take(3)
val o2 = Observable(new IOException("Oops"))
val o2 = Observable.error(new IOException("Oops"))
printObservable(o1)
//waitFor(o1)
printObservable(o2)
//waitFor(o2)
Thread.sleep(500)
}

@Test def materializeExample2() {
Expand All @@ -431,6 +430,17 @@ class RxScalaDemo extends JUnitSuite {
val condition = true
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
}

@Test def createExample() {
val o = Observable.create[String](observer => {
// this is bad because you cannot unsubscribe!
observer.onNext("a")
observer.onNext("b")
observer.onCompleted()
Subscription {}
})
o.subscribe(println(_))
}

def output(s: String): Unit = println(s)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@ package rx.lang.scala
import java.lang.Exception
import java.{ lang => jlang }

import scala.collection.Seq
import scala.language.implicitConversions
import scala.collection.Seq

import rx.util.functions._
import rx.lang.scala.JavaConversions._


/**
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
* Most RxScala users won't need them, but they might be useful if one wants to use
* the `rx.Observable` directly instead of using `rx.lang.scala.Observable` or if one wants
* to use a Java library taking/returning `Func`s and `Action`s.
* This object only contains conversions between functions. For conversions between types,
* use [[rx.lang.scala.JavaConversions]].
*/
object ImplicitFunctionConversions {
import language.implicitConversions

// implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription): Func2[rx.Scheduler, T, rx.Subscription] with Object {def call(s: rx.Scheduler, t: T): rx.Subscription} =
// new Func2[rx.Scheduler, T, rx.Subscription] {
Expand All @@ -46,25 +49,10 @@ object ImplicitFunctionConversions {
}
}

implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = s.asJava
implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): Notification[T] = Notification(s)

implicit def toJavaSubscription(s: Subscription): rx.Subscription = s.asJavaSubscription
implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s)

implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler
implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)

implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = s.asJavaObserver
implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = Observer(s)

implicit def toJavaObservable[T](s: Observable[T]): rx.Observable[_ <: T] = s.asJavaObservable
implicit def toScalaObservable[T](s: rx.Observable[_ <: T]): Observable[T] = Observable(s)

implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
new rx.Observable.OnSubscribeFunc[T] {
def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
f(Observer(obs))
f(obs)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package rx.lang.scala

/**
* These functions convert between RxScala types RxJava types.
* Pure Scala projects won't need them, but they will be useful for polyglot projects.
* This object only contains conversions between types. For conversions between functions,
* use [[rx.lang.scala.ImplicitFunctionConversions]].
*/
object JavaConversions {
import language.implicitConversions

implicit def toJavaNotification[T](s: Notification[T]): rx.Notification[_ <: T] = s.asJava

implicit def toScalaNotification[T](s: rx.Notification[_ <: T]): Notification[T] = Notification(s)

implicit def toJavaSubscription(s: Subscription): rx.Subscription = s.asJavaSubscription

implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s)

implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler

implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)

implicit def toJavaObserver[T](s: Observer[T]): rx.Observer[_ >: T] = s.asJavaObserver

implicit def toScalaObserver[T](s: rx.Observer[_ >: T]): Observer[T] = Observer(s)

implicit def toJavaObservable[T](s: Observable[T]): rx.Observable[_ <: T] = s.asJavaObservable

implicit def toScalaObservable[T](observable: rx.Observable[_ <: T]): Observable[T] = {
new Observable[T]{
def asJavaObservable = observable
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ package rx.lang.scala
*/
sealed trait Notification[+T] {
def asJava: rx.Notification[_ <: T]
override def equals(that: Any): Boolean = that match {
case other: Notification[_] => asJava.equals(other.asJava)
case _ => false
}
override def hashCode(): Int = asJava.hashCode()
}

/**
Expand Down Expand Up @@ -48,6 +53,7 @@ object Notification {

class OnNext[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {
def value: T = asJava.getValue
override def toString = s"OnNext($value)"
}

object OnNext {
Expand All @@ -64,6 +70,7 @@ object Notification {

class OnError[+T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {
def error: Throwable = asJava.getThrowable
override def toString = s"OnError($error)"
}

object OnError {
Expand All @@ -78,7 +85,9 @@ object Notification {
}
}

class OnCompleted[T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {}
class OnCompleted[T](val asJava: rx.Notification[_ <: T]) extends Notification[T] {
override def toString = "OnCompleted()"
}

object OnCompleted {

Expand Down
Loading