Skip to content

Commit

Permalink
Perform task range completion on shard rangeID update (#4528)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored and mindaugasrukas committed Jun 22, 2023
1 parent 8699916 commit b8feb95
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 1 deletion.
16 changes: 15 additions & 1 deletion service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type (
paginationFnProvider PaginationFnProvider
executableInitializer ExecutableInitializer

lastRangeID int64
exclusiveDeletionHighWatermark tasks.Key
nonReadableScope Scope
readerRateLimiter quotas.RequestRateLimiter
Expand Down Expand Up @@ -258,6 +259,7 @@ func newQueueBase(
paginationFnProvider: paginationFnProvider,
executableInitializer: executableInitializer,

lastRangeID: -1, // start from an invalid rangeID
exclusiveDeletionHighWatermark: exclusiveDeletionHighWatermark,
nonReadableScope: NewScope(
NewRange(exclusiveReaderHighWatermark, tasks.MaximumKey),
Expand Down Expand Up @@ -389,7 +391,10 @@ func (p *queueBase) checkpoint() {
// Emit metric before the deletion watermark comparsion so we have the emit even if there's no task
// for the queue
p.metricsHandler.Counter(metrics.TaskBatchCompleteCounter.GetMetricName()).Record(1)
if newExclusiveDeletionHighWatermark.CompareTo(p.exclusiveDeletionHighWatermark) > 0 {
if newExclusiveDeletionHighWatermark.CompareTo(p.exclusiveDeletionHighWatermark) > 0 ||
p.updateShardRangeID() {
// when shard rangeID is updated, perform range completion again in case the underlying persistence implementation
// serves traffic based on the persisted shardInfo
err := p.rangeCompleteTasks(p.exclusiveDeletionHighWatermark, newExclusiveDeletionHighWatermark)
if err != nil {
p.resetCheckpointTimer(err)
Expand Down Expand Up @@ -449,6 +454,15 @@ func (p *queueBase) updateReaderProgress(
}
}

func (p *queueBase) updateShardRangeID() bool {
newRangeID := p.shard.GetRangeID()
if p.lastRangeID < newRangeID {
p.lastRangeID = newRangeID
return true
}
return false
}

func (p *queueBase) rangeCompleteTasks(
oldExclusiveDeletionHighWatermark tasks.Key,
newExclusiveDeletionHighWatermark tasks.Key,
Expand Down
1 change: 1 addition & 0 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type (
// Context represents a history engine shard
Context interface {
GetShardID() int32
GetRangeID() int64
IsValid() bool
GetOwner() string
GetExecutionManager() persistence.ExecutionManager
Expand Down
7 changes: 7 additions & 0 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ func (s *ContextImpl) GetShardID() int32 {
return s.shardID
}

func (s *ContextImpl) GetRangeID() int64 {
s.rLock()
defer s.rUnlock()

return s.getRangeIDLocked()
}

func (s *ContextImpl) GetOwner() string {
// constant from initialization, no need for locks
return s.owner
Expand Down
14 changes: 14 additions & 0 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b8feb95

Please sign in to comment.