
Wire replication stream caller side #4178

Merged: 2 commits, Apr 24, 2023
120 changes: 94 additions & 26 deletions service/history/api/replication/stream.go
@@ -22,6 +22,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination stream_mock.go

package replication

import (
@@ -34,25 +36,55 @@ import (
"go.temporal.io/server/api/historyservice/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
historyclient "go.temporal.io/server/client/history"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
)

type (
TaskConvertorImpl struct {
Ctx context.Context
Engine shard.Engine
NamespaceCache namespace.Registry
SourceClusterShardCount int32
SourceClusterShardID historyclient.ClusterShardID
}
TaskConvertor interface {
Convert(task tasks.Task) (*replicationspb.ReplicationTask, error)
}
)

func StreamReplicationTasks(
server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
shardContext shard.Context,
sourceClusterShardID historyclient.ClusterShardID,
targetClusterShardID historyclient.ClusterShardID,
) error {
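// Run one bidirectional replication stream: a receive loop consumes
// SyncReplicationState acks from the peer while a send loop ships replication
// tasks; both share an errgroup, so the stream ends as soon as either fails.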
sourceClusterInfo, ok := shardContext.GetClusterMetadata().GetAllClusterInfo()[sourceClusterShardID.ClusterName]
if !ok {
return serviceerror.NewInternal(fmt.Sprintf("Unknown cluster: %v", sourceClusterShardID.ClusterName))
}
engine, err := shardContext.GetEngine(server.Context())
if err != nil {
return err
}
filter := &TaskConvertorImpl{
Ctx: server.Context(),
Engine: engine,
NamespaceCache: shardContext.GetNamespaceRegistry(),
SourceClusterShardCount: sourceClusterInfo.ShardCount,
SourceClusterShardID: sourceClusterShardID,
}
errGroup, ctx := errgroup.WithContext(server.Context())
errGroup.Go(func() error {
return recvLoop(ctx, server, shardContext, sourceClusterShardID)
})
errGroup.Go(func() error {
return sendLoop(ctx, server, shardContext, sourceClusterShardID, targetClusterShardID)
return sendLoop(ctx, server, shardContext, filter, sourceClusterShardID)
})
return errGroup.Wait()
}
@@ -98,33 +130,36 @@ func recvSyncReplicationState(
) error {
lastProcessedMessageID := attr.GetLastProcessedMessageId()
lastProcessedMessageIDTime := attr.GetLastProcessedMessageTime()
if lastProcessedMessageID != persistence.EmptyQueueMessageID {
if err := shardContext.UpdateQueueClusterAckLevel(
tasks.CategoryReplication,
sourceClusterShardID.ClusterName,
tasks.NewImmediateKey(lastProcessedMessageID),
); err != nil {
shardContext.GetLogger().Error(
"error updating replication level for shard",
tag.Error(err),
tag.OperationFailed,
)
}
shardContext.UpdateRemoteClusterInfo(
sourceClusterShardID.ClusterName,
lastProcessedMessageID,
*lastProcessedMessageIDTime,
if lastProcessedMessageID == persistence.EmptyQueueMessageID {
return nil
}

// TODO wait for #4176 to be merged and then use cluster & shard ID as reader ID
if err := shardContext.UpdateQueueClusterAckLevel(
tasks.CategoryReplication,
sourceClusterShardID.ClusterName,
tasks.NewImmediateKey(lastProcessedMessageID),
); err != nil {
shardContext.GetLogger().Error(
"error updating replication level for shard",
tag.Error(err),
tag.OperationFailed,
)
}
shardContext.UpdateRemoteClusterInfo(
sourceClusterShardID.ClusterName,
lastProcessedMessageID,
*lastProcessedMessageIDTime,
)
return nil
}

func sendLoop(
ctx context.Context,
server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
shardContext shard.Context,
taskConvertor TaskConvertor,
sourceClusterShardID historyclient.ClusterShardID,
targetClusterShardID historyclient.ClusterShardID,
) error {
engine, err := shardContext.GetEngine(ctx)
if err != nil {
@@ -137,8 +172,8 @@ func sendLoop(
ctx,
server,
shardContext,
taskConvertor,
sourceClusterShardID,
targetClusterShardID,
)
if err != nil {
shardContext.GetLogger().Error(
@@ -152,8 +187,8 @@
ctx,
server,
shardContext,
taskConvertor,
sourceClusterShardID,
targetClusterShardID,
newTaskNotificationChan,
catchupEndExclusiveWatermark,
); err != nil {
@@ -172,9 +207,10 @@ func sendCatchUp(
ctx context.Context,
server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
shardContext shard.Context,
taskConvertor TaskConvertor,
sourceClusterShardID historyclient.ClusterShardID,
targetClusterShardID historyclient.ClusterShardID,
) (int64, error) {
// TODO wait for #4176 to be merged and then use cluster & shard ID as reader ID
catchupBeginInclusiveWatermark := shardContext.GetQueueClusterAckLevel(
tasks.CategoryReplication,
sourceClusterShardID.ClusterName,
@@ -184,8 +220,8 @@
ctx,
server,
shardContext,
taskConvertor,
sourceClusterShardID,
targetClusterShardID,
catchupBeginInclusiveWatermark.TaskID,
catchupEndExclusiveWatermark.TaskID,
); err != nil {
@@ -198,8 +234,8 @@ func sendLive(
ctx context.Context,
server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
shardContext shard.Context,
taskConvertor TaskConvertor,
sourceClusterShardID historyclient.ClusterShardID,
targetClusterShardID historyclient.ClusterShardID,
newTaskNotificationChan <-chan struct{},
beginInclusiveWatermark int64,
) error {
@@ -211,8 +247,8 @@
ctx,
server,
shardContext,
taskConvertor,
sourceClusterShardID,
targetClusterShardID,
beginInclusiveWatermark,
endExclusiveWatermark,
); err != nil {
@@ -229,8 +265,8 @@ func sendTasks(
ctx context.Context,
server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer,
shardContext shard.Context,
taskConvertor TaskConvertor,
sourceClusterShardID historyclient.ClusterShardID,
targetClusterShardID historyclient.ClusterShardID,
beginInclusiveWatermark int64,
endExclusiveWatermark int64,
) error {
@@ -261,7 +297,7 @@ Loop:
if err != nil {
return err
}
task, err := engine.ConvertReplicationTask(ctx, item)
task, err := taskConvertor.Convert(item)
if err != nil {
return err
}
@@ -290,3 +326,35 @@ Loop:
},
})
}

func (f *TaskConvertorImpl) Convert(
task tasks.Task,
) (*replicationspb.ReplicationTask, error) {
if namespaceEntry, err := f.NamespaceCache.GetNamespaceByID(
namespace.ID(task.GetNamespaceID()),
); err == nil {
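// Forward the task only when the namespace lists the requesting cluster
// (SourceClusterShardID.ClusterName) among its replication clusters;
// otherwise the task is dropped by returning nil, nil.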
shouldProcessTask := false
FilterLoop:
for _, targetCluster := range namespaceEntry.ClusterNames() {
if f.SourceClusterShardID.ClusterName == targetCluster {
shouldProcessTask = true
break FilterLoop
Review comment from hehaifengcn (Contributor), Apr 18, 2023:
why break with label? and can you add some comments on why the task should be processed?

Reply from the author:
to me break with label will make the logic easier to read

}
}
if !shouldProcessTask {
return nil, nil
}
}
// if there is an error, blindly send the task; better safe than sorry

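// Likewise drop the task unless the workflow maps onto the requested source
// shard, so each stream only carries tasks belonging to its own shard.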
sourceShardID := common.WorkflowIDToHistoryShard(task.GetNamespaceID(), task.GetWorkflowID(), f.SourceClusterShardCount)
if sourceShardID != f.SourceClusterShardID.ShardID {
return nil, nil
}

replicationTask, err := f.Engine.ConvertReplicationTask(f.Ctx, task)
if err != nil {
return nil, err
}
return replicationTask, nil
}
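Regarding the labeled-break question in the review thread above, a minimal, standalone Go sketch (not part of this PR; all identifiers are illustrative) showing that with a single loop a labeled break behaves the same as a plain break and only makes the exit target explicit; the label starts to matter once the loop is nested:

package main

import "fmt"

func main() {
	clusters := []string{"cluster-a", "cluster-b", "cluster-c"}
	found := false
FilterLoop:
	for _, c := range clusters {
		if c == "cluster-b" {
			found = true
			// Equivalent to a plain `break` here; exits the labeled loop.
			break FilterLoop
		}
	}
	fmt.Println("found:", found) // prints: found: true
}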
75 changes: 75 additions & 0 deletions service/history/api/replication/stream_mock.go

Some generated files are not rendered by default.
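The generated service/history/api/replication/stream_mock.go is not rendered in the diff. As a rough sketch only, assuming the mock follows mockgen's usual naming (a MockTaskConvertor type with a NewMockTaskConvertor constructor, which this diff does not show), a test could exercise the TaskConvertor interface like this:

package replication

import (
	"testing"

	"github.com/golang/mock/gomock"

	replicationspb "go.temporal.io/server/api/replication/v1"
)

// Sketch only: the generated type names are assumptions, not taken from the PR.
func TestTaskConvertorMockSketch(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	convertor := NewMockTaskConvertor(ctrl)
	convertor.EXPECT().
		Convert(gomock.Any()).
		Return(&replicationspb.ReplicationTask{}, nil)

	// The send loop depends on the TaskConvertor interface, so the mock can
	// stand in for TaskConvertorImpl wherever a TaskConvertor is expected.
	var c TaskConvertor = convertor
	task, err := c.Convert(nil)
	if err != nil || task == nil {
		t.Fatalf("unexpected result: task=%v err=%v", task, err)
	}
}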
