Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handle client connectivity state. #510

Merged
merged 2 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 150 additions & 125 deletions Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -54,71 +54,40 @@ import NIOTLS
///
/// See `BaseClientCall` for a description of the remainder of the client pipeline.
public class ClientConnection {
/// The configuration this connection was created using.
internal let configuration: ClientConnection.Configuration

/// The channel which will handle gRPC calls.
internal var channel: EventLoopFuture<Channel>
internal var channel: EventLoopFuture<Channel> {
willSet {
self.willSetChannel(to: newValue)
}
didSet {
self.didSetChannel(to: self.channel)
}
}

// This is implicitly unwrapped to avoid walking the channel twice to find the multiplexer
// in `init`. Specifically: we only initialize `multiplexer` in `willSetChannel(to:)` instead
// of explicitly initializing it in the `init` as well.
/// HTTP multiplexer from the `channel` handling gRPC calls.
internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>
internal var multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>!
MrMage marked this conversation as resolved.
Show resolved Hide resolved

/// The configuration for this client.
internal let configuration: Configuration

/// A monitor for the connectivity state.
public let connectivity: ConnectivityStateMonitor

/// Creates a new connection from the given configuration.
public init(configuration: ClientConnection.Configuration) {
let monitor = ConnectivityStateMonitor(delegate: configuration.connectivityStateDelegate)
let channel = ClientConnection.makeChannel(
configuration: configuration,
connectivityMonitor: monitor
)

self.channel = channel
self.multiplexer = channel.flatMap {
$0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
}
self.connectivity = monitor
public init(configuration: Configuration) {
self.configuration = configuration
self.connectivity = ConnectivityStateMonitor(delegate: configuration.connectivityStateDelegate)

self.channel.whenSuccess { _ in
self.connectivity.state = .ready
}
self.replaceChannelAndMultiplexerOnClose(channel: channel)
}

/// Registers a callback on the `closeFuture` of the given channel to replace this class's
/// channel and multiplexer.
private func replaceChannelAndMultiplexerOnClose(channel: EventLoopFuture<Channel>) {
channel.always { result in
// If we failed to get a channel then we've exhausted our backoff; we should `.shutdown`.
if case .failure = result {
self.connectivity.state = .shutdown
}
}.flatMap {
$0.closeFuture
}.whenComplete { _ in
// `.shutdown` is terminal so don't attempt a reconnection.
guard self.connectivity.state != .shutdown else {
return
}

let newChannel = ClientConnection.makeChannel(
configuration: self.configuration,
connectivityMonitor: self.connectivity
)

self.channel = newChannel
self.multiplexer = newChannel.flatMap {
$0.pipeline.handler(type: HTTP2StreamMultiplexer.self)
}
self.channel = ClientConnection.makeChannel(
configuration: self.configuration,
connectivity: self.connectivity
)

// Change the state if the connection was successful.
newChannel.whenSuccess { _ in
self.connectivity.state = .ready
}
self.replaceChannelAndMultiplexerOnClose(channel: newChannel)
}
self.willSetChannel(to: self.channel)
self.didSetChannel(to: self.channel)
}

/// The `EventLoop` this connection is using.
Expand All @@ -132,97 +101,145 @@ public class ClientConnection {
// We're already shutdown or in the process of shutting down.
return channel.flatMap { $0.closeFuture }
} else {
self.connectivity.state = .shutdown
self.connectivity.initiateUserShutdown()
return channel.flatMap { $0.close() }
}
}
}

// MARK: - Channel creation

