Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep queue factory group tag #3880

Merged
merged 2 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/history/historyEngineFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type (
NewCacheFn wcache.NewCacheFn
ArchivalClient archiver.Client
EventSerializer serialization.Serializer
QueueFactories []QueueFactory
QueueFactories []QueueFactory `group:"queueFactory"`
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
ReplicationTaskExecutorProvider replication.TaskExecutorProvider
TracerProvider trace.TracerProvider
Expand Down
68 changes: 37 additions & 31 deletions service/history/queueFactoryBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,63 +89,69 @@ type (
fx.In

Lifecycle fx.Lifecycle
Factories []QueueFactory
Factories []QueueFactory `group:"queueFactory"`
}
)

var QueueModule = fx.Options(
fx.Provide(QueueSchedulerRateLimiterProvider),
fx.Provide(
fx.Annotated{
Name: "transferQueueFactory",
Group: QueueFactoryFxGroup,
Target: NewTransferQueueFactory,
},
fx.Annotated{
Name: "timerQueueFactory",
Group: QueueFactoryFxGroup,
Target: NewTimerQueueFactory,
},
fx.Annotated{
Name: "visibilityQueueFactory",
Group: QueueFactoryFxGroup,
Target: NewVisibilityQueueFactory,
},
fx.Annotated{
Name: "archivalQueueFactory",
Target: NewArchivalQueueFactory,
},
getQueueFactories,
getOptionalQueueFactories,
),
fx.Invoke(QueueFactoryLifetimeHooks),
)

type queueFactorySet struct {
fx.In

TransferQueueFactory QueueFactory `name:"transferQueueFactory"`
TimerQueueFactory QueueFactory `name:"timerQueueFactory"`
VisibilityQueueFactory QueueFactory `name:"visibilityQueueFactory"`
ArchivalQueueFactory QueueFactory `name:"archivalQueueFactory"`
// additionalQueueFactories is a container for a list of queue factories that are only added to the group if
// they are enabled. This exists because there is no way to conditionally add to a group with a provider that returns
// a single object. For example, this doesn't work because it will always add the factory to the group, which can
// cause NPEs:
//
// fx.Annotated{
// Group: "queueFactory",
// Target: func() QueueFactory { return isEnabled ? NewQueueFactory() : nil },
// },
type additionalQueueFactories struct {
// This is what tells fx to add the factories to the group whenever this object is provided.
fx.Out

// Factories is a list of queue factories that will be added to the `group:"queueFactory"` group.
Factories []QueueFactory `group:"queueFactory,flatten"`
}

// getQueueFactories returns factories for all the enabled queue types.
// The archival queue factory is only returned when archival is enabled in the static config.
func getQueueFactories(
queueFactorySet queueFactorySet,
// getOptionalQueueFactories returns an additionalQueueFactories which contains a list of queue factories that will be
// added to the `group:"queueFactory"` group. The factories are added to the group only if they are enabled, which
// is why we must return a list here.
func getOptionalQueueFactories(
archivalMetadata archiver.ArchivalMetadata,
) []QueueFactory {
factories := []QueueFactory{
queueFactorySet.TransferQueueFactory,
queueFactorySet.TimerQueueFactory,
queueFactorySet.VisibilityQueueFactory,
}
params ArchivalQueueFactoryParams,
) additionalQueueFactories {

c := tasks.CategoryArchival
// this will only affect tests because this method is only called once in production,
// Removing this category will only affect tests because this method is only called once in production,
// but it may be called many times across test runs, which would leave the archival queue as a dangling category
tasks.RemoveCategory(c.ID())
if archivalMetadata.GetHistoryConfig().StaticClusterState() == archiver.ArchivalEnabled || archivalMetadata.GetVisibilityConfig().StaticClusterState() == archiver.ArchivalEnabled {
factories = append(factories, queueFactorySet.ArchivalQueueFactory)
tasks.NewCategory(c.ID(), c.Type(), c.Name())
if archivalMetadata.GetHistoryConfig().StaticClusterState() != archiver.ArchivalEnabled &&
archivalMetadata.GetVisibilityConfig().StaticClusterState() != archiver.ArchivalEnabled {
return additionalQueueFactories{}
}
tasks.NewCategory(c.ID(), c.Type(), c.Name())
return additionalQueueFactories{
Factories: []QueueFactory{
NewArchivalQueueFactory(params),
},
}
return factories
}

func QueueSchedulerRateLimiterProvider(
Expand Down