diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 2544baa26103..9418de8ddda2 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -270,7 +270,12 @@ func (e *matchingEngineImpl) AddWorkflowTask( taskQueueName := addRequest.TaskQueue.GetName() taskQueueKind := addRequest.TaskQueue.GetKind() - taskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_WORKFLOW) + origTaskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_WORKFLOW) + if err != nil { + return false, err + } + + taskQueue, err := e.redirectToVersionedQueueForAdd(hCtx, origTaskQueue, addRequest.WorkerVersionStamp, taskQueueKind) if err != nil { return false, err } @@ -321,7 +326,12 @@ func (e *matchingEngineImpl) AddActivityTask( taskQueueName := addRequest.TaskQueue.GetName() taskQueueKind := addRequest.TaskQueue.GetKind() - taskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_ACTIVITY) + origTaskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_ACTIVITY) + if err != nil { + return false, err + } + + taskQueue, err := e.redirectToVersionedQueueForAdd(hCtx, origTaskQueue, addRequest.WorkerVersionStamp, taskQueueKind) if err != nil { return false, err } @@ -569,7 +579,13 @@ func (e *matchingEngineImpl) QueryWorkflow( namespaceID := namespace.ID(queryRequest.GetNamespaceId()) taskQueueName := queryRequest.TaskQueue.GetName() taskQueueKind := queryRequest.TaskQueue.GetKind() - taskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_WORKFLOW) + + origTaskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_WORKFLOW) + if err != nil { + return nil, err + } + + taskQueue, err := e.redirectToVersionedQueueForAdd(hCtx, origTaskQueue, queryRequest.WorkerVersionStamp, taskQueueKind) if err != nil { return nil, err } @@ -915,10 +931,19 @@ func (e *matchingEngineImpl) getAllPartitions( func (e *matchingEngineImpl) getTask( ctx context.Context, - taskQueue *taskQueueID, + origTaskQueue *taskQueueID, taskQueueKind enumspb.TaskQueueKind, pollMetadata *pollMetadata, ) (*internalTask, error) { + taskQueue, err := e.redirectToVersionedQueueForPoll( + ctx, + origTaskQueue, + pollMetadata.workerVersionCapabilities, + taskQueueKind, + ) + if err != nil { + return nil, err + } tlMgr, err := e.getTaskQueueManager(ctx, taskQueue, taskQueueKind, true) if err != nil { return nil, err @@ -1113,6 +1138,83 @@ func (e *matchingEngineImpl) emitForwardedSourceStats( } } +func (e *matchingEngineImpl) redirectToVersionedQueueForPoll( + ctx context.Context, + taskQueue *taskQueueID, + workerVersionCapabilities *commonpb.WorkerVersionCapabilities, + kind enumspb.TaskQueueKind, +) (*taskQueueID, error) { + // Since sticky queues are pinned to a particular worker, we don't need to redirect + if kind == enumspb.TASK_QUEUE_KIND_STICKY { + // TODO: we may need to kick the workflow off of the sticky queue here + // (e.g. serviceerrors.StickyWorkerUnavailable) if there's a newer build id + return taskQueue, nil + } + unversionedTQM, err := e.getTaskQueueManager(ctx, taskQueue, kind, true) + if err != nil { + return nil, err + } + userData, err := unversionedTQM.GetUserData(ctx) + if err != nil { + return nil, err + } + if userData.GetData().GetVersioningData() == nil { + if workerVersionCapabilities == nil { + // queue is not versioned and neither is worker, all good + return taskQueue, nil + } + // TODO: consider making an ephemeral set so we can match even if replication fails + return nil, errPollWithVersionOnUnversionedQueue + } + if workerVersionCapabilities == nil { + // TODO: We should leave this one on the unversioned queue for unversioned workflows to + // continue running. Do that in the same PR as the first-wft switch below. + return nil, errPollOnVersionedQueueWithNoVersion + } + versionSet, err := lookupVersionSetForPoll(userData.Data.VersioningData, workerVersionCapabilities) + if err != nil { + return nil, err + } + return newTaskQueueIDWithVersionSet(taskQueue, versionSet), nil +} + +func (e *matchingEngineImpl) redirectToVersionedQueueForAdd( + ctx context.Context, + taskQueue *taskQueueID, + stamp *commonpb.WorkerVersionStamp, + kind enumspb.TaskQueueKind, +) (*taskQueueID, error) { + // sticky queues are unversioned + if kind == enumspb.TASK_QUEUE_KIND_STICKY { + return taskQueue, nil + } + unversionedTQM, err := e.getTaskQueueManager(ctx, taskQueue, kind, true) + if err != nil { + return nil, err + } + userData, err := unversionedTQM.GetUserData(ctx) + if err != nil { + return nil, err + } + if userData.GetData().GetVersioningData() == nil { + if stamp == nil { + // queue is not versioned and neither is workflow, all good + return taskQueue, nil + } + return nil, errVersionedTaskForUnversionedQueue + } + + // TODO: Here we have to distinguish between a new workflow (first wft), which we should + // assign to the default version), and a later wft, which we should leave on the + // unversioned queue. Do that in a follow-up PR. + + versionSet, err := lookupVersionSetForAdd(userData.Data.VersioningData, stamp) + if err != nil { + return nil, err + } + return newTaskQueueIDWithVersionSet(taskQueue, versionSet), nil +} + func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) { m.Lock() defer m.Unlock() diff --git a/service/matching/version_sets.go b/service/matching/version_sets.go index dec04a9c75d2..8f558d4f90d7 100644 --- a/service/matching/version_sets.go +++ b/service/matching/version_sets.go @@ -30,6 +30,7 @@ import ( "crypto/sha256" "encoding/base64" + commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" @@ -37,6 +38,15 @@ import ( hlc "go.temporal.io/server/common/clock/hybrid_logical_clock" ) +var ( + // TODO: all of these errors are temporary, we'll handle all these cases in future PRs + errBuildIDNotFound = serviceerror.NewInvalidArgument("build ID not found") + errEmptyVersioningData = serviceerror.NewInvalidArgument("versioning data is empty") + errPollWithVersionOnUnversionedQueue = serviceerror.NewInvalidArgument("poll with version capabilities on unversioned queue") + errPollOnVersionedQueueWithNoVersion = serviceerror.NewInvalidArgument("poll on versioned queue with no version capabilities") + errVersionedTaskForUnversionedQueue = serviceerror.NewInvalidArgument("got task with version stamp for unversioned queue") +) + // ToBuildIdOrderingResponse transforms the internal VersioningData representation to public representation. // If maxSets is given, the last sets up to maxSets will be returned. func ToBuildIdOrderingResponse(data *persistencespb.VersioningData, maxSets int) *workflowservice.GetWorkerBuildIdCompatibilityResponse { @@ -124,11 +134,10 @@ func hashBuildID(buildID string) string { func updateImpl(timestamp hlc.Clock, existingData *persistencespb.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest) (*persistencespb.VersioningData, error) { // First find if the targeted version is already in the sets targetedVersion := extractTargetedVersion(req) - findRes := findVersion(existingData, targetedVersion) - targetSetIdx, versionInSetIdx := findRes.setIdx, findRes.indexInSet + targetSetIdx, versionInSetIdx := findVersion(existingData, targetedVersion) numExistingSets := len(existingData.GetVersionSets()) modifiedData := persistencespb.VersioningData{ - VersionSets: make([]*persistencespb.CompatibleVersionSet, numExistingSets), + VersionSets: make([]*persistencespb.CompatibleVersionSet, len(existingData.GetVersionSets())), DefaultUpdateTimestamp: existingData.GetDefaultUpdateTimestamp(), } copy(modifiedData.VersionSets, existingData.GetVersionSets()) @@ -153,7 +162,7 @@ func updateImpl(timestamp hlc.Clock, existingData *persistencespb.VersioningData makeDefaultSet(&modifiedData, len(modifiedData.VersionSets)-1, ×tamp) } else if addNew := req.GetAddNewCompatibleBuildId(); addNew != nil { compatVer := addNew.GetExistingCompatibleBuildId() - compatSetIdx := findVersion(&modifiedData, compatVer).setIdx + compatSetIdx, _ := findVersion(&modifiedData, compatVer) if compatSetIdx == -1 { return nil, serviceerror.NewNotFound( fmt.Sprintf("targeted compatible_version %v not found", compatVer)) @@ -231,28 +240,17 @@ func extractTargetedVersion(req *workflowservice.UpdateWorkerBuildIdCompatibilit return req.GetAddNewBuildIdInNewDefaultSet() } -type findVersionRes struct { - setIdx int - indexInSet int -} - // Finds the version in the version sets, returning (set index, index within that set) // Returns -1, -1 if not found. -func findVersion(data *persistencespb.VersioningData, buildID string) findVersionRes { - for setIx, set := range data.GetVersionSets() { - for versionIx, version := range set.GetBuildIds() { +func findVersion(data *persistencespb.VersioningData, buildID string) (setIndex, indexInSet int) { + for setIndex, set := range data.GetVersionSets() { + for indexInSet, version := range set.GetBuildIds() { if version.Id == buildID { - return findVersionRes{ - setIdx: setIx, - indexInSet: versionIx, - } + return setIndex, indexInSet } } } - return findVersionRes{ - setIdx: -1, - indexInSet: -1, - } + return -1, -1 } func makeDefaultSet(data *persistencespb.VersioningData, setIx int, timestamp *hlc.Clock) { @@ -281,3 +279,56 @@ func makeVersionInSetDefault(data *persistencespb.VersioningData, setIx, version setVersions[len(setVersions)-1] = moveMe } } + +// Requires: data is not nil, caps is not nil +func lookupVersionSetForPoll(data *persistencespb.VersioningData, caps *commonpb.WorkerVersionCapabilities) (string, error) { + // for poll, only the latest version in the compatible set can get tasks + // find the version set that this worker is in + setIdx, _ := findVersion(data, caps.BuildId) + if setIdx < 0 { + // TODO: consider making an ephemeral set so we can match even if replication fails + return "", errBuildIDNotFound + } + set := data.VersionSets[setIdx] + latestInSet := set.BuildIds[len(set.BuildIds)-1].Id + if caps.BuildId != latestInSet { + return "", serviceerror.NewNewerBuildExists(latestInSet) + } + return getSetID(set), nil +} + +// Requires: data is not nil +func lookupVersionSetForAdd(data *persistencespb.VersioningData, stamp *commonpb.WorkerVersionStamp) (string, error) { + var set *persistencespb.CompatibleVersionSet + if stamp == nil { + // if this is a new workflow, assign it to the latest version. + // (if it's an unversioned workflow that has already completed one or more tasks, then + // leave it on the unversioned one. that case is handled already before we get here.) + setLen := len(data.VersionSets) + if setLen == 0 || data.VersionSets[setLen-1] == nil { + return "", errEmptyVersioningData + } + set = data.VersionSets[setLen-1] + } else { + // for add, any version in the compatible set maps to the set + setIdx, _ := findVersion(data, stamp.BuildId) + if setIdx < 0 { + // TODO: consider making an ephemeral set so we can match even if replication fails + return "", errBuildIDNotFound + } + set = data.VersionSets[setIdx] + } + return getSetID(set), nil +} + +// getSetID returns an arbitrary but consistent member of the set. +// We want Add and Poll requests for the same set to converge on a single id so we can match +// them, but we don't have a single id for a set in the general case: in rare cases we may have +// multiple ids (due to failovers). We can do this by picking an arbitrary id in the set, e.g. +// the first. If the versioning data changes in any way, we'll re-resolve the set id, so this +// choice only has to be consistent within one version of the versioning data. (For correct +// handling of spooled tasks in Add, this does need to be an actual set id, not an arbitrary +// string.) +func getSetID(set *persistencespb.CompatibleVersionSet) string { + return set.SetIds[0] +} diff --git a/tests/versioning_test.go b/tests/versioning_test.go index 45836b02dd27..5cf812298039 100644 --- a/tests/versioning_test.go +++ b/tests/versioning_test.go @@ -25,7 +25,6 @@ package tests import ( - "context" "flag" "fmt" "testing" @@ -37,8 +36,6 @@ import ( "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" sdkclient "go.temporal.io/sdk/client" - "go.temporal.io/sdk/worker" - "go.temporal.io/sdk/workflow" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log/tag" @@ -55,7 +52,7 @@ type versioningIntegSuite struct { func (s *versioningIntegSuite) SetupSuite() { s.dynamicConfigOverrides = make(map[dynamicconfig.Key]interface{}) - s.dynamicConfigOverrides[dynamicconfig.MatchingUpdateAckInterval] = 1 * time.Second + s.dynamicConfigOverrides[dynamicconfig.MatchingMaxTaskQueueIdleTime] = 5 * time.Second s.dynamicConfigOverrides[dynamicconfig.FrontendEnableWorkerVersioningDataAPIs] = true s.setupSuite("testdata/integration_test_cluster.yaml") } @@ -173,8 +170,7 @@ func (s *versioningIntegSuite) TestLinkToNonexistentCompatibleVersionReturnsNotF s.IsType(&serviceerror.NotFound{}, err) } -// This test verifies that the lease renewal of a task queue does not destroy the versioning data - as it updates the -// task queue info. We need to make sure that update preserves the versioning data. +// This test verifies that user data persists across unload/reload. func (s *versioningIntegSuite) TestVersioningStateNotDestroyedByOtherUpdates() { ctx := NewContext() tq := "integration-versioning-not-destroyed" @@ -189,27 +185,9 @@ func (s *versioningIntegSuite) TestVersioningStateNotDestroyedByOtherUpdates() { s.NoError(err) s.NotNil(res) - sdkWorker := worker.New(s.sdkClient, tq, worker.Options{}) - if err := sdkWorker.Start(); err != nil { - s.Logger.Fatal("Error starting worker", tag.Error(err)) - } - - wfFunc := func(ctx workflow.Context) error { - // This timer exists to ensure the lease-renewal on the task queue happens, to verify that doesn't blow up data. - // The renewal interval has been lowered in this suite. - _ = workflow.Sleep(ctx, 3*time.Second) - return nil - } - sdkWorker.RegisterWorkflow(wfFunc) - id := "integration-test-unhandled-command-new-task" - workflowOptions := sdkclient.StartWorkflowOptions{ID: id, TaskQueue: tq} - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, wfFunc) - s.NoError(err) - err = workflowRun.Get(ctx, nil) - s.NoError(err) - sdkWorker.Stop() + // The idle interval has been lowered to 5s in this suite, so we can sleep > 10s to ensure + // that the task queue is unloaded. + time.Sleep(11 * time.Second) res2, err := s.engine.GetWorkerBuildIdCompatibility(ctx, &workflowservice.GetWorkerBuildIdCompatibilityRequest{ Namespace: s.namespace, diff --git a/tests/xdc/user_data_replication_test.go b/tests/xdc/user_data_replication_test.go index 694a128794dd..2c5d903a3811 100644 --- a/tests/xdc/user_data_replication_test.go +++ b/tests/xdc/user_data_replication_test.go @@ -63,7 +63,7 @@ func TestUserDataReplicationTestSuite(t *testing.T) { func (s *userDataReplicationTestSuite) SetupSuite() { s.dynamicConfigOverrides = make(map[dynamicconfig.Key]interface{}) - s.dynamicConfigOverrides[dynamicconfig.EnableWorkerVersioning] = true + s.dynamicConfigOverrides[dynamicconfig.FrontendEnableWorkerVersioningDataAPIs] = true s.setupSuite([]string{"task_queue_repl_active", "task_queue_repl_standby"}) }