Skip to content

Commit

Permalink
Updates to account for API refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Sep 27, 2022
1 parent 588d695 commit 2d201a1
Show file tree
Hide file tree
Showing 13 changed files with 592 additions and 898 deletions.
6 changes: 3 additions & 3 deletions api/adminservice/v1/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions api/historyservice/v1/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions api/matchingservice/v1/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

220 changes: 74 additions & 146 deletions api/persistence/v1/tasks.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,7 @@ message TaskQueueInfo {
// Holds all the data related to worker versioning for a task queue.
// Backwards-incompatible changes cannot be made, as this would make existing stored data unreadable
message VersioningData {
// The currently established default worker build id version.
temporal.api.taskqueue.v1.VersionIdNode current_default = 1;
// Other current latest-compatible versions who are not the overall default. These are the
// versions that will be used when generating new tasks by following the graph from the
// version of the last task out to a leaf.
repeated temporal.api.taskqueue.v1.VersionIdNode compatible_leaves = 2;
repeated temporal.api.taskqueue.v1.CompatibleVersionSet version_sets = 1;
}

message TaskKey {
Expand Down
4 changes: 2 additions & 2 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4274,11 +4274,11 @@ func (wh *WorkflowHandler) validateBuildIdOrderingUpdate(
errstr += " `task_queue` to be set"
hadErr = true
}
if req.GetVersionId().GetWorkerBuildId() == "" {
if req.GetVersionId() == "" {
errstr += " targeting a valid version identifier"
hadErr = true
}
if len(req.GetVersionId().GetWorkerBuildId()) > wh.config.WorkerBuildIdSizeLimit() {
if len(req.GetVersionId()) > wh.config.WorkerBuildIdSizeLimit() {
errstr += fmt.Sprintf(" Worker build IDs to be no larger than %v characters", wh.config.WorkerBuildIdSizeLimit())
hadErr = true
}
Expand Down
2 changes: 1 addition & 1 deletion service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ func (e *matchingEngineImpl) GetWorkerBuildIdOrdering(
return nil, err
}
return &matchingservice.GetWorkerBuildIdOrderingResponse{
Response: ToBuildIdOrderingResponse(verDat, int(req.GetRequest().GetMaxDepth())),
Response: ToBuildIdOrderingResponse(verDat, int(req.GetRequest().GetMaxSets())),
}, nil
}

Expand Down
91 changes: 46 additions & 45 deletions service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1856,15 +1856,15 @@ func (s *matchingEngineSuite) TestGetVersioningData() {
Request: &workflowservice.GetWorkerBuildIdOrderingRequest{
Namespace: namespaceID.String(),
TaskQueue: tq,
MaxDepth: 0,
MaxSets: 0,
},
})
s.NoError(err)
s.NotNil(res)

// Set a long list of versions
for i := 0; i < 100; i++ {
id := mkVerId(fmt.Sprintf("%d", i))
id := fmt.Sprintf("%d", i)
res, err := s.matchingEngine.UpdateWorkerBuildIdOrdering(s.handlerContext, &matchingservice.UpdateWorkerBuildIdOrderingRequest{
NamespaceId: namespaceID.String(),
Request: &workflowservice.UpdateWorkerBuildIdOrderingRequest{
Expand All @@ -1879,18 +1879,18 @@ func (s *matchingEngineSuite) TestGetVersioningData() {
}
// Make a long compat-versions chain
for i := 0; i < 10; i++ {
id := mkVerId(fmt.Sprintf("99.%d", i))
prevCompat := mkVerId(fmt.Sprintf("99.%d", i-1))
id := fmt.Sprintf("99.%d", i)
prevCompat := fmt.Sprintf("99.%d", i-1)
if i == 0 {
prevCompat = mkVerId("99")
prevCompat = "99"
}
res, err := s.matchingEngine.UpdateWorkerBuildIdOrdering(s.handlerContext, &matchingservice.UpdateWorkerBuildIdOrderingRequest{
NamespaceId: namespaceID.String(),
Request: &workflowservice.UpdateWorkerBuildIdOrderingRequest{
Namespace: namespaceID.String(),
TaskQueue: tq,
VersionId: id,
PreviousCompatible: prevCompat,
Namespace: namespaceID.String(),
TaskQueue: tq,
VersionId: id,
CompatibleVersion: prevCompat,
},
})
s.NoError(err)
Expand All @@ -1903,59 +1903,59 @@ func (s *matchingEngineSuite) TestGetVersioningData() {
Request: &workflowservice.GetWorkerBuildIdOrderingRequest{
Namespace: namespaceID.String(),
TaskQueue: tq,
MaxDepth: 0,
MaxSets: 0,
},
})
s.NoError(err)
s.NotNil(res.GetResponse().GetCurrentDefault())
lastNode := res.GetResponse().GetCurrentDefault()
s.Equal(mkVerId("99"), lastNode.GetVersion())
for lastNode.GetPreviousIncompatible() != nil {
lastNode = lastNode.GetPreviousIncompatible()
}
s.Equal(mkVerId("0"), lastNode.GetVersion())
s.Equal(mkVerId("99.9"), res.GetResponse().GetCompatibleLeaves()[0].GetVersion())
//s.NotNil(res.GetResponse().GetCurrentDefault())
//lastNode := res.GetResponse().GetCurrentDefault()
//s.Equal(mkVerId("99"), lastNode.GetVersion())
//for lastNode.GetPreviousIncompatible() != nil {
// lastNode = lastNode.GetPreviousIncompatible()
//}
//s.Equal(mkVerId("0"), lastNode.GetVersion())
//s.Equal(mkVerId("99.9"), res.GetResponse().GetCompatibleLeaves()[0].GetVersion())

// Ensure depth limiting works
res, err = s.matchingEngine.GetWorkerBuildIdOrdering(s.handlerContext, &matchingservice.GetWorkerBuildIdOrderingRequest{
NamespaceId: namespaceID.String(),
Request: &workflowservice.GetWorkerBuildIdOrderingRequest{
Namespace: namespaceID.String(),
TaskQueue: tq,
MaxDepth: 1,
MaxSets: 1,
},
})
s.NoError(err)
s.NotNil(res.GetResponse().GetCurrentDefault())
s.Nil(res.GetResponse().GetCurrentDefault().GetPreviousIncompatible())
s.Nil(res.GetResponse().GetCompatibleLeaves()[0].GetPreviousCompatible())
//s.NotNil(res.GetResponse().GetCurrentDefault())
//s.Nil(res.GetResponse().GetCurrentDefault().GetPreviousIncompatible())
//s.Nil(res.GetResponse().GetCompatibleLeaves()[0].GetPreviousCompatible())

res, err = s.matchingEngine.GetWorkerBuildIdOrdering(s.handlerContext, &matchingservice.GetWorkerBuildIdOrderingRequest{
NamespaceId: namespaceID.String(),
Request: &workflowservice.GetWorkerBuildIdOrderingRequest{
Namespace: namespaceID.String(),
TaskQueue: tq,
MaxDepth: 5,
MaxSets: 5,
},
})
s.NoError(err)
s.NotNil(res.GetResponse().GetCurrentDefault())
lastNode = res.GetResponse().GetCurrentDefault()
for {
if lastNode.GetPreviousIncompatible() == nil {
break
}
lastNode = lastNode.GetPreviousIncompatible()
}
s.Equal(mkVerId("95"), lastNode.GetVersion())
lastNode = res.GetResponse().GetCompatibleLeaves()[0]
for {
if lastNode.GetPreviousCompatible() == nil {
break
}
lastNode = lastNode.GetPreviousCompatible()
}
s.Equal(mkVerId("99.5"), lastNode.GetVersion())
//s.NotNil(res.GetResponse().GetCurrentDefault())
//lastNode = res.GetResponse().GetCurrentDefault()
//for {
// if lastNode.GetPreviousIncompatible() == nil {
// break
// }
// lastNode = lastNode.GetPreviousIncompatible()
//}
//s.Equal(mkVerId("95"), lastNode.GetVersion())
//lastNode = res.GetResponse().GetCompatibleLeaves()[0]
//for {
// if lastNode.GetPreviousCompatible() == nil {
// break
// }
// lastNode = lastNode.GetPreviousCompatible()
//}
//s.Equal(mkVerId("99.5"), lastNode.GetVersion())
}

func (s *matchingEngineSuite) TestActivityQueueMetadataInvalidate() {
Expand All @@ -1974,7 +1974,6 @@ func (s *matchingEngineSuite) TestActivityQueueMetadataInvalidate() {
Request: &workflowservice.GetWorkerBuildIdOrderingRequest{
Namespace: namespaceID.String(),
TaskQueue: tq,
MaxDepth: 0,
},
})
s.NoError(err)
Expand All @@ -1988,10 +1987,12 @@ func (s *matchingEngineSuite) TestActivityQueueMetadataInvalidate() {
s.NotNil(ttqm)

_, err = s.matchingEngine.InvalidateTaskQueueMetadata(s.handlerContext, &matchingservice.InvalidateTaskQueueMetadataRequest{
NamespaceId: namespaceID.String(),
TaskQueue: tq,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_ACTIVITY,
VersioningData: &persistencespb.VersioningData{CurrentDefault: mkVerIdNode("hi")},
NamespaceId: namespaceID.String(),
TaskQueue: tq,
TaskQueueType: enumspb.TASK_QUEUE_TYPE_ACTIVITY,
VersioningData: &persistencespb.VersioningData{
VersionSets: []*taskqueuepb.CompatibleVersionSet{mkNewSet("hi")},
},
})
s.NoError(err)
}
Expand Down
15 changes: 8 additions & 7 deletions service/matching/taskQueueManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"errors"
"fmt"
"go.temporal.io/api/taskqueue/v1"
"math"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -512,7 +513,7 @@ func TestTaskQueueSubParitionFetchesVersioningInfoFromRootPartitionOnInit(t *tes
tqCfg.tqId = subTqId

data := &persistencespb.VersioningData{
CurrentDefault: mkVerIdNode("0"),
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
}
asResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
Expand Down Expand Up @@ -551,7 +552,7 @@ func TestTaskQueueSubParitionSendsCurrentHashOfVersioningDataWhenFetching(t *tes
tqCfg.tqId = subTqId

data := &persistencespb.VersioningData{
CurrentDefault: mkVerIdNode("0"),
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
}
asResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
Expand Down Expand Up @@ -617,7 +618,7 @@ func TestTaskQueueRootPartitionNotifiesChildrenOfInvalidation(t *testing.T) {
// Make a change, mock verifies children are invalidated
require.NoError(t, rootTq.MutateVersioningData(ctx, func(vd *persistencespb.VersioningData) error {
*vd = persistencespb.VersioningData{
CurrentDefault: mkVerIdNode("0"),
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
}
return nil
}))
Expand All @@ -639,7 +640,7 @@ func TestTaskQueueSubPartitionPollsPeriodically(t *testing.T) {
asResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
VersioningData: &persistencespb.VersioningData{
CurrentDefault: mkVerIdNode("0"),
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
},
},
}
Expand Down Expand Up @@ -677,7 +678,7 @@ func TestTaskQueueSubPartitionDoesNotPollIfNoDataThenPollsWhenInvalidated(t *tes
},
}
verDat := &persistencespb.VersioningData{
CurrentDefault: mkVerIdNode("0"),
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
}
hasDatResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
Expand Down Expand Up @@ -724,7 +725,7 @@ func TestTaskQueueManagerWaitInitFailThenPass(t *testing.T) {
}

data := &persistencespb.VersioningData{
CurrentDefault: mkVerIdNode("0"),
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
}
asResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
Expand Down Expand Up @@ -791,7 +792,7 @@ func TestActivityQueueGetsVersioningDataFromWorkflowQueue(t *testing.T) {
ctx := context.Background()

data := &persistencespb.VersioningData{
CurrentDefault: mkVerIdNode("0"),
VersionSets: []*taskqueue.CompatibleVersionSet{mkNewSet("0")},
}
asResp := &matchingservice.GetTaskQueueMetadataResponse{
VersioningDataResp: &matchingservice.GetTaskQueueMetadataResponse_VersioningData{
Expand Down
Loading

0 comments on commit 2d201a1

Please sign in to comment.