Skip to content

Commit

Permalink
stub: Wait for onClose when blocking stub is interrupted
Browse files Browse the repository at this point in the history
Interceptors need to see the onClose to clean up properly.

This also changes an isInterrupted() to interrupted(), since previously
the interrupted flag was still set when InterruptedException was thrown.
This caused an infinite loop with the new code. Previously, all callers
immediately re-set the interrupted flag, so there was no issue.

Fixes #5576
  • Loading branch information
ejona86 authored Apr 22, 2019
1 parent f4d48fe commit 6d44f46
Show file tree
Hide file tree
Showing 2 changed files with 263 additions and 39 deletions.
79 changes: 49 additions & 30 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,30 @@ public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call
public static <ReqT, RespT> RespT blockingUnaryCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
boolean interrupt = false;
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
try {
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
while (!responseFuture.isDone()) {
try {
executor.waitAndDrain();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Status.CANCELLED
.withDescription("Call was interrupted")
.withCause(e)
.asRuntimeException();
interrupt = true;
call.cancel("Thread interrupted", e);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
return getUnchecked(responseFuture);
} catch (RuntimeException e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} catch (Error e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
}
}

Expand Down Expand Up @@ -208,7 +213,7 @@ private static <V> V getUnchecked(Future<V> future) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Status.CANCELLED
.withDescription("Call was interrupted")
.withDescription("Thread interrupted")
.withCause(e)
.asRuntimeException();
} catch (ExecutionException e) {
Expand Down Expand Up @@ -546,30 +551,45 @@ ClientCall.Listener<T> listener() {
return listener;
}

private Object waitForNext() throws InterruptedException {
if (threadless == null) {
return buffer.take();
} else {
Object next = buffer.poll();
while (next == null) {
threadless.waitAndDrain();
next = buffer.poll();
private Object waitForNext() {
boolean interrupt = false;
try {
if (threadless == null) {
while (true) {
try {
return buffer.take();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
}
} else {
Object next;
while ((next = buffer.poll()) == null) {
try {
threadless.waitAndDrain();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
return next;
}
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
return next;
}
}

@Override
public boolean hasNext() {
if (last == null) {
try {
// Will block here indefinitely waiting for content. RPC timeouts defend against permanent
// hangs here as the call will become closed.
last = waitForNext();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw Status.CANCELLED.withDescription("interrupted").withCause(ie).asRuntimeException();
}
while (last == null) {
// Will block here indefinitely waiting for content. RPC timeouts defend against permanent
// hangs here as the call will become closed.
last = waitForNext();
}
if (last instanceof StatusRuntimeException) {
// Rethrow the exception with a new stacktrace.
Expand Down Expand Up @@ -643,15 +663,14 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runn
* Must only be called by one thread at a time.
*/
public void waitAndDrain() throws InterruptedException {
final Thread currentThread = Thread.currentThread();
throwIfInterrupted(currentThread);
throwIfInterrupted();
Runnable runnable = poll();
if (runnable == null) {
waiter = currentThread;
waiter = Thread.currentThread();
try {
while ((runnable = poll()) == null) {
LockSupport.park(this);
throwIfInterrupted(currentThread);
throwIfInterrupted();
}
} finally {
waiter = null;
Expand All @@ -666,8 +685,8 @@ public void waitAndDrain() throws InterruptedException {
} while ((runnable = poll()) != null);
}

private static void throwIfInterrupted(Thread currentThread) throws InterruptedException {
if (currentThread.isInterrupted()) {
private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
Expand Down
Loading

0 comments on commit 6d44f46

Please sign in to comment.