Skip to content

Commit

Permalink
change the order of shard catch up && shard registeration to domain c…
Browse files Browse the repository at this point in the history
…ache
  • Loading branch information
Wenquan Xing committed May 25, 2018
1 parent 61bd092 commit 9f2a2c3
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 27 deletions.
42 changes: 23 additions & 19 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,24 @@ func (e *historyEngineImpl) Stop() {
}

func (e *historyEngineImpl) registerDomainFailoverCallback() {
// first check whether this shard should catch up
// first set the failover callback
e.shard.GetDomainCache().RegisterDomainChangeCallback(
e.shard.GetShardID(),
func(prevDomain *cache.DomainCacheEntry, nextDomain *cache.DomainCacheEntry) {
if prevDomain.GetFailoverVersion() < nextDomain.GetFailoverVersion() &&
nextDomain.GetReplicationConfig().ActiveClusterName == e.currentClusterName {
domainID := prevDomain.GetInfo().ID
e.txProcessor.FailoverDomain(domainID)
e.timerProcessor.FailoverDomain(domainID)
}
// v1 table domain cache entry will have this version being 0
if nextDomain.GetNotificationVersion() > 0 {
e.shard.UpdateDomainNotificationVersion(nextDomain.GetNotificationVersion())
}
},
)

// second check whether this shard should do catch up
domainNotificationVersion := e.shard.GetDomainCache().GetDomainNotificationVersion()
shardDomainNotificationVersion := e.shard.GetDomainNotificationVersion()
if domainNotificationVersion > shardDomainNotificationVersion {
Expand All @@ -209,25 +226,12 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() {
e.timerProcessor.FailoverDomain(domainID)
}
}
e.shard.UpdateDomainNotificationVersion(domainNotificationVersion)
if domainNotificationVersion > e.shard.GetDomainNotificationVersion() {
// double check the version for update, because when doing catch up, the shard's
// domain notification version can change
e.shard.UpdateDomainNotificationVersion(domainNotificationVersion)
}
}

// set the failover callback
e.shard.GetDomainCache().RegisterDomainChangeCallback(
e.shard.GetShardID(),
func(prevDomain *cache.DomainCacheEntry, nextDomain *cache.DomainCacheEntry) {
if prevDomain.GetFailoverVersion() < nextDomain.GetFailoverVersion() &&
nextDomain.GetReplicationConfig().ActiveClusterName == e.currentClusterName {
domainID := prevDomain.GetInfo().ID
e.txProcessor.FailoverDomain(domainID)
e.timerProcessor.FailoverDomain(domainID)
}
// v1 table domain cache entry will have this version being 0
if nextDomain.GetNotificationVersion() > 0 {
e.shard.UpdateDomainNotificationVersion(nextDomain.GetNotificationVersion())
}
},
)
}

// StartWorkflowExecution starts a workflow execution
Expand Down
48 changes: 40 additions & 8 deletions service/history/transferQueueActiveProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,8 +1050,16 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe

persistenceMutableState := createMutableState(msBuilder)
s.mockMetadataMgr.ExpectedCalls = nil
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: domainName}}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: childDomainName}}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{Name: domainName},
Config: &persistence.DomainConfig{},
ReplicationConfig: &persistence.DomainReplicationConfig{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{Name: childDomainName},
Config: &persistence.DomainConfig{},
ReplicationConfig: &persistence.DomainReplicationConfig{},
}, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.On("StartWorkflowExecution", nil, s.createChildWorkflowExecutionRequest(
transferTask,
Expand Down Expand Up @@ -1131,8 +1139,16 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Failu

persistenceMutableState := createMutableState(msBuilder)
s.mockMetadataMgr.ExpectedCalls = nil
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: domainName}}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: childDomainName}}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{Name: domainName},
Config: &persistence.DomainConfig{},
ReplicationConfig: &persistence.DomainReplicationConfig{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{Name: childDomainName},
Config: &persistence.DomainConfig{},
ReplicationConfig: &persistence.DomainReplicationConfig{},
}, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.On("StartWorkflowExecution", nil, s.createChildWorkflowExecutionRequest(
transferTask,
Expand Down Expand Up @@ -1209,8 +1225,16 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Succe

persistenceMutableState := createMutableState(msBuilder)
s.mockMetadataMgr.ExpectedCalls = nil
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: domainName}}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: childDomainName}}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{Name: domainName},
Config: &persistence.DomainConfig{},
ReplicationConfig: &persistence.DomainReplicationConfig{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{Name: childDomainName},
Config: &persistence.DomainConfig{},
ReplicationConfig: &persistence.DomainReplicationConfig{},
}, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockHistoryClient.On("ScheduleDecisionTask", nil, &history.ScheduleDecisionTaskRequest{
DomainUUID: common.StringPtr(childDomainID),
Expand Down Expand Up @@ -1290,8 +1314,16 @@ func (s *transferQueueActiveProcessorSuite) TestProcessStartChildExecution_Dupli

persistenceMutableState := createMutableState(msBuilder)
s.mockMetadataMgr.ExpectedCalls = nil
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: domainName}}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{Info: &persistence.DomainInfo{Name: childDomainName}}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: domainID}).Return(&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{Name: domainName},
Config: &persistence.DomainConfig{},
ReplicationConfig: &persistence.DomainReplicationConfig{},
}, nil).Once()
s.mockMetadataMgr.On("GetDomain", &persistence.GetDomainRequest{ID: childDomainID}).Return(&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{Name: childDomainName},
Config: &persistence.DomainConfig{},
ReplicationConfig: &persistence.DomainReplicationConfig{},
}, nil).Once()
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
s.mockQueueAckMgr.On("completeTask", taskID).Return(nil).Once()

Expand Down

0 comments on commit 9f2a2c3

Please sign in to comment.