Bugfix: ExecutableTaskTracker LowWatermark function #4343

Merged
merged 1 commit on May 15, 2023
324 changes: 163 additions & 161 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

@@ -70,8 +70,8 @@ message SyncShardStatus {
 }
 
 message SyncReplicationState {
-    int64 last_processed_message_id = 1;
-    google.protobuf.Timestamp last_processed_message_time = 2 [(gogoproto.stdtime) = true];
+    int64 inclusive_low_watermark = 1;
+    google.protobuf.Timestamp inclusive_low_watermark_time = 2 [(gogoproto.stdtime) = true];
 }
 
 message ReplicationMessages {
@@ -86,8 +86,8 @@ message ReplicationMessages {
 message WorkflowReplicationMessages {
     repeated ReplicationTask replication_tasks = 1;
     // This can be different than the last taskId in the above list, because sender can decide to skip tasks (e.g. for completed workflows).
-    int64 last_task_id = 2;
-    google.protobuf.Timestamp last_task_time = 3 [(gogoproto.stdtime) = true];
+    int64 exclusive_high_watermark = 2;
+    google.protobuf.Timestamp exclusive_high_watermark_time = 3 [(gogoproto.stdtime) = true];
 }
 
 message ReplicationTaskInfo {
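The renamed fields switch the protocol from inclusive "last sent / last processed" IDs to an explicit half-open range [inclusive_low_watermark, exclusive_high_watermark). A minimal sketch of the off-by-one relationship between the old and new values; the example IDs and variable names are illustrative only and not part of this PR:

```go
package main

import "fmt"

func main() {
	// Old inclusive fields (example values): the last task the source sent
	// and the last task the target finished processing.
	lastTaskID := int64(41)
	lastProcessedMessageID := int64(41)

	// New half-open convention: everything in
	// [inclusiveLowWatermark, exclusiveHighWatermark) is still outstanding.
	exclusiveHighWatermark := lastTaskID + 1            // sender-side field
	inclusiveLowWatermark := lastProcessedMessageID + 1 // receiver-side field: next task it needs

	// recvSyncReplicationState converts back when calling the unchanged
	// UpdateRemoteClusterInfo API, which still takes an inclusive ack level.
	ackedTaskID := inclusiveLowWatermark - 1

	fmt.Println(exclusiveHighWatermark, inclusiveLowWatermark, ackedTaskID) // 42 42 41
}
```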
46 changes: 29 additions & 17 deletions service/history/api/replication/stream.go
@@ -44,7 +44,6 @@ import (
     "go.temporal.io/server/common/log/tag"
     "go.temporal.io/server/common/metrics"
     "go.temporal.io/server/common/namespace"
-    "go.temporal.io/server/common/persistence"
     "go.temporal.io/server/common/primitives/timestamp"
     "go.temporal.io/server/service/history/replication"
     "go.temporal.io/server/service/history/shard"
@@ -158,11 +157,8 @@ func recvSyncReplicationState(
     attr *replicationspb.SyncReplicationState,
     clientClusterShardID historyclient.ClusterShardID,
 ) error {
-    lastProcessedMessageID := attr.GetLastProcessedMessageId()
-    lastProcessedMessageIDTime := attr.GetLastProcessedMessageTime()
-    if lastProcessedMessageID == persistence.EmptyQueueMessageID {
-        return nil
-    }
+    inclusiveLowWatermark := attr.GetInclusiveLowWatermark()
+    inclusiveLowWatermarkTime := attr.GetInclusiveLowWatermarkTime()
 
     readerID := shard.ReplicationReaderIDFromClusterShardID(
         int64(clientClusterShardID.ClusterID),
@@ -172,7 +168,7 @@
         Scopes: []*persistencespb.QueueSliceScope{{
             Range: &persistencespb.QueueSliceRange{
                 InclusiveMin: shard.ConvertToPersistenceTaskKey(
-                    tasks.NewImmediateKey(lastProcessedMessageID + 1),
+                    tasks.NewImmediateKey(inclusiveLowWatermark),
                 ),
                 ExclusiveMax: shard.ConvertToPersistenceTaskKey(
                     tasks.NewImmediateKey(math.MaxInt64),
@@ -192,8 +188,8 @@
     }
     shardContext.UpdateRemoteClusterInfo(
         string(clientClusterShardID.ClusterID),
-        lastProcessedMessageID,
-        *lastProcessedMessageIDTime,
+        inclusiveLowWatermark-1,
+        *inclusiveLowWatermarkTime,
     )
     return nil
 }
@@ -335,8 +331,24 @@ func sendTasks(
     beginInclusiveWatermark int64,
     endExclusiveWatermark int64,
 ) error {
-    if beginInclusiveWatermark >= endExclusiveWatermark {
-        return nil
+    if beginInclusiveWatermark > endExclusiveWatermark {
+        err := serviceerror.NewInternal(fmt.Sprintf("StreamWorkflowReplication encountered invalid task range [%v, %v)",
+            beginInclusiveWatermark,
+            endExclusiveWatermark,
+        ))
+        shardContext.GetLogger().Error("StreamWorkflowReplication unable to", tag.Error(err))
+        return err
+    }
+    if beginInclusiveWatermark == endExclusiveWatermark {
+        return server.Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
+            Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
+                Messages: &replicationspb.WorkflowReplicationMessages{
+                    ReplicationTasks:           nil,
+                    ExclusiveHighWatermark:     endExclusiveWatermark,
+                    ExclusiveHighWatermarkTime: timestamp.TimeNowPtrUtc(),
+                },
+            },
+        })
     }
 
     engine, err := shardContext.GetEngine(ctx)
@@ -372,9 +384,9 @@ Loop:
         if err := server.Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
             Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
                 Messages: &replicationspb.WorkflowReplicationMessages{
-                    ReplicationTasks: []*replicationspb.ReplicationTask{task},
-                    LastTaskId:       task.SourceTaskId,
-                    LastTaskTime:     task.VisibilityTime,
+                    ReplicationTasks:           []*replicationspb.ReplicationTask{task},
+                    ExclusiveHighWatermark:     task.SourceTaskId + 1,
+                    ExclusiveHighWatermarkTime: task.VisibilityTime,
                 },
             },
         }); err != nil {
@@ -390,9 +402,9 @@
     return server.Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
         Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
             Messages: &replicationspb.WorkflowReplicationMessages{
-                ReplicationTasks: nil,
-                LastTaskId:       endExclusiveWatermark - 1,
-                LastTaskTime:     timestamp.TimeNowPtrUtc(),
+                ReplicationTasks:           nil,
+                ExclusiveHighWatermark:     endExclusiveWatermark,
+                ExclusiveHighWatermarkTime: timestamp.TimeNowPtrUtc(),
             },
         },
     })
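With this change every sender response carries an exclusive high watermark, including the new empty-range case, so the receiver can advance its ack level even when no replication tasks exist. A minimal sketch of that contract, using simplified stand-in types rather than the real history service stream and replication protos shown above:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Simplified stand-ins for the real replication message types.
type task struct{ sourceTaskID int64 }

type message struct {
	tasks                      []task
	exclusiveHighWatermark     int64
	exclusiveHighWatermarkTime time.Time
}

// sendRange mirrors the fixed sendTasks logic for the half-open range
// [beginInclusive, endExclusive): reject inverted ranges, emit a
// watermark-only message for empty ranges, and otherwise stamp each task
// message with taskID+1 as the exclusive high watermark.
func sendRange(send func(message) error, pending []task, beginInclusive, endExclusive int64) error {
	if beginInclusive > endExclusive {
		return errors.New("invalid task range")
	}
	if beginInclusive == endExclusive {
		return send(message{exclusiveHighWatermark: endExclusive, exclusiveHighWatermarkTime: time.Now().UTC()})
	}
	for _, t := range pending {
		if t.sourceTaskID < beginInclusive || t.sourceTaskID >= endExclusive {
			continue
		}
		if err := send(message{
			tasks:                      []task{t},
			exclusiveHighWatermark:     t.sourceTaskID + 1,
			exclusiveHighWatermarkTime: time.Now().UTC(),
		}); err != nil {
			return err
		}
	}
	// Final watermark-only message so the receiver can ack up to endExclusive.
	return send(message{exclusiveHighWatermark: endExclusive, exclusiveHighWatermarkTime: time.Now().UTC()})
}

func main() {
	_ = sendRange(func(m message) error {
		fmt.Println(m.exclusiveHighWatermark)
		return nil
	}, []task{{sourceTaskID: 10}, {sourceTaskID: 12}}, 10, 13) // prints 11, 13, 13
}
```

The invalid-range branch corresponds to the new serviceerror.NewInternal guard, and the two watermark-only sends correspond to the server.Send calls above that set ReplicationTasks to nil.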
54 changes: 30 additions & 24 deletions service/history/api/replication/stream_test.go
@@ -115,8 +115,8 @@ func (s *streamSuite) TestRecvSyncReplicationState_Success() {
         s.clientClusterShardID.ShardID,
     )
     replicationState := &replicationspb.SyncReplicationState{
-        LastProcessedMessageId:   rand.Int63(),
-        LastProcessedMessageTime: timestamp.TimePtr(time.Unix(0, rand.Int63())),
+        InclusiveLowWatermark:     rand.Int63(),
+        InclusiveLowWatermarkTime: timestamp.TimePtr(time.Unix(0, rand.Int63())),
     }
 
     s.shardContext.EXPECT().UpdateReplicationQueueReaderState(
@@ -125,7 +125,7 @@ func (s *streamSuite) TestRecvSyncReplicationState_Success() {
             Scopes: []*persistencespb.QueueSliceScope{{
                 Range: &persistencespb.QueueSliceRange{
                     InclusiveMin: shard.ConvertToPersistenceTaskKey(
-                        tasks.NewImmediateKey(replicationState.LastProcessedMessageId + 1),
+                        tasks.NewImmediateKey(replicationState.InclusiveLowWatermark),
                     ),
                     ExclusiveMax: shard.ConvertToPersistenceTaskKey(
                         tasks.NewImmediateKey(math.MaxInt64),
@@ -140,8 +140,8 @@ func (s *streamSuite) TestRecvSyncReplicationState_Success() {
     ).Return(nil)
     s.shardContext.EXPECT().UpdateRemoteClusterInfo(
         string(s.clientClusterShardID.ClusterID),
-        replicationState.LastProcessedMessageId,
-        *replicationState.LastProcessedMessageTime,
+        replicationState.InclusiveLowWatermark-1,
+        *replicationState.InclusiveLowWatermarkTime,
     )
 
     err := recvSyncReplicationState(s.shardContext, replicationState, s.clientClusterShardID)
@@ -154,8 +154,8 @@ func (s *streamSuite) TestRecvSyncReplicationState_Error() {
         s.clientClusterShardID.ShardID,
     )
     replicationState := &replicationspb.SyncReplicationState{
-        LastProcessedMessageId:   rand.Int63(),
-        LastProcessedMessageTime: timestamp.TimePtr(time.Unix(0, rand.Int63())),
+        InclusiveLowWatermark:     rand.Int63(),
+        InclusiveLowWatermarkTime: timestamp.TimePtr(time.Unix(0, rand.Int63())),
     }
 
     var ownershipLost error
@@ -171,7 +171,7 @@ func (s *streamSuite) TestRecvSyncReplicationState_Error() {
             Scopes: []*persistencespb.QueueSliceScope{{
                 Range: &persistencespb.QueueSliceRange{
                     InclusiveMin: shard.ConvertToPersistenceTaskKey(
-                        tasks.NewImmediateKey(replicationState.LastProcessedMessageId + 1),
+                        tasks.NewImmediateKey(replicationState.InclusiveLowWatermark),
                     ),
                     ExclusiveMax: shard.ConvertToPersistenceTaskKey(
                         tasks.NewImmediateKey(math.MaxInt64),
@@ -236,8 +236,8 @@ func (s *streamSuite) TestSendCatchUp() {
         endExclusiveWatermark,
     ).Return(iter, nil)
     s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
-        s.Equal(endExclusiveWatermark-1, resp.GetMessages().LastTaskId)
-        s.NotNil(resp.GetMessages().LastTaskTime)
+        s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark)
+        s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
         return nil
     })
 
@@ -288,13 +288,13 @@ func (s *streamSuite) TestSendLive() {
     )
     gomock.InOrder(
         s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
-            s.Equal(watermark1-1, resp.GetMessages().LastTaskId)
-            s.NotNil(resp.GetMessages().LastTaskTime)
+            s.Equal(watermark1, resp.GetMessages().ExclusiveHighWatermark)
+            s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
             return nil
         }),
        s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
-            s.Equal(watermark2-1, resp.GetMessages().LastTaskId)
-            s.NotNil(resp.GetMessages().LastTaskTime)
+            s.Equal(watermark2, resp.GetMessages().ExclusiveHighWatermark)
+            s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
             return nil
         }),
     )
@@ -320,6 +320,12 @@ func (s *streamSuite) TestSendTasks_Noop() {
     beginInclusiveWatermark := rand.Int63()
     endExclusiveWatermark := beginInclusiveWatermark
 
+    s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
+        s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark)
+        s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
+        return nil
+    })
+
     err := sendTasks(
         s.ctx,
         s.server,
@@ -349,8 +355,8 @@ func (s *streamSuite) TestSendTasks_WithoutTasks() {
         endExclusiveWatermark,
     ).Return(iter, nil)
     s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
-        s.Equal(endExclusiveWatermark-1, resp.GetMessages().LastTaskId)
-        s.NotNil(resp.GetMessages().LastTaskTime)
+        s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark)
+        s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
         return nil
     })
 
@@ -400,24 +406,24 @@ func (s *streamSuite) TestSendTasks_WithTasks() {
         s.server.EXPECT().Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
             Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
                 Messages: &replicationspb.WorkflowReplicationMessages{
-                    ReplicationTasks: []*replicationspb.ReplicationTask{task0},
-                    LastTaskId:       task0.SourceTaskId,
-                    LastTaskTime:     task0.VisibilityTime,
+                    ReplicationTasks:           []*replicationspb.ReplicationTask{task0},
+                    ExclusiveHighWatermark:     task0.SourceTaskId + 1,
+                    ExclusiveHighWatermarkTime: task0.VisibilityTime,
                 },
             },
         }).Return(nil),
         s.server.EXPECT().Send(&historyservice.StreamWorkflowReplicationMessagesResponse{
             Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{
                 Messages: &replicationspb.WorkflowReplicationMessages{
-                    ReplicationTasks: []*replicationspb.ReplicationTask{task2},
-                    LastTaskId:       task2.SourceTaskId,
-                    LastTaskTime:     task2.VisibilityTime,
+                    ReplicationTasks:           []*replicationspb.ReplicationTask{task2},
+                    ExclusiveHighWatermark:     task2.SourceTaskId + 1,
+                    ExclusiveHighWatermarkTime: task2.VisibilityTime,
                 },
             },
         }).Return(nil),
         s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error {
-            s.Equal(endExclusiveWatermark-1, resp.GetMessages().LastTaskId)
-            s.NotNil(resp.GetMessages().LastTaskTime)
+            s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark)
+            s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime)
             return nil
         }),
     )
53 changes: 30 additions & 23 deletions service/history/replication/executable_task_tracker.go
@@ -49,7 +49,7 @@ type (
         Timestamp time.Time
     }
     ExecutableTaskTracker interface {
-        TrackTasks(highWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) []TrackableExecutableTask
+        TrackTasks(exclusiveHighWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) []TrackableExecutableTask
         LowWatermark() *WatermarkInfo
         Size() int
         Cancel()
@@ -58,10 +58,10 @@ type (
         logger log.Logger
 
         sync.Mutex
-        cancelled         bool
-        highWatermarkInfo *WatermarkInfo // this is exclusive, i.e. source need to resend with this watermark / task ID
-        taskQueue         *list.List     // sorted by task ID
-        taskIDs           map[int64]struct{}
+        cancelled                  bool
+        exclusiveHighWatermarkInfo *WatermarkInfo // this is exclusive, i.e. source need to resend with this watermark / task ID
+        taskQueue                  *list.List     // sorted by task ID
+        taskIDs                    map[int64]struct{}
     }
 )
 
@@ -73,16 +73,17 @@ func NewExecutableTaskTracker(
     return &ExecutableTaskTrackerImpl{
         logger: logger,
 
-        highWatermarkInfo: nil,
-        taskQueue:         list.New(),
-        taskIDs:           make(map[int64]struct{}),
+        exclusiveHighWatermarkInfo: nil,
+        taskQueue:                  list.New(),
+        taskIDs:                    make(map[int64]struct{}),
     }
 }
 
 // TrackTasks add tasks for tracking, return valid tasks (dedup)
 // if task tracker is cancelled, then newly added tasks will also be cancelled
+// tasks should be sorted by task ID, all task IDs < exclusiveHighWatermarkInfo
 func (t *ExecutableTaskTrackerImpl) TrackTasks(
-    highWatermarkInfo WatermarkInfo,
+    exclusiveHighWatermarkInfo WatermarkInfo,
     tasks ...TrackableExecutableTask,
 ) []TrackableExecutableTask {
     filteredTasks := make([]TrackableExecutableTask, 0, len(tasks))
@@ -91,7 +92,7 @@ func (t *ExecutableTaskTrackerImpl) TrackTasks(
     defer t.Unlock()
 
     // need to assume source side send replication tasks in order
-    if t.highWatermarkInfo != nil && highWatermarkInfo.Watermark <= t.highWatermarkInfo.Watermark {
+    if t.exclusiveHighWatermarkInfo != nil && exclusiveHighWatermarkInfo.Watermark <= t.exclusiveHighWatermarkInfo.Watermark {
         return filteredTasks
     }
 
@@ -111,14 +112,14 @@ Loop:
         lastTaskID = task.TaskID()
     }
 
-    if highWatermarkInfo.Watermark < lastTaskID {
+    if exclusiveHighWatermarkInfo.Watermark <= lastTaskID {
         panic(fmt.Sprintf(
             "ExecutableTaskTracker encountered lower high watermark: %v < %v",
-            highWatermarkInfo.Watermark,
+            exclusiveHighWatermarkInfo.Watermark,
             lastTaskID,
         ))
     }
-    t.highWatermarkInfo = &highWatermarkInfo
+    t.exclusiveHighWatermarkInfo = &exclusiveHighWatermarkInfo
 
     if t.cancelled {
         t.cancelLocked()
@@ -130,30 +131,36 @@ func (t *ExecutableTaskTrackerImpl) LowWatermark() *WatermarkInfo {
     t.Lock()
     defer t.Unlock()
 
+    element := t.taskQueue.Front()
 Loop:
-    for element := t.taskQueue.Front(); element != nil; element = element.Next() {
+    for element != nil {
         task := element.Value.(TrackableExecutableTask)
         taskState := task.State()
         switch taskState {
         case ctasks.TaskStateAcked:
+            nextElement := element.Next()
             delete(t.taskIDs, task.TaskID())
             t.taskQueue.Remove(element)
+            element = nextElement

Review comment from the PR author on lines +141 to +144:
    "this is the real bugfix, i.e. need to keep track of the next element when deleting the current element"
(A minimal sketch of this pattern follows this file's diff.)

         case ctasks.TaskStateNacked:
             if err := task.MarkPoisonPill(); err != nil {
                 // unable to save poison pill, retry later
-                break Loop
+                element = element.Next()
+                continue Loop
             }
+            nextElement := element.Next()
             delete(t.taskIDs, task.TaskID())
             t.taskQueue.Remove(element)
+            element = nextElement
         case ctasks.TaskStateAborted:
             // noop, do not remove from queue, let it block low watermark
-            break Loop
+            element = element.Next()
         case ctasks.TaskStateCancelled:
             // noop, do not remove from queue, let it block low watermark
-            break Loop
+            element = element.Next()
         case ctasks.TaskStatePending:
             // noop, do not remove from queue, let it block low watermark
-            break Loop
+            element = element.Next()
         default:
             panic(fmt.Sprintf(
                 "ExecutableTaskTracker encountered unknown task state: %v",
@@ -163,14 +170,14 @@ Loop:
     }
 
     if element := t.taskQueue.Front(); element != nil {
-        lowWatermarkInfo := WatermarkInfo{
+        inclusiveLowWatermarkInfo := WatermarkInfo{
             Watermark: element.Value.(TrackableExecutableTask).TaskID(),
             Timestamp: element.Value.(TrackableExecutableTask).TaskCreationTime(),
         }
-        return &lowWatermarkInfo
-    } else if t.highWatermarkInfo != nil {
-        lowWatermarkInfo := *t.highWatermarkInfo
-        return &lowWatermarkInfo
+        return &inclusiveLowWatermarkInfo
+    } else if t.exclusiveHighWatermarkInfo != nil {
+        inclusiveLowWatermarkInfo := *t.exclusiveHighWatermarkInfo
+        return &inclusiveLowWatermarkInfo
     } else {
         return nil
     }
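The review comment above points at the core defect: container/list's Remove detaches the element, so calling Next() on the element that was just removed no longer reaches the rest of the list. The old loop advanced via element.Next() after removing element, stopped after the first acked task, and LowWatermark could stay stuck behind tasks that were already acked. A minimal standalone sketch of the broken and fixed iteration patterns; plain ints stand in for replication tasks and this is not Temporal code:

```go
package main

import (
	"container/list"
	"fmt"
)

func newQueue() *list.List {
	l := list.New()
	for _, id := range []int{1, 2, 3} {
		l.PushBack(id)
	}
	return l
}

func main() {
	acked := map[int]bool{1: true, 2: true, 3: false}

	// Buggy walk: advance from the element that was just removed.
	buggy := newQueue()
	for e := buggy.Front(); e != nil; e = e.Next() {
		if acked[e.Value.(int)] {
			buggy.Remove(e) // e.Next() is now nil, so the loop exits after the first removal
		}
	}

	// Fixed walk: remember Next() before removing the current element.
	fixed := newQueue()
	for e := fixed.Front(); e != nil; {
		next := e.Next()
		if acked[e.Value.(int)] {
			fixed.Remove(e)
		}
		e = next
	}

	fmt.Println(buggy.Len(), fixed.Len()) // 2 1: the buggy walk leaves acked task 2 behind
}
```

The fixed walk is the same save-Next-before-Remove pattern the new LowWatermark uses via nextElement.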