diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 363d334921..574eb37a50 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -6943,79 +6943,6 @@ public final > Flowable buffer(Publisher(this, boundaryIndicator, bufferSupplier)); } - /** - * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting - * Publisher emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a - * new buffer whenever the Publisher produced by the specified {@code boundaryIndicatorSupplier} emits an item. - *

- * - *

- * If either the source {@code Publisher} or the boundary {@code Publisher} issues an {@code onError} notification the event is passed on - * immediately without first emitting the buffer it is in the process of assembling. - *

- *
Backpressure:
- *
This operator does not support backpressure as it is instead controlled by the given Publishers and - * buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey downstream requests.
- *
Scheduler:
- *
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the value type of the boundary-providing Publisher - * @param boundaryIndicatorSupplier - * a {@link Supplier} that produces a Publisher that governs the boundary between buffers. - * Whenever the supplied {@code Publisher} emits an item, {@code buffer} emits the current buffer and - * begins to fill a new one - * @return a Flowable that emits a connected, non-overlapping buffer of items from the source Publisher - * each time the Publisher created with the {@code closingIndicator} argument emits an item - * @see ReactiveX operators documentation: Buffer - */ - @CheckReturnValue - @BackpressureSupport(BackpressureKind.ERROR) - @SchedulerSupport(SchedulerSupport.NONE) - public final Flowable> buffer(Supplier> boundaryIndicatorSupplier) { - return buffer(boundaryIndicatorSupplier, ArrayListSupplier.asSupplier()); - } - - /** - * Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting - * Publisher emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a - * new buffer whenever the Publisher produced by the specified {@code boundaryIndicatorSupplier} emits an item. - *

- * - *

- * If either the source {@code Publisher} or the boundary {@code Publisher} issues an {@code onError} notification the event is passed on - * immediately without first emitting the buffer it is in the process of assembling. - *

- *
Backpressure:
- *
This operator does not support backpressure as it is instead controlled by the given Publishers and - * buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey downstream requests.
- *
Scheduler:
- *
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the collection subclass type to buffer into - * @param the value type of the boundary-providing Publisher - * @param boundaryIndicatorSupplier - * a {@link Callable} that produces a Publisher that governs the boundary between buffers. - * Whenever the supplied {@code Publisher} emits an item, {@code buffer} emits the current buffer and - * begins to fill a new one - * @param bufferSupplier - * a factory function that returns an instance of the collection subclass to be used and returned - * as the buffer - * @return a Flowable that emits a connected, non-overlapping buffer of items from the source Publisher - * each time the Publisher created with the {@code closingIndicator} argument emits an item - * @see ReactiveX operators documentation: Buffer - */ - @CheckReturnValue - @BackpressureSupport(BackpressureKind.ERROR) - @SchedulerSupport(SchedulerSupport.NONE) - public final > Flowable buffer(Supplier> boundaryIndicatorSupplier, - Supplier bufferSupplier) { - ObjectHelper.requireNonNull(boundaryIndicatorSupplier, "boundaryIndicatorSupplier is null"); - ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null"); - return RxJavaPlugins.onAssembly(new FlowableBufferBoundarySupplier(this, boundaryIndicatorSupplier, bufferSupplier)); - } - /** * Returns a Flowable that subscribes to this Publisher lazily, caches all of its events * and replays them, in the same order as received, to all the downstream subscribers. @@ -12186,7 +12113,7 @@ public final Flowable onBackpressureLatest() { @SchedulerSupport(SchedulerSupport.NONE) public final Flowable onErrorResumeNext(Function> resumeFunction) { ObjectHelper.requireNonNull(resumeFunction, "resumeFunction is null"); - return RxJavaPlugins.onAssembly(new FlowableOnErrorNext(this, resumeFunction, false)); + return RxJavaPlugins.onAssembly(new FlowableOnErrorNext(this, resumeFunction)); } /** @@ -12313,53 +12240,6 @@ public final Flowable onErrorReturnItem(final T item) { return onErrorReturn(Functions.justFunction(item)); } - /** - * Instructs a Publisher to pass control to another Publisher rather than invoking - * {@link Subscriber#onError onError} if it encounters an {@link java.lang.Exception}. - *

- * This differs from {@link #onErrorResumeNext} in that this one does not handle {@link java.lang.Throwable} - * or {@link java.lang.Error} but lets those continue through. - *

- * - *

- * By default, when a Publisher encounters an exception that prevents it from emitting the expected item - * to its {@link Subscriber}, the Publisher invokes its Subscriber's {@code onError} method, and then quits - * without invoking any more of its Subscriber's methods. The {@code onExceptionResumeNext} method changes - * this behavior. If you pass another Publisher ({@code resumeSequence}) to a Publisher's - * {@code onExceptionResumeNext} method, if the original Publisher encounters an exception, instead of - * invoking its Subscriber's {@code onError} method, it will instead relinquish control to - * {@code resumeSequence} which will invoke the Subscriber's {@link Subscriber#onNext onNext} method if it is - * able to do so. In such a case, because no Publisher necessarily invokes {@code onError}, the Subscriber - * may never know that an exception happened. - *

- * You can use this to prevent exceptions from propagating or to supply fallback data should exceptions be - * encountered. - *

- *
Backpressure:
- *
The operator honors backpressure from downstream. This and the resuming {@code Publisher}s - * are expected to honor backpressure as well. - * If any of them violate this expectation, the operator may throw an - * {@code IllegalStateException} when the source {@code Publisher} completes or - * {@code MissingBackpressureException} is signaled somewhere downstream.
- *
Scheduler:
- *
{@code onExceptionResumeNext} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param next - * the next Publisher that will take over if the source Publisher encounters - * an exception - * @return the original Publisher, with appropriately modified behavior - * @see ReactiveX operators documentation: Catch - */ - @CheckReturnValue - @NonNull - @BackpressureSupport(BackpressureKind.FULL) - @SchedulerSupport(SchedulerSupport.NONE) - public final Flowable onExceptionResumeNext(final Publisher next) { - ObjectHelper.requireNonNull(next, "next is null"); - return RxJavaPlugins.onAssembly(new FlowableOnErrorNext(this, Functions.justFunction(next), true)); - } - /** * Nulls out references to the upstream producer and downstream Subscriber if * the sequence is terminated or downstream cancels. @@ -18290,77 +18170,6 @@ public final Flowable> window( return RxJavaPlugins.onAssembly(new FlowableWindowBoundarySelector(this, openingIndicator, closingIndicator, bufferSize)); } - /** - * Returns a Flowable that emits windows of items it collects from the source Publisher. The resulting - * Publisher emits connected, non-overlapping windows. It emits the current window and opens a new one - * whenever the Publisher produced by the specified {@code closingSelector} emits an item. - *

- * - *

- *
Backpressure:
- *
The operator consumes the source {@code Publisher} in an unbounded manner. - * The returned {@code Publisher} doesn't support backpressure as it uses - * the {@code closingSelector} to control the creation of windows. The returned inner {@code Publisher}s honor - * backpressure but have an unbounded inner buffer that may lead to {@code OutOfMemoryError} - * if left unconsumed.
- *
Scheduler:
- *
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the element type of the boundary Publisher - * @param boundaryIndicatorSupplier - * a {@link Supplier} that returns a {@code Publisher} that governs the boundary between windows. - * When the source {@code Publisher} emits an item, {@code window} emits the current window and begins - * a new one. - * @return a Flowable that emits connected, non-overlapping windows of items from the source Publisher - * whenever {@code closingSelector} emits an item - * @see ReactiveX operators documentation: Window - */ - @CheckReturnValue - @BackpressureSupport(BackpressureKind.ERROR) - @SchedulerSupport(SchedulerSupport.NONE) - public final Flowable> window(Supplier> boundaryIndicatorSupplier) { - return window(boundaryIndicatorSupplier, bufferSize()); - } - - /** - * Returns a Flowable that emits windows of items it collects from the source Publisher. The resulting - * Publisher emits connected, non-overlapping windows. It emits the current window and opens a new one - * whenever the Publisher produced by the specified {@code closingSelector} emits an item. - *

- * - *

- *
Backpressure:
- *
The operator consumes the source {@code Publisher} in an unbounded manner. - * The returned {@code Publisher} doesn't support backpressure as it uses - * the {@code closingSelector} to control the creation of windows. The returned inner {@code Publisher}s honor - * backpressure but have an unbounded inner buffer that may lead to {@code OutOfMemoryError} - * if left unconsumed.
- *
Scheduler:
- *
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the element type of the boundary Publisher - * @param boundaryIndicatorSupplier - * a {@link Supplier} that returns a {@code Publisher} that governs the boundary between windows. - * When the source {@code Publisher} emits an item, {@code window} emits the current window and begins - * a new one. - * @param bufferSize - * the capacity hint for the buffer in the inner windows - * @return a Flowable that emits connected, non-overlapping windows of items from the source Publisher - * whenever {@code closingSelector} emits an item - * @see ReactiveX operators documentation: Window - */ - @CheckReturnValue - @NonNull - @BackpressureSupport(BackpressureKind.ERROR) - @SchedulerSupport(SchedulerSupport.NONE) - public final Flowable> window(Supplier> boundaryIndicatorSupplier, int bufferSize) { - ObjectHelper.requireNonNull(boundaryIndicatorSupplier, "boundaryIndicatorSupplier is null"); - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return RxJavaPlugins.onAssembly(new FlowableWindowBoundarySupplier(this, boundaryIndicatorSupplier, bufferSize)); - } - /** * Merges the specified Publisher into this Publisher sequence by using the {@code resultSelector} * function only when the source Publisher (this instance) emits an item. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index f6de4c980f..1af2119087 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -6147,70 +6147,6 @@ public final > Observable buffer(Observabl return RxJavaPlugins.onAssembly(new ObservableBufferExactBoundary(this, boundary, bufferSupplier)); } - /** - * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting - * ObservableSource emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a - * new buffer whenever the ObservableSource produced by the specified {@code boundarySupplier} emits an item. - *

- * - *

- * If either the source {@code ObservableSource} or the boundary {@code ObservableSource} issues an {@code onError} notification the event - * is passed on immediately without first emitting the buffer it is in the process of assembling. - *

- *
Scheduler:
- *
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the value type of the boundary-providing ObservableSource - * @param boundarySupplier - * a {@link Supplier} that produces an ObservableSource that governs the boundary between buffers. - * Whenever the supplied {@code ObservableSource} emits an item, {@code buffer} emits the current buffer and - * begins to fill a new one - * @return an Observable that emits a connected, non-overlapping buffer of items from the source ObservableSource - * each time the ObservableSource created with the {@code closingIndicator} argument emits an item - * @see ReactiveX operators documentation: Buffer - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public final Observable> buffer(Supplier> boundarySupplier) { - return buffer(boundarySupplier, ArrayListSupplier.asSupplier()); - } - - /** - * Returns an Observable that emits buffers of items it collects from the source ObservableSource. The resulting - * ObservableSource emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a - * new buffer whenever the ObservableSource produced by the specified {@code boundarySupplier} emits an item. - *

- * - *

- * If either the source {@code ObservableSource} or the boundary {@code ObservableSource} issues an {@code onError} notification the event - * is passed on immediately without first emitting the buffer it is in the process of assembling. - *

- *
Scheduler:
- *
This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the collection subclass type to buffer into - * @param the value type of the boundary-providing ObservableSource - * @param boundarySupplier - * a {@link Supplier} that produces an ObservableSource that governs the boundary between buffers. - * Whenever the supplied {@code ObservableSource} emits an item, {@code buffer} emits the current buffer and - * begins to fill a new one - * @param bufferSupplier - * a factory function that returns an instance of the collection subclass to be used and returned - * as the buffer - * @return an Observable that emits a connected, non-overlapping buffer of items from the source ObservableSource - * each time the ObservableSource created with the {@code closingIndicator} argument emits an item - * @see ReactiveX operators documentation: Buffer - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public final > Observable buffer(Supplier> boundarySupplier, Supplier bufferSupplier) { - ObjectHelper.requireNonNull(boundarySupplier, "boundarySupplier is null"); - ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null"); - return RxJavaPlugins.onAssembly(new ObservableBufferBoundarySupplier(this, boundarySupplier, bufferSupplier)); - } - /** * Returns an Observable that subscribes to this ObservableSource lazily, caches all of its events * and replays them, in the same order as received, to all the downstream subscribers. @@ -10115,7 +10051,7 @@ public final Observable ofType(final Class clazz) { @SchedulerSupport(SchedulerSupport.NONE) public final Observable onErrorResumeNext(Function> resumeFunction) { ObjectHelper.requireNonNull(resumeFunction, "resumeFunction is null"); - return RxJavaPlugins.onAssembly(new ObservableOnErrorNext(this, resumeFunction, false)); + return RxJavaPlugins.onAssembly(new ObservableOnErrorNext(this, resumeFunction)); } /** @@ -10220,45 +10156,6 @@ public final Observable onErrorReturnItem(final T item) { return onErrorReturn(Functions.justFunction(item)); } - /** - * Instructs an ObservableSource to pass control to another ObservableSource rather than invoking - * {@link Observer#onError onError} if it encounters an {@link java.lang.Exception}. - *

- * This differs from {@link #onErrorResumeNext} in that this one does not handle {@link java.lang.Throwable} - * or {@link java.lang.Error} but lets those continue through. - *

- * - *

- * By default, when an ObservableSource encounters an exception that prevents it from emitting the expected item - * to its {@link Observer}, the ObservableSource invokes its Observer's {@code onError} method, and then quits - * without invoking any more of its Observer's methods. The {@code onExceptionResumeNext} method changes - * this behavior. If you pass another ObservableSource ({@code resumeSequence}) to an ObservableSource's - * {@code onExceptionResumeNext} method, if the original ObservableSource encounters an exception, instead of - * invoking its Observer's {@code onError} method, it will instead relinquish control to - * {@code resumeSequence} which will invoke the Observer's {@link Observer#onNext onNext} method if it is - * able to do so. In such a case, because no ObservableSource necessarily invokes {@code onError}, the Observer - * may never know that an exception happened. - *

- * You can use this to prevent exceptions from propagating or to supply fallback data should exceptions be - * encountered. - *

- *
Scheduler:
- *
{@code onExceptionResumeNext} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param next - * the next ObservableSource that will take over if the source ObservableSource encounters - * an exception - * @return the original ObservableSource, with appropriately modified behavior - * @see ReactiveX operators documentation: Catch - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public final Observable onExceptionResumeNext(final ObservableSource next) { - ObjectHelper.requireNonNull(next, "next is null"); - return RxJavaPlugins.onAssembly(new ObservableOnErrorNext(this, Functions.justFunction(next), true)); - } - /** * Nulls out references to the upstream producer and downstream Observer if * the sequence is terminated or downstream calls dispose(). @@ -15175,62 +15072,6 @@ public final Observable> window( return RxJavaPlugins.onAssembly(new ObservableWindowBoundarySelector(this, openingIndicator, closingIndicator, bufferSize)); } - /** - * Returns an Observable that emits windows of items it collects from the source ObservableSource. The resulting - * ObservableSource emits connected, non-overlapping windows. It emits the current window and opens a new one - * whenever the ObservableSource produced by the specified {@code closingIndicator} emits an item. - *

- * - *

- *
Scheduler:
- *
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the element type of the boundary ObservableSource - * @param boundary - * a {@link Supplier} that returns an {@code ObservableSource} that governs the boundary between windows. - * When the source {@code ObservableSource} emits an item, {@code window} emits the current window and begins - * a new one. - * @return an Observable that emits connected, non-overlapping windows of items from the source ObservableSource - * whenever {@code closingIndicator} emits an item - * @see ReactiveX operators documentation: Window - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public final Observable> window(Supplier> boundary) { - return window(boundary, bufferSize()); - } - - /** - * Returns an Observable that emits windows of items it collects from the source ObservableSource. The resulting - * ObservableSource emits connected, non-overlapping windows. It emits the current window and opens a new one - * whenever the ObservableSource produced by the specified {@code closingIndicator} emits an item. - *

- * - *

- *
Scheduler:
- *
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param the element type of the boundary ObservableSource - * @param boundary - * a {@link Supplier} that returns an {@code ObservableSource} that governs the boundary between windows. - * When the source {@code ObservableSource} emits an item, {@code window} emits the current window and begins - * a new one. - * @param bufferSize - * the capacity hint for the buffer in the inner windows - * @return an Observable that emits connected, non-overlapping windows of items from the source ObservableSource - * whenever {@code closingIndicator} emits an item - * @see ReactiveX operators documentation: Window - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public final Observable> window(Supplier> boundary, int bufferSize) { - ObjectHelper.requireNonNull(boundary, "boundary is null"); - ObjectHelper.verifyPositive(bufferSize, "bufferSize"); - return RxJavaPlugins.onAssembly(new ObservableWindowBoundarySupplier(this, boundary, bufferSize)); - } - /** * Merges the specified ObservableSource into this ObservableSource sequence by using the {@code resultSelector} * function only when the source ObservableSource (this instance) emits an item. diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java deleted file mode 100644 index 362acb9645..0000000000 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java +++ /dev/null @@ -1,272 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.flowable; - -import java.util.Collection; -import java.util.concurrent.atomic.AtomicReference; - -import org.reactivestreams.*; - -import io.reactivex.*; -import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.functions.Supplier; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.subscribers.QueueDrainSubscriber; -import io.reactivex.internal.subscriptions.*; -import io.reactivex.internal.util.QueueDrainHelper; -import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.subscribers.*; - -public final class FlowableBufferBoundarySupplier, B> -extends AbstractFlowableWithUpstream { - final Supplier> boundarySupplier; - final Supplier bufferSupplier; - - public FlowableBufferBoundarySupplier(Flowable source, Supplier> boundarySupplier, Supplier bufferSupplier) { - super(source); - this.boundarySupplier = boundarySupplier; - this.bufferSupplier = bufferSupplier; - } - - @Override - protected void subscribeActual(Subscriber s) { - source.subscribe(new BufferBoundarySupplierSubscriber(new SerializedSubscriber(s), bufferSupplier, boundarySupplier)); - } - - static final class BufferBoundarySupplierSubscriber, B> - extends QueueDrainSubscriber implements FlowableSubscriber, Subscription, Disposable { - - final Supplier bufferSupplier; - final Supplier> boundarySupplier; - - Subscription upstream; - - final AtomicReference other = new AtomicReference(); - - U buffer; - - BufferBoundarySupplierSubscriber(Subscriber actual, Supplier bufferSupplier, - Supplier> boundarySupplier) { - super(actual, new MpscLinkedQueue()); - this.bufferSupplier = bufferSupplier; - this.boundarySupplier = boundarySupplier; - } - - @Override - public void onSubscribe(Subscription s) { - if (!SubscriptionHelper.validate(this.upstream, s)) { - return; - } - this.upstream = s; - - Subscriber actual = this.downstream; - - U b; - - try { - b = ObjectHelper.requireNonNull(bufferSupplier.get(), "The buffer supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - cancelled = true; - s.cancel(); - EmptySubscription.error(e, actual); - return; - } - - buffer = b; - - Publisher boundary; - - try { - boundary = ObjectHelper.requireNonNull(boundarySupplier.get(), "The boundary publisher supplied is null"); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - cancelled = true; - s.cancel(); - EmptySubscription.error(ex, actual); - return; - } - - BufferBoundarySubscriber bs = new BufferBoundarySubscriber(this); - other.set(bs); - - actual.onSubscribe(this); - - if (!cancelled) { - s.request(Long.MAX_VALUE); - - boundary.subscribe(bs); - } - } - - @Override - public void onNext(T t) { - synchronized (this) { - U b = buffer; - if (b == null) { - return; - } - b.add(t); - } - } - - @Override - public void onError(Throwable t) { - cancel(); - downstream.onError(t); - } - - @Override - public void onComplete() { - U b; - synchronized (this) { - b = buffer; - if (b == null) { - return; - } - buffer = null; - } - queue.offer(b); - done = true; - if (enter()) { - QueueDrainHelper.drainMaxLoop(queue, downstream, false, this, this); - } - } - - @Override - public void request(long n) { - requested(n); - } - - @Override - public void cancel() { - if (!cancelled) { - cancelled = true; - upstream.cancel(); - disposeOther(); - - if (enter()) { - queue.clear(); - } - } - } - - void disposeOther() { - DisposableHelper.dispose(other); - } - - void next() { - - U next; - - try { - next = ObjectHelper.requireNonNull(bufferSupplier.get(), "The buffer supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - cancel(); - downstream.onError(e); - return; - } - - Publisher boundary; - - try { - boundary = ObjectHelper.requireNonNull(boundarySupplier.get(), "The boundary publisher supplied is null"); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - cancelled = true; - upstream.cancel(); - downstream.onError(ex); - return; - } - - BufferBoundarySubscriber bs = new BufferBoundarySubscriber(this); - - if (DisposableHelper.replace(other, bs)) { - U b; - synchronized (this) { - b = buffer; - if (b == null) { - return; - } - buffer = next; - } - - boundary.subscribe(bs); - - fastPathEmitMax(b, false, this); - } - } - - @Override - public void dispose() { - upstream.cancel(); - disposeOther(); - } - - @Override - public boolean isDisposed() { - return other.get() == DisposableHelper.DISPOSED; - } - - @Override - public boolean accept(Subscriber a, U v) { - downstream.onNext(v); - return true; - } - - } - - static final class BufferBoundarySubscriber, B> extends DisposableSubscriber { - final BufferBoundarySupplierSubscriber parent; - - boolean once; - - BufferBoundarySubscriber(BufferBoundarySupplierSubscriber parent) { - this.parent = parent; - } - - @Override - public void onNext(B t) { - if (once) { - return; - } - once = true; - cancel(); - parent.next(); - } - - @Override - public void onError(Throwable t) { - if (once) { - RxJavaPlugins.onError(t); - return; - } - once = true; - parent.onError(t); - } - - @Override - public void onComplete() { - if (once) { - return; - } - once = true; - parent.next(); - } - } -} diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java index 7110d086e0..8fa826e08c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnErrorNext.java @@ -24,18 +24,16 @@ public final class FlowableOnErrorNext extends AbstractFlowableWithUpstream { final Function> nextSupplier; - final boolean allowFatal; public FlowableOnErrorNext(Flowable source, - Function> nextSupplier, boolean allowFatal) { + Function> nextSupplier) { super(source); this.nextSupplier = nextSupplier; - this.allowFatal = allowFatal; } @Override protected void subscribeActual(Subscriber s) { - OnErrorNextSubscriber parent = new OnErrorNextSubscriber(s, nextSupplier, allowFatal); + OnErrorNextSubscriber parent = new OnErrorNextSubscriber(s, nextSupplier); s.onSubscribe(parent); source.subscribe(parent); } @@ -49,19 +47,16 @@ static final class OnErrorNextSubscriber final Function> nextSupplier; - final boolean allowFatal; - boolean once; boolean done; long produced; - OnErrorNextSubscriber(Subscriber actual, Function> nextSupplier, boolean allowFatal) { + OnErrorNextSubscriber(Subscriber actual, Function> nextSupplier) { super(false); this.downstream = actual; this.nextSupplier = nextSupplier; - this.allowFatal = allowFatal; } @Override @@ -92,11 +87,6 @@ public void onError(Throwable t) { } once = true; - if (allowFatal && !(t instanceof Exception)) { - downstream.onError(t); - return; - } - Publisher p; try { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java deleted file mode 100644 index 50ddca4e22..0000000000 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java +++ /dev/null @@ -1,338 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.flowable; - -import java.util.concurrent.atomic.*; - -import org.reactivestreams.*; - -import io.reactivex.*; -import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.*; -import io.reactivex.functions.Supplier; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.*; -import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.processors.UnicastProcessor; -import io.reactivex.subscribers.DisposableSubscriber; - -public final class FlowableWindowBoundarySupplier extends AbstractFlowableWithUpstream> { - final Supplier> other; - final int capacityHint; - - public FlowableWindowBoundarySupplier(Flowable source, - Supplier> other, int capacityHint) { - super(source); - this.other = other; - this.capacityHint = capacityHint; - } - - @Override - protected void subscribeActual(Subscriber> subscriber) { - WindowBoundaryMainSubscriber parent = new WindowBoundaryMainSubscriber(subscriber, capacityHint, other); - - source.subscribe(parent); - } - - static final class WindowBoundaryMainSubscriber - extends AtomicInteger - implements FlowableSubscriber, Subscription, Runnable { - - private static final long serialVersionUID = 2233020065421370272L; - - final Subscriber> downstream; - - final int capacityHint; - - final AtomicReference> boundarySubscriber; - - static final WindowBoundaryInnerSubscriber BOUNDARY_DISPOSED = new WindowBoundaryInnerSubscriber(null); - - final AtomicInteger windows; - - final MpscLinkedQueue queue; - - final AtomicThrowable errors; - - final AtomicBoolean stopWindows; - - final Supplier> other; - - static final Object NEXT_WINDOW = new Object(); - - final AtomicLong requested; - - Subscription upstream; - - volatile boolean done; - - UnicastProcessor window; - - long emitted; - - WindowBoundaryMainSubscriber(Subscriber> downstream, int capacityHint, Supplier> other) { - this.downstream = downstream; - this.capacityHint = capacityHint; - this.boundarySubscriber = new AtomicReference>(); - this.windows = new AtomicInteger(1); - this.queue = new MpscLinkedQueue(); - this.errors = new AtomicThrowable(); - this.stopWindows = new AtomicBoolean(); - this.other = other; - this.requested = new AtomicLong(); - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(upstream, s)) { - upstream = s; - downstream.onSubscribe(this); - queue.offer(NEXT_WINDOW); - drain(); - s.request(Long.MAX_VALUE); - } - } - - @Override - public void onNext(T t) { - queue.offer(t); - drain(); - } - - @Override - public void onError(Throwable e) { - disposeBoundary(); - if (errors.addThrowable(e)) { - done = true; - drain(); - } else { - RxJavaPlugins.onError(e); - } - } - - @Override - public void onComplete() { - disposeBoundary(); - done = true; - drain(); - } - - @Override - public void cancel() { - if (stopWindows.compareAndSet(false, true)) { - disposeBoundary(); - if (windows.decrementAndGet() == 0) { - upstream.cancel(); - } - } - } - - @Override - public void request(long n) { - BackpressureHelper.add(requested, n); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - void disposeBoundary() { - Disposable d = boundarySubscriber.getAndSet((WindowBoundaryInnerSubscriber)BOUNDARY_DISPOSED); - if (d != null && d != BOUNDARY_DISPOSED) { - d.dispose(); - } - } - - @Override - public void run() { - if (windows.decrementAndGet() == 0) { - upstream.cancel(); - } - } - - void innerNext(WindowBoundaryInnerSubscriber sender) { - boundarySubscriber.compareAndSet(sender, null); - queue.offer(NEXT_WINDOW); - drain(); - } - - void innerError(Throwable e) { - upstream.cancel(); - if (errors.addThrowable(e)) { - done = true; - drain(); - } else { - RxJavaPlugins.onError(e); - } - } - - void innerComplete() { - upstream.cancel(); - done = true; - drain(); - } - - @SuppressWarnings("unchecked") - void drain() { - if (getAndIncrement() != 0) { - return; - } - - int missed = 1; - Subscriber> downstream = this.downstream; - MpscLinkedQueue queue = this.queue; - AtomicThrowable errors = this.errors; - long emitted = this.emitted; - - for (;;) { - - for (;;) { - if (windows.get() == 0) { - queue.clear(); - window = null; - return; - } - - UnicastProcessor w = window; - - boolean d = done; - - if (d && errors.get() != null) { - queue.clear(); - Throwable ex = errors.terminate(); - if (w != null) { - window = null; - w.onError(ex); - } - downstream.onError(ex); - return; - } - - Object v = queue.poll(); - - boolean empty = v == null; - - if (d && empty) { - Throwable ex = errors.terminate(); - if (ex == null) { - if (w != null) { - window = null; - w.onComplete(); - } - downstream.onComplete(); - } else { - if (w != null) { - window = null; - w.onError(ex); - } - downstream.onError(ex); - } - return; - } - - if (empty) { - break; - } - - if (v != NEXT_WINDOW) { - w.onNext((T)v); - continue; - } - - if (w != null) { - window = null; - w.onComplete(); - } - - if (!stopWindows.get()) { - if (emitted != requested.get()) { - w = UnicastProcessor.create(capacityHint, this); - window = w; - windows.getAndIncrement(); - - Publisher otherSource; - - try { - otherSource = ObjectHelper.requireNonNull(other.get(), "The other Supplier returned a null Publisher"); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - errors.addThrowable(ex); - done = true; - continue; - } - - WindowBoundaryInnerSubscriber bo = new WindowBoundaryInnerSubscriber(this); - - if (boundarySubscriber.compareAndSet(null, bo)) { - otherSource.subscribe(bo); - - emitted++; - downstream.onNext(w); - } - } else { - upstream.cancel(); - disposeBoundary(); - errors.addThrowable(new MissingBackpressureException("Could not deliver a window due to lack of requests")); - done = true; - } - } - } - - this.emitted = emitted; - missed = addAndGet(-missed); - if (missed == 0) { - break; - } - } - } - } - - static final class WindowBoundaryInnerSubscriber extends DisposableSubscriber { - final WindowBoundaryMainSubscriber parent; - - boolean done; - - WindowBoundaryInnerSubscriber(WindowBoundaryMainSubscriber parent) { - this.parent = parent; - } - - @Override - public void onNext(B t) { - if (done) { - return; - } - done = true; - dispose(); - parent.innerNext(this); - } - - @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - done = true; - parent.innerError(t); - } - - @Override - public void onComplete() { - if (done) { - return; - } - done = true; - parent.innerComplete(); - } - } -} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java deleted file mode 100644 index dca3ddbb1a..0000000000 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableBufferBoundarySupplier.java +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.observable; - -import java.util.Collection; -import java.util.concurrent.atomic.AtomicReference; - -import io.reactivex.*; -import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.functions.Supplier; -import io.reactivex.internal.disposables.*; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.observers.QueueDrainObserver; -import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.util.QueueDrainHelper; -import io.reactivex.observers.*; -import io.reactivex.plugins.RxJavaPlugins; - -public final class ObservableBufferBoundarySupplier, B> -extends AbstractObservableWithUpstream { - final Supplier> boundarySupplier; - final Supplier bufferSupplier; - - public ObservableBufferBoundarySupplier(ObservableSource source, Supplier> boundarySupplier, Supplier bufferSupplier) { - super(source); - this.boundarySupplier = boundarySupplier; - this.bufferSupplier = bufferSupplier; - } - - @Override - protected void subscribeActual(Observer t) { - source.subscribe(new BufferBoundarySupplierObserver(new SerializedObserver(t), bufferSupplier, boundarySupplier)); - } - - static final class BufferBoundarySupplierObserver, B> - extends QueueDrainObserver implements Observer, Disposable { - - final Supplier bufferSupplier; - final Supplier> boundarySupplier; - - Disposable upstream; - - final AtomicReference other = new AtomicReference(); - - U buffer; - - BufferBoundarySupplierObserver(Observer actual, Supplier bufferSupplier, - Supplier> boundarySupplier) { - super(actual, new MpscLinkedQueue()); - this.bufferSupplier = bufferSupplier; - this.boundarySupplier = boundarySupplier; - } - - @Override - public void onSubscribe(Disposable d) { - if (DisposableHelper.validate(this.upstream, d)) { - this.upstream = d; - - Observer actual = this.downstream; - - U b; - - try { - b = ObjectHelper.requireNonNull(bufferSupplier.get(), "The buffer supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - cancelled = true; - d.dispose(); - EmptyDisposable.error(e, actual); - return; - } - - buffer = b; - - ObservableSource boundary; - - try { - boundary = ObjectHelper.requireNonNull(boundarySupplier.get(), "The boundary ObservableSource supplied is null"); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - cancelled = true; - d.dispose(); - EmptyDisposable.error(ex, actual); - return; - } - - BufferBoundaryObserver bs = new BufferBoundaryObserver(this); - other.set(bs); - - actual.onSubscribe(this); - - if (!cancelled) { - boundary.subscribe(bs); - } - } - } - - @Override - public void onNext(T t) { - synchronized (this) { - U b = buffer; - if (b == null) { - return; - } - b.add(t); - } - } - - @Override - public void onError(Throwable t) { - dispose(); - downstream.onError(t); - } - - @Override - public void onComplete() { - U b; - synchronized (this) { - b = buffer; - if (b == null) { - return; - } - buffer = null; - } - queue.offer(b); - done = true; - if (enter()) { - QueueDrainHelper.drainLoop(queue, downstream, false, this, this); - } - } - - @Override - public void dispose() { - if (!cancelled) { - cancelled = true; - upstream.dispose(); - disposeOther(); - - if (enter()) { - queue.clear(); - } - } - } - - @Override - public boolean isDisposed() { - return cancelled; - } - - void disposeOther() { - DisposableHelper.dispose(other); - } - - void next() { - - U next; - - try { - next = ObjectHelper.requireNonNull(bufferSupplier.get(), "The buffer supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - dispose(); - downstream.onError(e); - return; - } - - ObservableSource boundary; - - try { - boundary = ObjectHelper.requireNonNull(boundarySupplier.get(), "The boundary ObservableSource supplied is null"); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - cancelled = true; - upstream.dispose(); - downstream.onError(ex); - return; - } - - BufferBoundaryObserver bs = new BufferBoundaryObserver(this); - - if (DisposableHelper.replace(other, bs)) { - U b; - synchronized (this) { - b = buffer; - if (b == null) { - return; - } - buffer = next; - } - - boundary.subscribe(bs); - - fastPathEmit(b, false, this); - } - } - - @Override - public void accept(Observer a, U v) { - downstream.onNext(v); - } - - } - - static final class BufferBoundaryObserver, B> - extends DisposableObserver { - final BufferBoundarySupplierObserver parent; - - boolean once; - - BufferBoundaryObserver(BufferBoundarySupplierObserver parent) { - this.parent = parent; - } - - @Override - public void onNext(B t) { - if (once) { - return; - } - once = true; - dispose(); - parent.next(); - } - - @Override - public void onError(Throwable t) { - if (once) { - RxJavaPlugins.onError(t); - return; - } - once = true; - parent.onError(t); - } - - @Override - public void onComplete() { - if (once) { - return; - } - once = true; - parent.next(); - } - } -} diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableOnErrorNext.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableOnErrorNext.java index 649831d59f..c670ca8d8e 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableOnErrorNext.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableOnErrorNext.java @@ -22,18 +22,16 @@ public final class ObservableOnErrorNext extends AbstractObservableWithUpstream { final Function> nextSupplier; - final boolean allowFatal; public ObservableOnErrorNext(ObservableSource source, - Function> nextSupplier, boolean allowFatal) { + Function> nextSupplier) { super(source); this.nextSupplier = nextSupplier; - this.allowFatal = allowFatal; } @Override public void subscribeActual(Observer t) { - OnErrorNextObserver parent = new OnErrorNextObserver(t, nextSupplier, allowFatal); + OnErrorNextObserver parent = new OnErrorNextObserver(t, nextSupplier); t.onSubscribe(parent.arbiter); source.subscribe(parent); } @@ -41,17 +39,15 @@ public void subscribeActual(Observer t) { static final class OnErrorNextObserver implements Observer { final Observer downstream; final Function> nextSupplier; - final boolean allowFatal; final SequentialDisposable arbiter; boolean once; boolean done; - OnErrorNextObserver(Observer actual, Function> nextSupplier, boolean allowFatal) { + OnErrorNextObserver(Observer actual, Function> nextSupplier) { this.downstream = actual; this.nextSupplier = nextSupplier; - this.allowFatal = allowFatal; this.arbiter = new SequentialDisposable(); } @@ -80,11 +76,6 @@ public void onError(Throwable t) { } once = true; - if (allowFatal && !(t instanceof Exception)) { - downstream.onError(t); - return; - } - ObservableSource p; try { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java deleted file mode 100644 index f8ef619397..0000000000 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.observable; - -import java.util.concurrent.atomic.*; - -import io.reactivex.*; -import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.functions.Supplier; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.util.AtomicThrowable; -import io.reactivex.observers.DisposableObserver; -import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.subjects.UnicastSubject; - -public final class ObservableWindowBoundarySupplier extends AbstractObservableWithUpstream> { - final Supplier> other; - final int capacityHint; - - public ObservableWindowBoundarySupplier( - ObservableSource source, - Supplier> other, int capacityHint) { - super(source); - this.other = other; - this.capacityHint = capacityHint; - } - - @Override - public void subscribeActual(Observer> observer) { - WindowBoundaryMainObserver parent = new WindowBoundaryMainObserver(observer, capacityHint, other); - - source.subscribe(parent); - } - - static final class WindowBoundaryMainObserver - extends AtomicInteger - implements Observer, Disposable, Runnable { - - private static final long serialVersionUID = 2233020065421370272L; - - final Observer> downstream; - - final int capacityHint; - - final AtomicReference> boundaryObserver; - - static final WindowBoundaryInnerObserver BOUNDARY_DISPOSED = new WindowBoundaryInnerObserver(null); - - final AtomicInteger windows; - - final MpscLinkedQueue queue; - - final AtomicThrowable errors; - - final AtomicBoolean stopWindows; - - final Supplier> other; - - static final Object NEXT_WINDOW = new Object(); - - Disposable upstream; - - volatile boolean done; - - UnicastSubject window; - - WindowBoundaryMainObserver(Observer> downstream, int capacityHint, Supplier> other) { - this.downstream = downstream; - this.capacityHint = capacityHint; - this.boundaryObserver = new AtomicReference>(); - this.windows = new AtomicInteger(1); - this.queue = new MpscLinkedQueue(); - this.errors = new AtomicThrowable(); - this.stopWindows = new AtomicBoolean(); - this.other = other; - } - - @Override - public void onSubscribe(Disposable d) { - if (DisposableHelper.validate(upstream, d)) { - upstream = d; - downstream.onSubscribe(this); - queue.offer(NEXT_WINDOW); - drain(); - } - } - - @Override - public void onNext(T t) { - queue.offer(t); - drain(); - } - - @Override - public void onError(Throwable e) { - disposeBoundary(); - if (errors.addThrowable(e)) { - done = true; - drain(); - } else { - RxJavaPlugins.onError(e); - } - } - - @Override - public void onComplete() { - disposeBoundary(); - done = true; - drain(); - } - - @Override - public void dispose() { - if (stopWindows.compareAndSet(false, true)) { - disposeBoundary(); - if (windows.decrementAndGet() == 0) { - upstream.dispose(); - } - } - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - void disposeBoundary() { - Disposable d = boundaryObserver.getAndSet((WindowBoundaryInnerObserver)BOUNDARY_DISPOSED); - if (d != null && d != BOUNDARY_DISPOSED) { - d.dispose(); - } - } - - @Override - public boolean isDisposed() { - return stopWindows.get(); - } - - @Override - public void run() { - if (windows.decrementAndGet() == 0) { - upstream.dispose(); - } - } - - void innerNext(WindowBoundaryInnerObserver sender) { - boundaryObserver.compareAndSet(sender, null); - queue.offer(NEXT_WINDOW); - drain(); - } - - void innerError(Throwable e) { - upstream.dispose(); - if (errors.addThrowable(e)) { - done = true; - drain(); - } else { - RxJavaPlugins.onError(e); - } - } - - void innerComplete() { - upstream.dispose(); - done = true; - drain(); - } - - @SuppressWarnings("unchecked") - void drain() { - if (getAndIncrement() != 0) { - return; - } - - int missed = 1; - Observer> downstream = this.downstream; - MpscLinkedQueue queue = this.queue; - AtomicThrowable errors = this.errors; - - for (;;) { - - for (;;) { - if (windows.get() == 0) { - queue.clear(); - window = null; - return; - } - - UnicastSubject w = window; - - boolean d = done; - - if (d && errors.get() != null) { - queue.clear(); - Throwable ex = errors.terminate(); - if (w != null) { - window = null; - w.onError(ex); - } - downstream.onError(ex); - return; - } - - Object v = queue.poll(); - - boolean empty = v == null; - - if (d && empty) { - Throwable ex = errors.terminate(); - if (ex == null) { - if (w != null) { - window = null; - w.onComplete(); - } - downstream.onComplete(); - } else { - if (w != null) { - window = null; - w.onError(ex); - } - downstream.onError(ex); - } - return; - } - - if (empty) { - break; - } - - if (v != NEXT_WINDOW) { - w.onNext((T)v); - continue; - } - - if (w != null) { - window = null; - w.onComplete(); - } - - if (!stopWindows.get()) { - w = UnicastSubject.create(capacityHint, this); - window = w; - windows.getAndIncrement(); - - ObservableSource otherSource; - - try { - otherSource = ObjectHelper.requireNonNull(other.get(), "The other Supplier returned a null ObservableSource"); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - errors.addThrowable(ex); - done = true; - continue; - } - - WindowBoundaryInnerObserver bo = new WindowBoundaryInnerObserver(this); - - if (boundaryObserver.compareAndSet(null, bo)) { - otherSource.subscribe(bo); - - downstream.onNext(w); - } - } - } - - missed = addAndGet(-missed); - if (missed == 0) { - break; - } - } - } - } - - static final class WindowBoundaryInnerObserver extends DisposableObserver { - final WindowBoundaryMainObserver parent; - - boolean done; - - WindowBoundaryInnerObserver(WindowBoundaryMainObserver parent) { - this.parent = parent; - } - - @Override - public void onNext(B t) { - if (done) { - return; - } - done = true; - dispose(); - parent.innerNext(this); - } - - @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - done = true; - parent.innerError(t); - } - - @Override - public void onComplete() { - if (done) { - return; - } - done = true; - parent.innerComplete(); - } - } -} diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index 7ec883493f..751ef7e6d8 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -860,46 +860,6 @@ public Collection get() { }).blockingSubscribe(); } - @Test(expected = NullPointerException.class) - public void bufferBoundarySupplier2Null() { - just1.buffer((Supplier>)null); - } - - @Test(expected = NullPointerException.class) - public void bufferBoundarySupplier2ReturnsNull() { - just1.buffer(new Supplier>() { - @Override - public Publisher get() { - return null; - } - }).blockingSubscribe(); - } - - @Test(expected = NullPointerException.class) - public void bufferBoundarySupplier2SupplierNull() { - just1.buffer(new Supplier>() { - @Override - public Flowable get() { - return just1; - } - }, null); - } - - @Test(expected = NullPointerException.class) - public void bufferBoundarySupplier2SupplierReturnsNull() { - just1.buffer(new Supplier>() { - @Override - public Flowable get() { - return just1; - } - }, new Supplier>() { - @Override - public Collection get() { - return null; - } - }).blockingSubscribe(); - } - @Test(expected = NullPointerException.class) public void castNull() { just1.cast(null); @@ -1701,11 +1661,6 @@ public Object apply(Throwable e) { } } - @Test(expected = NullPointerException.class) - public void onExceptionResumeNext() { - just1.onExceptionResumeNext(null); - } - @Test(expected = NullPointerException.class) public void publishFunctionNull() { just1.publish(null); @@ -2606,21 +2561,6 @@ public Publisher apply(Integer v) { }).blockingSubscribe(); } - @Test(expected = NullPointerException.class) - public void windowBoundarySupplierNull() { - just1.window((Supplier>)null); - } - - @Test(expected = NullPointerException.class) - public void windowBoundarySupplierReturnsNull() { - just1.window(new Supplier>() { - @Override - public Publisher get() { - return null; - } - }).blockingSubscribe(); - } - @Test(expected = NullPointerException.class) public void withLatestFromOtherNull() { just1.withLatestFrom(null, new BiFunction() { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java index 7d76ee7296..8600dc3478 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.io.IOException; @@ -29,13 +30,11 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.functions.*; -import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.Functions; -import io.reactivex.internal.operators.flowable.FlowableBufferBoundarySupplier.BufferBoundarySupplierSubscriber; import io.reactivex.internal.operators.flowable.FlowableBufferTimed.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.processors.*; +import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; import io.reactivex.testsupport.TestHelper; @@ -238,50 +237,6 @@ public void subscribe(Subscriber subscriber) { inOrder.verify(subscriber, Mockito.times(1)).onComplete(); } - @Test - public void flowableBasedCloser() { - Flowable source = Flowable.unsafeCreate(new Publisher() { - @Override - public void subscribe(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - push(subscriber, "one", 10); - push(subscriber, "two", 60); - push(subscriber, "three", 110); - push(subscriber, "four", 160); - push(subscriber, "five", 210); - complete(subscriber, 250); - } - }); - - Supplier> closer = new Supplier>() { - @Override - public Flowable get() { - return Flowable.unsafeCreate(new Publisher() { - @Override - public void subscribe(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - push(subscriber, new Object(), 100); - push(subscriber, new Object(), 200); - push(subscriber, new Object(), 300); - complete(subscriber, 301); - } - }); - } - }; - - Flowable> buffered = source.buffer(closer); - buffered.subscribe(subscriber); - - InOrder inOrder = Mockito.inOrder(subscriber); - scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); - inOrder.verify(subscriber, Mockito.times(1)).onNext(list("one", "two")); - inOrder.verify(subscriber, Mockito.times(1)).onNext(list("three", "four")); - inOrder.verify(subscriber, Mockito.times(1)).onNext(list("five")); - inOrder.verify(subscriber, Mockito.never()).onNext(Mockito.anyList()); - inOrder.verify(subscriber, Mockito.never()).onError(Mockito.any(Throwable.class)); - inOrder.verify(subscriber, Mockito.times(1)).onComplete(); - } - @Test public void longTimeAction() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); @@ -1333,209 +1288,6 @@ public void bufferTimeSkipDefault() { .assertResult(Arrays.asList(1, 2, 3, 4, 5)); } - @Test - @SuppressWarnings("unchecked") - public void boundaryBufferSupplierThrows() { - Flowable.never() - .buffer(Functions.justSupplier(Flowable.never()), new Supplier>() { - @Override - public Collection get() throws Exception { - throw new TestException(); - } - }) - .test() - .assertFailure(TestException.class); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBoundarySupplierThrows() { - Flowable.never() - .buffer(new Supplier>() { - @Override - public Publisher get() throws Exception { - throw new TestException(); - } - }, new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test() - .assertFailure(TestException.class); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBufferSupplierThrows2() { - Flowable.never() - .buffer(Functions.justSupplier(Flowable.timer(1, TimeUnit.MILLISECONDS)), new Supplier>() { - int count; - @Override - public Collection get() throws Exception { - if (count++ == 1) { - throw new TestException(); - } else { - return new ArrayList(); - } - } - }) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertFailure(TestException.class); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBufferSupplierReturnsNull() { - Flowable.never() - .buffer(Functions.justSupplier(Flowable.timer(1, TimeUnit.MILLISECONDS)), new Supplier>() { - int count; - @Override - public Collection get() throws Exception { - if (count++ == 1) { - return null; - } else { - return new ArrayList(); - } - } - }) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertFailure(NullPointerException.class); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBoundarySupplierThrows2() { - Flowable.never() - .buffer(new Supplier>() { - int count; - @Override - public Publisher get() throws Exception { - if (count++ == 1) { - throw new TestException(); - } - return Flowable.timer(1, TimeUnit.MILLISECONDS); - } - }, new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertFailure(TestException.class); - } - - @Test - public void boundaryCancel() { - PublishProcessor pp = PublishProcessor.create(); - - TestSubscriber> ts = pp - .buffer(Functions.justSupplier(Flowable.never()), new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test(); - - assertTrue(pp.hasSubscribers()); - - ts.cancel(); - - assertFalse(pp.hasSubscribers()); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBoundarySupplierReturnsNull() { - Flowable.never() - .buffer(new Supplier>() { - int count; - @Override - public Publisher get() throws Exception { - if (count++ == 1) { - return null; - } - return Flowable.timer(1, TimeUnit.MILLISECONDS); - } - }, new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertFailure(NullPointerException.class); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBoundarySupplierReturnsNull2() { - Flowable.never() - .buffer(new Supplier>() { - int count; - @Override - public Publisher get() throws Exception { - if (count++ == 1) { - return null; - } - return Flowable.empty(); - } - }, new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertFailure(NullPointerException.class); - } - - @SuppressWarnings("unchecked") - @Test - public void boundaryMainError() { - PublishProcessor pp = PublishProcessor.create(); - - TestSubscriber> ts = pp - .buffer(Functions.justSupplier(Flowable.never()), new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test(); - - pp.onError(new TestException()); - - ts.assertFailure(TestException.class); - } - - @SuppressWarnings("unchecked") - @Test - public void boundaryBoundaryError() { - PublishProcessor pp = PublishProcessor.create(); - - TestSubscriber> ts = pp - .buffer(Functions.justSupplier(Flowable.error(new TestException())), new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test(); - - pp.onError(new TestException()); - - ts.assertFailure(TestException.class); - } - @Test public void dispose() { TestHelper.checkDisposed(Flowable.range(1, 5).buffer(1, TimeUnit.DAYS, Schedulers.single())); @@ -2547,81 +2299,6 @@ protected void subscribeActual(Subscriber s) { pp.buffer(b).test(); } - @Test - public void bufferBoundaryErrorTwice() { - List errors = TestHelper.trackPluginErrors(); - try { - BehaviorProcessor.createDefault(1) - .buffer(Functions.justSupplier(new Flowable() { - @Override - protected void subscribeActual(Subscriber s) { - s.onSubscribe(new BooleanSubscription()); - s.onError(new TestException("first")); - s.onError(new TestException("second")); - } - })) - .to(TestHelper.>testConsumer()) - .assertError(TestException.class) - .assertErrorMessage("first") - .assertNotComplete(); - - TestHelper.assertUndeliverable(errors, 0, TestException.class, "second"); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void bufferBoundarySupplierDisposed() { - TestSubscriber> ts = new TestSubscriber>(); - BufferBoundarySupplierSubscriber, Integer> sub = - new BufferBoundarySupplierSubscriber, Integer>( - ts, Functions.justSupplier((List)new ArrayList()), - Functions.justSupplier(Flowable.never()) - ); - - BooleanSubscription bs = new BooleanSubscription(); - - sub.onSubscribe(bs); - - assertFalse(sub.isDisposed()); - - sub.dispose(); - - assertTrue(sub.isDisposed()); - - sub.next(); - - assertSame(DisposableHelper.DISPOSED, sub.other.get()); - - sub.cancel(); - sub.cancel(); - - assertTrue(bs.isCancelled()); - } - - @Test - public void bufferBoundarySupplierBufferAlreadyCleared() { - TestSubscriber> ts = new TestSubscriber>(); - BufferBoundarySupplierSubscriber, Integer> sub = - new BufferBoundarySupplierSubscriber, Integer>( - ts, Functions.justSupplier((List)new ArrayList()), - Functions.justSupplier(Flowable.never()) - ); - - BooleanSubscription bs = new BooleanSubscription(); - - sub.onSubscribe(bs); - - sub.buffer = null; - - sub.next(); - - sub.onNext(1); - - sub.onComplete(); - } - @Test public void timedDoubleOnSubscribe() { TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>>() { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnExceptionResumeNextViaFlowableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnExceptionResumeNextViaFlowableTest.java deleted file mode 100644 index 70015de88b..0000000000 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnExceptionResumeNextViaFlowableTest.java +++ /dev/null @@ -1,292 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.flowable; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.*; - -import java.util.concurrent.TimeUnit; - -import org.junit.Test; -import org.mockito.Mockito; -import org.reactivestreams.*; - -import io.reactivex.Flowable; -import io.reactivex.exceptions.TestException; -import io.reactivex.functions.Function; -import io.reactivex.internal.subscriptions.BooleanSubscription; -import io.reactivex.processors.PublishProcessor; -import io.reactivex.schedulers.Schedulers; -import io.reactivex.subscribers.TestSubscriber; -import io.reactivex.testsupport.TestHelper; - -public class FlowableOnExceptionResumeNextViaFlowableTest { - - @Test - public void resumeNextWithException() { - // Trigger failure on second element - TestObservable f = new TestObservable("one", "EXCEPTION", "two", "three"); - Flowable w = Flowable.unsafeCreate(f); - Flowable resume = Flowable.just("twoResume", "threeResume"); - Flowable flowable = w.onExceptionResumeNext(resume); - - Subscriber subscriber = TestHelper.mockSubscriber(); - flowable.subscribe(subscriber); - - try { - f.t.join(); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - verify(subscriber).onSubscribe((Subscription)any()); - verify(subscriber, times(1)).onNext("one"); - verify(subscriber, Mockito.never()).onNext("two"); - verify(subscriber, Mockito.never()).onNext("three"); - verify(subscriber, times(1)).onNext("twoResume"); - verify(subscriber, times(1)).onNext("threeResume"); - verify(subscriber, Mockito.never()).onError(any(Throwable.class)); - verify(subscriber, times(1)).onComplete(); - verifyNoMoreInteractions(subscriber); - } - - @Test - public void resumeNextWithRuntimeException() { - // Trigger failure on second element - TestObservable f = new TestObservable("one", "RUNTIMEEXCEPTION", "two", "three"); - Flowable w = Flowable.unsafeCreate(f); - Flowable resume = Flowable.just("twoResume", "threeResume"); - Flowable flowable = w.onExceptionResumeNext(resume); - - Subscriber subscriber = TestHelper.mockSubscriber(); - flowable.subscribe(subscriber); - - try { - f.t.join(); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - verify(subscriber).onSubscribe((Subscription)any()); - verify(subscriber, times(1)).onNext("one"); - verify(subscriber, Mockito.never()).onNext("two"); - verify(subscriber, Mockito.never()).onNext("three"); - verify(subscriber, times(1)).onNext("twoResume"); - verify(subscriber, times(1)).onNext("threeResume"); - verify(subscriber, Mockito.never()).onError(any(Throwable.class)); - verify(subscriber, times(1)).onComplete(); - verifyNoMoreInteractions(subscriber); - } - - @Test - public void throwablePassesThru() { - // Trigger failure on second element - TestObservable f = new TestObservable("one", "THROWABLE", "two", "three"); - Flowable w = Flowable.unsafeCreate(f); - Flowable resume = Flowable.just("twoResume", "threeResume"); - Flowable flowable = w.onExceptionResumeNext(resume); - - Subscriber subscriber = TestHelper.mockSubscriber(); - flowable.subscribe(subscriber); - - try { - f.t.join(); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - verify(subscriber).onSubscribe((Subscription)any()); - verify(subscriber, times(1)).onNext("one"); - verify(subscriber, never()).onNext("two"); - verify(subscriber, never()).onNext("three"); - verify(subscriber, never()).onNext("twoResume"); - verify(subscriber, never()).onNext("threeResume"); - verify(subscriber, times(1)).onError(any(Throwable.class)); - verify(subscriber, never()).onComplete(); - verifyNoMoreInteractions(subscriber); - } - - @Test - public void errorPassesThru() { - // Trigger failure on second element - TestObservable f = new TestObservable("one", "ERROR", "two", "three"); - Flowable w = Flowable.unsafeCreate(f); - Flowable resume = Flowable.just("twoResume", "threeResume"); - Flowable flowable = w.onExceptionResumeNext(resume); - - Subscriber subscriber = TestHelper.mockSubscriber(); - flowable.subscribe(subscriber); - - try { - f.t.join(); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - verify(subscriber).onSubscribe((Subscription)any()); - verify(subscriber, times(1)).onNext("one"); - verify(subscriber, never()).onNext("two"); - verify(subscriber, never()).onNext("three"); - verify(subscriber, never()).onNext("twoResume"); - verify(subscriber, never()).onNext("threeResume"); - verify(subscriber, times(1)).onError(any(Throwable.class)); - verify(subscriber, never()).onComplete(); - verifyNoMoreInteractions(subscriber); - } - - @Test - public void mapResumeAsyncNext() { - // Trigger multiple failures - Flowable w = Flowable.just("one", "fail", "two", "three", "fail"); - // Resume Observable is async - TestObservable f = new TestObservable("twoResume", "threeResume"); - Flowable resume = Flowable.unsafeCreate(f); - - // Introduce map function that fails intermittently (Map does not prevent this when the observer is a - // rx.operator incl onErrorResumeNextViaObservable) - w = w.map(new Function() { - @Override - public String apply(String s) { - if ("fail".equals(s)) { - throw new RuntimeException("Forced Failure"); - } - System.out.println("BadMapper:" + s); - return s; - } - }); - - Flowable flowable = w.onExceptionResumeNext(resume); - - Subscriber subscriber = TestHelper.mockSubscriber(); - flowable.subscribe(subscriber); - - try { - // if the thread gets started (which it shouldn't if it's working correctly) - if (f.t != null) { - f.t.join(); - } - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - verify(subscriber, times(1)).onNext("one"); - verify(subscriber, never()).onNext("two"); - verify(subscriber, never()).onNext("three"); - verify(subscriber, times(1)).onNext("twoResume"); - verify(subscriber, times(1)).onNext("threeResume"); - verify(subscriber, Mockito.never()).onError(any(Throwable.class)); - verify(subscriber, times(1)).onComplete(); - } - - @Test - public void backpressure() { - TestSubscriber ts = new TestSubscriber(); - Flowable.range(0, 100000) - .onExceptionResumeNext(Flowable.just(1)) - .observeOn(Schedulers.computation()) - .map(new Function() { - int c; - - @Override - public Integer apply(Integer t1) { - if (c++ <= 1) { - // slow - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - return t1; - } - - }) - .subscribe(ts); - ts.awaitDone(5, TimeUnit.SECONDS); - ts.assertNoErrors(); - } - - private static class TestObservable implements Publisher { - - final String[] values; - Thread t; - - TestObservable(String... values) { - this.values = values; - } - - @Override - public void subscribe(final Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - System.out.println("TestObservable subscribed to ..."); - t = new Thread(new Runnable() { - - @Override - public void run() { - try { - System.out.println("running TestObservable thread"); - for (String s : values) { - if ("EXCEPTION".equals(s)) { - throw new Exception("Forced Exception"); - } else if ("RUNTIMEEXCEPTION".equals(s)) { - throw new RuntimeException("Forced RuntimeException"); - } else if ("ERROR".equals(s)) { - throw new Error("Forced Error"); - } else if ("THROWABLE".equals(s)) { - throw new Throwable("Forced Throwable"); - } - System.out.println("TestObservable onNext: " + s); - subscriber.onNext(s); - } - System.out.println("TestObservable onComplete"); - subscriber.onComplete(); - } catch (Throwable e) { - System.out.println("TestObservable onError: " + e); - subscriber.onError(e); - } - } - - }); - System.out.println("starting TestObservable thread"); - t.start(); - System.out.println("done starting TestObservable thread"); - } - } - - @Test - public void normalBackpressure() { - TestSubscriber ts = TestSubscriber.create(0); - - PublishProcessor pp = PublishProcessor.create(); - - pp.onExceptionResumeNext(Flowable.range(3, 2)).subscribe(ts); - - ts.request(2); - - pp.onNext(1); - pp.onNext(2); - pp.onError(new TestException("Forced failure")); - - ts.assertValues(1, 2); - ts.assertNoErrors(); - ts.assertNotComplete(); - - ts.request(2); - - ts.assertValues(1, 2, 3, 4); - ts.assertNoErrors(); - ts.assertComplete(); - } - -} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java index b2c6c4a5d3..c8acbce95b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java @@ -248,229 +248,11 @@ public void onComplete() { verify(subscriber).onError(any(TestException.class)); } - @Test - public void windowNoDuplication() { - final PublishProcessor source = PublishProcessor.create(); - final TestSubscriber tsw = new TestSubscriber() { - boolean once; - @Override - public void onNext(Integer t) { - if (!once) { - once = true; - source.onNext(2); - } - super.onNext(t); - } - }; - TestSubscriber> ts = new TestSubscriber>() { - @Override - public void onNext(Flowable t) { - t.subscribe(tsw); - super.onNext(t); - } - }; - source.window(new Supplier>() { - @Override - public Flowable get() { - return Flowable.never(); - } - }).subscribe(ts); - - source.onNext(1); - source.onComplete(); - - ts.assertValueCount(1); - tsw.assertValues(1, 2); - } - - @Test - public void windowViaFlowableNoUnsubscribe() { - Flowable source = Flowable.range(1, 10); - Supplier> boundary = new Supplier>() { - @Override - public Flowable get() { - return Flowable.empty(); - } - }; - - TestSubscriber> ts = new TestSubscriber>(); - source.window(boundary).subscribe(ts); - - assertFalse(ts.isCancelled()); - } - - @Test - public void boundaryUnsubscribedOnMainCompletion() { - PublishProcessor source = PublishProcessor.create(); - final PublishProcessor boundary = PublishProcessor.create(); - Supplier> boundaryFunc = new Supplier>() { - @Override - public Flowable get() { - return boundary; - } - }; - - TestSubscriber> ts = new TestSubscriber>(); - source.window(boundaryFunc).subscribe(ts); - - assertTrue(source.hasSubscribers()); - assertTrue(boundary.hasSubscribers()); - - source.onComplete(); - - assertFalse(source.hasSubscribers()); - assertFalse(boundary.hasSubscribers()); - - ts.assertComplete(); - ts.assertNoErrors(); - ts.assertValueCount(1); - } - - @Test - public void mainUnsubscribedOnBoundaryCompletion() { - PublishProcessor source = PublishProcessor.create(); - final PublishProcessor boundary = PublishProcessor.create(); - Supplier> boundaryFunc = new Supplier>() { - @Override - public Flowable get() { - return boundary; - } - }; - - TestSubscriber> ts = new TestSubscriber>(); - source.window(boundaryFunc).subscribe(ts); - - assertTrue(source.hasSubscribers()); - assertTrue(boundary.hasSubscribers()); - - boundary.onComplete(); - - assertFalse(source.hasSubscribers()); - assertFalse(boundary.hasSubscribers()); - - ts.assertComplete(); - ts.assertNoErrors(); - ts.assertValueCount(1); - } - - @Test - public void childUnsubscribed() { - PublishProcessor source = PublishProcessor.create(); - final PublishProcessor boundary = PublishProcessor.create(); - Supplier> boundaryFunc = new Supplier>() { - @Override - public Flowable get() { - return boundary; - } - }; - - TestSubscriber> ts = new TestSubscriber>(); - source.window(boundaryFunc).subscribe(ts); - - assertTrue(source.hasSubscribers()); - assertTrue(boundary.hasSubscribers()); - - ts.cancel(); - - assertTrue(source.hasSubscribers()); - - assertFalse(boundary.hasSubscribers()); - - ts.values().get(0).test().cancel(); - - assertFalse(source.hasSubscribers()); - - ts.assertNotComplete(); - ts.assertNoErrors(); - ts.assertValueCount(1); - } - - @Test - public void innerBackpressure() { - Flowable source = Flowable.range(1, 10); - final PublishProcessor boundary = PublishProcessor.create(); - Supplier> boundaryFunc = new Supplier>() { - @Override - public Flowable get() { - return boundary; - } - }; - - final TestSubscriber ts = new TestSubscriber(1L); - final TestSubscriber> ts1 = new TestSubscriber>(1L) { - @Override - public void onNext(Flowable t) { - super.onNext(t); - t.subscribe(ts); - } - }; - source.window(boundaryFunc) - .subscribe(ts1); - - ts1.assertNoErrors(); - ts1.assertComplete(); - ts1.assertValueCount(1); - - ts.assertNoErrors(); - ts.assertNotComplete(); - ts.assertValues(1); - - ts.request(11); - - ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - ts.assertNoErrors(); - ts.assertComplete(); - } - - @Test - public void newBoundaryCalledAfterWindowClosed() { - final AtomicInteger calls = new AtomicInteger(); - PublishProcessor source = PublishProcessor.create(); - final PublishProcessor boundary = PublishProcessor.create(); - Supplier> boundaryFunc = new Supplier>() { - @Override - public Flowable get() { - calls.getAndIncrement(); - return boundary; - } - }; - - TestSubscriber> ts = new TestSubscriber>(); - source.window(boundaryFunc).subscribe(ts); - - source.onNext(1); - boundary.onNext(1); - assertTrue(boundary.hasSubscribers()); - - source.onNext(2); - boundary.onNext(2); - assertTrue(boundary.hasSubscribers()); - - source.onNext(3); - boundary.onNext(3); - assertTrue(boundary.hasSubscribers()); - - source.onNext(4); - source.onComplete(); - - ts.assertNoErrors(); - ts.assertValueCount(4); - ts.assertComplete(); - - assertFalse(source.hasSubscribers()); - assertFalse(boundary.hasSubscribers()); - } - @Test public void boundaryDispose() { TestHelper.checkDisposed(Flowable.never().window(Flowable.never())); } - @Test - public void boundaryDispose2() { - TestHelper.checkDisposed(Flowable.never().window(Functions.justSupplier(Flowable.never()))); - } - @Test public void boundaryOnError() { TestSubscriberEx ts = Flowable.error(new TestException()) @@ -484,14 +266,6 @@ public void boundaryOnError() { TestHelper.assertError(errors, 0, TestException.class); } - @Test - public void mainError() { - Flowable.error(new TestException()) - .window(Functions.justSupplier(Flowable.never())) - .test() - .assertError(TestException.class); - } - @Test public void innerBadSource() { TestHelper.checkBadSourceFlowable(new Function, Object>() { @@ -505,28 +279,6 @@ public Flowable apply(Flowable v) throws Exception { }); } }, false, 1, 1, (Object[])null); - - TestHelper.checkBadSourceFlowable(new Function, Object>() { - @Override - public Object apply(final Flowable f) throws Exception { - return Flowable.just(1).window(new Supplier>() { - int count; - @Override - public Publisher get() throws Exception { - if (++count > 1) { - return Flowable.never(); - } - return f; - } - }) - .flatMap(new Function, Flowable>() { - @Override - public Flowable apply(Flowable v) throws Exception { - return v; - } - }); - } - }, false, 1, 1, (Object[])null); } @Test @@ -560,47 +312,6 @@ public Flowable apply(Flowable v) throws Exception { .assertResult(1, 2); } - @Test - public void reentrantCallable() { - final FlowableProcessor ps = PublishProcessor.create(); - - TestSubscriber ts = new TestSubscriber() { - @Override - public void onNext(Integer t) { - super.onNext(t); - if (t == 1) { - ps.onNext(2); - ps.onComplete(); - } - } - }; - - ps.window(new Supplier>() { - boolean once; - @Override - public Flowable get() throws Exception { - if (!once) { - once = true; - return BehaviorProcessor.createDefault(1); - } - return Flowable.never(); - } - }) - .flatMap(new Function, Flowable>() { - @Override - public Flowable apply(Flowable v) throws Exception { - return v; - } - }) - .subscribe(ts); - - ps.onNext(1); - - ts - .awaitDone(1, TimeUnit.SECONDS) - .assertResult(1, 2); - } - @Test public void badSource() { TestHelper.checkBadSourceFlowable(new Function, Object>() { @@ -616,84 +327,6 @@ public Flowable apply(Flowable v) throws Exception { }, false, 1, 1, 1); } - @Test - public void badSourceCallable() { - TestHelper.checkBadSourceFlowable(new Function, Object>() { - @Override - public Object apply(Flowable f) throws Exception { - return f.window(Functions.justSupplier(Flowable.never())).flatMap(new Function, Flowable>() { - @Override - public Flowable apply(Flowable v) throws Exception { - return v; - } - }); - } - }, false, 1, 1, 1); - } - - @Test - public void boundaryError() { - BehaviorProcessor.createDefault(1) - .window(Functions.justSupplier(Flowable.error(new TestException()))) - .test() - .assertValueCount(1) - .assertNotComplete() - .assertError(TestException.class); - } - - @SuppressWarnings("unchecked") - @Test - public void boundaryMissingBackpressure() { - BehaviorProcessor.createDefault(1) - .window(Functions.justSupplier(Flowable.error(new TestException()))) - .test(0) - .assertFailure(MissingBackpressureException.class); - } - - @Test - public void boundaryCallableCrashOnCall2() { - BehaviorProcessor.createDefault(1) - .window(new Supplier>() { - int calls; - @Override - public Flowable get() throws Exception { - if (++calls == 2) { - throw new TestException(); - } - return Flowable.just(1); - } - }) - .test() - .assertError(TestException.class) - .assertNotComplete(); - } - - @Test - public void boundarySecondMissingBackpressure() { - BehaviorProcessor.createDefault(1) - .window(Functions.justSupplier(Flowable.just(1))) - .test(1) - .assertError(MissingBackpressureException.class) - .assertNotComplete(); - } - - @Test - public void oneWindow() { - PublishProcessor pp = PublishProcessor.create(); - - TestSubscriber> ts = BehaviorProcessor.createDefault(1) - .window(Functions.justSupplier(pp)) - .take(1) - .test(); - - pp.onNext(1); - - ts - .assertValueCount(1) - .assertNoErrors() - .assertComplete(); - } - @SuppressWarnings("unchecked") @Test public void boundaryDirectMissingBackpressure() { @@ -1040,329 +673,4 @@ public void run() { TestHelper.race(r1, r2); } } - - @Test - public void boundarySupplierDoubleOnSubscribe() { - TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>>() { - @Override - public Flowable> apply(Flowable f) - throws Exception { - return f.window(Functions.justSupplier(Flowable.never())).takeLast(1); - } - }); - } - - @Test - public void selectorUpstreamDisposedWhenOutputsDisposed() { - PublishProcessor source = PublishProcessor.create(); - PublishProcessor boundary = PublishProcessor.create(); - - TestSubscriber ts = source.window(Functions.justSupplier(boundary)) - .take(1) - .flatMap(new Function, Flowable>() { - @Override - public Flowable apply( - Flowable w) throws Exception { - return w.take(1); - } - }) - .test(); - - source.onNext(1); - - assertFalse("source not disposed", source.hasSubscribers()); - assertFalse("boundary not disposed", boundary.hasSubscribers()); - - ts.assertResult(1); - } - - @Test - public void supplierMainAndBoundaryBothError() { - List errors = TestHelper.trackPluginErrors(); - try { - final AtomicReference> ref = new AtomicReference>(); - - TestSubscriberEx> ts = Flowable.error(new TestException("main")) - .window(Functions.justSupplier(new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - ref.set(subscriber); - } - })) - .to(TestHelper.>testConsumer()); - - ts - .assertValueCount(1) - .assertError(TestException.class) - .assertErrorMessage("main") - .assertNotComplete(); - - ref.get().onError(new TestException("inner")); - - TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void supplierMainCompleteBoundaryErrorRace() { - final TestException ex = new TestException(); - - for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { - List errors = TestHelper.trackPluginErrors(); - try { - final AtomicReference> refMain = new AtomicReference>(); - final AtomicReference> ref = new AtomicReference>(); - - TestSubscriberEx> ts = new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - refMain.set(subscriber); - } - } - .window(Functions.justSupplier(new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - ref.set(subscriber); - } - })) - .to(TestHelper.>testConsumer()); - - Runnable r1 = new Runnable() { - @Override - public void run() { - refMain.get().onComplete(); - } - }; - Runnable r2 = new Runnable() { - @Override - public void run() { - ref.get().onError(ex); - } - }; - - TestHelper.race(r1, r2); - - ts - .assertValueCount(1) - .assertTerminated(); - - if (!errors.isEmpty()) { - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } - } finally { - RxJavaPlugins.reset(); - } - } - } - - @Test - public void supplierMainNextBoundaryNextRace() { - for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { - final AtomicReference> refMain = new AtomicReference>(); - final AtomicReference> ref = new AtomicReference>(); - - TestSubscriber> ts = new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - refMain.set(subscriber); - } - } - .window(Functions.justSupplier(new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - ref.set(subscriber); - } - })) - .test(); - - Runnable r1 = new Runnable() { - @Override - public void run() { - refMain.get().onNext(1); - } - }; - Runnable r2 = new Runnable() { - @Override - public void run() { - ref.get().onNext(1); - } - }; - - TestHelper.race(r1, r2); - - ts - .assertValueCount(2) - .assertNotComplete() - .assertNoErrors(); - } - } - - @Test - public void supplierTakeOneAnotherBoundary() { - final AtomicReference> refMain = new AtomicReference>(); - final AtomicReference> ref = new AtomicReference>(); - - TestSubscriberEx> ts = new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - refMain.set(subscriber); - } - } - .window(Functions.justSupplier(new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - ref.set(subscriber); - } - })) - .to(TestHelper.>testConsumer()); - - ts.assertValueCount(1) - .assertNotTerminated() - .cancel(); - - ref.get().onNext(1); - - ts.assertValueCount(1) - .assertNotTerminated(); - } - - @Test - public void supplierDisposeMainBoundaryCompleteRace() { - for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { - final AtomicReference> refMain = new AtomicReference>(); - final AtomicReference> ref = new AtomicReference>(); - - final TestSubscriber> ts = new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - refMain.set(subscriber); - } - } - .window(Functions.justSupplier(new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - final AtomicInteger counter = new AtomicInteger(); - subscriber.onSubscribe(new Subscription() { - - @Override - public void cancel() { - // about a microsecond - for (int i = 0; i < 100; i++) { - counter.incrementAndGet(); - } - } - - @Override - public void request(long n) { - } - }); - ref.set(subscriber); - } - })) - .test(); - - Runnable r1 = new Runnable() { - @Override - public void run() { - ts.cancel(); - } - }; - Runnable r2 = new Runnable() { - @Override - public void run() { - Subscriber subscriber = ref.get(); - subscriber.onNext(1); - subscriber.onComplete(); - } - }; - - TestHelper.race(r1, r2); - } - } - - @Test - public void supplierDisposeMainBoundaryErrorRace() { - final TestException ex = new TestException(); - - for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { - List errors = TestHelper.trackPluginErrors(); - try { - final AtomicReference> refMain = new AtomicReference>(); - final AtomicReference> ref = new AtomicReference>(); - - final TestSubscriber> ts = new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - refMain.set(subscriber); - } - } - .window(new Supplier>() { - int count; - @Override - public Flowable get() throws Exception { - if (++count > 1) { - return Flowable.never(); - } - return (new Flowable() { - @Override - protected void subscribeActual(Subscriber subscriber) { - final AtomicInteger counter = new AtomicInteger(); - subscriber.onSubscribe(new Subscription() { - - @Override - public void cancel() { - // about a microsecond - for (int i = 0; i < 100; i++) { - counter.incrementAndGet(); - } - } - - @Override - public void request(long n) { - } - }); - ref.set(subscriber); - } - }); - } - }) - .test(); - - Runnable r1 = new Runnable() { - @Override - public void run() { - ts.cancel(); - } - }; - Runnable r2 = new Runnable() { - @Override - public void run() { - Subscriber subscriber = ref.get(); - subscriber.onNext(1); - subscriber.onError(ex); - } - }; - - TestHelper.race(r1, r2); - - if (!errors.isEmpty()) { - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } - } finally { - RxJavaPlugins.reset(); - } - } - } - } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java index 3ba28d2f88..5b5b752ffb 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java @@ -95,56 +95,6 @@ public void subscribe(Subscriber subscriber) { assertEquals(lists.get(1), list("five")); } - @Test - public void flowableBasedCloser() { - final List list = new ArrayList(); - final List> lists = new ArrayList>(); - - Flowable source = Flowable.unsafeCreate(new Publisher() { - @Override - public void subscribe(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - push(subscriber, "one", 10); - push(subscriber, "two", 60); - push(subscriber, "three", 110); - push(subscriber, "four", 160); - push(subscriber, "five", 210); - complete(subscriber, 250); - } - }); - - Supplier> closer = new Supplier>() { - int calls; - @Override - public Flowable get() { - return Flowable.unsafeCreate(new Publisher() { - @Override - public void subscribe(Subscriber subscriber) { - subscriber.onSubscribe(new BooleanSubscription()); - int c = calls++; - if (c == 0) { - push(subscriber, new Object(), 100); - } else - if (c == 1) { - push(subscriber, new Object(), 100); - } else { - complete(subscriber, 101); - } - } - }); - } - }; - - Flowable> windowed = source.window(closer); - windowed.subscribe(observeWindow(list, lists)); - - scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); - assertEquals(3, lists.size()); - assertEquals(lists.get(0), list("one", "two")); - assertEquals(lists.get(1), list("three", "four")); - assertEquals(lists.get(2), list("five")); - } - private List list(String... args) { List list = new ArrayList(); for (String arg : args) { diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java index eecbb5d6d6..248655e814 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.io.IOException; @@ -30,15 +31,13 @@ import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; -import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.operators.observable.ObservableBuffer.BufferExactObserver; -import io.reactivex.internal.operators.observable.ObservableBufferBoundarySupplier.BufferBoundarySupplierObserver; import io.reactivex.internal.operators.observable.ObservableBufferTimed.*; import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.*; -import io.reactivex.subjects.*; +import io.reactivex.subjects.PublishSubject; import io.reactivex.testsupport.TestHelper; public class ObservableBufferTest { @@ -239,50 +238,6 @@ public void subscribe(Observer observer) { inOrder.verify(observer, Mockito.times(1)).onComplete(); } - @Test - public void observableBasedCloser() { - Observable source = Observable.unsafeCreate(new ObservableSource() { - @Override - public void subscribe(Observer observer) { - observer.onSubscribe(Disposables.empty()); - push(observer, "one", 10); - push(observer, "two", 60); - push(observer, "three", 110); - push(observer, "four", 160); - push(observer, "five", 210); - complete(observer, 250); - } - }); - - Supplier> closer = new Supplier>() { - @Override - public Observable get() { - return Observable.unsafeCreate(new ObservableSource() { - @Override - public void subscribe(Observer observer) { - observer.onSubscribe(Disposables.empty()); - push(observer, new Object(), 100); - push(observer, new Object(), 200); - push(observer, new Object(), 300); - complete(observer, 301); - } - }); - } - }; - - Observable> buffered = source.buffer(closer); - buffered.subscribe(observer); - - InOrder inOrder = Mockito.inOrder(observer); - scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); - inOrder.verify(observer, Mockito.times(1)).onNext(list("one", "two")); - inOrder.verify(observer, Mockito.times(1)).onNext(list("three", "four")); - inOrder.verify(observer, Mockito.times(1)).onNext(list("five")); - inOrder.verify(observer, Mockito.never()).onNext(Mockito.anyList()); - inOrder.verify(observer, Mockito.never()).onError(Mockito.any(Throwable.class)); - inOrder.verify(observer, Mockito.times(1)).onComplete(); - } - @Test public void longTimeAction() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); @@ -1019,209 +974,6 @@ public Collection get() throws Exception { .assertFailure(NullPointerException.class); } - @Test - @SuppressWarnings("unchecked") - public void boundaryBufferSupplierThrows() { - Observable.never() - .buffer(Functions.justSupplier(Observable.never()), new Supplier>() { - @Override - public Collection get() throws Exception { - throw new TestException(); - } - }) - .test() - .assertFailure(TestException.class); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBoundarySupplierThrows() { - Observable.never() - .buffer(new Supplier>() { - @Override - public ObservableSource get() throws Exception { - throw new TestException(); - } - }, new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test() - .assertFailure(TestException.class); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBufferSupplierThrows2() { - Observable.never() - .buffer(Functions.justSupplier(Observable.timer(1, TimeUnit.MILLISECONDS)), new Supplier>() { - int count; - @Override - public Collection get() throws Exception { - if (count++ == 1) { - throw new TestException(); - } else { - return new ArrayList(); - } - } - }) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertFailure(TestException.class); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBufferSupplierReturnsNull() { - Observable.never() - .buffer(Functions.justSupplier(Observable.timer(1, TimeUnit.MILLISECONDS)), new Supplier>() { - int count; - @Override - public Collection get() throws Exception { - if (count++ == 1) { - return null; - } else { - return new ArrayList(); - } - } - }) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertFailure(NullPointerException.class); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBoundarySupplierThrows2() { - Observable.never() - .buffer(new Supplier>() { - int count; - @Override - public ObservableSource get() throws Exception { - if (count++ == 1) { - throw new TestException(); - } - return Observable.timer(1, TimeUnit.MILLISECONDS); - } - }, new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertFailure(TestException.class); - } - - @Test - public void boundaryCancel() { - PublishSubject ps = PublishSubject.create(); - - TestObserver> to = ps - .buffer(Functions.justSupplier(Observable.never()), new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test(); - - assertTrue(ps.hasObservers()); - - to.dispose(); - - assertFalse(ps.hasObservers()); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBoundarySupplierReturnsNull() { - Observable.never() - .buffer(new Supplier>() { - int count; - @Override - public ObservableSource get() throws Exception { - if (count++ == 1) { - return null; - } - return Observable.timer(1, TimeUnit.MILLISECONDS); - } - }, new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertFailure(NullPointerException.class); - } - - @Test - @SuppressWarnings("unchecked") - public void boundaryBoundarySupplierReturnsNull2() { - Observable.never() - .buffer(new Supplier>() { - int count; - @Override - public ObservableSource get() throws Exception { - if (count++ == 1) { - return null; - } - return Observable.empty(); - } - }, new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test() - .awaitDone(5, TimeUnit.SECONDS) - .assertFailure(NullPointerException.class); - } - - @SuppressWarnings("unchecked") - @Test - public void boundaryMainError() { - PublishSubject ps = PublishSubject.create(); - - TestObserver> to = ps - .buffer(Functions.justSupplier(Observable.never()), new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test(); - - ps.onError(new TestException()); - - to.assertFailure(TestException.class); - } - - @SuppressWarnings("unchecked") - @Test - public void boundaryBoundaryError() { - PublishSubject ps = PublishSubject.create(); - - TestObserver> to = ps - .buffer(Functions.justSupplier(Observable.error(new TestException())), new Supplier>() { - @Override - public Collection get() throws Exception { - return new ArrayList(); - } - }) - .test(); - - ps.onError(new TestException()); - - to.assertFailure(TestException.class); - } - @Test public void dispose() { TestHelper.checkDisposed(Observable.range(1, 5).buffer(1, TimeUnit.DAYS, Schedulers.single())); @@ -1241,8 +993,6 @@ public void dispose() { TestHelper.checkDisposed(PublishSubject.create().buffer(Observable.never())); - TestHelper.checkDisposed(PublishSubject.create().buffer(Functions.justSupplier(Observable.never()))); - TestHelper.checkDisposed(PublishSubject.create().buffer(Observable.never(), Functions.justFunction(Observable.never()))); } @@ -1875,81 +1625,6 @@ protected void subscribeActual(Observer observer) { to.assertResult(Collections.emptyList()); } - @Test - public void bufferBoundaryErrorTwice() { - List errors = TestHelper.trackPluginErrors(); - try { - BehaviorSubject.createDefault(1) - .buffer(Functions.justSupplier(new Observable() { - @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - observer.onError(new TestException("first")); - observer.onError(new TestException("second")); - } - })) - .to(TestHelper.>testConsumer()) - .assertError(TestException.class) - .assertErrorMessage("first") - .assertNotComplete(); - - TestHelper.assertUndeliverable(errors, 0, TestException.class, "second"); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void bufferBoundarySupplierDisposed() { - TestObserver> to = new TestObserver>(); - BufferBoundarySupplierObserver, Integer> sub = - new BufferBoundarySupplierObserver, Integer>( - to, Functions.justSupplier((List)new ArrayList()), - Functions.justSupplier(Observable.never()) - ); - - Disposable bs = Disposables.empty(); - - sub.onSubscribe(bs); - - assertFalse(sub.isDisposed()); - - sub.dispose(); - - assertTrue(sub.isDisposed()); - - sub.next(); - - assertSame(DisposableHelper.DISPOSED, sub.other.get()); - - sub.dispose(); - sub.dispose(); - - assertTrue(bs.isDisposed()); - } - - @Test - public void bufferBoundarySupplierBufferAlreadyCleared() { - TestObserver> to = new TestObserver>(); - BufferBoundarySupplierObserver, Integer> sub = - new BufferBoundarySupplierObserver, Integer>( - to, Functions.justSupplier((List)new ArrayList()), - Functions.justSupplier(Observable.never()) - ); - - Disposable bs = Disposables.empty(); - - sub.onSubscribe(bs); - - sub.buffer = null; - - sub.next(); - - sub.onNext(1); - - sub.onComplete(); - } - @Test public void timedDoubleOnSubscribe() { TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>>() { diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableOnExceptionResumeNextViaObservableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableOnExceptionResumeNextViaObservableTest.java deleted file mode 100644 index b7cc7b57a1..0000000000 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableOnExceptionResumeNextViaObservableTest.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Copyright (c) 2016-present, RxJava Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in - * compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is - * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See - * the License for the specific language governing permissions and limitations under the License. - */ - -package io.reactivex.internal.operators.observable; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.*; - -import java.util.concurrent.TimeUnit; - -import org.junit.Test; -import org.mockito.Mockito; - -import io.reactivex.*; -import io.reactivex.disposables.*; -import io.reactivex.functions.Function; -import io.reactivex.observers.TestObserver; -import io.reactivex.schedulers.Schedulers; -import io.reactivex.testsupport.TestHelper; - -public class ObservableOnExceptionResumeNextViaObservableTest { - - @Test - public void resumeNextWithException() { - // Trigger failure on second element - TestObservable f = new TestObservable("one", "EXCEPTION", "two", "three"); - Observable w = Observable.unsafeCreate(f); - Observable resume = Observable.just("twoResume", "threeResume"); - Observable observable = w.onExceptionResumeNext(resume); - - Observer observer = TestHelper.mockObserver(); - observable.subscribe(observer); - - try { - f.t.join(); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - verify(observer).onSubscribe((Disposable)any()); - verify(observer, times(1)).onNext("one"); - verify(observer, Mockito.never()).onNext("two"); - verify(observer, Mockito.never()).onNext("three"); - verify(observer, times(1)).onNext("twoResume"); - verify(observer, times(1)).onNext("threeResume"); - verify(observer, Mockito.never()).onError(any(Throwable.class)); - verify(observer, times(1)).onComplete(); - verifyNoMoreInteractions(observer); - } - - @Test - public void resumeNextWithRuntimeException() { - // Trigger failure on second element - TestObservable f = new TestObservable("one", "RUNTIMEEXCEPTION", "two", "three"); - Observable w = Observable.unsafeCreate(f); - Observable resume = Observable.just("twoResume", "threeResume"); - Observable observable = w.onExceptionResumeNext(resume); - - Observer observer = TestHelper.mockObserver(); - observable.subscribe(observer); - - try { - f.t.join(); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - verify(observer).onSubscribe((Disposable)any()); - verify(observer, times(1)).onNext("one"); - verify(observer, Mockito.never()).onNext("two"); - verify(observer, Mockito.never()).onNext("three"); - verify(observer, times(1)).onNext("twoResume"); - verify(observer, times(1)).onNext("threeResume"); - verify(observer, Mockito.never()).onError(any(Throwable.class)); - verify(observer, times(1)).onComplete(); - verifyNoMoreInteractions(observer); - } - - @Test - public void throwablePassesThru() { - // Trigger failure on second element - TestObservable f = new TestObservable("one", "THROWABLE", "two", "three"); - Observable w = Observable.unsafeCreate(f); - Observable resume = Observable.just("twoResume", "threeResume"); - Observable observable = w.onExceptionResumeNext(resume); - - Observer observer = TestHelper.mockObserver(); - observable.subscribe(observer); - - try { - f.t.join(); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - verify(observer).onSubscribe((Disposable)any()); - verify(observer, times(1)).onNext("one"); - verify(observer, never()).onNext("two"); - verify(observer, never()).onNext("three"); - verify(observer, never()).onNext("twoResume"); - verify(observer, never()).onNext("threeResume"); - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onComplete(); - verifyNoMoreInteractions(observer); - } - - @Test - public void errorPassesThru() { - // Trigger failure on second element - TestObservable f = new TestObservable("one", "ERROR", "two", "three"); - Observable w = Observable.unsafeCreate(f); - Observable resume = Observable.just("twoResume", "threeResume"); - Observable observable = w.onExceptionResumeNext(resume); - - Observer observer = TestHelper.mockObserver(); - observable.subscribe(observer); - - try { - f.t.join(); - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - verify(observer).onSubscribe((Disposable)any()); - verify(observer, times(1)).onNext("one"); - verify(observer, never()).onNext("two"); - verify(observer, never()).onNext("three"); - verify(observer, never()).onNext("twoResume"); - verify(observer, never()).onNext("threeResume"); - verify(observer, times(1)).onError(any(Throwable.class)); - verify(observer, never()).onComplete(); - verifyNoMoreInteractions(observer); - } - - @Test - public void mapResumeAsyncNext() { - // Trigger multiple failures - Observable w = Observable.just("one", "fail", "two", "three", "fail"); - // Resume Observable is async - TestObservable f = new TestObservable("twoResume", "threeResume"); - Observable resume = Observable.unsafeCreate(f); - - // Introduce map function that fails intermittently (Map does not prevent this when the Observer is a - // rx.operator incl onErrorResumeNextViaObservable) - w = w.map(new Function() { - @Override - public String apply(String s) { - if ("fail".equals(s)) { - throw new RuntimeException("Forced Failure"); - } - System.out.println("BadMapper:" + s); - return s; - } - }); - - Observable observable = w.onExceptionResumeNext(resume); - - Observer observer = TestHelper.mockObserver(); - observable.subscribe(observer); - - try { - // if the thread gets started (which it shouldn't if it's working correctly) - if (f.t != null) { - f.t.join(); - } - } catch (InterruptedException e) { - fail(e.getMessage()); - } - - verify(observer, times(1)).onNext("one"); - verify(observer, never()).onNext("two"); - verify(observer, never()).onNext("three"); - verify(observer, times(1)).onNext("twoResume"); - verify(observer, times(1)).onNext("threeResume"); - verify(observer, Mockito.never()).onError(any(Throwable.class)); - verify(observer, times(1)).onComplete(); - } - - @Test - public void backpressure() { - TestObserver to = new TestObserver(); - Observable.range(0, 100000) - .onExceptionResumeNext(Observable.just(1)) - .observeOn(Schedulers.computation()) - .map(new Function() { - int c; - - @Override - public Integer apply(Integer t1) { - if (c++ <= 1) { - // slow - try { - Thread.sleep(500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - return t1; - } - - }) - .subscribe(to); - to.awaitDone(5, TimeUnit.SECONDS); - to.assertNoErrors(); - } - - private static class TestObservable implements ObservableSource { - - final String[] values; - Thread t; - - TestObservable(String... values) { - this.values = values; - } - - @Override - public void subscribe(final Observer observer) { - observer.onSubscribe(Disposables.empty()); - System.out.println("TestObservable subscribed to ..."); - t = new Thread(new Runnable() { - - @Override - public void run() { - try { - System.out.println("running TestObservable thread"); - for (String s : values) { - if ("EXCEPTION".equals(s)) { - throw new Exception("Forced Exception"); - } else if ("RUNTIMEEXCEPTION".equals(s)) { - throw new RuntimeException("Forced RuntimeException"); - } else if ("ERROR".equals(s)) { - throw new Error("Forced Error"); - } else if ("THROWABLE".equals(s)) { - throw new Throwable("Forced Throwable"); - } - System.out.println("TestObservable onNext: " + s); - observer.onNext(s); - } - System.out.println("TestObservable onComplete"); - observer.onComplete(); - } catch (Throwable e) { - System.out.println("TestObservable onError: " + e); - observer.onError(e); - } - } - - }); - System.out.println("starting TestObservable thread"); - t.start(); - System.out.println("done starting TestObservable thread"); - } - } -} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java index c1b49ddbd3..f65de1152b 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java @@ -249,194 +249,11 @@ public void onComplete() { verify(o).onError(any(TestException.class)); } - @Test - public void windowNoDuplication() { - final PublishSubject source = PublishSubject.create(); - final TestObserver tow = new TestObserver() { - boolean once; - @Override - public void onNext(Integer t) { - if (!once) { - once = true; - source.onNext(2); - } - super.onNext(t); - } - }; - TestObserver> to = new TestObserver>() { - @Override - public void onNext(Observable t) { - t.subscribe(tow); - super.onNext(t); - } - }; - source.window(new Supplier>() { - @Override - public Observable get() { - return Observable.never(); - } - }).subscribe(to); - - source.onNext(1); - source.onComplete(); - - to.assertValueCount(1); - tow.assertValues(1, 2); - } - - @Test - public void windowViaObservableNoUnsubscribe() { - Observable source = Observable.range(1, 10); - Supplier> boundary = new Supplier>() { - @Override - public Observable get() { - return Observable.empty(); - } - }; - - TestObserver> to = new TestObserver>(); - source.window(boundary).subscribe(to); - - // 2.0.2 - not anymore - // assertTrue("Not cancelled!", ts.isCancelled()); - to.assertComplete(); - } - - @Test - public void boundaryUnsubscribedOnMainCompletion() { - PublishSubject source = PublishSubject.create(); - final PublishSubject boundary = PublishSubject.create(); - Supplier> boundaryFunc = new Supplier>() { - @Override - public Observable get() { - return boundary; - } - }; - - TestObserver> to = new TestObserver>(); - source.window(boundaryFunc).subscribe(to); - - assertTrue(source.hasObservers()); - assertTrue(boundary.hasObservers()); - - source.onComplete(); - - assertFalse(source.hasObservers()); - assertFalse(boundary.hasObservers()); - - to.assertComplete(); - to.assertNoErrors(); - to.assertValueCount(1); - } - - @Test - public void mainUnsubscribedOnBoundaryCompletion() { - PublishSubject source = PublishSubject.create(); - final PublishSubject boundary = PublishSubject.create(); - Supplier> boundaryFunc = new Supplier>() { - @Override - public Observable get() { - return boundary; - } - }; - - TestObserver> to = new TestObserver>(); - source.window(boundaryFunc).subscribe(to); - - assertTrue(source.hasObservers()); - assertTrue(boundary.hasObservers()); - - boundary.onComplete(); - - assertFalse(source.hasObservers()); - assertFalse(boundary.hasObservers()); - - to.assertComplete(); - to.assertNoErrors(); - to.assertValueCount(1); - } - - @Test - public void childUnsubscribed() { - PublishSubject source = PublishSubject.create(); - final PublishSubject boundary = PublishSubject.create(); - Supplier> boundaryFunc = new Supplier>() { - @Override - public Observable get() { - return boundary; - } - }; - - TestObserver> to = new TestObserver>(); - source.window(boundaryFunc).subscribe(to); - - assertTrue(source.hasObservers()); - assertTrue(boundary.hasObservers()); - - to.dispose(); - - assertTrue(source.hasObservers()); - - assertFalse(boundary.hasObservers()); - - to.values().get(0).test(true); - - assertFalse(source.hasObservers()); - - to.assertNotComplete(); - to.assertNoErrors(); - to.assertValueCount(1); - } - - @Test - public void newBoundaryCalledAfterWindowClosed() { - final AtomicInteger calls = new AtomicInteger(); - PublishSubject source = PublishSubject.create(); - final PublishSubject boundary = PublishSubject.create(); - Supplier> boundaryFunc = new Supplier>() { - @Override - public Observable get() { - calls.getAndIncrement(); - return boundary; - } - }; - - TestObserver> to = new TestObserver>(); - source.window(boundaryFunc).subscribe(to); - - source.onNext(1); - boundary.onNext(1); - assertTrue(boundary.hasObservers()); - - source.onNext(2); - boundary.onNext(2); - assertTrue(boundary.hasObservers()); - - source.onNext(3); - boundary.onNext(3); - assertTrue(boundary.hasObservers()); - - source.onNext(4); - source.onComplete(); - - to.assertNoErrors(); - to.assertValueCount(4); - to.assertComplete(); - - assertFalse(source.hasObservers()); - assertFalse(boundary.hasObservers()); - } - @Test public void boundaryDispose() { TestHelper.checkDisposed(Observable.never().window(Observable.never())); } - @Test - public void boundaryDispose2() { - TestHelper.checkDisposed(Observable.never().window(Functions.justSupplier(Observable.never()))); - } - @Test public void boundaryOnError() { TestObserverEx to = Observable.error(new TestException()) @@ -450,14 +267,6 @@ public void boundaryOnError() { TestHelper.assertError(errors, 0, TestException.class); } - @Test - public void mainError() { - Observable.error(new TestException()) - .window(Functions.justSupplier(Observable.never())) - .test() - .assertError(TestException.class); - } - @Test public void innerBadSource() { TestHelper.checkBadSourceObservable(new Function, Object>() { @@ -471,18 +280,6 @@ public ObservableSource apply(Observable v) throws Exception { }); } }, false, 1, 1, (Object[])null); - - TestHelper.checkBadSourceObservable(new Function, Object>() { - @Override - public Object apply(Observable o) throws Exception { - return Observable.just(1).window(Functions.justSupplier(o)).flatMap(new Function, ObservableSource>() { - @Override - public ObservableSource apply(Observable v) throws Exception { - return v; - } - }); - } - }, false, 1, 1, (Object[])null); } @Test @@ -516,47 +313,6 @@ public ObservableSource apply(Observable v) throws Exception { .assertResult(1, 2); } - @Test - public void reentrantCallable() { - final Subject ps = PublishSubject.create(); - - TestObserver to = new TestObserver() { - @Override - public void onNext(Integer t) { - super.onNext(t); - if (t == 1) { - ps.onNext(2); - ps.onComplete(); - } - } - }; - - ps.window(new Supplier>() { - boolean once; - @Override - public Observable get() throws Exception { - if (!once) { - once = true; - return BehaviorSubject.createDefault(1); - } - return Observable.never(); - } - }) - .flatMap(new Function, ObservableSource>() { - @Override - public ObservableSource apply(Observable v) throws Exception { - return v; - } - }) - .subscribe(to); - - ps.onNext(1); - - to - .awaitDone(1, TimeUnit.SECONDS) - .assertResult(1, 2); - } - @Test public void badSource() { TestHelper.checkBadSourceObservable(new Function, Object>() { @@ -572,66 +328,6 @@ public ObservableSource apply(Observable v) throws Exception { }, false, 1, 1, 1); } - @Test - public void badSourceCallable() { - TestHelper.checkBadSourceObservable(new Function, Object>() { - @Override - public Object apply(Observable o) throws Exception { - return o.window(Functions.justSupplier(Observable.never())).flatMap(new Function, ObservableSource>() { - @Override - public ObservableSource apply(Observable v) throws Exception { - return v; - } - }); - } - }, false, 1, 1, 1); - } - - @Test - public void boundaryError() { - BehaviorSubject.createDefault(1) - .window(Functions.justSupplier(Observable.error(new TestException()))) - .test() - .assertValueCount(1) - .assertNotComplete() - .assertError(TestException.class); - } - - @Test - public void boundaryCallableCrashOnCall2() { - BehaviorSubject.createDefault(1) - .window(new Supplier>() { - int calls; - @Override - public Observable get() throws Exception { - if (++calls == 2) { - throw new TestException(); - } - return Observable.just(1); - } - }) - .test() - .assertError(TestException.class) - .assertNotComplete(); - } - - @Test - public void oneWindow() { - PublishSubject ps = PublishSubject.create(); - - TestObserver> to = BehaviorSubject.createDefault(1) - .window(Functions.justSupplier(ps)) - .take(1) - .test(); - - ps.onNext(1); - - to - .assertValueCount(1) - .assertNoErrors() - .assertComplete(); - } - @Test public void boundaryDirectDoubleOnSubscribe() { TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>>() { @@ -939,330 +635,4 @@ public void run() { TestHelper.race(r1, r2); } } - - @Test - public void boundarySupplierDoubleOnSubscribe() { - TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>>() { - @Override - public Observable> apply(Observable f) - throws Exception { - return f.window(Functions.justSupplier(Observable.never())).takeLast(1); - } - }); - } - - @Test - public void selectorUpstreamDisposedWhenOutputsDisposed() { - PublishSubject source = PublishSubject.create(); - PublishSubject boundary = PublishSubject.create(); - - TestObserver to = source.window(Functions.justSupplier(boundary)) - .take(1) - .flatMap(new Function, ObservableSource>() { - @Override - public ObservableSource apply( - Observable w) throws Exception { - return w.take(1); - } - }) - .test(); - - source.onNext(1); - - assertFalse("source not disposed", source.hasObservers()); - assertFalse("boundary not disposed", boundary.hasObservers()); - - to.assertResult(1); - } - - @Test - public void supplierMainAndBoundaryBothError() { - List errors = TestHelper.trackPluginErrors(); - try { - final AtomicReference> ref = new AtomicReference>(); - - TestObserverEx> to = Observable.error(new TestException("main")) - .window(Functions.justSupplier(new Observable() { - @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - ref.set(observer); - } - })) - .to(TestHelper.>testConsumer()); - - to - .assertValueCount(1) - .assertError(TestException.class) - .assertErrorMessage("main") - .assertNotComplete(); - - ref.get().onError(new TestException("inner")); - - TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); - } finally { - RxJavaPlugins.reset(); - } - } - - @Test - public void supplierMainCompleteBoundaryErrorRace() { - final TestException ex = new TestException(); - - for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { - List errors = TestHelper.trackPluginErrors(); - try { - final AtomicReference> refMain = new AtomicReference>(); - final AtomicReference> ref = new AtomicReference>(); - - TestObserverEx> to = new Observable() { - @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - refMain.set(observer); - } - } - .window(Functions.justSupplier(new Observable() { - @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - ref.set(observer); - } - })) - .to(TestHelper.>testConsumer()); - - Runnable r1 = new Runnable() { - @Override - public void run() { - refMain.get().onComplete(); - } - }; - Runnable r2 = new Runnable() { - @Override - public void run() { - ref.get().onError(ex); - } - }; - - TestHelper.race(r1, r2); - - to - .assertValueCount(1) - .assertTerminated(); - - if (!errors.isEmpty()) { - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } - } finally { - RxJavaPlugins.reset(); - } - } - } - - @Test - public void supplierMainNextBoundaryNextRace() { - for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { - final AtomicReference> refMain = new AtomicReference>(); - final AtomicReference> ref = new AtomicReference>(); - - TestObserver> to = new Observable() { - @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - refMain.set(observer); - } - } - .window(Functions.justSupplier(new Observable() { - @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - ref.set(observer); - } - })) - .test(); - - Runnable r1 = new Runnable() { - @Override - public void run() { - refMain.get().onNext(1); - } - }; - Runnable r2 = new Runnable() { - @Override - public void run() { - ref.get().onNext(1); - } - }; - - TestHelper.race(r1, r2); - - to - .assertValueCount(2) - .assertNotComplete() - .assertNoErrors(); - } - } - - @Test - public void supplierTakeOneAnotherBoundary() { - final AtomicReference> refMain = new AtomicReference>(); - final AtomicReference> ref = new AtomicReference>(); - - TestObserverEx> to = new Observable() { - @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - refMain.set(observer); - } - } - .window(Functions.justSupplier(new Observable() { - @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - ref.set(observer); - } - })) - .to(TestHelper.>testConsumer()); - - to.assertValueCount(1) - .assertNotTerminated() - .dispose(); - - ref.get().onNext(1); - - to.assertValueCount(1) - .assertNotTerminated(); - } - - @Test - public void supplierDisposeMainBoundaryCompleteRace() { - for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { - final AtomicReference> refMain = new AtomicReference>(); - final AtomicReference> ref = new AtomicReference>(); - - final TestObserver> to = new Observable() { - @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - refMain.set(observer); - } - } - .window(Functions.justSupplier(new Observable() { - @Override - protected void subscribeActual(Observer observer) { - final AtomicInteger counter = new AtomicInteger(); - observer.onSubscribe(new Disposable() { - - @Override - public void dispose() { - // about a microsecond - for (int i = 0; i < 100; i++) { - counter.incrementAndGet(); - } - } - - @Override - public boolean isDisposed() { - return false; - } - }); - ref.set(observer); - } - })) - .test(); - - Runnable r1 = new Runnable() { - @Override - public void run() { - to.dispose(); - } - }; - Runnable r2 = new Runnable() { - @Override - public void run() { - Observer o = ref.get(); - o.onNext(1); - o.onComplete(); - } - }; - - TestHelper.race(r1, r2); - } - } - - @Test - public void supplierDisposeMainBoundaryErrorRace() { - final TestException ex = new TestException(); - - for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { - List errors = TestHelper.trackPluginErrors(); - try { - final AtomicReference> refMain = new AtomicReference>(); - final AtomicReference> ref = new AtomicReference>(); - - final TestObserver> to = new Observable() { - @Override - protected void subscribeActual(Observer observer) { - observer.onSubscribe(Disposables.empty()); - refMain.set(observer); - } - } - .window(new Supplier>() { - int count; - @Override - public ObservableSource get() throws Exception { - if (++count > 1) { - return Observable.never(); - } - return (new Observable() { - @Override - protected void subscribeActual(Observer observer) { - final AtomicInteger counter = new AtomicInteger(); - observer.onSubscribe(new Disposable() { - - @Override - public void dispose() { - // about a microsecond - for (int i = 0; i < 100; i++) { - counter.incrementAndGet(); - } - } - - @Override - public boolean isDisposed() { - return false; - } - }); - ref.set(observer); - } - }); - } - }) - .test(); - - Runnable r1 = new Runnable() { - @Override - public void run() { - to.dispose(); - } - }; - Runnable r2 = new Runnable() { - @Override - public void run() { - Observer o = ref.get(); - o.onNext(1); - o.onError(ex); - } - }; - - TestHelper.race(r1, r2); - - if (!errors.isEmpty()) { - TestHelper.assertUndeliverable(errors, 0, TestException.class); - } - } finally { - RxJavaPlugins.reset(); - } - } - } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java index 120f938dbf..152bebf4ff 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java @@ -96,56 +96,6 @@ public void subscribe(Observer innerObserver) { assertEquals(lists.get(1), list("five")); } - @Test - public void observableBasedCloser() { - final List list = new ArrayList(); - final List> lists = new ArrayList>(); - - Observable source = Observable.unsafeCreate(new ObservableSource() { - @Override - public void subscribe(Observer innerObserver) { - innerObserver.onSubscribe(Disposables.empty()); - push(innerObserver, "one", 10); - push(innerObserver, "two", 60); - push(innerObserver, "three", 110); - push(innerObserver, "four", 160); - push(innerObserver, "five", 210); - complete(innerObserver, 250); - } - }); - - Supplier> closer = new Supplier>() { - int calls; - @Override - public Observable get() { - return Observable.unsafeCreate(new ObservableSource() { - @Override - public void subscribe(Observer innerObserver) { - innerObserver.onSubscribe(Disposables.empty()); - int c = calls++; - if (c == 0) { - push(innerObserver, new Object(), 100); - } else - if (c == 1) { - push(innerObserver, new Object(), 100); - } else { - complete(innerObserver, 101); - } - } - }); - } - }; - - Observable> windowed = source.window(closer); - windowed.subscribe(observeWindow(list, lists)); - - scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); - assertEquals(3, lists.size()); - assertEquals(lists.get(0), list("one", "two")); - assertEquals(lists.get(1), list("three", "four")); - assertEquals(lists.get(2), list("five")); - } - private List list(String... args) { List list = new ArrayList(); for (String arg : args) { diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java index fb1e9968e8..b7d7186cf9 100644 --- a/src/test/java/io/reactivex/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java @@ -947,46 +947,6 @@ public Collection get() { }).blockingSubscribe(); } - @Test(expected = NullPointerException.class) - public void bufferBoundarySupplier2Null() { - just1.buffer((Supplier>)null); - } - - @Test(expected = NullPointerException.class) - public void bufferBoundarySupplier2ReturnsNull() { - just1.buffer(new Supplier>() { - @Override - public Observable get() { - return null; - } - }).blockingSubscribe(); - } - - @Test(expected = NullPointerException.class) - public void bufferBoundarySupplier2SupplierNull() { - just1.buffer(new Supplier>() { - @Override - public Observable get() { - return just1; - } - }, null); - } - - @Test(expected = NullPointerException.class) - public void bufferBoundarySupplier2SupplierReturnsNull() { - just1.buffer(new Supplier>() { - @Override - public Observable get() { - return just1; - } - }, new Supplier>() { - @Override - public Collection get() { - return null; - } - }).blockingSubscribe(); - } - @Test(expected = NullPointerException.class) public void castNull() { just1.cast(null); @@ -1738,11 +1698,6 @@ public Object apply(Throwable e) { }).blockingSubscribe(); } - @Test(expected = NullPointerException.class) - public void onExceptionResumeNext() { - just1.onExceptionResumeNext(null); - } - @Test(expected = NullPointerException.class) public void publishFunctionNull() { just1.publish(null); @@ -2642,21 +2597,6 @@ public Observable apply(Integer v) { }).blockingSubscribe(); } - @Test(expected = NullPointerException.class) - public void windowBoundarySupplierNull() { - just1.window((Supplier>)null); - } - - @Test(expected = NullPointerException.class) - public void windowBoundarySupplierReturnsNull() { - just1.window(new Supplier>() { - @Override - public Observable get() { - return null; - } - }).blockingSubscribe(); - } - @Test(expected = NullPointerException.class) public void withLatestFromOtherNull() { just1.withLatestFrom(null, new BiFunction() { diff --git a/src/test/java/io/reactivex/schedulers/TrampolineSchedulerTest.java b/src/test/java/io/reactivex/schedulers/TrampolineSchedulerTest.java index 441fb37d6f..76910de05e 100644 --- a/src/test/java/io/reactivex/schedulers/TrampolineSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/TrampolineSchedulerTest.java @@ -31,7 +31,6 @@ import org.reactivestreams.Subscriber; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; public class TrampolineSchedulerTest extends AbstractSchedulerTests {