From 34a6db78731031b9e5817bb3d70eead27a49b9cc Mon Sep 17 00:00:00 2001 From: Shaddoll Date: Wed, 27 Nov 2024 11:47:32 -0800 Subject: [PATCH] Update root partition to refresh non-root partition on start (#6527) --- .../matching/tasklist/task_list_manager.go | 12 ++++++++- .../tasklist/task_list_manager_test.go | 26 +++++++++++++++---- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/service/matching/tasklist/task_list_manager.go b/service/matching/tasklist/task_list_manager.go index 7a9f9b22572..1c34961c2ca 100644 --- a/service/matching/tasklist/task_list_manager.go +++ b/service/matching/tasklist/task_list_manager.go @@ -122,6 +122,7 @@ type ( outstandingPollsLock sync.Mutex outstandingPollsMap map[string]outstandingPollerInfo startWG sync.WaitGroup // ensures that background processes do not start until setup is ready + stopWG sync.WaitGroup stopped int32 closeCallback func(Manager) throttleRetry *backoff.ThrottleRetry @@ -273,6 +274,14 @@ func (c *taskListManagerImpl) Start() error { if c.taskListID.IsRoot() && c.taskListKind == types.TaskListKindNormal { c.partitionConfig = c.db.PartitionConfig().ToInternalType() c.logger.Info("get task list partition config from db", tag.Dynamic("root-partition", c.taskListID.GetRoot()), tag.Dynamic("task-list-partition-config", c.partitionConfig)) + if c.partitionConfig != nil { + // push update notification to all non-root partitions on start + c.stopWG.Add(1) + go func() { + defer c.stopWG.Done() + c.notifyPartitionConfig(context.Background(), *c.partitionConfig, int(c.partitionConfig.NumReadPartitions)) + }() + } } c.liveness.Start() c.taskReader.Start() @@ -298,6 +307,7 @@ func (c *taskListManagerImpl) Stop() { c.taskWriter.Stop() c.taskReader.Stop() c.matcher.DisconnectBlockedPollers() + c.stopWG.Wait() c.logger.Info("Task list manager state changed", tag.LifeCycleStopped) } @@ -433,7 +443,7 @@ func (c *taskListManagerImpl) notifyPartitionConfig(ctx context.Context, config _, e = c.matchingClient.RefreshTaskListPartitionConfig(ctx, &types.MatchingRefreshTaskListPartitionConfigRequest{ DomainUUID: c.taskListID.GetDomainID(), - TaskList: &types.TaskList{Name: taskListName}, + TaskList: &types.TaskList{Name: taskListName, Kind: &c.taskListKind}, TaskListType: taskListType, PartitionConfig: &config, }) diff --git a/service/matching/tasklist/task_list_manager_test.go b/service/matching/tasklist/task_list_manager_test.go index 07506715af9..c304ca54659 100644 --- a/service/matching/tasklist/task_list_manager_test.go +++ b/service/matching/tasklist/task_list_manager_test.go @@ -1131,7 +1131,7 @@ func TestUpdateTaskListPartitionConfig(t *testing.T) { }).Return(&persistence.UpdateTaskListResponse{}, nil) deps.mockMatchingClient.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), &types.MatchingRefreshTaskListPartitionConfigRequest{ DomainUUID: "domain-id", - TaskList: &types.TaskList{Name: "/__cadence_sys/tl/1"}, + TaskList: &types.TaskList{Name: "/__cadence_sys/tl/1", Kind: types.TaskListKindNormal.Ptr()}, TaskListType: types.TaskListTypeDecision.Ptr(), PartitionConfig: &types.TaskListPartitionConfig{ Version: 2, @@ -1141,7 +1141,7 @@ func TestUpdateTaskListPartitionConfig(t *testing.T) { }).Return(&types.MatchingRefreshTaskListPartitionConfigResponse{}, nil) deps.mockMatchingClient.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), &types.MatchingRefreshTaskListPartitionConfigRequest{ DomainUUID: "domain-id", - TaskList: &types.TaskList{Name: "/__cadence_sys/tl/2"}, + TaskList: &types.TaskList{Name: "/__cadence_sys/tl/2", Kind: types.TaskListKindNormal.Ptr()}, TaskListType: types.TaskListTypeDecision.Ptr(), PartitionConfig: &types.TaskListPartitionConfig{ Version: 2, @@ -1185,7 +1185,7 @@ func TestUpdateTaskListPartitionConfig(t *testing.T) { }).Return(&persistence.UpdateTaskListResponse{}, nil) deps.mockMatchingClient.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), &types.MatchingRefreshTaskListPartitionConfigRequest{ DomainUUID: "domain-id", - TaskList: &types.TaskList{Name: "/__cadence_sys/tl/1"}, + TaskList: &types.TaskList{Name: "/__cadence_sys/tl/1", Kind: types.TaskListKindNormal.Ptr()}, TaskListType: types.TaskListTypeDecision.Ptr(), PartitionConfig: &types.TaskListPartitionConfig{ Version: 2, @@ -1195,7 +1195,7 @@ func TestUpdateTaskListPartitionConfig(t *testing.T) { }).Return(nil, errors.New("matching client error")) deps.mockMatchingClient.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), &types.MatchingRefreshTaskListPartitionConfigRequest{ DomainUUID: "domain-id", - TaskList: &types.TaskList{Name: "/__cadence_sys/tl/2"}, + TaskList: &types.TaskList{Name: "/__cadence_sys/tl/2", Kind: types.TaskListKindNormal.Ptr()}, TaskListType: types.TaskListTypeDecision.Ptr(), PartitionConfig: &types.TaskListPartitionConfig{ Version: 2, @@ -1321,10 +1321,26 @@ func TestManagerStart_RootPartition(t *testing.T) { Kind: persistence.TaskListKindNormal, AckLevel: 0, RangeID: 0, + AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{ + Version: 1, + NumReadPartitions: 2, + NumWritePartitions: 2, + }, }, }, nil) + deps.mockMatchingClient.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), &types.MatchingRefreshTaskListPartitionConfigRequest{ + DomainUUID: "domain-id", + TaskList: &types.TaskList{Name: "/__cadence_sys/tl/1", Kind: types.TaskListKindNormal.Ptr()}, + TaskListType: types.TaskListTypeDecision.Ptr(), + PartitionConfig: &types.TaskListPartitionConfig{ + Version: 1, + NumReadPartitions: 2, + NumWritePartitions: 2, + }, + }).Return(&types.MatchingRefreshTaskListPartitionConfigResponse{}, nil) assert.NoError(t, tlm.Start()) - assert.Nil(t, tlm.TaskListPartitionConfig()) + assert.Equal(t, &types.TaskListPartitionConfig{Version: 1, NumReadPartitions: 2, NumWritePartitions: 2}, tlm.TaskListPartitionConfig()) + tlm.stopWG.Wait() } func TestManagerStart_NonRootPartition(t *testing.T) {