Skip to content

Commit

Permalink
pass ctx with reva log in the init services
Browse files Browse the repository at this point in the history
  • Loading branch information
gmgigi96 committed Oct 27, 2022
1 parent fedaffb commit 5e15c18
Show file tree
Hide file tree
Showing 150 changed files with 430 additions and 346 deletions.
16 changes: 13 additions & 3 deletions cmd/revad/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package runtime

import (
"context"
"fmt"
"io"
"log"
Expand All @@ -29,6 +30,7 @@ import (
"strings"

"github.com/cs3org/reva/cmd/revad/internal/grace"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/logger"
"github.com/cs3org/reva/pkg/registry/memory"
"github.com/cs3org/reva/pkg/rgrpc"
Expand Down Expand Up @@ -184,18 +186,25 @@ func handlePIDFlag(l *zerolog.Logger, pidFile string) (*grace.Watcher, error) {
return w, nil
}

func ctxWithInitLogger(ctx context.Context, log *zerolog.Logger, pkg string) context.Context {
l := log.With().Str("context", "init").Str("pkg", pkg).Logger()
return appctx.WithLogger(ctx, &l)
}

func start(mainConf map[string]interface{}, servers map[string]grace.Server, listeners map[string]net.Listener, log *zerolog.Logger, watcher *grace.Watcher) {
if isEnabledHTTP(mainConf) {
go func() {
if err := servers["http"].(*rhttp.Server).Start(listeners["http"]); err != nil {
ctx := ctxWithInitLogger(context.Background(), log, "http")
if err := servers["http"].(*rhttp.Server).Start(ctx, listeners["http"]); err != nil {
log.Error().Err(err).Msg("error starting the http server")
watcher.Exit(1)
}
}()
}
if isEnabledGRPC(mainConf) {
go func() {
if err := servers["grpc"].(*rgrpc.Server).Start(listeners["grpc"]); err != nil {
ctx := ctxWithInitLogger(context.Background(), log, "grpc")
if err := servers["grpc"].(*rgrpc.Server).Start(ctx, listeners["grpc"]); err != nil {
log.Error().Err(err).Msg("error starting the grpc server")
watcher.Exit(1)
}
Expand Down Expand Up @@ -264,7 +273,8 @@ func getHTTPServer(conf interface{}, l *zerolog.Logger) (*rhttp.Server, error) {
return s, nil
}

// adjustCPU parses string cpu and sets GOMAXPROCS
// adjustCPU parses string cpu and sets GOMAXPROCS
//
// according to its value. It accepts either
// a number (e.g. 3) or a percent (e.g. 50%).
// Default is to use all available cores.
Expand Down
1 change: 1 addition & 0 deletions internal/grpc/interceptors/eventsmiddleware/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func init() {

// NewUnary returns a new unary interceptor that emits events when needed
// no lint because of the switch statement that should be extendable
//
//nolint:gocritic
func NewUnary(m map[string]interface{}) (grpc.UnaryServerInterceptor, int, error) {
publisher, err := publisherFromConfig(m)
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/applicationauth/applicationauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func (s *service) Register(ss *grpc.Server) {
appauthpb.RegisterApplicationsAPIServer(ss, s)
}

func getAppAuthManager(c *config) (appauth.Manager, error) {
func getAppAuthManager(ctx context.Context, c *config) (appauth.Manager, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
return f(ctx, c.Drivers[c.Driver])
}
return nil, errtypes.NotFound("driver not found: " + c.Driver)
}
Expand All @@ -73,15 +73,15 @@ func parseConfig(m map[string]interface{}) (*config, error) {
}

// New creates a app auth provider svc
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {

c, err := parseConfig(m)
if err != nil {
return nil, err
}
c.init()

am, err := getAppAuthManager(c)
am, err := getAppAuthManager(ctx, c)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/appprovider/appprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func parseConfig(m map[string]interface{}) (*config, error) {
}

// New creates a new AppProviderService
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
}

provider, err := getProvider(c)
provider, err := getProvider(ctx, c)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -163,9 +163,9 @@ func (s *service) Register(ss *grpc.Server) {
providerpb.RegisterProviderAPIServer(ss, s)
}

func getProvider(c *config) (app.Provider, error) {
func getProvider(ctx context.Context, c *config) (app.Provider, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
return f(ctx, c.Drivers[c.Driver])
}
return nil, errtypes.NotFound("driver not found: " + c.Driver)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/appregistry/appregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func (c *config) init() {
}

// New creates a new StorageRegistryService
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {

c, err := parseConfig(m)
if err != nil {
return nil, err
}

reg, err := getRegistry(c)
reg, err := getRegistry(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -92,9 +92,9 @@ func parseConfig(m map[string]interface{}) (*config, error) {
return c, nil
}

func getRegistry(c *config) (app.Registry, error) {
func getRegistry(ctx context.Context, c *config) (app.Registry, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
return f(ctx, c.Drivers[c.Driver])
}
return nil, errtypes.NotFound("appregistrysvc: driver not found: " + c.Driver)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/grpc/services/appregistry/appregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func Test_ListAppProviders(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rr, err := static.New(map[string]interface{}{"providers": tt.providers, "mime_types": tt.mimeTypes})
rr, err := static.New(context.Background(), map[string]interface{}{"providers": tt.providers, "mime_types": tt.mimeTypes})
if err != nil {
t.Errorf("could not create registry error = %v", err)
return
Expand Down Expand Up @@ -296,7 +296,7 @@ func Test_GetAppProviders(t *testing.T) {
},
}

rr, err := static.New(map[string]interface{}{"providers": providers, "mime_types": mimeTypes})
rr, err := static.New(context.Background(), map[string]interface{}{"providers": providers, "mime_types": mimeTypes})
if err != nil {
t.Errorf("could not create registry error = %v", err)
return
Expand Down Expand Up @@ -356,7 +356,7 @@ func TestNew(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := New(tt.m, nil)
got, err := New(context.Background(), tt.m, nil)
if err != nil {
assert.Equal(t, tt.wantErr, err.Error())
assert.Nil(t, got)
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/authprovider/authprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func parseConfig(m map[string]interface{}) (*config, error) {
return c, nil
}

func getAuthManager(manager string, m map[string]map[string]interface{}) (auth.Manager, *plugin.RevaPlugin, error) {
func getAuthManager(ctx context.Context, manager string, m map[string]map[string]interface{}) (auth.Manager, *plugin.RevaPlugin, error) {
if manager == "" {
return nil, nil, errtypes.InternalError("authsvc: driver not configured for auth manager")
}
Expand All @@ -85,7 +85,7 @@ func getAuthManager(manager string, m map[string]map[string]interface{}) (auth.M
return authManager, p, nil
} else if _, ok := err.(errtypes.NotFound); ok {
if f, ok := registry.NewFuncs[manager]; ok {
authmgr, err := f(m[manager])
authmgr, err := f(ctx, m[manager])
return authmgr, nil, err
}
} else {
Expand All @@ -95,13 +95,13 @@ func getAuthManager(manager string, m map[string]map[string]interface{}) (auth.M
}

// New returns a new AuthProviderServiceServer.
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
}

authManager, plug, err := getAuthManager(c.AuthManager, c.AuthManagers)
authManager, plug, err := getAuthManager(ctx, c.AuthManager, c.AuthManagers)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/authregistry/authregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ func (c *config) init() {
}

// New creates a new AuthRegistry
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
}

c.init()

reg, err := getRegistry(c)
reg, err := getRegistry(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -94,9 +94,9 @@ func parseConfig(m map[string]interface{}) (*config, error) {
return c, nil
}

func getRegistry(c *config) (auth.Registry, error) {
func getRegistry(ctx context.Context, c *config) (auth.Registry, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
return f(ctx, c.Drivers[c.Driver])
}
return nil, errtypes.NotFound("authregistrysvc: driver not found: " + c.Driver)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func (s *service) Register(ss *grpc.Server) {
datatx.RegisterTxAPIServer(ss, s)
}

func getDatatxManager(c *config) (txdriver.Manager, error) {
func getDatatxManager(ctx context.Context, c *config) (txdriver.Manager, error) {
if f, ok := txregistry.NewFuncs[c.TxDriver]; ok {
return f(c.TxDrivers[c.TxDriver])
return f(ctx, c.TxDrivers[c.TxDriver])
}
return nil, errtypes.NotFound("datatx service: driver not found: " + c.TxDriver)
}
Expand All @@ -117,15 +117,15 @@ func parseConfig(m map[string]interface{}) (*config, error) {
}

// New creates a new datatx svc
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {

c, err := parseConfig(m)
if err != nil {
return nil, err
}
c.init()

txManager, err := getDatatxManager(c)
txManager, err := getDatatxManager(ctx, c)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/grpc/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package gateway

import (
"context"
"fmt"
"net/url"
"strings"
Expand Down Expand Up @@ -129,7 +130,7 @@ type svc struct {
// New creates a new gateway svc that acts as a proxy for any grpc operation.
// The gateway is responsible for high-level controls: rate-limiting, coordination between svcs
// like sharing and storage acls, asynchronous transactions, ...
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/groupprovider/groupprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,22 @@ func parseConfig(m map[string]interface{}) (*config, error) {
return c, nil
}

func getDriver(c *config) (group.Manager, error) {
func getDriver(ctx context.Context, c *config) (group.Manager, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
return f(ctx, c.Drivers[c.Driver])
}

return nil, errtypes.NotFound(fmt.Sprintf("driver %s not found for group manager", c.Driver))
}

// New returns a new GroupProviderServiceServer.
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
c, err := parseConfig(m)
if err != nil {
return nil, err
}

groupManager, err := getDriver(c)
groupManager, err := getDriver(ctx, c)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/grpc/services/helloworld/helloworld.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type service struct {
// New returns a new PreferencesServiceServer
// It can be tested like this:
// prototool grpc --address 0.0.0.0:9999 --method 'revad.helloworld.HelloWorldService/Hello' --data '{"name": "Alice"}'
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
c := &conf{}
if err := mapstructure.Decode(m, c); err != nil {
err = errors.Wrap(err, "helloworld: error decoding conf")
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/ocmcore/ocmcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func (s *service) Register(ss *grpc.Server) {
ocmcore.RegisterOcmCoreAPIServer(ss, s)
}

func getShareManager(c *config) (share.Manager, error) {
func getShareManager(ctx context.Context, c *config) (share.Manager, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
return f(ctx, c.Drivers[c.Driver])
}
return nil, errtypes.NotFound(fmt.Sprintf("driver not found: %s", c.Driver))
}
Expand All @@ -78,15 +78,15 @@ func parseConfig(m map[string]interface{}) (*config, error) {
}

// New creates a new ocm core svc
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {

c, err := parseConfig(m)
if err != nil {
return nil, err
}
c.init()

sm, err := getShareManager(c)
sm, err := getShareManager(ctx, c)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/grpc/services/ocminvitemanager/ocminvitemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func (s *service) Register(ss *grpc.Server) {
invitepb.RegisterInviteAPIServer(ss, s)
}

func getInviteManager(c *config) (invite.Manager, error) {
func getInviteManager(ctx context.Context, c *config) (invite.Manager, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
return f(ctx, c.Drivers[c.Driver])
}
return nil, errtypes.NotFound("driver not found: " + c.Driver)
}
Expand All @@ -73,15 +73,15 @@ func parseConfig(m map[string]interface{}) (*config, error) {
}

// New creates a new OCM invite manager svc
func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
func New(ctx context.Context, m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {

c, err := parseConfig(m)
if err != nil {
return nil, err
}
c.init()

im, err := getInviteManager(c)
im, err := getInviteManager(ctx, c)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 5e15c18

Please sign in to comment.