Skip to content

Commit

Permalink
as: Refactor application packages frontend to use a dynamic worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
adriansmares committed Sep 9, 2021
1 parent a3a585b commit 98aae8b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 60 deletions.
2 changes: 2 additions & 0 deletions pkg/applicationserver/io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type Server interface {
DownlinkQueueOperator
UplinkStorage
Cluster
// FromRequestContext decouples the lifetime of the provided context from the values found in the context.
FromRequestContext(context.Context) context.Context
// GetBaseConfig returns the component configuration.
GetBaseConfig(ctx context.Context) config.ServiceBase
// FillContext fills the given context.
Expand Down
132 changes: 72 additions & 60 deletions pkg/applicationserver/io/packages/packages.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/rpcserver"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
"go.thethings.network/lorawan-stack/v3/pkg/workerpool"
"google.golang.org/grpc"
)

Expand All @@ -36,49 +37,46 @@ type server struct {
server io.Server
registry Registry

handlers map[string]ApplicationPackageHandler
subscriptions map[string]*io.Subscription
handlers map[string]ApplicationPackageHandler
pools map[string]workerpool.WorkerPool
}

// Server is an application packages frontend.
type Server interface {
rpcserver.Registerer
}

func startPackageWorker(ctx context.Context, as io.Server, name string, handler ApplicationPackageHandler, sub *io.Subscription, id int, timeout time.Duration) {
handleUp := func(ctx context.Context, defAssoc *ttnpb.ApplicationPackageDefaultAssociation, assoc *ttnpb.ApplicationPackageAssociation, up *ttnpb.ApplicationUp) error {
func createPackagePoolHandler(name string, handler ApplicationPackageHandler, timeout time.Duration) workerpool.HandlerCreator {
h := func(ctx context.Context, item interface{}) {
associatedUp := item.(*associatedApplicationUp)
pair, up := associatedUp.pair, associatedUp.up

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return handler.HandleUp(ctx, defAssoc, assoc, up)

if err := handler.HandleUp(ctx, pair.defaultAssociation, pair.association, up); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to handle message")
registerMessageFailed(name, err)
return
}
registerMessageProcessed(name)
}
as.StartTask(&component.TaskConfig{
Context: ctx,
ID: fmt.Sprintf("run_application_packages_%v_%v", name, id),
Func: func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-sub.Context().Done():
return sub.Context().Err()
case up := <-sub.Up():
ctx := up.Context
pair := associationsPairFromContext(ctx)
if err := handleUp(ctx, pair.defaultAssociation, pair.association, up.ApplicationUp); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to handle message")
registerMessageFailed(name, err)
continue
}
registerMessageProcessed(name)
}
}
},
Restart: component.TaskRestartOnFailure,
Backoff: component.DefaultTaskBackoffConfig,
})
return workerpool.StaticHandlerCreator(h)
}

func createFrontendPoolHandler(fanOut func(context.Context, *ttnpb.ApplicationUp) error) workerpool.HandlerCreator {
h := func(ctx context.Context, item interface{}) {
up := item.(*ttnpb.ApplicationUp)

if err := fanOut(ctx, up); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to fanout message")
registerMessageFailed("frontend-fanout", err)
}
}
return workerpool.StaticHandlerCreator(h)
}

