Skip to content

Commit

Permalink
Bugfix: deadlock when domain failover (#919)
Browse files Browse the repository at this point in the history
* fix deadlock issue in timer queue processor failover
* fix domain cache copy without locking issue
  • Loading branch information
wxing1292 authored Jul 3, 2018
1 parent 7eaf360 commit 2b92fd6
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
13 changes: 8 additions & 5 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,11 @@ func (c *domainCache) GetAllDomain() map[string]*DomainCacheEntry {
for ite.HasNext() {
entry := ite.Next()
id := entry.Key().(string)
domainCacheEntry := entry.Value().(*DomainCacheEntry).duplicate()
result[id] = domainCacheEntry
domainCacheEntry := entry.Value().(*DomainCacheEntry)
domainCacheEntry.RLock()
dup := domainCacheEntry.duplicate()
domainCacheEntry.RUnlock()
result[id] = dup
}
return result
}
Expand All @@ -185,13 +188,13 @@ func (c *domainCache) GetAllDomain() map[string]*DomainCacheEntry {
// afterCallback will be invoked when NOT holding the domain cache lock.
func (c *domainCache) RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, beforeCallback CallbackFn, afterCallback CallbackFn) {
c.Lock()
defer c.Unlock()

c.beforeCallbacks[shard] = beforeCallback
c.afterCallbacks[shard] = afterCallback
domainNotificationVersion := c.domainNotificationVersion
c.Unlock()

// this section is trying to make the shard catch up with domain changes
if c.domainNotificationVersion > initialNotificationVersion {
if domainNotificationVersion > initialNotificationVersion {
domains := DomainCacheEntries{}
for _, domain := range c.GetAllDomain() {
domains = append(domains, domain)
Expand Down
1 change: 0 additions & 1 deletion service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (t *timerQueueProcessorImpl) FailoverDomain(domainID string) {
failoverTimerProcessor := newTimerQueueFailoverProcessor(t.shard, t.historyService, domainID,
standbyClusterName, minLevel, maxLevel, t.matchingClient, t.logger)
failoverTimerProcessor.Start()
failoverTimerProcessor.timerQueueProcessorBase.readAndFanoutTimerTasks()
}

func (t *timerQueueProcessorImpl) getTimerFiredCount(clusterName string) uint64 {
Expand Down
1 change: 0 additions & 1 deletion service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func (t *transferQueueProcessorImpl) FailoverDomain(domainID string) {
domainID, standbyClusterName, minLevel, maxLevel, t.logger,
)
failoverTaskProcessor.Start()
failoverTaskProcessor.notifyNewTask()
}

func (t *transferQueueProcessorImpl) completeTransferLoop() {
Expand Down

0 comments on commit 2b92fd6

Please sign in to comment.