diff --git a/pkg/applicationserver/io/io.go b/pkg/applicationserver/io/io.go index 360ad9f9773..d5d467a7b2f 100644 --- a/pkg/applicationserver/io/io.go +++ b/pkg/applicationserver/io/io.go @@ -76,6 +76,8 @@ type Server interface { DownlinkQueueOperator UplinkStorage Cluster + // FromRequestContext decouples the lifetime of the provided context from the values found in the context. + FromRequestContext(context.Context) context.Context // GetBaseConfig returns the component configuration. GetBaseConfig(ctx context.Context) config.ServiceBase // FillContext fills the given context. diff --git a/pkg/applicationserver/io/packages/packages.go b/pkg/applicationserver/io/packages/packages.go index 88d2f3f4a00..70a8133b7ac 100644 --- a/pkg/applicationserver/io/packages/packages.go +++ b/pkg/applicationserver/io/packages/packages.go @@ -25,6 +25,7 @@ import ( "go.thethings.network/lorawan-stack/v3/pkg/log" "go.thethings.network/lorawan-stack/v3/pkg/rpcserver" "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/workerpool" "google.golang.org/grpc" ) @@ -36,8 +37,8 @@ type server struct { server io.Server registry Registry - handlers map[string]ApplicationPackageHandler - subscriptions map[string]*io.Subscription + handlers map[string]ApplicationPackageHandler + pools map[string]workerpool.WorkerPool } // Server is an application packages frontend. @@ -45,40 +46,37 @@ type Server interface { rpcserver.Registerer } -func startPackageWorker(ctx context.Context, as io.Server, name string, handler ApplicationPackageHandler, sub *io.Subscription, id int, timeout time.Duration) { - handleUp := func(ctx context.Context, defAssoc *ttnpb.ApplicationPackageDefaultAssociation, assoc *ttnpb.ApplicationPackageAssociation, up *ttnpb.ApplicationUp) error { +func createPackagePoolHandler(name string, handler ApplicationPackageHandler, timeout time.Duration) workerpool.HandlerCreator { + h := func(ctx context.Context, item interface{}) { + associatedUp := item.(*associatedApplicationUp) + pair, up := associatedUp.pair, associatedUp.up + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - return handler.HandleUp(ctx, defAssoc, assoc, up) + + if err := handler.HandleUp(ctx, pair.defaultAssociation, pair.association, up); err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to handle message") + registerMessageFailed(name, err) + return + } + registerMessageProcessed(name) } - as.StartTask(&component.TaskConfig{ - Context: ctx, - ID: fmt.Sprintf("run_application_packages_%v_%v", name, id), - Func: func(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-sub.Context().Done(): - return sub.Context().Err() - case up := <-sub.Up(): - ctx := up.Context - pair := associationsPairFromContext(ctx) - if err := handleUp(ctx, pair.defaultAssociation, pair.association, up.ApplicationUp); err != nil { - log.FromContext(ctx).WithError(err).Warn("Failed to handle message") - registerMessageFailed(name, err) - continue - } - registerMessageProcessed(name) - } - } - }, - Restart: component.TaskRestartOnFailure, - Backoff: component.DefaultTaskBackoffConfig, - }) + return workerpool.StaticHandlerCreator(h) +} + +func createFrontendPoolHandler(fanOut func(context.Context, *ttnpb.ApplicationUp) error) workerpool.HandlerCreator { + h := func(ctx context.Context, item interface{}) { + up := item.(*ttnpb.ApplicationUp) + + if err := fanOut(ctx, up); err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to fanout message") + registerMessageFailed("frontend-fanout", err) + } + } + return workerpool.StaticHandlerCreator(h) } -func startFrontendWorker(ctx context.Context, as io.Server, sub *io.Subscription, handler func(context.Context, *ttnpb.ApplicationUp) error) { +func startFrontendSubscriber(ctx context.Context, as io.Server, sub *io.Subscription, submit func(context.Context, interface{}) error) { as.StartTask(&component.TaskConfig{ Context: ctx, ID: "run_application_packages", @@ -91,8 +89,9 @@ func startFrontendWorker(ctx context.Context, as io.Server, sub *io.Subscription return sub.Context().Err() case up := <-sub.Up(): ctx := log.NewContextWithField(up.Context, "namespace", namespace) - if err := handler(ctx, up.ApplicationUp); err != nil { - log.FromContext(ctx).WithError(err).Warn("Failed to handle message") + if err := submit(ctx, up.ApplicationUp); err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to submit message") + registerMessageFailed("frontend-submit", err) } } } @@ -106,23 +105,44 @@ func startFrontendWorker(ctx context.Context, as io.Server, sub *io.Subscription func New(ctx context.Context, as io.Server, registry Registry, handlers map[string]ApplicationPackageHandler, workers int, timeout time.Duration) (Server, error) { ctx = log.NewContextWithField(ctx, "namespace", namespace) s := &server{ - ctx: ctx, - server: as, - registry: registry, - handlers: handlers, - subscriptions: make(map[string]*io.Subscription), + ctx: ctx, + server: as, + registry: registry, + handlers: handlers, + pools: make(map[string]workerpool.WorkerPool), } + for name, handler := range handlers { - s.subscriptions[name] = io.NewSubscription(ctx, name, nil) - for id := 0; id < workers; id++ { - startPackageWorker(ctx, as, name, handler, s.subscriptions[name], id, timeout) + wp, err := workerpool.NewWorkerPool(workerpool.Config{ + Component: as, + Context: ctx, + Name: fmt.Sprintf("application_packages_%v", name), + CreateHandler: createPackagePoolHandler(name, handler, timeout), + MaxWorkers: workers, + }) + if err != nil { + return nil, err } + s.pools[name] = wp } + sub, err := as.Subscribe(ctx, "applicationpackages", nil, false) if err != nil { return nil, err } - startFrontendWorker(ctx, as, sub, s.handleUp) + + wp, err := workerpool.NewWorkerPool(workerpool.Config{ + Component: as, + Context: ctx, + Name: "application_packages_fanout", + CreateHandler: createFrontendPoolHandler(s.fanOut), + MaxWorkers: workers, + }) + if err != nil { + return nil, err + } + + startFrontendSubscriber(ctx, as, sub, wp.Publish) return s, nil } @@ -131,19 +151,9 @@ type associationsPair struct { association *ttnpb.ApplicationPackageAssociation } -type associationsPairCtxKeyType struct{} - -var associationsPairCtxKey = &associationsPairCtxKeyType{} - -func contextWithAssociationsPair(ctx context.Context, pair *associationsPair) context.Context { - return context.WithValue(ctx, associationsPairCtxKey, pair) -} - -func associationsPairFromContext(ctx context.Context) *associationsPair { - if val, ok := ctx.Value(associationsPairCtxKey).(*associationsPair); ok { - return val - } - return nil +type associatedApplicationUp struct { + pair *associationsPair + up *ttnpb.ApplicationUp } type associationsMap map[string]*associationsPair @@ -180,21 +190,23 @@ func (s *server) findAssociations(ctx context.Context, ids ttnpb.EndDeviceIdenti return m, nil } -func (s *server) handleUp(ctx context.Context, msg *ttnpb.ApplicationUp) error { +func (s *server) fanOut(ctx context.Context, msg *ttnpb.ApplicationUp) error { associations, err := s.findAssociations(ctx, msg.EndDeviceIdentifiers) if err != nil { return err } for name, pair := range associations { - sub, ok := s.subscriptions[name] + pool, ok := s.pools[name] if !ok { continue } ctx := log.NewContextWithField(ctx, "package", name) - ctx = contextWithAssociationsPair(ctx, pair) - if err := sub.Publish(ctx, msg); err != nil { + if err := pool.Publish(ctx, &associatedApplicationUp{ + pair: pair, + up: msg, + }); err != nil { log.FromContext(ctx).WithError(err).Warn("Failed to handle message") - registerMessageFailed(name, err) + registerMessageFailed(fmt.Sprintf("publish-%v", name), err) } } return nil