Skip to content

Commit

Permalink
finagle-core: Halve Netty tasks in Netty4 pipeline client
Browse files Browse the repository at this point in the history
Problem
The pipelining client is used for the Memcache client. Extra tasks are
generated for the Netty event loop during message sending and receiving.

Solution
When receiving a response from the server, we process the response in
the current thread if it is a Netty event loop thread. When sending a message,
we do not hand off the message to a Netty thread inside Netty4PushChannelHandle,
if the sending is already performed within the Netty event loop.

Differential Revision: https://phabricator.twitter.biz/D1193718
  • Loading branch information
Ivan Gorbachev authored and jenkins committed Jan 27, 2025
1 parent 50265ef commit 7e255d3
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Runtime Behavior Changes
* finagle-core: `Backoff.equalJittered` is now deprecated and falls back to `exponentialJittered`. ``PHAB_ID=D1182535``
* finagle-core: `PipeliningClientPushSession` now collects stats `epoll_queue_delay_ns` and `message_send_latency_ns`.
``PHAB_ID=D1185421``
* finagle-core: Halve Netty tasks in Netty4 pipeline client. ``PHAB_ID=D1193718``

New Features
~~~~~~~~~~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class PipeliningClientPushSession[In, Out](
else {
h_queue.offer(p)
h_queueSize += 1
handle.send(request) { _ =>
handle.sendInsideEventLoop(request) { _ =>
messageSendLatency.add(System.nanoTime() - handleStartTime)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.twitter.finagle.pushsession

import com.twitter.finagle.{ClientConnection, Status}
import com.twitter.finagle.ClientConnection
import com.twitter.finagle.Status
import com.twitter.finagle.ssl.session.SslSessionInfo
import com.twitter.util.{Closable, Try}
import com.twitter.util.Closable
import com.twitter.util.Try
import java.util.concurrent.Executor

/**
Expand Down Expand Up @@ -60,6 +62,8 @@ trait PushChannelHandle[In, Out] extends Closable with ClientConnection {
*/
def send(message: Out)(onComplete: Try[Unit] => Unit): Unit

def sendInsideEventLoop(message: Out)(onComplete: Try[Unit] => Unit): Unit

/**
* Write a message to the underlying IO pipeline.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package com.twitter.finagle.pushsession

import com.twitter.finagle.Status
import com.twitter.finagle.ssl.session.SslSessionInfo
import com.twitter.util.{Future, Time, Try}
import com.twitter.util.Future
import com.twitter.util.Time
import com.twitter.util.Try
import java.net.SocketAddress
import java.util.concurrent.Executor

Expand All @@ -25,6 +27,10 @@ abstract class PushChannelHandleProxy[In, Out](underlying: PushChannelHandle[In,
def send(message: Out)(onComplete: (Try[Unit]) => Unit): Unit =
underlying.send(message)(onComplete)

def sendInsideEventLoop(message: Out)(onComplete: Try[Unit] => Unit): Unit = {
underlying.sendInsideEventLoop(message)(onComplete)
}

def sendAndForget(message: Out): Unit = underlying.sendAndForget(message)

def sendAndForget(messages: Iterable[Out]): Unit = underlying.sendAndForget(messages)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package com.twitter.finagle.pushsession.utils

import com.twitter.finagle.Status
import com.twitter.finagle.pushsession.{PushChannelHandle, PushSession}
import com.twitter.finagle.ssl.session.{NullSslSessionInfo, SslSessionInfo}
import com.twitter.util.{Future, Promise, Return, Time, Try}
import java.net.{InetSocketAddress, SocketAddress}
import com.twitter.finagle.pushsession.PushChannelHandle
import com.twitter.finagle.pushsession.PushSession
import com.twitter.finagle.ssl.session.NullSslSessionInfo
import com.twitter.finagle.ssl.session.SslSessionInfo
import com.twitter.util.Future
import com.twitter.util.Promise
import com.twitter.util.Return
import com.twitter.util.Time
import com.twitter.util.Try
import java.net.InetSocketAddress
import java.net.SocketAddress
import scala.collection.mutable

class MockChannelHandle[In, Out](var currentSession: PushSession[In, Out])
Expand Down Expand Up @@ -81,4 +88,8 @@ class MockChannelHandle[In, Out](var currentSession: PushSession[In, Out])
closedCalled = true
onClose
}

override def sendInsideEventLoop(message: Out)(onComplete: Try[Unit] => Unit): Unit = {
pendingWrites += SendOne(message, onComplete)
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
package com.twitter.finagle.mux

import com.twitter.concurrent.AsyncQueue
import com.twitter.finagle.{ChannelClosedException, Status}
import com.twitter.finagle.pushsession.{PushChannelHandle, PushSession, SentinelSession}
import com.twitter.finagle.ssl.session.{NullSslSessionInfo, SslSessionInfo}
import com.twitter.finagle.ChannelClosedException
import com.twitter.finagle.Status
import com.twitter.finagle.pushsession.PushChannelHandle
import com.twitter.finagle.pushsession.PushSession
import com.twitter.finagle.pushsession.SentinelSession
import com.twitter.finagle.ssl.session.NullSslSessionInfo
import com.twitter.finagle.ssl.session.SslSessionInfo
import com.twitter.finagle.util.Updater
import com.twitter.util.{Future, Promise, Return, Throw, Time, Try}
import com.twitter.util.Future
import com.twitter.util.Promise
import com.twitter.util.Return
import com.twitter.util.Throw
import com.twitter.util.Time
import com.twitter.util.Try
import java.net.SocketAddress
import java.util.concurrent.Executor
import scala.util.control.NonFatal
Expand Down Expand Up @@ -73,6 +82,9 @@ private[mux] class QueueChannelHandle[In, Out](destinationQueue: AsyncQueue[Out]
def send(message: Out)(onComplete: Try[Unit] => Unit): Unit =
send(message :: Nil)(onComplete)

def sendInsideEventLoop(message: Out)(onComplete: Try[Unit] => Unit): Unit =
send(message :: Nil)(onComplete)

def send(messages: Iterable[Out])(onComplete: Try[Unit] => Unit): Unit = {
serialExecutor.execute(new Runnable {
def run(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
package com.twitter.finagle.netty4.pushsession

import com.twitter.finagle.{
ChannelClosedException,
ChannelException,
Status,
UnknownChannelException
}
import com.twitter.finagle.pushsession.{PushChannelHandle, PushSession}
import com.twitter.finagle.ssl.session.{NullSslSessionInfo, SslSessionInfo, UsingSslSessionInfo}
import com.twitter.finagle.ChannelClosedException
import com.twitter.finagle.ChannelException
import com.twitter.finagle.Status
import com.twitter.finagle.UnknownChannelException
import com.twitter.finagle.pushsession.PushChannelHandle
import com.twitter.finagle.pushsession.PushSession
import com.twitter.finagle.ssl.session.NullSslSessionInfo
import com.twitter.finagle.ssl.session.SslSessionInfo
import com.twitter.finagle.ssl.session.UsingSslSessionInfo
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.logging.Logger
import com.twitter.util._
import io.netty.buffer.ByteBuf
import io.netty.channel.{
Channel,
ChannelHandlerContext,
ChannelInboundHandlerAdapter,
ChannelPipeline,
EventLoop
}
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoop
import io.netty.handler.ssl.SslHandler
import io.netty.util
import io.netty.util.concurrent.GenericFutureListener
Expand Down Expand Up @@ -142,12 +141,23 @@ private final class Netty4PushChannelHandle[In, Out] private (
}

// See note above about the scheduling of send messages
def send(message: Out)(continuation: (Try[Unit]) => Unit): Unit = {
def send(message: Out)(onComplete: (Try[Unit]) => Unit): Unit = {
safeExecutor.safeExecute(new SafeRunnable {
def tryRun(): Unit = handleWriteAndFlush(message, continuation)
def tryRun(): Unit = handleWriteAndFlush(message, onComplete)
})
}

def sendInsideEventLoop(message: Out)(onComplete: Try[Unit] => Unit): Unit = {
if (!ch.eventLoop().inEventLoop()) {
throw new IllegalStateException(
s"Expected to be called from within the `Channel`s " +
s"associated `EventLoop` (${ch.eventLoop}), instead called " +
s"from thread ${Thread.currentThread}")
}

handleWriteAndFlush(message, onComplete)
}

// See note above about the scheduling of send messages
def sendAndForget(message: Out): Unit =
safeExecutor.safeExecute(new SafeRunnable {
Expand Down Expand Up @@ -268,9 +278,13 @@ private final class Netty4PushChannelHandle[In, Out] private (

override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
val m = msg.asInstanceOf[In]
safeExecutor.safeExecute(new SafeRunnable {
def tryRun(): Unit = session.receive(m)
})
if (!ctx.channel().eventLoop().inEventLoop()) {
safeExecutor.safeExecute(new SafeRunnable {
def tryRun(): Unit = session.receive(m)
})
} else {
session.receive(m)
}
}

override def channelInactive(ctx: ChannelHandlerContext): Unit =
Expand Down

0 comments on commit 7e255d3

Please sign in to comment.