Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue #1173 #1178

Merged
merged 6 commits into from
May 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ object Notification {
* @param notification
* The [[rx.lang.scala.Notification]] to be deconstructed
* @return
* The [[java.lang.Throwable]] value contained in this notification.
* The `java.lang.Throwable` value contained in this notification.
*/
def unapply[U](notification: Notification[U]): Option[Throwable] = notification match {
case onError: OnError[U] => Some(onError.error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,28 @@ import collection.JavaConversions._
* the observer
* @define subscribeObserverParamScheduler
* the [[rx.lang.scala.Scheduler]] on which Observers subscribe to the Observable
*
* @define subscribeSubscriberMain
* Call this method to subscribe an [[Subscriber]] for receiving items and notifications from the [[Observable]].
*
* A typical implementation of `subscribe` does the following:
*
* It stores a reference to the Observer in a collection object, such as a `List[T]` object.
*
* It returns a reference to the [[rx.lang.scala.Subscription]] interface. This enables [[Subscriber]]s to
* unsubscribe, that is, to stop receiving items and notifications before the Observable stops
* sending them, which also invokes the Subscriber's [[rx.lang.scala.Observer.onCompleted onCompleted]] method.
*
* An [[Observable]] instance is responsible for accepting all subscriptions
* and notifying all [[Subscriber]]s. Unless the documentation for a particular
* [[Observable]] implementation indicates otherwise, [[Subscriber]]s should make no
* assumptions about the order in which multiple [[Subscriber]]s will receive their notifications.
*
* @define subscribeSubscriberParamObserver
* the [[Subscriber]]
* @define subscribeSubscriberParamScheduler
* the [[rx.lang.scala.Scheduler]] on which [[Subscriber]]s subscribe to the Observable
*
* @define subscribeAllReturn
* a [[rx.lang.scala.Subscription]] reference whose `unsubscribe` method can be called to stop receiving items
* before the Observable has finished sending them
Expand Down Expand Up @@ -125,6 +147,39 @@ trait Observable[+T]
*/
def apply(observer: Observer[T]): Subscription = subscribe(observer)

/**
* $subscribeSubscriberMain
*
* @param subscriber $subscribeSubscriberParamObserver
* @param scheduler $subscribeSubscriberParamScheduler
* @return $subscribeAllReturn
*/
def subscribe(subscriber: Subscriber[T], scheduler: Scheduler): Subscription = {
// Add the casting to avoid compile error "ambiguous reference to overloaded definition"
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
thisJava.subscribe(subscriber.asJavaSubscriber, scheduler)
}

/**
* $subscribeSubscriberMain
*
* @param subscriber $subscribeSubscriberParamObserver
* @return $subscribeAllReturn
*/
def subscribe(subscriber: Subscriber[T]): Subscription = {
// Add the casting to avoid compile error "ambiguous reference to overloaded definition"
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
thisJava.subscribe(subscriber.asJavaSubscriber)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compile error of removing casting:

error: ambiguous reference to overloaded definition,
[ant:scalac] both method subscribe in class Observable of type (x$1: rx.Subscriber[_ >: _$1], x$2: rx.Scheduler)rx.Subscription
[ant:scalac] and  method subscribe in class Observable of type (x$1: rx.Observer[_ >: _$1], x$2: rx.Scheduler)rx.Subscription
[ant:scalac] match argument types (rx.Subscriber[_$2],rx.lang.scala.Scheduler) and expected result type rx.lang.scala.Subscription
[ant:scalac]     asJavaObservable.subscribe(subscriber.asJavaSubscriber, scheduler)
[ant:scalac]                      ^

}

/**
* $subscribeSubscriberMain
*
* @param subscriber $subscribeSubscriberParamObserver
* @return $subscribeAllReturn
*/
def apply(subscriber: Subscriber[T]): Subscription = subscribe(subscriber)

/**
* $subscribeCallbacksMainNoNotifications
*
Expand Down Expand Up @@ -287,7 +342,7 @@ trait Observable[+T]
*
* A well-behaved Observable does not interleave its invocations of the [[rx.lang.scala.Observer.onNext onNext]], [[rx.lang.scala.Observer.onCompleted onCompleted]], and [[rx.lang.scala.Observer.onError onError]] methods of
* its [[rx.lang.scala.Observer]]s; it invokes `onCompleted` or `onError` only once; and it never invokes `onNext` after invoking either `onCompleted` or `onError`.
* `synchronize` enforces this, and the Observable it returns invokes `onNext` and `onCompleted` or `onError` synchronously.
* [[Observable.serialize serialize]] enforces this, and the Observable it returns invokes `onNext` and `onCompleted` or `onError` synchronously.
*
* @return an Observable that is a chronologically well-behaved version of the source
* Observable, and that synchronously notifies its [[rx.lang.scala.Observer]]s
Expand Down Expand Up @@ -2405,8 +2460,7 @@ trait Observable[+T]

/**
* Perform work in parallel by sharding an `Observable[T]` on a
* [[rx.lang.scala.concurrency.Schedulers.threadPoolForComputation computation]]
* [[rx.lang.scala.Scheduler]] and return an `Observable[R]` with the output.
* [[rx.lang.scala.schedulers.ComputationScheduler]] and return an `Observable[R]` with the output.
*
* @param f
* a function that applies Observable operators to `Observable[T]` in parallel and returns an `Observable[R]`
Expand Down Expand Up @@ -2636,12 +2690,10 @@ trait Observable[+T]
* those emitted by the source Observable
* @throws IndexOutOfBoundsException
* if index is greater than or equal to the number of items emitted by the source
* Observable
* @throws IndexOutOfBoundsException
* if index is less than 0
* Observable, or index is less than 0
* @see `Observable.elementAt`
* @deprecated("Use `elementAt`", "0.18.0")
*/
@deprecated("Use `elementAt`", "0.18.0")
def apply(index: Int): Observable[T] = elementAt(index)

/**
Expand All @@ -2656,9 +2708,7 @@ trait Observable[+T]
* those emitted by the source Observable
* @throws IndexOutOfBoundsException
* if index is greater than or equal to the number of items emitted by the source
* Observable
* @throws IndexOutOfBoundsException
* if index is less than 0
* Observable, or index is less than 0
*/
def elementAt(index: Int): Observable[T] = {
toScalaObservable[T](asJavaObservable.elementAt(index))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package rx.lang.scala
trait Subscriber[-T] extends Observer[T] with Subscription {

self =>

private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
private [scala] override val asJavaSubscription: rx.Subscription = asJavaSubscriber


private [scala] val asJavaSubscriber: rx.Subscriber[_ >: T] = new rx.Subscriber[T] {
def onNext(value: T): Unit = self.onNext(value)
def onError(error: Throwable): Unit = self.onError(error)
def onCompleted(): Unit = self.onCompleted()
}


private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
private [scala] override val asJavaSubscription: rx.Subscription = asJavaSubscriber

/**
* Used to register an unsubscribe callback.
*/
Expand All @@ -34,6 +34,8 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] {

private[scala] def apply[T](subscriber: rx.Subscriber[T]): Subscriber[T] = new Subscriber[T] {
override val asJavaSubscriber = subscriber
override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
override val asJavaSubscription: rx.Subscription = asJavaSubscriber

override def onNext(value: T): Unit = asJavaSubscriber.onNext(value)
override def onError(error: Throwable): Unit = asJavaSubscriber.onError(error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class CompositeSubscription private[scala] (override val asJavaSubscription: rx.

/**
* Adds a subscription to the group,
* or unsubscribes immediately is the [[rx.subscriptions.CompositeSubscription]] is unsubscribed.
* or unsubscribes immediately is the [[rx.lang.scala.subscriptions.CompositeSubscription]] is unsubscribed.
* @param subscription the subscription to be added.
* @return the [[rx.subscriptions.CompositeSubscription]] itself.
* @return the [[rx.lang.scala.subscriptions.CompositeSubscription]] itself.
*/
def +=(subscription: Subscription): this.type = {
asJavaSubscription.add(subscription.asJavaSubscription)
Expand All @@ -62,7 +62,7 @@ class CompositeSubscription private[scala] (override val asJavaSubscription: rx.
/**
* Removes and unsubscribes a subscription to the group,
* @param subscription the subscription be removed.
* @return the [[rx.subscriptions.CompositeSubscription]] itself.
* @return the [[rx.lang.scala.subscriptions.CompositeSubscription]] itself.
*/
def -=(subscription: Subscription): this.type = {
asJavaSubscription.remove(subscription.asJavaSubscription)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,16 @@ class CompletenessTest extends JUnitSuite {
"all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)",
"buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)",
"buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)",
"contains(T)" -> "contains(Any)",
"contains(Any)" -> "contains(U)",
"count()" -> "length",
"dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])",
"elementAt(Int)" -> "[use `.drop(index).first`]",
"elementAtOrDefault(Int, T)" -> "[use `.drop(index).firstOrElse(default)`]",
"elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)",
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"firstOrDefault(T)" -> "firstOrElse(=> U)",
"firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use `.filter(condition).firstOrElse(default)`]",
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
"mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])",
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
"onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])",
Expand All @@ -95,6 +94,7 @@ class CompletenessTest extends JUnitSuite {
"publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[U] => Observable[R], U)",
"reduce(Func2[T, T, T])" -> "reduce((U, U) => U)",
"reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
"repeat()" -> "repeat()",
"retry()" -> "retry()",
"scan(Func2[T, T, T])" -> unnecessary,
"scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",
Expand All @@ -113,8 +113,8 @@ class CompletenessTest extends JUnitSuite {
"skipLast(Int)" -> "dropRight(Int)",
"skipLast(Long, TimeUnit)" -> "dropRight(Duration)",
"skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)",
"takeFirst()" -> "first",
"takeFirst(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"subscribe()" -> "subscribe()",
"takeFirst(Func1[_ >: T, Boolean])" -> "[use `filter(condition).take(1)`]",
"takeLast(Int)" -> "takeRight(Int)",
"takeWhileWithIndex(Func2[_ >: T, _ >: Integer, Boolean])" -> "[use `.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)`]",
"timeout(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]], Observable[_ <: T])" -> "timeout(() => Observable[U], T => Observable[V], Observable[O])",
Expand All @@ -126,7 +126,6 @@ class CompletenessTest extends JUnitSuite {
"toList()" -> "toSeq",
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"where(Func1[_ >: T, Boolean])" -> "filter(T => Boolean)",
"window(Long, Long, TimeUnit)" -> "window(Duration, Duration)",
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",

Expand All @@ -135,32 +134,28 @@ class CompletenessTest extends JUnitSuite {
"averageDoubles(Observable[Double])" -> averageProblem,
"averageFloats(Observable[Float])" -> averageProblem,
"averageLongs(Observable[Long])" -> averageProblem,
"create(OnSubscribeFunc[T])" -> "apply(Observer[T] => Subscription)",
"create(OnSubscribeFunc[T])" -> "create(Observer[T] => Subscription)",
"create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)",
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
"defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])",
"empty()" -> "apply(T*)",
"error(Throwable)" -> "apply(Throwable)",
"from(Array[T])" -> "apply(T*)",
"from(Iterable[_ <: T])" -> "apply(T*)",
"from(Array[T])" -> "[use `items(T*)`]",
"from(Iterable[_ <: T])" -> "from(Iterable[T])",
"from(Future[_ <: T])" -> fromFuture,
"from(Future[_ <: T], Long, TimeUnit)" -> fromFuture,
"from(Future[_ <: T], Scheduler)" -> fromFuture,
"just(T)" -> "apply(T*)",
"just(T)" -> "[use `items(T*)`]",
"just(T, Scheduler)" -> "[use `items(T*).subscribeOn(scheduler)`]",
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
"mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
"range(Int, Int)" -> "apply(Range)",
"repeat()" -> "repeat()",
"retry()" -> "retry()",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use `(first zip second) map (p => p._1 == p._2)`]",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "[use `(first zip second) map (p => equality(p._1, p._2))`]",
"range(Int, Int)" -> "[use `(start until (start + count)).toObservable` instead of `range(start, count)`]",
"range(Int, Int, Scheduler)" -> "[use `(start until (start + count)).toObservable.subscribeOn(scheduler)` instead of `range(start, count, scheduler)`]`]",
"sum(Observable[Integer])" -> "sum(Numeric[U])",
"sumDoubles(Observable[Double])" -> "sum(Numeric[U])",
"sumFloats(Observable[Float])" -> "sum(Numeric[U])",
"sumLongs(Observable[Long])" -> "sum(Numeric[U])",
"synchronize(Observable[T])" -> "synchronize",
"switchDo(Observable[_ <: Observable[_ <: T]])" -> deprecated,
"switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])",
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]",
Expand All @@ -174,7 +169,7 @@ class CompletenessTest extends JUnitSuite {
"concat(" + _ + ")" -> "[unnecessary because we can use `++` instead or `Observable(o1, o2, ...).concat`]"
).drop(1).toMap ++ List.iterate("T", 10)(s => s + ", T").map(
// all 10 overloads of from:
"from(" + _ + ")" -> "apply(T*)"
"from(" + _ + ")" -> "[use `items(T*)`]"
).toMap ++ (3 to 9).map(i => {
// zip3-9:
val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("")
Expand Down Expand Up @@ -216,6 +211,8 @@ class CompletenessTest extends JUnitSuite {
// TODO how can we filter out instance methods which were put into companion because
// of extends AnyVal in a way which does not depend on implementation-chosen name '$extension'?
.filter(! _.contains("$extension"))
// `access$000` is public. How to distinguish it from others without hard-code?
.filter(! _.contains("access$000"))
}

// also applicable for Java types
Expand Down Expand Up @@ -373,7 +370,10 @@ class CompletenessTest extends JUnitSuite {
def escape(s: String) = s.replaceAllLiterally("[", "&lt;").replaceAllLiterally("]", "&gt;")

println("""
## Comparison of Scala Observable and Java Observable
---
layout: comparison
title: Comparison of Scala Observable and Java Observable
---

Note:
* This table contains both static methods and instance methods.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

import org.junit.Test
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertTrue
import org.scalatest.junit.JUnitSuite

class SubscriberTests extends JUnitSuite {

@Test def testIssue1173() {
// https://github.com/Netflix/RxJava/issues/1173
val subscriber = Subscriber((n: Int) => println(n))
assertNotNull(subscriber.asJavaObserver)
assertNotNull(subscriber.asJavaSubscription)
assertNotNull(subscriber.asJavaSubscriber)
}

@Test def testUnsubscribeForSubscriber() {
var innerSubscriber: Subscriber[Int] = null
val o = Observable[Int](subscriber => {
Observable[Int](subscriber => {
innerSubscriber = subscriber
}).subscribe(subscriber)
})
o.subscribe().unsubscribe()
// If we unsubscribe outside, the inner Subscriber should also be unsubscribed
assertTrue(innerSubscriber.isUnsubscribed)
}

}