Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reliable Domain Change Notification #777

Merged
merged 26 commits into from
Jun 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7e78cb7
use background thread to refresh domain cache
May 17, 2018
0030aeb
Merge branch 'master' into refresh0-cache
wxing1292 May 21, 2018
8137b73
Merge branch 'master' into refresh0-cache
wxing1292 May 21, 2018
0a6e477
* add domain_by_name_v2 table so all domain will be in the same parti…
May 23, 2018
71dc2ae
modify frontend service
May 24, 2018
8d4167a
unify metadata manager v1 and v2
May 24, 2018
8b79612
Merge remote-tracking branch 'origin/refresh0-cache' into domain-mirror
May 24, 2018
8b1a201
make domain cache use the domain proxy
May 24, 2018
f440947
set cassandra test to v 0.8
May 24, 2018
36aa6dc
update cross dc integ test
May 25, 2018
7dbea5a
Merge branch 'master' into domain-mirror
May 25, 2018
03174bc
update cache refresh logic
May 25, 2018
42097c8
modify domain cache to apply the domain change by order
May 25, 2018
d9ca8e7
rename some veriable
May 25, 2018
bc87400
bugfix: missing failover notification version on replication job
May 25, 2018
ba13d2b
Merge branch 'master' into domain-mirror
May 25, 2018
61bd092
make domain cache duplication deep copy, add more test
May 25, 2018
eea5c1b
change the order of shard catch up && shard registeration to domain c…
May 25, 2018
f2a550f
bugfix: empty uuid in reset mutable state
May 25, 2018
e4e1ea2
Merge branch 'master' into domain-mirror
May 31, 2018
c8337ed
bugfix: reset mutable state
May 31, 2018
01d8051
merge v1 and v2 domain metadata manager into one
Jun 5, 2018
de6cdbf
Merge branch 'master' into domain-mirror
wxing1292 Jun 5, 2018
dff91d6
address comments
Jun 5, 2018
7c6ab95
Merge branch 'master' into domain-mirror
Jun 5, 2018
8ad7cd8
Merge branch 'master' into domain-mirror
Jun 5, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
396 changes: 326 additions & 70 deletions common/cache/domainCache.go

Large diffs are not rendered by default.

