Skip to content

Commit

Permalink
Handle workflow signal itself (#539)
Browse files Browse the repository at this point in the history
* Handle workflow signal itself

* Address comments, add isWorkflowRunning check
  • Loading branch information
vancexu authored Jan 30, 2018
1 parent 4d9fcf7 commit 8cc9319
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 2 deletions.
129 changes: 129 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5044,6 +5044,135 @@ CheckHistoryLoopForCancelSent:
s.True(signalSentFailed)
}

func (s *integrationSuite) TestSignalExternalWorkflowDecision_SignalSelf() {
id := "integration-signal-self-workflow-decision-test"
wt := "integration-signal-self-workflow-decision-test-type"
tl := "integration-signal-self-workflow-decision-test-tasklist"
identity := "worker1"
activityName := "activity_type1"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Identity: common.StringPtr(identity),
}
we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)
s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

activityCount := int32(1)
activityCounter := int32(0)
signalName := "my signal"
signalInput := []byte("my signal input.")
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
if activityCounter < activityCount {
activityCounter++
buf := new(bytes.Buffer)
s.Nil(binary.Write(buf, binary.LittleEndian, activityCounter))

return []byte(strconv.Itoa(int(activityCounter))), []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask),
ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr(strconv.Itoa(int(activityCounter))),
ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)},
TaskList: &workflow.TaskList{Name: &tl},
Input: buf.Bytes(),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(100),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(10),
StartToCloseTimeoutSeconds: common.Int32Ptr(50),
HeartbeatTimeoutSeconds: common.Int32Ptr(5),
},
}}, nil
}

return []byte(strconv.Itoa(int(activityCounter))), []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeSignalExternalWorkflowExecution),
SignalExternalWorkflowExecutionDecisionAttributes: &workflow.SignalExternalWorkflowExecutionDecisionAttributes{
Domain: common.StringPtr(s.domainName),
Execution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(we.GetRunId()),
},
SignalName: common.StringPtr(signalName),
Input: signalInput,
},
}}, nil
}

atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType,
activityID string, input []byte, taskToken []byte) ([]byte, bool, error) {
return []byte("Activity Result."), false, nil
}

poller := &taskPoller{
engine: s.engine,
domain: s.domainName,
taskList: taskList,
identity: identity,
decisionHandler: dtHandler,
activityHandler: atHandler,
logger: s.logger,
suite: s,
}

// Start workflows to make some progress.
_, err := poller.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

// Signal the foreign workflow with this decision request.
_, err = poller.pollAndProcessDecisionTask(true, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

signalSentFailed := false
intiatedEventID := 10
CheckHistoryLoopForCancelSent:
for i := 1; i < 10; i++ {
historyResponse, err := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(s.domainName),
Execution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(*we.RunId),
},
})
s.Nil(err)
history := historyResponse.History
common.PrettyPrintHistory(history, s.logger)

signalFailedEvent := history.Events[len(history.Events)-2]
if *signalFailedEvent.EventType != workflow.EventTypeSignalExternalWorkflowExecutionFailed {
s.logger.Info("Cancellaton not cancelled yet.")
time.Sleep(100 * time.Millisecond)
continue CheckHistoryLoopForCancelSent
}

signalExternalWorkflowExecutionFailedEventAttributes := signalFailedEvent.SignalExternalWorkflowExecutionFailedEventAttributes
s.Equal(int64(intiatedEventID), *signalExternalWorkflowExecutionFailedEventAttributes.InitiatedEventId)
s.Equal(id, *signalExternalWorkflowExecutionFailedEventAttributes.WorkflowExecution.WorkflowId)
s.Equal(we.RunId, signalExternalWorkflowExecutionFailedEventAttributes.WorkflowExecution.RunId)

signalSentFailed = true
break
}

s.True(signalSentFailed)

}

func (s *integrationSuite) setupShards() {
// shard 0 is always created, we create additional shards if needed
for shardID := 1; shardID < testNumberOfHistoryShards; shardID++ {
Expand Down
26 changes: 24 additions & 2 deletions service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func (t *transferQueueProcessorImpl) processSignalExecution(task *persistence.Tr

var msBuilder *mutableStateBuilder
msBuilder, err = context.loadWorkflowExecution()
if err != nil {
if err != nil || !msBuilder.isWorkflowExecutionRunning() {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
return nil
Expand All @@ -630,6 +630,28 @@ func (t *transferQueueProcessorImpl) processSignalExecution(task *persistence.Tr
return nil
}

// handle workflow signal itself
if domainID == targetDomainID && task.WorkflowID == task.TargetWorkflowID {
signalRequest := &history.SignalWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
SignalRequest: &workflow.SignalWorkflowExecutionRequest{
Domain: common.StringPtr(domainID),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.WorkflowID),
RunId: common.StringPtr(task.RunID),
},
Identity: common.StringPtr(identityHistoryService),
Control: ri.Control,
},
}
err = t.requestSignalFailed(task, context, signalRequest)
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
return nil
}
return err
}

targetRunID := task.TargetRunID
if targetRunID == persistence.GetTransferTaskTypeTransferTargetRunID() {
// when signal decision has empty runID, db will save default runID to transfer task.
Expand Down Expand Up @@ -717,7 +739,7 @@ func (t *transferQueueProcessorImpl) processStartChildExecution(task *persistenc
// First step is to load workflow execution so we can retrieve the initiated event
var msBuilder *mutableStateBuilder
msBuilder, err = context.loadWorkflowExecution()
if err != nil {
if err != nil || !msBuilder.isWorkflowExecutionRunning() {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
return nil
Expand Down

0 comments on commit 8cc9319

Please sign in to comment.