extension ClientConnection {
/// Creates a `Channel` using the given configuration.
/// Register a callback on the close future of the given `channel` to replace the channel (if
/// possible) and also replace the `multiplexer` with that from the new channel.
///
/// This involves: creating a `ClientBootstrap`, connecting to a target and verifying that the TLS
/// handshake was successful (if TLS was configured). We _may_ additionally set a connection
/// timeout and schedule a retry attempt (should the connection fail) if a
/// `ConnectionBackoff.Iterator` is provided.
/// - Parameter channel: The channel that will be set.
private func willSetChannel(to channel: EventLoopFuture<Channel>) {
  // The user may have asked for a shutdown while this channel was still being created; in that
  // case the channel is surplus to requirements, so close it as soon as it becomes available
  // and do not wire up reconnection or the multiplexer.
  if self.connectivity.userHasInitiatedShutdown {
    channel.whenSuccess { $0.close(mode: .all, promise: nil) }
    return
  }

  // When this channel eventually closes, try to replace it. Assigning to `self.channel` runs
  // its property observers, so the replacement channel is prepared by this same method.
  let channelClosed = channel.flatMap { $0.closeFuture }
  channelClosed.whenComplete { _ in
    if self.connectivity.canAttemptReconnect {
      self.channel = ClientConnection.makeChannel(
        configuration: self.configuration,
        connectivity: self.connectivity
      )
    }
  }

  // Resolve the HTTP/2 stream multiplexer from the pipeline of the channel being set.
  self.multiplexer = channel.flatMap { newChannel in
    newChannel.pipeline.handler(type: HTTP2StreamMultiplexer.self)
  }
}

/// Register a callback on the given `channel` to update the connectivity state.
///
/// See the individual functions for more information:
/// - `makeBootstrap(configuration:)`, and
/// - `verifyTLS(channel:)`.
/// - Parameter channel: The channel that was set.
private func didSetChannel(to channel: EventLoopFuture<Channel>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this logic need to be split into will and did set?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

willSet is preparing the channel so that it can be used, didSet is advertising that self.channel may be used (or not in the case of shutdown).

If we had the current didSet logic in willSet then we potentially have race problems when e.g. waiting on .ready (since it would be triggered before actually assigning the new value). If the willSet logic was in didSet then it would be possible to call self.channel and hit an error (closing the channel) before the reconnect callback was registered.

channel.whenComplete { result in
switch result {
case .success:
self.connectivity.state = .ready

case .failure:
self.connectivity.state = .shutdown
}
}
}

/// Attempts to create a new `Channel` using the given configuration.
///
/// See `makeChannel(configuration:connectivity:backoffIterator:)`.
private class func makeChannel(
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
configuration: Configuration,
connectivity: ConnectivityStateMonitor
) -> EventLoopFuture<Channel> {
let iterator = configuration.connectionBackoff?.makeIterator()
return ClientConnection.makeChannel(
configuration: configuration,
connectivity: connectivity,
backoffIterator: iterator
)
}

/// Attempts to create a new `Channel` using the given configuration.
///
/// This involves: creating a `ClientBootstrapProtocol`, connecting to a target and verifying that
/// the TLS handshake was successful (if TLS was configured). We _may_ additionally set a
/// connection timeout and schedule a retry attempt (should the connection fail) if a
/// `ConnectionBackoffIterator` is provided.
///
/// - Parameter configuration: The configuration to start the connection with.
/// - Parameter connectivityMonitor: A connectivity state monitor.
/// - Parameter connectivity: A connectivity state monitor.
/// - Parameter backoffIterator: An `Iterator` for `ConnectionBackoff` providing a sequence of
/// connection timeouts and backoff to use when attempting to create a connection.
private class func makeChannel(
configuration: ClientConnection.Configuration,
connectivityMonitor: ConnectivityStateMonitor,
backoffIterator: ConnectionBackoff.Iterator?
configuration: Configuration,
connectivity: ConnectivityStateMonitor,
backoffIterator: ConnectionBackoffIterator?
) -> EventLoopFuture<Channel> {
// We could have been shutdown by the user, avoid a connection attempt if this is the case.
guard connectivityMonitor.state != .shutdown else {
return configuration.eventLoopGroup.next().makeFailedFuture(GRPCStatus.processingError)
}

connectivityMonitor.state = .connecting
connectivity.state = .connecting
let timeoutAndBackoff = backoffIterator?.next()
var bootstrap = ClientConnection.makeBootstrap(configuration: configuration)

// Set a timeout, if we have one.
if let timeout = timeoutAndBackoff?.timeout {
bootstrap = bootstrap.connectTimeout(.seconds(timeInterval: timeout))
}
let bootstrap = self.makeBootstrap(
configuration: configuration,
group: configuration.eventLoopGroup,
timeout: timeoutAndBackoff?.timeout
)

let channel = bootstrap.connect(to: configuration.target).flatMap { channel -> EventLoopFuture<Channel> in
if configuration.tlsConfiguration != nil {
return ClientConnection.verifyTLS(channel: channel).map { channel }
return channel.verifyTLS().map { channel }
} else {
return channel.eventLoop.makeSucceededFuture(channel)
}
}

channel.whenFailure { _ in
// We could have been shutdown by the user whilst we were connecting. If we were then avoid
// this extra state transition.
if connectivityMonitor.state != .shutdown {
// We might try again in a moment.
connectivityMonitor.state = timeoutAndBackoff == nil ? .shutdown : .transientFailure
}
}

// If we don't have backoff then we can't retry, just return the `channel` no matter what
// state we are in.
guard let backoff = timeoutAndBackoff?.backoff else {
return channel
}

// If we're in error then schedule our next attempt.
return channel.flatMapError { error in
// The `futureResult` of the scheduled task is of type
// `EventLoopFuture<EventLoopFuture<ClientConnection>>`, so we need to `flatMap` it to
// remove a level of indirection.
return channel.eventLoop.scheduleTask(in: .seconds(timeInterval: backoff)) {
return makeChannel(
configuration: configuration,
connectivityMonitor: connectivityMonitor,
backoffIterator: backoffIterator
)
}.futureResult.flatMap { nextConnection in
return nextConnection
}
// If our connection attempt was unsuccessful schedule another attempt in some time.
glbrntt marked this conversation as resolved.
Show resolved Hide resolved
return channel.flatMapError { _ in
// We will try to connect again: the failure is transient.
connectivity.state = .transientFailure
return ClientConnection.scheduleReconnectAttempt(
in: backoff,
on: channel.eventLoop,
configuration: configuration,
connectivity: connectivity,
backoffIterator: backoffIterator
)
}
}

/// Creates a `Channel` using the given configuration and connectivity state monitor.
///
/// See `makeChannel(configuration:connectivityMonitor:backoffIterator:)`.
private class func makeChannel(
configuration: ClientConnection.Configuration,
connectivityMonitor: ConnectivityStateMonitor
/// Schedule an attempt to make a channel in `timeout` seconds on the given `eventLoop`.
private class func scheduleReconnectAttempt(
in timeout: TimeInterval,
on eventLoop: EventLoop,
configuration: Configuration,
connectivity: ConnectivityStateMonitor,
backoffIterator: ConnectionBackoffIterator?
) -> EventLoopFuture<Channel> {
return makeChannel(
configuration: configuration,
connectivityMonitor: connectivityMonitor,
backoffIterator: configuration.connectionBackoff?.makeIterator()
)
// The `futureResult` of the scheduled task is of type
// `EventLoopFuture<EventLoopFuture<Channel>>`, so we need to `flatMap` it to
// remove a level of indirection.
return eventLoop.scheduleTask(in: .seconds(timeInterval: timeout)) {
ClientConnection.makeChannel(
configuration: configuration,
connectivity: connectivity,
backoffIterator: backoffIterator
)
}.futureResult.flatMap { channel in
channel
}
}

/// Makes and configures a `ClientBootstrap` using the provided configuration.
Expand All @@ -231,8 +248,14 @@ extension ClientConnection {
/// handlers detailed in the documentation for `ClientConnection`.
///
/// - Parameter configuration: The configuration to prepare the bootstrap with.
private class func makeBootstrap(configuration: Configuration) -> ClientBootstrapProtocol {
let bootstrap = GRPCNIO.makeClientBootstrap(group: configuration.eventLoopGroup)
/// - Parameter group: The `EventLoopGroup` to use for the bootstrap.
/// - Parameter timeout: The connection timeout in seconds.
private class func makeBootstrap(
configuration: Configuration,
group: EventLoopGroup,
timeout: TimeInterval?
) -> ClientBootstrapProtocol {
let bootstrap = GRPCNIO.makeClientBootstrap(group: group)
// Enable SO_REUSEADDR and TCP_NODELAY.
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.channelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1)
Expand All @@ -247,17 +270,12 @@ extension ClientConnection {
let errorHandler = DelegatingErrorHandler(delegate: configuration.errorDelegate)
return channel.pipeline.addHandler(errorHandler)
}
}

return bootstrap
}
}

/// Verifies that a TLS handshake was successful by using the `TLSVerificationHandler`.
///
/// - Parameter channel: The channel to verify successful TLS setup on.
private class func verifyTLS(channel: Channel) -> EventLoopFuture<Void> {
return channel.pipeline.handler(type: TLSVerificationHandler.self).flatMap {
$0.verification
if let timeout = timeout {
return bootstrap.connectTimeout(.seconds(timeInterval: timeout))
} else {
return bootstrap
}
}
}
Expand Down Expand Up @@ -393,6 +411,13 @@ fileprivate extension Channel {
return self.eventLoop.makeFailedFuture(error)
}
}

/// Looks up the `TLSVerificationHandler` in this channel's pipeline and returns its
/// `verification` future.
func verifyTLS() -> EventLoopFuture<Void> {
  let verificationHandler = self.pipeline.handler(type: TLSVerificationHandler.self)
  return verificationHandler.flatMap { handler in
    handler.verification
  }
}
}

fileprivate extension TimeAmount {
Expand Down
Loading