Wire replication stream caller side (#4178)
wxing1292 authored Apr 24, 2023
1 parent a52bb73 commit b97be78
Showing 4 changed files with 195 additions and 42 deletions.
120 changes: 94 additions & 26 deletions service/history/api/replication/stream.go
@@ -22,6 +22,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

+//go:generate mockgen -copyright_file ../../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination stream_mock.go
+
package replication

import (
@@ -34,25 +36,55 @@ import (
"go.temporal.io/server/api/historyservice/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
historyclient "go.temporal.io/server/client/history"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
)

+type (
+	TaskConvertorImpl struct {
+		Ctx                     context.Context
+		Engine                  shard.Engine
+		NamespaceCache          namespace.Registry
+		SourceClusterShardCount int32
+		SourceClusterShardID    historyclient.ClusterShardID
+	}
+	TaskConvertor interface {
+		Convert(task tasks.Task) (*replicationspb.ReplicationTask, error)
+	}
+)

func StreamReplicationTasks(
	server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
	shardContext shard.Context,
	sourceClusterShardID historyclient.ClusterShardID,
	targetClusterShardID historyclient.ClusterShardID,
) error {
+	sourceClusterInfo, ok := shardContext.GetClusterMetadata().GetAllClusterInfo()[sourceClusterShardID.ClusterName]
+	if !ok {
+		return serviceerror.NewInternal(fmt.Sprintf("Unknown cluster: %v", sourceClusterShardID.ClusterName))
+	}
+	engine, err := shardContext.GetEngine(server.Context())
+	if err != nil {
+		return err
+	}
+	filter := &TaskConvertorImpl{
+		Ctx:                     server.Context(),
+		Engine:                  engine,
+		NamespaceCache:          shardContext.GetNamespaceRegistry(),
+		SourceClusterShardCount: sourceClusterInfo.ShardCount,
+		SourceClusterShardID:    sourceClusterShardID,
+	}
	errGroup, ctx := errgroup.WithContext(server.Context())
	errGroup.Go(func() error {
		return recvLoop(ctx, server, shardContext, sourceClusterShardID)
	})
	errGroup.Go(func() error {
-		return sendLoop(ctx, server, shardContext, sourceClusterShardID, targetClusterShardID)
+		return sendLoop(ctx, server, shardContext, filter, sourceClusterShardID)
	})
	return errGroup.Wait()
}
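
The wiring above ties the stream's lifetime to both loops: errgroup.WithContext derives a context that is canceled as soon as either recvLoop or sendLoop returns an error, so the sibling loop unblocks and errGroup.Wait surfaces the first failure. A minimal, self-contained sketch of that pattern, illustrative only and not part of this commit:

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	errGroup, ctx := errgroup.WithContext(context.Background())
	errGroup.Go(func() error {
		// Stands in for recvLoop failing, e.g. the peer closed the stream.
		return errors.New("recv loop failed")
	})
	errGroup.Go(func() error {
		// Stands in for sendLoop: it observes the shared cancellation and exits.
		<-ctx.Done()
		return ctx.Err()
	})
	// Wait returns the first non-nil error; this prints "recv loop failed".
	fmt.Println(errGroup.Wait())
}
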
@@ -98,33 +130,36 @@ func recvSyncReplicationState(
) error {
	lastProcessedMessageID := attr.GetLastProcessedMessageId()
	lastProcessedMessageIDTime := attr.GetLastProcessedMessageTime()
-	if lastProcessedMessageID != persistence.EmptyQueueMessageID {
-		if err := shardContext.UpdateQueueClusterAckLevel(
-			tasks.CategoryReplication,
-			sourceClusterShardID.ClusterName,
-			tasks.NewImmediateKey(lastProcessedMessageID),
-		); err != nil {
-			shardContext.GetLogger().Error(
-				"error updating replication level for shard",
-				tag.Error(err),
-				tag.OperationFailed,
-			)
-		}
-		shardContext.UpdateRemoteClusterInfo(
-			sourceClusterShardID.ClusterName,
-			lastProcessedMessageID,
-			*lastProcessedMessageIDTime,
+	if lastProcessedMessageID == persistence.EmptyQueueMessageID {
+		return nil
+	}
+
+	// TODO wait for #4176 to be merged and then use cluster & shard ID as reader ID
+	if err := shardContext.UpdateQueueClusterAckLevel(
+		tasks.CategoryReplication,
+		sourceClusterShardID.ClusterName,
+		tasks.NewImmediateKey(lastProcessedMessageID),
+	); err != nil {
+		shardContext.GetLogger().Error(
+			"error updating replication level for shard",
+			tag.Error(err),
+			tag.OperationFailed,
+		)
+	}
+	shardContext.UpdateRemoteClusterInfo(
+		sourceClusterShardID.ClusterName,
+		lastProcessedMessageID,
+		*lastProcessedMessageIDTime,
+	)
	return nil
}
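
recvSyncReplicationState treats the target's last processed message ID as an ack watermark: it is persisted as the replication queue's cluster ack level (enabling eventual trimming) and cached via UpdateRemoteClusterInfo. For orientation, a hedged sketch of what the peer on the target cluster might send up the stream; the oneof wrapper name follows the usual protoc-gen-go convention and should be checked against the generated historyservice code:

// reportProgress is a hypothetical target-side helper: after applying
// replicated tasks up to lastAppliedTaskID, it reports progress so the
// source shard can advance its ack level.
func reportProgress(
	stream historyservice.HistoryService_StreamWorkflowReplicationMessagesClient,
	lastAppliedTaskID int64,
) error {
	now := time.Now().UTC()
	return stream.Send(&historyservice.StreamWorkflowReplicationMessagesRequest{
		Attributes: &historyservice.StreamWorkflowReplicationMessagesRequest_SyncReplicationState{
			SyncReplicationState: &replicationspb.SyncReplicationState{
				LastProcessedMessageId:   lastAppliedTaskID,
				LastProcessedMessageTime: &now,
			},
		},
	})
}
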

func sendLoop(
	ctx context.Context,
	server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
	shardContext shard.Context,
+	taskConvertor TaskConvertor,
	sourceClusterShardID historyclient.ClusterShardID,
-	targetClusterShardID historyclient.ClusterShardID,
) error {
	engine, err := shardContext.GetEngine(ctx)
	if err != nil {
@@ -137,8 +172,8 @@ func sendLoop(
		ctx,
		server,
		shardContext,
+		taskConvertor,
		sourceClusterShardID,
-		targetClusterShardID,
	)
	if err != nil {
		shardContext.GetLogger().Error(
@@ -152,8 +187,8 @@
		ctx,
		server,
		shardContext,
+		taskConvertor,
		sourceClusterShardID,
-		targetClusterShardID,
		newTaskNotificationChan,
		catchupEndExclusiveWatermark,
	); err != nil {
@@ -172,9 +207,10 @@ func sendCatchUp(
	ctx context.Context,
	server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
	shardContext shard.Context,
+	taskConvertor TaskConvertor,
	sourceClusterShardID historyclient.ClusterShardID,
-	targetClusterShardID historyclient.ClusterShardID,
) (int64, error) {
+	// TODO wait for #4176 to be merged and then use cluster & shard ID as reader ID
	catchupBeginInclusiveWatermark := shardContext.GetQueueClusterAckLevel(
		tasks.CategoryReplication,
		sourceClusterShardID.ClusterName,
@@ -184,8 +220,8 @@
		ctx,
		server,
		shardContext,
+		taskConvertor,
		sourceClusterShardID,
-		targetClusterShardID,
		catchupBeginInclusiveWatermark.TaskID,
		catchupEndExclusiveWatermark.TaskID,
	); err != nil {
@@ -198,8 +234,8 @@ func sendLive(
	ctx context.Context,
	server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
	shardContext shard.Context,
+	taskConvertor TaskConvertor,
	sourceClusterShardID historyclient.ClusterShardID,
-	targetClusterShardID historyclient.ClusterShardID,
	newTaskNotificationChan <-chan struct{},
	beginInclusiveWatermark int64,
) error {
@@ -211,8 +247,8 @@
				ctx,
				server,
				shardContext,
+				taskConvertor,
				sourceClusterShardID,
-				targetClusterShardID,
				beginInclusiveWatermark,
				endExclusiveWatermark,
			); err != nil {
@@ -229,8 +265,8 @@ func sendTasks(
	ctx context.Context,
	server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
	shardContext shard.Context,
+	taskConvertor TaskConvertor,
	sourceClusterShardID historyclient.ClusterShardID,
-	targetClusterShardID historyclient.ClusterShardID,
	beginInclusiveWatermark int64,
	endExclusiveWatermark int64,
) error {
@@ -261,7 +297,7 @@ Loop:
		if err != nil {
			return err
		}
-		task, err := engine.ConvertReplicationTask(ctx, item)
+		task, err := taskConvertor.Convert(item)
		if err != nil {
			return err
		}
@@ -290,3 +326,35 @@ Loop:
		},
	})
}
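
sendLoop drains the backlog first (sendCatchUp, from the persisted ack level up to the current exclusive watermark) and then tails new work (sendLive). The live half's body is collapsed in this diff; below is a hedged sketch of its likely shape, where currentWatermark and ship are hypothetical stand-ins for the shard's max task ID and the sendTasks call:

// tailLive waits for new-task notifications and ships each window between
// the previous and the new exclusive watermark, until the context ends.
func tailLive(
	ctx context.Context,
	notify <-chan struct{},
	begin int64,
	currentWatermark func() int64,
	ship func(beginInclusive int64, endExclusive int64) error,
) error {
	for {
		select {
		case <-notify:
			end := currentWatermark()
			if begin < end {
				if err := ship(begin, end); err != nil {
					return err
				}
				begin = end
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
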

+func (f *TaskConvertorImpl) Convert(
+	task tasks.Task,
+) (*replicationspb.ReplicationTask, error) {
+	if namespaceEntry, err := f.NamespaceCache.GetNamespaceByID(
+		namespace.ID(task.GetNamespaceID()),
+	); err == nil {
+		shouldProcessTask := false
+	FilterLoop:
+		for _, targetCluster := range namespaceEntry.ClusterNames() {
+			if f.SourceClusterShardID.ClusterName == targetCluster {
+				shouldProcessTask = true
+				break FilterLoop
+			}
+		}
+		if !shouldProcessTask {
+			return nil, nil
+		}
+	}
+	// if there is an error, then blindly send the task, better safe than sorry
+
+	sourceShardID := common.WorkflowIDToHistoryShard(task.GetNamespaceID(), task.GetWorkflowID(), f.SourceClusterShardCount)
+	if sourceShardID != f.SourceClusterShardID.ShardID {
+		return nil, nil
+	}
+
+	replicationTask, err := f.Engine.ConvertReplicationTask(f.Ctx, task)
+	if err != nil {
+		return nil, err
+	}
+	return replicationTask, nil
+}
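
Convert thus applies two filters before doing any real work: the task's namespace must replicate to the caller's cluster, and the workflow must hash to the caller's shard under the source cluster's shard count; everything else is dropped as nil, nil. A small illustration of the ownership check, with made-up IDs and a 4-shard source cluster:

// exampleShardRouting shows how a workflow is mapped onto a source shard;
// only the stream opened by that shard will carry the workflow's tasks.
func exampleShardRouting() {
	const sourceShardCount int32 = 4
	shardID := common.WorkflowIDToHistoryShard("some-namespace-id", "some-workflow-id", sourceShardCount)
	fmt.Printf("workflow routes to source shard %d of %d\n", shardID, sourceShardCount)
}
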
75 changes: 75 additions & 0 deletions service/history/api/replication/stream_mock.go
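
The stream_mock.go file is generated by the mockgen directive added at the top of stream.go, so its body is not rendered here. A hedged test sketch, assuming mockgen's usual source-mode naming (MockTaskConvertor / NewMockTaskConvertor) for the generated types:

package replication

import (
	"testing"

	"github.com/golang/mock/gomock"
)

func TestConvertFiltersTask(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	convertor := NewMockTaskConvertor(ctrl)
	// By the convention in Convert above, (nil, nil) means "filtered out".
	convertor.EXPECT().Convert(gomock.Any()).Return(nil, nil).Times(1)

	task, err := convertor.Convert(nil)
	if err != nil || task != nil {
		t.Fatalf("expected task to be filtered, got task=%v err=%v", task, err)
	}
}
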
