Skip to content

Commit

Permalink
Netty request/response tracer should wait for send
Browse files Browse the repository at this point in the history
This commit backports commit 4c7993e
from master to 2.x.

Relates #18500
  • Loading branch information
jasontedor committed May 24, 2016
1 parent 3d023c9 commit 876bb46
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,14 @@ public void sendRequest(final DiscoveryNode node, final long requestId, final St
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);
addedReleaseListener = true;
transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
final TransportRequestOptions finalOptions = options;
ChannelFutureListener channelFutureListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions);
}
};
future.addListener(channelFutureListener);
} finally {
if (!addedReleaseListener) {
Releasables.close(bStream.bytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -80,7 +81,7 @@ public void sendResponse(TransportResponse response) throws IOException {
}

@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
public void sendResponse(final TransportResponse response, TransportResponseOptions options) throws IOException {
close();
if (transport.compress) {
options = TransportResponseOptions.builder(options).withCompress(transport.compress).build();
Expand Down Expand Up @@ -110,7 +111,14 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
future.addListener(listener);
addedReleaseListener = true;
transportServiceAdapter.onResponseSent(requestId, action, response, options);
final TransportResponseOptions finalOptions = options;
ChannelFutureListener onResponseSentListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions);
}
};
future.addListener(onResponseSentListener);
} finally {
if (!addedReleaseListener && bStream != null) {
Releasables.close(bStream.bytes());
Expand All @@ -119,7 +127,7 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op
}

@Override
public void sendResponse(Throwable error) throws IOException {
public void sendResponse(final Throwable error) throws IOException {
close();
BytesStreamOutput stream = new BytesStreamOutput();
stream.skip(NettyHeader.HEADER_SIZE);
Expand All @@ -134,7 +142,14 @@ public void sendResponse(Throwable error) throws IOException {
ChannelBuffer buffer = bytes.toChannelBuffer();
NettyHeader.writeHeader(buffer, requestId, status, version);
channel.write(buffer);
transportServiceAdapter.onResponseSent(requestId, action, error);
ChannelFuture future = channel.write(buffer);
ChannelFutureListener onResponseSentListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
transportServiceAdapter.onResponseSent(requestId, action, error);
}
};
future.addListener(onResponseSentListener);
}

private void close() {
Expand Down

0 comments on commit 876bb46

Please sign in to comment.