From 8cc931913e28348db0a79e89bb31b4a9e40ad4a8 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Tue, 30 Jan 2018 15:24:24 -0800 Subject: [PATCH] Handle workflow signal itself (#539) * Handle workflow signal itself * Address comments, add isWorkflowRunning check --- host/integration_test.go | 129 ++++++++++++++++++++++ service/history/transferQueueProcessor.go | 26 ++++- 2 files changed, 153 insertions(+), 2 deletions(-) diff --git a/host/integration_test.go b/host/integration_test.go index abcfeb2ef44..8855cd8196d 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -5044,6 +5044,135 @@ CheckHistoryLoopForCancelSent: s.True(signalSentFailed) } +func (s *integrationSuite) TestSignalExternalWorkflowDecision_SignalSelf() { + id := "integration-signal-self-workflow-decision-test" + wt := "integration-signal-self-workflow-decision-test-type" + tl := "integration-signal-self-workflow-decision-test-tasklist" + identity := "worker1" + activityName := "activity_type1" + + workflowType := &workflow.WorkflowType{} + workflowType.Name = common.StringPtr(wt) + + taskList := &workflow.TaskList{} + taskList.Name = common.StringPtr(tl) + + request := &workflow.StartWorkflowExecutionRequest{ + RequestId: common.StringPtr(uuid.New()), + Domain: common.StringPtr(s.domainName), + WorkflowId: common.StringPtr(id), + WorkflowType: workflowType, + TaskList: taskList, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + Identity: common.StringPtr(identity), + } + we, err0 := s.engine.StartWorkflowExecution(createContext(), request) + s.Nil(err0) + s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) + + activityCount := int32(1) + activityCounter := int32(0) + signalName := "my signal" + signalInput := []byte("my signal input.") + dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) { + if activityCounter < activityCount { + activityCounter++ + buf := new(bytes.Buffer) + s.Nil(binary.Write(buf, binary.LittleEndian, activityCounter)) + + return []byte(strconv.Itoa(int(activityCounter))), []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr(strconv.Itoa(int(activityCounter))), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: buf.Bytes(), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(100), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(10), + StartToCloseTimeoutSeconds: common.Int32Ptr(50), + HeartbeatTimeoutSeconds: common.Int32Ptr(5), + }, + }}, nil + } + + return []byte(strconv.Itoa(int(activityCounter))), []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeSignalExternalWorkflowExecution), + SignalExternalWorkflowExecutionDecisionAttributes: &workflow.SignalExternalWorkflowExecutionDecisionAttributes{ + Domain: common.StringPtr(s.domainName), + Execution: &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(id), + RunId: common.StringPtr(we.GetRunId()), + }, + SignalName: common.StringPtr(signalName), + Input: signalInput, + }, + }}, nil + } + + atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType, + activityID string, input []byte, taskToken []byte) ([]byte, bool, error) { + return []byte("Activity Result."), false, nil + } + + poller := &taskPoller{ + engine: s.engine, + domain: s.domainName, + taskList: taskList, + identity: identity, + decisionHandler: dtHandler, + activityHandler: atHandler, + logger: s.logger, + suite: s, + } + + // Start workflows to make some progress. + _, err := poller.pollAndProcessDecisionTask(false, false) + s.logger.Infof("pollAndProcessDecisionTask: %v", err) + s.Nil(err) + + // Signal the foreign workflow with this decision request. + _, err = poller.pollAndProcessDecisionTask(true, false) + s.logger.Infof("pollAndProcessDecisionTask: %v", err) + s.Nil(err) + + signalSentFailed := false + intiatedEventID := 10 +CheckHistoryLoopForCancelSent: + for i := 1; i < 10; i++ { + historyResponse, err := s.engine.GetWorkflowExecutionHistory(createContext(), &workflow.GetWorkflowExecutionHistoryRequest{ + Domain: common.StringPtr(s.domainName), + Execution: &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(id), + RunId: common.StringPtr(*we.RunId), + }, + }) + s.Nil(err) + history := historyResponse.History + common.PrettyPrintHistory(history, s.logger) + + signalFailedEvent := history.Events[len(history.Events)-2] + if *signalFailedEvent.EventType != workflow.EventTypeSignalExternalWorkflowExecutionFailed { + s.logger.Info("Cancellaton not cancelled yet.") + time.Sleep(100 * time.Millisecond) + continue CheckHistoryLoopForCancelSent + } + + signalExternalWorkflowExecutionFailedEventAttributes := signalFailedEvent.SignalExternalWorkflowExecutionFailedEventAttributes + s.Equal(int64(intiatedEventID), *signalExternalWorkflowExecutionFailedEventAttributes.InitiatedEventId) + s.Equal(id, *signalExternalWorkflowExecutionFailedEventAttributes.WorkflowExecution.WorkflowId) + s.Equal(we.RunId, signalExternalWorkflowExecutionFailedEventAttributes.WorkflowExecution.RunId) + + signalSentFailed = true + break + } + + s.True(signalSentFailed) + +} + func (s *integrationSuite) setupShards() { // shard 0 is always created, we create additional shards if needed for shardID := 1; shardID < testNumberOfHistoryShards; shardID++ { diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index 93e60cca709..7cade76da56 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -613,7 +613,7 @@ func (t *transferQueueProcessorImpl) processSignalExecution(task *persistence.Tr var msBuilder *mutableStateBuilder msBuilder, err = context.loadWorkflowExecution() - if err != nil { + if err != nil || !msBuilder.isWorkflowExecutionRunning() { if _, ok := err.(*workflow.EntityNotExistsError); ok { // this could happen if this is a duplicate processing of the task, and the execution has already completed. return nil @@ -630,6 +630,28 @@ func (t *transferQueueProcessorImpl) processSignalExecution(task *persistence.Tr return nil } + // handle workflow signal itself + if domainID == targetDomainID && task.WorkflowID == task.TargetWorkflowID { + signalRequest := &history.SignalWorkflowExecutionRequest{ + DomainUUID: common.StringPtr(domainID), + SignalRequest: &workflow.SignalWorkflowExecutionRequest{ + Domain: common.StringPtr(domainID), + WorkflowExecution: &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(task.WorkflowID), + RunId: common.StringPtr(task.RunID), + }, + Identity: common.StringPtr(identityHistoryService), + Control: ri.Control, + }, + } + err = t.requestSignalFailed(task, context, signalRequest) + if _, ok := err.(*workflow.EntityNotExistsError); ok { + // this could happen if this is a duplicate processing of the task, and the execution has already completed. + return nil + } + return err + } + targetRunID := task.TargetRunID if targetRunID == persistence.GetTransferTaskTypeTransferTargetRunID() { // when signal decision has empty runID, db will save default runID to transfer task. @@ -717,7 +739,7 @@ func (t *transferQueueProcessorImpl) processStartChildExecution(task *persistenc // First step is to load workflow execution so we can retrieve the initiated event var msBuilder *mutableStateBuilder msBuilder, err = context.loadWorkflowExecution() - if err != nil { + if err != nil || !msBuilder.isWorkflowExecutionRunning() { if _, ok := err.(*workflow.EntityNotExistsError); ok { // this could happen if this is a duplicate processing of the task, and the execution has already completed. return nil