diff --git a/api/replication/v1/message.pb.go b/api/replication/v1/message.pb.go index e1f1d2df286..168f42edab7 100644 --- a/api/replication/v1/message.pb.go +++ b/api/replication/v1/message.pb.go @@ -325,8 +325,8 @@ func (m *SyncShardStatus) GetStatusTime() *time.Time { } type SyncReplicationState struct { - LastProcessedMessageId int64 `protobuf:"varint,1,opt,name=last_processed_message_id,json=lastProcessedMessageId,proto3" json:"last_processed_message_id,omitempty"` - LastProcessedMessageTime *time.Time `protobuf:"bytes,2,opt,name=last_processed_message_time,json=lastProcessedMessageTime,proto3,stdtime" json:"last_processed_message_time,omitempty"` + InclusiveLowWatermark int64 `protobuf:"varint,1,opt,name=inclusive_low_watermark,json=inclusiveLowWatermark,proto3" json:"inclusive_low_watermark,omitempty"` + InclusiveLowWatermarkTime *time.Time `protobuf:"bytes,2,opt,name=inclusive_low_watermark_time,json=inclusiveLowWatermarkTime,proto3,stdtime" json:"inclusive_low_watermark_time,omitempty"` } func (m *SyncReplicationState) Reset() { *m = SyncReplicationState{} } @@ -361,16 +361,16 @@ func (m *SyncReplicationState) XXX_DiscardUnknown() { var xxx_messageInfo_SyncReplicationState proto.InternalMessageInfo -func (m *SyncReplicationState) GetLastProcessedMessageId() int64 { +func (m *SyncReplicationState) GetInclusiveLowWatermark() int64 { if m != nil { - return m.LastProcessedMessageId + return m.InclusiveLowWatermark } return 0 } -func (m *SyncReplicationState) GetLastProcessedMessageTime() *time.Time { +func (m *SyncReplicationState) GetInclusiveLowWatermarkTime() *time.Time { if m != nil { - return m.LastProcessedMessageTime + return m.InclusiveLowWatermarkTime } return nil } @@ -447,8 +447,8 @@ func (m *ReplicationMessages) GetSyncShardStatus() *SyncShardStatus { type WorkflowReplicationMessages struct { ReplicationTasks []*ReplicationTask `protobuf:"bytes,1,rep,name=replication_tasks,json=replicationTasks,proto3" json:"replication_tasks,omitempty"` // This can be different than the last taskId in the above list, because sender can decide to skip tasks (e.g. for completed workflows). - LastTaskId int64 `protobuf:"varint,2,opt,name=last_task_id,json=lastTaskId,proto3" json:"last_task_id,omitempty"` - LastTaskTime *time.Time `protobuf:"bytes,3,opt,name=last_task_time,json=lastTaskTime,proto3,stdtime" json:"last_task_time,omitempty"` + ExclusiveHighWatermark int64 `protobuf:"varint,2,opt,name=exclusive_high_watermark,json=exclusiveHighWatermark,proto3" json:"exclusive_high_watermark,omitempty"` + ExclusiveHighWatermarkTime *time.Time `protobuf:"bytes,3,opt,name=exclusive_high_watermark_time,json=exclusiveHighWatermarkTime,proto3,stdtime" json:"exclusive_high_watermark_time,omitempty"` } func (m *WorkflowReplicationMessages) Reset() { *m = WorkflowReplicationMessages{} } @@ -490,16 +490,16 @@ func (m *WorkflowReplicationMessages) GetReplicationTasks() []*ReplicationTask { return nil } -func (m *WorkflowReplicationMessages) GetLastTaskId() int64 { +func (m *WorkflowReplicationMessages) GetExclusiveHighWatermark() int64 { if m != nil { - return m.LastTaskId + return m.ExclusiveHighWatermark } return 0 } -func (m *WorkflowReplicationMessages) GetLastTaskTime() *time.Time { +func (m *WorkflowReplicationMessages) GetExclusiveHighWatermarkTime() *time.Time { if m != nil { - return m.LastTaskTime + return m.ExclusiveHighWatermarkTime } return nil } @@ -1079,109 +1079,111 @@ func init() { } var fileDescriptor_edd9fae2af6b0532 = []byte{ - // 1620 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0x41, 0x6f, 0x1b, 0xc5, - 0x17, 0xcf, 0xda, 0x8e, 0xed, 0x3c, 0x3b, 0x8e, 0x33, 0x69, 0xfe, 0x71, 0x52, 0xd5, 0x49, 0xfd, - 0x6f, 0x21, 0x85, 0x62, 0x37, 0xc9, 0x81, 0x96, 0x22, 0x50, 0x12, 0x5a, 0xea, 0x4a, 0x2d, 0xd5, - 0x36, 0x6a, 0x25, 0x2e, 0xcb, 0xd8, 0x3b, 0x4e, 0x56, 0xb1, 0x77, 0xad, 0x99, 0xb1, 0x53, 0xdf, - 0x10, 0x5c, 0x10, 0x12, 0x52, 0x25, 0xbe, 0x00, 0x07, 0x0e, 0x9c, 0x38, 0xf0, 0x21, 0x10, 0xc7, - 0x4a, 0x5c, 0x7a, 0xa3, 0x4d, 0x2f, 0x70, 0xeb, 0x47, 0x40, 0x33, 0x3b, 0x63, 0xef, 0xda, 0x6b, - 0xd7, 0x14, 0x10, 0x37, 0xef, 0x9b, 0xdf, 0xfb, 0xbd, 0x37, 0x6f, 0xde, 0x7b, 0xf3, 0xc6, 0x70, - 0x85, 0x93, 0x56, 0xdb, 0xa3, 0xb8, 0x59, 0x61, 0x84, 0x76, 0x09, 0xad, 0xe0, 0xb6, 0x53, 0xa1, - 0xa4, 0xdd, 0x74, 0xea, 0x98, 0x3b, 0x9e, 0x5b, 0xe9, 0x6e, 0x55, 0x5a, 0x84, 0x31, 0x7c, 0x48, - 0xca, 0x6d, 0xea, 0x71, 0x0f, 0x95, 0xb4, 0x46, 0xd9, 0xd7, 0x28, 0xe3, 0xb6, 0x53, 0x0e, 0x68, - 0x94, 0xbb, 0x5b, 0x6b, 0xeb, 0x87, 0x9e, 0x77, 0xd8, 0x24, 0x15, 0xa9, 0x51, 0xeb, 0x34, 0x2a, - 0xdc, 0x69, 0x11, 0xc6, 0x71, 0xab, 0xed, 0x93, 0xac, 0x9d, 0xb7, 0x49, 0x9b, 0xb8, 0x36, 0x71, - 0xeb, 0x0e, 0x61, 0x95, 0x43, 0xef, 0xd0, 0x93, 0x72, 0xf9, 0x4b, 0x41, 0xca, 0x51, 0x9e, 0x11, - 0xb7, 0xd3, 0x62, 0xc2, 0xa7, 0xa0, 0x41, 0x1f, 0xff, 0xe6, 0x44, 0x3c, 0xc7, 0xec, 0x58, 0x01, - 0x2f, 0x47, 0x01, 0x8f, 0x1c, 0xc6, 0x3d, 0xda, 0x1b, 0xd9, 0xee, 0xda, 0x87, 0x51, 0xe8, 0x36, - 0xa1, 0xcc, 0x61, 0x9c, 0xb8, 0x75, 0x22, 0x34, 0x4e, 0x3c, 0x7a, 0xdc, 0x68, 0x7a, 0x27, 0x56, - 0xab, 0xc3, 0x71, 0xad, 0x49, 0x2c, 0xc6, 0x31, 0xd7, 0x04, 0x17, 0xfa, 0x04, 0x42, 0xb3, 0xee, - 0xb5, 0x5a, 0x11, 0x51, 0x0d, 0x78, 0x2f, 0x50, 0x2e, 0x6e, 0x11, 0xd6, 0xc6, 0x3e, 0x7b, 0x18, - 0x78, 0x29, 0x04, 0x9c, 0x74, 0x52, 0x6b, 0x17, 0x43, 0xd0, 0x06, 0x76, 0x9a, 0x1d, 0x1a, 0xc1, - 0xf8, 0x4e, 0xd4, 0x0e, 0xf5, 0x96, 0x46, 0xe0, 0xa5, 0xaf, 0x52, 0xb0, 0x60, 0x0e, 0xcc, 0x1e, - 0x60, 0x76, 0x8c, 0xee, 0xc2, 0x9c, 0x08, 0xb0, 0xc5, 0x7b, 0x6d, 0x52, 0x30, 0x36, 0x8c, 0xcd, - 0xdc, 0xf6, 0x56, 0x39, 0x2a, 0x4f, 0xe4, 0x79, 0x94, 0xbb, 0x5b, 0xe5, 0x21, 0x86, 0x83, 0x5e, - 0x9b, 0x98, 0x69, 0xae, 0x7e, 0xa1, 0x0b, 0x90, 0x63, 0x5e, 0x87, 0xd6, 0x89, 0x25, 0x69, 0x1d, - 0xbb, 0x10, 0xdb, 0x30, 0x36, 0xe3, 0x66, 0xd6, 0x97, 0x0a, 0x8d, 0xaa, 0x8d, 0x7a, 0xb0, 0xda, - 0x0f, 0x94, 0x0f, 0xc4, 0x9c, 0x53, 0xa7, 0xd6, 0xe1, 0x84, 0x15, 0xe2, 0x1b, 0xc6, 0x66, 0x66, - 0xfb, 0x7a, 0xf9, 0xd5, 0xd9, 0x5a, 0xbe, 0xab, 0x49, 0x04, 0xef, 0x6e, 0x9f, 0xe2, 0xd6, 0x8c, - 0xb9, 0xe2, 0x46, 0x2f, 0xa1, 0x6f, 0x0c, 0x38, 0xcf, 0x7a, 0x6e, 0xdd, 0x62, 0x47, 0x98, 0xda, - 0xf2, 0xbc, 0x3b, 0x6c, 0xc4, 0x87, 0x59, 0xe9, 0xc3, 0xee, 0x34, 0x3e, 0xdc, 0xef, 0xb9, 0xf5, - 0xfb, 0x82, 0xeb, 0xbe, 0xa4, 0x1a, 0xf1, 0xe4, 0x1c, 0x9b, 0x04, 0x40, 0x5f, 0x1a, 0x20, 0x11, - 0x16, 0xae, 0x73, 0xa7, 0xeb, 0xf0, 0xde, 0x88, 0x2f, 0x49, 0xe9, 0xcb, 0x07, 0xd3, 0xfa, 0xb2, - 0xab, 0x78, 0x46, 0x1c, 0x59, 0x63, 0x63, 0x57, 0x11, 0x83, 0x15, 0x55, 0x47, 0x23, 0xe6, 0xd3, - 0xd2, 0xfc, 0xb5, 0x69, 0xcc, 0xdf, 0xf2, 0x29, 0x46, 0x2c, 0x2f, 0x1f, 0x45, 0x2d, 0xa0, 0x6f, - 0x0d, 0xf8, 0xbf, 0xdc, 0x7a, 0xbf, 0x0a, 0x65, 0xf5, 0x8d, 0x78, 0x00, 0xd2, 0x83, 0xfd, 0x69, - 0x03, 0xf0, 0x50, 0xb1, 0x89, 0x70, 0x8f, 0x26, 0xc6, 0x3a, 0x9b, 0x0c, 0x41, 0x55, 0x58, 0xe8, - 0x3a, 0xcc, 0xa9, 0x39, 0x4d, 0x79, 0x18, 0x4e, 0x8b, 0x14, 0xe6, 0xa4, 0x03, 0x6b, 0x65, 0xbf, - 0x37, 0x96, 0x75, 0x6f, 0x2c, 0x1f, 0xe8, 0xde, 0xb8, 0x97, 0x78, 0xfc, 0xdb, 0xba, 0x61, 0xe6, - 0x06, 0x8a, 0x62, 0x69, 0x2f, 0x0b, 0x30, 0xd8, 0xc6, 0xed, 0x44, 0x3a, 0x91, 0x9f, 0xbd, 0x9d, - 0x48, 0xa7, 0xf2, 0xe9, 0xd2, 0xd7, 0x31, 0xc8, 0x07, 0x0b, 0xc9, 0x3b, 0x26, 0x2e, 0x5a, 0x85, - 0xb4, 0x9f, 0x94, 0x8e, 0x2d, 0x4b, 0x71, 0xd6, 0x4c, 0xc9, 0xef, 0xaa, 0x8d, 0xae, 0xc1, 0x6a, - 0x13, 0x33, 0x6e, 0x51, 0xc2, 0xa9, 0x43, 0xba, 0xc4, 0xb6, 0x54, 0x69, 0x0f, 0x2a, 0xec, 0x7f, - 0x02, 0x60, 0xea, 0xf5, 0x3b, 0xfe, 0x72, 0x40, 0xb5, 0x4d, 0xbd, 0x3a, 0x61, 0x2c, 0xac, 0x1a, - 0x1f, 0xa8, 0xde, 0xd3, 0xeb, 0x03, 0x55, 0x02, 0xc5, 0x21, 0xd5, 0xe1, 0xc8, 0x24, 0xa6, 0x8c, - 0xcc, 0xd9, 0x90, 0x85, 0x07, 0xa1, 0x30, 0x95, 0x0e, 0x60, 0x61, 0xa8, 0x88, 0xd0, 0x2e, 0x64, - 0x74, 0x65, 0x0a, 0x33, 0xc6, 0x94, 0x66, 0xc0, 0x57, 0x92, 0xac, 0x3f, 0x19, 0x70, 0x46, 0xd0, - 0x06, 0xc2, 0x2c, 0x8f, 0x7b, 0x72, 0x40, 0x8c, 0x89, 0x01, 0xb1, 0xe0, 0xec, 0x18, 0x55, 0xe9, - 0x66, 0x6c, 0x4a, 0x37, 0x0b, 0x51, 0xf4, 0xd2, 0xe9, 0x1f, 0x63, 0xb0, 0x14, 0x70, 0x58, 0x2d, - 0x31, 0xf4, 0x19, 0x2c, 0x06, 0x32, 0x5d, 0x56, 0x08, 0x2b, 0x18, 0x1b, 0xf1, 0xcd, 0xcc, 0xf6, - 0xce, 0x34, 0x75, 0x31, 0xd4, 0xb4, 0xcd, 0x3c, 0x0d, 0x0b, 0xd8, 0xdf, 0xc9, 0xb0, 0x55, 0x48, - 0x1f, 0x61, 0x66, 0xb5, 0x3c, 0x4a, 0x64, 0x42, 0xa5, 0xcd, 0xd4, 0x11, 0x66, 0x77, 0x3c, 0x4a, - 0x90, 0x05, 0x8b, 0x23, 0xcd, 0x56, 0x25, 0xcd, 0xce, 0x6b, 0x34, 0x57, 0x73, 0x61, 0xa8, 0x99, - 0x96, 0xfe, 0x30, 0xe0, 0xac, 0xae, 0xe6, 0xff, 0x26, 0x70, 0x1b, 0x90, 0x95, 0x81, 0x0b, 0xdf, - 0x77, 0x20, 0x64, 0xea, 0xb6, 0xbb, 0x09, 0xb9, 0x01, 0x42, 0x26, 0x4a, 0x7c, 0xca, 0x44, 0xc9, - 0x6a, 0x16, 0x99, 0x1c, 0xcf, 0xc2, 0xc9, 0x21, 0xd9, 0xdd, 0x86, 0x87, 0xce, 0x43, 0x76, 0x70, - 0x9b, 0xaa, 0x1c, 0x9e, 0x33, 0x33, 0x7d, 0x59, 0xd5, 0x46, 0xeb, 0x90, 0xe9, 0x37, 0x59, 0xe5, - 0xe3, 0x9c, 0x09, 0x5a, 0x54, 0xb5, 0xd1, 0x32, 0x24, 0x69, 0xc7, 0xd5, 0x2d, 0x61, 0xce, 0x9c, - 0xa5, 0x1d, 0xb7, 0x6a, 0xa3, 0xfd, 0xe0, 0x78, 0x90, 0x90, 0xe3, 0xc1, 0x1b, 0x93, 0xc7, 0x83, - 0x88, 0x99, 0x60, 0x05, 0x52, 0x3a, 0x38, 0xb3, 0x32, 0x38, 0x49, 0xee, 0x07, 0xa6, 0x00, 0xa9, - 0xae, 0x98, 0xc7, 0x3c, 0x57, 0x5e, 0x72, 0x71, 0x53, 0x7f, 0x8a, 0x31, 0xa2, 0xe1, 0x50, 0xc6, - 0x2d, 0xd2, 0x25, 0x2e, 0x17, 0x9a, 0x29, 0x7f, 0x8c, 0x90, 0xd2, 0x1b, 0x42, 0x58, 0xb5, 0x51, - 0x09, 0xe6, 0x5d, 0xf2, 0x28, 0x00, 0x4a, 0x4b, 0x50, 0x46, 0x08, 0x35, 0xe6, 0x32, 0x20, 0x56, - 0x3f, 0x22, 0x76, 0xa7, 0x49, 0xec, 0x01, 0x70, 0x4e, 0x02, 0xf3, 0xfd, 0x15, 0x85, 0x2e, 0x7d, - 0x97, 0x80, 0x95, 0x31, 0x43, 0x05, 0xc2, 0xb0, 0x34, 0x08, 0xb3, 0xd7, 0x26, 0x54, 0x9e, 0x82, - 0x1a, 0x9a, 0xae, 0x4c, 0x8e, 0x4a, 0x9f, 0xf3, 0x13, 0xad, 0x67, 0x22, 0x77, 0x44, 0x86, 0x72, - 0x10, 0xeb, 0x9f, 0x4e, 0xcc, 0xb1, 0xd1, 0xfb, 0x90, 0x70, 0xdc, 0x86, 0xa7, 0xf2, 0x65, 0x73, - 0x60, 0x43, 0x90, 0xf7, 0xf5, 0x43, 0x06, 0x44, 0x46, 0x98, 0x52, 0x0b, 0xed, 0x41, 0xb2, 0xee, - 0xb9, 0x0d, 0xe7, 0x50, 0x55, 0xdc, 0x5b, 0xd3, 0xe8, 0xef, 0x4b, 0x0d, 0x53, 0x69, 0xa2, 0x06, - 0xa0, 0x60, 0xfd, 0x28, 0x3e, 0x7f, 0x3c, 0x7a, 0x37, 0xcc, 0x37, 0x6e, 0x36, 0x0b, 0xa4, 0xac, - 0x22, 0x0f, 0x96, 0xa4, 0x2f, 0x42, 0x17, 0x21, 0xe7, 0x73, 0x5b, 0xe1, 0x8c, 0x98, 0xf7, 0xa5, - 0x0f, 0x54, 0x5e, 0x5c, 0x82, 0xbc, 0x98, 0x86, 0xbd, 0x2e, 0xa1, 0x7d, 0xa0, 0x9f, 0x19, 0x0b, - 0x5a, 0xae, 0xa1, 0x0f, 0x02, 0x50, 0x35, 0x7f, 0x14, 0xd2, 0xb2, 0xf0, 0xdf, 0x9e, 0xe8, 0xf7, - 0x4d, 0xa5, 0xa4, 0x3b, 0x8e, 0x26, 0x51, 0xc3, 0x4d, 0xe9, 0x7b, 0x03, 0xce, 0x4d, 0x9c, 0xf9, - 0xc4, 0x5e, 0xd4, 0x0c, 0x5c, 0x6f, 0x76, 0x18, 0x27, 0x54, 0x55, 0xe4, 0xbc, 0x2f, 0xdd, 0xf7, - 0x85, 0xa1, 0xeb, 0x3e, 0x16, 0xbe, 0xee, 0x87, 0xae, 0xbf, 0xf8, 0x6b, 0x5c, 0x7f, 0xbf, 0x26, - 0x61, 0x6d, 0xfc, 0x38, 0xf8, 0x6f, 0xf6, 0x8c, 0x40, 0x55, 0x27, 0xc2, 0x55, 0x1d, 0x5d, 0x8b, - 0xb3, 0xd1, 0xb5, 0x88, 0x3e, 0x86, 0xdc, 0x00, 0x2d, 0xe3, 0x90, 0x9c, 0x32, 0x0e, 0xf3, 0x7d, - 0x3d, 0xb1, 0x82, 0x36, 0x21, 0xcf, 0x38, 0xa6, 0x3c, 0x68, 0xd4, 0x4f, 0x9a, 0x9c, 0x92, 0x6b, - 0x93, 0xfb, 0x90, 0xd5, 0x48, 0x69, 0x30, 0x3d, 0xa5, 0xc1, 0x8c, 0xd2, 0x92, 0xe6, 0xee, 0xc1, - 0x92, 0x6c, 0xf7, 0x47, 0x04, 0x53, 0x5e, 0x23, 0x98, 0xff, 0xb5, 0x21, 0x72, 0x51, 0x28, 0xdf, - 0xd2, 0xba, 0x92, 0xf1, 0x3d, 0x48, 0xd9, 0x84, 0x63, 0xa7, 0xa9, 0x67, 0xe1, 0x8d, 0x70, 0x06, - 0xfb, 0x4f, 0x53, 0x91, 0xbc, 0xf7, 0x70, 0xaf, 0xe9, 0x61, 0x9b, 0x99, 0x5a, 0x41, 0x9c, 0x06, - 0xe6, 0x02, 0xcd, 0x0b, 0x19, 0x3f, 0xc9, 0xd4, 0xa7, 0xd8, 0xac, 0xf4, 0x53, 0x3d, 0x2f, 0x0b, - 0xd9, 0x28, 0x6a, 0xb5, 0xa8, 0x0b, 0xa3, 0x43, 0x89, 0x99, 0x11, 0x5a, 0xea, 0x03, 0x5d, 0x81, - 0x33, 0x92, 0x44, 0xa4, 0x05, 0xa1, 0x96, 0x63, 0x13, 0x97, 0x3b, 0xbc, 0x57, 0x98, 0x97, 0x19, - 0x81, 0xc4, 0xda, 0x43, 0xb9, 0x54, 0x55, 0x2b, 0xe8, 0x21, 0x2c, 0xa8, 0x7c, 0xe8, 0x97, 0x65, - 0x4e, 0x5a, 0x2e, 0x47, 0xb6, 0x50, 0x85, 0x11, 0x0e, 0xa8, 0xca, 0x56, 0x85, 0x68, 0xe6, 0xba, - 0xa1, 0x6f, 0x54, 0x83, 0xa5, 0x1a, 0x66, 0xc4, 0x22, 0x8f, 0x48, 0xbd, 0x23, 0xbb, 0x95, 0xec, - 0x9d, 0x0b, 0x92, 0x7c, 0x3b, 0x92, 0x5c, 0x27, 0xb3, 0x60, 0xdf, 0xc3, 0x8c, 0xdc, 0xd0, 0xaa, - 0xb2, 0x8b, 0x2e, 0xd6, 0x86, 0x45, 0xa5, 0x9f, 0xe3, 0xb0, 0x1c, 0xf9, 0xca, 0x19, 0x29, 0xa8, - 0xd8, 0x2b, 0x0b, 0x2a, 0x3e, 0xa1, 0xa0, 0x12, 0xc1, 0x82, 0x6a, 0xc0, 0xf2, 0x50, 0xc4, 0x2c, - 0x87, 0x93, 0x96, 0x78, 0xa5, 0xc6, 0xc7, 0x6e, 0x6d, 0x6c, 0xdc, 0xaa, 0x9c, 0xb4, 0xcc, 0xa5, - 0xee, 0x88, 0x8c, 0xa1, 0xab, 0x90, 0x94, 0xf5, 0xa1, 0x9f, 0x9c, 0x63, 0xb3, 0xec, 0x23, 0xcc, - 0xf1, 0x5e, 0xd3, 0xab, 0x99, 0x0a, 0x2f, 0x26, 0x1c, 0x97, 0x9c, 0x58, 0xc2, 0x79, 0xc5, 0x90, - 0x9a, 0x92, 0x21, 0xeb, 0x92, 0x13, 0xb3, 0xe3, 0xde, 0xf0, 0x79, 0xc6, 0x1c, 0x61, 0xfa, 0x1f, - 0x3c, 0xc2, 0xdb, 0x89, 0xb4, 0x91, 0x8f, 0x95, 0xbe, 0x30, 0x60, 0xfd, 0x15, 0x8f, 0x45, 0x64, - 0x41, 0x2e, 0xfc, 0x32, 0x55, 0xef, 0x90, 0xab, 0x91, 0x8e, 0x04, 0xfe, 0x59, 0x12, 0xbe, 0x68, - 0xe2, 0x3b, 0xfe, 0x1f, 0x4b, 0x92, 0xdf, 0x9c, 0x3f, 0x09, 0x9a, 0xdb, 0x73, 0x9e, 0x3c, 0x2f, - 0xce, 0x3c, 0x7d, 0x5e, 0x9c, 0x79, 0xf9, 0xbc, 0x68, 0x7c, 0x7e, 0x5a, 0x34, 0x7e, 0x38, 0x2d, - 0x1a, 0xbf, 0x9c, 0x16, 0x8d, 0x27, 0xa7, 0x45, 0xe3, 0xd9, 0x69, 0xd1, 0xf8, 0xfd, 0xb4, 0x38, - 0xf3, 0xf2, 0xb4, 0x68, 0x3c, 0x7e, 0x51, 0x9c, 0x79, 0xf2, 0xa2, 0x38, 0xf3, 0xf4, 0x45, 0x71, - 0xe6, 0xd3, 0x9d, 0x43, 0x6f, 0xe0, 0x80, 0xe3, 0x8d, 0xff, 0xfb, 0xef, 0x3a, 0x25, 0x6d, 0xf5, - 0x55, 0x4b, 0xca, 0x7e, 0xb3, 0xf3, 0x67, 0x00, 0x00, 0x00, 0xff, 0xff, 0x87, 0x36, 0xf8, 0xde, - 0x36, 0x14, 0x00, 0x00, + // 1657 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0x4f, 0x6f, 0x1b, 0x45, + 0x14, 0xcf, 0xda, 0x8e, 0xed, 0x8c, 0x13, 0xc7, 0x99, 0x34, 0xc4, 0x71, 0xa9, 0x93, 0x9a, 0x16, + 0x52, 0x28, 0x76, 0x93, 0x48, 0xd0, 0x52, 0x04, 0x4a, 0x42, 0x4b, 0x5c, 0xd1, 0x52, 0x6d, 0xa3, + 0x46, 0xe2, 0xb2, 0x8c, 0xbd, 0x63, 0x7b, 0x14, 0x7b, 0xd7, 0x9a, 0x19, 0x3b, 0xf5, 0x0d, 0xc1, + 0x05, 0x21, 0x21, 0x55, 0xe2, 0x8e, 0x38, 0x70, 0xe0, 0x84, 0xc4, 0x97, 0x40, 0x1c, 0x2b, 0x71, + 0xe9, 0x8d, 0x36, 0xbd, 0x70, 0xec, 0x47, 0x40, 0x33, 0x3b, 0x63, 0xef, 0xda, 0x6b, 0xd7, 0x14, + 0x10, 0x37, 0xef, 0xfb, 0xf3, 0x7b, 0x6f, 0xdf, 0xfc, 0xde, 0xdb, 0x37, 0x06, 0x57, 0x38, 0x6e, + 0xb5, 0x5d, 0x8a, 0x9a, 0x25, 0x86, 0x69, 0x17, 0xd3, 0x12, 0x6a, 0x93, 0x12, 0xc5, 0xed, 0x26, + 0xa9, 0x22, 0x4e, 0x5c, 0xa7, 0xd4, 0xdd, 0x2a, 0xb5, 0x30, 0x63, 0xa8, 0x8e, 0x8b, 0x6d, 0xea, + 0x72, 0x17, 0x16, 0xb4, 0x47, 0xd1, 0xf3, 0x28, 0xa2, 0x36, 0x29, 0xfa, 0x3c, 0x8a, 0xdd, 0xad, + 0xdc, 0x7a, 0xdd, 0x75, 0xeb, 0x4d, 0x5c, 0x92, 0x1e, 0x95, 0x4e, 0xad, 0xc4, 0x49, 0x0b, 0x33, + 0x8e, 0x5a, 0x6d, 0x0f, 0x24, 0x77, 0xde, 0xc6, 0x6d, 0xec, 0xd8, 0xd8, 0xa9, 0x12, 0xcc, 0x4a, + 0x75, 0xb7, 0xee, 0x4a, 0xb9, 0xfc, 0xa5, 0x4c, 0x8a, 0x61, 0x99, 0x61, 0xa7, 0xd3, 0x62, 0x22, + 0x27, 0x7f, 0x40, 0xcf, 0xfe, 0x8d, 0x89, 0xf6, 0x1c, 0xb1, 0x63, 0x65, 0x78, 0x39, 0xcc, 0xb0, + 0x41, 0x18, 0x77, 0x69, 0x6f, 0xe4, 0x75, 0x73, 0x1f, 0x86, 0x59, 0xb7, 0x31, 0x65, 0x84, 0x71, + 0xec, 0x54, 0xb1, 0xf0, 0x38, 0x71, 0xe9, 0x71, 0xad, 0xe9, 0x9e, 0x58, 0xad, 0x0e, 0x47, 0x95, + 0x26, 0xb6, 0x18, 0x47, 0x5c, 0x03, 0x5c, 0xe8, 0x03, 0x08, 0xcf, 0xaa, 0xdb, 0x6a, 0x85, 0x54, + 0xd5, 0x97, 0xbd, 0xb0, 0x72, 0x50, 0x0b, 0xb3, 0x36, 0xf2, 0xd0, 0x83, 0x86, 0x97, 0x02, 0x86, + 0x93, 0x4e, 0x2a, 0x77, 0x31, 0x60, 0x5a, 0x43, 0xa4, 0xd9, 0xa1, 0x21, 0x88, 0x6f, 0x87, 0xbd, + 0xa1, 0x7e, 0xa5, 0x11, 0xf3, 0xc2, 0xd7, 0x09, 0xb0, 0x68, 0x0e, 0xc2, 0x1e, 0x22, 0x76, 0x0c, + 0xef, 0x80, 0x39, 0x51, 0x60, 0x8b, 0xf7, 0xda, 0x38, 0x6b, 0x6c, 0x18, 0x9b, 0xe9, 0xed, 0xad, + 0x62, 0x18, 0x4f, 0xe4, 0x79, 0x14, 0xbb, 0x5b, 0xc5, 0x21, 0x84, 0xc3, 0x5e, 0x1b, 0x9b, 0x49, + 0xae, 0x7e, 0xc1, 0x0b, 0x20, 0xcd, 0xdc, 0x0e, 0xad, 0x62, 0x4b, 0xc2, 0x12, 0x3b, 0x1b, 0xd9, + 0x30, 0x36, 0xa3, 0xe6, 0xbc, 0x27, 0x15, 0x1e, 0x65, 0x1b, 0xf6, 0xc0, 0x5a, 0xbf, 0x50, 0x9e, + 0x21, 0xe2, 0x9c, 0x92, 0x4a, 0x87, 0x63, 0x96, 0x8d, 0x6e, 0x18, 0x9b, 0xa9, 0xed, 0xeb, 0xc5, + 0x17, 0xb3, 0xb5, 0x78, 0x47, 0x83, 0x08, 0xdc, 0xdd, 0x3e, 0xc4, 0xc1, 0x8c, 0xb9, 0xea, 0x84, + 0xab, 0xe0, 0xb7, 0x06, 0x38, 0xcf, 0x7a, 0x4e, 0xd5, 0x62, 0x0d, 0x44, 0x6d, 0x79, 0xde, 0x1d, + 0x36, 0x92, 0xc3, 0xac, 0xcc, 0x61, 0x77, 0x9a, 0x1c, 0xee, 0xf5, 0x9c, 0xea, 0x3d, 0x81, 0x75, + 0x4f, 0x42, 0x8d, 0x64, 0x72, 0x8e, 0x4d, 0x32, 0x80, 0x5f, 0x19, 0x40, 0x5a, 0x58, 0xa8, 0xca, + 0x49, 0x97, 0xf0, 0xde, 0x48, 0x2e, 0x71, 0x99, 0xcb, 0x07, 0xd3, 0xe6, 0xb2, 0xab, 0x70, 0x46, + 0x12, 0xc9, 0xb1, 0xb1, 0x5a, 0xc8, 0xc0, 0xaa, 0xea, 0xa3, 0x91, 0xf0, 0x49, 0x19, 0xfe, 0xda, + 0x34, 0xe1, 0x0f, 0x3c, 0x88, 0x91, 0xc8, 0x2b, 0x8d, 0x30, 0x05, 0xfc, 0xce, 0x00, 0xaf, 0xc9, + 0x57, 0xef, 0x77, 0xa1, 0xec, 0xbe, 0x91, 0x0c, 0x80, 0xcc, 0x60, 0x7f, 0xda, 0x02, 0x1c, 0x29, + 0x34, 0x51, 0xee, 0x51, 0x62, 0xac, 0xb3, 0xc9, 0x26, 0xb0, 0x0c, 0x16, 0xbb, 0x84, 0x91, 0x0a, + 0x69, 0xca, 0xc3, 0x20, 0x2d, 0x9c, 0x9d, 0x93, 0x09, 0xe4, 0x8a, 0xde, 0x6c, 0x2c, 0xea, 0xd9, + 0x58, 0x3c, 0xd4, 0xb3, 0x71, 0x2f, 0xf6, 0xf0, 0x8f, 0x75, 0xc3, 0x4c, 0x0f, 0x1c, 0x85, 0x6a, + 0x6f, 0x1e, 0x80, 0xc1, 0x6b, 0xdc, 0x8a, 0x25, 0x63, 0x99, 0xd9, 0x5b, 0xb1, 0x64, 0x22, 0x93, + 0x2c, 0x7c, 0x13, 0x01, 0x19, 0x7f, 0x23, 0xb9, 0xc7, 0xd8, 0x81, 0x6b, 0x20, 0xe9, 0x91, 0x92, + 0xd8, 0xb2, 0x15, 0x67, 0xcd, 0x84, 0x7c, 0x2e, 0xdb, 0xf0, 0x1a, 0x58, 0x6b, 0x22, 0xc6, 0x2d, + 0x8a, 0x39, 0x25, 0xb8, 0x8b, 0x6d, 0x4b, 0xb5, 0xf6, 0xa0, 0xc3, 0x5e, 0x11, 0x06, 0xa6, 0xd6, + 0xdf, 0xf6, 0xd4, 0x3e, 0xd7, 0x36, 0x75, 0xab, 0x98, 0xb1, 0xa0, 0x6b, 0x74, 0xe0, 0x7a, 0x57, + 0xeb, 0x07, 0xae, 0x18, 0xe4, 0x87, 0x5c, 0x87, 0x2b, 0x13, 0x9b, 0xb2, 0x32, 0x67, 0x03, 0x11, + 0xee, 0x07, 0xca, 0x54, 0x38, 0x04, 0x8b, 0x43, 0x4d, 0x04, 0x77, 0x41, 0x4a, 0x77, 0xa6, 0x08, + 0x63, 0x4c, 0x19, 0x06, 0x78, 0x4e, 0x12, 0xf5, 0x17, 0x03, 0x9c, 0x11, 0xb0, 0xbe, 0x32, 0xcb, + 0xe3, 0x86, 0xef, 0x80, 0x55, 0xe2, 0x54, 0x9b, 0x1d, 0x46, 0xba, 0xd8, 0x12, 0xac, 0x3b, 0x41, + 0x1c, 0xd3, 0x16, 0xa2, 0xc7, 0x32, 0x4e, 0xd4, 0x5c, 0xe9, 0xab, 0x3f, 0x71, 0x4f, 0x8e, 0xb4, + 0x12, 0x22, 0xf0, 0xea, 0x18, 0x3f, 0x2f, 0xc9, 0xc8, 0x94, 0x49, 0xae, 0x85, 0xc2, 0xcb, 0x9c, + 0x7f, 0x8e, 0x80, 0x65, 0x5f, 0xbe, 0xea, 0x24, 0x18, 0xfc, 0x1c, 0x2c, 0xf9, 0x88, 0x2e, 0x1b, + 0x84, 0x65, 0x8d, 0x8d, 0xe8, 0x66, 0x6a, 0x7b, 0x67, 0x9a, 0xb6, 0x18, 0x9a, 0xd9, 0x66, 0x86, + 0x06, 0x05, 0xec, 0x9f, 0x10, 0x6c, 0x0d, 0x24, 0x1b, 0x88, 0x59, 0x2d, 0x97, 0x62, 0xc9, 0xa7, + 0xa4, 0x99, 0x68, 0x20, 0x76, 0xdb, 0xa5, 0x18, 0x5a, 0x60, 0x69, 0x64, 0xd6, 0x2a, 0xce, 0xec, + 0xbc, 0xc4, 0x6c, 0x35, 0x17, 0x87, 0x66, 0x69, 0xe1, 0xfb, 0x08, 0x38, 0xab, 0x9b, 0xf9, 0xff, + 0x29, 0xdc, 0x55, 0x90, 0xc5, 0x0f, 0x34, 0x2b, 0x1a, 0xa4, 0xde, 0xf0, 0xd1, 0x49, 0xd5, 0xad, + 0xaf, 0x3f, 0x20, 0xf5, 0xc6, 0x80, 0x4f, 0x55, 0x70, 0x6e, 0x9c, 0xa7, 0x47, 0xa8, 0xe8, 0x94, + 0x84, 0xca, 0x85, 0x07, 0x90, 0x8c, 0x7a, 0x12, 0x64, 0x94, 0xfc, 0xfe, 0x3a, 0x35, 0x17, 0x9e, + 0x07, 0xf3, 0x83, 0x2f, 0xb0, 0x9a, 0x37, 0x73, 0x66, 0xaa, 0x2f, 0x2b, 0xdb, 0x70, 0x1d, 0xa4, + 0xfa, 0x83, 0x59, 0x91, 0x60, 0xce, 0x04, 0x5a, 0x54, 0xb6, 0xe1, 0x0a, 0x88, 0xd3, 0x8e, 0xa3, + 0xc7, 0xc8, 0x9c, 0x39, 0x4b, 0x3b, 0x4e, 0xd9, 0x86, 0xfb, 0xfe, 0x95, 0x22, 0x26, 0x57, 0x8a, + 0xd7, 0x27, 0xaf, 0x14, 0x21, 0x7b, 0xc4, 0x2a, 0x48, 0xe8, 0x05, 0x62, 0x56, 0x56, 0x31, 0xce, + 0xbd, 0xd5, 0x21, 0x0b, 0x12, 0x5d, 0xb1, 0xc3, 0xb9, 0x8e, 0xfc, 0x30, 0x46, 0x4d, 0xfd, 0x28, + 0x56, 0x8f, 0x1a, 0xa1, 0x8c, 0x5b, 0xb8, 0x8b, 0x1d, 0x2e, 0x3c, 0x13, 0xde, 0xea, 0x21, 0xa5, + 0x37, 0x84, 0xb0, 0x6c, 0xc3, 0x02, 0x58, 0x70, 0xf0, 0x03, 0x9f, 0x51, 0x52, 0x1a, 0xa5, 0x84, + 0x50, 0xdb, 0x5c, 0x06, 0x90, 0x55, 0x1b, 0xd8, 0xee, 0x34, 0xb1, 0x3d, 0x30, 0x9c, 0x93, 0x86, + 0x99, 0xbe, 0x46, 0x59, 0x17, 0x7e, 0x88, 0x81, 0xd5, 0x31, 0x8b, 0x08, 0x44, 0x60, 0x79, 0x50, + 0x66, 0xb7, 0x8d, 0xa9, 0x3c, 0x05, 0xb5, 0x68, 0x5d, 0x99, 0x5c, 0x95, 0x3e, 0xe6, 0xa7, 0xda, + 0xcf, 0x84, 0xce, 0x88, 0x0c, 0xa6, 0x41, 0xa4, 0x7f, 0x3a, 0x11, 0x62, 0xc3, 0xf7, 0x41, 0x8c, + 0x38, 0x35, 0x57, 0xb1, 0x67, 0x73, 0x10, 0x43, 0x80, 0xf7, 0xfd, 0x03, 0x01, 0x04, 0x23, 0x4c, + 0xe9, 0x05, 0xf7, 0x40, 0xbc, 0xea, 0x3a, 0x35, 0x52, 0x57, 0x6d, 0xfa, 0xe6, 0x34, 0xfe, 0xfb, + 0xd2, 0xc3, 0x54, 0x9e, 0xb0, 0x06, 0xa0, 0xbf, 0xe9, 0x14, 0x9e, 0xb7, 0x52, 0xbd, 0x1b, 0xc4, + 0x1b, 0xb7, 0xcf, 0xf9, 0x28, 0xab, 0xc0, 0xfd, 0x7d, 0xec, 0x89, 0xe0, 0x45, 0x90, 0xf6, 0xb0, + 0xad, 0x20, 0x23, 0x16, 0x3c, 0xe9, 0x7d, 0xc5, 0x8b, 0x4b, 0x20, 0x23, 0x36, 0x68, 0xb7, 0x8b, + 0x69, 0xdf, 0xd0, 0x63, 0xc6, 0xa2, 0x96, 0x6b, 0xd3, 0xfb, 0x3e, 0x53, 0xb5, 0xb3, 0x64, 0x93, + 0x72, 0x5a, 0xbc, 0x35, 0x31, 0xef, 0x9b, 0xca, 0x49, 0x8f, 0x29, 0x0d, 0xa2, 0x16, 0xa2, 0xc2, + 0x8f, 0x06, 0x38, 0x37, 0x71, 0x4f, 0x14, 0xef, 0xa2, 0xf6, 0x66, 0xd1, 0xca, 0x1c, 0x53, 0xd5, + 0x91, 0x0b, 0x9e, 0x74, 0xdf, 0x13, 0x06, 0x56, 0x84, 0x48, 0x70, 0x45, 0x18, 0xfa, 0x64, 0x46, + 0x5f, 0xe2, 0x93, 0xf9, 0x7b, 0x1c, 0xe4, 0xc6, 0xaf, 0x90, 0xff, 0xe5, 0xcc, 0xf0, 0x75, 0x75, + 0x2c, 0xd8, 0xd5, 0xe1, 0xbd, 0x38, 0x1b, 0xde, 0x8b, 0xf0, 0x63, 0x90, 0x1e, 0x58, 0xcb, 0x3a, + 0xc4, 0xa7, 0xac, 0xc3, 0x42, 0xdf, 0x4f, 0x68, 0xe0, 0x26, 0xc8, 0x30, 0x8e, 0x28, 0xf7, 0x07, + 0xf5, 0x48, 0x93, 0x56, 0x72, 0x1d, 0x72, 0x1f, 0xcc, 0x6b, 0x4b, 0x19, 0x30, 0x39, 0x65, 0xc0, + 0x94, 0xf2, 0x92, 0xe1, 0xee, 0x82, 0x65, 0xf9, 0xf9, 0x6d, 0x60, 0x44, 0x79, 0x05, 0x23, 0xfe, + 0xf7, 0x16, 0xcf, 0x25, 0xe1, 0x7c, 0xa0, 0x7d, 0x25, 0xe2, 0x7b, 0x20, 0x61, 0x63, 0x8e, 0x48, + 0x53, 0xef, 0xcf, 0x1b, 0x41, 0x06, 0x7b, 0xd7, 0x59, 0x41, 0xde, 0xbb, 0xa8, 0xd7, 0x74, 0x91, + 0xcd, 0x4c, 0xed, 0x20, 0x4e, 0x03, 0x71, 0x61, 0xcd, 0xb3, 0x29, 0x8f, 0x64, 0xea, 0x51, 0xbc, + 0xac, 0xcc, 0x53, 0x5d, 0x49, 0xb3, 0xf3, 0x61, 0xd0, 0x4a, 0xa9, 0x1b, 0xa3, 0x43, 0xb1, 0x99, + 0x12, 0x5e, 0xea, 0x01, 0x5e, 0x01, 0x67, 0x24, 0x88, 0xa0, 0x05, 0xa6, 0x16, 0xb1, 0xb1, 0xc3, + 0x09, 0xef, 0x65, 0x17, 0x24, 0x23, 0xa0, 0xd0, 0x1d, 0x49, 0x55, 0x59, 0x69, 0xe0, 0x11, 0x58, + 0x54, 0x7c, 0xe8, 0xb7, 0x65, 0x5a, 0x46, 0x2e, 0x86, 0x8e, 0x50, 0x65, 0x23, 0x12, 0x50, 0x9d, + 0xad, 0x1a, 0xd1, 0x4c, 0x77, 0x03, 0xcf, 0xb0, 0x02, 0x96, 0x2b, 0x88, 0x61, 0x0b, 0x3f, 0xc0, + 0xd5, 0x8e, 0x9c, 0x56, 0x72, 0x76, 0x2e, 0x4a, 0xf0, 0xed, 0x50, 0x70, 0x4d, 0x66, 0x81, 0xbe, + 0x87, 0x18, 0xbe, 0xa1, 0x5d, 0xe5, 0x14, 0x5d, 0xaa, 0x0c, 0x8b, 0x0a, 0xbf, 0x46, 0xc1, 0x4a, + 0xe8, 0xcd, 0x68, 0xa4, 0xa1, 0x22, 0x2f, 0x6c, 0xa8, 0xe8, 0x84, 0x86, 0x8a, 0xf9, 0x1b, 0xaa, + 0x06, 0x56, 0x86, 0x2a, 0x66, 0x11, 0x8e, 0x5b, 0xe2, 0x66, 0x1b, 0x1d, 0xfb, 0x6a, 0x63, 0xeb, + 0x56, 0xe6, 0xb8, 0x65, 0x2e, 0x77, 0x47, 0x64, 0x62, 0xfd, 0x89, 0xcb, 0xfe, 0xd0, 0xd7, 0xd4, + 0xb1, 0x2c, 0xfb, 0x08, 0x71, 0xb4, 0xd7, 0x74, 0x2b, 0xa6, 0xb2, 0x87, 0x37, 0x41, 0xda, 0xc1, + 0x27, 0x96, 0x48, 0x5e, 0x21, 0x24, 0xa6, 0x44, 0x98, 0x77, 0xf0, 0x89, 0xd9, 0x71, 0x6e, 0x78, + 0x38, 0x63, 0x8e, 0x30, 0xf9, 0x2f, 0x1e, 0xe1, 0xad, 0x58, 0xd2, 0xc8, 0x44, 0x0a, 0x5f, 0x1a, + 0x60, 0xfd, 0x05, 0x17, 0x4c, 0x68, 0x81, 0x74, 0xf0, 0x36, 0xab, 0xee, 0x2e, 0x57, 0x43, 0x13, + 0xf1, 0xfd, 0x1b, 0x25, 0x72, 0xd1, 0xc0, 0xb7, 0xbd, 0x3f, 0xa3, 0x24, 0xbe, 0xb9, 0x70, 0xe2, + 0x0f, 0xb7, 0x47, 0x1e, 0x3d, 0xcd, 0xcf, 0x3c, 0x7e, 0x9a, 0x9f, 0x79, 0xfe, 0x34, 0x6f, 0x7c, + 0x71, 0x9a, 0x37, 0x7e, 0x3a, 0xcd, 0x1b, 0xbf, 0x9d, 0xe6, 0x8d, 0x47, 0xa7, 0x79, 0xe3, 0xc9, + 0x69, 0xde, 0xf8, 0xf3, 0x34, 0x3f, 0xf3, 0xfc, 0x34, 0x6f, 0x3c, 0x7c, 0x96, 0x9f, 0x79, 0xf4, + 0x2c, 0x3f, 0xf3, 0xf8, 0x59, 0x7e, 0xe6, 0xb3, 0x9d, 0xba, 0x3b, 0x48, 0x80, 0xb8, 0xe3, 0xff, + 0x32, 0xbc, 0x4e, 0x71, 0x5b, 0x3d, 0x55, 0xe2, 0x72, 0xde, 0xec, 0xfc, 0x15, 0x00, 0x00, 0xff, + 0xff, 0x5e, 0xfe, 0x68, 0x0b, 0x6a, 0x14, 0x00, 0x00, } func (this *ReplicationTask) Equal(that interface{}) bool { @@ -1431,14 +1433,14 @@ func (this *SyncReplicationState) Equal(that interface{}) bool { } else if this == nil { return false } - if this.LastProcessedMessageId != that1.LastProcessedMessageId { + if this.InclusiveLowWatermark != that1.InclusiveLowWatermark { return false } - if that1.LastProcessedMessageTime == nil { - if this.LastProcessedMessageTime != nil { + if that1.InclusiveLowWatermarkTime == nil { + if this.InclusiveLowWatermarkTime != nil { return false } - } else if !this.LastProcessedMessageTime.Equal(*that1.LastProcessedMessageTime) { + } else if !this.InclusiveLowWatermarkTime.Equal(*that1.InclusiveLowWatermarkTime) { return false } return true @@ -1508,14 +1510,14 @@ func (this *WorkflowReplicationMessages) Equal(that interface{}) bool { return false } } - if this.LastTaskId != that1.LastTaskId { + if this.ExclusiveHighWatermark != that1.ExclusiveHighWatermark { return false } - if that1.LastTaskTime == nil { - if this.LastTaskTime != nil { + if that1.ExclusiveHighWatermarkTime == nil { + if this.ExclusiveHighWatermarkTime != nil { return false } - } else if !this.LastTaskTime.Equal(*that1.LastTaskTime) { + } else if !this.ExclusiveHighWatermarkTime.Equal(*that1.ExclusiveHighWatermarkTime) { return false } return true @@ -1885,8 +1887,8 @@ func (this *SyncReplicationState) GoString() string { } s := make([]string, 0, 6) s = append(s, "&repication.SyncReplicationState{") - s = append(s, "LastProcessedMessageId: "+fmt.Sprintf("%#v", this.LastProcessedMessageId)+",\n") - s = append(s, "LastProcessedMessageTime: "+fmt.Sprintf("%#v", this.LastProcessedMessageTime)+",\n") + s = append(s, "InclusiveLowWatermark: "+fmt.Sprintf("%#v", this.InclusiveLowWatermark)+",\n") + s = append(s, "InclusiveLowWatermarkTime: "+fmt.Sprintf("%#v", this.InclusiveLowWatermarkTime)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1916,8 +1918,8 @@ func (this *WorkflowReplicationMessages) GoString() string { if this.ReplicationTasks != nil { s = append(s, "ReplicationTasks: "+fmt.Sprintf("%#v", this.ReplicationTasks)+",\n") } - s = append(s, "LastTaskId: "+fmt.Sprintf("%#v", this.LastTaskId)+",\n") - s = append(s, "LastTaskTime: "+fmt.Sprintf("%#v", this.LastTaskTime)+",\n") + s = append(s, "ExclusiveHighWatermark: "+fmt.Sprintf("%#v", this.ExclusiveHighWatermark)+",\n") + s = append(s, "ExclusiveHighWatermarkTime: "+fmt.Sprintf("%#v", this.ExclusiveHighWatermarkTime)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -2310,8 +2312,8 @@ func (m *SyncReplicationState) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if m.LastProcessedMessageTime != nil { - n9, err9 := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.LastProcessedMessageTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(*m.LastProcessedMessageTime):]) + if m.InclusiveLowWatermarkTime != nil { + n9, err9 := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.InclusiveLowWatermarkTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(*m.InclusiveLowWatermarkTime):]) if err9 != nil { return 0, err9 } @@ -2320,8 +2322,8 @@ func (m *SyncReplicationState) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x12 } - if m.LastProcessedMessageId != 0 { - i = encodeVarintMessage(dAtA, i, uint64(m.LastProcessedMessageId)) + if m.InclusiveLowWatermark != 0 { + i = encodeVarintMessage(dAtA, i, uint64(m.InclusiveLowWatermark)) i-- dAtA[i] = 0x8 } @@ -2412,8 +2414,8 @@ func (m *WorkflowReplicationMessages) MarshalToSizedBuffer(dAtA []byte) (int, er _ = i var l int _ = l - if m.LastTaskTime != nil { - n11, err11 := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.LastTaskTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(*m.LastTaskTime):]) + if m.ExclusiveHighWatermarkTime != nil { + n11, err11 := github_com_gogo_protobuf_types.StdTimeMarshalTo(*m.ExclusiveHighWatermarkTime, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(*m.ExclusiveHighWatermarkTime):]) if err11 != nil { return 0, err11 } @@ -2422,8 +2424,8 @@ func (m *WorkflowReplicationMessages) MarshalToSizedBuffer(dAtA []byte) (int, er i-- dAtA[i] = 0x1a } - if m.LastTaskId != 0 { - i = encodeVarintMessage(dAtA, i, uint64(m.LastTaskId)) + if m.ExclusiveHighWatermark != 0 { + i = encodeVarintMessage(dAtA, i, uint64(m.ExclusiveHighWatermark)) i-- dAtA[i] = 0x10 } @@ -3070,11 +3072,11 @@ func (m *SyncReplicationState) Size() (n int) { } var l int _ = l - if m.LastProcessedMessageId != 0 { - n += 1 + sovMessage(uint64(m.LastProcessedMessageId)) + if m.InclusiveLowWatermark != 0 { + n += 1 + sovMessage(uint64(m.InclusiveLowWatermark)) } - if m.LastProcessedMessageTime != nil { - l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.LastProcessedMessageTime) + if m.InclusiveLowWatermarkTime != nil { + l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.InclusiveLowWatermarkTime) n += 1 + l + sovMessage(uint64(l)) } return n @@ -3117,11 +3119,11 @@ func (m *WorkflowReplicationMessages) Size() (n int) { n += 1 + l + sovMessage(uint64(l)) } } - if m.LastTaskId != 0 { - n += 1 + sovMessage(uint64(m.LastTaskId)) + if m.ExclusiveHighWatermark != 0 { + n += 1 + sovMessage(uint64(m.ExclusiveHighWatermark)) } - if m.LastTaskTime != nil { - l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.LastTaskTime) + if m.ExclusiveHighWatermarkTime != nil { + l = github_com_gogo_protobuf_types.SizeOfStdTime(*m.ExclusiveHighWatermarkTime) n += 1 + l + sovMessage(uint64(l)) } return n @@ -3440,8 +3442,8 @@ func (this *SyncReplicationState) String() string { return "nil" } s := strings.Join([]string{`&SyncReplicationState{`, - `LastProcessedMessageId:` + fmt.Sprintf("%v", this.LastProcessedMessageId) + `,`, - `LastProcessedMessageTime:` + strings.Replace(fmt.Sprintf("%v", this.LastProcessedMessageTime), "Timestamp", "types.Timestamp", 1) + `,`, + `InclusiveLowWatermark:` + fmt.Sprintf("%v", this.InclusiveLowWatermark) + `,`, + `InclusiveLowWatermarkTime:` + strings.Replace(fmt.Sprintf("%v", this.InclusiveLowWatermarkTime), "Timestamp", "types.Timestamp", 1) + `,`, `}`, }, "") return s @@ -3475,8 +3477,8 @@ func (this *WorkflowReplicationMessages) String() string { repeatedStringForReplicationTasks += "}" s := strings.Join([]string{`&WorkflowReplicationMessages{`, `ReplicationTasks:` + repeatedStringForReplicationTasks + `,`, - `LastTaskId:` + fmt.Sprintf("%v", this.LastTaskId) + `,`, - `LastTaskTime:` + strings.Replace(fmt.Sprintf("%v", this.LastTaskTime), "Timestamp", "types.Timestamp", 1) + `,`, + `ExclusiveHighWatermark:` + fmt.Sprintf("%v", this.ExclusiveHighWatermark) + `,`, + `ExclusiveHighWatermarkTime:` + strings.Replace(fmt.Sprintf("%v", this.ExclusiveHighWatermarkTime), "Timestamp", "types.Timestamp", 1) + `,`, `}`, }, "") return s @@ -4164,9 +4166,9 @@ func (m *SyncReplicationState) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field LastProcessedMessageId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field InclusiveLowWatermark", wireType) } - m.LastProcessedMessageId = 0 + m.InclusiveLowWatermark = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowMessage @@ -4176,14 +4178,14 @@ func (m *SyncReplicationState) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.LastProcessedMessageId |= int64(b&0x7F) << shift + m.InclusiveLowWatermark |= int64(b&0x7F) << shift if b < 0x80 { break } } case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field LastProcessedMessageTime", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field InclusiveLowWatermarkTime", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -4210,10 +4212,10 @@ func (m *SyncReplicationState) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.LastProcessedMessageTime == nil { - m.LastProcessedMessageTime = new(time.Time) + if m.InclusiveLowWatermarkTime == nil { + m.InclusiveLowWatermarkTime = new(time.Time) } - if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.LastProcessedMessageTime, dAtA[iNdEx:postIndex]); err != nil { + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.InclusiveLowWatermarkTime, dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex @@ -4468,9 +4470,9 @@ func (m *WorkflowReplicationMessages) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 2: if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field LastTaskId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ExclusiveHighWatermark", wireType) } - m.LastTaskId = 0 + m.ExclusiveHighWatermark = 0 for shift := uint(0); ; shift += 7 { if shift >= 64 { return ErrIntOverflowMessage @@ -4480,14 +4482,14 @@ func (m *WorkflowReplicationMessages) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.LastTaskId |= int64(b&0x7F) << shift + m.ExclusiveHighWatermark |= int64(b&0x7F) << shift if b < 0x80 { break } } case 3: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field LastTaskTime", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field ExclusiveHighWatermarkTime", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -4514,10 +4516,10 @@ func (m *WorkflowReplicationMessages) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - if m.LastTaskTime == nil { - m.LastTaskTime = new(time.Time) + if m.ExclusiveHighWatermarkTime == nil { + m.ExclusiveHighWatermarkTime = new(time.Time) } - if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.LastTaskTime, dAtA[iNdEx:postIndex]); err != nil { + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(m.ExclusiveHighWatermarkTime, dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex diff --git a/proto/internal/temporal/server/api/replication/v1/message.proto b/proto/internal/temporal/server/api/replication/v1/message.proto index a0e67736e36..a7feb0d4e6e 100644 --- a/proto/internal/temporal/server/api/replication/v1/message.proto +++ b/proto/internal/temporal/server/api/replication/v1/message.proto @@ -70,8 +70,8 @@ message SyncShardStatus { } message SyncReplicationState { - int64 last_processed_message_id = 1; - google.protobuf.Timestamp last_processed_message_time = 2 [(gogoproto.stdtime) = true]; + int64 inclusive_low_watermark = 1; + google.protobuf.Timestamp inclusive_low_watermark_time = 2 [(gogoproto.stdtime) = true]; } message ReplicationMessages { @@ -86,8 +86,8 @@ message ReplicationMessages { message WorkflowReplicationMessages { repeated ReplicationTask replication_tasks = 1; // This can be different than the last taskId in the above list, because sender can decide to skip tasks (e.g. for completed workflows). - int64 last_task_id = 2; - google.protobuf.Timestamp last_task_time = 3 [(gogoproto.stdtime) = true]; + int64 exclusive_high_watermark = 2; + google.protobuf.Timestamp exclusive_high_watermark_time = 3 [(gogoproto.stdtime) = true]; } message ReplicationTaskInfo { diff --git a/service/history/api/replication/stream.go b/service/history/api/replication/stream.go index ce2fec4e542..d79f6bfe1ea 100644 --- a/service/history/api/replication/stream.go +++ b/service/history/api/replication/stream.go @@ -44,7 +44,6 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/replication" "go.temporal.io/server/service/history/shard" @@ -158,11 +157,8 @@ func recvSyncReplicationState( attr *replicationspb.SyncReplicationState, clientClusterShardID historyclient.ClusterShardID, ) error { - lastProcessedMessageID := attr.GetLastProcessedMessageId() - lastProcessedMessageIDTime := attr.GetLastProcessedMessageTime() - if lastProcessedMessageID == persistence.EmptyQueueMessageID { - return nil - } + inclusiveLowWatermark := attr.GetInclusiveLowWatermark() + inclusiveLowWatermarkTime := attr.GetInclusiveLowWatermarkTime() readerID := shard.ReplicationReaderIDFromClusterShardID( int64(clientClusterShardID.ClusterID), @@ -172,7 +168,7 @@ func recvSyncReplicationState( Scopes: []*persistencespb.QueueSliceScope{{ Range: &persistencespb.QueueSliceRange{ InclusiveMin: shard.ConvertToPersistenceTaskKey( - tasks.NewImmediateKey(lastProcessedMessageID + 1), + tasks.NewImmediateKey(inclusiveLowWatermark), ), ExclusiveMax: shard.ConvertToPersistenceTaskKey( tasks.NewImmediateKey(math.MaxInt64), @@ -192,8 +188,8 @@ func recvSyncReplicationState( } shardContext.UpdateRemoteClusterInfo( string(clientClusterShardID.ClusterID), - lastProcessedMessageID, - *lastProcessedMessageIDTime, + inclusiveLowWatermark-1, + *inclusiveLowWatermarkTime, ) return nil } @@ -335,8 +331,24 @@ func sendTasks( beginInclusiveWatermark int64, endExclusiveWatermark int64, ) error { - if beginInclusiveWatermark >= endExclusiveWatermark { - return nil + if beginInclusiveWatermark > endExclusiveWatermark { + err := serviceerror.NewInternal(fmt.Sprintf("StreamWorkflowReplication encountered invalid task range [%v, %v)", + beginInclusiveWatermark, + endExclusiveWatermark, + )) + shardContext.GetLogger().Error("StreamWorkflowReplication unable to", tag.Error(err)) + return err + } + if beginInclusiveWatermark == endExclusiveWatermark { + return server.Send(&historyservice.StreamWorkflowReplicationMessagesResponse{ + Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{ + Messages: &replicationspb.WorkflowReplicationMessages{ + ReplicationTasks: nil, + ExclusiveHighWatermark: endExclusiveWatermark, + ExclusiveHighWatermarkTime: timestamp.TimeNowPtrUtc(), + }, + }, + }) } engine, err := shardContext.GetEngine(ctx) @@ -372,9 +384,9 @@ Loop: if err := server.Send(&historyservice.StreamWorkflowReplicationMessagesResponse{ Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{ Messages: &replicationspb.WorkflowReplicationMessages{ - ReplicationTasks: []*replicationspb.ReplicationTask{task}, - LastTaskId: task.SourceTaskId, - LastTaskTime: task.VisibilityTime, + ReplicationTasks: []*replicationspb.ReplicationTask{task}, + ExclusiveHighWatermark: task.SourceTaskId + 1, + ExclusiveHighWatermarkTime: task.VisibilityTime, }, }, }); err != nil { @@ -390,9 +402,9 @@ Loop: return server.Send(&historyservice.StreamWorkflowReplicationMessagesResponse{ Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{ Messages: &replicationspb.WorkflowReplicationMessages{ - ReplicationTasks: nil, - LastTaskId: endExclusiveWatermark - 1, - LastTaskTime: timestamp.TimeNowPtrUtc(), + ReplicationTasks: nil, + ExclusiveHighWatermark: endExclusiveWatermark, + ExclusiveHighWatermarkTime: timestamp.TimeNowPtrUtc(), }, }, }) diff --git a/service/history/api/replication/stream_test.go b/service/history/api/replication/stream_test.go index abb99a014e5..c3f64a0ec0a 100644 --- a/service/history/api/replication/stream_test.go +++ b/service/history/api/replication/stream_test.go @@ -115,8 +115,8 @@ func (s *streamSuite) TestRecvSyncReplicationState_Success() { s.clientClusterShardID.ShardID, ) replicationState := &replicationspb.SyncReplicationState{ - LastProcessedMessageId: rand.Int63(), - LastProcessedMessageTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), + InclusiveLowWatermark: rand.Int63(), + InclusiveLowWatermarkTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), } s.shardContext.EXPECT().UpdateReplicationQueueReaderState( @@ -125,7 +125,7 @@ func (s *streamSuite) TestRecvSyncReplicationState_Success() { Scopes: []*persistencespb.QueueSliceScope{{ Range: &persistencespb.QueueSliceRange{ InclusiveMin: shard.ConvertToPersistenceTaskKey( - tasks.NewImmediateKey(replicationState.LastProcessedMessageId + 1), + tasks.NewImmediateKey(replicationState.InclusiveLowWatermark), ), ExclusiveMax: shard.ConvertToPersistenceTaskKey( tasks.NewImmediateKey(math.MaxInt64), @@ -140,8 +140,8 @@ func (s *streamSuite) TestRecvSyncReplicationState_Success() { ).Return(nil) s.shardContext.EXPECT().UpdateRemoteClusterInfo( string(s.clientClusterShardID.ClusterID), - replicationState.LastProcessedMessageId, - *replicationState.LastProcessedMessageTime, + replicationState.InclusiveLowWatermark-1, + *replicationState.InclusiveLowWatermarkTime, ) err := recvSyncReplicationState(s.shardContext, replicationState, s.clientClusterShardID) @@ -154,8 +154,8 @@ func (s *streamSuite) TestRecvSyncReplicationState_Error() { s.clientClusterShardID.ShardID, ) replicationState := &replicationspb.SyncReplicationState{ - LastProcessedMessageId: rand.Int63(), - LastProcessedMessageTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), + InclusiveLowWatermark: rand.Int63(), + InclusiveLowWatermarkTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), } var ownershipLost error @@ -171,7 +171,7 @@ func (s *streamSuite) TestRecvSyncReplicationState_Error() { Scopes: []*persistencespb.QueueSliceScope{{ Range: &persistencespb.QueueSliceRange{ InclusiveMin: shard.ConvertToPersistenceTaskKey( - tasks.NewImmediateKey(replicationState.LastProcessedMessageId + 1), + tasks.NewImmediateKey(replicationState.InclusiveLowWatermark), ), ExclusiveMax: shard.ConvertToPersistenceTaskKey( tasks.NewImmediateKey(math.MaxInt64), @@ -236,8 +236,8 @@ func (s *streamSuite) TestSendCatchUp() { endExclusiveWatermark, ).Return(iter, nil) s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { - s.Equal(endExclusiveWatermark-1, resp.GetMessages().LastTaskId) - s.NotNil(resp.GetMessages().LastTaskTime) + s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) return nil }) @@ -288,13 +288,13 @@ func (s *streamSuite) TestSendLive() { ) gomock.InOrder( s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { - s.Equal(watermark1-1, resp.GetMessages().LastTaskId) - s.NotNil(resp.GetMessages().LastTaskTime) + s.Equal(watermark1, resp.GetMessages().ExclusiveHighWatermark) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) return nil }), s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { - s.Equal(watermark2-1, resp.GetMessages().LastTaskId) - s.NotNil(resp.GetMessages().LastTaskTime) + s.Equal(watermark2, resp.GetMessages().ExclusiveHighWatermark) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) return nil }), ) @@ -320,6 +320,12 @@ func (s *streamSuite) TestSendTasks_Noop() { beginInclusiveWatermark := rand.Int63() endExclusiveWatermark := beginInclusiveWatermark + s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { + s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) + return nil + }) + err := sendTasks( s.ctx, s.server, @@ -349,8 +355,8 @@ func (s *streamSuite) TestSendTasks_WithoutTasks() { endExclusiveWatermark, ).Return(iter, nil) s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { - s.Equal(endExclusiveWatermark-1, resp.GetMessages().LastTaskId) - s.NotNil(resp.GetMessages().LastTaskTime) + s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) return nil }) @@ -400,24 +406,24 @@ func (s *streamSuite) TestSendTasks_WithTasks() { s.server.EXPECT().Send(&historyservice.StreamWorkflowReplicationMessagesResponse{ Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{ Messages: &replicationspb.WorkflowReplicationMessages{ - ReplicationTasks: []*replicationspb.ReplicationTask{task0}, - LastTaskId: task0.SourceTaskId, - LastTaskTime: task0.VisibilityTime, + ReplicationTasks: []*replicationspb.ReplicationTask{task0}, + ExclusiveHighWatermark: task0.SourceTaskId + 1, + ExclusiveHighWatermarkTime: task0.VisibilityTime, }, }, }).Return(nil), s.server.EXPECT().Send(&historyservice.StreamWorkflowReplicationMessagesResponse{ Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{ Messages: &replicationspb.WorkflowReplicationMessages{ - ReplicationTasks: []*replicationspb.ReplicationTask{task2}, - LastTaskId: task2.SourceTaskId, - LastTaskTime: task2.VisibilityTime, + ReplicationTasks: []*replicationspb.ReplicationTask{task2}, + ExclusiveHighWatermark: task2.SourceTaskId + 1, + ExclusiveHighWatermarkTime: task2.VisibilityTime, }, }, }).Return(nil), s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { - s.Equal(endExclusiveWatermark-1, resp.GetMessages().LastTaskId) - s.NotNil(resp.GetMessages().LastTaskTime) + s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) return nil }), ) diff --git a/service/history/replication/executable_task_tracker.go b/service/history/replication/executable_task_tracker.go index 22a67dd40e3..b0f45775419 100644 --- a/service/history/replication/executable_task_tracker.go +++ b/service/history/replication/executable_task_tracker.go @@ -49,7 +49,7 @@ type ( Timestamp time.Time } ExecutableTaskTracker interface { - TrackTasks(highWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) []TrackableExecutableTask + TrackTasks(exclusiveHighWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) []TrackableExecutableTask LowWatermark() *WatermarkInfo Size() int Cancel() @@ -58,10 +58,10 @@ type ( logger log.Logger sync.Mutex - cancelled bool - highWatermarkInfo *WatermarkInfo // this is exclusive, i.e. source need to resend with this watermark / task ID - taskQueue *list.List // sorted by task ID - taskIDs map[int64]struct{} + cancelled bool + exclusiveHighWatermarkInfo *WatermarkInfo // this is exclusive, i.e. source need to resend with this watermark / task ID + taskQueue *list.List // sorted by task ID + taskIDs map[int64]struct{} } ) @@ -73,16 +73,17 @@ func NewExecutableTaskTracker( return &ExecutableTaskTrackerImpl{ logger: logger, - highWatermarkInfo: nil, - taskQueue: list.New(), - taskIDs: make(map[int64]struct{}), + exclusiveHighWatermarkInfo: nil, + taskQueue: list.New(), + taskIDs: make(map[int64]struct{}), } } // TrackTasks add tasks for tracking, return valid tasks (dedup) // if task tracker is cancelled, then newly added tasks will also be cancelled +// tasks should be sorted by task ID, all task IDs < exclusiveHighWatermarkInfo func (t *ExecutableTaskTrackerImpl) TrackTasks( - highWatermarkInfo WatermarkInfo, + exclusiveHighWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask, ) []TrackableExecutableTask { filteredTasks := make([]TrackableExecutableTask, 0, len(tasks)) @@ -91,7 +92,7 @@ func (t *ExecutableTaskTrackerImpl) TrackTasks( defer t.Unlock() // need to assume source side send replication tasks in order - if t.highWatermarkInfo != nil && highWatermarkInfo.Watermark <= t.highWatermarkInfo.Watermark { + if t.exclusiveHighWatermarkInfo != nil && exclusiveHighWatermarkInfo.Watermark <= t.exclusiveHighWatermarkInfo.Watermark { return filteredTasks } @@ -111,14 +112,14 @@ Loop: lastTaskID = task.TaskID() } - if highWatermarkInfo.Watermark < lastTaskID { + if exclusiveHighWatermarkInfo.Watermark <= lastTaskID { panic(fmt.Sprintf( "ExecutableTaskTracker encountered lower high watermark: %v < %v", - highWatermarkInfo.Watermark, + exclusiveHighWatermarkInfo.Watermark, lastTaskID, )) } - t.highWatermarkInfo = &highWatermarkInfo + t.exclusiveHighWatermarkInfo = &exclusiveHighWatermarkInfo if t.cancelled { t.cancelLocked() @@ -130,30 +131,36 @@ func (t *ExecutableTaskTrackerImpl) LowWatermark() *WatermarkInfo { t.Lock() defer t.Unlock() + element := t.taskQueue.Front() Loop: - for element := t.taskQueue.Front(); element != nil; element = element.Next() { + for element != nil { task := element.Value.(TrackableExecutableTask) taskState := task.State() switch taskState { case ctasks.TaskStateAcked: + nextElement := element.Next() delete(t.taskIDs, task.TaskID()) t.taskQueue.Remove(element) + element = nextElement case ctasks.TaskStateNacked: if err := task.MarkPoisonPill(); err != nil { // unable to save poison pill, retry later - break Loop + element = element.Next() + continue Loop } + nextElement := element.Next() delete(t.taskIDs, task.TaskID()) t.taskQueue.Remove(element) + element = nextElement case ctasks.TaskStateAborted: // noop, do not remove from queue, let it block low watermark - break Loop + element = element.Next() case ctasks.TaskStateCancelled: // noop, do not remove from queue, let it block low watermark - break Loop + element = element.Next() case ctasks.TaskStatePending: // noop, do not remove from queue, let it block low watermark - break Loop + element = element.Next() default: panic(fmt.Sprintf( "ExecutableTaskTracker encountered unknown task state: %v", @@ -163,14 +170,14 @@ Loop: } if element := t.taskQueue.Front(); element != nil { - lowWatermarkInfo := WatermarkInfo{ + inclusiveLowWatermarkInfo := WatermarkInfo{ Watermark: element.Value.(TrackableExecutableTask).TaskID(), Timestamp: element.Value.(TrackableExecutableTask).TaskCreationTime(), } - return &lowWatermarkInfo - } else if t.highWatermarkInfo != nil { - lowWatermarkInfo := *t.highWatermarkInfo - return &lowWatermarkInfo + return &inclusiveLowWatermarkInfo + } else if t.exclusiveHighWatermarkInfo != nil { + inclusiveLowWatermarkInfo := *t.exclusiveHighWatermarkInfo + return &inclusiveLowWatermarkInfo } else { return nil } diff --git a/service/history/replication/executable_task_tracker_mock.go b/service/history/replication/executable_task_tracker_mock.go index 26d4a441a0d..5856121c92c 100644 --- a/service/history/replication/executable_task_tracker_mock.go +++ b/service/history/replication/executable_task_tracker_mock.go @@ -310,9 +310,9 @@ func (mr *MockExecutableTaskTrackerMockRecorder) Size() *gomock.Call { } // TrackTasks mocks base method. -func (m *MockExecutableTaskTracker) TrackTasks(highWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) []TrackableExecutableTask { +func (m *MockExecutableTaskTracker) TrackTasks(exclusiveHighWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) []TrackableExecutableTask { m.ctrl.T.Helper() - varargs := []interface{}{highWatermarkInfo} + varargs := []interface{}{exclusiveHighWatermarkInfo} for _, a := range tasks { varargs = append(varargs, a) } @@ -322,8 +322,8 @@ func (m *MockExecutableTaskTracker) TrackTasks(highWatermarkInfo WatermarkInfo, } // TrackTasks indicates an expected call of TrackTasks. -func (mr *MockExecutableTaskTrackerMockRecorder) TrackTasks(highWatermarkInfo interface{}, tasks ...interface{}) *gomock.Call { +func (mr *MockExecutableTaskTrackerMockRecorder) TrackTasks(exclusiveHighWatermarkInfo interface{}, tasks ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{highWatermarkInfo}, tasks...) + varargs := append([]interface{}{exclusiveHighWatermarkInfo}, tasks...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TrackTasks", reflect.TypeOf((*MockExecutableTaskTracker)(nil).TrackTasks), varargs...) } diff --git a/service/history/replication/executable_task_tracker_test.go b/service/history/replication/executable_task_tracker_test.go index fd263e15683..49ca01a4bfa 100644 --- a/service/history/replication/executable_task_tracker_test.go +++ b/service/history/replication/executable_task_tracker_test.go @@ -33,6 +33,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/log" ctasks "go.temporal.io/server/common/tasks" @@ -77,7 +78,7 @@ func (s *executableTaskTrackerSuite) TestTrackTasks() { task0 := NewMockTrackableExecutableTask(s.controller) task0.EXPECT().TaskID().Return(rand.Int63()).AnyTimes() highWatermark0 := WatermarkInfo{ - Watermark: task0.TaskID(), + Watermark: task0.TaskID() + 1, Timestamp: time.Unix(0, rand.Int63()), } @@ -89,7 +90,7 @@ func (s *executableTaskTrackerSuite) TestTrackTasks() { taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) } s.Equal([]int64{task0.TaskID()}, taskIDs) - s.Equal(highWatermark0, *s.taskTracker.highWatermarkInfo) + s.Equal(highWatermark0, *s.taskTracker.exclusiveHighWatermarkInfo) task1 := NewMockTrackableExecutableTask(s.controller) task1.EXPECT().TaskID().Return(task0.TaskID() + 1).AnyTimes() @@ -108,14 +109,14 @@ func (s *executableTaskTrackerSuite) TestTrackTasks() { taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) } s.Equal([]int64{task0.TaskID(), task1.TaskID(), task2.TaskID()}, taskIDs) - s.Equal(highWatermark2, *s.taskTracker.highWatermarkInfo) + s.Equal(highWatermark2, *s.taskTracker.exclusiveHighWatermarkInfo) } func (s *executableTaskTrackerSuite) TestTrackTasks_Duplication() { task0 := NewMockTrackableExecutableTask(s.controller) task0.EXPECT().TaskID().Return(rand.Int63()).AnyTimes() highWatermark0 := WatermarkInfo{ - Watermark: task0.TaskID(), + Watermark: task0.TaskID() + 1, Timestamp: time.Unix(0, rand.Int63()), } tasks := s.taskTracker.TrackTasks(highWatermark0, task0) @@ -128,7 +129,7 @@ func (s *executableTaskTrackerSuite) TestTrackTasks_Duplication() { taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) } s.Equal([]int64{task0.TaskID()}, taskIDs) - s.Equal(highWatermark0, *s.taskTracker.highWatermarkInfo) + s.Equal(highWatermark0, *s.taskTracker.exclusiveHighWatermarkInfo) task1 := NewMockTrackableExecutableTask(s.controller) task1.EXPECT().TaskID().Return(task0.TaskID() + 1).AnyTimes() @@ -144,7 +145,7 @@ func (s *executableTaskTrackerSuite) TestTrackTasks_Duplication() { taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) } s.Equal([]int64{task0.TaskID(), task1.TaskID()}, taskIDs) - s.Equal(highWatermark1, *s.taskTracker.highWatermarkInfo) + s.Equal(highWatermark1, *s.taskTracker.exclusiveHighWatermarkInfo) task2 := NewMockTrackableExecutableTask(s.controller) task2.EXPECT().TaskID().Return(task1.TaskID() + 1).AnyTimes() @@ -160,7 +161,7 @@ func (s *executableTaskTrackerSuite) TestTrackTasks_Duplication() { taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) } s.Equal([]int64{task0.TaskID(), task1.TaskID(), task2.TaskID()}, taskIDs) - s.Equal(highWatermark2, *s.taskTracker.highWatermarkInfo) + s.Equal(highWatermark2, *s.taskTracker.exclusiveHighWatermarkInfo) } func (s *executableTaskTrackerSuite) TestTrackTasks_Cancellation() { @@ -168,7 +169,7 @@ func (s *executableTaskTrackerSuite) TestTrackTasks_Cancellation() { task0.EXPECT().TaskID().Return(rand.Int63()).AnyTimes() task0.EXPECT().Cancel() highWatermark0 := WatermarkInfo{ - Watermark: task0.TaskID(), + Watermark: task0.TaskID() + 1, Timestamp: time.Unix(0, rand.Int63()), } @@ -181,7 +182,7 @@ func (s *executableTaskTrackerSuite) TestTrackTasks_Cancellation() { taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) } s.Equal([]int64{task0.TaskID()}, taskIDs) - s.Equal(highWatermark0, *s.taskTracker.highWatermarkInfo) + s.Equal(highWatermark0, *s.taskTracker.exclusiveHighWatermarkInfo) } func (s *executableTaskTrackerSuite) TestLowWatermark_Empty() { @@ -195,17 +196,54 @@ func (s *executableTaskTrackerSuite) TestLowWatermark_Empty() { s.Nil(lowWatermark) } -func (s *executableTaskTrackerSuite) TestLowWatermark_AckedTask() { +func (s *executableTaskTrackerSuite) TestLowWatermark_AckedTask_Case0() { + task0ID := rand.Int63() task0 := NewMockTrackableExecutableTask(s.controller) - task0.EXPECT().TaskID().Return(rand.Int63()).AnyTimes() + task0.EXPECT().TaskID().Return(task0ID).AnyTimes() task0.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() task0.EXPECT().State().Return(ctasks.TaskStateAcked).AnyTimes() + task1ID := task0ID + 1 + task1 := NewMockTrackableExecutableTask(s.controller) + task1.EXPECT().TaskID().Return(task1ID).AnyTimes() + task1.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() + task1.EXPECT().State().Return(ctasks.TaskStatePending).AnyTimes() highWatermark0 := WatermarkInfo{ - Watermark: task0.TaskID() + 1, + Watermark: task1ID + 1, Timestamp: time.Unix(0, rand.Int63()), } - tasks := s.taskTracker.TrackTasks(highWatermark0, task0) - s.Equal([]TrackableExecutableTask{task0}, tasks) + tasks := s.taskTracker.TrackTasks(highWatermark0, task0, task1) + s.Equal([]TrackableExecutableTask{task0, task1}, tasks) + + lowWatermark := s.taskTracker.LowWatermark() + s.Equal(WatermarkInfo{ + Watermark: task1ID, + Timestamp: task1.TaskCreationTime(), + }, *lowWatermark) + + taskIDs := []int64{} + for element := s.taskTracker.taskQueue.Front(); element != nil; element = element.Next() { + taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) + } + s.Equal([]int64{task1ID}, taskIDs) +} + +func (s *executableTaskTrackerSuite) TestLowWatermark_AckedTask_Case1() { + task0ID := rand.Int63() + task0 := NewMockTrackableExecutableTask(s.controller) + task0.EXPECT().TaskID().Return(task0ID).AnyTimes() + task0.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() + task0.EXPECT().State().Return(ctasks.TaskStateAcked).AnyTimes() + task1ID := task0ID + 1 + task1 := NewMockTrackableExecutableTask(s.controller) + task1.EXPECT().TaskID().Return(task1ID).AnyTimes() + task1.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() + task1.EXPECT().State().Return(ctasks.TaskStateAcked).AnyTimes() + highWatermark0 := WatermarkInfo{ + Watermark: task1ID + 1, + Timestamp: time.Unix(0, rand.Int63()), + } + tasks := s.taskTracker.TrackTasks(highWatermark0, task0, task1) + s.Equal([]TrackableExecutableTask{task0, task1}, tasks) lowWatermark := s.taskTracker.LowWatermark() s.Equal(highWatermark0, *lowWatermark) @@ -217,19 +255,59 @@ func (s *executableTaskTrackerSuite) TestLowWatermark_AckedTask() { s.Equal([]int64{}, taskIDs) } -func (s *executableTaskTrackerSuite) TestLowWatermark_NackedTask_Success() { +func (s *executableTaskTrackerSuite) TestLowWatermark_NackedTask_Success_Case0() { + task0ID := rand.Int63() task0 := NewMockTrackableExecutableTask(s.controller) - task0.EXPECT().TaskID().Return(rand.Int63()).AnyTimes() + task0.EXPECT().TaskID().Return(task0ID).AnyTimes() task0.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() task0.EXPECT().State().Return(ctasks.TaskStateNacked).AnyTimes() task0.EXPECT().MarkPoisonPill().Return(nil) + task1ID := task0ID + 1 + task1 := NewMockTrackableExecutableTask(s.controller) + task1.EXPECT().TaskID().Return(task1ID).AnyTimes() + task1.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() + task1.EXPECT().State().Return(ctasks.TaskStatePending).AnyTimes() highWatermark0 := WatermarkInfo{ - Watermark: task0.TaskID() + 1, + Watermark: task1ID + 1, Timestamp: time.Unix(0, rand.Int63()), } - tasks := s.taskTracker.TrackTasks(highWatermark0, task0) - s.Equal([]TrackableExecutableTask{task0}, tasks) + tasks := s.taskTracker.TrackTasks(highWatermark0, task0, task1) + s.Equal([]TrackableExecutableTask{task0, task1}, tasks) + + lowWatermark := s.taskTracker.LowWatermark() + s.Equal(WatermarkInfo{ + Watermark: task1ID, + Timestamp: task1.TaskCreationTime(), + }, *lowWatermark) + + taskIDs := []int64{} + for element := s.taskTracker.taskQueue.Front(); element != nil; element = element.Next() { + taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) + } + s.Equal([]int64{task1ID}, taskIDs) +} + +func (s *executableTaskTrackerSuite) TestLowWatermark_NackedTask_Success_Case1() { + task0ID := rand.Int63() + task0 := NewMockTrackableExecutableTask(s.controller) + task0.EXPECT().TaskID().Return(task0ID).AnyTimes() + task0.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() + task0.EXPECT().State().Return(ctasks.TaskStateNacked).AnyTimes() + task0.EXPECT().MarkPoisonPill().Return(nil) + task1ID := task0ID + 1 + task1 := NewMockTrackableExecutableTask(s.controller) + task1.EXPECT().TaskID().Return(task1ID).AnyTimes() + task1.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() + task1.EXPECT().State().Return(ctasks.TaskStateNacked).AnyTimes() + task1.EXPECT().MarkPoisonPill().Return(nil) + + highWatermark0 := WatermarkInfo{ + Watermark: task1ID + 1, + Timestamp: time.Unix(0, rand.Int63()), + } + tasks := s.taskTracker.TrackTasks(highWatermark0, task0, task1) + s.Equal([]TrackableExecutableTask{task0, task1}, tasks) lowWatermark := s.taskTracker.LowWatermark() s.Equal(highWatermark0, *lowWatermark) @@ -241,18 +319,24 @@ func (s *executableTaskTrackerSuite) TestLowWatermark_NackedTask_Success() { s.Equal([]int64{}, taskIDs) } -func (s *executableTaskTrackerSuite) TestLowWatermark_NackedTask_Error() { +func (s *executableTaskTrackerSuite) TestLowWatermark_NackedTask_Error_Case0() { + task0ID := rand.Int63() task0 := NewMockTrackableExecutableTask(s.controller) - task0.EXPECT().TaskID().Return(rand.Int63()).AnyTimes() + task0.EXPECT().TaskID().Return(task0ID).AnyTimes() task0.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() task0.EXPECT().State().Return(ctasks.TaskStateNacked).AnyTimes() task0.EXPECT().MarkPoisonPill().Return(errors.New("random error")) + task1ID := task0ID + 1 + task1 := NewMockTrackableExecutableTask(s.controller) + task1.EXPECT().TaskID().Return(task1ID).AnyTimes() + task1.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() + task1.EXPECT().State().Return(ctasks.TaskStatePending).AnyTimes() tasks := s.taskTracker.TrackTasks(WatermarkInfo{ - Watermark: task0.TaskID() + 1, + Watermark: task1ID + 1, Timestamp: time.Unix(0, rand.Int63()), - }, task0) - s.Equal([]TrackableExecutableTask{task0}, tasks) + }, task0, task1) + s.Equal([]TrackableExecutableTask{task0, task1}, tasks) lowWatermark := s.taskTracker.LowWatermark() s.Equal(WatermarkInfo{ @@ -264,7 +348,40 @@ func (s *executableTaskTrackerSuite) TestLowWatermark_NackedTask_Error() { for element := s.taskTracker.taskQueue.Front(); element != nil; element = element.Next() { taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) } - s.Equal([]int64{task0.TaskID()}, taskIDs) + s.Equal([]int64{task0ID, task1ID}, taskIDs) +} + +func (s *executableTaskTrackerSuite) TestLowWatermark_NackedTask_Error_Case1() { + task0ID := rand.Int63() + task0 := NewMockTrackableExecutableTask(s.controller) + task0.EXPECT().TaskID().Return(task0ID).AnyTimes() + task0.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() + task0.EXPECT().State().Return(ctasks.TaskStateNacked).AnyTimes() + task0.EXPECT().MarkPoisonPill().Return(serviceerror.NewInternal("random error")) + task1ID := task0ID + 1 + task1 := NewMockTrackableExecutableTask(s.controller) + task1.EXPECT().TaskID().Return(task1ID).AnyTimes() + task1.EXPECT().TaskCreationTime().Return(time.Unix(0, rand.Int63())).AnyTimes() + task1.EXPECT().State().Return(ctasks.TaskStateNacked).AnyTimes() + task1.EXPECT().MarkPoisonPill().Return(serviceerror.NewInternal("random error")) + + tasks := s.taskTracker.TrackTasks(WatermarkInfo{ + Watermark: task1ID + 1, + Timestamp: time.Unix(0, rand.Int63()), + }, task0, task1) + s.Equal([]TrackableExecutableTask{task0, task1}, tasks) + + lowWatermark := s.taskTracker.LowWatermark() + s.Equal(WatermarkInfo{ + Watermark: task0.TaskID(), + Timestamp: task0.TaskCreationTime(), + }, *lowWatermark) + + taskIDs := []int64{} + for element := s.taskTracker.taskQueue.Front(); element != nil; element = element.Next() { + taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) + } + s.Equal([]int64{task0ID, task1ID}, taskIDs) } func (s *executableTaskTrackerSuite) TestLowWatermark_AbortedTask() { @@ -347,7 +464,7 @@ func (s *executableTaskTrackerSuite) TestCancellation() { task0.EXPECT().TaskID().Return(rand.Int63()).AnyTimes() task0.EXPECT().Cancel() highWatermark0 := WatermarkInfo{ - Watermark: task0.TaskID(), + Watermark: task0.TaskID() + 1, Timestamp: time.Unix(0, rand.Int63()), } @@ -360,5 +477,5 @@ func (s *executableTaskTrackerSuite) TestCancellation() { taskIDs = append(taskIDs, element.Value.(TrackableExecutableTask).TaskID()) } s.Equal([]int64{task0.TaskID()}, taskIDs) - s.Equal(highWatermark0, *s.taskTracker.highWatermarkInfo) + s.Equal(highWatermark0, *s.taskTracker.exclusiveHighWatermarkInfo) } diff --git a/service/history/replication/stream_receiver.go b/service/history/replication/stream_receiver.go index f1e23fd777f..4d8c4082b6c 100644 --- a/service/history/replication/stream_receiver.go +++ b/service/history/replication/stream_receiver.go @@ -208,8 +208,8 @@ func (r *StreamReceiver) ackMessage( if err := stream.Send(&adminservice.StreamWorkflowReplicationMessagesRequest{ Attributes: &adminservice.StreamWorkflowReplicationMessagesRequest_SyncReplicationState{ SyncReplicationState: &repicationpb.SyncReplicationState{ - LastProcessedMessageId: watermarkInfo.Watermark, - LastProcessedMessageTime: timestamp.TimePtr(watermarkInfo.Timestamp), + InclusiveLowWatermark: watermarkInfo.Watermark, + InclusiveLowWatermarkTime: timestamp.TimePtr(watermarkInfo.Timestamp), }, }, }); err != nil { @@ -255,11 +255,11 @@ func (r *StreamReceiver) processMessages( r.serverShardKey, streamResp.Resp.GetMessages().ReplicationTasks..., ) - highWatermark := streamResp.Resp.GetMessages().LastTaskId - highWatermarkTime := timestamp.TimeValue(streamResp.Resp.GetMessages().LastTaskTime) + exclusiveHighWatermark := streamResp.Resp.GetMessages().ExclusiveHighWatermark + exclusiveHighWatermarkTime := timestamp.TimeValue(streamResp.Resp.GetMessages().ExclusiveHighWatermarkTime) for _, task := range r.taskTracker.TrackTasks(WatermarkInfo{ - Watermark: highWatermark, - Timestamp: highWatermarkTime, + Watermark: exclusiveHighWatermark, + Timestamp: exclusiveHighWatermarkTime, }, tasks...) { r.ProcessToolBox.TaskScheduler.Submit(task) } diff --git a/service/history/replication/stream_receiver_monitor_test.go b/service/history/replication/stream_receiver_monitor_test.go index cddfa484197..a60d2785f67 100644 --- a/service/history/replication/stream_receiver_monitor_test.go +++ b/service/history/replication/stream_receiver_monitor_test.go @@ -108,9 +108,9 @@ func (s *streamReceiverMonitorSuite) SetupTest() { streamClient.EXPECT().Recv().Return(&adminservice.StreamWorkflowReplicationMessagesResponse{ Attributes: &adminservice.StreamWorkflowReplicationMessagesResponse_Messages{ Messages: &repicationpb.WorkflowReplicationMessages{ - ReplicationTasks: []*repicationpb.ReplicationTask{}, - LastTaskId: 100, - LastTaskTime: timestamp.TimePtr(time.Unix(0, 100)), + ReplicationTasks: []*repicationpb.ReplicationTask{}, + ExclusiveHighWatermark: 100, + ExclusiveHighWatermarkTime: timestamp.TimePtr(time.Unix(0, 100)), }, }, }, nil).AnyTimes() diff --git a/service/history/replication/stream_receiver_test.go b/service/history/replication/stream_receiver_test.go index 5a3ce223b90..8dd992621f6 100644 --- a/service/history/replication/stream_receiver_test.go +++ b/service/history/replication/stream_receiver_test.go @@ -144,8 +144,8 @@ func (s *streamReceiverSuite) TestAckMessage_SyncStatus() { s.Equal([]*adminservice.StreamWorkflowReplicationMessagesRequest{{ Attributes: &adminservice.StreamWorkflowReplicationMessagesRequest_SyncReplicationState{ SyncReplicationState: &repicationpb.SyncReplicationState{ - LastProcessedMessageId: watermarkInfo.Watermark, - LastProcessedMessageTime: timestamp.TimePtr(watermarkInfo.Timestamp), + InclusiveLowWatermark: watermarkInfo.Watermark, + InclusiveLowWatermarkTime: timestamp.TimePtr(watermarkInfo.Timestamp), }, }, }, @@ -162,9 +162,9 @@ func (s *streamReceiverSuite) TestProcessMessage_TrackSubmit() { Resp: &adminservice.StreamWorkflowReplicationMessagesResponse{ Attributes: &adminservice.StreamWorkflowReplicationMessagesResponse_Messages{ Messages: &repicationpb.WorkflowReplicationMessages{ - ReplicationTasks: []*repicationpb.ReplicationTask{replicationTask}, - LastTaskId: rand.Int63(), - LastTaskTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), + ReplicationTasks: []*repicationpb.ReplicationTask{replicationTask}, + ExclusiveHighWatermark: rand.Int63(), + ExclusiveHighWatermarkTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), }, }, }, @@ -175,8 +175,8 @@ func (s *streamReceiverSuite) TestProcessMessage_TrackSubmit() { s.taskTracker.EXPECT().TrackTasks(gomock.Any(), gomock.Any()).DoAndReturn( func(highWatermarkInfo WatermarkInfo, tasks ...TrackableExecutableTask) []TrackableExecutableTask { - s.Equal(streamResp.Resp.GetMessages().LastTaskId, highWatermarkInfo.Watermark) - s.Equal(*streamResp.Resp.GetMessages().LastTaskTime, highWatermarkInfo.Timestamp) + s.Equal(streamResp.Resp.GetMessages().ExclusiveHighWatermark, highWatermarkInfo.Watermark) + s.Equal(*streamResp.Resp.GetMessages().ExclusiveHighWatermarkTime, highWatermarkInfo.Timestamp) s.Equal(1, len(tasks)) s.IsType(&ExecutableUnknownTask{}, tasks[0]) return []TrackableExecutableTask{tasks[0]} diff --git a/tests/ndc/ndc_integration_test.go b/tests/ndc/ndc_integration_test.go index bc06fca1960..19242636b4f 100644 --- a/tests/ndc/ndc_integration_test.go +++ b/tests/ndc/ndc_integration_test.go @@ -124,9 +124,9 @@ func (s *nDCIntegrationTestSuite) SetupSuite() { mockStreamClient.EXPECT().Recv().Return(&adminservice.StreamWorkflowReplicationMessagesResponse{ Attributes: &adminservice.StreamWorkflowReplicationMessagesResponse_Messages{ Messages: &repicationpb.WorkflowReplicationMessages{ - ReplicationTasks: []*repicationpb.ReplicationTask{}, - LastTaskId: 100, - LastTaskTime: timestamp.TimePtr(time.Unix(0, 100)), + ReplicationTasks: []*repicationpb.ReplicationTask{}, + ExclusiveHighWatermark: 100, + ExclusiveHighWatermarkTime: timestamp.TimePtr(time.Unix(0, 100)), }, }, }, nil).AnyTimes()