Skip to content

Commit

Permalink
Merge pull request #151 from mairbek/TakeUntiFunctional
Browse files Browse the repository at this point in the history
Implemented TakeUntil operation
  • Loading branch information
benjchristensen committed Mar 11, 2013
2 parents 10e7435 + e1de8c5 commit 9c6dec9
Show file tree
Hide file tree
Showing 2 changed files with 309 additions and 23 deletions.
51 changes: 28 additions & 23 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,7 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import rx.operators.OperationConcat;
import rx.operators.OperationFilter;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationNext;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
import rx.operators.OperationToObservableList;
import rx.operators.OperationToObservableSortedList;
import rx.operators.OperationZip;
import rx.operators.*;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaPlugins;
import rx.util.AtomicObservableSubscription;
Expand Down Expand Up @@ -978,6 +958,20 @@ public static <T> Observable<T> merge(Observable<T>... source) {
return _create(OperationMerge.merge(source));
}

/**
* Returns the values from the source observable sequence until the other observable sequence produces a value.
*
* @param source the source sequence to propagate elements for.
* @param other the observable sequence that terminates propagation of elements of the source sequence.
* @param <T> the type of source.
* @param <E> the other type.
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
*/
public static <T, E> Observable<T> takeUntil(final Observable<T> source, final Observable<E> other) {
return OperatorTakeUntil.takeUntil(source, other);
}


/**
* Combines the objects emitted by two or more Observables, and emits the result as a single Observable,
* by using the <code>concat</code> method.
Expand Down Expand Up @@ -2838,6 +2832,17 @@ public Observable<T> takeLast(final int count) {
return takeLast(this, count);
}

/**
* Returns the values from the source observable sequence until the other observable sequence produces a value.
*
* @param other the observable sequence that terminates propagation of elements of the source sequence.
* @param <E> the other type.
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
*/
public <E> Observable<T> takeUntil(Observable<E> other) {
return takeUntil(this, other);
}

/**
* Returns an Observable that emits a single item, a list composed of all the items emitted by
* the source Observable.
Expand Down Expand Up @@ -2992,7 +2997,6 @@ public void testSequenceEqual() {
verify(result, times(1)).onNext(false);
}


@Test
public void testToIterable() {
Observable<String> obs = toObservable("one", "two", "three");
Expand Down Expand Up @@ -3082,7 +3086,7 @@ public Boolean call(Integer args) {

assertEquals(-1, last);
}

public void testSingle() {
Observable<String> observable = toObservable("one");
assertEquals("one", observable.single());
Expand Down Expand Up @@ -3151,6 +3155,7 @@ public Boolean call(String args) {
});
}


private static class TestException extends RuntimeException {

}
Expand Down
281 changes: 281 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorTakeUntil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;

import static org.mockito.Mockito.*;

public class OperatorTakeUntil {

/**
* Returns the values from the source observable sequence until the other observable sequence produces a value.
*
* @param source the source sequence to propagate elements for.
* @param other the observable sequence that terminates propagation of elements of the source sequence.
* @param <T> the type of source.
* @param <E> the other type.
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
*/
public static <T, E> Observable<T> takeUntil(final Observable<T> source, final Observable<E> other) {
Observable<Notification<T>> s = Observable.create(new SourceObservable<T>(source));
Observable<Notification<T>> o = Observable.create(new OtherObservable<T, E>(other));
Observable<Notification<T>> result = Observable.merge(s, o);

return result.takeWhile(new Func1<Notification<T>, Boolean>() {
@Override
public Boolean call(Notification<T> notification) {
return !notification.halt;
}
}).map(new Func1<Notification<T>, T>() {
@Override
public T call(Notification<T> notification) {
return notification.value;
}
});
}

private static class Notification<T> {
private final boolean halt;
private final T value;

public static <T> Notification<T> value(T value) {
return new Notification<T>(false, value);
}

public static <T> Notification<T> halt() {
return new Notification<T>(true, null);
}

private Notification(boolean halt, T value) {
this.halt = halt;
this.value = value;
}

}

