Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi onComplete* operators #1806

Merged
merged 3 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.helidon.common.reactive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -62,6 +63,28 @@ static <T> Multi<T> concat(Flow.Publisher<T> firstMulti, Flow.Publisher<T> secon
return ConcatPublisher.create(firstMulti, secondMulti);
}

/**
* Concat streams to one.
*
* @param firstMulti first stream
* @param secondMulti second stream
* @param publishers more publishers to concat
* @param <T> item type
* @return Multi
*/
@SafeVarargs
@SuppressWarnings("varargs")
static <T> Multi<T> concat(Flow.Publisher<T> firstMulti, Flow.Publisher<T> secondMulti, Flow.Publisher<T>... publishers) {
if (publishers.length == 0) {
return concat(firstMulti, secondMulti);
} else if (publishers.length == 1) {
return concat(concat(firstMulti, secondMulti), publishers[0]);
} else {
return concat(concat(firstMulti, secondMulti), publishers[0],
Arrays.copyOfRange(publishers, 1, publishers.length));
}
}

/**
* Call the given supplier function for each individual downstream Subscriber
* to return a Flow.Publisher to subscribe to.
Expand Down Expand Up @@ -638,6 +661,28 @@ default Multi<T> onErrorResumeWith(Function<? super Throwable, ? extends Flow.Pu
return new MultiOnErrorResumeWith<>(this, onError);
}

/**
* Resume stream from single item if onComplete signal is intercepted. Effectively do an {@code append} to the stream.
*
* @param item one item to resume stream with
* @return Multi
*/
default Multi<T> onCompleteResume(T item) {
Objects.requireNonNull(item, "item is null");
return onCompleteResumeWith(Multi.singleton(item));
}

/**
* Resume stream from supplied publisher if onComplete signal is intercepted.
*
* @param publisher new stream publisher
* @return Multi
*/
default Multi<T> onCompleteResumeWith(Flow.Publisher<? extends T> publisher) {
Objects.requireNonNull(publisher, "publisher is null");
return new MultiOnCompleteResumeWith<>(this, publisher);
}

/**
* Executes given {@link java.lang.Runnable} when any of signals onComplete, onCancel or onError is received.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* If the completes, switch to a generated Flow.Publisher and relay its signals then on.
*
* @param <T> the element type of the flows
*/
final class MultiOnCompleteResumeWith<T> implements Multi<T> {

private final Multi<T> source;

private final Flow.Publisher<? extends T> fallbackPublisher;

MultiOnCompleteResumeWith(Multi<T> source, Flow.Publisher<? extends T> fallbackPublisher) {
this.source = source;
this.fallbackPublisher = fallbackPublisher;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
source.subscribe(new OnCompleteResumeWithSubscriber<>(subscriber, fallbackPublisher));
}

static final class OnCompleteResumeWithSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> downstream;

private final Flow.Publisher<? extends T> fallbackPublisher;

private Flow.Subscription upstream;

private long received;

private final AtomicLong requested;

private final FallbackSubscriber<T> fallbackSubscriber;

OnCompleteResumeWithSubscriber(Flow.Subscriber<? super T> downstream,
Flow.Publisher<? extends T> fallbackPublisher) {
this.downstream = downstream;
this.fallbackPublisher = fallbackPublisher;
this.requested = new AtomicLong();
fallbackSubscriber = new FallbackSubscriber<>(downstream, requested);
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
upstream = subscription;
downstream.onSubscribe(this);
}

@Override
public void onNext(T item) {
received++;
downstream.onNext(item);
}

@Override
public void onError(Throwable throwable) {
upstream = SubscriptionHelper.CANCELED;
downstream.onError(throwable);
}

@Override
public void onComplete() {
upstream = SubscriptionHelper.CANCELED;
long p = received;
if (p != 0L) {
SubscriptionHelper.produced(requested, p);
}

Flow.Publisher<? extends T> publisher;

try {
publisher = Objects.requireNonNull(fallbackPublisher,
"The fallback function returned a null Flow.Publisher");
} catch (Throwable ex) {
downstream.onError(ex);
return;
}

publisher.subscribe(fallbackSubscriber);
}

@Override
public void request(long n) {
if (n <= 0L) {
downstream.onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden"));
} else {
SubscriptionHelper.deferredRequest(fallbackSubscriber, requested, n);
upstream.request(n);
}
}

@Override
public void cancel() {
upstream.cancel();
SubscriptionHelper.cancel(fallbackSubscriber);
}

static final class FallbackSubscriber<T> extends AtomicReference<Flow.Subscription>
implements Flow.Subscriber<T> {

private final Flow.Subscriber<? super T> downstream;

private final AtomicLong requested;

FallbackSubscriber(Flow.Subscriber<? super T> downstream, AtomicLong requested) {
this.downstream = downstream;
this.requested = requested;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.deferredSetOnce(this, requested, subscription);
}

@Override
public void onNext(T item) {
downstream.onNext(item);
}

@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}

@Override
public void onComplete() {
downstream.onComplete();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,41 @@ default Single<T> onErrorResume(Function<? super Throwable, ? extends T> onError
* @param onError supplier of new stream publisher
* @return Single
*/
default Single<T> onErrorResumeWith(Function<? super Throwable, ? extends Single<? extends T>> onError) {
default Single<T> onErrorResumeWithSingle(Function<? super Throwable, ? extends Single<? extends T>> onError) {
return new SingleOnErrorResumeWith<>(this, onError);
}

/**
* Resume stream from supplied publisher if onError signal is intercepted.
*
* @param onError supplier of new stream publisher
* @return Single
*/
default Multi<T> onErrorResumeWith(Function<? super Throwable, ? extends Flow.Publisher<? extends T>> onError) {
return new MultiOnErrorResumeWith<>(Multi.from(this), onError);
}

/**
* Resume stream from single item if onComplete signal is intercepted. Effectively do an {@code append} to the stream.
*
* @param item one item to resume stream with
* @return Multi
*/
default Multi<T> onCompleteResume(T item) {
Objects.requireNonNull(item, "item is null");
return onCompleteResumeWith(Multi.singleton(item));
}

/**
* Resume stream from supplied publisher if onComplete signal is intercepted.
*
* @param publisher new stream publisher
* @return Multi
*/
default Multi<T> onCompleteResumeWith(Flow.Publisher<? extends T> publisher) {
return new MultiOnCompleteResumeWith<>(Multi.from(this), publisher);
}

/**
* Executes given {@link java.lang.Runnable} when any of signals onComplete, onCancel or onError is received.
*
Expand Down
Loading