Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved tracing and added documentation #142

Merged
merged 4 commits into from
Aug 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ A lightweight transactional message bus on top of RabbitMQ supporting:
5) [Reliable messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern
6) Deadlettering
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
8) Reporting [metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus

Planned:

1) Deduplication of inbound messages
8) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
9) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing

## Stable release
The v1.x branch contains the latest stable releases of grabbit, and one should track that branch to get point and minor release updates.
Expand Down
25 changes: 25 additions & 0 deletions docs/TRACING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Tracing

grabbit supports reporting standard [OpenTracing](https://opentracing.io/) tracing spans to a compatible OpenTracing backend (such as [Jaeger](https://www.jaegertracing.io/)).

NOTE: In your hosting process you will need to set up a global tracer to collect and forward the traces reported by grabbit. See the Jaeger Go client for an [example](https://github.com/jaegertracing/jaeger-client-go).

Once the global tracer is set up, you will need to make sure that your message handlers carry over the passed-in context to any successive messages sent by the handler.

```go

// SomeHandler is an example message handler that carries the context passed
// in with the invocation over to every message it sends.
func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
reply := gbus.NewBusMessage(MyReply{})
cmd := gbus.NewBusMessage(MyCommand{})
// Take the context from the invocation rather than creating a new one, so
// the messages sent below carry the passed-in context.
ctx := invocation.Ctx()

// Send a command to another service using the invocation's context.
if err := invocation.Send(ctx, "another-service", cmd); err != nil{
return err
}
// Reply using the same context.
if err := invocation.Reply(ctx, reply); err != nil{
return err
}
return nil
}

```
3 changes: 2 additions & 1 deletion gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp.
func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) {
b.SenderLock.Lock()
defer b.SenderLock.Unlock()
span, _ := opentracing.StartSpanFromContext(sctx, "sendImpl")
span, _ := opentracing.StartSpanFromContext(sctx, "SendMessage")

defer func() {
if err := recover(); err != nil {
errMsg := fmt.Sprintf("panic recovered panicking err:\n%v\n%s", err, debug.Stack())
Expand Down
35 changes: 26 additions & 9 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package saga

import (
"context"
"database/sql"
"errors"
"fmt"
"reflect"
"strings"
"sync"

"github.com/opentracing/opentracing-go"
slog "github.com/opentracing/opentracing-go/log"
"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/metrics"
Expand Down Expand Up @@ -98,7 +101,8 @@ func (imsm *Glue) getDefsForMsgName(msgName string) []*Def {
return defs
}

func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) error {
//SagaHandler is the generic handler invoking saga instances
func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessage) error {

imsm.lock.Lock()
defer imsm.lock.Unlock()
Expand All @@ -117,11 +121,12 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
*/
startNew := def.shouldStartNewSaga(message)
if startNew {

newInstance := def.newInstance()
imsm.Log().
WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}).
Info("created new saga")
if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil {
if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil {
imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga")
return invkErr
}
Expand Down Expand Up @@ -154,7 +159,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
return e
}
def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}
Expand All @@ -176,7 +181,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)

for _, instance := range instances {
def.configureSaga(instance)
if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil {
if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil {
imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga")
return invkErr
}
Expand All @@ -191,13 +196,16 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage)
return nil
}

func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error {
func (imsm *Glue) invokeSagaInstance(def *Def, instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error {

span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), def.String())
defer span.Finish()
sginv := &sagaInvocation{
decoratedBus: invocation.Bus(),
decoratedInvocation: invocation,
inboundMsg: message,
sagaID: instance.ID,
ctx: invocation.Ctx(),
ctx: sctx,
invokingService: imsm.svcName,
}
sginv.SetLogger(imsm.Log().WithFields(logrus.Fields{
Expand All @@ -207,7 +215,11 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat
}))

exchange, routingKey := invocation.Routing()
return instance.invoke(exchange, routingKey, sginv, message)
err := instance.invoke(exchange, routingKey, sginv, message)
if err != nil {
span.LogFields(slog.Error(err))
}
return err
}

func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error {
Expand All @@ -232,7 +244,7 @@ func (imsm *Glue) registerMessage(message gbus.Message) error {
return nil
}
imsm.alreadyRegistred[message.SchemaName()] = true
return imsm.bus.HandleMessage(message, imsm.handler)
return imsm.bus.HandleMessage(message, imsm.SagaHandler)
}

func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) error {
Expand All @@ -241,7 +253,7 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro
return nil
}
imsm.alreadyRegistred[event.SchemaName()] = true
return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler)
return imsm.bus.HandleEvent(exchange, topic, event, imsm.SagaHandler)
}

