Skip to content

Commit

Permalink
pass reader id to persistence as a hint
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Feb 27, 2023
1 parent 6a03e8f commit b2b5da6
Show file tree
Hide file tree
Showing 18 changed files with 79 additions and 43 deletions.
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,8 @@ const (
// Limit for schedule notes field
ScheduleNotesSizeLimit = 1000
)

const (
// DefaultQueueReaderID is the default readerID when loading history tasks
DefaultQueueReaderID = 0
)
3 changes: 3 additions & 0 deletions common/persistence/tests/execution_mutable_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/debug"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -518,6 +519,7 @@ func (s *ExecutionMutableStateTaskSuite) TestGetScheduledTasksOrdered() {
response, err := s.ExecutionManager.GetHistoryTasks(s.Ctx, &p.GetHistoryTasksRequest{
ShardID: s.ShardID,
TaskCategory: fakeScheduledTaskCategory,
ReaderID: common.DefaultQueueReaderID,
InclusiveMinTaskKey: tasks.NewKey(now, 0),
ExclusiveMaxTaskKey: tasks.NewKey(now.Add(time.Second), 0),
BatchSize: 10,
Expand Down Expand Up @@ -565,6 +567,7 @@ func (s *ExecutionMutableStateTaskSuite) PaginateTasks(
request := &p.GetHistoryTasksRequest{
ShardID: s.ShardID,
TaskCategory: category,
ReaderID: common.DefaultQueueReaderID,
InclusiveMinTaskKey: inclusiveMinTaskKey,
ExclusiveMaxTaskKey: exclusiveMaxTaskKey,
BatchSize: batchSize,
Expand Down
1 change: 1 addition & 0 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ func (adh *AdminHandler) ListHistoryTasks(
resp, err := adh.persistenceExecutionManager.GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{
ShardID: request.ShardId,
TaskCategory: taskCategory,
ReaderID: common.DefaultQueueReaderID, // TODO: 1. does this work? need to move to history?
InclusiveMinTaskKey: minTaskKey,
ExclusiveMaxTaskKey: maxTaskKey,
BatchSize: int(request.BatchSize),
Expand Down
22 changes: 13 additions & 9 deletions service/history/queues/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (

type (
Iterator interface {
collection.Iterator[tasks.Task]
HasNext(readerID int32) bool
Next(readerID int32) (tasks.Task, error)

Range() Range
CanSplit(tasks.Key) bool
Expand All @@ -43,13 +44,14 @@ type (
Remaining() Iterator
}

PaginationFnProvider func(Range) collection.PaginationFn[tasks.Task]
PaginationFnProvider func(int32, Range) collection.PaginationFn[tasks.Task]

IteratorImpl struct {
paginationFnProvider PaginationFnProvider
remainingRange Range

pagingIterator collection.Iterator[tasks.Task]
iteratorReaderID int32
pagingIterator collection.Iterator[tasks.Task]
}
)

Expand All @@ -62,20 +64,22 @@ func NewIterator(
remainingRange: r,

// lazy initialized to prevent task pre-fetching on creating the iterator
pagingIterator: nil,
iteratorReaderID: 0,
pagingIterator: nil,
}
}

func (i *IteratorImpl) HasNext() bool {
if i.pagingIterator == nil {
i.pagingIterator = collection.NewPagingIterator(i.paginationFnProvider(i.remainingRange))
func (i *IteratorImpl) HasNext(readerID int32) bool {
if i.pagingIterator == nil || i.iteratorReaderID != readerID {
i.pagingIterator = collection.NewPagingIterator(i.paginationFnProvider(readerID, i.remainingRange))
i.iteratorReaderID = readerID
}

return i.pagingIterator.HasNext()
}

func (i *IteratorImpl) Next() (tasks.Task, error) {
if !i.HasNext() {
func (i *IteratorImpl) Next(readerID int32) (tasks.Task, error) {
if !i.HasNext(readerID) {
panic("Iterator encountered Next call when there is no next item")
}

Expand Down
40 changes: 21 additions & 19 deletions service/history/queues/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (s *iteratorSuite) TestNext_IncreaseTaskKey() {
taskKey := NewRandomKeyInRange(r)
mockTask := tasks.NewMockTask(s.controller)
mockTask.EXPECT().GetKey().Return(taskKey).Times(1)
paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] {
s.Equal(r, paginationRange)
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
return []tasks.Task{mockTask}, nil, nil
Expand All @@ -81,14 +81,14 @@ func (s *iteratorSuite) TestNext_IncreaseTaskKey() {
iterator := NewIterator(paginationFnProvider, r)
s.Equal(r, iterator.Range())

s.True(iterator.HasNext())
task, err := iterator.Next()
s.True(iterator.HasNext(DefaultReaderId))
task, err := iterator.Next(DefaultReaderId)
s.NoError(err)
s.Equal(mockTask, task)

s.Equal(NewRange(taskKey.Next(), r.ExclusiveMax), iterator.Range())

s.False(iterator.HasNext())
s.False(iterator.HasNext(DefaultReaderId))
}

func (s *iteratorSuite) TestCanSplit() {
Expand All @@ -113,7 +113,7 @@ func (s *iteratorSuite) TestCanSplit() {

func (s *iteratorSuite) TestSplit() {
r := NewRandomRange()
paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
return []tasks.Task{}, nil, nil
}
Expand All @@ -127,8 +127,8 @@ func (s *iteratorSuite) TestSplit() {
leftIterator, rightIterator := iterator.Split(splitKey)
s.Equal(NewRange(r.InclusiveMin, splitKey), leftIterator.Range())
s.Equal(NewRange(splitKey, r.ExclusiveMax), rightIterator.Range())
s.False(leftIterator.HasNext())
s.False(leftIterator.HasNext())
s.False(leftIterator.HasNext(DefaultReaderId))
s.False(leftIterator.HasNext(DefaultReaderId))
}

func (s *iteratorSuite) TestCanMerge() {
Expand Down Expand Up @@ -170,60 +170,60 @@ func (s *iteratorSuite) TestMerge() {
r := NewRandomRange()

numLoad := 0
paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
numLoad++
return []tasks.Task{}, nil, nil
}
}

iterator := NewIterator(paginationFnProvider, r)
s.False(iterator.HasNext())
s.False(iterator.HasNext(DefaultReaderId))

incomingIterator := NewIterator(paginationFnProvider, r)
mergedIterator := iterator.Merge(incomingIterator)
s.Equal(r, mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

incomingIterator = NewIterator(
paginationFnProvider,
NewRange(tasks.MinimumKey, r.InclusiveMin),
)
mergedIterator = iterator.Merge(incomingIterator)
s.Equal(NewRange(tasks.MinimumKey, r.ExclusiveMax), mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

incomingIterator = NewIterator(
paginationFnProvider,
NewRange(r.ExclusiveMax, tasks.MaximumKey),
)
mergedIterator = iterator.Merge(incomingIterator)
s.Equal(NewRange(r.InclusiveMin, tasks.MaximumKey), mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

incomingIterator = NewIterator(
paginationFnProvider,
NewRange(tasks.MinimumKey, NewRandomKeyInRange(r)),
)
mergedIterator = iterator.Merge(incomingIterator)
s.Equal(NewRange(tasks.MinimumKey, r.ExclusiveMax), mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

incomingIterator = NewIterator(
paginationFnProvider,
NewRange(NewRandomKeyInRange(r), tasks.MaximumKey),
)
mergedIterator = iterator.Merge(incomingIterator)
s.Equal(NewRange(r.InclusiveMin, tasks.MaximumKey), mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

incomingIterator = NewIterator(
paginationFnProvider,
NewRange(tasks.MinimumKey, tasks.MaximumKey),
)
mergedIterator = iterator.Merge(incomingIterator)
s.Equal(NewRange(tasks.MinimumKey, tasks.MaximumKey), mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

// test if Merge returns a new iterator
s.Equal(7, numLoad)
Expand All @@ -238,7 +238,7 @@ func (s *iteratorSuite) TestRemaining() {
taskKey := NewRandomKeyInRange(r)
mockTask := tasks.NewMockTask(s.controller)
mockTask.EXPECT().GetKey().Return(taskKey).Times(1)
paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
numLoad++
if paginationRange.ContainsKey(taskKey) {
Expand All @@ -249,14 +249,16 @@ func (s *iteratorSuite) TestRemaining() {
}

iterator := NewIterator(paginationFnProvider, r)
_, err := iterator.Next()
_, err := iterator.Next(DefaultReaderId)
s.NoError(err)
s.False(iterator.HasNext())
s.False(iterator.HasNext(DefaultReaderId))

remaining := iterator.Remaining()
s.Equal(iterator.Range(), remaining.Range())
s.False(remaining.HasNext())
s.False(remaining.HasNext(DefaultReaderId))

// test if Remaining returns a new iterator
s.Equal(2, numLoad)
}

// TODO: add a test on readerID change
2 changes: 1 addition & 1 deletion service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
)

const (
DefaultReaderId = 0
DefaultReaderId = common.DefaultQueueReaderID

// Non-default readers will use critical pending task count * this coefficient
// as its max pending task count so that their loading will never trigger pending
Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/queue_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (s *queueBaseSuite) TestStartStop() {
)
mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
mockTask := tasks.NewMockTask(s.controller)
key := NewRandomKeyInRange(paginationRange)
Expand Down
3 changes: 2 additions & 1 deletion service/history/queues/queue_immediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ func NewImmediateQueue(
logger log.Logger,
metricsHandler metrics.Handler,
) *immediateQueue {
paginationFnProvider := func(r Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(readerID int32, r Range) collection.PaginationFn[tasks.Task] { // readerID
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
ctx, cancel := newQueueIOContext()
defer cancel()

request := &persistence.GetHistoryTasksRequest{
ShardID: shard.GetShardID(),
TaskCategory: category,
ReaderID: readerID,
InclusiveMinTaskKey: r.InclusiveMin,
ExclusiveMaxTaskKey: r.ExclusiveMax,
BatchSize: options.BatchSize(),
Expand Down
4 changes: 3 additions & 1 deletion service/history/queues/queue_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@ func NewScheduledQueue(
logger log.Logger,
metricsHandler metrics.Handler,
) *scheduledQueue {
paginationFnProvider := func(r Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(readerID int32, r Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
ctx, cancel := newQueueIOContext()
defer cancel()

request := &persistence.GetHistoryTasksRequest{
ShardID: shard.GetShardID(),
TaskCategory: category,
ReaderID: readerID,
InclusiveMinTaskKey: tasks.NewKey(r.InclusiveMin.FireTime, 0),
ExclusiveMaxTaskKey: tasks.NewKey(r.ExclusiveMax.FireTime.Add(persistence.ScheduledTaskMinPrecision), 0),
BatchSize: options.BatchSize(),
Expand Down Expand Up @@ -264,6 +265,7 @@ func (p *scheduledQueue) lookAheadTask() {
request := &persistence.GetHistoryTasksRequest{
ShardID: p.shard.GetShardID(),
TaskCategory: p.category,
ReaderID: DefaultReaderId,
InclusiveMinTaskKey: tasks.NewKey(lookAheadMinTime, 0),
ExclusiveMaxTaskKey: tasks.NewKey(lookAheadMaxTime, 0),
BatchSize: 1,
Expand Down
4 changes: 3 additions & 1 deletion service/history/queues/queue_scheduled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (s *scheduledQueueSuite) TestPaginationFnProvider() {
s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{
ShardID: s.mockShard.GetShardID(),
TaskCategory: tasks.CategoryTimer,
ReaderID: DefaultReaderId,
InclusiveMinTaskKey: tasks.NewKey(r.InclusiveMin.FireTime, 0),
ExclusiveMaxTaskKey: tasks.NewKey(r.ExclusiveMax.FireTime.Add(persistence.ScheduledTaskMinPrecision), 0),
BatchSize: testQueueOptions.BatchSize(),
Expand All @@ -177,7 +178,7 @@ func (s *scheduledQueueSuite) TestPaginationFnProvider() {
NextPageToken: nextPageToken,
}, nil).Times(1)

paginationFn := paginationFnProvider(r)
paginationFn := paginationFnProvider(DefaultReaderId, r)
loadedTasks, actualNextPageToken, err := paginationFn(currentPageToken)
s.NoError(err)
for _, task := range loadedTasks {
Expand Down Expand Up @@ -278,6 +279,7 @@ func (s *scheduledQueueSuite) setupLookAheadMock(
s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, request *persistence.GetHistoryTasksRequest) (*persistence.GetHistoryTasksResponse, error) {
s.Equal(s.mockShard.GetShardID(), request.ShardID)
s.Equal(tasks.CategoryTimer, request.TaskCategory)
s.Equal(int32(DefaultReaderId), request.ReaderID)
s.Equal(lookAheadRange.InclusiveMin, request.InclusiveMinTaskKey)
s.Equal(1, request.BatchSize)
s.Nil(request.NextPageToken)
Expand Down
10 changes: 5 additions & 5 deletions service/history/queues/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *readerSuite) TestStartLoadStop() {
r := NewRandomRange()
scopes := []Scope{NewScope(r, predicates.Universal[tasks.Task]())}

paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] {
s.Equal(r, paginationRange)
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
mockTask := tasks.NewMockTask(s.controller)
Expand Down Expand Up @@ -256,7 +256,7 @@ func (s *readerSuite) TestShrinkSlices() {
func (s *readerSuite) TestThrottle() {
scopes := NewRandomScopes(1)

paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
mockTask := tasks.NewMockTask(s.controller)
mockTask.EXPECT().GetKey().Return(NewRandomKeyInRange(scopes[0].Range)).AnyTimes()
Expand Down Expand Up @@ -319,7 +319,7 @@ func (s *readerSuite) TestLoadAndSubmitTasks_TooManyPendingTasks() {
func (s *readerSuite) TestLoadAndSubmitTasks_MoreTasks() {
scopes := NewRandomScopes(1)

paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
result := make([]tasks.Task, 0, 100)
for i := 0; i != 100; i++ {
Expand Down Expand Up @@ -354,7 +354,7 @@ func (s *readerSuite) TestLoadAndSubmitTasks_MoreTasks() {
func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_HasNextSlice() {
scopes := NewRandomScopes(2)

paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
mockTask := tasks.NewMockTask(s.controller)
mockTask.EXPECT().GetKey().Return(NewRandomKeyInRange(scopes[0].Range)).AnyTimes()
Expand Down Expand Up @@ -384,7 +384,7 @@ func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_HasNextSlice() {
func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_NoNextSlice() {
scopes := NewRandomScopes(1)

paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
mockTask := tasks.NewMockTask(s.controller)
mockTask.EXPECT().GetKey().Return(NewRandomKeyInRange(scopes[0].Range)).AnyTimes()
Expand Down
4 changes: 2 additions & 2 deletions service/history/queues/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ func (s *SliceImpl) SelectTasks(readerID int32, batchSize int) ([]Executable, er

executables := make([]Executable, 0, batchSize)
for len(executables) < batchSize && len(s.iterators) != 0 {
if s.iterators[0].HasNext() {
task, err := s.iterators[0].Next()
if s.iterators[0].HasNext(readerID) {
task, err := s.iterators[0].Next(readerID)
if err != nil {
s.iterators[0] = s.iterators[0].Remaining()
if len(executables) != 0 {
Expand Down
Loading

0 comments on commit b2b5da6

Please sign in to comment.