From 8aa3d1d38bb6b8a3e9befb8c29852ab1ec7b43c3 Mon Sep 17 00:00:00 2001 From: Terence Yim Date: Fri, 24 Aug 2012 10:52:48 -0700 Subject: [PATCH 1/7] Shutdown the cleanup executor as well during shutdown to free up threads. --- .../linkedin/norbert/network/netty/ClientChannelHandler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala index 0ef9efc5..acdd175b 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala @@ -147,6 +147,7 @@ class ClientChannelHandler(clientName: Option[String], def shutdown: Unit = { responseHandler.shutdown + cleanupExecutor.shutdownNow statsJMX.foreach { JMX.unregister(_) } serverErrorStrategyJMX.foreach { JMX.unregister(_) } clientStatsStrategyJMX.foreach { JMX.unregister(_) } @@ -237,4 +238,4 @@ class ClientStatisticsRequestStrategyMBeanImpl(clientName: Option[String], servi def setOutlierConstant(c: Double) = { strategy.outlierConstant = c} def getTotalNodesMarkedDown = strategy.totalNodesMarkedDown.get.abs -} \ No newline at end of file +} From 10146651fab23a86fb1b8b9ed37c31f01adea20b Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Fri, 24 Aug 2012 10:56:41 -0700 Subject: [PATCH 2/7] Clean all the channels in the channel pool every K requests --- .../norbert/norbertutils/package.scala | 8 +- .../norbert/network/netty/ChannelPool.scala | 79 ++++++++++++------- 2 files changed, 57 insertions(+), 30 deletions(-) diff --git a/cluster/src/main/scala/com/linkedin/norbert/norbertutils/package.scala b/cluster/src/main/scala/com/linkedin/norbert/norbertutils/package.scala index 138e3ea4..2c68478a 100644 --- a/cluster/src/main/scala/com/linkedin/norbert/norbertutils/package.scala +++ b/cluster/src/main/scala/com/linkedin/norbert/norbertutils/package.scala @@ -45,9 +45,11 @@ package object norbertutils { def atomicCreateIfAbsent[K, V](map: ConcurrentMap[K, V], key: K)(fn: K => V): V = { val oldValue = map.get(key) if(oldValue == null) { - val newValue = fn(key) - map.putIfAbsent(key, newValue) - map.get(key) + map.synchronized { + val newValue = fn(key) + map.putIfAbsent(key, newValue) + map.get(key) + } } else { oldValue } diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala index 83e6d4c5..42ff195f 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala @@ -59,7 +59,8 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi private val poolSize = new AtomicInteger(0) private val closed = new AtomicBoolean private val requestsSent = new AtomicInteger(0) - private var channelBufferRecycleFrequence = 1000 + private val lock = new java.util.concurrent.locks.ReentrantReadWriteLock(true) + private var channelBufferRecycleFrequence = 10000 private val channelBufferRecycleCounter = new AtomicInteger(channelBufferRecycleFrequence) private val jmxHandle = JMX.register(new MBean(classOf[ChannelPoolMBean], "address=%s,port=%d".format(address.getHostName, address.getPort)) with ChannelPoolMBean { @@ -112,40 +113,64 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi } } - if(!isFirstWriteToChannel && ((channelBufferRecycleCounter.incrementAndGet % channelBufferRecycleFrequence) == 0)) - { - val pipeline = channel.getPipeline - try { - pipeline.remove("frameDecoder") - pipeline.addBefore("protobufDecoder", "frameDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4)) - pool.offer(channel) - } catch { - case e: Exception => log.warn("error while replacing frameDecoder, discarding channel") + + if(!isFirstWriteToChannel && ((channelBufferRecycleCounter.incrementAndGet % channelBufferRecycleFrequence) == 0)) { + cleanFrameDecoders + } + + pool.offer(channel) + } + + /** + * Cleans the frame decoders in the channel pool for garbage collection / resource utilization reasons. + * We acquire the write lock at this point so that we can block all readers from taking from the queue. It would + * be dangerous if we modified their frame decoder after they pull from the queue. It should be possible to + * make this an even more fine-grained lock if contention is absolutely an issue, but cleaning the frame decoders + * is something that should be done rarely and shouldn't take long. + */ + private def cleanFrameDecoders = { + lock.writeLock.lock() + try { + import scala.collection.JavaConversions._ + for(channel <- pool.iterator()) { + val pipeline = channel.getPipeline + try { + pipeline.remove("frameDecoder") + pipeline.addBefore("protobufDecoder", "frameDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4)) + } catch { + case e: Exception => log.warn("Error while replacing frameDecoder, continuing") + } } - } else - { - pool.offer(channel) + } catch { + case ex: Exception => log.warn("Exception while cleaning the frame decoders, ignoring") } + lock.writeLock.unlock() } private def checkoutChannel: Option[Channel] = { - var found = false - var channel: Channel = null - - while (!pool.isEmpty && !found) { - pool.poll match { - case null => // do nothing + lock.readLock().lock(); - case c => - if (c.isConnected) { - channel = c - found = true - } else { - poolSize.decrementAndGet - } + var channel: Channel = null + try { + var found = false + while (!pool.isEmpty && !found) { + pool.poll match { + case null => // do nothing + + case c => + if (c.isConnected) { + channel = c + found = true + } else { + poolSize.decrementAndGet + } + } } + } catch { + case ex: Exception => log.error("Error checking out channel") } - + + lock.readLock().unlock(); Option(channel) } From 651b4bff29d333da25063771d5b3be0afb71807b Mon Sep 17 00:00:00 2001 From: Terence Yim Date: Fri, 24 Aug 2012 10:52:48 -0700 Subject: [PATCH 3/7] Shutdown the cleanup executor as well during shutdown to free up threads. --- .../linkedin/norbert/network/netty/ClientChannelHandler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala index 0ef9efc5..acdd175b 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala @@ -147,6 +147,7 @@ class ClientChannelHandler(clientName: Option[String], def shutdown: Unit = { responseHandler.shutdown + cleanupExecutor.shutdownNow statsJMX.foreach { JMX.unregister(_) } serverErrorStrategyJMX.foreach { JMX.unregister(_) } clientStatsStrategyJMX.foreach { JMX.unregister(_) } @@ -237,4 +238,4 @@ class ClientStatisticsRequestStrategyMBeanImpl(clientName: Option[String], servi def setOutlierConstant(c: Double) = { strategy.outlierConstant = c} def getTotalNodesMarkedDown = strategy.totalNodesMarkedDown.get.abs -} \ No newline at end of file +} From aa303a5bd8f30af5aee3102a667620970274dc42 Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Fri, 24 Aug 2012 10:56:41 -0700 Subject: [PATCH 4/7] Clean all the channels in the channel pool every K requests --- .../norbert/norbertutils/package.scala | 8 +- .../norbert/network/netty/ChannelPool.scala | 79 ++++++++++++------- 2 files changed, 57 insertions(+), 30 deletions(-) diff --git a/cluster/src/main/scala/com/linkedin/norbert/norbertutils/package.scala b/cluster/src/main/scala/com/linkedin/norbert/norbertutils/package.scala index 138e3ea4..2c68478a 100644 --- a/cluster/src/main/scala/com/linkedin/norbert/norbertutils/package.scala +++ b/cluster/src/main/scala/com/linkedin/norbert/norbertutils/package.scala @@ -45,9 +45,11 @@ package object norbertutils { def atomicCreateIfAbsent[K, V](map: ConcurrentMap[K, V], key: K)(fn: K => V): V = { val oldValue = map.get(key) if(oldValue == null) { - val newValue = fn(key) - map.putIfAbsent(key, newValue) - map.get(key) + map.synchronized { + val newValue = fn(key) + map.putIfAbsent(key, newValue) + map.get(key) + } } else { oldValue } diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala index 83e6d4c5..42ff195f 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala @@ -59,7 +59,8 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi private val poolSize = new AtomicInteger(0) private val closed = new AtomicBoolean private val requestsSent = new AtomicInteger(0) - private var channelBufferRecycleFrequence = 1000 + private val lock = new java.util.concurrent.locks.ReentrantReadWriteLock(true) + private var channelBufferRecycleFrequence = 10000 private val channelBufferRecycleCounter = new AtomicInteger(channelBufferRecycleFrequence) private val jmxHandle = JMX.register(new MBean(classOf[ChannelPoolMBean], "address=%s,port=%d".format(address.getHostName, address.getPort)) with ChannelPoolMBean { @@ -112,40 +113,64 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi } } - if(!isFirstWriteToChannel && ((channelBufferRecycleCounter.incrementAndGet % channelBufferRecycleFrequence) == 0)) - { - val pipeline = channel.getPipeline - try { - pipeline.remove("frameDecoder") - pipeline.addBefore("protobufDecoder", "frameDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4)) - pool.offer(channel) - } catch { - case e: Exception => log.warn("error while replacing frameDecoder, discarding channel") + + if(!isFirstWriteToChannel && ((channelBufferRecycleCounter.incrementAndGet % channelBufferRecycleFrequence) == 0)) { + cleanFrameDecoders + } + + pool.offer(channel) + } + + /** + * Cleans the frame decoders in the channel pool for garbage collection / resource utilization reasons. + * We acquire the write lock at this point so that we can block all readers from taking from the queue. It would + * be dangerous if we modified their frame decoder after they pull from the queue. It should be possible to + * make this an even more fine-grained lock if contention is absolutely an issue, but cleaning the frame decoders + * is something that should be done rarely and shouldn't take long. + */ + private def cleanFrameDecoders = { + lock.writeLock.lock() + try { + import scala.collection.JavaConversions._ + for(channel <- pool.iterator()) { + val pipeline = channel.getPipeline + try { + pipeline.remove("frameDecoder") + pipeline.addBefore("protobufDecoder", "frameDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4)) + } catch { + case e: Exception => log.warn("Error while replacing frameDecoder, continuing") + } } - } else - { - pool.offer(channel) + } catch { + case ex: Exception => log.warn("Exception while cleaning the frame decoders, ignoring") } + lock.writeLock.unlock() } private def checkoutChannel: Option[Channel] = { - var found = false - var channel: Channel = null - - while (!pool.isEmpty && !found) { - pool.poll match { - case null => // do nothing + lock.readLock().lock(); - case c => - if (c.isConnected) { - channel = c - found = true - } else { - poolSize.decrementAndGet - } + var channel: Channel = null + try { + var found = false + while (!pool.isEmpty && !found) { + pool.poll match { + case null => // do nothing + + case c => + if (c.isConnected) { + channel = c + found = true + } else { + poolSize.decrementAndGet + } + } } + } catch { + case ex: Exception => log.error("Error checking out channel") } - + + lock.readLock().unlock(); Option(channel) } From 2c43e106b634aa175c3113221ad8f99d3d10452c Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Fri, 31 Aug 2012 10:40:52 -0700 Subject: [PATCH 5/7] Channels in the channel pool can "expire" after a certain amount of time --- .../norbert/network/NetworkDefaults.scala | 5 + .../network/client/NetworkClient.scala | 2 + .../norbert/network/netty/ChannelPool.scala | 121 +++++++----------- .../network/netty/NettyNetworkClient.scala | 1 + .../network/netty/ChannelPoolSpec.scala | 28 +++- 5 files changed, 83 insertions(+), 74 deletions(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/NetworkDefaults.scala b/network/src/main/scala/com/linkedin/norbert/network/NetworkDefaults.scala index acd3ce9a..7ccd273c 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/NetworkDefaults.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/NetworkDefaults.scala @@ -46,6 +46,11 @@ object NetworkDefaults { */ val STALE_REQUEST_TIMEOUT_MINS = 1 + /** + * How long to keep a channel alive before we'll toss it away + */ + val CLOSE_CHANNEL_TIMEOUT_MILLIS = 30000L + /** * The amount of time before a request is considered "timed out" by the processing queue. If for some reason (perhaps a GC), when the request * is pulled from the queue and has been sitting in the queue for longer than this time, a HeavyLoadException is thrown to the client, signalling a throttle. diff --git a/network/src/main/scala/com/linkedin/norbert/network/client/NetworkClient.scala b/network/src/main/scala/com/linkedin/norbert/network/client/NetworkClient.scala index 3fc8dd29..36e8eef8 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/client/NetworkClient.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/client/NetworkClient.scala @@ -34,8 +34,10 @@ class NetworkClientConfig { var connectTimeoutMillis = NetworkDefaults.CONNECT_TIMEOUT_MILLIS var writeTimeoutMillis = NetworkDefaults.WRITE_TIMEOUT_MILLIS var maxConnectionsPerNode = NetworkDefaults.MAX_CONNECTIONS_PER_NODE + var staleRequestTimeoutMins = NetworkDefaults.STALE_REQUEST_TIMEOUT_MINS var staleRequestCleanupFrequenceMins = NetworkDefaults.STALE_REQUEST_CLEANUP_FREQUENCY_MINS + var closeChannelTimeMillis = NetworkDefaults.CLOSE_CHANNEL_TIMEOUT_MILLIS var requestStatisticsWindow = NetworkDefaults.REQUEST_STATISTICS_WINDOW diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala index 42ff195f..e10895f6 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala @@ -28,13 +28,17 @@ import jmx.JMX import logging.Logging import cluster.{Node, ClusterClient} import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} -import norbertutils.{SystemClock} +import norbertutils.{Clock, SystemClock} import java.io.IOException import common.{BackoffStrategy, SimpleBackoffStrategy} +import java.util class ChannelPoolClosedException extends Exception -class ChannelPoolFactory(maxConnections: Int, openTimeoutMillis: Int, writeTimeoutMillis: Int, bootstrap: ClientBootstrap, errorStrategy: Option[BackoffStrategy]) { +class ChannelPoolFactory(maxConnections: Int, openTimeoutMillis: Int, writeTimeoutMillis: Int, + bootstrap: ClientBootstrap, + errorStrategy: Option[BackoffStrategy], + closeChannelTimeMillis: Long) { def newChannelPool(address: InetSocketAddress): ChannelPool = { val group = new DefaultChannelGroup("norbert-client [%s]".format(address)) @@ -44,7 +48,9 @@ class ChannelPoolFactory(maxConnections: Int, openTimeoutMillis: Int, writeTimeo writeTimeoutMillis = writeTimeoutMillis, bootstrap = bootstrap, channelGroup = group, - errorStrategy = errorStrategy) + closeChannelTimeMillis = closeChannelTimeMillis, + errorStrategy = errorStrategy, + clock = SystemClock) } def shutdown: Unit = { @@ -52,16 +58,25 @@ class ChannelPoolFactory(maxConnections: Int, openTimeoutMillis: Int, writeTimeo } } -class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMillis: Int, writeTimeoutMillis: Int, bootstrap: ClientBootstrap, - channelGroup: ChannelGroup, val errorStrategy: Option[BackoffStrategy]) extends Logging { - private val pool = new ArrayBlockingQueue[Channel](maxConnections) +class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMillis: Int, writeTimeoutMillis: Int, + bootstrap: ClientBootstrap, + channelGroup: ChannelGroup, + closeChannelTimeMillis: Long, + val errorStrategy: Option[BackoffStrategy], + clock: Clock) extends Logging { + + case class PoolEntry(channel: Channel, creationTime: Long) { + def age = System.currentTimeMillis() - creationTime + + def isFresh(closeChannelTimeMillis: Long) = closeChannelTimeMillis > 0 && age < closeChannelTimeMillis + } + + private val pool = new ArrayBlockingQueue[PoolEntry](maxConnections) private val waitingWrites = new LinkedBlockingQueue[Request[_, _]] private val poolSize = new AtomicInteger(0) private val closed = new AtomicBoolean private val requestsSent = new AtomicInteger(0) private val lock = new java.util.concurrent.locks.ReentrantReadWriteLock(true) - private var channelBufferRecycleFrequence = 10000 - private val channelBufferRecycleCounter = new AtomicInteger(channelBufferRecycleFrequence) private val jmxHandle = JMX.register(new MBean(classOf[ChannelPoolMBean], "address=%s,port=%d".format(address.getHostName, address.getPort)) with ChannelPoolMBean { import scala.math._ @@ -72,19 +87,15 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi def getMaxChannels = maxConnections def getNumberRequestsSent = requestsSent.get.abs - - def getChannelBufferRecycleFrequence = channelBufferRecycleFrequence - - def setChannelBufferRecycleFrequence(noReqsPerRecycle: Int) {channelBufferRecycleFrequence = max(noReqsPerRecycle, 10) } }) def sendRequest[RequestMsg, ResponseMsg](request: Request[RequestMsg, ResponseMsg]): Unit = if (closed.get) { throw new ChannelPoolClosedException } else { checkoutChannel match { - case Some(channel) => - writeRequestToChannel(request, channel) - checkinChannel(channel) + case Some(poolEntry) => + writeRequestToChannel(request, poolEntry.channel) + checkinChannel(poolEntry) case None => waitingWrites.offer(request) @@ -99,7 +110,7 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi } } - private def checkinChannel(channel: Channel, isFirstWriteToChannel: Boolean = false) { + private def checkinChannel(poolEntry: PoolEntry, isFirstWriteToChannel: Boolean = false) { while (!waitingWrites.isEmpty) { waitingWrites.poll match { case null => // do nothing @@ -107,71 +118,35 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi case request => val timeout = if (isFirstWriteToChannel) writeTimeoutMillis + openTimeoutMillis else writeTimeoutMillis if((System.currentTimeMillis - request.timestamp) < timeout) - writeRequestToChannel(request, channel) + writeRequestToChannel(request, poolEntry.channel) else request.onFailure(new TimeoutException("Timed out while waiting to write")) } } + if(poolEntry.isFresh(closeChannelTimeMillis)) + pool.offer(poolEntry) - if(!isFirstWriteToChannel && ((channelBufferRecycleCounter.incrementAndGet % channelBufferRecycleFrequence) == 0)) { - cleanFrameDecoders - } - - pool.offer(channel) } - /** - * Cleans the frame decoders in the channel pool for garbage collection / resource utilization reasons. - * We acquire the write lock at this point so that we can block all readers from taking from the queue. It would - * be dangerous if we modified their frame decoder after they pull from the queue. It should be possible to - * make this an even more fine-grained lock if contention is absolutely an issue, but cleaning the frame decoders - * is something that should be done rarely and shouldn't take long. - */ - private def cleanFrameDecoders = { - lock.writeLock.lock() - try { - import scala.collection.JavaConversions._ - for(channel <- pool.iterator()) { - val pipeline = channel.getPipeline - try { - pipeline.remove("frameDecoder") - pipeline.addBefore("protobufDecoder", "frameDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4)) - } catch { - case e: Exception => log.warn("Error while replacing frameDecoder, continuing") - } - } - } catch { - case ex: Exception => log.warn("Exception while cleaning the frame decoders, ignoring") - } - lock.writeLock.unlock() - } + private def checkoutChannel: Option[PoolEntry] = { + var poolEntry: PoolEntry = null + var found = false + while (!pool.isEmpty && !found) { + pool.poll match { + case null => // do nothing - private def checkoutChannel: Option[Channel] = { - lock.readLock().lock(); - - var channel: Channel = null - try { - var found = false - while (!pool.isEmpty && !found) { - pool.poll match { - case null => // do nothing - - case c => - if (c.isConnected) { - channel = c - found = true - } else { - poolSize.decrementAndGet - } - } + case pe => + if (pe.channel.isConnected && pe.isFresh(closeChannelTimeMillis)) { + poolEntry = pe + found = true + } else { + poolSize.decrementAndGet + } } - } catch { - case ex: Exception => log.error("Error checking out channel") } - - lock.readLock().unlock(); - Option(channel) + + Option(poolEntry) } private def openChannel(request: Request[_, _]) { @@ -188,7 +163,9 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi log.debug("Opened a channel to: %s".format(address)) channelGroup.add(channel) - checkinChannel(channel, isFirstWriteToChannel = true) + + val poolEntry = PoolEntry(channel, System.currentTimeMillis()) + checkinChannel(poolEntry, isFirstWriteToChannel = true) } else { log.error(openFuture.getCause, "Error when opening channel to: %s, marking offline".format(address)) errorStrategy.foreach(_.notifyFailure(request.node)) @@ -224,6 +201,4 @@ trait ChannelPoolMBean { def getMaxChannels: Int def getWriteQueueSize: Int def getNumberRequestsSent: Int - def getChannelBufferRecycleFrequence: Int - def setChannelBufferRecycleFrequence(noReqsPerRecycle: Int): Unit } diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/NettyNetworkClient.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/NettyNetworkClient.scala index 64c9bf8d..71b28b10 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/NettyNetworkClient.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/NettyNetworkClient.scala @@ -115,6 +115,7 @@ abstract class BaseNettyNetworkClient(clientConfig: NetworkClientConfig) extends openTimeoutMillis = clientConfig.connectTimeoutMillis, writeTimeoutMillis = clientConfig.writeTimeoutMillis, bootstrap = bootstrap, + closeChannelTimeMillis = clientConfig.closeChannelTimeMillis, errorStrategy = Some(channelPoolStrategy)) val clusterIoClient = new NettyClusterIoClient(channelPoolFactory, strategy) diff --git a/network/src/test/scala/com/linkedin/norbert/network/netty/ChannelPoolSpec.scala b/network/src/test/scala/com/linkedin/norbert/network/netty/ChannelPoolSpec.scala index f2e0ebc6..4dc6c3b9 100644 --- a/network/src/test/scala/com/linkedin/norbert/network/netty/ChannelPoolSpec.scala +++ b/network/src/test/scala/com/linkedin/norbert/network/netty/ChannelPoolSpec.scala @@ -25,12 +25,15 @@ import org.jboss.netty.channel.{Channel, ChannelFutureListener, ChannelFuture} import com.google.protobuf.Message import java.util.concurrent.{TimeoutException, TimeUnit} import java.net.InetSocketAddress +import norbertutils.MockClock class ChannelPoolSpec extends Specification with Mockito { val channelGroup = mock[ChannelGroup] val bootstrap = mock[ClientBootstrap] val address = new InetSocketAddress("localhost", 31313) - val channelPool = new ChannelPool(address, 1, 100, 100, bootstrap, channelGroup, None) + + val channelPool = new ChannelPool(address, 1, 100, 100, bootstrap, channelGroup, + closeChannelTimeMillis = 10000, errorStrategy = None, clock = MockClock) "ChannelPool" should { "close the ChannelGroup when close is called" in { @@ -122,6 +125,29 @@ class ChannelPoolSpec extends Specification with Mockito { } } + "open a new channel if a channel has expired" in { + val channel = mock[Channel] + val future = new TestChannelFuture(channel, true) + bootstrap.connect(address) returns future + channelGroup.add(channel) returns true + channel.write(any[Request[_, _]]) returns future + + val request = mock[Request[_, _]] + channelPool.sendRequest(request) + future.listener.operationComplete(future) + + MockClock.currentTime = 20000L + + channelPool.sendRequest(request) + future.listener.operationComplete(future) + + got { + two(channelGroup).add(channel) + two(bootstrap).connect(address) + } + } + + "write all queued requests" in { val channel = mock[Channel] val request = mock[Request[_, _]] From 03c3f91a86f425fce2a2c3b27af1041ac0c0e0dc Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Fri, 31 Aug 2012 10:42:08 -0700 Subject: [PATCH 6/7] Fix bug in the timeout iterator --- .../network/common/NorbertFuture.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala b/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala index 14ce81f9..3fcc550d 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/common/NorbertFuture.scala @@ -99,21 +99,25 @@ case class TimeoutIterator[ResponseMsg](inner: ResponseIterator[ResponseMsg], ti def next: ResponseMsg = { val before = System.currentTimeMillis - val res = inner.next(timeLeft.get, TimeUnit.MILLISECONDS) - val time = (System.currentTimeMillis - before).asInstanceOf[Int] - timeLeft.addAndGet(-time) - res + try { + return inner.next(timeLeft.get, TimeUnit.MILLISECONDS) + } finally { + val time = (System.currentTimeMillis - before).asInstanceOf[Int] + timeLeft.addAndGet(-time) + } } def next(t: Long, unit: TimeUnit): ResponseMsg = { val before = System.currentTimeMillis val methodTimeout = unit.toMillis(t) - val res = inner.next(math.min(methodTimeout, timeLeft.get), TimeUnit.MILLISECONDS) - val time = (System.currentTimeMillis - before).asInstanceOf[Int] - timeLeft.addAndGet(-time) - res + try { + return inner.next(math.min(methodTimeout, timeLeft.get), TimeUnit.MILLISECONDS) + } finally { + val time = (System.currentTimeMillis - before).asInstanceOf[Int] + timeLeft.addAndGet(-time) + } } } From 5c835aeb8ebeb74808cb354790fb2aa98c5251e4 Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Fri, 31 Aug 2012 12:31:22 -0700 Subject: [PATCH 7/7] Close the channel if we aren't planning on returning it to the pool --- .../norbert/network/netty/ChannelPool.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala index e10895f6..bfdbf255 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/ChannelPool.scala @@ -126,7 +126,8 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi if(poolEntry.isFresh(closeChannelTimeMillis)) pool.offer(poolEntry) - + else + poolEntry.channel.close() } private def checkoutChannel: Option[PoolEntry] = { @@ -137,9 +138,13 @@ class ChannelPool(address: InetSocketAddress, maxConnections: Int, openTimeoutMi case null => // do nothing case pe => - if (pe.channel.isConnected && pe.isFresh(closeChannelTimeMillis)) { - poolEntry = pe - found = true + if (pe.channel.isConnected) { + if(pe.isFresh(closeChannelTimeMillis)) { + poolEntry = pe + found = true + } else { + pe.channel.close() + } } else { poolSize.decrementAndGet }