Skip to content

Commit

Permalink
Merge branch 'master' into util_last
Browse files Browse the repository at this point in the history
  • Loading branch information
agautam478 authored Apr 4, 2024
2 parents 0e59c94 + 4d34cfb commit 9460269
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 32 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ require (
github.com/xdg/stringprep v1.0.0 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.uber.org/dig v1.10.0 // indirect
go.uber.org/goleak v1.0.0
go.uber.org/net/metrics v1.3.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/exp/typeparams v0.0.0-20220218215828-6cf2b201936e // indirect
Expand Down
3 changes: 3 additions & 0 deletions service/history/queue/timer_queue_processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (t *timerQueueProcessorBase) Start() {
go t.processorPump()
}

// Edge Case: Stop doesn't stop TimerGate if timerQueueProcessorBase is only initiliazed without starting
// As a result, TimerGate needs to be stopped separately
// One way to fix this is to make sure TimerGate doesn't start daemon loop on initilization and requires explicit Start
func (t *timerQueueProcessorBase) Stop() {
if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
Expand Down
85 changes: 53 additions & 32 deletions service/history/queue/timer_queue_processor_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"go.uber.org/goleak"

"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/dynamicconfig"
Expand Down Expand Up @@ -93,10 +94,12 @@ func (s *timerQueueProcessorBaseSuite) SetupTest() {
func (s *timerQueueProcessorBaseSuite) TearDownTest() {
s.controller.Finish()
s.mockShard.Finish(s.T())
goleak.VerifyNone(s.T())
}

func (s *timerQueueProcessorBaseSuite) TestIsProcessNow() {
timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
defer done()
s.True(timerQueueProcessBase.isProcessNow(time.Time{}))

now := s.mockShard.GetCurrentTime(s.clusterName)
Expand All @@ -107,6 +110,7 @@ func (s *timerQueueProcessorBaseSuite) TestIsProcessNow() {

timeAfter := now.Add(10 * time.Second)
s.False(timerQueueProcessBase.isProcessNow(timeAfter))

}

func (s *timerQueueProcessorBaseSuite) TestGetTimerTasks_More() {
Expand Down Expand Up @@ -141,7 +145,8 @@ func (s *timerQueueProcessorBaseSuite) TestGetTimerTasks_More() {
mockExecutionMgr := s.mockShard.Resource.ExecutionMgr
mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once()

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
defer done()
got, err := timerQueueProcessBase.getTimerTasks(readLevel, maxReadLevel, request.NextPageToken, batchSize)
s.Nil(err)
s.Equal(response.Timers, got.Timers)
Expand Down Expand Up @@ -180,7 +185,8 @@ func (s *timerQueueProcessorBaseSuite) TestGetTimerTasks_NoMore() {
mockExecutionMgr := s.mockShard.Resource.ExecutionMgr
mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once()

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
defer done()
got, err := timerQueueProcessBase.getTimerTasks(readLevel, maxReadLevel, request.NextPageToken, batchSize)
s.Nil(err)
s.Equal(response.Timers, got.Timers)
Expand Down Expand Up @@ -220,7 +226,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadLookAheadTask() {
mockExecutionMgr := s.mockShard.Resource.ExecutionMgr
mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once()

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
defer done()
lookAheadTask, err := timerQueueProcessBase.readLookAheadTask(readLevel, maxReadLevel)
s.Nil(err)
s.Equal(response.Timers[0], lookAheadTask)
Expand Down Expand Up @@ -265,7 +272,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_NoLookAhead_NoNext
mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once()
mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, lookAheadRequest).Return(&persistence.GetTimerIndexTasksResponse{}, nil).Once()

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
defer done()
got, err := timerQueueProcessBase.readAndFilterTasks(readLevel, maxReadLevel, request.NextPageToken)
s.Nil(err)
s.Equal(response.Timers, got.timerTasks)
Expand Down Expand Up @@ -304,7 +312,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_NoLookAhead_HasNex
mockExecutionMgr := s.mockShard.Resource.ExecutionMgr
mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once()

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
defer done()
got, err := timerQueueProcessBase.readAndFilterTasks(readLevel, maxReadLevel, request.NextPageToken)
s.Nil(err)
s.Equal(response.Timers, got.timerTasks)
Expand Down Expand Up @@ -354,7 +363,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_HasLookAhead_NoNex
mockExecutionMgr := s.mockShard.Resource.ExecutionMgr
mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once()

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
defer done()
got, err := timerQueueProcessBase.readAndFilterTasks(readLevel, maxReadLevel, request.NextPageToken)
s.Nil(err)
s.Equal([]*persistence.TimerTaskInfo{response.Timers[0]}, got.timerTasks)
Expand Down Expand Up @@ -404,7 +414,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_HasLookAhead_HasNe
mockExecutionMgr := s.mockShard.Resource.ExecutionMgr
mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once()

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
defer done()
got, err := timerQueueProcessBase.readAndFilterTasks(readLevel, maxReadLevel, request.NextPageToken)
s.Nil(err)
s.Equal([]*persistence.TimerTaskInfo{response.Timers[0]}, got.timerTasks)
Expand Down Expand Up @@ -462,7 +473,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_LookAheadFailed_No
mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, request).Return(response, nil).Once()
mockExecutionMgr.On("GetTimerIndexTasks", mock.Anything, lookAheadRequest).Return(nil, errors.New("some random error")).Times(s.mockShard.GetConfig().TimerProcessorGetFailureRetryCount())

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
defer done()
got, err := timerQueueProcessBase.readAndFilterTasks(readLevel, maxReadLevel, request.NextPageToken)
s.Nil(err)
s.Equal(response.Timers, got.timerTasks)
Expand All @@ -471,7 +483,8 @@ func (s *timerQueueProcessorBaseSuite) TestReadAndFilterTasks_LookAheadFailed_No
}

func (s *timerQueueProcessorBaseSuite) TestNotifyNewTimes() {
timerQueueProcessBase := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(nil, nil, nil, nil, nil)
defer done()

// assert the initial state
s.True(timerQueueProcessBase.newTime.IsZero())
Expand Down Expand Up @@ -539,7 +552,8 @@ func (s *timerQueueProcessorBaseSuite) TestProcessQueueCollections_SkipRead() {
return shardMaxReadLevel
}

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
defer done()
timerQueueProcessBase.processQueueCollections(map[int]struct{}{queueLevel: {}})

s.Len(timerQueueProcessBase.processingQueueCollections, 1)
Expand Down Expand Up @@ -620,7 +634,8 @@ func (s *timerQueueProcessorBaseSuite) TestProcessBatch_HasNextPage() {

s.mockTaskProcessor.EXPECT().TrySubmit(gomock.Any()).Return(true, nil).AnyTimes()

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
defer done()
timerQueueProcessBase.processQueueCollections(map[int]struct{}{queueLevel: {}})

s.Len(timerQueueProcessBase.processingQueueCollections, 1)
Expand Down Expand Up @@ -710,7 +725,8 @@ func (s *timerQueueProcessorBaseSuite) TestProcessBatch_NoNextPage_HasLookAhead(

s.mockTaskProcessor.EXPECT().TrySubmit(gomock.Any()).Return(true, nil).AnyTimes()

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
defer done()
timerQueueProcessBase.processingQueueReadProgress[0] = timeTaskReadProgress{
currentQueue: timerQueueProcessBase.processingQueueCollections[0].ActiveQueue(),
readLevel: ackLevel,
Expand Down Expand Up @@ -807,7 +823,8 @@ func (s *timerQueueProcessorBaseSuite) TestProcessBatch_NoNextPage_NoLookAhead()

s.mockTaskProcessor.EXPECT().TrySubmit(gomock.Any()).Return(true, nil).AnyTimes()

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
timerQueueProcessBase, done := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
defer done()
timerQueueProcessBase.processingQueueReadProgress[0] = timeTaskReadProgress{
currentQueue: timerQueueProcessBase.processingQueueCollections[0].ActiveQueue(),
readLevel: ackLevel,
Expand Down Expand Up @@ -854,7 +871,7 @@ func (s *timerQueueProcessorBaseSuite) TestTimerProcessorPump_HandleAckLevelUpda
return newTimerTaskKey(now, 0)
}

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
timerQueueProcessBase, _ := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
timerQueueProcessBase.options.UpdateAckInterval = dynamicconfig.GetDurationPropertyFn(1 * time.Millisecond)
updatedCh := make(chan struct{}, 1)
timerQueueProcessBase.updateAckLevelFn = func() (bool, task.Key, error) {
Expand Down Expand Up @@ -889,7 +906,7 @@ func (s *timerQueueProcessorBaseSuite) TestTimerProcessorPump_SplitQueue() {
return newTimerTaskKey(now, 0)
}

timerQueueProcessBase := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
timerQueueProcessBase, _ := s.newTestTimerQueueProcessorBase(processingQueueStates, updateMaxReadLevel, nil, nil, nil)
timerQueueProcessBase.options.SplitQueueInterval = dynamicconfig.GetDurationPropertyFn(1 * time.Millisecond)
splittedCh := make(chan struct{}, 1)
timerQueueProcessBase.splitProcessingQueueCollectionFn = func(splitPolicy ProcessingQueueSplitPolicy, upsertPollTimeFn func(int, time.Time)) {
Expand All @@ -912,21 +929,25 @@ func (s *timerQueueProcessorBaseSuite) newTestTimerQueueProcessorBase(
updateClusterAckLevel updateClusterAckLevelFn,
updateProcessingQueueStates updateProcessingQueueStatesFn,
queueShutdown queueShutdownFn,
) *timerQueueProcessorBase {
) (*timerQueueProcessorBase, func()) {
timerGate := NewLocalTimerGate(s.mockShard.GetTimeSource())

return newTimerQueueProcessorBase(
s.clusterName,
s.mockShard,
processingQueueStates,
s.mockTaskProcessor,
NewLocalTimerGate(s.mockShard.GetTimeSource()),
newTimerQueueProcessorOptions(s.mockShard.GetConfig(), true, false),
updateMaxReadLevel,
updateClusterAckLevel,
updateProcessingQueueStates,
queueShutdown,
nil,
nil,
s.logger,
s.metricsClient,
)
s.clusterName,
s.mockShard,
processingQueueStates,
s.mockTaskProcessor,
timerGate,
newTimerQueueProcessorOptions(s.mockShard.GetConfig(), true, false),
updateMaxReadLevel,
updateClusterAckLevel,
updateProcessingQueueStates,
queueShutdown,
nil,
nil,
s.logger,
s.metricsClient,
), func() {
timerGate.Close()
}
}
75 changes: 75 additions & 0 deletions service/history/queue/transfer_queue_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

import (
"testing"

"github.com/golang/mock/gomock"
"go.uber.org/goleak"

"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/reconciliation/invariant"
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/reset"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
"github.com/uber/cadence/service/history/workflowcache"
"github.com/uber/cadence/service/worker/archiver"
)

func TestTransferQueueProcessor_RequireStartStop(t *testing.T) {
// some goroutine leak not from this test
defer goleak.VerifyNone(t)
ctrl := gomock.NewController(t)
mockShard := shard.NewTestContext(
t, ctrl, &persistence.ShardInfo{
ShardID: 10,
RangeID: 1,
TransferAckLevel: 0,
},
config.NewForTest())
defer mockShard.Finish(t)

mockProcessor := task.NewMockProcessor(ctrl)
mockResetter := reset.NewMockWorkflowResetter(ctrl)
mockArchiver := &archiver.ClientMock{}
mockInvariant := invariant.NewMockInvariant(ctrl)
mockWorkflowCache := workflowcache.NewMockWFCache(ctrl)
ratelimit := func(domain string) bool { return false }

// Create a new transferQueueProcessor
processor := NewTransferQueueProcessor(
mockShard,
mockShard.GetEngine(),
mockProcessor,
execution.NewCache(mockShard),
mockResetter,
mockArchiver,
mockInvariant,
mockWorkflowCache,
ratelimit)
processor.Start()
processor.Stop()
}

0 comments on commit 9460269

Please sign in to comment.