Skip to content

Commit

Permalink
Address David's review
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Mar 10, 2023
1 parent f264d93 commit deaaf56
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 44 deletions.
36 changes: 13 additions & 23 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4354,64 +4354,54 @@ func (wh *WorkflowHandler) validateTaskQueue(t *taskqueuepb.TaskQueue) error {
func (wh *WorkflowHandler) validateBuildIdCompatabilityUpdate(
req *workflowservice.UpdateWorkerBuildIdCompatabilityRequest,
) error {
errstr := "request to update worker build id compatability requires:"
hadErr := false
errDeets := []string{"request to update worker build id compatability requires:"}

checkIdLen := func(id string) {
if len(id) > wh.config.WorkerBuildIdSizeLimit() {
errstr += fmt.Sprintf(" Worker build IDs to be no larger than %v characters",
wh.config.WorkerBuildIdSizeLimit())
hadErr = true
errDeets = append(errDeets, fmt.Sprintf(" Worker build IDs to be no larger than %v characters",
wh.config.WorkerBuildIdSizeLimit()))
}
}

if req.GetNamespace() == "" {
errstr += " `namespace` to be set."
hadErr = true
errDeets = append(errDeets, " `namespace` to be set.")
}
if req.GetTaskQueue() == "" {
errstr += " `task_queue` to be set."
hadErr = true
errDeets = append(errDeets, " `task_queue` to be set.")
}
if req.GetOperation() == nil {
errstr += " an operation to be specified."
hadErr = true
errDeets = append(errDeets, " an operation to be specified.")
}
if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewCompatibleVersion_); ok {
if x.AddNewCompatibleVersion.GetNewVersionId() == "" {
errstr += " `add_new_compatible_version` to be set."
hadErr = true
errDeets = append(errDeets, " `add_new_compatible_version` to be set.")
} else {
checkIdLen(x.AddNewCompatibleVersion.GetNewVersionId())
}
if x.AddNewCompatibleVersion.GetExistingCompatibleVersion() == "" {
errstr += " `existing_compatible_version` to be set."
hadErr = true
errDeets = append(errDeets, " `existing_compatible_version` to be set.")
}
} else if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewVersionIdInNewDefaultSet); ok {
if x.AddNewVersionIdInNewDefaultSet == "" {
errstr += " `add_new_version_id_in_new_default_set` to be set."
hadErr = true
errDeets = append(errDeets, " `add_new_version_id_in_new_default_set` to be set.")
} else {
checkIdLen(x.AddNewVersionIdInNewDefaultSet)
}
} else if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_PromoteSetByVersionId); ok {
if x.PromoteSetByVersionId == "" {
errstr += " `promote_set_by_version_id` to be set."
hadErr = true
errDeets = append(errDeets, " `promote_set_by_version_id` to be set.")
} else {
checkIdLen(x.PromoteSetByVersionId)
}
} else if x, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_PromoteVersionIdWithinSet); ok {
if x.PromoteVersionIdWithinSet == "" {
errstr += " `promote_version_id_within_set` to be set."
hadErr = true
errDeets = append(errDeets, " `promote_version_id_within_set` to be set.")
} else {
checkIdLen(x.PromoteVersionIdWithinSet)
}
}
if hadErr {
return serviceerror.NewInvalidArgument(errstr)
if len(errDeets) > 1 {
return serviceerror.NewInvalidArgument(strings.Join(errDeets, " "))
}
return nil
}
Expand Down
16 changes: 8 additions & 8 deletions service/matching/taskQueueManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"context"
"errors"
"fmt"
"go.temporal.io/api/taskqueue/v1"
"math"
"sync/atomic"
"testing"
Expand All @@ -42,6 +41,7 @@ import (
"github.com/stretchr/testify/require"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -513,7 +513,7 @@ func TestTaskQueueSubParitionFetchesVersioningInfoFromRootPartitionOnInit(t *tes
tqCfg.tqId = subTqId

data := &persistencespb.VersioningData{
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")},
}
asResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
Expand Down Expand Up @@ -552,7 +552,7 @@ func TestTaskQueueSubParitionSendsCurrentHashOfVersioningDataWhenFetching(t *tes
tqCfg.tqId = subTqId

data := &persistencespb.VersioningData{
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")},
}
asResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
Expand Down Expand Up @@ -618,7 +618,7 @@ func TestTaskQueueRootPartitionNotifiesChildrenOfInvalidation(t *testing.T) {
// Make a change, mock verifies children are invalidated
require.NoError(t, rootTq.MutateVersioningData(ctx, func(vd *persistencespb.VersioningData) error {
*vd = persistencespb.VersioningData{
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")},
}
return nil
}))
Expand All @@ -640,7 +640,7 @@ func TestTaskQueueSubPartitionPollsPeriodically(t *testing.T) {
asResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
VersioningData: &persistencespb.VersioningData{
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")},
},
},
}
Expand Down Expand Up @@ -678,7 +678,7 @@ func TestTaskQueueSubPartitionDoesNotPollIfNoDataThenPollsWhenInvalidated(t *tes
},
}
verDat := &persistencespb.VersioningData{
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")},
}
hasDatResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
Expand Down Expand Up @@ -725,7 +725,7 @@ func TestTaskQueueManagerWaitInitFailThenPass(t *testing.T) {
}

data := &persistencespb.VersioningData{
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")},
}
asResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
Expand Down Expand Up @@ -792,7 +792,7 @@ func TestActivityQueueGetsVersioningDataFromWorkflowQueue(t *testing.T) {
ctx := context.Background()

data := &persistencespb.VersioningData{
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("0")},
}
asResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
Expand Down
23 changes: 12 additions & 11 deletions service/matching/version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import (
"encoding/binary"
"fmt"
"github.com/dgryski/go-farm"
"github.com/pborman/uuid"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/util"
)

func ToBuildIdOrderingResponse(g *persistence.VersioningData, maxDepth int) *workflowservice.GetWorkerBuildIdCompatabilityResponse {
Expand All @@ -57,9 +59,7 @@ func depthLimiter(g *persistence.VersioningData, maxDepth int, mutate bool) *wor
if maxDepth <= 0 || maxDepth >= len(g.GetVersionSets()) {
return &workflowservice.GetWorkerBuildIdCompatabilityResponse{MajorVersionSets: g.GetVersionSets()}
}
sets := g.GetVersionSets()
startIndex := len(sets) - maxDepth
shortened := g.GetVersionSets()[startIndex:]
shortened := util.SliceTail(g.GetVersionSets(), maxDepth)
if mutate {
g.VersionSets = shortened
}
Expand Down Expand Up @@ -108,17 +108,18 @@ func updateImpl(existingData *persistence.VersioningData, req *workflowservice.U
targetedVersion := extractTargetedVersion(req)
targetSetIx, versionInSetIx := findVersion(existingData, targetedVersion)

if _, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewVersionIdInNewDefaultSet); ok {
if req.GetAddNewVersionIdInNewDefaultSet() != "" {
// If it's not already in the sets, add it as the new default set
if targetSetIx != -1 {
return serviceerror.NewInvalidArgument(fmt.Sprintf("version %s already exists", targetedVersion))
}

existingData.VersionSets = append(existingData.GetVersionSets(), &taskqueuepb.CompatibleVersionSet{
Id: uuid.New(),
Versions: []string{targetedVersion},
})
} else if op, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewCompatibleVersion_); ok {
compatVer := op.AddNewCompatibleVersion.GetExistingCompatibleVersion()
} else if addNew := req.GetAddNewCompatibleVersion(); addNew != nil {
compatVer := addNew.GetExistingCompatibleVersion()
compatSetIx, _ := findVersion(existingData, compatVer)
if compatSetIx == -1 {
return serviceerror.NewNotFound(
Expand All @@ -133,15 +134,15 @@ func updateImpl(existingData *persistence.VersioningData, req *workflowservice.U
// If the version doesn't exist, add it to the compatible set
existingData.VersionSets[compatSetIx].Versions =
append(existingData.VersionSets[compatSetIx].Versions, targetedVersion)
if op.AddNewCompatibleVersion.GetMakeSetDefault() {
if addNew.GetMakeSetDefault() {
makeDefaultSet(existingData, compatSetIx)
}
} else if _, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_PromoteSetByVersionId); ok {
} else if req.GetPromoteSetByVersionId() != "" {
if targetSetIx == -1 {
return serviceerror.NewNotFound(fmt.Sprintf("targeted version %v not found", targetedVersion))
}
makeDefaultSet(existingData, targetSetIx)
} else if _, ok := req.GetOperation().(*workflowservice.UpdateWorkerBuildIdCompatabilityRequest_PromoteVersionIdWithinSet); ok {
} else if req.GetPromoteVersionIdWithinSet() != "" {
if targetSetIx == -1 {
return serviceerror.NewNotFound(fmt.Sprintf("targeted version %v not found", targetedVersion))
}
Expand All @@ -164,10 +165,10 @@ func extractTargetedVersion(req *workflowservice.UpdateWorkerBuildIdCompatabilit

// Finds the version in the version sets, returning (set index, index within that set)
// Returns -1, -1 if not found.
func findVersion(data *persistence.VersioningData, id string) (int, int) {
func findVersion(data *persistence.VersioningData, buildID string) (int, int) {
for setIx, set := range data.GetVersionSets() {
for versionIx, version := range set.GetVersions() {
if version == id {
if version == buildID {
return setIx, versionIx
}
}
Expand Down
4 changes: 2 additions & 2 deletions service/matching/version_sets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ package matching

import (
"fmt"
"go.temporal.io/api/serviceerror"
"testing"

"github.com/stretchr/testify/assert"

"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

persistencepb "go.temporal.io/server/api/persistence/v1"
)

Expand Down

0 comments on commit deaaf56

Please sign in to comment.