diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 1f08bed82f..92ee741c04 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -7219,7 +7219,7 @@ public final ConnectableFlux publish() { */ public final ConnectableFlux publish(int prefetch) { return onAssembly(new FluxPublish<>(this, prefetch, Queues - .get(prefetch))); + .get(prefetch), true)); } /** @@ -7565,10 +7565,12 @@ public final ConnectableFlux replay() { * Will retain up to the given history size onNext signals. Completion and Error will also be * replayed. *

- * Note that {@code cache(0)} will only cache the terminal signal without + * Note that {@code replay(0)} will only cache the terminal signal without * expiration. * *

+ * Re-connects are not supported. + *

* * * @param history number of events retained in history excluding complete and @@ -7579,8 +7581,8 @@ public final ConnectableFlux replay() { */ public final ConnectableFlux replay(int history) { if (history == 0) { - //TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version - return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE))); + return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, + Queues.get(Queues.SMALL_BUFFER_SIZE), false)); } return onAssembly(new FluxReplay<>(this, history, 0L, null)); } @@ -7662,8 +7664,8 @@ public final ConnectableFlux replay(Duration ttl, Scheduler timer) { public final ConnectableFlux replay(int history, Duration ttl, Scheduler timer) { Objects.requireNonNull(timer, "timer"); if (history == 0) { - //TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version - return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE))); + return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, + Queues.get(Queues.SMALL_BUFFER_SIZE), true)); } return onAssembly(new FluxReplay<>(this, history, ttl.toNanos(), timer)); } @@ -7986,8 +7988,10 @@ public final Flux scanWith(Supplier initial, BiFunction share() { - return onAssembly(new FluxRefCount<>( - new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.small()), 1) + return onAssembly( + new FluxRefCount<>(new FluxPublish<>( + this, Queues.SMALL_BUFFER_SIZE, Queues.small(), true + ), 1) ); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java b/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java index 98a03c7790..3b37838b99 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,6 +57,11 @@ final class FluxPublish extends ConnectableFlux implements Scannable { final Supplier> queueSupplier; + /** + * Whether to prepare for a reconnect after the source terminates. + */ + final boolean resetUponSourceTermination; + volatile PublishSubscriber connection; @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater CONNECTION = @@ -66,13 +71,15 @@ final class FluxPublish extends ConnectableFlux implements Scannable { FluxPublish(Flux source, int prefetch, - Supplier> queueSupplier) { + Supplier> queueSupplier, + boolean resetUponSourceTermination) { if (prefetch <= 0) { throw new IllegalArgumentException("bufferSize > 0 required but it was " + prefetch); } this.source = Objects.requireNonNull(source, "source"); this.prefetch = prefetch; this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier"); + this.resetUponSourceTermination = resetUponSourceTermination; } @Override @@ -111,7 +118,7 @@ public void subscribe(CoreSubscriber actual) { } PublishSubscriber c = connection; - if (c == null || c.isTerminated()) { + if (c == null || (this.resetUponSourceTermination && c.isTerminated())) { PublishSubscriber u = new PublishSubscriber<>(prefetch, this); if (!CONNECTION.compareAndSet(this, c, u)) { continue; @@ -123,12 +130,18 @@ public void subscribe(CoreSubscriber actual) { if (c.add(inner)) { if (inner.isCancelled()) { c.remove(inner); - } - else { + } else { inner.parent = c; } c.drain(); break; + } else if (!this.resetUponSourceTermination) { + if (c.error != null) { + inner.actual.onError(c.error); + } else { + inner.actual.onComplete(); + } + break; } } } @@ -515,8 +528,10 @@ boolean checkTerminated(boolean d, boolean empty) { if (d) { Throwable e = error; if (e != null && e != Exceptions.TERMINATED) { - CONNECTION.compareAndSet(parent, this, null); - e = Exceptions.terminate(ERROR, this); + if (parent.resetUponSourceTermination) { + CONNECTION.compareAndSet(parent, this, null); + e = Exceptions.terminate(ERROR, this); + } queue.clear(); for (PubSubInner inner : terminate()) { inner.actual.onError(e); @@ -524,7 +539,9 @@ boolean checkTerminated(boolean d, boolean empty) { return true; } else if (empty) { - CONNECTION.compareAndSet(parent, this, null); + if (parent.resetUponSourceTermination) { + CONNECTION.compareAndSet(parent, this, null); + } for (PubSubInner inner : terminate()) { inner.actual.onComplete(); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxCacheTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxCacheTest.java index 2c0d24ad59..aa6d45afc2 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxCacheTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxCacheTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,7 +103,7 @@ public void cacheFluxHistoryTTL() { } @Test - public void cacheFluxTTL2() { + public void cacheFluxTTLReconnectsAfterTTL() { VirtualTimeScheduler vts = VirtualTimeScheduler.create(); AtomicInteger i = new AtomicInteger(0); @@ -125,6 +125,47 @@ public void cacheFluxTTL2() { .verifyComplete(); } + @Test + void cacheZeroFluxCachesCompletion() { + VirtualTimeScheduler vts = VirtualTimeScheduler.create(); + + Flux> source = Flux.just(1, 2, 3) + .delayElements(Duration.ofMillis(1000) + , vts) + .cache(0) + .elapsed(vts); + + StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE) + .thenAwait(Duration.ofSeconds(3)) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3) + .verifyComplete(); + + StepVerifier.create(source).verifyComplete(); + } + + @Test + public void cacheZeroFluxTTLReconnectsAfterSourceCompletion() { + VirtualTimeScheduler vts = VirtualTimeScheduler.create(); + + Flux> source = Flux.just(1, 2, 3) + .delayElements( + Duration.ofMillis(1000), vts + ) + .cache(0, Duration.ofMillis(2000), vts) + .elapsed(vts); + + StepVerifier.withVirtualTime(() -> source, () -> vts, Long.MAX_VALUE) + .thenAwait(Duration.ofSeconds(3)) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 1) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 2) + .expectNextMatches(t -> t.getT1() == 1000 && t.getT2() == 3) + .verifyComplete(); + + StepVerifier.create(source).expectTimeout(Duration.ofMillis(500)).verify(); + } + @Test public void cacheContextHistory() { AtomicInteger contextFillCount = new AtomicInteger(); @@ -156,6 +197,38 @@ public void cacheContextHistory() { assertThat(contextFillCount).as("cacheHit3").hasValue(4); } + @Test + public void cacheZeroContext() { + AtomicInteger contextFillCount = new AtomicInteger(); + Flux cached = Flux.just(1, 2) + .flatMap(i -> Mono.deferContextual(Mono::just) + .map(ctx -> ctx.getOrDefault("a", "BAD")) + ) + .cache(0) + .contextWrite(ctx -> ctx.put("a", "GOOD" + contextFillCount.incrementAndGet())); + + // at first pass, the Context is propagated to subscriber, but not cached + String cacheMiss = cached.blockLast(); + assertThat(cacheMiss).as("cacheMiss").isEqualTo("GOOD1"); + assertThat(contextFillCount).as("cacheMiss").hasValue(1); + + // at second subscribe, the Context fill attempt is still done, but ultimately + // ignored since source terminated + String zeroCache = cached.blockLast(); + assertThat(zeroCache).as("zeroCache").isNull(); //value from the cache + assertThat(contextFillCount).as("zeroCache").hasValue(2); //function was still invoked + + //at third subscribe, function is called for the 3rd time, but the context is still cached + String zeroCache2 = cached.blockLast(); + assertThat(zeroCache2).as("zeroCache2").isNull(); + assertThat(contextFillCount).as("zeroCache2").hasValue(3); + + //at fourth subscribe, function is called for the 4th time, but the context is still cached + String zeroCache3 = cached.blockLast(); + assertThat(zeroCache3).as("zeroCache3").isNull(); + assertThat(contextFillCount).as("zeroCache3").hasValue(4); + } + @Test public void cacheContextTime() { AtomicInteger contextFillCount = new AtomicInteger(); diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxPublishTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxPublishTest.java index e020930eff..17efb6dfb4 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxPublishTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxPublishTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,10 +26,12 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.provider.ValueSource; import org.reactivestreams.Subscription; import reactor.core.Disposable; import reactor.core.Scannable; import reactor.core.scheduler.Schedulers; +import reactor.test.ParameterizedTestWithName; import reactor.test.StepVerifier; import reactor.test.publisher.FluxOperatorTest; import reactor.test.publisher.TestPublisher; @@ -53,8 +55,9 @@ protected Scenario defaultScenarioOptions(Scenario> scenarios_operatorSuccess() { return Arrays.asList( scenario(f -> f.publish().autoConnect()), - - scenario(f -> f.publish().refCount()) + scenario(f -> f.publish().refCount()), + scenario(f -> f.replay(0).autoConnect()), + scenario(f -> f.replay(0).refCount()) ); } @@ -118,12 +121,15 @@ public void constructors() { ctb.test(); }*/ - @Test - public void normal() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + void normal(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(); AssertSubscriber ts2 = AssertSubscriber.create(); - ConnectableFlux p = Flux.range(1, 5).hide().publish(); + Flux source = Flux.range(1, 5).hide(); + ConnectableFlux p = replayTerminalSignal ? + source.replay(0) : source.publish(); p.subscribe(ts1); p.subscribe(ts2); @@ -147,14 +153,34 @@ public void normal() { ts2.assertValues(1, 2, 3, 4, 5) .assertNoError() .assertComplete(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + if (replayTerminalSignal) { + ts3.assertNoValues() + .assertNoError() + .assertComplete(); + } else { + ts3.assertNoEvents(); + + p.connect(); + + ts3.assertValues(1, 2, 3, 4, 5) + .assertNoError() + .assertComplete(); + } } - @Test - public void normalBackpressured() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + void normalBackpressured(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(0); AssertSubscriber ts2 = AssertSubscriber.create(0); - ConnectableFlux p = Flux.range(1, 5).hide().publish(); + Flux source = Flux.range(1, 5).hide(); + ConnectableFlux p = replayTerminalSignal ? + source.replay(0) : source.publish(); p.subscribe(ts1); p.subscribe(ts2); @@ -202,10 +228,28 @@ public void normalBackpressured() { ts2.assertValues(1, 2, 3, 4, 5) .assertNoError() .assertComplete(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + if (replayTerminalSignal) { + ts3.assertNoValues() + .assertNoError() + .assertComplete(); + } else { + ts3.assertNoEvents(); + + p.connect(); + + ts3.assertValues(1, 2, 3, 4, 5) + .assertNoError() + .assertComplete(); + } } - @Test - public void normalAsyncFused() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void normalAsyncFused(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(); AssertSubscriber ts2 = AssertSubscriber.create(); @@ -217,7 +261,8 @@ public void normalAsyncFused() { up.emitNext(5, FAIL_FAST); up.emitComplete(FAIL_FAST); - ConnectableFlux p = up.asFlux().publish(); + ConnectableFlux p = replayTerminalSignal ? + up.asFlux().replay(0) : up.asFlux().publish(); p.subscribe(ts1); p.subscribe(ts2); @@ -241,10 +286,20 @@ public void normalAsyncFused() { ts2.assertValues(1, 2, 3, 4, 5) .assertNoError() .assertComplete(); + + // late subscriber + if (replayTerminalSignal) { + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + ts3.assertNoValues() + .assertNoError() + .assertComplete(); + } // no else - unicast disallows second connect } - @Test - public void normalBackpressuredAsyncFused() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void normalBackpressuredAsyncFused(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(0); AssertSubscriber ts2 = AssertSubscriber.create(0); @@ -256,7 +311,8 @@ public void normalBackpressuredAsyncFused() { up.emitNext(5, FAIL_FAST); up.emitComplete(FAIL_FAST); - ConnectableFlux p = up.asFlux().publish(); + ConnectableFlux p = replayTerminalSignal ? + up.asFlux().replay(0) : up.asFlux().publish(); p.subscribe(ts1); p.subscribe(ts2); @@ -304,14 +360,26 @@ public void normalBackpressuredAsyncFused() { ts2.assertValues(1, 2, 3, 4, 5) .assertNoError() .assertComplete(); + + // late subscriber + if (replayTerminalSignal) { + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + ts3.assertNoValues() + .assertNoError() + .assertComplete(); + } // no else - unicast disallows second connect } - @Test - public void normalSyncFused() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void normalSyncFused(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(); AssertSubscriber ts2 = AssertSubscriber.create(); - ConnectableFlux p = Flux.range(1, 5).publish(5); + Flux source = Flux.range(1, 5); + ConnectableFlux p = replayTerminalSignal ? + source.replay(0) : source.publish(5); p.subscribe(ts1); p.subscribe(ts2); @@ -337,12 +405,15 @@ public void normalSyncFused() { .assertComplete(); } - @Test - public void normalBackpressuredSyncFused() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void normalBackpressuredSyncFused(boolean replayTerminalSignal) { AssertSubscriber ts1 = AssertSubscriber.create(0); AssertSubscriber ts2 = AssertSubscriber.create(0); - ConnectableFlux p = Flux.range(1, 5).publish(5); + Flux source = Flux.range(1, 5); + ConnectableFlux p = replayTerminalSignal ? + source.replay(0) : source.publish(5); p.subscribe(ts1); p.subscribe(ts2); @@ -390,6 +461,23 @@ public void normalBackpressuredSyncFused() { ts2.assertValues(1, 2, 3, 4, 5) .assertNoError() .assertComplete(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + if (replayTerminalSignal) { + ts3.assertNoValues() + .assertNoError() + .assertComplete(); + } else { + ts3.assertNoEvents(); + + p.connect(); + + ts3.assertValues(1, 2, 3, 4, 5) + .assertNoError() + .assertComplete(); + } } //see https://github.com/reactor/reactor-core/issues/1302 @@ -408,13 +496,15 @@ public void boundaryFused() { .verify(Duration.ofSeconds(5)); } - @Test - public void disconnect() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void disconnect(boolean replayTerminalSignal) { AssertSubscriber ts = AssertSubscriber.create(); Sinks.Many e = Sinks.many().multicast().onBackpressureBuffer(); - ConnectableFlux p = e.asFlux().publish(); + ConnectableFlux p = replayTerminalSignal ? + e.asFlux().replay(0) : e.asFlux().publish(); p.subscribe(ts); @@ -430,15 +520,22 @@ public void disconnect() { .assertNotComplete(); assertThat(e.currentSubscriberCount()).as("still connected").isZero(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + ts3.assertNoEvents(); } - @Test - public void disconnectBackpressured() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void disconnectBackpressured(boolean replayTerminalSignal) { AssertSubscriber ts = AssertSubscriber.create(0); Sinks.Many e = Sinks.many().multicast().onBackpressureBuffer(); - ConnectableFlux p = e.asFlux().publish(); + ConnectableFlux p = replayTerminalSignal ? + e.asFlux().replay(0) : e.asFlux().publish(); p.subscribe(ts); @@ -451,15 +548,22 @@ public void disconnectBackpressured() { .assertNotComplete(); assertThat(e.currentSubscriberCount()).as("still connected").isZero(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + ts3.assertNoEvents(); } - @Test - public void error() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void error(boolean replayTerminalSignal) { AssertSubscriber ts = AssertSubscriber.create(); Sinks.Many e = Sinks.many().multicast().onBackpressureBuffer(); - ConnectableFlux p = e.asFlux().publish(); + ConnectableFlux p = replayTerminalSignal ? + e.asFlux().replay(0) : e.asFlux().publish(); p.subscribe(ts); @@ -473,13 +577,28 @@ public void error() { .assertError(RuntimeException.class) .assertErrorWith(x -> assertThat(x).hasMessageContaining("forced failure")) .assertNotComplete(); + + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); + + if (replayTerminalSignal) { + ts3.assertError(RuntimeException.class) + .assertErrorWith(x -> assertThat(x).hasMessageContaining("forced failure")) + .assertNotComplete(); + } else { + ts3.assertNoEvents(); + } } - @Test - public void fusedMapInvalid() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void fusedMapInvalid(boolean replayTerminalSignal) { AssertSubscriber ts = AssertSubscriber.create(); - ConnectableFlux p = Flux.range(1, 5).map(v -> (Integer)null).publish(); + Flux source = Flux.range(1, 5).map(v -> (Integer) null); + ConnectableFlux p = replayTerminalSignal ? + source.replay(0) : source.publish(); p.subscribe(ts); @@ -488,15 +607,27 @@ public void fusedMapInvalid() { ts.assertNoValues() .assertError(NullPointerException.class) .assertNotComplete(); - } + // late subscriber + AssertSubscriber ts3 = AssertSubscriber.create(); + p.subscribe(ts3); - @Test - public void retry() { + if (replayTerminalSignal) { + ts3.assertError(NullPointerException.class) + .assertNotComplete(); + } else { + ts3.assertNoEvents(); + } + } + + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void retry(boolean replayTerminalSignal) { Sinks.Many dp = Sinks.unsafe().many().multicast().directBestEffort(); + ConnectableFlux flux = replayTerminalSignal ? + dp.asFlux().replay(0) : dp.asFlux().publish(); StepVerifier.create( - dp.asFlux() - .publish() + flux .autoConnect().handle((s1, sink) -> { if (s1 == 1) { sink.error(new RuntimeException()); @@ -518,12 +649,16 @@ public void retry() { dp.emitComplete(FAIL_FAST); } - @Test - public void retryWithPublishOn() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void retryWithPublishOn(boolean replayTerminalSignal) { Sinks.Many dp = Sinks.unsafe().many().multicast().directBestEffort(); + Flux source = dp.asFlux().publishOn(Schedulers.parallel()); + + ConnectableFlux flux = replayTerminalSignal ? + source.replay(0) : source.publish(); StepVerifier.create( - dp.asFlux() - .publishOn(Schedulers.parallel()).publish() + flux .autoConnect().handle((s1, sink) -> { if (s1 == 1) { sink.error(new RuntimeException()); @@ -548,7 +683,8 @@ public void retryWithPublishOn() { @Test public void scanMain() { Flux parent = Flux.just(1).map(i -> i); - FluxPublish test = new FluxPublish<>(parent, 123, Queues.unbounded()); + FluxPublish test = + new FluxPublish<>(parent, 123, Queues.unbounded(), true); assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(123); @@ -557,7 +693,8 @@ public void scanMain() { @Test public void scanSubscriber() { - FluxPublish main = new FluxPublish<>(Flux.just(1), 123, Queues.unbounded()); + FluxPublish main = + new FluxPublish<>(Flux.just(1), 123, Queues.unbounded(), true); FluxPublish.PublishSubscriber test = new FluxPublish.PublishSubscriber<>(789, main); Subscription parent = Operators.emptySubscription(); test.onSubscribe(parent); @@ -583,7 +720,8 @@ public void scanSubscriber() { @Test public void scanInner() { - FluxPublish main = new FluxPublish<>(Flux.just(1), 123, Queues.unbounded()); + FluxPublish main = + new FluxPublish<>(Flux.just(1), 123, Queues.unbounded(), true); FluxPublish.PublishSubscriber parent = new FluxPublish.PublishSubscriber<>(789, main); Subscription sub = Operators.emptySubscription(); parent.onSubscribe(sub); @@ -607,7 +745,8 @@ public void scanInner() { @Test public void scanPubSubInner() { - FluxPublish main = new FluxPublish<>(Flux.just(1), 123, Queues.unbounded()); + FluxPublish main = + new FluxPublish<>(Flux.just(1), 123, Queues.unbounded(), true); FluxPublish.PublishSubscriber parent = new FluxPublish.PublishSubscriber<>(789, main); Subscription sub = Operators.emptySubscription(); parent.onSubscribe(sub); @@ -624,10 +763,11 @@ public void scanPubSubInner() { } //see https://github.com/reactor/reactor-core/issues/1290 - @Test - public void syncFusionSingle() { //single value in the SYNC fusion - final ConnectableFlux publish = Flux.just("foo") - .publish(); + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void syncFusionSingle(boolean replayTerminalSignal) { //single value in the SYNC fusion + final ConnectableFlux publish = replayTerminalSignal ? + Flux.just("foo").replay(0) : Flux.just("foo").publish(); StepVerifier.create(publish) .then(publish::connect) @@ -637,10 +777,12 @@ public void syncFusionSingle() { //single value in the SYNC fusion } //see https://github.com/reactor/reactor-core/issues/1290 - @Test - public void syncFusionMultiple() { //multiple values in the SYNC fusion - final ConnectableFlux publish = Flux.range(1, 5) - .publish(); + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void syncFusionMultiple(boolean replayTerminalSignal) { //multiple values in the SYNC fusion + final ConnectableFlux publish = replayTerminalSignal ? + Flux.range(1, 5).replay(0) : + Flux.range(1, 5).publish(); StepVerifier.create(publish) .then(publish::connect) @@ -650,12 +792,13 @@ public void syncFusionMultiple() { //multiple values in the SYNC fusion } //see https://github.com/reactor/reactor-core/issues/1528 - @Test + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) @Timeout(4) - public void syncFusionFromInfiniteStream() { - final ConnectableFlux publish = - Flux.fromStream(Stream.iterate(0, i -> i + 1)) - .publish(); + public void syncFusionFromInfiniteStream(boolean replayTerminalSignal) { + Flux source = Flux.fromStream(Stream.iterate(0, i -> i + 1)); + final ConnectableFlux publish = replayTerminalSignal ? + source.replay(0) : source.publish(); StepVerifier.create(publish) .then(publish::connect) @@ -666,12 +809,15 @@ public void syncFusionFromInfiniteStream() { } //see https://github.com/reactor/reactor-core/issues/1528 - @Test + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) @Timeout(4) - public void syncFusionFromInfiniteStreamAndTake() { + public void syncFusionFromInfiniteStreamAndTake(boolean replayTerminalSignal) { + Flux source = Flux.fromStream(Stream.iterate(0, i -> i + 1)); + ConnectableFlux flux = replayTerminalSignal ? + source.replay(0) : source.publish(); final Flux publish = - Flux.fromStream(Stream.iterate(0, i -> i + 1)) - .publish() + flux .autoConnect() .take(10); @@ -681,10 +827,12 @@ public void syncFusionFromInfiniteStreamAndTake() { .verify(Duration.ofSeconds(4)); } - @Test - public void dataDroppedIfConnectImmediately() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void dataDroppedIfConnectImmediately(boolean replayTerminalSignal) { TestPublisher publisher = TestPublisher.create(); - ConnectableFlux connectableFlux = publisher.flux().publish(); + ConnectableFlux connectableFlux = replayTerminalSignal ? + publisher.flux().replay(0) : publisher.flux().publish(); connectableFlux.connect(); @@ -700,10 +848,13 @@ public void dataDroppedIfConnectImmediately() { .verifyComplete(); } - @Test - public void dataDroppedIfAutoconnectZero() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void dataDroppedIfAutoconnectZero(boolean replayTerminalSignal) { TestPublisher publisher = TestPublisher.create(); - Flux flux = publisher.flux().publish().autoConnect(0); + ConnectableFlux publish = replayTerminalSignal ? + publisher.flux().replay(0) : publisher.flux().publish(); + Flux flux = publish.autoConnect(0); publisher.next(1); publisher.next(2); @@ -733,21 +884,22 @@ public void removeUnknownInnerIgnored() { assertThat(subscriber.subscribers).as("post remove inner").isEmpty(); } - @Test - public void subscriberContextPropagation() { + @ParameterizedTestWithName + @ValueSource(booleans = {true, false}) + public void subscriberContextPropagation(boolean replayTerminalSignal) { String key = "key"; int expectedValue = 1; AtomicReference reference = new AtomicReference<>(); + Flux source = Flux.just(1, 2, 3) + .flatMap(value -> Mono.deferContextual(Mono::just) + .doOnNext(reference::set) + .thenReturn(value)); + ConnectableFlux publish = replayTerminalSignal ? + source.replay(0) : source.publish(); Flux integerFlux = - Flux.just(1, 2, 3) - .flatMap(value -> - Mono.deferContextual(Mono::just) - .doOnNext(reference::set) - .thenReturn(value) - ) - .publish() + publish .autoConnect(2); integerFlux.contextWrite(Context.of(key, expectedValue))