Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/zonal isolation zone discovery #6301

Merged
14 changes: 14 additions & 0 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/dynamicconfig/configstore"
"github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/isolationgroup/isolationgroupapi"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/membership"
Expand Down Expand Up @@ -195,6 +196,8 @@ func (s *server) startService() common.Daemon {

params.ClusterRedirectionPolicy = s.cfg.ClusterGroupMetadata.ClusterRedirectionPolicy

params.GetIsolationGroups = getFromDynamicConfig(params, dc)

params.ClusterMetadata = cluster.NewMetadata(
clusterGroupMetadata.FailoverVersionIncrement,
clusterGroupMetadata.PrimaryClusterName,
Expand Down Expand Up @@ -371,3 +374,14 @@ func validateIndex(config *config.ElasticSearchConfig) {
log.Fatalf("Visibility index is missing in config")
}
}

func getFromDynamicConfig(params resource.Params, dc *dynamicconfig.Collection) func() []string {
return func() []string {
res, err := isolationgroupapi.MapAllIsolationGroupsResponse(dc.GetListProperty(dynamicconfig.AllIsolationGroups)())
if err != nil {
params.Logger.Error("failed to get isolation groups from config", tag.Error(err))
return nil
}
return res
}
}
37 changes: 37 additions & 0 deletions cmd/server/cadence/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/testflags"
"github.com/uber/cadence/tools/cassandra"
Expand Down Expand Up @@ -114,3 +119,35 @@ func (s *ServerSuite) TestServerStartup() {
daemon.Stop()
}
}

func TestSettingGettingZonalIsolationGroupsFromIG(t *testing.T) {

ctrl := gomock.NewController(t)
client := dynamicconfig.NewMockClient(ctrl)
client.EXPECT().GetListValue(dynamicconfig.AllIsolationGroups, gomock.Any()).Return([]interface{}{
"zone-1", "zone-2",
}, nil)

dc := dynamicconfig.NewCollection(client, loggerimpl.NewNopLogger())

assert.NotPanics(t, func() {
fn := getFromDynamicConfig(resource.Params{
Logger: loggerimpl.NewNopLogger(),
}, dc)
out := fn()
assert.Equal(t, []string{"zone-1", "zone-2"}, out)
})
}

func TestSettingGettingZonalIsolationGroupsFromIGError(t *testing.T) {
ctrl := gomock.NewController(t)
client := dynamicconfig.NewMockClient(ctrl)
client.EXPECT().GetListValue(dynamicconfig.AllIsolationGroups, gomock.Any()).Return(nil, assert.AnError)
dc := dynamicconfig.NewCollection(client, loggerimpl.NewNopLogger())

assert.NotPanics(t, func() {
getFromDynamicConfig(resource.Params{
Logger: loggerimpl.NewNopLogger(),
}, dc)()
})
}
2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/go-sql-driver/mysql v1.7.1 // indirect
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/mock v1.6.0
github.com/google/uuid v1.5.0 // indirect
github.com/hashicorp/go-version v1.2.0 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
Expand Down
13 changes: 7 additions & 6 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2962,6 +2962,13 @@ const (
UnknownListKey ListKey = iota
TestGetListPropertyKey

// AllIsolationGroups is the list of all possible isolation groups in a service
// KeyName: system.allIsolationGroups
// Value type: []string
// Default value: N/A
// Allowed filters: N/A
AllIsolationGroups

// HeaderForwardingRules defines which headers are forwarded from inbound calls to outbound.
// This value is only loaded at startup.
//
Expand All @@ -2971,12 +2978,6 @@ const (
// Value type: []rpc.HeaderRule or an []interface{} containing `map[string]interface{}{"Add":bool,"Match":string}` values.
// Default value: forward all headers. (this is a problematic value, and it will be changing as we reduce to a list of known values)
HeaderForwardingRules
// AllIsolationGroups is the list of all possible isolation groups in a service
// KeyName: system.allIsolationGroups
// Value type: []string
// Default value: N/A
// Allowed filters: N/A
AllIsolationGroups

LastListKey
)
Expand Down
7 changes: 2 additions & 5 deletions common/isolationgroup/defaultisolationgroupstate/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,11 @@ func NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(
domainCache cache.DomainCache,
cfgStoreClient dynamicconfig.Client, // can be nil, which means global drain is unsupported
metricsClient metrics.Client,
getIsolationGroups func() []string,
Shaddoll marked this conversation as resolved.
Show resolved Hide resolved
) (isolationgroup.State, error) {
stopChan := make(chan struct{})

allIGs := dc.GetListProperty(dynamicconfig.AllIsolationGroups)()
allIsolationGroups, err := isolationgroupapi.MapAllIsolationGroupsResponse(allIGs)
if err != nil {
return nil, fmt.Errorf("could not get all isolation groups fron dynamic config: %w", err)
}
allIsolationGroups := getIsolationGroups()

config := defaultConfig{
IsolationGroupEnabled: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation),
Expand Down
42 changes: 16 additions & 26 deletions common/isolationgroup/defaultisolationgroupstate/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/isolationgroup/isolationgroupapi"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -639,32 +640,6 @@ func TestIsolationGroupStateMapping(t *testing.T) {
}
}

