diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMap.java index 1fc6a196f6..b3cb5ff531 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMap.java @@ -202,6 +202,7 @@ void drain() { boolean empty = t == null; if (d && empty) { + disposed = true; actual.onComplete(); return; } @@ -367,7 +368,7 @@ public void onComplete() { @Override public boolean isDisposed() { - return d.isDisposed(); + return cancelled; } @Override @@ -400,7 +401,7 @@ void drain() { Throwable ex = error.get(); if (ex != null) { queue.clear(); - + cancelled = true; actual.onError(error.terminate()); return; } @@ -414,6 +415,7 @@ void drain() { v = queue.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + cancelled = true; this.d.dispose(); error.addThrowable(ex); actual.onError(error.terminate()); @@ -423,6 +425,7 @@ void drain() { boolean empty = v == null; if (d && empty) { + cancelled = true; Throwable ex = error.terminate(); if (ex != null) { actual.onError(ex); @@ -440,6 +443,7 @@ void drain() { o = ObjectHelper.requireNonNull(mapper.apply(v), "The mapper returned a null ObservableSource"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); + cancelled = true; this.d.dispose(); queue.clear(); error.addThrowable(ex); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapTest.java index efa9951efe..9991e65254 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapTest.java @@ -13,15 +13,18 @@ package io.reactivex.internal.operators.observable; +import static org.junit.Assert.assertTrue; + import java.util.List; import java.util.concurrent.Callable; import org.junit.Test; import io.reactivex.*; -import io.reactivex.disposables.Disposables; +import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; @@ -367,4 +370,66 @@ protected void subscribeActual(Observer observer) { RxJavaPlugins.reset(); } } + + @SuppressWarnings("unchecked") + @Test + public void concatReportsDisposedOnComplete() { + final Disposable[] disposable = { null }; + + Observable.fromArray(Observable.just(1), Observable.just(2)) + .hide() + .concatMap(Functions.>identity()) + .subscribe(new Observer() { + + @Override + public void onSubscribe(Disposable d) { + disposable[0] = d; + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + + assertTrue(disposable[0].isDisposed()); + } + + @Test + @SuppressWarnings("unchecked") + public void concatReportsDisposedOnError() { + final Disposable[] disposable = { null }; + + Observable.fromArray(Observable.just(1), Observable.error(new TestException())) + .hide() + .concatMap(Functions.>identity()) + .subscribe(new Observer() { + + @Override + public void onSubscribe(Disposable d) { + disposable[0] = d; + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + + assertTrue(disposable[0].isDisposed()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatTest.java index a2489dcff1..2a158cb714 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatTest.java @@ -1041,4 +1041,118 @@ public void subscribe(ObservableEmitter s) throws Exception { assertEquals(1, calls[0]); } + + @Test + public void concatReportsDisposedOnComplete() { + final Disposable[] disposable = { null }; + + Observable.concat(Observable.just(1), Observable.just(2)) + .subscribe(new Observer() { + + @Override + public void onSubscribe(Disposable d) { + disposable[0] = d; + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + + assertTrue(disposable[0].isDisposed()); + } + + @Test + @SuppressWarnings("unchecked") + public void concatReportsDisposedOnCompleteDelayError() { + final Disposable[] disposable = { null }; + + Observable.concatArrayDelayError(Observable.just(1), Observable.just(2)) + .subscribe(new Observer() { + + @Override + public void onSubscribe(Disposable d) { + disposable[0] = d; + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + + assertTrue(disposable[0].isDisposed()); + } + + @Test + public void concatReportsDisposedOnError() { + final Disposable[] disposable = { null }; + + Observable.concat(Observable.just(1), Observable.error(new TestException())) + .subscribe(new Observer() { + + @Override + public void onSubscribe(Disposable d) { + disposable[0] = d; + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + + assertTrue(disposable[0].isDisposed()); + } + + @Test + @SuppressWarnings("unchecked") + public void concatReportsDisposedOnErrorDelayError() { + final Disposable[] disposable = { null }; + + Observable.concatArrayDelayError(Observable.just(1), Observable.error(new TestException())) + .subscribe(new Observer() { + + @Override + public void onSubscribe(Disposable d) { + disposable[0] = d; + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onComplete() { + } + }); + + assertTrue(disposable[0].isDisposed()); + } }