diff --git a/service/history/historyEngineFactory.go b/service/history/historyEngineFactory.go index d3377dc99166..e19d2fe38f91 100644 --- a/service/history/historyEngineFactory.go +++ b/service/history/historyEngineFactory.go @@ -55,7 +55,7 @@ type ( NewCacheFn wcache.NewCacheFn ArchivalClient archiver.Client EventSerializer serialization.Serializer - QueueFactories []QueueFactory + QueueFactories []QueueFactory `group:"queueFactory"` ReplicationTaskFetcherFactory replication.TaskFetcherFactory ReplicationTaskExecutorProvider replication.TaskExecutorProvider TracerProvider trace.TracerProvider diff --git a/service/history/queueFactoryBase.go b/service/history/queueFactoryBase.go index ff719903a0f9..0bdff00be306 100644 --- a/service/history/queueFactoryBase.go +++ b/service/history/queueFactoryBase.go @@ -89,7 +89,7 @@ type ( fx.In Lifecycle fx.Lifecycle - Factories []QueueFactory + Factories []QueueFactory `group:"queueFactory"` } ) @@ -97,55 +97,60 @@ var QueueModule = fx.Options( fx.Provide(QueueSchedulerRateLimiterProvider), fx.Provide( fx.Annotated{ - Name: "transferQueueFactory", + Group: QueueFactoryFxGroup, Target: NewTransferQueueFactory, }, fx.Annotated{ - Name: "timerQueueFactory", + Group: QueueFactoryFxGroup, Target: NewTimerQueueFactory, }, fx.Annotated{ - Name: "visibilityQueueFactory", + Group: QueueFactoryFxGroup, Target: NewVisibilityQueueFactory, }, - fx.Annotated{ - Name: "archivalQueueFactory", - Target: NewArchivalQueueFactory, - }, - getQueueFactories, + getAdditionalQueueFactories, ), fx.Invoke(QueueFactoryLifetimeHooks), ) -type queueFactorySet struct { - fx.In - - TransferQueueFactory QueueFactory `name:"transferQueueFactory"` - TimerQueueFactory QueueFactory `name:"timerQueueFactory"` - VisibilityQueueFactory QueueFactory `name:"visibilityQueueFactory"` - ArchivalQueueFactory QueueFactory `name:"archivalQueueFactory"` +// additionalQueueFactories is a container for a list of queue factories that are only added to the group if +// they are enabled. This exists because there is no way to conditionally add to a group with a provider that returns +// a single object. For example, this doesn't work because it will always add the factory to the group, which can +// cause NPEs: +// +// fx.Annotated{ +// Group: "queueFactory", +// Target: func() QueueFactory { return isEnabled ? NewQueueFactory() : nil }, +// }, +type additionalQueueFactories struct { + // This is what tells fx to add the factories to the group whenever this object is provided. + fx.Out + + // Factories is a list of queue factories that will be added to the `group:"queueFactory"` group. + Factories []QueueFactory `group:"queueFactory,flatten"` } -// getQueueFactories returns factories for all the enabled queue types. -// The archival queue factory is only returned when archival is enabled in the static config. -func getQueueFactories( - queueFactorySet queueFactorySet, +// getAdditionalQueueFactories returns an additionalQueueFactories which contains a list of queue factories that will be +// added to the `group:"queueFactory"` group. The factories are added to the group only if they are enabled, which +// is why we must return a list here. +func getAdditionalQueueFactories( archivalMetadata archiver.ArchivalMetadata, -) []QueueFactory { - factories := []QueueFactory{ - queueFactorySet.TransferQueueFactory, - queueFactorySet.TimerQueueFactory, - queueFactorySet.VisibilityQueueFactory, - } + params ArchivalQueueFactoryParams, +) additionalQueueFactories { + c := tasks.CategoryArchival - // this will only affect tests because this method is only called once in production, + // Removing this category will only affect tests because this method is only called once in production, // but it may be called many times across test runs, which would leave the archival queue as a dangling category tasks.RemoveCategory(c.ID()) - if archivalMetadata.GetHistoryConfig().StaticClusterState() == archiver.ArchivalEnabled || archivalMetadata.GetVisibilityConfig().StaticClusterState() == archiver.ArchivalEnabled { - factories = append(factories, queueFactorySet.ArchivalQueueFactory) - tasks.NewCategory(c.ID(), c.Type(), c.Name()) + if archivalMetadata.GetHistoryConfig().StaticClusterState() != archiver.ArchivalEnabled && archivalMetadata.GetVisibilityConfig().StaticClusterState() != archiver.ArchivalEnabled { + return additionalQueueFactories{} + } + tasks.NewCategory(c.ID(), c.Type(), c.Name()) + return additionalQueueFactories{ + Factories: []QueueFactory{ + NewArchivalQueueFactory(params), + }, } - return factories } func QueueSchedulerRateLimiterProvider(