Skip to content

Commit

Permalink
Merge branch 'master' into front
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Mar 30, 2018
2 parents e3a11d0 + 39bd2e8 commit 3f24945
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 76 deletions.
3 changes: 1 addition & 2 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,10 @@ func (s *server) startService() common.Daemon {
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
params.ClusterMetadata = cluster.NewMetadata(
s.cfg.ClustersInfo.EnableGlobalDomain,
s.cfg.ClustersInfo.InitialFailoverVersion,
s.cfg.ClustersInfo.FailoverVersionIncrement,
s.cfg.ClustersInfo.MasterClusterName,
s.cfg.ClustersInfo.CurrentClusterName,
s.cfg.ClustersInfo.ClusterNames,
s.cfg.ClustersInfo.ClusterInitialFailoverVersions,
)
// TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop
params.MessagingClient = s.cfg.Kafka.NewKafkaClient(zap.NewNop(), params.Logger, params.MetricScope)
Expand Down
87 changes: 58 additions & 29 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package cluster

import "fmt"

type (
// Metadata provides information about clusters
Metadata interface {
Expand All @@ -34,62 +36,73 @@ type (
GetMasterClusterName() string
// GetCurrentClusterName return the current cluster name
GetCurrentClusterName() string
// GetAllClusterNames return the all cluster names, as a set
GetAllClusterNames() map[string]bool
// GetAllClusterFailoverVersions return the all cluster name -> corresponding initial failover version
GetAllClusterFailoverVersions() map[string]int64
// ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version
ClusterNameForFailoverVersion(failoverVersion int64) string
}

metadataImpl struct {
// EnableGlobalDomain whether the global domain is enabled,
// this attr should be discarded when cross DC is made public
enableGlobalDomain bool
// initialFailoverVersion is the initial failover version
initialFailoverVersion int64
// failoverVersionIncrement is the increment of each cluster failover version
failoverVersionIncrement int64
// masterClusterName is the name of the master cluster, only the master cluster can register / update domain
// all clusters can do domain failover
masterClusterName string
// currentClusterName is the name of the current cluster
currentClusterName string
// clusterNames contains all cluster names, as a set
clusterNames map[string]bool
// clusterInitialFailoverVersions contains all cluster name -> corresponding initial failover version
clusterInitialFailoverVersions map[string]int64
// clusterInitialFailoverVersions contains all initial failover version -> corresponding cluster name
initialFailoverVersionClusters map[int64]string
}
)

// NewMetadata create a new instance of Metadata
func NewMetadata(enableGlobalDomain bool, initialFailoverVersion int64, failoverVersionIncrement int64,
masterClusterName string, currentClusterName string, clusterNames []string) Metadata {
func NewMetadata(enableGlobalDomain bool, failoverVersionIncrement int64,
masterClusterName string, currentClusterName string, clusterInitialFailoverVersions map[string]int64) Metadata {

if initialFailoverVersion < 0 {
panic("Bad initial failover version")
} else if failoverVersionIncrement <= initialFailoverVersion {
panic("Bad failover version increment")
if len(clusterInitialFailoverVersions) < 0 {
panic("Empty initial failover versions for cluster")
} else if len(masterClusterName) == 0 {
panic("Master cluster name is empty")
} else if len(currentClusterName) == 0 {
panic("Current cluster name is empty")
} else if len(clusterNames) == 0 {
panic("Total number of all cluster names is 0")
}

clusters := make(map[string]bool)
for _, clusterName := range clusterNames {
initialFailoverVersionClusters := make(map[int64]string)
for clusterName, initialFailoverVersion := range clusterInitialFailoverVersions {
if failoverVersionIncrement <= initialFailoverVersion {
panic(fmt.Sprintf(
"Failover version increment %v is smaller than initial value: %v.",
failoverVersionIncrement,
clusterInitialFailoverVersions,
))
}
if len(clusterName) == 0 {
panic("Cluster name in all cluster names is empty")
}
clusters[clusterName] = true
initialFailoverVersionClusters[initialFailoverVersion] = clusterName
}
if _, ok := clusters[currentClusterName]; !ok {

if _, ok := clusterInitialFailoverVersions[currentClusterName]; !ok {
panic("Current cluster is not specified in all cluster names")
}
if _, ok := clusterInitialFailoverVersions[masterClusterName]; !ok {
panic("Master cluster is not specified in all cluster names")
}
if len(initialFailoverVersionClusters) != len(clusterInitialFailoverVersions) {
panic("Cluster to initial failover versions have duplicate initial versions")
}

return &metadataImpl{
enableGlobalDomain: enableGlobalDomain,
initialFailoverVersion: initialFailoverVersion,
failoverVersionIncrement: failoverVersionIncrement,
masterClusterName: masterClusterName,
currentClusterName: currentClusterName,
clusterNames: clusters,
enableGlobalDomain: enableGlobalDomain,
failoverVersionIncrement: failoverVersionIncrement,
masterClusterName: masterClusterName,
currentClusterName: currentClusterName,
clusterInitialFailoverVersions: clusterInitialFailoverVersions,
initialFailoverVersionClusters: initialFailoverVersionClusters,
}
}

Expand All @@ -101,7 +114,8 @@ func (metadata *metadataImpl) IsGlobalDomainEnabled() bool {

// GetNextFailoverVersion return the next failover version based on input
func (metadata *metadataImpl) GetNextFailoverVersion(currentFailoverVersion int64) int64 {
failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement*metadata.failoverVersionIncrement + metadata.initialFailoverVersion
initialFailoverVersion := metadata.clusterInitialFailoverVersions[metadata.currentClusterName]
failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement*metadata.failoverVersionIncrement + initialFailoverVersion
if failoverVersion <= currentFailoverVersion {
return failoverVersion + metadata.failoverVersionIncrement
}
Expand All @@ -122,7 +136,22 @@ func (metadata *metadataImpl) GetCurrentClusterName() string {
return metadata.currentClusterName
}

// GetAllClusterNames return the all cluster names
func (metadata *metadataImpl) GetAllClusterNames() map[string]bool {
return metadata.clusterNames
// GetAllClusterFailoverVersions return the all cluster name -> corresponding initial failover version
func (metadata *metadataImpl) GetAllClusterFailoverVersions() map[string]int64 {
return metadata.clusterInitialFailoverVersions
}

// ClusterNameForFailoverVersion return the corresponding cluster name for a given failover version
func (metadata *metadataImpl) ClusterNameForFailoverVersion(failoverVersion int64) string {
initialFailoverVersion := failoverVersion % metadata.failoverVersionIncrement
clusterName, ok := metadata.initialFailoverVersionClusters[initialFailoverVersion]
if !ok {
panic(fmt.Sprintf(
"Unknown initial failover version %v with given cluster initial failover version map: %v and failover version increment %v.",
initialFailoverVersion,
metadata.clusterInitialFailoverVersions,
metadata.failoverVersionIncrement,
))
}
return clusterName
}
17 changes: 9 additions & 8 deletions common/cluster/metadataTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
package cluster

const (
// TestInitialFailoverVersion is initial failover version used for test
TestInitialFailoverVersion = int64(0)
// TestCurrentClusterInitialFailoverVersion is initial failover version for current cluster
TestCurrentClusterInitialFailoverVersion = int64(0)
// TestAlternativeClusterInitialFailoverVersion is initial failover version for alternative cluster
TestAlternativeClusterInitialFailoverVersion = int64(1)
// TestFailoverVersionIncrement is failover version increment used for test
TestFailoverVersionIncrement = int64(10)
// TestCurrentClusterName is current cluster used for test
Expand All @@ -34,10 +36,10 @@ const (
var (
// TestAllClusterNames is the all cluster names used for test
TestAllClusterNames = []string{TestCurrentClusterName, TestAlternativeClusterName}
// TestAllClusterNamesMap is the same as above, juse convinent for test mocking
TestAllClusterNamesMap = map[string]bool{
TestCurrentClusterName: true,
TestAlternativeClusterName: true,
// TestAllClusterFailoverVersions is the same as above, juse convinent for test mocking
TestAllClusterFailoverVersions = map[string]int64{
TestCurrentClusterName: TestCurrentClusterInitialFailoverVersion,
TestAlternativeClusterName: TestAlternativeClusterInitialFailoverVersion,
}
)

Expand All @@ -49,10 +51,9 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool) Metad
}
return NewMetadata(
enableGlobalDomain,
TestInitialFailoverVersion,
TestFailoverVersionIncrement,
masterClusterName,
TestCurrentClusterName,
TestAllClusterNames,
TestAllClusterFailoverVersions,
)
}
26 changes: 20 additions & 6 deletions common/mocks/ClusterMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,35 @@ package mocks

import mock "github.com/stretchr/testify/mock"

// Metadata is an autogenerated mock type for the Metadata type
// ClusterMetadata is an autogenerated mock type for the Metadata type
type ClusterMetadata struct {
mock.Mock
}

// GetAllClusterNames provides a mock function with given fields:
func (_m *ClusterMetadata) GetAllClusterNames() map[string]bool {
// ClusterNameForFailoverVersion provides a mock function with given fields:
func (_m *ClusterMetadata) ClusterNameForFailoverVersion(failoverVersion int64) string {
ret := _m.Called(failoverVersion)

var r0 string
if rf, ok := ret.Get(0).(func(int64) string); ok {
r0 = rf(failoverVersion)
} else {
r0 = ret.Get(0).(string)
}

return r0
}

// GetAllClusterFailoverVersions provides a mock function with given fields:
func (_m *ClusterMetadata) GetAllClusterFailoverVersions() map[string]int64 {
ret := _m.Called()

var r0 map[string]bool
if rf, ok := ret.Get(0).(func() map[string]bool); ok {
var r0 map[string]int64
if rf, ok := ret.Get(0).(func() map[string]int64); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]bool)
r0 = ret.Get(0).(map[string]int64)
}
}

Expand Down
6 changes: 2 additions & 4 deletions common/service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,15 @@ type (
// EnableGlobalDomain whether the global domain is enabled, this attr should be discarded when
// cross DC is made public
EnableGlobalDomain bool `yaml:"enableGlobalDomain"`
// InitialFailoverVersion is the initial failover version
InitialFailoverVersion int64 `yaml:"initialFailoverVersion"`
// FailoverVersionIncrement is the increment of each cluster failover version
FailoverVersionIncrement int64 `yaml:"failoverVersionIncrement"`
// MasterClusterName is the master cluster name, only the master cluster can register / update domain
// all clusters can do domain failover
MasterClusterName string `yaml:"masterClusterName"`
// CurrentClusterName is the name of the current cluster
CurrentClusterName string `yaml:"currentClusterName"`
// ClusterNames contains all cluster names
ClusterNames []string `yaml:"clusterNames"`
// ClusterInitialFailoverVersions contains all cluster names to corresponding initial failover version
ClusterInitialFailoverVersions map[string]int64 `yaml:"clusterInitialFailoverVersion"`
}

// Metrics contains the config items for metrics subsystem
Expand Down
7 changes: 3 additions & 4 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ services:

clustersInfo:
enableGlobalDomain: false
initialFailoverVersion: 0
failoverVersionIncrement: 10
masterClusterName: "active"
currentClusterName: "active"
clusterNames:
- "active"
- "standby"
clusterInitialFailoverVersion:
active: 0
standby: 1
7 changes: 3 additions & 4 deletions config/development_active.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,12 @@ services:

clustersInfo:
enableGlobalDomain: true
initialFailoverVersion: 0
failoverVersionIncrement: 10
masterClusterName: "active"
currentClusterName: "active"
clusterNames:
- "active"
- "standby"
clusterInitialFailoverVersion:
active: 0
standby: 1

kafka:
clusters:
Expand Down
7 changes: 3 additions & 4 deletions config/development_standby.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,12 @@ services:

clustersInfo:
enableGlobalDomain: true
initialFailoverVersion: 1
failoverVersionIncrement: 10
masterClusterName: "active"
currentClusterName: "standby"
clusterNames:
- "active"
- "standby"
clusterInitialFailoverVersion:
active: 0
standby: 1

kafka:
clusters:
Expand Down
7 changes: 3 additions & 4 deletions docker/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ services:

clustersInfo:
enableGlobalDomain: false
initialFailoverVersion: 0
failoverVersionIncrement: 10
masterClusterName: "active"
currentClusterName: "active"
clusterNames:
- "active"
- "standby"
clusterInitialFailoverVersion:
active: 0
standby: 1
16 changes: 8 additions & 8 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (s *integrationSuite) TestIntegrationRegisterGetDomain_GlobalDomainDisabled
activeClusterName := ""
currentClusterName := s.ClusterMetadata.GetCurrentClusterName()
clusters := []*workflow.ClusterReplicationConfiguration{}
for clusterName := range s.ClusterMetadata.GetAllClusterNames() {
for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() {
clusters = append(clusters, &workflow.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(clusterName),
})
Expand Down Expand Up @@ -336,7 +336,7 @@ func (s *integrationSuite) TestIntegrationRegisterGetDomain_GlobalDomainEnabled_
emitMetric := true
activeClusterName := ""
clusters := []*workflow.ClusterReplicationConfiguration{}
for clusterName := range s.ClusterMetadata.GetAllClusterNames() {
for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() {
clusters = append(clusters, &workflow.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(clusterName),
})
Expand Down Expand Up @@ -369,7 +369,7 @@ func (s *integrationSuite) TestIntegrationRegisterGetDomain_GlobalDomainEnabled_
emitMetric := true
activeClusterName := ""
clusters := []*workflow.ClusterReplicationConfiguration{}
for clusterName := range s.ClusterMetadata.GetAllClusterNames() {
for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() {
clusters = append(clusters, &workflow.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(clusterName),
})
Expand Down Expand Up @@ -423,7 +423,7 @@ func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainDisabled_A
emitMetric := true
currentClusterName := s.ClusterMetadata.GetCurrentClusterName()
clusters := []*workflow.ClusterReplicationConfiguration{}
for clusterName := range s.ClusterMetadata.GetAllClusterNames() {
for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() {
clusters = append(clusters, &workflow.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(clusterName),
})
Expand Down Expand Up @@ -508,7 +508,7 @@ func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_No
retention := int32(7)
emitMetric := true
clusters := []*workflow.ClusterReplicationConfiguration{}
for clusterName := range s.ClusterMetadata.GetAllClusterNames() {
for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() {
clusters = append(clusters, &workflow.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(clusterName),
})
Expand Down Expand Up @@ -547,7 +547,7 @@ func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_Is
retention := int32(7)
emitMetric := true
clusters := []*workflow.ClusterReplicationConfiguration{}
for clusterName := range s.ClusterMetadata.GetAllClusterNames() {
for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() {
clusters = append(clusters, &workflow.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(clusterName),
})
Expand Down Expand Up @@ -708,7 +708,7 @@ func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_Is
retention := int32(7)
emitMetric := true
clusters := []*workflow.ClusterReplicationConfiguration{}
for clusterName := range s.ClusterMetadata.GetAllClusterNames() {
for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() {
clusters = append(clusters, &workflow.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(clusterName),
})
Expand Down Expand Up @@ -767,7 +767,7 @@ func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_Fa
activeClusterName := ""
failoverVersion := int64(59)
persistenceClusters := []*persistence.ClusterReplicationConfig{}
for clusterName := range s.ClusterMetadata.GetAllClusterNames() {
for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() {
clusters = append(clusters, &workflow.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(clusterName),
})
Expand Down
Loading

0 comments on commit 3f24945

Please sign in to comment.