Skip to content

Commit

Permalink
Merge pull request #952 from zsxwing/amb
Browse files Browse the repository at this point in the history
rxjava-scala improvements and reimplemented the `amb` operator
  • Loading branch information
benjchristensen committed Mar 13, 2014
2 parents 90d5978 + 29b5150 commit 11e1bd0
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -533,4 +533,35 @@ class RxScalaDemo extends JUnitSuite {
println(result)
}

@Test def ambExample(): Unit = {
val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds)
val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds)
val result = o1.amb(o2).toBlockingObservable.toList
println(result)
}

@Test def delayExample(): Unit = {
val o = List(100L, 200L, 300L).toObservable.delay(2 seconds)
val result = o.toBlockingObservable.toList
println(result)
}

@Test def delayExample2(): Unit = {
val o = List(100L, 200L, 300L).toObservable.delay(2 seconds, IOScheduler())
val result = o.toBlockingObservable.toList
println(result)
}

@Test def delaySubscriptionExample(): Unit = {
val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds)
val result = o.toBlockingObservable.toList
println(result)
}

@Test def delaySubscriptionExample2(): Unit = {
val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds, IOScheduler())
val result = o.toBlockingObservable.toList
println(result)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2236,6 +2236,75 @@ trait Observable[+T]
def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError,onCompleted)))
}

/**
* Given two Observables, mirror the one that first emits an item.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/amb.png">
*
* @param that
* an Observable competing to react first
* @return an Observable that emits the same sequence of items as whichever of `this` or `that` first emitted an item.
*/
def amb[U >: T](that: Observable[U]): Observable[U] = {
val thisJava: rx.Observable[_ <: U] = this.asJavaObservable
val thatJava: rx.Observable[_ <: U] = that.asJavaObservable
toScalaObservable[U](rx.Observable.amb(thisJava, thatJava))
}

/**
* Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a
* specified delay. Error notifications from the source Observable are not delayed.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/delay.png">
*
* @param delay the delay to shift the source by
* @return the source Observable shifted in time by the specified delay
*/
def delay(delay: Duration): Observable[T] = {
toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit))
}

/**
* Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a
* specified delay. Error notifications from the source Observable are not delayed.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/delay.s.png">
*
* @param delay the delay to shift the source by
* @param scheduler the Scheduler to use for delaying
* @return the source Observable shifted in time by the specified delay
*/
def delay(delay: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit, scheduler))
}

/**
* Return an Observable that delays the subscription to the source Observable by a given amount of time.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/delaySubscription.png">
*
* @param delay the time to delay the subscription
* @return an Observable that delays the subscription to the source Observable by the given amount
*/
def delaySubscription(delay: Duration): Observable[T] = {
toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit))
}

/**
* Return an Observable that delays the subscription to the source Observable by a given amount of time,
* both waiting and subscribing on a given Scheduler.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/delaySubscription.s.png">
*
* @param delay the time to delay the subscription
* @param scheduler the Scheduler on which the waiting and subscription will happen
* @return an Observable that delays the subscription to the source Observable by a given
* amount, waiting and subscribing on the given Scheduler
*/
def delaySubscription(delay: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit, scheduler))
}
}

/**
Expand Down
20 changes: 10 additions & 10 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import rx.operators.OnSubscribeFromIterable;
import rx.operators.OnSubscribeRange;
import rx.operators.OperationAll;
import rx.operators.OperationAmb;
import rx.operators.OperatorAmb;
import rx.operators.OperationAny;
import rx.operators.OperationAsObservable;
import rx.operators.OperationAverage;
Expand Down Expand Up @@ -300,7 +300,7 @@ public void call(Subscriber<? super R> o) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229115.aspx">MSDN: Observable.Amb</a>
*/
public final static <T> Observable<T> amb(Iterable<? extends Observable<? extends T>> sources) {
return create(OperationAmb.amb(sources));
return create(OperatorAmb.amb(sources));
}

/**
Expand All @@ -318,7 +318,7 @@ public final static <T> Observable<T> amb(Iterable<? extends Observable<? extend
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733.aspx">MSDN: Observable.Amb</a>
*/
public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
return create(OperationAmb.amb(o1, o2));
return create(OperatorAmb.amb(o1, o2));
}

/**
Expand All @@ -338,7 +338,7 @@ public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733.aspx">MSDN: Observable.Amb</a>
*/
public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3) {
return create(OperationAmb.amb(o1, o2, o3));
return create(OperatorAmb.amb(o1, o2, o3));
}

/**
Expand All @@ -360,7 +360,7 @@ public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733.aspx">MSDN: Observable.Amb</a>
*/
public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4) {
return create(OperationAmb.amb(o1, o2, o3, o4));
return create(OperatorAmb.amb(o1, o2, o3, o4));
}

/**
Expand All @@ -384,7 +384,7 @@ public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733.aspx">MSDN: Observable.Amb</a>
*/
public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5));
return create(OperatorAmb.amb(o1, o2, o3, o4, o5));
}

/**
Expand All @@ -410,7 +410,7 @@ public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733.aspx">MSDN: Observable.Amb</a>
*/
public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6));
return create(OperatorAmb.amb(o1, o2, o3, o4, o5, o6));
}

/**
Expand Down Expand Up @@ -438,7 +438,7 @@ public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733.aspx">MSDN: Observable.Amb</a>
*/
public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7));
return create(OperatorAmb.amb(o1, o2, o3, o4, o5, o6, o7));
}

/**
Expand Down Expand Up @@ -468,7 +468,7 @@ public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733.aspx">MSDN: Observable.Amb</a>
*/
public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8));
return create(OperatorAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8));
}

/**
Expand Down Expand Up @@ -500,7 +500,7 @@ public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733.aspx">MSDN: Observable.Amb</a>
*/
public final static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2, Observable<? extends T> o3, Observable<? extends T> o4, Observable<? extends T> o5, Observable<? extends T> o6, Observable<? extends T> o7, Observable<? extends T> o8, Observable<? extends T> o9) {
return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8, o9));
return create(OperatorAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8, o9));
}

/**
Expand Down
Loading

0 comments on commit 11e1bd0

Please sign in to comment.