Flux.cache(0) doesn't replay the terminal signal #3164

Closed
plcarmel opened this issue Aug 23, 2022 · 14 comments

@plcarmel

Flux.cache(0) doesn't replay the terminal signal for late subscribers, contrary to what is stated in the documentation.

Here is an example:

jshell> var f = Flux.just(1,2,3,4).cache(0).log().doOnTerminate(() -> System.err.println("hello world"))

jshell> f.subscribe()
[ INFO] (main) onSubscribe(FluxPublish.PublishInner)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(3)
[ INFO] (main) onNext(4)
[ INFO] (main) onComplete()
hello world

jshell> f.subscribe()
[ INFO] (main) onSubscribe(FluxPublish.PublishInner)
[ INFO] (main) request(unbounded)

No complete signal and thus no "hello world".

Tested with 3.4.17 and 3.4.22

The documentation says:
"Note that cache(0) will only cache the terminal signal without expiration."

Which I understand as meaning that the signal will be replayed for late subscribers.
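To make the expectation concrete, here is a minimal sketch of "cache only the terminal signal" written against the plain JDK. It is a toy model, not Reactor's FluxPublish or FluxReplay: the names TerminalCachingHub, Listener and recording are hypothetical, demand (request(n)) is ignored entirely, and only onComplete is modelled as a terminal signal.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: this is NOT Reactor's FluxPublish or FluxReplay.
final class TerminalCachingHub<T> {

    // Hypothetical stand-in for a Reactive Streams Subscriber (no demand tracking).
    interface Listener<V> {
        void onNext(V value);
        void onComplete();
    }

    private final List<Listener<T>> listeners = new ArrayList<>();
    private boolean completed; // with zero history, this flag is all that is cached

    void subscribe(Listener<T> listener) {
        if (completed) {
            // Late subscriber: replay only the cached terminal signal.
            listener.onComplete();
            return;
        }
        listeners.add(listener);
    }

    void emit(T value) {
        if (completed) {
            return; // nothing may follow the terminal signal
        }
        for (Listener<T> listener : new ArrayList<>(listeners)) {
            listener.onNext(value);
        }
    }

    void complete() {
        if (completed) {
            return;
        }
        completed = true;
        for (Listener<T> listener : new ArrayList<>(listeners)) {
            listener.onComplete();
        }
        listeners.clear();
    }

    // Hypothetical helper that records every signal as a string, for demonstration.
    static <V> Listener<V> recording(List<String> log) {
        return new Listener<V>() {
            @Override public void onNext(V value) { log.add("onNext(" + value + ")"); }
            @Override public void onComplete() { log.add("onComplete()"); }
        };
    }
}
```

In this model, a listener that subscribes after complete() sees onComplete() and nothing else, which is the behaviour the documentation sentence seems to promise for cache(0).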

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Aug 23, 2022
@chemicL
Member

chemicL commented Aug 25, 2022

Thank you for the report. We are aware of the issue and it might require some significant design decisions to fix. Unfortunately, for the time being, I have no immediate workaround for the situation where the source terminates.

For full disclosure, the cause is that when the underlying reactor.core.publisher.FluxPublish.PublishSubscriber (returned from cache(0)) receives a terminal signal from the source, it sets its state to disconnected and requires a new connect() call (and cache(0) internally uses autoConnect(), which does not re-connect upon termination of the source). Re-connecting won't produce the desired result in the presented case either, as that would cause the late subscriber to receive the entire stream.

@chemicL chemicL added status/need-design This needs more in depth design work and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Aug 25, 2022
@plcarmel
Author

I am not an expert, but to me, what needs to be done is quite simple.

Obviously, FluxPublish should not be used for cache(0). The comment in Flux.cache(int history) is misleading at best. replay(0) makes a lot of sense, because it produces (or not) something that is valuable in itself: the terminal signal.

		if (history == 0) {
			//TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version
			return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE)));
		}

That leaves two options:

  1. Make FluxReplay work for history = 0
  2. Create a FluxReplayZero, or something like that, to handle that special case, if it is too complicated to modify FluxReplay so that it can handle history = 0

Did I miss something ?

@plcarmel
Author

The request rate (if it is the right term) of FluxReplay is equal to the history size, but I am not really sure I understand why...

If replay(history) is supposed to transform a cold flux into a hot flux, couldn't it just send a request(unbounded) signal once and be done with it ?

@chemicL chemicL self-assigned this Aug 29, 2022
@chemicL
Member

chemicL commented Aug 29, 2022

Thanks for the suggestions @plcarmel. We had a bit of a discussion around this problem and the background of the current state of affairs. It looks like we can try to incorporate your suggestions and make this case work as expected. I'll experiment and try to implement it.
Can you elaborate a bit on the last comment? From my understanding, the requests to the upstream depend on the replay history and have to do with buffer consumption. It can only request more once all the subscribers have received the onNext signals; only then is it possible to discard the item that is out of scope of the history. To be able to request(unbounded), an infinite buffer would be necessary to guarantee that no items are lost by the subscribers, and that's not really practical. Please guide me towards what you are thinking if I'm missing the point.
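The buffer argument can be illustrated with a small model written against the plain JDK. This is a sketch under stated assumptions, not FluxReplay's actual bookkeeping: BoundedReplayModel and all of its methods are hypothetical names, subscribers are identified by strings, and items are only counted, never stored. The point it shows is that the number of items that can still be requested upstream is bounded by how far the slowest subscriber lags behind.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: it does not reproduce FluxReplay's actual bookkeeping.
final class BoundedReplayModel {
    private final int capacity;                                 // stands in for the history size
    private final Map<String, Long> consumed = new HashMap<>(); // items seen so far, per subscriber
    private long produced;                                      // items received from upstream

    BoundedReplayModel(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // A new subscriber starts at the oldest item still retained in the history.
    void subscribe(String id) {
        consumed.put(id, Math.max(0L, produced - capacity));
    }

    void cancel(String id) {
        consumed.remove(id);
    }

    // How many more items fit before an item that some subscriber still needs
    // would have to be overwritten. With no subscribers nothing holds upstream back.
    long requestable() {
        long slowest = consumed.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(produced);
        return capacity - (produced - slowest);
    }

    void onUpstreamItem() {
        if (requestable() == 0) {
            throw new IllegalStateException("buffer full: a subscriber would lose an item");
        }
        produced++;
    }

    // Marks one more buffered item as delivered to the given subscriber.
    void consumeOne(String id) {
        Long seen = consumed.get(id);
        if (seen == null) {
            throw new IllegalArgumentException("unknown subscriber: " + id);
        }
        if (seen < produced) {
            consumed.put(id, seen + 1);
        }
    }
}
```

With a capacity of 2, the model allows 2 more items while nobody is subscribed, drops to 0 as soon as a subscriber that has not yet consumed the two retained items joins, and goes back up one step per consumed item. An unbounded request would only be safe if capacity were unbounded too.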

@plcarmel
Author

plcarmel commented Aug 30, 2022

No problem @chemicL, thank you for taking the time to address this.

Concerning request(unbounded), you are right and I am clearly still struggling to master back-pressure concepts.

The problem is that I was testing FluxReplay with one subscriber that cancelled its subscription after a while and I saw that FluxReplay kept asking for data in chunks of "historySize" size and I was like "what's the point of asking for data by chunks if you are going to ask for everything anyway and send it nowhere?".

There is no back-pressure when there are no subscribers, but there is some when someone subscribes, and the fact that the Flux is hot doesn't mean that it doesn't respect back-pressure; it just means that it runs even when there are no subscribers. I understand that better now.

Another funny thing is that there is a discontinuity at historySize=0. The request rate goes down as historySize tends toward 0, but when it is exactly 0, since there is no data to remember, request(unbounded) is the way to go.

Damn it, I did it again. For historySize=0, the way to go is to transmit the request from the subscriber as-is.

No, because there can be multiple subscribers. We have to use request(1) or we would have to keep a tally of the requested amount of each subscriber and use the current minimum value for the next request. Or do whatever publish() does I guess.
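The "tally of the requested amount of each subscriber" idea can be sketched with the plain JDK as follows. This is only an illustration of that bookkeeping, not what publish() actually does: MinDemandTracker and its method names are hypothetical, subscribers are identified by strings, and returning 0 when nobody is subscribed is a choice made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: an illustration of per-subscriber demand tracking,
// not the algorithm used by Reactor's publish().
final class MinDemandTracker {
    private final Map<String, Long> outstanding = new HashMap<>();

    // A subscriber with no demand yet still holds the upstream back.
    void subscribe(String id) {
        outstanding.put(id, 0L);
    }

    void cancel(String id) {
        outstanding.remove(id);
    }

    void request(String id, long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        if (!outstanding.containsKey(id)) {
            throw new IllegalArgumentException("unknown subscriber: " + id);
        }
        outstanding.merge(id, n, MinDemandTracker::addCapped);
    }

    // Saturating addition, so Long.MAX_VALUE keeps meaning "unbounded".
    private static long addCapped(long a, long b) {
        long sum = a + b;
        return sum < 0 ? Long.MAX_VALUE : sum;
    }

    // Items that may be requested upstream so that every subscriber can accept them.
    long upstreamDemand() {
        return outstanding.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(0L);
    }

    // Called once an item has been delivered to every current subscriber.
    void onItemMulticast() {
        if (upstreamDemand() == 0) {
            throw new IllegalStateException("an item was delivered without demand");
        }
        outstanding.replaceAll((id, n) -> n == Long.MAX_VALUE ? n : n - 1);
    }
}
```

If "a" has requested 5 and "b" has requested 2, the upstream demand is 2; after two items it drops to 0 until "b" requests more, even though "a" could still accept 3.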

@chemicL chemicL closed this as completed in db8902d Oct 3, 2022
@chemicL
Member

chemicL commented Oct 3, 2022

@plcarmel please have a look at the latest 3.4.x snapshot and test with your use case. Feedback is welcome. The implementation resides in FluxPublish instead of FluxReplay, as the latter is based around buffers: the buffer size determines the prefetch size used in request(n). For 0, a buffer is unnecessary, so that case is more similar to FluxPublish.

@plcarmel
Author

plcarmel commented Oct 4, 2022

Thank you @chemicL, that's awesome ! Yes, I think having the implementation in FluxPublish was the way to go after all. I just tested it and it works well.

And thank you for the caveat in your commit message. It would be nice if it could make its way into the documentation (warning: this case is not implemented yet ...).

From the commit message:

For cases with expiry (TTL arguments), FluxReplay does indeed reset itself. Therefore, the behaviour for the time constrained values remains as before, using FluxPublish implementation for the 0 history case, but without caching terminals, while not honouring the TTL. This case can be later implemented if needed.

@chemicL
Member

chemicL commented Oct 5, 2022

@plcarmel would you like to submit a PR with updates to the javadoc so it reflects what you'd expect to read? I guess I wouldn't do a better job at it than a conscious user :)

@plcarmel
Author

Slowly looking into this.

@plcarmel
Author

@chemicL, couldn't cache(0,ttl) simply return cache(0) ? If we remember only the terminal signal, the ttl becomes irrelevant.

I could create a new ticket for this, and an associated PR. That would be an easy first code contribution for me.

@chemicL
Member

chemicL commented Oct 17, 2022

That sounds like a good idea. It would still not honour the TTL the way the non-zero history cases do, which reset themselves after the TTL. But it is an improvement over the current fallback to plain FluxPublish, so please go ahead with opening an issue and providing a PR, and that can drive further discussion if necessary. Please also remember to support the replay(...) operator, which is the one backing cache(...).

@plcarmel
Author

cache(Queues.SMALL_BUFFER_SIZE, ttl).cache(0) ?

That way, we will be honouring the TTL and we will cache only the terminal signal.

For replay:
cache(Queues.SMALL_BUFFER_SIZE, ttl).replay(0)

@plcarmel
Author

The fetching behaviour would differ from that of plain FluxPublish, though. Using FluxReplay directly, for cache(Queues.SMALL_BUFFER_SIZE, ttl), might give enough flexibility to achieve the same fetching behaviour. I will be looking into this.

@plcarmel
Author

Ugh, please don't comment on this, I don't think it would work. I will do my homework and test my things first.
