diff --git a/proxy/director.go b/proxy/director.go index 47b808f..655e30f 100644 --- a/proxy/director.go +++ b/proxy/director.go @@ -99,4 +99,4 @@ func (sb *SingleBackend) BuildError(err error) ([]byte, error) { // are invoked. So decisions around authorization, monitoring etc. are better to be handled there. // // See the rather rich example. -type StreamDirector func(ctx context.Context, fullMethodName string) ([]Backend, error) +type StreamDirector func(ctx context.Context, fullMethodName string) (Mode, []Backend, error) diff --git a/proxy/examples_test.go b/proxy/examples_test.go index 9803262..3d53bfd 100644 --- a/proxy/examples_test.go +++ b/proxy/examples_test.go @@ -25,7 +25,6 @@ func ExampleRegisterService() { // Register a TestService with 4 of its methods explicitly. proxy.RegisterService(server, director, "talos.testproto.TestService", - proxy.WithMode(proxy.One2Many), proxy.WithMethodNames("PingEmpty", "Ping", "PingError", "PingList"), proxy.WithStreamedMethodNames("PingList"), ) @@ -55,21 +54,21 @@ func ExampleStreamDirector() { } } - director = func(ctx context.Context, fullMethodName string) ([]proxy.Backend, error) { + director = func(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) { // Make sure we never forward internal services. if strings.HasPrefix(fullMethodName, "/com.example.internal.") { - return nil, status.Errorf(codes.Unimplemented, "Unknown method") + return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method") } md, ok := metadata.FromIncomingContext(ctx) if ok { // Decide on which backend to dial if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { - return []proxy.Backend{simpleBackendGen("api-service.staging.svc.local")}, nil + return proxy.One2One, []proxy.Backend{simpleBackendGen("api-service.staging.svc.local")}, nil } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { - return []proxy.Backend{simpleBackendGen("api-service.prod.svc.local")}, nil + return proxy.One2One, []proxy.Backend{simpleBackendGen("api-service.prod.svc.local")}, nil } } - return nil, status.Errorf(codes.Unimplemented, "Unknown method") + return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method") } } diff --git a/proxy/handler.go b/proxy/handler.go index 9fff9a9..c5d7dee 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -20,7 +20,6 @@ var ( ) type handlerOptions struct { - mode Mode serviceName string methodNames []string streamedMethods map[string]struct{} @@ -51,7 +50,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") } - backends, err := s.director(serverStream.Context(), fullMethodName) + mode, backends, err := s.director(serverStream.Context(), fullMethodName) if err != nil { return err } @@ -80,7 +79,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error } } - switch s.options.mode { + switch mode { case One2One: if len(backendConnections) != 1 { return status.Errorf(codes.Internal, "one2one proxying can't should have exactly one connection (got %d)", len(backendConnections)) diff --git a/proxy/handler_one2many_test.go b/proxy/handler_one2many_test.go index 1f0f043..a54fbce 100644 --- a/proxy/handler_one2many_test.go +++ b/proxy/handler_one2many_test.go @@ -250,6 +250,7 @@ func (s *ProxyOne2ManySuite) TestPingEmptyTargets() { {"1", "2"}, {"3", "2", "1"}, {"0", "4"}, + {"3"}, } { md := metadata.Pairs(clientMdKey, "true") md.Set("targets", targets...) @@ -550,20 +551,20 @@ func (s *ProxyOne2ManySuite) SetupSuite() { } // Setup of the proxy's Director. - director := func(ctx context.Context, fullName string) ([]proxy.Backend, error) { + director := func(ctx context.Context, fullName string) (proxy.Mode, []proxy.Backend, error) { var targets []int md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return nil, status.Errorf(codes.PermissionDenied, "testing rejection") + return proxy.One2Many, nil, status.Errorf(codes.PermissionDenied, "testing rejection") } if mdTargets, exists := md["targets"]; exists { for _, strTarget := range mdTargets { t, err := strconv.Atoi(strTarget) if err != nil { - return nil, err + return proxy.One2Many, nil, err } targets = append(targets, t) @@ -587,17 +588,16 @@ func (s *ProxyOne2ManySuite) SetupSuite() { } } - return result, nil + return proxy.One2Many, result, nil } s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director, proxy.WithMode(proxy.One2Many))), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), ) // Ping handler is handled as an explicit registration and not as a TransparentHandler. proxy.RegisterService(s.proxy, director, "talos.testproto.MultiService", - proxy.WithMode(proxy.One2Many), proxy.WithMethodNames("Ping", "PingStream", "PingStreamError"), proxy.WithStreamedMethodNames("PingStream", "PingStreamError"), ) diff --git a/proxy/handler_one2one_test.go b/proxy/handler_one2one_test.go index bc8ca20..e2e05fd 100644 --- a/proxy/handler_one2one_test.go +++ b/proxy/handler_one2one_test.go @@ -204,15 +204,15 @@ func (s *ProxyOne2OneSuite) SetupSuite() { // Setup of the proxy's Director. s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) // nolint: staticcheck require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string) ([]proxy.Backend, error) { + director := func(ctx context.Context, fullName string) (proxy.Mode, []proxy.Backend, error) { md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return nil, status.Errorf(codes.PermissionDenied, "testing rejection") + return proxy.One2One, nil, status.Errorf(codes.PermissionDenied, "testing rejection") } } - return []proxy.Backend{ + return proxy.One2One, []proxy.Backend{ &proxy.SingleBackend{ GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) { md, _ := metadata.FromIncomingContext(ctx) @@ -225,7 +225,7 @@ func (s *ProxyOne2OneSuite) SetupSuite() { } s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director, proxy.WithMode(proxy.One2One))), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), ) // Ping handler is handled as an explicit registration and not as a TransparentHandler. proxy.RegisterService(s.proxy, director, diff --git a/proxy/proxy.go b/proxy/proxy.go index 1f13a98..acdc612 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -57,15 +57,6 @@ func WithStreamedDetector(detector StreamedDetectorFunc) Option { } } -// WithMode sets proxying mode: One2One or One2Many. -// -// Default mode is One2One. -func WithMode(mode Mode) Option { - return func(o *handlerOptions) { - o.mode = mode - } -} - // RegisterService sets up a proxy handler for a particular gRPC service and method. // The behavior is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file. //