From b8feb950ae19df7f82a12f53270d6db50d65a48f Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Thu, 22 Jun 2023 09:32:42 -0700 Subject: [PATCH] Perform task range completion on shard rangeID update (#4528) --- service/history/queues/queue_base.go | 16 +++++++++++++++- service/history/shard/context.go | 1 + service/history/shard/context_impl.go | 7 +++++++ service/history/shard/context_mock.go | 14 ++++++++++++++ 4 files changed, 37 insertions(+), 1 deletion(-) diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index 87b54217e80..9e85cd3ccdd 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -97,6 +97,7 @@ type ( paginationFnProvider PaginationFnProvider executableInitializer ExecutableInitializer + lastRangeID int64 exclusiveDeletionHighWatermark tasks.Key nonReadableScope Scope readerRateLimiter quotas.RequestRateLimiter @@ -258,6 +259,7 @@ func newQueueBase( paginationFnProvider: paginationFnProvider, executableInitializer: executableInitializer, + lastRangeID: -1, // start from an invalid rangeID exclusiveDeletionHighWatermark: exclusiveDeletionHighWatermark, nonReadableScope: NewScope( NewRange(exclusiveReaderHighWatermark, tasks.MaximumKey), @@ -389,7 +391,10 @@ func (p *queueBase) checkpoint() { // Emit metric before the deletion watermark comparsion so we have the emit even if there's no task // for the queue p.metricsHandler.Counter(metrics.TaskBatchCompleteCounter.GetMetricName()).Record(1) - if newExclusiveDeletionHighWatermark.CompareTo(p.exclusiveDeletionHighWatermark) > 0 { + if newExclusiveDeletionHighWatermark.CompareTo(p.exclusiveDeletionHighWatermark) > 0 || + p.updateShardRangeID() { + // when shard rangeID is updated, perform range completion again in case the underlying persistence implementation + // serves traffic based on the persisted shardInfo err := p.rangeCompleteTasks(p.exclusiveDeletionHighWatermark, newExclusiveDeletionHighWatermark) if err != nil { p.resetCheckpointTimer(err) @@ -449,6 +454,15 @@ func (p *queueBase) updateReaderProgress( } } +func (p *queueBase) updateShardRangeID() bool { + newRangeID := p.shard.GetRangeID() + if p.lastRangeID < newRangeID { + p.lastRangeID = newRangeID + return true + } + return false +} + func (p *queueBase) rangeCompleteTasks( oldExclusiveDeletionHighWatermark tasks.Key, newExclusiveDeletionHighWatermark tasks.Key, diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 6f323094c67..8813fd9d571 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -55,6 +55,7 @@ type ( // Context represents a history engine shard Context interface { GetShardID() int32 + GetRangeID() int64 IsValid() bool GetOwner() string GetExecutionManager() persistence.ExecutionManager diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index da54c3dc99d..a18926a7b4e 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -195,6 +195,13 @@ func (s *ContextImpl) GetShardID() int32 { return s.shardID } +func (s *ContextImpl) GetRangeID() int64 { + s.rLock() + defer s.rUnlock() + + return s.getRangeIDLocked() +} + func (s *ContextImpl) GetOwner() string { // constant from initialization, no need for locks return s.owner diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index 3ea88e5a5cd..9039ce0f9f9 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -449,6 +449,20 @@ func (mr *MockContextMockRecorder) GetQueueState(category interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueState", reflect.TypeOf((*MockContext)(nil).GetQueueState), category) } +// GetRangeID mocks base method. +func (m *MockContext) GetRangeID() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRangeID") + ret0, _ := ret[0].(int64) + return ret0 +} + +// GetRangeID indicates an expected call of GetRangeID. +func (mr *MockContextMockRecorder) GetRangeID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRangeID", reflect.TypeOf((*MockContext)(nil).GetRangeID)) +} + // GetRemoteAdminClient mocks base method. func (m *MockContext) GetRemoteAdminClient(arg0 string) (v10.AdminServiceClient, error) { m.ctrl.T.Helper()