Skip to content

Commit

Permalink
matching: make taskListID and taskListName partition aware (#2176)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Jul 11, 2019
1 parent 0fc9928 commit 07397aa
Show file tree
Hide file tree
Showing 10 changed files with 456 additions and 96 deletions.
73 changes: 56 additions & 17 deletions service/history/decisionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package history

import (
"fmt"
"strings"

"github.com/pborman/uuid"
workflow "github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -59,6 +60,10 @@ type (
}
)

const (
reservedTaskListPrefix = "/__cadence_sys/"
)

func newDecisionAttrValidator(
domainCache cache.DomainCache,
config *Config,
Expand Down Expand Up @@ -191,8 +196,9 @@ func (v *decisionAttrValidator) validateActivityScheduleAttributes(
return &workflow.BadRequestError{Message: "ScheduleActivityTaskDecisionAttributes is not set on decision."}
}

if attributes.TaskList == nil || attributes.TaskList.GetName() == "" {
return &workflow.BadRequestError{Message: "TaskList is not set on decision."}
defaultTaskListName := ""
if _, err := v.validatedTaskList(attributes.TaskList, defaultTaskListName); err != nil {
return err
}

if attributes.GetActivityId() == "" {
Expand Down Expand Up @@ -453,8 +459,10 @@ func (v *decisionAttrValidator) validateSignalExternalWorkflowExecutionAttribute
return nil
}

func (v *decisionAttrValidator) validateUpsertWorkflowSearchAttributes(domainName string,
attributes *workflow.UpsertWorkflowSearchAttributesDecisionAttributes) error {
func (v *decisionAttrValidator) validateUpsertWorkflowSearchAttributes(
domainName string,
attributes *workflow.UpsertWorkflowSearchAttributesDecisionAttributes,
) error {

if attributes == nil {
return &workflow.BadRequestError{Message: "UpsertWorkflowSearchAttributesDecisionAttributes is not set on decision."}
Expand Down Expand Up @@ -485,17 +493,17 @@ func (v *decisionAttrValidator) validateContinueAsNewWorkflowExecutionAttributes
attributes.WorkflowType = &workflow.WorkflowType{Name: common.StringPtr(executionInfo.WorkflowTypeName)}
}

// Inherit Tasklist from previous execution if not provided on decision
if attributes.TaskList == nil || attributes.TaskList.GetName() == "" {
attributes.TaskList = &workflow.TaskList{Name: common.StringPtr(executionInfo.TaskList)}
}
if len(attributes.TaskList.GetName()) > v.maxIDLengthLimit {
return &workflow.BadRequestError{Message: "TaskList exceeds length limit."}
}
if len(attributes.WorkflowType.GetName()) > v.maxIDLengthLimit {
return &workflow.BadRequestError{Message: "WorkflowType exceeds length limit."}
}

// Inherit Tasklist from previous execution if not provided on decision
tl, err := v.validatedTaskList(attributes.TaskList, executionInfo.TaskList)
if err != nil {
return err
}
attributes.TaskList = tl

// Inherit workflow timeout from previous execution if not provided on decision
if attributes.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
attributes.ExecutionStartToCloseTimeoutSeconds = common.Int32Ptr(executionInfo.WorkflowTimeout)
Expand Down Expand Up @@ -560,13 +568,11 @@ func (v *decisionAttrValidator) validateStartChildExecutionAttributes(
}

// Inherit tasklist from parent workflow execution if not provided on decision
if attributes.TaskList == nil || attributes.TaskList.GetName() == "" {
attributes.TaskList = &workflow.TaskList{Name: common.StringPtr(parentInfo.TaskList)}
}

if len(attributes.TaskList.GetName()) > v.maxIDLengthLimit {
return &workflow.BadRequestError{Message: "TaskList exceeds length limit."}
tl, err := v.validatedTaskList(attributes.TaskList, parentInfo.TaskList)
if err != nil {
return err
}
attributes.TaskList = tl

// Inherit workflow timeout from parent workflow execution if not provided on decision
if attributes.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
Expand All @@ -581,6 +587,39 @@ func (v *decisionAttrValidator) validateStartChildExecutionAttributes(
return nil
}

func (v *decisionAttrValidator) validatedTaskList(
tl *workflow.TaskList,
defaultVal string,
) (*workflow.TaskList, error) {

if tl == nil {
tl = &workflow.TaskList{}
}

if tl.GetName() == "" {
if defaultVal == "" {
return tl, &workflow.BadRequestError{"missing task list name"}
}
tl.Name = &defaultVal
return tl, nil
}

name := tl.GetName()
if len(name) > v.maxIDLengthLimit {
return tl, &workflow.BadRequestError{
Message: fmt.Sprintf("task list name exceeds length limit of %v", v.maxIDLengthLimit),
}
}

if strings.HasPrefix(name, reservedTaskListPrefix) {
return tl, &workflow.BadRequestError{
Message: fmt.Sprintf("task list name cannot start with reserved prefix %v", reservedTaskListPrefix),
}
}

return tl, nil
}

func (v *decisionAttrValidator) validateCrossDomainCall(
domainID string,
targetDomainID string,
Expand Down
47 changes: 46 additions & 1 deletion service/history/decisionChecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package history

import (
"testing"

"github.com/stretchr/testify/suite"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
Expand All @@ -30,7 +32,6 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
"testing"
)

type (
Expand Down Expand Up @@ -489,3 +490,47 @@ func (s *decisionAttrValidatorSuite) TestValidateCrossDomainCall_GlobalToGlobal_
err := s.validator.validateCrossDomainCall(s.testDomainID, targetDomainID)
s.Nil(err)
}

func (s *decisionAttrValidatorSuite) TestValidateTaskListName() {
taskList := func(name string) *workflow.TaskList {
kind := workflow.TaskListKindNormal
return &workflow.TaskList{Name: &name, Kind: &kind}
}

testCases := []struct {
defaultVal string
input *workflow.TaskList
output *workflow.TaskList
isOutputErr bool
}{
{"tl-1", nil, &workflow.TaskList{Name: common.StringPtr("tl-1")}, false},
{"", taskList("tl-1"), taskList("tl-1"), false},
{"tl-1", taskList("tl-1"), taskList("tl-1"), false},
{"", taskList("/tl-1"), taskList("/tl-1"), false},
{"", taskList("/__cadence_sys"), taskList("/__cadence_sys"), false},
{"", nil, &workflow.TaskList{}, true},
{"", taskList(""), taskList(""), true},
{"", taskList(reservedTaskListPrefix), taskList(reservedTaskListPrefix), true},
{"tl-1", taskList(reservedTaskListPrefix), taskList(reservedTaskListPrefix), true},
{"", taskList(reservedTaskListPrefix + "tl-1"), taskList(reservedTaskListPrefix + "tl-1"), true},
{"tl-1", taskList(reservedTaskListPrefix + "tl-1"), taskList(reservedTaskListPrefix + "tl-1"), true},
}

for _, tc := range testCases {
key := tc.defaultVal + "#"
if tc.input != nil {
key += tc.input.GetName()
} else {
key += "nil"
}
s.Run(key, func() {
output, err := s.validator.validatedTaskList(tc.input, tc.defaultVal)
if tc.isOutputErr {
s.Error(err)
} else {
s.NoError(err)
}
s.EqualValues(tc.output, output)
})
}
}
2 changes: 1 addition & 1 deletion service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func newTaskListConfig(id *taskListID, config *Config, domainCache cache.DomainC
}

domain := domainEntry.GetInfo().Name
taskListName := id.taskListName
taskListName := id.name
taskType := id.taskType
return &taskListConfig{
RangeSize: config.RangeSize,
Expand Down
93 changes: 52 additions & 41 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@ type matchingEngineImpl struct {
domainCache cache.DomainCache
}

type taskListID struct {
domainID string
taskListName string
taskType int
}

type pollerIDCtxKey string
type identityCtxKey string

Expand All @@ -91,22 +85,8 @@ var (

const (
maxQueryWaitCount = 5
maxQueryLoopCount = 5
)

func (t *taskListID) String() string {
var r string
if t.taskType == persistence.TaskListTypeActivity {
r += "activity"
} else {
r += "decision"
}
r += " task list \""
r += t.taskListName
r += "\""
return r
}

var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed implemented

// NewEngine creates an instance of matching engine
Expand Down Expand Up @@ -184,21 +164,21 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID,
e.taskListsLock.Unlock()
return result, nil
}
e.logger.Info("", tag.LifeCycleStarting, tag.WorkflowTaskListName(taskList.taskListName), tag.WorkflowTaskListType(taskList.taskType))
e.logger.Info("", tag.LifeCycleStarting, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType))
mgr, err := newTaskListManager(e, taskList, taskListKind, e.config)
if err != nil {
e.taskListsLock.Unlock()
e.logger.Info("", tag.LifeCycleStartFailed, tag.WorkflowTaskListName(taskList.taskListName), tag.WorkflowTaskListType(taskList.taskType), tag.Error(err))
e.logger.Info("", tag.LifeCycleStartFailed, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType), tag.Error(err))
return nil, err
}
e.taskLists[*taskList] = mgr
e.taskListsLock.Unlock()
err = mgr.Start()
if err != nil {
e.logger.Info("", tag.LifeCycleStartFailed, tag.WorkflowTaskListName(taskList.taskListName), tag.WorkflowTaskListType(taskList.taskType), tag.Error(err))
e.logger.Info("", tag.LifeCycleStartFailed, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType), tag.Error(err))
return nil, err
}
e.logger.Info("", tag.LifeCycleStarted, tag.WorkflowTaskListName(taskList.taskListName), tag.WorkflowTaskListType(taskList.taskType))
e.logger.Info("", tag.LifeCycleStarted, tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType))
return mgr, nil
}

Expand All @@ -220,14 +200,24 @@ func (e *matchingEngineImpl) AddDecisionTask(ctx context.Context, addRequest *m.
domainID := addRequest.GetDomainUUID()
taskListName := addRequest.TaskList.GetName()
taskListKind := common.TaskListKindPtr(addRequest.TaskList.GetKind())
e.logger.Debug(fmt.Sprintf("Received AddDecisionTask for taskList=%v, WorkflowID=%v, RunID=%v, ScheduleToStartTimeout=%v",
addRequest.TaskList.GetName(), addRequest.Execution.GetWorkflowId(), addRequest.Execution.GetRunId(),
addRequest.GetScheduleToStartTimeoutSeconds()))
taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)

e.logger.Debug(
fmt.Sprintf("Received AddDecisionTask for taskList=%v, WorkflowID=%v, RunID=%v, ScheduleToStartTimeout=%v",
addRequest.TaskList.GetName(),
addRequest.Execution.GetWorkflowId(),
addRequest.Execution.GetRunId(),
addRequest.GetScheduleToStartTimeoutSeconds()))

taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
if err != nil {
return false, err
}

tlMgr, err := e.getTaskListManager(taskList, taskListKind)
if err != nil {
return false, err
}

taskInfo := &persistence.TaskInfo{
DomainID: domainID,
RunID: addRequest.Execution.GetRunId(),
Expand All @@ -248,13 +238,23 @@ func (e *matchingEngineImpl) AddActivityTask(ctx context.Context, addRequest *m.
sourceDomainID := addRequest.GetSourceDomainUUID()
taskListName := addRequest.TaskList.GetName()
taskListKind := common.TaskListKindPtr(addRequest.TaskList.GetKind())
e.logger.Debug(fmt.Sprintf("Received AddActivityTask for taskList=%v WorkflowID=%v, RunID=%v",
taskListName, addRequest.Execution.WorkflowId, addRequest.Execution.RunId))
taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)

e.logger.Debug(
fmt.Sprintf("Received AddActivityTask for taskList=%v WorkflowID=%v, RunID=%v",
taskListName,
addRequest.Execution.WorkflowId,
addRequest.Execution.RunId))

taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)
if err != nil {
return false, err
}

tlMgr, err := e.getTaskListManager(taskList, taskListKind)
if err != nil {
return false, err
}

taskInfo := &persistence.TaskInfo{
DomainID: sourceDomainID,
RunID: addRequest.Execution.GetRunId(),
Expand Down Expand Up @@ -289,7 +289,10 @@ pollLoop:
// long-poll when frontend calls CancelOutstandingPoll API
pollerCtx := context.WithValue(ctx, pollerIDKey, pollerID)
pollerCtx = context.WithValue(pollerCtx, identityKey, request.GetIdentity())
taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
if err != nil {
return nil, err
}
taskListKind := common.TaskListKindPtr(request.TaskList.GetKind())
task, err := e.getTask(pollerCtx, taskList, nil, taskListKind)
if err != nil {
Expand Down Expand Up @@ -379,7 +382,11 @@ pollLoop:
return nil, err
}

taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)
taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity)
if err != nil {
return nil, err
}

var maxDispatch *float64
if request.TaskListMetadata != nil {
maxDispatch = request.TaskListMetadata.MaxTasksPerSecond
Expand Down Expand Up @@ -420,8 +427,11 @@ pollLoop:
func (e *matchingEngineImpl) QueryWorkflow(ctx context.Context, queryRequest *m.QueryWorkflowRequest) (*workflow.QueryWorkflowResponse, error) {
domainID := queryRequest.GetDomainUUID()
taskListName := queryRequest.TaskList.GetName()
taskList := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
taskListKind := common.TaskListKindPtr(queryRequest.TaskList.GetKind())
taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision)
if err != nil {
return nil, err
}

var lastErr error
query_loop:
Expand Down Expand Up @@ -534,7 +544,10 @@ func (e *matchingEngineImpl) CancelOutstandingPoll(ctx context.Context, request
taskListName := request.TaskList.GetName()
pollerID := request.GetPollerID()

taskList := newTaskListID(domainID, taskListName, taskListType)
taskList, err := newTaskListID(domainID, taskListName, taskListType)
if err != nil {
return err
}
taskListKind := common.TaskListKindPtr(request.TaskList.GetKind())
tlMgr, err := e.getTaskListManager(taskList, taskListKind)
if err != nil {
Expand All @@ -552,8 +565,10 @@ func (e *matchingEngineImpl) DescribeTaskList(ctx context.Context, request *m.De
taskListType = persistence.TaskListTypeActivity
}
taskListName := request.DescRequest.TaskList.GetName()

taskList := newTaskListID(domainID, taskListName, taskListType)
taskList, err := newTaskListID(domainID, taskListName, taskListType)
if err != nil {
return nil, err
}
taskListKind := common.TaskListKindPtr(request.DescRequest.TaskList.GetKind())
tlMgr, err := e.getTaskListManager(taskList, taskListKind)
if err != nil {
Expand Down Expand Up @@ -731,10 +746,6 @@ func (e *matchingEngineImpl) recordActivityTaskStarted(
return resp, err
}

func newTaskListID(domainID, taskListName string, taskType int) *taskListID {
return &taskListID{domainID: domainID, taskListName: taskListName, taskType: taskType}
}

func workflowExecutionPtr(execution workflow.WorkflowExecution) *workflow.WorkflowExecution {
return &execution
}
Loading

0 comments on commit 07397aa

Please sign in to comment.