-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
subscribeOn + groupBy #869
subscribeOn + groupBy #869
Conversation
…xJava into subscribeOn-blockingBuffer
* | ||
* See https://github.com/Netflix/RxJava/issues/844 for more information. | ||
*/ | ||
return dontLoseEvents |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This still needs decision as currently GroupedObservable will be buffered in unlimited mode with the subscribeOn(Scheduler) overload. I favor explicitly requested buffering instead of type-based buffering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I agree, it just means groupBy is dangerous for anyone to use subscribeOn
with.
- I want to have it for internal usage but am not ready to publicly expose it.
- the previously performed observeOn changes appear to have resolved the non-determinism
- timeout test could be interrupted when unsubscribed - groupBy.subscribeOn needs blocking buffer
@zsxwing Can you take a look at this unit test? I'm not sure if it's a legit problem or not. When timeout happens it calls unsubscribe which can result in the work being interrupted. This in turn causes it to not pass this test, but the test may just need to account for work possibly being interrupted when a timeout occurs. I'd appreciate your review.
See changes I did here to not block, but it sometimes still fails: benjchristensen@54b19be#diff-381ec8d12950bcd8ddf7e4751875d74fR349 |
@benjchristensen Where is the |
RxJava-pull-requests #803 ABORTED |
It's from the line: benjchristensen@43437fe#diff-a726afb863d72e4bd1938fcc01f443b3R101 The scheduler is unsubscribed which will cancel a future (when bread based) that interrupts the thread. |
I don't know what is causing the tests to abort. Other than this timeout one being discussed all others pass on my machine. |
The following codes which try to dispatch the unsubscribe to the scheduler have been removed. |
…test to be consistent with the new subscribeOn
I sent a PR to fix this test: benjchristensen#7 |
Because no one can give me a good reason for why the For example, someone could write code like this (not necessarily for good reason):
Note how the unsubscribe is received on a different thread than is used to emit data. We can't schedule back on to that other thread, because scheduling goes down not up. Thus, I see no point in scheduling an Considering this, why should we schedule the public static void main(String args[]) {
TestSubscriber<String> ts = new TestSubscriber<String>(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("OnCompleted");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String t) {
System.out.println("Received: " + t + " on thread: " + Thread.currentThread());
}
});
Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> s) {
System.out.println("OnSubscribed on thread: " + Thread.currentThread());
s.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("Unsubscribe received on thread: " + Thread.currentThread());
}
}));
for (int i = 0; !s.isUnsubscribed(); i++) {
s.onNext(i);
}
}
}).map(new Func1<Integer, String>() {
@Override
public String call(Integer i) {
System.out.println("Map function on thread: " + Thread.currentThread());
return "value-" + i;
}
}).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).take(10).flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
System.out.println("FlatMap function on thread: " + Thread.currentThread());
return Observable.from("network-result+" + s);
}
}).subscribe(ts);
ts.awaitTerminalEvent();
} Output is:
|
I actually wonder if the issue is more related to |
I thought about this. This might be an artifact of the .NET's threads, pools, etc. The closest example I could come up was the Swing scheduler where subscription should happen on the EDT and unsubscription as well, but Swing's addlistener/removelistener methods are generally thread-safe. |
Here is an example without public static void main(String args[]) {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("OnCompleted");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer t) {
System.out.println("Received: " + t + " on thread: " + Thread.currentThread());
}
});
Subscription s = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> s) {
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner t1) {
System.out.println("OnSubscribed on thread: " + Thread.currentThread());
s.add(Subscriptions.create(new Action0() {
@Override
public void call() {
System.out.println("Unsubscribe received on thread: " + Thread.currentThread());
}
}));
for (int i = 0; !s.isUnsubscribed(); i++) {
s.onNext(i);
}
}
});
}
}).subscribe(ts);
s.unsubscribe();
ts.awaitTerminalEvent();
} It receives the
|
If someone is using the Rx provided |
Add timeout to CoundDownLatch, ignore InterruptException and fix the tes...
RxJava-pull-requests #805 SUCCESS |
It does come from .NET and binding events You must do The only reason .NET Rx has |
From the codes of |
According to the doc here http://docs.oracle.com/javase/tutorial/uiswing/concurrency/dispatch.html
and the
I think addXXXlistener/removeXXXlistener is not thread-safe out of the EDT. |
Same in .NET. |
Working with @headinthebox based on discussions at ReactiveX#869 and ReactiveX#880 (comment) we determined that there are times when `unsubscribeOn` behavior is needed. The `subscribeOn` operator can not mix `subscribe` and `unsubscribe` scheduling behavior without breaking the `lift`/`Subscriber` behavior that allows unsubscribing synchronous sources. The newly added `unsubscribeOn` operator will not work with synchronous unsubscribes, but it will work for the targeted use cases such as UI event handlers.
Working with @headinthebox based on discussions at ReactiveX#869 and ReactiveX#880 (comment) we determined that there are times when `unsubscribeOn` behavior is needed. The `subscribeOn` operator can not mix `subscribe` and `unsubscribe` scheduling behavior without breaking the `lift`/`Subscriber` behavior that allows unsubscribing synchronous sources. The newly added `unsubscribeOn` operator will not work with synchronous unsubscribes, but it will work for the targeted use cases such as UI event handlers.
See #890 for work done with @headinthebox that resulted in simplification of |
Some changes on top of #864 as part of work on #844.
Primarily I did two things here:
subscribeOn
for synchronousObservable
ssubscribeOn
behavior to make thegroupBy
unit tests work