Skip to content

Commit

Permalink
Reapply "Store explicit TaskList partition data (cadence-workflow#6591)…
Browse files Browse the repository at this point in the history
…" (cadence-workflow#6625)

Address issues in GRPC -> types mapper and add additional tets.

Additionally address issues in serialization <-> sqlblobs mapper and add tests.

This reverts commit bf9f526.
  • Loading branch information
natemort committed Jan 28, 2025
1 parent 7d3786d commit 689032e
Show file tree
Hide file tree
Showing 38 changed files with 2,720 additions and 933 deletions.
883 changes: 699 additions & 184 deletions .gen/proto/matching/v1/service.pb.go

Large diffs are not rendered by default.

308 changes: 157 additions & 151 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

34 changes: 28 additions & 6 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,9 +604,20 @@ func testMatchingUpdateTaskListPartitionConfigRequest() *types.MatchingUpdateTas
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
Version: 1,
ReadPartitions: map[int]*types.TaskListPartition{
0: {},
1: {},
2: {
IsolationGroups: []string{"foo"},
},
},
WritePartitions: map[int]*types.TaskListPartition{
0: {},
1: {
IsolationGroups: []string{"bar"},
},
},
},
}
}
Expand All @@ -616,9 +627,20 @@ func testMatchingRefreshTaskListPartitionConfigRequest() *types.MatchingRefreshT
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
Version: 1,
ReadPartitions: map[int]*types.TaskListPartition{
0: {},
1: {},
2: {
IsolationGroups: []string{"foo"},
},
},
WritePartitions: map[int]*types.TaskListPartition{
0: {},
1: {
IsolationGroups: []string{"bar"},
},
},
},
}
}
12 changes: 7 additions & 5 deletions client/matching/partition_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (p *partitionConfigProviderImpl) GetNumberOfReadPartitions(domainID string,
}
c.RLock()
v := c.Version
w := c.NumWritePartitions
r := c.NumReadPartitions
w := len(c.WritePartitions)
r := len(c.ReadPartitions)
c.RUnlock()
scope := p.metricsClient.Scope(metrics.PartitionConfigProviderScope, metrics.DomainTag(domainName), metrics.TaskListRootPartitionTag(taskList.GetName()), getTaskListTypeTag(taskListType))
scope.UpdateGauge(metrics.TaskListPartitionConfigNumReadGauge, float64(r))
Expand Down Expand Up @@ -142,8 +142,8 @@ func (p *partitionConfigProviderImpl) GetNumberOfWritePartitions(domainID string
}
c.RLock()
v := c.Version
w := c.NumWritePartitions
r := c.NumReadPartitions
w := len(c.WritePartitions)
r := len(c.ReadPartitions)
c.RUnlock()
scope := p.metricsClient.Scope(metrics.PartitionConfigProviderScope, metrics.DomainTag(domainName), metrics.TaskListRootPartitionTag(taskList.GetName()), getTaskListTypeTag(taskListType))
scope.UpdateGauge(metrics.TaskListPartitionConfigNumReadGauge, float64(r))
Expand Down Expand Up @@ -180,7 +180,9 @@ func (p *partitionConfigProviderImpl) UpdatePartitionConfig(domainID string, tas
}
updated := c.updateConfig(*config)
if updated {
p.logger.Info("tasklist partition config updated", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.Name), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", config.NumReadPartitions), tag.Dynamic("write-partition", config.NumWritePartitions), tag.Dynamic("config-version", config.Version))
w := len(c.WritePartitions)
r := len(c.ReadPartitions)
p.logger.Info("tasklist partition config updated", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.Name), tag.WorkflowTaskListType(taskListType), tag.Dynamic("read-partition", r), tag.Dynamic("write-partition", w), tag.Dynamic("config-version", config.Version))
}
}

