Skip to content

Commit

Permalink
Removed tags; Simplified interceptor code; Added logging fields edita…
Browse files Browse the repository at this point in the history
…bility.

Fixes #382


Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Jan 30, 2021
1 parent 215af81 commit 07b4ff5
Show file tree
Hide file tree
Showing 34 changed files with 556 additions and 1,002 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,4 @@ coverage.txt
vendor/

.envrc
.bin
.bin
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ Types of changes:
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## v2

### Changed

* `tags` removed. Use `logging.ExtractFields` to read logging fields from logging interceptor for your local request logger. Use `logging.InjectFields` to inject custom fields to logging interceptor to client context or interceptor before logging interceptor.

## [Unreleased]

### Added
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 h1:FlFbCRLd5Jr4iYXZufAvgWN6Ao0JrI5chLINnUXDDr0=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
11 changes: 5 additions & 6 deletions interceptors/auth/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
pb "google.golang.org/grpc/examples/helloworld/helloworld"
"google.golang.org/grpc/status"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags"
)

var tokenInfoKey struct{}
Expand All @@ -35,12 +36,10 @@ func exampleAuthFunc(ctx context.Context) (context.Context, error) {
return nil, status.Errorf(codes.Unauthenticated, "invalid auth token: %v", err)
}

tags.Extract(ctx).Set("auth.sub", userClaimFromToken(tokenInfo))

// WARNING: in production define your own type to avoid context collisions
newCtx := context.WithValue(ctx, tokenInfoKey, tokenInfo)
ctx = logging.InjectFields(ctx, logging.Fields{"auth.sub", userClaimFromToken(tokenInfo)})

return newCtx, nil
// WARNING: In production define your own type to avoid context collisions.
return context.WithValue(ctx, tokenInfoKey, tokenInfo), nil
}

// Simple example of server initialization code
Expand Down
4 changes: 2 additions & 2 deletions interceptors/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func UnaryClientInterceptor(reportable ClientReportable) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
r := newReport(Unary, method)
reporter, newCtx := reportable.ClientReporter(ctx, req, r.rpcType, r.service, r.method)
reporter, newCtx := reportable.ClientReporter(ctx, CallMeta{ReqProtoOrNil: req, Typ: r.rpcType, Service: r.service, Method: r.method})

reporter.PostMsgSend(req, nil, time.Since(r.startTime))
err := invoker(newCtx, method, req, reply, cc, opts...)
Expand All @@ -31,7 +31,7 @@ func UnaryClientInterceptor(reportable ClientReportable) grpc.UnaryClientInterce
func StreamClientInterceptor(reportable ClientReportable) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
r := newReport(clientStreamType(desc), method)
reporter, newCtx := reportable.ClientReporter(ctx, nil, r.rpcType, r.service, r.method)
reporter, newCtx := reportable.ClientReporter(ctx, CallMeta{ReqProtoOrNil: nil, Typ: r.rpcType, Service: r.service, Method: r.method})