func startFrontendWorker(ctx context.Context, as io.Server, sub *io.Subscription, handler func(context.Context, *ttnpb.ApplicationUp) error) {
func startFrontendSubscriber(ctx context.Context, as io.Server, sub *io.Subscription, submit func(context.Context, interface{}) error) {
as.StartTask(&component.TaskConfig{
Context: ctx,
ID: "run_application_packages",
Expand All @@ -91,8 +89,9 @@ func startFrontendWorker(ctx context.Context, as io.Server, sub *io.Subscription
return sub.Context().Err()
case up := <-sub.Up():
ctx := log.NewContextWithField(up.Context, "namespace", namespace)
if err := handler(ctx, up.ApplicationUp); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to handle message")
if err := submit(ctx, up.ApplicationUp); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to submit message")
registerMessageFailed("frontend-submit", err)
}
}
}
Expand All @@ -106,23 +105,44 @@ func startFrontendWorker(ctx context.Context, as io.Server, sub *io.Subscription
func New(ctx context.Context, as io.Server, registry Registry, handlers map[string]ApplicationPackageHandler, workers int, timeout time.Duration) (Server, error) {
ctx = log.NewContextWithField(ctx, "namespace", namespace)
s := &server{
ctx: ctx,
server: as,
registry: registry,
handlers: handlers,
subscriptions: make(map[string]*io.Subscription),
ctx: ctx,
server: as,
registry: registry,
handlers: handlers,
pools: make(map[string]workerpool.WorkerPool),
}

for name, handler := range handlers {
s.subscriptions[name] = io.NewSubscription(ctx, name, nil)
for id := 0; id < workers; id++ {
startPackageWorker(ctx, as, name, handler, s.subscriptions[name], id, timeout)
wp, err := workerpool.NewWorkerPool(workerpool.Config{
Component: as,
Context: ctx,
Name: fmt.Sprintf("application_packages_%v", name),
CreateHandler: createPackagePoolHandler(name, handler, timeout),
MaxWorkers: workers,
})
if err != nil {
return nil, err
}
s.pools[name] = wp
}

sub, err := as.Subscribe(ctx, "applicationpackages", nil, false)
if err != nil {
return nil, err
}
startFrontendWorker(ctx, as, sub, s.handleUp)

wp, err := workerpool.NewWorkerPool(workerpool.Config{
Component: as,
Context: ctx,
Name: "application_packages_fanout",
CreateHandler: createFrontendPoolHandler(s.fanOut),
MaxWorkers: workers,
})
if err != nil {
return nil, err
}

startFrontendSubscriber(ctx, as, sub, wp.Publish)
return s, nil
}

Expand All @@ -131,19 +151,9 @@ type associationsPair struct {
association *ttnpb.ApplicationPackageAssociation
}

type associationsPairCtxKeyType struct{}

var associationsPairCtxKey = &associationsPairCtxKeyType{}

func contextWithAssociationsPair(ctx context.Context, pair *associationsPair) context.Context {
return context.WithValue(ctx, associationsPairCtxKey, pair)
}

func associationsPairFromContext(ctx context.Context) *associationsPair {
if val, ok := ctx.Value(associationsPairCtxKey).(*associationsPair); ok {
return val
}
return nil
type associatedApplicationUp struct {
pair *associationsPair
up *ttnpb.ApplicationUp
}

type associationsMap map[string]*associationsPair
Expand Down Expand Up @@ -180,21 +190,23 @@ func (s *server) findAssociations(ctx context.Context, ids ttnpb.EndDeviceIdenti
return m, nil
}

func (s *server) handleUp(ctx context.Context, msg *ttnpb.ApplicationUp) error {
func (s *server) fanOut(ctx context.Context, msg *ttnpb.ApplicationUp) error {
associations, err := s.findAssociations(ctx, msg.EndDeviceIdentifiers)
if err != nil {
return err
}
for name, pair := range associations {
sub, ok := s.subscriptions[name]
pool, ok := s.pools[name]
if !ok {
continue
}
ctx := log.NewContextWithField(ctx, "package", name)
ctx = contextWithAssociationsPair(ctx, pair)
if err := sub.Publish(ctx, msg); err != nil {
if err := pool.Publish(ctx, &associatedApplicationUp{
pair: pair,
up: msg,
}); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to handle message")
registerMessageFailed(name, err)
registerMessageFailed(fmt.Sprintf("publish-%v", name), err)
}
}
return nil
Expand Down

0 comments on commit 98aae8b

Please sign in to comment.