diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 41187cb7896..8840b99c67d 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -192,7 +192,24 @@ func (e *historyEngineImpl) Stop() { } func (e *historyEngineImpl) registerDomainFailoverCallback() { - // first check whether this shard should catch up + // first set the failover callback + e.shard.GetDomainCache().RegisterDomainChangeCallback( + e.shard.GetShardID(), + func(prevDomain *cache.DomainCacheEntry, nextDomain *cache.DomainCacheEntry) { + if prevDomain.GetFailoverVersion() < nextDomain.GetFailoverVersion() && + nextDomain.GetReplicationConfig().ActiveClusterName == e.currentClusterName { + domainID := prevDomain.GetInfo().ID + e.txProcessor.FailoverDomain(domainID) + e.timerProcessor.FailoverDomain(domainID) + } + // v1 table domain cache entry will have this version being 0 + if nextDomain.GetNotificationVersion() > 0 { + e.shard.UpdateDomainNotificationVersion(nextDomain.GetNotificationVersion()) + } + }, + ) + + // second check whether this shard should do catch up domainNotificationVersion := e.shard.GetDomainCache().GetDomainNotificationVersion() shardDomainNotificationVersion := e.shard.GetDomainNotificationVersion() if domainNotificationVersion > shardDomainNotificationVersion { @@ -209,25 +226,12 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() { e.timerProcessor.FailoverDomain(domainID) } } - e.shard.UpdateDomainNotificationVersion(domainNotificationVersion) + if domainNotificationVersion > e.shard.GetDomainNotificationVersion() { + // double check the version for update, because when doing catch up, the shard's + // domain notification version can change + e.shard.UpdateDomainNotificationVersion(domainNotificationVersion) + } } - - // set the failover callback - e.shard.GetDomainCache().RegisterDomainChangeCallback( - e.shard.GetShardID(), - func(prevDomain *cache.DomainCacheEntry, nextDomain *cache.DomainCacheEntry) { - if prevDomain.GetFailoverVersion() < nextDomain.GetFailoverVersion() && - nextDomain.GetReplicationConfig().ActiveClusterName == e.currentClusterName { - domainID := prevDomain.GetInfo().ID - e.txProcessor.FailoverDomain(domainID) - e.timerProcessor.FailoverDomain(domainID) - } - // v1 table domain cache entry will have this version being 0 - if nextDomain.GetNotificationVersion() > 0 { - e.shard.UpdateDomainNotificationVersion(nextDomain.GetNotificationVersion()) - } - }, - ) } // StartWorkflowExecution starts a workflow execution diff --git a/service/history/transferQueueActiveProcessor_test.go b/service/history/transferQueueActiveProcessor_test.go index 40c696132bd..444260a7672 100644 --- a/service/history/transferQueueActiveProcessor_test.go +++ b/service/history/transferQueueActiveProcessor_test.go @@ -1050,8 +1050,16 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe persistenceMutableState := createMutableState(msBuilder) s.mockMetadataMgr.ExpectedCalls = nil - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: domainName}}, nil).Once() - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: childDomainName}}, nil).Once() + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{Name: domainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + }, nil).Once() + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{Name: childDomainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + }, nil).Once() s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockHistoryClient.On("StartWorkflowExecution", nil, s.createChildWorkflowExecutionRequest( transferTask, @@ -1131,8 +1139,16 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Failu persistenceMutableState := createMutableState(msBuilder) s.mockMetadataMgr.ExpectedCalls = nil - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: domainName}}, nil).Once() - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: childDomainName}}, nil).Once() + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{Name: domainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + }, nil).Once() + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{Name: childDomainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + }, nil).Once() s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockHistoryClient.On("StartWorkflowExecution", nil, s.createChildWorkflowExecutionRequest( transferTask, @@ -1209,8 +1225,16 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe persistenceMutableState := createMutableState(msBuilder) s.mockMetadataMgr.ExpectedCalls = nil - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: domainName}}, nil).Once() - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: childDomainName}}, nil).Once() + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{Name: domainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + }, nil).Once() + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{Name: childDomainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + }, nil).Once() s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockHistoryClient.On("ScheduleDecisionTask", nil, &history.ScheduleDecisionTaskRequest{ DomainUUID: common.StringPtr(childDomainID), @@ -1290,8 +1314,16 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Dupli persistenceMutableState := createMutableState(msBuilder) s.mockMetadataMgr.ExpectedCalls = nil - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: domainName}}, nil).Once() - s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: childDomainName}}, nil).Once() + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{Name: domainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + }, nil).Once() + s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{Name: childDomainName}, + Config: &persistence.DomainConfig{}, + ReplicationConfig: &persistence.DomainReplicationConfig{}, + }, nil).Once() s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockQueueAckMgr.On("completeTask", taskID).Return(nil).Once()