Skip to content

Commit

Permalink
Skip final task queue update if lost ownership (#4554)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Jul 21, 2023
1 parent 9219be2 commit 674657b
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 7 deletions.
10 changes: 10 additions & 0 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2431,6 +2431,7 @@ type testTaskQueueManager struct {
createTaskCount int
getTasksCount int
getUserDataCount int
updateCount int
tasks *treemap.Map
userData *persistencespb.VersionedTaskQueueUserData
}
Expand Down Expand Up @@ -2495,6 +2496,7 @@ func (m *testTaskManager) UpdateTaskQueue(
tlm := m.getTaskQueueManager(newTestTaskQueueID(namespace.ID(tli.GetNamespaceId()), tli.Name, tli.TaskType))
tlm.Lock()
defer tlm.Unlock()
tlm.updateCount++

if tlm.rangeID != request.PrevRangeID {
return nil, &persistence.ConditionFailedError{
Expand Down Expand Up @@ -2695,6 +2697,14 @@ func (m *testTaskManager) getGetUserDataCount(taskQueue *taskQueueID) int {
return tlm.getUserDataCount
}

// getUpdateCount reports the number of UpdateTaskQueue calls recorded so far
// for the given task queue. The counter is read while holding that queue's lock.
func (m *testTaskManager) getUpdateCount(taskQueue *taskQueueID) int {
	queueState := m.getTaskQueueManager(taskQueue)

	queueState.Lock()
	count := queueState.updateCount
	queueState.Unlock()

	return count
}

func (m *testTaskManager) String() string {
m.Lock()
defer m.Unlock()
Expand Down
22 changes: 15 additions & 7 deletions service/matching/taskQueueManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ type (
// userDataReady is fulfilled once versioning data is fetched from the root partition. If this TQ is
// the root partition, it is fulfilled as soon as it is fetched from db.
userDataReady *future.FutureImpl[struct{}]
// lostOwnership controls behavior on Stop: if it's false, we try to write one final
// update before unloading
lostOwnership atomic.Bool
}
)

Expand Down Expand Up @@ -291,6 +294,7 @@ func (c *taskQueueManagerImpl) signalIfFatal(err error) bool {
var condfail *persistence.ConditionFailedError
if errors.As(err, &condfail) {
c.taggedMetricsHandler.Counter(metrics.ConditionFailedErrorPerTaskQueueCounter.GetMetricName()).Record(1)
c.lostOwnership.Store(true)
c.unloadFromEngine()
return true
}
Expand Down Expand Up @@ -325,17 +329,18 @@ func (c *taskQueueManagerImpl) Stop() {
) {
return
}
// ackLevel in taskAckManager is initialized to -1 and then set to a real value (>= 0) once
// we've successfully acquired a lease. If it's still -1, then we don't have current
// metadata. UpdateState would fail on the lease check, but don't even bother calling it.
// Maybe try to write one final update of ack level and GC some tasks.
// Skip the update if we never initialized (ackLevel will be -1 in that case).
// Also skip if we're stopping due to lost ownership (the update will fail in that case).
// Ignore any errors.
// Note that it's fine to GC even if the update ack level fails because we did match the
// tasks, the next owner will just read over an empty range.
ackLevel := c.taskAckManager.getAckLevel()
if ackLevel >= 0 {
if ackLevel >= 0 && !c.lostOwnership.Load() {
ctx, cancel := c.newIOContext()
defer cancel()

if err := c.db.UpdateState(ctx, ackLevel); err != nil {
c.logger.Error("Failed to update task queue state", tag.Error(err))
}
_ = c.db.UpdateState(ctx, ackLevel)
c.taskGC.RunNow(ctx, ackLevel)
}
c.liveness.Stop()
Expand All @@ -361,6 +366,7 @@ func (c *taskQueueManagerImpl) SetInitializedError(err error) {
c.initializedError.Set(struct{}{}, err)
if err != nil {
// We can't recover from here without starting over, so unload the whole task queue
c.lostOwnership.Store(true) // not really lost ownership but we want to skip the last write
c.unloadFromEngine()
}
}
Expand All @@ -379,6 +385,7 @@ func (c *taskQueueManagerImpl) SetUserDataState(userDataState userDataState, fut
if !c.userDataReady.Ready() {
c.userDataReady.Set(struct{}{}, futureError)
if futureError != nil {
c.lostOwnership.Store(true) // not really lost ownership but we want to skip the last write
c.unloadFromEngine()
}
}
Expand Down Expand Up @@ -648,6 +655,7 @@ func (c *taskQueueManagerImpl) completeTask(task *persistencespb.AllocatedTaskIn
tag.Error(err),
tag.WorkflowTaskQueueName(c.taskQueueID.FullName()),
tag.WorkflowTaskQueueType(c.taskQueueID.taskType))
c.lostOwnership.Store(true) // not really lost ownership but we want to skip the last write
c.unloadFromEngine()
return
}
Expand Down
46 changes: 46 additions & 0 deletions service/matching/taskQueueManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,52 @@ func TestAddTaskStandby(t *testing.T) {
require.False(t, syncMatch)
}

// TestTQMDoesFinalUpdateOnIdleUnload starts a task queue manager configured with a
// one-second idle timeout, waits past that timeout, and expects exactly one
// UpdateTaskQueue call to have been recorded by the test task manager.
func TestTQMDoesFinalUpdateOnIdleUnload(t *testing.T) {
	t.Parallel()

	ctrl := gomock.NewController(t)

	engineCfg := NewConfig(dynamicconfig.NewNoopCollection(), false, false)
	engineCfg.MaxTaskQueueIdleTime = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueueInfo(1 * time.Second)
	opts := defaultTqmTestOpts(ctrl)
	opts.config = engineCfg

	mgr := mustCreateTestTaskQueueManagerWithConfig(t, ctrl, opts)
	taskMgr := mgr.engine.taskManager.(*testTaskManager)

	mgr.Start()

	// Sleep past the idle timeout so the manager unloads due to idleness.
	time.Sleep(2 * time.Second)

	updates := taskMgr.getUpdateCount(opts.tqId)
	require.Equal(t, 1, updates)
}

// TestTQMDoesNotDoFinalUpdateOnOwnershipLost bumps the stored range ID behind the
// manager's back so that its next UpdateTaskQueue call fails the range ID check,
// then expects that failed call to be the only update ever recorded.
func TestTQMDoesNotDoFinalUpdateOnOwnershipLost(t *testing.T) {
	// TODO: use mocks instead of testTaskManager so we can do synchronization better instead of sleeps
	t.Parallel()

	ctrl := gomock.NewController(t)

	engineCfg := NewConfig(dynamicconfig.NewNoopCollection(), false, false)
	engineCfg.UpdateAckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskQueueInfo(2 * time.Second)
	opts := defaultTqmTestOpts(ctrl)
	opts.config = engineCfg

	mgr := mustCreateTestTaskQueueManagerWithConfig(t, ctrl, opts)
	taskMgr := mgr.engine.taskManager.(*testTaskManager)

	mgr.Start()
	time.Sleep(1 * time.Second)

	// Simulate lost ownership by advancing the persisted range ID under the queue's lock.
	func() {
		queueState := taskMgr.getTaskQueueManager(opts.tqId)
		queueState.Lock()
		defer queueState.Unlock()
		queueState.rangeID++
	}()

	// The manager will attempt one update, fail, and must not try again.
	time.Sleep(2 * time.Second)

	updates := taskMgr.getUpdateCount(opts.tqId)
	require.Equal(t, 1, updates)
}

func TestUserData_LoadOnInit(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 674657b

Please sign in to comment.