Skip to content

Commit

Permalink
todo fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Jan 9, 2020
1 parent 573a41e commit 0cd571c
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 127 deletions.
File renamed without changes.
30 changes: 30 additions & 0 deletions go/common/grpc/policy/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@ package api

import (
"context"
"errors"
"fmt"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/accessctl"
"github.com/oasislabs/oasis-core/go/common/grpc"
"github.com/oasislabs/oasis-core/go/common/pubsub"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
)

const (
// ForwardedSubjectMD is name of the metadata field in which the actual
// subject should be passed in case sentry forwarded the request.
ForwardedSubjectMD = "forwarded-subject"
)

// ServicePolicies contains policies for a GRPC service.
Expand All @@ -28,3 +38,23 @@ type PolicyWatcherClient interface {
// WatchPolicies returns updates to GRPC policies.
WatchPolicies(ctx context.Context) (<-chan ServicePolicies, pubsub.ClosableSubscription, error)
}

// SubjectFromGRPCContext tries to extract subject from TLS Certificate provided
// in the gRPC context.
func SubjectFromGRPCContext(ctx context.Context) (string, error) {
peer, ok := peer.FromContext(ctx)
if !ok {
return "", errors.New("grpc: failed to obtain connection peer from context")
}
tlsAuth, ok := peer.AuthInfo.(credentials.TLSInfo)
if !ok {
return "", errors.New("grpc: unexpected peer authentication credentials")
}
if nPeerCerts := len(tlsAuth.State.PeerCertificates); nPeerCerts != 1 {
return "", fmt.Errorf("grpc: unexpected number of peer certificates: %d", nPeerCerts)
}
peerCert := tlsAuth.State.PeerCertificates[0]
subject := accessctl.SubjectFromX509Certificate(peerCert)
return string(subject), nil

}
8 changes: 1 addition & 7 deletions go/common/grpc/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ var (
_ RuntimePolicyChecker = (*DynamicRuntimePolicyChecker)(nil)
)

const (
// ForwardedSubjectMD is name of the metadata field in which the actual
// subject should be passed in case sentry forwarded the request.
ForwardedSubjectMD = "forwarded-subject"
)