Expand Down
20 changes: 14 additions & 6 deletions client/matching/partition_config_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestGetNumberOfReadPartitions(t *testing.T) {
if tc.enableReadFromCache && tc.taskListKind == types.TaskListKindNormal {
if tc.cachedConfigExists {
mockCache.EXPECT().Get(gomock.Any()).Return(&syncedTaskListPartitionConfig{
TaskListPartitionConfig: types.TaskListPartitionConfig{NumReadPartitions: 4},
TaskListPartitionConfig: types.TaskListPartitionConfig{ReadPartitions: partitions(4)},
}).Times(1)
} else {
mockCache.EXPECT().Get(gomock.Any()).Return(nil).Times(1)
Expand All @@ -145,10 +145,10 @@ func TestGetNumberOfReadPartitions(t *testing.T) {

kind := tc.taskListKind
taskList := types.TaskList{Name: "test-task-list", Kind: &kind}
partitions := partitionProvider.GetNumberOfReadPartitions("test-domain-id", taskList, 0)
p := partitionProvider.GetNumberOfReadPartitions("test-domain-id", taskList, 0)

// Validate result
assert.Equal(t, tc.expectedPartitions, partitions)
assert.Equal(t, tc.expectedPartitions, p)
})
}
}
Expand Down Expand Up @@ -196,18 +196,18 @@ func TestGetNumberOfWritePartitions(t *testing.T) {
if tc.enableReadFromCache && tc.taskListKind == types.TaskListKindNormal {
if tc.cachedConfigExists {
mockCache.EXPECT().Get(gomock.Any()).Return(&syncedTaskListPartitionConfig{
TaskListPartitionConfig: types.TaskListPartitionConfig{NumReadPartitions: 2, NumWritePartitions: 5},
TaskListPartitionConfig: types.TaskListPartitionConfig{ReadPartitions: partitions(2), WritePartitions: partitions(5)},
}).Times(1)
} else {
mockCache.EXPECT().Get(gomock.Any()).Return(nil).Times(1)
}
}
kind := tc.taskListKind
taskList := types.TaskList{Name: "test-task-list", Kind: &kind}
partitions := partitionProvider.GetNumberOfWritePartitions("test-domain-id", taskList, 0)
p := partitionProvider.GetNumberOfWritePartitions("test-domain-id", taskList, 0)

// Validate result
assert.Equal(t, tc.expectedPartitions, partitions)
assert.Equal(t, tc.expectedPartitions, p)
})
}
}
Expand Down Expand Up @@ -253,3 +253,11 @@ func TestUpdatePartitionConfig(t *testing.T) {
})
}
}

func partitions(num int) map[int]*types.TaskListPartition {
result := make(map[int]*types.TaskListPartition, num)
for i := 0; i < num; i++ {
result[i] = &types.TaskListPartition{}
}
return result
}
58 changes: 58 additions & 0 deletions common/clock/sustain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package clock

import "time"

type Sustain struct {
started time.Time
source TimeSource
duration func() time.Duration
}

func NewSustain(source TimeSource, duration func() time.Duration) Sustain {
return Sustain{
source: source,
duration: duration,
}
}

func (s *Sustain) Check(value bool) bool {
if value {
now := s.source.Now()
if s.started.IsZero() {
s.started = now
}
if now.Sub(s.started) >= s.duration() {
s.Reset()
return true
}
} else {
s.Reset()
}
return false
}

func (s *Sustain) Reset() {
s.started = time.Time{}
}
135 changes: 135 additions & 0 deletions common/clock/sustain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package clock

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type check struct {
seconds int
value bool
}

func TestSustain(t *testing.T) {
cases := []struct {
name string
duration time.Duration
calls []check
expected []bool
}{
{
name: "simple case",
duration: time.Second * 10,
calls: []check{
{0, true},
{10, true},
},
expected: []bool{
false,
true,
},
},
{
name: "intermediate successes",
duration: 10 * time.Second,
calls: []check{
{0, true},
{2, true},
{2, true},
{2, true},
{2, true},
{2, true},
},
expected: []bool{
false,
false,
false,
false,
false,
true,
},
},
{
name: "resets after success",
duration: time.Second * 10,
calls: []check{
{0, true},
{10, true},
{0, true},
},
expected: []bool{
false,
true,
false,
},
},
{
name: "resets after false",
duration: time.Second * 10,
calls: []check{
{0, true},
{1, false},
{1, true},
{9, true},
{1, true},
},
expected: []bool{
false,
false,
false,
false,
true,
},
},
{
name: "duration = 0",
duration: 0,
calls: []check{
{0, true},
},
expected: []bool{
true,
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
clock := NewMockedTimeSource()
sus := NewSustain(clock, func() time.Duration {
return tc.duration
})
require.Equal(t, len(tc.calls), len(tc.expected))
for i, c := range tc.calls {
expected := tc.expected[i]
clock.Advance(time.Duration(c.seconds) * time.Second)
actual := sus.Check(c.value)
assert.Equal(t, expected, actual, "check %d", i)
}
})
}
}
16 changes: 8 additions & 8 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,20 +1034,20 @@ func CurrentQPS(qps float64) Tag {
return newFloat64Tag("current-qps", qps)
}

func NumReadPartitions(n int32) Tag {
return newInt32("num-read-partitions", n)
func NumReadPartitions(n int) Tag {
return newInt("num-read-partitions", n)
}

func NumWritePartitions(n int32) Tag {
return newInt32("num-write-partitions", n)
func NumWritePartitions(n int) Tag {
return newInt("num-write-partitions", n)
}

func CurrentNumReadPartitions(n int32) Tag {
return newInt32("current-num-read-partitions", n)
func CurrentNumReadPartitions(n int) Tag {
return newInt("current-num-read-partitions", n)
}

func CurrentNumWritePartitions(n int32) Tag {
return newInt32("current-num-write-partitions", n)
func CurrentNumWritePartitions(n int) Tag {
return newInt("current-num-write-partitions", n)
}

func PartitionUpscaleThreshold(qps float64) Tag {
Expand Down
Loading

0 comments on commit 689032e

Please sign in to comment.