Skip to content

Commit

Permalink
Merge pull request #1049 from benjchristensen/joins-module
Browse files Browse the repository at this point in the history
Move rx.joins to rxjava-joins module
  • Loading branch information
benjchristensen committed Apr 19, 2014
2 parents 320495f + 6896110 commit 83407c9
Show file tree
Hide file tree
Showing 20 changed files with 368 additions and 325 deletions.
21 changes: 21 additions & 0 deletions rxjava-contrib/rxjava-joins/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apply plugin: 'osgi'

sourceCompatibility = JavaVersion.VERSION_1_6
targetCompatibility = JavaVersion.VERSION_1_6

dependencies {
compile project(':rxjava-core')
testCompile project(":rxjava-core").sourceSets.test.output
provided 'junit:junit-dep:4.10'
provided 'org.mockito:mockito-core:1.8.5'
}

jar {
manifest {
name = 'rxjava-joins'
instruction 'Bundle-Vendor', 'Netflix'
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.operators;
package rx.joins.operators;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.operators;
package rx.joins.operators;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
Expand All @@ -36,6 +36,7 @@
import rx.functions.Func3;
import rx.functions.Functions;
import rx.joins.Plan0;
import rx.observables.JoinObservable;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

Expand Down Expand Up @@ -95,20 +96,20 @@ public void before() {
@Test(expected = NullPointerException.class)
public void and2ArgumentNull() {
Observable<Integer> some = Observable.just(1);
some.and(null);
JoinObservable.from(some).and(null);
}

@Test(expected = NullPointerException.class)
public void and3argumentNull() {
Observable<Integer> some = Observable.just(1);
some.and(some).and(null);
JoinObservable.from(some).and(some).and(null);
}

@Test
public void and2() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(some).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(some).then(add2)).toObservable();

m.subscribe(observer);

Expand All @@ -123,7 +124,7 @@ public void and2Error1() {

Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(error.and(some).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(error).and(some).then(add2)).toObservable();

m.subscribe(observer);

Expand All @@ -138,7 +139,7 @@ public void and2Error2() {

Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(error).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(error).then(add2)).toObservable();

m.subscribe(observer);

Expand All @@ -151,7 +152,7 @@ public void and2Error2() {
public void and3() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(some).and(some).then(add3));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(some).and(some).then(add3)).toObservable();

m.subscribe(observer);

Expand All @@ -166,7 +167,7 @@ public void and3Error1() {

Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(error.and(some).and(some).then(add3));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(error).and(some).and(some).then(add3)).toObservable();

m.subscribe(observer);

Expand All @@ -181,7 +182,7 @@ public void and3Error2() {

Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(error).and(some).then(add3));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(error).and(some).then(add3)).toObservable();

m.subscribe(observer);

Expand All @@ -196,7 +197,7 @@ public void and3Error3() {

Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(some).and(error).then(add3));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(some).and(error).then(add3)).toObservable();

m.subscribe(observer);

Expand All @@ -209,28 +210,28 @@ public void and3Error3() {
public void thenArgumentNull() {
Observable<Integer> some = Observable.just(1);

some.then(null);
JoinObservable.from(some).then(null);
}

@Test(expected = NullPointerException.class)
public void then2ArgumentNull() {
Observable<Integer> some = Observable.just(1);

some.and(some).then(null);
JoinObservable.from(some).and(some).then(null);
}

@Test(expected = NullPointerException.class)
public void then3ArgumentNull() {
Observable<Integer> some = Observable.just(1);

some.and(some).and(some).then(null);
JoinObservable.from(some).and(some).and(some).then(null);
}

@Test
public void then1() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.then(Functions.<Integer> identity()));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).then(Functions.<Integer> identity())).toObservable();
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -242,7 +243,7 @@ public void then1() {
public void then1Error() {
Observable<Integer> some = Observable.error(new RuntimeException("Forced failure"));

Observable<Integer> m = Observable.when(some.then(Functions.<Integer> identity()));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).then(Functions.<Integer> identity())).toObservable();
m.subscribe(observer);

