Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scala Adaptor Improvements by Erik #562

Merged
merged 44 commits into from
Dec 4, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
83f1e56
Removed spurious package
AppliedDuality Nov 29, 2013
1949c42
Moved Subscription object to package rx.lang.scala.subscriptions with…
AppliedDuality Nov 29, 2013
3f844a8
Added immediate scheduler
AppliedDuality Nov 29, 2013
65c92bd
Added new thread scheduler
AppliedDuality Nov 29, 2013
e62cd3f
Added current thread scheduler
AppliedDuality Nov 29, 2013
3ad450c
Added ExecutorScheduler
AppliedDuality Nov 29, 2013
188ae7d
Added ScheduledExecutorServiceScheduler
AppliedDuality Nov 29, 2013
451f517
Added ThreadPoolForComputationScheduler
AppliedDuality Nov 29, 2013
4c23697
Added ThreadPoolForIOScheduler
AppliedDuality Nov 29, 2013
4f44f6e
Deleted old scheduler factory methods.
AppliedDuality Nov 29, 2013
54271d0
Deleted old Schedulers object.
AppliedDuality Nov 29, 2013
04a3ed3
Deleted spurious package object.
AppliedDuality Nov 29, 2013
ced58c8
Added TestScheduler
AppliedDuality Nov 29, 2013
0d00f26
Deleted spurious package object
AppliedDuality Nov 29, 2013
93073e2
Moved stuff around.
AppliedDuality Nov 29, 2013
b579835
Moved stuff around some more.
AppliedDuality Nov 29, 2013
ca2ce54
Fixed doc comment.
AppliedDuality Nov 29, 2013
180cebf
Removed dead function, left package object for scala docs.
AppliedDuality Nov 29, 2013
deebc0e
Made Subscription.app(rx.Subscription) private.
AppliedDuality Nov 29, 2013
4f34376
Added package names in doc comments to keep IntelliJ happy
AppliedDuality Nov 29, 2013
0ee0eb8
Factory methods for Observer.
AppliedDuality Nov 29, 2013
3a61819
Added copyright headers
AppliedDuality Nov 29, 2013
fc8ea03
Added copyright headers
AppliedDuality Nov 29, 2013
02ff7f7
Fixed ambigous definitions; kuddos to https://class.coursera.org/reac…
AppliedDuality Nov 29, 2013
096cb0b
Fixed type errors in Schedulers.
AppliedDuality Nov 30, 2013
0009925
Fixed implicit conversions to make compile work
AppliedDuality Nov 30, 2013
8ac60d9
Fixing tests
AppliedDuality Nov 30, 2013
eaa0de0
Fixing tests
AppliedDuality Nov 30, 2013
cbae35d
restore original gradlew
samuelgruetter Nov 30, 2013
baebd8c
add one missing import
samuelgruetter Nov 30, 2013
4fbb4ef
deleting def observable[T](=>T) requires deleting corresponding demo,…
samuelgruetter Nov 30, 2013
4f9105d
reorg imports of ImplicitFunctionConversions
samuelgruetter Nov 30, 2013
24e0baf
make zip with selector function private because
samuelgruetter Nov 30, 2013
3152d0e
scaladoc links in ThreadPoolForXxxScheduler
samuelgruetter Nov 30, 2013
ece590c
scaladoc for package rx.lang.scala
samuelgruetter Nov 30, 2013
766847f
restore subjects package object for scaladoc
samuelgruetter Nov 30, 2013
ea3894a
Merge pull request #1 from samuelgruetter/RxJavaBugFixesSam
headinthebox Nov 30, 2013
99a1d03
Pulled in changes from Samuel.
AppliedDuality Nov 30, 2013
25ce82f
Added Observable.create.
AppliedDuality Nov 30, 2013
5805deb
Double negation Observer.
AppliedDuality Dec 1, 2013
f699cd4
Made subjects safe
AppliedDuality Dec 1, 2013
4023679
Trivial code movements
AppliedDuality Dec 1, 2013
227e514
IntelliJ suggested style changes.
AppliedDuality Dec 2, 2013
38d4371
undo commit 99a1d035233100e32050240472a9bdc85521bc61
samuelgruetter Dec 4, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ import org.scalatest.junit.JUnitSuite

import rx.lang.scala.Notification
import rx.lang.scala.Observable
import rx.lang.scala.observable
import rx.lang.scala.concurrency.Schedulers
import rx.lang.scala.concurrency._