//TimeoutSaga fetches a saga instance and calls its timeout interface
Expand All @@ -257,7 +269,12 @@ func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error {
if err != nil {
return err
}

span, _ := opentracing.StartSpanFromContext(context.Background(), "SagaTimeout")
span.SetTag("saga_type", saga.String())
defer span.Finish()
timeoutErr := saga.timeout(tx, imsm.bus)

if timeoutErr != nil {
imsm.Log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga")
return timeoutErr
Expand Down
15 changes: 11 additions & 4 deletions gbus/saga/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"reflect"
"time"

"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus/metrics"

"github.com/opentracing/opentracing-go"
"github.com/rs/xid"
"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/metrics"
)

//Instance represent a living instance of a saga of a particular definition
Expand All @@ -22,7 +22,7 @@ type Instance struct {
Log logrus.FieldLogger
}

func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error {
func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocation, message *gbus.BusMessage) error {

methodsToInvoke := si.getSagaMethodNameToInvoke(exchange, routingKey, message)

Expand All @@ -48,6 +48,13 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati
"method_name": methodName, "saga_id": si.ID,
}).Info("invoking method on saga")

span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), methodName)
// replace the original context with the context built around the span so we can
// trace the saga handler that is invoked
invocation.ctx = sctx

defer span.Finish()

err := metrics.RunHandlerWithMetric(func() error {
returns := method.Call(params)

Expand Down
5 changes: 4 additions & 1 deletion gbus/saga/instance_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package saga

import (
"context"
"errors"
"reflect"
"testing"
Expand All @@ -19,7 +20,9 @@ func TestInstanceInvocationReturnsErrors(t *testing.T) {
m2 := TestMsg2{}

exchange, routingKey := "", "kong"
invocationStub := &sagaInvocation{}
invocationStub := &sagaInvocation{
ctx: context.Background(),
}

failName := gbus.MessageHandler(s.Fail).Name()
failFilter := gbus.NewMessageFilter(exchange, routingKey, m1)
Expand Down
28 changes: 16 additions & 12 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,16 +241,20 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) {
}

func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {
var ctx context.Context

rootCtx := context.Background()
var spanOptions []opentracing.StartSpanOption

spCtx, err := amqptracer.Extract(delivery.Headers)

if err != nil {
worker.log().WithError(err).Debug("could not extract SpanContext from headers")
} else {
spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx))
}
worker.span, ctx = opentracing.StartSpanFromContext(context.Background(), "processMessage", spanOptions...)
span, ctx := opentracing.StartSpanFromContext(rootCtx, "ProcessMessage", spanOptions...)
worker.span = span
//defer worker.span.Finish()

//catch all error handling so goroutine will not crash
defer func() {
Expand All @@ -265,7 +269,6 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) {
worker.span.LogFields(slog.String("panic", "failed to process message"))
logEntry.Error("failed to process message")
}
worker.span.Finish()
}()

worker.log().WithFields(logrus.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG")
Expand Down Expand Up @@ -318,15 +321,17 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan

action := func(attempt uint) (actionErr error) {

attemptSpan, sctx := opentracing.StartSpanFromContext(sctx, "InvokeHandler")
defer attemptSpan.Finish()

tx, txCreateErr := worker.txProvider.New()
if txCreateErr != nil {
worker.log().WithError(txCreateErr).Error("failed creating new tx")
worker.span.LogFields(slog.Error(txCreateErr))
attemptSpan.LogFields(slog.Error(txCreateErr))
return txCreateErr
}

worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers")
worker.span.LogFields(slog.Uint64("attempt", uint64(attempt+1)))
attemptSpan.LogFields(slog.Uint64("attempt", uint64(attempt+1)))
defer func() {
if p := recover(); p != nil {
pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack())
Expand All @@ -341,10 +346,9 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
worker.span.Finish()
}()
var handlerErr error
var hspan opentracing.Span
var hsctx context.Context

for _, handler := range handlers {
hspan, hsctx = opentracing.StartSpanFromContext(sctx, handler.Name())
hspan, hsctx := opentracing.StartSpanFromContext(sctx, handler.Name())

ctx := &defaultInvocationContext{
invocingSvc: delivery.ReplyTo,
Expand All @@ -364,22 +368,22 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan
return handler(ctx, message)
}, handler.Name(), worker.log())
if handlerErr != nil {
hspan.LogFields(slog.Error(handlerErr))
break
}
hspan.Finish()
}
if handlerErr != nil {
hspan.LogFields(slog.Error(handlerErr))
attemptSpan.LogFields(slog.Error(handlerErr))
rbkErr := tx.Rollback()
if rbkErr != nil {
attemptSpan.LogFields(slog.Error(rbkErr))
worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error")
}
hspan.Finish()
return handlerErr
}
cmtErr := tx.Commit()
if cmtErr != nil {
attemptSpan.LogFields(slog.Error(cmtErr))
worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers")
return cmtErr
}
Expand Down