diff --git a/service/history/replication/stream_sender.go b/service/history/replication/stream_sender.go
index 4e2cceb56e7..397b4bd56e4 100644
--- a/service/history/replication/stream_sender.go
+++ b/service/history/replication/stream_sender.go
@@ -231,12 +231,11 @@ func (s *StreamSenderImpl) recvSyncReplicationState(
 	); err != nil {
 		return err
 	}
-	s.shardContext.UpdateRemoteClusterInfo(
-		string(s.clientShardKey.ClusterID),
+	return s.shardContext.UpdateRemoteReaderInfo(
+		readerID,
 		inclusiveLowWatermark-1,
 		*inclusiveLowWatermarkTime,
 	)
-	return nil
 }
 
 func (s *StreamSenderImpl) sendCatchUp() (int64, error) {
diff --git a/service/history/replication/stream_sender_test.go b/service/history/replication/stream_sender_test.go
index 86c6b7529be..4b709a86ea0 100644
--- a/service/history/replication/stream_sender_test.go
+++ b/service/history/replication/stream_sender_test.go
@@ -138,11 +138,11 @@ func (s *streamSenderSuite) TestRecvSyncReplicationState_Success() {
 			}},
 		},
 	).Return(nil)
-	s.shardContext.EXPECT().UpdateRemoteClusterInfo(
-		string(s.clientShardKey.ClusterID),
+	s.shardContext.EXPECT().UpdateRemoteReaderInfo(
+		readerID,
 		replicationState.InclusiveLowWatermark-1,
 		*replicationState.InclusiveLowWatermarkTime,
-	)
+	).Return(nil)
 
 	err := s.streamSender.recvSyncReplicationState(replicationState)
 	s.NoError(err)
