diff --git a/common/constants.go b/common/constants.go index aaee624aa57..eaa126f0eae 100644 --- a/common/constants.go +++ b/common/constants.go @@ -97,3 +97,8 @@ const ( // Limit for schedule notes field ScheduleNotesSizeLimit = 1000 ) + +const ( + // DefaultQueueReaderID is the default readerID when loading history tasks + DefaultQueueReaderID = 0 +) diff --git a/common/persistence/tests/execution_mutable_state_task.go b/common/persistence/tests/execution_mutable_state_task.go index c99b18b9a4e..0afe635f9ed 100644 --- a/common/persistence/tests/execution_mutable_state_task.go +++ b/common/persistence/tests/execution_mutable_state_task.go @@ -40,6 +40,7 @@ import ( "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/debug" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/dynamicconfig" @@ -518,6 +519,7 @@ func (s *ExecutionMutableStateTaskSuite) TestGetScheduledTasksOrdered() { response, err := s.ExecutionManager.GetHistoryTasks(s.Ctx, &p.GetHistoryTasksRequest{ ShardID: s.ShardID, TaskCategory: fakeScheduledTaskCategory, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewKey(now, 0), ExclusiveMaxTaskKey: tasks.NewKey(now.Add(time.Second), 0), BatchSize: 10, @@ -565,6 +567,7 @@ func (s *ExecutionMutableStateTaskSuite) PaginateTasks( request := &p.GetHistoryTasksRequest{ ShardID: s.ShardID, TaskCategory: category, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: inclusiveMinTaskKey, ExclusiveMaxTaskKey: exclusiveMaxTaskKey, BatchSize: batchSize, @@ -621,6 +624,7 @@ func (s *ExecutionMutableStateTaskSuite) GetAndCompleteHistoryTask( key := task.GetKey() resp, err := s.ExecutionManager.GetHistoryTask(s.Ctx, &p.GetHistoryTaskRequest{ ShardID: s.ShardID, + ReaderID: common.DefaultQueueReaderID, TaskCategory: category, TaskKey: key, }) @@ -636,6 +640,7 @@ func (s *ExecutionMutableStateTaskSuite) GetAndCompleteHistoryTask( _, err = s.ExecutionManager.GetHistoryTask(s.Ctx, &p.GetHistoryTaskRequest{ ShardID: s.ShardID, + ReaderID: common.DefaultQueueReaderID, TaskCategory: category, TaskKey: key, }) diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index a6a35c86010..354aa5482ce 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -747,6 +747,7 @@ func (adh *AdminHandler) ListHistoryTasks( resp, err := adh.persistenceExecutionManager.GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{ ShardID: request.ShardId, TaskCategory: taskCategory, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: minTaskKey, ExclusiveMaxTaskKey: maxTaskKey, BatchSize: int(request.BatchSize), diff --git a/service/history/queues/iterator.go b/service/history/queues/iterator.go index fb31a6729d2..23ae347aa93 100644 --- a/service/history/queues/iterator.go +++ b/service/history/queues/iterator.go @@ -33,7 +33,8 @@ import ( type ( Iterator interface { - collection.Iterator[tasks.Task] + HasNext(readerID int32) bool + Next(readerID int32) (tasks.Task, error) Range() Range CanSplit(tasks.Key) bool @@ -43,13 +44,14 @@ type ( Remaining() Iterator } - PaginationFnProvider func(Range) collection.PaginationFn[tasks.Task] + PaginationFnProvider func(int32, Range) collection.PaginationFn[tasks.Task] IteratorImpl struct { paginationFnProvider PaginationFnProvider remainingRange Range - pagingIterator collection.Iterator[tasks.Task] + iteratorReaderID int32 + pagingIterator collection.Iterator[tasks.Task] } ) @@ -62,20 +64,22 @@ func NewIterator( remainingRange: r, // lazy initialized to prevent task pre-fetching on creating the iterator - pagingIterator: nil, + iteratorReaderID: 0, + pagingIterator: nil, } } -func (i *IteratorImpl) HasNext() bool { - if i.pagingIterator == nil { - i.pagingIterator = collection.NewPagingIterator(i.paginationFnProvider(i.remainingRange)) +func (i *IteratorImpl) HasNext(readerID int32) bool { + if i.pagingIterator == nil || i.iteratorReaderID != readerID { + i.pagingIterator = collection.NewPagingIterator(i.paginationFnProvider(readerID, i.remainingRange)) + i.iteratorReaderID = readerID } return i.pagingIterator.HasNext() } -func (i *IteratorImpl) Next() (tasks.Task, error) { - if !i.HasNext() { +func (i *IteratorImpl) Next(readerID int32) (tasks.Task, error) { + if !i.HasNext(readerID) { panic("Iterator encountered Next call when there is no next item") } diff --git a/service/history/queues/iterator_test.go b/service/history/queues/iterator_test.go index 406429f143a..db33e6fcb51 100644 --- a/service/history/queues/iterator_test.go +++ b/service/history/queues/iterator_test.go @@ -25,6 +25,7 @@ package queues import ( + "fmt" "math/rand" "testing" "time" @@ -71,7 +72,7 @@ func (s *iteratorSuite) TestNext_IncreaseTaskKey() { taskKey := NewRandomKeyInRange(r) mockTask := tasks.NewMockTask(s.controller) mockTask.EXPECT().GetKey().Return(taskKey).Times(1) - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] { s.Equal(r, paginationRange) return func(paginationToken []byte) ([]tasks.Task, []byte, error) { return []tasks.Task{mockTask}, nil, nil @@ -81,14 +82,64 @@ func (s *iteratorSuite) TestNext_IncreaseTaskKey() { iterator := NewIterator(paginationFnProvider, r) s.Equal(r, iterator.Range()) - s.True(iterator.HasNext()) - task, err := iterator.Next() + s.True(iterator.HasNext(DefaultReaderId)) + task, err := iterator.Next(DefaultReaderId) s.NoError(err) s.Equal(mockTask, task) s.Equal(NewRange(taskKey.Next(), r.ExclusiveMax), iterator.Range()) - s.False(iterator.HasNext()) + s.False(iterator.HasNext(DefaultReaderId)) +} + +func (s *iteratorSuite) TestNext_ReaderIDChange() { + r := NewRandomRange() + + firstPageReaderID := int32(DefaultReaderId) + taskKey := NewRandomKeyInRange(r) + mockTask := tasks.NewMockTask(s.controller) + mockTask.EXPECT().GetKey().Return(taskKey).Times(1) + + secondPageReaderID := int32(DefaultReaderId + 1) + secondPageRange := NewRange(taskKey.Next(), r.ExclusiveMax) + taskKey2 := NewRandomKeyInRange(secondPageRange) + mockTask2 := tasks.NewMockTask(s.controller) + mockTask2.EXPECT().GetKey().Return(taskKey2).Times(1) + + paginationFnProvider := func(readerID int32, paginationRange Range) collection.PaginationFn[tasks.Task] { + switch readerID { + case firstPageReaderID: + s.Equal(r, paginationRange) + return func(paginationToken []byte) ([]tasks.Task, []byte, error) { + return []tasks.Task{mockTask}, []byte("nextPageToken"), nil + } + case secondPageReaderID: + s.Equal(secondPageRange, paginationRange) + return func(paginationToken []byte) ([]tasks.Task, []byte, error) { + return []tasks.Task{mockTask2}, nil, nil + } + default: + return func(paginationToken []byte) ([]tasks.Task, []byte, error) { + return nil, nil, fmt.Errorf("unexpected readerID: %v", readerID) + } + } + } + + iterator := NewIterator(paginationFnProvider, r) + s.Equal(r, iterator.Range()) + + s.True(iterator.HasNext(firstPageReaderID)) + task, err := iterator.Next(firstPageReaderID) + s.NoError(err) + s.Equal(mockTask, task) + s.Equal(NewRange(taskKey.Next(), r.ExclusiveMax), iterator.Range()) + + s.True(iterator.HasNext(secondPageReaderID)) + task2, err := iterator.Next(secondPageReaderID) + s.NoError(err) + s.Equal(mockTask2, task2) + + s.Equal(NewRange(taskKey2.Next(), r.ExclusiveMax), iterator.Range()) } func (s *iteratorSuite) TestCanSplit() { @@ -113,7 +164,7 @@ func (s *iteratorSuite) TestCanSplit() { func (s *iteratorSuite) TestSplit() { r := NewRandomRange() - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { return []tasks.Task{}, nil, nil } @@ -127,8 +178,8 @@ func (s *iteratorSuite) TestSplit() { leftIterator, rightIterator := iterator.Split(splitKey) s.Equal(NewRange(r.InclusiveMin, splitKey), leftIterator.Range()) s.Equal(NewRange(splitKey, r.ExclusiveMax), rightIterator.Range()) - s.False(leftIterator.HasNext()) - s.False(leftIterator.HasNext()) + s.False(leftIterator.HasNext(DefaultReaderId)) + s.False(leftIterator.HasNext(DefaultReaderId)) } func (s *iteratorSuite) TestCanMerge() { @@ -170,7 +221,7 @@ func (s *iteratorSuite) TestMerge() { r := NewRandomRange() numLoad := 0 - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { numLoad++ return []tasks.Task{}, nil, nil @@ -178,12 +229,12 @@ func (s *iteratorSuite) TestMerge() { } iterator := NewIterator(paginationFnProvider, r) - s.False(iterator.HasNext()) + s.False(iterator.HasNext(DefaultReaderId)) incomingIterator := NewIterator(paginationFnProvider, r) mergedIterator := iterator.Merge(incomingIterator) s.Equal(r, mergedIterator.Range()) - s.False(mergedIterator.HasNext()) + s.False(mergedIterator.HasNext(DefaultReaderId)) incomingIterator = NewIterator( paginationFnProvider, @@ -191,7 +242,7 @@ func (s *iteratorSuite) TestMerge() { ) mergedIterator = iterator.Merge(incomingIterator) s.Equal(NewRange(tasks.MinimumKey, r.ExclusiveMax), mergedIterator.Range()) - s.False(mergedIterator.HasNext()) + s.False(mergedIterator.HasNext(DefaultReaderId)) incomingIterator = NewIterator( paginationFnProvider, @@ -199,7 +250,7 @@ func (s *iteratorSuite) TestMerge() { ) mergedIterator = iterator.Merge(incomingIterator) s.Equal(NewRange(r.InclusiveMin, tasks.MaximumKey), mergedIterator.Range()) - s.False(mergedIterator.HasNext()) + s.False(mergedIterator.HasNext(DefaultReaderId)) incomingIterator = NewIterator( paginationFnProvider, @@ -207,7 +258,7 @@ func (s *iteratorSuite) TestMerge() { ) mergedIterator = iterator.Merge(incomingIterator) s.Equal(NewRange(tasks.MinimumKey, r.ExclusiveMax), mergedIterator.Range()) - s.False(mergedIterator.HasNext()) + s.False(mergedIterator.HasNext(DefaultReaderId)) incomingIterator = NewIterator( paginationFnProvider, @@ -215,7 +266,7 @@ func (s *iteratorSuite) TestMerge() { ) mergedIterator = iterator.Merge(incomingIterator) s.Equal(NewRange(r.InclusiveMin, tasks.MaximumKey), mergedIterator.Range()) - s.False(mergedIterator.HasNext()) + s.False(mergedIterator.HasNext(DefaultReaderId)) incomingIterator = NewIterator( paginationFnProvider, @@ -223,7 +274,7 @@ func (s *iteratorSuite) TestMerge() { ) mergedIterator = iterator.Merge(incomingIterator) s.Equal(NewRange(tasks.MinimumKey, tasks.MaximumKey), mergedIterator.Range()) - s.False(mergedIterator.HasNext()) + s.False(mergedIterator.HasNext(DefaultReaderId)) // test if Merge returns a new iterator s.Equal(7, numLoad) @@ -238,7 +289,7 @@ func (s *iteratorSuite) TestRemaining() { taskKey := NewRandomKeyInRange(r) mockTask := tasks.NewMockTask(s.controller) mockTask.EXPECT().GetKey().Return(taskKey).Times(1) - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { numLoad++ if paginationRange.ContainsKey(taskKey) { @@ -249,13 +300,13 @@ func (s *iteratorSuite) TestRemaining() { } iterator := NewIterator(paginationFnProvider, r) - _, err := iterator.Next() + _, err := iterator.Next(DefaultReaderId) s.NoError(err) - s.False(iterator.HasNext()) + s.False(iterator.HasNext(DefaultReaderId)) remaining := iterator.Remaining() s.Equal(iterator.Range(), remaining.Range()) - s.False(remaining.HasNext()) + s.False(remaining.HasNext(DefaultReaderId)) // test if Remaining returns a new iterator s.Equal(2, numLoad) diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index 55864847ef0..757560480f7 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -47,7 +47,7 @@ import ( ) const ( - DefaultReaderId = 0 + DefaultReaderId = common.DefaultQueueReaderID // Non-default readers will use critical pending task count * this coefficient // as its max pending task count so that their loading will never trigger pending diff --git a/service/history/queues/queue_base_test.go b/service/history/queues/queue_base_test.go index 68e775e903a..d3081fb198b 100644 --- a/service/history/queues/queue_base_test.go +++ b/service/history/queues/queue_base_test.go @@ -259,7 +259,7 @@ func (s *queueBaseSuite) TestStartStop() { ) mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { mockTask := tasks.NewMockTask(s.controller) key := NewRandomKeyInRange(paginationRange) diff --git a/service/history/queues/queue_immediate.go b/service/history/queues/queue_immediate.go index 538a0e5c7c8..8bf49039e9f 100644 --- a/service/history/queues/queue_immediate.go +++ b/service/history/queues/queue_immediate.go @@ -62,7 +62,7 @@ func NewImmediateQueue( logger log.Logger, metricsHandler metrics.Handler, ) *immediateQueue { - paginationFnProvider := func(r Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(readerID int32, r Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { ctx, cancel := newQueueIOContext() defer cancel() @@ -70,6 +70,7 @@ func NewImmediateQueue( request := &persistence.GetHistoryTasksRequest{ ShardID: shard.GetShardID(), TaskCategory: category, + ReaderID: readerID, InclusiveMinTaskKey: r.InclusiveMin, ExclusiveMaxTaskKey: r.ExclusiveMax, BatchSize: options.BatchSize(), diff --git a/service/history/queues/queue_scheduled.go b/service/history/queues/queue_scheduled.go index 40bbe462c2e..5f86948d0a8 100644 --- a/service/history/queues/queue_scheduled.go +++ b/service/history/queues/queue_scheduled.go @@ -75,7 +75,7 @@ func NewScheduledQueue( logger log.Logger, metricsHandler metrics.Handler, ) *scheduledQueue { - paginationFnProvider := func(r Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(readerID int32, r Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { ctx, cancel := newQueueIOContext() defer cancel() @@ -83,6 +83,7 @@ func NewScheduledQueue( request := &persistence.GetHistoryTasksRequest{ ShardID: shard.GetShardID(), TaskCategory: category, + ReaderID: readerID, InclusiveMinTaskKey: tasks.NewKey(r.InclusiveMin.FireTime, 0), ExclusiveMaxTaskKey: tasks.NewKey(r.ExclusiveMax.FireTime.Add(persistence.ScheduledTaskMinPrecision), 0), BatchSize: options.BatchSize(), @@ -261,6 +262,7 @@ func (p *scheduledQueue) lookAheadTask() { request := &persistence.GetHistoryTasksRequest{ ShardID: p.shard.GetShardID(), TaskCategory: p.category, + ReaderID: DefaultReaderId, InclusiveMinTaskKey: tasks.NewKey(lookAheadMinTime, 0), ExclusiveMaxTaskKey: tasks.NewKey(lookAheadMaxTime, 0), BatchSize: 1, diff --git a/service/history/queues/queue_scheduled_test.go b/service/history/queues/queue_scheduled_test.go index ab1726f2d3a..5006ea68777 100644 --- a/service/history/queues/queue_scheduled_test.go +++ b/service/history/queues/queue_scheduled_test.go @@ -168,6 +168,7 @@ func (s *scheduledQueueSuite) TestPaginationFnProvider() { s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryTimer, + ReaderID: DefaultReaderId, InclusiveMinTaskKey: tasks.NewKey(r.InclusiveMin.FireTime, 0), ExclusiveMaxTaskKey: tasks.NewKey(r.ExclusiveMax.FireTime.Add(persistence.ScheduledTaskMinPrecision), 0), BatchSize: testQueueOptions.BatchSize(), @@ -177,7 +178,7 @@ func (s *scheduledQueueSuite) TestPaginationFnProvider() { NextPageToken: nextPageToken, }, nil).Times(1) - paginationFn := paginationFnProvider(r) + paginationFn := paginationFnProvider(DefaultReaderId, r) loadedTasks, actualNextPageToken, err := paginationFn(currentPageToken) s.NoError(err) for _, task := range loadedTasks { @@ -265,6 +266,7 @@ func (s *scheduledQueueSuite) setupLookAheadMock( s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, request *persistence.GetHistoryTasksRequest) (*persistence.GetHistoryTasksResponse, error) { s.Equal(s.mockShard.GetShardID(), request.ShardID) s.Equal(tasks.CategoryTimer, request.TaskCategory) + s.Equal(int32(DefaultReaderId), request.ReaderID) s.Equal(lookAheadRange.InclusiveMin, request.InclusiveMinTaskKey) s.Equal(1, request.BatchSize) s.Nil(request.NextPageToken) diff --git a/service/history/queues/reader_test.go b/service/history/queues/reader_test.go index 2933f34a90a..07ba168b7f5 100644 --- a/service/history/queues/reader_test.go +++ b/service/history/queues/reader_test.go @@ -94,7 +94,7 @@ func (s *readerSuite) TestStartLoadStop() { r := NewRandomRange() scopes := []Scope{NewScope(r, predicates.Universal[tasks.Task]())} - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] { s.Equal(r, paginationRange) return func(paginationToken []byte) ([]tasks.Task, []byte, error) { mockTask := tasks.NewMockTask(s.controller) @@ -256,7 +256,7 @@ func (s *readerSuite) TestShrinkSlices() { func (s *readerSuite) TestThrottle() { scopes := NewRandomScopes(1) - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { mockTask := tasks.NewMockTask(s.controller) mockTask.EXPECT().GetKey().Return(NewRandomKeyInRange(scopes[0].Range)).AnyTimes() @@ -323,7 +323,7 @@ func (s *readerSuite) TestLoadAndSubmitTasks_TooManyPendingTasks() { func (s *readerSuite) TestLoadAndSubmitTasks_MoreTasks() { scopes := NewRandomScopes(1) - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { result := make([]tasks.Task, 0, 100) for i := 0; i != 100; i++ { @@ -360,7 +360,7 @@ func (s *readerSuite) TestLoadAndSubmitTasks_MoreTasks() { func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_HasNextSlice() { scopes := NewRandomScopes(2) - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { mockTask := tasks.NewMockTask(s.controller) mockTask.EXPECT().GetKey().Return(NewRandomKeyInRange(scopes[0].Range)).AnyTimes() @@ -392,7 +392,7 @@ func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_HasNextSlice() { func (s *readerSuite) TestLoadAndSubmitTasks_NoMoreTasks_NoNextSlice() { scopes := NewRandomScopes(1) - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, _ Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { mockTask := tasks.NewMockTask(s.controller) mockTask.EXPECT().GetKey().Return(NewRandomKeyInRange(scopes[0].Range)).AnyTimes() diff --git a/service/history/queues/slice.go b/service/history/queues/slice.go index ad7655a92c1..9ab9f1b1b35 100644 --- a/service/history/queues/slice.go +++ b/service/history/queues/slice.go @@ -372,8 +372,8 @@ func (s *SliceImpl) SelectTasks(readerID int32, batchSize int) ([]Executable, er executables := make([]Executable, 0, batchSize) for len(executables) < batchSize && len(s.iterators) != 0 { - if s.iterators[0].HasNext() { - task, err := s.iterators[0].Next() + if s.iterators[0].HasNext(readerID) { + task, err := s.iterators[0].Next(readerID) if err != nil { s.iterators[0] = s.iterators[0].Remaining() if len(executables) != 0 { diff --git a/service/history/queues/slice_test.go b/service/history/queues/slice_test.go index 35ddb77a51a..d7dc34fa035 100644 --- a/service/history/queues/slice_test.go +++ b/service/history/queues/slice_test.go @@ -425,7 +425,7 @@ func (s *sliceSuite) TestSelectTasks_NoError() { predicate := tasks.NewNamespacePredicate(namespaceIDs) numTasks := 20 - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { mockTasks := make([]tasks.Task, 0, numTasks) @@ -475,7 +475,7 @@ func (s *sliceSuite) TestSelectTasks_Error_NoLoadedTasks() { numTasks := 20 loadErr := true - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { if loadErr { loadErr = false @@ -515,7 +515,7 @@ func (s *sliceSuite) TestSelectTasks_Error_WithLoadedTasks() { numTasks := 20 loadErr := false - paginationFnProvider := func(paginationRange Range) collection.PaginationFn[tasks.Task] { + paginationFnProvider := func(_ int32, paginationRange Range) collection.PaginationFn[tasks.Task] { return func(paginationToken []byte) ([]tasks.Task, []byte, error) { defer func() { loadErr = !loadErr diff --git a/service/history/replication/ack_manager.go b/service/history/replication/ack_manager.go index dc8a8d2264b..0571bb9cb24 100644 --- a/service/history/replication/ack_manager.go +++ b/service/history/replication/ack_manager.go @@ -276,7 +276,7 @@ func (p *ackMgrImpl) getTasks( replicationTasks := make([]*replicationspb.ReplicationTask, 0, p.pageSize()) skippedTaskCount := 0 lastTaskID := maxTaskID // If no tasks are returned, then it means there are no tasks bellow maxTaskID. - iter := collection.NewPagingIterator(p.getReplicationTasksFn(ctx, minTaskID, maxTaskID, p.pageSize())) + iter := collection.NewPagingIterator(p.getReplicationTasksFn(ctx, pollingCluster, minTaskID, maxTaskID, p.pageSize())) // iter.HasNext() should be the last check to avoid extra page read in case if replicationTasks is already full. for len(replicationTasks) < p.pageSize() && skippedTaskCount <= p.maxSkipTaskCount() && iter.HasNext() { task, err := iter.Next() @@ -323,6 +323,7 @@ func (p *ackMgrImpl) getTasks( func (p *ackMgrImpl) getReplicationTasksFn( ctx context.Context, + pollingCluster string, minTaskID int64, maxTaskID int64, batchSize int, @@ -331,6 +332,7 @@ func (p *ackMgrImpl) getReplicationTasksFn( response, err := p.executionMgr.GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{ ShardID: p.shard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: p.clusterToReaderID(pollingCluster), InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), BatchSize: batchSize, @@ -747,3 +749,11 @@ func (p *ackMgrImpl) handleReadHistoryError( return err } } + +func (p *ackMgrImpl) clusterToReaderID( + pollingCluster string, +) int32 { + // TODO: need different readerID for different remote clusters + // e.g. use cluster's initial failover version + return common.DefaultQueueReaderID +} diff --git a/service/history/replication/ack_manager_test.go b/service/history/replication/ack_manager_test.go index 49a2715e2ab..7aa1fadf45e 100644 --- a/service/history/replication/ack_manager_test.go +++ b/service/history/replication/ack_manager_test.go @@ -559,6 +559,7 @@ func (s *ackManagerSuite) TestGetTasks_NoTasksInDB() { s.mockExecutionMgr.EXPECT().GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), BatchSize: s.replicationAckManager.pageSize(), @@ -580,6 +581,7 @@ func (s *ackManagerSuite) TestGetTasks_FirstPersistenceErrorReturnsErrorAndEmpty s.mockExecutionMgr.EXPECT().GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), BatchSize: s.replicationAckManager.pageSize(), @@ -610,6 +612,7 @@ func (s *ackManagerSuite) TestGetTasks_SecondPersistenceErrorReturnsPartialResul s.mockExecutionMgr.EXPECT().GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), BatchSize: s.replicationAckManager.pageSize(), @@ -661,6 +664,7 @@ func (s *ackManagerSuite) TestGetTasks_FullPage() { s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), BatchSize: s.replicationAckManager.pageSize(), @@ -712,6 +716,7 @@ func (s *ackManagerSuite) TestGetTasks_PartialPage() { s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), BatchSize: s.replicationAckManager.pageSize(), @@ -775,6 +780,7 @@ func (s *ackManagerSuite) TestGetTasks_FilterNamespace() { s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), BatchSize: s.replicationAckManager.pageSize(), @@ -788,6 +794,7 @@ func (s *ackManagerSuite) TestGetTasks_FilterNamespace() { s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), BatchSize: s.replicationAckManager.pageSize(), @@ -798,6 +805,7 @@ func (s *ackManagerSuite) TestGetTasks_FilterNamespace() { s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), BatchSize: s.replicationAckManager.pageSize(), diff --git a/service/history/replication/dlq_handler.go b/service/history/replication/dlq_handler.go index 3857064122d..5ab331bbc48 100644 --- a/service/history/replication/dlq_handler.go +++ b/service/history/replication/dlq_handler.go @@ -36,6 +36,7 @@ import ( "go.temporal.io/server/api/historyservice/v1" replicationspb "go.temporal.io/server/api/replication/v1" "go.temporal.io/server/client" + "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/persistence" @@ -257,6 +258,7 @@ func (r *dlqHandlerImpl) readMessagesWithAckLevel( GetHistoryTasksRequest: persistence.GetHistoryTasksRequest{ ShardID: r.shard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(ackLevel + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(lastMessageID + 1), BatchSize: pageSize, diff --git a/service/history/replication/dlq_handler_test.go b/service/history/replication/dlq_handler_test.go index 105a3bfafc4..8da7c035e6e 100644 --- a/service/history/replication/dlq_handler_test.go +++ b/service/history/replication/dlq_handler_test.go @@ -41,6 +41,7 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" replicationspb "go.temporal.io/server/api/replication/v1" "go.temporal.io/server/client" + "go.temporal.io/server/common" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/persistence" @@ -188,6 +189,7 @@ func (s *dlqHandlerSuite) TestReadMessages_OK() { GetHistoryTasksRequest: persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(persistence.EmptyQueueMessageID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(lastMessageID + 1), BatchSize: pageSize, @@ -277,6 +279,7 @@ func (s *dlqHandlerSuite) TestMergeMessages() { GetHistoryTasksRequest: persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(persistence.EmptyQueueMessageID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(lastMessageID + 1), BatchSize: pageSize, diff --git a/tests/ndc/replication_integration_test.go b/tests/ndc/replication_integration_test.go index c9fac739958..2d9b2a481ce 100644 --- a/tests/ndc/replication_integration_test.go +++ b/tests/ndc/replication_integration_test.go @@ -32,6 +32,7 @@ import ( "github.com/pborman/uuid" historypb "go.temporal.io/api/history/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/persistence" test "go.temporal.io/server/common/testing" "go.temporal.io/server/service/history/tasks" @@ -130,6 +131,7 @@ Loop: GetHistoryTasksRequest: persistence.GetHistoryTasksRequest{ ShardID: shardID, TaskCategory: tasks.CategoryReplication, + ReaderID: common.DefaultQueueReaderID, InclusiveMinTaskKey: tasks.NewImmediateKey(0), ExclusiveMaxTaskKey: tasks.NewImmediateKey(math.MaxInt64), BatchSize: math.MaxInt64,