private static class SourceObservable<T> implements Func1<Observer<Notification<T>>, Subscription> {
private final Observable<T> sequence;

private SourceObservable(Observable<T> sequence) {
this.sequence = sequence;
}

@Override
public Subscription call(final Observer<Notification<T>> notificationObserver) {
return sequence.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
notificationObserver.onNext(Notification.<T>halt());
}

@Override
public void onError(Exception e) {
notificationObserver.onError(e);
}

@Override
public void onNext(T args) {
notificationObserver.onNext(Notification.value(args));
}
});
}
}

private static class OtherObservable<T, E> implements Func1<Observer<Notification<T>>, Subscription> {
private final Observable<E> sequence;

private OtherObservable(Observable<E> sequence) {
this.sequence = sequence;
}

@Override
public Subscription call(final Observer<Notification<T>> notificationObserver) {
return sequence.subscribe(new Observer<E>() {
@Override
public void onCompleted() {
// Ignore
}

@Override
public void onError(Exception e) {
notificationObserver.onError(e);
}

@Override
public void onNext(E args) {
notificationObserver.onNext(Notification.<T>halt());
}
});
}
}

public static class UnitTest {

@Test
public void testTakeUntil() {
Subscription sSource = mock(Subscription.class);
Subscription sOther = mock(Subscription.class);
TestObservable source = new TestObservable(sSource);
TestObservable other = new TestObservable(sOther);

Observer<String> result = mock(Observer.class);
Observable<String> stringObservable = takeUntil(source, other);
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnNext("three");
source.sendOnNext("four");
source.sendOnCompleted();
other.sendOnCompleted();

verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(0)).onNext("three");
verify(result, times(0)).onNext("four");
verify(sSource, times(1)).unsubscribe();
verify(sOther, times(1)).unsubscribe();

}

@Test
public void testTakeUntilSourceCompleted() {
Subscription sSource = mock(Subscription.class);
Subscription sOther = mock(Subscription.class);
TestObservable source = new TestObservable(sSource);
TestObservable other = new TestObservable(sOther);

Observer<String> result = mock(Observer.class);
Observable<String> stringObservable = takeUntil(source, other);
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
source.sendOnCompleted();

verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(sSource, times(1)).unsubscribe();
verify(sOther, times(1)).unsubscribe();

}

@Test
public void testTakeUntilSourceError() {
Subscription sSource = mock(Subscription.class);
Subscription sOther = mock(Subscription.class);
TestObservable source = new TestObservable(sSource);
TestObservable other = new TestObservable(sOther);
Exception error = new Exception();

Observer<String> result = mock(Observer.class);
Observable<String> stringObservable = takeUntil(source, other);
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
source.sendOnError(error);

verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(1)).onError(error);
verify(sSource, times(1)).unsubscribe();
verify(sOther, times(1)).unsubscribe();

}

@Test
public void testTakeUntilOtherError() {
Subscription sSource = mock(Subscription.class);
Subscription sOther = mock(Subscription.class);
TestObservable source = new TestObservable(sSource);
TestObservable other = new TestObservable(sOther);
Exception error = new Exception();

Observer<String> result = mock(Observer.class);
Observable<String> stringObservable = takeUntil(source, other);
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnError(error);

verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(1)).onError(error);
verify(result, times(0)).onCompleted();
verify(sSource, times(1)).unsubscribe();
verify(sOther, times(1)).unsubscribe();

}

@Test
public void testTakeUntilOtherCompleted() {
Subscription sSource = mock(Subscription.class);
Subscription sOther = mock(Subscription.class);
TestObservable source = new TestObservable(sSource);
TestObservable other = new TestObservable(sOther);

Observer<String> result = mock(Observer.class);
Observable<String> stringObservable = takeUntil(source, other);
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnCompleted();

verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(0)).onCompleted();
verify(sSource, times(0)).unsubscribe();
verify(sOther, times(0)).unsubscribe();

}

private static class TestObservable extends Observable<String> {

Observer<String> observer = null;
Subscription s;

public TestObservable(Subscription s) {
this.s = s;
}

/* used to simulate subscription */
public void sendOnCompleted() {
observer.onCompleted();
}

/* used to simulate subscription */
public void sendOnNext(String value) {
observer.onNext(value);
}

/* used to simulate subscription */
@SuppressWarnings("unused")
public void sendOnError(Exception e) {
observer.onError(e);
}

@Override
public Subscription subscribe(final Observer<String> observer) {
this.observer = observer;
return s;
}
}

}
}

0 comments on commit 9c6dec9

Please sign in to comment.