Skip to content

Commit

Permalink
as: Refactor webhooks frontned to use a dynamic worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
adriansmares committed Sep 9, 2021
1 parent 98aae8b commit 98e91bd
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 116 deletions.
37 changes: 17 additions & 20 deletions pkg/applicationserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package applicationserver

import (
"context"
"net/http"
"time"

"github.com/bluele/gcache"
Expand All @@ -30,7 +29,6 @@ import (
"go.thethings.network/lorawan-stack/v3/pkg/component"
"go.thethings.network/lorawan-stack/v3/pkg/config"
"go.thethings.network/lorawan-stack/v3/pkg/errors"
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
)

Expand Down Expand Up @@ -162,18 +160,20 @@ type ApplicationPackagesConfig struct {
// NewWebhooks returns a new web.Webhooks based on the configuration.
// If Target is empty, this method returns nil.
func (c WebhooksConfig) NewWebhooks(ctx context.Context, server io.Server) (web.Webhooks, error) {
var target web.Sink
var creator web.SinkCreator
switch c.Target {
case "":
return nil, nil
case "direct":
client, err := server.HTTPClient(ctx)
if err != nil {
return nil, err
}
client.Timeout = c.Timeout
target = &web.HTTPClientSink{
Client: client,
creator = func() (web.Sink, error) {
client, err := server.HTTPClient(ctx)
if err != nil {
return nil, err
}
client.Timeout = c.Timeout
return &web.HTTPClientSink{
Client: client,
}, nil
}
default:
return nil, errWebhooksTarget.WithAttributes("target", c.Target)
Expand All @@ -182,18 +182,15 @@ func (c WebhooksConfig) NewWebhooks(ctx context.Context, server io.Server) (web.
return nil, errWebhooksRegistry.New()
}
if c.QueueSize > 0 || c.Workers > 0 {
target = &web.QueuedSink{
Target: target,
Queue: make(chan *http.Request, c.QueueSize),
Workers: c.Workers,
q, err := web.NewPooledSink(ctx, server, creator, c.Workers, c.QueueSize)
if err != nil {
return nil, err
}
return web.NewWebhooks(ctx, server, c.Registry, q, c.Downlinks)
}
if controllable, ok := target.(web.ControllableSink); ok {
go func() {
if err := controllable.Run(ctx); err != nil && !errors.IsCanceled(err) {
log.FromContext(ctx).WithError(err).Error("Webhooks target sink failed")
}
}()
target, err := creator()
if err != nil {
return nil, err
}
return web.NewWebhooks(ctx, server, c.Registry, target, c.Downlinks)
}
Expand Down
19 changes: 0 additions & 19 deletions pkg/applicationserver/io/web/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ const (
)

var webhookMetrics = &messageMetrics{
webhookQueue: metrics.NewGauge(
prometheus.GaugeOpts{
Subsystem: subsystem,
Name: "queue_size",
Help: "Webhook queue size",
},
),
webhooksSent: metrics.NewCounter(
prometheus.CounterOpts{
Subsystem: subsystem,
Expand All @@ -51,37 +44,25 @@ var webhookMetrics = &messageMetrics{
}

func init() {
webhookMetrics.webhookQueue.Set(0)
webhookMetrics.webhooksSent.Add(0)
metrics.MustRegister(webhookMetrics)
}

type messageMetrics struct {
webhookQueue prometheus.Gauge
webhooksSent prometheus.Counter
webhooksFailed *prometheus.CounterVec
}

func (m messageMetrics) Describe(ch chan<- *prometheus.Desc) {
m.webhookQueue.Describe(ch)
m.webhooksSent.Describe(ch)
m.webhooksFailed.Describe(ch)
}

func (m messageMetrics) Collect(ch chan<- prometheus.Metric) {
m.webhookQueue.Collect(ch)
m.webhooksSent.Collect(ch)
m.webhooksFailed.Collect(ch)
}

func registerWebhookQueued() {
webhookMetrics.webhookQueue.Inc()
}

func registerWebhookDequeued() {
webhookMetrics.webhookQueue.Dec()
}

func registerWebhookSent() {
webhookMetrics.webhooksSent.Inc()
}
Expand Down
112 changes: 52 additions & 60 deletions pkg/applicationserver/io/web/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
ttnweb "go.thethings.network/lorawan-stack/v3/pkg/web"
"go.thethings.network/lorawan-stack/v3/pkg/webhandlers"
"go.thethings.network/lorawan-stack/v3/pkg/webmiddleware"
"go.thethings.network/lorawan-stack/v3/pkg/workerpool"
)

const namespace = "applicationserver/io/web"
Expand All @@ -45,12 +46,6 @@ type Sink interface {
Process(*http.Request) error
}

// ControllableSink is a controllable Sink.
type ControllableSink interface {
Sink
Run(context.Context) error
}

// HTTPClientSink contains an HTTP client to make outgoing requests.
type HTTPClientSink struct {
*http.Client
Expand All @@ -74,70 +69,67 @@ func (s *HTTPClientSink) Process(req *http.Request) error {
return errRequest.WithAttributes("code", res.StatusCode)
}

// QueuedSink is a ControllableSink with queue.
type QueuedSink struct {
Target Sink
Queue chan *http.Request
Workers int
}
// SinkCreator creates a sink.
type SinkCreator func() (Sink, error)

// Run starts concurrent workers to process messages from the queue.
// If Target is a ControllableSink, this method runs the target.
// This method blocks until the target (if controllable) and all workers are done.
func (s *QueuedSink) Run(ctx context.Context) error {
if s.Workers < 1 {
s.Workers = 1
}
wg := sync.WaitGroup{}
if controllable, ok := s.Target.(ControllableSink); ok {
wg.Add(1)
go func() {
if err := controllable.Run(ctx); err != nil && !errors.IsCanceled(err) {
log.FromContext(ctx).WithError(err).Error("Target sink failed")
}
wg.Done()
}()
// StaticSinkCreator creates a SinkCreator that always returns the same Sink.
func StaticSinkCreator(s Sink) SinkCreator {
return func() (Sink, error) {
return s, nil
}
for i := 0; i < s.Workers; i++ {
wg.Add(1)
go func() {
for {
select {
case <-ctx.Done():
wg.Done()
return
case req := <-s.Queue:
registerWebhookDequeued()
ctx := req.Context()
if err := s.Target.Process(req); err != nil {
registerWebhookFailed(err)
log.FromContext(ctx).WithError(err).Warn("Failed to process message")
} else {
registerWebhookSent()
}
}
}

// pooledSink is a Sink with worker pool.
type pooledSink struct {
pool workerpool.WorkerPool
}

func createSinkHandlerCreator(createSink SinkCreator) workerpool.HandlerCreator {
factory := func() (workerpool.Handler, error) {
sink, err := createSink()
if err != nil {
return nil, err
}
h := func(ctx context.Context, item interface{}) {
req := item.(*http.Request)
if err := sink.Process(req); err != nil {
registerWebhookFailed(err)
log.FromContext(ctx).WithError(err).Warn("Failed to process message")
} else {
registerWebhookSent()
}
}()
}
return h, nil
}
<-ctx.Done()
wg.Wait()
return ctx.Err()
return factory
}

var errQueueFull = errors.DefineResourceExhausted("queue_full", "the queue is full")
// NewPooledSink creates a Sink that queues requests and processes them in parallel workers.
func NewPooledSink(ctx context.Context, c workerpool.Component, createSink SinkCreator, workers int, queueSize int) (Sink, error) {
wp, err := workerpool.NewWorkerPool(workerpool.Config{
Component: c,
Context: ctx,
Name: "webhooks",
CreateHandler: createSinkHandlerCreator(createSink),
MaxWorkers: workers,
QueueSize: queueSize,
})
if err != nil {
return nil, err
}
return &pooledSink{
pool: wp,
}, nil
}

// Process sends the request to the queue.
// This method returns immediately. An error is returned when the queue is full.
func (s *QueuedSink) Process(req *http.Request) error {
select {
case s.Queue <- req:
registerWebhookQueued()
return nil
default:
err := errQueueFull.New()
// Process sends the request to the workers.
// This method returns immediately. An error is returned when the workers are too busy.
func (s *pooledSink) Process(req *http.Request) error {
if err := s.pool.Publish(req.Context(), req); err != nil {
registerWebhookFailed(err)
return err
}
return nil
}

// Webhooks is an interface for registering incoming webhooks for downlink and creating a subscription to outgoing
Expand Down
39 changes: 22 additions & 17 deletions pkg/applicationserver/io/web/webhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,24 @@ import (
"go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should"
)

type mockComponent struct{}

func (mockComponent) StartTask(conf *component.TaskConfig) {
component.DefaultStartTask(conf)
}

func (mockComponent) FromRequestContext(ctx context.Context) context.Context {
return ctx
}

func createdPooledSink(ctx context.Context, t *testing.T, sink web.Sink) web.Sink {
q, err := web.NewPooledSink(ctx, mockComponent{}, web.StaticSinkCreator(sink), 1, 4)
if err != nil {
t.Fatal(err)
}
return q
}

func TestWebhooks(t *testing.T) {
_, ctx := test.New(t)

Expand Down Expand Up @@ -147,27 +165,14 @@ func TestWebhooks(t *testing.T) {
}
for _, sink := range []web.Sink{
testSink,
&web.QueuedSink{
Target: testSink,
Queue: make(chan *http.Request, 4),
Workers: 1,
},
&web.QueuedSink{
Target: &web.QueuedSink{
Target: testSink,
Queue: make(chan *http.Request, 4),
Workers: 1,
},
Queue: make(chan *http.Request, 4),
Workers: 1,
},
createdPooledSink(ctx, t, testSink),
createdPooledSink(ctx, t,
createdPooledSink(ctx, t, testSink),
),
} {
t.Run(fmt.Sprintf("%T", sink), func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if controllable, ok := sink.(web.ControllableSink); ok {
go controllable.Run(ctx)
}
c := componenttest.NewComponent(t, &component.Config{})
as := mock.NewServer(c)
_, err := web.NewWebhooks(ctx, as, registry, sink, downlinks)
Expand Down

0 comments on commit 98e91bd

Please sign in to comment.