535 changes: 535 additions & 0 deletions common/cache/domainCache_test.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ const (
PersistenceDeleteDomainScope
// PersistenceDeleteDomainByNameScope tracks DeleteDomainByName calls made by service to persistence layer
PersistenceDeleteDomainByNameScope
// PersistenceListDomainScope tracks DeleteDomainByName calls made by service to persistence layer
PersistenceListDomainScope
// PersistenceGetMetadataScope tracks DeleteDomainByName calls made by service to persistence layer
PersistenceGetMetadataScope
// PersistenceRecordWorkflowExecutionStartedScope tracks RecordWorkflowExecutionStarted calls made by service to persistence layer
PersistenceRecordWorkflowExecutionStartedScope
// PersistenceRecordWorkflowExecutionClosedScope tracks RecordWorkflowExecutionClosed calls made by service to persistence layer
Expand Down Expand Up @@ -458,6 +462,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceUpdateDomainScope: {operation: "UpdateDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteDomainScope: {operation: "DeleteDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteDomainByNameScope: {operation: "DeleteDomainByName", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceListDomainScope: {operation: "ListDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetMetadataScope: {operation: "GetMetadata", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceRecordWorkflowExecutionStartedScope: {operation: "RecordWorkflowExecutionStarted"},
PersistenceRecordWorkflowExecutionClosedScope: {operation: "RecordWorkflowExecutionClosed"},
PersistenceListOpenWorkflowExecutionsScope: {operation: "ListOpenWorkflowExecutions"},
Expand Down Expand Up @@ -594,6 +600,7 @@ const (
CadenceFailures
CadenceLatency
CadenceErrBadRequestCounter
CadenceErrDomainNotActiveCounter
CadenceErrServiceBusyCounter
CadenceErrEntityNotExistsCounter
CadenceErrExecutionAlreadyStartedCounter
Expand Down Expand Up @@ -694,6 +701,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
CadenceFailures: {metricName: "cadence.errors", metricType: Counter},
CadenceLatency: {metricName: "cadence.latency", metricType: Timer},
CadenceErrBadRequestCounter: {metricName: "cadence.errors.bad-request", metricType: Counter},
CadenceErrDomainNotActiveCounter: {metricName: "cadence.errors.domain-not-active", metricType: Counter},
CadenceErrServiceBusyCounter: {metricName: "cadence.errors.service-busy", metricType: Counter},
CadenceErrEntityNotExistsCounter: {metricName: "cadence.errors.entity-not-exists", metricType: Counter},
CadenceErrExecutionAlreadyStartedCounter: {metricName: "cadence.errors.execution-already-started", metricType: Counter},
Expand Down
46 changes: 46 additions & 0 deletions common/mocks/MetadataManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,49 @@ func (_m *MetadataManager) UpdateDomain(request *persistence.UpdateDomainRequest

return r0
}

// ListDomain provides a mock function with given fields: request
func (_m *MetadataManager) ListDomain(request *persistence.ListDomainRequest) (*persistence.ListDomainResponse, error) {
ret := _m.Called(request)

var r0 *persistence.ListDomainResponse
if rf, ok := ret.Get(0).(func(*persistence.ListDomainRequest) *persistence.ListDomainResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.ListDomainResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.ListDomainRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// GetMetadata provides a mock function with given fields: request
func (_m *MetadataManager) GetMetadata() (*persistence.GetMetadataResponse, error) {
ret := _m.Called()

var r0 *persistence.GetMetadataResponse
if rf, ok := ret.Get(0).(func() *persistence.GetMetadataResponse); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.GetMetadataResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}

return r0, r1
}
47 changes: 31 additions & 16 deletions common/persistence/cassandraMetadataPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

const (
templateDomainType = `{` +
templateDomainInfoType = `{` +
`id: ?, ` +
`name: ?, ` +
`status: ?, ` +
Expand All @@ -55,7 +55,7 @@ const (

templateCreateDomainByNameQuery = `INSERT INTO domains_by_name (` +
`name, domain, config, replication_config, is_global_domain, config_version, failover_version) ` +
`VALUES(?, ` + templateDomainType + `, ` + templateDomainConfigType + `, ` + templateDomainReplicationConfigType + `, ?, ?, ?) IF NOT EXISTS`
`VALUES(?, ` + templateDomainInfoType + `, ` + templateDomainConfigType + `, ` + templateDomainReplicationConfigType + `, ?, ?, ?) IF NOT EXISTS`

templateGetDomainQuery = `SELECT domain.name ` +
`FROM domains ` +
Expand All @@ -72,7 +72,7 @@ const (
`WHERE name = ?`

templateUpdateDomainByNameQuery = `UPDATE domains_by_name ` +
`SET domain = ` + templateDomainType + `, ` +
`SET domain = ` + templateDomainInfoType + `, ` +
`config = ` + templateDomainConfigType + `, ` +
`replication_config = ` + templateDomainReplicationConfigType + `, ` +
`config_version = ? ,` +
Expand Down Expand Up @@ -131,13 +131,20 @@ func (m *cassandraMetadataPersistence) Close() {
// delete the orphaned entry from domains table. There is a chance delete entry could fail and we never delete the
// orphaned entry from domains table. We might need a background job to delete those orphaned record.
func (m *cassandraMetadataPersistence) CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error) {
if err := m.session.Query(templateCreateDomainQuery, request.Info.ID, request.Info.Name).Exec(); err != nil {
query := m.session.Query(templateCreateDomainQuery, request.Info.ID, request.Info.Name)
applied, err := query.ScanCAS()
if err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("CreateDomain operation failed. Inserting into domains table. Error: %v", err),
}
}
if !applied {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("CreateDomain operation failed because of uuid collision."),
}
}

query := m.session.Query(templateCreateDomainByNameQuery,
query = m.session.Query(templateCreateDomainByNameQuery,
request.Info.Name,
request.Info.ID,
request.Info.Name,
Expand All @@ -154,7 +161,7 @@ func (m *cassandraMetadataPersistence) CreateDomain(request *CreateDomainRequest
)

previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
applied, err = query.MapScanCAS(previous)

if err != nil {
return nil, &workflow.InternalServiceError{
Expand Down Expand Up @@ -255,22 +262,22 @@ func (m *cassandraMetadataPersistence) GetDomain(request *GetDomainRequest) (*Ge
replicationConfig.Clusters = GetOrUseDefaultClusters(m.currentClusterName, replicationConfig.Clusters)

return &GetDomainResponse{
Info: info,
Config: config,
ReplicationConfig: replicationConfig,
IsGlobalDomain: isGlobalDomain,
ConfigVersion: configVersion,
FailoverVersion: failoverVersion,
DBVersion: dbVersion,
Info: info,
Config: config,
ReplicationConfig: replicationConfig,
IsGlobalDomain: isGlobalDomain,
ConfigVersion: configVersion,
FailoverVersion: failoverVersion,
NotificationVersion: dbVersion,
}, nil
}

func (m *cassandraMetadataPersistence) UpdateDomain(request *UpdateDomainRequest) error {
var nextVersion int64 = 1
var currentVersion *int64
if request.DBVersion > 0 {
nextVersion = request.DBVersion + 1
currentVersion = &request.DBVersion
if request.NotificationVersion > 0 {
nextVersion = request.NotificationVersion + 1
currentVersion = &request.NotificationVersion
}
query := m.session.Query(templateUpdateDomainByNameQuery,
request.Info.ID,
Expand Down Expand Up @@ -325,6 +332,14 @@ func (m *cassandraMetadataPersistence) DeleteDomainByName(request *DeleteDomainB
return m.deleteDomain(request.Name, ID)
}

func (m *cassandraMetadataPersistence) ListDomain(request *ListDomainRequest) (*ListDomainResponse, error) {
panic("cassandraMetadataPersistence do not support list domain operation.")
}

func (m *cassandraMetadataPersistence) GetMetadata() (*GetMetadataResponse, error) {
panic("cassandraMetadataPersistence do not supporgetsmetadatain operation.")
}

func (m *cassandraMetadataPersistence) deleteDomain(name, ID string) error {
query := m.session.Query(templateDeleteDomainByNameQuery, name)
if err := query.Exec(); err != nil {
Expand Down
125 changes: 125 additions & 0 deletions common/persistence/cassandraMetadataPersistenceProxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package persistence

import (
"errors"

"github.com/uber-common/bark"
"github.com/uber/cadence/.gen/go/shared"
)

type (
// TODO, we should migrate the non global domain to new table, see #773
// WARN this struct should only be used by the domain cache ONLY
metadataManagerProxy struct {
metadataMgr MetadataManager
metadataMgrV2 MetadataManager
logger bark.Logger
}
)

// NewMetadataManagerProxy is used for merging the functionality the v1 and v2 MetadataManager
func NewMetadataManagerProxy(hosts string, port int, user, password, dc string, keyspace string,
currentClusterName string, logger bark.Logger) (MetadataManager, error) {
metadataMgr, err := NewCassandraMetadataPersistence(hosts, port, user, password, dc, keyspace, currentClusterName, logger)
if err != nil {
return nil, err
}
metadataMgrV2, err := NewCassandraMetadataPersistenceV2(hosts, port, user, password, dc, keyspace, currentClusterName, logger)
if err != nil {
return nil, err
}
return &metadataManagerProxy{metadataMgr: metadataMgr, metadataMgrV2: metadataMgrV2, logger: logger}, nil
}

func (m *metadataManagerProxy) GetDomain(request *GetDomainRequest) (*GetDomainResponse, error) {
// the reason this function does not call the v2 get domain is domain cache will
// use the list domain function to get all domain in the v2 table
resp, err := m.metadataMgrV2.GetDomain(request)
if err != nil {
if _, ok := err.(*shared.EntityNotExistsError); !ok {
return nil, err
}
} else {
resp.TableVersion = DomainTableVersionV2
return resp, nil
}

resp, err = m.metadataMgr.GetDomain(request)
if err == nil {
resp.TableVersion = DomainTableVersionV1
}
return resp, err
}

func (m *metadataManagerProxy) ListDomain(request *ListDomainRequest) (*ListDomainResponse, error) {
return m.metadataMgrV2.ListDomain(request)
}

func (m *metadataManagerProxy) GetMetadata() (*GetMetadataResponse, error) {
return m.metadataMgrV2.GetMetadata()
}

func (m *metadataManagerProxy) Close() {
m.metadataMgr.Close()
m.metadataMgrV2.Close()
}

func (m *metadataManagerProxy) CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error) {
// for new domain, only create in the v2 table
return m.metadataMgrV2.CreateDomain(request)
}

func (m *metadataManagerProxy) UpdateDomain(request *UpdateDomainRequest) error {
switch request.TableVersion {
case DomainTableVersionV1:
return m.metadataMgr.UpdateDomain(request)
case DomainTableVersionV2:
return m.metadataMgrV2.UpdateDomain(request)
default:
return errors.New("domain table version is not set")
}
}

func (m *metadataManagerProxy) DeleteDomain(request *DeleteDomainRequest) error {
err := m.metadataMgr.DeleteDomain(request)
if err != nil {
m.logger.Warnf("Error deleting domain from V1 table: %v", err)
}
err = m.metadataMgrV2.DeleteDomain(request)
if err != nil {
m.logger.Warnf("Error deleting domain from V2 table: %v", err)
}
return nil
}

func (m *metadataManagerProxy) DeleteDomainByName(request *DeleteDomainByNameRequest) error {
err := m.metadataMgr.DeleteDomainByName(request)
if err != nil {
m.logger.Warnf("Error deleting domain by name from V1 table: %v", err)
}
err = m.metadataMgrV2.DeleteDomainByName(request)
if err != nil {
m.logger.Warnf("Error deleting domain by name from V2 table: %v", err)
}
return nil
}
Loading