Skip to content

Commit

Permalink
Update root partition to refresh non-root partition on start (#6527)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Nov 27, 2024
1 parent c500725 commit 34a6db7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
12 changes: 11 additions & 1 deletion service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type (
outstandingPollsLock sync.Mutex
outstandingPollsMap map[string]outstandingPollerInfo
startWG sync.WaitGroup // ensures that background processes do not start until setup is ready
stopWG sync.WaitGroup
stopped int32
closeCallback func(Manager)
throttleRetry *backoff.ThrottleRetry
Expand Down Expand Up @@ -273,6 +274,14 @@ func (c *taskListManagerImpl) Start() error {
if c.taskListID.IsRoot() && c.taskListKind == types.TaskListKindNormal {
c.partitionConfig = c.db.PartitionConfig().ToInternalType()
c.logger.Info("get task list partition config from db", tag.Dynamic("root-partition", c.taskListID.GetRoot()), tag.Dynamic("task-list-partition-config", c.partitionConfig))
if c.partitionConfig != nil {
// push update notification to all non-root partitions on start
c.stopWG.Add(1)
go func() {
defer c.stopWG.Done()
c.notifyPartitionConfig(context.Background(), *c.partitionConfig, int(c.partitionConfig.NumReadPartitions))
}()
}
}
c.liveness.Start()
c.taskReader.Start()
Expand All @@ -298,6 +307,7 @@ func (c *taskListManagerImpl) Stop() {
c.taskWriter.Stop()
c.taskReader.Stop()
c.matcher.DisconnectBlockedPollers()
c.stopWG.Wait()
c.logger.Info("Task list manager state changed", tag.LifeCycleStopped)
}

Expand Down Expand Up @@ -433,7 +443,7 @@ func (c *taskListManagerImpl) notifyPartitionConfig(ctx context.Context, config

_, e = c.matchingClient.RefreshTaskListPartitionConfig(ctx, &types.MatchingRefreshTaskListPartitionConfigRequest{
DomainUUID: c.taskListID.GetDomainID(),
TaskList: &types.TaskList{Name: taskListName},
TaskList: &types.TaskList{Name: taskListName, Kind: &c.taskListKind},
TaskListType: taskListType,
PartitionConfig: &config,
})
Expand Down
26 changes: 21 additions & 5 deletions service/matching/tasklist/task_list_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ func TestUpdateTaskListPartitionConfig(t *testing.T) {
}).Return(&persistence.UpdateTaskListResponse{}, nil)
deps.mockMatchingClient.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), &types.MatchingRefreshTaskListPartitionConfigRequest{
DomainUUID: "domain-id",
TaskList: &types.TaskList{Name: "/__cadence_sys/tl/1"},
TaskList: &types.TaskList{Name: "/__cadence_sys/tl/1", Kind: types.TaskListKindNormal.Ptr()},
TaskListType: types.TaskListTypeDecision.Ptr(),
PartitionConfig: &types.TaskListPartitionConfig{
Version: 2,
Expand All @@ -1141,7 +1141,7 @@ func TestUpdateTaskListPartitionConfig(t *testing.T) {
}).Return(&types.MatchingRefreshTaskListPartitionConfigResponse{}, nil)
deps.mockMatchingClient.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), &types.MatchingRefreshTaskListPartitionConfigRequest{
DomainUUID: "domain-id",
TaskList: &types.TaskList{Name: "/__cadence_sys/tl/2"},
TaskList: &types.TaskList{Name: "/__cadence_sys/tl/2", Kind: types.TaskListKindNormal.Ptr()},
TaskListType: types.TaskListTypeDecision.Ptr(),
PartitionConfig: &types.TaskListPartitionConfig{
Version: 2,
Expand Down Expand Up @@ -1185,7 +1185,7 @@ func TestUpdateTaskListPartitionConfig(t *testing.T) {
}).Return(&persistence.UpdateTaskListResponse{}, nil)
deps.mockMatchingClient.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), &types.MatchingRefreshTaskListPartitionConfigRequest{
DomainUUID: "domain-id",
TaskList: &types.TaskList{Name: "/__cadence_sys/tl/1"},
TaskList: &types.TaskList{Name: "/__cadence_sys/tl/1", Kind: types.TaskListKindNormal.Ptr()},
TaskListType: types.TaskListTypeDecision.Ptr(),
PartitionConfig: &types.TaskListPartitionConfig{
Version: 2,
Expand All @@ -1195,7 +1195,7 @@ func TestUpdateTaskListPartitionConfig(t *testing.T) {
}).Return(nil, errors.New("matching client error"))
deps.mockMatchingClient.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), &types.MatchingRefreshTaskListPartitionConfigRequest{
DomainUUID: "domain-id",
TaskList: &types.TaskList{Name: "/__cadence_sys/tl/2"},
TaskList: &types.TaskList{Name: "/__cadence_sys/tl/2", Kind: types.TaskListKindNormal.Ptr()},
TaskListType: types.TaskListTypeDecision.Ptr(),
PartitionConfig: &types.TaskListPartitionConfig{
Version: 2,
Expand Down Expand Up @@ -1321,10 +1321,26 @@ func TestManagerStart_RootPartition(t *testing.T) {
Kind: persistence.TaskListKindNormal,
AckLevel: 0,
RangeID: 0,
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 2,
NumWritePartitions: 2,
},
},
}, nil)
deps.mockMatchingClient.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), &types.MatchingRefreshTaskListPartitionConfigRequest{
DomainUUID: "domain-id",
TaskList: &types.TaskList{Name: "/__cadence_sys/tl/1", Kind: types.TaskListKindNormal.Ptr()},
TaskListType: types.TaskListTypeDecision.Ptr(),
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 2,
NumWritePartitions: 2,
},
}).Return(&types.MatchingRefreshTaskListPartitionConfigResponse{}, nil)
assert.NoError(t, tlm.Start())
assert.Nil(t, tlm.TaskListPartitionConfig())
assert.Equal(t, &types.TaskListPartitionConfig{Version: 1, NumReadPartitions: 2, NumWritePartitions: 2}, tlm.TaskListPartitionConfig())
tlm.stopWG.Wait()
}

func TestManagerStart_NonRootPartition(t *testing.T) {
Expand Down

0 comments on commit 34a6db7

Please sign in to comment.