diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 4c68d1a5699..4c2f1942d3f 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -2115,6 +2115,11 @@ that task will be sent to DLQ.`, false, `EnableReplicateLocalGeneratedEvents is a feature flag for replicating locally generated events`, ) + EnableReplicationTaskTieredProcessing = NewGlobalBoolSetting( + "history.EnableReplicationTaskTieredProcessing", + false, + `EnableReplicationTaskTieredProcessing is a feature flag for enabling tiered replication task processing stack`, + ) // keys for worker diff --git a/service/history/configs/config.go b/service/history/configs/config.go index e0e52c1f25f..e3db94c6b93 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -272,6 +272,7 @@ type Config struct { EnableReplicationEagerRefreshNamespace dynamicconfig.BoolPropertyFn EnableReplicationTaskBatching dynamicconfig.BoolPropertyFn EnableReplicateLocalGeneratedEvent dynamicconfig.BoolPropertyFn + EnableReplicationTaskTieredProcessing dynamicconfig.BoolPropertyFn // The following are used by consistent query MaxBufferedQueryCount dynamicconfig.IntPropertyFn @@ -490,6 +491,7 @@ func NewConfig( EnableReplicationEagerRefreshNamespace: dynamicconfig.EnableEagerNamespaceRefresher.Get(dc), EnableReplicationTaskBatching: dynamicconfig.EnableReplicationTaskBatching.Get(dc), EnableReplicateLocalGeneratedEvent: dynamicconfig.EnableReplicateLocalGeneratedEvents.Get(dc), + EnableReplicationTaskTieredProcessing: dynamicconfig.EnableReplicationTaskTieredProcessing.Get(dc), MaximumBufferedEventsBatch: dynamicconfig.MaximumBufferedEventsBatch.Get(dc), MaximumBufferedEventsSizeInBytes: dynamicconfig.MaximumBufferedEventsSizeInBytes.Get(dc), diff --git a/service/history/handler.go b/service/history/handler.go index 71b98cd6ea5..d70d796ab9b 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -2069,6 +2069,7 @@ func (h *Handler) StreamWorkflowReplicationMessages( clientClusterName, replication.NewClusterShardKey(clientClusterShardID.ClusterID, clientClusterShardID.ShardID), replication.NewClusterShardKey(serverClusterShardID.ClusterID, serverClusterShardID.ShardID), + h.config, ) h.streamReceiverMonitor.RegisterInboundStream(streamSender) streamSender.Start() diff --git a/service/history/replication/stream_sender.go b/service/history/replication/stream_sender.go index a5ca70b685a..d1a14a48a7b 100644 --- a/service/history/replication/stream_sender.go +++ b/service/history/replication/stream_sender.go @@ -33,6 +33,8 @@ import ( "sync/atomic" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/service/history/configs" + "google.golang.org/protobuf/types/known/timestamppb" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/historyservice/v1" @@ -49,6 +51,8 @@ import ( "go.temporal.io/server/service/history/tasks" ) +const TaskMaxSkipCount int = 1000 + type ( StreamSender interface { IsValid() bool @@ -63,10 +67,12 @@ type ( metrics metrics.Handler logger log.Logger - status int32 - clientShardKey ClusterShardKey - serverShardKey ClusterShardKey - shutdownChan channel.ShutdownOnce + status int32 + clientShardKey ClusterShardKey + serverShardKey ClusterShardKey + shutdownChan channel.ShutdownOnce + config *configs.Config + isTieredStackEnabled bool } ) @@ -78,6 +84,7 @@ func NewStreamSender( clientClusterName string, clientShardKey ClusterShardKey, serverShardKey ClusterShardKey, + config *configs.Config, ) 
*StreamSenderImpl { logger := log.With( shardContext.GetLogger(), @@ -92,10 +99,12 @@ func NewStreamSender( metrics: shardContext.GetMetricsHandler(), logger: logger, - status: common.DaemonStatusInitialized, - clientShardKey: clientShardKey, - serverShardKey: serverShardKey, - shutdownChan: channel.NewShutdownOnce(), + status: common.DaemonStatusInitialized, + clientShardKey: clientShardKey, + serverShardKey: serverShardKey, + shutdownChan: channel.NewShutdownOnce(), + config: config, + isTieredStackEnabled: config.EnableReplicationTaskTieredProcessing(), } } @@ -107,8 +116,19 @@ func (s *StreamSenderImpl) Start() { ) { return } + getSenderEventLoop := func(priority enumsspb.TaskPriority) func() error { + return func() error { + return s.sendEventLoop(priority) + } + } + + if s.isTieredStackEnabled { + go WrapEventLoop(getSenderEventLoop(enumsspb.TASK_PRIORITY_HIGH), s.Stop, s.logger, s.metrics, s.clientShardKey, s.serverShardKey, streamReceiverMonitorInterval) + go WrapEventLoop(getSenderEventLoop(enumsspb.TASK_PRIORITY_LOW), s.Stop, s.logger, s.metrics, s.clientShardKey, s.serverShardKey, streamReceiverMonitorInterval) + } else { + go WrapEventLoop(getSenderEventLoop(enumsspb.TASK_PRIORITY_UNSPECIFIED), s.Stop, s.logger, s.metrics, s.clientShardKey, s.serverShardKey, streamReceiverMonitorInterval) + } - go WrapEventLoop(s.sendEventLoop, s.Stop, s.logger, s.metrics, s.clientShardKey, s.serverShardKey, streamReceiverMonitorInterval) go WrapEventLoop(s.recvEventLoop, s.Stop, s.logger, s.metrics, s.clientShardKey, s.serverShardKey, streamReceiverMonitorInterval) s.logger.Info("StreamSender started.") @@ -156,6 +176,10 @@ func (s *StreamSenderImpl) recvEventLoop() (retErr error) { var inclusiveLowWatermark int64 for !s.shutdownChan.IsShutdown() { + if s.isTieredStackEnabled != s.config.EnableReplicationTaskTieredProcessing() { + s.logger.Warn("ReplicationStreamError StreamSender detected tiered stack change, restart the stream") + return NewStreamError("StreamError tiered stack change, reconnect stream", serviceerror.NewInternal("tiered stack change")) + } req, err := s.server.Recv() if err != nil { s.logger.Error("ReplicationStreamError StreamSender failed to receive", tag.Error(err)) @@ -188,7 +212,7 @@ func (s *StreamSenderImpl) recvEventLoop() (retErr error) { return nil } -func (s *StreamSenderImpl) sendEventLoop() (retErr error) { +func (s *StreamSenderImpl) sendEventLoop(priority enumsspb.TaskPriority) (retErr error) { var panicErr error defer func() { if panicErr != nil { @@ -202,13 +226,14 @@ func (s *StreamSenderImpl) sendEventLoop() (retErr error) { newTaskNotificationChan, subscriberID := s.historyEngine.SubscribeReplicationNotification() defer s.historyEngine.UnsubscribeReplicationNotification(subscriberID) - catchupEndExclusiveWatermark, err := s.sendCatchUp() + catchupEndExclusiveWatermark, err := s.sendCatchUp(priority) if err != nil { s.logger.Error("ReplicationServiceError StreamSender unable to catch up replication tasks", tag.Error(err)) return err } s.logger.Debug(fmt.Sprintf("StreamSender sendCatchUp finished with catchupEndExclusiveWatermark %v", catchupEndExclusiveWatermark)) if err := s.sendLive( + priority, newTaskNotificationChan, catchupEndExclusiveWatermark, ); err != nil { @@ -221,6 +246,86 @@ func (s *StreamSenderImpl) sendEventLoop() (retErr error) { func (s *StreamSenderImpl) recvSyncReplicationState( attr *replicationspb.SyncReplicationState, ) error { + var readerState *persistencespb.QueueReaderState + switch s.isTieredStackEnabled { + case true: + if 
attr.HighPriorityState == nil || attr.LowPriorityState == nil { + return NewStreamError("ReplicationServiceError StreamSender encountered unsupported SyncReplicationState", nil) + } + readerState = &persistencespb.QueueReaderState{ + Scopes: []*persistencespb.QueueSliceScope{ + // index 0 is for overall low watermark. In tiered stack it is Min(LowWatermark-high priority, LowWatermark-low priority) + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(attr.GetInclusiveLowWatermark()), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + // index 1 is for high priority + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(attr.GetHighPriorityState().GetInclusiveLowWatermark()), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + // index 2 is for low priority + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(attr.GetLowPriorityState().GetInclusiveLowWatermark()), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + }, + } + case false: + if attr.HighPriorityState != nil || attr.LowPriorityState != nil { + return NewStreamError("ReplicationServiceError StreamSender encountered unsupported SyncReplicationState", nil) + } + readerState = &persistencespb.QueueReaderState{ + Scopes: []*persistencespb.QueueSliceScope{ + // in single stack, index 0 is for overall low watermark + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(attr.GetInclusiveLowWatermark()), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + }, + } + } + inclusiveLowWatermark := attr.GetInclusiveLowWatermark() inclusiveLowWatermarkTime := attr.GetInclusiveLowWatermarkTime() @@ -228,22 +333,7 @@ func (s *StreamSenderImpl) recvSyncReplicationState( int64(s.clientShardKey.ClusterID), s.clientShardKey.ShardID, ) - readerState := &persistencespb.QueueReaderState{ - Scopes: []*persistencespb.QueueSliceScope{{ - Range: &persistencespb.QueueSliceRange{ - InclusiveMin: shard.ConvertToPersistenceTaskKey( - tasks.NewImmediateKey(inclusiveLowWatermark), - ), - ExclusiveMax: shard.ConvertToPersistenceTaskKey( - tasks.NewImmediateKey(math.MaxInt64), - ), - }, - Predicate: &persistencespb.Predicate{ - PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, - Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, - }, - }}, - } + if err := s.shardContext.UpdateReplicationQueueReaderState( readerID, readerState, @@ -257,7 +347,7 @@ func (s *StreamSenderImpl) 
recvSyncReplicationState( ) } -func (s *StreamSenderImpl) sendCatchUp() (int64, error) { +func (s *StreamSenderImpl) sendCatchUp(priority enumsspb.TaskPriority) (int64, error) { readerID := shard.ReplicationReaderIDFromClusterShardID( int64(s.clientShardKey.ClusterID), s.clientShardKey.ShardID, @@ -278,10 +368,11 @@ func (s *StreamSenderImpl) sendCatchUp() (int64, error) { s.logger.Debug(fmt.Sprintf("StreamSender readerState not found, readerID %v", readerID)) catchupBeginInclusiveWatermark = catchupEndExclusiveWatermark } else { - catchupBeginInclusiveWatermark = readerState.Scopes[0].Range.InclusiveMin.TaskId + catchupBeginInclusiveWatermark = s.getSendCatchupBeginInclusiveWatermark(readerState, priority) } } if err := s.sendTasks( + priority, catchupBeginInclusiveWatermark, catchupEndExclusiveWatermark, ); err != nil { @@ -290,7 +381,35 @@ func (s *StreamSenderImpl) sendCatchUp() (int64, error) { return catchupEndExclusiveWatermark, nil } +func (s *StreamSenderImpl) getSendCatchupBeginInclusiveWatermark(readerState *persistencespb.QueueReaderState, priority enumsspb.TaskPriority) int64 { + getReaderScopesIndex := func(priority enumsspb.TaskPriority) int { + switch priority { + case enumsspb.TASK_PRIORITY_HIGH: + /* + This handles the case where we switch from the single stack to the tiered stack while the reader state is still in the old format. + In that case it is safe to use the overall low watermark as the beginInclusiveWatermark, as long as we always guarantee that + the overall low watermark is Min(lowPriorityLowWatermark, highPriorityLowWatermark). + */ + if len(readerState.Scopes) != 3 { + return 0 + } + return 1 + case enumsspb.TASK_PRIORITY_LOW: + if len(readerState.Scopes) != 3 { + return 0 + } + return 2 + case enumsspb.TASK_PRIORITY_UNSPECIFIED: + return 0 + default: + return 0 + } + } + return readerState.Scopes[getReaderScopesIndex(priority)].Range.InclusiveMin.TaskId +} + func (s *StreamSenderImpl) sendLive( + priority enumsspb.TaskPriority, newTaskNotificationChan <-chan struct{}, beginInclusiveWatermark int64, ) error { @@ -299,6 +418,7 @@ func (s *StreamSenderImpl) sendLive( case <-newTaskNotificationChan: endExclusiveWatermark := s.shardContext.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication).TaskID if err := s.sendTasks( + priority, beginInclusiveWatermark, endExclusiveWatermark, ); err != nil { @@ -312,6 +432,7 @@ } func (s *StreamSenderImpl) sendTasks( + priority enumsspb.TaskPriority, beginInclusiveWatermark int64, endExclusiveWatermark int64, ) error { @@ -331,6 +452,7 @@ func (s *StreamSenderImpl) sendTasks( ReplicationTasks: nil, ExclusiveHighWatermark: endExclusiveWatermark, ExclusiveHighWatermarkTime: timestamp.TimeNowPtrUtc(), + Priority: priority, }, }, }) @@ -348,6 +470,7 @@ if err != nil { return err } + skipCount := 0 Loop: for iter.HasNext() { if s.shutdownChan.IsShutdown() { @@ -358,6 +481,27 @@ Loop: if err != nil { return err } + skipCount++ + // Avoid the situation where we keep skipping tasks and never send anything: the receiver would never see an updated high watermark, + // so it would never ACK back and the sender would never advance its ACK level. + // For example, in the tiered stack, if there are no low-priority tasks in the queue, we should still send watermark info so the receiver can update its ACK level.
+ if skipCount > TaskMaxSkipCount { + if err := s.sendToStream(&historyservice.StreamWorkflowReplicationMessagesResponse{ + Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{ + Messages: &replicationspb.WorkflowReplicationMessages{ + ExclusiveHighWatermark: item.GetTaskID(), + ExclusiveHighWatermarkTime: timestamppb.New(item.GetVisibilityTime()), + Priority: priority, + }, + }, + }); err != nil { + return err + } + } + if priority != enumsspb.TASK_PRIORITY_UNSPECIFIED && // case: skip priority check. When priority is unspecified, send all tasks + priority != s.getTaskPriority(item) { // case: skip task with different priority than this loop + continue Loop + } task, err := s.taskConverter.Convert(item) if err != nil { return err @@ -365,6 +509,7 @@ Loop: if task == nil { continue Loop } + task.Priority = priority s.logger.Debug("StreamSender send replication task", tag.TaskID(task.SourceTaskId)) if err := s.sendToStream(&historyservice.StreamWorkflowReplicationMessagesResponse{ Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{ @@ -372,11 +517,13 @@ Loop: ReplicationTasks: []*replicationspb.ReplicationTask{task}, ExclusiveHighWatermark: task.SourceTaskId + 1, ExclusiveHighWatermarkTime: task.VisibilityTime, + Priority: priority, }, }, }); err != nil { return err } + skipCount = 0 metrics.ReplicationTasksSend.With(s.metrics).Record( int64(1), metrics.FromClusterIDTag(s.serverShardKey.ClusterID), @@ -390,6 +537,7 @@ Loop: ReplicationTasks: nil, ExclusiveHighWatermark: endExclusiveWatermark, ExclusiveHighWatermarkTime: timestamp.TimeNowPtrUtc(), + Priority: priority, }, }, }) @@ -403,3 +551,15 @@ func (s *StreamSenderImpl) sendToStream(payload *historyservice.StreamWorkflowRe } return nil } + +func (s *StreamSenderImpl) getTaskPriority(task tasks.Task) enumsspb.TaskPriority { + switch t := task.(type) { + case *tasks.SyncWorkflowStateTask: + if t.Priority == enumsspb.TASK_PRIORITY_UNSPECIFIED { + return enumsspb.TASK_PRIORITY_LOW + } + return t.Priority + default: + return enumsspb.TASK_PRIORITY_HIGH + } +} diff --git a/service/history/replication/stream_sender_test.go b/service/history/replication/stream_sender_test.go index e3431fe3945..0bb42a52a14 100644 --- a/service/history/replication/stream_sender_test.go +++ b/service/history/replication/stream_sender_test.go @@ -34,6 +34,8 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/tests" "google.golang.org/protobuf/types/known/timestamppb" enumsspb "go.temporal.io/server/api/enums/v1" @@ -65,6 +67,7 @@ type ( serverShardKey ClusterShardKey streamSender *StreamSenderImpl + config *configs.Config } ) @@ -87,6 +90,7 @@ func (s *streamSenderSuite) SetupTest() { s.shardContext = shard.NewMockContext(s.controller) s.historyEngine = shard.NewMockEngine(s.controller) s.taskConverter = NewMockSourceTaskConverter(s.controller) + s.config = tests.NewDynamicConfig() s.clientShardKey = NewClusterShardKey(rand.Int31(), rand.Int31()) s.serverShardKey = NewClusterShardKey(rand.Int31(), rand.Int31()) @@ -102,6 +106,7 @@ func (s *streamSenderSuite) SetupTest() { "target_cluster", s.clientShardKey, s.serverShardKey, + s.config, ) } @@ -109,7 +114,7 @@ func (s *streamSenderSuite) TearDownTest() { s.controller.Finish() } -func (s *streamSenderSuite) TestRecvSyncReplicationState_Success() { +func (s *streamSenderSuite) 
TestRecvSyncReplicationState_SingleStack_Success() { readerID := shard.ReplicationReaderIDFromClusterShardID( int64(s.clientShardKey.ClusterID), s.clientShardKey.ShardID, @@ -148,7 +153,7 @@ func (s *streamSenderSuite) TestRecvSyncReplicationState_Success() { s.NoError(err) } -func (s *streamSenderSuite) TestRecvSyncReplicationState_Error() { +func (s *streamSenderSuite) TestRecvSyncReplicationState_SingleStack_Error() { readerID := shard.ReplicationReaderIDFromClusterShardID( int64(s.clientShardKey.ClusterID), s.clientShardKey.ShardID, @@ -190,7 +195,170 @@ func (s *streamSenderSuite) TestRecvSyncReplicationState_Error() { s.Equal(ownershipLost, err) } -func (s *streamSenderSuite) TestSendCatchUp() { +func (s *streamSenderSuite) TestRecvSyncReplicationState_TieredStack_Success() { + s.streamSender.isTieredStackEnabled = true + readerID := shard.ReplicationReaderIDFromClusterShardID( + int64(s.clientShardKey.ClusterID), + s.clientShardKey.ShardID, + ) + inclusiveWatermark := int64(1234) + timestamp := timestamppb.New(time.Unix(0, rand.Int63())) + replicationState := &replicationspb.SyncReplicationState{ + InclusiveLowWatermark: inclusiveWatermark, + InclusiveLowWatermarkTime: timestamp, + HighPriorityState: &replicationspb.ReplicationState{ + InclusiveLowWatermark: inclusiveWatermark, + InclusiveLowWatermarkTime: timestamp, + }, + LowPriorityState: &replicationspb.ReplicationState{ + InclusiveLowWatermark: inclusiveWatermark + 10, + InclusiveLowWatermarkTime: timestamp, + }, + } + + s.shardContext.EXPECT().UpdateReplicationQueueReaderState( + readerID, + &persistencespb.QueueReaderState{ + Scopes: []*persistencespb.QueueSliceScope{ + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(replicationState.InclusiveLowWatermark), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(replicationState.HighPriorityState.InclusiveLowWatermark), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(replicationState.LowPriorityState.InclusiveLowWatermark), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + }, + }, + ).Return(nil) + s.shardContext.EXPECT().UpdateRemoteReaderInfo( + readerID, + replicationState.InclusiveLowWatermark-1, + replicationState.InclusiveLowWatermarkTime.AsTime(), + ).Return(nil) + + err := s.streamSender.recvSyncReplicationState(replicationState) + s.NoError(err) +} + +func (s *streamSenderSuite) TestRecvSyncReplicationState_TieredStack_Error() { + s.streamSender.isTieredStackEnabled = true + readerID := shard.ReplicationReaderIDFromClusterShardID( + int64(s.clientShardKey.ClusterID), + 
s.clientShardKey.ShardID, + ) + inclusiveWatermark := int64(1234) + timestamp := timestamppb.New(time.Unix(0, rand.Int63())) + replicationState := &replicationspb.SyncReplicationState{ + InclusiveLowWatermark: inclusiveWatermark, + InclusiveLowWatermarkTime: timestamp, + HighPriorityState: &replicationspb.ReplicationState{ + InclusiveLowWatermark: inclusiveWatermark, + InclusiveLowWatermarkTime: timestamp, + }, + LowPriorityState: &replicationspb.ReplicationState{ + InclusiveLowWatermark: inclusiveWatermark + 10, + InclusiveLowWatermarkTime: timestamp, + }, + } + + var ownershipLost error + if rand.Float64() < 0.5 { + ownershipLost = &persistence.ShardOwnershipLostError{} + } else { + ownershipLost = serviceerrors.NewShardOwnershipLost("", "") + } + + s.shardContext.EXPECT().UpdateReplicationQueueReaderState( + readerID, + &persistencespb.QueueReaderState{ + Scopes: []*persistencespb.QueueSliceScope{ + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(replicationState.InclusiveLowWatermark), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(replicationState.HighPriorityState.InclusiveLowWatermark), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(replicationState.LowPriorityState.InclusiveLowWatermark), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + }, + }, + ).Return(ownershipLost) + + err := s.streamSender.recvSyncReplicationState(replicationState) + s.Error(err) + s.Equal(ownershipLost, err) +} + +func (s *streamSenderSuite) TestSendCatchUp_SingleStack() { readerID := shard.ReplicationReaderIDFromClusterShardID( int64(s.clientShardKey.ClusterID), s.clientShardKey.ShardID, @@ -241,12 +409,172 @@ func (s *streamSenderSuite) TestSendCatchUp() { return nil }) - taskID, err := s.streamSender.sendCatchUp() + taskID, err := s.streamSender.sendCatchUp(enumsspb.TASK_PRIORITY_UNSPECIFIED) s.NoError(err) s.Equal(endExclusiveWatermark, taskID) } -func (s *streamSenderSuite) TestSendCatchUp_NoReaderState() { +func (s *streamSenderSuite) TestSendCatchUp_TieredStack_SingleReaderScope() { + s.streamSender.isTieredStackEnabled = true + readerID := shard.ReplicationReaderIDFromClusterShardID( + int64(s.clientShardKey.ClusterID), + s.clientShardKey.ShardID, + ) + beginInclusiveWatermark := rand.Int63() + endExclusiveWatermark := beginInclusiveWatermark + 1 + s.shardContext.EXPECT().GetQueueState( + tasks.CategoryReplication, + ).Return(&persistencespb.QueueState{ + ExclusiveReaderHighWatermark: nil, + ReaderStates: map[int64]*persistencespb.QueueReaderState{ + readerID: { + Scopes: []*persistencespb.QueueSliceScope{{ 
// only has one scope + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(beginInclusiveWatermark), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }}, + }, + }, + }, true).Times(2) + s.shardContext.EXPECT().GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication).Return( + tasks.NewImmediateKey(endExclusiveWatermark), + ).Times(2) + + iter := collection.NewPagingIterator[tasks.Task]( + func(paginationToken []byte) ([]tasks.Task, []byte, error) { + return []tasks.Task{}, nil, nil + }, + ) + s.historyEngine.EXPECT().GetReplicationTasksIter( + gomock.Any(), + string(s.clientShardKey.ClusterID), + beginInclusiveWatermark, + endExclusiveWatermark, + ).Return(iter, nil).Times(2) + s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { + s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) + return nil + }).Times(2) + + highPriorityCatchupTaskID, highPriorityCatchupErr := s.streamSender.sendCatchUp(enumsspb.TASK_PRIORITY_HIGH) + lowPriorityCatchupTaskID, lowPriorityCatchupErr := s.streamSender.sendCatchUp(enumsspb.TASK_PRIORITY_LOW) + s.NoError(highPriorityCatchupErr) + s.Equal(endExclusiveWatermark, highPriorityCatchupTaskID) + s.NoError(lowPriorityCatchupErr) + s.Equal(endExclusiveWatermark, lowPriorityCatchupTaskID) +} + +func (s *streamSenderSuite) TestSendCatchUp_TieredStack_TieredReaderScope() { + s.streamSender.isTieredStackEnabled = true + readerID := shard.ReplicationReaderIDFromClusterShardID( + int64(s.clientShardKey.ClusterID), + s.clientShardKey.ShardID, + ) + beginInclusiveWatermarkHighPriority := rand.Int63() + endExclusiveWatermark := beginInclusiveWatermarkHighPriority + 1 + beginInclusiveWatermarkLowPriority := beginInclusiveWatermarkHighPriority - 100 + s.shardContext.EXPECT().GetQueueState( + tasks.CategoryReplication, + ).Return(&persistencespb.QueueState{ + ExclusiveReaderHighWatermark: nil, + ReaderStates: map[int64]*persistencespb.QueueReaderState{ + readerID: { + Scopes: []*persistencespb.QueueSliceScope{ + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(beginInclusiveWatermarkLowPriority), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(beginInclusiveWatermarkHighPriority), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + { + Range: &persistencespb.QueueSliceRange{ + InclusiveMin: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(beginInclusiveWatermarkLowPriority), + ), + ExclusiveMax: shard.ConvertToPersistenceTaskKey( + tasks.NewImmediateKey(math.MaxInt64), + ), + }, + 
Predicate: &persistencespb.Predicate{ + PredicateType: enumsspb.PREDICATE_TYPE_UNIVERSAL, + Attributes: &persistencespb.Predicate_UniversalPredicateAttributes{}, + }, + }, + }, + }, + }, + }, true).Times(2) + s.shardContext.EXPECT().GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication).Return( + tasks.NewImmediateKey(endExclusiveWatermark), + ).Times(2) + + iter := collection.NewPagingIterator[tasks.Task]( + func(paginationToken []byte) ([]tasks.Task, []byte, error) { + return []tasks.Task{}, nil, nil + }, + ) + + s.historyEngine.EXPECT().GetReplicationTasksIter( + gomock.Any(), + string(s.clientShardKey.ClusterID), + beginInclusiveWatermarkHighPriority, + endExclusiveWatermark, + ).Return(iter, nil).Times(1) + + s.historyEngine.EXPECT().GetReplicationTasksIter( + gomock.Any(), + string(s.clientShardKey.ClusterID), + beginInclusiveWatermarkLowPriority, + endExclusiveWatermark, + ).Return(iter, nil).Times(1) + + s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { + s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) + return nil + }).Times(2) + + lowPriorityCatchupTaskID, lowPriorityCatchupErr := s.streamSender.sendCatchUp(enumsspb.TASK_PRIORITY_LOW) + highPriorityCatchupTaskID, highPriorityCatchupErr := s.streamSender.sendCatchUp(enumsspb.TASK_PRIORITY_HIGH) + s.NoError(highPriorityCatchupErr) + s.Equal(endExclusiveWatermark, highPriorityCatchupTaskID) + s.NoError(lowPriorityCatchupErr) + s.Equal(endExclusiveWatermark, lowPriorityCatchupTaskID) +} + +func (s *streamSenderSuite) TestSendCatchUp_SingleStack_NoReaderState() { endExclusiveWatermark := int64(1234) s.shardContext.EXPECT().GetQueueState( tasks.CategoryReplication, @@ -264,12 +592,39 @@ func (s *streamSenderSuite) TestSendCatchUp_NoReaderState() { return nil }) - taskID, err := s.streamSender.sendCatchUp() + taskID, err := s.streamSender.sendCatchUp(enumsspb.TASK_PRIORITY_UNSPECIFIED) + s.NoError(err) + s.Equal(endExclusiveWatermark, taskID) +} + +func (s *streamSenderSuite) TestSendCatchUp_TieredStack_NoReaderState() { + s.streamSender.isTieredStackEnabled = true + endExclusiveWatermark := int64(1234) + s.shardContext.EXPECT().GetQueueState( + tasks.CategoryReplication, + ).Return(&persistencespb.QueueState{ + ExclusiveReaderHighWatermark: nil, + ReaderStates: map[int64]*persistencespb.QueueReaderState{}, + }, true).Times(2) + s.shardContext.EXPECT().GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication).Return( + tasks.NewImmediateKey(endExclusiveWatermark), + ).Times(2) + + s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { + s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) + return nil + }).Times(2) + + taskID, err := s.streamSender.sendCatchUp(enumsspb.TASK_PRIORITY_HIGH) + s.NoError(err) + s.Equal(endExclusiveWatermark, taskID) + taskID, err = s.streamSender.sendCatchUp(enumsspb.TASK_PRIORITY_LOW) s.NoError(err) s.Equal(endExclusiveWatermark, taskID) } -func (s *streamSenderSuite) TestSendCatchUp_NoQueueState() { +func (s *streamSenderSuite) TestSendCatchUp_SingleStack_NoQueueState() { endExclusiveWatermark := int64(1234) s.shardContext.EXPECT().GetQueueState( tasks.CategoryReplication, @@ -284,7 +639,7 @@ func (s *streamSenderSuite) TestSendCatchUp_NoQueueState() { return nil }) - taskID, err := 
s.streamSender.sendCatchUp() + taskID, err := s.streamSender.sendCatchUp(enumsspb.TASK_PRIORITY_UNSPECIFIED) s.NoError(err) s.Equal(endExclusiveWatermark, taskID) } @@ -340,6 +695,7 @@ func (s *streamSenderSuite) TestSendLive() { s.streamSender.shutdownChan.Shutdown() }() err := s.streamSender.sendLive( + enumsspb.TASK_PRIORITY_UNSPECIFIED, channel, watermark0, ) @@ -358,6 +714,7 @@ func (s *streamSenderSuite) TestSendTasks_Noop() { }) err := s.streamSender.sendTasks( + enumsspb.TASK_PRIORITY_UNSPECIFIED, beginInclusiveWatermark, endExclusiveWatermark, ) @@ -386,6 +743,7 @@ func (s *streamSenderSuite) TestSendTasks_WithoutTasks() { }) err := s.streamSender.sendTasks( + enumsspb.TASK_PRIORITY_UNSPECIFIED, beginInclusiveWatermark, endExclusiveWatermark, ) @@ -448,6 +806,140 @@ func (s *streamSenderSuite) TestSendTasks_WithTasks() { ) err := s.streamSender.sendTasks( + enumsspb.TASK_PRIORITY_UNSPECIFIED, + beginInclusiveWatermark, + endExclusiveWatermark, + ) + s.NoError(err) +} + +func (s *streamSenderSuite) TestSendTasks_TieredStack_HighPriority() { + beginInclusiveWatermark := rand.Int63() + endExclusiveWatermark := beginInclusiveWatermark + 100 + item0 := &tasks.SyncWorkflowStateTask{ + Priority: enumsspb.TASK_PRIORITY_LOW, + } + + item1 := &tasks.SyncWorkflowStateTask{ + Priority: enumsspb.TASK_PRIORITY_HIGH, + } + item2 := &tasks.SyncWorkflowStateTask{ + Priority: enumsspb.TASK_PRIORITY_LOW, + } + task1 := &replicationspb.ReplicationTask{ + SourceTaskId: beginInclusiveWatermark, + VisibilityTime: timestamppb.New(time.Unix(0, rand.Int63())), + Priority: enumsspb.TASK_PRIORITY_HIGH, + } + + iter := collection.NewPagingIterator[tasks.Task]( + func(paginationToken []byte) ([]tasks.Task, []byte, error) { + return []tasks.Task{item0, item1, item2}, nil, nil + }, + ) + s.historyEngine.EXPECT().GetReplicationTasksIter( + gomock.Any(), + string(s.clientShardKey.ClusterID), + beginInclusiveWatermark, + endExclusiveWatermark, + ).Return(iter, nil) + s.taskConverter.EXPECT().Convert(item1).Return(task1, nil) + + gomock.InOrder( + s.server.EXPECT().Send(&historyservice.StreamWorkflowReplicationMessagesResponse{ + Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{ + Messages: &replicationspb.WorkflowReplicationMessages{ + ReplicationTasks: []*replicationspb.ReplicationTask{task1}, + ExclusiveHighWatermark: task1.SourceTaskId + 1, + ExclusiveHighWatermarkTime: task1.VisibilityTime, + Priority: enumsspb.TASK_PRIORITY_HIGH, + }, + }, + }).Return(nil), + s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { + s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark) + s.Equal(enumsspb.TASK_PRIORITY_HIGH, resp.GetMessages().Priority) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) + return nil + }), + ) + + err := s.streamSender.sendTasks( + enumsspb.TASK_PRIORITY_HIGH, + beginInclusiveWatermark, + endExclusiveWatermark, + ) + s.NoError(err) +} + +func (s *streamSenderSuite) TestSendTasks_TieredStack_LowPriority() { + beginInclusiveWatermark := rand.Int63() + endExclusiveWatermark := beginInclusiveWatermark + 100 + item0 := &tasks.SyncWorkflowStateTask{ + Priority: enumsspb.TASK_PRIORITY_LOW, + } + item1 := &tasks.SyncWorkflowStateTask{ + Priority: enumsspb.TASK_PRIORITY_HIGH, + } + item2 := &tasks.SyncWorkflowStateTask{ + Priority: enumsspb.TASK_PRIORITY_LOW, + } + + task0 := &replicationspb.ReplicationTask{ + SourceTaskId: beginInclusiveWatermark, + VisibilityTime: 
timestamppb.New(time.Unix(0, rand.Int63())), + Priority: enumsspb.TASK_PRIORITY_LOW, + } + task2 := &replicationspb.ReplicationTask{ + SourceTaskId: beginInclusiveWatermark, + VisibilityTime: timestamppb.New(time.Unix(0, rand.Int63())), + Priority: enumsspb.TASK_PRIORITY_LOW, + } + iter := collection.NewPagingIterator[tasks.Task]( + func(paginationToken []byte) ([]tasks.Task, []byte, error) { + return []tasks.Task{item0, item1, item2}, nil, nil + }, + ) + s.historyEngine.EXPECT().GetReplicationTasksIter( + gomock.Any(), + string(s.clientShardKey.ClusterID), + beginInclusiveWatermark, + endExclusiveWatermark, + ).Return(iter, nil) + s.taskConverter.EXPECT().Convert(item0).Return(task0, nil) + s.taskConverter.EXPECT().Convert(item2).Return(task2, nil) + + gomock.InOrder( + s.server.EXPECT().Send(&historyservice.StreamWorkflowReplicationMessagesResponse{ + Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{ + Messages: &replicationspb.WorkflowReplicationMessages{ + ReplicationTasks: []*replicationspb.ReplicationTask{task0}, + ExclusiveHighWatermark: task0.SourceTaskId + 1, + ExclusiveHighWatermarkTime: task0.VisibilityTime, + Priority: enumsspb.TASK_PRIORITY_LOW, + }, + }, + }).Return(nil), + s.server.EXPECT().Send(&historyservice.StreamWorkflowReplicationMessagesResponse{ + Attributes: &historyservice.StreamWorkflowReplicationMessagesResponse_Messages{ + Messages: &replicationspb.WorkflowReplicationMessages{ + ReplicationTasks: []*replicationspb.ReplicationTask{task2}, + ExclusiveHighWatermark: task2.SourceTaskId + 1, + ExclusiveHighWatermarkTime: task2.VisibilityTime, + Priority: enumsspb.TASK_PRIORITY_LOW, + }, + }, + }).Return(nil), + s.server.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *historyservice.StreamWorkflowReplicationMessagesResponse) error { + s.Equal(endExclusiveWatermark, resp.GetMessages().ExclusiveHighWatermark) + s.Equal(enumsspb.TASK_PRIORITY_LOW, resp.GetMessages().Priority) + s.NotNil(resp.GetMessages().ExclusiveHighWatermarkTime) + return nil + }), + ) + + err := s.streamSender.sendTasks( + enumsspb.TASK_PRIORITY_LOW, beginInclusiveWatermark, endExclusiveWatermark, ) @@ -458,7 +950,7 @@ func (s *streamSenderSuite) TestSendEventLoop_Panic_ShouldCaptureAsError() { s.historyEngine.EXPECT().SubscribeReplicationNotification().Do(func() { panic("panic") }) - err := s.streamSender.sendEventLoop() + err := s.streamSender.sendEventLoop(enumsspb.TASK_PRIORITY_UNSPECIFIED) s.Error(err) // panic is captured as error } @@ -479,6 +971,7 @@ func (s *streamSenderSuite) TestSendEventLoop_StreamSendError_ShouldReturnStream }) err := s.streamSender.sendTasks( + enumsspb.TASK_PRIORITY_UNSPECIFIED, beginInclusiveWatermark, endExclusiveWatermark, )
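
Note on the tiered reader-state layout used above: in tiered mode recvSyncReplicationState writes three QueueReaderState scopes, where index 0 carries the overall low watermark (kept so an old single-stack reader state can still be consumed), index 1 the high-priority watermark, and index 2 the low-priority watermark. The sketch below is illustrative only (the constant and function names are not part of this change); it spells out the invariant that getSendCatchupBeginInclusiveWatermark relies on when it falls back to scope index 0, namely that the overall watermark equals Min(high-priority low watermark, low-priority low watermark).

package main

import "fmt"

// Illustrative names only; the change above uses raw scope indices instead.
const (
	scopeIndexOverall      = 0 // Min(high, low) watermark, consumable by a single-stack reader
	scopeIndexHighPriority = 1
	scopeIndexLowPriority  = 2
)

// overallInclusiveLowWatermark is the value a tiered sender is expected to report
// as the overall (index 0) watermark: never ahead of either per-priority watermark.
func overallInclusiveLowWatermark(highPriority, lowPriority int64) int64 {
	if highPriority < lowPriority {
		return highPriority
	}
	return lowPriority
}

func main() {
	// High priority has been acked further ahead than low priority.
	fmt.Println(overallInclusiveLowWatermark(1244, 1234)) // prints 1234
}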
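
The skipCount handling in sendTasks exists so that a stream which filters out every task for its priority still lets the receiver advance its ACK level. Below is a minimal, self-contained sketch of that pattern under assumed placeholder names (streamTasks, shouldSend, and the Printf calls stand in for the real sendToStream path; none of them are APIs from this change).

package main

import "fmt"

const taskMaxSkipCount = 1000 // mirrors TaskMaxSkipCount in stream_sender.go

// streamTasks "sends" only the task IDs the predicate accepts. After more than
// taskMaxSkipCount consecutive unsent tasks it emits a watermark-only message,
// so a receiver that gets no tasks can still move its ACK level forward.
func streamTasks(taskIDs []int64, shouldSend func(int64) bool) {
	skipCount := 0
	for _, id := range taskIDs {
		skipCount++
		if skipCount > taskMaxSkipCount {
			fmt.Printf("watermark-only message, exclusive high watermark %d\n", id)
		}
		if !shouldSend(id) {
			continue // filtered out, e.g. wrong priority for this loop
		}
		fmt.Printf("task message %d\n", id)
		skipCount = 0 // reset only when a task was actually sent, as in sendTasks
	}
}

func main() {
	ids := make([]int64, 0, 1500)
	for i := int64(1); i <= 1500; i++ {
		ids = append(ids, i)
	}
	// A filter that rejects everything: only watermark-only messages are emitted
	// once the skip threshold is crossed.
	streamTasks(ids, func(int64) bool { return false })
}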