Skip to content

Commit

Permalink
Multi onComplete* operators (#1806)
Browse files Browse the repository at this point in the history
* Multi onComplete* operators
* Subscribable has Multi/Single common methods
* Concat with varargs
  • Loading branch information
danielkec authored May 15, 2020
1 parent 7bef5ce commit 0325cae
Show file tree
Hide file tree
Showing 8 changed files with 768 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.helidon.common.reactive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -62,6 +63,28 @@ static <T> Multi<T> concat(Flow.Publisher<T> firstMulti, Flow.Publisher<T> secon
return ConcatPublisher.create(firstMulti, secondMulti);
}

/**
* Concat streams to one.
*
* @param firstMulti first stream
* @param secondMulti second stream
* @param publishers more publishers to concat
* @param <T> item type
* @return Multi
*/
@SafeVarargs
@SuppressWarnings("varargs")
static <T> Multi<T> concat(Flow.Publisher<T> firstMulti, Flow.Publisher<T> secondMulti, Flow.Publisher<T>... publishers) {
if (publishers.length == 0) {
return concat(firstMulti, secondMulti);
} else if (publishers.length == 1) {
return concat(concat(firstMulti, secondMulti), publishers[0]);
} else {
return concat(concat(firstMulti, secondMulti), publishers[0],
Arrays.copyOfRange(publishers, 1, publishers.length));

This comment has been minimized.

Copy link
@olotenko

olotenko May 18, 2020

This here expression turns a list of unknown length into a stack of unknown depth. It is a left fold. It does not need recursion.

}
}

/**
* Call the given supplier function for each individual downstream Subscriber
* to return a Flow.Publisher to subscribe to.
Expand Down Expand Up @@ -638,6 +661,28 @@ default Multi<T> onErrorResumeWith(Function<? super Throwable, ? extends Flow.Pu
return new MultiOnErrorResumeWith<>(this, onError);
}

/**
* Resume stream from single item if onComplete signal is intercepted. Effectively do an {@code append} to the stream.
*
* @param item one item to resume stream with
* @return Multi
*/
default Multi<T> onCompleteResume(T item) {
Objects.requireNonNull(item, "item is null");
return onCompleteResumeWith(Multi.singleton(item));
}

/**
* Resume stream from supplied publisher if onComplete signal is intercepted.
*
* @param publisher new stream publisher
* @return Multi
*/
default Multi<T> onCompleteResumeWith(Flow.Publisher<? extends T> publisher) {
Objects.requireNonNull(publisher, "publisher is null");
return new MultiOnCompleteResumeWith<>(this, publisher);
}

/**
* Executes given {@link java.lang.Runnable} when any of signals onComplete, onCancel or onError is received.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* If the completes, switch to a generated Flow.Publisher and relay its signals then on.
*
* @param <T> the element type of the flows
*/
final class MultiOnCompleteResumeWith<T> implements Multi<T> {

private final Multi<T> source;

private final Flow.Publisher<? extends T> fallbackPublisher;

MultiOnCompleteResumeWith(Multi<T> source, Flow.Publisher<? extends T> fallbackPublisher) {
this.source = source;
this.fallbackPublisher = fallbackPublisher;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
source.subscribe(new OnCompleteResumeWithSubscriber<>(subscriber, fallbackPublisher));
}

static final class OnCompleteResumeWithSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> downstream;

private final Flow.Publisher<? extends T> fallbackPublisher;

private Flow.Subscription upstream;

private long received;

private final AtomicLong requested;

private final FallbackSubscriber<T> fallbackSubscriber;

OnCompleteResumeWithSubscriber(Flow.Subscriber<? super T> downstream,
Flow.Publisher<? extends T> fallbackPublisher) {
this.downstream = downstream;
this.fallbackPublisher = fallbackPublisher;
this.requested = new AtomicLong();
fallbackSubscriber = new FallbackSubscriber<>(downstream, requested);
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
upstream = subscription;
downstream.onSubscribe(this);
}

@Override
public void onNext(T item) {
received++;
downstream.onNext(item);
}

@Override
public void onError(Throwable throwable) {
upstream = SubscriptionHelper.CANCELED;
downstream.onError(throwable);
}

@Override
public void onComplete() {
upstream = SubscriptionHelper.CANCELED;
long p = received;
if (p != 0L) {
SubscriptionHelper.produced(requested, p);
}

Flow.Publisher<? extends T> publisher;

try {
publisher = Objects.requireNonNull(fallbackPublisher,
"The fallback function returned a null Flow.Publisher");
} catch (Throwable ex) {
downstream.onError(ex);
return;
}

publisher.subscribe(fallbackSubscriber);
}

@Override
public void request(long n) {
if (n <= 0L) {
downstream.onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden"));
} else {
SubscriptionHelper.deferredRequest(fallbackSubscriber, requested, n);
upstream.request(n);
}
}

@Override
public void cancel() {
upstream.cancel();
SubscriptionHelper.cancel(fallbackSubscriber);
}

static final class FallbackSubscriber<T> extends AtomicReference<Flow.Subscription>
implements Flow.Subscriber<T> {

private final Flow.Subscriber<? super T> downstream;

private final AtomicLong requested;

FallbackSubscriber(Flow.Subscriber<? super T> downstream, AtomicLong requested) {
this.downstream = downstream;
this.requested = requested;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.deferredSetOnce(this, requested, subscription);
}

@Override
public void onNext(T item) {
downstream.onNext(item);
}

@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}

@Override
public void onComplete() {
downstream.onComplete();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,41 @@ default Single<T> onErrorResume(Function<? super Throwable, ? extends T> onError
* @param onError supplier of new stream publisher
* @return Single
*/
default Single<T> onErrorResumeWith(Function<? super Throwable, ? extends Single<? extends T>> onError) {
default Single<T> onErrorResumeWithSingle(Function<? super Throwable, ? extends Single<? extends T>> onError) {
return new SingleOnErrorResumeWith<>(this, onError);
}

/**
* Resume stream from supplied publisher if onError signal is intercepted.
*
* @param onError supplier of new stream publisher
* @return Single
*/
default Multi<T> onErrorResumeWith(Function<? super Throwable, ? extends Flow.Publisher<? extends T>> onError) {
return new MultiOnErrorResumeWith<>(Multi.from(this), onError);
}

/**
* Resume stream from single item if onComplete signal is intercepted. Effectively do an {@code append} to the stream.
*
* @param item one item to resume stream with
* @return Multi
*/
default Multi<T> onCompleteResume(T item) {
Objects.requireNonNull(item, "item is null");
return onCompleteResumeWith(Multi.singleton(item));
}

/**
* Resume stream from supplied publisher if onComplete signal is intercepted.
*
* @param publisher new stream publisher
* @return Multi
*/
default Multi<T> onCompleteResumeWith(Flow.Publisher<? extends T> publisher) {
return new MultiOnCompleteResumeWith<>(Multi.from(this), publisher);
}

/**
* Executes given {@link java.lang.Runnable} when any of signals onComplete, onCancel or onError is received.
*
Expand Down
Loading

0 comments on commit 0325cae

Please sign in to comment.