Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented the 'cast' and 'ofType' operators #403

Merged
merged 6 commits into from
Sep 25, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import rx.operators.OperationAverage;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
import rx.operators.OperationCast;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDebounce;
Expand Down Expand Up @@ -4332,6 +4333,44 @@ public BlockingObservable<T> toBlockingObservable() {
return BlockingObservable.from(this);
}

/**
* Converts the elements of an observable sequence to the specified type.
*
* @param klass
* The target class type which the elements will be converted to.
*
* @return An observable sequence that contains each element of the source
* sequence converted to the specified type.
*
* @see <a
* href="http://msdn.microsoft.com/en-us/library/hh211842(v=vs.103).aspx">MSDN:
* Observable.Cast</a>
*/
public <R> Observable<R> cast(final Class<R> klass) {
return create(OperationCast.cast(this, klass));
}

/**
* Filters the elements of an observable sequence based on the specified
* type.
*
* @param klass
* The class type to filter the elements in the source sequence
* on.
*
* @return An observable sequence that contains elements from the input
* sequence of type klass.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229380(v=vs.103).aspx">MSDN: Observable.OfType</a>
*/
public <R> Observable<R> ofType(final Class<R> klass) {
return filter(new Func1<T, Boolean>() {
public Boolean call(T t) {
return klass.isInstance(t);
}
}).cast(klass);
}

/**
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
* <p>
Expand Down
61 changes: 61 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationCast.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package rx.operators;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.util.functions.Func1;

/**
* Converts the elements of an observable sequence to the specified type.
*/
public class OperationCast {

public static <T, R> OnSubscribeFunc<R> cast(
Observable<? extends T> source, final Class<R> klass) {
return OperationMap.map(source, new Func1<T, R>() {
public R call(T t) {
return klass.cast(t);
}
});
}

public static class UnitTest {

@Test
public void testCast() {
Observable<?> source = Observable.from(1, 2);
Observable<Integer> observable = Observable.create(cast(source,
Integer.class));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext(1);
verify(aObserver, times(1)).onNext(1);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testCastWithWrongType() {
Observable<?> source = Observable.from(1, 2);
Observable<Boolean> observable = Observable.create(cast(source,
Boolean.class));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onError(
org.mockito.Matchers.any(ClassCastException.class));
}
}

}
40 changes: 40 additions & 0 deletions rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Mockito.*;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -701,4 +703,42 @@ public void onNext(String v) {
fail("It should be a NumberFormatException");
}
}

@Test
public void testOfType() {
Observable<String> observable = Observable.from(1, "abc", false, 2L).ofType(String.class);

@SuppressWarnings("unchecked")
Observer<Object> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, times(1)).onNext("abc");
verify(aObserver, never()).onNext(false);
verify(aObserver, never()).onNext(2L);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testOfTypeWithPolymorphism() {
ArrayList<Integer> l1 = new ArrayList<Integer>();
l1.add(1);
LinkedList<Integer> l2 = new LinkedList<Integer>();
l2.add(2);

@SuppressWarnings("rawtypes")
Observable<List> observable = Observable.<Object>from(l1, l2, "123").ofType(List.class);

@SuppressWarnings("unchecked")
Observer<Object> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext(l1);
verify(aObserver, times(1)).onNext(l2);
verify(aObserver, never()).onNext("123");
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

}