Skip to content

Commit

Permalink
Revert "Store explicit TaskList partition data (#6591)" (#6625)
Browse files Browse the repository at this point in the history
This reverts commit eebf656.
  • Loading branch information
Shaddoll authored Jan 15, 2025
1 parent 7f5bb88 commit bf9f526
Show file tree
Hide file tree
Showing 36 changed files with 918 additions and 2,517 deletions.
883 changes: 184 additions & 699 deletions .gen/proto/matching/v1/service.pb.go

Large diffs are not rendered by default.

308 changes: 151 additions & 157 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

34 changes: 6 additions & 28 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,20 +604,9 @@ func testMatchingUpdateTaskListPartitionConfigRequest() *types.MatchingUpdateTas
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
ReadPartitions: map[int]*types.TaskListPartition{
0: {},
1: {},
2: {
IsolationGroups: []string{"foo"},
},
},
WritePartitions: map[int]*types.TaskListPartition{
0: {},
1: {
IsolationGroups: []string{"bar"},
},
},
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
},
}
}
Expand All @@ -627,20 +616,9 @@ func testMatchingRefreshTaskListPartitionConfigRequest() *types.MatchingRefreshT
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
ReadPartitions: map[int]*types.TaskListPartition{
0: {},
1: {},
2: {
IsolationGroups: []string{"foo"},
},
},
WritePartitions: map[int]*types.TaskListPartition{
0: {},
1: {
IsolationGroups: []string{"bar"},
},
},
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
},
}
}
12 changes: 5 additions & 7 deletions client/matching/partition_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (p *partitionConfigProviderImpl) GetNumberOfReadPartitions(domainID string,
}
c.RLock()
v := c.Version
w := len(c.WritePartitions)
r := len(c.ReadPartitions)
w := c.NumWritePartitions
r := c.NumReadPartitions
c.RUnlock()
scope := p.metricsClient.Scope(metrics.PartitionConfigProviderScope, metrics.DomainTag(domainName), metrics.TaskListRootPartitionTag(taskList.GetName()), getTaskListTypeTag(taskListType))
scope.UpdateGauge(metrics.TaskListPartitionConfigNumReadGauge, float64(r))
Expand Down Expand Up @@ -142,8 +142,8 @@ func (p *partitionConfigProviderImpl) GetNumberOfWritePartitions(domainID string
}
c.RLock()
v := c.Version
w := len(c.WritePartitions)
r := len(c.ReadPartitions)
w := c.NumWritePartitions
r := c.NumReadPartitions
c.RUnlock()
scope := p.metricsClient.Scope(metrics.PartitionConfigProviderScope, metrics.DomainTag(domainName), metrics.TaskListRootPartitionTag(taskList.GetName()), getTaskListTypeTag(taskListType))
scope.UpdateGauge(metrics.TaskListPartitionConfigNumReadGauge, float64(r))
Expand Down Expand Up @@ -180,9 +180,7 @@ func (p *partitionConfigProviderImpl) UpdatePartitionConfig(domainID string, tas
}
updated := c.updateConfig(*config)
if updated {
w := len(c.WritePartitions)
r := len(c.ReadPartitions)
p.logger.Info("tasklist partition config updated", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.Name), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", r), tag.Dynamic("write-partition", w), tag.Dynamic("config-version", config.Version))
p.logger.Info("tasklist partition config updated", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.Name), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", config.NumReadPartitions), tag.Dynamic("write-partition", config.NumWritePartitions), tag.Dynamic("config-version", config.Version))
}
}

Expand Down
20 changes: 6 additions & 14 deletions client/matching/partition_config_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestGetNumberOfReadPartitions(t *testing.T) {
if tc.enableReadFromCache && tc.taskListKind == types.TaskListKindNormal {
if tc.cachedConfigExists {
mockCache.EXPECT().Get(gomock.Any()).Return(&syncedTaskListPartitionConfig{
TaskListPartitionConfig: types.TaskListPartitionConfig{ReadPartitions: partitions(4)},
TaskListPartitionConfig: types.TaskListPartitionConfig{NumReadPartitions: 4},
}).Times(1)
} else {
mockCache.EXPECT().Get(gomock.Any()).Return(nil).Times(1)
Expand All @@ -145,10 +145,10 @@ func TestGetNumberOfReadPartitions(t *testing.T) {

kind := tc.taskListKind
taskList := types.TaskList{Name: "test-task-list", Kind: &kind}
p := partitionProvider.GetNumberOfReadPartitions("test-domain-id", taskList, 0)
partitions := partitionProvider.GetNumberOfReadPartitions("test-domain-id", taskList, 0)

// Validate result
assert.Equal(t, tc.expectedPartitions, p)
assert.Equal(t, tc.expectedPartitions, partitions)
})
}
}
Expand Down Expand Up @@ -196,18 +196,18 @@ func TestGetNumberOfWritePartitions(t *testing.T) {
if tc.enableReadFromCache && tc.taskListKind == types.TaskListKindNormal {
if tc.cachedConfigExists {
mockCache.EXPECT().Get(gomock.Any()).Return(&syncedTaskListPartitionConfig{
TaskListPartitionConfig: types.TaskListPartitionConfig{ReadPartitions: partitions(2), WritePartitions: partitions(5)},
TaskListPartitionConfig: types.TaskListPartitionConfig{NumReadPartitions: 2, NumWritePartitions: 5},
}).Times(1)
} else {
mockCache.EXPECT().Get(gomock.Any()).Return(nil).Times(1)
}
}
kind := tc.taskListKind
taskList := types.TaskList{Name: "test-task-list", Kind: &kind}
p := partitionProvider.GetNumberOfWritePartitions("test-domain-id", taskList, 0)
partitions := partitionProvider.GetNumberOfWritePartitions("test-domain-id", taskList, 0)

// Validate result
assert.Equal(t, tc.expectedPartitions, p)
assert.Equal(t, tc.expectedPartitions, partitions)
})
}
}
Expand Down Expand Up @@ -253,11 +253,3 @@ func TestUpdatePartitionConfig(t *testing.T) {
})
}
}

func partitions(num int) map[int]*types.TaskListPartition {
result := make(map[int]*types.TaskListPartition, num)
for i := 0; i < num; i++ {
result[i] = &types.TaskListPartition{}
}
return result
}
58 changes: 0 additions & 58 deletions common/clock/sustain.go

This file was deleted.

135 changes: 0 additions & 135 deletions common/clock/sustain_test.go

This file was deleted.

16 changes: 8 additions & 8 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,20 +1034,20 @@ func CurrentQPS(qps float64) Tag {
return newFloat64Tag("current-qps", qps)
}

func NumReadPartitions(n int) Tag {
return newInt("num-read-partitions", n)
func NumReadPartitions(n int32) Tag {
return newInt32("num-read-partitions", n)
}

func NumWritePartitions(n int) Tag {
return newInt("num-write-partitions", n)
func NumWritePartitions(n int32) Tag {
return newInt32("num-write-partitions", n)
}

func CurrentNumReadPartitions(n int) Tag {
return newInt("current-num-read-partitions", n)
func CurrentNumReadPartitions(n int32) Tag {
return newInt32("current-num-read-partitions", n)
}

func CurrentNumWritePartitions(n int) Tag {
return newInt("current-num-write-partitions", n)
func CurrentNumWritePartitions(n int32) Tag {
return newInt32("current-num-write-partitions", n)
}

func PartitionUpscaleThreshold(qps float64) Tag {
Expand Down
Loading

0 comments on commit bf9f526

Please sign in to comment.