Skip to content

Commit

Permalink
2.x: fix Flowable.concatMapEager hang due to bad request management (#…
Browse files Browse the repository at this point in the history
…4751)

* 2.x: fix Flowable.concatMapEager hang due to bad request management

* Missed negation of check
  • Loading branch information
akarnokd authored Oct 22, 2016
1 parent 792d1cf commit f79625a
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,11 @@ public void drain() {
int missed = 1;
InnerQueuedSubscriber<R> inner = current;
Subscriber<? super R> a = actual;
long r = requested.get();
long e = 0L;
ErrorMode em = errorMode;

outer:
for (;;) {
long r = requested.get();
long e = 0L;

if (inner == null) {

Expand Down Expand Up @@ -271,6 +270,8 @@ public void drain() {
}
}

boolean continueNextSource = false;

if (inner != null) {
SimpleQueue<R> q = inner.queue();
if (q != null) {
Expand Down Expand Up @@ -313,7 +314,8 @@ public void drain() {
inner = null;
current = null;
s.request(1);
continue outer;
continueNextSource = true;
break;
}

if (empty) {
Expand Down Expand Up @@ -353,15 +355,18 @@ public void drain() {
inner = null;
current = null;
s.request(1);
continue;
continueNextSource = true;
}
}
}
}

if (e != 0L && r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
e = 0L;
requested.addAndGet(-e);
}

if (continueNextSource) {
continue;
}

missed = addAndGet(-missed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ public void onSubscribe(Subscription s) {
if (m == QueueSubscription.ASYNC) {
fusionMode = m;
queue = qs;
QueueDrainHelper.request(get(), prefetch);
QueueDrainHelper.request(s, prefetch);
return;
}
}

queue = QueueDrainHelper.createQueue(prefetch);

QueueDrainHelper.request(get(), prefetch);
QueueDrainHelper.request(s, prefetch);
}
}

Expand All @@ -104,22 +104,26 @@ public void onComplete() {

@Override
public void request(long n) {
long p = produced + n;
if (p >= limit) {
produced = 0L;
get().request(p);
} else {
produced = p;
if (fusionMode != QueueSubscription.SYNC) {
long p = produced + n;
if (p >= limit) {
produced = 0L;
get().request(p);
} else {
produced = p;
}
}
}

public void requestOne() {
long p = produced + 1;
if (p == limit) {
produced = 0L;
get().request(p);
} else {
produced = p;
if (fusionMode != QueueSubscription.SYNC) {
long p = produced + 1;
if (p == limit) {
produced = 0L;
get().request(p);
} else {
produced = p;
}
}
}

Expand Down
27 changes: 16 additions & 11 deletions src/test/java/io/reactivex/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.operators.maybe.MaybeToFlowable;
import io.reactivex.internal.operators.single.SingleToFlowable;
Expand Down Expand Up @@ -144,21 +145,25 @@ public void accept(Throwable t) {
}

public static void assertError(List<Throwable> list, int index, Class<? extends Throwable> clazz) {
try {
assertTrue(list.get(index).toString(), clazz.isInstance(list.get(index)));
} catch (AssertionError e) {
list.get(index).printStackTrace();
throw e;
Throwable ex = list.get(index);
if (!clazz.isInstance(ex)) {
AssertionError err = new AssertionError(clazz + " expected but got " + list.get(index));
err.initCause(list.get(index));
throw err;
}
}

public static void assertError(List<Throwable> list, int index, Class<? extends Throwable> clazz, String message) {
try {
assertTrue(list.get(index).toString(), clazz.isInstance(list.get(index)));
assertEquals(message, list.get(index).getMessage());
} catch (AssertionError e) {
list.get(index).printStackTrace();
throw e;
Throwable ex = list.get(index);
if (!clazz.isInstance(ex)) {
AssertionError err = new AssertionError("Type " + clazz + " expected but got " + ex);
err.initCause(ex);
throw err;
}
if (!ObjectHelper.equals(message, ex.getMessage())) {
AssertionError err = new AssertionError("Message " + message + " expected but got " + ex.getMessage());
err.initCause(ex);
throw err;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,12 @@ public void testAsynchronousRun() {
public Flowable<Integer> apply(Integer t) {
return Flowable.range(1, 1000).subscribeOn(Schedulers.computation());
}
}).observeOn(Schedulers.newThread()).subscribe(ts);

ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
ts.assertNoErrors();
ts.assertValueCount(2000);
}).observeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertNoErrors()
.assertValueCount(2000)
.assertComplete();
}

@Test
Expand Down

0 comments on commit f79625a

Please sign in to comment.