diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go
index 70ccc666297..8a847718b14 100644
--- a/service/history/queues/queue_base.go
+++ b/service/history/queues/queue_base.go
@@ -288,10 +288,7 @@ func (p *queueBase) processNewRange() error {
 		newMaxKey = p.shard.GetImmediateQueueExclusiveHighReadWatermark()
 	case tasks.CategoryTypeScheduled:
 		var err error
-		if newMaxKey, err = p.shard.UpdateScheduledQueueExclusiveHighReadWatermark(
-			p.shard.GetClusterMetadata().GetCurrentClusterName(),
-			true,
-		); err != nil {
+		if newMaxKey, err = p.shard.UpdateScheduledQueueExclusiveHighReadWatermark(); err != nil {
 			return err
 		}
 	default:
diff --git a/service/history/shard/context.go b/service/history/shard/context.go
index 4dc1ddb52f3..c62f7be7346 100644
--- a/service/history/shard/context.go
+++ b/service/history/shard/context.go
@@ -75,9 +75,7 @@ type (
 		GenerateTaskIDs(number int) ([]int64, error)
 
 		GetImmediateQueueExclusiveHighReadWatermark() tasks.Key
-		// TODO: remove cluster and singleProcessorMode parameter after deprecating old task procesing logic
-		// In multi-cursor world, there's only one maxReadLevel for scheduled queue for all clusters.
-		UpdateScheduledQueueExclusiveHighReadWatermark(cluster string, singleProcessorMode bool) (tasks.Key, error)
+		UpdateScheduledQueueExclusiveHighReadWatermark() (tasks.Key, error)
 		GetQueueAckLevel(category tasks.Category) tasks.Key
 		UpdateQueueAckLevel(category tasks.Category, ackLevel tasks.Key) error
 		GetQueueClusterAckLevel(category tasks.Category, cluster string) tasks.Key
diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go
index d094477a4b6..f6bba8746fb 100644
--- a/service/history/shard/context_impl.go
+++ b/service/history/shard/context_impl.go
@@ -132,7 +132,7 @@ type (
 		taskSequenceNumber                 int64
 		maxTaskSequenceNumber              int64
 		immediateTaskExclusiveMaxReadLevel int64
-		scheduledTaskMaxReadLevelMap       map[string]time.Time // cluster -> scheduledTaskMaxReadLevel
+		scheduledTaskMaxReadLevel          time.Time
 
 		// exist only in memory
 		remoteClusterInfos map[string]*remoteClusterInfo
@@ -295,50 +295,26 @@ func (s *ContextImpl) getScheduledTaskMaxReadLevel(cluster string) tasks.Key {
 	s.rLock()
 	defer s.rUnlock()
 
-	if _, ok := s.scheduledTaskMaxReadLevelMap[cluster]; !ok {
-		s.scheduledTaskMaxReadLevelMap[cluster] = tasks.DefaultFireTime
-	}
-
-	return tasks.NewKey(s.scheduledTaskMaxReadLevelMap[cluster], 0)
+	return tasks.NewKey(s.scheduledTaskMaxReadLevel, 0)
 }
 
-func (s *ContextImpl) UpdateScheduledQueueExclusiveHighReadWatermark(
-	cluster string,
-	singleProcessorMode bool,
-) (tasks.Key, error) {
+func (s *ContextImpl) UpdateScheduledQueueExclusiveHighReadWatermark() (tasks.Key, error) {
 	s.wLock()
 	defer s.wUnlock()
 
-	if _, ok := s.scheduledTaskMaxReadLevelMap[cluster]; !ok {
-		s.scheduledTaskMaxReadLevelMap[cluster] = tasks.DefaultFireTime
-	}
-
 	if err := s.errorByState(); err != nil {
-		return tasks.NewKey(s.scheduledTaskMaxReadLevelMap[cluster], 0), err
+		return tasks.NewKey(s.scheduledTaskMaxReadLevel, 0), err
 	}
 
 	currentTime := s.timeSource.Now()
-	if cluster != "" && cluster != s.GetClusterMetadata().GetCurrentClusterName() {
-		currentTime = s.getOrUpdateRemoteClusterInfoLocked(cluster).CurrentTime
-	}
 
 	// Truncation here is just to make sure max read level has the same precision as the old logic
 	// in case existing code can't work correctly with precision higher than 1ms.
 	// Once we validate the rest of the code can worker correctly with higher precision, the truncation should be removed.
 	newMaxReadLevel := currentTime.Add(s.config.TimerProcessorMaxTimeShift()).Truncate(persistence.ScheduledTaskMinPrecision)
-	if singleProcessorMode {
-		// When generating scheduled tasks, the task's timestamp will be compared to the namespace's active cluster's
-		// maxReadLevel to avoid generatnig a task before maxReadLevel.
-		// But when there's a single procssor, the queue is only using current cluster maxReadLevel.
-		// So update the maxReadLevel map for all clusters to ensure scheduled task won't be lost.
-		for key := range s.scheduledTaskMaxReadLevelMap {
-			s.scheduledTaskMaxReadLevelMap[key] = util.MaxTime(s.scheduledTaskMaxReadLevelMap[key], newMaxReadLevel)
-		}
-	} else {
-		s.scheduledTaskMaxReadLevelMap[cluster] = util.MaxTime(s.scheduledTaskMaxReadLevelMap[cluster], newMaxReadLevel)
-	}
+	s.scheduledTaskMaxReadLevel = util.MaxTime(s.scheduledTaskMaxReadLevel, newMaxReadLevel)
 
-	return tasks.NewKey(s.scheduledTaskMaxReadLevelMap[cluster], 0), nil
+	return tasks.NewKey(s.scheduledTaskMaxReadLevel, 0), nil
 }
 
 // NOTE: the ack level returned is inclusive for immediate task category (acked),
@@ -1388,7 +1364,6 @@ func (s *ContextImpl) allocateTaskIDAndTimestampLocked(
 	transferExclusiveMaxReadLevel *int64,
 ) error {
 	now := s.timeSource.Now()
-	currentCluster := s.GetClusterMetadata().GetCurrentClusterName()
 	for category, tasksByCategory := range newTasks {
 		for _, task := range tasksByCategory {
 			// set taskID
@@ -1409,13 +1384,7 @@ func (s *ContextImpl) allocateTaskIDAndTimestampLocked(
 			// if scheduled task, check if fire time is in the past
 			if category.Type() == tasks.CategoryTypeScheduled {
 				ts := task.GetVisibilityTime()
-				if task.GetVersion() != common.EmptyVersion && category.ID() == tasks.CategoryIDTimer {
-					// cannot use version to determine the corresponding cluster for timer task
-					// this is because during failover, timer task should be created as active
-					// or otherwise, failover + active processing logic may not pick up the task.
-					currentCluster = namespaceEntry.ActiveClusterName()
-				}
-				readCursorTS := s.scheduledTaskMaxReadLevelMap[currentCluster]
+				readCursorTS := s.scheduledTaskMaxReadLevel
 				if ts.Truncate(persistence.ScheduledTaskMinPrecision).Before(readCursorTS) {
 					// make sure scheduled task timestamp is higher than max read level after truncation
 					// as persistence layer may lose precision when persisting the task.
@@ -1427,12 +1396,14 @@ func (s *ContextImpl) allocateTaskIDAndTimestampLocked(
 						tag.Timestamp(ts),
 						tag.CursorTimestamp(readCursorTS),
 						tag.ValueShardAllocateTimerBeforeRead)
-					task.SetVisibilityTime(s.scheduledTaskMaxReadLevelMap[currentCluster].Add(persistence.ScheduledTaskMinPrecision))
+					task.SetVisibilityTime(s.scheduledTaskMaxReadLevel.Add(persistence.ScheduledTaskMinPrecision))
 				}
 
-				visibilityTs := task.GetVisibilityTime()
 				s.contextTaggedLogger.Debug("Assigning new timer",
-					tag.Timestamp(visibilityTs), tag.TaskID(task.GetTaskID()), tag.MaxQueryLevel(readCursorTS))
+					tag.Timestamp(task.GetVisibilityTime()),
+					tag.TaskID(task.GetTaskID()),
+					tag.MaxQueryLevel(readCursorTS),
+				)
 			}
 		}
 	}
@@ -1849,7 +1820,7 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
 
 	// initialize the cluster current time to be the same as ack level
 	remoteClusterInfos := make(map[string]*remoteClusterInfo)
-	scheduledTaskMaxReadLevelMap := make(map[string]time.Time)
+	var scheduledTaskMaxReadLevel time.Time
 	currentClusterName := s.GetClusterMetadata().GetCurrentClusterName()
 	taskCategories := tasks.GetCategories()
 	for clusterName, info := range s.GetClusterMetadata().GetAllClusterInfo() {
@@ -1890,8 +1861,11 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
 		// Add().Truncate() here is just to make sure max read level has the same precision as the old logic
 		// in case existing code can't work correctly with precision higher than 1ms.
 		// Once we validate the rest of the code can worker correctly with higher precision, the code should simply be
-		// scheduledTaskMaxReadLevelMap[clusterName] = maxReadTime
-		scheduledTaskMaxReadLevelMap[clusterName] = maxReadTime.Add(persistence.ScheduledTaskMinPrecision).Truncate(persistence.ScheduledTaskMinPrecision)
+		// scheduledTaskMaxReadLevel = util.MaxTime(scheduledTaskMaxReadLevel, maxReadTime)
+		scheduledTaskMaxReadLevel = util.MaxTime(
+			scheduledTaskMaxReadLevel,
+			maxReadTime.Add(persistence.ScheduledTaskMinPrecision).Truncate(persistence.ScheduledTaskMinPrecision),
+		)
 
 		if clusterName != currentClusterName {
 			remoteClusterInfos[clusterName] = &remoteClusterInfo{CurrentTime: maxReadTime}
@@ -1903,7 +1877,7 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
 
 	s.shardInfo = updatedShardInfo
 	s.remoteClusterInfos = remoteClusterInfos
-	s.scheduledTaskMaxReadLevelMap = scheduledTaskMaxReadLevelMap
+	s.scheduledTaskMaxReadLevel = scheduledTaskMaxReadLevel
 
 	return nil
 }
diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go
index bafe7dc4bce..c133b4c7158 100644
--- a/service/history/shard/context_mock.go
+++ b/service/history/shard/context_mock.go
@@ -798,18 +798,18 @@ func (mr *MockContextMockRecorder) UpdateReplicatorDLQAckLevel(sourCluster, ackL
 }
 
 // UpdateScheduledQueueExclusiveHighReadWatermark mocks base method.
-func (m *MockContext) UpdateScheduledQueueExclusiveHighReadWatermark(cluster string, singleProcessorMode bool) (tasks.Key, error) {
+func (m *MockContext) UpdateScheduledQueueExclusiveHighReadWatermark() (tasks.Key, error) {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "UpdateScheduledQueueExclusiveHighReadWatermark", cluster, singleProcessorMode)
+	ret := m.ctrl.Call(m, "UpdateScheduledQueueExclusiveHighReadWatermark")
 	ret0, _ := ret[0].(tasks.Key)
 	ret1, _ := ret[1].(error)
 	return ret0, ret1
 }
 
 // UpdateScheduledQueueExclusiveHighReadWatermark indicates an expected call of UpdateScheduledQueueExclusiveHighReadWatermark.
-func (mr *MockContextMockRecorder) UpdateScheduledQueueExclusiveHighReadWatermark(cluster, singleProcessorMode interface{}) *gomock.Call {
+func (mr *MockContextMockRecorder) UpdateScheduledQueueExclusiveHighReadWatermark() *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateScheduledQueueExclusiveHighReadWatermark", reflect.TypeOf((*MockContext)(nil).UpdateScheduledQueueExclusiveHighReadWatermark), cluster, singleProcessorMode)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateScheduledQueueExclusiveHighReadWatermark", reflect.TypeOf((*MockContext)(nil).UpdateScheduledQueueExclusiveHighReadWatermark))
 }
 
 // UpdateWorkflowExecution mocks base method.
diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go
index a4cf9c04d18..e5b992b8837 100644
--- a/service/history/shard/context_test.go
+++ b/service/history/shard/context_test.go
@@ -183,43 +183,14 @@ func (s *contextSuite) TestTimerMaxReadLevelInitialization() {
 	}
 }
 
-func (s *contextSuite) TestTimerMaxReadLevelUpdate_MultiProcessor() {
-	now := time.Now()
+func (s *contextSuite) TestTimerMaxReadLevelUpdate() {
+	now := time.Now().Add(time.Minute)
 	s.timeSource.Update(now)
-	maxReadLevel, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.NoError(err)
-
-	s.timeSource.Update(now.Add(-time.Minute))
-	newMaxReadLevel, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.NoError(err)
-	s.Equal(maxReadLevel, newMaxReadLevel)
-
-	s.timeSource.Update(now.Add(time.Minute))
-	newMaxReadLevel, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.NoError(err)
-	s.True(newMaxReadLevel.FireTime.After(maxReadLevel.FireTime))
-}
 
-func (s *contextSuite) TestTimerMaxReadLevelUpdate_SingleProcessor() {
-	now := time.Now()
-	s.timeSource.Update(now)
-
-	// make sure the scheduledTaskMaxReadLevelMap has value for both current cluster and alternative cluster
-	_, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.NoError(err)
-	_, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestAlternativeClusterName, false)
+	_, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark()
 	s.NoError(err)
 
-	now = time.Now().Add(time.Minute)
-	s.timeSource.Update(now)
-
-	// update in single processor mode
-	_, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, true)
-	s.NoError(err)
-	scheduledTaskMaxReadLevelMap := s.mockShard.scheduledTaskMaxReadLevelMap
-	s.Len(scheduledTaskMaxReadLevelMap, 2)
-	s.True(scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName].After(now))
-	s.True(scheduledTaskMaxReadLevelMap[cluster.TestAlternativeClusterName].After(now))
+	s.True(s.mockShard.scheduledTaskMaxReadLevel.After(now))
 }
 
 func (s *contextSuite) TestDeleteWorkflowExecution_Success() {
diff --git a/service/history/shard/context_testutil.go b/service/history/shard/context_testutil.go
index d19b824573f..6438c5a3ee2 100644
--- a/service/history/shard/context_testutil.go
+++ b/service/history/shard/context_testutil.go
@@ -27,7 +27,6 @@ package shard
 
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/golang/mock/gomock"
@@ -100,7 +99,6 @@ func NewTestContext(
 		taskSequenceNumber:                 shardInfo.RangeId << int64(config.RangeSizeBits),
 		immediateTaskExclusiveMaxReadLevel: shardInfo.RangeId << int64(config.RangeSizeBits),
 		maxTaskSequenceNumber:              (shardInfo.RangeId + 1) << int64(config.RangeSizeBits),
-		scheduledTaskMaxReadLevelMap:       make(map[string]time.Time),
 		remoteClusterInfos:                 make(map[string]*remoteClusterInfo),
 		handoverNamespaces:                 make(map[namespace.Name]*namespaceHandOverInfo),
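
Reviewer note (illustrative, not part of the patch): the change collapses the per-cluster scheduledTaskMaxReadLevelMap into a single monotonic, in-memory scheduledTaskMaxReadLevel. Below is a minimal, self-contained Go sketch of the invariant the shard maintains after this change. The names watermark, advance, and allocate are hypothetical, and the precision constant stands in for persistence.ScheduledTaskMinPrecision; this is a sketch of the semantics, not the shard implementation.

package main

import (
	"fmt"
	"time"
)

// precision stands in for persistence.ScheduledTaskMinPrecision (1ms in the patch).
const precision = time.Millisecond

// watermark models the shard's single scheduledTaskMaxReadLevel; it only moves forward.
type watermark struct {
	t time.Time
}

// advance mirrors UpdateScheduledQueueExclusiveHighReadWatermark: take now + maxTimeShift,
// truncate to storage precision, and keep the maximum value ever observed.
func (w *watermark) advance(now time.Time, maxTimeShift time.Duration) time.Time {
	candidate := now.Add(maxTimeShift).Truncate(precision)
	if candidate.After(w.t) {
		w.t = candidate
	}
	return w.t
}

// allocate mirrors the clamping in allocateTaskIDAndTimestampLocked: a scheduled task
// whose truncated fire time falls below the watermark is pushed just above it, so a
// queue reader that starts past the watermark can never skip the task.
func (w *watermark) allocate(fireTime time.Time) time.Time {
	if fireTime.Truncate(precision).Before(w.t) {
		return w.t.Add(precision)
	}
	return fireTime
}

func main() {
	var w watermark
	now := time.Now()

	w.advance(now, time.Second) // read level moves to (now + 1s), truncated to 1ms

	// A fire time below the read level is clamped above it.
	assigned := w.allocate(now)
	fmt.Println(assigned.After(w.t)) // true

	// Clock regression cannot move the watermark backwards.
	before := w.t
	w.advance(now.Add(-time.Minute), time.Second)
	fmt.Println(w.t.Equal(before)) // true
}

The monotonic max (util.MaxTime in the patch, the After check in this sketch) is what makes dropping the per-cluster map safe: the exclusive high read watermark never moves backwards even if the time source does, so a task clamped above the watermark is always visible to the queue reader.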