Blocking buffer until experiment #864
Conversation
RxJava-pull-requests #792 FAILURE
RxJava-pull-requests #793 SUCCESS
Test: I guess there is a race issue with repeat(), as it schedules a new repeat after take unsubscribes. I guess adding a …
Reviewing code ... considering the drawbacks, what do you think is worse: the possibility of a (deterministic?) deadlock with this solution, or the possibility of non-deterministic data loss? Is the deadlock deterministic (it would always happen in dev, so it gets found), or could it happen if a Scheduler becomes saturated, or the buffer size is higher than the available threads?
Non-deterministic data loss is definitely worse. Deadlock due to the computation scheduler being single-threaded is worrying, but might affect other concurrent operators as well regardless. I think the documentation could mention that if pushback or blocking behavior is expected, one should use the NewThread or IO scheduler for the unblocking operation.
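For illustration, a minimal usage sketch of that recommendation. It assumes the subscribeOn(Scheduler, int) overload proposed in this PR, plus Schedulers factory names, Observable.just varargs, and Java 8 lambdas taken from later RxJava releases; none of these are guaranteed to match the 0.x API exactly:

```java
import rx.Observable;
import rx.schedulers.Schedulers;

public class BlockingSubscribeOnUsage {
    public static void main(String[] args) {
        Observable<Integer> source = Observable.just(1, 2, 3);

        // Blocking (bounded-buffer) form on a scheduler that hands the
        // draining side a fresh thread, so a full buffer cannot starve it.
        source.subscribeOn(Schedulers.newThread(), 16)
              .subscribe(v -> System.out.println("got " + v));

        // Risky: on a single-threaded computation pool the producer can block
        // on a full buffer while the work that would drain it waits behind it.
        // source.subscribeOn(Schedulers.computation(), 16).subscribe(...);
    }
}
```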
I agree, so let's continue down this path :-) I'll review through your code in a bit.
First pass through reading this code it seems good, and mature enough to handle the different scenarios we could throw against it. I'm going to spend some more time playing, but nothing right now suggests that this should not be the path we take.
```java
     * on the specified {@link Scheduler}
     * @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
     */
    public final Observable<T> subscribeOn(Scheduler scheduler, int bufferSize) {
```
This overload should help mitigate issues when subscribing to a PublishSubject (and derivatives such as GroupedObservable in the groupBy operator), where events fired between the original and the actual subscription are lost.
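For context, a hypothetical sketch of that time-gap loss, using a PublishSubject with the plain subscribeOn(Scheduler). Scheduler and package names are assumed from later RxJava releases and may not match the 0.x API:

```java
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class TimeGapSketch {
    public static void main(String[] args) throws InterruptedException {
        PublishSubject<String> subject = PublishSubject.create();

        // subscribe() returns immediately, but the actual subscription to the
        // subject is scheduled onto another thread and happens slightly later.
        subject.subscribeOn(Schedulers.newThread())
               .subscribe(v -> System.out.println("received: " + v));

        // PublishSubject does not replay: anything emitted in the gap between
        // the two subscriptions above can be silently dropped.
        subject.onNext("possibly lost");

        Thread.sleep(100); // give the scheduled subscription time to attach
        subject.onNext("delivered once the real subscription is active");
        subject.onCompleted();
    }
}
```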
That doesn't seem to need this overload, since we special-case those two Observable instances even if bufferSize is not passed in and we say false for dontLoseEvents.
Ah, I see ... the automatic (special-cased) path has an unbounded buffer, so it never blocks. This overload allows for blocking as well.
Are you suggesting that subscribeOn by itself never use the blocking form, but any operator implementation that uses subscribeOn would choose to do so? In that case, should subscribeOn do any buffering by default, or only on demand?
My suggestion is that subscribeOn(Scheduler) never blocks with any source and events may be lost, and subscribeOn(Scheduler, int) does not lose events and may block depending on the buffer size.
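As an illustration of that second semantic only (not the operator code in this PR), a minimal sketch of the bounded hand-off idea: events are buffered until the delayed subscription attaches, and the producer blocks instead of dropping when the buffer is full. The BoundedHandoff class and Consumer-based signatures are invented for this sketch:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Illustrative only: a bounded hand-off. Events arriving before the delayed
// subscription are buffered; a full buffer blocks the producer instead of
// dropping, which is exactly where the single-threaded deadlock risk comes from.
public class BoundedHandoff<T> {
    private final BlockingQueue<T> buffer;
    private volatile Consumer<? super T> downstream; // null until the real subscription attaches

    public BoundedHandoff(int bufferSize) {
        this.buffer = new ArrayBlockingQueue<T>(bufferSize);
    }

    /** Producer side: forward directly once connected, otherwise block on the buffer. */
    public void onNext(T value) throws InterruptedException {
        Consumer<? super T> d = downstream;
        if (d != null) {
            d.accept(value);
        } else {
            buffer.put(value); // blocks when bufferSize events are already pending
        }
    }

    /**
     * Called when the scheduled subscription finally attaches; replays the backlog.
     * Simplified: ignores the race between connect() and a concurrent onNext(),
     * which a real operator would have to resolve atomically.
     */
    public void connect(Consumer<? super T> d) {
        T pending;
        while ((pending = buffer.poll()) != null) {
            d.accept(pending);
        }
        downstream = d;
    }
}
```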
I added some things on top of this at #869.
Work on this is picked up in #869.
This is a solution to the time gap problem for #844.
I've added a subscribeOn overload where the user can explicitly request buffering behavior. In addition, SubscribeOn checks the type of the Observable and enters buffering mode for GroupedObservable and PublishSubject. I think these code options should be mutually exclusive:
I personally favor option 2).
A drawback is that this blocking subscribeOn deadlocks on pools with a single thread. We can, of course, check for the Trampoline, Test, and Immediate schedulers, but not schedulers created via Schedulers.executor, or the computation scheduler on a single-core machine.
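A minimal sketch of why that deadlock happens, using plain java.util.concurrent rather than RxJava: with only one worker thread, the producer fills the bounded buffer and blocks, while the draining task queued behind it on the same thread never gets to run:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadDeadlockSketch {
    public static void main(String[] args) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2);

        single.submit(() -> {                 // "producer" (the blocking subscribeOn side)
            for (int i = 0; i < 10; i++) {
                try {
                    buffer.put(i);            // blocks once 2 items are pending
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        single.submit(() -> {                 // "consumer" (the delayed real subscription)
            try {
                while (true) {
                    System.out.println(buffer.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // With a single worker thread the second task never starts: deadlock.
    }
}
```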