Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: remove buffer/window with supplier & onExceptionResumeNext #6564

Merged
merged 1 commit into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 1 addition & 192 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6943,79 +6943,6 @@ public final <B, U extends Collection<? super T>> Flowable<U> buffer(Publisher<B
return RxJavaPlugins.onAssembly(new FlowableBufferExactBoundary<T, U, B>(this, boundaryIndicator, bufferSupplier));
}

/**
* Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting
* Publisher emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a
* new buffer whenever the Publisher produced by the specified {@code boundaryIndicatorSupplier} emits an item.
* <p>
* <img width="640" height="395" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer1.png" alt="">
* <p>
* If either the source {@code Publisher} or the boundary {@code Publisher} issues an {@code onError} notification the event is passed on
* immediately without first emitting the buffer it is in the process of assembling.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it is instead controlled by the given Publishers and
* buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey downstream requests.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <B> the value type of the boundary-providing Publisher
* @param boundaryIndicatorSupplier
* a {@link Supplier} that produces a Publisher that governs the boundary between buffers.
* Whenever the supplied {@code Publisher} emits an item, {@code buffer} emits the current buffer and
* begins to fill a new one
* @return a Flowable that emits a connected, non-overlapping buffer of items from the source Publisher
* each time the Publisher created with the {@code closingIndicator} argument emits an item
* @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.NONE)
public final <B> Flowable<List<T>> buffer(Supplier<? extends Publisher<B>> boundaryIndicatorSupplier) {
return buffer(boundaryIndicatorSupplier, ArrayListSupplier.<T>asSupplier());
}

/**
* Returns a Flowable that emits buffers of items it collects from the source Publisher. The resulting
* Publisher emits connected, non-overlapping buffers. It emits the current buffer and replaces it with a
* new buffer whenever the Publisher produced by the specified {@code boundaryIndicatorSupplier} emits an item.
* <p>
* <img width="640" height="395" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer1.png" alt="">
* <p>
* If either the source {@code Publisher} or the boundary {@code Publisher} issues an {@code onError} notification the event is passed on
* immediately without first emitting the buffer it is in the process of assembling.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator does not support backpressure as it is instead controlled by the given Publishers and
* buffers data. It requests {@code Long.MAX_VALUE} upstream and does not obey downstream requests.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code buffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <U> the collection subclass type to buffer into
* @param <B> the value type of the boundary-providing Publisher
* @param boundaryIndicatorSupplier
* a {@link Callable} that produces a Publisher that governs the boundary between buffers.
* Whenever the supplied {@code Publisher} emits an item, {@code buffer} emits the current buffer and
* begins to fill a new one
* @param bufferSupplier
* a factory function that returns an instance of the collection subclass to be used and returned
* as the buffer
* @return a Flowable that emits a connected, non-overlapping buffer of items from the source Publisher
* each time the Publisher created with the {@code closingIndicator} argument emits an item
* @see <a href="http://reactivex.io/documentation/operators/buffer.html">ReactiveX operators documentation: Buffer</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.NONE)
public final <B, U extends Collection<? super T>> Flowable<U> buffer(Supplier<? extends Publisher<B>> boundaryIndicatorSupplier,
Supplier<U> bufferSupplier) {
ObjectHelper.requireNonNull(boundaryIndicatorSupplier, "boundaryIndicatorSupplier is null");
ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
return RxJavaPlugins.onAssembly(new FlowableBufferBoundarySupplier<T, U, B>(this, boundaryIndicatorSupplier, bufferSupplier));
}

/**
* Returns a Flowable that subscribes to this Publisher lazily, caches all of its events
* and replays them, in the same order as received, to all the downstream subscribers.
Expand Down Expand Up @@ -12186,7 +12113,7 @@ public final Flowable<T> onBackpressureLatest() {
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> onErrorResumeNext(Function<? super Throwable, ? extends Publisher<? extends T>> resumeFunction) {
ObjectHelper.requireNonNull(resumeFunction, "resumeFunction is null");
return RxJavaPlugins.onAssembly(new FlowableOnErrorNext<T>(this, resumeFunction, false));
return RxJavaPlugins.onAssembly(new FlowableOnErrorNext<T>(this, resumeFunction));
}

/**
Expand Down Expand Up @@ -12313,53 +12240,6 @@ public final Flowable<T> onErrorReturnItem(final T item) {
return onErrorReturn(Functions.justFunction(item));
}

/**
* Instructs a Publisher to pass control to another Publisher rather than invoking
* {@link Subscriber#onError onError} if it encounters an {@link java.lang.Exception}.
* <p>
* This differs from {@link #onErrorResumeNext} in that this one does not handle {@link java.lang.Throwable}
* or {@link java.lang.Error} but lets those continue through.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onExceptionResumeNextViaPublisher.png" alt="">
* <p>
* By default, when a Publisher encounters an exception that prevents it from emitting the expected item
* to its {@link Subscriber}, the Publisher invokes its Subscriber's {@code onError} method, and then quits
* without invoking any more of its Subscriber's methods. The {@code onExceptionResumeNext} method changes
* this behavior. If you pass another Publisher ({@code resumeSequence}) to a Publisher's
* {@code onExceptionResumeNext} method, if the original Publisher encounters an exception, instead of
* invoking its Subscriber's {@code onError} method, it will instead relinquish control to
* {@code resumeSequence} which will invoke the Subscriber's {@link Subscriber#onNext onNext} method if it is
* able to do so. In such a case, because no Publisher necessarily invokes {@code onError}, the Subscriber
* may never know that an exception happened.
* <p>
* You can use this to prevent exceptions from propagating or to supply fallback data should exceptions be
* encountered.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream. This and the resuming {@code Publisher}s
* are expected to honor backpressure as well.
* If any of them violate this expectation, the operator <em>may</em> throw an
* {@code IllegalStateException} when the source {@code Publisher} completes or
* {@code MissingBackpressureException} is signaled somewhere downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onExceptionResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param next
* the next Publisher that will take over if the source Publisher encounters
* an exception
* @return the original Publisher, with appropriately modified behavior
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> onExceptionResumeNext(final Publisher<? extends T> next) {
ObjectHelper.requireNonNull(next, "next is null");
return RxJavaPlugins.onAssembly(new FlowableOnErrorNext<T>(this, Functions.justFunction(next), true));
}

/**
* Nulls out references to the upstream producer and downstream Subscriber if
* the sequence is terminated or downstream cancels.
Expand Down Expand Up @@ -18290,77 +18170,6 @@ public final <U, V> Flowable<Flowable<T>> window(
return RxJavaPlugins.onAssembly(new FlowableWindowBoundarySelector<T, U, V>(this, openingIndicator, closingIndicator, bufferSize));
}

/**
* Returns a Flowable that emits windows of items it collects from the source Publisher. The resulting
* Publisher emits connected, non-overlapping windows. It emits the current window and opens a new one
* whenever the Publisher produced by the specified {@code closingSelector} emits an item.
* <p>
* <img width="640" height="455" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window1.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
* The returned {@code Publisher} doesn't support backpressure as it uses
* the {@code closingSelector} to control the creation of windows. The returned inner {@code Publisher}s honor
* backpressure but have an unbounded inner buffer that <em>may</em> lead to {@code OutOfMemoryError}
* if left unconsumed.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <B> the element type of the boundary Publisher
* @param boundaryIndicatorSupplier
* a {@link Supplier} that returns a {@code Publisher} that governs the boundary between windows.
* When the source {@code Publisher} emits an item, {@code window} emits the current window and begins
* a new one.
* @return a Flowable that emits connected, non-overlapping windows of items from the source Publisher
* whenever {@code closingSelector} emits an item
* @see <a href="http://reactivex.io/documentation/operators/window.html">ReactiveX operators documentation: Window</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.NONE)
public final <B> Flowable<Flowable<T>> window(Supplier<? extends Publisher<B>> boundaryIndicatorSupplier) {
return window(boundaryIndicatorSupplier, bufferSize());
}

/**
* Returns a Flowable that emits windows of items it collects from the source Publisher. The resulting
* Publisher emits connected, non-overlapping windows. It emits the current window and opens a new one
* whenever the Publisher produced by the specified {@code closingSelector} emits an item.
* <p>
* <img width="640" height="455" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window1.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner.
* The returned {@code Publisher} doesn't support backpressure as it uses
* the {@code closingSelector} to control the creation of windows. The returned inner {@code Publisher}s honor
* backpressure but have an unbounded inner buffer that <em>may</em> lead to {@code OutOfMemoryError}
* if left unconsumed.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <B> the element type of the boundary Publisher
* @param boundaryIndicatorSupplier
* a {@link Supplier} that returns a {@code Publisher} that governs the boundary between windows.
* When the source {@code Publisher} emits an item, {@code window} emits the current window and begins
* a new one.
* @param bufferSize
* the capacity hint for the buffer in the inner windows
* @return a Flowable that emits connected, non-overlapping windows of items from the source Publisher
* whenever {@code closingSelector} emits an item
* @see <a href="http://reactivex.io/documentation/operators/window.html">ReactiveX operators documentation: Window</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.ERROR)
@SchedulerSupport(SchedulerSupport.NONE)
public final <B> Flowable<Flowable<T>> window(Supplier<? extends Publisher<B>> boundaryIndicatorSupplier, int bufferSize) {
ObjectHelper.requireNonNull(boundaryIndicatorSupplier, "boundaryIndicatorSupplier is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new FlowableWindowBoundarySupplier<T, B>(this, boundaryIndicatorSupplier, bufferSize));
}

/**
* Merges the specified Publisher into this Publisher sequence by using the {@code resultSelector}
* function only when the source Publisher (this instance) emits an item.
Expand Down
Loading