Skip to content

Commit

Permalink
Fix workflow timeout not created for new execution when ContinuedAsNew (
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Jan 3, 2018
1 parent 5e9c49d commit aa29ebb
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 4 deletions.
6 changes: 4 additions & 2 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,8 +1079,10 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
if request.ContinueAsNew != nil {
startReq := request.ContinueAsNew
d.CreateWorkflowExecutionWithinBatch(startReq, batch, cqlNowTimestamp)
d.createTransferTasks(batch, startReq.TransferTasks, startReq.DomainID, *startReq.Execution.WorkflowId,
*startReq.Execution.RunId, cqlNowTimestamp)
d.createTransferTasks(batch, startReq.TransferTasks, startReq.DomainID, startReq.Execution.GetWorkflowId(),
startReq.Execution.GetRunId(), cqlNowTimestamp)
d.createTimerTasks(batch, startReq.TimerTasks, nil, startReq.DomainID, startReq.Execution.GetWorkflowId(),
startReq.Execution.GetRunId(), cqlNowTimestamp)
} else if request.FinishExecution {
retentionInSeconds := request.FinishedExecutionTTL
if retentionInSeconds <= 0 {
Expand Down
108 changes: 107 additions & 1 deletion host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,112 @@ func (s *integrationSuite) TestContinueAsNewWorkflow() {
s.True(workflowComplete)
}

// TestContinueAsNewWorkflow_Timeout starts a workflow, answers its first
// decision task with a ContinueAsNew decision whose execution start-to-close
// timeout is one second, and then polls the workflow history (queried by
// workflow ID only, without a run ID) until the last event is
// WorkflowExecutionTimedOut with timeout type StartToClose. The history is
// polled at most 20 times, 200ms apart, after an initial one second wait.
func (s *integrationSuite) TestContinueAsNewWorkflow_Timeout() {
	var (
		workflowID     = "interation-continue-as-new-workflow-timeout-test"
		typeName       = "interation-continue-as-new-workflow-timeout-test-type"
		taskListName   = "interation-continue-as-new-workflow-timeout-test-tasklist"
		workerIdentity = "worker1"
	)

	workflowType := &workflow.WorkflowType{Name: common.StringPtr(typeName)}
	taskList := &workflow.TaskList{Name: common.StringPtr(taskListName)}

	startRequest := &workflow.StartWorkflowExecutionRequest{
		RequestId:                           common.StringPtr(uuid.New()),
		Domain:                              common.StringPtr(s.domainName),
		WorkflowId:                          common.StringPtr(workflowID),
		WorkflowType:                        workflowType,
		TaskList:                            taskList,
		ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
		TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(10),
		Identity:                            common.StringPtr(workerIdentity),
	}

	startResp, startErr := s.engine.StartWorkflowExecution(createContext(), startRequest)
	s.Nil(startErr)

	s.logger.Infof("StartWorkflowExecution: response: %v \n", *startResp.RunId)

	// done is set either by the decision handler (when it completes the
	// workflow) or by the history loop below (when the timeout is observed).
	done := false
	const maxContinueAsNew int32 = 1
	var continuedCount int32

	handler := func(_ *workflow.WorkflowExecution, _ *workflow.WorkflowType,
		_, _ int64, _ *workflow.History) ([]byte, []*workflow.Decision, error) {
		if continuedCount >= maxContinueAsNew {
			// Continue-as-new budget exhausted: complete the workflow.
			done = true
			complete := &workflow.Decision{
				DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
				CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
					Result: []byte("Done."),
				},
			}
			return []byte(strconv.Itoa(int(continuedCount))), []*workflow.Decision{complete}, nil
		}

		continuedCount++
		payload := new(bytes.Buffer)
		s.Nil(binary.Write(payload, binary.LittleEndian, continuedCount))

		continueAsNew := &workflow.Decision{
			DecisionType: common.DecisionTypePtr(workflow.DecisionTypeContinueAsNewWorkflowExecution),
			ContinueAsNewWorkflowExecutionDecisionAttributes: &workflow.ContinueAsNewWorkflowExecutionDecisionAttributes{
				WorkflowType:                        workflowType,
				TaskList:                            &workflow.TaskList{Name: &taskListName},
				Input:                               payload.Bytes(),
				ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), // set timeout to 1
				TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(1),
			},
		}
		return []byte(strconv.Itoa(int(continuedCount))), []*workflow.Decision{continueAsNew}, nil
	}

	poller := &taskPoller{
		engine:          s.engine,
		domain:          s.domainName,
		taskList:        taskList,
		identity:        workerIdentity,
		decisionHandler: handler,
		logger:          s.logger,
		suite:           s,
	}

	// Handle the first decision task, which continues the workflow as new.
	_, pollErr := poller.pollAndProcessDecisionTask(true, false)
	s.logger.Infof("pollAndProcessDecisionTask: %v", pollErr)
	s.Nil(pollErr)

	s.False(done)

	// Give the one second execution timeout a chance to elapse.
	time.Sleep(1 * time.Second)

	for attempt := 0; attempt < 20; attempt++ {
		resp, historyErr := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
			Domain: common.StringPtr(s.domainName),
			Execution: &workflow.WorkflowExecution{
				WorkflowId: common.StringPtr(workflowID),
			},
		})
		s.Nil(historyErr)
		executionHistory := resp.History
		common.PrettyPrintHistory(executionHistory, s.logger)

		finalEvent := executionHistory.Events[len(executionHistory.Events)-1]
		if *finalEvent.EventType == workflow.EventTypeWorkflowExecutionTimedOut {
			timedOutAttributes := finalEvent.WorkflowExecutionTimedOutEventAttributes
			s.Equal(workflow.TimeoutTypeStartToClose, *timedOutAttributes.TimeoutType)
			done = true
			break
		}

		s.logger.Warnf("Execution not timedout yet.")
		time.Sleep(200 * time.Millisecond)
	}
	s.True(done)
}

