-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reimplemented the 'reduce' operator #436
Conversation
RxJava-pull-requests #348 SUCCESS |
private volatile boolean isSourceSequenceEmpty = true; | ||
|
||
@Override | ||
public synchronized void onNext(T args) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this need to be synchronized? We have a single Observable sequence that will not interleave onNext
calls (by contract) so I don't think we need this here.
What is the overhead of
Can we not achieve this check without re-implementing the entire operator? This duplicates logic between The guiding principle for this decision is: |
Hi, @benjchristensen I removed the synchronized. However, I'm still confused about concurrency in RxJava. Could you review my discussion in #417 and help me solve the problems? I also updated the unit tests. Finally, I reviewed the current operators and could not find a way to throw an exception when the observable is empty. Could you provide some suggestion? |
RxJava-pull-requests #360 FAILURE |
RxJava-pull-requests #361 FAILURE |
RxJava-pull-requests #362 FAILURE |
Does this do what you need? .flatMap( o -> {
if(good) {
return Observable.just(goodValue);
} else {
// if bad ... return an error
return Observable.error(new RuntimeException());
}
}) |
Sorry that I still have no idea about how to use |
Maybe it would be nice to have an |
Materialize and dematerialize are good tool for implementing operators that have behavior based on onError and onCompleted. x = from([])
hadValue = false;
x.materialize().map({ n ->
if (n.kind == Notification.Kind.OnNext) {
hadValue = true
}
else if (n.kind == Notification.Kind.OnCompleted && !hadValue) {
return new Notification(new Exception())
}
return n
}).dematerialize() |
Thanks, @abersnaze . It works. @samuelgruetter , I added an private |
RxJava-pull-requests #375 FAILURE |
I just had this idea. replace the takeLast(1) with takeLast(2) and use a second scan to figure out what to do at the end of the sequence. public Observable<T> reduce(Func2<T, T, T> accumulator) {
Func2<Notification<T>, Notification<T>, Notification<T>> func = new Func2<Notification<T>, Notification<T>, Notification<T>>() {
@Override
public Notification<T> call(Notification<T> value, Notification<T> end) {
if (end.isOnError())
return end;
if (value == null)
return new Notification<T>(new UnsupportedOperationException("Can not apply on an empty sequence"));
return value;
}
};
return create(OperationScan.scan(this, accumulator)).materialize().takeLast(2).scan(null, func).dematerialize();
} |
Sounds good. But I'm not sure if it's better that using one more |
I updated this PR to use the new |
RxJava-pull-requests #397 FAILURE |
It has been fixed in #474 |
Hi,
I reimplemented the 'reduce' operator. The improvements are as follow: