Skip to content

Commit

Permalink
Merge branch 'master' into standby-delay
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Jul 17, 2018
2 parents 51f6613 + c4650a1 commit 8fd74cf
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 38 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions .gen/go/shared/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

124 changes: 124 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6246,6 +6246,130 @@ func (s *integrationSuite) TestTransientDecisionTimeout() {
s.True(workflowComplete)
}

func (s *integrationSuite) TestTaskProcessingProtectionForRateLimitError() {
id := "integration-task-processing-protection-for-rate-limit-error-test"
wt := "integration-task-processing-protection-for-rate-limit-error-test-type"
tl := "integration-task-processing-protection-for-rate-limit-error-test-tasklist"
identity := "worker1"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

// Start workflow execution
request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(601),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(600),
Identity: common.StringPtr(identity),
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)
workflowExecution := &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(*we.RunId),
}

// decider logic
workflowComplete := false
signalCount := 0
createUserTimer := false
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, h *workflow.History) ([]byte, []*workflow.Decision, error) {

if !createUserTimer {
createUserTimer = true

return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartTimer),
StartTimerDecisionAttributes: &workflow.StartTimerDecisionAttributes{
TimerId: common.StringPtr("timer-id-1"),
StartToFireTimeoutSeconds: common.Int64Ptr(5),
},
}}, nil
}

// Count signals
for _, event := range h.Events[previousStartedEventID:] {
if event.GetEventType() == workflow.EventTypeWorkflowExecutionSignaled {
signalCount++
}
}

workflowComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

poller := &taskPoller{
engine: s.engine,
domain: s.domainName,
taskList: taskList,
identity: identity,
decisionHandler: dtHandler,
activityHandler: nil,
logger: s.logger,
suite: s,
}

// Process first decision to create user timer
_, err := poller.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

// Send one signal to create a new decision
for i := 0; i < 1; i++ {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
s.Nil(s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity))
}

// Drop decision to cause all events to be buffered from now on
_, err = poller.pollAndProcessDecisionTask(false, true)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

// Buffered Signals
for i := 1; i < 101; i++ {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
s.Nil(s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity))
}

// Rate limitted signals
for i := 0; i < 10; i++ {
buf := new(bytes.Buffer)
binary.Write(buf, binary.LittleEndian, i)
signalErr := s.sendSignal(s.domainName, workflowExecution, "SignalName", buf.Bytes(), identity)
s.NotNil(signalErr)
s.Equal(history.ErrBufferedEventsLimitExceeded, signalErr)
}

// Process signal in decider
_, err = poller.pollAndProcessDecisionTaskWithAttempt(true, false, false, false, 1)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

s.printWorkflowHistory(s.domainName, workflowExecution)

s.True(workflowComplete)
s.Equal(101, signalCount)
}

