Skip to content

Commit

Permalink
Merge pull request ReactiveX#483 from benjchristensen/onEach-fix
Browse files Browse the repository at this point in the history
DoOn Tweaks
  • Loading branch information
benjchristensen committed Nov 13, 2013
2 parents deb1999 + 9dc68dc commit 5f39eaa
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 10 deletions.
45 changes: 35 additions & 10 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5023,33 +5023,58 @@ public void onNext(T args) {

return create(OperationDoOnEach.doOnEach(this, observer));
}

/**
* Invokes an action if onError is emitted from the observable sequence.
*
* @param onError
* The action to invoke if onError is invoked.
*
* @return
* The source sequence with the side-effecting behavior applied.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804(v=vs.103).aspx">MSDN: Observable.Do</a>
*/
public Observable<T> doOnError(final Action1<Throwable> onError) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {}

@Override
public void onError(Throwable e) {
onError.call(e);
}

@Override
public void onNext(T args) { }

};


return create(OperationDoOnEach.doOnEach(this, observer));
}

/**
* Invokes an action for each element in the observable sequence.
* Invokes an action when onCompleted is emitted from the observable sequence.
*
* @param onNext
* The action to invoke for each element in the source sequence.
* @param onCompleted
* The action to invoke when the source sequence is completed.
* The action to invoke when onCompleted is emitted.
*
* @return
* The source sequence with the side-effecting behavior applied.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229659(v=vs.103).aspx">MSDN: Observable.Do</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804(v=vs.103).aspx">MSDN: Observable.Do</a>
*/
public Observable<T> doOnEach(final Action1<T> onNext, final Action0 onCompleted) {
public Observable<T> doOnCompleted(final Action0 onCompleted) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {
onCompleted.call();
}

@Override
public void onError(Throwable e) {}
public void onError(Throwable e) { }

@Override
public void onNext(T args) {
onNext.call(args);
}
public void onNext(T args) { }

};

Expand Down
80 changes: 80 additions & 0 deletions rxjava-core/src/test/java/rx/ObservableDoOnTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import static org.junit.Assert.*;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

import rx.util.functions.Action0;
import rx.util.functions.Action1;

public class ObservableDoOnTest {

@Test
public void testDoOnEach() {
final AtomicReference<String> r = new AtomicReference<String>();
String output = Observable.from("one").doOnEach(new Action1<String>() {

@Override
public void call(String v) {
r.set(v);
}
}).toBlockingObservable().single();

assertEquals("one", output);
assertEquals("one", r.get());
}

@Test
public void testDoOnError() {
final AtomicReference<Throwable> r = new AtomicReference<Throwable>();
Throwable t = null;
try {
Observable.<String> error(new RuntimeException("an error")).doOnError(new Action1<Throwable>() {

@Override
public void call(Throwable v) {
r.set(v);
}
}).toBlockingObservable().single();
fail("expected exception, not a return value");
} catch (Throwable e) {
t = e;
}

assertNotNull(t);
assertEquals(t, r.get());
}

@Test
public void testDoOnCompleted() {
final AtomicBoolean r = new AtomicBoolean();
String output = Observable.from("one").doOnCompleted(new Action0() {

@Override
public void call() {
r.set(true);
}
}).toBlockingObservable().single();

assertEquals("one", output);
assertTrue(r.get());
}
}

0 comments on commit 5f39eaa

Please sign in to comment.