verify(observer, times(1)).onError(any(Throwable.class));
Expand All @@ -254,7 +255,7 @@ public void then1Error() {
public void then1Throws() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.then(func1Throw));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).then(func1Throw)).toObservable();
m.subscribe(observer);

verify(observer, times(1)).onError(any(Throwable.class));
Expand All @@ -266,7 +267,7 @@ public void then1Throws() {
public void then2Throws() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(some).then(func2Throw));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(some).then(func2Throw)).toObservable();
m.subscribe(observer);

verify(observer, times(1)).onError(any(Throwable.class));
Expand All @@ -278,7 +279,7 @@ public void then2Throws() {
public void then3Throws() {
Observable<Integer> some = Observable.just(1);

Observable<Integer> m = Observable.when(some.and(some).and(some).then(func3Throw));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(some).and(some).and(some).then(func3Throw)).toObservable();
m.subscribe(observer);

verify(observer, times(1)).onError(any(Throwable.class));
Expand All @@ -288,20 +289,20 @@ public void then3Throws() {

@Test(expected = NullPointerException.class)
public void whenArgumentNull1() {
Observable.when((Plan0<Object>[]) null);
JoinObservable.when((Plan0<Object>[]) null);
}

@Test(expected = NullPointerException.class)
public void whenArgumentNull2() {
Observable.when((Iterable<Plan0<Object>>) null);
JoinObservable.when((Iterable<Plan0<Object>>) null);
}

@Test
public void whenMultipleSymmetric() {
Observable<Integer> source1 = Observable.from(1, 2, 3);
Observable<Integer> source2 = Observable.from(4, 5, 6);

Observable<Integer> m = Observable.when(source1.and(source2).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable();
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -316,7 +317,7 @@ public void whenMultipleAsymSymmetric() {
Observable<Integer> source1 = Observable.from(1, 2, 3);
Observable<Integer> source2 = Observable.from(4, 5);

Observable<Integer> m = Observable.when(source1.and(source2).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable();
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -330,7 +331,7 @@ public void whenEmptyEmpty() {
Observable<Integer> source1 = Observable.empty();
Observable<Integer> source2 = Observable.empty();

Observable<Integer> m = Observable.when(source1.and(source2).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable();
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -343,7 +344,7 @@ public void whenNeverNever() {
Observable<Integer> source1 = Observable.never();
Observable<Integer> source2 = Observable.never();

Observable<Integer> m = Observable.when(source1.and(source2).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable();
m.subscribe(observer);

verify(observer, never()).onError(any(Throwable.class));
Expand All @@ -356,7 +357,7 @@ public void whenThrowNonEmpty() {
Observable<Integer> source1 = Observable.empty();
Observable<Integer> source2 = Observable.error(new RuntimeException("Forced failure"));

Observable<Integer> m = Observable.when(source1.and(source2).then(add2));
Observable<Integer> m = JoinObservable.when(JoinObservable.from(source1).and(source2).then(add2)).toObservable();
m.subscribe(observer);

verify(observer, times(1)).onError(any(Throwable.class));
Expand All @@ -370,11 +371,11 @@ public void whenComplicated() {
PublishSubject<Integer> ys = PublishSubject.create();
PublishSubject<Integer> zs = PublishSubject.create();

Observable<Integer> m = Observable.when(
xs.and(ys).then(add2), // 1+4=5, 2+5=7, 3+6=9
xs.and(zs).then(mul2), // 1*7=7, 2*8=16, 3*9=27
ys.and(zs).then(sub2) // 4-7=-3, 5-8=-3, 6-9=-3
);
Observable<Integer> m = JoinObservable.when(
JoinObservable.from(xs).and(ys).then(add2), // 1+4=5, 2+5=7, 3+6=9
JoinObservable.from(xs).and(zs).then(mul2), // 1*7=7, 2*8=16, 3*9=27
JoinObservable.from(ys).and(zs).then(sub2) // 4-7=-3, 5-8=-3, 6-9=-3
).toObservable();

TestSubscriber<Integer> to = new TestSubscriber<Integer>(observer);
m.subscribe(to);
Expand Down
Loading

0 comments on commit 83407c9

Please sign in to comment.