diff --git a/.changelog/1829.feature.md b/.changelog/1829.breaking.md similarity index 100% rename from .changelog/1829.feature.md rename to .changelog/1829.breaking.md diff --git a/go/common/grpc/policy/api/api.go b/go/common/grpc/policy/api/api.go index 0c9a377c9d4..c6ea86ffa42 100644 --- a/go/common/grpc/policy/api/api.go +++ b/go/common/grpc/policy/api/api.go @@ -2,11 +2,21 @@ package api import ( "context" + "errors" + "fmt" "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/common/accessctl" "github.com/oasislabs/oasis-core/go/common/grpc" "github.com/oasislabs/oasis-core/go/common/pubsub" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/peer" +) + +const ( + // ForwardedSubjectMD is name of the metadata field in which the actual + // subject should be passed in case sentry forwarded the request. + ForwardedSubjectMD = "forwarded-subject" ) // ServicePolicies contains policies for a GRPC service. @@ -28,3 +38,23 @@ type PolicyWatcherClient interface { // WatchPolicies returns updates to GRPC policies. WatchPolicies(ctx context.Context) (<-chan ServicePolicies, pubsub.ClosableSubscription, error) } + +// SubjectFromGRPCContext tries to extract subject from TLS Certificate provided +// in the gRPC context. +func SubjectFromGRPCContext(ctx context.Context) (string, error) { + peer, ok := peer.FromContext(ctx) + if !ok { + return "", errors.New("grpc: failed to obtain connection peer from context") + } + tlsAuth, ok := peer.AuthInfo.(credentials.TLSInfo) + if !ok { + return "", errors.New("grpc: unexpected peer authentication credentials") + } + if nPeerCerts := len(tlsAuth.State.PeerCertificates); nPeerCerts != 1 { + return "", fmt.Errorf("grpc: unexpected number of peer certificates: %d", nPeerCerts) + } + peerCert := tlsAuth.State.PeerCertificates[0] + subject := accessctl.SubjectFromX509Certificate(peerCert) + return string(subject), nil + +} diff --git a/go/common/grpc/policy/policy.go b/go/common/grpc/policy/policy.go index 9bfb245643b..b506d92f82d 100644 --- a/go/common/grpc/policy/policy.go +++ b/go/common/grpc/policy/policy.go @@ -23,12 +23,6 @@ var ( _ RuntimePolicyChecker = (*DynamicRuntimePolicyChecker)(nil) ) -const ( - // ForwardedSubjectMD is name of the metadata field in which the actual - // subject should be passed in case sentry forwarded the request. - ForwardedSubjectMD = "forwarded-subject" -) - // ErrForbiddenByPolicy is the error returned when an action is not allowed by policy. type ErrForbiddenByPolicy struct { method accessctl.Action @@ -135,7 +129,7 @@ func (c *DynamicRuntimePolicyChecker) CheckAccessAllowed( if !ok { return fmt.Errorf("grpc: failed getting metadata from context") } - forwardedSubjects, ok := md[ForwardedSubjectMD] + forwardedSubjects, ok := md[api.ForwardedSubjectMD] if !ok { // Not proxied through sentry, allow. return nil diff --git a/go/common/grpc/policy/policy_test.go b/go/common/grpc/policy/policy_test.go index 8f1d6d33eeb..f4105223b71 100644 --- a/go/common/grpc/policy/policy_test.go +++ b/go/common/grpc/policy/policy_test.go @@ -138,7 +138,7 @@ func pingHandler( // nolint: golint } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/PingServer/Ping", + FullMethod: "/PingService/Ping", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(PingServer).Ping(ctx, req.(*PingQuery)) @@ -174,7 +174,7 @@ func TestAccessPolicy(t *testing.T) { // Register the PolicyWatcherService. api.RegisterService(grpcServer.Server(), watcher) - policyChecker := NewDynamicRuntimePolicyChecker("ping", watcher) + policyChecker := NewDynamicRuntimePolicyChecker(cmnGrpc.ServiceName(serviceDesc.ServiceName), watcher) server := &pingServer{policyChecker} policy := accessctl.NewPolicy() policyChecker.SetAccessPolicy(policy, testNs) @@ -235,7 +235,7 @@ func TestAccessPolicy(t *testing.T) { _, err = client.Ping(ctx, &PingQuery{testNs}) require.EqualError( err, - "rpc error: code = PermissionDenied desc = grpc: calling /PingServer/Ping method for runtime 06956b25ae9fabb8295ce6879f0995c7ad02f8b2a1b22cbade17960a70d765ea not allowed for client CN=oasis-node", + "rpc error: code = PermissionDenied desc = grpc: calling /PingService/Ping method for runtime 06956b25ae9fabb8295ce6879f0995c7ad02f8b2a1b22cbade17960a70d765ea not allowed for client CN=oasis-node", "Calling Ping with an empty access policy should not be allowed", ) require.Equal(codes.PermissionDenied, status.Code(err), "returned gRPC error should be PermissionDenied") @@ -243,7 +243,7 @@ func TestAccessPolicy(t *testing.T) { // Add a policy rule to allow the client to call Ping. policy = accessctl.NewPolicy() subject := accessctl.SubjectFromX509Certificate(clientX509Cert) - policy.Allow(subject, "/PingServer/Ping") + policy.Allow(subject, "/PingService/Ping") policyChecker.SetAccessPolicy(policy, testNs) expectedPolicy[testNs] = policy @@ -257,6 +257,7 @@ func TestAccessPolicy(t *testing.T) { res, err := client.Ping(ctx, &PingQuery{testNs}) require.NoError(err, "Calling Ping with proper access policy set should succeed") require.IsType(&PingResponse{}, res, "Calling Ping should return a response of the correct type") + } func init() { diff --git a/go/common/grpc/proxy/proxy.go b/go/common/grpc/proxy/proxy.go index fee9ec79639..4a21de3a8ea 100644 --- a/go/common/grpc/proxy/proxy.go +++ b/go/common/grpc/proxy/proxy.go @@ -5,19 +5,15 @@ package proxy import ( "context" - "errors" - "fmt" "io" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/peer" "google.golang.org/grpc/status" - "github.com/oasislabs/oasis-core/go/common/accessctl" - "github.com/oasislabs/oasis-core/go/common/grpc/policy" + "github.com/oasislabs/oasis-core/go/common/cbor" + policy "github.com/oasislabs/oasis-core/go/common/grpc/policy/api" "github.com/oasislabs/oasis-core/go/common/logging" ) @@ -46,24 +42,6 @@ type proxy struct { // between all requets. } -// TODO: this is duplicated with code in grpc/policy. Extract and reuse. -func extractPeerFromContext(ctx context.Context) (string, error) { - peer, ok := peer.FromContext(ctx) - if !ok { - return "", errors.New("grpc: failed to obtain connection peer from context") - } - tlsAuth, ok := peer.AuthInfo.(credentials.TLSInfo) - if !ok { - return "", errors.New("grpc: unexpected peer authentication credentials") - } - if nPeerCerts := len(tlsAuth.State.PeerCertificates); nPeerCerts != 1 { - return "", fmt.Errorf("grpc: unexpected number of peer certificates: %d", nPeerCerts) - } - peerCert := tlsAuth.State.PeerCertificates[0] - subject := accessctl.SubjectFromX509Certificate(peerCert) - return string(subject), nil -} - func (p *proxy) handler(srv interface{}, stream grpc.ServerStream) error { method, ok := grpc.MethodFromServerStream(stream) if !ok { @@ -78,11 +56,13 @@ func (p *proxy) handler(srv interface{}, stream grpc.ServerStream) error { ServerStreams: true, ClientStreams: true, } - sub, err := extractPeerFromContext(upstreamCtx) + sub, err := policy.SubjectFromGRPCContext(upstreamCtx) if err != nil { p.logger.Error("failed extracting peer from context", "err", err, ) + // XXX: failing here means proxy will only work with TLS Authenticated + // connections but that is fine. return status.Errorf(codes.Internal, "failed extracting peer from context") } // Pass subject header upstream. @@ -143,8 +123,8 @@ func (p *proxy) proxyUpstream(downstream grpc.ServerStream, upstream grpc.Client for { // XXX: since we are using CBOR we are able to unmarshal messages // without knowing the schema. This wouldn't work with protobuf, and - // a raw binary codec should be used instead. - var m interface{} + // a raw binary codec would have to be used. + var m cbor.RawMessage if err := downstream.RecvMsg(&m); err != nil { if err != io.EOF { p.logger.Error("failure receiving msg from client", @@ -178,14 +158,14 @@ func (p *proxy) proxyDownstream(upstream grpc.ClientStream, downstream grpc.Serv var headerSent bool go func() { for { - // Wait for stream msg (from server). + // Wait for stream msg (from upstream). // XXX: since we are using CBOR we are able to unmarshal messages // without knowing the schema. This wouldn't work with protobuf, and - // a raw binary codec should be used instead. - var m interface{} + // a raw binary codec would have to be used. + var m cbor.RawMessage if err := upstream.RecvMsg(&m); err != nil { if err != io.EOF { - p.logger.Error("failure receiving msg from server", + p.logger.Error("failure receiving msg from upstream", "err", err, ) } diff --git a/go/common/grpc/testing/ping.go b/go/common/grpc/testing/ping.go index 04160e918fd..941749c8792 100644 --- a/go/common/grpc/testing/ping.go +++ b/go/common/grpc/testing/ping.go @@ -169,7 +169,7 @@ func pingHandler( // nolint: golint } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/PingServer/Ping", + FullMethod: "/PingService/Ping", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(PingServer).Ping(ctx, req.(*PingQuery)) diff --git a/go/sentry/api/api.go b/go/sentry/api/api.go index 8ca3667051b..f153f7529f4 100644 --- a/go/sentry/api/api.go +++ b/go/sentry/api/api.go @@ -7,12 +7,15 @@ import ( "github.com/oasislabs/oasis-core/go/common/node" ) +// SentryAddresses contains sentry node consensus and committee addresses. +type SentryAddresses struct { + Consensus []node.ConsensusAddress + Committee []node.CommitteeAddress +} + // Backend is a sentry backend implementation. -// TODO: Make these into a single method. type Backend interface { - // GetConsensusAddresses returns the list of consensus addresses of the sentry node. - GetConsensusAddresses(context.Context) ([]node.ConsensusAddress, error) - - // GetCommitteeAddresses returns the CommitteeAddresses of the sentry node. - GetCommitteeAddresses(context.Context) ([]node.CommitteeAddress, error) + // Get addresses returns the list of consensus and committee addresses of + // the sentry node. + GetAddresses(context.Context) (*SentryAddresses, error) } diff --git a/go/sentry/api/grpc.go b/go/sentry/api/grpc.go index cf6e8e8b726..649e81c24c2 100644 --- a/go/sentry/api/grpc.go +++ b/go/sentry/api/grpc.go @@ -6,17 +6,14 @@ import ( "google.golang.org/grpc" cmnGrpc "github.com/oasislabs/oasis-core/go/common/grpc" - "github.com/oasislabs/oasis-core/go/common/node" ) var ( // serviceName is the gRPC service name. serviceName = cmnGrpc.NewServiceName("Sentry") - // methodGetConsensusAddresses is the name of the GetConsensusAddresses method. - methodGetConsensusAddresses = serviceName.NewMethodName("GetConsensusAddresses") - // methodGetCommitteeAddresses is the name of the GetCommitteeAddresses method. - methodGetCommitteeAddresses = serviceName.NewMethodName("GetCommitteeAddresses") + // methodGetAddresses is the name of the GetAddresses method. + methodGetAddresses = serviceName.NewMethodName("GetAddresses") // serviceDesc is the gRPC service descriptor. serviceDesc = grpc.ServiceDesc{ @@ -24,52 +21,29 @@ var ( HandlerType: (*Backend)(nil), Methods: []grpc.MethodDesc{ { - MethodName: methodGetConsensusAddresses.Short(), - Handler: handlerGetConsensusAddresses, - }, - { - MethodName: methodGetCommitteeAddresses.Short(), - Handler: handlerGetCommitteeAddresses, + MethodName: methodGetAddresses.Short(), + Handler: handlerGetAddresses, }, }, Streams: []grpc.StreamDesc{}, } ) -func handlerGetConsensusAddresses( // nolint: golint +func handlerGetAddresses( // nolint: golint srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor, ) (interface{}, error) { if interceptor == nil { - return srv.(Backend).GetConsensusAddresses(ctx) + return srv.(Backend).GetAddresses(ctx) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: methodGetConsensusAddresses.Full(), + FullMethod: methodGetAddresses.Full(), } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).GetConsensusAddresses(ctx) - } - return interceptor(ctx, nil, info, handler) -} - -func handlerGetCommitteeAddresses( //nolint: golint - srv interface{}, - ctx context.Context, - dec func(interface{}) error, - interceptor grpc.UnaryServerInterceptor, -) (interface{}, error) { - if interceptor == nil { - return srv.(Backend).GetCommitteeAddresses(ctx) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: methodGetCommitteeAddresses.Full(), - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(Backend).GetCommitteeAddresses(ctx) + return srv.(Backend).GetAddresses(ctx) } return interceptor(ctx, nil, info, handler) } @@ -83,20 +57,12 @@ type sentryClient struct { conn *grpc.ClientConn } -func (c *sentryClient) GetConsensusAddresses(ctx context.Context) ([]node.ConsensusAddress, error) { - var rsp []node.ConsensusAddress - if err := c.conn.Invoke(ctx, methodGetConsensusAddresses.Full(), nil, &rsp); err != nil { - return nil, err - } - return rsp, nil -} - -func (c *sentryClient) GetCommitteeAddresses(ctx context.Context) ([]node.CommitteeAddress, error) { - var rsp []node.CommitteeAddress - if err := c.conn.Invoke(ctx, methodGetCommitteeAddresses.Full(), nil, &rsp); err != nil { +func (c *sentryClient) GetAddresses(ctx context.Context) (*SentryAddresses, error) { + var rsp SentryAddresses + if err := c.conn.Invoke(ctx, methodGetAddresses.Full(), nil, &rsp); err != nil { return nil, err } - return rsp, nil + return &rsp, nil } // NewSentryClient creates a new gRPC sentry client service. diff --git a/go/sentry/sentry.go b/go/sentry/sentry.go index 560c010a15f..454e0337a12 100644 --- a/go/sentry/sentry.go +++ b/go/sentry/sentry.go @@ -23,33 +23,33 @@ type backend struct { workerCommonCfg *workerCommon.Config } -func (b *backend) GetConsensusAddresses(ctx context.Context) ([]node.ConsensusAddress, error) { - addrs, err := b.consensus.GetAddresses() +func (b *backend) GetAddresses(ctx context.Context) (*api.SentryAddresses, error) { + // Consensus addresses. + consensusAddrs, err := b.consensus.GetAddresses() if err != nil { return nil, fmt.Errorf("sentry: error obtaining consensus addresses: %w", err) } b.logger.Debug("successfully obtained consensus addresses", - "addresses", addrs, + "addresses", consensusAddrs, ) - return addrs, nil -} - -func (b *backend) GetCommitteeAddresses(ctx context.Context) ([]node.CommitteeAddress, error) { - addrs, err := b.workerCommonCfg.GetNodeAddresses() + // Committe addresses. + committeeAddrs, err := b.workerCommonCfg.GetNodeAddresses() if err != nil { return nil, fmt.Errorf("sentry: error obtaining worker addresses: %w", err) } - var committeeAddresses []node.CommitteeAddress - for _, addr := range addrs { + for _, addr := range committeeAddrs { committeeAddresses = append(committeeAddresses, node.CommitteeAddress{ Certificate: b.identity.TLSCertificate.Certificate[0], Address: addr, }) } - return committeeAddresses, nil + return &api.SentryAddresses{ + Committee: committeeAddresses, + Consensus: consensusAddrs, + }, nil } // New constructs a new sentry Backend instance. diff --git a/go/worker/registration/worker.go b/go/worker/registration/worker.go index d7bd6948a1c..2ff3132df98 100644 --- a/go/worker/registration/worker.go +++ b/go/worker/registration/worker.go @@ -451,29 +451,17 @@ func (w *Worker) querySentries() ([]node.ConsensusAddress, []node.CommitteeAddre } defer client.Close() - // Query sentry nodes for their consensus address(es). - var addrs []node.ConsensusAddress - addrs, err = client.GetConsensusAddresses(w.ctx) + // Query sentry node for addresses. + sentryAddresses, err := client.GetAddresses(w.ctx) if err != nil { - w.logger.Warn("failed to obtain consensus address(es) from sentry node", + w.logger.Warn("failed to obtain addressesfrom sentry node", "err", err, "sentry_address", sentryAddr, ) - } else { - consensusAddrs = append(consensusAddrs, addrs...) } - // Query sentry nodes for their Committee address(s). - var cAddrs []node.CommitteeAddress - cAddrs, err = client.GetCommitteeAddresses(w.ctx) - if err != nil { - w.logger.Warn("failed to obtain committee info for sentry node", - "err", err, - "sentry_address", sentryAddr, - ) - } else { - committeeAddrs = append(committeeAddrs, cAddrs...) - } + consensusAddrs = append(consensusAddrs, sentryAddresses.Consensus...) + committeeAddrs = append(committeeAddrs, sentryAddresses.Committee...) } if len(consensusAddrs) == 0 { diff --git a/go/worker/sentry/grpc/init.go b/go/worker/sentry/grpc/init.go index 83972097b72..bf8711925cb 100644 --- a/go/worker/sentry/grpc/init.go +++ b/go/worker/sentry/grpc/init.go @@ -37,7 +37,6 @@ const ( // Flags has the configuration flags. var Flags = flag.NewFlagSet("", flag.ContinueOnError) -// TOOD: duplicated with: worker/registration/worker.go func initConnection(ident *identity.Identity) (*upstreamConn, error) { var err error var tlsCert *tlsPkg.Certificate @@ -48,10 +47,10 @@ func initConnection(ident *identity.Identity) (*upstreamConn, error) { // Parse certificate. tlsCert, err = tls.LoadCertificate(certFile) if err != nil { - return nil, fmt.Errorf("failed to load storage certificate file %v: %w", certFile, err) + return nil, fmt.Errorf("failed to load upstream certificate file %v: %w", certFile, err) } if len(tlsCert.Certificate) != 1 { - return nil, fmt.Errorf("storage certificate file %v should contain exactly 1 certificate in the chain", certFile) + return nil, fmt.Errorf("upstream certificate file %v should contain exactly 1 certificate in the chain", certFile) } var backendCert *x509.Certificate backendCert, err = x509.ParseCertificate(tlsCert.Certificate[0]) diff --git a/go/worker/sentry/grpc/worker.go b/go/worker/sentry/grpc/worker.go index 2ec9a1b8b80..09bfcc60f7d 100644 --- a/go/worker/sentry/grpc/worker.go +++ b/go/worker/sentry/grpc/worker.go @@ -111,7 +111,7 @@ func (g *Worker) worker() { g.policyWatcher = policyAPI.NewPolicyWatcherClient(g.conn) - // TODO: should close this after getting intial policy. + // XXX: this could also be closed after getting the intial policy. close(g.initCh) pCh, pSub, err := g.policyWatcher.WatchPolicies(g.ctx)