Skip to content

Commit

Permalink
v1.1.4 rollup into master (#179)
Browse files Browse the repository at this point in the history
* inceased outbox VARCHAR column length to 2048 (#155)

* added reply to initiator functionality to sagas (#157)

* added generic handler metrics with message type as the label (#144)

* added generic handler metrics with message type as the label

* add handler name label to the metrics

* adding new metrics to the read me

* Fix handle empty body (#156)

* set the correct Type and Content-Type headers on out going messages (#160)

* set the correct Type and Content-Type headers on out going messages

* refactoring

* fixing ReplyToInitiator not working  when initiator sends a message via the RPC interface (#163)


* fixing ReplyToInitiator not working  when initiator sends a message via the RPC interface

* Improved wording of saga documentation article (#164)

* better wording for documentation

* added golangcli lint configuration and fixed linting failures (#165)

* fixed logging issues (#167)

* allow getting the saga id of the current invoked saga (#168)

* setting the target saga id on the saga correlation id field (#171)

* support emperror (#174)

* setting the target saga id on the saga correlation id field

* added emperror support

* Fix logging and added logging documentation (#176)

* fixed logging issues and added documentation

logging via the invocation interface was broken and did not add
contextual data related to the invocation due to a bug in the way the Glogged structure is currently implemented.

Also added documentation on how logging should be done within a
handler including adding context to returned errors so that data gets logged

* added missing documentation file

* added documentation on serialization support (#177)

* fixed emperror url format

* added serialization documentation
  • Loading branch information
Guy Baron authored Sep 28, 2019
1 parent d13f2c2 commit 900f24f
Show file tree
Hide file tree
Showing 34 changed files with 950 additions and 139 deletions.
37 changes: 37 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
linters-settings:
golint:
# minimal confidence for issues, default is 0.8
min-confidence: 0.8
gocyclo:
min-complexity: 15

govet:
# report about shadowed variables
check-shadowing: false

# settings per analyzer
settings:
printf: # analyzer name, run `go tool vet help` to see all analyzers
funcs: # run `go tool vet help printf` to see available settings for `printf` analyzer
- (github.com/golangci/golangci-lint/pkg/logutils.Log).Infof
- (github.com/golangci/golangci-lint/pkg/logutils.Log).Warnf
- (github.com/golangci/golangci-lint/pkg/logutils.Log).Errorf
- (github.com/golangci/golangci-lint/pkg/logutils.Log).Fatalf
linters:
disable-all: true
enable:
- deadcode
# - errcheck
- gosimple
- govet
- ineffassign
# - staticcheck
# - typecheck
- unused
# - varcheck
# - deadcode
# - dupl
# - gocritic
- gocyclo
- golint
- misspell
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ A lightweight transactional message bus on top of RabbitMQ supporting:
7) [Structured logging](https://github.com/wework/grabbit/blob/master/docs/LOGGING.md)
8) Reporting [Metrics](https://github.com/wework/grabbit/blob/master/docs/METRICS.md) via Prometheus
9) Distributed [Tracing](https://github.com/wework/grabbit/blob/master/docs/TRACING.md) via OpenTracing
10) [Extensible serialization](https://github.com/wework/grabbit/blob/master/docs/SERIALIZATION.md) with
default support for gob, protobuf and avro

## Stable release
the v1.x branch contains the latest stable releases of grabbit and one should track that branch to get point and minor release updates.
Expand Down
84 changes: 81 additions & 3 deletions docs/LOGGING.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,100 @@
# Logging

### Logging within a handler

grabbit supports structured logging via the [logrus](https://github.com/sirupsen/logrus) logging package.
The logger is accessible to message handlers via the past in invocation instance.
When logging via the logger exposed on the passed invocation each entry you log will be
annotated with the following contextual data (added as logrus fields to the log entry)
allowing for a better debugging experience.

- _service: the service name
- handler_name: the name of the handler being invoked
- message_id: the id of the processed message
- message_name: the type of the message that is being processed
- routing_key: the routing_key of the message
- saga_id: the id of the saga instance being invoked
- saga_def: the type of the saga that is being invoked

```go

func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
invocation.Log().WithField("name", "rhinof").Info("handler invoked")
invocation.Log().WithField("name", "rhinof").Info("performing some business logic")
return nil
}

```
### Logging contextual data when a handler errors

In cases a message handler errors it is common to log custom contextual data allowing
service developers to diagnose the root cause of the error.

```go
package my_handlers

import (
"gitub.com/wework/grabbit/gbus"
)

func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
invocation.Log().WithField("name", "rhinof").Info("performing some business logic")
PlaceOrderCommand := message.Payload.(*PlaceOrderCommand)
e := placeOrder(PlaceOrderCommand.CustomerID, PlaceOrderCommand.LineItems)
if e != nil{
invocation.Log().
WithField("customer_id", PlaceOrderCommand.CustomerID).
Error("failed placing order for customer")
return e
}
return nil
}
```
grabbit makes it easier handling these cases and reduce the repetitive task of logging
these custom contextual attributes in cases of errors by integrating the [emperror errors package](https://github.com/emperror/errors).
emperror drop-in replacement for the default errors package allows developers to add the needed contextual data on the error instance and have graabit log the error with all contextual attribute.

```go
package my_handlers

import (
"emperror.dev/errors"
"gitub.com/wework/grabbit/gbus"
)

func SomeHandler(invocation gbus.Invocation, message *gbus.BusMessage) error{
invocation.Log().WithField("name", "rhinof").Info("performing some business logic")
PlaceOrderCommand := message.Payload.(*PlaceOrderCommand)
return placeOrder(PlaceOrderCommand.CustomerID, PlaceOrderCommand.LineItems)
}

func placeOrder(customerID string, lineItems LineItems[]) error{

if(someErrorCondition()){
return errors.WithDetails("failed placing order for customer", "customer_id", customerID)
}

return nil
}

```



### Setting a custom logger instance

grabbit will create a default instance of logrus FieldLogger if no such logger is set when the bus is created.
In order to set a custom logger when creating the bus you need to call the Builder.WithLogger method passing it
To set a custom logger when creating the bus you need to call the Builder.WithLogger method passing it
a logrus.FieldLogger instance.

```go

logger := logrus.New().WithField("my_custom_key", "my_custom_value")
bus := builder.
New().
Bus("rabbitmq connection string").
WithLogger(logger).
WorkerNum(3, 1).
WithConfirms().
Txnl("mysql", "").
Build("your service name")

```
6 changes: 4 additions & 2 deletions docs/METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ grabbit exposes and reports the following metrics to Prometheus

| Namespace | Subsystem | Name | Description |
| ------------- | ------------- | ----------------------------------| --------------------------------------------------------------------------- |
| grabbit | handler | [name of message handler]_result | records and counts each succesfull or failed execution of a message handler |
| grabbit | handler | [name of message handler]_latency | records the execution time of each handler |
| grabbit | handlers | [name of message handler]_result | records and counts each successful or failed execution of a message handler |
| grabbit | handlers | [name of message handler]_latency | records the execution time of each handler |
| grabbit | handlers | result | records and counts each run of a handler, having the handler's name, message type and the result as labels|
| grabbit | handlers | latency | records the execution time of each run of a handler, having the handler's name, message type as labels|
| grabbit | messages | rejected_messages | increments each time a message gets rejected |
| grabbit | saga | timedout_sagas | counting the number of timedout saga instances |
58 changes: 55 additions & 3 deletions docs/SAGA.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type BookVacationSaga struct {
BookingId string
GotCarSvcResponse bool
GotHotelSvcResponse bool
SomeConfigData string
}
```

Expand Down Expand Up @@ -171,7 +172,7 @@ func (s *BookVacationSaga) HandleBookFlightResponse(invocation gbus.Invocation,

}
```
### Step 4 - Handling the timeout requirement
### Step 4 - Handling the timeout requirement

In order to define a timeout for the saga and have grabbit call the saga instance once that timeout is reached (assuming the saga hasn't completed yet) the saga needs to implement the gbus.RequestSagaTimeout interface

Expand All @@ -191,7 +192,7 @@ func (s *BookVacationSaga) TimeoutDuration() time.Duration {
return time.Minute * 15
}

func (s *BookVacationSaga) Timeout(invocation gbus.Invocation, message *gbus.BusMessage) error {
func (s *BookVacationSaga) Timeout(tx *sql.Tx, bus Messaging) error {
return bus.Publish(context.Background(), "some_exchange", "some.topic.1", gbus.NewBusMessage(VacationBookingTimedOut{}))
}

Expand All @@ -202,7 +203,7 @@ func (s *BookVacationSaga) Timeout(invocation gbus.Invocation, message *gbus.Bus
```go

gb := getBus("vacationSvc")
gb.RegisterSaga(BookVacationSaga{})
gb.RegisterSaga(&BookVacationSaga{})

```

Expand All @@ -218,3 +219,54 @@ It is recommended to follow [semantic versioning](https://semver.org/) of the go

grabbit automatically implements an optimistic concurrency model when processing a message and persisting saga instances, detecting when the saga state turns stale due to processing concurrently a different message.
When the above is detected grabbit will rollback the bounded transaction and retry the execution of the saga.

### Configuring a Saga Instance

It is sometimes necessary to configure a saga instance with some data before it gets executed.
grrabit allows you to do so by providing a saga configuration function when registering the saga.
Each time a saga instance gets created or inflated from the persistent store the configuration function will be executed.

The saga configuration function accepts a single gbus.Saga parameter and returns a single gbus.Saga return value.
The passed in gbus.Saga is the instance that will be executed and will be the type of the saga being registered meaning it can safely be casted to your specific saga type.
Once you casted to the specific saga type you can configure the instance and access its fields as needed.
After the instance is configured the function returns the configured saga instance so grabbit can proceed and execute it.

The following snippet is an example of how to pass in a saga configuration function
```go
configSaga := func(saga gbus.Saga) gbus.Saga {
s := saga.(*BookVacationSaga)
s.SomeConfigData = "config value"
return s
}
svc1.RegisterSaga(&BookVacationSaga{}, configSaga)

```

### Replying to the saga initiator

It is common that during its life cycle a saga will need to report back and send messages with the service that initiated it (sent the command that started the saga).
In the example above when the booking has completed we would like to send a message to the service which initiated the booking saga.
The way we have implemented this in the example above is by publishing an event which the service which initiated the saga would need to subscribe to and handle to get notified when the booking is complete.

Although the above would work it won't be an elegant solution especially if the initiator of the saga is another saga since it means that the initiating saga will need to filter all events and select the single event that correlates to that particular instance.
To relive client code to do so grabbit provides a way for a saga to directly send a message to its initiator, and if the initiator is another saga grabbit will automatically correlate the message with the correct saga instance and invoke the relevant handler.

To send a message to the saga initiator the message handler attached to the saga instance will need to cast the passed in gbus.Invocation argument to a gbus.SagaInvocation and then invoke the ReplyToInitiator function.
We can replace the following code from the above example

```go
if s.IsComplete(){
event := gbus.NewBusMessage(VacationBookingComplete{})
invocation.Bus().Publish(invocation.Ctx(), "some_exchange", "some.topic", event)
}
```

to this:

```go
sagaInvocation := invocation.(gbus.SagaInvocation)
if s.IsComplete(){
msg := gbus.NewBusMessage(VacationBookingComplete{})
sagaInvocation.ReplyToInitiator(invocation.Ctx(), msg)
}
```
35 changes: 35 additions & 0 deletions docs/SERIALIZATION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Serialization

grabbit supports out of the box three serializers to format messages over the wire

- gob (defualt)
- protobuf
- avro (experimental)

To configure a bus to work with a different serializer than the default one you must call the WithSerializer function call on the builder interface passing it an instance of a gbus.Serializer.

The following example configures the bus to work with the protobuf serializer

```go
package main

import (
"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/builder"
"github.com/wework/grabbit/gbus/serialization"
)

logger := logrus.New()
bus := builder.
New().
Bus("rabbitmq connection string").
WithLogger(logger).
WithSerializer(serialization.NewProtoSerializer(logger)).
WorkerNum(3, 1).
WithConfirms().
Txnl("mysql", "database connection string").
Build("your service name")

```

17 changes: 17 additions & 0 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,26 @@ type Invocation interface {
Bus() Messaging
Tx() *sql.Tx
Ctx() context.Context
InvokingSvc() string
Routing() (exchange, routingKey string)
DeliveryInfo() DeliveryInfo
}

/*
SagaInvocation allows saga instances to reply to their creator even when not in the conext of handling
the message that starts the saga.
A message handler that is attached to a saga instance can safly cast the passed in invocation to SagaInvocation
and use the ReplyToInitiator function to send a message to the originating service that sent the message that started the saga
*/
type SagaInvocation interface {
ReplyToInitiator(ctx context.Context, message *BusMessage) error
//HostingSvc returns the svc name that is executing the service
HostingSvc() string

//SagaID returns the saga id of the currently invoked saga instance
SagaID() string
}

//Serializer is the base interface for all message serializers
type Serializer interface {
Name() string
Expand All @@ -247,6 +263,7 @@ type TxProvider interface {

//TxOutbox abstracts the transactional outgoing channel type
type TxOutbox interface {
Logged
Save(tx *sql.Tx, exchange, routingKey string, amqpMessage amqp.Publishing) error
Start(amqpOut *AMQPOutbox) error
Stop() error
Expand Down
2 changes: 2 additions & 0 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
}
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)
gb.Outbox.SetLogger(gb.Log())
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup)

default:
Expand All @@ -107,6 +108,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
}
}
glue := saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider, gb.Log, timeoutManager)
glue.SetLogger(gb.Log())
gb.Glue = glue
return gb
}
Expand Down
13 changes: 7 additions & 6 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,12 +655,13 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply
}

msg := amqp.Publishing{
Body: buffer,
ReplyTo: replyTo,
MessageId: message.ID,
CorrelationId: message.CorrelationID,
ContentEncoding: b.Serializer.Name(),
Headers: headers,
Type: message.PayloadFQN,
Body: buffer,
ReplyTo: replyTo,
MessageId: message.ID,
CorrelationId: message.CorrelationID,
ContentType: b.Serializer.Name(),
Headers: headers,
}
span.LogFields(message.GetTraceLog()...)

Expand Down
10 changes: 5 additions & 5 deletions gbus/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _ Messaging = &defaultInvocationContext{}

type defaultInvocationContext struct {
*Glogged
invocingSvc string
invokingSvc string
bus *DefaultBus
inboundMsg *BusMessage
tx *sql.Tx
Expand All @@ -29,8 +29,8 @@ type DeliveryInfo struct {
MaxRetryCount uint
}

func (dfi *defaultInvocationContext) Log() logrus.FieldLogger {
return dfi.Glogged.Log().WithFields(logrus.Fields{"routing_key": dfi.routingKey, "message_id": dfi.inboundMsg.ID})
func (dfi *defaultInvocationContext) InvokingSvc() string {
return dfi.invokingSvc
}

//Reply implements the Invocation.Reply signature
Expand All @@ -43,9 +43,9 @@ func (dfi *defaultInvocationContext) Reply(ctx context.Context, replyMessage *Bu
var err error

if dfi.tx != nil {
return dfi.bus.sendWithTx(ctx, dfi.tx, dfi.invocingSvc, replyMessage)
return dfi.bus.sendWithTx(ctx, dfi.tx, dfi.invokingSvc, replyMessage)
}
if err = dfi.bus.Send(ctx, dfi.invocingSvc, replyMessage); err != nil {
if err = dfi.bus.Send(ctx, dfi.invokingSvc, replyMessage); err != nil {
//TODO: add logs?
logrus.WithError(err).Error("could not send reply")

Expand Down
Loading

0 comments on commit 900f24f

Please sign in to comment.