func (s *integrationSuite) TestVisibility() {
startTime := time.Now().UnixNano()

Expand Down Expand Up @@ -3833,6 +3939,6 @@ func (s *integrationSuite) printWorkflowHistory(domain string, execution *workfl
}

// createContext returns a context derived from context.Background() that
// carries a timeout. The cancel function returned by context.WithTimeout is
// discarded, so callers cannot cancel the returned context early.
// NOTE(review): the two assignments below look like the before (30s) and after
// (90s) lines of a rendered diff rather than one compilable body — confirm
// against the actual file.
func createContext() context.Context {
ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
ctx, _ := context.WithTimeout(context.Background(), 90*time.Second)
return ctx
}
3 changes: 2 additions & 1 deletion service/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,11 +919,12 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
return nil, wh.error(err, scope)
}

// this function return the following 3 things,
// this function returns the following 5 things:
// 1. the workflow run ID
// 2. the last first event ID (the event ID of the last batch of events in the history)
// 3. the next event ID
// 4. whether the workflow is closed
// 5. error if any
queryHistory := func(domainUUID string, execution *gen.WorkflowExecution, expectedNextEventID int64) (string, int64, int64, bool, error) {
response, err := wh.history.GetMutableState(ctx, &h.GetMutableStateRequest{
DomainUUID: common.StringPtr(domainUUID),
Expand Down
12 changes: 12 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,7 @@ Update_History_Loop:
transferTasks := []persistence.Task{}
timerTasks := []persistence.Task{}
var continueAsNewBuilder *mutableStateBuilder
var continueAsNewTimerTasks []persistence.Task
hasDecisionScheduleActivityTask := false

if request.StickyAttributes == nil || request.StickyAttributes.WorkerTaskList == nil {
Expand Down Expand Up @@ -1038,6 +1039,14 @@ Update_History_Loop:
if err != nil {
return nil
}

// add timer task to new workflow
duration := time.Duration(*attributes.ExecutionStartToCloseTimeoutSeconds) * time.Second
continueAsNewTimerTasks = []persistence.Task{&persistence.WorkflowTimeoutTask{
VisibilityTimestamp: e.shard.GetTimeSource().Now().Add(duration),
}}
msBuilder.continueAsNew.TimerTasks = continueAsNewTimerTasks

isComplete = true
continueAsNewBuilder = newStateBuilder

Expand Down Expand Up @@ -1143,8 +1152,11 @@ Update_History_Loop:
return updateErr
}

// add continueAsNewTimerTask
timerTasks = append(timerTasks, continueAsNewTimerTasks...)
// Inform timer about the new ones.
e.timerProcessor.NotifyNewTimer(timerTasks)

return err
}

Expand Down
1 change: 1 addition & 0 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (t *timerQueueProcessorImpl) Stop() {
}

// NotifyNewTimer - Notify the processor about the new timer arrival.
// This should be called each time a new timer is created; otherwise the timer may fire unexpectedly.
func (t *timerQueueProcessorImpl) NotifyNewTimer(timerTasks []persistence.Task) {
if len(timerTasks) == 0 {
return
Expand Down

0 comments on commit aa29ebb

Please sign in to comment.