Skip to content

Commit

Permalink
Merge #3200 into 3.5.0-RC1
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Oct 3, 2022
2 parents 4819fdd + db8902d commit 0d40eb1
Show file tree
Hide file tree
Showing 4 changed files with 341 additions and 95 deletions.
20 changes: 12 additions & 8 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -7212,7 +7212,7 @@ public final ConnectableFlux<T> publish() {
*/
public final ConnectableFlux<T> publish(int prefetch) {
return onAssembly(new FluxPublish<>(this, prefetch, Queues
.get(prefetch)));
.get(prefetch), true));
}

/**
Expand Down Expand Up @@ -7558,10 +7558,12 @@ public final ConnectableFlux<T> replay() {
* Will retain up to the given history size onNext signals. Completion and Error will also be
* replayed.
* <p>
* Note that {@code cache(0)} will only cache the terminal signal without
* Note that {@code replay(0)} will only cache the terminal signal without
* expiration.
*
* <p>
* Re-connects are not supported.
* <p>
* <img class="marble" src="doc-files/marbles/replayWithHistory.svg" alt="">
*
* @param history number of events retained in history excluding complete and
Expand All @@ -7572,8 +7574,8 @@ public final ConnectableFlux<T> replay() {
*/
public final ConnectableFlux<T> replay(int history) {
if (history == 0) {
//TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version
return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE)));
return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE,
Queues.get(Queues.SMALL_BUFFER_SIZE), false));
}
return onAssembly(new FluxReplay<>(this, history, 0L, null));
}
Expand Down Expand Up @@ -7655,8 +7657,8 @@ public final ConnectableFlux<T> replay(Duration ttl, Scheduler timer) {
public final ConnectableFlux<T> replay(int history, Duration ttl, Scheduler timer) {
Objects.requireNonNull(timer, "timer");
if (history == 0) {
//TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version
return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE)));
return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE,
Queues.get(Queues.SMALL_BUFFER_SIZE), true));
}
return onAssembly(new FluxReplay<>(this, history, ttl.toNanos(), timer));
}
Expand Down Expand Up @@ -7979,8 +7981,10 @@ public final <A> Flux<A> scanWith(Supplier<A> initial, BiFunction<A, ? super T,
* to subscribe once, late subscribers might therefore miss items.
*/
public final Flux<T> share() {
return onAssembly(new FluxRefCount<>(
new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.small()), 1)
return onAssembly(
new FluxRefCount<>(new FluxPublish<>(
this, Queues.SMALL_BUFFER_SIZE, Queues.small(), true
), 1)
);
}

Expand Down
33 changes: 25 additions & 8 deletions reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,6 +57,11 @@ final class FluxPublish<T> extends ConnectableFlux<T> implements Scannable {

final Supplier<? extends Queue<T>> queueSupplier;

/**
* Whether to prepare for a reconnect after the source terminates.
*/
final boolean resetUponSourceTermination;

volatile PublishSubscriber<T> connection;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<FluxPublish, PublishSubscriber> CONNECTION =
Expand All @@ -66,13 +71,15 @@ final class FluxPublish<T> extends ConnectableFlux<T> implements Scannable {

FluxPublish(Flux<? extends T> source,
int prefetch,
Supplier<? extends Queue<T>> queueSupplier) {
Supplier<? extends Queue<T>> queueSupplier,
boolean resetUponSourceTermination) {
if (prefetch <= 0) {
throw new IllegalArgumentException("bufferSize > 0 required but it was " + prefetch);
}
this.source = Objects.requireNonNull(source, "source");
this.prefetch = prefetch;
this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
this.resetUponSourceTermination = resetUponSourceTermination;
}

@Override
Expand Down Expand Up @@ -111,7 +118,7 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

PublishSubscriber<T> c = connection;
if (c == null || c.isTerminated()) {
if (c == null || (this.resetUponSourceTermination && c.isTerminated())) {
PublishSubscriber<T> u = new PublishSubscriber<>(prefetch, this);
if (!CONNECTION.compareAndSet(this, c, u)) {
continue;
Expand All @@ -123,12 +130,18 @@ public void subscribe(CoreSubscriber<? super T> actual) {
if (c.add(inner)) {
if (inner.isCancelled()) {
c.remove(inner);
}
else {
} else {
inner.parent = c;
}
c.drain();
break;
} else if (!this.resetUponSourceTermination) {
if (c.error != null) {
inner.actual.onError(c.error);
} else {
inner.actual.onComplete();
}
break;
}
}
}
Expand Down Expand Up @@ -515,16 +528,20 @@ boolean checkTerminated(boolean d, boolean empty) {
if (d) {
Throwable e = error;
if (e != null && e != Exceptions.TERMINATED) {
CONNECTION.compareAndSet(parent, this, null);
e = Exceptions.terminate(ERROR, this);
if (parent.resetUponSourceTermination) {
CONNECTION.compareAndSet(parent, this, null);
e = Exceptions.terminate(ERROR, this);
}
queue.clear();
for (PubSubInner<T> inner : terminate()) {
inner.actual.onError(e);
}
return true;
}
else if (empty) {
CONNECTION.compareAndSet(parent, this, null);
if (parent.resetUponSourceTermination) {
CONNECTION.compareAndSet(parent, this, null);
}
for (PubSubInner<T> inner : terminate()) {
inner.actual.onComplete();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,7 +103,7 @@ public void cacheFluxHistoryTTL() {
}

@Test
public void cacheFluxTTL2() {
public void cacheFluxTTLReconnectsAfterTTL() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();

AtomicInteger i = new AtomicInteger(0);
Expand All @@ -125,6 +125,47 @@ public void cacheFluxTTL2() {
.verifyComplete();
}

@Test
void cacheZeroFluxCachesCompletion() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();

Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(Duration.ofMillis(1000)
, vts)
.cache(0)
.elapsed(vts);

StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
.thenAwait(Duration.ofSeconds(3))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();

StepVerifier.create(source).verifyComplete();
}

@Test
public void cacheZeroFluxTTLReconnectsAfterSourceCompletion() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();

Flux<Tuple2<Long, Integer>> source = Flux.just(1, 2, 3)
.delayElements(
Duration.ofMillis(1000), vts
)
.cache(0, Duration.ofMillis(2000), vts)
.elapsed(vts);

StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE)
.thenAwait(Duration.ofSeconds(3))
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2)
.expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3)
.verifyComplete();

StepVerifier.create(source).expectTimeout(Duration.ofMillis(500)).verify();
}

@Test
public void cacheContextHistory() {
AtomicInteger contextFillCount = new AtomicInteger();
Expand Down Expand Up @@ -156,6 +197,38 @@ public void cacheContextHistory() {
assertThat(contextFillCount).as("cacheHit3").hasValue(4);
}

@Test
public void cacheZeroContext() {
AtomicInteger contextFillCount = new AtomicInteger();
Flux<String> cached = Flux.just(1, 2)
.flatMap(i -> Mono.deferContextual(Mono::just)
.map(ctx -> ctx.getOrDefault("a", "BAD"))
)
.cache(0)
.contextWrite(ctx -> ctx.put("a", "GOOD" + contextFillCount.incrementAndGet()));

// at first pass, the Context is propagated to subscriber, but not cached
String cacheMiss = cached.blockLast();
assertThat(cacheMiss).as("cacheMiss").isEqualTo("GOOD1");
assertThat(contextFillCount).as("cacheMiss").hasValue(1);

// at second subscribe, the Context fill attempt is still done, but ultimately
// ignored since source terminated
String zeroCache = cached.blockLast();
assertThat(zeroCache).as("zeroCache").isNull(); //value from the cache
assertThat(contextFillCount).as("zeroCache").hasValue(2); //function was still invoked

//at third subscribe, function is called for the 3rd time, but the context is still cached
String zeroCache2 = cached.blockLast();
assertThat(zeroCache2).as("zeroCache2").isNull();
assertThat(contextFillCount).as("zeroCache2").hasValue(3);

//at fourth subscribe, function is called for the 4th time, but the context is still cached
String zeroCache3 = cached.blockLast();
assertThat(zeroCache3).as("zeroCache3").isNull();
assertThat(contextFillCount).as("zeroCache3").hasValue(4);
}

@Test
public void cacheContextTime() {
AtomicInteger contextFillCount = new AtomicInteger();
Expand Down
Loading

0 comments on commit 0d40eb1

Please sign in to comment.