From e670021c35a1b3116d65a4fc475a3741e484bdab Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 26 Nov 2014 13:54:27 +0100 Subject: [PATCH 1/5] Operator OnBackpressureBlock --- src/main/java/rx/Observable.java | 182 +++++++++++- .../OperatorOnBackpressureBlock.java | 151 ++++++++++ .../operators/OnBackpressureBlockTest.java | 262 ++++++++++++++++++ 3 files changed, 584 insertions(+), 11 deletions(-) create mode 100644 src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java create mode 100644 src/test/java/rx/internal/operators/OnBackpressureBlockTest.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 27a97b4b3d..70a99a1e08 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -12,20 +12,142 @@ */ package rx; -import java.util.*; -import java.util.concurrent.*; - -import rx.exceptions.*; -import rx.functions.*; -import rx.internal.operators.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import rx.exceptions.Exceptions; +import rx.exceptions.OnErrorNotImplementedException; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Action2; +import rx.functions.Func0; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.functions.Func3; +import rx.functions.Func4; +import rx.functions.Func5; +import rx.functions.Func6; +import rx.functions.Func7; +import rx.functions.Func8; +import rx.functions.Func9; +import rx.functions.FuncN; +import rx.functions.Functions; +import rx.internal.operators.OnSubscribeAmb; +import rx.internal.operators.OnSubscribeCache; +import rx.internal.operators.OnSubscribeCombineLatest; +import rx.internal.operators.OnSubscribeDefer; +import rx.internal.operators.OnSubscribeDelaySubscription; +import rx.internal.operators.OnSubscribeDelaySubscriptionWithSelector; +import rx.internal.operators.OnSubscribeFromIterable; +import rx.internal.operators.OnSubscribeGroupJoin; +import rx.internal.operators.OnSubscribeJoin; +import rx.internal.operators.OnSubscribeMulticastSelector; +import rx.internal.operators.OnSubscribeRange; +import rx.internal.operators.OnSubscribeRedo; +import rx.internal.operators.OnSubscribeTimerOnce; +import rx.internal.operators.OnSubscribeTimerPeriodically; +import rx.internal.operators.OnSubscribeToObservableFuture; +import rx.internal.operators.OnSubscribeUsing; +import rx.internal.operators.OperatorAll; +import rx.internal.operators.OperatorAny; +import rx.internal.operators.OperatorAsObservable; +import rx.internal.operators.OperatorBufferWithSingleObservable; +import rx.internal.operators.OperatorBufferWithSize; +import rx.internal.operators.OperatorBufferWithStartEndObservable; +import rx.internal.operators.OperatorBufferWithTime; +import rx.internal.operators.OperatorCast; +import rx.internal.operators.OperatorConcat; +import rx.internal.operators.OperatorDebounceWithSelector; +import rx.internal.operators.OperatorDebounceWithTime; +import rx.internal.operators.OperatorDefaultIfEmpty; +import rx.internal.operators.OperatorDelay; +import rx.internal.operators.OperatorDelayWithSelector; +import rx.internal.operators.OperatorDematerialize; +import rx.internal.operators.OperatorDistinct; +import rx.internal.operators.OperatorDistinctUntilChanged; +import rx.internal.operators.OperatorDoOnEach; +import rx.internal.operators.OperatorDoOnSubscribe; +import rx.internal.operators.OperatorDoOnUnsubscribe; +import rx.internal.operators.OperatorElementAt; +import rx.internal.operators.OperatorFilter; +import rx.internal.operators.OperatorFinally; +import rx.internal.operators.OperatorGroupBy; +import rx.internal.operators.OperatorMap; +import rx.internal.operators.OperatorMapNotification; +import rx.internal.operators.OperatorMapPair; +import rx.internal.operators.OperatorMaterialize; +import rx.internal.operators.OperatorMerge; +import rx.internal.operators.OperatorMergeDelayError; +import rx.internal.operators.OperatorMergeMaxConcurrent; +import rx.internal.operators.OperatorMulticast; +import rx.internal.operators.OperatorObserveOn; +import rx.internal.operators.OperatorOnBackpressureBlock; +import rx.internal.operators.OperatorOnBackpressureBuffer; +import rx.internal.operators.OperatorOnBackpressureDrop; +import rx.internal.operators.OperatorOnErrorResumeNextViaFunction; +import rx.internal.operators.OperatorOnErrorResumeNextViaObservable; +import rx.internal.operators.OperatorOnErrorReturn; +import rx.internal.operators.OperatorOnExceptionResumeNextViaObservable; +import rx.internal.operators.OperatorPublish; +import rx.internal.operators.OperatorReplay; +import rx.internal.operators.OperatorRetryWithPredicate; +import rx.internal.operators.OperatorSampleWithObservable; +import rx.internal.operators.OperatorSampleWithTime; +import rx.internal.operators.OperatorScan; +import rx.internal.operators.OperatorSequenceEqual; +import rx.internal.operators.OperatorSerialize; +import rx.internal.operators.OperatorSingle; +import rx.internal.operators.OperatorSkip; +import rx.internal.operators.OperatorSkipLast; +import rx.internal.operators.OperatorSkipLastTimed; +import rx.internal.operators.OperatorSkipTimed; +import rx.internal.operators.OperatorSkipUntil; +import rx.internal.operators.OperatorSkipWhile; +import rx.internal.operators.OperatorSubscribeOn; +import rx.internal.operators.OperatorSwitch; +import rx.internal.operators.OperatorTake; +import rx.internal.operators.OperatorTakeLast; +import rx.internal.operators.OperatorTakeLastTimed; +import rx.internal.operators.OperatorTakeTimed; +import rx.internal.operators.OperatorTakeUntil; +import rx.internal.operators.OperatorTakeWhile; +import rx.internal.operators.OperatorThrottleFirst; +import rx.internal.operators.OperatorTimeInterval; +import rx.internal.operators.OperatorTimeout; +import rx.internal.operators.OperatorTimeoutWithSelector; +import rx.internal.operators.OperatorTimestamp; +import rx.internal.operators.OperatorToMap; +import rx.internal.operators.OperatorToMultimap; +import rx.internal.operators.OperatorToObservableList; +import rx.internal.operators.OperatorToObservableSortedList; +import rx.internal.operators.OperatorUnsubscribeOn; +import rx.internal.operators.OperatorWindowWithObservable; +import rx.internal.operators.OperatorWindowWithSize; +import rx.internal.operators.OperatorWindowWithStartEndObservable; +import rx.internal.operators.OperatorWindowWithTime; +import rx.internal.operators.OperatorZip; +import rx.internal.operators.OperatorZipIterable; +import rx.internal.util.RxRingBuffer; import rx.internal.util.ScalarSynchronousObservable; import rx.internal.util.UtilityFunctions; - -import rx.observables.*; +import rx.observables.BlockingObservable; +import rx.observables.ConnectableObservable; +import rx.observables.GroupedObservable; import rx.observers.SafeSubscriber; -import rx.plugins.*; -import rx.schedulers.*; -import rx.subjects.*; +import rx.plugins.RxJavaObservableExecutionHook; +import rx.plugins.RxJavaPlugins; +import rx.schedulers.Schedulers; +import rx.schedulers.TimeInterval; +import rx.schedulers.Timestamped; +import rx.subjects.ReplaySubject; +import rx.subjects.Subject; import rx.subscriptions.Subscriptions; /** @@ -182,6 +304,7 @@ public void call(Subscriber o) { * @return the source Observable, transformed by the transformer function * @see RxJava wiki: Implementing Your Own Operators */ + @SuppressWarnings("unchecked") public Observable compose(Transformer transformer) { return ((Transformer) transformer).call(this); } @@ -5054,6 +5177,43 @@ public final Observable onBackpressureDrop() { return lift(new OperatorOnBackpressureDrop()); } + /** + * Instructs an Observable that is emitting items faster than its observer can consume them is to + * block the producer thread. + *

+ * The producer side can emit up to {@code maxQueueLength} onNext elements without blocking, but the + * consumer side considers the amount its downstream requested through {@code Producer.request(n)} + * and doesn't emit more than requested even if more is available. For example, using + * {@code onBackpressureBlock(384).observeOn(Schedulers.io())} will not throw a MissingBackpressureException. + *

+ * Note that if the upstream Observable does support backpressure, this operator ignores that capability + * and doesn't propagate any backpressure requests from downstream. + * + * @param maxQueueLength the maximum number of items the producer can emit without blocking + * @return the source Observable modified to block {@code onNext} notifications on overflow + * @see RxJava wiki: Backpressure + */ + public final Observable onBackpressureBlock(int maxQueueLength) { + return lift(new OperatorOnBackpressureBlock(maxQueueLength)); + } + /** + * Instructs an Observable that is emitting items faster than its observer can consume them is to + * block the producer thread if the number of undelivered onNext events reaches the system-wide ring buffer size. + *

+ * The producer side can emit up to the system-wide ring buffer size onNext elements without blocking, but the + * consumer side considers the amount its downstream requested through {@code Producer.request(n)} + * and doesn't emit more than requested even if available. + *

+ * Note that if the upstream Observable does support backpressure, this operator ignores that capability + * and doesn't propagate any backpressure requests from downstream. + * + * @return the source Observable modified to block {@code onNext} notifications on overflow + * @see RxJava wiki: Backpressure + */ + public final Observable onBackpressureBlock() { + return onBackpressureBlock(RxRingBuffer.SIZE); + } + /** * Instructs an Observable to pass control to another Observable rather than invoking * {@link Observer#onError onError} if it encounters an error. diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java new file mode 100644 index 0000000000..60f81c3522 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java @@ -0,0 +1,151 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import rx.Observable.Operator; +import rx.Producer; +import rx.Subscriber; + +/** + * Operator that uses blocks the producer thread in case a backpressure is needed. + */ +public class OperatorOnBackpressureBlock implements Operator { + final int max; + public OperatorOnBackpressureBlock(int max) { + this.max = max; + } + @Override + public Subscriber call(Subscriber child) { + BlockingSubscriber s = new BlockingSubscriber(max, child); + s.init(); + return s; + } + + static final class BlockingSubscriber extends Subscriber { + final NotificationLite nl = NotificationLite.instance(); + final BlockingQueue queue; + final Subscriber child; + /** Guarded by this. */ + long requestedCount; + /** Guarded by this. */ + boolean emitting; + volatile boolean terminated; + /** Set before terminated, read after terminated. */ + Throwable exception; + public BlockingSubscriber(int max, Subscriber child) { + this.queue = new ArrayBlockingQueue(max); + this.child = child; + } + void init() { + child.add(this); + child.setProducer(new Producer() { + @Override + public void request(long n) { + synchronized (BlockingSubscriber.this) { + if (n == Long.MAX_VALUE || requestedCount == Long.MAX_VALUE) { + requestedCount = Long.MAX_VALUE; + } else { + requestedCount += n; + } + } + drain(); + } + }); + } + @Override + public void onNext(T t) { + try { + queue.put(nl.next(t)); + drain(); + } catch (InterruptedException ex) { + if (!isUnsubscribed()) { + onError(ex); + } + } + } + @Override + public void onError(Throwable e) { + if (!terminated) { + exception = e; + terminated = true; + drain(); + } + } + @Override + public void onCompleted() { + terminated = true; + drain(); + } + void drain() { + long n; + synchronized (this) { + if (emitting) { + return; + } + emitting = true; + n = requestedCount; + } + boolean skipFinal = false; + try { + while (true) { + int emitted = 0; + while (n > 0) { + Object o = queue.poll(); + if (o == null) { + if (terminated) { + if (exception != null) { + child.onError(exception); + } else { + child.onCompleted(); + } + return; + } + if (n == Long.MAX_VALUE) { + return; + } else { + break; + } + } else { + child.onNext(nl.getValue(o)); + n--; + emitted++; + } + } + synchronized (this) { + if (requestedCount == Long.MAX_VALUE || emitted == 0) { + skipFinal = true; + emitting = false; + return; + } else { + requestedCount -= emitted; + n += requestedCount; + } + } + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + } +} diff --git a/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java b/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java new file mode 100644 index 0000000000..64953c26f2 --- /dev/null +++ b/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java @@ -0,0 +1,262 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import static org.mockito.Mockito.*; + +import java.util.Arrays; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.Subscriber; +import rx.exceptions.MissingBackpressureException; +import rx.exceptions.TestException; +import rx.internal.util.RxRingBuffer; +import rx.observers.TestObserver; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; + +/** + * Test the onBackpressureBlock() behavior. + */ +public class OnBackpressureBlockTest { + static final int WAIT = 200; + + @Test(timeout = 1000) + public void testSimpleBelowCapacity() { + Observable source = Observable.just(1).onBackpressureBlock(10); + + TestObserver o = new TestObserver(); + source.subscribe(o); + + o.assertReceivedOnNext(Arrays.asList(1)); + o.assertTerminalEvent(); + assertTrue(o.getOnErrorEvents().isEmpty()); + } + @Test(timeout = 10000) + public void testSimpleAboveCapacity() throws InterruptedException { + Observable source = Observable.range(1, 11).subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(10); + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + source.subscribe(o); + o.requestMore(10); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + + o.requestMore(10); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)); + + o.assertTerminalEvent(); + assertTrue(o.getOnErrorEvents().isEmpty()); + } + + @Test(timeout = 3000) + public void testNoMissingBackpressureException() { + final int NUM_VALUES = RxRingBuffer.SIZE * 3; + Observable source = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber t1) { + for (int i = 0; i < NUM_VALUES; i++) { + t1.onNext(i); + } + t1.onCompleted(); + } + }).subscribeOn(Schedulers.newThread()); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + TestSubscriber s = new TestSubscriber(o); + + source.onBackpressureBlock(RxRingBuffer.SIZE).observeOn(Schedulers.newThread()).subscribe(s); + + s.awaitTerminalEvent(); + + verify(o, never()).onError(any(MissingBackpressureException.class)); + + s.assertNoErrors(); + verify(o, times(NUM_VALUES)).onNext(any(Integer.class)); + verify(o).onCompleted(); + } + @Test(timeout = 10000) + public void testBlockedProducerCanBeUnsubscribed() throws InterruptedException { + Observable source = Observable.range(1, 11).subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(5); + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + source.subscribe(o); + + o.requestMore(5); + + Thread.sleep(WAIT); + + o.unsubscribe(); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5)); + o.assertNoErrors(); + assertTrue(o.getOnCompletedEvents().isEmpty()); + } + @Test(timeout = 10000) + public void testExceptionIsDelivered() throws InterruptedException { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException("Forced failure"))) + .subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(5); + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + source.subscribe(o); + + o.requestMore(7); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + o.assertNoErrors(); + assertTrue(o.getOnCompletedEvents().isEmpty()); + + o.requestMore(3); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + o.assertNoErrors(); + assertTrue(o.getOnCompletedEvents().isEmpty()); + + o.requestMore(10); + + Thread.sleep(WAIT); + + o.assertTerminalEvent(); + assertEquals(1, o.getOnErrorEvents().size()); + assertTrue(o.getOnErrorEvents().get(0) instanceof TestException); + } + @Test(timeout = 10000) + public void testExceptionIsDeliveredAfterValues() throws InterruptedException { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException("Forced failure"))) + .subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(5); + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + source.subscribe(o); + + o.requestMore(7); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + o.assertNoErrors(); + assertTrue(o.getOnCompletedEvents().isEmpty()); + + o.requestMore(7); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + assertEquals(1, o.getOnErrorEvents().size()); + assertTrue(o.getOnErrorEvents().get(0) instanceof TestException); + assertTrue(o.getOnCompletedEvents().isEmpty()); + } + @Test + public void testTakeWorksWithSubscriberRequesting() { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException("Forced failure"))) + .subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(5).take(7); + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + source.subscribe(o); + + o.requestMore(7); + + o.awaitTerminalEvent(); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + o.assertNoErrors(); + o.assertTerminalEvent(); + } + @Test + public void testTakeWorksSubscriberRequestUnlimited() { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException("Forced failure"))) + .subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(5).take(7); + + TestSubscriber o = new TestSubscriber(); + source.subscribe(o); + + o.awaitTerminalEvent(); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + o.assertNoErrors(); + o.assertTerminalEvent(); + } + @Test + public void testTakeWorksSubscriberRequestUnlimitedBufferedException() { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException("Forced failure"))) + .subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(11).take(7); + + TestSubscriber o = new TestSubscriber(); + source.subscribe(o); + + o.awaitTerminalEvent(); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + o.assertNoErrors(); + o.assertTerminalEvent(); + } +} From fc6ea3a1c69d9a01f6911b0ffb03ec03e2f537ae Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 26 Nov 2014 14:00:23 +0100 Subject: [PATCH 2/5] Fixed accidental import * expansion. --- src/main/java/rx/Observable.java | 146 +++---------------------------- 1 file changed, 12 insertions(+), 134 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 70a99a1e08..e3bc5cb3a8 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -12,142 +12,20 @@ */ package rx; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import rx.exceptions.Exceptions; -import rx.exceptions.OnErrorNotImplementedException; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Action2; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.functions.Func3; -import rx.functions.Func4; -import rx.functions.Func5; -import rx.functions.Func6; -import rx.functions.Func7; -import rx.functions.Func8; -import rx.functions.Func9; -import rx.functions.FuncN; -import rx.functions.Functions; -import rx.internal.operators.OnSubscribeAmb; -import rx.internal.operators.OnSubscribeCache; -import rx.internal.operators.OnSubscribeCombineLatest; -import rx.internal.operators.OnSubscribeDefer; -import rx.internal.operators.OnSubscribeDelaySubscription; -import rx.internal.operators.OnSubscribeDelaySubscriptionWithSelector; -import rx.internal.operators.OnSubscribeFromIterable; -import rx.internal.operators.OnSubscribeGroupJoin; -import rx.internal.operators.OnSubscribeJoin; -import rx.internal.operators.OnSubscribeMulticastSelector; -import rx.internal.operators.OnSubscribeRange; -import rx.internal.operators.OnSubscribeRedo; -import rx.internal.operators.OnSubscribeTimerOnce; -import rx.internal.operators.OnSubscribeTimerPeriodically; -import rx.internal.operators.OnSubscribeToObservableFuture; -import rx.internal.operators.OnSubscribeUsing; -import rx.internal.operators.OperatorAll; -import rx.internal.operators.OperatorAny; -import rx.internal.operators.OperatorAsObservable; -import rx.internal.operators.OperatorBufferWithSingleObservable; -import rx.internal.operators.OperatorBufferWithSize; -import rx.internal.operators.OperatorBufferWithStartEndObservable; -import rx.internal.operators.OperatorBufferWithTime; -import rx.internal.operators.OperatorCast; -import rx.internal.operators.OperatorConcat; -import rx.internal.operators.OperatorDebounceWithSelector; -import rx.internal.operators.OperatorDebounceWithTime; -import rx.internal.operators.OperatorDefaultIfEmpty; -import rx.internal.operators.OperatorDelay; -import rx.internal.operators.OperatorDelayWithSelector; -import rx.internal.operators.OperatorDematerialize; -import rx.internal.operators.OperatorDistinct; -import rx.internal.operators.OperatorDistinctUntilChanged; -import rx.internal.operators.OperatorDoOnEach; -import rx.internal.operators.OperatorDoOnSubscribe; -import rx.internal.operators.OperatorDoOnUnsubscribe; -import rx.internal.operators.OperatorElementAt; -import rx.internal.operators.OperatorFilter; -import rx.internal.operators.OperatorFinally; -import rx.internal.operators.OperatorGroupBy; -import rx.internal.operators.OperatorMap; -import rx.internal.operators.OperatorMapNotification; -import rx.internal.operators.OperatorMapPair; -import rx.internal.operators.OperatorMaterialize; -import rx.internal.operators.OperatorMerge; -import rx.internal.operators.OperatorMergeDelayError; -import rx.internal.operators.OperatorMergeMaxConcurrent; -import rx.internal.operators.OperatorMulticast; -import rx.internal.operators.OperatorObserveOn; -import rx.internal.operators.OperatorOnBackpressureBlock; -import rx.internal.operators.OperatorOnBackpressureBuffer; -import rx.internal.operators.OperatorOnBackpressureDrop; -import rx.internal.operators.OperatorOnErrorResumeNextViaFunction; -import rx.internal.operators.OperatorOnErrorResumeNextViaObservable; -import rx.internal.operators.OperatorOnErrorReturn; -import rx.internal.operators.OperatorOnExceptionResumeNextViaObservable; -import rx.internal.operators.OperatorPublish; -import rx.internal.operators.OperatorReplay; -import rx.internal.operators.OperatorRetryWithPredicate; -import rx.internal.operators.OperatorSampleWithObservable; -import rx.internal.operators.OperatorSampleWithTime; -import rx.internal.operators.OperatorScan; -import rx.internal.operators.OperatorSequenceEqual; -import rx.internal.operators.OperatorSerialize; -import rx.internal.operators.OperatorSingle; -import rx.internal.operators.OperatorSkip; -import rx.internal.operators.OperatorSkipLast; -import rx.internal.operators.OperatorSkipLastTimed; -import rx.internal.operators.OperatorSkipTimed; -import rx.internal.operators.OperatorSkipUntil; -import rx.internal.operators.OperatorSkipWhile; -import rx.internal.operators.OperatorSubscribeOn; -import rx.internal.operators.OperatorSwitch; -import rx.internal.operators.OperatorTake; -import rx.internal.operators.OperatorTakeLast; -import rx.internal.operators.OperatorTakeLastTimed; -import rx.internal.operators.OperatorTakeTimed; -import rx.internal.operators.OperatorTakeUntil; -import rx.internal.operators.OperatorTakeWhile; -import rx.internal.operators.OperatorThrottleFirst; -import rx.internal.operators.OperatorTimeInterval; -import rx.internal.operators.OperatorTimeout; -import rx.internal.operators.OperatorTimeoutWithSelector; -import rx.internal.operators.OperatorTimestamp; -import rx.internal.operators.OperatorToMap; -import rx.internal.operators.OperatorToMultimap; -import rx.internal.operators.OperatorToObservableList; -import rx.internal.operators.OperatorToObservableSortedList; -import rx.internal.operators.OperatorUnsubscribeOn; -import rx.internal.operators.OperatorWindowWithObservable; -import rx.internal.operators.OperatorWindowWithSize; -import rx.internal.operators.OperatorWindowWithStartEndObservable; -import rx.internal.operators.OperatorWindowWithTime; -import rx.internal.operators.OperatorZip; -import rx.internal.operators.OperatorZipIterable; -import rx.internal.util.RxRingBuffer; +import java.util.*; +import java.util.concurrent.*; + +import rx.exceptions.*; +import rx.functions.*; +import rx.internal.operators.*; import rx.internal.util.ScalarSynchronousObservable; import rx.internal.util.UtilityFunctions; -import rx.observables.BlockingObservable; -import rx.observables.ConnectableObservable; -import rx.observables.GroupedObservable; + +import rx.observables.*; import rx.observers.SafeSubscriber; -import rx.plugins.RxJavaObservableExecutionHook; -import rx.plugins.RxJavaPlugins; -import rx.schedulers.Schedulers; -import rx.schedulers.TimeInterval; -import rx.schedulers.Timestamped; -import rx.subjects.ReplaySubject; -import rx.subjects.Subject; +import rx.plugins.*; +import rx.schedulers.*; +import rx.subjects.*; import rx.subscriptions.Subscriptions; /** @@ -5211,7 +5089,7 @@ public final Observable onBackpressureBlock(int maxQueueLength) { * @see RxJava wiki: Backpressure */ public final Observable onBackpressureBlock() { - return onBackpressureBlock(RxRingBuffer.SIZE); + return onBackpressureBlock(rx.internal.util.RxRingBuffer.SIZE); } /** From beb515651892c51919b146442ef01da4af63da11 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 26 Nov 2014 14:54:54 +0100 Subject: [PATCH 3/5] Do not leave drain if queue has data and downstream doesn't do backpressure. --- .../OperatorOnBackpressureBlock.java | 26 ++++++++++++------- .../operators/OnBackpressureBlockTest.java | 6 ++--- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java index 60f81c3522..37f43ccdd3 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java @@ -117,11 +117,7 @@ void drain() { } return; } - if (n == Long.MAX_VALUE) { - return; - } else { - break; - } + break; } else { child.onNext(nl.getValue(o)); n--; @@ -129,13 +125,23 @@ void drain() { } } synchronized (this) { - if (requestedCount == Long.MAX_VALUE || emitted == 0) { - skipFinal = true; - emitting = false; - return; + // if no backpressure below + if (requestedCount == Long.MAX_VALUE) { + // no new data arrived since the last poll + if (queue.peek() == null) { + skipFinal = true; + emitting = false; + return; + } + n = Long.MAX_VALUE; } else { + if (emitted == 0) { + skipFinal = true; + emitting = false; + return; + } requestedCount -= emitted; - n += requestedCount; + n = requestedCount; } } } diff --git a/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java b/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java index 64953c26f2..b247b15e6b 100644 --- a/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java +++ b/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java @@ -204,7 +204,7 @@ public void onStart() { assertTrue(o.getOnErrorEvents().get(0) instanceof TestException); assertTrue(o.getOnCompletedEvents().isEmpty()); } - @Test + @Test(timeout = 10000) public void testTakeWorksWithSubscriberRequesting() { Observable source = Observable.range(1, 10) .concatWith(Observable.error(new TestException("Forced failure"))) @@ -227,7 +227,7 @@ public void onStart() { o.assertNoErrors(); o.assertTerminalEvent(); } - @Test + @Test(timeout = 10000) public void testTakeWorksSubscriberRequestUnlimited() { Observable source = Observable.range(1, 10) .concatWith(Observable.error(new TestException("Forced failure"))) @@ -243,7 +243,7 @@ public void testTakeWorksSubscriberRequestUnlimited() { o.assertNoErrors(); o.assertTerminalEvent(); } - @Test + @Test(timeout = 10000) public void testTakeWorksSubscriberRequestUnlimitedBufferedException() { Observable source = Observable.range(1, 10) .concatWith(Observable.error(new TestException("Forced failure"))) From 1bf3fafcdbf2457a25ed5a59ab358c1e32cf089c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 26 Nov 2014 16:05:46 +0100 Subject: [PATCH 4/5] Fixed typo. --- .../java/rx/internal/operators/OperatorOnBackpressureBlock.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java index 37f43ccdd3..bb328788e9 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java @@ -24,7 +24,7 @@ import rx.Subscriber; /** - * Operator that uses blocks the producer thread in case a backpressure is needed. + * Operator that blocks the producer thread in case a backpressure is needed. */ public class OperatorOnBackpressureBlock implements Operator { final int max; From 181c0aa17e3b3639144105292e259738726948f4 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 29 Nov 2014 13:50:42 -0800 Subject: [PATCH 5/5] Marking OnBackpressureBlock as @Experimental I chose Experimental instead of Beta since we may still change how it behaves based on what Scheduler is being used. --- src/main/java/rx/Observable.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index e3bc5cb3a8..70a5655011 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -15,12 +15,12 @@ import java.util.*; import java.util.concurrent.*; +import rx.annotations.Experimental; import rx.exceptions.*; import rx.functions.*; import rx.internal.operators.*; import rx.internal.util.ScalarSynchronousObservable; import rx.internal.util.UtilityFunctions; - import rx.observables.*; import rx.observers.SafeSubscriber; import rx.plugins.*; @@ -5070,7 +5070,9 @@ public final Observable onBackpressureDrop() { * @param maxQueueLength the maximum number of items the producer can emit without blocking * @return the source Observable modified to block {@code onNext} notifications on overflow * @see RxJava wiki: Backpressure + * @Experimental The behavior of this can change at any time. */ + @Experimental public final Observable onBackpressureBlock(int maxQueueLength) { return lift(new OperatorOnBackpressureBlock(maxQueueLength)); } @@ -5087,7 +5089,9 @@ public final Observable onBackpressureBlock(int maxQueueLength) { * * @return the source Observable modified to block {@code onNext} notifications on overflow * @see RxJava wiki: Backpressure + * @Experimental The behavior of this can change at any time. */ + @Experimental public final Observable onBackpressureBlock() { return onBackpressureBlock(rx.internal.util.RxRingBuffer.SIZE); }