Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

onErrorFlatMap + OnErrorThrowable #892

Merged
merged 4 commits into from
Feb 18, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;

import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorThrowable;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -79,16 +80,13 @@
import rx.operators.OperationMergeMaxConcurrent;
import rx.operators.OperationMinMax;
import rx.operators.OperationMulticast;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperatorObserveOnBounded;
import rx.operators.OperatorScan;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSingle;
import rx.operators.OperationSkip;
Expand Down Expand Up @@ -117,8 +115,11 @@
import rx.operators.OperatorMap;
import rx.operators.OperatorMerge;
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorOnErrorResumeNextViaFunction;
import rx.operators.OperatorOnErrorFlatMap;
import rx.operators.OperatorParallel;
import rx.operators.OperatorRepeat;
import rx.operators.OperatorScan;
import rx.operators.OperatorSubscribeOn;
import rx.operators.OperatorTake;
import rx.operators.OperatorTimeout;
Expand Down Expand Up @@ -5209,7 +5210,7 @@ public final Boolean call(T t) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-onerrorresumenext">RxJava Wiki: onErrorResumeNext()</a>
*/
public final Observable<T> onErrorResumeNext(final Func1<Throwable, ? extends Observable<? extends T>> resumeFunction) {
return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(this, resumeFunction));
return lift(new OperatorOnErrorResumeNextViaFunction<T>(resumeFunction));
}

/**
Expand Down Expand Up @@ -5267,6 +5268,15 @@ public final Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFun
return create(OperationOnErrorReturn.onErrorReturn(this, resumeFunction));
}

/**
* Allows inserting onNext events into a stream when onError events are received
* and continuing the original sequence instead of terminating. Thus it allows a sequence
* with multiple onError events.
*/
public final Observable<T> onErrorFlatMap(final Func1<OnErrorThrowable, ? extends Observable<? extends T>> resumeFunction) {
return lift(new OperatorOnErrorFlatMap<T>(resumeFunction));
}

/**
* Instruct an Observable to pass control to another Observable rather than invoking
* {@link Observer#onError onError} if it encounters an {@link java.lang.Exception}.
Expand Down
44 changes: 44 additions & 0 deletions rxjava-core/src/main/java/rx/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package rx.exceptions;

import java.util.HashSet;
import java.util.Set;

public class Exceptions {
private Exceptions() {

Expand Down Expand Up @@ -53,4 +56,45 @@ else if (t instanceof StackOverflowError) {
throw (LinkageError) t;
}
}

private static final int MAX_DEPTH = 25;

public static void addCause(Throwable e, Throwable cause) {
Set<Throwable> seenCauses = new HashSet<Throwable>();

int i = 0;
while (e.getCause() != null) {
if (i++ >= MAX_DEPTH) {
// stack too deep to associate cause
return;
}
e = e.getCause();
if (seenCauses.contains(e.getCause())) {
break;
} else {
seenCauses.add(e.getCause());
}
}
// we now have 'e' as the last in the chain
try {
e.initCause(cause);
} catch (Throwable t) {
// ignore
// the javadocs say that some Throwables (depending on how they're made) will never
// let me call initCause without blowing up even if it returns null
}
}

public static Throwable getFinalCause(Throwable e) {
int i = 0;
while (e.getCause() != null) {
if (i++ >= MAX_DEPTH) {
// stack too deep to get final cause
return new RuntimeException("Stack too deep to get final cause");
}
e = e.getCause();
}
return e;
}

}
81 changes: 81 additions & 0 deletions rxjava-core/src/main/java/rx/exceptions/OnErrorThrowable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.exceptions;

public class OnErrorThrowable extends RuntimeException {

private static final long serialVersionUID = -569558213262703934L;

private final boolean hasValue;
private final Object value;

private OnErrorThrowable(Throwable exception) {
super(exception);
hasValue = false;
this.value = null;
}

private OnErrorThrowable(Throwable exception, Object value) {
super(exception);
hasValue = true;
this.value = value;
}

public Object getValue() {
return value;
}

public boolean isValueNull() {
return hasValue;
}

public static OnErrorThrowable from(Throwable t) {
Throwable cause = Exceptions.getFinalCause(t);
if (cause instanceof OnErrorThrowable.OnNextValue) {
return new OnErrorThrowable(t, ((OnNextValue) cause).getValue());
} else {
return new OnErrorThrowable(t);
}
}

/**
* Adds the given value as the final cause of the given Throwable wrapped in OnNextValue RuntimeException..
*
* @param e
* @param value
* @return Throwable e passed in
*/
public static Throwable addValueAsLastCause(Throwable e, Object value) {
Exceptions.addCause(e, new OnNextValue(value));
return e;
}

public static class OnNextValue extends RuntimeException {

private static final long serialVersionUID = -3454462756050397899L;
private final Object value;

public OnNextValue(Object value) {
super("OnError while emitting onNext value: " + value);
this.value = value;
}

public Object getValue() {
return value;
}

}
}

This file was deleted.

9 changes: 7 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorCast.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

import rx.Observable.Operator;
import rx.Subscriber;

import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorThrowable;

/**
* Converts the elements of an observable sequence to the specified type.
Expand Down Expand Up @@ -46,7 +47,11 @@ public void onError(Throwable e) {

@Override
public void onNext(T t) {
o.onNext(castClass.cast(t));
try {
o.onNext(castClass.cast(t));
} catch (Throwable e) {
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
Expand Down
3 changes: 2 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperatorDoOnEach.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable.Operator;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.OnErrorThrowable;

/**
* Converts the elements of an observable sequence to the specified type.
Expand Down Expand Up @@ -59,7 +60,7 @@ public void onNext(T value) {
try {
doOnEachObserver.onNext(value);
} catch (Throwable e) {
onError(e);
onError(OnErrorThrowable.addValueAsLastCause(e, value));
return;
}
observer.onNext(value);
Expand Down
12 changes: 6 additions & 6 deletions rxjava-core/src/main/java/rx/operators/OperatorFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

/**
* Filters an Observable by discarding any items it emits that do not meet some test.
Expand Down Expand Up @@ -48,13 +48,13 @@ public void onError(Throwable e) {
}

@Override
public void onNext(T value) {
public void onNext(T t) {
try {
if (predicate.call(value)) {
child.onNext(value);
if (predicate.call(t)) {
child.onNext(t);
}
} catch (Throwable ex) {
child.onError(ex);
} catch (Throwable e) {
child.onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}

Expand Down
Loading