diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 7533c5a991..3246993537 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -12019,7 +12019,7 @@ public final Flowable startWithArray(T... items) { @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe() { return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, - Functions.EMPTY_ACTION, FlowableInternalHelper.requestMax()); + Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE); } /** @@ -12046,7 +12046,7 @@ public final Disposable subscribe() { @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer onNext) { return subscribe(onNext, Functions.ERROR_CONSUMER, - Functions.EMPTY_ACTION, FlowableInternalHelper.requestMax()); + Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE); } /** @@ -12075,7 +12075,7 @@ public final Disposable subscribe(Consumer onNext) { @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer onNext, Consumer onError) { - return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.requestMax()); + return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE); } /** @@ -12109,7 +12109,7 @@ public final Disposable subscribe(Consumer onNext, Consumer onNext, Consumer onError, Action onComplete) { - return subscribe(onNext, onError, onComplete, FlowableInternalHelper.requestMax()); + return subscribe(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE); } /** diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java index f5cc8d692a..4056879947 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java @@ -13,9 +13,17 @@ package io.reactivex.internal.operators.flowable; -import java.util.Iterator; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.*; -import org.reactivestreams.Publisher; +import org.reactivestreams.*; + +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.ExceptionHelper; public final class BlockingFlowableIterable implements Iterable { final Publisher source; @@ -33,4 +41,145 @@ public Iterator iterator() { source.subscribe(it); return it; } + + static final class BlockingFlowableIterator + extends AtomicReference + implements Subscriber, Iterator, Runnable, Disposable { + + private static final long serialVersionUID = 6695226475494099826L; + + final SpscArrayQueue queue; + + final long batchSize; + + final long limit; + + final Lock lock; + + final Condition condition; + + long produced; + + volatile boolean done; + Throwable error; + + BlockingFlowableIterator(int batchSize) { + this.queue = new SpscArrayQueue(batchSize); + this.batchSize = batchSize; + this.limit = batchSize - (batchSize >> 2); + this.lock = new ReentrantLock(); + this.condition = lock.newCondition(); + } + + @Override + public boolean hasNext() { + for (;;) { + boolean d = done; + boolean empty = queue.isEmpty(); + if (d) { + Throwable e = error; + if (e != null) { + throw ExceptionHelper.wrapOrThrow(e); + } else + if (empty) { + return false; + } + } + if (empty) { + lock.lock(); + try { + while (!done && queue.isEmpty()) { + condition.await(); + } + } catch (InterruptedException ex) { + run(); + throw ExceptionHelper.wrapOrThrow(ex); + } finally { + lock.unlock(); + } + } else { + return true; + } + } + } + + @Override + public T next() { + if (hasNext()) { + T v = queue.poll(); + + long p = produced + 1; + if (p == limit) { + produced = 0; + get().request(p); + } else { + produced = p; + } + + return v; + } + throw new NoSuchElementException(); + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(batchSize); + } + } + + @Override + public void onNext(T t) { + if (!queue.offer(t)) { + SubscriptionHelper.cancel(this); + + onError(new MissingBackpressureException("Queue full?!")); + } else { + signalConsumer(); + } + } + + @Override + public void onError(Throwable t) { + error = t; + done = true; + signalConsumer(); + } + + @Override + public void onComplete() { + done = true; + signalConsumer(); + } + + void signalConsumer() { + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); + } + } + + @Override + public void run() { + SubscriptionHelper.cancel(this); + signalConsumer(); + } + + @Override // otherwise default method which isn't available in Java 7 + public void remove() { + throw new UnsupportedOperationException("remove"); + } + + @Override + public void dispose() { + SubscriptionHelper.cancel(this); + } + + @Override + public boolean isDisposed() { + return SubscriptionHelper.isCancelled(get()); + } + } } \ No newline at end of file diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterator.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterator.java deleted file mode 100644 index 8a7ce2e277..0000000000 --- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterator.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.flowable; - -import java.util.*; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.*; - -import org.reactivestreams.*; - -import io.reactivex.disposables.Disposable; -import io.reactivex.internal.queue.SpscLinkedArrayQueue; -import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.ExceptionHelper; - -public final class BlockingFlowableIterator -extends AtomicReference -implements Subscriber, Iterator, Runnable, Disposable { - - private static final long serialVersionUID = 6695226475494099826L; - - final SpscLinkedArrayQueue queue; - - final long batchSize; - - final long limit; - - final Lock lock; - - final Condition condition; - - long produced; - - volatile boolean done; - Throwable error; - - public BlockingFlowableIterator(int batchSize) { - this.queue = new SpscLinkedArrayQueue(batchSize); - this.batchSize = batchSize; - this.limit = batchSize - (batchSize >> 2); - this.lock = new ReentrantLock(); - this.condition = lock.newCondition(); - } - - @Override - public boolean hasNext() { - for (;;) { - boolean d = done; - boolean empty = queue.isEmpty(); - if (d) { - Throwable e = error; - if (e != null) { - throw ExceptionHelper.wrapOrThrow(e); - } else - if (empty) { - return false; - } - } - if (empty) { - lock.lock(); - try { - while (!done && queue.isEmpty()) { - condition.await(); - } - } catch (InterruptedException ex) { - run(); - throw ExceptionHelper.wrapOrThrow(ex); - } finally { - lock.unlock(); - } - } else { - return true; - } - } - } - - @Override - public T next() { - if (hasNext()) { - T v = queue.poll(); - - if (v == null) { - run(); - - throw new IllegalStateException("Queue empty?!"); - } - - long p = produced + 1; - if (p == limit) { - produced = 0; - get().request(p); - } else { - produced = p; - } - - return v; - } - throw new NoSuchElementException(); - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.setOnce(this, s)) { - s.request(batchSize); - } - } - - @Override - public void onNext(T t) { - if (!queue.offer(t)) { - SubscriptionHelper.cancel(this); - - onError(new IllegalStateException("Queue full?!")); - } else { - signalConsumer(); - } - } - - @Override - public void onError(Throwable t) { - error = t; - done = true; - signalConsumer(); - } - - @Override - public void onComplete() { - done = true; - signalConsumer(); - } - - void signalConsumer() { - lock.lock(); - try { - condition.signalAll(); - } finally { - lock.unlock(); - } - } - - @Override - public void run() { - SubscriptionHelper.cancel(this); - signalConsumer(); - } - - @Override // otherwise default method which isn't available in Java 7 - public void remove() { - throw new UnsupportedOperationException("remove"); - } - - @Override - public void dispose() { - SubscriptionHelper.cancel(this); - } - - @Override - public boolean isDisposed() { - return SubscriptionHelper.isCancelled(get()); - } -} \ No newline at end of file diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java index 3523fbcd29..fb5b76851d 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableAmb.java @@ -15,12 +15,11 @@ import java.util.concurrent.atomic.*; -import io.reactivex.exceptions.Exceptions; import org.reactivestreams.*; import io.reactivex.Flowable; +import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.subscriptions.*; -import io.reactivex.internal.util.BackpressureHelper; import io.reactivex.plugins.RxJavaPlugins; public final class FlowableAmb extends Flowable { @@ -106,17 +105,15 @@ public void subscribe(Publisher[] sources) { @Override public void request(long n) { - if (!SubscriptionHelper.validate(n)) { - return; - } - - int w = winner.get(); - if (w > 0) { - subscribers[w - 1].request(n); - } else - if (w == 0) { - for (AmbInnerSubscriber a : subscribers) { - a.request(n); + if (SubscriptionHelper.validate(n)) { + int w = winner.get(); + if (w > 0) { + subscribers[w - 1].request(n); + } else + if (w == 0) { + for (AmbInnerSubscriber a : subscribers) { + a.request(n); + } } } } @@ -134,9 +131,8 @@ public boolean win(int index) { } return true; } - return false; } - return w == index; + return false; } @Override @@ -170,29 +166,12 @@ static final class AmbInnerSubscriber extends AtomicReference i @Override public void onSubscribe(Subscription s) { - if (SubscriptionHelper.setOnce(this, s)) { - long r = missedRequested.getAndSet(0L); - if (r != 0L) { - s.request(r); - } - } + SubscriptionHelper.deferredSetOnce(this, missedRequested, s); } @Override public void request(long n) { - Subscription s = get(); - if (s != null) { - s.request(n); - } else if (SubscriptionHelper.validate(n)) { - BackpressureHelper.add(missedRequested, n); - s = get(); - if (s != null && s != SubscriptionHelper.CANCELLED) { - long r = missedRequested.getAndSet(0L); - if (r != 0L) { - s.request(r); - } - } - } + SubscriptionHelper.deferredRequest(this, missedRequested, n); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java index 2fae45da89..a92ca0df99 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java @@ -175,7 +175,7 @@ static final class PublisherBufferSkipSubscriber actual, int size, int skip, Callable bufferSupplier) { @@ -187,16 +187,18 @@ static final class PublisherBufferSkipSubscriber bs = buffers; - long i = index; + int i = index; - if (i % skip == 0L) { // FIXME no need for modulo + if (i++ == 0) { C b; try { @@ -405,7 +407,10 @@ public void onNext(T t) { b0.add(t); } - index = i + 1; + if (i == skip) { + i = 0; + } + index = i; } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java index 2c234ca539..e69b793f2d 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java @@ -90,8 +90,6 @@ static final class BufferExactUnboundedSubscriber timer = new AtomicReference(); BufferExactUnboundedSubscriber( @@ -106,32 +104,31 @@ static final class BufferExactUnboundedSubscriber a, U v) { @Override public void dispose() { - selfCancel = true; cancel(); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java index 8176e4476b..6641c0f949 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java @@ -165,7 +165,7 @@ public void onNext(T t) { if (done) { return; } - if (fusionMode != ASYNC && !queue.offer(t)) { + if (fusionMode == NONE && !queue.offer(t)) { onError(new MissingBackpressureException("Queue is full?!")); return; } @@ -410,10 +410,7 @@ public void clear() { @Override public boolean isEmpty() { Iterator it = current; - if (it != null) { - return it.hasNext(); - } - return queue.isEmpty(); // estimate + return (it != null && !it.hasNext()) || queue.isEmpty(); } @Override @@ -429,6 +426,7 @@ public R poll() throws Exception { it = mapper.apply(v).iterator(); if (!it.hasNext()) { + it = null; continue; } current = it; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromFuture.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromFuture.java index 9a93f20dcd..2e54452823 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromFuture.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFromFuture.java @@ -46,8 +46,6 @@ public void subscribeActual(Subscriber s) { s.onError(ex); } return; - } finally { - future.cancel(true); // TODO ?? not sure about this } if (v == null) { s.onError(new NullPointerException("The future returned null")); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java index 391c1d815b..8f68e63c4b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java @@ -25,8 +25,12 @@ /** * Helper utility class to support Flowable with inner classes. */ -public enum FlowableInternalHelper { - ; +public final class FlowableInternalHelper { + + /** Utility class. */ + private FlowableInternalHelper() { + throw new IllegalStateException("No instances!"); + } static final class SimpleGenerator implements BiFunction, S> { final Consumer> consumer; @@ -233,7 +237,7 @@ public Publisher apply(Flowable t) throws Exception { }; } - enum RequestMax implements Consumer { + public enum RequestMax implements Consumer { INSTANCE; @Override public void accept(Subscription t) throws Exception { @@ -241,10 +245,6 @@ public void accept(Subscription t) throws Exception { } } - public static Consumer requestMax() { - return RequestMax.INSTANCE; - } - static final class ZipIterableFunction implements Function>, Publisher> { private final Function zipper; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java index bfd5c4930f..17bec996bf 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java @@ -13,16 +13,14 @@ package io.reactivex.internal.operators.flowable; -import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.*; import org.reactivestreams.*; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Function; -import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber; public final class FlowableMapNotification extends AbstractFlowableWithUpstream { @@ -46,49 +44,24 @@ protected void subscribeActual(Subscriber s) { source.subscribe(new MapNotificationSubscriber(s, onNextMapper, onErrorMapper, onCompleteSupplier)); } - // FIXME needs post-complete drain management static final class MapNotificationSubscriber - extends AtomicLong - implements Subscriber, Subscription { + extends SinglePostCompleteSubscriber { private static final long serialVersionUID = 2757120512858778108L; - - final Subscriber actual; final Function onNextMapper; final Function onErrorMapper; final Callable onCompleteSupplier; - Subscription s; - - R value; - - volatile boolean done; - - final AtomicInteger state = new AtomicInteger(); - - static final int NO_REQUEST_NO_VALUE = 0; - static final int NO_REQUEST_HAS_VALUE = 1; - static final int HAS_REQUEST_NO_VALUE = 2; - static final int HAS_REQUEST_HAS_VALUE = 3; - MapNotificationSubscriber(Subscriber actual, Function onNextMapper, Function onErrorMapper, Callable onCompleteSupplier) { - this.actual = actual; + super(actual); this.onNextMapper = onNextMapper; this.onErrorMapper = onErrorMapper; this.onCompleteSupplier = onCompleteSupplier; } - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - actual.onSubscribe(this); - } - } - @Override public void onNext(T t) { R p; @@ -101,12 +74,8 @@ public void onNext(T t) { return; } + produced++; actual.onNext(p); - - long r = get(); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } } @Override @@ -121,7 +90,7 @@ public void onError(Throwable t) { return; } - tryEmit(p); + complete(p); } @Override @@ -136,75 +105,7 @@ public void onComplete() { return; } - tryEmit(p); - } - - - void tryEmit(R p) { - long r = get(); - if (r != 0L) { - actual.onNext(p); - actual.onComplete(); - } else { - for (;;) { - int s = state.get(); - if (s == HAS_REQUEST_NO_VALUE) { - if (state.compareAndSet(HAS_REQUEST_NO_VALUE, HAS_REQUEST_HAS_VALUE)) { - actual.onNext(p); - actual.onComplete(); - } - return; - } else - if (s == NO_REQUEST_NO_VALUE) { - value = p; - done = true; - if (state.compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) { - return; - } - } else - if (s == NO_REQUEST_HAS_VALUE || s == HAS_REQUEST_HAS_VALUE) { - return; - } - } - } - } - - @Override - public void request(long n) { - if (!SubscriptionHelper.validate(n)) { - return; - } - - BackpressureHelper.add(this, n); - if (done) { - for (;;) { - int s = state.get(); - - if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE) { - return; - } else - if (s == NO_REQUEST_HAS_VALUE) { - if (state.compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) { - R p = value; - value = null; - actual.onNext(p); - actual.onComplete(); - } - return; - } else - if (state.compareAndSet(NO_REQUEST_NO_VALUE, HAS_REQUEST_NO_VALUE)) { - return; - } - } - } else { - s.request(n); - } - } - - @Override - public void cancel() { - state.lazySet(HAS_REQUEST_HAS_VALUE); - s.cancel(); + complete(p); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMaterialize.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMaterialize.java index dcc0c4da4a..760d6a3c1d 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMaterialize.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMaterialize.java @@ -13,13 +13,10 @@ package io.reactivex.internal.operators.flowable; -import java.util.concurrent.atomic.AtomicLong; - import org.reactivestreams.*; import io.reactivex.Notification; -import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber; import io.reactivex.plugins.RxJavaPlugins; public final class FlowableMaterialize extends AbstractFlowableWithUpstream> { @@ -33,31 +30,12 @@ protected void subscribeActual(Subscriber> s) { source.subscribe(new MaterializeSubscriber(s)); } - static final class MaterializeSubscriber extends AtomicLong implements Subscriber, Subscription { + static final class MaterializeSubscriber extends SinglePostCompleteSubscriber> { private static final long serialVersionUID = -3740826063558713822L; - final Subscriber> actual; - - Subscription s; - - Notification value; - - long produced; - - static final long COMPLETE_MASK = Long.MIN_VALUE; - static final long REQUEST_MASK = Long.MAX_VALUE; - MaterializeSubscriber(Subscriber> actual) { - this.actual = actual; - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - actual.onSubscribe(this); - } + super(actual); } @Override @@ -76,57 +54,11 @@ public void onComplete() { complete(Notification.createOnComplete()); } - void complete(Notification n) { - long p = produced; - if (p != 0) { - BackpressureHelper.produced(this, p); - } - - for (;;) { - long r = get(); - if ((r & COMPLETE_MASK) != 0) { - if (n.isOnError()) { - RxJavaPlugins.onError(n.getError()); - } - return; - } - if ((r & REQUEST_MASK) != 0) { - lazySet(COMPLETE_MASK + 1); - actual.onNext(n); - actual.onComplete(); - return; - } - value = n; - if (compareAndSet(0, COMPLETE_MASK)) { - return; - } - } - } - @Override - public void request(long n) { - if (SubscriptionHelper.validate(n)) { - for (;;) { - long r = get(); - if ((r & COMPLETE_MASK) != 0) { - if (compareAndSet(COMPLETE_MASK, COMPLETE_MASK + 1)) { - actual.onNext(value); - actual.onComplete(); - } - break; - } - long u = BackpressureHelper.addCap(r, n); - if (compareAndSet(r, u)) { - s.request(n); - break; - } - } + protected void onDrop(Notification n) { + if (n.isOnError()) { + RxJavaPlugins.onError(n.getError()); } } - - @Override - public void cancel() { - s.cancel(); - } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureLatest.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureLatest.java index 9c3e4ea70c..82872a1230 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureLatest.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureLatest.java @@ -92,10 +92,10 @@ public void request(long n) { public void cancel() { if (!cancelled) { cancelled = true; + s.cancel(); if (getAndIncrement() == 0) { current.lazySet(null); - s.cancel(); } } } @@ -106,20 +106,18 @@ void drain() { } final Subscriber a = actual; int missed = 1; - for (;;) { - - if (checkTerminated(done, current.get() == null, a)) { - return; - } + final AtomicLong r = requested; + final AtomicReference q = current; - long r = requested.get(); + for (;;) { + long e = 0L; - while (r != 0L) { + while (e != r.get()) { boolean d = done; - T v = current.getAndSet(null); + T v = q.getAndSet(null); boolean empty = v == null; - if (checkTerminated(d, empty, a)) { + if (checkTerminated(d, empty, a, q)) { return; } @@ -129,16 +127,17 @@ void drain() { a.onNext(v); - if (r != Long.MAX_VALUE) { - r--; - requested.decrementAndGet(); - } + e++; } - if (checkTerminated(done, current.get() == null, a)) { + if (e == r.get() && checkTerminated(done, q.get() == null, a, q)) { return; } + if (e != 0L) { + BackpressureHelper.produced(r, e); + } + missed = addAndGet(-missed); if (missed == 0) { break; @@ -146,17 +145,16 @@ void drain() { } } - boolean checkTerminated(boolean d, boolean empty, Subscriber a) { + boolean checkTerminated(boolean d, boolean empty, Subscriber a, AtomicReference q) { if (cancelled) { - current.lazySet(null); - s.cancel(); + q.lazySet(null); return true; } if (d) { Throwable e = error; if (e != null) { - current.lazySet(null); + q.lazySet(null); a.onError(e); return true; } else diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturn.java index 72ca07c548..fe0e618447 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorReturn.java @@ -13,15 +13,12 @@ package io.reactivex.internal.operators.flowable; -import java.util.concurrent.atomic.AtomicLong; - import org.reactivestreams.*; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber; public final class FlowableOnErrorReturn extends AbstractFlowableWithUpstream { final Function valueSupplier; @@ -35,36 +32,17 @@ protected void subscribeActual(Subscriber s) { source.subscribe(new OnErrorReturnSubscriber(s, valueSupplier)); } - static final class OnErrorReturnSubscriber extends AtomicLong - implements Subscriber, Subscription { + static final class OnErrorReturnSubscriber + extends SinglePostCompleteSubscriber { private static final long serialVersionUID = -3740826063558713822L; - final Subscriber actual; - final Function valueSupplier; - Subscription s; - - T value; - - long produced; - - static final long COMPLETE_MASK = Long.MIN_VALUE; - static final long REQUEST_MASK = Long.MAX_VALUE; - OnErrorReturnSubscriber(Subscriber actual, Function valueSupplier) { - this.actual = actual; + super(actual); this.valueSupplier = valueSupplier; } - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - actual.onSubscribe(this); - } - } - @Override public void onNext(T t) { produced++; @@ -88,55 +66,5 @@ public void onError(Throwable t) { public void onComplete() { actual.onComplete(); } - - void complete(T n) { - long p = produced; - if (p != 0L) { - BackpressureHelper.produced(this, p); - } - - for (;;) { - long r = get(); - if ((r & COMPLETE_MASK) != 0) { - return; - } - if ((r & REQUEST_MASK) != 0) { - lazySet(COMPLETE_MASK + 1); - actual.onNext(n); - actual.onComplete(); - return; - } - value = n; - if (compareAndSet(0, COMPLETE_MASK)) { - return; - } - } - } - - @Override - public void request(long n) { - if (SubscriptionHelper.validate(n)) { - for (;;) { - long r = get(); - if ((r & COMPLETE_MASK) != 0) { - if (compareAndSet(COMPLETE_MASK, COMPLETE_MASK + 1)) { - actual.onNext(value); - actual.onComplete(); - } - break; - } - long u = BackpressureHelper.addCap(r, n); - if (compareAndSet(r, u)) { - s.request(n); - break; - } - } - } - } - - @Override - public void cancel() { - s.cancel(); - } } } \ No newline at end of file diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSamplePublisher.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSamplePublisher.java index 7dd8c5fb61..76de888f5a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSamplePublisher.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSamplePublisher.java @@ -87,13 +87,7 @@ public void onComplete() { } boolean setOther(Subscription o) { - if (other.get() == null) { - if (other.compareAndSet(null, o)) { - return true; - } - o.cancel(); - } - return false; + return SubscriptionHelper.setOnce(other, o); } @Override @@ -125,9 +119,7 @@ public void emit() { long r = requested.get(); if (r != 0L) { actual.onNext(value); - if (r != Long.MAX_VALUE) { - requested.decrementAndGet(); - } + BackpressureHelper.produced(requested, 1); } else { cancel(); actual.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!")); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSampleTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSampleTimed.java index 7093be18d9..9e4045bb51 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSampleTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSampleTimed.java @@ -19,9 +19,8 @@ import org.reactivestreams.*; import io.reactivex.Scheduler; -import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.MissingBackpressureException; -import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; import io.reactivex.subscribers.SerializedSubscriber; @@ -55,7 +54,7 @@ static final class SampleTimedSubscriber extends AtomicReference implement final AtomicLong requested = new AtomicLong(); - final AtomicReference timer = new AtomicReference(); + final SequentialDisposable timer = new SequentialDisposable(); Subscription s; @@ -71,14 +70,8 @@ public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.s, s)) { this.s = s; actual.onSubscribe(this); - if (timer.get() == null) { - Disposable d = scheduler.schedulePeriodicallyDirect(this, period, period, unit); - if (!timer.compareAndSet(null, d)) { - d.dispose(); - return; - } - s.request(Long.MAX_VALUE); - } + timer.replace(scheduler.schedulePeriodicallyDirect(this, period, period, unit)); + s.request(Long.MAX_VALUE); } } @@ -123,9 +116,7 @@ public void run() { long r = requested.get(); if (r != 0L) { actual.onNext(value); - if (r != Long.MAX_VALUE) { - requested.decrementAndGet(); - } + BackpressureHelper.produced(requested, 1); } else { cancel(); actual.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!")); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqual.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqual.java index aa106cca7a..a454312540 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqual.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqual.java @@ -47,7 +47,18 @@ public void subscribeActual(Subscriber s) { parent.subscribe(first, second); } - static final class EqualCoordinator extends DeferredScalarSubscription { + /** + * Provides callbacks for the EqualSubscribers. + */ + interface EqualCoordinatorHelper { + + void drain(); + + void innerError(Throwable ex); + } + + static final class EqualCoordinator extends DeferredScalarSubscription + implements EqualCoordinatorHelper { private static final long serialVersionUID = -6178010334400373240L; @@ -97,7 +108,8 @@ void cancelAndClear() { second.clear(); } - void drain() { + @Override + public void drain() { if (wip.getAndIncrement() != 0) { return; } @@ -219,6 +231,15 @@ void drain() { } } } + + @Override + public void innerError(Throwable t) { + if (error.addThrowable(t)) { + drain(); + } else { + RxJavaPlugins.onError(t); + } + } } static final class EqualSubscriber @@ -227,7 +248,7 @@ static final class EqualSubscriber private static final long serialVersionUID = 4804128302091633067L; - final EqualCoordinator parent; + final EqualCoordinatorHelper parent; final int prefetch; @@ -241,7 +262,7 @@ static final class EqualSubscriber int sourceMode; - EqualSubscriber(EqualCoordinator parent, int prefetch) { + EqualSubscriber(EqualCoordinatorHelper parent, int prefetch) { this.parent = parent; this.limit = prefetch - (prefetch >> 2); this.prefetch = prefetch; @@ -289,12 +310,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - EqualCoordinator p = parent; - if (p.error.addThrowable(t)) { - p.drain(); - } else { - RxJavaPlugins.onError(t); - } + parent.innerError(t); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqualSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqualSingle.java index d94ce29e74..56e3c3b621 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqualSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqualSingle.java @@ -13,16 +13,16 @@ package io.reactivex.internal.operators.flowable; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicInteger; -import org.reactivestreams.*; +import org.reactivestreams.Publisher; import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.*; +import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.BiPredicate; import io.reactivex.internal.fuseable.*; -import io.reactivex.internal.queue.SpscArrayQueue; +import io.reactivex.internal.operators.flowable.FlowableSequenceEqual.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.AtomicThrowable; import io.reactivex.plugins.RxJavaPlugins; @@ -55,7 +55,7 @@ public Flowable fuseToFlowable() { static final class EqualCoordinator extends AtomicInteger - implements Disposable { + implements Disposable, EqualCoordinatorHelper { private static final long serialVersionUID = -6178010334400373240L; @@ -108,7 +108,8 @@ void cancelAndClear() { second.clear(); } - void drain() { + @Override + public void drain() { if (getAndIncrement() != 0) { return; } @@ -230,111 +231,14 @@ void drain() { } } } - } - - static final class EqualSubscriber - extends AtomicReference - implements Subscriber { - - private static final long serialVersionUID = 4804128302091633067L; - - final EqualCoordinator parent; - - final int prefetch; - - final int limit; - - long produced; - - volatile SimpleQueue queue; - - volatile boolean done; - - int sourceMode; - - EqualSubscriber(EqualCoordinator parent, int prefetch) { - this.parent = parent; - this.limit = prefetch - (prefetch >> 2); - this.prefetch = prefetch; - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.setOnce(this, s)) { - if (s instanceof QueueSubscription) { - @SuppressWarnings("unchecked") - QueueSubscription qs = (QueueSubscription) s; - - int m = qs.requestFusion(QueueSubscription.ANY); - if (m == QueueSubscription.SYNC) { - sourceMode = m; - queue = qs; - done = true; - parent.drain(); - return; - } - if (m == QueueSubscription.ASYNC) { - sourceMode = m; - queue = qs; - s.request(prefetch); - return; - } - } - - queue = new SpscArrayQueue(prefetch); - - s.request(prefetch); - } - } @Override - public void onNext(T t) { - if (sourceMode == QueueSubscription.NONE) { - if (!queue.offer(t)) { - onError(new MissingBackpressureException()); - return; - } - } - parent.drain(); - } - - @Override - public void onError(Throwable t) { - EqualCoordinator p = parent; - if (p.error.addThrowable(t)) { - p.drain(); + public void innerError(Throwable t) { + if (error.addThrowable(t)) { + drain(); } else { RxJavaPlugins.onError(t); } } - - @Override - public void onComplete() { - done = true; - parent.drain(); - } - - public void request() { - if (sourceMode != QueueSubscription.SYNC) { - long p = produced + 1; - if (p >= limit) { - produced = 0; - get().request(p); - } else { - produced = p; - } - } - } - - public void cancel() { - SubscriptionHelper.cancel(this); - } - - void clear() { - SimpleQueue sq = queue; - if (sq != null) { - sq.clear(); - } - } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimed.java index d2fc150488..2ce5015234 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimed.java @@ -113,12 +113,12 @@ public void request(long n) { @Override public void cancel() { - if (cancelled) { + if (!cancelled) { cancelled = true; + s.cancel(); if (getAndIncrement() == 0) { queue.clear(); - s.cancel(); } } } @@ -139,10 +139,6 @@ void drain() { for (;;) { - if (checkTerminated(done, q.isEmpty(), a, delayError)) { - return; - } - long r = requested.get(); long e = 0L; @@ -171,20 +167,13 @@ void drain() { @SuppressWarnings("unchecked") T v = (T)q.poll(); - if (ts > now - time) { - // not old enough - break; - } - a.onNext(v); e++; } if (e != 0L) { - if (r != Long.MAX_VALUE) { - requested.addAndGet(-e); - } + BackpressureHelper.produced(requested, e); } missed = addAndGet(-missed); @@ -197,7 +186,6 @@ void drain() { boolean checkTerminated(boolean d, boolean empty, Subscriber a, boolean delayError) { if (cancelled) { queue.clear(); - s.cancel(); return true; } if (d) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java index b6be3978c3..d847856959 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java @@ -13,13 +13,14 @@ package io.reactivex.internal.operators.flowable; -import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.atomic.*; import org.reactivestreams.*; -import io.reactivex.exceptions.Exceptions; +import io.reactivex.exceptions.*; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.SpscArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.*; @@ -95,6 +96,10 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { + if (done) { + return; + } + long c = unique + 1; unique = c; @@ -129,11 +134,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - if (error.addThrowable(t)) { + if (!done && error.addThrowable(t)) { if (!delayErrors) { disposeInner(); } @@ -155,14 +156,13 @@ public void onComplete() { @Override public void request(long n) { - if (!SubscriptionHelper.validate(n)) { - return; - } - BackpressureHelper.add(requested, n); - if (unique == 0L) { - s.request(Long.MAX_VALUE); - } else { - drain(); + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + if (unique == 0L) { + s.request(Long.MAX_VALUE); + } else { + drain(); + } } } @@ -199,6 +199,7 @@ void drain() { for (;;) { if (cancelled) { + active.lazySet(null); return; } @@ -217,7 +218,6 @@ void drain() { Throwable err = error.get(); if (err != null) { disposeInner(); - s.cancel(); a.onError(error.terminate()); return; } else @@ -229,15 +229,12 @@ void drain() { } SwitchMapInnerSubscriber inner = active.get(); - - if (inner != null) { - SpscArrayQueue q = inner.queue; - + SimpleQueue q = inner != null ? inner.queue : null; + if (q != null) { if (inner.done) { if (!delayErrors) { Throwable err = error.get(); if (err != null) { - s.cancel(); disposeInner(); a.onError(error.terminate()); return; @@ -264,7 +261,17 @@ void drain() { } boolean d = inner.done; - R v = q.poll(); + R v; + + try { + v = q.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + inner.cancel(); + error.addThrowable(ex); + d = true; + v = null; + } boolean empty = v == null; if (inner != active.get()) { @@ -276,7 +283,6 @@ void drain() { if (!delayErrors) { Throwable err = error.get(); if (err != null) { - s.cancel(); a.onError(error.terminate()); return; } else @@ -323,74 +329,78 @@ void drain() { } } } - - boolean checkTerminated(boolean d, boolean empty, Subscriber a) { - if (cancelled) { - s.cancel(); - return true; - } - if (d) { - Throwable e = error.get(); - if (e != null) { - cancelled = true; - s.cancel(); - a.onError(error.terminate()); - return true; - } else - if (empty) { - a.onComplete(); - return true; - } - } - - return false; - } } - static final class SwitchMapInnerSubscriber extends AtomicReference implements Subscriber { + static final class SwitchMapInnerSubscriber + extends AtomicReference implements Subscriber { private static final long serialVersionUID = 3837284832786408377L; final SwitchMapSubscriber parent; final long index; final int bufferSize; - final SpscArrayQueue queue; + + volatile SimpleQueue queue; volatile boolean done; + int fusionMode; + SwitchMapInnerSubscriber(SwitchMapSubscriber parent, long index, int bufferSize) { this.parent = parent; this.index = index; this.bufferSize = bufferSize; - this.queue = new SpscArrayQueue(bufferSize); } @Override public void onSubscribe(Subscription s) { - if (index == parent.unique) { - if (SubscriptionHelper.setOnce(this, s)) { - s.request(bufferSize); + if (SubscriptionHelper.setOnce(this, s)) { + if (s instanceof QueueSubscription) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription) s; + + int m = qs.requestFusion(QueueSubscription.ANY); + if (m == QueueSubscription.SYNC) { + fusionMode = m; + queue = qs; + done = true; + parent.drain(); + return; + } + if (m == QueueSubscription.ASYNC) { + fusionMode = m; + queue = qs; + s.request(bufferSize); + return; + } } - } else { - s.cancel(); + + queue = new SpscArrayQueue(bufferSize); + + s.request(bufferSize); } } @Override public void onNext(R t) { - if (index == parent.unique) { - if (!queue.offer(t)) { - onError(new IllegalStateException("Queue full?!")); + SwitchMapSubscriber p = parent; + if (index == p.unique) { + if (fusionMode == QueueSubscription.NONE && !queue.offer(t)) { + onError(new MissingBackpressureException("Queue full?!")); return; } - parent.drain(); + p.drain(); } } @Override public void onError(Throwable t) { - if (index == parent.unique && parent.error.addThrowable(t)) { + SwitchMapSubscriber p = parent; + if (index == p.unique && p.error.addThrowable(t)) { + if (!p.delayErrors) { + p.s.cancel(); + } done = true; - parent.drain(); + p.drain(); } else { RxJavaPlugins.onError(t); } @@ -398,9 +408,10 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (index == parent.unique) { + SwitchMapSubscriber p = parent; + if (index == p.unique) { done = true; - parent.drain(); + p.drain(); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimed.java index 3c20707870..3eb043940e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimed.java @@ -141,12 +141,12 @@ public void request(long n) { @Override public void cancel() { - if (cancelled) { + if (!cancelled) { cancelled = true; + s.cancel(); if (getAndIncrement() == 0) { queue.clear(); - s.cancel(); } } } @@ -172,7 +172,6 @@ void drain() { } long r = requested.get(); - boolean unbounded = r == Long.MAX_VALUE; // NOPMD long e = 0L; for (;;) { @@ -183,29 +182,21 @@ void drain() { return; } - if (empty || r == 0L) { + if (r == e) { break; } q.poll(); @SuppressWarnings("unchecked") T o = (T)q.poll(); - if (o == null) { - s.cancel(); - a.onError(new IllegalStateException("Queue empty?!")); - return; - } a.onNext(o); - r--; - e--; + e++; } if (e != 0L) { - if (!unbounded) { - requested.addAndGet(e); - } + BackpressureHelper.produced(requested, e); } } @@ -219,7 +210,6 @@ void drain() { boolean checkTerminated(boolean empty, Subscriber a, boolean delayError) { if (cancelled) { queue.clear(); - s.cancel(); return true; } if (delayError) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTimed.java index de6cc99b62..3e20f11087 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTimed.java @@ -14,7 +14,7 @@ package io.reactivex.internal.operators.flowable; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.*; @@ -22,7 +22,7 @@ import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.MissingBackpressureException; -import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; import io.reactivex.plugins.RxJavaPlugins; @@ -59,16 +59,7 @@ static final class DebounceTimedSubscriber Subscription s; - final AtomicReference timer = new AtomicReference(); - - static final Disposable NEW_TIMER = new Disposable() { - @Override - public void dispose() { } - - @Override public boolean isDisposed() { - return true; - } - }; + final SequentialDisposable timer = new SequentialDisposable(); volatile boolean gate; @@ -101,9 +92,7 @@ public void onNext(T t) { long r = get(); if (r != 0L) { actual.onNext(t); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } + BackpressureHelper.produced(this, 1); } else { done = true; cancel(); @@ -111,18 +100,12 @@ public void onNext(T t) { return; } - // FIXME should this be a periodic blocking or a value-relative blocking? Disposable d = timer.get(); if (d != null) { d.dispose(); } - if (timer.compareAndSet(d, NEW_TIMER)) { - d = worker.schedule(this, timeout, unit); - if (!timer.compareAndSet(NEW_TIMER, d)) { - d.dispose(); - } - } + timer.replace(worker.schedule(this, timeout, unit)); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java index 5d5d086ca2..2e0f015af0 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java @@ -19,9 +19,9 @@ import io.reactivex.Flowable; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.*; +import io.reactivex.exceptions.MissingBackpressureException; import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.queue.MpscLinkedQueue; import io.reactivex.internal.subscribers.QueueDrainSubscriber; import io.reactivex.internal.subscriptions.SubscriptionHelper; @@ -74,39 +74,38 @@ static final class WindowBoundaryMainSubscriber @Override public void onSubscribe(Subscription s) { - if (!SubscriptionHelper.validate(this.s, s)) { - return; - } - this.s = s; + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; - Subscriber> a = actual; - a.onSubscribe(this); + Subscriber> a = actual; + a.onSubscribe(this); - if (cancelled) { - return; - } + if (cancelled) { + return; + } - UnicastProcessor w = UnicastProcessor.create(bufferSize); + UnicastProcessor w = UnicastProcessor.create(bufferSize); - long r = requested(); - if (r != 0L) { - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); + long r = requested(); + if (r != 0L) { + a.onNext(w); + if (r != Long.MAX_VALUE) { + produced(1); + } + } else { + a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests")); + return; } - } else { - a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests")); - return; - } - window = w; + window = w; - WindowBoundaryInnerSubscriber inner = new WindowBoundaryInnerSubscriber(this); + WindowBoundaryInnerSubscriber inner = new WindowBoundaryInnerSubscriber(this); - if (boundary.compareAndSet(null, inner)) { - windows.getAndIncrement(); - s.request(Long.MAX_VALUE); - other.subscribe(inner); + if (boundary.compareAndSet(null, inner)) { + windows.getAndIncrement(); + s.request(Long.MAX_VALUE); + other.subscribe(inner); + } } } @@ -177,7 +176,7 @@ public void cancel() { } void drainLoop() { - final SimpleQueue q = queue; + final SimplePlainQueue q = queue; final Subscriber> a = actual; int missed = 1; UnicastProcessor w = window; @@ -186,16 +185,7 @@ void drainLoop() { for (;;) { boolean d = done; - Object o; - - try { - o = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - DisposableHelper.dispose(boundary); - w.onError(ex); - return; - } + Object o = q.poll(); boolean empty = o == null; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java index b90ae6d3cd..232c596f26 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java @@ -23,7 +23,7 @@ import io.reactivex.exceptions.*; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.queue.MpscLinkedQueue; import io.reactivex.internal.subscribers.QueueDrainSubscriber; import io.reactivex.internal.subscriptions.SubscriptionHelper; @@ -193,7 +193,7 @@ public void cancel() { } void drainLoop() { - final SimpleQueue q = queue; + final SimplePlainQueue q = queue; final Subscriber> a = actual; int missed = 1; UnicastProcessor w = window; @@ -202,16 +202,7 @@ void drainLoop() { for (;;) { boolean d = done; - Object o; - - try { - o = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - DisposableHelper.dispose(boundary); - w.onError(ex); - return; - } + Object o = q.poll(); boolean empty = o == null; @@ -297,12 +288,6 @@ void next() { drainLoop(); } } - - @Override - public boolean accept(Subscriber> a, Object v) { - // not used by this operator - return false; - } } static final class WindowBoundaryInnerSubscriber extends DisposableSubscriber { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java index 689ae6e8c4..a4f027352f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java @@ -13,7 +13,6 @@ package io.reactivex.internal.operators.flowable; -import java.nio.channels.CancelledKeyException; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -25,7 +24,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.queue.MpscLinkedQueue; import io.reactivex.internal.subscribers.QueueDrainSubscriber; import io.reactivex.internal.subscriptions.SubscriptionHelper; @@ -78,7 +77,7 @@ protected void subscribeActual(Subscriber> s) { static final class WindowExactUnboundedSubscriber extends QueueDrainSubscriber> - implements Subscriber, Subscription, Disposable, Runnable { + implements Subscriber, Subscription, Runnable { final long timespan; final TimeUnit unit; final Scheduler scheduler; @@ -86,8 +85,6 @@ static final class WindowExactUnboundedSubscriber Subscription s; - boolean selfCancel; - UnicastProcessor window; final AtomicReference timer = new AtomicReference(); @@ -193,23 +190,13 @@ public void cancel() { cancelled = true; } - @Override public void dispose() { - selfCancel = true; DisposableHelper.dispose(timer); } - @Override public boolean isDisposed() { - return timer.get() == DisposableHelper.DISPOSED; - } - @Override public void run() { - if (selfCancel) { - throw new CancelledKeyException(); - } - if (cancelled) { terminated = true; dispose(); @@ -223,7 +210,7 @@ public void run() { void drainLoop() { - final SimpleQueue q = queue; + final SimplePlainQueue q = queue; final Subscriber> a = actual; UnicastProcessor w = window; @@ -235,17 +222,7 @@ void drainLoop() { boolean d = done; - Object o; - - try { - o = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.cancel(); - dispose(); - a.onError(ex); - return; - } + Object o = q.poll(); if (d && (o == null || o == NEXT)) { window = null; @@ -299,17 +276,11 @@ void drainLoop() { } } } - - @Override - public boolean accept(Subscriber> a, Object v) { - // not used in this operator - return true; - } } static final class WindowExactBoundedSubscriber extends QueueDrainSubscriber> - implements Subscription, Disposable { + implements Subscription { final long timespan; final TimeUnit unit; final Scheduler scheduler; @@ -317,8 +288,6 @@ static final class WindowExactBoundedSubscriber final boolean restartTimerOnMaxSize; final long maxSize; - boolean selfCancel; - long count; long producerIndex; @@ -489,25 +458,12 @@ public void cancel() { cancelled = true; } - @Override public void dispose() { - selfCancel = true; DisposableHelper.dispose(timer); } - @Override - public boolean isDisposed() { - return timer.get() == DisposableHelper.DISPOSED; - } - - @Override - public boolean accept(Subscriber> a, Object v) { - // not needed in this operator - return false; - } - void drainLoop() { - final SimpleQueue q = queue; + final SimplePlainQueue q = queue; final Subscriber> a = actual; UnicastProcessor w = window; @@ -524,17 +480,7 @@ void drainLoop() { boolean d = done; - Object o; - - try { - o = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.cancel(); - dispose(); - a.onError(ex); - return; - } + Object o = q.poll(); boolean empty = o == null; boolean isHolder = o instanceof ConsumerIndexHolder; @@ -640,9 +586,6 @@ static final class ConsumerIndexHolder implements Runnable { @Override public void run() { WindowExactBoundedSubscriber p = parent; - if (p.selfCancel) { - throw new CancelledKeyException(); - } if (!p.cancelled) { p.queue.offer(this); @@ -659,7 +602,7 @@ public void run() { static final class WindowSkipSubscriber extends QueueDrainSubscriber> - implements Subscription, Disposable, Runnable { + implements Subscription, Runnable { final long timespan; final long timeskip; final TimeUnit unit; @@ -775,22 +718,10 @@ public void cancel() { cancelled = true; } - @Override public void dispose() { worker.dispose(); } - @Override - public boolean isDisposed() { - return worker.isDisposed(); - } - - @Override - public boolean accept(Subscriber> a, Object v) { - // not used by this operator - return false; - } - void complete(UnicastProcessor w) { queue.offer(new SubjectWork(w, false)); if (enter()) { @@ -800,7 +731,7 @@ void complete(UnicastProcessor w) { @SuppressWarnings("unchecked") void drainLoop() { - final SimpleQueue q = queue; + final SimplePlainQueue q = queue; final Subscriber> a = actual; final List> ws = windows; @@ -819,17 +750,7 @@ void drainLoop() { boolean d = done; - Object v; - - try { - v = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.cancel(); - dispose(); - a.onError(ex); - return; - } + Object v = q.poll(); boolean empty = v == null; boolean sw = v instanceof SubjectWork; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java index 96da0f3bf8..e410733888 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFrom.java @@ -127,21 +127,7 @@ public void cancel() { } public boolean setOther(Subscription o) { - for (;;) { - Subscription current = other.get(); - if (current == SubscriptionHelper.CANCELLED) { - o.cancel(); - return false; - } - if (current != null) { - RxJavaPlugins.onError(new IllegalStateException("Other subscription already set!")); - o.cancel(); - return false; - } - if (other.compareAndSet(null, o)) { - return true; - } - } + return SubscriptionHelper.setOnce(other, o); } public void otherError(Throwable e) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java index adcc818a59..4c7684427e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java @@ -390,9 +390,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (sourceMode != QueueSubscription.ASYNC) { - parent.error(this, t); - } + parent.error(this, t); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java index 490b54cccf..b2c2e54c78 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java @@ -22,7 +22,6 @@ import io.reactivex.Observer; import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.observers.QueueDrainObserver; import io.reactivex.internal.queue.MpscLinkedQueue; @@ -202,16 +201,7 @@ void drainLoop() { boolean d = done; - Object o; - - try { - o = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - disposeTimer(); - w.onError(ex); - return; - } + Object o = q.poll(); if (d && (o == null || o == NEXT)) { window = null; diff --git a/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java index d7dbc6bd04..379be7a228 100644 --- a/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/QueueDrainSubscriber.java @@ -127,6 +127,11 @@ protected final void fastPathOrderedEmitMax(U value, boolean delayError, Disposa QueueDrainHelper.drainMaxLoop(q, s, delayError, dispose, this); } + @Override + public boolean accept(Subscriber a, U v) { + return false; + } + @Override public final Throwable error() { return error; diff --git a/src/main/java/io/reactivex/internal/subscribers/SinglePostCompleteSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/SinglePostCompleteSubscriber.java new file mode 100644 index 0000000000..3f1d13153e --- /dev/null +++ b/src/main/java/io/reactivex/internal/subscribers/SinglePostCompleteSubscriber.java @@ -0,0 +1,125 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers; + +import java.util.concurrent.atomic.AtomicLong; + +import org.reactivestreams.*; + +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.BackpressureHelper; + +/** + * Relays signals from upstream according to downstream requests and allows + * signalling a final value followed by onComplete in a backpressure-aware manner. + * + * @param the input value type + * @param the output value type + */ +public abstract class SinglePostCompleteSubscriber extends AtomicLong implements Subscriber, Subscription { + private static final long serialVersionUID = 7917814472626990048L; + + /** The downstream consumer. */ + protected final Subscriber actual; + + /** The upstream subscription. */ + protected Subscription s; + + /** The last value stored in case there is no request for it. */ + protected R value; + + /** Number of values emitted so far. */ + protected long produced; + + /** Masks out the 2^63 bit indicating a completed state. */ + static final long COMPLETE_MASK = Long.MIN_VALUE; + /** Masks out the lower 63 bit holding the current request amount. */ + static final long REQUEST_MASK = Long.MAX_VALUE; + + public SinglePostCompleteSubscriber(Subscriber actual) { + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; + actual.onSubscribe(this); + } + } + + /** + * Signals the given value and an onComplete if the downstream is ready to receive the final value. + * @param n the value to emit + */ + protected final void complete(R n) { + long p = produced; + if (p != 0) { + BackpressureHelper.produced(this, p); + } + + for (;;) { + long r = get(); + if ((r & COMPLETE_MASK) != 0) { + onDrop(n); + return; + } + if ((r & REQUEST_MASK) != 0) { + lazySet(COMPLETE_MASK + 1); + actual.onNext(n); + actual.onComplete(); + return; + } + value = n; + if (compareAndSet(0, COMPLETE_MASK)) { + return; + } + value = null; + } + } + + /** + * Called in case of multiple calls to complete. + * @param n the value dropped + */ + protected void onDrop(R n) { + // default is no-op + } + + @Override + public final void request(long n) { + if (SubscriptionHelper.validate(n)) { + for (;;) { + long r = get(); + if ((r & COMPLETE_MASK) != 0) { + if (compareAndSet(COMPLETE_MASK, COMPLETE_MASK + 1)) { + actual.onNext(value); + actual.onComplete(); + } + break; + } + long u = BackpressureHelper.addCap(r, n); + if (compareAndSet(r, u)) { + s.request(n); + break; + } + } + } + } + + @Override + public void cancel() { + s.cancel(); + } +} diff --git a/src/main/java/io/reactivex/internal/util/BackpressureHelper.java b/src/main/java/io/reactivex/internal/util/BackpressureHelper.java index ba035a3093..ba01cb4579 100644 --- a/src/main/java/io/reactivex/internal/util/BackpressureHelper.java +++ b/src/main/java/io/reactivex/internal/util/BackpressureHelper.java @@ -100,7 +100,7 @@ public static long addCancel(AtomicLong requested, long n) { } /** - * Atomically subtract the given number (positive, not validated) from the target field. + * Atomically subtract the given number (positive, not validated) from the target field unless it contains Long.MAX_VALUE. * @param requested the target field holding the current requested amount * @param n the produced element count, positive (not validated) * @return the new amount @@ -124,7 +124,7 @@ public static long produced(AtomicLong requested, long n) { /** * Atomically subtract the given number (positive, not validated) from the target field if - * it doesn't contain Long.MIN_VALUE (indicating some cancelled state). + * it doesn't contain Long.MIN_VALUE (indicating some cancelled state) or Long.MAX_VALUE (unbounded mode). * @param requested the target field holding the current requested amount * @param n the produced element count, positive (not validated) * @return the new amount diff --git a/src/test/java/io/reactivex/flowable/FlowableCollectTest.java b/src/test/java/io/reactivex/flowable/FlowableCollectTest.java index 3c6d2beaa4..1442699a8c 100644 --- a/src/test/java/io/reactivex/flowable/FlowableCollectTest.java +++ b/src/test/java/io/reactivex/flowable/FlowableCollectTest.java @@ -23,7 +23,8 @@ import org.junit.Test; import io.reactivex.*; -import io.reactivex.functions.BiConsumer; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.plugins.RxJavaPlugins; public final class FlowableCollectTest { @@ -329,4 +330,50 @@ public void accept(HashSet s, Integer v) throws Exception { .assertResult(new HashSet(Arrays.asList(1, 2))); } + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.just(1, 2) + .collect(Functions.justCallable(new ArrayList()), new BiConsumer, Integer>() { + @Override + public void accept(ArrayList a, Integer b) throws Exception { + a.add(b); + } + })); + + TestHelper.checkDisposed(Flowable.just(1, 2) + .collect(Functions.justCallable(new ArrayList()), new BiConsumer, Integer>() { + @Override + public void accept(ArrayList a, Integer b) throws Exception { + a.add(b); + } + }).toFlowable()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>>() { + @Override + public Flowable> apply(Flowable f) throws Exception { + return f.collect(Functions.justCallable(new ArrayList()), + new BiConsumer, Integer>() { + @Override + public void accept(ArrayList a, Integer b) throws Exception { + a.add(b); + } + }).toFlowable(); + } + }); + TestHelper.checkDoubleOnSubscribeFlowableToSingle(new Function, Single>>() { + @Override + public Single> apply(Flowable f) throws Exception { + return f.collect(Functions.justCallable(new ArrayList()), + new BiConsumer, Integer>() { + @Override + public void accept(ArrayList a, Integer b) throws Exception { + a.add(b); + } + }); + } + }); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/BlockingFlowableToIteratorTest.java b/src/test/java/io/reactivex/internal/operators/flowable/BlockingFlowableToIteratorTest.java index 89e73c450c..7b6f16ac4d 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/BlockingFlowableToIteratorTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/BlockingFlowableToIteratorTest.java @@ -21,7 +21,8 @@ import org.reactivestreams.*; import io.reactivex.Flowable; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; +import io.reactivex.internal.operators.flowable.BlockingFlowableIterable.BlockingFlowableIterator; import io.reactivex.internal.subscriptions.BooleanSubscription; public class BlockingFlowableToIteratorTest { @@ -168,4 +169,20 @@ public void emptyThrowsNoSuch() { it.onComplete(); it.next(); } + + @Test(expected = MissingBackpressureException.class) + public void overflowQueue() { + Iterator it = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + } + } + .blockingIterable(1) + .iterator(); + + it.next(); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java index 1a954fcc3c..f4032007db 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableAmbTest.java @@ -29,7 +29,8 @@ import io.reactivex.*; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Consumer; +import io.reactivex.functions.*; +import io.reactivex.internal.util.CrashingMappedIterable; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.*; @@ -533,10 +534,10 @@ public void ambArraySingleElement() { assertSame(Flowable.never(), Flowable.ambArray(Flowable.never())); } + @SuppressWarnings("unchecked") @Test - @Ignore("RS Subscription no isCancelled") public void disposed() { - //TestHelper.checkDisposed(Flowable.ambArray(Flowable.never(), Flowable.never())); + TestHelper.checkDisposed(Flowable.ambArray(Flowable.never(), Flowable.never())); } @Test @@ -659,4 +660,41 @@ public void run() { } } + @SuppressWarnings("unchecked") + @Test + public void nullIterableElement() { + Flowable.amb(Arrays.asList(Flowable.never(), null, Flowable.never())) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void iteratorThrows() { + Flowable.amb(new CrashingMappedIterable>(1, 100, 100, new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return Flowable.never(); + } + })) + .test() + .assertFailureAndMessage(TestException.class, "iterator()"); + + Flowable.amb(new CrashingMappedIterable>(100, 1, 100, new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return Flowable.never(); + } + })) + .test() + .assertFailureAndMessage(TestException.class, "hasNext()"); + + Flowable.amb(new CrashingMappedIterable>(100, 100, 1, new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return Flowable.never(); + } + })) + .test() + .assertFailureAndMessage(TestException.class, "next()"); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableAnyTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableAnyTest.java index c5beccefcb..af6b80a9e7 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableAnyTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableAnyTest.java @@ -556,9 +556,10 @@ public boolean test(String v) { } @Test - @Ignore("RS Subscription no isCancelled") public void dispose() { - // TestHelper.checkDisposed(Flowable.just(1).any(Functions.alwaysTrue()).toFlowable()); + TestHelper.checkDisposed(Flowable.just(1).any(Functions.alwaysTrue()).toFlowable()); + + TestHelper.checkDisposed(Flowable.just(1).any(Functions.alwaysTrue())); } @Test @@ -569,6 +570,13 @@ public Publisher apply(Flowable o) throws Exception { return o.any(Functions.alwaysTrue()).toFlowable(); } }); + + TestHelper.checkDoubleOnSubscribeFlowableToSingle(new Function, Single>() { + @Override + public Single apply(Flowable o) throws Exception { + return o.any(Functions.alwaysTrue()); + } + }); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java index b7f36adf70..a45f8bc2f0 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java @@ -1740,6 +1740,24 @@ public List call() throws Exception { @SuppressWarnings("unchecked") @Test public void bufferSkipSupplierCrash2() { + Flowable.range(1, 2) + .buffer(1, 2, new Callable>() { + int calls; + @Override + public List call() throws Exception { + if (++calls == 1) { + throw new TestException(); + } + return new ArrayList(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void bufferOverlapSupplierCrash2() { Flowable.range(1, 2) .buffer(2, 1, new Callable>() { int calls; @@ -1870,4 +1888,89 @@ public void bufferTimedExactBoundedError() { to .assertFailure(TestException.class); } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.buffer(1); + } + }, false, 1, 1, Arrays.asList(1)); + + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.buffer(1, 2); + } + }, false, 1, 1, Arrays.asList(1)); + + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.buffer(2, 1); + } + }, false, 1, 1, Arrays.asList(1)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>>() { + @Override + public Publisher> apply(Flowable f) throws Exception { + return f.buffer(1); + } + }); + + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>>() { + @Override + public Publisher> apply(Flowable f) throws Exception { + return f.buffer(1, 2); + } + }); + + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>>() { + @Override + public Publisher> apply(Flowable f) throws Exception { + return f.buffer(2, 1); + } + }); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(PublishProcessor.create().buffer(1)); + + TestHelper.assertBadRequestReported(PublishProcessor.create().buffer(1, 2)); + + TestHelper.assertBadRequestReported(PublishProcessor.create().buffer(2, 1)); + } + + @SuppressWarnings("unchecked") + @Test + public void skipError() { + Flowable.error(new TestException()) + .buffer(1, 2) + .test() + .assertFailure(TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void skipSingleResult() { + Flowable.just(1) + .buffer(2, 3) + .test() + .assertResult(Arrays.asList(1)); + } + + @SuppressWarnings("unchecked") + @Test + public void skipBackpressure() { + Flowable.range(1, 10) + .buffer(2, 3) + .rebatchRequests(1) + .test() + .assertResult(Arrays.asList(1, 2), Arrays.asList(4, 5), Arrays.asList(7, 8), Arrays.asList(10)); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java index 52fd6e336f..bdb49cd71b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java @@ -27,6 +27,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; @@ -367,4 +368,121 @@ public Object apply(Mutable m) throws Exception { ts.assertResult(m, m); } + + @Test + public void conditionalNormal() { + Flowable.just(1, 2, 1, 3, 3, 4, 3, 5, 5) + .distinctUntilChanged() + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .test() + .assertResult(2, 4); + } + + @Test + public void conditionalNormal2() { + Flowable.just(1, 2, 1, 3, 3, 4, 3, 5, 5).hide() + .distinctUntilChanged() + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .test() + .assertResult(2, 4); + } + + @Test + public void conditionalNormal3() { + UnicastProcessor up = UnicastProcessor.create(); + + TestSubscriber ts = up.hide() + .distinctUntilChanged() + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .test(); + + TestHelper.emit(up, 1, 2, 1, 3, 3, 4, 3, 5, 5); + + ts + .assertResult(2, 4); + } + + @Test + public void conditionalSelectorCrash() { + Flowable.just(1, 2, 1, 3, 3, 4, 3, 5, 5) + .distinctUntilChanged(new BiPredicate() { + @Override + public boolean test(Integer a, Integer b) throws Exception { + throw new TestException(); + } + }) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void conditionalFused() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Flowable.just(1, 2, 1, 3, 3, 4, 3, 5, 5) + .distinctUntilChanged() + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.SYNC) + .assertResult(2, 4); + } + + @Test + public void conditionalAsyncFused() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + UnicastProcessor up = UnicastProcessor.create(); + + up + .distinctUntilChanged() + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .subscribe(ts); + + + TestHelper.emit(up, 1, 2, 1, 3, 3, 4, 3, 5, 5); + + SubscriberFusion.assertFusion(ts, QueueSubscription.ASYNC) + .assertResult(2, 4); + } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.distinctUntilChanged().filter(Functions.alwaysTrue()); + } + }, false, 1, 1, 1); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java index ee6fe599b8..d26e767d00 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java @@ -13,16 +13,24 @@ package io.reactivex.internal.operators.flowable; +import static org.junit.Assert.*; + import java.util.*; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import org.junit.*; +import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.QueueSubscription; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.processors.PublishProcessor; -import io.reactivex.subscribers.TestSubscriber; +import io.reactivex.subscribers.*; public class FlowableFlattenIterableTest { @@ -583,4 +591,276 @@ public Iterable apply(Object v) throws Exception { } }, false, 1, 1, 10, 20); } + + @Test + public void callableThrows() { + Flowable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + throw new TestException(); + } + }) + .flatMapIterable(Functions.justFunction(Arrays.asList(1, 2, 3))) + .test() + .assertFailure(TestException.class); + } + + @Test + public void fusionMethods() { + Flowable.just(1, 2) + .flatMapIterable(Functions.justFunction(Arrays.asList(1, 2, 3))) + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription)s; + + assertEquals(QueueSubscription.SYNC, qs.requestFusion(QueueSubscription.ANY)); + + try { + assertFalse("Source reports being empty!", qs.isEmpty()); + + assertEquals(1, qs.poll().intValue()); + + assertFalse("Source reports being empty!", qs.isEmpty()); + + assertEquals(2, qs.poll().intValue()); + + assertFalse("Source reports being empty!", qs.isEmpty()); + + qs.clear(); + + assertTrue("Source reports not empty!", qs.isEmpty()); + + assertNull(qs.poll()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + } + + @Test + public void smallPrefetch() { + Flowable.just(1, 2, 3) + .flatMapIterable(Functions.justFunction(Arrays.asList(1, 2, 3)), 1) + .test() + .assertResult(1, 2, 3, 1, 2, 3, 1, 2, 3); + } + + @Test + public void smallPrefetch2() { + Flowable.just(1, 2, 3).hide() + .flatMapIterable(Functions.justFunction(Collections.emptyList()), 1) + .test() + .assertResult(); + } + + @Test + public void mixedInnerSource() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Flowable.just(1, 2, 3) + .flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + if ((v & 1) == 0) { + return Collections.emptyList(); + } + return Arrays.asList(1, 2); + } + }) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.SYNC) + .assertResult(1, 2, 1, 2); + } + + @Test + public void mixedInnerSource2() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Flowable.just(1, 2, 3) + .flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + if ((v & 1) == 1) { + return Collections.emptyList(); + } + return Arrays.asList(1, 2); + } + }) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.SYNC) + .assertResult(1, 2); + } + + @Test + public void fusionRejected() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Flowable.just(1, 2, 3).hide() + .flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return Arrays.asList(1, 2); + } + }) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.NONE) + .assertResult(1, 2, 1, 2, 1, 2); + } + + @Test + public void fusedIsEmptyWithEmptySource() { + Flowable.just(1, 2, 3) + .flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + if ((v & 1) == 0) { + return Collections.emptyList(); + } + return Arrays.asList(v); + } + }) + .subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription)s; + + assertEquals(QueueSubscription.SYNC, qs.requestFusion(QueueSubscription.ANY)); + + try { + assertFalse("Source reports being empty!", qs.isEmpty()); + + assertEquals(1, qs.poll().intValue()); + + assertFalse("Source reports being empty!", qs.isEmpty()); + + assertEquals(3, qs.poll().intValue()); + + assertTrue("Source reports being non-empty!", qs.isEmpty()); + } catch (Throwable ex) { + throw ExceptionHelper.wrapOrThrow(ex); + } + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + }); + } + + @Test + public void fusedSourceCrash() { + Flowable.range(1, 3) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .flatMapIterable(Functions.justFunction(Collections.emptyList()), 1) + .test() + .assertFailure(TestException.class); + } + + @Test + public void take() { + Flowable.range(1, 3) + .flatMapIterable(Functions.justFunction(Arrays.asList(1)), 1) + .take(1) + .test() + .assertResult(1); + } + + @Test + public void overflowSource() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + s.onNext(3); + } + } + .flatMapIterable(Functions.justFunction(Arrays.asList(1)), 1) + .test(0L) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void oneByOne() { + Flowable.range(1, 3).hide() + .flatMapIterable(Functions.justFunction(Arrays.asList(1)), 1) + .rebatchRequests(1) + .test() + .assertResult(1, 1, 1); + } + + @Test + public void cancelAfterHasNext() { + final TestSubscriber ts = new TestSubscriber(); + + Flowable.range(1, 3).hide() + .flatMapIterable(new Function>() { + @Override + public Iterable apply(Integer v) throws Exception { + return new Iterable() { + int count; + @Override + public Iterator iterator() { + return new Iterator() { + + @Override + public boolean hasNext() { + if (++count == 2) { + ts.cancel(); + ts.onComplete(); + } + return true; + } + + @Override + public Integer next() { + return 1; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + }) + .subscribe(ts); + + ts.assertResult(1); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableInternalHelperTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableInternalHelperTest.java new file mode 100644 index 0000000000..87955085eb --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableInternalHelperTest.java @@ -0,0 +1,30 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ +package io.reactivex.internal.operators.flowable; + +import org.junit.Test; + +import io.reactivex.TestHelper; + +public class FlowableInternalHelperTest { + + @Test + public void utilityClass() { + TestHelper.checkUtilityClass(FlowableInternalHelper.class); + } + + @Test + public void requestMaxEnum() { + TestHelper.checkEnum(FlowableInternalHelper.RequestMax.class); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureLatestTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureLatestTest.java index ef147fcbee..661c741c5c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureLatestTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureLatestTest.java @@ -17,9 +17,11 @@ import java.util.concurrent.TimeUnit; import org.junit.*; +import org.reactivestreams.Publisher; -import io.reactivex.Flowable; +import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -128,4 +130,33 @@ public void onNext(Integer t) { System.out.println("testAsynchronousDrop -> " + n); Assert.assertTrue("All events received?", n < m); } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.onBackpressureLatest(); + } + }); + } + + @Test + public void take() { + Flowable.just(1, 2) + .onBackpressureLatest() + .take(1) + .test() + .assertResult(1); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(Flowable.never().onBackpressureLatest()); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.never().onBackpressureLatest()); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeTest.java index 75428c9e6e..66cdaad132 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRangeTest.java @@ -24,7 +24,8 @@ import org.reactivestreams.Subscriber; import io.reactivex.*; -import io.reactivex.functions.Consumer; +import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.fuseable.QueueDisposable; import io.reactivex.subscribers.*; @@ -290,4 +291,299 @@ public void requestWrongFusion() { SubscriberFusion.assertFusion(to, QueueDisposable.NONE) .assertResult(1, 2, 3, 4, 5); } + + @Test + public void countOne() { + Flowable.range(5495454, 1) + .test() + .assertResult(5495454); + } + + @Test + public void fused() { + TestSubscriber to = SubscriberFusion.newTest(QueueDisposable.ANY); + + Flowable.range(1, 2).subscribe(to); + + SubscriberFusion.assertFusion(to, QueueDisposable.SYNC) + .assertResult(1, 2); + } + + @Test + public void fusedReject() { + TestSubscriber to = SubscriberFusion.newTest(QueueDisposable.ASYNC); + + Flowable.range(1, 2).subscribe(to); + + SubscriberFusion.assertFusion(to, QueueDisposable.NONE) + .assertResult(1, 2); + } + + @Test + public void disposed() { + TestHelper.checkDisposed(Flowable.range(1, 2)); + } + + @Test + public void fusedClearIsEmpty() { + TestHelper.checkFusedIsEmptyClear(Flowable.range(1, 2)); + } + + @Test + public void noOverflow() { + Flowable.range(Integer.MAX_VALUE - 1, 2); + Flowable.range(Integer.MIN_VALUE, 2); + Flowable.range(Integer.MIN_VALUE, Integer.MAX_VALUE); + } + + @Test + public void conditionalNormal() { + Flowable.range(1, 5) + .filter(Functions.alwaysTrue()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.range(1, 5)); + + TestHelper.assertBadRequestReported(Flowable.range(1, 5).filter(Functions.alwaysTrue())); + } + + @Test + public void conditionalNormalSlowpath() { + Flowable.range(1, 5) + .filter(Functions.alwaysTrue()) + .test(5) + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void conditionalSlowPathTakeExact() { + Flowable.range(1, 5) + .filter(Functions.alwaysTrue()) + .take(5) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void slowPathTakeExact() { + Flowable.range(1, 5) + .filter(Functions.alwaysTrue()) + .take(5) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void conditionalSlowPathRebatch() { + Flowable.range(1, 5) + .filter(Functions.alwaysTrue()) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void slowPathRebatch() { + Flowable.range(1, 5) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void slowPathCancel() { + TestSubscriber ts = new TestSubscriber(2L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.range(1, 5) + .subscribe(ts); + + ts.assertResult(1); + } + + @Test + public void fastPathCancel() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.range(1, 5) + .subscribe(ts); + + ts.assertResult(1); + } + + @Test + public void conditionalSlowPathCancel() { + TestSubscriber ts = new TestSubscriber(1L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.range(1, 5) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts.assertResult(1); + } + + @Test + public void conditionalFastPathCancel() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + onComplete(); + } + }; + + Flowable.range(1, 5) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts.assertResult(1); + } + + @Test + public void conditionalRequestOneByOne() { + TestSubscriber ts = new TestSubscriber(1L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + request(1); + } + }; + + Flowable.range(1, 5) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .subscribe(ts); + + ts.assertResult(2, 4); + } + + @Test + public void conditionalRequestOneByOne2() { + TestSubscriber ts = new TestSubscriber(1L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + request(1); + } + }; + + Flowable.range(1, 5) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts.assertResult(1, 2, 3, 4, 5); + } + + @Test + public void fastPathCancelExact() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 5L) { + cancel(); + onComplete(); + } + } + }; + + Flowable.range(1, 5) + .subscribe(ts); + + ts.assertResult(1, 2, 3, 4, 5); + } + + @Test + public void conditionalFastPathCancelExact() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 5L) { + cancel(); + onComplete(); + } + } + }; + + Flowable.range(1, 5) + .filter(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + return v % 2 == 0; + } + }) + .subscribe(ts); + + ts.assertResult(2, 4); + } + + @Test + public void conditionalCancel1() { + TestSubscriber ts = new TestSubscriber(2L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + cancel(); + onComplete(); + } + } + }; + + Flowable.range(1, 2) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts.assertResult(1); + } + + @Test + public void conditionalCancel2() { + TestSubscriber ts = new TestSubscriber(2L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 2) { + cancel(); + onComplete(); + } + } + }; + + Flowable.range(1, 2) + .filter(Functions.alwaysTrue()) + .subscribe(ts); + + ts.assertResult(1, 2); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSampleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSampleTest.java index 53cb136584..6b17041a18 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSampleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSampleTest.java @@ -23,9 +23,9 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.internal.subscriptions.BooleanSubscription; -import io.reactivex.processors.PublishProcessor; +import io.reactivex.processors.*; import io.reactivex.schedulers.TestScheduler; public class FlowableSampleTest { @@ -292,4 +292,22 @@ public void error() { .test() .assertFailure(TestException.class); } + + @Test + public void backpressureOverflow() { + BehaviorProcessor.createDefault(1) + .sample(1, TimeUnit.MILLISECONDS) + .test(0L) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void backpressureOverflowWithOtherPublisher() { + BehaviorProcessor.createDefault(1) + .sample(Flowable.timer(1, TimeUnit.MILLISECONDS)) + .test(0L) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(MissingBackpressureException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqualTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqualTest.java index 5b959fdd91..1dd2e8f815 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqualTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSequenceEqualTest.java @@ -16,15 +16,21 @@ import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.*; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.junit.*; import org.mockito.InOrder; import org.reactivestreams.Subscriber; import io.reactivex.*; -import io.reactivex.exceptions.TestException; -import io.reactivex.functions.BiPredicate; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.observers.TestObserver; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; public class FlowableSequenceEqualTest { @@ -370,4 +376,211 @@ public void prefetchFlowable() { .test() .assertResult(true); } + + @Test + public void longSequenceEqualsFlowable() { + Flowable source = Flowable.range(1, Flowable.bufferSize() * 4).subscribeOn(Schedulers.computation()); + + Flowable.sequenceEqual(source, source) + .toFlowable() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(true); + } + + @Test + public void syncFusedCrashFlowable() { + Flowable source = Flowable.range(1, 10).map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { throw new TestException(); } + }); + + Flowable.sequenceEqual(source, Flowable.range(1, 10).hide()) + .toFlowable() + .test() + .assertFailure(TestException.class); + + Flowable.sequenceEqual(Flowable.range(1, 10).hide(), source) + .toFlowable() + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelAndDrainRaceFlowable() { + Flowable neverNever = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + } + }; + + for (int i = 0; i < 500; i++) { + final TestSubscriber ts = new TestSubscriber(); + + final PublishProcessor pp = PublishProcessor.create(); + + boolean swap = (i & 1) == 0; + + Flowable.sequenceEqual(swap ? pp : neverNever, swap ? neverNever : pp) + .toFlowable() + .subscribe(ts); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertEmpty(); + } + } + + @Test + public void sourceOverflowsFlowable() { + Flowable.sequenceEqual(Flowable.never(), new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + for (int i = 0; i < 10; i++) { + s.onNext(i); + } + } + }, 8) + .toFlowable() + .test() + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void doubleErrorFlowable() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.sequenceEqual(Flowable.never(), new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onError(new TestException("First")); + s.onError(new TestException("Second")); + } + }, 8) + .toFlowable() + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + + @Test + public void longSequenceEquals() { + Flowable source = Flowable.range(1, Flowable.bufferSize() * 4).subscribeOn(Schedulers.computation()); + + Flowable.sequenceEqual(source, source) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(true); + } + + @Test + public void syncFusedCrash() { + Flowable source = Flowable.range(1, 10).map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { throw new TestException(); } + }); + + Flowable.sequenceEqual(source, Flowable.range(1, 10).hide()) + .test() + .assertFailure(TestException.class); + + Flowable.sequenceEqual(Flowable.range(1, 10).hide(), source) + .test() + .assertFailure(TestException.class); + } + + @Test + public void cancelAndDrainRace() { + Flowable neverNever = new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + } + }; + + for (int i = 0; i < 500; i++) { + final TestObserver ts = new TestObserver(); + + final PublishProcessor pp = PublishProcessor.create(); + + boolean swap = (i & 1) == 0; + + Flowable.sequenceEqual(swap ? pp : neverNever, swap ? neverNever : pp) + .subscribe(ts); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + + ts.assertEmpty(); + } + } + + @Test + public void sourceOverflows() { + Flowable.sequenceEqual(Flowable.never(), new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + for (int i = 0; i < 10; i++) { + s.onNext(i); + } + } + }, 8) + .test() + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void doubleError() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.sequenceEqual(Flowable.never(), new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onError(new TestException("First")); + s.onError(new TestException("Second")); + } + }, 8) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimedTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimedTest.java index 8f4a7c29b1..df586a488f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimedTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSkipLastTimedTest.java @@ -237,4 +237,16 @@ public void take() { .assertResult(1); } + @Test + public void observeOn() { + Flowable.range(1, 1000) + .skipLast(0, TimeUnit.SECONDS) + .observeOn(Schedulers.single(), false, 16) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueCount(1000) + .assertComplete() + .assertNoErrors(); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java index a6f1f049c3..8eb3e1f368 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java @@ -574,7 +574,7 @@ public void testInitialRequestsDontOverflow() { .map(new Function>() { @Override public Flowable apply(Long t) { - return Flowable.fromIterable(Arrays.asList(1L, 2L, 3L)); + return Flowable.fromIterable(Arrays.asList(1L, 2L, 3L)).hide(); } }).take(3)).subscribe(ts); ts.request(Long.MAX_VALUE - 1); @@ -592,7 +592,7 @@ public void testSecondaryRequestsDontOverflow() throws InterruptedException { .map(new Function>() { @Override public Flowable apply(Long t) { - return Flowable.fromIterable(Arrays.asList(1L, 2L, 3L)); + return Flowable.fromIterable(Arrays.asList(1L, 2L, 3L)).hide(); } }).take(3)).subscribe(ts); ts.request(1); @@ -674,7 +674,7 @@ public void delayErrors() { public void switchOnNextPrefetch() { final List list = new ArrayList(); - Flowable source = Flowable.range(1, 10).doOnNext(new Consumer() { + Flowable source = Flowable.range(1, 10).hide().doOnNext(new Consumer() { @Override public void accept(Integer v) throws Exception { list.add(v); @@ -691,7 +691,7 @@ public void accept(Integer v) throws Exception { public void switchOnNextDelayError() { final List list = new ArrayList(); - Flowable source = Flowable.range(1, 10).doOnNext(new Consumer() { + Flowable source = Flowable.range(1, 10).hide().doOnNext(new Consumer() { @Override public void accept(Integer v) throws Exception { list.add(v); @@ -708,7 +708,7 @@ public void accept(Integer v) throws Exception { public void switchOnNextDelayErrorPrefetch() { final List list = new ArrayList(); - Flowable source = Flowable.range(1, 10).doOnNext(new Consumer() { + Flowable source = Flowable.range(1, 10).hide().doOnNext(new Consumer() { @Override public void accept(Integer v) throws Exception { list.add(v); @@ -1077,4 +1077,80 @@ public void scalarMapDelayError() { .test() .assertResult(1); } + + @Test + public void scalarXMap() { + Flowable.fromCallable(Functions.justCallable(1)) + .switchMap(Functions.justFunction(Flowable.just(1))) + .test() + .assertResult(1); + } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.switchMap(Functions.justFunction(Flowable.just(1))); + } + }, false, 1, 1, 1); + } + + @Test + public void innerOverflow() { + Flowable.just(1).hide() + .switchMap(Functions.justFunction(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + for (int i = 0; i < 10; i++) { + s.onNext(i); + } + } + }), 8) + .test(1L) + .assertFailure(MissingBackpressureException.class, 0); + } + + @Test + public void drainCancelRace() { + for (int i = 0; i < 500; i++) { + final TestSubscriber ts = new TestSubscriber(); + + final PublishProcessor pp = PublishProcessor.create(); + + Flowable.just(1).hide() + .switchMap(Functions.justFunction(pp)) + .subscribe(ts); + + Runnable r1 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + pp.onNext(1); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void fusedInnerCrash() { + Flowable.just(1).hide() + .switchMap(Functions.justFunction(Flowable.just(1).map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + throw new TestException(); + } + }))) + .test() + .assertFailure(TestException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimedTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimedTest.java index 606f90b8af..6049f068fe 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimedTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimedTest.java @@ -19,10 +19,11 @@ import org.junit.Test; import org.mockito.InOrder; -import org.reactivestreams.Subscriber; +import org.reactivestreams.*; import io.reactivex.*; import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.*; import io.reactivex.subscribers.TestSubscriber; @@ -313,4 +314,26 @@ public void run() { } } + @Test + public void emptyDelayError() { + Flowable.empty() + .takeLast(1, TimeUnit.DAYS, true) + .test() + .assertResult(); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.takeLast(1, TimeUnit.SECONDS); + } + }); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(PublishProcessor.create().takeLast(1, TimeUnit.SECONDS)); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTest.java index 5187d99ee1..cf1033fb05 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableThrottleFirstTest.java @@ -24,7 +24,7 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -193,4 +193,11 @@ protected void subscribeActual(Subscriber observer) { } } + @Test + public void backpressureNoRequest() { + Flowable.range(1, 3) + .throttleFirst(1, TimeUnit.MINUTES) + .test(0L) + .assertFailure(MissingBackpressureException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java index 4fc4cecb32..a28744c19c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTimerTest.java @@ -13,6 +13,7 @@ package io.reactivex.internal.operators.flowable; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; @@ -22,7 +23,7 @@ import org.reactivestreams.Subscriber; import io.reactivex.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.schedulers.TestScheduler; import io.reactivex.subscribers.*; @@ -287,4 +288,40 @@ public void onComplete() { public void disposed() { TestHelper.checkDisposed(Flowable.timer(1, TimeUnit.DAYS)); } + + @Test + public void backpressureNotReady() { + Flowable.timer(1, TimeUnit.MILLISECONDS) + .test(0L) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void timerCancelRace() { + for (int i = 0; i < 500; i++) { + final TestSubscriber ts = new TestSubscriber(); + + final TestScheduler scheduler = new TestScheduler(); + + Flowable.timer(1, TimeUnit.SECONDS, scheduler) + .subscribe(ts); + + Runnable r1 = new Runnable() { + @Override + public void run() { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableToFutureTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableToFutureTest.java index a822044af8..713315856b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableToFutureTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableToFutureTest.java @@ -45,7 +45,7 @@ public void testSuccess() throws Exception { verify(o, times(1)).onNext(value); verify(o, times(1)).onComplete(); verify(o, never()).onError(any(Throwable.class)); - verify(future, times(1)).cancel(true); + verify(future, never()).cancel(anyBoolean()); } @Test @@ -87,7 +87,7 @@ public void testFailure() throws Exception { verify(o, never()).onNext(null); verify(o, never()).onComplete(); verify(o, times(1)).onError(e); - verify(future, times(1)).cancel(true); + verify(future, never()).cancel(anyBoolean()); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java index 9226624db3..9b06cfb156 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromTest.java @@ -655,4 +655,30 @@ public Object apply(Integer a, Integer b, Integer c) throws Exception { } } + @Test + public void otherErrors() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.just(1) + .withLatestFrom(new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onError(new TestException("First")); + s.onError(new TestException("Second")); + } + }, new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test() + .assertFailureAndMessage(TestException.class, "First"); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java index 7692811aae..a0edbac51c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java @@ -31,6 +31,7 @@ import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.*; @@ -1615,4 +1616,155 @@ public Object apply(Integer a, Integer b) throws Exception { })); } + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.zip(Flowable.just(1), Flowable.just(1), new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + return a + b; + } + })); + } + + @Test + public void multiError() { + List errors = TestHelper.trackPluginErrors(); + try { + PublishProcessor pp = PublishProcessor.create(); + + @SuppressWarnings("rawtypes") + final Subscriber[] sub = { null }; + TestSubscriber ts = Flowable.zip(pp, new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + sub[0] = s; + } + }, new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + }) + .test(); + + pp.onError(new TestException("First")); + + ts + .assertFailureAndMessage(TestException.class, "First"); + + sub[0].onError(new TestException("Second")); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void singleErrorDelayed() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Flowable.zip(pp1, pp2, new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + }, true) + .test(); + + pp1.onError(new TestException("First")); + pp2.onComplete(); + + ts + .assertFailureAndMessage(TestException.class, "First"); + } + + @Test + public void singleErrorDelayedBackpressured() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = Flowable.zip(pp1, pp2, new BiFunction() { + @Override + public Object apply(Object a, Object b) throws Exception { + return a; + } + }) + .test(0L); + + pp1.onError(new TestException("First")); + pp2.onComplete(); + + ts + .assertFailureAndMessage(TestException.class, "First"); + } + + @Test + public void fusedInputThrows() { + Flowable.zip(Flowable.just(1).map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }), Flowable.just(2), new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void fusedInputThrowsDelayError() { + Flowable.zip(Flowable.just(1).map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }), Flowable.just(2), new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }, true) + .test() + .assertFailure(TestException.class); + } + + @Test + public void fusedInputThrowsBackpressured() { + Flowable.zip(Flowable.just(1).map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }), Flowable.just(2), new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test(0L) + .assertFailure(TestException.class); + } + + @Test + public void fusedInputThrowsDelayErrorBackpressured() { + Flowable.zip(Flowable.just(1).map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }), Flowable.just(2), new BiFunction() { + @Override + public Integer apply(Integer a, Integer b) throws Exception { + return a + b; + } + }, true) + .test(0L) + .assertFailure(TestException.class); + } } \ No newline at end of file