Skip to content

Commit

Permalink
Make idle timeout configurable (#824)
Browse files Browse the repository at this point in the history
* Make idle timeout configurable

Motivation:

We shed idle connections but don't offer the ability to configure the
timeout; we should.

Modifications:

- Make server and client idle timeouts configurable
- Add integ tests to check the timeouts works

Result:

Idle timeouts are configurable, more tests.
  • Loading branch information
glbrntt authored Jun 8, 2020
1 parent 6cea0c8 commit 0048e6b
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 3 deletions.
11 changes: 10 additions & 1 deletion Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ extension ClientConnection {
/// The connection backoff configuration. If no connection retrying is required then this should
/// be `nil`.
public var connectionBackoff: ConnectionBackoff?

/// The amount of time to wait before closing the connection. The idle timeout will start only
/// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
///
/// If a connection becomes idle, starting a new RPC will automatically create a new connection.
public var connectionIdleTimeout: TimeAmount

/// The HTTP/2 flow control target window size.
public var httpTargetWindowSize: Int
Expand Down Expand Up @@ -269,6 +275,7 @@ extension ClientConnection {
connectivityStateDelegate: ConnectivityStateDelegate? = nil,
tls: Configuration.TLS? = nil,
connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
connectionIdleTimeout: TimeAmount = .minutes(5),
httpTargetWindowSize: Int = 65535
) {
self.target = target
Expand All @@ -277,6 +284,7 @@ extension ClientConnection {
self.connectivityStateDelegate = connectivityStateDelegate
self.tls = tls
self.connectionBackoff = connectionBackoff
self.connectionIdleTimeout = connectionIdleTimeout
self.httpTargetWindowSize = httpTargetWindowSize
}
}
Expand Down Expand Up @@ -334,6 +342,7 @@ extension Channel {
tlsConfiguration: TLSConfiguration?,
tlsServerHostname: String?,
connectionManager: ConnectionManager,
connectionIdleTimeout: TimeAmount,
errorDelegate: ClientErrorDelegate?,
logger: Logger
) -> EventLoopFuture<Void> {
Expand All @@ -346,7 +355,7 @@ extension Channel {
}.flatMap { _ in
return self.pipeline.handler(type: NIOHTTP2Handler.self).flatMap { http2Handler in
self.pipeline.addHandler(
GRPCIdleHandler(mode: .client(connectionManager)),
GRPCIdleHandler(mode: .client(connectionManager), idleTimeout: connectionIdleTimeout),
position: .after(http2Handler)
)
}.flatMap {
Expand Down
1 change: 1 addition & 0 deletions Sources/GRPC/ConnectionManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ extension ConnectionManager {
tlsConfiguration: configuration.tls?.configuration,
tlsServerHostname: serverHostname,
connectionManager: self,
connectionIdleTimeout: configuration.connectionIdleTimeout,
errorDelegate: configuration.errorDelegate,
logger: self.logger
)
Expand Down
12 changes: 12 additions & 0 deletions Sources/GRPC/GRPCChannel/GRPCChannelBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ extension ClientConnection {
private var connectionBackoffIsEnabled = true
private var errorDelegate: ClientErrorDelegate?
private var connectivityStateDelegate: ConnectivityStateDelegate?
private var connectionIdleTimeout: TimeAmount = .minutes(5)
private var httpTargetWindowSize: Int = 65535

fileprivate init(group: EventLoopGroup) {
Expand All @@ -50,6 +51,7 @@ extension ClientConnection {
connectivityStateDelegate: self.connectivityStateDelegate,
tls: self.maybeTLS,
connectionBackoff: self.connectionBackoffIsEnabled ? self.connectionBackoff : nil,
connectionIdleTimeout: self.connectionIdleTimeout,
httpTargetWindowSize: self.httpTargetWindowSize
)
return ClientConnection(configuration: configuration)
Expand Down Expand Up @@ -140,6 +142,16 @@ extension ClientConnection.Builder {
self.connectionBackoffIsEnabled = enabled
return self
}

/// The amount of time to wait before closing the connection. The idle timeout will start only
/// if there are no RPCs in progress and will be cancelled as soon as any RPCs start. If a
/// connection becomes idle, starting a new RPC will automatically create a new connection.
/// Defaults to 5 minutes if not set.
@discardableResult
public func withConnectionIdleTimeout(_ timeout: TimeAmount) -> Self {
self.connectionIdleTimeout = timeout
return self
}
}

extension ClientConnection.Builder {
Expand Down
6 changes: 5 additions & 1 deletion Sources/GRPC/HTTPProtocolSwitcher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ internal class HTTPProtocolSwitcher {
private let errorDelegate: ServerErrorDelegate?
private let logger = Logger(subsystem: .serverChannelCall)
private let httpTargetWindowSize: Int
private let idleTimeout: TimeAmount

// We could receive additional data after the initial data and before configuring
// the pipeline; buffer it and fire it down the pipeline once it is configured.
Expand All @@ -45,10 +46,12 @@ internal class HTTPProtocolSwitcher {
init(
errorDelegate: ServerErrorDelegate?,
httpTargetWindowSize: Int = 65535,
idleTimeout: TimeAmount,
handlersInitializer: (@escaping (Channel) -> EventLoopFuture<Void>)
) {
self.errorDelegate = errorDelegate
self.httpTargetWindowSize = httpTargetWindowSize
self.idleTimeout = idleTimeout
self.handlersInitializer = handlersInitializer
}
}
Expand Down Expand Up @@ -146,7 +149,8 @@ extension HTTPProtocolSwitcher: ChannelInboundHandler, RemovableChannelHandler {
.flatMap { self.handlersInitializer(streamChannel) }
}.flatMap { multiplexer in
// Add an idle handler between the two HTTP2 handlers.
context.channel.pipeline.addHandler(GRPCIdleHandler(mode: .server), position: .before(multiplexer))
let idleHandler = GRPCIdleHandler(mode: .server, idleTimeout: self.idleTimeout)
return context.channel.pipeline.addHandler(idleHandler, position: .before(multiplexer))
}
.cascade(to: pipelineConfigured)
}
Expand Down
9 changes: 8 additions & 1 deletion Sources/GRPC/Server.swift
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public final class Server {
.childChannelInitializer { channel in
let protocolSwitcher = HTTPProtocolSwitcher(
errorDelegate: configuration.errorDelegate,
httpTargetWindowSize: configuration.httpTargetWindowSize
httpTargetWindowSize: configuration.httpTargetWindowSize,
idleTimeout: configuration.connectionIdleTimeout
) { channel -> EventLoopFuture<Void> in
let logger = Logger(subsystem: .serverChannelCall, metadata: [MetadataKey.requestID: "\(UUID())"])
let handler = GRPCServerRequestRoutingHandler(
Expand Down Expand Up @@ -189,6 +190,10 @@ extension Server {
/// TLS configuration for this connection. `nil` if TLS is not desired.
public var tls: TLS?

/// The amount of time to wait before closing connections. The idle timeout will start only
/// if there are no RPCs in progress and will be cancelled as soon as any RPCs start.
public var connectionIdleTimeout: TimeAmount

/// The compression configuration for requests and responses.
///
/// If compression is enabled for the server it may be disabled for responses on any RPC by
Expand Down Expand Up @@ -217,6 +222,7 @@ extension Server {
serviceProviders: [CallHandlerProvider],
errorDelegate: ServerErrorDelegate? = LoggingServerErrorDelegate.shared,
tls: TLS? = nil,
connectionIdleTimeout: TimeAmount = .minutes(5),
messageEncoding: ServerMessageEncoding = .disabled,
httpTargetWindowSize: Int = 65535
) {
Expand All @@ -225,6 +231,7 @@ extension Server {
self.serviceProviders = serviceProviders
self.errorDelegate = errorDelegate
self.tls = tls
self.connectionIdleTimeout = connectionIdleTimeout
self.messageEncoding = messageEncoding
self.httpTargetWindowSize = httpTargetWindowSize
}
Expand Down
15 changes: 15 additions & 0 deletions Sources/GRPC/ServerBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ extension Server {
private var providers: [CallHandlerProvider] = []
private var errorDelegate: ServerErrorDelegate?
private var messageEncoding: ServerMessageEncoding = .disabled
private var connectionIdleTimeout: TimeAmount = .minutes(5)
private var httpTargetWindowSize: Int = 65535

fileprivate init(group: EventLoopGroup) {
Expand Down Expand Up @@ -51,6 +52,7 @@ extension Server {
serviceProviders: self.providers,
errorDelegate: self.errorDelegate,
tls: self.maybeTLS,
connectionIdleTimeout: self.connectionIdleTimeout,
messageEncoding: self.messageEncoding,
httpTargetWindowSize: self.httpTargetWindowSize
)
Expand All @@ -71,15 +73,28 @@ extension Server.Builder {
extension Server.Builder {
/// Sets the service providers that this server should offer. Note that calling this multiple
/// times will override any previously set providers.
@discardableResult
public func withServiceProviders(_ providers: [CallHandlerProvider]) -> Self {
self.providers = providers
return self
}
}

extension Server.Builder {
/// The amount of time to wait before closing connections. The idle timeout will start only
/// if there are no RPCs in progress and will be cancelled as soon as any RPCs start. Defaults to
/// 5 minutes if not set.
@discardableResult
public func withConnectionIdleTimeout(_ timeout: TimeAmount) -> Self {
self.connectionIdleTimeout = timeout
return self
}
}

extension Server.Builder {
/// Sets the message compression configuration. Compression is disabled if this is not configured
/// and any RPCs using compression will not be accepted.
@discardableResult
public func withMessageCompression(_ encoding: ServerMessageEncoding) -> Self {
self.messageEncoding = encoding
return self
Expand Down
80 changes: 80 additions & 0 deletions Tests/GRPCTests/GRPCIdleTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2020, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@testable import GRPC
import NIO
import EchoModel
import EchoImplementation
import XCTest

class GRPCIdleTests: GRPCTestCase {
func testClientIdleTimeout() {
XCTAssertNoThrow(try self.doTestIdleTimeout(serverIdle: .minutes(5), clientIdle: .milliseconds(100)))
}

func testServerIdleTimeout() throws {
XCTAssertNoThrow(try self.doTestIdleTimeout(serverIdle: .milliseconds(100), clientIdle: .minutes(5)))
}

func doTestIdleTimeout(serverIdle: TimeAmount, clientIdle: TimeAmount) throws {
// Is the server idling first? This determines what state change the client should see when the
// idle happens.
let isServerIdleFirst = serverIdle < clientIdle

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

// Setup a server.
let server = try Server.insecure(group: group)
.withServiceProviders([EchoProvider()])
.withConnectionIdleTimeout(serverIdle)
.bind(host: "localhost", port: 0)
.wait()
defer {
XCTAssertNoThrow(try server.close().wait())
}

// Setup a state change recorder for the client.
let stateRecorder = RecordingConnectivityDelegate()
stateRecorder.expectChanges(3) { changes in
XCTAssertEqual(changes, [
Change(from: .idle, to: .connecting),
Change(from: .connecting, to: .ready),
Change(from: .ready, to: isServerIdleFirst ? .transientFailure : .idle)
])
}

// Setup a connection.
let connection = ClientConnection.insecure(group: group)
.withConnectivityStateDelegate(stateRecorder)
.withConnectionIdleTimeout(clientIdle)
.connect(host: "localhost", port: server.channel.localAddress!.port!)
defer {
XCTAssertNoThrow(try connection.close().wait())
}

let client = Echo_EchoClient(channel: connection)

// Make a call; this will trigger channel creation.
let get = client.get(.with { $0.text = "ignored" })
let status = try get.status.wait()
XCTAssertEqual(status.code, .ok)

// Now wait for the state changes.
stateRecorder.waitForExpectedChanges(timeout: .seconds(10))
}
}
11 changes: 11 additions & 0 deletions Tests/GRPCTests/XCTestManifests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,16 @@ extension GRPCCustomPayloadTests {
]
}

extension GRPCIdleTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__GRPCIdleTests = [
("testClientIdleTimeout", testClientIdleTimeout),
("testServerIdleTimeout", testServerIdleTimeout),
]
}

extension GRPCInsecureInteroperabilityTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
Expand Down Expand Up @@ -719,6 +729,7 @@ public func __allTests() -> [XCTestCaseEntry] {
testCase(FunctionalTestsMutualAuthenticationNIOTS.__allTests__FunctionalTestsMutualAuthenticationNIOTS),
testCase(GRPCClientStateMachineTests.__allTests__GRPCClientStateMachineTests),
testCase(GRPCCustomPayloadTests.__allTests__GRPCCustomPayloadTests),
testCase(GRPCIdleTests.__allTests__GRPCIdleTests),
testCase(GRPCInsecureInteroperabilityTests.__allTests__GRPCInsecureInteroperabilityTests),
testCase(GRPCSecureInteroperabilityTests.__allTests__GRPCSecureInteroperabilityTests),
testCase(GRPCServerRequestRoutingHandlerTests.__allTests__GRPCServerRequestRoutingHandlerTests),
Expand Down

0 comments on commit 0048e6b

Please sign in to comment.