diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 6f74e951bfc..bc16fda4778 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -383,16 +383,23 @@ func (s *ContextImpl) UpdateReplicationQueueReaderState( // UpdateRemoteClusterInfo deprecated // Deprecated use UpdateRemoteReaderInfo in the future instead func (s *ContextImpl) UpdateRemoteClusterInfo( - cluster string, + clusterName string, ackTaskID int64, ackTimestamp time.Time, ) { s.wLock() defer s.wUnlock() - remoteClusterInfo := s.getOrUpdateRemoteClusterInfoLocked(cluster) - remoteClusterInfo.AckedReplicationTaskIDs[s.shardID] = ackTaskID - remoteClusterInfo.AckedReplicationTimestamps[s.shardID] = ackTimestamp + clusterInfo := s.clusterMetadata.GetAllClusterInfo() + remoteClusterInfo := s.getOrUpdateRemoteClusterInfoLocked(clusterName) + for _, remoteShardID := range common.MapShardID( + clusterInfo[s.clusterMetadata.GetCurrentClusterName()].ShardCount, + clusterInfo[clusterName].ShardCount, + s.shardID, + ) { + remoteClusterInfo.AckedReplicationTaskIDs[remoteShardID] = ackTaskID + remoteClusterInfo.AckedReplicationTimestamps[remoteShardID] = ackTimestamp + } } // UpdateRemoteReaderInfo do not use streaming replication until remoteClusterInfo is updated to allow both @@ -1716,10 +1723,27 @@ func (s *ContextImpl) GetReplicationStatus(clusterNames []string) (map[string]*h continue } - remoteShardID := s.shardID - remoteClusters[clusterName] = &historyservice.ShardReplicationStatusPerCluster{ - AckedTaskId: v.AckedReplicationTaskIDs[remoteShardID], - AckedTaskVisibilityTime: timestamp.TimePtr(v.AckedReplicationTimestamps[remoteShardID]), + for _, remoteShardID := range common.MapShardID( + clusterInfo[s.clusterMetadata.GetCurrentClusterName()].ShardCount, + clusterInfo[clusterName].ShardCount, + s.shardID, + ) { + ackTaskID := v.AckedReplicationTaskIDs[remoteShardID] // default to 0 + ackTimestamp := v.AckedReplicationTimestamps[remoteShardID] + if ackTimestamp.IsZero() { + ackTimestamp = time.Unix(0, 0) + } + if record, ok := remoteClusters[clusterName]; !ok { + remoteClusters[clusterName] = &historyservice.ShardReplicationStatusPerCluster{ + AckedTaskId: ackTaskID, + AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp), + } + } else if record.AckedTaskId > ackTaskID { + remoteClusters[clusterName] = &historyservice.ShardReplicationStatusPerCluster{ + AckedTaskId: ackTaskID, + AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp), + } + } } } diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go index 700c4351802..dd9bfebc0e7 100644 --- a/service/history/shard/context_test.go +++ b/service/history/shard/context_test.go @@ -28,6 +28,7 @@ import ( "context" "errors" "fmt" + "math/rand" "testing" "time" @@ -36,6 +37,7 @@ import ( "github.com/stretchr/testify/suite" "go.temporal.io/api/enums/v1" + "go.temporal.io/server/api/historyservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/clock" @@ -54,6 +56,7 @@ type ( *require.Assertions controller *gomock.Controller + shardID int32 mockShard *ContextTest mockClusterMetadata *cluster.MockMetadata mockShardManager *persistence.MockShardManager @@ -75,11 +78,12 @@ func (s *contextSuite) SetupTest() { s.controller = gomock.NewController(s.T()) + s.shardID = 1 s.timeSource = clock.NewEventTimeSource() shardContext := NewTestContextWithTimeSource( s.controller, &persistencespb.ShardInfo{ - ShardId: 0, + ShardId: s.shardID, RangeId: 1, }, tests.NewDynamicConfig(), @@ -499,3 +503,178 @@ func (s *contextSuite) TestHandoverNamespace() { _, ok = handoverNS[namespaceEntry.Name().String()] s.False(ok) } + +func (s *contextSuite) TestUpdateGetRemoteClusterInfo_Legacy_8_4() { + clusterMetadata := cluster.NewMockMetadata(s.controller) + clusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes() + clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + clusterMetadata.EXPECT().GetAllClusterInfo().Return(map[string]cluster.ClusterInformation{ + cluster.TestCurrentClusterName: { + Enabled: true, + InitialFailoverVersion: cluster.TestCurrentClusterInitialFailoverVersion, + RPCAddress: cluster.TestCurrentClusterFrontendAddress, + ShardCount: 8, + }, + cluster.TestAlternativeClusterName: { + Enabled: true, + InitialFailoverVersion: cluster.TestAlternativeClusterInitialFailoverVersion, + RPCAddress: cluster.TestAlternativeClusterFrontendAddress, + ShardCount: 4, + }, + }).AnyTimes() + s.mockShard.clusterMetadata = clusterMetadata + + ackTaskID := rand.Int63() + ackTimestamp := time.Unix(0, rand.Int63()) + s.mockShard.UpdateRemoteClusterInfo( + cluster.TestAlternativeClusterName, + ackTaskID, + ackTimestamp, + ) + remoteAckStatus, _, err := s.mockShard.GetReplicationStatus([]string{cluster.TestAlternativeClusterName}) + s.NoError(err) + s.Equal(map[string]*historyservice.ShardReplicationStatusPerCluster{ + cluster.TestAlternativeClusterName: { + AckedTaskId: ackTaskID, + AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp), + }, + }, remoteAckStatus) +} + +func (s *contextSuite) TestUpdateGetRemoteClusterInfo_Legacy_4_8() { + clusterMetadata := cluster.NewMockMetadata(s.controller) + clusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes() + clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + clusterMetadata.EXPECT().GetAllClusterInfo().Return(map[string]cluster.ClusterInformation{ + cluster.TestCurrentClusterName: { + Enabled: true, + InitialFailoverVersion: cluster.TestCurrentClusterInitialFailoverVersion, + RPCAddress: cluster.TestCurrentClusterFrontendAddress, + ShardCount: 4, + }, + cluster.TestAlternativeClusterName: { + Enabled: true, + InitialFailoverVersion: cluster.TestAlternativeClusterInitialFailoverVersion, + RPCAddress: cluster.TestAlternativeClusterFrontendAddress, + ShardCount: 8, + }, + }).AnyTimes() + s.mockShard.clusterMetadata = clusterMetadata + + ackTaskID := rand.Int63() + ackTimestamp := time.Unix(0, rand.Int63()) + s.mockShard.UpdateRemoteClusterInfo( + cluster.TestAlternativeClusterName, + ackTaskID, + ackTimestamp, + ) + remoteAckStatus, _, err := s.mockShard.GetReplicationStatus([]string{cluster.TestAlternativeClusterName}) + s.NoError(err) + s.Equal(map[string]*historyservice.ShardReplicationStatusPerCluster{ + cluster.TestAlternativeClusterName: { + AckedTaskId: ackTaskID, + AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp), + }, + }, remoteAckStatus) +} + +func (s *contextSuite) TestUpdateGetRemoteReaderInfo_8_4() { + clusterMetadata := cluster.NewMockMetadata(s.controller) + clusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes() + clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + clusterMetadata.EXPECT().GetAllClusterInfo().Return(map[string]cluster.ClusterInformation{ + cluster.TestCurrentClusterName: { + Enabled: true, + InitialFailoverVersion: cluster.TestCurrentClusterInitialFailoverVersion, + RPCAddress: cluster.TestCurrentClusterFrontendAddress, + ShardCount: 8, + }, + cluster.TestAlternativeClusterName: { + Enabled: true, + InitialFailoverVersion: cluster.TestAlternativeClusterInitialFailoverVersion, + RPCAddress: cluster.TestAlternativeClusterFrontendAddress, + ShardCount: 4, + }, + }).AnyTimes() + s.mockShard.clusterMetadata = clusterMetadata + + ackTaskID := rand.Int63() + ackTimestamp := time.Unix(0, rand.Int63()) + err := s.mockShard.UpdateRemoteReaderInfo( + ReplicationReaderIDFromClusterShardID( + cluster.TestAlternativeClusterInitialFailoverVersion, + 1, + ), + ackTaskID, + ackTimestamp, + ) + s.NoError(err) + remoteAckStatus, _, err := s.mockShard.GetReplicationStatus([]string{cluster.TestAlternativeClusterName}) + s.NoError(err) + s.Equal(map[string]*historyservice.ShardReplicationStatusPerCluster{ + cluster.TestAlternativeClusterName: { + AckedTaskId: ackTaskID, + AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp), + }, + }, remoteAckStatus) +} + +func (s *contextSuite) TestUpdateGetRemoteReaderInfo_4_8() { + clusterMetadata := cluster.NewMockMetadata(s.controller) + clusterMetadata.EXPECT().GetClusterID().Return(cluster.TestCurrentClusterInitialFailoverVersion).AnyTimes() + clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + clusterMetadata.EXPECT().GetAllClusterInfo().Return(map[string]cluster.ClusterInformation{ + cluster.TestCurrentClusterName: { + Enabled: true, + InitialFailoverVersion: cluster.TestCurrentClusterInitialFailoverVersion, + RPCAddress: cluster.TestCurrentClusterFrontendAddress, + ShardCount: 4, + }, + cluster.TestAlternativeClusterName: { + Enabled: true, + InitialFailoverVersion: cluster.TestAlternativeClusterInitialFailoverVersion, + RPCAddress: cluster.TestAlternativeClusterFrontendAddress, + ShardCount: 8, + }, + }).AnyTimes() + s.mockShard.clusterMetadata = clusterMetadata + + ack1TaskID := rand.Int63() + ack1Timestamp := time.Unix(0, rand.Int63()) + err := s.mockShard.UpdateRemoteReaderInfo( + ReplicationReaderIDFromClusterShardID( + cluster.TestAlternativeClusterInitialFailoverVersion, + 1, // maps to local shard 1 + ), + ack1TaskID, + ack1Timestamp, + ) + s.NoError(err) + ack5TaskID := rand.Int63() + ack5Timestamp := time.Unix(0, rand.Int63()) + err = s.mockShard.UpdateRemoteReaderInfo( + ReplicationReaderIDFromClusterShardID( + cluster.TestAlternativeClusterInitialFailoverVersion, + 5, // maps to local shard 1 + ), + ack5TaskID, + ack5Timestamp, + ) + s.NoError(err) + + ackTaskID := ack1TaskID + ackTimestamp := ack1Timestamp + if ackTaskID > ack5TaskID { + ackTaskID = ack5TaskID + ackTimestamp = ack5Timestamp + } + + remoteAckStatus, _, err := s.mockShard.GetReplicationStatus([]string{cluster.TestAlternativeClusterName}) + s.NoError(err) + s.Equal(map[string]*historyservice.ShardReplicationStatusPerCluster{ + cluster.TestAlternativeClusterName: { + AckedTaskId: ackTaskID, + AckedTaskVisibilityTime: timestamp.TimePtr(ackTimestamp), + }, + }, remoteAckStatus) +}