Skip to content

Commit

Permalink
Merge pull request #25 from jeroenrinzema/v0.6.0-middleware
Browse files Browse the repository at this point in the history
feat: improved middleware interfaces and implementation
  • Loading branch information
jeroenrinzema authored Oct 13, 2019
2 parents c2aca7b + 5055994 commit 3581bc7
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 54 deletions.
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand Down Expand Up @@ -95,6 +96,7 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a h1:gOpx8G595UYyvj8UK4+OFyY4rx037g3fmfhe5SasG3U=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
2 changes: 1 addition & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewGroup(definitions ...options.GroupOption) *Group {
// commands and events could be consumed and produced to. The amount of retries
// attempted before a error is thrown could also be defined in a group.
type Group struct {
Middleware middleware.Client
Middleware middleware.Use
Timeout time.Duration
Topics []types.Topic
Codec options.Codec
Expand Down
67 changes: 47 additions & 20 deletions middleware/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,86 @@ import (
"github.com/jeroenrinzema/commander/internal/types"
)

// HandlerFunc represents a middleware handle method
type HandlerFunc func(types.HandlerFunc) types.HandlerFunc
// BeforeConsumeHandlerFunc represents the function method called and returned by a middleware client
type BeforeConsumeHandlerFunc = types.HandlerFunc

// Controller middleware controller
type Controller interface {
Middleware(types.HandlerFunc) types.HandlerFunc
// BeforeProduceHandlerFunc represents the function method called and returned by a middleware client
type BeforeProduceHandlerFunc = func(*types.Message)

// ConsumeController middleware controller
type ConsumeController interface {
BeforeConsume(types.HandlerFunc) types.HandlerFunc
}

// ProduceController middleware controller
type ProduceController interface {
BeforeProduce(*types.Message) *types.Message
}

// Client represents a middleware client
// Client middleware interface
type Client interface {
Use(Controller)
WrapBeforeConsume(BeforeConsumeHandlerFunc) BeforeConsumeHandlerFunc
WrapBeforeProduce(BeforeProduceHandlerFunc) BeforeProduceHandlerFunc
}

// Use exposed usage interface
type Use interface {
Use(Controller)
Use(interface{})
}

// NewClient constructs a new middleware client
func NewClient() Client {
func NewClient() Use {
client := &client{
middlewares: []Controller{},
consume: []ConsumeController{},
produce: []ProduceController{},
}

return client
}

type client struct {
middlewares []Controller
mutex sync.RWMutex
consume []ConsumeController
produce []ProduceController

mutex sync.RWMutex
}

// Use calles the given middleware controller to initialize the middleware
func (client *client) Use(controller Controller) {
func (client *client) Use(value interface{}) {
client.mutex.Lock()
defer client.mutex.Unlock()

client.middlewares = append(client.middlewares, controller)
if controller, ok := value.(ConsumeController); ok {
client.consume = append(client.consume, controller)
}

if controller, ok := value.(ProduceController); ok {
client.produce = append(client.produce, controller)
}
}

// Run executes the given middleware in chronological order.
// A handle function is returned once all middleware is executed.
func Run(h types.HandlerFunc, m ...HandlerFunc) types.HandlerFunc {
if len(m) < 1 {
// WrapBeforeConsume executes defined consume middleware in chronological order.
// A handle executable handler is returned once all middleware is wrapped.
func (client *client) WrapBeforeConsume(h types.HandlerFunc) types.HandlerFunc {
if len(client.consume) < 1 {
return h
}

wrapped := h

// loop in reverse to preserve middleware order
for i := len(m) - 1; i >= 0; i-- {
wrapped = m[i](wrapped)
for i := len(client.consume) - 1; i >= 0; i-- {
wrapped = client.consume[i].BeforeConsume(wrapped)
}

return wrapped
}

// WrapBeforeProduce executes defined produce middleware in chronological order.
// A handle executable handler is returned once all middleware is wrapped.
func (client *client) WrapBeforeProduce(m *types.Message) {
// loop in reverse to preserve middleware order
for i := len(client.produce) - 1; i >= 0; i-- {
client.produce[i].BeforeProduce(m)
}
}
71 changes: 38 additions & 33 deletions middleware/zipkin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zipkin

import (
"github.com/jeroenrinzema/commander/internal/types"
"github.com/jeroenrinzema/commander/middleware"
"github.com/jeroenrinzema/commander/middleware/zipkin/metadata"
zipkin "github.com/openzipkin/zipkin-go"
"github.com/openzipkin/zipkin-go/model"
Expand Down Expand Up @@ -54,22 +55,31 @@ type Zipkin struct {
Config Config
}

// Middleware handles the middleware handler request for every consume
func (controller *Zipkin) Middleware(next types.HandlerFunc) types.HandlerFunc {
// BeforeConsume middleware controller
func (controller *Zipkin) BeforeConsume(next middleware.BeforeConsumeHandlerFunc) middleware.BeforeConsumeHandlerFunc {
return func(message *types.Message, writer types.Writer) {
controller.Before(message)
defer controller.After(message)
controller.NewConsumeSpan(message)
defer controller.AfterConsumeSpan(message)
next(message, writer)
}
}

// BeforeProduce middleware controller
func (controller *Zipkin) BeforeProduce(next middleware.BeforeProduceHandlerFunc) middleware.BeforeProduceHandlerFunc {
return func(message *types.Message) {
controller.NewProduceSpan(message)
defer controller.AfterPublishSpan(message)
next(message)
}
}

// Close closes the Zipkin reporter
func (controller *Zipkin) Close() error {
return controller.Reporter.Close()
}

// Before starts a new span and stores it in the message context
func (controller *Zipkin) Before(message *types.Message) {
// NewConsumeSpan starts a new span and stores it in the message context
func (controller *Zipkin) NewConsumeSpan(message *types.Message) {
options := []zipkin.SpanOption{
zipkin.Kind(model.Consumer),
}
Expand All @@ -88,8 +98,8 @@ func (controller *Zipkin) Before(message *types.Message) {
span.Tag(VersionTag, message.Version.String())
}

// After finishes the stored span in the message context
func (controller *Zipkin) After(message *types.Message) {
// AfterConsumeSpan finishes the stored span in the message context
func (controller *Zipkin) AfterConsumeSpan(message *types.Message) {
span, has := metadata.SpanConsumeFromContext(message.Ctx())
if !has {
return
Expand All @@ -98,33 +108,28 @@ func (controller *Zipkin) After(message *types.Message) {
span.Finish()
}

// // BeforePublish prepares the given message span headers
// func (service *Zipkin) BeforePublish(ctx context.Context, event interface{}) {
// message, ok := event.(*commander.Message)
// if !ok {
// return
// }

// parent, has := metadata.SpanConsumeFromContext(ctx)
// if !has {
// return
// }
// NewProduceSpan prepares the given message span headers
func (controller *Zipkin) NewProduceSpan(message *types.Message) {
parent, has := metadata.SpanConsumeFromContext(message.Ctx())
if !has {
return
}

// span := service.Tracer.StartSpan("commander.produce", zipkin.Kind(model.Consumer), zipkin.Parent(parent.Context()))
span := controller.Tracer.StartSpan("commander.produce", zipkin.Kind(model.Consumer), zipkin.Parent(parent.Context()))

// span.Tag(ActionTag, message.Action)
// span.Tag(VersionTag, message.Version.String())
span.Tag(ActionTag, message.Action)
span.Tag(VersionTag, message.Version.String())

// ctx = metadata.NewSpanProduceContext(ctx, span)
// ctx = metadata.AppendMessageHeaders(ctx, span.Context())
// }
message.NewCtx(metadata.NewSpanProduceContext(message.Ctx(), span))
message.NewCtx(metadata.AppendMessageHeaders(message.Ctx(), span.Context()))
}

// // AfterPublish closes the producing span
// func (service *Zipkin) AfterPublish(ctx context.Context, event interface{}) {
// span, has := metadata.SpanProduceFromContext(ctx)
// if !has {
// return
// }
// AfterPublishSpan closes the producing span
func (controller *Zipkin) AfterPublishSpan(message *types.Message) {
span, has := metadata.SpanProduceFromContext(message.Ctx())
if !has {
return
}

// span.Finish()
// }
span.Finish()
}

0 comments on commit 3581bc7

Please sign in to comment.