@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
class RxScalaDemo extends JUnitSuite {
Expand Down Expand Up @@ -167,21 +166,21 @@ class RxScalaDemo extends JUnitSuite {
@Test def testTwoSubscriptionsToOneInterval() {
val o = Observable.interval(100 millis).take(8)
o.subscribe(
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
i => println(s"${i}a (on thread #${Thread.currentThread().getId})")
)
o.subscribe(
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
i => println(s"${i}b (on thread #${Thread.currentThread().getId})")
)
waitFor(o)
}

@Test def schedulersExample() {
val o = Observable.interval(100 millis).take(8)
o.observeOn(Schedulers.newThread).subscribe(
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
o.observeOn(NewThreadScheduler()).subscribe(
i => println(s"${i}a (on thread #${Thread.currentThread().getId})")
)
o.observeOn(Schedulers.newThread).subscribe(
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
o.observeOn(NewThreadScheduler()).subscribe(
i => println(s"${i}b (on thread #${Thread.currentThread().getId})")
)
waitFor(o)
}
Expand Down Expand Up @@ -357,13 +356,13 @@ class RxScalaDemo extends JUnitSuite {
}

def square(x: Int): Int = {
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId()}")
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}")
Thread.sleep(100) // calculating a square is heavy work :)
x*x
}

def work(o1: Observable[Int]): Observable[String] = {
println(s"map() is being called on thread ${Thread.currentThread().getId()}")
println(s"map() is being called on thread ${Thread.currentThread().getId}")
o1.map(i => s"The square of $i is ${square(i)}")
}

Expand Down Expand Up @@ -428,40 +427,6 @@ class RxScalaDemo extends JUnitSuite {
assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single)
}

@Test def observableLikeFuture1() {
implicit val scheduler = Schedulers.threadPoolForIO
val o1 = observable {
Thread.sleep(1000)
5
}
val o2 = observable {
Thread.sleep(500)
4
}
Thread.sleep(500)
val t1 = System.currentTimeMillis
println((o1 merge o2).first.toBlockingObservable.single)
println(System.currentTimeMillis - t1)
}

@Test def observableLikeFuture2() {
class Friend {}
val session = new Object {
def getFriends: List[Friend] = List(new Friend, new Friend)
}

implicit val scheduler = Schedulers.threadPoolForIO
val o: Observable[List[Friend]] = observable {
session.getFriends
}
o.subscribe(
friendList => println(friendList),
err => println(err.getMessage)
)

Thread.sleep(1500) // or convert to BlockingObservable
}

@Test def takeWhileWithIndexAlternative {
val condition = true
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class TestSchedulerExample extends JUnitSuite {

scheduler.advanceTimeTo(2 seconds)

val inOrdr = inOrder(observer);
val inOrdr = inOrder(observer)
inOrdr.verify(observer, times(1)).onNext(0L)
inOrdr.verify(observer, times(1)).onNext(1L)
inOrdr.verify(observer, never).onNext(2L)
Expand All @@ -37,7 +37,7 @@ class TestSchedulerExample extends JUnitSuite {

verify(observer, never).onNext(2L)

sub.unsubscribe();
sub.unsubscribe()

scheduler.advanceTimeTo(4 seconds)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package rx.lang.scala

import java.lang.Exception
import java.{ lang => jlang }
import rx.lang.scala._
import rx.util.functions._

import scala.collection.Seq
import java.{lang => jlang}
import scala.language.implicitConversions

import rx.util.functions._

/**
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
* Most RxScala users won't need them, but they might be useful if one wants to use
Expand All @@ -32,10 +32,17 @@ import scala.language.implicitConversions
object ImplicitFunctionConversions {
import language.implicitConversions

// implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription): Func2[rx.Scheduler, T, rx.Subscription] with Object {def call(s: rx.Scheduler, t: T): rx.Subscription} =
// new Func2[rx.Scheduler, T, rx.Subscription] {
// def call(s: rx.Scheduler, t: T): rx.Subscription = {
// action(rx.lang.scala.Scheduler(s), t).asJavaSubscription
// }
// }

implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) =
new Func2[rx.Scheduler, T, rx.Subscription] {
def call(s: rx.Scheduler, t: T): rx.Subscription = {
action(s, t).asJavaSubscription
action(Scheduler(s), t).asJavaSubscription
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import rx.Observable.OnSubscribeFunc
/**
* The Observable interface that implements the Reactive Pattern.
*
* @param asJavaObservable the underlying Java observable
*
* @define subscribeObserverMain
* Call this method to subscribe an [[rx.lang.scala.Observer]] for receiving
* items and notifications from the Observable.
Expand Down Expand Up @@ -227,7 +225,6 @@ trait Observable[+T]
* otherwise you'll get a compilation error.
*
* @usecase def concat[U]: Observable[U]
* @inheritdoc
*/
def concat[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
val o2: Observable[Observable[U]] = this
Expand Down Expand Up @@ -273,7 +270,19 @@ trait Observable[+T]
* is the minumum of the number of `onNext` invocations of `this` and `that`.
*/
def zip[U](that: Observable[U]): Observable[(T, U)] = {
Observable[(T, U)](rx.Observable.zip[T, U, (T, U)](this.asJavaObservable, that.asJavaObservable, (t: T, u: U) => (t, u)))
zip(that, (t: T, u: U) => (t, u))
}

/**
* Returns an Observable formed from this Observable and another Observable by combining
* corresponding elements using the selector function.
* The number of `onNext` invocations of the resulting `Observable[(T, U)]`
* is the minumum of the number of `onNext` invocations of `this` and `that`.
*
* Note that this function is private because Scala collections don't have such a function.
*/
private def zip[U, R](that: Observable[U], selector: (T,U) => R): Observable[R] = {
Observable[R](rx.Observable.zip[T, U, R](this.asJavaObservable, that.asJavaObservable, selector))
}

/**
Expand Down Expand Up @@ -1844,7 +1853,7 @@ object Observable {
/**
* Creates a new Scala Observable from a given Java Observable.
*/
def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = {
private [scala] def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = {
new Observable[T]{
def asJavaObservable = observable
}
Expand All @@ -1867,13 +1876,13 @@ object Observable {
*
*
* @tparam T
* the type of the items that this Observable emits
* the type of the items that this Observable emits.
* @param func
* a function that accepts an `Observer[T]`, invokes its `onNext`, `onError`, and `onCompleted` methods
* as appropriate, and returns a [[rx.lang.scala.Subscription]] to allow the Observer to
* canceling the subscription
* @return an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given
* function
* canceling the subscription.
* @return
* an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function.
*/
def apply[T](func: Observer[T] => Subscription): Observable[T] = {
Observable[T](rx.Observable.create(new OnSubscribeFunc[T] {
Expand All @@ -1883,16 +1892,26 @@ object Observable {
}))
}

def create[T](func: Observer[T] => Subscription): Observable[T] = {
Observable[T](rx.Observable.create(new OnSubscribeFunc[T] {
def onSubscribe(t1: rx.Observer[_ >: T]): rx.Subscription = {
func(Observer(t1))
}
}))
}

/**
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
* method when the Observer subscribes to it.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/error.png">
*
* @param exception
* the particular error to report
* @tparam T
* the type of the items (ostensibly) emitted by the Observable
* @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it
* @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
* method when the Observer subscribes to it
*/
def apply[T](exception: Throwable): Observable[T] = {
Observable[T](rx.Observable.error(exception))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package rx.lang.scala

import rx.joins.ObserverBase

/**
Provides a mechanism for receiving push-based notifications.
*
Expand All @@ -24,7 +26,11 @@ package rx.lang.scala
*/
trait Observer[-T] {

def asJavaObserver: rx.Observer[_ >: T]
private [scala] def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] {
protected def onCompletedCore(): Unit = onCompleted()
protected def onErrorCore(error: Throwable): Unit = onError(error)
protected def onNextCore(value: T): Unit = onNext(value)
}

/**
* Provides the Observer with new data.
Expand All @@ -33,30 +39,37 @@ trait Observer[-T] {
*
* The [[rx.lang.scala.Observable]] will not call this method again after it calls either `onCompleted` or `onError`.
*/
def onNext(value: T): Unit = asJavaObserver.onNext(value)
def onNext(value: T): Unit

/**
* Notifies the Observer that the [[rx.lang.scala.Observable]] has experienced an error condition.
*
* If the [[rx.lang.scala.Observable]] calls this method, it will not thereafter call `onNext` or `onCompleted`.
*/
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
def onError(error: Throwable): Unit

/**
* Notifies the Observer that the [[rx.lang.scala.Observable]] has finished sending push-based notifications.
*
* The [[rx.lang.scala.Observable]] will not call this method if it calls `onError`.
*/
def onCompleted(): Unit = asJavaObserver.onCompleted()
def onCompleted(): Unit

}

object Observer {
def apply[T](observer: rx.Observer[T]) : Observer[T] = {
new Observer[T]() {
def asJavaObserver: rx.Observer[_ >: T] = observer
}
}
}
/**
* Assume that the underlying rx.Observer does not need to be wrapped.
*/
private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = {
new Observer[T] {

override def asJavaObserver = observer

def onNext(value: T): Unit = asJavaObserver.onNext(value)
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
def onCompleted(): Unit = asJavaObserver.onCompleted()

}
}
}
Loading