Skip to content

Commit

Permalink
Keep queue factory group tag (#3880)
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Feb 3, 2023
1 parent 285c5d5 commit d05cdb9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 32 deletions.
2 changes: 1 addition & 1 deletion service/history/historyEngineFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type (
NewCacheFn wcache.NewCacheFn
ArchivalClient archiver.Client
EventSerializer serialization.Serializer
QueueFactories []QueueFactory
QueueFactories []QueueFactory `group:"queueFactory"`
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
ReplicationTaskExecutorProvider replication.TaskExecutorProvider
TracerProvider trace.TracerProvider
Expand Down
68 changes: 37 additions & 31 deletions service/history/queueFactoryBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,63 +89,69 @@ type (
fx.In

Lifecycle fx.Lifecycle
Factories []QueueFactory
Factories []QueueFactory `group:"queueFactory"`
}
)

var QueueModule = fx.Options(
fx.Provide(QueueSchedulerRateLimiterProvider),
fx.Provide(
fx.Annotated{
Name: "transferQueueFactory",
Group: QueueFactoryFxGroup,
Target: NewTransferQueueFactory,
},
fx.Annotated{
Name: "timerQueueFactory",
Group: QueueFactoryFxGroup,
Target: NewTimerQueueFactory,
},
fx.Annotated{
Name: "visibilityQueueFactory",
Group: QueueFactoryFxGroup,
Target: NewVisibilityQueueFactory,
},
fx.Annotated{
Name: "archivalQueueFactory",
Target: NewArchivalQueueFactory,
},
getQueueFactories,
getOptionalQueueFactories,
),
fx.Invoke(QueueFactoryLifetimeHooks),
)

type queueFactorySet struct {
fx.In

TransferQueueFactory QueueFactory `name:"transferQueueFactory"`
TimerQueueFactory QueueFactory `name:"timerQueueFactory"`
VisibilityQueueFactory QueueFactory `name:"visibilityQueueFactory"`
ArchivalQueueFactory QueueFactory `name:"archivalQueueFactory"`
// additionalQueueFactories is a container for a list of queue factories that are only added to the group if
// they are enabled. This exists because there is no way to conditionally add to a group with a provider that returns
// a single object. For example, this doesn't work because it will always add the factory to the group, which can
// cause NPEs:
//
// fx.Annotated{
// Group: "queueFactory",
// Target: func() QueueFactory { return isEnabled ? NewQueueFactory() : nil },
// },
type additionalQueueFactories struct {
// This is what tells fx to add the factories to the group whenever this object is provided.
fx.Out

// Factories is a list of queue factories that will be added to the `group:"queueFactory"` group.
Factories []QueueFactory `group:"queueFactory,flatten"`
}

// getQueueFactories returns factories for all the enabled queue types.
// The archival queue factory is only returned when archival is enabled in the static config.
func getQueueFactories(
queueFactorySet queueFactorySet,
// getOptionalQueueFactories returns an additionalQueueFactories which contains a list of queue factories that will be
// added to the `group:"queueFactory"` group. The factories are added to the group only if they are enabled, which
// is why we must return a list here.
func getOptionalQueueFactories(
archivalMetadata archiver.ArchivalMetadata,
) []QueueFactory {
factories := []QueueFactory{
queueFactorySet.TransferQueueFactory,
queueFactorySet.TimerQueueFactory,
queueFactorySet.VisibilityQueueFactory,
}
params ArchivalQueueFactoryParams,
) additionalQueueFactories {

c := tasks.CategoryArchival
// this will only affect tests because this method is only called once in production,
// Removing this category will only affect tests because this method is only called once in production,
// but it may be called many times across test runs, which would leave the archival queue as a dangling category
tasks.RemoveCategory(c.ID())
if archivalMetadata.GetHistoryConfig().StaticClusterState() == archiver.ArchivalEnabled || archivalMetadata.GetVisibilityConfig().StaticClusterState() == archiver.ArchivalEnabled {
factories = append(factories, queueFactorySet.ArchivalQueueFactory)
tasks.NewCategory(c.ID(), c.Type(), c.Name())
if archivalMetadata.GetHistoryConfig().StaticClusterState() != archiver.ArchivalEnabled &&
archivalMetadata.GetVisibilityConfig().StaticClusterState() != archiver.ArchivalEnabled {
return additionalQueueFactories{}
}
tasks.NewCategory(c.ID(), c.Type(), c.Name())
return additionalQueueFactories{
Factories: []QueueFactory{
NewArchivalQueueFactory(params),
},
}
return factories
}

func QueueSchedulerRateLimiterProvider(
Expand Down

0 comments on commit d05cdb9

Please sign in to comment.