Single scheduled queue max read level (#3778)
yycptt authored Jan 20, 2023
1 parent 82c958d commit 5175ab6
Showing 6 changed files with 29 additions and 91 deletions.
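
In short, the commit collapses the per-cluster scheduled-queue max read level map on the shard into a single watermark that only ever moves forward. Below is a minimal, self-contained sketch of that behavior, not the actual shard context: the mutex and the maxTimeShift/minPrecision fields stand in for the shard's lock, the TimerProcessorMaxTimeShift config, and persistence.ScheduledTaskMinPrecision.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// singleWatermark mimics the new shard-level scheduled-task max read level:
// one value shared by all clusters, advanced monotonically.
type singleWatermark struct {
	mu           sync.Mutex
	maxReadLevel time.Time
	maxTimeShift time.Duration // stand-in for config.TimerProcessorMaxTimeShift
	minPrecision time.Duration // stand-in for persistence.ScheduledTaskMinPrecision
}

// update advances the watermark to now+maxTimeShift, truncated to the
// persistence precision, but never moves it backwards.
func (w *singleWatermark) update(now time.Time) time.Time {
	w.mu.Lock()
	defer w.mu.Unlock()

	candidate := now.Add(w.maxTimeShift).Truncate(w.minPrecision)
	if candidate.After(w.maxReadLevel) {
		w.maxReadLevel = candidate
	}
	return w.maxReadLevel
}

func main() {
	w := &singleWatermark{maxTimeShift: time.Second, minPrecision: time.Millisecond}
	t0 := time.Now()
	first := w.update(t0)
	second := w.update(t0.Add(-time.Minute)) // clock moved back: watermark stays put
	fmt.Println(second.Equal(first))         // true
}
```

With a single processor per scheduled queue there is no per-cluster read position left to protect, so the map, the cluster parameter, and the singleProcessorMode flag all disappear in the diffs below.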
5 changes: 1 addition & 4 deletions service/history/queues/queue_base.go
@@ -288,10 +288,7 @@ func (p *queueBase) processNewRange() error {
 		newMaxKey = p.shard.GetImmediateQueueExclusiveHighReadWatermark()
 	case tasks.CategoryTypeScheduled:
 		var err error
-		if newMaxKey, err = p.shard.UpdateScheduledQueueExclusiveHighReadWatermark(
-			p.shard.GetClusterMetadata().GetCurrentClusterName(),
-			true,
-		); err != nil {
+		if newMaxKey, err = p.shard.UpdateScheduledQueueExclusiveHighReadWatermark(); err != nil {
 			return err
 		}
 	default:
4 changes: 1 addition & 3 deletions service/history/shard/context.go
@@ -75,9 +75,7 @@ type (
 		GenerateTaskIDs(number int) ([]int64, error)
 
 		GetImmediateQueueExclusiveHighReadWatermark() tasks.Key
-		// TODO: remove cluster and singleProcessorMode parameter after deprecating old task processing logic
-		// In multi-cursor world, there's only one maxReadLevel for scheduled queue for all clusters.
-		UpdateScheduledQueueExclusiveHighReadWatermark(cluster string, singleProcessorMode bool) (tasks.Key, error)
+		UpdateScheduledQueueExclusiveHighReadWatermark() (tasks.Key, error)
 		GetQueueAckLevel(category tasks.Category) tasks.Key
 		UpdateQueueAckLevel(category tasks.Category, ackLevel tasks.Key) error
 		GetQueueClusterAckLevel(category tasks.Category, cluster string) tasks.Key
64 changes: 19 additions & 45 deletions service/history/shard/context_impl.go
@@ -132,7 +132,7 @@ type (
 		taskSequenceNumber                 int64
 		maxTaskSequenceNumber              int64
 		immediateTaskExclusiveMaxReadLevel int64
-		scheduledTaskMaxReadLevelMap       map[string]time.Time // cluster -> scheduledTaskMaxReadLevel
+		scheduledTaskMaxReadLevel          time.Time
 
 		// exist only in memory
 		remoteClusterInfos map[string]*remoteClusterInfo
@@ -295,50 +295,26 @@ func (s *ContextImpl) getScheduledTaskMaxReadLevel(cluster string) tasks.Key {
 	s.rLock()
 	defer s.rUnlock()
 
-	if _, ok := s.scheduledTaskMaxReadLevelMap[cluster]; !ok {
-		s.scheduledTaskMaxReadLevelMap[cluster] = tasks.DefaultFireTime
-	}
-
-	return tasks.NewKey(s.scheduledTaskMaxReadLevelMap[cluster], 0)
+	return tasks.NewKey(s.scheduledTaskMaxReadLevel, 0)
 }
 
-func (s *ContextImpl) UpdateScheduledQueueExclusiveHighReadWatermark(
-	cluster string,
-	singleProcessorMode bool,
-) (tasks.Key, error) {
+func (s *ContextImpl) UpdateScheduledQueueExclusiveHighReadWatermark() (tasks.Key, error) {
 	s.wLock()
 	defer s.wUnlock()
 
-	if _, ok := s.scheduledTaskMaxReadLevelMap[cluster]; !ok {
-		s.scheduledTaskMaxReadLevelMap[cluster] = tasks.DefaultFireTime
-	}
-
 	if err := s.errorByState(); err != nil {
-		return tasks.NewKey(s.scheduledTaskMaxReadLevelMap[cluster], 0), err
+		return tasks.NewKey(s.scheduledTaskMaxReadLevel, 0), err
 	}
 
 	currentTime := s.timeSource.Now()
-	if cluster != "" && cluster != s.GetClusterMetadata().GetCurrentClusterName() {
-		currentTime = s.getOrUpdateRemoteClusterInfoLocked(cluster).CurrentTime
-	}
 
 	// Truncation here is just to make sure max read level has the same precision as the old logic
 	// in case existing code can't work correctly with precision higher than 1ms.
 	// Once we validate the rest of the code can work correctly with higher precision, the truncation should be removed.
 	newMaxReadLevel := currentTime.Add(s.config.TimerProcessorMaxTimeShift()).Truncate(persistence.ScheduledTaskMinPrecision)
-	if singleProcessorMode {
-		// When generating scheduled tasks, the task's timestamp will be compared to the namespace's active cluster's
-		// maxReadLevel to avoid generating a task before maxReadLevel.
-		// But when there's a single processor, the queue is only using current cluster maxReadLevel.
-		// So update the maxReadLevel map for all clusters to ensure scheduled task won't be lost.
-		for key := range s.scheduledTaskMaxReadLevelMap {
-			s.scheduledTaskMaxReadLevelMap[key] = util.MaxTime(s.scheduledTaskMaxReadLevelMap[key], newMaxReadLevel)
-		}
-	} else {
-		s.scheduledTaskMaxReadLevelMap[cluster] = util.MaxTime(s.scheduledTaskMaxReadLevelMap[cluster], newMaxReadLevel)
-	}
+	s.scheduledTaskMaxReadLevel = util.MaxTime(s.scheduledTaskMaxReadLevel, newMaxReadLevel)
 
-	return tasks.NewKey(s.scheduledTaskMaxReadLevelMap[cluster], 0), nil
+	return tasks.NewKey(s.scheduledTaskMaxReadLevel, 0), nil
 }
 
 // NOTE: the ack level returned is inclusive for immediate task category (acked),
@@ -1388,7 +1364,6 @@ func (s *ContextImpl) allocateTaskIDAndTimestampLocked(
 	transferExclusiveMaxReadLevel *int64,
 ) error {
 	now := s.timeSource.Now()
-	currentCluster := s.GetClusterMetadata().GetCurrentClusterName()
 	for category, tasksByCategory := range newTasks {
 		for _, task := range tasksByCategory {
 			// set taskID
@@ -1409,13 +1384,7 @@ func (s *ContextImpl) allocateTaskIDAndTimestampLocked(
 			// if scheduled task, check if fire time is in the past
 			if category.Type() == tasks.CategoryTypeScheduled {
 				ts := task.GetVisibilityTime()
-				if task.GetVersion() != common.EmptyVersion && category.ID() == tasks.CategoryIDTimer {
-					// cannot use version to determine the corresponding cluster for timer task
-					// this is because during failover, timer task should be created as active
-					// or otherwise, failover + active processing logic may not pick up the task.
-					currentCluster = namespaceEntry.ActiveClusterName()
-				}
-				readCursorTS := s.scheduledTaskMaxReadLevelMap[currentCluster]
+				readCursorTS := s.scheduledTaskMaxReadLevel
 				if ts.Truncate(persistence.ScheduledTaskMinPrecision).Before(readCursorTS) {
 					// make sure scheduled task timestamp is higher than max read level after truncation
 					// as persistence layer may lose precision when persisting the task.
@@ -1427,12 +1396,14 @@ func (s *ContextImpl) allocateTaskIDAndTimestampLocked(
 						tag.Timestamp(ts),
 						tag.CursorTimestamp(readCursorTS),
 						tag.ValueShardAllocateTimerBeforeRead)
-					task.SetVisibilityTime(s.scheduledTaskMaxReadLevelMap[currentCluster].Add(persistence.ScheduledTaskMinPrecision))
+					task.SetVisibilityTime(s.scheduledTaskMaxReadLevel.Add(persistence.ScheduledTaskMinPrecision))
 				}
 
-				visibilityTs := task.GetVisibilityTime()
 				s.contextTaggedLogger.Debug("Assigning new timer",
-					tag.Timestamp(visibilityTs), tag.TaskID(task.GetTaskID()), tag.MaxQueryLevel(readCursorTS))
+					tag.Timestamp(task.GetVisibilityTime()),
+					tag.TaskID(task.GetTaskID()),
+					tag.MaxQueryLevel(readCursorTS),
+				)
 			}
 		}
 	}
@@ -1849,7 +1820,7 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
 
 	// initialize the cluster current time to be the same as ack level
 	remoteClusterInfos := make(map[string]*remoteClusterInfo)
-	scheduledTaskMaxReadLevelMap := make(map[string]time.Time)
+	var scheduledTaskMaxReadLevel time.Time
 	currentClusterName := s.GetClusterMetadata().GetCurrentClusterName()
 	taskCategories := tasks.GetCategories()
 	for clusterName, info := range s.GetClusterMetadata().GetAllClusterInfo() {
@@ -1890,8 +1861,11 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
 		// Add().Truncate() here is just to make sure max read level has the same precision as the old logic
 		// in case existing code can't work correctly with precision higher than 1ms.
 		// Once we validate the rest of the code can work correctly with higher precision, the code should simply be
-		// scheduledTaskMaxReadLevelMap[clusterName] = maxReadTime
-		scheduledTaskMaxReadLevelMap[clusterName] = maxReadTime.Add(persistence.ScheduledTaskMinPrecision).Truncate(persistence.ScheduledTaskMinPrecision)
+		// scheduledTaskMaxReadLevel = util.MaxTime(scheduledTaskMaxReadLevel, maxReadTime)
+		scheduledTaskMaxReadLevel = util.MaxTime(
+			scheduledTaskMaxReadLevel,
+			maxReadTime.Add(persistence.ScheduledTaskMinPrecision).Truncate(persistence.ScheduledTaskMinPrecision),
+		)
 
 		if clusterName != currentClusterName {
 			remoteClusterInfos[clusterName] = &remoteClusterInfo{CurrentTime: maxReadTime}
@@ -1903,7 +1877,7 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
 
 	s.shardInfo = updatedShardInfo
 	s.remoteClusterInfos = remoteClusterInfos
-	s.scheduledTaskMaxReadLevelMap = scheduledTaskMaxReadLevelMap
+	s.scheduledTaskMaxReadLevel = scheduledTaskMaxReadLevel
 
 	return nil
 }
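
The other half of the invariant sits in allocateTaskIDAndTimestampLocked above: a newly written scheduled task must not fire at or before the level readers may already have passed, so its visibility time is clamped against the single watermark. A hedged, standalone sketch of just that rule (the precision constant and the plain time arguments are simplified stand-ins for the real task and persistence types):

```go
package main

import (
	"fmt"
	"time"
)

// clampToReadLevel mirrors the check in allocateTaskIDAndTimestampLocked: if a
// scheduled task's fire time, truncated to persistence precision, falls before
// the shard's single max read level, push it just past that level.
func clampToReadLevel(fireTime, maxReadLevel time.Time, minPrecision time.Duration) time.Time {
	if fireTime.Truncate(minPrecision).Before(maxReadLevel) {
		// The persistence layer may drop sub-precision digits, so the new
		// timestamp must clear the read level by a full precision unit.
		return maxReadLevel.Add(minPrecision)
	}
	return fireTime
}

func main() {
	readLevel := time.Date(2023, 1, 20, 12, 0, 0, 0, time.UTC)

	late := readLevel.Add(-time.Second) // would fire behind the watermark
	fmt.Println(clampToReadLevel(late, readLevel, time.Millisecond)) // bumped to 12:00:00.001

	future := readLevel.Add(time.Minute) // already safely ahead
	fmt.Println(clampToReadLevel(future, readLevel, time.Millisecond)) // unchanged
}
```

On shard reload, loadShardMetadata seeds the same field by folding every cluster's scheduled-queue ack level through util.MaxTime, so the watermark never starts behind a level that was already handed out.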
8 changes: 4 additions & 4 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default.

37 changes: 4 additions & 33 deletions service/history/shard/context_test.go
@@ -183,43 +183,14 @@ func (s *contextSuite) TestTimerMaxReadLevelInitialization() {
 	}
 }
 
-func (s *contextSuite) TestTimerMaxReadLevelUpdate_MultiProcessor() {
-	now := time.Now()
+func (s *contextSuite) TestTimerMaxReadLevelUpdate() {
+	now := time.Now().Add(time.Minute)
 	s.timeSource.Update(now)
-	maxReadLevel, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.NoError(err)
 
-	s.timeSource.Update(now.Add(-time.Minute))
-	newMaxReadLevel, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.NoError(err)
-	s.Equal(maxReadLevel, newMaxReadLevel)
-
-	s.timeSource.Update(now.Add(time.Minute))
-	newMaxReadLevel, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.NoError(err)
-	s.True(newMaxReadLevel.FireTime.After(maxReadLevel.FireTime))
-}
-
-func (s *contextSuite) TestTimerMaxReadLevelUpdate_SingleProcessor() {
-	now := time.Now()
-	s.timeSource.Update(now)
-
-	// make sure the scheduledTaskMaxReadLevelMap has value for both current cluster and alternative cluster
-	_, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, false)
-	s.NoError(err)
-	_, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestAlternativeClusterName, false)
+	_, err := s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark()
 	s.NoError(err)
 
-	now = time.Now().Add(time.Minute)
-	s.timeSource.Update(now)
-
-	// update in single processor mode
-	_, err = s.mockShard.UpdateScheduledQueueExclusiveHighReadWatermark(cluster.TestCurrentClusterName, true)
-	s.NoError(err)
-	scheduledTaskMaxReadLevelMap := s.mockShard.scheduledTaskMaxReadLevelMap
-	s.Len(scheduledTaskMaxReadLevelMap, 2)
-	s.True(scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName].After(now))
-	s.True(scheduledTaskMaxReadLevelMap[cluster.TestAlternativeClusterName].After(now))
+	s.True(s.mockShard.scheduledTaskMaxReadLevel.After(now))
 }
 
 func (s *contextSuite) TestDeleteWorkflowExecution_Success() {
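
The two old tests reduce to one property: the watermark must not regress when the clock does, and must advance when the clock moves forward. A standalone sketch of that property in the style of a Go test (fakeClock and watermark are illustrative stand-ins for the suite's controllable time source and the shard field, not the real fixtures):

```go
package shardexample

import (
	"testing"
	"time"
)

// fakeClock is a tiny stand-in for the suite's controllable time source.
type fakeClock struct{ now time.Time }

func (c *fakeClock) Update(t time.Time) { c.now = t }
func (c *fakeClock) Now() time.Time     { return c.now }

// watermark mirrors the shard's single scheduledTaskMaxReadLevel.
type watermark struct {
	level        time.Time
	maxTimeShift time.Duration
}

func (w *watermark) update(clock *fakeClock) time.Time {
	candidate := clock.Now().Add(w.maxTimeShift).Truncate(time.Millisecond)
	if candidate.After(w.level) {
		w.level = candidate
	}
	return w.level
}

// TestWatermarkIsMonotonic captures what the rewritten suite test checks:
// the single read level never moves backwards, even if the clock does.
func TestWatermarkIsMonotonic(t *testing.T) {
	clock := &fakeClock{now: time.Now()}
	w := &watermark{maxTimeShift: time.Second}

	first := w.update(clock)

	clock.Update(clock.Now().Add(-time.Minute)) // rewind the clock
	if got := w.update(clock); !got.Equal(first) {
		t.Fatalf("watermark regressed: %v -> %v", first, got)
	}

	clock.Update(clock.Now().Add(2 * time.Minute)) // move the clock forward
	if got := w.update(clock); !got.After(first) {
		t.Fatalf("watermark did not advance past %v", first)
	}
}
```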
2 changes: 0 additions & 2 deletions service/history/shard/context_testutil.go
@@ -27,7 +27,6 @@ package shard
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/golang/mock/gomock"
 
@@ -100,7 +99,6 @@ func NewTestContext(
 		taskSequenceNumber:                 shardInfo.RangeId << int64(config.RangeSizeBits),
 		immediateTaskExclusiveMaxReadLevel: shardInfo.RangeId << int64(config.RangeSizeBits),
 		maxTaskSequenceNumber:              (shardInfo.RangeId + 1) << int64(config.RangeSizeBits),
-		scheduledTaskMaxReadLevelMap:       make(map[string]time.Time),
 		remoteClusterInfos:                 make(map[string]*remoteClusterInfo),
 		handoverNamespaces:                 make(map[namespace.Name]*namespaceHandOverInfo),
 
