From b6525f17964f8bc8f550746f87f71a205cb1ff66 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Mon, 1 May 2023 18:35:55 -0700 Subject: [PATCH] Remove unused matching DB layer impl (#4141) --- service/matching/db_task_manager.go | 443 ---------------- service/matching/db_task_manager_test.go | 403 -------------- service/matching/db_task_queue_ownership.go | 343 ------------ .../matching/db_task_queue_ownership_mock.go | 161 ------ .../matching/db_task_queue_ownership_test.go | 476 ----------------- service/matching/db_task_reader.go | 173 ------- service/matching/db_task_reader_mock.go | 102 ---- service/matching/db_task_reader_test.go | 490 ------------------ service/matching/db_task_writer.go | 152 ------ service/matching/db_task_writer_mock.go | 100 ---- service/matching/db_task_writer_test.go | 277 ---------- 11 files changed, 3120 deletions(-) delete mode 100644 service/matching/db_task_manager.go delete mode 100644 service/matching/db_task_manager_test.go delete mode 100644 service/matching/db_task_queue_ownership.go delete mode 100644 service/matching/db_task_queue_ownership_mock.go delete mode 100644 service/matching/db_task_queue_ownership_test.go delete mode 100644 service/matching/db_task_reader.go delete mode 100644 service/matching/db_task_reader_mock.go delete mode 100644 service/matching/db_task_reader_test.go delete mode 100644 service/matching/db_task_writer.go delete mode 100644 service/matching/db_task_writer_mock.go delete mode 100644 service/matching/db_task_writer_test.go diff --git a/service/matching/db_task_manager.go b/service/matching/db_task_manager.go deleted file mode 100644 index bb4984693d2..00000000000 --- a/service/matching/db_task_manager.go +++ /dev/null @@ -1,443 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package matching - -import ( - "context" - "sync" - "sync/atomic" - "time" - - enumspb "go.temporal.io/api/enums/v1" - "go.temporal.io/api/serviceerror" - - enumsspb "go.temporal.io/server/api/enums/v1" - persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common" - "go.temporal.io/server/common/backoff" - "go.temporal.io/server/common/future" - "go.temporal.io/server/common/headers" - "go.temporal.io/server/common/log" - "go.temporal.io/server/common/log/tag" - "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/persistence" - "go.temporal.io/server/service/worker/scanner/taskqueue" -) - -const ( - dbTaskInitialRangeID = 1 - dbTaskStickyTaskQueueTTL = 24 * time.Hour - - dbTaskFlushInterval = 24 * time.Millisecond - - dbTaskDeletionInterval = 10 * time.Second - - dbTaskUpdateAckInterval = time.Minute - dbTaskUpdateQueueInterval = time.Minute -) - -var ( - errDBTaskManagerNotReady = serviceerror.NewUnavailable("dbTaskManager is not ready") -) - -type ( - taskQueueOwnershipProviderFn func() dbTaskQueueOwnership - taskReaderProviderFn func(ownership dbTaskQueueOwnership) dbTaskReader - taskWriterProviderFn func(ownership dbTaskQueueOwnership) dbTaskWriter - - dbTaskManager struct { - status int32 - taskQueueKey persistence.TaskQueueKey - taskQueueKind enumspb.TaskQueueKind - taskIDRangeSize int64 - taskQueueOwnershipProvider taskQueueOwnershipProviderFn - taskReaderProvider taskReaderProviderFn - taskWriterProvider taskWriterProviderFn - dispatchTaskFn func(context.Context, *internalTask) error - store persistence.TaskManager - namespaceRegistry namespace.Registry - logger log.Logger - - dispatchChan chan struct{} - startupChan chan struct{} - shutdownChan chan struct{} - - dispatchBackoffTimerLock sync.Mutex - dispatchBackoffTimer *time.Timer - dispatchRetrier backoff.Retrier - - taskQueueOwnership dbTaskQueueOwnership - taskReader dbTaskReader - taskWriter dbTaskWriter - maxDeletedTaskIDInclusive int64 // in mem only - } -) - -func newDBTaskManager( - taskQueueKey persistence.TaskQueueKey, - taskQueueKind enumspb.TaskQueueKind, - taskIDRangeSize int64, - dispatchTaskFn func(context.Context, *internalTask) error, - store persistence.TaskManager, - namespaceRegistry namespace.Registry, - logger log.Logger, -) *dbTaskManager { - return &dbTaskManager{ - status: common.DaemonStatusInitialized, - taskQueueKey: taskQueueKey, - taskQueueKind: taskQueueKind, - taskIDRangeSize: taskIDRangeSize, - taskQueueOwnershipProvider: func() dbTaskQueueOwnership { - return newDBTaskQueueOwnership( - taskQueueKey, - taskQueueKind, - taskIDRangeSize, - store, - logger, - ) - }, - taskReaderProvider: func(taskQueueOwnership dbTaskQueueOwnership) dbTaskReader { - return newDBTaskReader( - taskQueueKey, - store, - taskQueueOwnership.getAckedTaskID(), - logger, - ) - }, - taskWriterProvider: func(taskQueueOwnership dbTaskQueueOwnership) dbTaskWriter { - return newDBTaskWriter( - taskQueueKey, - taskQueueOwnership, - logger, - ) - }, - dispatchTaskFn: dispatchTaskFn, - store: store, - namespaceRegistry: namespaceRegistry, - logger: logger, - - dispatchChan: make(chan struct{}, 1), - startupChan: make(chan struct{}), - shutdownChan: make(chan struct{}), - - dispatchRetrier: backoff.NewRetrier( - common.CreateReadTaskRetryPolicy(), - backoff.SystemClock, - ), - - taskQueueOwnership: nil, - taskWriter: nil, - taskReader: nil, - maxDeletedTaskIDInclusive: 0, - } -} - -func (d *dbTaskManager) Start() { - if !atomic.CompareAndSwapInt32( - &d.status, - common.DaemonStatusInitialized, - common.DaemonStatusStarted, - ) { - return - } - - d.signalDispatch() - - // TODO: consider using goro.Group - go d.acquireLoop(context.Background()) - go d.writerEventLoop(context.Background()) - go d.readerEventLoop(context.Background()) -} - -func (d *dbTaskManager) Stop() { - if !atomic.CompareAndSwapInt32( - &d.status, - common.DaemonStatusStarted, - common.DaemonStatusStopped, - ) { - return - } - - close(d.shutdownChan) -} - -func (d *dbTaskManager) isStopped() bool { - return atomic.LoadInt32(&d.status) == common.DaemonStatusStopped -} - -func (d *dbTaskManager) acquireLoop( - ctx context.Context, -) { - defer close(d.startupChan) - ctx = d.initContext(ctx) - -AcquireLoop: - for !d.isStopped() { - err := d.acquireOwnership(ctx) - if err == nil { - break AcquireLoop - } - if !common.IsPersistenceTransientError(err) { - d.Stop() - break AcquireLoop - } - time.Sleep(2 * time.Second) - } -} - -func (d *dbTaskManager) writerEventLoop( - ctx context.Context, -) { - <-d.startupChan - ctx = d.initContext(ctx) - - updateQueueTicker := time.NewTicker(dbTaskUpdateQueueInterval) - defer updateQueueTicker.Stop() - // TODO we should impl a more efficient method to - // buffer & wait for max duration - // right now simply just flush every dbTaskFlushInterval - flushTicker := time.NewTicker(dbTaskFlushInterval) - defer flushTicker.Stop() - - for { - if d.isStopped() { - return - } - - select { - case <-d.shutdownChan: - return - case <-d.taskQueueOwnership.getShutdownChan(): - d.Stop() - - case <-updateQueueTicker.C: - d.persistTaskQueue(ctx) - case <-flushTicker.C: - d.taskWriter.flushTasks(ctx) - d.signalDispatch() - case <-d.taskWriter.notifyFlushChan(): - d.taskWriter.flushTasks(ctx) - d.signalDispatch() - } - } -} - -func (d *dbTaskManager) readerEventLoop( - ctx context.Context, -) { - <-d.startupChan - ctx = d.initContext(ctx) - - updateAckTicker := time.NewTicker(dbTaskUpdateAckInterval) - defer updateAckTicker.Stop() - - dbTaskAckTicker := time.NewTicker(dbTaskDeletionInterval) - defer dbTaskAckTicker.Stop() - - for { - if d.isStopped() { - return - } - - select { - case <-d.shutdownChan: - return - case <-d.taskQueueOwnership.getShutdownChan(): - d.Stop() - - case <-updateAckTicker.C: - d.updateAckTaskID() - case <-dbTaskAckTicker.C: - d.deleteAckedTasks(ctx) - case <-d.dispatchChan: - d.readAndDispatchTasks(ctx) - } - } -} - -func (d *dbTaskManager) acquireOwnership( - ctx context.Context, -) error { - taskQueueOwnership := d.taskQueueOwnershipProvider() - if err := taskQueueOwnership.takeTaskQueueOwnership(ctx); err != nil { - return err - } - d.taskReader = d.taskReaderProvider(taskQueueOwnership) - d.taskWriter = d.taskWriterProvider(taskQueueOwnership) - d.maxDeletedTaskIDInclusive = taskQueueOwnership.getAckedTaskID() - d.taskQueueOwnership = taskQueueOwnership - return nil -} - -func (d *dbTaskManager) signalDispatch() { - select { - case d.dispatchChan <- struct{}{}: - default: // channel already has an event, don't block - } -} - -func (d *dbTaskManager) BufferAndWriteTask( - task *persistencespb.TaskInfo, -) dbTaskWriterFuture { - select { - case <-d.startupChan: - if d.isStopped() { - return future.NewReadyFuture(struct{}{}, errDBTaskManagerNotReady) - } - return d.taskWriter.appendTask(task) - default: - return future.NewReadyFuture(struct{}{}, errDBTaskManagerNotReady) - } -} - -func (d *dbTaskManager) readAndDispatchTasks( - ctx context.Context, -) { - iter := d.taskReader.taskIterator(ctx, d.taskQueueOwnership.getLastAllocatedTaskID()) - for iter.HasNext() { - task, err := iter.Next() - if err != nil { - d.logger.Error("dbTaskManager encountered error when fetching tasks", tag.Error(err)) - if common.IsResourceExhausted(err) { - d.backoffDispatch(taskReaderThrottleRetryDelay) - } else { - d.backoffDispatch(d.dispatchRetrier.NextBackOff()) - } - return - } - - d.mustDispatch(task) - } -} - -func (d *dbTaskManager) mustDispatch( - task *persistencespb.AllocatedTaskInfo, -) { - for !d.isStopped() { - if taskqueue.IsTaskExpired(task) { - d.taskReader.ackTask(task.TaskId) - return - } - - err := d.dispatchTaskFn(context.Background(), newInternalTask( - task, - d.finishTask, - enumsspb.TASK_SOURCE_DB_BACKLOG, - "", - false, - )) - if err == nil { - return - } - d.logger.Error("dbTaskManager unable to dispatch task", tag.Task(task), tag.Error(err)) - } -} - -func (d *dbTaskManager) updateAckTaskID() { - ackedTaskID := d.taskReader.moveAckedTaskID() - d.taskQueueOwnership.updateAckedTaskID(ackedTaskID) -} - -func (d *dbTaskManager) deleteAckedTasks( - ctx context.Context, -) { - ackedTaskID := d.taskQueueOwnership.getAckedTaskID() - if ackedTaskID <= d.maxDeletedTaskIDInclusive { - return - } - _, err := d.store.CompleteTasksLessThan(ctx, &persistence.CompleteTasksLessThanRequest{ - NamespaceID: d.taskQueueKey.NamespaceID, - TaskQueueName: d.taskQueueKey.TaskQueueName, - TaskType: d.taskQueueKey.TaskQueueType, - ExclusiveMaxTaskID: ackedTaskID + 1, - Limit: 100000, // TODO @wxing1292 why delete with limit? history service is not doing similar thing - }) - if err != nil { - d.logger.Error("dbTaskManager encountered task deletion error", tag.Error(err)) - return - } - d.maxDeletedTaskIDInclusive = ackedTaskID -} - -func (d *dbTaskManager) persistTaskQueue( - ctx context.Context, -) { - err := d.taskQueueOwnership.persistTaskQueue(ctx) - if err != nil { - d.logger.Error("dbTaskManager encountered unknown error", tag.Error(err)) - } -} - -func (d *dbTaskManager) finishTask( - info *persistencespb.AllocatedTaskInfo, - err error, -) { - if err == nil { - d.taskReader.ackTask(info.TaskId) - return - } - - // TODO @wxing1292 logic below is subject to discussion - // NOTE: logic below is legacy logic, which will move task with error - // to the end of the queue for later retry - // - // failed to start the task. - // We cannot just remove it from persistence because then it will be lost. - // We handle this by writing the task back to persistence with a higher taskID. - // This will allow subsequent tasks to make progress, and hopefully by the time this task is picked-up - // again the underlying reason for failing to start will be resolved. - // Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be - // re-written to persistence frequently. - _, err = d.BufferAndWriteTask(info.Data).Get(context.Background()) - if err != nil { - d.logger.Error("dbTaskManager encountered error when moving task to end of task queue", - tag.Error(err), - tag.WorkflowTaskQueueName(d.taskQueueKey.TaskQueueName), - tag.WorkflowTaskQueueType(d.taskQueueKey.TaskQueueType)) - d.Stop() - return - } - d.taskReader.ackTask(info.TaskId) -} - -func (d *dbTaskManager) backoffDispatch(duration time.Duration) { - d.dispatchBackoffTimerLock.Lock() - defer d.dispatchBackoffTimerLock.Unlock() - - if d.dispatchBackoffTimer == nil { - d.dispatchBackoffTimer = time.AfterFunc(duration, func() { - d.dispatchBackoffTimerLock.Lock() - defer d.dispatchBackoffTimerLock.Unlock() - - d.signalDispatch() // re-enqueue the event - d.dispatchBackoffTimer = nil - }) - } -} - -func (d *dbTaskManager) initContext(ctx context.Context) context.Context { - namespace, _ := d.namespaceRegistry.GetNamespaceName(namespace.ID(d.taskQueueKey.NamespaceID)) - return headers.SetCallerInfo(ctx, headers.NewBackgroundCallerInfo(namespace.String())) -} diff --git a/service/matching/db_task_manager_test.go b/service/matching/db_task_manager_test.go deleted file mode 100644 index 9c24e9d45ff..00000000000 --- a/service/matching/db_task_manager_test.go +++ /dev/null @@ -1,403 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package matching - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/golang/mock/gomock" - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - enumspb "go.temporal.io/api/enums/v1" - "go.temporal.io/api/serviceerror" - - persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/collection" - "go.temporal.io/server/common/future" - "go.temporal.io/server/common/log" - "go.temporal.io/server/common/namespace" - "go.temporal.io/server/common/persistence" - "go.temporal.io/server/common/primitives/timestamp" -) - -type ( - dbTaskManagerSuite struct { - *require.Assertions - suite.Suite - - controller *gomock.Controller - taskQueueOwnership *MockdbTaskQueueOwnership - taskWriter *MockdbTaskWriter - taskReader *MockdbTaskReader - store *persistence.MockTaskManager - namespaceRegistry *namespace.MockRegistry - ackedTaskID int64 - lastAllocatedTaskID int64 - dispatchTaskFn func(context.Context, *internalTask) error - - namespaceID string - taskQueueName string - taskQueueType enumspb.TaskQueueType - taskQueueKind enumspb.TaskQueueKind - taskIDRangeSize int64 - - dbTaskManager *dbTaskManager - } -) - -func TestDBTaskManagerSuite(t *testing.T) { - s := new(dbTaskManagerSuite) - suite.Run(t, s) -} - -func (s *dbTaskManagerSuite) SetupSuite() { - rand.Seed(time.Now().UnixNano()) -} - -func (s *dbTaskManagerSuite) TearDownSuite() { - -} - -func (s *dbTaskManagerSuite) SetupTest() { - s.Assertions = require.New(s.T()) - - logger := log.NewTestLogger() - s.controller = gomock.NewController(s.T()) - s.taskQueueOwnership = NewMockdbTaskQueueOwnership(s.controller) - s.taskWriter = NewMockdbTaskWriter(s.controller) - s.taskReader = NewMockdbTaskReader(s.controller) - s.store = persistence.NewMockTaskManager(s.controller) - s.namespaceRegistry = namespace.NewMockRegistry(s.controller) - s.ackedTaskID = rand.Int63() - s.lastAllocatedTaskID = s.ackedTaskID + 100 - s.dispatchTaskFn = func(context.Context, *internalTask) error { - panic("unexpected call to dispatch function") - } - - s.namespaceID = uuid.New().String() - s.taskQueueName = uuid.New().String() - s.taskQueueType = enumspb.TASK_QUEUE_TYPE_WORKFLOW - s.taskQueueKind = enumspb.TASK_QUEUE_KIND_STICKY - s.taskIDRangeSize = rand.Int63() - - s.namespaceRegistry.EXPECT().GetNamespaceName(namespace.ID(s.namespaceID)). - Return(namespace.Name("namespaceName"), nil).AnyTimes() - - s.dbTaskManager = newDBTaskManager( - persistence.TaskQueueKey{ - NamespaceID: s.namespaceID, - TaskQueueName: s.taskQueueName, - TaskQueueType: s.taskQueueType, - }, - s.taskQueueKind, - s.taskIDRangeSize, - s.dispatchTaskFn, - s.store, - s.namespaceRegistry, - logger, - ) - s.dbTaskManager.taskQueueOwnershipProvider = func() dbTaskQueueOwnership { - return s.taskQueueOwnership - } - s.dbTaskManager.taskReaderProvider = func(_ dbTaskQueueOwnership) dbTaskReader { - return s.taskReader - } - s.dbTaskManager.taskWriterProvider = func(_ dbTaskQueueOwnership) dbTaskWriter { - return s.taskWriter - } -} - -func (s *dbTaskManagerSuite) TearDownTest() { - s.dbTaskManager.Stop() - s.controller.Finish() -} - -func (s *dbTaskManagerSuite) TestAcquireOwnership_Success() { - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(nil) - s.taskQueueOwnership.EXPECT().getAckedTaskID().Return(s.ackedTaskID).AnyTimes() - - err := s.dbTaskManager.acquireOwnership(context.Background()) - s.NoError(err) - s.NotNil(s.dbTaskManager.taskWriter) - s.NotNil(s.dbTaskManager.taskReader) - s.Equal(s.ackedTaskID, s.dbTaskManager.maxDeletedTaskIDInclusive) -} - -func (s *dbTaskManagerSuite) TestAcquireOwnership_Failed() { - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(serviceerror.NewUnavailable("some random error")) - - err := s.dbTaskManager.acquireOwnership(context.Background()) - s.Error(err) - s.Nil(s.dbTaskManager.taskWriter) - s.Nil(s.dbTaskManager.taskReader) - s.Equal(int64(0), s.dbTaskManager.maxDeletedTaskIDInclusive) -} - -func (s *dbTaskManagerSuite) TestStart_Success() { - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(nil) - s.taskQueueOwnership.EXPECT().getAckedTaskID().Return(s.ackedTaskID).AnyTimes() - s.taskQueueOwnership.EXPECT().getLastAllocatedTaskID().Return(s.lastAllocatedTaskID).AnyTimes() - s.taskQueueOwnership.EXPECT().getShutdownChan().Return(nil).AnyTimes() - s.taskReader.EXPECT().taskIterator(gomock.Any(), s.lastAllocatedTaskID).Return(collection.NewPagingIterator( - func(paginationToken []byte) ([]*persistencespb.AllocatedTaskInfo, []byte, error) { - return nil, nil, nil - }, - )).AnyTimes() - s.taskWriter.EXPECT().notifyFlushChan().Return(nil).AnyTimes() - - s.dbTaskManager.Start() - <-s.dbTaskManager.startupChan - s.False(s.dbTaskManager.isStopped()) -} - -func (s *dbTaskManagerSuite) TestStart_ErrorThenSuccess() { - gomock.InOrder( - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(serviceerror.NewUnavailable("some random error")), - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(nil), - ) - s.taskQueueOwnership.EXPECT().getAckedTaskID().Return(s.ackedTaskID).AnyTimes() - s.taskQueueOwnership.EXPECT().getLastAllocatedTaskID().Return(s.lastAllocatedTaskID).AnyTimes() - s.taskQueueOwnership.EXPECT().getShutdownChan().Return(nil).AnyTimes() - s.taskReader.EXPECT().taskIterator(gomock.Any(), s.lastAllocatedTaskID).Return(collection.NewPagingIterator( - func(paginationToken []byte) ([]*persistencespb.AllocatedTaskInfo, []byte, error) { - return nil, nil, nil - }, - )).AnyTimes() - s.taskWriter.EXPECT().notifyFlushChan().Return(nil).AnyTimes() - - s.dbTaskManager.Start() - <-s.dbTaskManager.startupChan - s.False(s.dbTaskManager.isStopped()) -} - -func (s *dbTaskManagerSuite) TestStart_Error() { - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(&persistence.ConditionFailedError{}) - - s.dbTaskManager.Start() - <-s.dbTaskManager.startupChan - s.True(s.dbTaskManager.isStopped()) -} - -func (s *dbTaskManagerSuite) TestBufferAndWriteTask_NotReady() { - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(serviceerror.NewUnavailable("some random error")).AnyTimes() - s.dbTaskManager.Start() - - taskInfo := &persistencespb.TaskInfo{} - fut := s.dbTaskManager.BufferAndWriteTask(taskInfo) - _, err := fut.Get(context.Background()) - s.Equal(errDBTaskManagerNotReady, err) -} - -func (s *dbTaskManagerSuite) TestBufferAndWriteTask_Ready() { - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(nil) - s.taskQueueOwnership.EXPECT().getAckedTaskID().Return(s.ackedTaskID).AnyTimes() - s.taskQueueOwnership.EXPECT().getLastAllocatedTaskID().Return(s.lastAllocatedTaskID).AnyTimes() - s.taskQueueOwnership.EXPECT().getShutdownChan().Return(nil).AnyTimes() - s.taskReader.EXPECT().taskIterator(gomock.Any(), s.lastAllocatedTaskID).Return(collection.NewPagingIterator( - func(paginationToken []byte) ([]*persistencespb.AllocatedTaskInfo, []byte, error) { - return nil, nil, nil - }, - )).AnyTimes() - s.taskWriter.EXPECT().notifyFlushChan().Return(nil).AnyTimes() - s.dbTaskManager.Start() - <-s.dbTaskManager.startupChan - - taskInfo := &persistencespb.TaskInfo{} - taskWriterErr := serviceerror.NewInternal("random error") - s.taskWriter.EXPECT().appendTask(taskInfo).Return( - future.NewReadyFuture(struct{}{}, taskWriterErr), - ) - fut := s.dbTaskManager.BufferAndWriteTask(taskInfo) - _, err := fut.Get(context.Background()) - s.Equal(taskWriterErr, err) -} - -func (s *dbTaskManagerSuite) TestReadAndDispatchTasks_ReadSuccess_Expired() { - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(nil) - s.taskQueueOwnership.EXPECT().getAckedTaskID().Return(s.ackedTaskID) - err := s.dbTaskManager.acquireOwnership(context.Background()) - s.NoError(err) - - // make sure no signal exists in dispatch chan - select { - case <-s.dbTaskManager.dispatchChan: - default: - } - - allocatedTaskInfo := &persistencespb.AllocatedTaskInfo{ - TaskId: s.lastAllocatedTaskID + 100, - Data: &persistencespb.TaskInfo{ - NamespaceId: uuid.New().String(), - WorkflowId: uuid.New().String(), - RunId: uuid.New().String(), - ScheduledEventId: rand.Int63(), - CreateTime: timestamp.TimePtr(time.Now().UTC()), - ExpiryTime: timestamp.TimePtr(time.Now().UTC().Add(-time.Minute)), - }, - } - s.taskQueueOwnership.EXPECT().getLastAllocatedTaskID().Return(s.lastAllocatedTaskID) - s.taskReader.EXPECT().taskIterator(gomock.Any(), s.lastAllocatedTaskID).Return(collection.NewPagingIterator( - func(paginationToken []byte) ([]*persistencespb.AllocatedTaskInfo, []byte, error) { - return []*persistencespb.AllocatedTaskInfo{allocatedTaskInfo}, nil, nil - }, - )) - s.taskReader.EXPECT().ackTask(allocatedTaskInfo.TaskId) - - s.dbTaskManager.readAndDispatchTasks(context.Background()) -} - -func (s *dbTaskManagerSuite) TestReadAndDispatchTasks_ReadSuccess_Dispatch() { - var dispatchedTasks []*persistencespb.AllocatedTaskInfo - s.dbTaskManager.dispatchTaskFn = func(_ context.Context, task *internalTask) error { - dispatchedTasks = append(dispatchedTasks, task.event.AllocatedTaskInfo) - return nil - } - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(nil) - s.taskQueueOwnership.EXPECT().getAckedTaskID().Return(s.ackedTaskID) - err := s.dbTaskManager.acquireOwnership(context.Background()) - s.NoError(err) - - // make sure no signal exists in dispatch chan - select { - case <-s.dbTaskManager.dispatchChan: - default: - } - - allocatedTaskInfo := &persistencespb.AllocatedTaskInfo{ - TaskId: s.lastAllocatedTaskID + 100, - Data: &persistencespb.TaskInfo{ - NamespaceId: uuid.New().String(), - WorkflowId: uuid.New().String(), - RunId: uuid.New().String(), - ScheduledEventId: rand.Int63(), - CreateTime: timestamp.TimePtr(time.Now().UTC()), - ExpiryTime: timestamp.TimePtr(time.Unix(0, 0)), - }, - } - s.taskQueueOwnership.EXPECT().getLastAllocatedTaskID().Return(s.lastAllocatedTaskID) - s.taskReader.EXPECT().taskIterator(gomock.Any(), s.lastAllocatedTaskID).Return(collection.NewPagingIterator( - func(paginationToken []byte) ([]*persistencespb.AllocatedTaskInfo, []byte, error) { - return []*persistencespb.AllocatedTaskInfo{allocatedTaskInfo}, nil, nil - }, - )) - - s.dbTaskManager.readAndDispatchTasks(context.Background()) - s.Equal([]*persistencespb.AllocatedTaskInfo{allocatedTaskInfo}, dispatchedTasks) -} - -func (s *dbTaskManagerSuite) TestReadAndDispatchTasks_ReadFailure() { - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(nil) - s.taskQueueOwnership.EXPECT().getAckedTaskID().Return(s.ackedTaskID) - err := s.dbTaskManager.acquireOwnership(context.Background()) - s.NoError(err) - - // make sure no signal exists in dispatch chan - select { - case <-s.dbTaskManager.dispatchChan: - default: - } - - s.taskQueueOwnership.EXPECT().getLastAllocatedTaskID().Return(s.lastAllocatedTaskID) - s.taskReader.EXPECT().taskIterator(gomock.Any(), s.lastAllocatedTaskID).Return(collection.NewPagingIterator( - func(paginationToken []byte) ([]*persistencespb.AllocatedTaskInfo, []byte, error) { - return nil, nil, serviceerror.NewUnavailable("random error") - }, - )) - - s.dbTaskManager.readAndDispatchTasks(context.Background()) - timer := time.NewTimer(s.dbTaskManager.dispatchRetrier.NextBackOff()) - select { - case <-s.dbTaskManager.dispatchChan: - // noop - case <-timer.C: - s.Fail("dispatch channel should contain one signal") - } - - timer.Stop() -} - -func (s *dbTaskManagerSuite) TestUpdateAckTaskID() { - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(nil) - s.taskQueueOwnership.EXPECT().getAckedTaskID().Return(s.ackedTaskID) - err := s.dbTaskManager.acquireOwnership(context.Background()) - s.NoError(err) - - ackedTaskID := rand.Int63() - s.taskReader.EXPECT().moveAckedTaskID().Return(ackedTaskID) - s.taskQueueOwnership.EXPECT().updateAckedTaskID(ackedTaskID) - - s.dbTaskManager.updateAckTaskID() -} - -func (s *dbTaskManagerSuite) TestDeleteAckedTasks_Success() { - maxDeletedTaskIDInclusive := s.ackedTaskID - 100 - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(nil) - s.taskQueueOwnership.EXPECT().getAckedTaskID().Return(s.ackedTaskID).AnyTimes() - err := s.dbTaskManager.acquireOwnership(context.Background()) - s.NoError(err) - s.dbTaskManager.maxDeletedTaskIDInclusive = maxDeletedTaskIDInclusive - - s.store.EXPECT().CompleteTasksLessThan(gomock.Any(), &persistence.CompleteTasksLessThanRequest{ - NamespaceID: s.namespaceID, - TaskQueueName: s.taskQueueName, - TaskType: s.taskQueueType, - ExclusiveMaxTaskID: s.ackedTaskID + 1, - Limit: 100000, - }).Return(0, nil) - - s.dbTaskManager.deleteAckedTasks(context.Background()) - s.Equal(s.ackedTaskID, s.dbTaskManager.maxDeletedTaskIDInclusive) -} - -func (s *dbTaskManagerSuite) TestDeleteAckedTasks_Failed() { - maxDeletedTaskIDInclusive := s.ackedTaskID - 100 - s.taskQueueOwnership.EXPECT().takeTaskQueueOwnership(gomock.Any()).Return(nil) - s.taskQueueOwnership.EXPECT().getAckedTaskID().Return(s.ackedTaskID).AnyTimes() - err := s.dbTaskManager.acquireOwnership(context.Background()) - s.NoError(err) - s.dbTaskManager.maxDeletedTaskIDInclusive = maxDeletedTaskIDInclusive - - s.store.EXPECT().CompleteTasksLessThan(gomock.Any(), &persistence.CompleteTasksLessThanRequest{ - NamespaceID: s.namespaceID, - TaskQueueName: s.taskQueueName, - TaskType: s.taskQueueType, - ExclusiveMaxTaskID: s.ackedTaskID + 1, - Limit: 100000, - }).Return(0, serviceerror.NewUnavailable("random error")) - - s.dbTaskManager.deleteAckedTasks(context.Background()) - s.Equal(maxDeletedTaskIDInclusive, s.dbTaskManager.maxDeletedTaskIDInclusive) -} - -// TODO @wxing1292 add necessary tests -// -// once there is concensus about whether to keep the `task move to end` behavior -func (s *dbTaskManagerSuite) TestFinishTask_Success() {} - -func (s *dbTaskManagerSuite) TestFinishTask_Error() {} diff --git a/service/matching/db_task_queue_ownership.go b/service/matching/db_task_queue_ownership.go deleted file mode 100644 index 519a5ff612e..00000000000 --- a/service/matching/db_task_queue_ownership.go +++ /dev/null @@ -1,343 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package matching - -import ( - "context" - "fmt" - "sync" - "time" - - enumspb "go.temporal.io/api/enums/v1" - "go.temporal.io/api/serviceerror" - - persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/clock" - "go.temporal.io/server/common/log" - "go.temporal.io/server/common/persistence" - "go.temporal.io/server/common/primitives/timestamp" -) - -//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination db_task_queue_ownership_mock.go - -const ( - dbTaskQueueOwnershipStatusUninitialized dbTaskQueueOwnershipStatus = 0 - dbTaskQueueOwnershipStatusOwned dbTaskQueueOwnershipStatus = 1 - dbTaskQueueOwnershipStatusLost dbTaskQueueOwnershipStatus = 2 -) - -type ( - dbTaskQueueOwnershipStatus int - - dbTaskQueueOwnership interface { - takeTaskQueueOwnership(ctx context.Context) error - getShutdownChan() <-chan struct{} - getAckedTaskID() int64 - updateAckedTaskID(taskID int64) - getLastAllocatedTaskID() int64 - persistTaskQueue(ctx context.Context) error - flushTasks(ctx context.Context, taskInfos ...*persistencespb.TaskInfo) error - } - - dbTaskQueueOwnershipState struct { - rangeID int64 - ackedTaskID int64 - lastAllocatedTaskID int64 - minTaskIDExclusive int64 // exclusive - maxTaskIDInclusive int64 // inclusive - } - - dbTaskQueueOwnershipImpl struct { - taskQueueKey persistence.TaskQueueKey - taskQueueKind enumspb.TaskQueueKind - taskIDRangeSize int64 - timeSource clock.TimeSource - store persistence.TaskManager - logger log.Logger - - sync.Mutex - status dbTaskQueueOwnershipStatus - ownershipState *dbTaskQueueOwnershipState - shutdownChan chan struct{} - stateLastUpdateTime *time.Time - } -) - -func newDBTaskQueueOwnership( - taskQueueKey persistence.TaskQueueKey, - taskQueueKind enumspb.TaskQueueKind, - taskIDRangeSize int64, - store persistence.TaskManager, - logger log.Logger, -) *dbTaskQueueOwnershipImpl { - taskOwnership := &dbTaskQueueOwnershipImpl{ - taskQueueKey: taskQueueKey, - taskQueueKind: taskQueueKind, - taskIDRangeSize: taskIDRangeSize, - timeSource: clock.NewRealTimeSource(), - store: store, - logger: logger, - - status: dbTaskQueueOwnershipStatusUninitialized, - ownershipState: nil, - shutdownChan: make(chan struct{}), - stateLastUpdateTime: nil, - } - return taskOwnership -} - -func (m *dbTaskQueueOwnershipImpl) getShutdownChan() <-chan struct{} { - return m.shutdownChan -} - -func (m *dbTaskQueueOwnershipImpl) getAckedTaskID() int64 { - m.Lock() - defer m.Unlock() - - return m.ownershipState.ackedTaskID -} - -func (m *dbTaskQueueOwnershipImpl) updateAckedTaskID(taskID int64) { - m.Lock() - defer m.Unlock() - - if m.ownershipState.ackedTaskID >= taskID { - return - } - m.ownershipState.ackedTaskID = taskID -} - -func (m *dbTaskQueueOwnershipImpl) getLastAllocatedTaskID() int64 { - m.Lock() - defer m.Unlock() - - return m.ownershipState.lastAllocatedTaskID -} - -func (m *dbTaskQueueOwnershipImpl) takeTaskQueueOwnership( - ctx context.Context, -) error { - m.Lock() - defer m.Unlock() - - response, err := m.store.GetTaskQueue(ctx, &persistence.GetTaskQueueRequest{ - NamespaceID: m.taskQueueKey.NamespaceID, - TaskQueue: m.taskQueueKey.TaskQueueName, - TaskType: m.taskQueueKey.TaskQueueType, - }) - switch err.(type) { - case nil: - m.updateStateLocked(response.RangeID, response.TaskQueueInfo.AckLevel) - if err := m.renewTaskQueueLocked(ctx, response.RangeID+1); err != nil { - return err - } - m.status = dbTaskQueueOwnershipStatusOwned - return nil - - case *serviceerror.NotFound: - if _, err := m.store.CreateTaskQueue(ctx, &persistence.CreateTaskQueueRequest{ - RangeID: dbTaskInitialRangeID, - TaskQueueInfo: &persistencespb.TaskQueueInfo{ - NamespaceId: m.taskQueueKey.NamespaceID, - Name: m.taskQueueKey.TaskQueueName, - TaskType: m.taskQueueKey.TaskQueueType, - Kind: m.taskQueueKind, - AckLevel: 0, - ExpiryTime: m.expiryTime(), - LastUpdateTime: timestamp.TimePtr(m.timeSource.Now()), - }, - }); err != nil { - m.maybeShutdownLocked(err) - return err - } - m.stateLastUpdateTime = timestamp.TimePtr(m.timeSource.Now()) - m.updateStateLocked(dbTaskInitialRangeID, 0) - m.status = dbTaskQueueOwnershipStatusOwned - return nil - - default: - return err - } -} - -func (m *dbTaskQueueOwnershipImpl) renewTaskQueueLocked( - ctx context.Context, - rangeID int64, -) error { - _, err := m.store.UpdateTaskQueue(ctx, &persistence.UpdateTaskQueueRequest{ - RangeID: rangeID, - TaskQueueInfo: m.taskQueueInfoLocked(), - PrevRangeID: m.ownershipState.rangeID, - }) - if err != nil { - m.maybeShutdownLocked(err) - return err - } - m.stateLastUpdateTime = timestamp.TimePtr(m.timeSource.Now()) - m.updateStateLocked(rangeID, m.ownershipState.ackedTaskID) - return nil -} - -func (m *dbTaskQueueOwnershipImpl) persistTaskQueue( - ctx context.Context, -) error { - m.Lock() - defer m.Unlock() - - return m.renewTaskQueueLocked(ctx, m.ownershipState.rangeID) -} - -func (m *dbTaskQueueOwnershipImpl) flushTasks( - ctx context.Context, - taskInfos ...*persistencespb.TaskInfo, -) error { - m.Lock() - defer m.Unlock() - - taskIDs, err := m.generatedTaskIDsLocked(ctx, len(taskInfos)) - if err != nil { - return err - } - - allocatedTaskInfos := make([]*persistencespb.AllocatedTaskInfo, len(taskInfos)) - for i, taskID := range taskIDs { - allocatedTaskInfos[i] = &persistencespb.AllocatedTaskInfo{ - Data: taskInfos[i], - TaskId: taskID, - } - } - _, err = m.store.CreateTasks(ctx, &persistence.CreateTasksRequest{ - TaskQueueInfo: &persistence.PersistedTaskQueueInfo{ - Data: m.taskQueueInfoLocked(), - RangeID: m.ownershipState.rangeID, - }, - Tasks: allocatedTaskInfos, - }) - if err != nil { - m.maybeShutdownLocked(err) - return err - } - m.stateLastUpdateTime = timestamp.TimePtr(m.timeSource.Now()) - return nil -} - -func (m *dbTaskQueueOwnershipImpl) generatedTaskIDsLocked( - ctx context.Context, - numTasks int, -) ([]int64, error) { - if m.ownershipState.maxTaskIDInclusive-m.ownershipState.lastAllocatedTaskID < int64(numTasks) { - if err := m.renewTaskQueueLocked(ctx, m.ownershipState.rangeID+1); err != nil { - return nil, err - } - } - if m.ownershipState.maxTaskIDInclusive-m.ownershipState.lastAllocatedTaskID < int64(numTasks) { - panic("dbTaskQueueOwnershipImpl generatedTaskIDsLocked unable to allocate task IDs") - } - - allocatedTaskIDs := make([]int64, numTasks) - for i := 0; i < numTasks; i++ { - m.ownershipState.lastAllocatedTaskID++ - if m.ownershipState.lastAllocatedTaskID > m.ownershipState.maxTaskIDInclusive { - panic("dbTaskQueueOwnershipImpl generatedTaskIDsLocked encountered task ID overflow") - } - allocatedTaskIDs[i] = m.ownershipState.lastAllocatedTaskID - } - return allocatedTaskIDs, nil -} - -func (m *dbTaskQueueOwnershipImpl) taskQueueInfoLocked() *persistencespb.TaskQueueInfo { - return &persistencespb.TaskQueueInfo{ - NamespaceId: m.taskQueueKey.NamespaceID, - Name: m.taskQueueKey.TaskQueueName, - TaskType: m.taskQueueKey.TaskQueueType, - Kind: m.taskQueueKind, - AckLevel: m.ownershipState.ackedTaskID, - ExpiryTime: m.expiryTime(), - LastUpdateTime: timestamp.TimePtr(m.timeSource.Now()), - } -} - -func (m *dbTaskQueueOwnershipImpl) expiryTime() *time.Time { - switch m.taskQueueKind { - case enumspb.TASK_QUEUE_KIND_NORMAL: - return nil - case enumspb.TASK_QUEUE_KIND_STICKY: - return timestamp.TimePtr(m.timeSource.Now().Add(dbTaskStickyTaskQueueTTL)) - default: - panic(fmt.Sprintf("taskQueueDB encountered unknown task kind: %v", m.taskQueueKind)) - } -} - -func (m *dbTaskQueueOwnershipImpl) updateStateLocked( - rangeID int64, - ackedTaskID int64, -) { - minTaskID, maxTaskID := rangeIDToTaskIDRange(rangeID, m.taskIDRangeSize) - if m.ownershipState == nil { - m.ownershipState = &dbTaskQueueOwnershipState{ - rangeID: rangeID, - ackedTaskID: ackedTaskID, - lastAllocatedTaskID: minTaskID, - minTaskIDExclusive: minTaskID, - maxTaskIDInclusive: maxTaskID, - } - } else { - if rangeID < m.ownershipState.rangeID { - panic("dbTaskQueueOwnershipImpl updateStateLocked encountered smaller range ID") - } else if ackedTaskID < m.ownershipState.ackedTaskID { - panic("dbTaskQueueOwnershipImpl updateStateLocked encountered acked task ID") - } - m.ownershipState.rangeID = rangeID - m.ownershipState.ackedTaskID = ackedTaskID - if minTaskID > m.ownershipState.lastAllocatedTaskID { - m.ownershipState.lastAllocatedTaskID = minTaskID - } - m.ownershipState.minTaskIDExclusive = minTaskID - m.ownershipState.maxTaskIDInclusive = maxTaskID - } -} - -func (m *dbTaskQueueOwnershipImpl) maybeShutdownLocked( - err error, -) { - _, ok := err.(*persistence.ConditionFailedError) - if !ok { - return - } - - m.ownershipState = nil - if m.status == dbTaskQueueOwnershipStatusLost { - return - } - m.status = dbTaskQueueOwnershipStatusLost - close(m.shutdownChan) -} - -func rangeIDToTaskIDRange( - rangeID int64, - taskIDRangeSize int64, -) (int64, int64) { - return (rangeID - 1) * taskIDRangeSize, rangeID * taskIDRangeSize -} diff --git a/service/matching/db_task_queue_ownership_mock.go b/service/matching/db_task_queue_ownership_mock.go deleted file mode 100644 index 95958c3476c..00000000000 --- a/service/matching/db_task_queue_ownership_mock.go +++ /dev/null @@ -1,161 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -// Code generated by MockGen. DO NOT EDIT. -// Source: db_task_queue_ownership.go - -// Package matching is a generated GoMock package. -package matching - -import ( - context "context" - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - persistence "go.temporal.io/server/api/persistence/v1" -) - -// MockdbTaskQueueOwnership is a mock of dbTaskQueueOwnership interface. -type MockdbTaskQueueOwnership struct { - ctrl *gomock.Controller - recorder *MockdbTaskQueueOwnershipMockRecorder -} - -// MockdbTaskQueueOwnershipMockRecorder is the mock recorder for MockdbTaskQueueOwnership. -type MockdbTaskQueueOwnershipMockRecorder struct { - mock *MockdbTaskQueueOwnership -} - -// NewMockdbTaskQueueOwnership creates a new mock instance. -func NewMockdbTaskQueueOwnership(ctrl *gomock.Controller) *MockdbTaskQueueOwnership { - mock := &MockdbTaskQueueOwnership{ctrl: ctrl} - mock.recorder = &MockdbTaskQueueOwnershipMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockdbTaskQueueOwnership) EXPECT() *MockdbTaskQueueOwnershipMockRecorder { - return m.recorder -} - -// flushTasks mocks base method. -func (m *MockdbTaskQueueOwnership) flushTasks(ctx context.Context, taskInfos ...*persistence.TaskInfo) error { - m.ctrl.T.Helper() - varargs := []interface{}{ctx} - for _, a := range taskInfos { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "flushTasks", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// flushTasks indicates an expected call of flushTasks. -func (mr *MockdbTaskQueueOwnershipMockRecorder) flushTasks(ctx interface{}, taskInfos ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{ctx}, taskInfos...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "flushTasks", reflect.TypeOf((*MockdbTaskQueueOwnership)(nil).flushTasks), varargs...) -} - -// getAckedTaskID mocks base method. -func (m *MockdbTaskQueueOwnership) getAckedTaskID() int64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "getAckedTaskID") - ret0, _ := ret[0].(int64) - return ret0 -} - -// getAckedTaskID indicates an expected call of getAckedTaskID. -func (mr *MockdbTaskQueueOwnershipMockRecorder) getAckedTaskID() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getAckedTaskID", reflect.TypeOf((*MockdbTaskQueueOwnership)(nil).getAckedTaskID)) -} - -// getLastAllocatedTaskID mocks base method. -func (m *MockdbTaskQueueOwnership) getLastAllocatedTaskID() int64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "getLastAllocatedTaskID") - ret0, _ := ret[0].(int64) - return ret0 -} - -// getLastAllocatedTaskID indicates an expected call of getLastAllocatedTaskID. -func (mr *MockdbTaskQueueOwnershipMockRecorder) getLastAllocatedTaskID() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getLastAllocatedTaskID", reflect.TypeOf((*MockdbTaskQueueOwnership)(nil).getLastAllocatedTaskID)) -} - -// getShutdownChan mocks base method. -func (m *MockdbTaskQueueOwnership) getShutdownChan() <-chan struct{} { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "getShutdownChan") - ret0, _ := ret[0].(<-chan struct{}) - return ret0 -} - -// getShutdownChan indicates an expected call of getShutdownChan. -func (mr *MockdbTaskQueueOwnershipMockRecorder) getShutdownChan() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "getShutdownChan", reflect.TypeOf((*MockdbTaskQueueOwnership)(nil).getShutdownChan)) -} - -// persistTaskQueue mocks base method. -func (m *MockdbTaskQueueOwnership) persistTaskQueue(ctx context.Context) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "persistTaskQueue", ctx) - ret0, _ := ret[0].(error) - return ret0 -} - -// persistTaskQueue indicates an expected call of persistTaskQueue. -func (mr *MockdbTaskQueueOwnershipMockRecorder) persistTaskQueue(ctx interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "persistTaskQueue", reflect.TypeOf((*MockdbTaskQueueOwnership)(nil).persistTaskQueue), ctx) -} - -// takeTaskQueueOwnership mocks base method. -func (m *MockdbTaskQueueOwnership) takeTaskQueueOwnership(ctx context.Context) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "takeTaskQueueOwnership", ctx) - ret0, _ := ret[0].(error) - return ret0 -} - -// takeTaskQueueOwnership indicates an expected call of takeTaskQueueOwnership. -func (mr *MockdbTaskQueueOwnershipMockRecorder) takeTaskQueueOwnership(ctx interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "takeTaskQueueOwnership", reflect.TypeOf((*MockdbTaskQueueOwnership)(nil).takeTaskQueueOwnership), ctx) -} - -// updateAckedTaskID mocks base method. -func (m *MockdbTaskQueueOwnership) updateAckedTaskID(taskID int64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "updateAckedTaskID", taskID) -} - -// updateAckedTaskID indicates an expected call of updateAckedTaskID. -func (mr *MockdbTaskQueueOwnershipMockRecorder) updateAckedTaskID(taskID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "updateAckedTaskID", reflect.TypeOf((*MockdbTaskQueueOwnership)(nil).updateAckedTaskID), taskID) -} diff --git a/service/matching/db_task_queue_ownership_test.go b/service/matching/db_task_queue_ownership_test.go deleted file mode 100644 index f49281e7611..00000000000 --- a/service/matching/db_task_queue_ownership_test.go +++ /dev/null @@ -1,476 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package matching - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/golang/mock/gomock" - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - enumspb "go.temporal.io/api/enums/v1" - "go.temporal.io/api/serviceerror" - - persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/clock" - "go.temporal.io/server/common/log" - "go.temporal.io/server/common/persistence" - "go.temporal.io/server/common/primitives/timestamp" -) - -type ( - dbTaskOwnershipSuite struct { - *require.Assertions - suite.Suite - - controller *gomock.Controller - taskStore *persistence.MockTaskManager - timeSource *clock.EventTimeSource - - now time.Time - taskIDRangeSize int64 - namespaceID string - taskQueueName string - taskQueueType enumspb.TaskQueueType - taskQueueKind enumspb.TaskQueueKind - - taskOwnership *dbTaskQueueOwnershipImpl - } -) - -func TestDBTaskOwnershipSuite(t *testing.T) { - s := new(dbTaskOwnershipSuite) - suite.Run(t, s) -} - -func (s *dbTaskOwnershipSuite) SetupSuite() { - rand.Seed(time.Now().UnixNano()) -} - -func (s *dbTaskOwnershipSuite) TearDownSuite() { - -} - -func (s *dbTaskOwnershipSuite) SetupTest() { - s.Assertions = require.New(s.T()) - - s.controller = gomock.NewController(s.T()) - s.taskStore = persistence.NewMockTaskManager(s.controller) - s.timeSource = clock.NewEventTimeSource() - - s.now = time.Now().UTC() - s.taskIDRangeSize = 10 - s.namespaceID = uuid.New().String() - s.taskQueueName = uuid.New().String() - s.taskQueueType = enumspb.TASK_QUEUE_TYPE_ACTIVITY - s.taskQueueKind = enumspb.TaskQueueKind( - rand.Int31n(int32(len(enumspb.TaskQueueKind_name))-1) + 1, - ) - - s.taskOwnership = newDBTaskQueueOwnership( - persistence.TaskQueueKey{ - NamespaceID: s.namespaceID, - TaskQueueName: s.taskQueueName, - TaskQueueType: s.taskQueueType, - }, - s.taskQueueKind, - s.taskIDRangeSize, - s.taskStore, - log.NewTestLogger(), - ) - s.timeSource.Update(s.now) - s.taskOwnership.timeSource = s.timeSource -} - -func (s *dbTaskOwnershipSuite) TearDownTest() { - s.controller.Finish() -} - -func (s *dbTaskOwnershipSuite) TestTaskOwnership_Create_Success() { - s.taskStore.EXPECT().GetTaskQueue(gomock.Any(), &persistence.GetTaskQueueRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - }).Return(nil, serviceerror.NewNotFound("random error message")) - s.taskStore.EXPECT().CreateTaskQueue(gomock.Any(), &persistence.CreateTaskQueueRequest{ - RangeID: dbTaskInitialRangeID, - TaskQueueInfo: &persistencespb.TaskQueueInfo{ - NamespaceId: s.namespaceID, - Name: s.taskQueueName, - TaskType: s.taskQueueType, - Kind: s.taskQueueKind, - AckLevel: 0, - ExpiryTime: s.taskOwnership.expiryTime(), - LastUpdateTime: timestamp.TimePtr(s.now), - }, - }).Return(&persistence.CreateTaskQueueResponse{}, nil) - - minTaskID, maxTaskID := rangeIDToTaskIDRange(dbTaskInitialRangeID, s.taskIDRangeSize) - err := s.taskOwnership.takeTaskQueueOwnership(context.Background()) - s.NoError(err) - s.Equal(s.now, *s.taskOwnership.stateLastUpdateTime) - s.Equal(dbTaskQueueOwnershipState{ - rangeID: dbTaskInitialRangeID, - ackedTaskID: 0, - lastAllocatedTaskID: 0, - minTaskIDExclusive: minTaskID, - maxTaskIDInclusive: maxTaskID, - }, *s.taskOwnership.ownershipState) - select { - case <-s.taskOwnership.getShutdownChan(): - s.Fail("task ownership lost") - default: - // noop - } -} - -func (s *dbTaskOwnershipSuite) TestTaskOwnership_Create_Failed() { - s.taskStore.EXPECT().GetTaskQueue(gomock.Any(), &persistence.GetTaskQueueRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - }).Return(nil, serviceerror.NewNotFound("random error message")) - s.taskStore.EXPECT().CreateTaskQueue(gomock.Any(), &persistence.CreateTaskQueueRequest{ - RangeID: dbTaskInitialRangeID, - TaskQueueInfo: &persistencespb.TaskQueueInfo{ - NamespaceId: s.namespaceID, - Name: s.taskQueueName, - TaskType: s.taskQueueType, - Kind: s.taskQueueKind, - AckLevel: 0, - ExpiryTime: s.taskOwnership.expiryTime(), - LastUpdateTime: timestamp.TimePtr(s.now), - }, - }).Return(nil, &persistence.ConditionFailedError{}) - - err := s.taskOwnership.takeTaskQueueOwnership(context.Background()) - s.Error(err) - s.Nil(s.taskOwnership.stateLastUpdateTime) - s.Nil(s.taskOwnership.ownershipState) - <-s.taskOwnership.getShutdownChan() -} - -func (s *dbTaskOwnershipSuite) TestTaskOwnership_Update_Success() { - rangeID := rand.Int63() - minTaskID, maxTaskID := rangeIDToTaskIDRange(rangeID, s.taskIDRangeSize) - ackedTaskID := minTaskID + rand.Int63n(maxTaskID-minTaskID) - taskQueueInfo := s.randomTaskQueue(ackedTaskID) - s.taskStore.EXPECT().GetTaskQueue(gomock.Any(), &persistence.GetTaskQueueRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - }).Return(&persistence.GetTaskQueueResponse{ - RangeID: rangeID, - TaskQueueInfo: taskQueueInfo, - }, nil) - s.taskStore.EXPECT().UpdateTaskQueue(gomock.Any(), &persistence.UpdateTaskQueueRequest{ - PrevRangeID: rangeID, - RangeID: rangeID + 1, - TaskQueueInfo: &persistencespb.TaskQueueInfo{ - NamespaceId: s.namespaceID, - Name: s.taskQueueName, - TaskType: s.taskQueueType, - Kind: s.taskQueueKind, - AckLevel: ackedTaskID, - ExpiryTime: s.taskOwnership.expiryTime(), - LastUpdateTime: timestamp.TimePtr(s.now), - }, - }).Return(&persistence.UpdateTaskQueueResponse{}, nil) - - minTaskID, maxTaskID = rangeIDToTaskIDRange(rangeID+1, s.taskIDRangeSize) - err := s.taskOwnership.takeTaskQueueOwnership(context.Background()) - s.NoError(err) - s.Equal(s.now, *s.taskOwnership.stateLastUpdateTime) - s.Equal(dbTaskQueueOwnershipState{ - rangeID: rangeID + 1, - ackedTaskID: ackedTaskID, - lastAllocatedTaskID: minTaskID, - minTaskIDExclusive: minTaskID, - maxTaskIDInclusive: maxTaskID, - }, *s.taskOwnership.ownershipState) - select { - case <-s.taskOwnership.getShutdownChan(): - s.Fail("task ownership lost") - default: - // noop - } -} - -func (s *dbTaskOwnershipSuite) TestTaskOwnership_Update_Failed() { - rangeID := rand.Int63() - minTaskID, maxTaskID := rangeIDToTaskIDRange(rangeID, s.taskIDRangeSize) - ackedTaskID := minTaskID + rand.Int63n(maxTaskID-minTaskID) - taskQueueInfo := s.randomTaskQueue(ackedTaskID) - s.taskStore.EXPECT().GetTaskQueue(gomock.Any(), &persistence.GetTaskQueueRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - }).Return(&persistence.GetTaskQueueResponse{ - RangeID: rangeID, - TaskQueueInfo: taskQueueInfo, - }, nil) - s.taskStore.EXPECT().UpdateTaskQueue(gomock.Any(), &persistence.UpdateTaskQueueRequest{ - PrevRangeID: rangeID, - RangeID: rangeID + 1, - TaskQueueInfo: &persistencespb.TaskQueueInfo{ - NamespaceId: s.namespaceID, - Name: s.taskQueueName, - TaskType: s.taskQueueType, - Kind: s.taskQueueKind, - AckLevel: ackedTaskID, - ExpiryTime: s.taskOwnership.expiryTime(), - LastUpdateTime: timestamp.TimePtr(s.now), - }, - }).Return(nil, &persistence.ConditionFailedError{}) - - err := s.taskOwnership.takeTaskQueueOwnership(context.Background()) - s.Error(err) - s.Nil(s.taskOwnership.stateLastUpdateTime) - s.Nil(s.taskOwnership.ownershipState) - <-s.taskOwnership.getShutdownChan() -} - -func (s *dbTaskOwnershipSuite) TestFlushTasks_Success() { - ownershipState := s.prepareTaskQueueOwnership(rand.Int63()) - - task1 := &persistencespb.AllocatedTaskInfo{ - Data: s.randomTask(), - TaskId: s.taskOwnership.getLastAllocatedTaskID() + 1, - } - task2 := &persistencespb.AllocatedTaskInfo{ - Data: s.randomTask(), - TaskId: s.taskOwnership.getLastAllocatedTaskID() + 2, - } - - s.taskStore.EXPECT().CreateTasks(gomock.Any(), &persistence.CreateTasksRequest{ - TaskQueueInfo: &persistence.PersistedTaskQueueInfo{ - Data: s.taskOwnership.taskQueueInfoLocked(), - RangeID: ownershipState.rangeID, - }, - Tasks: []*persistencespb.AllocatedTaskInfo{task1, task2}, - }).Return(&persistence.CreateTasksResponse{}, nil) - - err := s.taskOwnership.flushTasks(context.Background(), task1.Data, task2.Data) - s.NoError(err) - s.Equal(dbTaskQueueOwnershipState{ - rangeID: ownershipState.rangeID, - ackedTaskID: ownershipState.ackedTaskID, - lastAllocatedTaskID: task2.TaskId, - minTaskIDExclusive: ownershipState.minTaskIDExclusive, - maxTaskIDInclusive: ownershipState.maxTaskIDInclusive, - }, *s.taskOwnership.ownershipState) - select { - case <-s.taskOwnership.getShutdownChan(): - s.Fail("task ownership lost") - default: - // noop - } -} - -func (s *dbTaskOwnershipSuite) TestFlushTasks_Failed() { - ownershipState := s.prepareTaskQueueOwnership(rand.Int63()) - - task1 := &persistencespb.AllocatedTaskInfo{ - Data: s.randomTask(), - TaskId: s.taskOwnership.getLastAllocatedTaskID() + 1, - } - task2 := &persistencespb.AllocatedTaskInfo{ - Data: s.randomTask(), - TaskId: s.taskOwnership.getLastAllocatedTaskID() + 2, - } - - s.taskStore.EXPECT().CreateTasks(gomock.Any(), &persistence.CreateTasksRequest{ - TaskQueueInfo: &persistence.PersistedTaskQueueInfo{ - Data: s.taskOwnership.taskQueueInfoLocked(), - RangeID: ownershipState.rangeID, - }, - Tasks: []*persistencespb.AllocatedTaskInfo{task1, task2}, - }).Return(nil, serviceerror.NewUnavailable("random error")) - - err := s.taskOwnership.flushTasks(context.Background(), task1.Data, task2.Data) - s.Error(err) - s.Equal(dbTaskQueueOwnershipState{ - rangeID: ownershipState.rangeID, - ackedTaskID: ownershipState.ackedTaskID, - lastAllocatedTaskID: task2.TaskId, - minTaskIDExclusive: ownershipState.minTaskIDExclusive, - maxTaskIDInclusive: ownershipState.maxTaskIDInclusive, - }, *s.taskOwnership.ownershipState) - select { - case <-s.taskOwnership.getShutdownChan(): - s.Fail("task ownership lost") - default: - // noop - } -} - -func (s *dbTaskOwnershipSuite) TestFlushTasks_OwnershipLost() { - ownershipState := s.prepareTaskQueueOwnership(rand.Int63()) - - task1 := &persistencespb.AllocatedTaskInfo{ - Data: s.randomTask(), - TaskId: s.taskOwnership.getLastAllocatedTaskID() + 1, - } - task2 := &persistencespb.AllocatedTaskInfo{ - Data: s.randomTask(), - TaskId: s.taskOwnership.getLastAllocatedTaskID() + 2, - } - - s.taskStore.EXPECT().CreateTasks(gomock.Any(), &persistence.CreateTasksRequest{ - TaskQueueInfo: &persistence.PersistedTaskQueueInfo{ - Data: s.taskOwnership.taskQueueInfoLocked(), - RangeID: ownershipState.rangeID, - }, - Tasks: []*persistencespb.AllocatedTaskInfo{task1, task2}, - }).Return(nil, &persistence.ConditionFailedError{}) - - err := s.taskOwnership.flushTasks(context.Background(), task1.Data, task2.Data) - s.Error(err) - s.Nil(s.taskOwnership.ownershipState) - <-s.taskOwnership.getShutdownChan() -} - -func (s *dbTaskOwnershipSuite) TestGenerateTaskID_WithinRange() { - ownershipState := s.prepareTaskQueueOwnership(rand.Int63()) - - var expectedTaskIDs []int64 - for i := ownershipState.minTaskIDExclusive + 1; i <= ownershipState.maxTaskIDInclusive; i++ { - expectedTaskIDs = append(expectedTaskIDs, i) - } - - var actualTaskIDs []int64 - for i := 0; i < int(s.taskIDRangeSize); i++ { - taskIDs, err := s.taskOwnership.generatedTaskIDsLocked(context.Background(), 1) - s.NoError(err) - s.Equal(1, len(taskIDs)) - actualTaskIDs = append(actualTaskIDs, taskIDs[0]) - } - - s.Equal(expectedTaskIDs, actualTaskIDs) - s.Equal(dbTaskQueueOwnershipState{ - rangeID: ownershipState.rangeID, - ackedTaskID: ownershipState.ackedTaskID, - lastAllocatedTaskID: ownershipState.maxTaskIDInclusive, - minTaskIDExclusive: ownershipState.minTaskIDExclusive, - maxTaskIDInclusive: ownershipState.maxTaskIDInclusive, - }, *s.taskOwnership.ownershipState) -} - -func (s *dbTaskOwnershipSuite) TestGenerateTaskID_OutOfRange() { - prevOwnershipState := s.prepareTaskQueueOwnership(rand.Int63()) - for i := 0; i < int(s.taskIDRangeSize)-1; i++ { - _, err := s.taskOwnership.generatedTaskIDsLocked(context.Background(), 1) - s.NoError(err) - } - - s.taskStore.EXPECT().UpdateTaskQueue(gomock.Any(), &persistence.UpdateTaskQueueRequest{ - PrevRangeID: prevOwnershipState.rangeID, - RangeID: prevOwnershipState.rangeID + 1, - TaskQueueInfo: &persistencespb.TaskQueueInfo{ - NamespaceId: s.namespaceID, - Name: s.taskQueueName, - TaskType: s.taskQueueType, - Kind: s.taskQueueKind, - AckLevel: prevOwnershipState.ackedTaskID, - ExpiryTime: s.taskOwnership.expiryTime(), - LastUpdateTime: timestamp.TimePtr(s.now), - }, - }).Return(&persistence.UpdateTaskQueueResponse{}, nil) - - minTaskID, maxTaskID := rangeIDToTaskIDRange(prevOwnershipState.rangeID+1, s.taskIDRangeSize) - expectedTaskIDs := []int64{minTaskID + 1, minTaskID + 2} - actualTaskIDs, err := s.taskOwnership.generatedTaskIDsLocked(context.Background(), 2) - s.NoError(err) - s.Equal(expectedTaskIDs, actualTaskIDs) - s.Equal(dbTaskQueueOwnershipState{ - rangeID: prevOwnershipState.rangeID + 1, - ackedTaskID: prevOwnershipState.ackedTaskID, - lastAllocatedTaskID: expectedTaskIDs[len(expectedTaskIDs)-1], - minTaskIDExclusive: minTaskID, - maxTaskIDInclusive: maxTaskID, - }, *s.taskOwnership.ownershipState) -} - -func (s *dbTaskOwnershipSuite) prepareTaskQueueOwnership( - targetRangeID int64, -) dbTaskQueueOwnershipState { - minTaskID, maxTaskID := rangeIDToTaskIDRange(targetRangeID-1, s.taskIDRangeSize) - ackedTaskID := minTaskID + rand.Int63n(maxTaskID-minTaskID) - taskQueueInfo := s.randomTaskQueue(ackedTaskID) - s.taskStore.EXPECT().GetTaskQueue(gomock.Any(), &persistence.GetTaskQueueRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - }).Return(&persistence.GetTaskQueueResponse{ - RangeID: targetRangeID - 1, - TaskQueueInfo: taskQueueInfo, - }, nil) - s.taskStore.EXPECT().UpdateTaskQueue(gomock.Any(), &persistence.UpdateTaskQueueRequest{ - PrevRangeID: targetRangeID - 1, - RangeID: targetRangeID, - TaskQueueInfo: &persistencespb.TaskQueueInfo{ - NamespaceId: s.namespaceID, - Name: s.taskQueueName, - TaskType: s.taskQueueType, - Kind: s.taskQueueKind, - AckLevel: ackedTaskID, - ExpiryTime: s.taskOwnership.expiryTime(), - LastUpdateTime: timestamp.TimePtr(s.now), - }, - }).Return(&persistence.UpdateTaskQueueResponse{}, nil) - - err := s.taskOwnership.takeTaskQueueOwnership(context.Background()) - s.NoError(err) - return *s.taskOwnership.ownershipState -} - -func (s *dbTaskOwnershipSuite) randomTaskQueue( - ackedTaskID int64, -) *persistencespb.TaskQueueInfo { - return &persistencespb.TaskQueueInfo{ - NamespaceId: s.namespaceID, - Name: s.taskQueueName, - TaskType: s.taskQueueType, - Kind: s.taskQueueKind, - AckLevel: ackedTaskID, - ExpiryTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), - LastUpdateTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), - } -} - -func (s *dbTaskOwnershipSuite) randomTask() *persistencespb.TaskInfo { - return &persistencespb.TaskInfo{ - NamespaceId: s.namespaceID, - WorkflowId: uuid.New().String(), - RunId: uuid.New().String(), - ScheduledEventId: rand.Int63(), - CreateTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), - ExpiryTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), - } -} diff --git a/service/matching/db_task_reader.go b/service/matching/db_task_reader.go deleted file mode 100644 index 7f770041370..00000000000 --- a/service/matching/db_task_reader.go +++ /dev/null @@ -1,173 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package matching - -import ( - "context" - "sync" - - persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/collection" - "go.temporal.io/server/common/log" - "go.temporal.io/server/common/persistence" - "go.temporal.io/server/common/util" - "golang.org/x/exp/maps" -) - -const ( - dbTaskReaderPageSize = 100 -) - -// temporarily disable mock gen until mock gen support go generics -// //go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination db_task_reader_mock.go - -type ( - dbTaskReader interface { - taskIterator(ctx context.Context, maxTaskID int64) collection.Iterator[*persistencespb.AllocatedTaskInfo] - ackTask(taskID int64) - moveAckedTaskID() int64 - } - - dbTaskReaderImpl struct { - taskQueueKey persistence.TaskQueueKey - store persistence.TaskManager - logger log.Logger - - sync.Mutex - tasks map[int64]bool // task ID -> true: acked or false not acked - ackedTaskID int64 // acked task ID - loadedTaskID int64 // loaded into memory task ID - } -) - -func newDBTaskReader( - taskQueueKey persistence.TaskQueueKey, - store persistence.TaskManager, - ackedTaskID int64, - logger log.Logger, -) *dbTaskReaderImpl { - return &dbTaskReaderImpl{ - taskQueueKey: taskQueueKey, - store: store, - logger: logger, - - tasks: make(map[int64]bool), - ackedTaskID: ackedTaskID, - loadedTaskID: ackedTaskID, - } -} - -func (t *dbTaskReaderImpl) taskIterator( - ctx context.Context, - maxTaskID int64, -) collection.Iterator[*persistencespb.AllocatedTaskInfo] { - return collection.NewPagingIterator( - t.getPaginationFn(ctx, maxTaskID), - ) -} - -func (t *dbTaskReaderImpl) ackTask(taskID int64) { - t.Lock() - defer t.Unlock() - - _, ok := t.tasks[taskID] - if !ok { - // trying to ack a task ID which is not present in tracking map - return - } - t.tasks[taskID] = true -} - -// moveAckedTaskID tries to advance the acked task ID -// e.g. assuming task ID & whether the task is completed -// -// 10 -> true -// 12 -> true -// 15 -> false -// -// the acked task ID can be set to 12, meaning task with ID <= 12 are finished -func (t *dbTaskReaderImpl) moveAckedTaskID() int64 { - t.Lock() - defer t.Unlock() - - taskIDs := maps.Keys(t.tasks) - util.SortSlice(taskIDs) - - for _, taskID := range taskIDs { - if !t.tasks[taskID] { - break - } - t.ackedTaskID = taskID - delete(t.tasks, taskID) - } - return t.ackedTaskID -} - -func (t *dbTaskReaderImpl) getPaginationFn( - ctx context.Context, - maxTaskID int64, -) collection.PaginationFn[*persistencespb.AllocatedTaskInfo] { - t.Lock() - defer t.Unlock() - minTaskID := t.loadedTaskID - - return func(paginationToken []byte) ([]*persistencespb.AllocatedTaskInfo, []byte, error) { - response, err := t.store.GetTasks(ctx, &persistence.GetTasksRequest{ - NamespaceID: t.taskQueueKey.NamespaceID, - TaskQueue: t.taskQueueKey.TaskQueueName, - TaskType: t.taskQueueKey.TaskQueueType, - InclusiveMinTaskID: minTaskID + 1, - ExclusiveMaxTaskID: maxTaskID + 1, - PageSize: dbTaskReaderPageSize, - NextPageToken: paginationToken, - }) - if err != nil { - return nil, nil, err - } - paginateItems := response.Tasks - token := response.NextPageToken - - t.Lock() - defer t.Unlock() - for _, task := range response.Tasks { - t.loadedTaskID = task.GetTaskId() - t.tasks[task.GetTaskId()] = false - } - // special handling max task ID - // if there is a task with max task ID - // then t.tasks with maxTaskID is set - // if there is no task with max task ID - // then we simply set t.tasks[maxTaskID] = true - // indicating that maxTaskID is already finished - // this will greatly simplify the acked task ID logic - if len(token) == 0 { - t.loadedTaskID = maxTaskID - if _, ok := t.tasks[maxTaskID]; !ok { - t.tasks[maxTaskID] = true - } - } - return paginateItems, token, nil - } -} diff --git a/service/matching/db_task_reader_mock.go b/service/matching/db_task_reader_mock.go deleted file mode 100644 index 22201596913..00000000000 --- a/service/matching/db_task_reader_mock.go +++ /dev/null @@ -1,102 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -// Code generated by MockGen. DO NOT EDIT. -// Source: db_task_reader.go - -// Package matching is a generated GoMock package. -package matching - -import ( - context "context" - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - - persistencespb "go.temporal.io/server/api/persistence/v1" - collection "go.temporal.io/server/common/collection" -) - -// MockdbTaskReader is a mock of dbTaskReader interface. -type MockdbTaskReader struct { - ctrl *gomock.Controller - recorder *MockdbTaskReaderMockRecorder -} - -// MockdbTaskReaderMockRecorder is the mock recorder for MockdbTaskReader. -type MockdbTaskReaderMockRecorder struct { - mock *MockdbTaskReader -} - -// NewMockdbTaskReader creates a new mock instance. -func NewMockdbTaskReader(ctrl *gomock.Controller) *MockdbTaskReader { - mock := &MockdbTaskReader{ctrl: ctrl} - mock.recorder = &MockdbTaskReaderMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockdbTaskReader) EXPECT() *MockdbTaskReaderMockRecorder { - return m.recorder -} - -// ackTask mocks base method. -func (m *MockdbTaskReader) ackTask(taskID int64) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "ackTask", taskID) -} - -// ackTask indicates an expected call of ackTask. -func (mr *MockdbTaskReaderMockRecorder) ackTask(taskID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ackTask", reflect.TypeOf((*MockdbTaskReader)(nil).ackTask), taskID) -} - -// moveAckedTaskID mocks base method. -func (m *MockdbTaskReader) moveAckedTaskID() int64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "moveAckedTaskID") - ret0, _ := ret[0].(int64) - return ret0 -} - -// moveAckedTaskID indicates an expected call of moveAckedTaskID. -func (mr *MockdbTaskReaderMockRecorder) moveAckedTaskID() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "moveAckedTaskID", reflect.TypeOf((*MockdbTaskReader)(nil).moveAckedTaskID)) -} - -// taskIterator mocks base method. -func (m *MockdbTaskReader) taskIterator(ctx context.Context, maxTaskID int64) collection.Iterator[*persistencespb.AllocatedTaskInfo] { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "taskIterator", ctx, maxTaskID) - ret0, _ := ret[0].(collection.Iterator[*persistencespb.AllocatedTaskInfo]) - return ret0 -} - -// taskIterator indicates an expected call of taskIterator. -func (mr *MockdbTaskReaderMockRecorder) taskIterator(ctx, maxTaskID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "taskIterator", reflect.TypeOf((*MockdbTaskReader)(nil).taskIterator), ctx, maxTaskID) -} diff --git a/service/matching/db_task_reader_test.go b/service/matching/db_task_reader_test.go deleted file mode 100644 index 629f233bfe6..00000000000 --- a/service/matching/db_task_reader_test.go +++ /dev/null @@ -1,490 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package matching - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/golang/mock/gomock" - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - enumspb "go.temporal.io/api/enums/v1" - "go.temporal.io/api/serviceerror" - - persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/log" - "go.temporal.io/server/common/persistence" - "go.temporal.io/server/common/shuffle" -) - -type ( - dbTaskReaderSuite struct { - *require.Assertions - suite.Suite - - controller *gomock.Controller - taskStore *persistence.MockTaskManager - - namespaceID string - taskQueueName string - taskQueueType enumspb.TaskQueueType - ackedTaskID int64 - maxTaskID int64 - - taskTracker *dbTaskReaderImpl - } -) - -func TestDBTaskReaderSuite(t *testing.T) { - s := new(dbTaskReaderSuite) - suite.Run(t, s) -} - -func (s *dbTaskReaderSuite) SetupSuite() { - rand.Seed(time.Now().UnixNano()) -} - -func (s *dbTaskReaderSuite) TearDownSuite() { - -} - -func (s *dbTaskReaderSuite) SetupTest() { - s.Assertions = require.New(s.T()) - - s.controller = gomock.NewController(s.T()) - s.taskStore = persistence.NewMockTaskManager(s.controller) - - s.namespaceID = uuid.New().String() - s.taskQueueName = uuid.New().String() - s.taskQueueType = enumspb.TASK_QUEUE_TYPE_ACTIVITY - s.ackedTaskID = rand.Int63() - s.maxTaskID = s.ackedTaskID + 1000 - - s.taskTracker = newDBTaskReader( - persistence.TaskQueueKey{ - NamespaceID: s.namespaceID, - TaskQueueName: s.taskQueueName, - TaskQueueType: s.taskQueueType, - }, - s.taskStore, - s.ackedTaskID, - log.NewTestLogger(), - ) -} - -func (s *dbTaskReaderSuite) TearDownTest() { - s.controller.Finish() -} - -func (s *dbTaskReaderSuite) TestIteration_Error() { - s.taskStore.EXPECT().GetTasks(gomock.Any(), &persistence.GetTasksRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - PageSize: dbTaskReaderPageSize, - InclusiveMinTaskID: s.ackedTaskID + 1, - ExclusiveMaxTaskID: s.maxTaskID + 1, - NextPageToken: nil, - }).Return(nil, serviceerror.NewInternal("random error")) - - iter := s.taskTracker.taskIterator(context.Background(), s.maxTaskID) - for iter.HasNext() { - _, err := iter.Next() - s.Error(err) - } - - s.Equal(s.ackedTaskID, s.taskTracker.ackedTaskID) - s.Equal(s.ackedTaskID, s.taskTracker.loadedTaskID) - s.Equal(map[int64]bool{}, s.taskTracker.tasks) -} - -func (s *dbTaskReaderSuite) TestIteration_ErrorRetry() { - taskID1 := s.ackedTaskID + 1 - tasks1 := []*persistencespb.AllocatedTaskInfo{ - {TaskId: taskID1}, - } - token := shuffle.Bytes([]byte("random page token")) - taskID2 := s.ackedTaskID + 3 - tasks2 := []*persistencespb.AllocatedTaskInfo{ - {TaskId: taskID2}, - } - gomock.InOrder( - s.taskStore.EXPECT().GetTasks(gomock.Any(), &persistence.GetTasksRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - PageSize: dbTaskReaderPageSize, - InclusiveMinTaskID: s.ackedTaskID + 1, - ExclusiveMaxTaskID: s.maxTaskID + 1, - NextPageToken: nil, - }).Return(&persistence.GetTasksResponse{ - Tasks: tasks1, - NextPageToken: token, - }, nil), - s.taskStore.EXPECT().GetTasks(gomock.Any(), &persistence.GetTasksRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - PageSize: dbTaskReaderPageSize, - InclusiveMinTaskID: s.ackedTaskID + 1, - ExclusiveMaxTaskID: s.maxTaskID + 1, - NextPageToken: token, - }).Return(nil, serviceerror.NewInternal("some random error")), - s.taskStore.EXPECT().GetTasks(gomock.Any(), &persistence.GetTasksRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - PageSize: dbTaskReaderPageSize, - InclusiveMinTaskID: taskID1 + 1, - ExclusiveMaxTaskID: s.maxTaskID + 1, - NextPageToken: nil, - }).Return(&persistence.GetTasksResponse{ - Tasks: tasks2, - NextPageToken: nil, - }, nil), - ) - - iter := s.taskTracker.taskIterator(context.Background(), s.maxTaskID) - var actualTasks []*persistencespb.AllocatedTaskInfo - for iter.HasNext() { - item, err := iter.Next() - if err != nil { - break - } - actualTasks = append(actualTasks, item) - } - s.Equal(tasks1, actualTasks) - s.Equal(s.ackedTaskID, s.taskTracker.ackedTaskID) - s.Equal(taskID1, s.taskTracker.loadedTaskID) - s.Equal(map[int64]bool{ - taskID1: false, - }, s.taskTracker.tasks) - - iter = s.taskTracker.taskIterator(context.Background(), s.maxTaskID) - actualTasks = nil - for iter.HasNext() { - item, err := iter.Next() - s.NoError(err) - actualTasks = append(actualTasks, item) - } - s.Equal(tasks2, actualTasks) - s.Equal(s.ackedTaskID, s.taskTracker.ackedTaskID) - s.Equal(s.maxTaskID, s.taskTracker.loadedTaskID) - s.Equal(map[int64]bool{ - taskID1: false, - taskID2: false, - s.maxTaskID: true, - }, s.taskTracker.tasks) -} - -func (s *dbTaskReaderSuite) TestIteration_TwoIter() { - taskID1 := s.ackedTaskID + 1 - taskID2 := s.ackedTaskID + 3 - tasks1 := []*persistencespb.AllocatedTaskInfo{ - {TaskId: taskID1}, - {TaskId: taskID2}, - } - taskID3 := s.ackedTaskID + 6 - taskID4 := s.ackedTaskID + 10 - tasks2 := []*persistencespb.AllocatedTaskInfo{ - {TaskId: taskID3}, - {TaskId: taskID4}, - } - maxTaskID1 := taskID3 - 1 - maxTaskID2 := s.maxTaskID - - s.taskStore.EXPECT().GetTasks(gomock.Any(), &persistence.GetTasksRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - PageSize: dbTaskReaderPageSize, - InclusiveMinTaskID: s.ackedTaskID + 1, - ExclusiveMaxTaskID: maxTaskID1 + 1, - NextPageToken: nil, - }).Return(&persistence.GetTasksResponse{ - Tasks: tasks1, - NextPageToken: nil, - }, nil) - - iter := s.taskTracker.taskIterator(context.Background(), maxTaskID1) - var actualTasks []*persistencespb.AllocatedTaskInfo - for iter.HasNext() { - item, err := iter.Next() - s.NoError(err) - actualTasks = append(actualTasks, item) - } - s.Equal(tasks1, actualTasks) - - s.Equal(s.ackedTaskID, s.taskTracker.ackedTaskID) - s.Equal(maxTaskID1, s.taskTracker.loadedTaskID) - s.Equal(map[int64]bool{ - taskID1: false, - taskID2: false, - maxTaskID1: true, - }, s.taskTracker.tasks) - - s.taskStore.EXPECT().GetTasks(gomock.Any(), &persistence.GetTasksRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - PageSize: dbTaskReaderPageSize, - InclusiveMinTaskID: s.taskTracker.loadedTaskID + 1, - ExclusiveMaxTaskID: maxTaskID2 + 1, - NextPageToken: nil, - }).Return(&persistence.GetTasksResponse{ - Tasks: tasks2, - NextPageToken: nil, - }, nil) - - iter = s.taskTracker.taskIterator(context.Background(), maxTaskID2) - actualTasks = nil - for iter.HasNext() { - item, err := iter.Next() - s.NoError(err) - actualTasks = append(actualTasks, item) - } - s.Equal(tasks2, actualTasks) - - s.Equal(s.ackedTaskID, s.taskTracker.ackedTaskID) - s.Equal(s.maxTaskID, s.taskTracker.loadedTaskID) - s.Equal(map[int64]bool{ - taskID1: false, - taskID2: false, - maxTaskID1: true, - taskID3: false, - taskID4: false, - maxTaskID2: true, - }, s.taskTracker.tasks) -} - -func (s *dbTaskReaderSuite) TestIteration_Pagination() { - taskID1 := s.ackedTaskID + 1 - taskID2 := s.ackedTaskID + 3 - taskID3 := s.ackedTaskID + 6 - taskID4 := s.ackedTaskID + 10 - tasks1 := []*persistencespb.AllocatedTaskInfo{ - {TaskId: taskID1}, - {TaskId: taskID2}, - } - token := shuffle.Bytes([]byte("random page token")) - tasks2 := []*persistencespb.AllocatedTaskInfo{ - {TaskId: taskID3}, - {TaskId: taskID4}, - } - - gomock.InOrder( - s.taskStore.EXPECT().GetTasks(gomock.Any(), &persistence.GetTasksRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - PageSize: dbTaskReaderPageSize, - InclusiveMinTaskID: s.ackedTaskID + 1, - ExclusiveMaxTaskID: s.maxTaskID + 1, - NextPageToken: nil, - }).Return(&persistence.GetTasksResponse{ - Tasks: tasks1, - NextPageToken: token, - }, nil), - s.taskStore.EXPECT().GetTasks(gomock.Any(), &persistence.GetTasksRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - PageSize: dbTaskReaderPageSize, - InclusiveMinTaskID: s.ackedTaskID + 1, - ExclusiveMaxTaskID: s.maxTaskID + 1, - NextPageToken: token, - }).Return(&persistence.GetTasksResponse{ - Tasks: tasks2, - NextPageToken: nil, - }, nil), - ) - - iter := s.taskTracker.taskIterator(context.Background(), s.maxTaskID) - var actualTasks []*persistencespb.AllocatedTaskInfo - for iter.HasNext() { - item, err := iter.Next() - s.NoError(err) - actualTasks = append(actualTasks, item) - } - s.Equal(append(tasks1, tasks2...), actualTasks) - - s.Equal(s.ackedTaskID, s.taskTracker.ackedTaskID) - s.Equal(s.maxTaskID, s.taskTracker.loadedTaskID) - s.Equal(map[int64]bool{ - taskID1: false, - taskID2: false, - taskID3: false, - taskID4: false, - s.maxTaskID: true, - }, s.taskTracker.tasks) -} - -func (s *dbTaskReaderSuite) TestIteration_MaxTaskID_Exists() { - taskID1 := s.ackedTaskID + 1 - taskID2 := s.ackedTaskID + 3 - taskID3 := s.maxTaskID - tasks := []*persistencespb.AllocatedTaskInfo{ - {TaskId: taskID1}, - {TaskId: taskID2}, - {TaskId: taskID3}, - } - - s.taskStore.EXPECT().GetTasks(gomock.Any(), &persistence.GetTasksRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - PageSize: dbTaskReaderPageSize, - InclusiveMinTaskID: s.ackedTaskID + 1, - ExclusiveMaxTaskID: s.maxTaskID + 1, - NextPageToken: nil, - }).Return(&persistence.GetTasksResponse{ - Tasks: tasks, - NextPageToken: nil, - }, nil) - - iter := s.taskTracker.taskIterator(context.Background(), s.maxTaskID) - var actualTasks []*persistencespb.AllocatedTaskInfo - for iter.HasNext() { - item, err := iter.Next() - s.NoError(err) - actualTasks = append(actualTasks, item) - } - s.Equal(tasks, actualTasks) - - s.Equal(s.ackedTaskID, s.taskTracker.ackedTaskID) - s.Equal(s.maxTaskID, s.taskTracker.loadedTaskID) - s.Equal(map[int64]bool{ - taskID1: false, - taskID2: false, - taskID3: false, - }, s.taskTracker.tasks) -} - -func (s *dbTaskReaderSuite) TestIteration_MaxTaskID_Missing() { - taskID1 := s.ackedTaskID + 1 - taskID2 := s.ackedTaskID + 3 - tasks := []*persistencespb.AllocatedTaskInfo{ - {TaskId: taskID1}, - {TaskId: taskID2}, - } - - s.taskStore.EXPECT().GetTasks(gomock.Any(), &persistence.GetTasksRequest{ - NamespaceID: s.namespaceID, - TaskQueue: s.taskQueueName, - TaskType: s.taskQueueType, - PageSize: dbTaskReaderPageSize, - InclusiveMinTaskID: s.ackedTaskID + 1, - ExclusiveMaxTaskID: s.maxTaskID + 1, - NextPageToken: nil, - }).Return(&persistence.GetTasksResponse{ - Tasks: tasks, - NextPageToken: nil, - }, nil) - - iter := s.taskTracker.taskIterator(context.Background(), s.maxTaskID) - var actualTasks []*persistencespb.AllocatedTaskInfo - for iter.HasNext() { - item, err := iter.Next() - s.NoError(err) - actualTasks = append(actualTasks, item) - } - s.Equal(tasks, actualTasks) - - s.Equal(s.ackedTaskID, s.taskTracker.ackedTaskID) - s.Equal(s.maxTaskID, s.taskTracker.loadedTaskID) - s.Equal(map[int64]bool{ - taskID1: false, - taskID2: false, - s.maxTaskID: true, - }, s.taskTracker.tasks) -} - -func (s *dbTaskReaderSuite) TestAck_Exist() { - taskID := s.ackedTaskID + 12 - s.taskTracker.tasks[taskID] = false - - s.taskTracker.ackTask(taskID) - s.True(s.taskTracker.tasks[taskID]) -} - -func (s *dbTaskReaderSuite) TestAck_NotExist() { - taskID := s.ackedTaskID + 14 - - s.taskTracker.ackTask(taskID) - _, ok := s.taskTracker.tasks[taskID] - s.False(ok) -} - -func (s *dbTaskReaderSuite) TestMoveAckedTaskID() { - taskID1 := s.ackedTaskID + 1 - taskID2 := s.ackedTaskID + 3 - taskID3 := s.ackedTaskID + 6 - taskID4 := s.ackedTaskID + 10 - - s.taskTracker.tasks = map[int64]bool{ - taskID1: false, - taskID2: false, - taskID3: false, - taskID4: false, - } - s.taskTracker.loadedTaskID = taskID4 - - s.Equal(s.ackedTaskID, s.taskTracker.moveAckedTaskID()) - s.Equal(s.ackedTaskID, s.taskTracker.ackedTaskID) - - s.taskTracker.ackTask(taskID1) - s.Equal(taskID1, s.taskTracker.moveAckedTaskID()) - s.Equal(taskID1, s.taskTracker.ackedTaskID) - s.Equal(map[int64]bool{ - taskID2: false, - taskID3: false, - taskID4: false, - }, s.taskTracker.tasks) - - s.taskTracker.ackTask(taskID3) - s.Equal(taskID1, s.taskTracker.moveAckedTaskID()) - s.Equal(taskID1, s.taskTracker.ackedTaskID) - s.Equal(map[int64]bool{ - taskID2: false, - taskID3: true, - taskID4: false, - }, s.taskTracker.tasks) - - s.taskTracker.ackTask(taskID2) - s.Equal(taskID3, s.taskTracker.moveAckedTaskID()) - s.Equal(taskID3, s.taskTracker.ackedTaskID) - s.Equal(map[int64]bool{ - taskID4: false, - }, s.taskTracker.tasks) - - s.taskTracker.ackTask(taskID4) - s.Equal(taskID4, s.taskTracker.moveAckedTaskID()) - s.Equal(taskID4, s.taskTracker.ackedTaskID) - s.Equal(map[int64]bool{}, s.taskTracker.tasks) -} diff --git a/service/matching/db_task_writer.go b/service/matching/db_task_writer.go deleted file mode 100644 index 55e70ff0a2c..00000000000 --- a/service/matching/db_task_writer.go +++ /dev/null @@ -1,152 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package matching - -import ( - "context" - - "go.temporal.io/api/serviceerror" - - persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/future" - "go.temporal.io/server/common/log" - "go.temporal.io/server/common/persistence" -) - -const ( - dbTaskFlusherBatchSize = 16 - dbTaskFlusherBufferSize = dbTaskFlusherBatchSize * 4 -) - -var ( - errDBTaskWriterBufferFull = serviceerror.NewUnavailable("dbTaskWriter encountered task buffer full") -) - -//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination db_task_writer_mock.go - -type ( - dbTaskWriterFuture = future.Future[struct{}] - dbTaskWriter interface { - appendTask(task *persistencespb.TaskInfo) dbTaskWriterFuture - flushTasks(ctx context.Context) - notifyFlushChan() <-chan struct{} - } - - dbTaskInfo struct { - task *persistencespb.TaskInfo - future *future.FutureImpl[struct{}] // nil, error - } - - dbTaskWriterImpl struct { - taskQueueKey persistence.TaskQueueKey - ownership dbTaskQueueOwnership - logger log.Logger - - flushSignalChan chan struct{} - taskBuffer chan dbTaskInfo - } -) - -func newDBTaskWriter( - taskQueueKey persistence.TaskQueueKey, - ownership dbTaskQueueOwnership, - logger log.Logger, -) *dbTaskWriterImpl { - return &dbTaskWriterImpl{ - taskQueueKey: taskQueueKey, - ownership: ownership, - logger: logger, - - flushSignalChan: make(chan struct{}, 1), - taskBuffer: make(chan dbTaskInfo, dbTaskFlusherBufferSize), - } -} - -func (f *dbTaskWriterImpl) appendTask( - task *persistencespb.TaskInfo, -) dbTaskWriterFuture { - if len(f.taskBuffer) >= dbTaskFlusherBatchSize { - f.notifyFlush() - } - - fut := future.NewFuture[struct{}]() - select { - case f.taskBuffer <- dbTaskInfo{ - task: task, - future: fut, - }: - // noop - default: - // busy - fut.Set(struct{}{}, errDBTaskWriterBufferFull) - } - return fut -} - -func (f *dbTaskWriterImpl) flushTasks( - ctx context.Context, -) { - for len(f.taskBuffer) > 0 { - f.flushTasksOnce(ctx) - } -} - -func (f *dbTaskWriterImpl) flushTasksOnce( - ctx context.Context, -) { - tasks := make([]*persistencespb.TaskInfo, 0, dbTaskFlusherBatchSize) - futures := make([]*future.FutureImpl[struct{}], 0, len(tasks)) - -FlushLoop: - for i := 0; i < dbTaskFlusherBatchSize; i++ { - select { - case task := <-f.taskBuffer: - tasks = append(tasks, task.task) - futures = append(futures, task.future) - default: - break FlushLoop - } - } - - if len(tasks) == 0 { - return - } - err := f.ownership.flushTasks(ctx, tasks...) - for _, fut := range futures { - fut.Set(struct{}{}, err) - } -} - -func (f *dbTaskWriterImpl) notifyFlush() { - select { - case f.flushSignalChan <- struct{}{}: - default: - // noop, already notified - } -} - -func (f *dbTaskWriterImpl) notifyFlushChan() <-chan struct{} { - return f.flushSignalChan -} diff --git a/service/matching/db_task_writer_mock.go b/service/matching/db_task_writer_mock.go deleted file mode 100644 index eb9f574e963..00000000000 --- a/service/matching/db_task_writer_mock.go +++ /dev/null @@ -1,100 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -// Code generated by MockGen. DO NOT EDIT. -// Source: db_task_writer.go - -// Package matching is a generated GoMock package. -package matching - -import ( - context "context" - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - persistence "go.temporal.io/server/api/persistence/v1" -) - -// MockdbTaskWriter is a mock of dbTaskWriter interface. -type MockdbTaskWriter struct { - ctrl *gomock.Controller - recorder *MockdbTaskWriterMockRecorder -} - -// MockdbTaskWriterMockRecorder is the mock recorder for MockdbTaskWriter. -type MockdbTaskWriterMockRecorder struct { - mock *MockdbTaskWriter -} - -// NewMockdbTaskWriter creates a new mock instance. -func NewMockdbTaskWriter(ctrl *gomock.Controller) *MockdbTaskWriter { - mock := &MockdbTaskWriter{ctrl: ctrl} - mock.recorder = &MockdbTaskWriterMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockdbTaskWriter) EXPECT() *MockdbTaskWriterMockRecorder { - return m.recorder -} - -// appendTask mocks base method. -func (m *MockdbTaskWriter) appendTask(task *persistence.TaskInfo) dbTaskWriterFuture { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "appendTask", task) - ret0, _ := ret[0].(dbTaskWriterFuture) - return ret0 -} - -// appendTask indicates an expected call of appendTask. -func (mr *MockdbTaskWriterMockRecorder) appendTask(task interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "appendTask", reflect.TypeOf((*MockdbTaskWriter)(nil).appendTask), task) -} - -// flushTasks mocks base method. -func (m *MockdbTaskWriter) flushTasks(ctx context.Context) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "flushTasks", ctx) -} - -// flushTasks indicates an expected call of flushTasks. -func (mr *MockdbTaskWriterMockRecorder) flushTasks(ctx interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "flushTasks", reflect.TypeOf((*MockdbTaskWriter)(nil).flushTasks), ctx) -} - -// notifyFlushChan mocks base method. -func (m *MockdbTaskWriter) notifyFlushChan() <-chan struct{} { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "notifyFlushChan") - ret0, _ := ret[0].(<-chan struct{}) - return ret0 -} - -// notifyFlushChan indicates an expected call of notifyFlushChan. -func (mr *MockdbTaskWriterMockRecorder) notifyFlushChan() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "notifyFlushChan", reflect.TypeOf((*MockdbTaskWriter)(nil).notifyFlushChan)) -} diff --git a/service/matching/db_task_writer_test.go b/service/matching/db_task_writer_test.go deleted file mode 100644 index e96d050dae8..00000000000 --- a/service/matching/db_task_writer_test.go +++ /dev/null @@ -1,277 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package matching - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/golang/mock/gomock" - "github.com/google/uuid" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - enumspb "go.temporal.io/api/enums/v1" - "go.temporal.io/api/serviceerror" - - persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common/future" - "go.temporal.io/server/common/log" - "go.temporal.io/server/common/persistence" - "go.temporal.io/server/common/primitives/timestamp" -) - -type ( - dbTaskWriterSuite struct { - *require.Assertions - suite.Suite - - controller *gomock.Controller - taskOwnership *MockdbTaskQueueOwnership - - namespaceID string - taskQueueName string - taskQueueType enumspb.TaskQueueType - - taskFlusher *dbTaskWriterImpl - } -) - -func TestDBTaskWriterSuite(t *testing.T) { - s := new(dbTaskWriterSuite) - suite.Run(t, s) -} - -func (s *dbTaskWriterSuite) SetupSuite() { - rand.Seed(time.Now().UnixNano()) -} - -func (s *dbTaskWriterSuite) TearDownSuite() { - -} - -func (s *dbTaskWriterSuite) SetupTest() { - s.Assertions = require.New(s.T()) - - s.controller = gomock.NewController(s.T()) - s.taskOwnership = NewMockdbTaskQueueOwnership(s.controller) - - s.namespaceID = uuid.New().String() - s.taskQueueName = uuid.New().String() - s.taskQueueType = enumspb.TASK_QUEUE_TYPE_ACTIVITY - - s.taskFlusher = newDBTaskWriter( - persistence.TaskQueueKey{ - NamespaceID: s.namespaceID, - TaskQueueName: s.taskQueueName, - TaskQueueType: s.taskQueueType, - }, - s.taskOwnership, - log.NewTestLogger(), - ) -} - -func (s *dbTaskWriterSuite) TearDownTest() { - s.controller.Finish() -} - -func (s *dbTaskWriterSuite) TestAppendFlushTask_Once_Success() { - ctx := context.Background() - task := s.randomTask() - - s.taskOwnership.EXPECT().flushTasks(gomock.Any(), task).Return(nil) - - fut := s.taskFlusher.appendTask(task) - s.taskFlusher.flushTasks(context.Background()) - - _, err := fut.Get(ctx) - s.NoError(err) - select { - case <-s.taskFlusher.notifyFlushChan(): - s.Fail("there should be no signal") - default: - // test pass - } -} - -func (s *dbTaskWriterSuite) TestAppendFlushTask_Once_Failed() { - ctx := context.Background() - task := s.randomTask() - randomErr := serviceerror.NewUnavailable("random error") - - s.taskOwnership.EXPECT().flushTasks(gomock.Any(), task).Return(randomErr) - - fut := s.taskFlusher.appendTask(task) - s.taskFlusher.flushTasks(context.Background()) - - _, err := fut.Get(ctx) - s.Equal(randomErr, err) - select { - case <-s.taskFlusher.notifyFlushChan(): - s.Fail("there should be no signal") - default: - // test pass - } -} - -func (s *dbTaskWriterSuite) TestAppendFlushTask_Multiple_OnePage_Success() { - numTasks := dbTaskFlusherBatchSize - 1 - ctx := context.Background() - - var futures []future.Future[struct{}] - var tasks []interface{} - for i := 0; i < numTasks; i++ { - task := s.randomTask() - fut := s.taskFlusher.appendTask(task) - tasks = append(tasks, task) - futures = append(futures, fut) - } - - s.taskOwnership.EXPECT().flushTasks(gomock.Any(), tasks...).Return(nil) - - s.taskFlusher.flushTasks(context.Background()) - - for _, fut := range futures { - _, err := fut.Get(ctx) - s.NoError(err) - } - select { - case <-s.taskFlusher.notifyFlushChan(): - s.Fail("there should be no signal") - default: - // test pass - } -} - -func (s *dbTaskWriterSuite) TestAppendFlushTask_Multiple_OnePage_Failed() { - numTasks := dbTaskFlusherBatchSize - 1 - ctx := context.Background() - randomErr := serviceerror.NewUnavailable("random error") - - var futures []future.Future[struct{}] - var tasks []interface{} - for i := 0; i < numTasks; i++ { - task := s.randomTask() - fut := s.taskFlusher.appendTask(task) - tasks = append(tasks, task) - futures = append(futures, fut) - } - - s.taskOwnership.EXPECT().flushTasks(gomock.Any(), tasks...).Return(randomErr) - - s.taskFlusher.flushTasks(context.Background()) - - for _, fut := range futures { - _, err := fut.Get(ctx) - s.Equal(randomErr, err) - } - select { - case <-s.taskFlusher.notifyFlushChan(): - s.Fail("there should be no signal") - default: - // test pass - } -} - -func (s *dbTaskWriterSuite) TestAppendFlushTask_Multiple_MultiPage_Success() { - numTasks := 2*dbTaskFlusherBatchSize - 2 - ctx := context.Background() - - var futures []future.Future[struct{}] - var taskBatch [][]interface{} - var tasks []interface{} - for i := 0; i < numTasks; i++ { - task := s.randomTask() - fut := s.taskFlusher.appendTask(task) - if len(tasks) >= dbTaskFlusherBatchSize { - taskBatch = append(taskBatch, tasks) - tasks = nil - } - tasks = append(tasks, task) - futures = append(futures, fut) - } - if len(tasks) > 0 { - taskBatch = append(taskBatch, tasks) - } - - for _, tasks := range taskBatch { - s.taskOwnership.EXPECT().flushTasks(gomock.Any(), tasks...).Return(nil) - } - - s.taskFlusher.flushTasks(context.Background()) - - for _, fut := range futures { - _, err := fut.Get(ctx) - s.NoError(err) - } - <-s.taskFlusher.notifyFlushChan() -} - -func (s *dbTaskWriterSuite) TestAppendFlushTask_Multiple_MultiPage_Failed() { - numTasks := 2*dbTaskFlusherBatchSize - 2 - ctx := context.Background() - randomErr := serviceerror.NewUnavailable("random error") - - var futures []future.Future[struct{}] - var taskBatch [][]interface{} - var tasks []interface{} - for i := 0; i < numTasks; i++ { - task := s.randomTask() - fut := s.taskFlusher.appendTask(task) - if len(tasks) >= dbTaskFlusherBatchSize { - taskBatch = append(taskBatch, tasks) - tasks = nil - } - tasks = append(tasks, task) - futures = append(futures, fut) - } - if len(tasks) > 0 { - taskBatch = append(taskBatch, tasks) - } - - for _, tasks := range taskBatch { - s.taskOwnership.EXPECT().flushTasks(gomock.Any(), tasks...).Return(randomErr) - } - - s.taskFlusher.flushTasks(context.Background()) - - for _, fut := range futures { - _, err := fut.Get(ctx) - s.Equal(randomErr, err) - } - <-s.taskFlusher.notifyFlushChan() -} - -func (s *dbTaskWriterSuite) randomTask() *persistencespb.TaskInfo { - return &persistencespb.TaskInfo{ - NamespaceId: s.namespaceID, - WorkflowId: uuid.New().String(), - RunId: uuid.New().String(), - ScheduledEventId: rand.Int63(), - CreateTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), - ExpiryTime: timestamp.TimePtr(time.Unix(0, rand.Int63())), - } -}