Skip to content

Add server interceptor acting as a middleware #762

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 4.1.0
* Add a `serverInterceptors` argument to `ConnectionServer`. These interceptors are acting
as middleware, wrapping a `ServiceMethod` invocation.

## 4.0.4

* Allow the latest `package:googleapis_auth`.
Expand Down
3 changes: 2 additions & 1 deletion lib/grpc.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ export 'src/client/proxy.dart' show Proxy;
export 'src/client/transport/http2_credentials.dart'
show BadCertificateHandler, allowBadCertificates, ChannelCredentials;
export 'src/server/call.dart' show ServiceCall;
export 'src/server/interceptor.dart' show Interceptor;
export 'src/server/interceptor.dart'
show Interceptor, ServerInterceptor, ServerStreamingInvoker;
export 'src/server/server.dart'
show
ServerCredentials,
Expand Down
7 changes: 5 additions & 2 deletions lib/src/server/handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class ServerHandler extends ServiceCall {
final ServerTransportStream _stream;
final ServiceLookup _serviceLookup;
final List<Interceptor> _interceptors;
final List<ServerInterceptor> _serverInterceptors;
final CodecRegistry? _codecRegistry;
final GrpcErrorHandler? _errorHandler;

Expand Down Expand Up @@ -83,6 +84,7 @@ class ServerHandler extends ServiceCall {
required ServerTransportStream stream,
required ServiceLookup serviceLookup,
required List<Interceptor> interceptors,
required List<ServerInterceptor> serverInterceptors,
required CodecRegistry? codecRegistry,
X509Certificate? clientCertificate,
InternetAddress? remoteAddress,
Expand All @@ -94,7 +96,8 @@ class ServerHandler extends ServiceCall {
_codecRegistry = codecRegistry,
_clientCertificate = clientCertificate,
_remoteAddress = remoteAddress,
_errorHandler = errorHandler;
_errorHandler = errorHandler,
_serverInterceptors = serverInterceptors;

@override
DateTime? get deadline => _deadline;
Expand Down Expand Up @@ -239,7 +242,7 @@ class ServerHandler extends ServiceCall {
return;
}

_responses = _descriptor.handle(this, requests.stream);
_responses = _descriptor.handle(this, requests.stream, _serverInterceptors);

_responseSubscription = _responses.listen(_onResponse,
onError: _onResponseError,
Expand Down
15 changes: 15 additions & 0 deletions lib/src/server/interceptor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,18 @@ import 'service.dart';
/// If the interceptor returns null, the corresponding [ServiceMethod] of [Service] will be called.
typedef Interceptor = FutureOr<GrpcError?> Function(
ServiceCall call, ServiceMethod method);

typedef ServerStreamingInvoker<Q, R> = Stream<R> Function(
ServiceCall call, ServiceMethod<Q, R> method, Stream<Q> requests);

/// A gRPC Interceptor.
///
/// An interceptor is called around the corresponding [ServiceMethod] invocation.
/// If the interceptor throws [GrpcError], the error will be returned as a response. [ServiceMethod] wouldn't be called if the error is thrown before calling the invoker.
/// If the interceptor modifies the provided stream, the invocation will continue with the provided stream.
abstract class ServerInterceptor {
Stream<R> intercept<Q, R>(ServiceCall call, ServiceMethod<Q, R> method,
Stream<Q> requests, ServerStreamingInvoker<Q, R> invoker) {
Comment on lines +40 to +41
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal here is to be close to the Client implementation. As the handler is a Stream, I did not add unary. It is probably doable tho.

return invoker(call, method, requests);
}
}
7 changes: 7 additions & 0 deletions lib/src/server/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class ServerTlsCredentials extends ServerCredentials {
class ConnectionServer {
final Map<String, Service> _services = {};
final List<Interceptor> _interceptors;
final List<ServerInterceptor> _serverInterceptors;
final CodecRegistry? _codecRegistry;
final GrpcErrorHandler? _errorHandler;
final ServerKeepAliveOptions _keepAliveOptions;
Expand All @@ -100,11 +101,13 @@ class ConnectionServer {
ConnectionServer(
List<Service> services, [
List<Interceptor> interceptors = const <Interceptor>[],
List<ServerInterceptor> serverInterceptors = const <ServerInterceptor>[],
CodecRegistry? codecRegistry,
GrpcErrorHandler? errorHandler,
this._keepAliveOptions = const ServerKeepAliveOptions(),
]) : _codecRegistry = codecRegistry,
_interceptors = interceptors,
_serverInterceptors = serverInterceptors,
_errorHandler = errorHandler {
for (final service in services) {
_services[service.$name] = service;
Expand Down Expand Up @@ -168,6 +171,7 @@ class ConnectionServer {
stream: stream,
serviceLookup: lookupService,
interceptors: _interceptors,
serverInterceptors: _serverInterceptors,
codecRegistry: _codecRegistry,
// ignore: unnecessary_cast
clientCertificate: clientCertificate as io_bits.X509Certificate?,
Expand Down Expand Up @@ -201,11 +205,13 @@ class Server extends ConnectionServer {
required List<Service> services,
ServerKeepAliveOptions keepAliveOptions = const ServerKeepAliveOptions(),
List<Interceptor> interceptors = const <Interceptor>[],
List<ServerInterceptor> serverInterceptors = const <ServerInterceptor>[],
CodecRegistry? codecRegistry,
GrpcErrorHandler? errorHandler,
}) : super(
services,
interceptors,
serverInterceptors,
codecRegistry,
errorHandler,
keepAliveOptions,
Expand Down Expand Up @@ -308,6 +314,7 @@ class Server extends ConnectionServer {
stream: stream,
serviceLookup: lookupService,
interceptors: _interceptors,
serverInterceptors: _serverInterceptors,
codecRegistry: _codecRegistry,
// ignore: unnecessary_cast
clientCertificate: clientCertificate as io_bits.X509Certificate?,
Expand Down
48 changes: 36 additions & 12 deletions lib/src/server/service.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import 'dart:async';

import '../shared/status.dart';
import 'call.dart';
import 'interceptor.dart';

/// Definition of a gRPC service method.
class ServiceMethod<Q, R> {
Expand Down Expand Up @@ -48,19 +49,42 @@ class ServiceMethod<Q, R> {

List<int> serialize(dynamic response) => responseSerializer(response as R);

Stream<R> handle(ServiceCall call, Stream<Q> requests) {
if (streamingResponse) {
if (streamingRequest) {
return handler(call, requests);
} else {
return handler(call, _toSingleFuture(requests));
}
} else {
final response = streamingRequest
? handler(call, requests)
: handler(call, _toSingleFuture(requests));
return response.asStream();
ServerStreamingInvoker<Q, R> _createCall() => ((
ServiceCall call,
ServiceMethod<Q, R> method,
Stream<Q> requests,
) {
if (streamingResponse) {
if (streamingRequest) {
return handler(call, requests);
} else {
return handler(call, _toSingleFuture(requests));
}
} else {
final response = streamingRequest
? handler(call, requests)
: handler(call, _toSingleFuture(requests));
return response.asStream();
}
});

Stream<R> handle(
ServiceCall call,
Stream<Q> requests,
List<ServerInterceptor> interceptors,
) {
var invoker = _createCall();

for (final interceptor in interceptors.reversed) {
final delegate = invoker;
// invoker is actually reassigned in the same scope as the above function,
// reassigning invoker in delegate is required to avoid an infinite
// recursion
invoker = (call, method, requests) =>
interceptor.intercept<Q, R>(call, method, requests, delegate);
}

return invoker(call, this, requests);
}

Future<Q> _toSingleFuture(Stream<Q> stream) {
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: grpc
version: 4.0.4
version: 4.1.0
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
repository: https://github.com/grpc/grpc-dart

Expand Down
Loading
Loading