From 00872777b269332c24a47e9a144f00700bd7fab2 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 10 Mar 2023 09:56:19 -0800 Subject: [PATCH] Address David's review --- service/frontend/workflow_handler.go | 36 ++++++++--------------- service/matching/taskQueueManager_test.go | 16 +++++----- service/matching/version_sets.go | 23 ++++++++------- service/matching/version_sets_test.go | 4 +-- 4 files changed, 35 insertions(+), 44 deletions(-) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index c9c6012b69fd..6537c259a5b8 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -4354,64 +4354,54 @@ func (wh *WorkflowHandler) validateTaskQueue(t *taskqueuepb.TaskQueue) error { func (wh *WorkflowHandler) validateBuildIdCompatabilityUpdate( req *workflowservice.UpdateWorkerBuildIdCompatabilityRequest, ) error { - errstr := "request to update worker build id compatability requires:" - hadErr := false + errDeets := []string{"request to update worker build id compatability requires:"} checkIdLen := func(id string) { if len(id) > wh.config.WorkerBuildIdSizeLimit() { - errstr += fmt.Sprintf(" Worker build IDs to be no larger than %v characters", - wh.config.WorkerBuildIdSizeLimit()) - hadErr = true + errDeets = append(errDeets, fmt.Sprintf(" Worker build IDs to be no larger than %v characters", + wh.config.WorkerBuildIdSizeLimit())) } } if req.GetNamespace() == "" { - errstr += " `namespace` to be set." - hadErr = true + errDeets = append(errDeets, " `namespace` to be set.") } if req.GetTaskQueue() == "" { - errstr += " `task_queue` to be set." - hadErr = true + errDeets = append(errDeets, " `task_queue` to be set.") } if req.GetOperation() == nil { - errstr += " an operation to be specified." - hadErr = true + errDeets = append(errDeets, " an operation to be specified.") } if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewCompatibleVersion_); ok { if x.AddNewCompatibleVersion.GetNewVersionId() == "" { - errstr += " `add_new_compatible_version` to be set." - hadErr = true + errDeets = append(errDeets, " `add_new_compatible_version` to be set.") } else { checkIdLen(x.AddNewCompatibleVersion.GetNewVersionId()) } if x.AddNewCompatibleVersion.GetExistingCompatibleVersion() == "" { - errstr += " `existing_compatible_version` to be set." - hadErr = true + errDeets = append(errDeets, " `existing_compatible_version` to be set.") } } else if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewVersionIdInNewDefaultSet); ok { if x.AddNewVersionIdInNewDefaultSet == "" { - errstr += " `add_new_version_id_in_new_default_set` to be set." - hadErr = true + errDeets = append(errDeets, " `add_new_version_id_in_new_default_set` to be set.") } else { checkIdLen(x.AddNewVersionIdInNewDefaultSet) } } else if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_PromoteSetByVersionId); ok { if x.PromoteSetByVersionId == "" { - errstr += " `promote_set_by_version_id` to be set." - hadErr = true + errDeets = append(errDeets, " `promote_set_by_version_id` to be set.") } else { checkIdLen(x.PromoteSetByVersionId) } } else if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_PromoteVersionIdWithinSet); ok { if x.PromoteVersionIdWithinSet == "" { - errstr += " `promote_version_id_within_set` to be set." - hadErr = true + errDeets = append(errDeets, " `promote_version_id_within_set` to be set.") } else { checkIdLen(x.PromoteVersionIdWithinSet) } } - if hadErr { - return serviceerror.NewInvalidArgument(errstr) + if len(errDeets) > 1 { + return serviceerror.NewInvalidArgument(strings.Join(errDeets, " ")) } return nil } diff --git a/service/matching/taskQueueManager_test.go b/service/matching/taskQueueManager_test.go index 0d8c7d1e1776..29a1bf2dfc1e 100644 --- a/service/matching/taskQueueManager_test.go +++ b/service/matching/taskQueueManager_test.go @@ -28,7 +28,6 @@ import ( "context" "errors" "fmt" - "go.temporal.io/api/taskqueue/v1" "math" "sync/atomic" "testing" @@ -42,6 +41,7 @@ import ( "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" @@ -513,7 +513,7 @@ func TestTaskQueueSubParitionFetchesVersioningInfoFromRootPartitionOnInit(t *tes tqCfg.tqId = subTqId data := &persistencespb.VersioningData{ - VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, + VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")}, } asResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ @@ -552,7 +552,7 @@ func TestTaskQueueSubParitionSendsCurrentHashOfVersioningDataWhenFetching(t *tes tqCfg.tqId = subTqId data := &persistencespb.VersioningData{ - VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, + VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")}, } asResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ @@ -618,7 +618,7 @@ func TestTaskQueueRootPartitionNotifiesChildrenOfInvalidation(t *testing.T) { // Make a change, mock verifies children are invalidated require.NoError(t, rootTq.MutateVersioningData(ctx, func(vd *persistencespb.VersioningData) error { *vd = persistencespb.VersioningData{ - VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, + VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")}, } return nil })) @@ -640,7 +640,7 @@ func TestTaskQueueSubPartitionPollsPeriodically(t *testing.T) { asResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ VersioningData: &persistencespb.VersioningData{ - VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, + VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")}, }, }, } @@ -678,7 +678,7 @@ func TestTaskQueueSubPartitionDoesNotPollIfNoDataThenPollsWhenInvalidated(t *tes }, } verDat := &persistencespb.VersioningData{ - VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, + VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")}, } hasDatResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ @@ -725,7 +725,7 @@ func TestTaskQueueManagerWaitInitFailThenPass(t *testing.T) { } data := &persistencespb.VersioningData{ - VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, + VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")}, } asResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ @@ -792,7 +792,7 @@ func TestActivityQueueGetsVersioningDataFromWorkflowQueue(t *testing.T) { ctx := context.Background() data := &persistencespb.VersioningData{ - VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")}, + VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")}, } asResp := &matchingservice.GetTaskQueueMetadataResponse{ VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{ diff --git a/service/matching/version_sets.go b/service/matching/version_sets.go index 655423c59fec..031b0a671935 100644 --- a/service/matching/version_sets.go +++ b/service/matching/version_sets.go @@ -28,10 +28,12 @@ import ( "encoding/binary" "fmt" "github.com/dgryski/go-farm" + "github.com/pborman/uuid" "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/util" ) func ToBuildIdOrderingResponse(g *persistence.VersioningData, maxDepth int) *workflowservice.GetWorkerBuildIdCompatabilityResponse { @@ -57,9 +59,7 @@ func depthLimiter(g *persistence.VersioningData, maxDepth int, mutate bool) *wor if maxDepth <= 0 || maxDepth >= len(g.GetVersionSets()) { return &workflowservice.GetWorkerBuildIdCompatabilityResponse{MajorVersionSets: g.GetVersionSets()} } - sets := g.GetVersionSets() - startIndex := len(sets) - maxDepth - shortened := g.GetVersionSets()[startIndex:] + shortened := util.SliceTail(g.GetVersionSets(), maxDepth) if mutate { g.VersionSets = shortened } @@ -108,17 +108,18 @@ func updateImpl(existingData *persistence.VersioningData, req *workflowservice.U targetedVersion := extractTargetedVersion(req) targetSetIx, versionInSetIx := findVersion(existingData, targetedVersion) - if _, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewVersionIdInNewDefaultSet); ok { + if req.GetAddNewVersionIdInNewDefaultSet() != "" { // If it's not already in the sets, add it as the new default set if targetSetIx != -1 { return serviceerror.NewInvalidArgument(fmt.Sprintf("version %s already exists", targetedVersion)) } existingData.VersionSets = append(existingData.GetVersionSets(), &taskqueuepb.CompatibleVersionSet{ + Id: uuid.New(), Versions: []string{targetedVersion}, }) - } else if op, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewCompatibleVersion_); ok { - compatVer := op.AddNewCompatibleVersion.GetExistingCompatibleVersion() + } else if addNew := req.GetAddNewCompatibleVersion(); addNew != nil { + compatVer := addNew.GetExistingCompatibleVersion() compatSetIx, _ := findVersion(existingData, compatVer) if compatSetIx == -1 { return serviceerror.NewNotFound( @@ -133,15 +134,15 @@ func updateImpl(existingData *persistence.VersioningData, req *workflowservice.U // If the version doesn't exist, add it to the compatible set existingData.VersionSets[compatSetIx].Versions = append(existingData.VersionSets[compatSetIx].Versions, targetedVersion) - if op.AddNewCompatibleVersion.GetMakeSetDefault() { + if addNew.GetMakeSetDefault() { makeDefaultSet(existingData, compatSetIx) } - } else if _, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_PromoteSetByVersionId); ok { + } else if req.GetPromoteSetByVersionId() != "" { if targetSetIx == -1 { return serviceerror.NewNotFound(fmt.Sprintf("targeted version %v not found", targetedVersion)) } makeDefaultSet(existingData, targetSetIx) - } else if _, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_PromoteVersionIdWithinSet); ok { + } else if req.GetPromoteVersionIdWithinSet() != "" { if targetSetIx == -1 { return serviceerror.NewNotFound(fmt.Sprintf("targeted version %v not found", targetedVersion)) } @@ -164,10 +165,10 @@ func extractTargetedVersion(req *workflowservice.UpdateWorkerBuildIdCompatabilit // Finds the version in the version sets, returning (set index, index within that set) // Returns -1, -1 if not found. -func findVersion(data *persistence.VersioningData, id string) (int, int) { +func findVersion(data *persistence.VersioningData, buildID string) (int, int) { for setIx, set := range data.GetVersionSets() { for versionIx, version := range set.GetVersions() { - if version == id { + if version == buildID { return setIx, versionIx } } diff --git a/service/matching/version_sets_test.go b/service/matching/version_sets_test.go index 164932fa54df..9806c4673647 100644 --- a/service/matching/version_sets_test.go +++ b/service/matching/version_sets_test.go @@ -26,13 +26,13 @@ package matching import ( "fmt" - "go.temporal.io/api/serviceerror" "testing" "github.com/stretchr/testify/assert" + + "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" - persistencepb "go.temporal.io/server/api/persistence/v1" )