Skip to content

Commit

Permalink
Merge pull request #892 from benjchristensen/onErrorFlatMap
Browse files Browse the repository at this point in the history
onErrorFlatMap + OnErrorThrowable
  • Loading branch information
benjchristensen committed Feb 18, 2014
2 parents d53f73b + 4328276 commit e676ddd
Show file tree
Hide file tree
Showing 16 changed files with 430 additions and 144 deletions.
18 changes: 14 additions & 4 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;

import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorThrowable;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -79,16 +80,13 @@
import rx.operators.OperationMergeMaxConcurrent;
import rx.operators.OperationMinMax;
import rx.operators.OperationMulticast;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperatorObserveOnBounded;
import rx.operators.OperatorScan;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSingle;
import rx.operators.OperationSkip;
Expand Down Expand Up @@ -117,8 +115,11 @@
import rx.operators.OperatorMap;
import rx.operators.OperatorMerge;
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorOnErrorResumeNextViaFunction;
import rx.operators.OperatorOnErrorFlatMap;
import rx.operators.OperatorParallel;
import rx.operators.OperatorRepeat;
import rx.operators.OperatorScan;
import rx.operators.OperatorSubscribeOn;
import rx.operators.OperatorTake;
import rx.operators.OperatorTimeout;
Expand Down Expand Up @@ -5209,7 +5210,7 @@ public final Boolean call(T t) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-onerrorresumenext">RxJava Wiki: onErrorResumeNext()</a>
*/
public final Observable<T> onErrorResumeNext(final Func1<Throwable, ? extends Observable<? extends T>> resumeFunction) {
return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(this, resumeFunction));
return lift(new OperatorOnErrorResumeNextViaFunction<T>(resumeFunction));
}

/**
Expand Down Expand Up @@ -5267,6 +5268,15 @@ public final Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFun
return create(OperationOnErrorReturn.onErrorReturn(this, resumeFunction));
}

/**
* Allows inserting onNext events into a stream when onError events are received
* and continuing the original sequence instead of terminating. Thus it allows a sequence
* with multiple onError events.
*/
public final Observable<T> onErrorFlatMap(final Func1<OnErrorThrowable, ? extends Observable<? extends T>> resumeFunction) {
return lift(new OperatorOnErrorFlatMap<T>(resumeFunction));
}

/**
* Instruct an Observable to pass control to another Observable rather than invoking
* {@link Observer#onError onError} if it encounters an {@link java.lang.Exception}.
Expand Down
44 changes: 44 additions & 0 deletions rxjava-core/src/main/java/rx/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package rx.exceptions;

import java.util.HashSet;
import java.util.Set;

public class Exceptions {
private Exceptions() {

Expand Down Expand Up @@ -53,4 +56,45 @@ else if (t instanceof StackOverflowError) {
throw (LinkageError) t;
}
}

private static final int MAX_DEPTH = 25;

public static void addCause(Throwable e, Throwable cause) {
Set<Throwable> seenCauses = new HashSet<Throwable>();

int i = 0;
while (e.getCause() != null) {
if (i++ >= MAX_DEPTH) {
// stack too deep to associate cause
return;
}
e = e.getCause();
if (seenCauses.contains(e.getCause())) {
break;
} else {
seenCauses.add(e.getCause());
}
}
// we now have 'e' as the last in the chain
try {
e.initCause(cause);
} catch (Throwable t) {
// ignore
// the javadocs say that some Throwables (depending on how they're made) will never
// let me call initCause without blowing up even if it returns null
}
}

public static Throwable getFinalCause(Throwable e) {
int i = 0;
while (e.getCause() != null) {
if (i++ >= MAX_DEPTH) {
// stack too deep to get final cause
return new RuntimeException("Stack too deep to get final cause");
}
e = e.getCause();
}
return e;
}

}
81 changes: 81 additions & 0 deletions rxjava-core/src/main/java/rx/exceptions/OnErrorThrowable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.exceptions;

public class OnErrorThrowable extends RuntimeException {

private static final long serialVersionUID = -569558213262703934L;

private final boolean hasValue;
private final Object value;

private OnErrorThrowable(Throwable exception) {
super(exception);
hasValue = false;
this.value = null;
}

private OnErrorThrowable(Throwable exception, Object value) {
super(exception);
hasValue = true;
this.value = value;
}

public Object getValue() {
return value;
}

public boolean isValueNull() {
return hasValue;
}

public static OnErrorThrowable from(Throwable t) {
Throwable cause = Exceptions.getFinalCause(t);
if (cause instanceof OnErrorThrowable.OnNextValue) {
return new OnErrorThrowable(t, ((OnNextValue) cause).getValue());
} else {
return new OnErrorThrowable(t);
}
}

/**
* Adds the given value as the final cause of the given Throwable wrapped in OnNextValue RuntimeException..
*
* @param e
* @param value
* @return Throwable e passed in
*/
public static Throwable addValueAsLastCause(Throwable e, Object value) {
Exceptions.addCause(e, new OnNextValue(value));
return e;
}

public static class OnNextValue extends RuntimeException {

private static final long serialVersionUID = -3454462756050397899L;
private final Object value;

public OnNextValue(Object value) {
super("OnError while emitting onNext value: " + value);
this.value = value;
}

public Object getValue() {
return value;
}

}
}

This file was deleted.

9 changes: 7 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorCast.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

import rx.Observable.Operator;
import rx.Subscriber;

import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorThrowable;

/**
* Converts the elements of an observable sequence to the specified type.
Expand Down Expand Up @@ -46,7 +47,11 @@ public void onError(Throwable e) {

@Override
public void onNext(T t) {
o.onNext(castClass.cast(t));
try {
o.onNext(castClass.cast(t));
} catch (Throwable e) {
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}
};
}
Expand Down
3 changes: 2 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperatorDoOnEach.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable.Operator;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.OnErrorThrowable;

/**
* Converts the elements of an observable sequence to the specified type.
Expand Down Expand Up @@ -59,7 +60,7 @@ public void onNext(T value) {
try {
doOnEachObserver.onNext(value);
} catch (Throwable e) {
onError(e);
onError(OnErrorThrowable.addValueAsLastCause(e, value));
return;
}
observer.onNext(value);
Expand Down
12 changes: 6 additions & 6 deletions rxjava-core/src/main/java/rx/operators/OperatorFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

/**
* Filters an Observable by discarding any items it emits that do not meet some test.
Expand Down Expand Up @@ -48,13 +48,13 @@ public void onError(Throwable e) {
}

@Override
public void onNext(T value) {
public void onNext(T t) {
try {
if (predicate.call(value)) {
child.onNext(value);
if (predicate.call(t)) {
child.onNext(t);
}
} catch (Throwable ex) {
child.onError(ex);
} catch (Throwable e) {
child.onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
}

Expand Down
Loading

0 comments on commit e676ddd

Please sign in to comment.