Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify readerID on loading history tasks #3994

Merged
merged 4 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,8 @@ const (
// Limit for schedule notes field
ScheduleNotesSizeLimit = 1000
)

// DefaultQueueReaderID is the default readerID used when loading history tasks.
const DefaultQueueReaderID = 0
5 changes: 5 additions & 0 deletions common/persistence/tests/execution_mutable_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/debug"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -518,6 +519,7 @@ func (s *ExecutionMutableStateTaskSuite) TestGetScheduledTasksOrdered() {
response, err := s.ExecutionManager.GetHistoryTasks(s.Ctx, &p.GetHistoryTasksRequest{
ShardID: s.ShardID,
TaskCategory: fakeScheduledTaskCategory,
ReaderID: common.DefaultQueueReaderID,
InclusiveMinTaskKey: tasks.NewKey(now, 0),
ExclusiveMaxTaskKey: tasks.NewKey(now.Add(time.Second), 0),
BatchSize: 10,
Expand Down Expand Up @@ -565,6 +567,7 @@ func (s *ExecutionMutableStateTaskSuite) PaginateTasks(
request := &p.GetHistoryTasksRequest{
ShardID: s.ShardID,
TaskCategory: category,
ReaderID: common.DefaultQueueReaderID,
InclusiveMinTaskKey: inclusiveMinTaskKey,
ExclusiveMaxTaskKey: exclusiveMaxTaskKey,
BatchSize: batchSize,
Expand Down Expand Up @@ -621,6 +624,7 @@ func (s *ExecutionMutableStateTaskSuite) GetAndCompleteHistoryTask(
key := task.GetKey()
resp, err := s.ExecutionManager.GetHistoryTask(s.Ctx, &p.GetHistoryTaskRequest{
ShardID: s.ShardID,
ReaderID: common.DefaultQueueReaderID,
TaskCategory: category,
TaskKey: key,
})
Expand All @@ -636,6 +640,7 @@ func (s *ExecutionMutableStateTaskSuite) GetAndCompleteHistoryTask(

_, err = s.ExecutionManager.GetHistoryTask(s.Ctx, &p.GetHistoryTaskRequest{
ShardID: s.ShardID,
ReaderID: common.DefaultQueueReaderID,
TaskCategory: category,
TaskKey: key,
})
Expand Down
1 change: 1 addition & 0 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ func (adh *AdminHandler) ListHistoryTasks(
resp, err := adh.persistenceExecutionManager.GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{
ShardID: request.ShardId,
TaskCategory: taskCategory,
ReaderID: common.DefaultQueueReaderID,
InclusiveMinTaskKey: minTaskKey,
ExclusiveMaxTaskKey: maxTaskKey,
BatchSize: int(request.BatchSize),
Expand Down
22 changes: 13 additions & 9 deletions service/history/queues/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (

type (
Iterator interface {
collection.Iterator[tasks.Task]
HasNext(readerID int32) bool
Next(readerID int32) (tasks.Task, error)
Comment on lines +36 to +37
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want iterators to be able to freely move between slices & queue readers, instead of bind to a specific slice or reader. Existing implementation is also based on that idea.

So readerID here can be regarded as a context for loading operations. The iterator implementation knows that if readerID changes, a new underlying pagingIterator is needed for talking to persistence.


Range() Range
CanSplit(tasks.Key) bool
Expand All @@ -43,13 +44,14 @@ type (
Remaining() Iterator
}

PaginationFnProvider func(Range) collection.PaginationFn[tasks.Task]
PaginationFnProvider func(int32, Range) collection.PaginationFn[tasks.Task]

IteratorImpl struct {
paginationFnProvider PaginationFnProvider
remainingRange Range

pagingIterator collection.Iterator[tasks.Task]
iteratorReaderID int32
pagingIterator collection.Iterator[tasks.Task]
}
)

Expand All @@ -62,20 +64,22 @@ func NewIterator(
remainingRange: r,

// lazy initialized to prevent task pre-fetching on creating the iterator
pagingIterator: nil,
iteratorReaderID: 0,
pagingIterator: nil,
}
}

func (i *IteratorImpl) HasNext() bool {
if i.pagingIterator == nil {
i.pagingIterator = collection.NewPagingIterator(i.paginationFnProvider(i.remainingRange))
// HasNext reports whether the iterator can produce another task for the given
// readerID. The underlying paging iterator is created lazily on first use (to
// avoid pre-fetching tasks at construction time) and is rebuilt whenever the
// caller switches to a different readerID, since a reader change requires a
// fresh persistence load over the remaining range.
func (i *IteratorImpl) HasNext(readerID int32) bool {
	needsNewPagingIterator := i.pagingIterator == nil || i.iteratorReaderID != readerID
	if needsNewPagingIterator {
		i.iteratorReaderID = readerID
		i.pagingIterator = collection.NewPagingIterator(i.paginationFnProvider(readerID, i.remainingRange))
	}
	return i.pagingIterator.HasNext()
}

func (i *IteratorImpl) Next() (tasks.Task, error) {
if !i.HasNext() {
func (i *IteratorImpl) Next(readerID int32) (tasks.Task, error) {
if !i.HasNext(readerID) {
panic("Iterator encountered Next call when there is no next item")
}

Expand Down
89 changes: 70 additions & 19 deletions service/history/queues/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package queues

import (
"fmt"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -71,7 +72,7 @@ func (s *iteratorSuite) TestNext_IncreaseTaskKey() {
taskKey := NewRandomKeyInRange(r)
mockTask := tasks.NewMockTask(s.controller)
mockTask.EXPECT().GetKey().Return(taskKey).Times(1)
paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] {
s.Equal(r, paginationRange)
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
return []tasks.Task{mockTask}, nil, nil
Expand All @@ -81,14 +82,64 @@ func (s *iteratorSuite) TestNext_IncreaseTaskKey() {
iterator := NewIterator(paginationFnProvider, r)
s.Equal(r, iterator.Range())

s.True(iterator.HasNext())
task, err := iterator.Next()
s.True(iterator.HasNext(DefaultReaderId))
task, err := iterator.Next(DefaultReaderId)
s.NoError(err)
s.Equal(mockTask, task)

s.Equal(NewRange(taskKey.Next(), r.ExclusiveMax), iterator.Range())

s.False(iterator.HasNext())
s.False(iterator.HasNext(DefaultReaderId))
}

// TestNext_ReaderIDChange verifies that an iterator transparently switches to a
// new underlying paging iterator when HasNext/Next are called with a different
// readerID, resuming from the remaining (not-yet-consumed) range rather than
// restarting from the beginning.
func (s *iteratorSuite) TestNext_ReaderIDChange() {
	r := NewRandomRange()

	// First reader returns one task plus a non-nil page token; the token must
	// be discarded when the readerID changes (a new paging iterator is built).
	firstPageReaderID := int32(DefaultReaderId)
	taskKey := NewRandomKeyInRange(r)
	mockTask := tasks.NewMockTask(s.controller)
	mockTask.EXPECT().GetKey().Return(taskKey).Times(1)

	// Second reader should be asked only for the range remaining after the
	// first task was consumed: [taskKey.Next(), r.ExclusiveMax).
	secondPageReaderID := int32(DefaultReaderId + 1)
	secondPageRange := NewRange(taskKey.Next(), r.ExclusiveMax)
	taskKey2 := NewRandomKeyInRange(secondPageRange)
	mockTask2 := tasks.NewMockTask(s.controller)
	mockTask2.EXPECT().GetKey().Return(taskKey2).Times(1)

	// The provider asserts that each readerID is paired with the expected
	// remaining range; any other readerID fails the pagination call.
	paginationFnProvider := func(readerID int32, paginationRange Range) collection.PaginationFn[tasks.Task] {
		switch readerID {
		case firstPageReaderID:
			s.Equal(r, paginationRange)
			return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
				return []tasks.Task{mockTask}, []byte("nextPageToken"), nil
			}
		case secondPageReaderID:
			s.Equal(secondPageRange, paginationRange)
			return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
				return []tasks.Task{mockTask2}, nil, nil
			}
		default:
			return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
				return nil, nil, fmt.Errorf("unexpected readerID: %v", readerID)
			}
		}
	}

	iterator := NewIterator(paginationFnProvider, r)
	s.Equal(r, iterator.Range())

	// Consume the first task under the first reader; the remaining range
	// should advance past the consumed key.
	s.True(iterator.HasNext(firstPageReaderID))
	task, err := iterator.Next(firstPageReaderID)
	s.NoError(err)
	s.Equal(mockTask, task)
	s.Equal(NewRange(taskKey.Next(), r.ExclusiveMax), iterator.Range())

	// Switch readers mid-iteration: the iterator must re-query using the
	// second readerID over the remaining range and yield the second task.
	s.True(iterator.HasNext(secondPageReaderID))
	task2, err := iterator.Next(secondPageReaderID)
	s.NoError(err)
	s.Equal(mockTask2, task2)

	s.Equal(NewRange(taskKey2.Next(), r.ExclusiveMax), iterator.Range())
}

func (s *iteratorSuite) TestCanSplit() {
Expand All @@ -113,7 +164,7 @@ func (s *iteratorSuite) TestCanSplit() {

func (s *iteratorSuite) TestSplit() {
r := NewRandomRange()
paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
return []tasks.Task{}, nil, nil
}
Expand All @@ -127,8 +178,8 @@ func (s *iteratorSuite) TestSplit() {
leftIterator, rightIterator := iterator.Split(splitKey)
s.Equal(NewRange(r.InclusiveMin, splitKey), leftIterator.Range())
s.Equal(NewRange(splitKey, r.ExclusiveMax), rightIterator.Range())
s.False(leftIterator.HasNext())
s.False(leftIterator.HasNext())
s.False(leftIterator.HasNext(DefaultReaderId))
s.False(leftIterator.HasNext(DefaultReaderId))
}

func (s *iteratorSuite) TestCanMerge() {
Expand Down Expand Up @@ -170,60 +221,60 @@ func (s *iteratorSuite) TestMerge() {
r := NewRandomRange()

numLoad := 0
paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
numLoad++
return []tasks.Task{}, nil, nil
}
}

iterator := NewIterator(paginationFnProvider, r)
s.False(iterator.HasNext())
s.False(iterator.HasNext(DefaultReaderId))

incomingIterator := NewIterator(paginationFnProvider, r)
mergedIterator := iterator.Merge(incomingIterator)
s.Equal(r, mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

incomingIterator = NewIterator(
paginationFnProvider,
NewRange(tasks.MinimumKey, r.InclusiveMin),
)
mergedIterator = iterator.Merge(incomingIterator)
s.Equal(NewRange(tasks.MinimumKey, r.ExclusiveMax), mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

incomingIterator = NewIterator(
paginationFnProvider,
NewRange(r.ExclusiveMax, tasks.MaximumKey),
)
mergedIterator = iterator.Merge(incomingIterator)
s.Equal(NewRange(r.InclusiveMin, tasks.MaximumKey), mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

incomingIterator = NewIterator(
paginationFnProvider,
NewRange(tasks.MinimumKey, NewRandomKeyInRange(r)),
)
mergedIterator = iterator.Merge(incomingIterator)
s.Equal(NewRange(tasks.MinimumKey, r.ExclusiveMax), mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

incomingIterator = NewIterator(
paginationFnProvider,
NewRange(NewRandomKeyInRange(r), tasks.MaximumKey),
)
mergedIterator = iterator.Merge(incomingIterator)
s.Equal(NewRange(r.InclusiveMin, tasks.MaximumKey), mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

incomingIterator = NewIterator(
paginationFnProvider,
NewRange(tasks.MinimumKey, tasks.MaximumKey),
)
mergedIterator = iterator.Merge(incomingIterator)
s.Equal(NewRange(tasks.MinimumKey, tasks.MaximumKey), mergedIterator.Range())
s.False(mergedIterator.HasNext())
s.False(mergedIterator.HasNext(DefaultReaderId))

// test if Merge returns a new iterator
s.Equal(7, numLoad)
Expand All @@ -238,7 +289,7 @@ func (s *iteratorSuite) TestRemaining() {
taskKey := NewRandomKeyInRange(r)
mockTask := tasks.NewMockTask(s.controller)
mockTask.EXPECT().GetKey().Return(taskKey).Times(1)
paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
numLoad++
if paginationRange.ContainsKey(taskKey) {
Expand All @@ -249,13 +300,13 @@ func (s *iteratorSuite) TestRemaining() {
}

iterator := NewIterator(paginationFnProvider, r)
_, err := iterator.Next()
_, err := iterator.Next(DefaultReaderId)
s.NoError(err)
s.False(iterator.HasNext())
s.False(iterator.HasNext(DefaultReaderId))

remaining := iterator.Remaining()
s.Equal(iterator.Range(), remaining.Range())
s.False(remaining.HasNext())
s.False(remaining.HasNext(DefaultReaderId))

// test if Remaining returns a new iterator
s.Equal(2, numLoad)
Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
)

const (
DefaultReaderId = 0
DefaultReaderId = common.DefaultQueueReaderID

// Non-default readers will use critical pending task count * this coefficient
// as its max pending task count so that their loading will never trigger pending
Expand Down
2 changes: 1 addition & 1 deletion service/history/queues/queue_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (s *queueBaseSuite) TestStartStop() {
)
mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()

paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
mockTask := tasks.NewMockTask(s.controller)
key := NewRandomKeyInRange(paginationRange)
Expand Down
3 changes: 2 additions & 1 deletion service/history/queues/queue_immediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ func NewImmediateQueue(
logger log.Logger,
metricsHandler metrics.Handler,
) *immediateQueue {
paginationFnProvider := func(r Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(readerID int32, r Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
ctx, cancel := newQueueIOContext()
defer cancel()

request := &persistence.GetHistoryTasksRequest{
ShardID: shard.GetShardID(),
TaskCategory: category,
ReaderID: readerID,
InclusiveMinTaskKey: r.InclusiveMin,
ExclusiveMaxTaskKey: r.ExclusiveMax,
BatchSize: options.BatchSize(),
Expand Down
4 changes: 3 additions & 1 deletion service/history/queues/queue_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ func NewScheduledQueue(
logger log.Logger,
metricsHandler metrics.Handler,
) *scheduledQueue {
paginationFnProvider := func(r Range) collection.PaginationFn[tasks.Task] {
paginationFnProvider := func(readerID int32, r Range) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
ctx, cancel := newQueueIOContext()
defer cancel()

request := &persistence.GetHistoryTasksRequest{
ShardID: shard.GetShardID(),
TaskCategory: category,
ReaderID: readerID,
InclusiveMinTaskKey: tasks.NewKey(r.InclusiveMin.FireTime, 0),
ExclusiveMaxTaskKey: tasks.NewKey(r.ExclusiveMax.FireTime.Add(persistence.ScheduledTaskMinPrecision), 0),
BatchSize: options.BatchSize(),
Expand Down Expand Up @@ -261,6 +262,7 @@ func (p *scheduledQueue) lookAheadTask() {
request := &persistence.GetHistoryTasksRequest{
ShardID: p.shard.GetShardID(),
TaskCategory: p.category,
ReaderID: DefaultReaderId,
InclusiveMinTaskKey: tasks.NewKey(lookAheadMinTime, 0),
ExclusiveMaxTaskKey: tasks.NewKey(lookAheadMaxTime, 0),
BatchSize: 1,
Expand Down
4 changes: 3 additions & 1 deletion service/history/queues/queue_scheduled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (s *scheduledQueueSuite) TestPaginationFnProvider() {
s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{
ShardID: s.mockShard.GetShardID(),
TaskCategory: tasks.CategoryTimer,
ReaderID: DefaultReaderId,
InclusiveMinTaskKey: tasks.NewKey(r.InclusiveMin.FireTime, 0),
ExclusiveMaxTaskKey: tasks.NewKey(r.ExclusiveMax.FireTime.Add(persistence.ScheduledTaskMinPrecision), 0),
BatchSize: testQueueOptions.BatchSize(),
Expand All @@ -177,7 +178,7 @@ func (s *scheduledQueueSuite) TestPaginationFnProvider() {
NextPageToken: nextPageToken,
}, nil).Times(1)

paginationFn := paginationFnProvider(r)
paginationFn := paginationFnProvider(DefaultReaderId, r)
loadedTasks, actualNextPageToken, err := paginationFn(currentPageToken)
s.NoError(err)
for _, task := range loadedTasks {
Expand Down Expand Up @@ -265,6 +266,7 @@ func (s *scheduledQueueSuite) setupLookAheadMock(
s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, request *persistence.GetHistoryTasksRequest) (*persistence.GetHistoryTasksResponse, error) {
s.Equal(s.mockShard.GetShardID(), request.ShardID)
s.Equal(tasks.CategoryTimer, request.TaskCategory)
s.Equal(int32(DefaultReaderId), request.ReaderID)
s.Equal(lookAheadRange.InclusiveMin, request.InclusiveMinTaskKey)
s.Equal(1, request.BatchSize)
s.Nil(request.NextPageToken)
Expand Down
Loading