Skip to content

Commit

Permalink
Wrap setting futures in taskQueueManager (#4494)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr authored Jun 20, 2023
1 parent 35ad0a5 commit 7f064ea
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 19 deletions.
30 changes: 21 additions & 9 deletions service/matching/taskQueueManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,22 @@ func (c *taskQueueManagerImpl) managesSpecificVersionSet() bool {
return c.taskQueueID.VersionSet() != ""
}

func (c *taskQueueManagerImpl) SetInitializedError(err error) {
c.initializedError.Set(struct{}{}, err)
if err != nil {
// We can't recover from here without starting over, so unload the whole task queue
c.unloadFromEngine()
}
}

func (c *taskQueueManagerImpl) SetUserDataInitialFetch(err error) {
c.userDataInitialFetch.Set(struct{}{}, err)
if err != nil {
// We can't recover from here without starting over, so unload the whole task queue
c.unloadFromEngine()
}
}

func (c *taskQueueManagerImpl) WaitUntilInitialized(ctx context.Context) error {
_, err := c.initializedError.Get(ctx)
if err != nil {
Expand Down Expand Up @@ -762,22 +778,18 @@ func (c *taskQueueManagerImpl) fetchUserData(ctx context.Context) error {

if !c.config.LoadUserData() {
// if disabled, mark ready now
c.userDataInitialFetch.Set(struct{}{}, nil)
c.SetUserDataInitialFetch(nil)
return nil
}
if c.managesSpecificVersionSet() {
// tqm for specific version set doesn't have its own user data
c.userDataInitialFetch.Set(struct{}{}, nil)
c.SetUserDataInitialFetch(nil)
return nil
}
if c.db.DbStoresUserData() {
// root workflow partition "owns" user data, read it from db
err := c.db.loadUserData(ctx)
c.userDataInitialFetch.Set(struct{}{}, err)
if err != nil {
// We can't recover from here without starting over, so unload the whole task queue
c.unloadFromEngine()
}
c.SetUserDataInitialFetch(err)
return err
}

Expand All @@ -787,7 +799,7 @@ func (c *taskQueueManagerImpl) fetchUserData(ctx context.Context) error {
if err != nil {
if err == errMissingNormalQueueName {
// pretend we have no user data
c.userDataInitialFetch.Set(struct{}{}, nil)
c.SetUserDataInitialFetch(nil)
}
return err
}
Expand Down Expand Up @@ -821,7 +833,7 @@ func (c *taskQueueManagerImpl) fetchUserData(ctx context.Context) error {
c.db.setUserDataForNonOwningPartition(res.GetUserData())
}
if firstCall {
c.userDataInitialFetch.Set(struct{}{}, err)
c.SetUserDataInitialFetch(nil)
firstCall = false
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions service/matching/taskQueueManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func TestDeliverBufferTasks_RetriesVersionedTaskWhenUserInfoDisabled(t *testing.
},
}

tlm.initializedError.Set(struct{}{}, nil)
tlm.userDataInitialFetch.Set(struct{}{}, nil)
tlm.SetInitializedError(nil)
tlm.SetUserDataInitialFetch(nil)
tlm.taskReader.gorogrp.Go(tlm.taskReader.dispatchBufferedTasks)

time.Sleep(3 * taskReaderOfferThrottleWait)
Expand Down Expand Up @@ -179,8 +179,8 @@ func TestDeliverBufferTasks_RetriesUseDefaultTaskWhenUserInfoDisabled(t *testing
},
}

tlm.initializedError.Set(struct{}{}, nil)
tlm.userDataInitialFetch.Set(struct{}{}, nil)
tlm.SetInitializedError(nil)
tlm.SetUserDataInitialFetch(nil)
tlm.taskReader.gorogrp.Go(tlm.taskReader.dispatchBufferedTasks)

time.Sleep(taskReaderOfferThrottleWait)
Expand Down
8 changes: 2 additions & 6 deletions service/matching/taskWriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,8 @@ func (w *taskWriter) appendTasks(

func (w *taskWriter) taskWriterLoop(ctx context.Context) error {
err := w.initReadWriteState(ctx)
w.tlMgr.initializedError.Set(struct{}{}, err)
if err != nil {
// We can't recover from here without starting over, so unload the whole task queue
w.tlMgr.unloadFromEngine()
return err
}
w.tlMgr.SetInitializedError(err)

writerLoop:
for {
select {
Expand Down

0 comments on commit 7f064ea

Please sign in to comment.