Skip to content

Commit

Permalink
Redirect to versioned queues in matching (#4241)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr authored May 4, 2023
1 parent ca8761d commit b25f52d
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 57 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
go.opentelemetry.io/otel/metric v0.36.0
go.opentelemetry.io/otel/sdk v1.13.0
go.opentelemetry.io/otel/sdk/metric v0.36.0
go.temporal.io/api v1.19.1-0.20230503164115-e6a655167ace
go.temporal.io/api v1.19.1-0.20230504042653-5484c8a340a0
go.temporal.io/sdk v1.22.2-0.20230503164257-9f11e8c73dbc
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.10.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1116,8 +1116,9 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.temporal.io/api v1.19.1-0.20230503164115-e6a655167ace h1:ZOt7H4ITTprMHT7J8YvTezFkIbuvlaAoSRapaV7VzBI=
go.temporal.io/api v1.19.1-0.20230503164115-e6a655167ace/go.mod h1:OiRzMU/dM++aM7IexDcAk0yLc1Pktdr/MUMD0gdpXy8=
go.temporal.io/api v1.19.1-0.20230504042653-5484c8a340a0 h1:C2zhJnL7FvHT7FrTCfzk7B+Ra67ZYvmLp1YISHCsLos=
go.temporal.io/api v1.19.1-0.20230504042653-5484c8a340a0/go.mod h1:OiRzMU/dM++aM7IexDcAk0yLc1Pktdr/MUMD0gdpXy8=
go.temporal.io/sdk v1.22.2-0.20230503164257-9f11e8c73dbc h1:BABNHYopDR0C/9SF/sLI/Ktbt8JBZ2uqOtpRlAAcFZY=
go.temporal.io/sdk v1.22.2-0.20230503164257-9f11e8c73dbc/go.mod h1:mOHv2LGMQ3NHeCixq42+7041i7hayymv/Q9C1BobtRE=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down
2 changes: 1 addition & 1 deletion proto/api
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CREATE TABLE update_info_maps (
namespace_id BINARY(16) NOT NULL,
workflow_id VARCHAR(255) NOT NULL,
run_id BINARY(16) NOT NULL,
updat_id VARCHAR(255) NOT NULL,
update_id VARCHAR(255) NOT NULL,
--
data MEDIUMBLOB NOT NULL,
data_encoding VARCHAR(16),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CREATE TABLE update_info_maps (
namespace_id BINARY(16) NOT NULL,
workflow_id VARCHAR(255) NOT NULL,
run_id BINARY(16) NOT NULL,
updat_id VARCHAR(255) NOT NULL,
update_id VARCHAR(255) NOT NULL,
--
data MEDIUMBLOB NOT NULL,
data_encoding VARCHAR(16),
Expand Down
110 changes: 106 additions & 4 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,12 @@ func (e *matchingEngineImpl) AddWorkflowTask(
taskQueueName := addRequest.TaskQueue.GetName()
taskQueueKind := addRequest.TaskQueue.GetKind()

taskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_WORKFLOW)
origTaskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_WORKFLOW)
if err != nil {
return false, err
}

taskQueue, err := e.redirectToVersionedQueueForAdd(hCtx, origTaskQueue, addRequest.WorkerVersionStamp, taskQueueKind)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -321,7 +326,12 @@ func (e *matchingEngineImpl) AddActivityTask(
taskQueueName := addRequest.TaskQueue.GetName()
taskQueueKind := addRequest.TaskQueue.GetKind()

taskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_ACTIVITY)
origTaskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_ACTIVITY)
if err != nil {
return false, err
}

taskQueue, err := e.redirectToVersionedQueueForAdd(hCtx, origTaskQueue, addRequest.WorkerVersionStamp, taskQueueKind)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -567,7 +577,13 @@ func (e *matchingEngineImpl) QueryWorkflow(
namespaceID := namespace.ID(queryRequest.GetNamespaceId())
taskQueueName := queryRequest.TaskQueue.GetName()
taskQueueKind := queryRequest.TaskQueue.GetKind()
taskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_WORKFLOW)

origTaskQueue, err := newTaskQueueID(namespaceID, taskQueueName, enumspb.TASK_QUEUE_TYPE_WORKFLOW)
if err != nil {
return nil, err
}

taskQueue, err := e.redirectToVersionedQueueForAdd(hCtx, origTaskQueue, queryRequest.WorkerVersionStamp, taskQueueKind)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -912,10 +928,19 @@ func (e *matchingEngineImpl) getAllPartitions(

func (e *matchingEngineImpl) getTask(
ctx context.Context,
taskQueue *taskQueueID,
origTaskQueue *taskQueueID,
taskQueueKind enumspb.TaskQueueKind,
pollMetadata *pollMetadata,
) (*internalTask, error) {
taskQueue, err := e.redirectToVersionedQueueForPoll(
ctx,
origTaskQueue,
pollMetadata.workerVersionCapabilities,
taskQueueKind,
)
if err != nil {
return nil, err
}
tlMgr, err := e.getTaskQueueManager(ctx, taskQueue, taskQueueKind, true)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1110,6 +1135,83 @@ func (e *matchingEngineImpl) emitForwardedSourceStats(
}
}

func (e *matchingEngineImpl) redirectToVersionedQueueForPoll(
ctx context.Context,
taskQueue *taskQueueID,
workerVersionCapabilities *commonpb.WorkerVersionCapabilities,
kind enumspb.TaskQueueKind,
) (*taskQueueID, error) {
// Since sticky queues are pinned to a particular worker, we don't need to redirect
if kind == enumspb.TASK_QUEUE_KIND_STICKY {
// TODO: we may need to kick the workflow off of the sticky queue here
// (e.g. serviceerrors.StickyWorkerUnavailable) if there's a newer build id
return taskQueue, nil
}
unversionedTQM, err := e.getTaskQueueManager(ctx, taskQueue, kind, true)
if err != nil {
return nil, err
}
userData, err := unversionedTQM.GetUserData(ctx)
if err != nil {
return nil, err
}
if userData.GetData().GetVersioningData() == nil {
if workerVersionCapabilities == nil {
// queue is not versioned and neither is worker, all good
return taskQueue, nil
}
// TODO: consider making an ephemeral set so we can match even if replication fails
return nil, errPollWithVersionOnUnversionedQueue
}
if workerVersionCapabilities == nil {
// TODO: We should leave this one on the unversioned queue for unversioned workflows to
// continue running. Do that in the same PR as the first-wft switch below.
return nil, errPollOnVersionedQueueWithNoVersion
}
versionSet, err := lookupVersionSetForPoll(userData.Data.VersioningData, workerVersionCapabilities)
if err != nil {
return nil, err
}
return newTaskQueueIDWithVersionSet(taskQueue, versionSet), nil
}

func (e *matchingEngineImpl) redirectToVersionedQueueForAdd(
ctx context.Context,
taskQueue *taskQueueID,
stamp *commonpb.WorkerVersionStamp,
kind enumspb.TaskQueueKind,
) (*taskQueueID, error) {
// sticky queues are unversioned
if kind == enumspb.TASK_QUEUE_KIND_STICKY {
return taskQueue, nil
}
unversionedTQM, err := e.getTaskQueueManager(ctx, taskQueue, kind, true)
if err != nil {
return nil, err
}
userData, err := unversionedTQM.GetUserData(ctx)
if err != nil {
return nil, err
}
if userData.GetData().GetVersioningData() == nil {
if stamp == nil {
// queue is not versioned and neither is workflow, all good
return taskQueue, nil
}
return nil, errVersionedTaskForUnversionedQueue
}

// TODO: Here we have to distinguish between a new workflow (first wft), which we should
// assign to the default version), and a later wft, which we should leave on the
// unversioned queue. Do that in a follow-up PR.

versionSet, err := lookupVersionSetForAdd(userData.Data.VersioningData, stamp)
if err != nil {
return nil, err
}
return newTaskQueueIDWithVersionSet(taskQueue, versionSet), nil
}

func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) {
m.Lock()
defer m.Unlock()
Expand Down
91 changes: 71 additions & 20 deletions service/matching/version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,23 @@ import (
"crypto/sha256"
"encoding/base64"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
hlc "go.temporal.io/server/common/clock/hybrid_logical_clock"
)

var (
// TODO: all of these errors are temporary, we'll handle all these cases in future PRs
errBuildIDNotFound = serviceerror.NewInvalidArgument("build ID not found")
errEmptyVersioningData = serviceerror.NewInvalidArgument("versioning data is empty")
errPollWithVersionOnUnversionedQueue = serviceerror.NewInvalidArgument("poll with version capabilities on unversioned queue")
errPollOnVersionedQueueWithNoVersion = serviceerror.NewInvalidArgument("poll on versioned queue with no version capabilities")
errVersionedTaskForUnversionedQueue = serviceerror.NewInvalidArgument("got task with version stamp for unversioned queue")
)

// ToBuildIdOrderingResponse transforms the internal VersioningData representation to public representation.
// If maxSets is given, the last sets up to maxSets will be returned.
func ToBuildIdOrderingResponse(data *persistencespb.VersioningData, maxSets int) *workflowservice.GetWorkerBuildIdCompatibilityResponse {
Expand Down Expand Up @@ -124,11 +134,10 @@ func hashBuildID(buildID string) string {
func updateImpl(timestamp hlc.Clock, existingData *persistencespb.VersioningData, req *workflowservice.UpdateWorkerBuildIdCompatibilityRequest) (*persistencespb.VersioningData, error) {
// First find if the targeted version is already in the sets
targetedVersion := extractTargetedVersion(req)
findRes := findVersion(existingData, targetedVersion)
targetSetIdx, versionInSetIdx := findRes.setIdx, findRes.indexInSet
targetSetIdx, versionInSetIdx := findVersion(existingData, targetedVersion)
numExistingSets := len(existingData.GetVersionSets())
modifiedData := persistencespb.VersioningData{
VersionSets: make([]*persistencespb.CompatibleVersionSet, numExistingSets),
VersionSets: make([]*persistencespb.CompatibleVersionSet, len(existingData.GetVersionSets())),
DefaultUpdateTimestamp: existingData.GetDefaultUpdateTimestamp(),
}
copy(modifiedData.VersionSets, existingData.GetVersionSets())
Expand All @@ -153,7 +162,7 @@ func updateImpl(timestamp hlc.Clock, existingData *persistencespb.VersioningData
makeDefaultSet(&modifiedData, len(modifiedData.VersionSets)-1, &timestamp)
} else if addNew := req.GetAddNewCompatibleBuildId(); addNew != nil {
compatVer := addNew.GetExistingCompatibleBuildId()
compatSetIdx := findVersion(&modifiedData, compatVer).setIdx
compatSetIdx, _ := findVersion(&modifiedData, compatVer)
if compatSetIdx == -1 {
return nil, serviceerror.NewNotFound(
fmt.Sprintf("targeted compatible_version %v not found", compatVer))
Expand Down Expand Up @@ -231,28 +240,17 @@ func extractTargetedVersion(req *workflowservice.UpdateWorkerBuildIdCompatibilit
return req.GetAddNewBuildIdInNewDefaultSet()
}

type findVersionRes struct {
setIdx int
indexInSet int
}

// Finds the version in the version sets, returning (set index, index within that set)
// Returns -1, -1 if not found.
func findVersion(data *persistencespb.VersioningData, buildID string) findVersionRes {
for setIx, set := range data.GetVersionSets() {
for versionIx, version := range set.GetBuildIds() {
func findVersion(data *persistencespb.VersioningData, buildID string) (setIndex, indexInSet int) {
for setIndex, set := range data.GetVersionSets() {
for indexInSet, version := range set.GetBuildIds() {
if version.Id == buildID {
return findVersionRes{
setIdx: setIx,
indexInSet: versionIx,
}
return setIndex, indexInSet
}
}
}
return findVersionRes{
setIdx: -1,
indexInSet: -1,
}
return -1, -1
}

func makeDefaultSet(data *persistencespb.VersioningData, setIx int, timestamp *hlc.Clock) {
Expand Down Expand Up @@ -281,3 +279,56 @@ func makeVersionInSetDefault(data *persistencespb.VersioningData, setIx, version
setVersions[len(setVersions)-1] = moveMe
}
}

// Requires: data is not nil, caps is not nil
func lookupVersionSetForPoll(data *persistencespb.VersioningData, caps *commonpb.WorkerVersionCapabilities) (string, error) {
// for poll, only the latest version in the compatible set can get tasks
// find the version set that this worker is in
setIdx, _ := findVersion(data, caps.BuildId)
if setIdx < 0 {
// TODO: consider making an ephemeral set so we can match even if replication fails
return "", errBuildIDNotFound
}
set := data.VersionSets[setIdx]
latestInSet := set.BuildIds[len(set.BuildIds)-1].Id
if caps.BuildId != latestInSet {
return "", serviceerror.NewNewerBuildExists(latestInSet)
}
return getSetID(set), nil
}

// Requires: data is not nil
func lookupVersionSetForAdd(data *persistencespb.VersioningData, stamp *commonpb.WorkerVersionStamp) (string, error) {
var set *persistencespb.CompatibleVersionSet
if stamp == nil {
// if this is a new workflow, assign it to the latest version.
// (if it's an unversioned workflow that has already completed one or more tasks, then
// leave it on the unversioned one. that case is handled already before we get here.)
setLen := len(data.VersionSets)
if setLen == 0 || data.VersionSets[setLen-1] == nil {
return "", errEmptyVersioningData
}
set = data.VersionSets[setLen-1]
} else {
// for add, any version in the compatible set maps to the set
setIdx, _ := findVersion(data, stamp.BuildId)
if setIdx < 0 {
// TODO: consider making an ephemeral set so we can match even if replication fails
return "", errBuildIDNotFound
}
set = data.VersionSets[setIdx]
}
return getSetID(set), nil
}

// getSetID returns an arbitrary but consistent member of the set.
// We want Add and Poll requests for the same set to converge on a single id so we can match
// them, but we don't have a single id for a set in the general case: in rare cases we may have
// multiple ids (due to failovers). We can do this by picking an arbitrary id in the set, e.g.
// the first. If the versioning data changes in any way, we'll re-resolve the set id, so this
// choice only has to be consistent within one version of the versioning data. (For correct
// handling of spooled tasks in Add, this does need to be an actual set id, not an arbitrary
// string.)
func getSetID(set *persistencespb.CompatibleVersionSet) string {
return set.SetIds[0]
}
32 changes: 5 additions & 27 deletions tests/versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package tests

import (
"context"
"flag"
"fmt"
"testing"
Expand All @@ -37,8 +36,6 @@ import (
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
Expand All @@ -55,7 +52,7 @@ type versioningIntegSuite struct {

func (s *versioningIntegSuite) SetupSuite() {
s.dynamicConfigOverrides = make(map[dynamicconfig.Key]interface{})
s.dynamicConfigOverrides[dynamicconfig.MatchingUpdateAckInterval] = 1 * time.Second
s.dynamicConfigOverrides[dynamicconfig.MatchingIdleTaskqueueCheckInterval] = 5 * time.Second
s.dynamicConfigOverrides[dynamicconfig.FrontendEnableWorkerVersioningDataAPIs] = true
s.setupSuite("testdata/integration_test_cluster.yaml")
}
Expand Down Expand Up @@ -173,8 +170,7 @@ func (s *versioningIntegSuite) TestLinkToNonexistentCompatibleVersionReturnsNotF
s.IsType(&serviceerror.NotFound{}, err)
}

// This test verifies that the lease renewal of a task queue does not destroy the versioning data - as it updates the
// task queue info. We need to make sure that update preserves the versioning data.
// This test verifies that user data persists across unload/reload.
func (s *versioningIntegSuite) TestVersioningStateNotDestroyedByOtherUpdates() {
ctx := NewContext()
tq := "integration-versioning-not-destroyed"
Expand All @@ -189,27 +185,9 @@ func (s *versioningIntegSuite) TestVersioningStateNotDestroyedByOtherUpdates() {
s.NoError(err)
s.NotNil(res)

sdkWorker := worker.New(s.sdkClient, tq, worker.Options{})
if err := sdkWorker.Start(); err != nil {
s.Logger.Fatal("Error starting worker", tag.Error(err))
}

wfFunc := func(ctx workflow.Context) error {
// This timer exists to ensure the lease-renewal on the task queue happens, to verify that doesn't blow up data.
// The renewal interval has been lowered in this suite.
_ = workflow.Sleep(ctx, 3*time.Second)
return nil
}
sdkWorker.RegisterWorkflow(wfFunc)
id := "integration-test-unhandled-command-new-task"
workflowOptions := sdkclient.StartWorkflowOptions{ID: id, TaskQueue: tq}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, wfFunc)
s.NoError(err)
err = workflowRun.Get(ctx, nil)
s.NoError(err)
sdkWorker.Stop()
// The idle interval has been lowered to 5s in this suite, so we can sleep > 10s to ensure
// that the task queue is unloaded.
time.Sleep(11 * time.Second)

res2, err := s.engine.GetWorkerBuildIdCompatibility(ctx, &workflowservice.GetWorkerBuildIdCompatibilityRequest{
Namespace: s.namespace,
Expand Down
Loading

0 comments on commit b25f52d

Please sign in to comment.