Skip to content

Commit

Permalink
Add namespace migration support for replication stream (#4488)
Browse files Browse the repository at this point in the history
* Add namespace migration support for replication stream, specifically supporting migration between clusters with different # of shards
  • Loading branch information
wxing1292 committed Jun 14, 2023
1 parent f178ca7 commit 31fa19d
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 28 deletions.
5 changes: 2 additions & 3 deletions service/history/replication/stream_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,11 @@ func (s *StreamSenderImpl) recvSyncReplicationState(
); err != nil {
return err
}
s.shardContext.UpdateRemoteClusterInfo(
string(s.clientShardKey.ClusterID),
return s.shardContext.UpdateRemoteReaderInfo(
readerID,
inclusiveLowWatermark-1,
*inclusiveLowWatermarkTime,
)
return nil
}

func (s *StreamSenderImpl) sendCatchUp() (int64, error) {
Expand Down
6 changes: 3 additions & 3 deletions service/history/replication/stream_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ func (s *streamSenderSuite) TestRecvSyncReplicationState_Success() {
}},
},
).Return(nil)
s.shardContext.EXPECT().UpdateRemoteClusterInfo(
string(s.clientShardKey.ClusterID),
s.shardContext.EXPECT().UpdateRemoteReaderInfo(
readerID,
replicationState.InclusiveLowWatermark-1,
*replicationState.InclusiveLowWatermarkTime,
)
).Return(nil)

