Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable creation from Subscriber[T]=>Unit for Scala #923

Merged
merged 6 commits into from
Mar 6, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ object Olympics {
case class Medal(val year: Int, val games: String, val discipline: String, val medal: String, val athlete: String, val country: String)

def mountainBikeMedals: Observable[Medal] = Observable.items(
duration(100 millis), // a short delay because medals are only awarded some time after the Games began
Observable.items(
Medal(1996, "Atlanta 1996", "cross-country men", "Gold", "Bart BRENTJENS", "Netherlands"),
Medal(1996, "Atlanta 1996", "cross-country women", "Gold", "Paola PEZZO", "Italy"),
Expand Down Expand Up @@ -69,18 +70,33 @@ object Olympics {
).concat

// speed it up :D
val fourYears = 4000.millis
val oneYear = 1000.millis

val neverUsedDummyMedal = Medal(3333, "?", "?", "?", "?", "?")
//val neverUsedDummyMedal = Medal(3333, "?", "?", "?", "?", "?")

def fourYearsEmpty: Observable[Medal] = {
/** runs an infinite loop, and returns Bottom type (Nothing) */
def getNothing: Nothing = {
println("You shouldn't have called this method ;-)")
getNothing
}

/** returns an Observable which emits no elements and completes after a duration of d */
def duration(d: Duration): Observable[Nothing] = Observable.interval(d).take(1).filter(_ => false).map(_ => getNothing)

def fourYearsEmpty: Observable[Medal] = duration(4*oneYear)

def yearTicks: Observable[Int] =
(Observable.from(1996 to 2014) zip (Observable.items(-1) ++ Observable.interval(oneYear))).map(_._1)

/*
def fourYearsEmptyOld: Observable[Medal] = {
// TODO this should return an observable which emits nothing during fourYears and then completes
// Because of https://github.com/Netflix/RxJava/issues/388, we get non-terminating tests
// And this https://github.com/Netflix/RxJava/pull/289#issuecomment-24738668 also causes problems
// So we don't use this:
// Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false)
Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false)
// But we just return empty, which completes immediately
Observable.empty
}
// Observable.empty
}*/

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ import org.scalatest.junit.JUnitSuite
import rx.lang.scala._
import rx.lang.scala.schedulers._

/**
* Demo how the different operators can be used. In Eclipse, you can right-click
* a test and choose "Run As" > "Scala JUnit Test".
*
* For each operator added to Observable.java, we add a little usage demo here.
* It does not need to test the functionality (that's already done by the tests in
* RxJava core), but it should demonstrate how it can be used, to make sure that
* the method signature makes sense.
*/
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
class RxScalaDemo extends JUnitSuite {

Expand Down Expand Up @@ -78,13 +87,9 @@ class RxScalaDemo extends JUnitSuite {
val first = Observable.from(List(10, 11, 12))
val second = Observable.from(List(10, 11, 12))

val b1 = (first zip second) map (p => p._1 == p._2) forall (b => b)

val equality = (a: Any, b: Any) => a == b
val b2 = (first zip second) map (p => equality(p._1, p._2)) forall (b => b)
val b = (first zip second) forall { case (a, b) => a == b }

assertTrue(b1.toBlockingObservable.single)
assertTrue(b2.toBlockingObservable.single)
assertTrue(b.toBlockingObservable.single)
}

@Test def testObservableComparisonWithForComprehension() {
Expand All @@ -93,7 +98,7 @@ class RxScalaDemo extends JUnitSuite {

val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2)

val b1 = booleans.forall(_ == true) // without `== true`, b1 is assigned the forall function
val b1 = booleans.forall(identity)

assertTrue(b1.toBlockingObservable.single)
}
Expand Down Expand Up @@ -216,18 +221,21 @@ class RxScalaDemo extends JUnitSuite {
}).flatten.toBlockingObservable.foreach(println(_))
}

@Ignore // TODO something's bad here
@Test def timingTest1() {
val numbersByModulo3 = Observable.interval(1000 millis).take(9).groupBy(_ % 3)

val t0 = System.currentTimeMillis

(for ((modulo, numbers) <- numbersByModulo3) yield {
println("Observable for modulo" + modulo + " started at t = " + (System.currentTimeMillis - t0))
numbers.take(1) // <- TODO very unexpected
//numbers
numbers.map(n => s"${n} is in the modulo-$modulo group")
}).flatten.toBlockingObservable.foreach(println(_))
}

@Test def testOlympicYearTicks() {
Olympics.yearTicks.subscribe(println(_))
waitFor(Olympics.yearTicks)
}

@Test def groupByExample() {
val medalsByCountry = Olympics.mountainBikeMedals.groupBy(medal => medal.country)
Expand All @@ -238,8 +246,10 @@ class RxScalaDemo extends JUnitSuite {
firstMedalOfEachCountry.subscribe(medal => {
println(s"${medal.country} wins its first medal in ${medal.year}")
})

Olympics.yearTicks.subscribe(year => println(s"\nYear $year starts."))

waitFor(firstMedalOfEachCountry)
waitFor(Olympics.yearTicks)
}

@Test def groupByUntilExample() {
Expand All @@ -250,30 +260,36 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def combineLatestExample() {
val first_counter = Observable.interval(250 millis)
val second_counter = Observable.interval(550 millis)
val combined_counter = first_counter.combineLatest(second_counter,
val firstCounter = Observable.interval(250 millis)
val secondCounter = Observable.interval(550 millis)
val combinedCounter = firstCounter.combineLatest(secondCounter,
(x: Long, y: Long) => List(x,y)) take 10

combined_counter subscribe {x => println(s"Emitted group: $x")}
combinedCounter subscribe {x => println(s"Emitted group: $x")}
waitFor(combinedCounter)
}

@Test def olympicsExampleWithoutPublish() {
val medals = Olympics.mountainBikeMedals.doOnEach(_ => println("onNext"))
medals.subscribe(println(_)) // triggers an execution of medals Observable
waitFor(medals) // triggers another execution of medals Observable
}

@Test def olympicsExample() {
val medals = Olympics.mountainBikeMedals.publish
medals.subscribe(println(_))
@Test def olympicsExampleWithPublish() {
val medals = Olympics.mountainBikeMedals.doOnEach(_ => println("onNext")).publish
medals.subscribe(println(_)) // triggers an execution of medals Observable
medals.connect
//waitFor(medals)
waitFor(medals) // triggers another execution of medals Observable
}

@Test def exampleWithoutPublish() {
val unshared = List(1 to 4).toObservable
val unshared = Observable.from(1 to 4)
unshared.subscribe(n => println(s"subscriber 1 gets $n"))
unshared.subscribe(n => println(s"subscriber 2 gets $n"))
}

@Test def exampleWithPublish() {
val unshared = List(1 to 4).toObservable
val unshared = Observable.from(1 to 4)
val shared = unshared.publish
shared.subscribe(n => println(s"subscriber 1 gets $n"))
shared.subscribe(n => println(s"subscriber 2 gets $n"))
Expand Down Expand Up @@ -402,7 +418,7 @@ class RxScalaDemo extends JUnitSuite {
}

@Test def timestampExample() {
val timestamped = Observable.interval(100 millis).take(3).timestamp.toBlockingObservable
val timestamped = Observable.interval(100 millis).take(6).timestamp.toBlockingObservable
for ((millis, value) <- timestamped if value > 0) {
println(value + " at t = " + millis)
}
Expand Down Expand Up @@ -441,35 +457,57 @@ class RxScalaDemo extends JUnitSuite {
val oc3: rx.Notification[_ <: Int] = oc2.asJavaNotification
val oc4: rx.Notification[_ <: Any] = oc2.asJavaNotification
}

@Test def elementAtReplacement() {
assertEquals("b", List("a", "b", "c").toObservable.drop(1).first.toBlockingObservable.single)
}

@Test def elementAtOrDefaultReplacement() {
assertEquals("b", List("a", "b", "c").toObservable.drop(1).firstOrElse("!").toBlockingObservable.single)
assertEquals("!!", List("a", "b", "c").toObservable.drop(10).firstOrElse("!!").toBlockingObservable.single)
}

@Test def takeWhileWithIndexAlternative {
val condition = true
List("a", "b").toObservable.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
}

@Test def createExample() {
def calculateElement(index: Int): String = {
println("omg I'm calculating so hard")
index match {
case 0 => "a"
case 1 => "b"
case _ => throw new IllegalArgumentException
}
}

/**
* This is a bad way of using Observable.create, because even if the consumer unsubscribes,
* all elements are calculated.
*/
@Test def createExampleBad() {
val o = Observable.create[String](observer => {
// this is bad because you cannot unsubscribe!
observer.onNext("a")
observer.onNext("b")
observer.onNext(calculateElement(0))
observer.onNext(calculateElement(1))
observer.onCompleted()
Subscription {}
})
o.subscribe(println(_))
o.take(1).subscribe(println(_))
}

/**
* This is the good way of doing it: If the consumer unsubscribes, no more elements are
* calculated.
*/
@Test def createExampleGood() {
val o = Observable[String](subscriber => {
var i = 0
while (i < 2 && !subscriber.isUnsubscribed) {
subscriber.onNext(calculateElement(i))
i += 1
}
if (!subscriber.isUnsubscribed) subscriber.onCompleted()
})
o.take(1).subscribe(println(_))
}

def output(s: String): Unit = println(s)

// blocks until obs has completed
/** Subscribes to obs and waits until obs has completed. Note that if you subscribe to
* obs yourself and also call waitFor(obs), all side-effects of subscribing to obs
* will happen twice.
*/
def waitFor[T](obs: Observable[T]): Unit = {
obs.toBlockingObservable.toIterable.last
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package rx.lang.scala

import java.lang.Exception
import java.{ lang => jlang }

import scala.language.implicitConversions
import scala.collection.Seq

import rx.functions._
import rx.lang.scala.JavaConversions._

Expand Down Expand Up @@ -56,6 +54,13 @@ object ImplicitFunctionConversions {
}
}

implicit def scalaAction1ToOnSubscribe[T](f: Subscriber[T] => Unit) =
new rx.Observable.OnSubscribe[T] {
def call(s: rx.Subscriber[_ >: T]): Unit = {
f(s)
}
}

implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] =
new Func0[B] {
def call(): B = param
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ object JavaConversions {

implicit def toScalaSubscription(s: rx.Subscription): Subscription = Subscription(s)

implicit def toJavaSubscriber[T](s: Subscriber[T]): rx.Subscriber[_ >: T] = s.asJavaSubscriber

implicit def toScalaSubscriber[T](s: rx.Subscriber[_ >: T]): Subscriber[T] = Subscriber(s)

implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJavaScheduler
implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,21 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.sample(duration.length, duration.unit, scheduler))
}

/**
* Return an Observable that emits the results of sampling the items emitted by the source Observable
* whenever the specified sampler Observable emits an item or completes.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sample.o.png">
*
* @param sampler
* the Observable to use for sampling the source Observable
* @return an Observable that emits the results of sampling the items emitted by this Observable whenever
* the sampler Observable emits an item or completes
*/
def sample(sampler: Observable[Any]): Observable[T] = {
toScalaObservable[T](asJavaObservable.sample(sampler))
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
Expand Down Expand Up @@ -2257,9 +2272,6 @@ object Observable {
* should invoke the Observer's [[rx.lang.scala.Observer.onNext onNext]], [[rx.lang.scala.Observer.onError onError]], and [[rx.lang.scala.Observer.onCompleted onCompleted]] methods
* appropriately.
*
* A well-formed Observable must invoke either the Observer's `onCompleted` method
* exactly once or its `onError` method exactly once.
*
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a>
* for detailed information.
*
Expand All @@ -2273,6 +2285,7 @@ object Observable {
* @return
* an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function.
*/
@deprecated("Use `apply[T](Subscriber[T] => Unit)` instead", "0.17.0")
def create[T](func: Observer[T] => Subscription): Observable[T] = {
toScalaObservable[T](rx.Observable.create(new OnSubscribeFunc[T] {
def onSubscribe(t1: rx.Observer[_ >: T]): rx.Subscription = {
Expand All @@ -2281,6 +2294,41 @@ object Observable {
}))
}

/*
Note: It's dangerous to have two overloads where one takes an `Observer[T] => Subscription`
function and the other takes a `Subscriber[T] => Unit` function, because expressions like
`o => Subscription{}` have both of these types.
So we call the old create method "create", and the new create method "apply".
Try it out yourself here:
def foo[T]: Unit = {
val fMeant: Observer[T] => Subscription = o => Subscription{}
val fWrong: Subscriber[T] => Unit = o => Subscription{}
}
*/

/**
* Returns an Observable that will execute the specified function when a someone subscribes to it.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/create.png">
*
* Write the function you pass so that it behaves as an Observable: It should invoke the
* Subscriber's `onNext`, `onError`, and `onCompleted` methods appropriately.
*
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
* information.
*
* @tparam T
* the type of the items that this Observable emits
* @param f
* a function that accepts a `Subscriber[T]`, and invokes its `onNext`,
* `onError`, and `onCompleted` methods as appropriate
* @return an Observable that, when someone subscribes to it, will execute the specified
* function
*/
def apply[T](f: Subscriber[T] => Unit): Observable[T] = {
toScalaObservable(rx.Observable.create(f))
}

/**
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
* method when the Observer subscribes to it.
Expand Down
Loading