Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improved middleware interfaces and implementation #25

Merged
merged 1 commit into from
Oct 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/Shopify/sarama v1.19.0 h1:9oksLxC6uxVPHPVYUmq6xhr1BOF/hHobWH2UzO67z1s=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand Down Expand Up @@ -95,6 +96,7 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a h1:gOpx8G595UYyvj8UK4+OFyY4rx037g3fmfhe5SasG3U=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
2 changes: 1 addition & 1 deletion group.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewGroup(definitions ...options.GroupOption) *Group {
// commands and events could be consumed and produced to. The amount of retries
// attempted before a error is thrown could also be defined in a group.
type Group struct {
Middleware middleware.Client
Middleware middleware.Use
Timeout time.Duration
Topics []types.Topic
Codec options.Codec
Expand Down
67 changes: 47 additions & 20 deletions middleware/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,86 @@ import (
"github.com/jeroenrinzema/commander/internal/types"
)

// HandlerFunc represents a middleware handle method
type HandlerFunc func(types.HandlerFunc) types.HandlerFunc
// BeforeConsumeHandlerFunc represents the function method called and returned by a middleware client
type BeforeConsumeHandlerFunc = types.HandlerFunc

// Controller middleware controller
type Controller interface {
Middleware(types.HandlerFunc) types.HandlerFunc
// BeforeProduceHandlerFunc represents the function method called and returned by a middleware client
type BeforeProduceHandlerFunc = func(*types.Message)

// ConsumeController middleware controller
type ConsumeController interface {
BeforeConsume(types.HandlerFunc) types.HandlerFunc
}

// ProduceController middleware controller
type ProduceController interface {
BeforeProduce(*types.Message) *types.Message
}

// Client represents a middleware client
// Client middleware interface
type Client interface {
Use(Controller)
WrapBeforeConsume(BeforeConsumeHandlerFunc) BeforeConsumeHandlerFunc
WrapBeforeProduce(BeforeProduceHandlerFunc) BeforeProduceHandlerFunc
}

// Use exposed usage interface
type Use interface {
Use(Controller)
Use(interface{})
}

// NewClient constructs a new middleware client
func NewClient() Client {
func NewClient() Use {
client := &client{
middlewares: []Controller{},
consume: []ConsumeController{},
produce: []ProduceController{},
}

return client
}

type client struct {
middlewares []Controller
mutex sync.RWMutex
consume []ConsumeController
produce []ProduceController

mutex sync.RWMutex
}

// Use calles the given middleware controller to initialize the middleware
func (client *client) Use(controller Controller) {
func (client *client) Use(value interface{}) {
client.mutex.Lock()
defer client.mutex.Unlock()

client.middlewares = append(client.middlewares, controller)
if controller, ok := value.(ConsumeController); ok {
client.consume = append(client.consume, controller)
}

if controller, ok := value.(ProduceController); ok {
client.produce = append(client.produce, controller)
}
}

// Run executes the given middleware in chronological order.
// A handle function is returned once all middleware is executed.
func Run(h types.HandlerFunc, m ...HandlerFunc) types.HandlerFunc {
if len(m) < 1 {
// WrapBeforeConsume executes defined consume middleware in chronological order.
// A handle executable handler is returned once all middleware is wrapped.
func (client *client) WrapBeforeConsume(h types.HandlerFunc) types.HandlerFunc {
if len(client.consume) < 1 {
return h
}

wrapped := h

// loop in reverse to preserve middleware order
for i := len(m) - 1; i >= 0; i-- {
wrapped = m[i](wrapped)
for i := len(client.consume) - 1; i >= 0; i-- {
wrapped = client.consume[i].BeforeConsume(wrapped)
}

return wrapped
}

// WrapBeforeProduce executes defined produce middleware in chronological order.
// A handle executable handler is returned once all middleware is wrapped.
func (client *client) WrapBeforeProduce(m *types.Message) {
// loop in reverse to preserve middleware order
for i := len(client.produce) - 1; i >= 0; i-- {
client.produce[i].BeforeProduce(m)
}
}
71 changes: 38 additions & 33 deletions middleware/zipkin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zipkin

import (
"github.com/jeroenrinzema/commander/internal/types"
"github.com/jeroenrinzema/commander/middleware"
"github.com/jeroenrinzema/commander/middleware/zipkin/metadata"
zipkin "github.com/openzipkin/zipkin-go"
"github.com/openzipkin/zipkin-go/model"
Expand Down Expand Up @@ -54,22 +55,31 @@ type Zipkin struct {
Config Config
}

// Middleware handles the middleware handler request for every consume
func (controller *Zipkin) Middleware(next types.HandlerFunc) types.HandlerFunc {
// BeforeConsume middleware controller
func (controller *Zipkin) BeforeConsume(next middleware.BeforeConsumeHandlerFunc) middleware.BeforeConsumeHandlerFunc {
return func(message *types.Message, writer types.Writer) {
controller.Before(message)
defer controller.After(message)
controller.NewConsumeSpan(message)
defer controller.AfterConsumeSpan(message)
next(message, writer)
}
}

// BeforeProduce middleware controller
func (controller *Zipkin) BeforeProduce(next middleware.BeforeProduceHandlerFunc) middleware.BeforeProduceHandlerFunc {
return func(message *types.Message) {
controller.NewProduceSpan(message)
defer controller.AfterPublishSpan(message)
next(message)
}
}

// Close closes the Zipkin reporter
func (controller *Zipkin) Close() error {
return controller.Reporter.Close()
}

// Before starts a new span and stores it in the message context
func (controller *Zipkin) Before(message *types.Message) {
// NewConsumeSpan starts a new span and stores it in the message context
func (controller *Zipkin) NewConsumeSpan(message *types.Message) {
options := []zipkin.SpanOption{
zipkin.Kind(model.Consumer),
}
Expand All @@ -88,8 +98,8 @@ func (controller *Zipkin) Before(message *types.Message) {
span.Tag(VersionTag, message.Version.String())
}

// After finishes the stored span in the message context
func (controller *Zipkin) After(message *types.Message) {
// AfterConsumeSpan finishes the stored span in the message context
func (controller *Zipkin) AfterConsumeSpan(message *types.Message) {
span, has := metadata.SpanConsumeFromContext(message.Ctx())
if !has {
return
Expand All @@ -98,33 +108,28 @@ func (controller *Zipkin) After(message *types.Message) {
span.Finish()
}

// // BeforePublish prepares the given message span headers
// func (service *Zipkin) BeforePublish(ctx context.Context, event interface{}) {
// message, ok := event.(*commander.Message)
// if !ok {
// return
// }

// parent, has := metadata.SpanConsumeFromContext(ctx)
// if !has {
// return
// }
// NewProduceSpan prepares the given message span headers
func (controller *Zipkin) NewProduceSpan(message *types.Message) {
parent, has := metadata.SpanConsumeFromContext(message.Ctx())
if !has {
return
}

// span := service.Tracer.StartSpan("commander.produce", zipkin.Kind(model.Consumer), zipkin.Parent(parent.Context()))
span := controller.Tracer.StartSpan("commander.produce", zipkin.Kind(model.Consumer), zipkin.Parent(parent.Context()))

// span.Tag(ActionTag, message.Action)
// span.Tag(VersionTag, message.Version.String())
span.Tag(ActionTag, message.Action)
span.Tag(VersionTag, message.Version.String())

// ctx = metadata.NewSpanProduceContext(ctx, span)
// ctx = metadata.AppendMessageHeaders(ctx, span.Context())
// }
message.NewCtx(metadata.NewSpanProduceContext(message.Ctx(), span))
message.NewCtx(metadata.AppendMessageHeaders(message.Ctx(), span.Context()))
}

// // AfterPublish closes the producing span
// func (service *Zipkin) AfterPublish(ctx context.Context, event interface{}) {
// span, has := metadata.SpanProduceFromContext(ctx)
// if !has {
// return
// }
// AfterPublishSpan closes the producing span
func (controller *Zipkin) AfterPublishSpan(message *types.Message) {
span, has := metadata.SpanProduceFromContext(message.Ctx())
if !has {
return
}

// span.Finish()
// }
span.Finish()
}