Skip to content

Commit

Permalink
Refactor decision logic removing for loop (#1951)
Browse files Browse the repository at this point in the history
* Refactor decision handler logic
* Remove workflowStartEventProvider in favor of mutableState GetStartEvent function
* Refactor handleDecisionTaskStarted to use history engine updateWorkflowExecutionWithAction
* Remove getWorkflowStartedEvent
  • Loading branch information
wxing1292 authored Jun 5, 2019
1 parent 6a76852 commit a7422f6
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 195 deletions.
188 changes: 81 additions & 107 deletions service/history/decisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
"fmt"
"time"

"github.com/uber/cadence/common/cache"

h "github.com/uber/cadence/.gen/go/history"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -110,7 +109,10 @@ func (handler *decisionHandlerImpl) handleDecisionTaskScheduled(
transferTasks: []persistence.Task{&persistence.RecordWorkflowStartedTask{}},
}

startEvent, _ := msBuilder.GetStartEvent()
startEvent, found := msBuilder.GetStartEvent()
if !found {
return nil, &workflow.InternalServiceError{Message: "Failed to load start event."}
}
executionTimestamp := getWorkflowExecutionTimestamp(msBuilder, startEvent)
if req.GetIsFirstDecision() && executionTimestamp.After(time.Now()) {
postActions.timerTasks = append(postActions.timerTasks, &persistence.WorkflowBackoffTimerTask{
Expand All @@ -127,100 +129,88 @@ func (handler *decisionHandlerImpl) handleDecisionTaskScheduled(
func (handler *decisionHandlerImpl) handleDecisionTaskStarted(
ctx ctx.Context,
req *h.RecordDecisionTaskStartedRequest,
) (resp *h.RecordDecisionTaskStartedResponse, retError error) {
) (*h.RecordDecisionTaskStartedResponse, error) {

domainEntry, err := handler.historyEngine.getActiveDomainEntry(req.DomainUUID)
if err != nil {
return nil, err
}
domainID := domainEntry.GetInfo().ID

context, release, err0 := handler.historyCache.getOrCreateWorkflowExecutionWithTimeout(ctx, domainID, *req.WorkflowExecution)
if err0 != nil {
return nil, err0
execution := workflow.WorkflowExecution{
WorkflowId: req.WorkflowExecution.WorkflowId,
RunId: req.WorkflowExecution.RunId,
}
defer func() { release(retError) }()

scheduleID := req.GetScheduleId()
requestID := req.GetRequestId()

Update_History_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
msBuilder, err0 := context.loadWorkflowExecution()
if err0 != nil {
return nil, err0
}
if !msBuilder.IsWorkflowExecutionRunning() {
return nil, ErrWorkflowCompleted
}

tBuilder := handler.historyEngine.getTimerBuilder(context.getExecution())

di, isRunning := msBuilder.GetPendingDecision(scheduleID)

// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
// some extreme cassandra failure cases.
if !isRunning && scheduleID >= msBuilder.GetNextEventID() {
handler.metricsClient.IncCounter(metrics.HistoryRecordDecisionTaskStartedScope, metrics.StaleMutableStateCounter)
// Reload workflow execution history
context.clear()
continue Update_History_Loop
}

// Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If
// task is not outstanding then it is most probably a duplicate and complete the task.
if !isRunning {
// Looks like DecisionTask already completed as a result of another call.
// It is OK to drop the task at this point.
context.getLogger().Debug("Potentially duplicate task.", tag.TaskID(req.GetTaskId()), tag.WorkflowScheduleID(scheduleID), tag.TaskType(persistence.TransferTaskTypeDecisionTask))
var resp *h.RecordDecisionTaskStartedResponse
err = handler.historyEngine.updateWorkflowExecutionWithAction(ctx, domainID, execution,
func(msBuilder mutableState, tBuilder *timerBuilder) (*updateWorkflowAction, error) {
if !msBuilder.IsWorkflowExecutionRunning() {
return nil, ErrWorkflowCompleted
}

return nil, &workflow.EntityNotExistsError{Message: "Decision task not found."}
}
di, isRunning := msBuilder.GetPendingDecision(scheduleID)

if di.StartedID != common.EmptyEventID {
// If decision is started as part of the current request scope then return a positive response
if di.RequestID == requestID {
return handler.createRecordDecisionTaskStartedResponse(domainID, msBuilder, di, req.PollRequest.GetIdentity()), nil
// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
// some extreme cassandra failure cases.
if !isRunning && scheduleID >= msBuilder.GetNextEventID() {
handler.metricsClient.IncCounter(metrics.HistoryRecordDecisionTaskStartedScope, metrics.StaleMutableStateCounter)
// Reload workflow execution history
// ErrStaleState will trigger updateWorkflowExecutionWithAction function to reload the mutable state
return nil, ErrStaleState
}

// Looks like DecisionTask already started as a result of another call.
// It is OK to drop the task at this point.
context.getLogger().Debug("Potentially duplicate task.", tag.TaskID(req.GetTaskId()), tag.WorkflowScheduleID(scheduleID), tag.TaskType(persistence.TaskListTypeDecision))
return nil, &h.EventAlreadyStartedError{Message: "Decision task already started."}
}
// Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If
// task is not outstanding then it is most probably a duplicate and complete the task.
if !isRunning {
// Looks like DecisionTask already completed as a result of another call.
// It is OK to drop the task at this point.
return nil, &workflow.EntityNotExistsError{Message: "Decision task not found."}
}

_, di = msBuilder.AddDecisionTaskStartedEvent(scheduleID, requestID, req.PollRequest)
if di == nil {
// Unable to add DecisionTaskStarted event to history
return nil, &workflow.InternalServiceError{Message: "Unable to add DecisionTaskStarted event to history."}
}
updateAction := &updateWorkflowAction{
noop: false,
deleteWorkflow: false,
createDecision: false,
timerTasks: nil,
transferTasks: nil,
}

// Start a timer for the decision task.
timeOutTask := tBuilder.AddStartToCloseDecisionTimoutTask(di.ScheduleID, di.Attempt, di.DecisionTimeout)
timerTasks := []persistence.Task{timeOutTask}
defer handler.timerProcessor.NotifyNewTimers(handler.currentClusterName, handler.shard.GetCurrentTime(handler.currentClusterName), timerTasks)
if di.StartedID != common.EmptyEventID {
// If decision is started as part of the current request scope then return a positive response
if di.RequestID == requestID {
resp = handler.createRecordDecisionTaskStartedResponse(domainID, msBuilder, di, req.PollRequest.GetIdentity())
updateAction.noop = true
return updateAction, nil
}

// Generate a transaction ID for appending events to history
transactionID, err2 := handler.shard.GetNextTransferTaskID()
if err2 != nil {
return nil, err2
}
// Looks like DecisionTask already started as a result of another call.
// It is OK to drop the task at this point.
return nil, &h.EventAlreadyStartedError{Message: "Decision task already started."}
}

// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
// the history and try the operation again.
if err3 := context.updateWorkflowExecution(nil, timerTasks, transactionID); err3 != nil {
if err3 == ErrConflict {
handler.metricsClient.IncCounter(metrics.HistoryRecordDecisionTaskStartedScope,
metrics.ConcurrencyUpdateFailureCounter)
continue Update_History_Loop
_, di = msBuilder.AddDecisionTaskStartedEvent(scheduleID, requestID, req.PollRequest)
if di == nil {
// Unable to add DecisionTaskStarted event to history
return nil, &workflow.InternalServiceError{Message: "Unable to add DecisionTaskStarted event to history."}
}
return nil, err3
}

return handler.createRecordDecisionTaskStartedResponse(domainID, msBuilder, di, req.PollRequest.GetIdentity()), nil
}
resp = handler.createRecordDecisionTaskStartedResponse(domainID, msBuilder, di, req.PollRequest.GetIdentity())
updateAction.timerTasks = []persistence.Task{tBuilder.AddStartToCloseDecisionTimoutTask(
di.ScheduleID,
di.Attempt,
di.DecisionTimeout,
)}
return updateAction, nil
})

return nil, ErrMaxAttemptsExceeded
if err != nil {
return nil, err
}
return resp, nil
}

func (handler *decisionHandlerImpl) handleDecisionTaskFailed(
Expand All @@ -235,8 +225,8 @@ func (handler *decisionHandlerImpl) handleDecisionTaskFailed(
domainID := domainEntry.GetInfo().ID

request := req.FailedRequest
token, err0 := handler.tokenSerializer.Deserialize(request.TaskToken)
if err0 != nil {
token, err := handler.tokenSerializer.Deserialize(request.TaskToken)
if err != nil {
return ErrDeserializingToken
}

Expand Down Expand Up @@ -295,17 +285,17 @@ func (handler *decisionHandlerImpl) handleDecisionTaskCompleted(
clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
clientImpl := call.Header(common.ClientImplHeaderName)

context, release, err0 := handler.historyCache.getOrCreateWorkflowExecutionWithTimeout(ctx, domainID, workflowExecution)
if err0 != nil {
return nil, err0
context, release, err := handler.historyCache.getOrCreateWorkflowExecutionWithTimeout(ctx, domainID, workflowExecution)
if err != nil {
return nil, err
}
defer func() { release(retError) }()

Update_History_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
msBuilder, err1 := context.loadWorkflowExecution()
if err1 != nil {
return nil, err1
msBuilder, err := context.loadWorkflowExecution()
if err != nil {
return nil, err
}
if !msBuilder.IsWorkflowExecutionRunning() {
return nil, ErrWorkflowCompleted
Expand All @@ -315,20 +305,6 @@ Update_History_Loop:
timerBuilderProvider := func() *timerBuilder {
return handler.historyEngine.getTimerBuilder(context.getExecution())
}
workflowStartEventProvider := func() (*workflow.HistoryEvent, error) {
return getWorkflowStartedEvent(
handler.historyEngine.historyMgr,
handler.historyEngine.historyV2Mgr,
msBuilder.GetEventStoreVersion(),
msBuilder.GetCurrentBranch(),
handler.logger,
executionInfo.DomainID,
executionInfo.WorkflowID,
executionInfo.RunID,
common.IntPtr(handler.shard.GetShardID()),
)

}

scheduleID := token.ScheduleID
di, isRunning := msBuilder.GetPendingDecision(scheduleID)
Expand Down Expand Up @@ -421,7 +397,6 @@ Update_History_Loop:
decisionBlobSizeChecker,
handler.logger,
timerBuilderProvider,
workflowStartEventProvider,
handler.domainCache,
handler.metricsClient,
)
Expand Down Expand Up @@ -459,10 +434,9 @@ Update_History_Loop:
tag.WorkflowID(token.WorkflowID),
tag.WorkflowRunID(token.RunID),
tag.WorkflowDomainID(domainID))
var err1 error
msBuilder, err1 = handler.historyEngine.failDecision(context, scheduleID, startedID, failCause, []byte(failMessage), request)
if err1 != nil {
return nil, err1
msBuilder, err = handler.historyEngine.failDecision(context, scheduleID, startedID, failCause, []byte(failMessage), request)
if err != nil {
return nil, err
}
tBuilder = handler.historyEngine.getTimerBuilder(context.getExecution())
isComplete = false
Expand Down Expand Up @@ -527,9 +501,9 @@ Update_History_Loop:
}

// Generate a transaction ID for appending events to history
transactionID, err3 := handler.shard.GetNextTransferTaskID()
if err3 != nil {
return nil, err3
transactionID, err := handler.shard.GetNextTransferTaskID()
if err != nil {
return nil, err
}

// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
Expand Down Expand Up @@ -571,9 +545,9 @@ Update_History_Loop:
}
transferTasks = []persistence.Task{tranT}
timerTasks = []persistence.Task{timerT}
transactionID, err3 = handler.shard.GetNextTransferTaskID()
if err3 != nil {
return nil, err3
transactionID, err = handler.shard.GetNextTransferTaskID()
if err != nil {
return nil, err
}
if err := context.updateWorkflowExecutionWithContext(request.ExecutionContext, transferTasks, timerTasks, transactionID); err != nil {
return nil, err
Expand Down
35 changes: 15 additions & 20 deletions service/history/decisionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
)

type (
workflowStartEventProvider func() (*workflow.HistoryEvent, error)

timerBuilderProvider func() *timerBuilder

decisionTaskHandlerImpl struct {
Expand Down Expand Up @@ -64,11 +62,10 @@ type (
attrValidator *decisionAttrValidator
sizeLimitChecker *decisionBlobSizeChecker

logger log.Logger
timerBuilderProvider timerBuilderProvider
workflowStartEventProvider workflowStartEventProvider
domainCache cache.DomainCache
metricsClient metrics.Client
logger log.Logger
timerBuilderProvider timerBuilderProvider
domainCache cache.DomainCache
metricsClient metrics.Client
}
)

Expand All @@ -82,7 +79,6 @@ func newDecisionTaskHandler(
sizeLimitChecker *decisionBlobSizeChecker,
logger log.Logger,
timerBuilderProvider timerBuilderProvider,
workflowStartEventProvider workflowStartEventProvider,
domainCache cache.DomainCache,
metricsClient metrics.Client,
) *decisionTaskHandlerImpl {
Expand All @@ -109,12 +105,11 @@ func newDecisionTaskHandler(
attrValidator: attrValidator,
sizeLimitChecker: sizeLimitChecker,

logger: logger,
timerBuilder: timerBuilderProvider(),
timerBuilderProvider: timerBuilderProvider,
workflowStartEventProvider: workflowStartEventProvider,
domainCache: domainCache,
metricsClient: metricsClient,
logger: logger,
timerBuilder: timerBuilderProvider(),
timerBuilderProvider: timerBuilderProvider,
domainCache: domainCache,
metricsClient: metricsClient,
}
}

Expand Down Expand Up @@ -343,9 +338,9 @@ func (handler *decisionTaskHandlerImpl) handleDecisionCompleteWorkflow(
} else {
// this is a cron workflow
executionInfo := handler.mutableState.GetExecutionInfo()
startEvent, err := handler.workflowStartEventProvider()
if err != nil {
return err
startEvent, found := handler.mutableState.GetStartEvent()
if !found {
return &workflow.InternalServiceError{Message: "Failed to load start event."}
}

startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
Expand Down Expand Up @@ -439,9 +434,9 @@ func (handler *decisionTaskHandlerImpl) handleDecisionFailWorkflow(
return &workflow.InternalServiceError{Message: "Unable to add fail workflow event."}
}
} else {
startEvent, err := handler.workflowStartEventProvider()
if err != nil {
return err
startEvent, found := handler.mutableState.GetStartEvent()
if !found {
return &workflow.InternalServiceError{Message: "Failed to load start event."}
}

startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
Expand Down
Loading

0 comments on commit a7422f6

Please sign in to comment.