diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index 4f90ce4afafb..e4468c31db45 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -878,7 +878,7 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w return &workflowservice.PollWorkflowTaskQueueResponse{}, nil } - // For newer build error, return silently. + // These errors are expected based on certain client behavior. We should not log them, it'd be too noisy. var newerBuild *serviceerror.NewerBuildExists if errors.As(err, &newerBuild) { return nil, err @@ -1114,7 +1114,7 @@ func (wh *WorkflowHandler) PollActivityTaskQueue(ctx context.Context, request *w return &workflowservice.PollActivityTaskQueueResponse{}, nil } - // For newer build error, return silently. + // These errors are expected based on certain client behavior. We should not log them, it'd be too noisy. var newerBuild *serviceerror.NewerBuildExists if errors.As(err, &newerBuild) { return nil, err diff --git a/service/matching/taskQueueManager.go b/service/matching/taskQueueManager.go index f548e4d8c382..171fa2bb70c8 100644 --- a/service/matching/taskQueueManager.go +++ b/service/matching/taskQueueManager.go @@ -346,13 +346,12 @@ func (c *taskQueueManagerImpl) Stop() { c.unloadFromEngine() } -// isVersioned returns true if this is a tqm for a "versioned [low-level] task queue". Note -// that this is a different concept from the overall [high-level] task queue having versioning -// data associated with it, which is the usual meaning of "versioned task queue". In this case, -// it means whether this is a tqm processing a specific version set id. Unlike non-root -// partitions which are known (at some level) by other services, [low-level] task queues with a -// version set should not be interacted with outside of the matching service. -func (c *taskQueueManagerImpl) isVersioned() bool { +// managesSpecificVersionSet returns true if this is a tqm for a specific version set in the +// build-id-based versioning feature. Note that this is a different concept from the overall +// task queue having versioning data associated with it, which is the usual meaning of +// "versioned task queue". These task queues are not interacted with directly outside outside +// of a single matching node. +func (c *taskQueueManagerImpl) managesSpecificVersionSet() bool { return c.taskQueueID.VersionSet() != "" } @@ -362,7 +361,7 @@ func (c *taskQueueManagerImpl) isVersioned() bool { func (c *taskQueueManagerImpl) shouldFetchUserData() bool { // 1. If the db stores it, then we definitely should not be fetching. // 2. Additionally, we should not fetch for "versioned" tqms. - return !c.db.DbStoresUserData() && !c.isVersioned() + return !c.db.DbStoresUserData() && !c.managesSpecificVersionSet() } func (c *taskQueueManagerImpl) WaitUntilInitialized(ctx context.Context) error { @@ -510,7 +509,7 @@ func (c *taskQueueManagerImpl) DispatchQueryTask( // GetUserData returns the user data for the task queue if any. // Note: can return nil value with no error. func (c *taskQueueManagerImpl) GetUserData(ctx context.Context) (*persistencespb.VersionedTaskQueueUserData, chan struct{}, error) { - if c.isVersioned() { + if c.managesSpecificVersionSet() { return nil, nil, errNoUserDataOnVersionedTQM } return c.db.GetUserData(ctx) diff --git a/tests/versioning_test.go b/tests/versioning_test.go index eadb6bd4188e..72db5a5757c4 100644 --- a/tests/versioning_test.go +++ b/tests/versioning_test.go @@ -171,7 +171,7 @@ func (s *versioningIntegSuite) TestLinkToNonexistentCompatibleVersionReturnsNotF func (s *versioningIntegSuite) TestVersioningStatePersistsAcrossUnload() { ctx := NewContext() - tq := "integration-versioning-not-destroyed" + tq := "integration-versioning-persists" s.addNewDefaultBuildId(ctx, tq, "foo") @@ -311,6 +311,50 @@ func (s *versioningIntegSuite) dispatchNewWorkflow() { s.Equal("done!", out) } +func (s *versioningIntegSuite) TestDispatchNotUsingVersioning() { + s.testWithMatchingBehavior(s.dispatchNotUsingVersioning) +} + +func (s *versioningIntegSuite) dispatchNotUsingVersioning() { + tq := s.randomizeStr(s.T().Name()) + + wf1nover := func(ctx workflow.Context) (string, error) { + return "done without versioning!", nil + } + wf1 := func(ctx workflow.Context) (string, error) { + return "done with versioning!", nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s.addNewDefaultBuildId(ctx, tq, "v1") + s.waitForPropagation(ctx, tq, "v1") + + w1nover := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: s.prefixed("v1"), + UseBuildIDForVersioning: false, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w1 := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: s.prefixed("v1"), + UseBuildIDForVersioning: true, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w1nover.RegisterWorkflowWithOptions(wf1nover, workflow.RegisterOptions{Name: "wf"}) + w1.RegisterWorkflowWithOptions(wf1, workflow.RegisterOptions{Name: "wf"}) + s.NoError(w1nover.Start()) + defer w1nover.Stop() + s.NoError(w1.Start()) + defer w1.Stop() + + run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, "wf") + s.NoError(err) + var out string + s.NoError(run.Get(ctx, &out)) + s.Equal("done with versioning!", out) +} + func (s *versioningIntegSuite) TestDispatchNewWorkflowStartWorkerFirst() { s.testWithMatchingBehavior(s.dispatchNewWorkflowStartWorkerFirst) } @@ -468,11 +512,27 @@ func (s *versioningIntegSuite) dispatchUpgrade(stopOld bool) { s.Equal("done from 1.1!", out) } +type activityFailMode int + +const ( + dontFailActivity = iota + failActivity + timeoutActivity +) + func (s *versioningIntegSuite) TestDispatchActivity() { - s.testWithMatchingBehavior(s.dispatchActivity) + s.testWithMatchingBehavior(func() { s.dispatchActivity(dontFailActivity) }) +} + +func (s *versioningIntegSuite) TestDispatchActivityFail() { + s.testWithMatchingBehavior(func() { s.dispatchActivity(failActivity) }) } -func (s *versioningIntegSuite) dispatchActivity() { +func (s *versioningIntegSuite) TestDispatchActivityTimeout() { + s.testWithMatchingBehavior(func() { s.dispatchActivity(timeoutActivity) }) +} + +func (s *versioningIntegSuite) dispatchActivity(failMode activityFailMode) { // This also implicitly tests that a workflow stays on a compatible version set if a new // incompatible set is registered, because wf2 just panics. It further tests that // stickiness on v1 is not broken by registering v2, because the channel send will panic on @@ -482,8 +542,32 @@ func (s *versioningIntegSuite) dispatchActivity() { started := make(chan struct{}, 1) - act1 := func() (string, error) { return "v1", nil } - act2 := func() (string, error) { return "v2", nil } + var act1state, act2state atomic.Int32 + + act1 := func() (string, error) { + if act1state.Add(1) == 1 { + switch failMode { + case failActivity: + return "", errors.New("try again") + case timeoutActivity: + time.Sleep(5 * time.Second) + return "ignored", nil + } + } + return "v1", nil + } + act2 := func() (string, error) { + if act2state.Add(1) == 1 { + switch failMode { + case failActivity: + return "", errors.New("try again") + case timeoutActivity: + time.Sleep(5 * time.Second) + return "ignored", nil + } + } + return "v2", nil + } wf1 := func(ctx workflow.Context) (string, error) { started <- struct{}{} // wait for signal @@ -493,11 +577,13 @@ func (s *versioningIntegSuite) dispatchActivity() { ScheduleToCloseTimeout: time.Minute, DisableEagerExecution: true, VersioningIntent: temporal.VersioningIntentCompatible, + StartToCloseTimeout: 1 * time.Second, }), "act") fut2 := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ ScheduleToCloseTimeout: time.Minute, DisableEagerExecution: true, VersioningIntent: temporal.VersioningIntentDefault, // this one should go to default + StartToCloseTimeout: 1 * time.Second, }), "act") var val1, val2 string s.NoError(fut1.Get(ctx, &val1)) @@ -528,7 +614,7 @@ func (s *versioningIntegSuite) dispatchActivity() { s.NoError(err) // wait for it to start on v1 s.waitForChan(ctx, started) - close(started) //force panic if replayed + close(started) // force panic if replayed // now register v2 as default s.addNewDefaultBuildId(ctx, tq, "v2") @@ -552,6 +638,78 @@ func (s *versioningIntegSuite) dispatchActivity() { s.Equal("v1v2", out) } +func (s *versioningIntegSuite) TestDispatchActivityCompatible() { + s.testWithMatchingBehavior(s.dispatchActivityCompatible) +} + +func (s *versioningIntegSuite) dispatchActivityCompatible() { + tq := s.randomizeStr(s.T().Name()) + + started := make(chan struct{}, 2) + + act1 := func() (string, error) { return "v1", nil } + act11 := func() (string, error) { return "v1.1", nil } + wf1 := func(ctx workflow.Context) (string, error) { + started <- struct{}{} + // wait for signal + workflow.GetSignalChannel(ctx, "wait").Receive(ctx, nil) + // run activity + fut11 := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToCloseTimeout: time.Minute, + DisableEagerExecution: true, + VersioningIntent: temporal.VersioningIntentCompatible, + }), "act") + var val11 string + s.NoError(fut11.Get(ctx, &val11)) + return val11, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s.addNewDefaultBuildId(ctx, tq, "v1") + s.waitForPropagation(ctx, tq, "v1") + + w1 := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: s.prefixed("v1"), + UseBuildIDForVersioning: true, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w1.RegisterWorkflowWithOptions(wf1, workflow.RegisterOptions{Name: "wf"}) + w1.RegisterActivityWithOptions(act1, activity.RegisterOptions{Name: "act"}) + s.NoError(w1.Start()) + defer w1.Stop() + + run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, "wf") + s.NoError(err) + // wait for it to start on v1 + s.waitForChan(ctx, started) + + // now register v1.1 as compatible + s.addCompatibleBuildId(ctx, tq, "v1.1", "v1", false) + s.waitForPropagation(ctx, tq, "v1.1") + // start worker for v1.1 + w11 := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: s.prefixed("v1.1"), + UseBuildIDForVersioning: true, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w11.RegisterWorkflowWithOptions(wf1, workflow.RegisterOptions{Name: "wf"}) + w11.RegisterActivityWithOptions(act11, activity.RegisterOptions{Name: "act"}) + s.NoError(w11.Start()) + defer w11.Stop() + + // wait for w1 long polls to all time out + time.Sleep(longPollTime) + + // unblock the workflow + s.NoError(s.sdkClient.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "wait", nil)) + + var out string + s.NoError(run.Get(ctx, &out)) + s.Equal("v1.1", out) +} + func (s *versioningIntegSuite) TestDispatchChildWorkflow() { s.testWithMatchingBehavior(s.dispatchChildWorkflow) } @@ -630,6 +788,161 @@ func (s *versioningIntegSuite) dispatchChildWorkflow() { s.Equal("v1v2", out) } +func (s *versioningIntegSuite) TestDispatchChildWorkflowUpgrade() { + s.testWithMatchingBehavior(s.dispatchChildWorkflowUpgrade) +} + +func (s *versioningIntegSuite) dispatchChildWorkflowUpgrade() { + tq := s.randomizeStr(s.T().Name()) + + started := make(chan struct{}, 2) + + child1 := func(workflow.Context) (string, error) { return "v1", nil } + child11 := func(workflow.Context) (string, error) { return "v1.1", nil } + wf1 := func(ctx workflow.Context) (string, error) { + started <- struct{}{} + // wait for signal + workflow.GetSignalChannel(ctx, "wait").Receive(ctx, nil) + // run child + fut11 := workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{}), "child") + var val11 string + s.NoError(fut11.Get(ctx, &val11)) + return val11, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s.addNewDefaultBuildId(ctx, tq, "v1") + s.waitForPropagation(ctx, tq, "v1") + + w1 := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: s.prefixed("v1"), + UseBuildIDForVersioning: true, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w1.RegisterWorkflowWithOptions(wf1, workflow.RegisterOptions{Name: "wf"}) + w1.RegisterWorkflowWithOptions(child1, workflow.RegisterOptions{Name: "child"}) + s.NoError(w1.Start()) + defer w1.Stop() + + run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, "wf") + s.NoError(err) + // wait for it to start on v1 + s.waitForChan(ctx, started) + + // now register v1.1 as compatible + s.addCompatibleBuildId(ctx, tq, "v1.1", "v1", false) + s.waitForPropagation(ctx, tq, "v1.1") + // start worker for v1.1 + w11 := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: s.prefixed("v1.1"), + UseBuildIDForVersioning: true, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w11.RegisterWorkflowWithOptions(wf1, workflow.RegisterOptions{Name: "wf"}) + w11.RegisterWorkflowWithOptions(child11, workflow.RegisterOptions{Name: "child"}) + s.NoError(w11.Start()) + defer w11.Stop() + + // wait for w1 long polls to all time out + time.Sleep(longPollTime) + + // unblock the workflow + s.NoError(s.sdkClient.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "wait", nil)) + + var out string + s.NoError(run.Get(ctx, &out)) + s.Equal("v1.1", out) +} + +func (s *versioningIntegSuite) TestDispatchQuery() { + s.testWithMatchingBehavior(s.dispatchQuery) +} + +func (s *versioningIntegSuite) dispatchQuery() { + tq := s.randomizeStr(s.T().Name()) + + started := make(chan struct{}, 2) + + wf1 := func(ctx workflow.Context) error { + workflow.SetQueryHandler(ctx, "query", func() (string, error) { return "v1", nil }) + started <- struct{}{} + workflow.GetSignalChannel(ctx, "wait").Receive(ctx, nil) + return nil + } + wf11 := func(ctx workflow.Context) error { + workflow.SetQueryHandler(ctx, "query", func() (string, error) { return "v1.1", nil }) + started <- struct{}{} + workflow.GetSignalChannel(ctx, "wait").Receive(ctx, nil) + return nil + } + wf2 := func(ctx workflow.Context) error { + workflow.SetQueryHandler(ctx, "query", func() (string, error) { return "v2", nil }) + workflow.GetSignalChannel(ctx, "wait").Receive(ctx, nil) + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s.addNewDefaultBuildId(ctx, tq, "v1") + s.waitForPropagation(ctx, tq, "v1") + + w1 := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: s.prefixed("v1"), + UseBuildIDForVersioning: true, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w1.RegisterWorkflowWithOptions(wf1, workflow.RegisterOptions{Name: "wf"}) + s.NoError(w1.Start()) + defer w1.Stop() + + run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tq}, "wf") + s.NoError(err) + // wait for it to start on v1 + s.waitForChan(ctx, started) + + // now register v1.1 as compatible + // now register v11 as newer compatible with v1 AND v2 as a new default + s.addCompatibleBuildId(ctx, tq, "v11", "v1", false) + s.addNewDefaultBuildId(ctx, tq, "v2") + s.waitForPropagation(ctx, tq, "v2") + // add another 100ms to make sure it got to sticky queues also + time.Sleep(100 * time.Millisecond) + + // start worker for v1.1 and v2 + w11 := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: s.prefixed("v11"), + UseBuildIDForVersioning: true, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w11.RegisterWorkflowWithOptions(wf11, workflow.RegisterOptions{Name: "wf"}) + s.NoError(w11.Start()) + defer w11.Stop() + w2 := worker.New(s.sdkClient, tq, worker.Options{ + BuildID: s.prefixed("v2"), + UseBuildIDForVersioning: true, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w2.RegisterWorkflowWithOptions(wf2, workflow.RegisterOptions{Name: "wf"}) + s.NoError(w2.Start()) + defer w2.Stop() + + // wait for w1 long polls to all time out + time.Sleep(longPollTime) + + // query + val, err := s.sdkClient.QueryWorkflow(ctx, run.GetID(), run.GetRunID(), "query") + s.NoError(err) + var out string + s.NoError(val.Get(&out)) + s.Equal("v1.1", out) + + // let the workflow exit + s.NoError(s.sdkClient.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "wait", nil)) +} + func (s *versioningIntegSuite) TestDispatchContinueAsNew() { s.testWithMatchingBehavior(s.dispatchContinueAsNew) }