func TestMapAllIsolationGroupStates(t *testing.T) {

tests := map[string]struct {
in []interface{}
expected []string
expectedErr error
}{
"valid mapping": {
in: []interface{}{"zone-1", "zone-2", "zone-3"},
expected: []string{"zone-1", "zone-2", "zone-3"},
},
"invalid mapping": {
in: []interface{}{1, 2, 3},
expectedErr: errors.New("failed to get all-isolation-groups response from dynamic config: got 1 (int)"),
},
}

for name, td := range tests {
t.Run(name, func(t *testing.T) {
res, err := isolationgroupapi.MapAllIsolationGroupsResponse(td.in)
assert.Equal(t, td.expected, res)
assert.Equal(t, td.expectedErr, err)
})
}
}

func TestUpdateRequest(t *testing.T) {

tests := map[string]struct {
Expand Down Expand Up @@ -716,6 +691,21 @@ func TestUpdateRequest(t *testing.T) {
}
}

func TestNewDefaultIsolationGroupStateWatcherWithConfigStoreClient(t *testing.T) {
dc := dynamicconfig.NewNopCollection()
domainCache := cache.NewNoOpDomainCache()
client := metrics.NewNoopMetricsClient()
ig := func() []string { return nil }
NewDefaultIsolationGroupStateWatcherWithConfigStoreClient(
loggerimpl.NewNopLogger(),
dc,
domainCache,
nil,
client,
ig,
)
}

func TestIsolationGroupShutdown(t *testing.T) {
var v defaultIsolationGroupStateHandler
assert.NotPanics(t, func() {
Expand Down
24 changes: 12 additions & 12 deletions common/isolationgroup/isolationgroupapi/mappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,6 @@ import (
"github.com/uber/cadence/common/types"
)

func MapAllIsolationGroupsResponse(in []interface{}) ([]string, error) {
var allIsolationGroups []string
for k := range in {
v, ok := in[k].(string)
if !ok {
return nil, fmt.Errorf("failed to get all-isolation-groups response from dynamic config: got %v (%T)", in[k], in[k])
}
allIsolationGroups = append(allIsolationGroups, v)
}
return allIsolationGroups, nil
}

func MapDynamicConfigResponse(in []interface{}) (out types.IsolationGroupConfiguration, err error) {
if in == nil {
return nil, nil
Expand Down Expand Up @@ -85,3 +73,15 @@ func MapUpdateGlobalIsolationGroupsRequest(in types.IsolationGroupConfiguration)
}
return out, nil
}

func MapAllIsolationGroupsResponse(in []interface{}) ([]string, error) {
var allIsolationGroups []string
for k := range in {
v, ok := in[k].(string)
if !ok {
return nil, fmt.Errorf("failed to get all-isolation-groups response from dynamic config: got %v (%T)", in[k], in[k])
}
allIsolationGroups = append(allIsolationGroups, v)
}
return allIsolationGroups, nil
}
56 changes: 56 additions & 0 deletions common/isolationgroup/isolationgroupapi/mappers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package isolationgroupapi

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMapAllIsolationGroupStates(t *testing.T) {

tests := map[string]struct {
in []interface{}
expected []string
expectedErr error
}{
"valid mapping": {
in: []interface{}{"zone-1", "zone-2", "zone-3"},
expected: []string{"zone-1", "zone-2", "zone-3"},
},
"invalid mapping": {
in: []interface{}{1, 2, 3},
expectedErr: errors.New("failed to get all-isolation-groups response from dynamic config: got 1 (int)"),
},
}

for name, td := range tests {
t.Run(name, func(t *testing.T) {
res, err := MapAllIsolationGroupsResponse(td.in)
assert.Equal(t, td.expected, res)
assert.Equal(t, td.expectedErr, err)
})
}
}
11 changes: 6 additions & 5 deletions common/resource/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ import (
type (
// Params holds the set of parameters needed to initialize common service resources
Params struct {
Name string
InstanceID string
Logger log.Logger
ThrottledLogger log.Logger
HostName string
Name string
InstanceID string
Logger log.Logger
ThrottledLogger log.Logger
HostName string
GetIsolationGroups func() []string

MetricScope tally.Scope
MembershipResolver membership.Resolver
Expand Down
9 changes: 9 additions & 0 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@
dispatcher := params.RPCFactory.GetDispatcher()
membershipResolver := params.MembershipResolver

ensureGetAllIsolationGroupsFnIsSet(params)

Check warning on line 174 in common/resource/resourceImpl.go

View check run for this annotation

Codecov / codecov/patch

common/resource/resourceImpl.go#L174

Added line #L174 was not covered by tests

dynamicCollection := dynamicconfig.NewCollection(
params.DynamicConfig,
logger,
Expand Down Expand Up @@ -706,6 +708,7 @@
domainCache,
isolationGroupStore,
params.MetricsClient,
params.GetIsolationGroups,

Check warning on line 711 in common/resource/resourceImpl.go

View check run for this annotation

Codecov / codecov/patch

common/resource/resourceImpl.go#L711

Added line #L711 was not covered by tests
)
}

Expand All @@ -716,3 +719,9 @@
}
return partition.NewDefaultPartitioner(params.Logger, state)
}

func ensureGetAllIsolationGroupsFnIsSet(params *Params) {
if params.GetIsolationGroups == nil {
params.GetIsolationGroups = func() []string { return []string{} }
}
}
6 changes: 6 additions & 0 deletions common/resource/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ func TestShutdown(t *testing.T) {
i.Stop()
})
}

func TestNewResource(t *testing.T) {
assert.NotPanics(t, func() {
ensureGetAllIsolationGroupsFnIsSet(&Params{})
})
}
Loading
Loading