diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 59649bff78365..c5d8f29e39ae1 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -925,7 +925,10 @@ public void sendRequest(final DiscoveryNode node, final long requestId, final St ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); future.addListener(listener); addedReleaseListener = true; - transportServiceAdapter.onRequestSent(node, requestId, action, request, options); + final TransportRequestOptions finalOptions = options; + ChannelFutureListener channelFutureListener = + f -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions); + future.addListener(channelFutureListener); } finally { if (!addedReleaseListener) { Releasables.close(bStream.bytes()); diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index 57893ff1908b1..91b6bc120add4 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -37,6 +37,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -114,7 +115,10 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); future.addListener(listener); addedReleaseListener = true; - transportServiceAdapter.onResponseSent(requestId, action, response, options); + final TransportResponseOptions finalOptions = options; + ChannelFutureListener onResponseSentListener = + f -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions); + future.addListener(onResponseSentListener); } finally { if (!addedReleaseListener && bStream != null) { Releasables.close(bStream.bytes()); @@ -137,8 +141,10 @@ public void sendResponse(Throwable error) throws IOException { BytesReference bytes = stream.bytes(); ChannelBuffer buffer = bytes.toChannelBuffer(); NettyHeader.writeHeader(buffer, requestId, status, version); - channel.write(buffer); - transportServiceAdapter.onResponseSent(requestId, action, error); + ChannelFuture future = channel.write(buffer); + ChannelFutureListener onResponseSentListener = + f -> transportServiceAdapter.onResponseSent(requestId, action, error); + future.addListener(onResponseSentListener); } private void close() {