From bb629e69b920724887abdea7e941c0466ffbd9e9 Mon Sep 17 00:00:00 2001 From: Wenquan Xing Date: Thu, 21 Jun 2018 13:23:28 -0700 Subject: [PATCH] optimize queue ack manager, in case the task IDs are not sequencial. --- service/history/queueAckMgr.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/service/history/queueAckMgr.go b/service/history/queueAckMgr.go index 2e254965ab2..e73f9e75f2f 100644 --- a/service/history/queueAckMgr.go +++ b/service/history/queueAckMgr.go @@ -21,6 +21,7 @@ package history import ( + "sort" "sync" "github.com/uber-common/bark" @@ -168,18 +169,24 @@ func (a *queueAckMgrImpl) updateQueueAckLevel() { a.Lock() ackLevel := a.ackLevel + // task ID is not sequancial, meaning there are a ton of missing chunks, + // so to optimize the performance, a sort is required + var taskIDs []int64 + for k := range a.outstandingTasks { + taskIDs = append(taskIDs, k) + } + sort.Slice(taskIDs, func(i, j int) bool { return taskIDs[i] < taskIDs[j] }) + MoveAckLevelLoop: - for current := a.ackLevel + 1; current <= a.readLevel; current++ { - // TODO: What happens if !ok? - if acked, ok := a.outstandingTasks[current]; ok { - if acked { - a.logger.Debugf("Updating ack level: %v", current) - ackLevel = current - a.finishedTaskCounter++ - delete(a.outstandingTasks, current) - } else { - break MoveAckLevelLoop - } + for _, current := range taskIDs { + acked := a.outstandingTasks[current] + if acked { + a.logger.Debugf("Updating ack level: %v", current) + ackLevel = current + a.finishedTaskCounter++ + delete(a.outstandingTasks, current) + } else { + break MoveAckLevelLoop } } a.ackLevel = ackLevel