From 7f064eac4c60e690b0da0e86ed878a5c8b9269dc Mon Sep 17 00:00:00 2001 From: David Reiss Date: Tue, 20 Jun 2023 15:31:27 -0700 Subject: [PATCH] Wrap setting futures in taskQueueManager (#4494) --- service/matching/taskQueueManager.go | 30 ++++++++++++++++------- service/matching/taskQueueManager_test.go | 8 +++--- service/matching/taskWriter.go | 8 ++---- 3 files changed, 27 insertions(+), 19 deletions(-) diff --git a/service/matching/taskQueueManager.go b/service/matching/taskQueueManager.go index 4e768c9827f..466296b1c12 100644 --- a/service/matching/taskQueueManager.go +++ b/service/matching/taskQueueManager.go @@ -357,6 +357,22 @@ func (c *taskQueueManagerImpl) managesSpecificVersionSet() bool { return c.taskQueueID.VersionSet() != "" } +func (c *taskQueueManagerImpl) SetInitializedError(err error) { + c.initializedError.Set(struct{}{}, err) + if err != nil { + // We can't recover from here without starting over, so unload the whole task queue + c.unloadFromEngine() + } +} + +func (c *taskQueueManagerImpl) SetUserDataInitialFetch(err error) { + c.userDataInitialFetch.Set(struct{}{}, err) + if err != nil { + // We can't recover from here without starting over, so unload the whole task queue + c.unloadFromEngine() + } +} + func (c *taskQueueManagerImpl) WaitUntilInitialized(ctx context.Context) error { _, err := c.initializedError.Get(ctx) if err != nil { @@ -762,22 +778,18 @@ func (c *taskQueueManagerImpl) fetchUserData(ctx context.Context) error { if !c.config.LoadUserData() { // if disabled, mark ready now - c.userDataInitialFetch.Set(struct{}{}, nil) + c.SetUserDataInitialFetch(nil) return nil } if c.managesSpecificVersionSet() { // tqm for specific version set doesn't have its own user data - c.userDataInitialFetch.Set(struct{}{}, nil) + c.SetUserDataInitialFetch(nil) return nil } if c.db.DbStoresUserData() { // root workflow partition "owns" user data, read it from db err := c.db.loadUserData(ctx) - c.userDataInitialFetch.Set(struct{}{}, err) - if err != nil { - // We can't recover from here without starting over, so unload the whole task queue - c.unloadFromEngine() - } + c.SetUserDataInitialFetch(err) return err } @@ -787,7 +799,7 @@ func (c *taskQueueManagerImpl) fetchUserData(ctx context.Context) error { if err != nil { if err == errMissingNormalQueueName { // pretend we have no user data - c.userDataInitialFetch.Set(struct{}{}, nil) + c.SetUserDataInitialFetch(nil) } return err } @@ -821,7 +833,7 @@ func (c *taskQueueManagerImpl) fetchUserData(ctx context.Context) error { c.db.setUserDataForNonOwningPartition(res.GetUserData()) } if firstCall { - c.userDataInitialFetch.Set(struct{}{}, err) + c.SetUserDataInitialFetch(nil) firstCall = false } return nil diff --git a/service/matching/taskQueueManager_test.go b/service/matching/taskQueueManager_test.go index 37088f01f75..812a31dc65c 100644 --- a/service/matching/taskQueueManager_test.go +++ b/service/matching/taskQueueManager_test.go @@ -144,8 +144,8 @@ func TestDeliverBufferTasks_RetriesVersionedTaskWhenUserInfoDisabled(t *testing. }, } - tlm.initializedError.Set(struct{}{}, nil) - tlm.userDataInitialFetch.Set(struct{}{}, nil) + tlm.SetInitializedError(nil) + tlm.SetUserDataInitialFetch(nil) tlm.taskReader.gorogrp.Go(tlm.taskReader.dispatchBufferedTasks) time.Sleep(3 * taskReaderOfferThrottleWait) @@ -179,8 +179,8 @@ func TestDeliverBufferTasks_RetriesUseDefaultTaskWhenUserInfoDisabled(t *testing }, } - tlm.initializedError.Set(struct{}{}, nil) - tlm.userDataInitialFetch.Set(struct{}{}, nil) + tlm.SetInitializedError(nil) + tlm.SetUserDataInitialFetch(nil) tlm.taskReader.gorogrp.Go(tlm.taskReader.dispatchBufferedTasks) time.Sleep(taskReaderOfferThrottleWait) diff --git a/service/matching/taskWriter.go b/service/matching/taskWriter.go index e92b4d2e97a..3c1cc823000 100644 --- a/service/matching/taskWriter.go +++ b/service/matching/taskWriter.go @@ -218,12 +218,8 @@ func (w *taskWriter) appendTasks( func (w *taskWriter) taskWriterLoop(ctx context.Context) error { err := w.initReadWriteState(ctx) - w.tlMgr.initializedError.Set(struct{}{}, err) - if err != nil { - // We can't recover from here without starting over, so unload the whole task queue - w.tlMgr.unloadFromEngine() - return err - } + w.tlMgr.SetInitializedError(err) + writerLoop: for { select {