From 669c7b6bb39a46f4509cb5000ae704ad70ccafd0 Mon Sep 17 00:00:00 2001 From: John Marks Date: Mon, 14 Oct 2013 23:23:20 +0100 Subject: [PATCH 1/4] SerialSubscription and Timeout operator --- rxjava-core/src/main/java/rx/Observable.java | 82 ++++------- .../java/rx/operators/OperationTimeout.java | 128 ++++++++++++++++++ .../rx/subscriptions/SerialSubscription.java | 62 +++++++++ .../SerialSubscriptionTests.java | 75 ++++++++++ 4 files changed, 292 insertions(+), 55 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationTimeout.java create mode 100644 rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java create mode 100644 rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5cd876adba..9e3724e830 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -28,61 +28,7 @@ import rx.observables.BlockingObservable; import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; -import rx.operators.OperationAll; -import rx.operators.OperationAny; -import rx.operators.OperationAverage; -import rx.operators.OperationBuffer; -import rx.operators.OperationCache; -import rx.operators.OperationCast; -import rx.operators.OperationCombineLatest; -import rx.operators.OperationConcat; -import rx.operators.OperationDebounce; -import rx.operators.OperationDefaultIfEmpty; -import rx.operators.OperationDefer; -import rx.operators.OperationDematerialize; -import rx.operators.OperationDistinct; -import rx.operators.OperationDistinctUntilChanged; -import rx.operators.OperationElementAt; -import rx.operators.OperationFilter; -import rx.operators.OperationFinally; -import rx.operators.OperationFirstOrDefault; -import rx.operators.OperationGroupBy; -import rx.operators.OperationInterval; -import rx.operators.OperationMap; -import rx.operators.OperationMaterialize; -import rx.operators.OperationMerge; -import rx.operators.OperationMergeDelayError; -import rx.operators.OperationMulticast; -import rx.operators.OperationObserveOn; -import rx.operators.OperationOnErrorResumeNextViaFunction; -import rx.operators.OperationOnErrorResumeNextViaObservable; -import rx.operators.OperationOnErrorReturn; -import rx.operators.OperationOnExceptionResumeNextViaObservable; -import rx.operators.OperationParallel; -import rx.operators.OperationRetry; -import rx.operators.OperationSample; -import rx.operators.OperationScan; -import rx.operators.OperationSkip; -import rx.operators.OperationSkipLast; -import rx.operators.OperationSkipWhile; -import rx.operators.OperationSubscribeOn; -import rx.operators.OperationSum; -import rx.operators.OperationSwitch; -import rx.operators.OperationSynchronize; -import rx.operators.OperationTake; -import rx.operators.OperationTakeLast; -import rx.operators.OperationTakeUntil; -import rx.operators.OperationTakeWhile; -import rx.operators.OperationThrottleFirst; -import rx.operators.OperationTimestamp; -import rx.operators.OperationToObservableFuture; -import rx.operators.OperationToObservableIterable; -import rx.operators.OperationToObservableList; -import rx.operators.OperationToObservableSortedList; -import rx.operators.OperationWindow; -import rx.operators.OperationZip; -import rx.operators.SafeObservableSubscription; -import rx.operators.SafeObserver; +import rx.operators.*; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; @@ -4507,6 +4453,32 @@ public Observable ignoreElements() { return filter(alwaysFalse()); } + /** + * Returns either the observable sequence or an TimeoutException if timeout elapses. + * @param timeout + * The timeout duration + * @param timeUnit + * The time unit of the timeout + * @param scheduler + * The scheduler to run the timeout timers on. + * @return The source sequence with a TimeoutException in case of a timeout. + */ + public Observable timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) { + return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler)); + } + + /** + * Returns either the observable sequence or an TimeoutException if timeout elapses. + * @param timeout + * The timeout duration + * @param timeUnit + * The time unit of the timeout + * @return The source sequence with a TimeoutException in case of a timeout. + */ + public Observable timeout(long timeout, TimeUnit timeUnit) { + return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation())); + } + /** * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimeout.java b/rxjava-core/src/main/java/rx/operators/OperationTimeout.java new file mode 100644 index 0000000000..99c8c167b6 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationTimeout.java @@ -0,0 +1,128 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.SerialSubscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public final class OperationTimeout { + public static Observable.OnSubscribeFunc timeout(Observable source, long timeout, TimeUnit timeUnit, Scheduler scheduler) { + return new Timeout(source, timeout, timeUnit, scheduler); + } + + private static class Timeout implements Observable.OnSubscribeFunc { + private final Observable source; + private final long timeout; + private final TimeUnit timeUnit; + private final Scheduler scheduler; + + @Override + public Subscription onSubscribe(final Observer observer) { + final AtomicBoolean terminated = new AtomicBoolean(false); + final AtomicLong actual = new AtomicLong(0L); // Required to handle race between onNext and timeout + final SerialSubscription serial = new SerialSubscription(); + final Object gate = new Object(); + CompositeSubscription composite = new CompositeSubscription(); + final Func0 schedule = new Func0() { + @Override + public Subscription call() { + final long expected = actual.get(); + return scheduler.schedule(new Action0() { + @Override + public void call() { + boolean timeoutWins = false; + synchronized (gate) { + if (expected == actual.get() && !terminated.getAndSet(true)) { + timeoutWins = true; + } + } + if (timeoutWins) { + observer.onError(new TimeoutException()); + } + + } + }, timeout, timeUnit); + } + }; + SafeObservableSubscription subscription = new SafeObservableSubscription(); + composite.add(subscription.wrap(source.subscribe(new Observer() { + @Override + public void onNext(T value) { + boolean onNextWins = false; + synchronized (gate) { + if (!terminated.get()) { + actual.incrementAndGet(); + onNextWins = true; + } + } + if (onNextWins) { + serial.setSubscription(schedule.call()); + observer.onNext(value); + } + } + + @Override + public void onError(Throwable error) { + boolean onErrorWins = false; + synchronized (gate) { + if (!terminated.getAndSet(true)) { + onErrorWins = true; + } + } + if (onErrorWins) { + serial.unsubscribe(); + observer.onError(error); + } + } + + @Override + public void onCompleted() { + boolean onCompletedWins = false; + synchronized (gate) { + if (!terminated.getAndSet(true)) { + onCompletedWins = true; + } + } + if (onCompletedWins) { + serial.unsubscribe(); + observer.onCompleted(); + } + } + }))); + composite.add(serial); + serial.setSubscription(schedule.call()); + return composite; + } + + private Timeout(Observable source, long timeout, TimeUnit timeUnit, Scheduler scheduler) { + this.source = source; + this.timeout = timeout; + this.timeUnit = timeUnit; + this.scheduler = scheduler; + } + } +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java new file mode 100644 index 0000000000..a39ff45a10 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java @@ -0,0 +1,62 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import rx.Subscription; + +/** + * Represents a subscription whose underlying subscription can be swapped for another subscription + * which causes the previous underlying subscription to be unsubscribed. + * + * @see Rx.Net equivalent SerialDisposable + */ +public class SerialSubscription implements Subscription { + private boolean unsubscribed; + private Subscription subscription; + private final Object gate = new Object(); + + @Override + public void unsubscribe() { + synchronized (gate) { + if (!unsubscribed) { + if (subscription != null) { + subscription.unsubscribe(); + subscription = null; + } + unsubscribed = true; + } + } + } + + public Subscription getSubscription() { + synchronized (gate) { + return subscription; + } + } + + public void setSubscription(Subscription subscription) { + synchronized (gate) { + if (!unsubscribed) { + if (this.subscription != null) { + this.subscription.unsubscribe(); + } + this.subscription = subscription; + } else { + subscription.unsubscribe(); + } + } + } +} diff --git a/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java new file mode 100644 index 0000000000..4ffc5cef27 --- /dev/null +++ b/rxjava-core/src/test/java/rx/subscriptions/SerialSubscriptionTests.java @@ -0,0 +1,75 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subscriptions; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockitoAnnotations; +import rx.Subscription; + +import static org.mockito.Mockito.*; + +public class SerialSubscriptionTests { + private SerialSubscription serialSubscription; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + serialSubscription = new SerialSubscription(); + } + + @Test + public void unsubscribingWithoutUnderlyingDoesNothing() { + serialSubscription.unsubscribe(); + } + + @Test + public void unsubscribingWithSingleUnderlyingUnsubscribes() { + Subscription underlying = mock(Subscription.class); + serialSubscription.setSubscription(underlying); + underlying.unsubscribe(); + verify(underlying).unsubscribe(); + } + + @Test + public void replacingFirstUnderlyingCausesUnsubscription() { + Subscription first = mock(Subscription.class); + serialSubscription.setSubscription(first); + Subscription second = mock(Subscription.class); + serialSubscription.setSubscription(second); + verify(first).unsubscribe(); + } + + @Test + public void whenUnsubscribingSecondUnderlyingUnsubscribed() { + Subscription first = mock(Subscription.class); + serialSubscription.setSubscription(first); + Subscription second = mock(Subscription.class); + serialSubscription.setSubscription(second); + serialSubscription.unsubscribe(); + verify(second).unsubscribe(); + } + + @Test + public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscription() + { + serialSubscription.unsubscribe(); + Subscription underlying = mock(Subscription.class); + serialSubscription.setSubscription(underlying); + verify(underlying).unsubscribe(); + } +} From 22eaa5e4472d2fc89732039d98a33af1aa94cf9a Mon Sep 17 00:00:00 2001 From: John Marks Date: Mon, 14 Oct 2013 23:31:43 +0100 Subject: [PATCH 2/4] SerialSubscription and Timeout operator --- .../main/java/rx/operators/OperationTimeout.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimeout.java b/rxjava-core/src/main/java/rx/operators/OperationTimeout.java index 99c8c167b6..03f5e515ac 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTimeout.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTimeout.java @@ -40,6 +40,13 @@ private static class Timeout implements Observable.OnSubscribeFunc { private final TimeUnit timeUnit; private final Scheduler scheduler; + private Timeout(Observable source, long timeout, TimeUnit timeUnit, Scheduler scheduler) { + this.source = source; + this.timeout = timeout; + this.timeUnit = timeUnit; + this.scheduler = scheduler; + } + @Override public Subscription onSubscribe(final Observer observer) { final AtomicBoolean terminated = new AtomicBoolean(false); @@ -117,12 +124,5 @@ public void onCompleted() { serial.setSubscription(schedule.call()); return composite; } - - private Timeout(Observable source, long timeout, TimeUnit timeUnit, Scheduler scheduler) { - this.source = source; - this.timeout = timeout; - this.timeUnit = timeUnit; - this.scheduler = scheduler; - } } } From 10da2ae1c826c688af3d1236eb8aac237f1db397 Mon Sep 17 00:00:00 2001 From: John Marks Date: Tue, 15 Oct 2013 11:18:14 +0100 Subject: [PATCH 3/4] Improved efficiency of SerialSubscription and unit tested Timeout --- .../rx/subscriptions/SerialSubscription.java | 14 ++- .../src/test/java/rx/TimeoutTests.java | 117 ++++++++++++++++++ 2 files changed, 128 insertions(+), 3 deletions(-) create mode 100644 rxjava-core/src/test/java/rx/TimeoutTests.java diff --git a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java index a39ff45a10..c1235afda6 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/SerialSubscription.java @@ -30,15 +30,19 @@ public class SerialSubscription implements Subscription { @Override public void unsubscribe() { + Subscription toUnsubscribe = null; synchronized (gate) { if (!unsubscribed) { if (subscription != null) { - subscription.unsubscribe(); + toUnsubscribe = subscription; subscription = null; } unsubscribed = true; } } + if (toUnsubscribe != null) { + toUnsubscribe.unsubscribe(); + } } public Subscription getSubscription() { @@ -48,15 +52,19 @@ public Subscription getSubscription() { } public void setSubscription(Subscription subscription) { + Subscription toUnsubscribe = null; synchronized (gate) { if (!unsubscribed) { if (this.subscription != null) { - this.subscription.unsubscribe(); + toUnsubscribe = this.subscription; } this.subscription = subscription; } else { - subscription.unsubscribe(); + toUnsubscribe = subscription; } } + if (toUnsubscribe != null) { + toUnsubscribe.unsubscribe(); + } } } diff --git a/rxjava-core/src/test/java/rx/TimeoutTests.java b/rxjava-core/src/test/java/rx/TimeoutTests.java new file mode 100644 index 0000000000..532ae42eba --- /dev/null +++ b/rxjava-core/src/test/java/rx/TimeoutTests.java @@ -0,0 +1,117 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.MockitoAnnotations; +import rx.concurrency.TestScheduler; +import rx.subjects.PublishSubject; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +public class TimeoutTests { + private PublishSubject underlyingSubject; + private TestScheduler testScheduler; + private Observable withTimeout; + private static final long TIMEOUT = 3; + private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + underlyingSubject = PublishSubject.create(); + testScheduler = new TestScheduler(); + withTimeout = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, testScheduler); + } + + @Test + public void shouldNotTimeoutIfOnNextWithinTimeout() { + Observer observer = mock(Observer.class); + Subscription subscription = withTimeout.subscribe(observer); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + underlyingSubject.onNext("One"); + verify(observer).onNext("One"); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + verify(observer, never()).onError(any(Throwable.class)); + subscription.unsubscribe(); + } + + @Test + public void shouldNotTimeoutIfSecondOnNextWithinTimeout() { + Observer observer = mock(Observer.class); + Subscription subscription = withTimeout.subscribe(observer); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + underlyingSubject.onNext("One"); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + underlyingSubject.onNext("Two"); + verify(observer).onNext("Two"); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + verify(observer, never()).onError(any(Throwable.class)); + subscription.unsubscribe(); + } + + @Test + public void shouldTimeoutIfOnNextNotWithinTimeout() { + Observer observer = mock(Observer.class); + Subscription subscription = withTimeout.subscribe(observer); + testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS); + verify(observer).onError(any(TimeoutException.class)); + subscription.unsubscribe(); + } + + @Test + public void shouldTimeoutIfSecondOnNextNotWithinTimeout() { + Observer observer = mock(Observer.class); + Subscription subscription = withTimeout.subscribe(observer); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + underlyingSubject.onNext("One"); + verify(observer).onNext("One"); + testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS); + verify(observer).onError(any(TimeoutException.class)); + subscription.unsubscribe(); + } + + @Test + public void shouldCompleteIfUnderlyingComletes() { + Observer observer = mock(Observer.class); + Subscription subscription = withTimeout.subscribe(observer); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + underlyingSubject.onCompleted(); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + verify(observer).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + subscription.unsubscribe(); + } + + @Test + public void shouldErrorIfUnderlyingErrors() { + Observer observer = mock(Observer.class); + Subscription subscription = withTimeout.subscribe(observer); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + underlyingSubject.onError(new UnsupportedOperationException()); + testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); + verify(observer).onError(any(UnsupportedOperationException.class)); + subscription.unsubscribe(); + } +} From b58bc454dd6b64c39c22974d818c21d09518749d Mon Sep 17 00:00:00 2001 From: John Marks Date: Tue, 15 Oct 2013 11:37:23 +0100 Subject: [PATCH 4/4] Fixed a couple of warnings and reverted change to imports --- rxjava-core/src/main/java/rx/Observable.java | 63 ++++++++++++++++++-- 1 file changed, 57 insertions(+), 6 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9e3724e830..8c8a0c4288 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -28,7 +28,62 @@ import rx.observables.BlockingObservable; import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; -import rx.operators.*; +import rx.operators.OperationAll; +import rx.operators.OperationAny; +import rx.operators.OperationAverage; +import rx.operators.OperationBuffer; +import rx.operators.OperationCache; +import rx.operators.OperationCast; +import rx.operators.OperationCombineLatest; +import rx.operators.OperationConcat; +import rx.operators.OperationDebounce; +import rx.operators.OperationDefaultIfEmpty; +import rx.operators.OperationDefer; +import rx.operators.OperationDematerialize; +import rx.operators.OperationDistinct; +import rx.operators.OperationDistinctUntilChanged; +import rx.operators.OperationElementAt; +import rx.operators.OperationFilter; +import rx.operators.OperationFinally; +import rx.operators.OperationFirstOrDefault; +import rx.operators.OperationGroupBy; +import rx.operators.OperationInterval; +import rx.operators.OperationMap; +import rx.operators.OperationMaterialize; +import rx.operators.OperationMerge; +import rx.operators.OperationMergeDelayError; +import rx.operators.OperationMulticast; +import rx.operators.OperationObserveOn; +import rx.operators.OperationOnErrorResumeNextViaFunction; +import rx.operators.OperationOnErrorResumeNextViaObservable; +import rx.operators.OperationOnErrorReturn; +import rx.operators.OperationOnExceptionResumeNextViaObservable; +import rx.operators.OperationParallel; +import rx.operators.OperationRetry; +import rx.operators.OperationSample; +import rx.operators.OperationScan; +import rx.operators.OperationSkip; +import rx.operators.OperationSkipLast; +import rx.operators.OperationSkipWhile; +import rx.operators.OperationSubscribeOn; +import rx.operators.OperationSum; +import rx.operators.OperationSwitch; +import rx.operators.OperationSynchronize; +import rx.operators.OperationTake; +import rx.operators.OperationTakeLast; +import rx.operators.OperationTakeUntil; +import rx.operators.OperationTakeWhile; +import rx.operators.OperationThrottleFirst; +import rx.operators.OperationTimeout; +import rx.operators.OperationTimestamp; +import rx.operators.OperationToObservableFuture; +import rx.operators.OperationToObservableIterable; +import rx.operators.OperationToObservableList; +import rx.operators.OperationToObservableSortedList; +import rx.operators.OperationWindow; +import rx.operators.OperationZip; +import rx.operators.SafeObservableSubscription; +import rx.operators.SafeObserver; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; @@ -1801,8 +1856,6 @@ public static Observable switchOnNext(Observable - * the type of item emitted by the source Observable * @return an Observable that is a chronologically well-behaved version of the source * Observable, and that synchronously notifies its {@link Observer}s */ @@ -1822,8 +1875,6 @@ public Observable synchronize() { * * @param lock * The lock object to synchronize each observer call on - * @param - * the type of item emitted by the source Observable * @return an Observable that is a chronologically well-behaved version of the source * Observable, and that synchronously notifies its {@link Observer}s */ @@ -3140,7 +3191,7 @@ public Observable exists(Func1 predicate) { /** * Determines whether an observable sequence contains a specified element. * - * @param value + * @param element * The element to search in the sequence. * @return an Observable that emits if the element is in the source sequence. * @see MSDN: Observable.Contains