diff --git a/host/integration_cross_domain_test.go b/host/integration_cross_domain_test.go new file mode 100644 index 00000000000..dc973408b13 --- /dev/null +++ b/host/integration_cross_domain_test.go @@ -0,0 +1,823 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package host + +import ( + "flag" + "os" + "testing" + + "github.com/pborman/uuid" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/uber-common/bark" + + wsc "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" + workflow "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/messaging" + "github.com/uber/cadence/common/mocks" + "github.com/uber/cadence/common/persistence" +) + +type ( + integrationCrossDCSuite struct { + // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, + // not merely log an error + *require.Assertions + domainName string + foreignDomainName string + mockMessagingClient messaging.Client + mockProducer messaging.Producer + host Cadence + engine wsc.Interface + logger bark.Logger + suite.Suite + persistence.TestBase + } +) + +func TestIntegrationCrossDCSuite(t *testing.T) { + flag.Parse() + if *integration { + s := new(integrationCrossDCSuite) + suite.Run(t, s) + } else { + t.Skip() + } +} + +func (s *integrationCrossDCSuite) SetupSuite() { + if testing.Verbose() { + log.SetOutput(os.Stdout) + } + + logger := log.New() + formatter := &log.TextFormatter{} + formatter.FullTimestamp = true + logger.Formatter = formatter + //logger.Level = log.DebugLevel + s.logger = bark.NewLoggerFromLogrus(logger) +} + +func (s *integrationCrossDCSuite) TearDownSuite() { +} + +func (s *integrationCrossDCSuite) SetupTest() { + s.setupTest(false, false) +} + +func (s *integrationCrossDCSuite) TearDownTest() { + s.host.Stop() + s.host = nil + s.TearDownWorkflowStore() +} + +func (s *integrationCrossDCSuite) setupTest(enableGlobalDomain bool, isMasterCluster bool) { + // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil + s.Assertions = require.New(s.T()) + options := persistence.TestBaseOptions{} + options.ClusterHost = "127.0.0.1" + options.DropKeySpace = true + options.SchemaDir = ".." + options.EnableGlobalDomain = enableGlobalDomain + options.IsMasterCluster = isMasterCluster + s.SetupWorkflowStoreWithOptions(options) + + s.setupShards() + + // TODO: Use mock messaging client until we support kafka setup onebox to write end-to-end integration test + s.mockProducer = &mocks.KafkaProducer{} + s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil) + + s.host = NewCadence(s.ClusterMetadata, s.mockMessagingClient, s.MetadataManager, s.ShardMgr, s.HistoryMgr, s.ExecutionMgrFactory, s.TaskMgr, + s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger) + + s.host.Start() + + s.engine = s.host.GetFrontendClient() + s.domainName = "integration-test-domain" + s.MetadataManager.CreateDomain(&persistence.CreateDomainRequest{ + Info: &persistence.DomainInfo{ + ID: uuid.New(), + Name: s.domainName, + Status: persistence.DomainStatusRegistered, + Description: "Test domain for integration test", + }, + Config: &persistence.DomainConfig{ + Retention: 1, + EmitMetric: false, + }, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + }) + s.foreignDomainName = "integration-foreign-test-domain" + s.MetadataManager.CreateDomain(&persistence.CreateDomainRequest{ + Info: &persistence.DomainInfo{ + ID: uuid.New(), + Name: s.foreignDomainName, + Status: persistence.DomainStatusRegistered, + Description: "Test foreign domain for integration test", + }, + Config: &persistence.DomainConfig{ + Retention: 1, + EmitMetric: false, + }, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + }) +} + +func (s *integrationCrossDCSuite) setupShards() { + // shard 0 is always created, we create additional shards if needed + for shardID := 1; shardID < testNumberOfHistoryShards; shardID++ { + err := s.CreateShard(shardID, "", 0) + if err != nil { + s.logger.WithField("error", err).Fatal("Failed to create shard") + } + } +} + +// Note: if the global domain is not enabled, active clusters and clusters +// will be ignored on the server side +func (s *integrationCrossDCSuite) TestIntegrationRegisterGetDomain_GlobalDomainDisabled_AllDefault() { + testFn := func(isMasterCluster bool) { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(false, isMasterCluster) + + domainName := "some random domain name" + clusters := []*workflow.ClusterReplicationConfiguration{} + for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) { + clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(replicationConfig.ClusterName), + }) + } + + err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + + resp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, resp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *resp.DomainInfo.Status) + s.Empty(resp.DomainInfo.GetDescription()) + s.Empty(resp.DomainInfo.GetOwnerEmail()) + s.Equal(int32(0), resp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(false, resp.Configuration.GetEmitMetric()) + s.Equal(s.ClusterMetadata.GetCurrentClusterName(), resp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(clusters, resp.ReplicationConfiguration.Clusters) + } + + testFn(false) + testFn(true) +} + +func (s *integrationCrossDCSuite) TestIntegrationRegisterGetDomain_GlobalDomainEnabled_NotMaster_AllDefault() { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(true, false) + + domainName := "some random domain name" + err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.NotNil(err) +} + +func (s *integrationCrossDCSuite) TestIntegrationRegisterGetDomain_GlobalDomainEnabled_IsMaster_AllDefault() { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(true, true) + + domainName := "some random domain name" + clusters := []*workflow.ClusterReplicationConfiguration{} + for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) { + clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(replicationConfig.ClusterName), + }) + } + + err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + + resp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, resp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *resp.DomainInfo.Status) + s.Empty(resp.DomainInfo.GetDescription()) + s.Empty(resp.DomainInfo.GetOwnerEmail()) + s.Equal(int32(0), resp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(false, resp.Configuration.GetEmitMetric()) + s.Equal(s.ClusterMetadata.GetCurrentClusterName(), resp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(clusters, resp.ReplicationConfiguration.Clusters) +} + +// Note: if the global domain is not enabled, active clusters and clusters +// will be ignored on the server side +func (s *integrationCrossDCSuite) TestIntegrationRegisterGetDomain_GlobalDomainDisabled_NoDefault() { + testFn := func(isMasterCluster bool) { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(false, isMasterCluster) + + domainName := "some random domain name" + description := "some random description" + email := "some random email" + retention := int32(7) + emitMetric := true + activeClusterName := "" + currentClusterName := s.ClusterMetadata.GetCurrentClusterName() + clusters := []*workflow.ClusterReplicationConfiguration{} + for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(clusterName), + }) + if clusterName != currentClusterName { + activeClusterName = clusterName + } + } + + err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domainName), + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(email), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + Clusters: clusters, + ActiveClusterName: common.StringPtr(activeClusterName), + }) + s.Nil(err) + + resp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, resp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *resp.DomainInfo.Status) + s.Equal(description, resp.DomainInfo.GetDescription()) + s.Equal(email, resp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, resp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, resp.Configuration.GetEmitMetric()) + s.Equal(currentClusterName, resp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(1, len(resp.ReplicationConfiguration.Clusters)) + s.Equal(currentClusterName, resp.ReplicationConfiguration.Clusters[0].GetClusterName()) + } + + testFn(false) + testFn(true) +} + +func (s *integrationCrossDCSuite) TestIntegrationRegisterGetDomain_GlobalDomainEnabled_NotMaster_NoDefault() { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(true, false) + + domainName := "some random domain name" + description := "some random description" + email := "some random email" + retention := int32(7) + emitMetric := true + activeClusterName := "" + clusters := []*workflow.ClusterReplicationConfiguration{} + for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(clusterName), + }) + if clusterName != s.ClusterMetadata.GetCurrentClusterName() { + activeClusterName = clusterName + } + } + + err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domainName), + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(email), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + Clusters: clusters, + ActiveClusterName: common.StringPtr(activeClusterName), + }) + s.NotNil(err) +} + +func (s *integrationCrossDCSuite) TestIntegrationRegisterGetDomain_GlobalDomainEnabled_IsMaster_NoDefault() { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(true, true) + + domainName := "some random domain name" + description := "some random description" + email := "some random email" + retention := int32(7) + emitMetric := true + activeClusterName := "" + clusters := []*workflow.ClusterReplicationConfiguration{} + for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(clusterName), + }) + if clusterName != s.ClusterMetadata.GetCurrentClusterName() { + activeClusterName = clusterName + } + } + + err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domainName), + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(email), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + Clusters: clusters, + ActiveClusterName: common.StringPtr(activeClusterName), + }) + s.Nil(err) + + resp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, resp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *resp.DomainInfo.Status) + s.Equal(description, resp.DomainInfo.GetDescription()) + s.Equal(email, resp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, resp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, resp.Configuration.GetEmitMetric()) + s.Equal(activeClusterName, resp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(clusters, resp.ReplicationConfiguration.Clusters) +} + +// Note: if the global domain is not enabled, active clusters and clusters +// will be ignored on the server side +func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainDisabled_AllSet() { + testFn := func(isMasterCluster bool) { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(false, isMasterCluster) + + domainName := "some random domain name" + err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + + description := "some random description" + email := "some random email" + retention := int32(7) + emitMetric := true + currentClusterName := s.ClusterMetadata.GetCurrentClusterName() + clusters := []*workflow.ClusterReplicationConfiguration{} + for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(clusterName), + }) + } + + updateResp, err := s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ + Name: common.StringPtr(domainName), + UpdatedInfo: &workflow.UpdateDomainInfo{ + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(email), + }, + Configuration: &workflow.DomainConfiguration{ + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + }, + ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ + Clusters: clusters, + }, + }) + s.Nil(err) + s.Equal(domainName, updateResp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *updateResp.DomainInfo.Status) + s.Equal(description, updateResp.DomainInfo.GetDescription()) + s.Equal(email, updateResp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, updateResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, updateResp.Configuration.GetEmitMetric()) + s.Equal(currentClusterName, updateResp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(1, len(updateResp.ReplicationConfiguration.Clusters)) + s.Equal(currentClusterName, updateResp.ReplicationConfiguration.Clusters[0].GetClusterName()) + + describeResp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, describeResp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *describeResp.DomainInfo.Status) + s.Equal(description, describeResp.DomainInfo.GetDescription()) + s.Equal(email, describeResp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, describeResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, describeResp.Configuration.GetEmitMetric()) + s.Equal(currentClusterName, describeResp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(1, len(describeResp.ReplicationConfiguration.Clusters)) + s.Equal(currentClusterName, describeResp.ReplicationConfiguration.Clusters[0].GetClusterName()) + } + + testFn(false) + testFn(true) +} + +func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_NotMaster_AllSet() { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(true, false) + + domainName := "some random domain name" + // bypass to create a domain, since this cluster is not the master + // set all attr to default + _, err := s.MetadataManager.CreateDomain(&persistence.CreateDomainRequest{ + Info: &persistence.DomainInfo{ + ID: uuid.New(), + Name: domainName, + Status: persistence.DomainStatusRegistered, + Description: "", + OwnerEmail: "", + }, + Config: &persistence.DomainConfig{ + Retention: 0, + EmitMetric: false, + }, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: s.ClusterMetadata.GetCurrentClusterName(), + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ClusterName: s.ClusterMetadata.GetCurrentClusterName()}, + }, + }, + FailoverVersion: 0, + }) + s.Nil(err) + + description := "some random description" + email := "some random email" + retention := int32(7) + emitMetric := true + clusters := []*workflow.ClusterReplicationConfiguration{} + for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(clusterName), + }) + } + + _, err = s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ + Name: common.StringPtr(domainName), + UpdatedInfo: &workflow.UpdateDomainInfo{ + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(email), + }, + Configuration: &workflow.DomainConfiguration{ + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + }, + ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ + Clusters: clusters, + }, + }) + s.NotNil(err) +} + +func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_IsMaster_AllSet() { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(true, true) + + domainName := "some random domain name" + err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + + description := "some random description" + email := "some random email" + retention := int32(7) + emitMetric := true + clusters := []*workflow.ClusterReplicationConfiguration{} + for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(clusterName), + }) + } + + updateResp, err := s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ + Name: common.StringPtr(domainName), + UpdatedInfo: &workflow.UpdateDomainInfo{ + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(email), + }, + Configuration: &workflow.DomainConfiguration{ + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + }, + ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ + Clusters: clusters, + }, + }) + s.Nil(err) + s.Equal(domainName, updateResp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *updateResp.DomainInfo.Status) + s.Equal(description, updateResp.DomainInfo.GetDescription()) + s.Equal(email, updateResp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, updateResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, updateResp.Configuration.GetEmitMetric()) + s.Equal(s.ClusterMetadata.GetCurrentClusterName(), updateResp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(clusters, updateResp.ReplicationConfiguration.Clusters) + + describeResp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, describeResp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *describeResp.DomainInfo.Status) + s.Equal(description, describeResp.DomainInfo.GetDescription()) + s.Equal(email, describeResp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, describeResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, describeResp.Configuration.GetEmitMetric()) + s.Equal(s.ClusterMetadata.GetCurrentClusterName(), describeResp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(clusters, describeResp.ReplicationConfiguration.Clusters) + + // update domain with less replicated regions is not allowed + _, err = s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ + Name: common.StringPtr(domainName), + ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ + Clusters: []*workflow.ClusterReplicationConfiguration{ + &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(s.ClusterMetadata.GetCurrentClusterName()), + }, + }, + }, + }) + s.NotNil(err) +} + +// Note: if the global domain is not enabled, active clusters and clusters +// will be ignored on the server side +func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainDisabled_NoSet() { + testFn := func(isMasterCluster bool) { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(false, isMasterCluster) + + domainName := "some random domain name" + description := "some random description" + email := "some random email" + retention := int32(7) + emitMetric := true + currentClusterName := s.ClusterMetadata.GetCurrentClusterName() + + err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domainName), + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(email), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + }) + s.Nil(err) + + updateResp, err := s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, updateResp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *updateResp.DomainInfo.Status) + s.Equal(description, updateResp.DomainInfo.GetDescription()) + s.Equal(email, updateResp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, updateResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, updateResp.Configuration.GetEmitMetric()) + s.Equal(currentClusterName, updateResp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(1, len(updateResp.ReplicationConfiguration.Clusters)) + s.Equal(currentClusterName, updateResp.ReplicationConfiguration.Clusters[0].GetClusterName()) + + describeResp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, describeResp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *describeResp.DomainInfo.Status) + s.Equal(description, describeResp.DomainInfo.GetDescription()) + s.Equal(email, describeResp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, describeResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, describeResp.Configuration.GetEmitMetric()) + s.Equal(currentClusterName, describeResp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(1, len(describeResp.ReplicationConfiguration.Clusters)) + s.Equal(currentClusterName, describeResp.ReplicationConfiguration.Clusters[0].GetClusterName()) + } + + testFn(false) + testFn(true) +} + +func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_NotMaster_NoSet() { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(true, false) + + domainName := "some random domain name" + // bypass to create a domain, since this cluster is not the master + // set all attr to default + _, err := s.MetadataManager.CreateDomain(&persistence.CreateDomainRequest{ + Info: &persistence.DomainInfo{ + ID: uuid.New(), + Name: domainName, + Status: persistence.DomainStatusRegistered, + Description: "", + OwnerEmail: "", + }, + Config: &persistence.DomainConfig{ + Retention: 0, + EmitMetric: false, + }, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: s.ClusterMetadata.GetCurrentClusterName(), + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ClusterName: s.ClusterMetadata.GetCurrentClusterName()}, + }, + }, + FailoverVersion: 0, + }) + s.Nil(err) + + _, err = s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.NotNil(err) +} + +func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_IsMaster_NoSet() { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(true, true) + + domainName := "some random domain name" + description := "some random description" + email := "some random email" + retention := int32(7) + emitMetric := true + clusters := []*workflow.ClusterReplicationConfiguration{} + for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(clusterName), + }) + } + + err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ + Name: common.StringPtr(domainName), + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(email), + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + Clusters: clusters, + }) + s.Nil(err) + + updateResp, err := s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, updateResp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *updateResp.DomainInfo.Status) + s.Equal(description, updateResp.DomainInfo.GetDescription()) + s.Equal(email, updateResp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, updateResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, updateResp.Configuration.GetEmitMetric()) + s.Equal(s.ClusterMetadata.GetCurrentClusterName(), updateResp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(clusters, updateResp.ReplicationConfiguration.Clusters) + + describeResp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, describeResp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *describeResp.DomainInfo.Status) + s.Equal(description, describeResp.DomainInfo.GetDescription()) + s.Equal(email, describeResp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, describeResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, describeResp.Configuration.GetEmitMetric()) + s.Equal(s.ClusterMetadata.GetCurrentClusterName(), describeResp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(clusters, describeResp.ReplicationConfiguration.Clusters) +} + +func (s *integrationCrossDCSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_Failover() { + testFn := func(isMasterCluster bool) { + // re-initialize to enable global domain + s.TearDownTest() + s.setupTest(true, isMasterCluster) + + domainName := "some random domain name" + description := "some random description" + email := "some random email" + retention := int32(7) + emitMetric := true + clusters := []*workflow.ClusterReplicationConfiguration{} + + activeClusterName := "" + failoverVersion := int64(59) + persistenceClusters := []*persistence.ClusterReplicationConfig{} + for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { + clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ + ClusterName: common.StringPtr(clusterName), + }) + + persistenceClusters = append(persistenceClusters, &persistence.ClusterReplicationConfig{ + ClusterName: clusterName, + }) + if clusterName != s.ClusterMetadata.GetCurrentClusterName() { + activeClusterName = clusterName + } + } + + // create a domain which is not currently active + s.MetadataManager.CreateDomain(&persistence.CreateDomainRequest{ + Info: &persistence.DomainInfo{ + ID: uuid.New(), + Name: domainName, + Status: persistence.DomainStatusRegistered, + Description: description, + OwnerEmail: email, + }, + Config: &persistence.DomainConfig{ + Retention: retention, + EmitMetric: emitMetric, + }, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: activeClusterName, + Clusters: persistenceClusters, + }, + FailoverVersion: failoverVersion, + }) + + // when doing the failover, the only thing can be updated is the active cluster + updateResp, err := s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ + Name: common.StringPtr(domainName), + UpdatedInfo: &workflow.UpdateDomainInfo{ + Description: common.StringPtr(description), + OwnerEmail: common.StringPtr(email), + }, + Configuration: &workflow.DomainConfiguration{ + WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), + EmitMetric: common.BoolPtr(emitMetric), + }, + ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ + ActiveClusterName: common.StringPtr(s.ClusterMetadata.GetCurrentClusterName()), + Clusters: clusters, + }, + }) + s.Nil(updateResp) + s.NotNil(err) + + updateResp, err = s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ + Name: common.StringPtr(domainName), + ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ + ActiveClusterName: common.StringPtr(s.ClusterMetadata.GetCurrentClusterName()), + }, + }) + s.Nil(err) + s.Equal(domainName, updateResp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *updateResp.DomainInfo.Status) + s.Equal(description, updateResp.DomainInfo.GetDescription()) + s.Equal(email, updateResp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, updateResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, updateResp.Configuration.GetEmitMetric()) + s.Equal(s.ClusterMetadata.GetCurrentClusterName(), updateResp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(clusters, updateResp.ReplicationConfiguration.Clusters) + + describeResp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ + Name: common.StringPtr(domainName), + }) + s.Nil(err) + s.Equal(domainName, describeResp.DomainInfo.GetName()) + s.Equal(workflow.DomainStatusRegistered, *describeResp.DomainInfo.Status) + s.Equal(description, describeResp.DomainInfo.GetDescription()) + s.Equal(email, describeResp.DomainInfo.GetOwnerEmail()) + s.Equal(retention, describeResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) + s.Equal(emitMetric, describeResp.Configuration.GetEmitMetric()) + s.Equal(s.ClusterMetadata.GetCurrentClusterName(), describeResp.ReplicationConfiguration.GetActiveClusterName()) + s.Equal(clusters, describeResp.ReplicationConfiguration.Clusters) + } + + testFn(true) + testFn(false) +} diff --git a/host/integration_test.go b/host/integration_test.go index 2de1102160d..a99ba3fb2ad 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -50,15 +50,6 @@ import ( "github.com/uber/cadence/service/matching" ) -var ( - integration = flag.Bool("integration", true, "run integration tests") -) - -const ( - testNumberOfHistoryShards = 4 - testNumberOfHistoryHosts = 1 -) - type ( integrationSuite struct { // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, @@ -118,22 +109,24 @@ func (s *integrationSuite) SetupSuite() { logger.Formatter = formatter //logger.Level = log.DebugLevel s.logger = bark.NewLoggerFromLogrus(logger) + s.setupSuite(false, false) } func (s *integrationSuite) TearDownSuite() { + s.host.Stop() + s.host = nil + s.TearDownWorkflowStore() } func (s *integrationSuite) SetupTest() { - s.setupTest(false, false) + } func (s *integrationSuite) TearDownTest() { - s.host.Stop() - s.host = nil - s.TearDownWorkflowStore() + } -func (s *integrationSuite) setupTest(enableGlobalDomain bool, isMasterCluster bool) { +func (s *integrationSuite) setupSuite(enableGlobalDomain bool, isMasterCluster bool) { // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil s.Assertions = require.New(s.T()) options := persistence.TestBaseOptions{} @@ -186,673 +179,6 @@ func (s *integrationSuite) setupTest(enableGlobalDomain bool, isMasterCluster bo }) } -// Note: if the global domain is not enabled, active clusters and clusters -// will be ignored on the server side -func (s *integrationSuite) TestIntegrationRegisterGetDomain_GlobalDomainDisabled_AllDefault() { - testFn := func(isMasterCluster bool) { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(false, isMasterCluster) - - domainName := "some random domain name" - clusters := []*workflow.ClusterReplicationConfiguration{} - for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) { - clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(replicationConfig.ClusterName), - }) - } - - err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - - resp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, resp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *resp.DomainInfo.Status) - s.Empty(resp.DomainInfo.GetDescription()) - s.Empty(resp.DomainInfo.GetOwnerEmail()) - s.Equal(int32(0), resp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(false, resp.Configuration.GetEmitMetric()) - s.Equal(s.ClusterMetadata.GetCurrentClusterName(), resp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(clusters, resp.ReplicationConfiguration.Clusters) - } - - testFn(false) - testFn(true) -} - -func (s *integrationSuite) TestIntegrationRegisterGetDomain_GlobalDomainEnabled_NotMaster_AllDefault() { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(true, false) - - domainName := "some random domain name" - err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.NotNil(err) -} - -func (s *integrationSuite) TestIntegrationRegisterGetDomain_GlobalDomainEnabled_IsMaster_AllDefault() { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(true, true) - - domainName := "some random domain name" - clusters := []*workflow.ClusterReplicationConfiguration{} - for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) { - clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(replicationConfig.ClusterName), - }) - } - - err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - - resp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, resp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *resp.DomainInfo.Status) - s.Empty(resp.DomainInfo.GetDescription()) - s.Empty(resp.DomainInfo.GetOwnerEmail()) - s.Equal(int32(0), resp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(false, resp.Configuration.GetEmitMetric()) - s.Equal(s.ClusterMetadata.GetCurrentClusterName(), resp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(clusters, resp.ReplicationConfiguration.Clusters) -} - -// Note: if the global domain is not enabled, active clusters and clusters -// will be ignored on the server side -func (s *integrationSuite) TestIntegrationRegisterGetDomain_GlobalDomainDisabled_NoDefault() { - testFn := func(isMasterCluster bool) { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(false, isMasterCluster) - - domainName := "some random domain name" - description := "some random description" - email := "some random email" - retention := int32(7) - emitMetric := true - activeClusterName := "" - currentClusterName := s.ClusterMetadata.GetCurrentClusterName() - clusters := []*workflow.ClusterReplicationConfiguration{} - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { - clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(clusterName), - }) - if clusterName != currentClusterName { - activeClusterName = clusterName - } - } - - err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - Description: common.StringPtr(description), - OwnerEmail: common.StringPtr(email), - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), - EmitMetric: common.BoolPtr(emitMetric), - Clusters: clusters, - ActiveClusterName: common.StringPtr(activeClusterName), - }) - s.Nil(err) - - resp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, resp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *resp.DomainInfo.Status) - s.Equal(description, resp.DomainInfo.GetDescription()) - s.Equal(email, resp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, resp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, resp.Configuration.GetEmitMetric()) - s.Equal(currentClusterName, resp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(1, len(resp.ReplicationConfiguration.Clusters)) - s.Equal(currentClusterName, resp.ReplicationConfiguration.Clusters[0].GetClusterName()) - } - - testFn(false) - testFn(true) -} - -func (s *integrationSuite) TestIntegrationRegisterGetDomain_GlobalDomainEnabled_NotMaster_NoDefault() { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(true, false) - - domainName := "some random domain name" - description := "some random description" - email := "some random email" - retention := int32(7) - emitMetric := true - activeClusterName := "" - clusters := []*workflow.ClusterReplicationConfiguration{} - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { - clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(clusterName), - }) - if clusterName != s.ClusterMetadata.GetCurrentClusterName() { - activeClusterName = clusterName - } - } - - err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - Description: common.StringPtr(description), - OwnerEmail: common.StringPtr(email), - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), - EmitMetric: common.BoolPtr(emitMetric), - Clusters: clusters, - ActiveClusterName: common.StringPtr(activeClusterName), - }) - s.NotNil(err) -} - -func (s *integrationSuite) TestIntegrationRegisterGetDomain_GlobalDomainEnabled_IsMaster_NoDefault() { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(true, true) - - domainName := "some random domain name" - description := "some random description" - email := "some random email" - retention := int32(7) - emitMetric := true - activeClusterName := "" - clusters := []*workflow.ClusterReplicationConfiguration{} - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { - clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(clusterName), - }) - if clusterName != s.ClusterMetadata.GetCurrentClusterName() { - activeClusterName = clusterName - } - } - - err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - Description: common.StringPtr(description), - OwnerEmail: common.StringPtr(email), - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), - EmitMetric: common.BoolPtr(emitMetric), - Clusters: clusters, - ActiveClusterName: common.StringPtr(activeClusterName), - }) - s.Nil(err) - - resp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, resp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *resp.DomainInfo.Status) - s.Equal(description, resp.DomainInfo.GetDescription()) - s.Equal(email, resp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, resp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, resp.Configuration.GetEmitMetric()) - s.Equal(activeClusterName, resp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(clusters, resp.ReplicationConfiguration.Clusters) -} - -// Note: if the global domain is not enabled, active clusters and clusters -// will be ignored on the server side -func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainDisabled_AllSet() { - testFn := func(isMasterCluster bool) { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(false, isMasterCluster) - - domainName := "some random domain name" - err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - - description := "some random description" - email := "some random email" - retention := int32(7) - emitMetric := true - currentClusterName := s.ClusterMetadata.GetCurrentClusterName() - clusters := []*workflow.ClusterReplicationConfiguration{} - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { - clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(clusterName), - }) - } - - updateResp, err := s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ - Name: common.StringPtr(domainName), - UpdatedInfo: &workflow.UpdateDomainInfo{ - Description: common.StringPtr(description), - OwnerEmail: common.StringPtr(email), - }, - Configuration: &workflow.DomainConfiguration{ - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), - EmitMetric: common.BoolPtr(emitMetric), - }, - ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ - Clusters: clusters, - }, - }) - s.Nil(err) - s.Equal(domainName, updateResp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *updateResp.DomainInfo.Status) - s.Equal(description, updateResp.DomainInfo.GetDescription()) - s.Equal(email, updateResp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, updateResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, updateResp.Configuration.GetEmitMetric()) - s.Equal(currentClusterName, updateResp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(1, len(updateResp.ReplicationConfiguration.Clusters)) - s.Equal(currentClusterName, updateResp.ReplicationConfiguration.Clusters[0].GetClusterName()) - - describeResp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, describeResp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *describeResp.DomainInfo.Status) - s.Equal(description, describeResp.DomainInfo.GetDescription()) - s.Equal(email, describeResp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, describeResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, describeResp.Configuration.GetEmitMetric()) - s.Equal(currentClusterName, describeResp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(1, len(describeResp.ReplicationConfiguration.Clusters)) - s.Equal(currentClusterName, describeResp.ReplicationConfiguration.Clusters[0].GetClusterName()) - } - - testFn(false) - testFn(true) -} - -func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_NotMaster_AllSet() { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(true, false) - - domainName := "some random domain name" - // bypass to create a domain, since this cluster is not the master - // set all attr to default - _, err := s.MetadataManager.CreateDomain(&persistence.CreateDomainRequest{ - Info: &persistence.DomainInfo{ - ID: uuid.New(), - Name: domainName, - Status: persistence.DomainStatusRegistered, - Description: "", - OwnerEmail: "", - }, - Config: &persistence.DomainConfig{ - Retention: 0, - EmitMetric: false, - }, - ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: s.ClusterMetadata.GetCurrentClusterName(), - Clusters: []*persistence.ClusterReplicationConfig{ - &persistence.ClusterReplicationConfig{ClusterName: s.ClusterMetadata.GetCurrentClusterName()}, - }, - }, - FailoverVersion: 0, - }) - s.Nil(err) - - description := "some random description" - email := "some random email" - retention := int32(7) - emitMetric := true - clusters := []*workflow.ClusterReplicationConfiguration{} - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { - clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(clusterName), - }) - } - - _, err = s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ - Name: common.StringPtr(domainName), - UpdatedInfo: &workflow.UpdateDomainInfo{ - Description: common.StringPtr(description), - OwnerEmail: common.StringPtr(email), - }, - Configuration: &workflow.DomainConfiguration{ - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), - EmitMetric: common.BoolPtr(emitMetric), - }, - ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ - Clusters: clusters, - }, - }) - s.NotNil(err) -} - -func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_IsMaster_AllSet() { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(true, true) - - domainName := "some random domain name" - err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - - description := "some random description" - email := "some random email" - retention := int32(7) - emitMetric := true - clusters := []*workflow.ClusterReplicationConfiguration{} - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { - clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(clusterName), - }) - } - - updateResp, err := s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ - Name: common.StringPtr(domainName), - UpdatedInfo: &workflow.UpdateDomainInfo{ - Description: common.StringPtr(description), - OwnerEmail: common.StringPtr(email), - }, - Configuration: &workflow.DomainConfiguration{ - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), - EmitMetric: common.BoolPtr(emitMetric), - }, - ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ - Clusters: clusters, - }, - }) - s.Nil(err) - s.Equal(domainName, updateResp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *updateResp.DomainInfo.Status) - s.Equal(description, updateResp.DomainInfo.GetDescription()) - s.Equal(email, updateResp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, updateResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, updateResp.Configuration.GetEmitMetric()) - s.Equal(s.ClusterMetadata.GetCurrentClusterName(), updateResp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(clusters, updateResp.ReplicationConfiguration.Clusters) - - describeResp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, describeResp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *describeResp.DomainInfo.Status) - s.Equal(description, describeResp.DomainInfo.GetDescription()) - s.Equal(email, describeResp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, describeResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, describeResp.Configuration.GetEmitMetric()) - s.Equal(s.ClusterMetadata.GetCurrentClusterName(), describeResp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(clusters, describeResp.ReplicationConfiguration.Clusters) - - // update domain with less replicated regions is not allowed - _, err = s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ - Name: common.StringPtr(domainName), - ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ - Clusters: []*workflow.ClusterReplicationConfiguration{ - &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(s.ClusterMetadata.GetCurrentClusterName()), - }, - }, - }, - }) - s.NotNil(err) -} - -// Note: if the global domain is not enabled, active clusters and clusters -// will be ignored on the server side -func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainDisabled_NoSet() { - testFn := func(isMasterCluster bool) { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(false, isMasterCluster) - - domainName := "some random domain name" - description := "some random description" - email := "some random email" - retention := int32(7) - emitMetric := true - currentClusterName := s.ClusterMetadata.GetCurrentClusterName() - - err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - Description: common.StringPtr(description), - OwnerEmail: common.StringPtr(email), - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), - EmitMetric: common.BoolPtr(emitMetric), - }) - s.Nil(err) - - updateResp, err := s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, updateResp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *updateResp.DomainInfo.Status) - s.Equal(description, updateResp.DomainInfo.GetDescription()) - s.Equal(email, updateResp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, updateResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, updateResp.Configuration.GetEmitMetric()) - s.Equal(currentClusterName, updateResp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(1, len(updateResp.ReplicationConfiguration.Clusters)) - s.Equal(currentClusterName, updateResp.ReplicationConfiguration.Clusters[0].GetClusterName()) - - describeResp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, describeResp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *describeResp.DomainInfo.Status) - s.Equal(description, describeResp.DomainInfo.GetDescription()) - s.Equal(email, describeResp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, describeResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, describeResp.Configuration.GetEmitMetric()) - s.Equal(currentClusterName, describeResp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(1, len(describeResp.ReplicationConfiguration.Clusters)) - s.Equal(currentClusterName, describeResp.ReplicationConfiguration.Clusters[0].GetClusterName()) - } - - testFn(false) - testFn(true) -} - -func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_NotMaster_NoSet() { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(true, false) - - domainName := "some random domain name" - // bypass to create a domain, since this cluster is not the master - // set all attr to default - _, err := s.MetadataManager.CreateDomain(&persistence.CreateDomainRequest{ - Info: &persistence.DomainInfo{ - ID: uuid.New(), - Name: domainName, - Status: persistence.DomainStatusRegistered, - Description: "", - OwnerEmail: "", - }, - Config: &persistence.DomainConfig{ - Retention: 0, - EmitMetric: false, - }, - ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: s.ClusterMetadata.GetCurrentClusterName(), - Clusters: []*persistence.ClusterReplicationConfig{ - &persistence.ClusterReplicationConfig{ClusterName: s.ClusterMetadata.GetCurrentClusterName()}, - }, - }, - FailoverVersion: 0, - }) - s.Nil(err) - - _, err = s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.NotNil(err) -} - -func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_IsMaster_NoSet() { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(true, true) - - domainName := "some random domain name" - description := "some random description" - email := "some random email" - retention := int32(7) - emitMetric := true - clusters := []*workflow.ClusterReplicationConfiguration{} - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { - clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(clusterName), - }) - } - - err := s.engine.RegisterDomain(createContext(), &workflow.RegisterDomainRequest{ - Name: common.StringPtr(domainName), - Description: common.StringPtr(description), - OwnerEmail: common.StringPtr(email), - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), - EmitMetric: common.BoolPtr(emitMetric), - Clusters: clusters, - }) - s.Nil(err) - - updateResp, err := s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, updateResp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *updateResp.DomainInfo.Status) - s.Equal(description, updateResp.DomainInfo.GetDescription()) - s.Equal(email, updateResp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, updateResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, updateResp.Configuration.GetEmitMetric()) - s.Equal(s.ClusterMetadata.GetCurrentClusterName(), updateResp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(clusters, updateResp.ReplicationConfiguration.Clusters) - - describeResp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, describeResp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *describeResp.DomainInfo.Status) - s.Equal(description, describeResp.DomainInfo.GetDescription()) - s.Equal(email, describeResp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, describeResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, describeResp.Configuration.GetEmitMetric()) - s.Equal(s.ClusterMetadata.GetCurrentClusterName(), describeResp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(clusters, describeResp.ReplicationConfiguration.Clusters) -} - -func (s *integrationSuite) TestIntegrationUpdateGetDomain_GlobalDomainEnabled_Failover() { - testFn := func(isMasterCluster bool) { - // re-initialize to enable global domain - s.TearDownTest() - s.setupTest(true, isMasterCluster) - - domainName := "some random domain name" - description := "some random description" - email := "some random email" - retention := int32(7) - emitMetric := true - clusters := []*workflow.ClusterReplicationConfiguration{} - - activeClusterName := "" - failoverVersion := int64(59) - persistenceClusters := []*persistence.ClusterReplicationConfig{} - for clusterName := range s.ClusterMetadata.GetAllClusterFailoverVersions() { - clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ - ClusterName: common.StringPtr(clusterName), - }) - - persistenceClusters = append(persistenceClusters, &persistence.ClusterReplicationConfig{ - ClusterName: clusterName, - }) - if clusterName != s.ClusterMetadata.GetCurrentClusterName() { - activeClusterName = clusterName - } - } - - // create a domain which is not currently active - s.MetadataManager.CreateDomain(&persistence.CreateDomainRequest{ - Info: &persistence.DomainInfo{ - ID: uuid.New(), - Name: domainName, - Status: persistence.DomainStatusRegistered, - Description: description, - OwnerEmail: email, - }, - Config: &persistence.DomainConfig{ - Retention: retention, - EmitMetric: emitMetric, - }, - ReplicationConfig: &persistence.DomainReplicationConfig{ - ActiveClusterName: activeClusterName, - Clusters: persistenceClusters, - }, - FailoverVersion: failoverVersion, - }) - - // when doing the failover, the only thing can be updated is the active cluster - updateResp, err := s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ - Name: common.StringPtr(domainName), - UpdatedInfo: &workflow.UpdateDomainInfo{ - Description: common.StringPtr(description), - OwnerEmail: common.StringPtr(email), - }, - Configuration: &workflow.DomainConfiguration{ - WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention), - EmitMetric: common.BoolPtr(emitMetric), - }, - ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ - ActiveClusterName: common.StringPtr(s.ClusterMetadata.GetCurrentClusterName()), - Clusters: clusters, - }, - }) - s.Nil(updateResp) - s.NotNil(err) - - updateResp, err = s.engine.UpdateDomain(createContext(), &workflow.UpdateDomainRequest{ - Name: common.StringPtr(domainName), - ReplicationConfiguration: &workflow.DomainReplicationConfiguration{ - ActiveClusterName: common.StringPtr(s.ClusterMetadata.GetCurrentClusterName()), - }, - }) - s.Nil(err) - s.Equal(domainName, updateResp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *updateResp.DomainInfo.Status) - s.Equal(description, updateResp.DomainInfo.GetDescription()) - s.Equal(email, updateResp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, updateResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, updateResp.Configuration.GetEmitMetric()) - s.Equal(s.ClusterMetadata.GetCurrentClusterName(), updateResp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(clusters, updateResp.ReplicationConfiguration.Clusters) - - describeResp, err := s.engine.DescribeDomain(createContext(), &workflow.DescribeDomainRequest{ - Name: common.StringPtr(domainName), - }) - s.Nil(err) - s.Equal(domainName, describeResp.DomainInfo.GetName()) - s.Equal(workflow.DomainStatusRegistered, *describeResp.DomainInfo.Status) - s.Equal(description, describeResp.DomainInfo.GetDescription()) - s.Equal(email, describeResp.DomainInfo.GetOwnerEmail()) - s.Equal(retention, describeResp.Configuration.GetWorkflowExecutionRetentionPeriodInDays()) - s.Equal(emitMetric, describeResp.Configuration.GetEmitMetric()) - s.Equal(s.ClusterMetadata.GetCurrentClusterName(), describeResp.ReplicationConfiguration.GetActiveClusterName()) - s.Equal(clusters, describeResp.ReplicationConfiguration.Clusters) - } - - testFn(true) - testFn(false) -} - func (s *integrationSuite) TestIntegrationStartWorkflowExecution() { id := "integration-start-workflow-test" wt := "integration-start-workflow-test-type" @@ -932,7 +258,6 @@ func (s *integrationSuite) TestTerminateWorkflow() { s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) - workflowComplete := false activityCount := int32(1) activityCounter := int32(0) dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, @@ -957,7 +282,6 @@ func (s *integrationSuite) TestTerminateWorkflow() { }}, nil } - workflowComplete = true return []byte(strconv.Itoa(int(activityCounter))), []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ @@ -2433,9 +1757,9 @@ func (s *integrationSuite) TestSignalWorkflow() { } func (s *integrationSuite) TestSignalWorkflow_DuplicateRequest() { - id := "interation-signal-workflow-test" - wt := "interation-signal-workflow-test-type" - tl := "interation-signal-workflow-test-tasklist" + id := "interation-signal-workflow-test-duplicate" + wt := "interation-signal-workflow-test-duplicate-type" + tl := "interation-signal-workflow-test-duplicate-tasklist" identity := "worker1" activityName := "activity_type1" @@ -2697,10 +2021,10 @@ func (s *integrationSuite) TestBufferedEvents() { } func (s *integrationSuite) TestQueryWorkflow_Sticky() { - id := "interation-query-workflow-test" - wt := "interation-query-workflow-test-type" - tl := "interation-query-workflow-test-tasklist" - stl := "interation-query-workflow-test-tasklist-sticky" + id := "interation-query-workflow-test-sticky" + wt := "interation-query-workflow-test-sticky-type" + tl := "interation-query-workflow-test-sticky-tasklist" + stl := "interation-query-workflow-test-sticky-tasklist-sticky" identity := "worker1" activityName := "activity_type1" queryType := "test-query" @@ -2734,7 +2058,6 @@ func (s *integrationSuite) TestQueryWorkflow_Sticky() { s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) // decider logic - workflowComplete := false activityScheduled := false activityData := int32(1) dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, @@ -2760,7 +2083,6 @@ func (s *integrationSuite) TestQueryWorkflow_Sticky() { }}, nil } - workflowComplete = true return nil, []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ @@ -2862,10 +2184,10 @@ func (s *integrationSuite) TestQueryWorkflow_Sticky() { } func (s *integrationSuite) TestQueryWorkflow_StickyTimeout() { - id := "interation-query-workflow-test" - wt := "interation-query-workflow-test-type" - tl := "interation-query-workflow-test-tasklist" - stl := "interation-query-workflow-test-tasklist-sticky" + id := "interation-query-workflow-test-sticky-timeout" + wt := "interation-query-workflow-test-sticky-timeout-type" + tl := "interation-query-workflow-test-sticky-timeout-tasklist" + stl := "interation-query-workflow-test-sticky-timeout-tasklist-sticky" identity := "worker1" activityName := "activity_type1" queryType := "test-query" @@ -2899,7 +2221,6 @@ func (s *integrationSuite) TestQueryWorkflow_StickyTimeout() { s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) // decider logic - workflowComplete := false activityScheduled := false activityData := int32(1) dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, @@ -2925,7 +2246,6 @@ func (s *integrationSuite) TestQueryWorkflow_StickyTimeout() { }}, nil } - workflowComplete = true return nil, []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ @@ -3013,9 +2333,9 @@ func (s *integrationSuite) TestQueryWorkflow_StickyTimeout() { } func (s *integrationSuite) TestQueryWorkflow_NonSticky() { - id := "interation-query-workflow-test" - wt := "interation-query-workflow-test-type" - tl := "interation-query-workflow-test-tasklist" + id := "interation-query-workflow-test-non-sticky" + wt := "interation-query-workflow-test-non-sticky-type" + tl := "interation-query-workflow-test-non-sticky-tasklist" identity := "worker1" activityName := "activity_type1" queryType := "test-query" @@ -3045,7 +2365,6 @@ func (s *integrationSuite) TestQueryWorkflow_NonSticky() { s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) // decider logic - workflowComplete := false activityScheduled := false activityData := int32(1) dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, @@ -3071,7 +2390,6 @@ func (s *integrationSuite) TestQueryWorkflow_NonSticky() { }}, nil } - workflowComplete = true return nil, []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ @@ -4254,8 +3572,8 @@ func (s *integrationSuite) TestHistoryVersionCompatibilityCheck() { } func (s *integrationSuite) TestChildWorkflowExecution() { - parentID := "integration-child-workflow-parent-test" - childID := "integration-child-workflow-child-test" + parentID := "integration-child-workflow-test-parent" + childID := "integration-child-workflow-test-child" wtParent := "integration-child-workflow-test-parent-type" wtChild := "integration-child-workflow-test-child-type" tlParent := "integration-child-workflow-test-parent-tasklist" @@ -4290,7 +3608,6 @@ func (s *integrationSuite) TestChildWorkflowExecution() { s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) // decider logic - workflowComplete := false childComplete := false childExecutionStarted := false var startedEvent *workflow.HistoryEvent @@ -4328,7 +3645,6 @@ func (s *integrationSuite) TestChildWorkflowExecution() { if *event.EventType == workflow.EventTypeChildWorkflowExecutionCompleted { completedEvent = event - workflowComplete = true return nil, []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ @@ -4416,8 +3732,8 @@ func (s *integrationSuite) TestChildWorkflowExecution() { } func (s *integrationSuite) TestChildWorkflowWithContinueAsNew() { - parentID := "integration-child-workflow-with-continue-as-new-parent-test" - childID := "integration-child-workflow-with-continue-as-new-child-test" + parentID := "integration-child-workflow-with-continue-as-new-test-parent" + childID := "integration-child-workflow-with-continue-as-new-test-child" wtParent := "integration-child-workflow-with-continue-as-new-test-parent-type" wtChild := "integration-child-workflow-with-continue-as-new-test-child-type" tl := "integration-child-workflow-with-continue-as-new-test-tasklist" @@ -4449,7 +3765,6 @@ func (s *integrationSuite) TestChildWorkflowWithContinueAsNew() { s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) // decider logic - workflowComplete := false childComplete := false childExecutionStarted := false childData := int32(1) @@ -4512,7 +3827,6 @@ func (s *integrationSuite) TestChildWorkflowWithContinueAsNew() { if *event.EventType == workflow.EventTypeChildWorkflowExecutionCompleted { completedEvent = event - workflowComplete = true return nil, []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ @@ -4586,7 +3900,7 @@ func (s *integrationSuite) TestChildWorkflowWithContinueAsNew() { func (s *integrationSuite) TestWorkflowTimeout() { startTime := time.Now().UnixNano() - id := "integration-workflow-timeout-test" + id := "integration-workflow-timeout" wt := "integration-workflow-timeout-type" tl := "integration-workflow-timeout-tasklist" identity := "worker1" @@ -4849,15 +4163,17 @@ func (s *integrationSuite) TestDecisionTaskFailed() { } func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() { - workflowID := "interation-get-workflow-history-events-long-poll-test" + workflowID := "interation-get-workflow-history-events-long-poll-test-all" + workflowTypeName := "interation-get-workflow-history-events-long-poll-test-all-type" + tasklistName := "interation-get-workflow-history-events-long-poll-test-all-tasklist" identity := "worker1" activityName := "activity_type1" workflowType := &workflow.WorkflowType{} - workflowType.Name = common.StringPtr("interation-get-workflow-history-events-long-poll-test-type") + workflowType.Name = common.StringPtr(workflowTypeName) taskList := &workflow.TaskList{} - taskList.Name = common.StringPtr("interation-get-workflow-history-events-long-poll-test-tasklist") + taskList.Name = common.StringPtr(tasklistName) // Start workflow execution request := &workflow.StartWorkflowExecutionRequest{ @@ -4878,7 +4194,6 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() { s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) // decider logic - workflowComplete := false activityScheduled := false activityData := int32(1) // var signalEvent *workflow.HistoryEvent @@ -4905,7 +4220,6 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() { }}, nil } - workflowComplete = true return nil, []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ @@ -5015,15 +4329,17 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_All() { } func (s *integrationSuite) TestGetWorkflowExecutionHistory_Close() { - workflowID := "interation-get-workflow-history-events-long-poll-test" + workflowID := "interation-get-workflow-history-events-long-poll-test-close" + workflowTypeName := "interation-get-workflow-history-events-long-poll-test-close-type" + tasklistName := "interation-get-workflow-history-events-long-poll-test-close-tasklist" identity := "worker1" activityName := "activity_type1" workflowType := &workflow.WorkflowType{} - workflowType.Name = common.StringPtr("interation-get-workflow-history-events-long-poll-test-type") + workflowType.Name = common.StringPtr(workflowTypeName) taskList := &workflow.TaskList{} - taskList.Name = common.StringPtr("interation-get-workflow-history-events-long-poll-test-tasklist") + taskList.Name = common.StringPtr(tasklistName) // Start workflow execution request := &workflow.StartWorkflowExecutionRequest{ @@ -5044,7 +4360,6 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_Close() { s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) // decider logic - workflowComplete := false activityScheduled := false activityData := int32(1) // var signalEvent *workflow.HistoryEvent @@ -5071,7 +4386,6 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_Close() { }}, nil } - workflowComplete = true return nil, []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ @@ -5175,14 +4489,16 @@ func (s *integrationSuite) TestGetWorkflowExecutionHistory_Close() { func (s *integrationSuite) TestDescribeTaskList() { workflowID := "interation-get-poller-history" + workflowTypeName := "interation-get-poller-history-type" + tasklistName := "interation-get-poller-history-tasklist" identity := "worker1" activityName := "activity_type1" workflowType := &workflow.WorkflowType{} - workflowType.Name = common.StringPtr("interation-get-poller-history-type") + workflowType.Name = common.StringPtr(workflowTypeName) taskList := &workflow.TaskList{} - taskList.Name = common.StringPtr("interation-get-poller-history-tasklist") + taskList.Name = common.StringPtr(tasklistName) // Start workflow execution request := &workflow.StartWorkflowExecutionRequest{ @@ -5203,7 +4519,6 @@ func (s *integrationSuite) TestDescribeTaskList() { s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) // decider logic - workflowComplete := false activityScheduled := false activityData := int32(1) // var signalEvent *workflow.HistoryEvent @@ -5230,7 +4545,6 @@ func (s *integrationSuite) TestDescribeTaskList() { }}, nil } - workflowComplete = true return nil, []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ @@ -5494,6 +4808,7 @@ CheckHistoryLoopForSignalSent: } ewfeAttributes := signalRequestedEvent.ExternalWorkflowExecutionSignaledEventAttributes + s.NotNil(ewfeAttributes) s.Equal(int64(intiatedEventID), ewfeAttributes.GetInitiatedEventId()) s.Equal(id, ewfeAttributes.WorkflowExecution.GetWorkflowId()) s.Equal(we2.RunId, ewfeAttributes.WorkflowExecution.RunId) @@ -5517,9 +4832,9 @@ CheckHistoryLoopForSignalSent: } func (s *integrationSuite) TestSignalExternalWorkflowDecision_WithoutRunID() { - id := "integration-signal-external-workflow-test" - wt := "integration-signal-external-workflow-test-type" - tl := "integration-signal-external-workflow-test-tasklist" + id := "integration-signal-external-workflow-test-without-run-id" + wt := "integration-signal-external-workflow-test-without-run-id-type" + tl := "integration-signal-external-workflow-test-without-run-id-tasklist" identity := "worker1" activityName := "activity_type1" @@ -5711,6 +5026,7 @@ CheckHistoryLoopForSignalSent: } ewfeAttributes := signalRequestedEvent.ExternalWorkflowExecutionSignaledEventAttributes + s.NotNil(ewfeAttributes) s.Equal(int64(intiatedEventID), ewfeAttributes.GetInitiatedEventId()) s.Equal(id, ewfeAttributes.WorkflowExecution.GetWorkflowId()) s.Equal("", ewfeAttributes.WorkflowExecution.GetRunId()) diff --git a/host/onebox.go b/host/onebox.go index b308b4bb10a..33f573f9ee1 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -21,6 +21,7 @@ package host import ( + "flag" "fmt" "reflect" "sync" @@ -55,6 +56,15 @@ import ( const rpAppNamePrefix string = "cadence" const maxRpJoinTimeout = 30 * time.Second +var ( + integration = flag.Bool("integration", true, "run integration tests") +) + +const ( + testNumberOfHistoryShards = 4 + testNumberOfHistoryHosts = 1 +) + // Cadence hosts all of cadence services in one process type Cadence interface { Start() error