Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Feb 28, 2023
1 parent b2b5da6 commit 184a5cb
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
2 changes: 1 addition & 1 deletion service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func (adh *AdminHandler) ListHistoryTasks(
resp, err := adh.persistenceExecutionManager.GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{
ShardID: request.ShardId,
TaskCategory: taskCategory,
ReaderID: common.DefaultQueueReaderID, // TODO: 1. does this work? need to move to history?
ReaderID: common.DefaultQueueReaderID,
InclusiveMinTaskKey: minTaskKey,
ExclusiveMaxTaskKey: maxTaskKey,
BatchSize: int(request.BatchSize),
Expand Down
53 changes: 51 additions & 2 deletions service/history/queues/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package queues

import (
"fmt"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -91,6 +92,56 @@ func (s *iteratorSuite) TestNext_IncreaseTaskKey() {
s.False(iterator.HasNext(DefaultReaderId))
}

func (s *iteratorSuite) TestNext_ReaderIDChange() {
r := NewRandomRange()

firstPageReaderID := int32(DefaultReaderId)
taskKey := NewRandomKeyInRange(r)
mockTask := tasks.NewMockTask(s.controller)
mockTask.EXPECT().GetKey().Return(taskKey).Times(1)

secondPageReaderID := int32(DefaultReaderId + 1)
secondPageRange := NewRange(taskKey.Next(), r.ExclusiveMax)
taskKey2 := NewRandomKeyInRange(secondPageRange)
mockTask2 := tasks.NewMockTask(s.controller)
mockTask2.EXPECT().GetKey().Return(taskKey2).Times(1)

paginationFnProvider := func(readerID int32, paginationRange Range) collection.PaginationFn[tasks.Task] {
switch readerID {
case firstPageReaderID:
s.Equal(r, paginationRange)
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
return []tasks.Task{mockTask}, []byte("nextPageToken"), nil
}
case secondPageReaderID:
s.Equal(secondPageRange, paginationRange)
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
return []tasks.Task{mockTask2}, nil, nil
}
default:
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
return nil, nil, fmt.Errorf("unexpected readerID: %v", readerID)
}
}
}

iterator := NewIterator(paginationFnProvider, r)
s.Equal(r, iterator.Range())

s.True(iterator.HasNext(firstPageReaderID))
task, err := iterator.Next(firstPageReaderID)
s.NoError(err)
s.Equal(mockTask, task)
s.Equal(NewRange(taskKey.Next(), r.ExclusiveMax), iterator.Range())

s.True(iterator.HasNext(secondPageReaderID))
task2, err := iterator.Next(secondPageReaderID)
s.NoError(err)
s.Equal(mockTask2, task2)

s.Equal(NewRange(taskKey2.Next(), r.ExclusiveMax), iterator.Range())
}

func (s *iteratorSuite) TestCanSplit() {
r := NewRandomRange()

Expand Down Expand Up @@ -260,5 +311,3 @@ func (s *iteratorSuite) TestRemaining() {
// test if Remaining returns a new iterator
s.Equal(2, numLoad)
}

// TODO: add a test on readerID change
8 changes: 5 additions & 3 deletions service/history/replication/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,11 @@ func (p *ackMgrImpl) getReplicationTasksFn(
) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
response, err := p.executionMgr.GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{
ShardID: p.shard.GetShardID(),
TaskCategory: tasks.CategoryReplication,
ReaderID: common.DefaultQueueReaderID, // TODO: need different readerID for different remote cluster
ShardID: p.shard.GetShardID(),
TaskCategory: tasks.CategoryReplication,
// TODO: need different readerID for different remote clusters
// e.g. use cluster's initial failover version
ReaderID: common.DefaultQueueReaderID,
InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1),
ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1),
BatchSize: batchSize,
Expand Down

0 comments on commit 184a5cb

Please sign in to comment.