Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout with selector overloads #740

Merged
merged 1 commit into from
Jan 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6691,6 +6691,86 @@ public Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? exten
return create(OperationTimeout.timeout(this, timeout, timeUnit, other, scheduler));
}

/**
* Create an observable which completes if a source item doesn't arrive after the
* previous one in the time window specified by the per-item observable.
* <p>
* The arrival of the first source item is not timed out.
* @param <U> the timeout value type (ignored)
* @param timeoutSelector function that returns an observable for each source item
* which determines the timeout window for the subsequent source item
* @return an observable which completes if a source item doesn't arrive after the
* previous one in the time window specified by the per-item observable.
*/
public <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>> timeoutSelector) {
return timeout(timeoutSelector, Observable.<T>empty());
}

/**
* Create an observable which switches to the other Observable if a source
* item doesn't arrive after the
* previous one in the time window specified by the per-item observable.
* <p>
* The arrival of the first source item is not timed out.
* @param <U> the timeout value type (ignored)
* @param timeoutSelector function that returns an observable for each source item
* which determines the timeout window for the subsequent source item
* @param other the other observable to switch to if the source times out
* @return an observable which switches to the other Observable if a source
* item doesn't arrive after the
* previous one in the time window specified by the per-item observable
*/
public <U> Observable<T> timeout(Func1<? super T, ? extends Observable<U>> timeoutSelector, Observable<? extends T> other) {
if (other == null) {
throw new NullPointerException("other");
}
return create(OperationTimeout.timeoutSelector(this, null, timeoutSelector, other));
}

/**
* Create an Observable which completes if either the first item or any subsequent item
* doesn't arrive within the time window specified by the timeout selectors' Observable.
* @param <U> the first timeout value type (ignored)
* @param <V> the subsequent timeout value type (ignored)
* @param firstTimeoutSelector function that returns an observable which determines
* the timeout window for the first source item
* @param timeoutSelector function that returns an observable for each source item
* which determines the timeout window for the subsequent source item
* @return an Observable which completes if either the first item or any subsequent item
* doesn't arrive within the time window specified by the timeout selectors' Observable.
*/
public <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<U>> timeoutSelector) {
if (firstTimeoutSelector == null) {
throw new NullPointerException("firstTimeoutSelector");
}
return timeout(firstTimeoutSelector, timeoutSelector, Observable.<T>empty());
}

/**
* Create an Observable which switches to another Observable
* if either the first item or any subsequent item
* doesn't arrive within the time window specified by the timeout selectors' Observable.
* @param <U> the first timeout value type (ignored)
* @param <V> the subsequent timeout value type (ignored)
* @param firstTimeoutSelector function that returns an observable which determines
* the timeout window for the first source item
* @param timeoutSelector function that returns an observable for each source item
* which determines the timeout window for the subsequent source item
* @param other the other observable to switch to if the source times out
* @return an Observable which switches to another Observable
* if either the first item or any subsequent item
* doesn't arrive within the time window specified by the timeout selectors' Observable
*/
public <U, V> Observable<T> timeout(Func0<? extends Observable<U>> firstTimeoutSelector, Func1<? super T, ? extends Observable<U>> timeoutSelector, Observable<? extends T> other) {
if (firstTimeoutSelector == null) {
throw new NullPointerException("firstTimeoutSelector");
}
if (other == null) {
throw new NullPointerException("other");
}
return create(OperationTimeout.timeoutSelector(this, firstTimeoutSelector, timeoutSelector, other));
}

/**
* Records the time interval between consecutive items emitted by an
* Observable.
Expand Down
158 changes: 158 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationTimeout.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

/**
* Applies a timeout policy for each element in the observable sequence, using
Expand Down Expand Up @@ -154,4 +156,160 @@ public void onCompleted() {
return composite;
}
}

/** Timeout using a per-item observable sequence. */
public static <T, U, V> OnSubscribeFunc<T> timeoutSelector(Observable<? extends T> source, Func0<? extends Observable<U>> firstValueTimeout, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other) {
return new TimeoutSelector<T, U, V>(source, firstValueTimeout, valueTimeout, other);
}

/** Timeout using a per-item observable sequence. */
private static final class TimeoutSelector<T, U, V> implements OnSubscribeFunc<T> {
final Observable<? extends T> source;
final Func0<? extends Observable<U>> firstValueTimeout;
final Func1<? super T, ? extends Observable<V>> valueTimeout;
final Observable<? extends T> other;

public TimeoutSelector(Observable<? extends T> source, Func0<? extends Observable<U>> firstValueTimeout, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other) {
this.source = source;
this.firstValueTimeout = firstValueTimeout;
this.valueTimeout = valueTimeout;
this.other = other;
}

@Override
public Subscription onSubscribe(Observer<? super T> t1) {
CompositeSubscription csub = new CompositeSubscription();

SourceObserver<T, V> so = new SourceObserver<T, V>(t1, valueTimeout, other, csub);
if (firstValueTimeout != null) {
Observable<U> o;
try {
o = firstValueTimeout.call();
} catch (Throwable t) {
t1.onError(t);
return Subscriptions.empty();
}

csub.add(o.subscribe(new TimeoutObserver<U>(so)));
}
csub.add(source.subscribe(so));
return csub;
}

/** Observe the source. */
private static final class SourceObserver<T, V> implements Observer<T>, TimeoutCallback {
final Observer<? super T> observer;
final Func1<? super T, ? extends Observable<V>> valueTimeout;
final Observable<? extends T> other;
final CompositeSubscription cancel;
final Object guard;
boolean done;
final SerialSubscription tsub;
final TimeoutObserver<V> to;

public SourceObserver(Observer<? super T> observer, Func1<? super T, ? extends Observable<V>> valueTimeout, Observable<? extends T> other, CompositeSubscription cancel) {
this.observer = observer;
this.valueTimeout = valueTimeout;
this.other = other;
this.cancel = cancel;
this.guard = new Object();
this.tsub = new SerialSubscription();
this.cancel.add(tsub);
this.to = new TimeoutObserver<V>(this);
}

@Override
public void onNext(T args) {
tsub.set(Subscriptions.empty());

synchronized (guard) {
if (done) {
return;
}
observer.onNext(args);
}

Observable<V> o;
try {
o = valueTimeout.call(args);
} catch (Throwable t) {
onError(t);
return;
}

SerialSubscription osub = new SerialSubscription();
tsub.set(osub);

osub.set(o.subscribe(to));
}
@Override
public void onError(Throwable e) {
synchronized (guard) {
if (done) {
return;
}
done = true;
observer.onError(e);
}
cancel.unsubscribe();
}

@Override
public void onCompleted() {
synchronized (guard) {
if (done) {
return;
}
done = true;
observer.onCompleted();
}
cancel.unsubscribe();
}
@Override
public void timeout() {
if (other != null) {
synchronized (guard) {
if (done) {
return;
}
done = true;
}
cancel.clear();
cancel.add(other.subscribe(observer));
} else {
onCompleted();
}
}
}

/** The timeout callback. */
private interface TimeoutCallback {
void timeout();
void onError(Throwable t);
}

/** Observe the timeout. */
private static final class TimeoutObserver<V> implements Observer<V> {
final TimeoutCallback parent;

public TimeoutObserver(TimeoutCallback parent) {
this.parent = parent;
}

@Override
public void onNext(V args) {
parent.timeout();
}

@Override
public void onError(Throwable e) {
parent.onError(e);
}

@Override
public void onCompleted() {
parent.timeout();
}
}
}
}
Loading