From 73a4ed2b5c68a9421585d751bd50cc1c48f4be50 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 7 Sep 2013 22:04:35 +0200 Subject: [PATCH 1/4] implemented skipWhile and skipWhileWithIndex (#80) --- rxjava-core/src/main/java/rx/Observable.java | 29 +++ .../java/rx/operators/OperationSkipWhile.java | 193 ++++++++++++++++++ 2 files changed, 222 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 1a72f61501..010071e92f 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -49,6 +49,7 @@ import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSkip; +import rx.operators.OperationSkipWhile; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSwitch; import rx.operators.OperationSynchronize; @@ -2341,6 +2342,34 @@ public Observable takeUntil(Observable other) { return OperationTakeUntil.takeUntil(this, other); } + /** + * Returns an Observable that bypasses all items from the source Observable as long as the specified + * condition holds true. Emits all further source items as soon as the condition becomes false. + * @param predicate + * A function to test each item emitted from the source Observable for a condition. + * It receives the emitted item as first parameter and the index of the emitted item as + * second parameter. + * @return an Observable that emits all items from the source Observable as soon as the condition + * becomes false. + * @see MSDN: Observable.SkipWhile + */ + public Observable skipWhileWithIndex(Func2 predicate) { + return create(OperationSkipWhile.skipWhileWithIndex(this, predicate)); + } + + /** + * Returns an Observable that bypasses all items from the source Observable as long as the specified + * condition holds true. Emits all further source items as soon as the condition becomes false. + * @param predicate + * A function to test each item emitted from the source Observable for a condition. + * @return an Observable that emits all items from the source Observable as soon as the condition + * becomes false. + * @see MSDN: Observable.SkipWhile + */ + public Observable skipWhile(Func1 predicate) { + return create(OperationSkipWhile.skipWhile(this, predicate)); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java b/rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java new file mode 100644 index 0000000000..8277f80fc2 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java @@ -0,0 +1,193 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.Observable.create; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Skips any emitted source items as long as the specified condition holds true. Emits all further source items + * as soon as the condition becomes false. + */ +public final class OperationSkipWhile { + public static OnSubscribeFunc skipWhileWithIndex(Observable source, Func2 predicate) { + return new SkipWhile(source, predicate); + } + + public static OnSubscribeFunc skipWhile(Observable source, final Func1 predicate) { + return new SkipWhile(source, new Func2() { + @Override + public Boolean call(T value, Integer index) { + return predicate.call(value); + } + }); + } + + private static class SkipWhile implements OnSubscribeFunc { + private final Observable source; + private final Func2 predicate; + private final AtomicBoolean skipping = new AtomicBoolean(true); + private final AtomicInteger index = new AtomicInteger(0); + + SkipWhile(Observable source, Func2 pred) { + this.source = source; + this.predicate = pred; + } + + public Subscription onSubscribe(Observer observer) { + return source.subscribe(new SkipWhileObserver(observer)); + } + + private class SkipWhileObserver implements Observer { + private final Observer observer; + + public SkipWhileObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T next) { + if (!skipping.get()) { + observer.onNext(next); + } else { + try { + if (!predicate.call(next, index.getAndIncrement())) { + skipping.set(false); + observer.onNext(next); + } else { + } + } catch(Throwable t) { + observer.onError(t); + } + } + } + + } + + } + + public static class UnitTest { + @SuppressWarnings("unchecked") + Observer w = mock(Observer.class); + + private static final Func1 LESS_THAN_FIVE = new Func1() { + @Override + public Boolean call(Integer v) { + if (v == 42) throw new RuntimeException("that's not the answer to everything!"); + return v < 5; + } + }; + + private static final Func2 INDEX_LESS_THAN_THREE = new Func2() { + @Override + public Boolean call(Integer value, Integer index) { + return index < 3; + } + }; + + @Test + public void testSkipWithIndex() { + Observable src = Observable.from(1, 2, 3, 4, 5); + create(skipWhileWithIndex(src, INDEX_LESS_THAN_THREE)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext(4); + inOrder.verify(w, times(1)).onNext(5); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testSkipEmpty() { + Observable src = Observable.empty(); + create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testSkipEverything() { + Observable src = Observable.from(1, 2, 3, 4, 3, 2, 1); + create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testSkipNothing() { + Observable src = Observable.from(5, 3, 1); + create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext(5); + inOrder.verify(w, times(1)).onNext(3); + inOrder.verify(w, times(1)).onNext(1); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testSkipSome() { + Observable src = Observable.from(1, 2, 3, 4, 5, 3, 1, 5); + create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext(5); + inOrder.verify(w, times(1)).onNext(3); + inOrder.verify(w, times(1)).onNext(1); + inOrder.verify(w, times(1)).onNext(5); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testSkipError() { + Observable src = Observable.from(1, 2, 42, 5, 3, 1); + create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, never()).onNext(anyInt()); + inOrder.verify(w, never()).onCompleted(); + inOrder.verify(w, times(1)).onError(any(RuntimeException.class)); + } + } +} From bd383717e80444651733a9fb532cad3bb305289c Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sun, 8 Sep 2013 14:06:43 +0200 Subject: [PATCH 2/4] Added the two variants of the first operator. --- rxjava-core/src/main/java/rx/Observable.java | 22 +++++++++ .../java/rx/util/functions/Functions.java | 11 +++++ .../src/main/java/rx/util/functions/Not.java | 40 ++++++++++++++++ .../src/test/java/rx/ObservableTests.java | 46 +++++++++++++++++++ 4 files changed, 119 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/util/functions/Not.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 010071e92f..b58c5eb7fa 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,6 +15,8 @@ */ package rx; +import static rx.util.functions.Functions.not; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -2275,6 +2277,26 @@ public Observable take(final int num) { return create(OperationTake.take(this, num)); } + /** + * Returns an Observable that emits only the very first item emitted by the source Observable. + * @return an Observable that emits only the very first item from the source, or none if the + * source Observable completes without emitting a single item. + * @see MSDN: Observable.First + */ + public Observable first() { + return take(1); + } + + /** + * Returns an Observable that emits only the very first item emitted by the source Observable. + * @return an Observable that emits only the very first item from the source, or none if the + * source Observable completes without emitting a single item. + * @see MSDN: Observable.First + */ + public Observable first(Func1 predicate) { + return skipWhile(not(predicate)).take(1); + } + /** * Returns an Observable that emits items emitted by the source Observable so long as a * specified condition is true. diff --git a/rxjava-core/src/main/java/rx/util/functions/Functions.java b/rxjava-core/src/main/java/rx/util/functions/Functions.java index 7809b3240d..30728600a8 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Functions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Functions.java @@ -313,6 +313,16 @@ public Void call(Object... args) { }; } + /** + * Constructs a predicate that returns true for each input that the source + * predicate returns false for and vice versa. + * + * @param predicate The source predicate to negate. + */ + public static Func1 not(Func1 predicate) { + return new Not(predicate); + } + public static Func1 alwaysTrue() { return AlwaysTrue.INSTANCE; } @@ -334,4 +344,5 @@ public Boolean call(Object o) { return true; } } + } diff --git a/rxjava-core/src/main/java/rx/util/functions/Not.java b/rxjava-core/src/main/java/rx/util/functions/Not.java new file mode 100644 index 0000000000..d3928f7888 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/functions/Not.java @@ -0,0 +1,40 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util.functions; + +/** + * Implements the negation of a predicate. + * + * @param The type of the single input parameter. + */ +public class Not implements Func1 { + private final Func1 predicate; + + /** + * Constructs a predicate that returns true for each input that the source + * predicate returns false for and vice versa. + * + * @param predicate The source predicate to negate. + */ + public Not(Func1 predicate) { + this.predicate = predicate; + } + + @Override + public Boolean call(T param) { + return !predicate.call(param); + } +} diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index ba11919040..830370421f 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -35,6 +35,7 @@ import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action1; +import rx.util.functions.Func1; import rx.util.functions.Func2; public class ObservableTests { @@ -42,6 +43,13 @@ public class ObservableTests { @Mock Observer w; + private static final Func1 IS_EVEN = new Func1() { + @Override + public Boolean call(Integer value) { + return value % 2 == 0; + } + }; + @Before public void before() { MockitoAnnotations.initMocks(this); @@ -73,6 +81,44 @@ public Subscription onSubscribe(Observer Observer) { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testFirstWithPredicateOfSome() { + Observable observable = Observable.from(1, 3, 5, 4, 6, 3); + observable.first(IS_EVEN).subscribe(w); + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(4); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testFirstWithPredicateOfNoneMatchingThePredicate() { + Observable observable = Observable.from(1, 3, 5, 7, 9, 7, 5, 3, 1); + observable.first(IS_EVEN).subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testFirstOfSome() { + Observable observable = Observable.from(1, 2, 3); + observable.first().subscribe(w); + verify(w, times(1)).onNext(anyInt()); + verify(w).onNext(1); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testFirstOfNone() { + Observable observable = Observable.empty(); + observable.first().subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, times(1)).onCompleted(); + verify(w, never()).onError(any(Throwable.class)); + } + @Test public void testReduce() { Observable observable = Observable.from(1, 2, 3, 4); From 44525182d5d6f26258d24373e579f9607c2d0d73 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sun, 8 Sep 2013 15:20:42 +0200 Subject: [PATCH 3/4] Added FirstOrDefault operation --- rxjava-core/src/main/java/rx/Observable.java | 39 +++- .../rx/operators/OperationFirstOrDefault.java | 190 ++++++++++++++++++ 2 files changed, 226 insertions(+), 3 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index b58c5eb7fa..bd14311fc2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -37,6 +37,7 @@ import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; import rx.operators.OperationFinally; +import rx.operators.OperationFirstOrDefault; import rx.operators.OperationGroupBy; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; @@ -2288,15 +2289,47 @@ public Observable first() { } /** - * Returns an Observable that emits only the very first item emitted by the source Observable. - * @return an Observable that emits only the very first item from the source, or none if the - * source Observable completes without emitting a single item. + * Returns an Observable that emits only the very first item emitted by the source Observable + * that satisfies a given condition. + * @param predicate + * The condition any source emitted item has to satisfy. + * @return an Observable that emits only the very first item satisfying the given condition from the source, + * or none if the source Observable completes without emitting a single matching item. * @see MSDN: Observable.First */ public Observable first(Func1 predicate) { return skipWhile(not(predicate)).take(1); } + /** + * Returns an Observable that emits only the very first item emitted by the source Observable, or + * a default value. + * @param defaultValue + * The default value to emit if the source Observable doesn't emit anything. + * @return an Observable that emits only the very first item from the source, or a default value + * if the source Observable completes without emitting a single item. + * @see MSDN: Observable.FirstOrDefault + */ + public Observable firstOrDefault(T defaultValue) { + return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue)); + } + + /** + * Returns an Observable that emits only the very first item emitted by the source Observable + * that satisfies a given condition, or a default value otherwise. + * @param predicate + * The condition any source emitted item has to satisfy. + * @param defaultValue + * The default value to emit if the source Observable doesn't emit anything that + * satisfies the given condition. + * @return an Observable that emits only the very first item from the source that satisfies the + * given condition, or a default value otherwise. + * @see MSDN: Observable.FirstOrDefault + */ + public Observable firstOrDefault(Func1 predicate, T defaultValue) { + return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue)); + } + /** * Returns an Observable that emits items emitted by the source Observable so long as a * specified condition is true. diff --git a/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java b/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java new file mode 100644 index 0000000000..73283a8a3b --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java @@ -0,0 +1,190 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; +import static rx.Observable.create; +import static rx.Observable.empty; +import static rx.Observable.from; +import static rx.util.functions.Functions.alwaysTrue; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +/** + * Returns an Observable that emits the first item emitted by the source + * Observable, or a default value if the source emits nothing. + */ +public final class OperationFirstOrDefault { + + /** + * Returns an Observable that emits the first item emitted by the source + * Observable that satisfies the given condition, + * or a default value if the source emits no items that satisfy the given condition. + * + * @param source + * The source Observable to emit the first item for. + * @param predicate + * The condition the emitted source items have to satisfy. + * @param defaultValue + * The default value to use whenever the source Observable doesn't emit anything. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc firstOrDefault(Observable source, Func1 predicate, T defaultValue) { + return new FirstOrElse(source, predicate, defaultValue); + } + + /** + * Returns an Observable that emits the first item emitted by the source + * Observable, or a default value if the source emits nothing. + * + * @param source + * The source Observable to emit the first item for. + * @param defaultValue + * The default value to use whenever the source Observable doesn't emit anything. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc firstOrDefault(Observable source, T defaultValue) { + return new FirstOrElse(source, alwaysTrue(), defaultValue); + } + + private static class FirstOrElse implements OnSubscribeFunc { + private final Observable source; + private final Func1 predicate; + private final T defaultValue; + + private FirstOrElse(Observable source, Func1 predicate, T defaultValue) { + this.source = source; + this.defaultValue = defaultValue; + this.predicate = predicate; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final Subscription sourceSub = source.subscribe(new Observer() { + private final AtomicBoolean hasEmitted = new AtomicBoolean(false); + + @Override + public void onCompleted() { + if (!hasEmitted.get()) { + observer.onNext(defaultValue); + observer.onCompleted(); + } + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T next) { + try { + if (!hasEmitted.get() && predicate.call(next)) { + hasEmitted.set(true); + observer.onNext(next); + observer.onCompleted(); + } + } catch (Throwable t) { + // may happen within the predicate call (user code) + observer.onError(t); + } + } + }); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + sourceSub.unsubscribe(); + } + }); + } + } + + public static class UnitTest { + @Mock + Observer w; + + private static final Func1 IS_D = new Func1() { + @Override + public Boolean call(String value) { + return "d".equals(value); + } + }; + + @Before + public void before() { + initMocks(this); + } + + @Test + public void testFirstOrElseOfNone() { + Observable src = empty(); + create(firstOrDefault(src, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("default"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testFirstOrElseOfSome() { + Observable src = from("a", "b", "c"); + create(firstOrDefault(src, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("a"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testFirstOrElseWithPredicateOfNoneMatchingThePredicate() { + Observable src = from("a", "b", "c"); + create(firstOrDefault(src, IS_D, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("default"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testFirstOrElseWithPredicateOfSome() { + Observable src = from("a", "b", "c", "d", "e", "f"); + create(firstOrDefault(src, IS_D, "default")).subscribe(w); + + verify(w, times(1)).onNext(anyString()); + verify(w, times(1)).onNext("d"); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + } +} From 98cccec27252f30578cd3cf1b7aeddcb9837a2fd Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sun, 8 Sep 2013 15:22:51 +0200 Subject: [PATCH 4/4] Added link urls to the msdn descriptions --- rxjava-core/src/main/java/rx/Observable.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index bd14311fc2..31d7399126 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2308,12 +2308,12 @@ public Observable first(Func1 predicate) { * The default value to emit if the source Observable doesn't emit anything. * @return an Observable that emits only the very first item from the source, or a default value * if the source Observable completes without emitting a single item. - * @see MSDN: Observable.FirstOrDefault + * @see MSDN: Observable.FirstOrDefault */ public Observable firstOrDefault(T defaultValue) { return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue)); } - + /** * Returns an Observable that emits only the very first item emitted by the source Observable * that satisfies a given condition, or a default value otherwise. @@ -2324,12 +2324,12 @@ public Observable firstOrDefault(T defaultValue) { * satisfies the given condition. * @return an Observable that emits only the very first item from the source that satisfies the * given condition, or a default value otherwise. - * @see MSDN: Observable.FirstOrDefault + * @see MSDN: Observable.FirstOrDefault */ public Observable firstOrDefault(Func1 predicate, T defaultValue) { return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue)); } - + /** * Returns an Observable that emits items emitted by the source Observable so long as a * specified condition is true.