Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support compression on the server #714

Merged
merged 1 commit into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
"repositoryURL": "https://github.com/apple/swift-protobuf.git",
"state": {
"branch": null,
"revision": "da75a93ac017534e0028e83c0e4fc4610d2acf48",
"version": "1.7.0"
"revision": "7790acf0a81d08429cb20375bf42a8c7d279c5fe",
"version": "1.8.0"
}
}
]
Expand Down
5 changes: 4 additions & 1 deletion Sources/GRPC/CallHandlers/_BaseCallHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import Logging
/// - Important: This is **NOT** part of the public API.
public class _BaseCallHandler<RequestPayload: GRPCPayload, ResponsePayload: GRPCPayload>: GRPCCallHandler {
public func makeGRPCServerCodec() -> ChannelHandler {
return HTTP1ToGRPCServerCodec<RequestPayload, ResponsePayload>(logger: self.logger)
return HTTP1ToGRPCServerCodec<RequestPayload, ResponsePayload>(
encoding: self.callHandlerContext.encoding,
logger: self.logger
)
}

/// Called when the request head has been received.
Expand Down
13 changes: 4 additions & 9 deletions Sources/GRPC/ClientCalls/BaseClientCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class BaseClientCall<Request: GRPCPayload, Response: GRPCPayload>: Client
internal let multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>

// Note: documentation is inherited from the `ClientCall` protocol.
public let options: CallOptions
public let subchannel: EventLoopFuture<Channel>
public let initialMetadata: EventLoopFuture<HPACKHeaders>
public let trailingMetadata: EventLoopFuture<HPACKHeaders>
Expand All @@ -75,12 +76,14 @@ public class BaseClientCall<Request: GRPCPayload, Response: GRPCPayload>: Client
eventLoop: EventLoop,
multiplexer: EventLoopFuture<HTTP2StreamMultiplexer>,
callType: GRPCCallType,
callOptions: CallOptions,
responseHandler: GRPCClientResponseChannelHandler<Response>,
requestHandler: _ClientRequestChannelHandler<Request>,
logger: Logger
) {
self.logger = logger
self.multiplexer = multiplexer
self.options = callOptions

let streamPromise = eventLoop.makePromise(of: Channel.self)

Expand Down Expand Up @@ -135,29 +138,21 @@ extension _GRPCRequestHead {
path: String,
host: String,
requestID: String,
encoding: ClientConnection.Configuration.MessageEncoding,
options: CallOptions
) {
var customMetadata = options.customMetadata
if let requestIDHeader = options.requestIDHeader {
customMetadata.add(name: requestIDHeader, value: requestID)
}

var encoding = encoding
// Compression is disabled at the RPC level; remove outbound (request) encoding. This will stop
// any 'grpc-encoding' header being sent to the peer.
if options.disableCompression {
encoding.outbound = nil
}

self = _GRPCRequestHead(
method: options.cacheable ? "GET" : "POST",
scheme: scheme,
path: path,
host: host,
timeout: options.timeout,
customMetadata: customMetadata,
encoding: encoding
encoding: options.messageEncoding
)
}
}
2 changes: 1 addition & 1 deletion Sources/GRPC/ClientCalls/BidirectionalStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public final class BidirectionalStreamingCall<RequestPayload: GRPCPayload, Respo
path: path,
host: connection.configuration.target.host,
requestID: requestID,
encoding: connection.configuration.messageEncoding,
options: callOptions
)

