Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Window with Observable boundary. #735

Merged
merged 1 commit into from
Jan 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3373,6 +3373,21 @@ public <TOpening, TClosing> Observable<Observable<T>> window(Observable<? extend
return create(OperationWindow.window(this, windowOpenings, closingSelector));
}

/**
* Create an Observable which emits non-overlapping windows of items it collects from the
* source observable where the boundary of each window is determined by the items
* emitted from the boundary observable.
* @param <U> the window element type (ignored)
* @param boundary the Observable sequence whose emitted item is used for closing
* and opening windows
* @return an Observable which emits non-overlapping windows of items it collects from the
* source observable where the boundary of each window is determined by the items
* emitted from the boundary observable
*/
public <U> Observable<Observable<T>> window(Observable<U> boundary) {
return create(OperationWindow.window(this, boundary));
}

/**
* Creates an Observable that emits windows of items it collects from the
* source Observable. The resulting Observable emits connected,
Expand Down
144 changes: 144 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationWindow.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

Expand Down Expand Up @@ -354,4 +358,144 @@ public Observable<T> getContents() {
return Observable.from(contents);
}
}
/**
* Emits windows of values of the source Observable where the window boundary is
* determined by the items of the boundary Observable.
*/
public static <T, U> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, Observable<U> boundary) {
return new WindowViaObservable<T, U>(source, boundary);
}
/**
* Create non-overlapping windows from the source values by using another observable's
* values as to when to replace a window.
*/
private static final class WindowViaObservable<T, U> implements OnSubscribeFunc<Observable<T>> {
final Observable<? extends T> source;
final Observable<U> boundary;

public WindowViaObservable(Observable<? extends T> source, Observable<U> boundary) {
this.source = source;
this.boundary = boundary;
}

@Override
public Subscription onSubscribe(Observer<? super Observable<T>> t1) {
CompositeSubscription csub = new CompositeSubscription();

final SourceObserver<T> so = new SourceObserver<T>(t1, csub);
try {
t1.onNext(so.subject);
} catch (Throwable t) {
t1.onError(t);
return Subscriptions.empty();
}
csub.add(source.subscribe(so));

if (!csub.isUnsubscribed()) {
csub.add(boundary.subscribe(new BoundaryObserver<T, U>(so)));
}

return csub;
}
/**
* Observe the source and emit the values into the current window.
*/
private static final class SourceObserver<T> implements Observer<T> {
final Observer<? super Observable<T>> observer;
final Subscription cancel;
final Object guard;
Subject<T, T> subject;

public SourceObserver(Observer<? super Observable<T>> observer, Subscription cancel) {
this.observer = observer;
this.cancel = cancel;
this.guard = new Object();
this.subject = create();
}

Subject<T, T> create() {
return PublishSubject.create();
}
@Override
public void onNext(T args) {
synchronized (guard) {
if (subject == null) {
return;
}
subject.onNext(args);
}
}

@Override
public void onError(Throwable e) {
synchronized (guard) {
if (subject == null) {
return;
}
Subject<T, T> s = subject;
subject = null;

s.onError(e);
observer.onError(e);
}
cancel.unsubscribe();
}

@Override
public void onCompleted() {
synchronized (guard) {
if (subject == null) {
return;
}
Subject<T, T> s = subject;
subject = null;

s.onCompleted();
observer.onCompleted();
}
cancel.unsubscribe();
}
public void replace() {
try {
synchronized (guard) {
if (subject == null) {
return;
}
Subject<T, T> s = subject;
s.onCompleted();

subject = create();
observer.onNext(subject);
}
} catch (Throwable t) {
onError(t);
}
}
}
/**
* Observe the boundary and replace the window on each item.
*/
private static final class BoundaryObserver<T, U> implements Observer<U> {
final SourceObserver<T> so;

public BoundaryObserver(SourceObserver<T> so) {
this.so = so;
}

@Override
public void onNext(U args) {
so.replace();
}

@Override
public void onError(Throwable e) {
so.onError(e);
}

@Override
public void onCompleted() {
so.onCompleted();
}
}
}
}
Loading