// ErrForbiddenByPolicy is the error returned when an action is not allowed by policy.
type ErrForbiddenByPolicy struct {
method accessctl.Action
Expand Down Expand Up @@ -135,7 +129,7 @@ func (c *DynamicRuntimePolicyChecker) CheckAccessAllowed(
if !ok {
return fmt.Errorf("grpc: failed getting metadata from context")
}
forwardedSubjects, ok := md[ForwardedSubjectMD]
forwardedSubjects, ok := md[api.ForwardedSubjectMD]
if !ok {
// Not proxied through sentry, allow.
return nil
Expand Down
9 changes: 5 additions & 4 deletions go/common/grpc/policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func pingHandler( // nolint: golint
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/PingServer/Ping",
FullMethod: "/PingService/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PingServer).Ping(ctx, req.(*PingQuery))
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestAccessPolicy(t *testing.T) {
// Register the PolicyWatcherService.
api.RegisterService(grpcServer.Server(), watcher)

policyChecker := NewDynamicRuntimePolicyChecker("ping", watcher)
policyChecker := NewDynamicRuntimePolicyChecker(cmnGrpc.ServiceName(serviceDesc.ServiceName), watcher)
server := &pingServer{policyChecker}
policy := accessctl.NewPolicy()
policyChecker.SetAccessPolicy(policy, testNs)
Expand Down Expand Up @@ -235,15 +235,15 @@ func TestAccessPolicy(t *testing.T) {
_, err = client.Ping(ctx, &PingQuery{testNs})
require.EqualError(
err,
"rpc error: code = PermissionDenied desc = grpc: calling /PingServer/Ping method for runtime 06956b25ae9fabb8295ce6879f0995c7ad02f8b2a1b22cbade17960a70d765ea not allowed for client CN=oasis-node",
"rpc error: code = PermissionDenied desc = grpc: calling /PingService/Ping method for runtime 06956b25ae9fabb8295ce6879f0995c7ad02f8b2a1b22cbade17960a70d765ea not allowed for client CN=oasis-node",
"Calling Ping with an empty access policy should not be allowed",
)
require.Equal(codes.PermissionDenied, status.Code(err), "returned gRPC error should be PermissionDenied")

// Add a policy rule to allow the client to call Ping.
policy = accessctl.NewPolicy()
subject := accessctl.SubjectFromX509Certificate(clientX509Cert)
policy.Allow(subject, "/PingServer/Ping")
policy.Allow(subject, "/PingService/Ping")
policyChecker.SetAccessPolicy(policy, testNs)

expectedPolicy[testNs] = policy
Expand All @@ -257,6 +257,7 @@ func TestAccessPolicy(t *testing.T) {
res, err := client.Ping(ctx, &PingQuery{testNs})
require.NoError(err, "Calling Ping with proper access policy set should succeed")
require.IsType(&PingResponse{}, res, "Calling Ping should return a response of the correct type")

}

func init() {
Expand Down
42 changes: 11 additions & 31 deletions go/common/grpc/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@ package proxy

import (
"context"
"errors"
"fmt"
"io"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"github.com/oasislabs/oasis-core/go/common/accessctl"
"github.com/oasislabs/oasis-core/go/common/grpc/policy"
"github.com/oasislabs/oasis-core/go/common/cbor"
policy "github.com/oasislabs/oasis-core/go/common/grpc/policy/api"
"github.com/oasislabs/oasis-core/go/common/logging"
)

Expand Down Expand Up @@ -46,24 +42,6 @@ type proxy struct {
// between all requets.
}

// TODO: this is duplicated with code in grpc/policy. Extract and reuse.
func extractPeerFromContext(ctx context.Context) (string, error) {
peer, ok := peer.FromContext(ctx)
if !ok {
return "", errors.New("grpc: failed to obtain connection peer from context")
}
tlsAuth, ok := peer.AuthInfo.(credentials.TLSInfo)
if !ok {
return "", errors.New("grpc: unexpected peer authentication credentials")
}
if nPeerCerts := len(tlsAuth.State.PeerCertificates); nPeerCerts != 1 {
return "", fmt.Errorf("grpc: unexpected number of peer certificates: %d", nPeerCerts)
}
peerCert := tlsAuth.State.PeerCertificates[0]
subject := accessctl.SubjectFromX509Certificate(peerCert)
return string(subject), nil
}

func (p *proxy) handler(srv interface{}, stream grpc.ServerStream) error {
method, ok := grpc.MethodFromServerStream(stream)
if !ok {
Expand All @@ -78,11 +56,13 @@ func (p *proxy) handler(srv interface{}, stream grpc.ServerStream) error {
ServerStreams: true,
ClientStreams: true,
}
sub, err := extractPeerFromContext(upstreamCtx)
sub, err := policy.SubjectFromGRPCContext(upstreamCtx)
if err != nil {
p.logger.Error("failed extracting peer from context",
"err", err,
)
// XXX: failing here means proxy will only work with TLS Authenticated
// connections but that is fine.
return status.Errorf(codes.Internal, "failed extracting peer from context")
}
// Pass subject header upstream.
Expand Down Expand Up @@ -143,8 +123,8 @@ func (p *proxy) proxyUpstream(downstream grpc.ServerStream, upstream grpc.Client
for {
// XXX: since we are using CBOR we are able to unmarshal messages
// without knowing the schema. This wouldn't work with protobuf, and
// a raw binary codec should be used instead.
var m interface{}
// a raw binary codec would have to be used.
var m cbor.RawMessage
if err := downstream.RecvMsg(&m); err != nil {
if err != io.EOF {
p.logger.Error("failure receiving msg from client",
Expand Down Expand Up @@ -178,14 +158,14 @@ func (p *proxy) proxyDownstream(upstream grpc.ClientStream, downstream grpc.Serv
var headerSent bool
go func() {
for {
// Wait for stream msg (from server).
// Wait for stream msg (from upstream).
// XXX: since we are using CBOR we are able to unmarshal messages
// without knowing the schema. This wouldn't work with protobuf, and
// a raw binary codec should be used instead.
var m interface{}
// a raw binary codec would have to be used.
var m cbor.RawMessage
if err := upstream.RecvMsg(&m); err != nil {
if err != io.EOF {
p.logger.Error("failure receiving msg from server",
p.logger.Error("failure receiving msg from upstream",
"err", err,
)
}
Expand Down
2 changes: 1 addition & 1 deletion go/common/grpc/testing/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func pingHandler( // nolint: golint
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/PingServer/Ping",
FullMethod: "/PingService/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PingServer).Ping(ctx, req.(*PingQuery))
Expand Down
15 changes: 9 additions & 6 deletions go/sentry/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"github.com/oasislabs/oasis-core/go/common/node"
)

// SentryAddresses contains sentry node consensus and committee addresses.
type SentryAddresses struct {
Consensus []node.ConsensusAddress
Committee []node.CommitteeAddress
}

// Backend is a sentry backend implementation.
// TODO: Make these into a single method.
type Backend interface {
// GetConsensusAddresses returns the list of consensus addresses of the sentry node.
GetConsensusAddresses(context.Context) ([]node.ConsensusAddress, error)

// GetCommitteeAddresses returns the CommitteeAddresses of the sentry node.
GetCommitteeAddresses(context.Context) ([]node.CommitteeAddress, error)
// Get addresses returns the list of consensus and committee addresses of
// the sentry node.
GetAddresses(context.Context) (*SentryAddresses, error)
}
58 changes: 12 additions & 46 deletions go/sentry/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,70 +6,44 @@ import (
"google.golang.org/grpc"

cmnGrpc "github.com/oasislabs/oasis-core/go/common/grpc"
"github.com/oasislabs/oasis-core/go/common/node"
)

var (
// serviceName is the gRPC service name.
serviceName = cmnGrpc.NewServiceName("Sentry")

// methodGetConsensusAddresses is the name of the GetConsensusAddresses method.
methodGetConsensusAddresses = serviceName.NewMethodName("GetConsensusAddresses")
// methodGetCommitteeAddresses is the name of the GetCommitteeAddresses method.
methodGetCommitteeAddresses = serviceName.NewMethodName("GetCommitteeAddresses")
// methodGetAddresses is the name of the GetAddresses method.
methodGetAddresses = serviceName.NewMethodName("GetAddresses")

// serviceDesc is the gRPC service descriptor.
serviceDesc = grpc.ServiceDesc{
ServiceName: string(serviceName),
HandlerType: (*Backend)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: methodGetConsensusAddresses.Short(),
Handler: handlerGetConsensusAddresses,
},
{
MethodName: methodGetCommitteeAddresses.Short(),
Handler: handlerGetCommitteeAddresses,
MethodName: methodGetAddresses.Short(),
Handler: handlerGetAddresses,
},
},
Streams: []grpc.StreamDesc{},
}
)

func handlerGetConsensusAddresses( // nolint: golint
func handlerGetAddresses( // nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
if interceptor == nil {
return srv.(Backend).GetConsensusAddresses(ctx)
return srv.(Backend).GetAddresses(ctx)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodGetConsensusAddresses.Full(),
FullMethod: methodGetAddresses.Full(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(Backend).GetConsensusAddresses(ctx)
}
return interceptor(ctx, nil, info, handler)
}

func handlerGetCommitteeAddresses( //nolint: golint
srv interface{},
ctx context.Context,
dec func(interface{}) error,
interceptor grpc.UnaryServerInterceptor,
) (interface{}, error) {
if interceptor == nil {
return srv.(Backend).GetCommitteeAddresses(ctx)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: methodGetCommitteeAddresses.Full(),
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(Backend).GetCommitteeAddresses(ctx)
return srv.(Backend).GetAddresses(ctx)
}
return interceptor(ctx, nil, info, handler)
}
Expand All @@ -83,20 +57,12 @@ type sentryClient struct {
conn *grpc.ClientConn
}

func (c *sentryClient) GetConsensusAddresses(ctx context.Context) ([]node.ConsensusAddress, error) {
var rsp []node.ConsensusAddress
if err := c.conn.Invoke(ctx, methodGetConsensusAddresses.Full(), nil, &rsp); err != nil {
return nil, err
}
return rsp, nil
}

func (c *sentryClient) GetCommitteeAddresses(ctx context.Context) ([]node.CommitteeAddress, error) {
var rsp []node.CommitteeAddress
if err := c.conn.Invoke(ctx, methodGetCommitteeAddresses.Full(), nil, &rsp); err != nil {
func (c *sentryClient) GetAddresses(ctx context.Context) (*SentryAddresses, error) {
var rsp SentryAddresses
if err := c.conn.Invoke(ctx, methodGetAddresses.Full(), nil, &rsp); err != nil {
return nil, err
}
return rsp, nil
return &rsp, nil
}

// NewSentryClient creates a new gRPC sentry client service.
Expand Down
22 changes: 11 additions & 11 deletions go/sentry/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,33 @@ type backend struct {
workerCommonCfg *workerCommon.Config
}

func (b *backend) GetConsensusAddresses(ctx context.Context) ([]node.ConsensusAddress, error) {
addrs, err := b.consensus.GetAddresses()
func (b *backend) GetAddresses(ctx context.Context) (*api.SentryAddresses, error) {
// Consensus addresses.
consensusAddrs, err := b.consensus.GetAddresses()
if err != nil {
return nil, fmt.Errorf("sentry: error obtaining consensus addresses: %w", err)
}
b.logger.Debug("successfully obtained consensus addresses",
"addresses", addrs,
"addresses", consensusAddrs,
)

return addrs, nil
}

func (b *backend) GetCommitteeAddresses(ctx context.Context) ([]node.CommitteeAddress, error) {
addrs, err := b.workerCommonCfg.GetNodeAddresses()
// Committe addresses.
committeeAddrs, err := b.workerCommonCfg.GetNodeAddresses()
if err != nil {
return nil, fmt.Errorf("sentry: error obtaining worker addresses: %w", err)
}

var committeeAddresses []node.CommitteeAddress
for _, addr := range addrs {
for _, addr := range committeeAddrs {
committeeAddresses = append(committeeAddresses, node.CommitteeAddress{
Certificate: b.identity.TLSCertificate.Certificate[0],
Address: addr,
})
}

return committeeAddresses, nil
return &api.SentryAddresses{
Committee: committeeAddresses,
Consensus: consensusAddrs,
}, nil
}

// New constructs a new sentry Backend instance.
Expand Down
Loading

0 comments on commit 0cd571c

Please sign in to comment.