From e0aa6c791f0856433eb7b25be95306db049b5666 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sat, 24 Aug 2019 20:21:31 +0300 Subject: [PATCH 1/4] fixing span scoping in worker --- gbus/worker.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/gbus/worker.go b/gbus/worker.go index 216b771..b418812 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -241,16 +241,20 @@ func (worker *worker) invokeDeadletterHandler(delivery amqp.Delivery) { } func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { - var ctx context.Context + + rootCtx := context.Background() var spanOptions []opentracing.StartSpanOption spCtx, err := amqptracer.Extract(delivery.Headers) + if err != nil { worker.log().WithError(err).Debug("could not extract SpanContext from headers") } else { spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx)) } - worker.span, ctx = opentracing.StartSpanFromContext(context.Background(), "processMessage", spanOptions...) + span, ctx := opentracing.StartSpanFromContext(rootCtx, "processMessage", spanOptions...) 
+ worker.span = span + defer worker.span.Finish() //catch all error handling so goroutine will not crash defer func() { @@ -325,8 +329,9 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan return txCreateErr } - worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers") - worker.span.LogFields(slog.Uint64("attempt", uint64(attempt+1))) + span, sctx := opentracing.StartSpanFromContext(sctx, "invokeHandlers") + defer span.Finish() + span.LogFields(slog.Uint64("attempt", uint64(attempt+1))) defer func() { if p := recover(); p != nil { pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) From 9e18360440d1fcf42b4d4463c805b3dde2d933ef Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 25 Aug 2019 12:37:41 +0300 Subject: [PATCH 2/4] tuned opentracing functionality --- gbus/bus.go | 3 ++- gbus/saga/glue.go | 35 ++++++++++++++++++++++++++--------- gbus/saga/instance.go | 15 +++++++++++---- gbus/worker.go | 25 ++++++++++++------------- 4 files changed, 51 insertions(+), 27 deletions(-) diff --git a/gbus/bus.go b/gbus/bus.go index e3e6e8e..5291722 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -623,7 +623,8 @@ func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp. 
func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) { b.SenderLock.Lock() defer b.SenderLock.Unlock() - span, _ := opentracing.StartSpanFromContext(sctx, "sendImpl") + span, _ := opentracing.StartSpanFromContext(sctx, "SendMessage") + defer func() { if err := recover(); err != nil { errMsg := fmt.Sprintf("panic recovered panicking err:\n%v\n%s", err, debug.Stack()) diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index d6e71e2..e5d20c8 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -1,6 +1,7 @@ package saga import ( + "context" "database/sql" "errors" "fmt" @@ -8,6 +9,8 @@ import ( "strings" "sync" + "github.com/opentracing/opentracing-go" + slog "github.com/opentracing/opentracing-go/log" "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" "github.com/wework/grabbit/gbus/metrics" @@ -98,7 +101,8 @@ func (imsm *Glue) getDefsForMsgName(msgName string) []*Def { return defs } -func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) error { +//SagaHandler is the generic handler invoking saga instances +func (imsm *Glue) SagaHandler(invocation gbus.Invocation, message *gbus.BusMessage) error { imsm.lock.Lock() defer imsm.lock.Unlock() @@ -117,11 +121,12 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) */ startNew := def.shouldStartNewSaga(message) if startNew { + newInstance := def.newInstance() imsm.Log(). WithFields(logrus.Fields{"saga_def": def.String(), "saga_id": newInstance.ID}). 
Info("created new saga") - if invkErr := imsm.invokeSagaInstance(newInstance, invocation, message); invkErr != nil { + if invkErr := imsm.invokeSagaInstance(def, newInstance, invocation, message); invkErr != nil { imsm.Log().WithError(invkErr).WithField("saga_id", newInstance.ID).Error("failed to invoke saga") return invkErr } @@ -154,7 +159,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) return e } def.configureSaga(instance) - if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { + if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil { imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr } @@ -176,7 +181,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) for _, instance := range instances { def.configureSaga(instance) - if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { + if invkErr := imsm.invokeSagaInstance(def, instance, invocation, message); invkErr != nil { imsm.Log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr } @@ -191,13 +196,16 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) return nil } -func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error { +func (imsm *Glue) invokeSagaInstance(def *Def, instance *Instance, invocation gbus.Invocation, message *gbus.BusMessage) error { + + span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), def.String()) + defer span.Finish() sginv := &sagaInvocation{ decoratedBus: invocation.Bus(), decoratedInvocation: invocation, inboundMsg: message, sagaID: instance.ID, - ctx: invocation.Ctx(), + ctx: sctx, invokingService: imsm.svcName, } sginv.SetLogger(imsm.Log().WithFields(logrus.Fields{ @@ -207,7 +215,11 @@ func (imsm *Glue) 
invokeSagaInstance(instance *Instance, invocation gbus.Invocat })) exchange, routingKey := invocation.Routing() - return instance.invoke(exchange, routingKey, sginv, message) + err := instance.invoke(exchange, routingKey, sginv, message) + if err != nil { + span.LogFields(slog.Error(err)) + } + return err } func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error { @@ -232,7 +244,7 @@ func (imsm *Glue) registerMessage(message gbus.Message) error { return nil } imsm.alreadyRegistred[message.SchemaName()] = true - return imsm.bus.HandleMessage(message, imsm.handler) + return imsm.bus.HandleMessage(message, imsm.SagaHandler) } func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) error { @@ -241,7 +253,7 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro return nil } imsm.alreadyRegistred[event.SchemaName()] = true - return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler) + return imsm.bus.HandleEvent(exchange, topic, event, imsm.SagaHandler) } //TimeoutSaga fetches a saga instance and calls its timeout interface @@ -257,7 +269,12 @@ func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error { if err != nil { return err } + + span, _ := opentracing.StartSpanFromContext(context.Background(), "SagaTimeout") + span.SetTag("saga_type", saga.String()) + defer span.Finish() timeoutErr := saga.timeout(tx, imsm.bus) + if timeoutErr != nil { imsm.Log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") return timeoutErr diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index a178384..bbf982c 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -6,11 +6,11 @@ import ( "reflect" "time" - "github.com/sirupsen/logrus" - "github.com/wework/grabbit/gbus/metrics" - + "github.com/opentracing/opentracing-go" "github.com/rs/xid" + "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" + "github.com/wework/grabbit/gbus/metrics" ) //Instance 
represent a living instance of a saga of a particular definition @@ -22,7 +22,7 @@ type Instance struct { Log logrus.FieldLogger } -func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error { +func (si *Instance) invoke(exchange, routingKey string, invocation *sagaInvocation, message *gbus.BusMessage) error { methodsToInvoke := si.getSagaMethodNameToInvoke(exchange, routingKey, message) @@ -48,6 +48,13 @@ func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocati "method_name": methodName, "saga_id": si.ID, }).Info("invoking method on saga") + span, sctx := opentracing.StartSpanFromContext(invocation.Ctx(), methodName) + // replace the original context with the context built around the span so we can + // trace the saga handler that is invoked + invocation.ctx = sctx + + defer span.Finish() + err := metrics.RunHandlerWithMetric(func() error { returns := method.Call(params) diff --git a/gbus/worker.go b/gbus/worker.go index b418812..55cc168 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -252,9 +252,9 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { } else { spanOptions = append(spanOptions, opentracing.FollowsFrom(spCtx)) } - span, ctx := opentracing.StartSpanFromContext(rootCtx, "processMessage", spanOptions...) + span, ctx := opentracing.StartSpanFromContext(rootCtx, "ProcessMessage", spanOptions...) 
worker.span = span - defer worker.span.Finish() + //defer worker.span.Finish() //catch all error handling so goroutine will not crash defer func() { @@ -269,7 +269,6 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { worker.span.LogFields(slog.String("panic", "failed to process message")) logEntry.Error("failed to process message") } - worker.span.Finish() }() worker.log().WithFields(logrus.Fields{"worker": worker.consumerTag, "message_id": delivery.MessageId}).Info("GOT MSG") @@ -322,16 +321,17 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan action := func(attempt uint) (actionErr error) { + attemptSpan, sctx := opentracing.StartSpanFromContext(sctx, "InvokeHandler") + defer attemptSpan.Finish() + tx, txCreateErr := worker.txProvider.New() if txCreateErr != nil { worker.log().WithError(txCreateErr).Error("failed creating new tx") - worker.span.LogFields(slog.Error(txCreateErr)) + attemptSpan.LogFields(slog.Error(txCreateErr)) return txCreateErr } - span, sctx := opentracing.StartSpanFromContext(sctx, "invokeHandlers") - defer span.Finish() - span.LogFields(slog.Uint64("attempt", uint64(attempt+1))) + attemptSpan.LogFields(slog.Uint64("attempt", uint64(attempt+1))) defer func() { if p := recover(); p != nil { pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) @@ -346,10 +346,9 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan worker.span.Finish() }() var handlerErr error - var hspan opentracing.Span - var hsctx context.Context + for _, handler := range handlers { - hspan, hsctx = opentracing.StartSpanFromContext(sctx, handler.Name()) + hspan, hsctx := opentracing.StartSpanFromContext(sctx, handler.Name()) ctx := &defaultInvocationContext{ invocingSvc: delivery.ReplyTo, @@ -369,22 +368,22 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan return handler(ctx, message) }, handler.Name(), worker.log()) if handlerErr != nil { - 
hspan.LogFields(slog.Error(handlerErr)) break } hspan.Finish() } if handlerErr != nil { - hspan.LogFields(slog.Error(handlerErr)) + attemptSpan.LogFields(slog.Error(handlerErr)) rbkErr := tx.Rollback() if rbkErr != nil { + attemptSpan.LogFields(slog.Error(rbkErr)) worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") } - hspan.Finish() return handlerErr } cmtErr := tx.Commit() if cmtErr != nil { + attemptSpan.LogFields(slog.Error(cmtErr)) worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers") return cmtErr } From c830b4566bcacc7a79c512eb68bc6532cd2fce36 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 25 Aug 2019 12:52:32 +0300 Subject: [PATCH 3/4] fixing failing test --- gbus/saga/instance_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gbus/saga/instance_test.go b/gbus/saga/instance_test.go index f5bc56a..44581b4 100644 --- a/gbus/saga/instance_test.go +++ b/gbus/saga/instance_test.go @@ -1,6 +1,7 @@ package saga import ( + "context" "errors" "reflect" "testing" @@ -19,7 +20,9 @@ func TestInstanceInvocationReturnsErrors(t *testing.T) { m2 := TestMsg2{} exchange, routingKey := "", "kong" - invocationStub := &sagaInvocation{} + invocationStub := &sagaInvocation{ + ctx: context.Background(), + } failName := gbus.MessageHandler(s.Fail).Name() failFilter := gbus.NewMessageFilter(exchange, routingKey, m1) From dd674cadeb310858742817552ac27d6feb13c75e Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 25 Aug 2019 14:37:42 +0300 Subject: [PATCH 4/4] added tracing documentation --- README.md | 7 ++----- docs/TRACING.md | 25 +++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 5 deletions(-) create mode 100644 docs/TRACING.md diff --git a/README.md b/README.md index 665dc4c..bcba0e2 100644 --- a/README.md +++ b/README.md @@ -21,11 +21,8 @@ A lightweight transactional message bus on top of RabbitMQ supporting: 5) [Reliable 
messaging](https://github.com/wework/grabbit/blob/master/docs/OUTBOX.md) and local service transactivity via Transaction Outbox pattern 6) Deadlettering 7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md) -8) Reporting [metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus - -Planned: - -1) Deduplication of inbound messages +8) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus +9) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing ## Stable release the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates. diff --git a/docs/TRACING.md b/docs/TRACING.md new file mode 100644 index 0000000..d2f6a46 --- /dev/null +++ b/docs/TRACING.md @@ -0,0 +1,25 @@ +# Tracing + +grabbit supports reporting standard [OpenTracing](https://opentracing.io/) tracing spans to a compatible OpenTracing backend (such as [Jaeger](https://www.jaegertracing.io/)). + +NOTE: In your hosting process you will need to set up a global tracer to collect and forward the traces reported by grabbit. See the Jaeger Go client for an [example](https://github.com/jaegertracing/jaeger-client-go). + +Once the global tracer is set up you will need to make sure that in your message handlers you carry over the passed-in context to successive messages sent by the handler. + +```go + +func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{ + reply := gbus.NewBusMessage(MyReply{}) + cmd := gbus.NewBusMessage(MyCommand{}) + ctx := invocation.Ctx() + + if err := invocation.Send(ctx, "another-service", cmd); err != nil{ + return err + } + if err := invocation.Reply(ctx, reply); err != nil{ + return err + } + return nil + } + +``` \ No newline at end of file