From 5c6cb080cf0bbff8093354e33a9bd8be77807b1f Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Thu, 1 Mar 2018 17:53:43 -0800 Subject: [PATCH] wire replicator transmission to domain APIs (#590) * wire replicator transmission to domain APIs --- cmd/server/cadence.go | 2 +- common/mocks/KafkaProducer.go | 76 +++++++ common/persistence/dataInterfaces.go | 2 +- common/service/service.go | 7 + common/service/serviceinterfaces.go | 4 + host/onebox.go | 8 +- .../frontend/domainReplicationTaskHandler.go | 121 ++++++++++ .../domainReplicationTaskHandler_test.go | 206 ++++++++++++++++++ service/frontend/handler.go | 33 ++- service/frontend/service.go | 7 +- service/history/historyEngine.go | 2 +- service/history/historyEngine_test.go | 6 +- .../worker/domainReplicationTaskHandler.go | 47 +++- .../domainReplicationTaskHandler_test.go | 38 ++-- service/worker/processor.go | 4 +- 15 files changed, 520 insertions(+), 43 deletions(-) create mode 100644 common/mocks/KafkaProducer.go create mode 100644 service/frontend/domainReplicationTaskHandler.go create mode 100644 service/frontend/domainReplicationTaskHandler_test.go diff --git a/cmd/server/cadence.go b/cmd/server/cadence.go index 24d60847638..0ee87d2c667 100644 --- a/cmd/server/cadence.go +++ b/cmd/server/cadence.go @@ -61,7 +61,7 @@ func startHandler(c *cli.Context) { log.Fatal("Unable to get current directory") } if err := cassandra.VerifyCompatibleVersion(cassCfg, dir); err != nil { - log.Fatalf("Incompatible versions", err) + log.Fatal("Incompatible versions", err) } services := getServices(c) diff --git a/common/mocks/KafkaProducer.go b/common/mocks/KafkaProducer.go new file mode 100644 index 00000000000..e485be2e264 --- /dev/null +++ b/common/mocks/KafkaProducer.go @@ -0,0 +1,76 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+
+package mocks
+
+import (
+	mock "github.com/stretchr/testify/mock"
+	"github.com/uber/cadence/.gen/go/replicator"
+	"github.com/uber/cadence/common/messaging"
+)
+
+// KafkaProducer is an autogenerated mock type for the KafkaProducer type
+type KafkaProducer struct {
+	mock.Mock
+}
+
+// Close provides a mock function with given fields:
+func (_m *KafkaProducer) Close() error {
+	ret := _m.Called()
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func() error); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// Publish provides a mock function with given fields: msg
+func (_m *KafkaProducer) Publish(msg *replicator.ReplicationTask) error {
+	ret := _m.Called(msg)
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(*replicator.ReplicationTask) error); ok {
+		r0 = rf(msg)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// PublishBatch provides a mock function with given fields: msgs
+func (_m *KafkaProducer) PublishBatch(msgs []*replicator.ReplicationTask) error {
+	ret := _m.Called(msgs)
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func([]*replicator.ReplicationTask) error); ok {
+		r0 = rf(msgs)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+var _ messaging.Producer = (*KafkaProducer)(nil)
diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go
index b7f7fbbfffe..68e96257350 100644
--- a/common/persistence/dataInterfaces.go
+++ b/common/persistence/dataInterfaces.go
@@ -1010,7 +1010,7 @@ func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *History
 }
 
 func (b *HistoryEventBatch) String() string {
-	return fmt.Sprint("[version:%v, events:%v]", b.Version, b.Events)
+	return fmt.Sprintf("[version:%v, events:%v]", b.Version, b.Events)
 }
 
 // NewSerializedHistoryEventBatch constructs and returns a new instance of SerializedHistoryEventBatch
diff --git a/common/service/service.go b/common/service/service.go
index 60338b8da40..e148c3f2b44 100644
--- a/common/service/service.go
+++ b/common/service/service.go
@@ -89,6 +89,7 @@ type (
 		runtimeMetricsReporter *metrics.RuntimeMetricsReporter
 		metricsClient          metrics.Client
 		clusterMetadata        cluster.Metadata
+		messagingClient        messaging.Client
 		dynamicCollection      *dynamicconfig.Collection
 	}
 )
@@ -105,6 +106,7 @@ func New(params *BootstrapParams) Service {
 		metricsScope:          params.MetricScope,
 		numberOfHistoryShards: params.CassandraConfig.NumHistoryShards,
 		clusterMetadata:       params.ClusterMetadata,
+		messagingClient:       params.MessagingClient,
 		dynamicCollection:     dynamicconfig.NewCollection(params.DynamicConfig),
 	}
 	sVice.runtimeMetricsReporter = metrics.NewRuntimeMetricsReporter(params.MetricScope, time.Minute, sVice.logger)
@@ -227,6 +229,11 @@ func (h *serviceImpl) GetClusterMetadata() cluster.Metadata {
 	return h.clusterMetadata
 }
 
+// GetMessagingClient returns the messaging client against Kafka
+func (h *serviceImpl) GetMessagingClient() messaging.Client {
+	return h.messagingClient
+}
+
 func getMetricsServiceIdx(serviceName string, logger bark.Logger) metrics.ServiceIdx {
 	switch serviceName {
 	case common.FrontendServiceName:
diff --git a/common/service/serviceinterfaces.go b/common/service/serviceinterfaces.go
index 237f8a1b9ce..8a17f5d0ce4 100644
--- a/common/service/serviceinterfaces.go
+++ b/common/service/serviceinterfaces.go
@@ -25,6 +25,7 @@ import (
 	"github.com/uber/cadence/client"
 	"github.com/uber/cadence/common/cluster"
 	"github.com/uber/cadence/common/membership"
+	"github.com/uber/cadence/common/messaging"
 	"github.com/uber/cadence/common/metrics"
 	"go.uber.org/yarpc"
 )
@@ 
-55,5 +56,8 @@ type ( // GetClusterMetadata returns the service cluster metadata GetClusterMetadata() cluster.Metadata + + // GetMessagingClient returns the messaging client against Kafka + GetMessagingClient() messaging.Client } ) diff --git a/host/onebox.go b/host/onebox.go index 33322c6cb3d..02f2717f8a4 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -28,12 +28,14 @@ import ( "errors" + "github.com/stretchr/testify/mock" "github.com/uber-common/bark" "github.com/uber-go/tally" "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" fecli "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/service/config" @@ -197,9 +199,13 @@ func (c *cadenceImpl) startFrontend(logger bark.Logger, rpHosts []string, startW params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards params.CassandraConfig.Hosts = "127.0.0.1" + // TODO when cross DC is public, remove this temporary override + kafkaProducer := &mocks.KafkaProducer{} + kafkaProducer.On("Publish", mock.Anything).Return(nil) + c.frontEndService = service.New(params) c.frontendHandler = frontend.NewWorkflowHandler( - c.frontEndService, frontend.NewConfig(), c.metadataMgr, c.historyMgr, c.visibilityMgr) + c.frontEndService, frontend.NewConfig(), c.metadataMgr, c.historyMgr, c.visibilityMgr, kafkaProducer) err := c.frontendHandler.Start() if err != nil { c.logger.WithField("error", err).Fatal("Failed to start frontend") diff --git a/service/frontend/domainReplicationTaskHandler.go b/service/frontend/domainReplicationTaskHandler.go new file mode 100644 index 00000000000..f719804b4c2 --- /dev/null +++ b/service/frontend/domainReplicationTaskHandler.go @@ -0,0 +1,121 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+
+package frontend
+
+import (
+	"errors"
+
+	"github.com/uber-common/bark"
+	"github.com/uber/cadence/.gen/go/replicator"
+	"github.com/uber/cadence/.gen/go/shared"
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/messaging"
+	"github.com/uber/cadence/common/persistence"
+)
+
+var (
+	// ErrInvalidDomainStatus is the error to indicate invalid domain status
+	ErrInvalidDomainStatus = errors.New("invalid domain status attribute")
+)
+
+// NOTE: the counterpart of domain replication receiving logic is in service/worker package
+
+type (
+	// DomainReplicator is the interface which can replicate the domain
+	DomainReplicator interface {
+		HandleTransmissionTask(domainOperation replicator.DomainOperation, info *persistence.DomainInfo,
+			config *persistence.DomainConfig, replicationConfig *persistence.DomainReplicationConfig,
+			configVersion int64, failoverVersion int64) error
+	}
+
+	domainReplicatorImpl struct {
+		kafka  messaging.Producer
+		logger bark.Logger
+	}
+)
+
+// NewDomainReplicator creates a new instance of domain replicator
+func NewDomainReplicator(kafka messaging.Producer, logger bark.Logger) DomainReplicator {
+	return &domainReplicatorImpl{
+		kafka:  kafka,
+		logger: logger,
+	}
+}
+
+// HandleTransmissionTask handles transmission of the domain replication task
+func (domainReplicator *domainReplicatorImpl) HandleTransmissionTask(domainOperation replicator.DomainOperation,
+	info *persistence.DomainInfo, config *persistence.DomainConfig, replicationConfig *persistence.DomainReplicationConfig,
+	configVersion int64, failoverVersion int64) error {
+	status, err := domainReplicator.convertDomainStatusToThrift(info.Status)
+	if err != nil {
+		return err
+	}
+
+	taskType := replicator.ReplicationTaskTypeDomain
+	task := &replicator.DomainTaskAttributes{
+		DomainOperation: &domainOperation,
+		ID:              common.StringPtr(info.ID),
+		Info: &shared.DomainInfo{
+			Name:        common.StringPtr(info.Name),
+			Status:      status,
+			Description: common.StringPtr(info.Description),
+			OwnerEmail:  common.StringPtr(info.OwnerEmail),
+		},
+		Config: &shared.DomainConfiguration{
+			WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(config.Retention),
+			EmitMetric:                             common.BoolPtr(config.EmitMetric),
+		},
+		ReplicationConfig: &shared.DomainReplicationConfiguration{
+			ActiveClusterName: common.StringPtr(replicationConfig.ActiveClusterName),
+			Clusters:          domainReplicator.convertClusterReplicationConfigToThrift(replicationConfig.Clusters),
+		},
+		ConfigVersion:   common.Int64Ptr(configVersion),
+		FailoverVersion: common.Int64Ptr(failoverVersion),
+	}
+
+	return domainReplicator.kafka.Publish(&replicator.ReplicationTask{
+		TaskType:             &taskType,
+		DomainTaskAttributes: task,
+	})
+}
+
+func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigToThrift(
+	input []*persistence.ClusterReplicationConfig) []*shared.ClusterReplicationConfiguration {
+	output := []*shared.ClusterReplicationConfiguration{}
+	for _, cluster := range input {
+		clusterName := common.StringPtr(cluster.ClusterName)
+		output = append(output, &shared.ClusterReplicationConfiguration{ClusterName: clusterName})
+	}
+	return output
+}
+
+func (domainReplicator *domainReplicatorImpl) convertDomainStatusToThrift(input int) (*shared.DomainStatus, error) {
+	switch input {
+	case persistence.DomainStatusRegistered:
+		output := shared.DomainStatusRegistered
+		return &output, nil
+	case persistence.DomainStatusDeprecated:
+		output := shared.DomainStatusDeprecated
+		return &output, nil
+	default:
+		return nil, 
ErrInvalidDomainStatus + } +} diff --git a/service/frontend/domainReplicationTaskHandler_test.go b/service/frontend/domainReplicationTaskHandler_test.go new file mode 100644 index 00000000000..01f3b908e13 --- /dev/null +++ b/service/frontend/domainReplicationTaskHandler_test.go @@ -0,0 +1,206 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package frontend + +import ( + "log" + "os" + "testing" + + "github.com/pborman/uuid" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/suite" + "github.com/uber-common/bark" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/mocks" + "github.com/uber/cadence/common/persistence" +) + +type ( + domainReplicatorSuite struct { + suite.Suite + domainReplicator *domainReplicatorImpl + kafkaProducer *mocks.KafkaProducer + } +) + +func TestDomainReplicatorSuite(t *testing.T) { + s := new(domainReplicatorSuite) + suite.Run(t, s) +} + +func (s *domainReplicatorSuite) SetupSuite() { + if testing.Verbose() { + log.SetOutput(os.Stdout) + } +} + +func (s *domainReplicatorSuite) TearDownSuite() { + +} + +func (s *domainReplicatorSuite) SetupTest() { + s.kafkaProducer = &mocks.KafkaProducer{} + s.domainReplicator = NewDomainReplicator( + s.kafkaProducer, + bark.NewLoggerFromLogrus(logrus.New()), + ).(*domainReplicatorImpl) + +} + +func (s *domainReplicatorSuite) TearDownTest() { +} + +func (s *domainReplicatorSuite) TestHandleTransmissionTask_RegisterDomainTask() { + taskType := replicator.ReplicationTaskTypeDomain + id := uuid.New() + name := "some random domain test name" + status := shared.DomainStatusRegistered + description := "some random test description" + ownerEmail := "some random test owner" + retention := int32(10) + emitMetric := true + clusterActive := "some random active cluster name" + clusterStandby := "some random standby cluster name" + configVersion := int64(0) + failoverVersion := int64(59) + clusters := []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ + ClusterName: clusterActive, + }, + &persistence.ClusterReplicationConfig{ + ClusterName: clusterStandby, + }, + } + + domainOperation := replicator.DomainOperationCreate + info := &persistence.DomainInfo{ + ID: id, + Name: name, + Status: persistence.DomainStatusRegistered, + Description: description, + OwnerEmail: ownerEmail, + } + config := &persistence.DomainConfig{ + 
Retention: retention, + EmitMetric: emitMetric, + } + replicationConfig := &persistence.DomainReplicationConfig{ + ActiveClusterName: clusterActive, + Clusters: clusters, + } + + s.kafkaProducer.On("Publish", &replicator.ReplicationTask{ + TaskType: &taskType, + DomainTaskAttributes: &replicator.DomainTaskAttributes{ + DomainOperation: &domainOperation, + ID: common.StringPtr(id), + Info: &shared.DomainInfo{ + Name: common.StringPtr(name), + Status: &status, + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(ownerEmail), + }, + Config: &shared.DomainConfiguration{ + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + }, + ReplicationConfig: &shared.DomainReplicationConfiguration{ + ActiveClusterName: common.StringPtr(clusterActive), + Clusters: s.domainReplicator.convertClusterReplicationConfigToThrift(clusters), + }, + ConfigVersion: common.Int64Ptr(configVersion), + FailoverVersion: common.Int64Ptr(failoverVersion), + }, + }).Return(nil).Once() + + err := s.domainReplicator.HandleTransmissionTask(domainOperation, info, config, replicationConfig, configVersion, failoverVersion) + s.Nil(err) +} + +func (s *domainReplicatorSuite) TestHandleTransmissionTask_UpdateDomainTask() { + taskType := replicator.ReplicationTaskTypeDomain + id := uuid.New() + name := "some random domain test name" + status := shared.DomainStatusDeprecated + description := "some random test description" + ownerEmail := "some random test owner" + retention := int32(10) + emitMetric := true + clusterActive := "some random active cluster name" + clusterStandby := "some random standby cluster name" + configVersion := int64(0) + failoverVersion := int64(59) + clusters := []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ + ClusterName: clusterActive, + }, + &persistence.ClusterReplicationConfig{ + ClusterName: clusterStandby, + }, + } + + domainOperation := replicator.DomainOperationUpdate + info := &persistence.DomainInfo{ + ID: id, + Name: name, + Status: persistence.DomainStatusDeprecated, + Description: description, + OwnerEmail: ownerEmail, + } + config := &persistence.DomainConfig{ + Retention: retention, + EmitMetric: emitMetric, + } + replicationConfig := &persistence.DomainReplicationConfig{ + ActiveClusterName: clusterActive, + Clusters: clusters, + } + + s.kafkaProducer.On("Publish", &replicator.ReplicationTask{ + TaskType: &taskType, + DomainTaskAttributes: &replicator.DomainTaskAttributes{ + DomainOperation: &domainOperation, + ID: common.StringPtr(id), + Info: &shared.DomainInfo{ + Name: common.StringPtr(name), + Status: &status, + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(ownerEmail), + }, + Config: &shared.DomainConfiguration{ + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + }, + ReplicationConfig: &shared.DomainReplicationConfiguration{ + ActiveClusterName: common.StringPtr(clusterActive), + Clusters: s.domainReplicator.convertClusterReplicationConfigToThrift(clusters), + }, + ConfigVersion: common.Int64Ptr(configVersion), + FailoverVersion: common.Int64Ptr(failoverVersion), + }, + }).Return(nil).Once() + + err := s.domainReplicator.HandleTransmissionTask(domainOperation, info, config, replicationConfig, configVersion, failoverVersion) + s.Nil(err) +} diff --git a/service/frontend/handler.go b/service/frontend/handler.go index d8e03c25165..ef1336798df 100644 --- a/service/frontend/handler.go +++ 
b/service/frontend/handler.go @@ -27,6 +27,8 @@ import ( "sync" "time" + "github.com/uber/cadence/common/messaging" + "github.com/pborman/uuid" "github.com/uber-common/bark" "github.com/uber-go/tally" @@ -35,6 +37,7 @@ import ( "github.com/uber/cadence/.gen/go/health/metaserver" h "github.com/uber/cadence/.gen/go/history" m "github.com/uber/cadence/.gen/go/matching" + "github.com/uber/cadence/.gen/go/replicator" gen "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/client/history" "github.com/uber/cadence/client/matching" @@ -65,6 +68,7 @@ type ( startWG sync.WaitGroup rateLimiter common.TokenBucket config *Config + domainReplicator DomainReplicator service.Service } @@ -106,7 +110,8 @@ var ( // NewWorkflowHandler creates a thrift handler for the cadence service func NewWorkflowHandler( sVice service.Service, config *Config, metadataMgr persistence.MetadataManager, - historyMgr persistence.HistoryManager, visibilityMgr persistence.VisibilityManager) *WorkflowHandler { + historyMgr persistence.HistoryManager, visibilityMgr persistence.VisibilityManager, + kafkaProducer messaging.Producer) *WorkflowHandler { handler := &WorkflowHandler{ Service: sVice, config: config, @@ -117,6 +122,7 @@ func NewWorkflowHandler( hSerializerFactory: persistence.NewHistorySerializerFactory(), domainCache: cache.NewDomainCache(metadataMgr, sVice.GetLogger()), rateLimiter: common.NewTokenBucket(config.RPS, common.NewRealTimeSource()), + domainReplicator: NewDomainReplicator(kafkaProducer, sVice.GetLogger()), } // prevent us from trying to serve requests before handler's Start() is complete handler.startWG.Add(1) @@ -214,7 +220,7 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest * return wh.error(err, scope) } - response, err := wh.metadataMgr.CreateDomain(&persistence.CreateDomainRequest{ + domainRequest := &persistence.CreateDomainRequest{ Info: &persistence.DomainInfo{ ID: uuid.New(), Name: registerRequest.GetName(), @@ -232,14 +238,25 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest * }, IsGlobalDomain: clusterMetadata.IsGlobalDomainEnabled(), FailoverVersion: 0, // TODO do something? - }) + } + domainResponse, err := wh.metadataMgr.CreateDomain(domainRequest) if err != nil { return wh.error(err, scope) } + // TODO remove the IsGlobalDomainEnabled check once cross DC is public + if clusterMetadata.IsGlobalDomainEnabled() { + err = wh.domainReplicator.HandleTransmissionTask(replicator.DomainOperationCreate, + domainRequest.Info, domainRequest.Config, domainRequest.ReplicationConfig, 0, domainRequest.FailoverVersion) + if err != nil { + return wh.error(err, scope) + } + } + // TODO: Log through logging framework. 
We need to have good auditing of domain CRUD - wh.GetLogger().Debugf("Register domain succeeded for name: %v, Id: %v", *registerRequest.Name, response.ID) + wh.GetLogger().Debugf("Register domain succeeded for name: %v, Id: %v", registerRequest.GetName(), domainResponse.ID) + return nil } @@ -423,6 +440,14 @@ func (wh *WorkflowHandler) UpdateDomain(ctx context.Context, if err != nil { return nil, wh.error(err, scope) } + // TODO remove the IsGlobalDomainEnabled check once cross DC is public + if clusterMetadata.IsGlobalDomainEnabled() { + err = wh.domainReplicator.HandleTransmissionTask(replicator.DomainOperationUpdate, + info, config, replicationConfig, configVersion, failoverVersion) + if err != nil { + return nil, wh.error(err, scope) + } + } } else if clusterMetadata.IsGlobalDomainEnabled() && !clusterMetadata.IsMasterCluster() { // although there is no attr updated, just prevent customer to use the non master cluster // for update domain, ever (except if customer want to do a domain failover) diff --git a/service/frontend/service.go b/service/frontend/service.go index b8a122198bd..6fef2a8d377 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -114,7 +114,12 @@ func (s *Service) Start() { history = persistence.NewHistoryPersistenceClient(history, base.GetMetricsClient()) - handler := NewWorkflowHandler(base, s.config, metadata, history, visibility) + kafkaProducer, err := base.GetMessagingClient().NewProducer(base.GetClusterMetadata().GetCurrentClusterName()) + if err != nil { + log.Fatalf("Creating kafka producer failed: %v", err) + } + + handler := NewWorkflowHandler(base, s.config, metadata, history, visibility, kafkaProducer) handler.Start() log.Infof("%v started", common.FrontendServiceName) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index b25f5a54fb2..8ec025203a4 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -2121,7 +2121,7 @@ func getScheduleID(activityID string, msBuilder *mutableStateBuilder) (int64, er } scheduleID, ok := msBuilder.GetScheduleIDByActivityID(activityID) if !ok { - return 0, &workflow.BadRequestError{Message: fmt.Sprintf("No such activityID: %d\n", activityID)} + return 0, &workflow.BadRequestError{Message: fmt.Sprintf("No such activityID: %s\n", activityID)} } return scheduleID, nil } diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 25729f2f77b..94610f65b1e 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -1325,7 +1325,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedIfNoAidFound() { Identity: &identity, }, }) - s.EqualError(err, "BadRequestError{Message: No such activityID: %!d(string=aid)\n}") + s.EqualError(err, "BadRequestError{Message: No such activityID: aid\n}") } func (s *engineSuite) TestRespondActivityTaskCompletedUpdateExecutionFailed() { @@ -1836,7 +1836,7 @@ func (s *engineSuite) TestRespondActivityTaskFailededIfNoAIdFound() { Identity: &identity, }, }) - s.EqualError(err, "BadRequestError{Message: No such activityID: %!d(string=aid)\n}") + s.EqualError(err, "BadRequestError{Message: No such activityID: aid\n}") } func (s *engineSuite) TestRespondActivityTaskFailedUpdateExecutionFailed() { @@ -2548,7 +2548,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceledIfNoAidFound() { Identity: &identity, }, }) - s.EqualError(err, "BadRequestError{Message: No such activityID: %!d(string=aid)\n}") + s.EqualError(err, "BadRequestError{Message: 
No such activityID: aid\n}")
 }
 
 func (s *engineSuite) TestRequestCancel_RespondDecisionTaskCompleted_NotScheduled() {
diff --git a/service/worker/domainReplicationTaskHandler.go b/service/worker/domainReplicationTaskHandler.go
index 57ee276cd8c..30a7b0448a6 100644
--- a/service/worker/domainReplicationTaskHandler.go
+++ b/service/worker/domainReplicationTaskHandler.go
@@ -26,6 +26,7 @@ import (
 	"github.com/uber-common/bark"
 	"github.com/uber/cadence/.gen/go/replicator"
 	"github.com/uber/cadence/.gen/go/shared"
+	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/persistence"
 )
 
@@ -50,6 +51,8 @@ var (
 	ErrInvalidDomainStatus = errors.New("invalid domain status attribute")
 )
 
+// NOTE: the counterpart of domain replication transmission logic is in service/frontend package
+
 type (
 	domainReplicatorImpl struct {
 		metadataManager persistence.MetadataManager
@@ -65,8 +68,8 @@ func NewDomainReplicator(metadataManager persistence.MetadataManager, logger bar
 	}
 }
 
-// handleDomainReplicationTask handle the domain replication task
-func (domainReplicator *domainReplicatorImpl) HandleReceiveTask(task *replicator.DomainTaskAttributes) error {
+// HandleReceivingTask handles receiving of the domain replication task
+func (domainReplicator *domainReplicatorImpl) HandleReceivingTask(task *replicator.DomainTaskAttributes) error {
 	if err := domainReplicator.validateDomainReplicationTask(task); err != nil {
 		return err
 	}
@@ -84,7 +87,7 @@ func (domainReplicator *domainReplicatorImpl) HandleReceiveTask(task *replicator
 // handleDomainCreationReplicationTask handle the domain creation replication task
 func (domainReplicator *domainReplicatorImpl) handleDomainCreationReplicationTask(task *replicator.DomainTaskAttributes) error {
 	// task already validated
-	status, err := domainReplicator.convertDomainStatus(task.Info.Status)
+	status, err := domainReplicator.convertDomainStatusFromThrift(task.Info.Status)
 	if err != nil {
 		return err
 	}
@@ -103,7 +106,7 @@ func (domainReplicator *domainReplicatorImpl) handleDomainCreationReplicationTas
 		},
 		ReplicationConfig: &persistence.DomainReplicationConfig{
 			ActiveClusterName: task.ReplicationConfig.GetActiveClusterName(),
-			Clusters:          domainReplicator.convertClusterReplicationConfig(task.ReplicationConfig.Clusters),
+			Clusters:          domainReplicator.convertClusterReplicationConfigFromThrift(task.ReplicationConfig.Clusters),
 		},
 		IsGlobalDomain:  true, // local domain will not be replicated
 		FailoverVersion: task.GetFailoverVersion(),
@@ -116,7 +119,7 @@ func (domainReplicator *domainReplicatorImpl) handleDomainCreationReplicationTas
 // handleDomainUpdateReplicationTask handle the domain update replication task
 func (domainReplicator *domainReplicatorImpl) handleDomainUpdateReplicationTask(task *replicator.DomainTaskAttributes) error {
 	// task already validated
-	status, err := domainReplicator.convertDomainStatus(task.Info.Status)
+	status, err := domainReplicator.convertDomainStatusFromThrift(task.Info.Status)
 	if err != nil {
 		return err
 	}
@@ -154,7 +157,7 @@ func (domainReplicator *domainReplicatorImpl) handleDomainUpdateReplicationTask(
 			Retention:  task.Config.GetWorkflowExecutionRetentionPeriodInDays(),
 			EmitMetric: task.Config.GetEmitMetric(),
 		}
-		request.ReplicationConfig.Clusters = domainReplicator.convertClusterReplicationConfig(task.ReplicationConfig.Clusters)
+		request.ReplicationConfig.Clusters = domainReplicator.convertClusterReplicationConfigFromThrift(task.ReplicationConfig.Clusters)
 		request.ConfigVersion = task.GetConfigVersion()
 	}
 	if resp.FailoverVersion < 
task.GetFailoverVersion() { @@ -193,7 +196,7 @@ func (domainReplicator *domainReplicatorImpl) validateDomainReplicationTask(task return nil } -func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfig( +func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigFromThrift( input []*shared.ClusterReplicationConfiguration) []*persistence.ClusterReplicationConfig { output := []*persistence.ClusterReplicationConfig{} for _, cluster := range input { @@ -203,12 +206,22 @@ func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfig( return output } -func (domainReplicator *domainReplicatorImpl) convertDomainStatus(status *shared.DomainStatus) (int, error) { - if status == nil { +func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigToThrift( + input []*persistence.ClusterReplicationConfig) []*shared.ClusterReplicationConfiguration { + output := []*shared.ClusterReplicationConfiguration{} + for _, cluster := range input { + clusterName := common.StringPtr(cluster.ClusterName) + output = append(output, &shared.ClusterReplicationConfiguration{ClusterName: clusterName}) + } + return output +} + +func (domainReplicator *domainReplicatorImpl) convertDomainStatusFromThrift(input *shared.DomainStatus) (int, error) { + if input == nil { return 0, ErrInvalidDomainStatus } - switch *status { + switch *input { case shared.DomainStatusRegistered: return persistence.DomainStatusRegistered, nil case shared.DomainStatusDeprecated: @@ -217,3 +230,17 @@ func (domainReplicator *domainReplicatorImpl) convertDomainStatus(status *shared return 0, ErrInvalidDomainStatus } } + +func (domainReplicator *domainReplicatorImpl) convertDomainStatusToThrift(input int) (*shared.DomainStatus, error) { + switch input { + case persistence.DomainStatusRegistered: + output := shared.DomainStatusRegistered + return &output, nil + case persistence.DomainStatusDeprecated: + output := shared.DomainStatusDeprecated + return &output, nil + default: + return nil, ErrInvalidDomainStatus + } + +} diff --git a/service/worker/domainReplicationTaskHandler_test.go b/service/worker/domainReplicationTaskHandler_test.go index 1a3e817d65e..f441cb727cf 100644 --- a/service/worker/domainReplicationTaskHandler_test.go +++ b/service/worker/domainReplicationTaskHandler_test.go @@ -70,7 +70,7 @@ func (s *domainReplicatorSuite) TearDownTest() { s.TearDownWorkflowStore() } -func (s *domainReplicatorSuite) TestReplicateRegisterDomainTask() { +func (s *domainReplicatorSuite) TestHandleReceivingTask_RegisterDomainTask() { operation := replicator.DomainOperationCreate id := uuid.New() name := "some random domain test name" @@ -113,7 +113,7 @@ func (s *domainReplicatorSuite) TestReplicateRegisterDomainTask() { FailoverVersion: common.Int64Ptr(failoverVersion), } - err := s.domainReplicator.HandleReceiveTask(task) + err := s.domainReplicator.HandleReceivingTask(task) s.Nil(err) resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{ID: id}) @@ -127,13 +127,13 @@ func (s *domainReplicatorSuite) TestReplicateRegisterDomainTask() { s.Equal(retention, resp.Config.Retention) s.Equal(emitMetric, resp.Config.EmitMetric) s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName) - s.Equal(s.domainReplicator.convertClusterReplicationConfig(clusters), resp.ReplicationConfig.Clusters) + s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters) s.Equal(configVersion, resp.ConfigVersion) s.Equal(failoverVersion, 
resp.FailoverVersion) s.Equal(int64(0), resp.DBVersion) } -func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_UpdateActiveCluster() { +func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateConfig_UpdateActiveCluster() { operation := replicator.DomainOperationCreate id := uuid.New() name := "some random domain test name" @@ -176,7 +176,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_Updat FailoverVersion: common.Int64Ptr(failoverVersion), } - err := s.domainReplicator.HandleReceiveTask(createTask) + err := s.domainReplicator.HandleReceivingTask(createTask) s.Nil(err) // success update case @@ -218,7 +218,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_Updat ConfigVersion: common.Int64Ptr(updateConfigVersion), FailoverVersion: common.Int64Ptr(updateFailoverVersion), } - err = s.domainReplicator.HandleReceiveTask(updateTask) + err = s.domainReplicator.HandleReceivingTask(updateTask) s.Nil(err) resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{Name: name}) s.Nil(err) @@ -231,13 +231,13 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_Updat s.Equal(updateRetention, resp.Config.Retention) s.Equal(updateEmitMetric, resp.Config.EmitMetric) s.Equal(updateClusterActive, resp.ReplicationConfig.ActiveClusterName) - s.Equal(s.domainReplicator.convertClusterReplicationConfig(updateClusters), resp.ReplicationConfig.Clusters) + s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(updateClusters), resp.ReplicationConfig.Clusters) s.Equal(updateConfigVersion, resp.ConfigVersion) s.Equal(updateFailoverVersion, resp.FailoverVersion) s.Equal(int64(1), resp.DBVersion) } -func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_NoUpdateActiveCluster() { +func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateConfig_NoUpdateActiveCluster() { operation := replicator.DomainOperationCreate id := uuid.New() name := "some random domain test name" @@ -280,7 +280,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_NoUpd FailoverVersion: common.Int64Ptr(failoverVersion), } - err := s.domainReplicator.HandleReceiveTask(createTask) + err := s.domainReplicator.HandleReceivingTask(createTask) s.Nil(err) // success update case @@ -322,7 +322,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_NoUpd ConfigVersion: common.Int64Ptr(updateConfigVersion), FailoverVersion: common.Int64Ptr(updateFailoverVersion), } - err = s.domainReplicator.HandleReceiveTask(updateTask) + err = s.domainReplicator.HandleReceivingTask(updateTask) s.Nil(err) resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{Name: name}) s.Nil(err) @@ -335,13 +335,13 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_UpdateConfig_NoUpd s.Equal(updateRetention, resp.Config.Retention) s.Equal(updateEmitMetric, resp.Config.EmitMetric) s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName) - s.Equal(s.domainReplicator.convertClusterReplicationConfig(updateClusters), resp.ReplicationConfig.Clusters) + s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(updateClusters), resp.ReplicationConfig.Clusters) s.Equal(updateConfigVersion, resp.ConfigVersion) s.Equal(failoverVersion, resp.FailoverVersion) s.Equal(int64(1), resp.DBVersion) } -func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_UpdateActiveCluster() { +func (s 
*domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdateConfig_UpdateActiveCluster() { operation := replicator.DomainOperationCreate id := uuid.New() name := "some random domain test name" @@ -384,7 +384,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_Upd FailoverVersion: common.Int64Ptr(failoverVersion), } - err := s.domainReplicator.HandleReceiveTask(createTask) + err := s.domainReplicator.HandleReceivingTask(createTask) s.Nil(err) // success update case @@ -426,7 +426,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_Upd ConfigVersion: common.Int64Ptr(updateConfigVersion), FailoverVersion: common.Int64Ptr(updateFailoverVersion), } - err = s.domainReplicator.HandleReceiveTask(updateTask) + err = s.domainReplicator.HandleReceivingTask(updateTask) s.Nil(err) resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{Name: name}) s.Nil(err) @@ -439,13 +439,13 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_Upd s.Equal(retention, resp.Config.Retention) s.Equal(emitMetric, resp.Config.EmitMetric) s.Equal(updateClusterActive, resp.ReplicationConfig.ActiveClusterName) - s.Equal(s.domainReplicator.convertClusterReplicationConfig(clusters), resp.ReplicationConfig.Clusters) + s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters) s.Equal(configVersion, resp.ConfigVersion) s.Equal(updateFailoverVersion, resp.FailoverVersion) s.Equal(int64(1), resp.DBVersion) } -func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_NoUpdateActiveCluster() { +func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_NoUpdateConfig_NoUpdateActiveCluster() { operation := replicator.DomainOperationCreate id := uuid.New() name := "some random domain test name" @@ -488,7 +488,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_NoU FailoverVersion: common.Int64Ptr(failoverVersion), } - err := s.domainReplicator.HandleReceiveTask(createTask) + err := s.domainReplicator.HandleReceivingTask(createTask) s.Nil(err) // success update case @@ -530,7 +530,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_NoU ConfigVersion: common.Int64Ptr(updateConfigVersion), FailoverVersion: common.Int64Ptr(updateFailoverVersion), } - err = s.domainReplicator.HandleReceiveTask(updateTask) + err = s.domainReplicator.HandleReceivingTask(updateTask) s.Nil(err) resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{Name: name}) s.Nil(err) @@ -543,7 +543,7 @@ func (s *domainReplicatorSuite) TestReplicateUpdateDomainTask_NoUpdateConfig_NoU s.Equal(retention, resp.Config.Retention) s.Equal(emitMetric, resp.Config.EmitMetric) s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName) - s.Equal(s.domainReplicator.convertClusterReplicationConfig(clusters), resp.ReplicationConfig.Clusters) + s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters) s.Equal(configVersion, resp.ConfigVersion) s.Equal(failoverVersion, resp.FailoverVersion) s.Equal(int64(0), resp.DBVersion) diff --git a/service/worker/processor.go b/service/worker/processor.go index bfe51a69d10..ae4e2767072 100644 --- a/service/worker/processor.go +++ b/service/worker/processor.go @@ -42,7 +42,7 @@ type ( // DomainReplicator is the interface which can replicate the domain DomainReplicator interface { - HandleReceiveTask(task 
*replicator.DomainTaskAttributes) error
+		HandleReceivingTask(task *replicator.DomainTaskAttributes) error
 	}
 
 	replicationTaskProcessor struct {
@@ -176,7 +176,7 @@ func (p *replicationTaskProcessor) worker(workerWG *sync.WaitGroup) {
 		switch task.GetTaskType() {
 		case replicator.ReplicationTaskTypeDomain:
 			p.logger.Debugf("Received domain replication task %v.", task.DomainTaskAttributes)
-			p.domainReplicator.HandleReceiveTask(task.DomainTaskAttributes)
+			err = p.domainReplicator.HandleReceivingTask(task.DomainTaskAttributes)
 		case replicator.ReplicationTaskTypeHistory:
 			p.logger.Debugf("Received history replication task %v.", task.HistoryTaskAttributes)
 		default:
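
Usage sketch (not part of the patch): a minimal example of how the new frontend transmission path can be exercised against the KafkaProducer mock, mirroring the wiring this change adds to host/onebox.go. The package and identifiers come from this patch; the test name and the concrete domain values are placeholders, not taken from the patch.

package frontend

import (
	"testing"

	"github.com/sirupsen/logrus"
	"github.com/stretchr/testify/mock"
	"github.com/uber-common/bark"

	"github.com/uber/cadence/.gen/go/replicator"
	"github.com/uber/cadence/common/mocks"
	"github.com/uber/cadence/common/persistence"
)

// TestTransmissionSketch is a hypothetical test; it publishes a single
// domain-creation replication task through the new DomainReplicator.
func TestTransmissionSketch(t *testing.T) {
	// Stand-in producer that accepts any published task, as host/onebox.go does.
	producer := &mocks.KafkaProducer{}
	producer.On("Publish", mock.Anything).Return(nil)

	dr := NewDomainReplicator(producer, bark.NewLoggerFromLogrus(logrus.New()))

	// HandleTransmissionTask converts the persistence-layer records to their
	// thrift counterparts and publishes one ReplicationTask to Kafka.
	err := dr.HandleTransmissionTask(
		replicator.DomainOperationCreate,
		&persistence.DomainInfo{
			ID:     "some-domain-id",   // placeholder
			Name:   "some-domain-name", // placeholder
			Status: persistence.DomainStatusRegistered,
		},
		&persistence.DomainConfig{Retention: 1, EmitMetric: true},
		&persistence.DomainReplicationConfig{
			ActiveClusterName: "active",
			Clusters:          []*persistence.ClusterReplicationConfig{{ClusterName: "active"}},
		},
		0, // configVersion
		0, // failoverVersion
	)
	if err != nil {
		t.Fatal(err)
	}
	producer.AssertExpectations(t)
}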