-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replay terminal signals to late subscribers in Flux.replay(int) and Flux.cache(int) #3200
Conversation
reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java
Outdated
Show resolved
Hide resolved
reactor-core/src/main/java/reactor/core/publisher/FluxPublish.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good to me
@@ -120,15 +127,21 @@ public void subscribe(CoreSubscriber<? super T> actual) { | |||
c = u; | |||
} | |||
|
|||
inner.parent = c; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like a behavior change. not sure about the impact but let's keep the old logic as it was
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. In the earlier stages this was necessary, but it should be only necessary in the previous logic. Restored.
@chemicL this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to |
Supporting the caching of only terminal signals in case of
Flux.replay(int)
andFlux.cache(int)
operators.As it currently stands, these operators, when provided
0
as the argument, resort to behaving likeFlux.publish()
which differs in the way termination signals are handled by the> 0
cases. This change still uses theFluxPublish
class to implement the logic, however with a few minor changes to its implementation.First of all,
FluxPublish
resets itself after source termination to be able toconnect()
again.FluxReplay
on the other hand, does not reset itself in the case of only buffering signals without timeout. Therefore, for the behaviour of caching the terminals,FluxPublish
does not reset itself and replays the terminal when it receives a late subscription.For cases with expiry (TTL arguments),
FluxReplay
does indeed reset itself. Therefore, the behaviour for the time constrained values remains as before, usingFluxPublish
implementation for the0
history case, but without caching terminals, while not honouring the TTL. This case can be later implemented if needed.