Skip to content

Commit 1bacbe6

Browse files
committed
Add server interceptor acting as a middleware
1 parent 5ba3745 commit 1bacbe6

File tree

7 files changed

+326
-15
lines changed

7 files changed

+326
-15
lines changed

lib/grpc.dart

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ export 'src/client/proxy.dart' show Proxy;
4242
export 'src/client/transport/http2_credentials.dart'
4343
show BadCertificateHandler, allowBadCertificates, ChannelCredentials;
4444
export 'src/server/call.dart' show ServiceCall;
45-
export 'src/server/interceptor.dart' show Interceptor;
45+
export 'src/server/interceptor.dart'
46+
show Interceptor, ServerInterceptor, ServerStreamingInvoker;
4647
export 'src/server/server.dart'
4748
show
4849
ServerCredentials,

lib/src/server/handler.dart

+5-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class ServerHandler extends ServiceCall {
3737
final ServerTransportStream _stream;
3838
final ServiceLookup _serviceLookup;
3939
final List<Interceptor> _interceptors;
40+
final List<ServerInterceptor> _serverInterceptors;
4041
final CodecRegistry? _codecRegistry;
4142
final GrpcErrorHandler? _errorHandler;
4243

@@ -83,6 +84,7 @@ class ServerHandler extends ServiceCall {
8384
required ServerTransportStream stream,
8485
required ServiceLookup serviceLookup,
8586
required List<Interceptor> interceptors,
87+
required List<ServerInterceptor> serverInterceptors,
8688
required CodecRegistry? codecRegistry,
8789
X509Certificate? clientCertificate,
8890
InternetAddress? remoteAddress,
@@ -94,7 +96,8 @@ class ServerHandler extends ServiceCall {
9496
_codecRegistry = codecRegistry,
9597
_clientCertificate = clientCertificate,
9698
_remoteAddress = remoteAddress,
97-
_errorHandler = errorHandler;
99+
_errorHandler = errorHandler,
100+
_serverInterceptors = serverInterceptors;
98101

99102
@override
100103
DateTime? get deadline => _deadline;
@@ -239,7 +242,7 @@ class ServerHandler extends ServiceCall {
239242
return;
240243
}
241244

242-
_responses = _descriptor.handle(this, requests.stream);
245+
_responses = _descriptor.handle(this, requests.stream, _serverInterceptors);
243246

244247
_responseSubscription = _responses.listen(_onResponse,
245248
onError: _onResponseError,

lib/src/server/interceptor.dart

+15
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,18 @@ import 'service.dart';
2727
/// If the interceptor returns null, the corresponding [ServiceMethod] of [Service] will be called.
2828
typedef Interceptor = FutureOr<GrpcError?> Function(
2929
ServiceCall call, ServiceMethod method);
30+
31+
typedef ServerStreamingInvoker<Q, R> = Stream<R> Function(
32+
ServiceCall call, ServiceMethod<Q, R> method, Stream<Q> requests);
33+
34+
/// A gRPC Interceptor.
35+
///
36+
/// An interceptor is called around the corresponding [ServiceMethod] invocation.
37+
/// If the interceptor throws [GrpcError], the error will be returned as a response. [ServiceMethod] wouldn't be called if the error is thrown before calling the invoker.
38+
/// If the interceptor modifies the provided stream, the invocation will continue with the provided stream.
39+
abstract class ServerInterceptor {
40+
Stream<R> intercept<Q, R>(ServiceCall call, ServiceMethod<Q, R> method,
41+
Stream<Q> requests, ServerStreamingInvoker<Q, R> invoker) {
42+
return invoker(call, method, requests);
43+
}
44+
}

lib/src/server/server.dart

+7
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ class ServerTlsCredentials extends ServerCredentials {
8787
class ConnectionServer {
8888
final Map<String, Service> _services = {};
8989
final List<Interceptor> _interceptors;
90+
final List<ServerInterceptor> _serverInterceptors;
9091
final CodecRegistry? _codecRegistry;
9192
final GrpcErrorHandler? _errorHandler;
9293
final ServerKeepAliveOptions _keepAliveOptions;
@@ -100,11 +101,13 @@ class ConnectionServer {
100101
ConnectionServer(
101102
List<Service> services, [
102103
List<Interceptor> interceptors = const <Interceptor>[],
104+
List<ServerInterceptor> serverInterceptors = const <ServerInterceptor>[],
103105
CodecRegistry? codecRegistry,
104106
GrpcErrorHandler? errorHandler,
105107
this._keepAliveOptions = const ServerKeepAliveOptions(),
106108
]) : _codecRegistry = codecRegistry,
107109
_interceptors = interceptors,
110+
_serverInterceptors = serverInterceptors,
108111
_errorHandler = errorHandler {
109112
for (final service in services) {
110113
_services[service.$name] = service;
@@ -168,6 +171,7 @@ class ConnectionServer {
168171
stream: stream,
169172
serviceLookup: lookupService,
170173
interceptors: _interceptors,
174+
serverInterceptors: _serverInterceptors,
171175
codecRegistry: _codecRegistry,
172176
// ignore: unnecessary_cast
173177
clientCertificate: clientCertificate as io_bits.X509Certificate?,
@@ -201,11 +205,13 @@ class Server extends ConnectionServer {
201205
required List<Service> services,
202206
ServerKeepAliveOptions keepAliveOptions = const ServerKeepAliveOptions(),
203207
List<Interceptor> interceptors = const <Interceptor>[],
208+
List<ServerInterceptor> serverInterceptors = const <ServerInterceptor>[],
204209
CodecRegistry? codecRegistry,
205210
GrpcErrorHandler? errorHandler,
206211
}) : super(
207212
services,
208213
interceptors,
214+
serverInterceptors,
209215
codecRegistry,
210216
errorHandler,
211217
keepAliveOptions,
@@ -308,6 +314,7 @@ class Server extends ConnectionServer {
308314
stream: stream,
309315
serviceLookup: lookupService,
310316
interceptors: _interceptors,
317+
serverInterceptors: _serverInterceptors,
311318
codecRegistry: _codecRegistry,
312319
// ignore: unnecessary_cast
313320
clientCertificate: clientCertificate as io_bits.X509Certificate?,

lib/src/server/service.dart

+33-12
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import 'dart:async';
1717

1818
import '../shared/status.dart';
1919
import 'call.dart';
20+
import 'interceptor.dart';
2021

2122
/// Definition of a gRPC service method.
2223
class ServiceMethod<Q, R> {
@@ -48,19 +49,39 @@ class ServiceMethod<Q, R> {
4849

4950
List<int> serialize(dynamic response) => responseSerializer(response as R);
5051

51-
Stream<R> handle(ServiceCall call, Stream<Q> requests) {
52-
if (streamingResponse) {
53-
if (streamingRequest) {
54-
return handler(call, requests);
55-
} else {
56-
return handler(call, _toSingleFuture(requests));
57-
}
58-
} else {
59-
final response = streamingRequest
60-
? handler(call, requests)
61-
: handler(call, _toSingleFuture(requests));
62-
return response.asStream();
52+
ServerStreamingInvoker<Q, R> _createCall() => ((
53+
ServiceCall call,
54+
ServiceMethod<Q, R> method,
55+
Stream<Q> requests,
56+
) {
57+
if (streamingResponse) {
58+
if (streamingRequest) {
59+
return handler(call, requests);
60+
} else {
61+
return handler(call, _toSingleFuture(requests));
62+
}
63+
} else {
64+
final response = streamingRequest
65+
? handler(call, requests)
66+
: handler(call, _toSingleFuture(requests));
67+
return response.asStream();
68+
}
69+
});
70+
71+
Stream<R> handle(
72+
ServiceCall call,
73+
Stream<Q> requests,
74+
List<ServerInterceptor> interceptors,
75+
) {
76+
var invoker = _createCall();
77+
78+
for (final interceptor in interceptors.reversed) {
79+
final delegate = invoker;
80+
invoker = (call, method, requests) =>
81+
interceptor.intercept<Q, R>(call, method, requests, delegate);
6382
}
83+
84+
return invoker(call, this, requests);
6485
}
6586

6687
Future<Q> _toSingleFuture(Stream<Q> stream) {

test/server_test.dart

+217
Original file line numberDiff line numberDiff line change
@@ -384,4 +384,221 @@ void main() {
384384
await harness.fromServer.done;
385385
});
386386
});
387+
388+
group('Server with server interceptor', () {
389+
group('processes calls if interceptor allows request', () {
390+
const expectedRequest = 5;
391+
const expectedResponse = 7;
392+
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
393+
expect(await request, expectedRequest);
394+
return expectedResponse;
395+
}
396+
397+
Null interceptor(call, method, requests) {
398+
if (method.name == 'Unary') {
399+
return null;
400+
}
401+
throw GrpcError.unauthenticated('Request is unauthenticated');
402+
}
403+
404+
Future<void> doTest(TestServerInterceptorOnStart? handler) async {
405+
harness
406+
..serverInterceptor.onStart = handler
407+
..service.unaryHandler = methodHandler
408+
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
409+
410+
await harness.fromServer.done;
411+
}
412+
413+
test('with sync interceptor', () => doTest(interceptor));
414+
test(
415+
'with async interceptor',
416+
() => doTest((call, method, requests) async =>
417+
interceptor(call, method, requests)));
418+
});
419+
420+
group('returns error if interceptor blocks request', () {
421+
Null interceptor(call, method, requests) {
422+
if (method.name == 'Unary') {
423+
throw GrpcError.unauthenticated('Request is unauthenticated');
424+
}
425+
return null;
426+
}
427+
428+
Future<void> doTest(TestServerInterceptorOnStart handler) async {
429+
harness
430+
..serverInterceptor.onStart = handler
431+
..expectErrorResponse(
432+
StatusCode.unauthenticated, 'Request is unauthenticated')
433+
..sendRequestHeader('/Test/Unary');
434+
435+
await harness.fromServer.done;
436+
}
437+
438+
test('with sync interceptor', () => doTest(interceptor));
439+
test(
440+
'with async interceptor',
441+
() => doTest((call, method, request) async =>
442+
interceptor(call, method, request)));
443+
});
444+
445+
test("don't fail if interceptor await 2 times", () async {
446+
Future<Null> interceptor(call, method, requests) async {
447+
await Future.value();
448+
await Future.value();
449+
throw GrpcError.internal('Reason is unknown');
450+
}
451+
452+
harness
453+
..serverInterceptor.onStart = interceptor
454+
..expectErrorResponse(StatusCode.internal, 'Reason is unknown')
455+
..sendRequestHeader('/Test/Unary')
456+
..sendData(1);
457+
458+
await harness.fromServer.done;
459+
});
460+
461+
group('serviceInterceptors are invoked', () {
462+
const expectedRequest = 5;
463+
const expectedResponse = 7;
464+
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
465+
expect(await request, expectedRequest);
466+
return expectedResponse;
467+
}
468+
469+
Future<void> doTest(List<TestServerInterceptor> interceptors) async {
470+
harness
471+
// ↓ mutation: Server is already built
472+
..serverInterceptors.addAll(interceptors)
473+
..service.unaryHandler = methodHandler
474+
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
475+
476+
await harness.fromServer.done;
477+
}
478+
479+
test('single serviceInterceptor is invoked', () async {
480+
final invocationsOrderRecords = [];
481+
482+
await doTest([
483+
TestServerInterceptor(
484+
onStart: (call, method, requests) {
485+
invocationsOrderRecords.add('Start');
486+
},
487+
onData: (call, method, requests, data) {
488+
invocationsOrderRecords.add('Data [$data]');
489+
},
490+
onFinish: (call, method, requests) {
491+
invocationsOrderRecords.add('Done');
492+
},
493+
)
494+
]);
495+
496+
expect(invocationsOrderRecords, equals(['Start', 'Data [7]', 'Done']));
497+
});
498+
499+
test('multiple serviceInterceptors are invoked', () async {
500+
final invocationsOrderRecords = [];
501+
502+
await doTest([
503+
TestServerInterceptor(
504+
onStart: (call, method, requests) {
505+
invocationsOrderRecords.add('Start 1');
506+
},
507+
onData: (call, method, requests, data) {
508+
invocationsOrderRecords.add('Data 1 [$data]');
509+
},
510+
onFinish: (call, method, requests) {
511+
invocationsOrderRecords.add('Done 1');
512+
},
513+
),
514+
TestServerInterceptor(
515+
onStart: (call, method, requests) {
516+
invocationsOrderRecords.add('Start 2');
517+
},
518+
onData: (call, method, requests, data) {
519+
invocationsOrderRecords.add('Data 2 [$data]');
520+
},
521+
onFinish: (call, method, requests) {
522+
invocationsOrderRecords.add('Done 2');
523+
},
524+
)
525+
]);
526+
527+
expect(
528+
invocationsOrderRecords,
529+
equals([
530+
'Start 1',
531+
'Start 2',
532+
'Data 2 [7]',
533+
'Data 1 [7]',
534+
'Done 2',
535+
'Done 1',
536+
]));
537+
});
538+
});
539+
540+
test('can modify response', () async {
541+
const expectedRequest = 5;
542+
const baseResponse = 7;
543+
const expectedResponse = 14;
544+
545+
final invocationsOrderRecords = [];
546+
547+
final interceptors = [
548+
TestServerInterceptor(
549+
onStart: (call, method, requests) {
550+
invocationsOrderRecords.add('Start 1');
551+
},
552+
onData: (call, method, requests, data) {
553+
invocationsOrderRecords.add('Data 1 [$data]');
554+
},
555+
onFinish: (call, method, requests) {
556+
invocationsOrderRecords.add('Done 1');
557+
},
558+
),
559+
TestServerInterruptingInterceptor(transform: <R>(value) {
560+
if (value is int) {
561+
return value * 2 as R;
562+
}
563+
564+
return value;
565+
}),
566+
TestServerInterceptor(
567+
onStart: (call, method, requests) {
568+
invocationsOrderRecords.add('Start 2');
569+
},
570+
onData: (call, method, requests, data) {
571+
invocationsOrderRecords.add('Data 2 [$data]');
572+
},
573+
onFinish: (call, method, requests) {
574+
invocationsOrderRecords.add('Done 2');
575+
},
576+
)
577+
];
578+
579+
Future<int> methodHandler(ServiceCall call, Future<int> request) async {
580+
expect(await request, expectedRequest);
581+
return baseResponse;
582+
}
583+
584+
harness
585+
// ↓ mutation: Server is already built
586+
..serverInterceptors.addAll(interceptors)
587+
..service.unaryHandler = methodHandler
588+
..runTest('/Test/Unary', [expectedRequest], [expectedResponse]);
589+
590+
await harness.fromServer.done;
591+
592+
expect(
593+
invocationsOrderRecords,
594+
equals([
595+
'Start 1',
596+
'Start 2',
597+
'Data 2 [7]',
598+
'Data 1 [14]',
599+
'Done 2',
600+
'Done 1',
601+
]));
602+
});
603+
});
387604
}

0 commit comments

Comments
 (0)