Skip to content

Commit

Permalink
Merge pull request #735 from akarnokd/WindowViaObservable
Browse files Browse the repository at this point in the history
Window with Observable boundary.
  • Loading branch information
benjchristensen committed Jan 14, 2014
2 parents 22fc397 + 82c0aec commit 9650eb1
Show file tree
Hide file tree
Showing 3 changed files with 380 additions and 0 deletions.
15 changes: 15 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3561,6 +3561,21 @@ public <TOpening, TClosing> Observable<Observable<T>> window(Observable<? extend
return create(OperationWindow.window(this, windowOpenings, closingSelector));
}

/**
* Create an Observable which emits non-overlapping windows of items it collects from the
* source observable where the boundary of each window is determined by the items
* emitted from the boundary observable.
* @param <U> the window element type (ignored)
* @param boundary the Observable sequence whose emitted item is used for closing
* and opening windows
* @return an Observable which emits non-overlapping windows of items it collects from the
* source observable where the boundary of each window is determined by the items
* emitted from the boundary observable
*/
public <U> Observable<Observable<T>> window(Observable<U> boundary) {
return create(OperationWindow.window(this, boundary));
}

/**
* Creates an Observable that emits windows of items it collects from the
* source Observable. The resulting Observable emits connected,
Expand Down
144 changes: 144 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationWindow.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

Expand Down Expand Up @@ -354,4 +358,144 @@ public Observable<T> getContents() {
return Observable.from(contents);
}
}
/**
* Emits windows of values of the source Observable where the window boundary is
* determined by the items of the boundary Observable.
*/
public static <T, U> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, Observable<U> boundary) {
return new WindowViaObservable<T, U>(source, boundary);
}
/**
* Create non-overlapping windows from the source values by using another observable's
* values as to when to replace a window.
*/
private static final class WindowViaObservable<T, U> implements OnSubscribeFunc<Observable<T>> {
final Observable<? extends T> source;
final Observable<U> boundary;

public WindowViaObservable(Observable<? extends T> source, Observable<U> boundary) {
this.source = source;
this.boundary = boundary;
}

@Override
public Subscription onSubscribe(Observer<? super Observable<T>> t1) {
CompositeSubscription csub = new CompositeSubscription();

final SourceObserver<T> so = new SourceObserver<T>(t1, csub);
try {
t1.onNext(so.subject);
} catch (Throwable t) {
t1.onError(t);
return Subscriptions.empty();
}
csub.add(source.subscribe(so));

if (!csub.isUnsubscribed()) {
csub.add(boundary.subscribe(new BoundaryObserver<T, U>(so)));
}

return csub;
}
/**
* Observe the source and emit the values into the current window.
*/
private static final class SourceObserver<T> implements Observer<T> {
final Observer<? super Observable<T>> observer;
final Subscription cancel;
final Object guard;
Subject<T, T> subject;

public SourceObserver(Observer<? super Observable<T>> observer, Subscription cancel) {
this.observer = observer;
this.cancel = cancel;
this.guard = new Object();
this.subject = create();
}

Subject<T, T> create() {
return PublishSubject.create();
}
@Override
public void onNext(T args) {
synchronized (guard) {
if (subject == null) {
return;
}
subject.onNext(args);
}
}

@Override
public void onError(Throwable e) {
synchronized (guard) {
if (subject == null) {
return;
}
Subject<T, T> s = subject;
subject = null;

s.onError(e);
observer.onError(e);
}
cancel.unsubscribe();
}

@Override
public void onCompleted() {
synchronized (guard) {
if (subject == null) {
return;
}
Subject<T, T> s = subject;
subject = null;

s.onCompleted();
observer.onCompleted();
}
cancel.unsubscribe();
}
public void replace() {
try {
synchronized (guard) {
if (subject == null) {
return;
}
Subject<T, T> s = subject;
s.onCompleted();

subject = create();
observer.onNext(subject);
}
} catch (Throwable t) {
onError(t);
}
}
}
/**
* Observe the boundary and replace the window on each item.
*/
private static final class BoundaryObserver<T, U> implements Observer<U> {
final SourceObserver<T> so;

public BoundaryObserver(SourceObserver<T> so) {
this.so = so;
}

@Override
public void onNext(U args) {
so.replace();
}

@Override
public void onError(Throwable e) {
so.onError(e);
}

@Override
public void onCompleted() {
so.onCompleted();
}
}
}
}
Loading

0 comments on commit 9650eb1

Please sign in to comment.