From 4ff856bfde9e7ff16dfda4a3f559eee882eb1a09 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 20 May 2016 16:10:30 -0400 Subject: [PATCH] Netty request/response tracer should wait for send We write to Netty channels in an async fashion, but notify listeners via a transport service adapter before we are certain that the channel write succeeded. In particular, the tracer logs are implemented via a transport service adapter and this means that we can write tracer logs before a write was successful and in some cases the write might fail leading to misleading logs. This commit attaches the transport service adapters to channel writes as a listener so that the notification occurs only after a successful write. --- .../transport/netty/NettyTransport.java | 5 ++++- .../transport/netty/NettyTransportChannel.java | 12 +++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 59649bff78365..c5d8f29e39ae1 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -925,7 +925,10 @@ public void sendRequest(final DiscoveryNode node, final long requestId, final St ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); future.addListener(listener); addedReleaseListener = true; - transportServiceAdapter.onRequestSent(node, requestId, action, request, options); + final TransportRequestOptions finalOptions = options; + ChannelFutureListener channelFutureListener = + f -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions); + future.addListener(channelFutureListener); } finally { if (!addedReleaseListener) { Releasables.close(bStream.bytes()); diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index 57893ff1908b1..91b6bc120add4 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -37,6 +37,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -114,7 +115,10 @@ public void sendResponse(TransportResponse response, TransportResponseOptions op ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); future.addListener(listener); addedReleaseListener = true; - transportServiceAdapter.onResponseSent(requestId, action, response, options); + final TransportResponseOptions finalOptions = options; + ChannelFutureListener onResponseSentListener = + f -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions); + future.addListener(onResponseSentListener); } finally { if (!addedReleaseListener && bStream != null) { Releasables.close(bStream.bytes()); @@ -137,8 +141,10 @@ public void sendResponse(Throwable error) throws IOException { BytesReference bytes = stream.bytes(); ChannelBuffer buffer = bytes.toChannelBuffer(); NettyHeader.writeHeader(buffer, requestId, status, version); - channel.write(buffer); - transportServiceAdapter.onResponseSent(requestId, action, error); + ChannelFuture future = channel.write(buffer); + ChannelFutureListener onResponseSentListener = + f -> transportServiceAdapter.onResponseSent(requestId, action, error); + future.addListener(onResponseSentListener); } private void close() {