Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make sure outstanding RPCs count in ChannelPool can not go negative #2185

Merged
merged 9 commits into from
Oct 19, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ private boolean retain() {
private void release() {
int newCount = outstandingRpcs.decrementAndGet();
if (newCount < 0) {
throw new IllegalStateException("Bug: reference count is negative!: " + newCount);
LOG.log(Level.WARNING, "Bug: Reference count is negative!: " + newCount);
}

// Must check outstandingRpcs after shutdownRequested (in reverse order of retain()) to ensure
Expand Down Expand Up @@ -526,6 +526,8 @@ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
static class ReleasingClientCall<ReqT, RespT> extends SimpleForwardingClientCall<ReqT, RespT> {
@Nullable private CancellationException cancellationException;
final Entry entry;
private final AtomicBoolean wasClosed = new AtomicBoolean();
private final AtomicBoolean wasReleased = new AtomicBoolean();

public ReleasingClientCall(ClientCall<ReqT, RespT> delegate, Entry entry) {
super(delegate);
Expand All @@ -542,17 +544,32 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
try {
if (wasClosed.compareAndSet(false, true)) {
super.onClose(status, trailers);
} finally {
entry.release();
if (wasReleased.compareAndSet(false, true)) {
entry.release();
} else {
LOG.log(
Level.WARNING,
"The entry is already released, this indicates that there is an exception on start of the call");
}
} else {
LOG.log(
Level.WARNING,
"onClose() has already being called, please make sure onClose() is not being manually called, otherwise this may indicate a bug in gRPC library");
}
}
},
headers);
} catch (Exception e) {
// In case start failed, make sure to release
entry.release();
if (wasReleased.compareAndSet(false, true)) {
entry.release();
} else {
LOG.log(
Level.WARNING,
"The entry is already released, this indicates that onClose() has already being called previously");
}
throw e;
}
}
Expand Down