From 876bb468e6bc0f90baf03c145963b3b1553123f4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 24 May 2016 15:02:06 -0400 Subject: [PATCH] Netty request/response tracer should wait for send This commit backports commit 4c7993ea719b2d06360135bb57ac43e28ae1fe09 from master to 2.x. Relates #18500 --- .../transport/netty/NettyTransport.java | 9 +++++++- .../netty/NettyTransportChannel.java | 23 +++++++++++++++---- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 24bcfec677afa..83bfec3cdb156 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -881,7 +881,14 @@ public void sendRequest(final DiscoveryNode node, final long requestId, final St ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); future.addListener(listener); addedReleaseListener = true; - transportServiceAdapter.onRequestSent(node, requestId, action, request, options); + final TransportRequestOptions finalOptions = options; + ChannelFutureListener channelFutureListener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions); + } + }; + future.addListener(channelFutureListener); } finally { if (!addedReleaseListener) { Releasables.close(bStream.bytes()); diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index ff99a25cf628a..a94c1470afdcc 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -33,6 +33,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -80,7 +81,7 @@ public void sendResponse(TransportResponse response) throws IOException { } @Override - public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { + public void sendResponse(final TransportResponse response, TransportResponseOptions options) throws IOException { close(); if (transport.compress) { options = TransportResponseOptions.builder(options).withCompress(transport.compress).build(); @@ -110,7 +111,14 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); future.addListener(listener); addedReleaseListener = true; - transportServiceAdapter.onResponseSent(requestId, action, response, options); + final TransportResponseOptions finalOptions = options; + ChannelFutureListener onResponseSentListener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions); + } + }; + future.addListener(onResponseSentListener); } finally { if (!addedReleaseListener && bStream != null) { Releasables.close(bStream.bytes()); @@ -119,7 +127,7 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op } @Override - public void sendResponse(Throwable error) throws IOException { + public void sendResponse(final Throwable error) throws IOException { close(); BytesStreamOutput stream = new BytesStreamOutput(); stream.skip(NettyHeader.HEADER_SIZE); @@ -134,7 +142,14 @@ public void sendResponse(Throwable error) throws IOException { ChannelBuffer buffer = bytes.toChannelBuffer(); NettyHeader.writeHeader(buffer, requestId, status, version); channel.write(buffer); - transportServiceAdapter.onResponseSent(requestId, action, error); + ChannelFuture future = channel.write(buffer); + ChannelFutureListener onResponseSentListener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + transportServiceAdapter.onResponseSent(requestId, action, error); + } + }; + future.addListener(onResponseSentListener); } private void close() {