func (s *integrationSuite) getHistory(domain string, execution *workflow.WorkflowExecution) []*workflow.HistoryEvent {
historyResponse, err := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(domain),
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ enum DecisionTaskFailedCause {
WORKFLOW_WORKER_UNHANDLED_FAILURE,
BAD_SIGNAL_WORKFLOW_EXECUTION_ATTRIBUTES,
BAD_START_CHILD_EXECUTION_ATTRIBUTES,
FORCE_CLOSE_DECISION,
}

enum CancelExternalWorkflowExecutionFailedCause {
Expand Down
2 changes: 1 addition & 1 deletion service/history/queueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ TaskFilterLoop:
for _, task := range tasks {
_, isLoaded := a.outstandingTasks[task.GetTaskID()]
if isLoaded {
// timer already loaded
// task already loaded
a.logger.Debugf("Skipping transfer task: %v.", task)
continue TaskFilterLoop
}
Expand Down
55 changes: 43 additions & 12 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,19 +659,12 @@ func (t *timerQueueActiveProcessorImpl) updateWorkflowExecution(
) error {
executionInfo := msBuilder.GetExecutionInfo()
var transferTasks []persistence.Task
var err error
if scheduleNewDecision {
// Schedule a new decision.
di := msBuilder.AddDecisionTaskScheduledEvent()
transferTasks = []persistence.Task{&persistence.DecisionTask{
DomainID: executionInfo.DomainID,
TaskList: di.TaskList,
ScheduleID: di.ScheduleID,
}}
if msBuilder.IsStickyTaskListEnabled() {
tBuilder := t.historyService.getTimerBuilder(&context.workflowExecution)
stickyTaskTimeoutTimer := tBuilder.AddScheduleToStartDecisionTimoutTask(di.ScheduleID, di.Attempt,
executionInfo.StickyScheduleToStartTimeout)
timerTasks = append(timerTasks, stickyTaskTimeoutTimer)
transferTasks, timerTasks, err = context.scheduleNewDecision(transferTasks, timerTasks)
if err != nil {
return err
}
}

Expand All @@ -691,13 +684,51 @@ func (t *timerQueueActiveProcessorImpl) updateWorkflowExecution(
return err1
}

err := context.updateWorkflowExecutionWithDeleteTask(transferTasks, timerTasks, clearTimerTask, transactionID)
err = context.updateWorkflowExecutionWithDeleteTask(transferTasks, timerTasks, clearTimerTask, transactionID)
if err != nil {
if isShardOwnershiptLostError(err) {
// Shard is stolen. Stop timer processing to reduce duplicates
t.timerQueueProcessorBase.Stop()
return err
}

// Check if the processing is blocked due to limit exceeded error and fail any outstanding decision to
// unblock processing
if err == ErrBufferedEventsLimitExceeded {
context.clear()

var err1 error
// Reload workflow execution so we can apply the decision task failure event
msBuilder, err1 = context.loadWorkflowExecution()
if err1 != nil {
return err1
}

if di, ok := msBuilder.GetInFlightDecisionTask(); ok {
msBuilder.AddDecisionTaskFailedEvent(di.ScheduleID, di.StartedID,
workflow.DecisionTaskFailedCauseForceCloseDecision, nil, identityHistoryService)

var transT, timerT []persistence.Task
transT, timerT, err1 = context.scheduleNewDecision(transT, timerT)
if err1 != nil {
return err1
}

// Generate a transaction ID for appending events to history
transactionID, err1 := t.historyService.shard.GetNextTransferTaskID()
if err1 != nil {
return err1
}
err1 = context.updateWorkflowExecution(transT, timerT, transactionID)
if err1 != nil {
return err1
}
}

return err
}
}

t.notifyNewTimers(timerTasks)
return err
}
59 changes: 40 additions & 19 deletions service/history/transferQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (t *transferQueueActiveProcessorImpl) processDecisionTask(task *persistence
return err
}

if task.ScheduleID == common.FirstEventID+1 {
if task.ScheduleID <= common.FirstEventID+2 {
err = t.recordWorkflowExecutionStarted(execution, task, wfTypeName, startTimestamp, workflowTimeout)
}

Expand Down Expand Up @@ -1079,27 +1079,13 @@ Update_History_Loop:
if err := action(msBuilder); err != nil {
return err
}
executionInfo := msBuilder.GetExecutionInfo()

if createDecisionTask {
// Create a transfer task to schedule a decision task
if !msBuilder.HasPendingDecisionTask() {
di := msBuilder.AddDecisionTaskScheduledEvent()
transferTasks = append(transferTasks, &persistence.DecisionTask{
DomainID: domainID,
TaskList: di.TaskList,
ScheduleID: di.ScheduleID,
})
if msBuilder.IsStickyTaskListEnabled() {
lg := t.logger.WithFields(bark.Fields{
logging.TagWorkflowExecutionID: context.workflowExecution.WorkflowId,
logging.TagWorkflowRunID: context.workflowExecution.RunId,
})
tBuilder := newTimerBuilder(t.shard.GetConfig(), lg, common.NewRealTimeSource())
stickyTaskTimeoutTimer := tBuilder.AddScheduleToStartDecisionTimoutTask(di.ScheduleID, di.Attempt,
executionInfo.StickyScheduleToStartTimeout)
timerTasks = []persistence.Task{stickyTaskTimeoutTimer}
}
var err error
transferTasks, timerTasks, err = context.scheduleNewDecision(transferTasks, timerTasks)
if err != nil {
return err
}
}

Expand All @@ -1115,6 +1101,41 @@ Update_History_Loop:
if err == ErrConflict {
continue Update_History_Loop
}

// Check if the processing is blocked due to limit exceeded error and fail any outstanding decision to
// unblock processing
if err == ErrBufferedEventsLimitExceeded {
context.clear()

var err1 error
// Reload workflow execution so we can apply the decision task failure event
msBuilder, err1 = context.loadWorkflowExecution()
if err1 != nil {
return err1
}

if di, ok := msBuilder.GetInFlightDecisionTask(); ok {
msBuilder.AddDecisionTaskFailedEvent(di.ScheduleID, di.StartedID,
workflow.DecisionTaskFailedCauseForceCloseDecision, nil, identityHistoryService)

var transT, timerT []persistence.Task
transT, timerT, err1 = context.scheduleNewDecision(transT, timerT)
if err1 != nil {
return err1
}

// Generate a transaction ID for appending events to history
transactionID, err1 := t.historyService.shard.GetNextTransferTaskID()
if err1 != nil {
return err1
}
err1 = context.updateWorkflowExecution(transT, timerT, transactionID)
if err1 != nil {
return err1
}
}
}

return err
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/transferQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (t *transferQueueStandbyProcessorImpl) processDecisionTask(transferTask *pe
decisionInfo, isPending := msBuilder.GetPendingDecision(transferTask.ScheduleID)

if !isPending {
if transferTask.ScheduleID == common.FirstEventID+1 {
if transferTask.ScheduleID <= common.FirstEventID+2 {
return t.recordWorkflowStarted(msBuilder)
}
return nil
Expand All @@ -206,7 +206,7 @@ func (t *transferQueueStandbyProcessorImpl) processDecisionTask(transferTask *pe
return ErrTaskRetry
}

if transferTask.ScheduleID == common.FirstEventID+1 {
if transferTask.ScheduleID <= common.FirstEventID+2 {
return t.recordWorkflowStarted(msBuilder)
}

Expand Down
Loading

0 comments on commit 8fd74cf

Please sign in to comment.