Skip to content

Commit

Permalink
Better handle client connectivity state. (#510)
Browse files Browse the repository at this point in the history
* Better handle client connectivity state.

Motivation:

Pull #506 highlighted some issues with client reconnectivity, namely
a race between an automatic reconnection and the client closing the
connection. In this case the delegate would report changes of state
after the terminal shutdown state.

Modifications:

- State changes are now thread safe.
- Record when a user initiates shutdown, ignore state changes beyond
  this point and close any channels created after this point.

Result:

More correct channel reconnectivity.

* Fix a couple of code smells and a typo
  • Loading branch information
glbrntt authored and MrMage committed Jul 12, 2019
1 parent 2737b25 commit 7c304ed
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 135 deletions.
266 changes: 141 additions & 125 deletions Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,71 +54,45 @@ import NIOTLS
///
/// See `BaseClientCall` for a description of the remainder of the client pipeline.
public class ClientConnection {
/// The configuration this connection was created using.
internal let configuration: ClientConnection.Configuration

/// The channel which will handle gRPC calls.
internal var channel: EventLoopFuture<Channel>
internal var channel: EventLoopFuture<Channel> {
willSet {
self.willSetChannel(to: newValue)
}
didSet {
self.didSetChannel(to: self.channel)
}
}

/// HTTP multiplexer from the `channel` handling gRPC calls.
internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

/// The configuration for this client.
internal let configuration: Configuration

/// A monitor for the connectivity state.
public let connectivity: ConnectivityStateMonitor

/// Creates a new connection from the given configuration.
public init(configuration: ClientConnection.Configuration) {
let monitor = ConnectivityStateMonitor(delegate: configuration.connectivityStateDelegate)
let channel = ClientConnection.makeChannel(
configuration: configuration,
connectivityMonitor: monitor
)

self.channel = channel
self.multiplexer = channel.flatMap {
$0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
}
self.connectivity = monitor
public init(configuration: Configuration) {
self.configuration = configuration
self.connectivity = ConnectivityStateMonitor(delegate: configuration.connectivityStateDelegate)

// We need to initialize `multiplexer` before we can call `willSetChannel` (which will then
// assign `multiplexer` to one from the created `Channel`s pipeline).
let eventLoop = configuration.eventLoopGroup.next()
let unavailable = GRPCStatus(code: .unavailable, message: nil)
self.multiplexer = eventLoop.makeFailedFuture(unavailable)

self.channel = ClientConnection.makeChannel(
configuration: self.configuration,
connectivity: self.connectivity,
backoffIterator: self.configuration.connectionBackoff?.makeIterator()
)

self.channel.whenSuccess { _ in
self.connectivity.state = .ready
}
self.replaceChannelAndMultiplexerOnClose(channel: channel)
}

/// Registers a callback on the `closeFuture` of the given channel to replace this class's
/// channel and multiplexer.
private func replaceChannelAndMultiplexerOnClose(channel: EventLoopFuture<Channel>) {
channel.always { result in
// If we failed to get a channel then we've exhausted our backoff; we should `.shutdown`.
if case .failure = result {
self.connectivity.state = .shutdown
}
}.flatMap {
$0.closeFuture
}.whenComplete { _ in
// `.shutdown` is terminal so don't attempt a reconnection.
guard self.connectivity.state != .shutdown else {
return
}

let newChannel = ClientConnection.makeChannel(
configuration: self.configuration,
connectivityMonitor: self.connectivity
)

self.channel = newChannel
self.multiplexer = newChannel.flatMap {
$0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
}

// Change the state if the connection was successful.
newChannel.whenSuccess { _ in
self.connectivity.state = .ready
}
self.replaceChannelAndMultiplexerOnClose(channel: newChannel)
}
// `willSet` and `didSet` are called on initialization, so call them explicitly now.
self.willSetChannel(to: channel)
self.didSetChannel(to: channel)
}

/// The `EventLoop` this connection is using.
Expand All @@ -132,97 +106,131 @@ public class ClientConnection {
// We're already shutdown or in the process of shutting down.
return channel.flatMap { $0.closeFuture }
} else {
self.connectivity.state = .shutdown
self.connectivity.initiateUserShutdown()
return channel.flatMap { $0.close() }
}
}
}

// MARK: - Channel creation

extension ClientConnection {
/// Creates a `Channel` using the given configuration.
/// Register a callback on the close future of the given `channel` to replace the channel (if
/// possible) and also replace the `multiplexer` with that from the new channel.
///
/// This involves: creating a `ClientBootstrap`, connecting to a target and verifying that the TLS
/// handshake was successful (if TLS was configured). We _may_ additiionally set a connection
/// timeout and schedule a retry attempt (should the connection fail) if a
/// `ConnectionBackoff.Iterator` is provided.
/// - Parameter channel: The channel that will be set.
private func willSetChannel(to channel: EventLoopFuture<Channel>) {
// If we're about to set the channel and the user has initiated a shutdown (i.e. while the new
// channel was being created) then it is no longer needed.
guard !self.connectivity.userHasInitiatedShutdown else {
channel.whenSuccess { channel in
channel.close(mode: .all, promise: nil)
}
return
}

channel.flatMap { $0.closeFuture }.whenComplete { _ in
guard self.connectivity.canAttemptReconnect else { return }
self.channel = ClientConnection.makeChannel(
configuration: self.configuration,
connectivity: self.connectivity,
backoffIterator: self.configuration.connectionBackoff?.makeIterator()
)
}

self.multiplexer = channel.flatMap {
$0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
}
}

/// Register a callback on the given `channel` to update the connectivity state.
///
/// - Parameter channel: The channel that was set.
private func didSetChannel(to channel: EventLoopFuture<Channel>) {
channel.whenComplete { result in
switch result {
case .success:
self.connectivity.state = .ready

case .failure:
self.connectivity.state = .shutdown
}
}
}

/// Attempts to create a new `Channel` using the given configuration.
///
/// See the individual functions for more information:
/// - `makeBootstrap(configuration:)`, and
/// - `verifyTLS(channel:)`.
/// This involves: creating a `ClientBootstrapProtocol`, connecting to a target and verifying that
/// the TLS handshake was successful (if TLS was configured). We _may_ additiionally set a
/// connection timeout and schedule a retry attempt (should the connection fail) if a
/// `ConnectionBackoffIterator` is provided.
///
/// - Parameter configuration: The configuration to start the connection with.
/// - Parameter connectivityMonitor: A connectivity state monitor.
/// - Parameter connectivity: A connectivity state monitor.
/// - Parameter backoffIterator: An `Iterator` for `ConnectionBackoff` providing a sequence of
/// connection timeouts and backoff to use when attempting to create a connection.
private class func makeChannel(
configuration: ClientConnection.Configuration,
connectivityMonitor: ConnectivityStateMonitor,
backoffIterator: ConnectionBackoff.Iterator?
configuration: Configuration,
connectivity: ConnectivityStateMonitor,
backoffIterator: ConnectionBackoffIterator?
) -> EventLoopFuture<Channel> {
// We could have been shutdown by the user, avoid a connection attempt if this is the case.
guard connectivityMonitor.state != .shutdown else {
return configuration.eventLoopGroup.next().makeFailedFuture(GRPCStatus.processingError)
}

connectivityMonitor.state = .connecting
connectivity.state = .connecting
let timeoutAndBackoff = backoffIterator?.next()
var bootstrap = ClientConnection.makeBootstrap(configuration: configuration)

// Set a timeout, if we have one.
if let timeout = timeoutAndBackoff?.timeout {
bootstrap = bootstrap.connectTimeout(.seconds(timeInterval: timeout))
}
let bootstrap = self.makeBootstrap(
configuration: configuration,
group: configuration.eventLoopGroup,
timeout: timeoutAndBackoff?.timeout
)

let channel = bootstrap.connect(to: configuration.target).flatMap { channel -> EventLoopFuture<Channel> in
if configuration.tlsConfiguration != nil {
return ClientConnection.verifyTLS(channel: channel).map { channel }
return channel.verifyTLS().map { channel }
} else {
return channel.eventLoop.makeSucceededFuture(channel)
}
}

channel.whenFailure { _ in
// We could have been shutdown by the user whilst we were connecting. If we were then avoid
// the this extra state transition.
if connectivityMonitor.state != .shutdown {
// We might try again in a moment.
connectivityMonitor.state = timeoutAndBackoff == nil ? .shutdown : .transientFailure
}
}

// If we don't have backoff then we can't retry, just return the `channel` no matter what
// state we are in.
guard let backoff = timeoutAndBackoff?.backoff else {
return channel
}

// If we're in error then schedule our next attempt.
return channel.flatMapError { error in
// The `futureResult` of the scheduled task is of type
// `EventLoopFuture<EventLoopFuture<ClientConnection>>`, so we need to `flatMap` it to
// remove a level of indirection.
return channel.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
return makeChannel(
configuration: configuration,
connectivityMonitor: connectivityMonitor,
backoffIterator: backoffIterator
)
}.futureResult.flatMap { nextConnection in
return nextConnection
}
// If our connection attempt was unsuccessful, schedule another attempt in some time.
return channel.flatMapError { _ in
// We will try to connect again: the failure is transient.
connectivity.state = .transientFailure
return ClientConnection.scheduleReconnectAttempt(
in: backoff,
on: channel.eventLoop,
configuration: configuration,
connectivity: connectivity,
backoffIterator: backoffIterator
)
}
}

/// Creates a `Channel` using the given configuration amd state connectivity monitor.
///
/// See `makeChannel(configuration:connectivityMonitor:backoffIterator:)`.
private class func makeChannel(
configuration: ClientConnection.Configuration,
connectivityMonitor: ConnectivityStateMonitor
/// Schedule an attempt to make a channel in `timeout` seconds on the given `eventLoop`.
private class func scheduleReconnectAttempt(
in timeout: TimeInterval,
on eventLoop: EventLoop,
configuration: Configuration,
connectivity: ConnectivityStateMonitor,
backoffIterator: ConnectionBackoffIterator?
) -> EventLoopFuture<Channel> {
return makeChannel(
configuration: configuration,
connectivityMonitor: connectivityMonitor,
backoffIterator: configuration.connectionBackoff?.makeIterator()
)
// The `futureResult` of the scheduled task is of type
// `EventLoopFuture<EventLoopFuture<Channel>>`, so we need to `flatMap` it to
// remove a level of indirection.
return eventLoop.scheduleTask(in: .seconds(timeInterval: timeout)) {
ClientConnection.makeChannel(
configuration: configuration,
connectivity: connectivity,
backoffIterator: backoffIterator
)
}.futureResult.flatMap { channel in
channel
}
}

/// Makes and configures a `ClientBootstrap` using the provided configuration.
Expand All @@ -231,8 +239,14 @@ extension ClientConnection {
/// handlers detailed in the documentation for `ClientConnection`.
///
/// - Parameter configuration: The configuration to prepare the bootstrap with.
private class func makeBootstrap(configuration: Configuration) -> ClientBootstrapProtocol {
let bootstrap = GRPCNIO.makeClientBootstrap(group: configuration.eventLoopGroup)
/// - Parameter group: The `EventLoopGroup` to use for the bootstrap.
/// - Parameter timeout: The connection timeout in seconds.
private class func makeBootstrap(
configuration: Configuration,
group: EventLoopGroup,
timeout: TimeInterval?
) -> ClientBootstrapProtocol {
let bootstrap = GRPCNIO.makeClientBootstrap(group: group)
// Enable SO_REUSEADDR and TCP_NODELAY.
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
Expand All @@ -247,17 +261,12 @@ extension ClientConnection {
let errorHandler = DelegatingErrorHandler(delegate: configuration.errorDelegate)
return channel.pipeline.addHandler(errorHandler)
}
}

return bootstrap
}
}

/// Verifies that a TLS handshake was successful by using the `TLSVerificationHandler`.
///
/// - Parameter channel: The channel to verify successful TLS setup on.
private class func verifyTLS(channel: Channel) -> EventLoopFuture<Void> {
return channel.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
$0.verification
if let timeout = timeout {
return bootstrap.connectTimeout(.seconds(timeInterval: timeout))
} else {
return bootstrap
}
}
}
Expand Down Expand Up @@ -393,6 +402,13 @@ fileprivate extension Channel {
return self.eventLoop.makeFailedFuture(error)
}
}

/// Returns the `verification` future from the `TLSVerificationHandler` in this channels pipeline.
func verifyTLS() -> EventLoopFuture<Void> {
return self.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
$0.verification
}
}
}

fileprivate extension TimeAmount {
Expand Down
Loading

0 comments on commit 7c304ed

Please sign in to comment.