Skip to content

Commit

Permalink
update test and replication
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Mar 1, 2023
1 parent 3ad6e60 commit 723ce47
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
2 changes: 2 additions & 0 deletions common/persistence/tests/execution_mutable_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ func (s *ExecutionMutableStateTaskSuite) GetAndCompleteHistoryTask(
key := task.GetKey()
resp, err := s.ExecutionManager.GetHistoryTask(s.Ctx, &p.GetHistoryTaskRequest{
ShardID: s.ShardID,
ReaderID: common.DefaultQueueReaderID,
TaskCategory: category,
TaskKey: key,
})
Expand All @@ -639,6 +640,7 @@ func (s *ExecutionMutableStateTaskSuite) GetAndCompleteHistoryTask(

_, err = s.ExecutionManager.GetHistoryTask(s.Ctx, &p.GetHistoryTaskRequest{
ShardID: s.ShardID,
ReaderID: common.DefaultQueueReaderID,
TaskCategory: category,
TaskKey: key,
})
Expand Down
19 changes: 13 additions & 6 deletions service/history/replication/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (p *ackMgrImpl) getTasks(
replicationTasks := make([]*replicationspb.ReplicationTask, 0, p.pageSize())
skippedTaskCount := 0
lastTaskID := maxTaskID // If no tasks are returned, then it means there are no tasks bellow maxTaskID.
iter := collection.NewPagingIterator(p.getReplicationTasksFn(ctx, minTaskID, maxTaskID, p.pageSize()))
iter := collection.NewPagingIterator(p.getReplicationTasksFn(ctx, pollingCluster, minTaskID, maxTaskID, p.pageSize()))
// iter.HasNext() should be the last check to avoid extra page read in case if replicationTasks is already full.
for len(replicationTasks) < p.pageSize() && skippedTaskCount <= p.maxSkipTaskCount() && iter.HasNext() {
task, err := iter.Next()
Expand Down Expand Up @@ -323,17 +323,16 @@ func (p *ackMgrImpl) getTasks(

func (p *ackMgrImpl) getReplicationTasksFn(
ctx context.Context,
pollingCluster string,
minTaskID int64,
maxTaskID int64,
batchSize int,
) collection.PaginationFn[tasks.Task] {
return func(paginationToken []byte) ([]tasks.Task, []byte, error) {
response, err := p.executionMgr.GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{
ShardID: p.shard.GetShardID(),
TaskCategory: tasks.CategoryReplication,
// TODO: need different readerID for different remote clusters
// e.g. use cluster's initial failover version
ReaderID: common.DefaultQueueReaderID,
ShardID: p.shard.GetShardID(),
TaskCategory: tasks.CategoryReplication,
ReaderID: p.clusterToReaderID(pollingCluster),
InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1),
ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1),
BatchSize: batchSize,
Expand Down Expand Up @@ -750,3 +749,11 @@ func (p *ackMgrImpl) handleReadHistoryError(
return err
}
}

func (p *ackMgrImpl) clusterToReaderID(
pollingCluster string,
) int32 {
// TODO: need different readerID for different remote clusters
// e.g. use cluster's initial failover version
return common.DefaultQueueReaderID
}

0 comments on commit 723ce47

Please sign in to comment.