clientStream, err := streamer(newCtx, desc, cc, method, opts...)
if err != nil {
Expand Down
49 changes: 16 additions & 33 deletions interceptors/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (
)

type mockReport struct {
typ GRPCType
svcName, methodName string
CallMeta

postCalls []error
postMsgSends []error
Expand All @@ -40,9 +39,9 @@ type mockReportable struct {
func (m *mockReportable) Equal(t *testing.T, expected []*mockReport) {
require.Len(t, expected, len(m.reports))
for i, e := range m.reports {
require.Equal(t, expected[i].typ, e.typ, "%v", i)
require.Equal(t, expected[i].svcName, e.svcName, "%v", i)
require.Equal(t, expected[i].methodName, e.methodName, "%v", i)
require.Equal(t, expected[i].Typ, e.Typ, "%v", i)
require.Equal(t, expected[i].Service, e.Service, "%v", i)
require.Equal(t, expected[i].Method, e.Method, "%v", i)

require.Len(t, expected[i].postCalls, len(e.postCalls), "%v", i)
for k, err := range e.postCalls {
Expand Down Expand Up @@ -111,14 +110,14 @@ func (m *mockReportable) PostMsgReceive(_ interface{}, err error, _ time.Duratio
m.curr.postMsgReceives = append(m.curr.postMsgReceives, err)
}

func (m *mockReportable) ClientReporter(ctx context.Context, _ interface{}, typ GRPCType, serviceName string, methodName string) (Reporter, context.Context) {
m.curr = &mockReport{typ: typ, svcName: serviceName, methodName: methodName}
func (m *mockReportable) ClientReporter(ctx context.Context, c CallMeta) (Reporter, context.Context) {
m.curr = &mockReport{CallMeta: c}
m.reports = append(m.reports, m.curr)
return m, ctx
}

func (m *mockReportable) ServerReporter(ctx context.Context, _ interface{}, typ GRPCType, serviceName string, methodName string) (Reporter, context.Context) {
m.curr = &mockReport{typ: typ, svcName: serviceName, methodName: methodName}
func (m *mockReportable) ServerReporter(ctx context.Context, c CallMeta) (Reporter, context.Context) {
m.curr = &mockReport{CallMeta: c}
m.reports = append(m.reports, m.curr)
return m, ctx
}
Expand Down Expand Up @@ -206,9 +205,7 @@ func (s *ClientInterceptorTestSuite) TestUnaryReporting() {
_, err := s.testClient.PingEmpty(s.ctx, &testpb.PingEmptyRequest{}) // should return with code=OK
require.NoError(s.T(), err)
s.mock.Equal(s.T(), []*mockReport{{
typ: Unary,
svcName: testpb.TestServiceFullName,
methodName: "PingEmpty",
CallMeta: CallMeta{Typ: Unary, Service: testpb.TestServiceFullName, Method: "PingEmpty"},
postCalls: []error{nil},
postMsgReceives: []error{nil},
postMsgSends: []error{nil},
Expand All @@ -218,9 +215,7 @@ func (s *ClientInterceptorTestSuite) TestUnaryReporting() {
_, err = s.testClient.PingError(s.ctx, &testpb.PingErrorRequest{ErrorCodeReturned: uint32(codes.FailedPrecondition)}) // should return with code=FailedPrecondition
require.Error(s.T(), err)
s.mock.Equal(s.T(), []*mockReport{{
typ: Unary,
svcName: testpb.TestServiceFullName,
methodName: "PingError",
CallMeta: CallMeta{Typ: Unary, Service: testpb.TestServiceFullName, Method: "PingError"},
postCalls: []error{status.Errorf(codes.FailedPrecondition, "Userspace error.")},
postMsgReceives: []error{status.Errorf(codes.FailedPrecondition, "Userspace error.")},
postMsgSends: []error{nil},
Expand All @@ -233,9 +228,7 @@ func (s *ClientInterceptorTestSuite) TestStartedListReporting() {

// Even without reading, we should get initial mockReport.
s.mock.Equal(s.T(), []*mockReport{{
typ: ServerStream,
svcName: testpb.TestServiceFullName,
methodName: "PingList",
CallMeta: CallMeta{Typ: ServerStream, Service: testpb.TestServiceFullName, Method: "PingList"},
postMsgSends: []error{nil},
}})

Expand All @@ -244,14 +237,10 @@ func (s *ClientInterceptorTestSuite) TestStartedListReporting() {

// Even without reading, we should get initial mockReport.
s.mock.Equal(s.T(), []*mockReport{{
typ: ServerStream,
svcName: testpb.TestServiceFullName,
methodName: "PingList",
CallMeta: CallMeta{Typ: ServerStream, Service: testpb.TestServiceFullName, Method: "PingList"},
postMsgSends: []error{nil},
}, {
typ: ServerStream,
svcName: testpb.TestServiceFullName,
methodName: "PingList",
CallMeta: CallMeta{Typ: ServerStream, Service: testpb.TestServiceFullName, Method: "PingList"},
postMsgSends: []error{nil},
}})
}
Expand All @@ -273,9 +262,7 @@ func (s *ClientInterceptorTestSuite) TestListReporting() {
require.EqualValues(s.T(), testpb.ListResponseCount, count, "Number of received msg on the wire must match")

s.mock.Equal(s.T(), []*mockReport{{
typ: ServerStream,
svcName: testpb.TestServiceFullName,
methodName: "PingList",
CallMeta: CallMeta{Typ: ServerStream, Service: testpb.TestServiceFullName, Method: "PingList"},
postCalls: []error{io.EOF},
postMsgReceives: append(make([]error, testpb.ListResponseCount), io.EOF),
postMsgSends: []error{nil},
Expand All @@ -298,9 +285,7 @@ func (s *ClientInterceptorTestSuite) TestListReporting() {
require.Equal(s.T(), codes.FailedPrecondition, st.Code(), "Recv must return FailedPrecondition, otherwise the test is wrong")

s.mock.Equal(s.T(), []*mockReport{{
typ: ServerStream,
svcName: testpb.TestServiceFullName,
methodName: "PingList",
CallMeta: CallMeta{Typ: ServerStream, Service: testpb.TestServiceFullName, Method: "PingList"},
postCalls: []error{status.Errorf(codes.FailedPrecondition, "foobar"), status.Errorf(codes.FailedPrecondition, "foobar")},
postMsgReceives: []error{status.Errorf(codes.FailedPrecondition, "foobar"), status.Errorf(codes.FailedPrecondition, "foobar")},
postMsgSends: []error{nil},
Expand Down Expand Up @@ -342,9 +327,7 @@ func (s *ClientInterceptorTestSuite) TestBiStreamingReporting() {

require.EqualValues(s.T(), count, 100, "Number of received msg on the wire must match")
s.mock.Equal(s.T(), []*mockReport{{
typ: BidiStream,
svcName: testpb.TestServiceFullName,
methodName: "PingStream",
CallMeta: CallMeta{Typ: BidiStream, Service: testpb.TestServiceFullName, Method: "PingStream"},
postCalls: []error{io.EOF},
postMsgReceives: append(make([]error, 100), io.EOF),
postMsgSends: make([]error, 100),
Expand Down
13 changes: 6 additions & 7 deletions interceptors/logging/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@
/*
logging is a "parent" package for gRPC logging middlewares.
The gRPC logging middleware populates request-scoped data to `grpc_ctxtags.Tags` that relate to the current gRPC call
(e.g. service and method names).
The gRPC logging middleware populates request-scoped data to `logging.Fields` that relate to the current gRPC call
(e.g. service and method names). You can laverage that data using `logging.ExtractFields` and `logging.InjectFields`.
Once the gRPC logging middleware has added the gRPC specific Tags to the ctx they will then be written with the logs
that are made using the `ctx_logrus` or `ctx_zap` loggers.
Once the gRPC logging middleware has added the gRPC specific Fields to the ctx they will then be written with the log lines.
All logging middleware will emit a final log statement. It is based on the error returned by the handler function,
the gRPC status code, an error (if any) and it emit at a level controlled via `WithLevels`.
the gRPC status code, an error (if any) and it emit at a level controlled via `WithLevels`. You can control this behavior
using `WithDecider`.
This parent package
This particular package is intended for use by other middleware, logging or otherwise. It contains interfaces that other
logging middlewares *could* share . This allows code to be shared between different implementations.
logging middlewares *could* share. This allows code to be shared between different implementations.
Field names
Expand All @@ -31,6 +31,5 @@ Implementations:
* providers/kit
* providers/zerolog
See relevant packages below.
*/
package logging
101 changes: 48 additions & 53 deletions interceptors/logging/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,20 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/peer"

"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags"
)

// extractFields returns all fields from tags.
func extractFields(tags tags.Tags) Fields {
var fields Fields
for k, v := range tags.Values() {
fields = append(fields, k, v)
}
return fields
}

type reporter struct {
interceptors.CallMeta

ctx context.Context
typ interceptors.GRPCType
service, method string
startCallLogged bool
opts *options
logger Logger
kind string
startCallLogged bool

opts *options
logger Logger
}

func (c *reporter) logMessage(logger Logger, err error, msg string, duration time.Duration) {
Expand All @@ -37,12 +29,11 @@ func (c *reporter) logMessage(logger Logger, err error, msg string, duration tim
if err != nil {
logger = logger.With("grpc.error", fmt.Sprintf("%v", err))
}
logger = logger.With(extractFields(tags.Extract(c.ctx))...)
logger.With(c.opts.durationFieldFunc(duration)...).Log(c.opts.levelFunc(code), msg)
}

func (c *reporter) PostCall(err error, duration time.Duration) {
switch c.opts.shouldLog(interceptors.FullMethod(c.service, c.method), err) {
switch c.opts.shouldLog(c.FullMethod(), err) {
case LogFinishCall, LogStartAndFinishCall:
if err == io.EOF {
err = nil
Expand All @@ -57,7 +48,7 @@ func (c *reporter) PostMsgSend(_ interface{}, err error, duration time.Duration)
if c.startCallLogged {
return
}
switch c.opts.shouldLog(interceptors.FullMethod(c.service, c.method), err) {
switch c.opts.shouldLog(c.FullMethod(), err) {
case LogStartAndFinishCall:
c.startCallLogged = true
c.logMessage(c.logger, err, "started call", duration)
Expand All @@ -68,68 +59,72 @@ func (c *reporter) PostMsgReceive(_ interface{}, err error, duration time.Durati
if c.startCallLogged {
return
}
switch c.opts.shouldLog(interceptors.FullMethod(c.service, c.method), err) {
switch c.opts.shouldLog(c.FullMethod(), err) {
case LogStartAndFinishCall:
c.startCallLogged = true
c.logMessage(c.logger, err, "started call", duration)
}
}

type reportable struct {
opts *options
logger Logger
}

func (r *reportable) ServerReporter(ctx context.Context, _ interface{}, typ interceptors.GRPCType, service string, method string) (interceptors.Reporter, context.Context) {
return r.reporter(ctx, typ, service, method, KindServerFieldValue)
}
func reportable(logger Logger, opts *options) interceptors.CommonReportableFunc {
return func(ctx context.Context, c interceptors.CallMeta, isClient bool) (interceptors.Reporter, context.Context) {
kind := KindServerFieldValue
if isClient {
kind = KindClientFieldValue
}

func (r *reportable) ClientReporter(ctx context.Context, _ interface{}, typ interceptors.GRPCType, service string, method string) (interceptors.Reporter, context.Context) {
return r.reporter(ctx, typ, service, method, KindClientFieldValue)
}
fields := newCommonFields(kind, c)
if !isClient {
if peer, ok := peer.FromContext(ctx); ok {
fields = append(fields, "peer.address", peer.Addr.String())
}
}
fields = fields.AppendUnique(ExtractFields(ctx))

func (r *reportable) reporter(ctx context.Context, typ interceptors.GRPCType, service string, method string, kind string) (interceptors.Reporter, context.Context) {
fields := commonFields(kind, typ, service, method)
fields = append(fields, "grpc.start_time", time.Now().Format(time.RFC3339))
if d, ok := ctx.Deadline(); ok {
fields = append(fields, "grpc.request.deadline", d.Format(time.RFC3339))
// Start/end specific fields. We don't want to pollute potential request fields downstream might use.
singleUseFields := []string{"grpc.start_time", time.Now().Format(time.RFC3339)}
if d, ok := ctx.Deadline(); ok {
singleUseFields = append(singleUseFields, "grpc.request.deadline", d.Format(time.RFC3339))
}
return &reporter{
CallMeta: c,
ctx: ctx,
startCallLogged: false,
opts: opts,
logger: logger.With(fields...).With(singleUseFields...),
kind: kind,
}, InjectFields(ctx, fields)
}
return &reporter{
ctx: ctx,
typ: typ,
service: service,
method: method,
startCallLogged: false,
opts: r.opts,
logger: r.logger.With(fields...),
kind: kind,
}, ctx
}

// UnaryClientInterceptor returns a new unary client interceptor that optionally logs the execution of external gRPC calls.
// Logger will use all tags (from tags package) available in current context as fields.
// Logger will read existing and write new logging.Fields available in current context.
// See `ExtractFields` and `InjectFields` for details.
func UnaryClientInterceptor(logger Logger, opts ...Option) grpc.UnaryClientInterceptor {
o := evaluateClientOpt(opts)
return interceptors.UnaryClientInterceptor(&reportable{logger: logger, opts: o})
return interceptors.UnaryClientInterceptor(reportable(logger, o))
}

// StreamClientInterceptor returns a new streaming client interceptor that optionally logs the execution of external gRPC calls.
// Logger will use all tags (from tags package) available in current context as fields.
// Logger will read existing and write new logging.Fields available in current context.
// See `ExtractFields` and `InjectFields` for details.
func StreamClientInterceptor(logger Logger, opts ...Option) grpc.StreamClientInterceptor {
o := evaluateClientOpt(opts)
return interceptors.StreamClientInterceptor(&reportable{logger: logger, opts: o})
return interceptors.StreamClientInterceptor(reportable(logger, o))
}

// UnaryServerInterceptor returns a new unary server interceptors that optionally logs endpoint handling.
// Logger will use all tags (from tags package) available in current context as fields.
// Logger will read existing and write new logging.Fields available in current context.
// See `ExtractFields` and `InjectFields` for details.
func UnaryServerInterceptor(logger Logger, opts ...Option) grpc.UnaryServerInterceptor {
o := evaluateServerOpt(opts)
return interceptors.UnaryServerInterceptor(&reportable{logger: logger, opts: o})
return interceptors.UnaryServerInterceptor(reportable(logger, o))
}

// StreamServerInterceptor returns a new stream server interceptors that optionally logs endpoint handling.
// Logger will use all tags (from tags package) available in current context as fields.
// Logger will read existing and write new logging.Fields available in current context.
// See `ExtractFields` and `InjectFields` for details..
func StreamServerInterceptor(logger Logger, opts ...Option) grpc.StreamServerInterceptor {
o := evaluateServerOpt(opts)
return interceptors.StreamServerInterceptor(&reportable{logger: logger, opts: o})
return interceptors.StreamServerInterceptor(reportable(logger, o))
}
Loading

0 comments on commit 07b4ff5

Please sign in to comment.