err := s.streamSender.recvSyncReplicationState(replicationState)
s.NoError(err)
Expand Down
1 change: 1 addition & 0 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type (
UpdateReplicatorDLQAckLevel(sourCluster string, ackLevel int64) error

UpdateRemoteClusterInfo(cluster string, ackTaskID int64, ackTimestamp time.Time)
UpdateRemoteReaderInfo(readerID int64, ackTaskID int64, ackTimestamp time.Time) error

SetCurrentTime(cluster string, currentTime time.Time)
GetCurrentTime(cluster string) time.Time
Expand Down
88 changes: 66 additions & 22 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ type (
}

remoteClusterInfo struct {
CurrentTime time.Time
AckedReplicationTaskID int64
AckedReplicationTimestamp time.Time
CurrentTime time.Time
AckedReplicationTaskIDs map[int32]int64
AckedReplicationTimestamps map[int32]time.Time
}

namespaceHandOverInfo struct {
Expand Down Expand Up @@ -373,6 +373,8 @@ func (s *ContextImpl) UpdateReplicationQueueReaderState(
return s.updateShardInfoLocked()
}

// UpdateRemoteClusterInfo deprecated
// Deprecated use UpdateRemoteReaderInfo in the future instead
func (s *ContextImpl) UpdateRemoteClusterInfo(
cluster string,
ackTaskID int64,
Expand All @@ -382,8 +384,29 @@ func (s *ContextImpl) UpdateRemoteClusterInfo(
defer s.wUnlock()

remoteClusterInfo := s.getOrUpdateRemoteClusterInfoLocked(cluster)
remoteClusterInfo.AckedReplicationTaskID = ackTaskID
remoteClusterInfo.AckedReplicationTimestamp = ackTimestamp
remoteClusterInfo.AckedReplicationTaskIDs[s.shardID] = ackTaskID
remoteClusterInfo.AckedReplicationTimestamps[s.shardID] = ackTimestamp
}

func (s *ContextImpl) UpdateRemoteReaderInfo(
readerID int64,
ackTaskID int64,
ackTimestamp time.Time,
) error {
clusterID, shardID := ReplicationReaderIDToClusterShardID(readerID)
clusterName, _, ok := ClusterNameInfoFromClusterID(s.clusterMetadata.GetAllClusterInfo(), clusterID)
if !ok {
// cluster is not present in cluster metadata map
return serviceerror.NewInternal(fmt.Sprintf("unknown cluster ID: %v", clusterID))
}

s.wLock()
defer s.wUnlock()

remoteClusterInfo := s.getOrUpdateRemoteClusterInfoLocked(clusterName)
remoteClusterInfo.AckedReplicationTaskIDs[shardID] = ackTaskID
remoteClusterInfo.AckedReplicationTimestamps[shardID] = ackTimestamp
return nil
}

func (s *ContextImpl) GetReplicatorDLQAckLevel(sourceCluster string) int64 {
Expand Down Expand Up @@ -1646,7 +1669,11 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
)

if clusterName != currentClusterName {
remoteClusterInfos[clusterName] = &remoteClusterInfo{CurrentTime: maxReadTime}
remoteClusterInfos[clusterName] = &remoteClusterInfo{
CurrentTime: maxReadTime,
AckedReplicationTaskIDs: make(map[int32]int64),
AckedReplicationTimestamps: make(map[int32]time.Time),
}
}
}

Expand All @@ -1660,29 +1687,45 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
return nil
}

func (s *ContextImpl) GetReplicationStatus(cluster []string) (map[string]*historyservice.ShardReplicationStatusPerCluster, map[string]*historyservice.HandoverNamespaceInfo, error) {
func (s *ContextImpl) GetReplicationStatus(clusterNames []string) (map[string]*historyservice.ShardReplicationStatusPerCluster, map[string]*historyservice.HandoverNamespaceInfo, error) {
remoteClusters := make(map[string]*historyservice.ShardReplicationStatusPerCluster)
handoverNamespaces := make(map[string]*historyservice.HandoverNamespaceInfo)
clusterInfo := s.clusterMetadata.GetAllClusterInfo()
s.rLock()
defer s.rUnlock()

if len(cluster) == 0 {
// remote acked info for all known remote clusters
for k, v := range s.remoteClusterInfos {
remoteClusters[k] = &historyservice.ShardReplicationStatusPerCluster{
AckedTaskId: v.AckedReplicationTaskID,
AckedTaskVisibilityTime: timestamp.TimePtr(v.AckedReplicationTimestamp),
}
if len(clusterNames) == 0 {
for clusterName := range clusterInfo {
clusterNames = append(clusterNames, clusterName)
}
} else {
for _, k := range cluster {
if v, ok := s.remoteClusterInfos[k]; ok {
remoteClusters[k] = &historyservice.ShardReplicationStatusPerCluster{
AckedTaskId: v.AckedReplicationTaskID,
AckedTaskVisibilityTime: timestamp.TimePtr(v.AckedReplicationTimestamp),
}
}

for _, clusterName := range clusterNames {
if _, ok := clusterInfo[clusterName]; !ok {
continue
}
v, ok := s.remoteClusterInfos[clusterName]
if !ok {
continue
}

var taskID *int64
var ackTime *time.Time
for _, remoteShardID := range common.MapShardID(
clusterInfo[s.clusterMetadata.GetCurrentClusterName()].ShardCount,
clusterInfo[clusterName].ShardCount,
s.shardID,
) {
if taskID == nil || v.AckedReplicationTaskIDs[remoteShardID] < *taskID {
taskID = convert.Int64Ptr(v.AckedReplicationTaskIDs[remoteShardID])
ackTime = timestamp.TimePtr(v.AckedReplicationTimestamps[remoteShardID])
}
}

remoteClusters[clusterName] = &historyservice.ShardReplicationStatusPerCluster{
AckedTaskId: *taskID,
AckedTaskVisibilityTime: ackTime,
}
}

for k, v := range s.handoverNamespaces {
Expand All @@ -1699,7 +1742,8 @@ func (s *ContextImpl) getOrUpdateRemoteClusterInfoLocked(clusterName string) *re
return info
}
info := &remoteClusterInfo{
AckedReplicationTaskID: persistence.EmptyQueueMessageID,
AckedReplicationTaskIDs: make(map[int32]int64),
AckedReplicationTimestamps: make(map[int32]time.Time),
}
s.remoteClusterInfos[clusterName] = info
return info
Expand Down
14 changes: 14 additions & 0 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 31fa19d

Please sign in to comment.