Expand All @@ -70,6 +69,7 @@ public final class BidirectionalStreamingCall<RequestPayload: GRPCPayload, Respo
eventLoop: connection.eventLoop,
multiplexer: connection.multiplexer,
callType: .bidirectionalStreaming,
callOptions: callOptions,
responseHandler: responseHandler,
requestHandler: requestHandler,
logger: logger
Expand Down
61 changes: 40 additions & 21 deletions Sources/GRPC/ClientCalls/ClientCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public protocol ClientCall {
/// The type of the response message for the call.
associatedtype ResponsePayload: GRPCPayload

/// The options used to make the RPC.
var options: CallOptions { get }

/// HTTP/2 stream that requests and responses are sent and received on.
var subchannel: EventLoopFuture<Channel> { get }

Expand Down Expand Up @@ -62,42 +65,42 @@ public protocol StreamingRequestClientCall: ClientCall {
///
/// - Parameters:
/// - message: The message to send.
/// - disableCompression: Whether compression should be disabled for this message. Ignored if
/// compression was not enabled for the connection or RPC.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
/// - Returns: A future which will be fullfilled when the message has been sent.
func sendMessage(_ message: RequestPayload, disableCompression: Bool) -> EventLoopFuture<Void>
func sendMessage(_ message: RequestPayload, compression: Compression) -> EventLoopFuture<Void>

/// Sends a message to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`.
///
/// - Parameters:
/// - message: The message to send.
/// - disableCompression: Whether compression should be disabled for this message. Ignored if
/// compression was not enabled for the connection or RPC.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
/// - promise: A promise to be fulfilled when the message has been sent.
func sendMessage(_ message: RequestPayload, disableCompression: Bool, promise: EventLoopPromise<Void>?)
func sendMessage(_ message: RequestPayload, compression: Compression, promise: EventLoopPromise<Void>?)

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
/// - disableCompression: Whether compression should be disabled for these messages. Ignored if
/// compression was not enabled for the connection or RPC.
func sendMessages<S: Sequence>(_ messages: S, disableCompression: Bool) -> EventLoopFuture<Void> where S.Element == RequestPayload
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
func sendMessages<S: Sequence>(_ messages: S, compression: Compression) -> EventLoopFuture<Void> where S.Element == RequestPayload

/// Sends a sequence of messages to the service.
///
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()` or `sendEnd(promise:)`.
///
/// - Parameters:
/// - messages: The sequence of messages to send.
/// - disableCompression: Whether compression should be disabled for these messages. Ignored if
/// compression was not enabled for the connection or RPC.
/// - compression: Whether compression should be used for this message. Ignored if compression
/// was not enabled for the RPC.
/// - promise: A promise to be fulfilled when all messages have been sent successfully.
func sendMessages<S: Sequence>(_ messages: S, disableCompression: Bool, promise: EventLoopPromise<Void>?) where S.Element == RequestPayload
func sendMessages<S: Sequence>(_ messages: S, compression: Compression, promise: EventLoopPromise<Void>?) where S.Element == RequestPayload

/// Returns a future which can be used as a message queue.
///
Expand Down Expand Up @@ -137,30 +140,42 @@ public protocol UnaryResponseClientCall: ClientCall {
extension StreamingRequestClientCall {
public func sendMessage(
_ message: RequestPayload,
disableCompression: Bool = false
compression: Compression = .deferToCallDefault
) -> EventLoopFuture<Void> {
return self.subchannel.flatMap { channel in
return channel.writeAndFlush(_GRPCClientRequestPart.message(.init(message, disableCompression: disableCompression)))
let context = _MessageContext<RequestPayload>(
message,
compressed: compression.isEnabled(enabledOnCall: self.options.messageEncoding.enabledForRequests)
)
return channel.writeAndFlush(_GRPCClientRequestPart.message(context))
}
}

public func sendMessage(
_ message: RequestPayload,
disableCompression: Bool = false,
compression: Compression = .deferToCallDefault,
promise: EventLoopPromise<Void>?
) {
self.subchannel.whenSuccess { channel in
channel.writeAndFlush(_GRPCClientRequestPart.message(.init(message, disableCompression: disableCompression)), promise: promise)
let context = _MessageContext<RequestPayload>(
message,
compressed: compression.isEnabled(enabledOnCall: self.options.messageEncoding.enabledForRequests)
)
channel.writeAndFlush(_GRPCClientRequestPart.message(context), promise: promise)
}
}

public func sendMessages<S: Sequence>(
_ messages: S,
disableCompression: Bool = false
compression: Compression = .deferToCallDefault
) -> EventLoopFuture<Void> where S.Element == RequestPayload {
return self.subchannel.flatMap { channel -> EventLoopFuture<Void> in
let writeFutures = messages.map { message in
channel.write(_GRPCClientRequestPart.message(.init(message, disableCompression: disableCompression)))
let writeFutures = messages.map { message -> EventLoopFuture<Void> in
let context = _MessageContext<RequestPayload>(
message,
compressed: compression.isEnabled(enabledOnCall: self.options.messageEncoding.enabledForRequests)
)
return channel.write(_GRPCClientRequestPart.message(context))
}
channel.flush()
return EventLoopFuture.andAllSucceed(writeFutures, on: channel.eventLoop)
Expand All @@ -169,15 +184,19 @@ extension StreamingRequestClientCall {

public func sendMessages<S: Sequence>(
_ messages: S,
disableCompression: Bool = false,
compression: Compression = .deferToCallDefault,
promise: EventLoopPromise<Void>?
) where S.Element == RequestPayload {
if let promise = promise {
self.sendMessages(messages).cascade(to: promise)
} else {
self.subchannel.whenSuccess { channel in
for message in messages {
channel.write(_GRPCClientRequestPart.message(.init(message, disableCompression: disableCompression)), promise: nil)
let context = _MessageContext<RequestPayload>(
message,
compressed: compression.isEnabled(enabledOnCall: self.options.messageEncoding.enabledForRequests)
)
channel.write(_GRPCClientRequestPart.message(context), promise: nil)
}
channel.flush()
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/GRPC/ClientCalls/ClientStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public final class ClientStreamingCall<RequestPayload: GRPCPayload, ResponsePayl
path: path,
host: connection.configuration.target.host,
requestID: requestID,
encoding: connection.configuration.messageEncoding,
options: callOptions
)

Expand All @@ -74,6 +73,7 @@ public final class ClientStreamingCall<RequestPayload: GRPCPayload, ResponsePayl
eventLoop: connection.eventLoop,
multiplexer: connection.multiplexer,
callType: .clientStreaming,
callOptions: callOptions,
responseHandler: responseHandler,
requestHandler: requestHandler,
logger: logger
Expand Down
4 changes: 2 additions & 2 deletions Sources/GRPC/ClientCalls/ServerStreamingCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ public final class ServerStreamingCall<RequestPayload: GRPCPayload, ResponsePayl
path: path,
host: connection.configuration.target.host,
requestID: requestID,
encoding: connection.configuration.messageEncoding,
options: callOptions
)

let requestHandler = _UnaryRequestChannelHandler<RequestPayload>(
requestHead: requestHead,
request: .init(request)
request: .init(request, compressed: callOptions.messageEncoding.enabledForRequests)
)

super.init(
eventLoop: connection.eventLoop,
multiplexer: connection.multiplexer,
callType: .serverStreaming,
callOptions: callOptions,
responseHandler: responseHandler,
requestHandler: requestHandler,
logger: logger
Expand Down
4 changes: 2 additions & 2 deletions Sources/GRPC/ClientCalls/UnaryCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ public final class UnaryCall<RequestPayload: GRPCPayload, ResponsePayload: GRPCP
path: path,
host: connection.configuration.target.host,
requestID: requestID,
encoding: connection.configuration.messageEncoding,
options: callOptions
)

let requestHandler = _UnaryRequestChannelHandler<RequestPayload>(
requestHead: requestHead,
request: .init(request)
request: .init(request, compressed: callOptions.messageEncoding.enabledForRequests)
)

super.init(
eventLoop: connection.channel.eventLoop,
multiplexer: connection.multiplexer,
callType: .unary,
callOptions: callOptions,
responseHandler: responseHandler,
requestHandler: requestHandler,
logger: logger
Expand Down
18 changes: 1 addition & 17 deletions Sources/GRPC/ClientConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -476,20 +476,6 @@ extension ClientConnection {
/// be `nil`.
public var connectionBackoff: ConnectionBackoff?

/// The compression used for requests, and the compression algorithms to advertise as acceptable
/// for the remote peer to use for encoding responses.
///
/// If compression is enabled for a connection it may be disabled for requests on any RPC by
/// setting `CallOptions.disableCompression` to `true`.
///
/// Compression may also be disabled at the message-level for streaming requests (i.e. client
/// streaming and bidirectional streaming RPCs) by setting `disableCompression` to `true` in
/// `sendMessage(_:disableCompression)`, `sendMessage(_:disableCompression:promise)`,
/// `sendMessages(_:disableCompression)` or `sendMessages(_:disableCompression:promise)`.
///
/// Note that disabling compression has no effect if compression is disabled on the connection.
public var messageEncoding: MessageEncoding

/// The HTTP protocol used for this connection.
public var httpProtocol: HTTP2ToHTTP1ClientCodec.HTTPProtocol {
return self.tls == nil ? .http : .https
Expand All @@ -511,16 +497,14 @@ extension ClientConnection {
errorDelegate: ClientErrorDelegate? = LoggingClientErrorDelegate(),
connectivityStateDelegate: ConnectivityStateDelegate? = nil,
tls: Configuration.TLS? = nil,
connectionBackoff: ConnectionBackoff? = ConnectionBackoff(),
messageEncoding: MessageEncoding = .none
connectionBackoff: ConnectionBackoff? = ConnectionBackoff()
) {
self.target = target
self.eventLoopGroup = eventLoopGroup
self.errorDelegate = errorDelegate
self.connectivityStateDelegate = connectivityStateDelegate
self.tls = tls
self.connectionBackoff = connectionBackoff
self.messageEncoding = messageEncoding
}
}
}
Expand Down
24 changes: 16 additions & 8 deletions Sources/GRPC/ClientOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ public struct CallOptions {
/// The call timeout.
public var timeout: GRPCTimeout

/// The compression used for requests, and the compression algorithms to advertise as acceptable
/// for the remote peer to use for encoding responses.
///
/// Compression may also be disabled at the message-level for streaming requests (i.e. client
/// streaming and bidirectional streaming RPCs) by setting `compression` to `.disabled` in
/// `sendMessage(_:compression)`, `sendMessage(_:compression:promise)`,
/// `sendMessages(_:compression)` or `sendMessages(_:compression:promise)`.
///
/// Note that enabling `compression` via the `sendMessage` or `sendMessages` methods only applies
/// if encoding has been specified in these options.
public var messageEncoding: MessageEncoding

/// Whether the call is cacheable.
public var cacheable: Bool

Expand All @@ -46,24 +58,20 @@ public struct CallOptions {
/// messages associated with the call.
public var requestIDHeader: String?

/// Disables request compression on this call. Ignored if compression is disabled at the
/// connection level.
public var disableCompression: Bool

public init(
customMetadata: HPACKHeaders = HPACKHeaders(),
timeout: GRPCTimeout = GRPCTimeout.infinite,
cacheable: Bool = false,
messageEncoding: MessageEncoding = .none,
requestIDProvider: RequestIDProvider = .autogenerated,
requestIDHeader: String? = nil,
disableCompression: Bool = false
cacheable: Bool = false
) {
self.customMetadata = customMetadata
self.timeout = timeout
self.cacheable = false
self.messageEncoding = messageEncoding
self.requestIDProvider = requestIDProvider
self.requestIDHeader = requestIDHeader
self.disableCompression = disableCompression
self.cacheable = false
}

/// How Request IDs should be provided.
Expand Down
5 changes: 3 additions & 2 deletions Sources/GRPC/Compression/CompressionAlgorithm.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ public struct CompressionAlgorithm: Equatable {
public static let deflate = CompressionAlgorithm(.deflate)
public static let gzip = CompressionAlgorithm(.gzip)

public static let all = Algorithm.allCases.map(CompressionAlgorithm.init)
// The order here is important: most compression to least.
public static let all: [CompressionAlgorithm] = [.gzip, .deflate, .identity]

/// The name of the compression algorithm.
public var name: String {
return self.algorithm.rawValue
}

internal enum Algorithm: String, CaseIterable {
internal enum Algorithm: String {
case identity
case deflate
case gzip
Expand Down
Loading