diff --git a/service/matching/pollerHistory.go b/service/matching/pollerHistory.go index dfd35afc620..8a37f3e3eb5 100644 --- a/service/matching/pollerHistory.go +++ b/service/matching/pollerHistory.go @@ -46,7 +46,7 @@ type ( ) type pollerHistory struct { - // // poller ID -> last access time + // poller ID -> last access time // pollers map[pollerID]time.Time history cache.Cache } diff --git a/service/matching/service.go b/service/matching/service.go index 1b4e11e52a7..5a918ed3c1e 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -38,6 +38,7 @@ type Config struct { RangeSize int64 GetTasksBatchSize int UpdateAckInterval time.Duration + IdleTasklistCheckInterval time.Duration MinTaskThrottlingBurstSize int // taskWriter configuration @@ -53,6 +54,7 @@ func NewConfig() *Config { RangeSize: 100000, GetTasksBatchSize: 1000, UpdateAckInterval: 10 * time.Second, + IdleTasklistCheckInterval: 5 * time.Minute, OutstandingTaskAppendsThreshold: 250, MaxTaskBatchSize: 100, MinTaskThrottlingBurstSize: 10000, diff --git a/service/matching/taskListManager.go b/service/matching/taskListManager.go index d2a08de7ddd..c53510de125 100644 --- a/service/matching/taskListManager.go +++ b/service/matching/taskListManager.go @@ -663,6 +663,7 @@ func (c *taskListManagerImpl) getTasksPump() { go c.deliverBufferTasksForPoll() updateAckTimer := time.NewTimer(c.config.UpdateAckInterval) + checkPollerTimer := time.NewTimer(c.config.IdleTasklistCheckInterval) getTasksPumpLoop: for { select { @@ -716,10 +717,19 @@ getTasksPumpLoop: c.signalNewTask() // periodically signal pump to check persistence for tasks updateAckTimer = time.NewTimer(c.config.UpdateAckInterval) } + case <-checkPollerTimer.C: + { + pollers := c.GetAllPollerInfo() + if len(pollers) == 0 { + c.Stop() + } + checkPollerTimer = time.NewTimer(c.config.IdleTasklistCheckInterval) + } } } updateAckTimer.Stop() + checkPollerTimer.Stop() } // Retry operation on transient error and on rangeID change. On rangeID update by another process calls c.Stop().