Skip to content

Commit

Permalink
Use dedicated proto definition for replication stream (#4106)
Browse files Browse the repository at this point in the history
* Use dedicated proto definition for replication stream
  • Loading branch information
wxing1292 authored Mar 28, 2023
1 parent afc5ac7 commit a20faeb
Show file tree
Hide file tree
Showing 9 changed files with 1,031 additions and 843 deletions.
536 changes: 228 additions & 308 deletions api/adminservice/v1/request_response.pb.go

Large diffs are not rendered by default.

681 changes: 300 additions & 381 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

595 changes: 472 additions & 123 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -378,15 +378,13 @@ message DeleteWorkflowExecutionResponse {
}

message StreamWorkflowReplicationMessagesRequest {
int32 shard_id = 1;
oneof attributes {
temporal.server.api.replication.v1.SyncReplicationState sync_replication_state = 2;
temporal.server.api.replication.v1.SyncReplicationState sync_replication_state = 1;
}
}

message StreamWorkflowReplicationMessagesResponse {
int32 shard_id = 1;
oneof attributes {
temporal.server.api.replication.v1.ReplicationMessages replication_messages = 2;
temporal.server.api.replication.v1.WorkflowReplicationMessages messages = 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -636,15 +636,13 @@ message UpdateWorkflowExecutionResponse {
}

message StreamWorkflowReplicationMessagesRequest {
int32 shard_id = 1;
oneof attributes {
temporal.server.api.replication.v1.SyncReplicationState sync_replication_state = 2;
temporal.server.api.replication.v1.SyncReplicationState sync_replication_state = 1;
}
}

message StreamWorkflowReplicationMessagesResponse {
int32 shard_id = 1;
oneof attributes {
temporal.server.api.replication.v1.ReplicationMessages replication_messages = 2;
temporal.server.api.replication.v1.WorkflowReplicationMessages messages = 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ message ReplicationMessages {
SyncShardStatus sync_shard_status = 4;
}

// WorkflowReplicationMessages is one batch of replication tasks carried on the
// workflow replication stream, together with the sender's last task ID and its time.
message WorkflowReplicationMessages {
// Replication tasks contained in this batch.
repeated ReplicationTask replication_tasks = 1;
// This can be different from the last task ID in the above list, because the sender can decide to skip tasks (e.g. for completed workflows).
int64 last_task_id = 2;
// Timestamp reported alongside last_task_id.
// NOTE(review): presumably the time of the task identified by last_task_id — confirm against the sender.
// The gogoproto.stdtime option makes the generated Go code use the standard library time type for this field.
google.protobuf.Timestamp last_task_time = 3 [(gogoproto.stdtime) = true];
}

message ReplicationTaskInfo {
string namespace_id = 1;
string workflow_id = 2;
Expand Down
8 changes: 3 additions & 5 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1970,7 +1970,6 @@ func (adh *AdminHandler) StreamWorkflowReplicationMessages(
switch attr := req.GetAttributes().(type) {
case *adminservice.StreamWorkflowReplicationMessagesRequest_SyncReplicationState:
if err = sourceCluster.Send(&historyservice.StreamWorkflowReplicationMessagesRequest{
ShardId: req.ShardId,
Attributes: &historyservice.StreamWorkflowReplicationMessagesRequest_SyncReplicationState{
SyncReplicationState: attr.SyncReplicationState,
},
Expand All @@ -1992,11 +1991,10 @@ func (adh *AdminHandler) StreamWorkflowReplicationMessages(
return err
}
switch attr := resp.GetAttributes().(type) {
case *historyservice.StreamWorkflowReplicationMessagesResponse_ReplicationMessages:
case *historyservice.StreamWorkflowReplicationMessagesResponse_Messages:
if err = targetCluster.Send(&adminservice.StreamWorkflowReplicationMessagesResponse{
ShardId: resp.ShardId,
Attributes: &adminservice.StreamWorkflowReplicationMessagesResponse_ReplicationMessages{
ReplicationMessages: attr.ReplicationMessages,
Attributes: &adminservice.StreamWorkflowReplicationMessagesResponse_Messages{
Messages: attr.Messages,
},
}); err != nil {
return err
Expand Down
21 changes: 10 additions & 11 deletions service/history/replication/stream_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,18 @@ import (
)

const (
sendStatusInterval = 30 * time.Second
sendStatusInterval = 1 * time.Second
)

type (
ClusterShardKey struct {
ClusterName string
ShardID int32
}
ClusterShardKeyPair struct {
Source ClusterShardKey
Target ClusterShardKey
}

Stream BiDirectionStream[*adminservice.StreamWorkflowReplicationMessagesRequest, *adminservice.StreamWorkflowReplicationMessagesResponse]
StreamReceiver struct {
Expand Down Expand Up @@ -132,10 +136,6 @@ func (r *StreamReceiver) IsValid() bool {
return atomic.LoadInt32(&r.status) != common.DaemonStatusStopped
}

func (r *StreamReceiver) Key() ClusterShardKey {
return r.targetShardKey
}

func (r *StreamReceiver) sendEventLoop() {
defer r.Stop()
ticker := time.NewTicker(sendStatusInterval)
Expand Down Expand Up @@ -164,7 +164,7 @@ func (r *StreamReceiver) recvEventLoop() {
_ = r.processMessages(stream)

r.Lock()
r.stream = newStream(
r.stream = newStream( // TODO add exp backoff
r.ProcessToolBox,
r.sourceShardKey,
r.targetShardKey,
Expand All @@ -181,7 +181,6 @@ func (r *StreamReceiver) ackMessage(
return
}
if err := stream.Send(&adminservice.StreamWorkflowReplicationMessagesRequest{
ShardId: r.targetShardKey.ShardID,
Attributes: &adminservice.StreamWorkflowReplicationMessagesRequest_SyncReplicationState{
SyncReplicationState: &repicationpb.SyncReplicationState{
LastProcessedMessageId: watermarkInfo.Watermark,
Expand All @@ -207,11 +206,11 @@ func (r *StreamReceiver) processMessages(
return streamResp.Err
}
tasks := r.ConvertTasks(
r.sourceShardKey.ClusterName,
streamResp.Resp.GetReplicationMessages().ReplicationTasks...,
r.targetShardKey.ClusterName, // data come from target
streamResp.Resp.GetMessages().ReplicationTasks...,
)
highWatermark := streamResp.Resp.GetReplicationMessages().LastRetrievedMessageId
highWatermarkTime := time.Now() // TODO this should be passed from src
highWatermark := streamResp.Resp.GetMessages().LastTaskId
highWatermarkTime := timestamp.TimeValue(streamResp.Resp.GetMessages().LastTaskTime)
r.taskTracker.TrackTasks(WatermarkInfo{
Watermark: highWatermark,
Timestamp: highWatermarkTime,
Expand Down
14 changes: 7 additions & 7 deletions service/history/replication/stream_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ func (s *streamReceiverSuite) TestAckMessage_SyncStatus() {
s.streamReceiver.ackMessage(s.stream)

s.Equal([]*adminservice.StreamWorkflowReplicationMessagesRequest{{
ShardId: s.streamReceiver.targetShardKey.ShardID,
Attributes: &adminservice.StreamWorkflowReplicationMessagesRequest_SyncReplicationState{
SyncReplicationState: &repicationpb.SyncReplicationState{
LastProcessedMessageId: watermarkInfo.Watermark,
Expand All @@ -141,11 +140,11 @@ func (s *streamReceiverSuite) TestProcessMessage_TrackSubmit() {
}
streamResp := StreamResp[*adminservice.StreamWorkflowReplicationMessagesResponse]{
Resp: &adminservice.StreamWorkflowReplicationMessagesResponse{
ShardId: s.streamReceiver.sourceShardKey.ShardID,
Attributes: &adminservice.StreamWorkflowReplicationMessagesResponse_ReplicationMessages{
ReplicationMessages: &repicationpb.ReplicationMessages{
LastRetrievedMessageId: rand.Int63(),
ReplicationTasks: []*repicationpb.ReplicationTask{replicationTask},
Attributes: &adminservice.StreamWorkflowReplicationMessagesResponse_Messages{
Messages: &repicationpb.WorkflowReplicationMessages{
ReplicationTasks: []*repicationpb.ReplicationTask{replicationTask},
LastTaskId: rand.Int63(),
LastTaskTime: timestamp.TimePtr(time.Unix(0, rand.Int63())),
},
},
},
Expand All @@ -156,7 +155,8 @@ func (s *streamReceiverSuite) TestProcessMessage_TrackSubmit() {

s.taskTracker.EXPECT().TrackTasks(gomock.Any(), gomock.Any()).Do(
func(highWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) {
s.Equal(streamResp.Resp.GetReplicationMessages().LastRetrievedMessageId, highWatermarkInfo.Watermark)
s.Equal(streamResp.Resp.GetMessages().LastTaskId, highWatermarkInfo.Watermark)
s.Equal(*streamResp.Resp.GetMessages().LastTaskTime, highWatermarkInfo.Timestamp)
s.Equal(1, len(tasks))
s.IsType(&ExecutableUnknownTask{}, tasks[0])
},
Expand Down

0 comments on commit a20faeb

Please sign in to comment.