diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index e7c09e769c..e09e6a7ec9 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -533,4 +533,35 @@ class RxScalaDemo extends JUnitSuite { println(result) } + @Test def ambExample(): Unit = { + val o1 = List(100L, 200L, 300L).toObservable.delay(4 seconds) + val o2 = List(1000L, 2000L, 3000L).toObservable.delay(2 seconds) + val result = o1.amb(o2).toBlockingObservable.toList + println(result) + } + + @Test def delayExample(): Unit = { + val o = List(100L, 200L, 300L).toObservable.delay(2 seconds) + val result = o.toBlockingObservable.toList + println(result) + } + + @Test def delayExample2(): Unit = { + val o = List(100L, 200L, 300L).toObservable.delay(2 seconds, IOScheduler()) + val result = o.toBlockingObservable.toList + println(result) + } + + @Test def delaySubscriptionExample(): Unit = { + val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds) + val result = o.toBlockingObservable.toList + println(result) + } + + @Test def delaySubscriptionExample2(): Unit = { + val o = List(100L, 200L, 300L).toObservable.delaySubscription(2 seconds, IOScheduler()) + val result = o.toBlockingObservable.toList + println(result) + } + } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 6794770232..8dafa30e2b 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2236,6 +2236,75 @@ trait Observable[+T] def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = { toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError,onCompleted))) } + + /** + * Given two Observables, mirror the one that first emits an item. + * + * + * + * @param that + * an Observable competing to react first + * @return an Observable that emits the same sequence of items as whichever of `this` or `that` first emitted an item. + */ + def amb[U >: T](that: Observable[U]): Observable[U] = { + val thisJava: rx.Observable[_ <: U] = this.asJavaObservable + val thatJava: rx.Observable[_ <: U] = that.asJavaObservable + toScalaObservable[U](rx.Observable.amb(thisJava, thatJava)) + } + + /** + * Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a + * specified delay. Error notifications from the source Observable are not delayed. + * + * + * + * @param delay the delay to shift the source by + * @return the source Observable shifted in time by the specified delay + */ + def delay(delay: Duration): Observable[T] = { + toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit)) + } + + /** + * Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a + * specified delay. Error notifications from the source Observable are not delayed. + * + * + * + * @param delay the delay to shift the source by + * @param scheduler the Scheduler to use for delaying + * @return the source Observable shifted in time by the specified delay + */ + def delay(delay: Duration, scheduler: Scheduler): Observable[T] = { + toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit, scheduler)) + } + + /** + * Return an Observable that delays the subscription to the source Observable by a given amount of time. + * + * + * + * @param delay the time to delay the subscription + * @return an Observable that delays the subscription to the source Observable by the given amount + */ + def delaySubscription(delay: Duration): Observable[T] = { + toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit)) + } + + /** + * Return an Observable that delays the subscription to the source Observable by a given amount of time, + * both waiting and subscribing on a given Scheduler. + * + * + * + * @param delay the time to delay the subscription + * @param scheduler the Scheduler on which the waiting and subscription will happen + * @return an Observable that delays the subscription to the source Observable by a given + * amount, waiting and subscribing on the given Scheduler + */ + def delaySubscription(delay: Duration, scheduler: Scheduler): Observable[T] = { + toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit, scheduler)) + } } /** diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5bfb5ad278..ab5405f916 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -52,7 +52,7 @@ import rx.operators.OnSubscribeFromIterable; import rx.operators.OnSubscribeRange; import rx.operators.OperationAll; -import rx.operators.OperationAmb; +import rx.operators.OperatorAmb; import rx.operators.OperationAny; import rx.operators.OperationAsObservable; import rx.operators.OperationAverage; @@ -300,7 +300,7 @@ public void call(Subscriber o) { * @see MSDN: Observable.Amb */ public final static Observable amb(Iterable> sources) { - return create(OperationAmb.amb(sources)); + return create(OperatorAmb.amb(sources)); } /** @@ -318,7 +318,7 @@ public final static Observable amb(IterableMSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2) { - return create(OperationAmb.amb(o1, o2)); + return create(OperatorAmb.amb(o1, o2)); } /** @@ -338,7 +338,7 @@ public final static Observable amb(Observable o1, Observable * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3) { - return create(OperationAmb.amb(o1, o2, o3)); + return create(OperatorAmb.amb(o1, o2, o3)); } /** @@ -360,7 +360,7 @@ public final static Observable amb(Observable o1, Observable * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4) { - return create(OperationAmb.amb(o1, o2, o3, o4)); + return create(OperatorAmb.amb(o1, o2, o3, o4)); } /** @@ -384,7 +384,7 @@ public final static Observable amb(Observable o1, Observable * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5) { - return create(OperationAmb.amb(o1, o2, o3, o4, o5)); + return create(OperatorAmb.amb(o1, o2, o3, o4, o5)); } /** @@ -410,7 +410,7 @@ public final static Observable amb(Observable o1, Observable * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6) { - return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6)); + return create(OperatorAmb.amb(o1, o2, o3, o4, o5, o6)); } /** @@ -438,7 +438,7 @@ public final static Observable amb(Observable o1, Observable * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7) { - return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7)); + return create(OperatorAmb.amb(o1, o2, o3, o4, o5, o6, o7)); } /** @@ -468,7 +468,7 @@ public final static Observable amb(Observable o1, Observable * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8) { - return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8)); + return create(OperatorAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8)); } /** @@ -500,7 +500,7 @@ public final static Observable amb(Observable o1, Observable * @see MSDN: Observable.Amb */ public final static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9) { - return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8, o9)); + return create(OperatorAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8, o9)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationAmb.java b/rxjava-core/src/main/java/rx/operators/OperatorAmb.java similarity index 50% rename from rxjava-core/src/main/java/rx/operators/OperationAmb.java rename to rxjava-core/src/main/java/rx/operators/OperatorAmb.java index 16a6c307dd..a2bb92ebaf 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationAmb.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorAmb.java @@ -20,24 +20,22 @@ import java.util.concurrent.atomic.AtomicInteger; import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; -import rx.subscriptions.CompositeSubscription; +import rx.Observable.OnSubscribe; +import rx.Subscriber; /** * Propagates the observable sequence that reacts first. */ -public class OperationAmb { +public final class OperatorAmb implements OnSubscribe{ - public static OnSubscribeFunc amb(Observable o1, Observable o2) { + public static OnSubscribe amb(Observable o1, Observable o2) { List> sources = new ArrayList>(); sources.add(o1); sources.add(o2); return amb(sources); } - public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3) { + public static OnSubscribe amb(Observable o1, Observable o2, Observable o3) { List> sources = new ArrayList>(); sources.add(o1); sources.add(o2); @@ -45,7 +43,7 @@ public static OnSubscribeFunc amb(Observable o1, Observable< return amb(sources); } - public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4) { + public static OnSubscribe amb(Observable o1, Observable o2, Observable o3, Observable o4) { List> sources = new ArrayList>(); sources.add(o1); sources.add(o2); @@ -54,7 +52,7 @@ public static OnSubscribeFunc amb(Observable o1, Observable< return amb(sources); } - public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5) { + public static OnSubscribe amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5) { List> sources = new ArrayList>(); sources.add(o1); sources.add(o2); @@ -64,7 +62,7 @@ public static OnSubscribeFunc amb(Observable o1, Observable< return amb(sources); } - public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6) { + public static OnSubscribe amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6) { List> sources = new ArrayList>(); sources.add(o1); sources.add(o2); @@ -75,7 +73,7 @@ public static OnSubscribeFunc amb(Observable o1, Observable< return amb(sources); } - public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7) { + public static OnSubscribe amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7) { List> sources = new ArrayList>(); sources.add(o1); sources.add(o2); @@ -87,7 +85,7 @@ public static OnSubscribeFunc amb(Observable o1, Observable< return amb(sources); } - public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8) { + public static OnSubscribe amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8) { List> sources = new ArrayList>(); sources.add(o1); sources.add(o2); @@ -100,7 +98,7 @@ public static OnSubscribeFunc amb(Observable o1, Observable< return amb(sources); } - public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9) { + public static OnSubscribe amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9) { List> sources = new ArrayList>(); sources.add(o1); sources.add(o2); @@ -114,41 +112,20 @@ public static OnSubscribeFunc amb(Observable o1, Observable< return amb(sources); } - public static OnSubscribeFunc amb( - final Iterable> sources) { - return new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(final Observer observer) { - AtomicInteger choice = new AtomicInteger(AmbObserver.NONE); - int index = 0; - CompositeSubscription parentSubscription = new CompositeSubscription(); - for (Observable source : sources) { - SafeObservableSubscription subscription = new SafeObservableSubscription(); - AmbObserver ambObserver = new AmbObserver( - subscription, observer, index, choice); - parentSubscription.add(subscription.wrap(source - .subscribe(ambObserver))); - index++; - } - return parentSubscription; - } - }; + public static OnSubscribe amb(final Iterable> sources) { + return new OperatorAmb(sources); } - private static class AmbObserver implements Observer { + private static final class AmbSubscriber extends Subscriber { private static final int NONE = -1; - private Subscription subscription; - private Observer observer; - private int index; - private AtomicInteger choice; + private final Subscriber subscriber; + private final int index; + private final AtomicInteger choice; - private AmbObserver(Subscription subscription, - Observer observer, int index, AtomicInteger choice) { - this.subscription = subscription; - this.observer = observer; + private AmbSubscriber(Subscriber subscriber, int index, AtomicInteger choice) { + this.subscriber = subscriber; this.choice = choice; this.index = index; } @@ -156,35 +133,61 @@ private AmbObserver(Subscription subscription, @Override public void onNext(T args) { if (!isSelected()) { - subscription.unsubscribe(); + unsubscribe(); return; } - observer.onNext(args); + subscriber.onNext(args); } @Override public void onCompleted() { if (!isSelected()) { - subscription.unsubscribe(); + unsubscribe(); return; } - observer.onCompleted(); + subscriber.onCompleted(); } @Override public void onError(Throwable e) { if (!isSelected()) { - subscription.unsubscribe(); + unsubscribe(); return; } - observer.onError(e); + subscriber.onError(e); } private boolean isSelected() { - if (choice.get() == NONE) { + int ch = choice.get(); + if (ch == NONE) { return choice.compareAndSet(NONE, index); } - return choice.get() == index; + return ch == index; + } + } + + private final Iterable> sources; + + private OperatorAmb(Iterable> sources) { + this.sources = sources; + } + + @Override + public void call(Subscriber subscriber) { + AtomicInteger choice = new AtomicInteger(AmbSubscriber.NONE); + int index = 0; + for (Observable source : sources) { + if (subscriber.isUnsubscribed()) { + break; + } + if (choice.get() != AmbSubscriber.NONE) { + // Already choose someone, the rest Observables can be skipped. + break; + } + AmbSubscriber ambSubscriber = new AmbSubscriber(subscriber, index, choice); + subscriber.add(ambSubscriber); + source.subscribe(ambSubscriber); + index++; } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationAmbTest.java b/rxjava-core/src/test/java/rx/operators/OperatorAmbTest.java similarity index 86% rename from rxjava-core/src/test/java/rx/operators/OperationAmbTest.java rename to rxjava-core/src/test/java/rx/operators/OperatorAmbTest.java index 42d48f30df..323eda3ca5 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationAmbTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorAmbTest.java @@ -15,8 +15,10 @@ */ package rx.operators; -import static org.mockito.Mockito.*; -import static rx.operators.OperationAmb.*; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static rx.operators.OperatorAmb.amb; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -26,15 +28,15 @@ import org.mockito.InOrder; import rx.Observable; -import rx.Observable.OnSubscribeFunc; +import rx.Observable.OnSubscribe; import rx.Observer; import rx.Scheduler.Inner; -import rx.Subscription; +import rx.Subscriber; import rx.functions.Action1; import rx.schedulers.TestScheduler; import rx.subscriptions.CompositeSubscription; -public class OperationAmbTest { +public class OperatorAmbTest { private TestScheduler scheduler; @@ -45,17 +47,18 @@ public void setUp() { private Observable createObservable(final String[] values, final long interval, final Throwable e) { - return Observable.create(new OnSubscribeFunc() { + return Observable.create(new OnSubscribe() { @Override - public Subscription onSubscribe(final Observer observer) { + public void call(final Subscriber subscriber) { CompositeSubscription parentSubscription = new CompositeSubscription(); + subscriber.add(parentSubscription); long delay = interval; for (final String value : values) { parentSubscription.add(scheduler.schedule(new Action1() { @Override public void call(Inner inner) { - observer.onNext(value); + subscriber.onNext(value); } }, delay, TimeUnit.MILLISECONDS)); delay += interval; @@ -64,13 +67,12 @@ public void call(Inner inner) { @Override public void call(Inner inner) { if (e == null) { - observer.onCompleted(); + subscriber.onCompleted(); } else { - observer.onError(e); + subscriber.onError(e); } } }, delay, TimeUnit.MILLISECONDS)); - return parentSubscription; } }); } @@ -104,12 +106,12 @@ public void testAmb() { @Test public void testAmb2() { - IOException needHappenedException = new IOException( + IOException expectedException = new IOException( "fake exception"); Observable observable1 = createObservable(new String[] {}, 2000, new IOException("fake exception")); Observable observable2 = createObservable(new String[] { - "2", "22", "222", "2222" }, 1000, needHappenedException); + "2", "22", "222", "2222" }, 1000, expectedException); Observable observable3 = createObservable(new String[] {}, 3000, new IOException("fake exception")); @@ -127,7 +129,7 @@ public void testAmb2() { inOrder.verify(observer, times(1)).onNext("22"); inOrder.verify(observer, times(1)).onNext("222"); inOrder.verify(observer, times(1)).onNext("2222"); - inOrder.verify(observer, times(1)).onError(needHappenedException); + inOrder.verify(observer, times(1)).onError(expectedException); inOrder.verifyNoMoreInteractions(); }