Skip to content

Commit

Permalink
Fix missing decision timeout for transient decisions (#889)
Browse files Browse the repository at this point in the history
We were using incorrect scheduleID for transient decisions on decision
started event.  This happens if a new event comes in after a transient
decision is scheduled, but before it is started.  We end up creating a
timeout task with the wrong decision schedule id causing timer
processing to skip that timeout event.
Also added an integration test for this use case.
  • Loading branch information
samarabbas authored Jun 26, 2018
1 parent ec2c6da commit da1ccd6
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 1 deletion.
102 changes: 102 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6146,6 +6146,108 @@ func (s *integrationSuite) TestSignalWithStartWorkflow() {
s.Equal(identity, *signalEvent.WorkflowExecutionSignaledEventAttributes.Identity)
}

// TestTransientDecisionTimeout covers the decision timeout of a transient
// decision. The sequence is: the first decision task fails, a signal is sent
// while the follow-up (transient) decision is scheduled, that decision task is
// dropped so that it has to time out, and a later decision task then sees the
// signal and completes the workflow.
func (s *integrationSuite) TestTransientDecisionTimeout() {
	workflowID := "integration-transient-decision-timeout-test"
	typeName := "integration-transient-decision-timeout-test-type"
	taskListName := "integration-transient-decision-timeout-test-tasklist"
	identity := "worker1"

	workflowType := &workflow.WorkflowType{Name: common.StringPtr(typeName)}
	taskList := &workflow.TaskList{Name: common.StringPtr(taskListName)}

	// Start the workflow execution (TaskStartToCloseTimeoutSeconds is 2).
	startRequest := &workflow.StartWorkflowExecutionRequest{
		RequestId:                           common.StringPtr(uuid.New()),
		Domain:                              common.StringPtr(s.domainName),
		WorkflowId:                          common.StringPtr(workflowID),
		WorkflowType:                        workflowType,
		TaskList:                            taskList,
		ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
		TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(2),
		Identity:                            common.StringPtr(identity),
	}
	startResponse, startErr := s.engine.StartWorkflowExecution(createContext(), startRequest)
	s.Nil(startErr)
	s.logger.Infof("StartWorkflowExecution: response: %v \n", *startResponse.RunId)

	workflowExecution := &workflow.WorkflowExecution{
		WorkflowId: common.StringPtr(workflowID),
		RunId:      common.StringPtr(*startResponse.RunId),
	}

	// State shared with the decider closure below.
	completed := false
	failNextDecision := true
	signalsSeen := 0

	// Decider: the very first invocation returns an error; every later
	// invocation counts the signal events found after previousStartedEventID
	// and completes the workflow.
	decider := func(execution *workflow.WorkflowExecution, wfType *workflow.WorkflowType,
		previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
		if failNextDecision {
			failNextDecision = false
			return nil, nil, errors.New("Decider Panic")
		}

		// Only look at the events following the previously started decision.
		recentEvents := history.Events[previousStartedEventID:]
		for _, historyEvent := range recentEvents {
			if historyEvent.GetEventType() == workflow.EventTypeWorkflowExecutionSignaled {
				signalsSeen++
			}
		}

		completeDecision := &workflow.Decision{
			DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
			CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
				Result: []byte("Done."),
			},
		}
		completed = true
		return nil, []*workflow.Decision{completeDecision}, nil
	}

	poller := &taskPoller{
		engine:          s.engine,
		domain:          s.domainName,
		taskList:        taskList,
		identity:        identity,
		decisionHandler: decider,
		logger:          s.logger,
		suite:           s,
	}

	// Step 1: the first decision fails right away, which schedules a
	// transient decision.
	_, err := poller.pollAndProcessDecisionTask(false, false)
	s.logger.Infof("pollAndProcessDecisionTask: %v", err)
	s.Nil(err)

	// Step 2: deliver a signal while the transient decision is scheduled.
	err = s.sendSignal(s.domainName, workflowExecution, "signalA", nil, identity)
	s.Nil(err, "failed to send signal to execution")

	// Step 3: drop the decision task so that it runs into a decision timeout.
	_, err = poller.pollAndProcessDecisionTask(true, true)
	s.logger.Infof("pollAndProcessDecisionTask: %v", err)
	s.Nil(err)

	// Dump the history as it looks after the dropped decision.
	executionToPrint := &workflow.WorkflowExecution{
		WorkflowId: common.StringPtr(workflowID),
		RunId:      common.StringPtr(startResponse.GetRunId()),
	}
	s.printWorkflowHistory(s.domainName, executionToPrint)

	// Step 4: handle the signal and complete the workflow execution.
	// NOTE(review): the trailing int64(1) is presumably the expected decision
	// attempt — confirm against pollAndProcessDecisionTaskWithAttempt.
	_, err = poller.pollAndProcessDecisionTaskWithAttempt(true, false, false, false, int64(1))
	s.logger.Infof("pollAndProcessDecisionTask: %v", err)
	s.Nil(err)

	s.Equal(1, signalsSeen)
	s.True(completed)
}

func (s *integrationSuite) getHistory(domain string, execution *workflow.WorkflowExecution) []*workflow.HistoryEvent {
historyResponse, err := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domain),
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ Update_History_Loop:
}

// Start a timer for the decision task.
timeOutTask := tBuilder.AddStartToCloseDecisionTimoutTask(scheduleID, di.Attempt, di.DecisionTimeout)
timeOutTask := tBuilder.AddStartToCloseDecisionTimoutTask(di.ScheduleID, di.Attempt, di.DecisionTimeout)
timerTasks := []persistence.Task{timeOutTask}
defer e.timerProcessor.NotifyNewTimers(e.currentClusterName, e.shard.GetCurrentTime(e.currentClusterName), timerTasks)

Expand Down

0 comments on commit da1ccd6

Please sign in to comment.