diff --git a/service/history/shard/context.go b/service/history/shard/context.go
index 7f70282b37c..6f323094c67 100644
--- a/service/history/shard/context.go
+++ b/service/history/shard/context.go
@@ -94,6 +94,7 @@ type (
 		UpdateReplicatorDLQAckLevel(sourCluster string, ackLevel int64) error
 
 		UpdateRemoteClusterInfo(cluster string, ackTaskID int64, ackTimestamp time.Time)
+		UpdateRemoteReaderInfo(readerID int64, ackTaskID int64, ackTimestamp time.Time) error
 
 		SetCurrentTime(cluster string, currentTime time.Time)
 		GetCurrentTime(cluster string) time.Time
diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go
index 29cf9fa6779..da54c3dc99d 100644
--- a/service/history/shard/context_impl.go
+++ b/service/history/shard/context_impl.go
@@ -147,9 +147,9 @@ type (
 	}
 
 	remoteClusterInfo struct {
-		CurrentTime               time.Time
-		AckedReplicationTaskID    int64
-		AckedReplicationTimestamp time.Time
+		CurrentTime                time.Time
+		AckedReplicationTaskIDs    map[int32]int64
+		AckedReplicationTimestamps map[int32]time.Time
 	}
 
 	namespaceHandOverInfo struct {
@@ -373,6 +373,9 @@ func (s *ContextImpl) UpdateReplicationQueueReaderState(
 	return s.updateShardInfoLocked()
 }
 
+// UpdateRemoteClusterInfo records the acked replication task ID and timestamp for cluster.
+//
+// Deprecated: use UpdateRemoteReaderInfo instead.
 func (s *ContextImpl) UpdateRemoteClusterInfo(
 	cluster string,
 	ackTaskID int64,
@@ -382,8 +385,32 @@ func (s *ContextImpl) UpdateRemoteClusterInfo(
 	defer s.wUnlock()
 
 	remoteClusterInfo := s.getOrUpdateRemoteClusterInfoLocked(cluster)
-	remoteClusterInfo.AckedReplicationTaskID = ackTaskID
-	remoteClusterInfo.AckedReplicationTimestamp = ackTimestamp
+	remoteClusterInfo.AckedReplicationTaskIDs[s.shardID] = ackTaskID
+	remoteClusterInfo.AckedReplicationTimestamps[s.shardID] = ackTimestamp
+}
+
+// UpdateRemoteReaderInfo records the acked replication task ID and timestamp
+// for the remote cluster and shard decoded from readerID. It returns an
+// internal error when the decoded cluster ID is not in the cluster metadata.
+func (s *ContextImpl) UpdateRemoteReaderInfo(
+	readerID int64,
+	ackTaskID int64,
+	ackTimestamp time.Time,
+) error {
+	clusterID, shardID := ReplicationReaderIDToClusterShardID(readerID)
+	clusterName, _, ok := ClusterNameInfoFromClusterID(s.clusterMetadata.GetAllClusterInfo(), clusterID)
+	if !ok {
+		// cluster is not present in cluster metadata map
+		return serviceerror.NewInternal(fmt.Sprintf("unknown cluster ID: %v", clusterID))
+	}
+
+	s.wLock()
+	defer s.wUnlock()
+
+	remoteClusterInfo := s.getOrUpdateRemoteClusterInfoLocked(clusterName)
+	remoteClusterInfo.AckedReplicationTaskIDs[shardID] = ackTaskID
+	remoteClusterInfo.AckedReplicationTimestamps[shardID] = ackTimestamp
+	return nil
 }
 
 func (s *ContextImpl) GetReplicatorDLQAckLevel(sourceCluster string) int64 {
@@ -1644,7 +1671,11 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
 		)
 
 		if clusterName != currentClusterName {
-			remoteClusterInfos[clusterName] = &remoteClusterInfo{CurrentTime: maxReadTime}
+			remoteClusterInfos[clusterName] = &remoteClusterInfo{
+				CurrentTime:                maxReadTime,
+				AckedReplicationTaskIDs:    make(map[int32]int64),
+				AckedReplicationTimestamps: make(map[int32]time.Time),
+			}
 		}
 	}
 
@@ -1658,29 +1689,49 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
 	return nil
 }
 
-func (s *ContextImpl) GetReplicationStatus(cluster []string) (map[string]*historyservice.ShardReplicationStatusPerCluster, map[string]*historyservice.HandoverNamespaceInfo, error) {
+func (s *ContextImpl) GetReplicationStatus(clusterNames []string) (map[string]*historyservice.ShardReplicationStatusPerCluster, map[string]*historyservice.HandoverNamespaceInfo, error) {
 	remoteClusters := make(map[string]*historyservice.ShardReplicationStatusPerCluster)
 	handoverNamespaces := make(map[string]*historyservice.HandoverNamespaceInfo)
+	clusterInfo := s.clusterMetadata.GetAllClusterInfo()
 	s.rLock()
 	defer s.rUnlock()
 
-	if len(cluster) == 0 {
-		// remote acked info for all known remote clusters
-		for k, v := range s.remoteClusterInfos {
-			remoteClusters[k] = &historyservice.ShardReplicationStatusPerCluster{
-				AckedTaskId:             v.AckedReplicationTaskID,
-				AckedTaskVisibilityTime: timestamp.TimePtr(v.AckedReplicationTimestamp),
-			}
+	if len(clusterNames) == 0 {
+		for clusterName := range clusterInfo {
+			clusterNames = append(clusterNames, clusterName)
 		}
-	} else {
-		for _, k := range cluster {
-			if v, ok := s.remoteClusterInfos[k]; ok {
-				remoteClusters[k] = &historyservice.ShardReplicationStatusPerCluster{
-					AckedTaskId:             v.AckedReplicationTaskID,
-					AckedTaskVisibilityTime: timestamp.TimePtr(v.AckedReplicationTimestamp),
-				}
+	}
+
+	for _, clusterName := range clusterNames {
+		if _, ok := clusterInfo[clusterName]; !ok {
+			continue
+		}
+		v, ok := s.remoteClusterInfos[clusterName]
+		if !ok {
+			continue
+		}
+
+		var taskID *int64
+		var ackTime *time.Time
+		for _, remoteShardID := range common.MapShardID(
+			clusterInfo[s.clusterMetadata.GetCurrentClusterName()].ShardCount,
+			clusterInfo[clusterName].ShardCount,
+			s.shardID,
+		) {
+			if taskID == nil || v.AckedReplicationTaskIDs[remoteShardID] < *taskID {
+				taskID = convert.Int64Ptr(v.AckedReplicationTaskIDs[remoteShardID])
+				ackTime = timestamp.TimePtr(v.AckedReplicationTimestamps[remoteShardID])
 			}
 		}
+		if taskID == nil {
+			// MapShardID returned no remote shards: there is no ack level to report, and dereferencing taskID below would panic.
+			continue
+		}
+
+		remoteClusters[clusterName] = &historyservice.ShardReplicationStatusPerCluster{
+			AckedTaskId:             *taskID,
+			AckedTaskVisibilityTime: ackTime,
+		}
 	}
 
 	for k, v := range s.handoverNamespaces {
@@ -1697,7 +1748,8 @@ func (s *ContextImpl) getOrUpdateRemoteClusterInfoLocked(clusterName string) *re
 		return info
 	}
 	info := &remoteClusterInfo{
-		AckedReplicationTaskID: persistence.EmptyQueueMessageID,
+		AckedReplicationTaskIDs:    make(map[int32]int64),
+		AckedReplicationTimestamps: make(map[int32]time.Time),
 	}
 	s.remoteClusterInfos[clusterName] = info
 	return info
diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go
index 0b8c9c4e5a6..3ea88e5a5cd 100644
--- a/service/history/shard/context_mock.go
+++ b/service/history/shard/context_mock.go
@@ -685,6 +685,20 @@ func (mr *MockContextMockRecorder) UpdateRemoteClusterInfo(cluster, ackTaskID, a
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateRemoteClusterInfo", reflect.TypeOf((*MockContext)(nil).UpdateRemoteClusterInfo), cluster, ackTaskID, ackTimestamp)
 }
 
+// UpdateRemoteReaderInfo mocks base method.
+func (m *MockContext) UpdateRemoteReaderInfo(readerID, ackTaskID int64, ackTimestamp time.Time) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "UpdateRemoteReaderInfo", readerID, ackTaskID, ackTimestamp)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// UpdateRemoteReaderInfo indicates an expected call of UpdateRemoteReaderInfo.
+func (mr *MockContextMockRecorder) UpdateRemoteReaderInfo(readerID, ackTaskID, ackTimestamp interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateRemoteReaderInfo", reflect.TypeOf((*MockContext)(nil).UpdateRemoteReaderInfo), readerID, ackTaskID, ackTimestamp)
+}
+
 // UpdateReplicationQueueReaderState mocks base method.
 func (m *MockContext) UpdateReplicationQueueReaderState(readerID int64, readerState *v13.QueueReaderState) error {
 	m.ctrl.T.Helper()