diff --git a/host/client_integration_test.go b/host/client_integration_test.go index c8637e0a757..f4d4aa3c268 100644 --- a/host/client_integration_test.go +++ b/host/client_integration_test.go @@ -283,7 +283,8 @@ func (s *clientIntegrationSuite) TestClientDataConverter() { TaskList: s.taskList, ExecutionStartToCloseTimeout: 60 * time.Second, } - ctx, _ := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() we, err := s.wfClient.ExecuteWorkflow(ctx, workflowOptions, testDataConverterWorkflow, tl) if err != nil { s.logger.Fatalf("Start workflow with err: %v", err) @@ -313,7 +314,8 @@ func (s *clientIntegrationSuite) TestClientDataConverter_Failed() { TaskList: s.taskList, ExecutionStartToCloseTimeout: 60 * time.Second, } - ctx, _ := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() we, err := s.wfClient.ExecuteWorkflow(ctx, workflowOptions, testDataConverterWorkflow, tl) if err != nil { s.logger.Fatalf("Start workflow with err: %v", err) @@ -414,7 +416,8 @@ func (s *clientIntegrationSuite) TestClientDataConverter_WithChild() { TaskList: s.taskList, ExecutionStartToCloseTimeout: 60 * time.Second, } - ctx, _ := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() we, err := s.wfClient.ExecuteWorkflow(ctx, workflowOptions, testParentWorkflow) if err != nil { s.logger.Fatalf("Start workflow with err: %v", err) diff --git a/host/integration_test.go b/host/integration_test.go index 27d45ae767b..472dfc20046 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -197,7 +197,7 @@ func (s *integrationSuite) setupSuite(enableGlobalDomain bool, isMasterCluster b }) } -func (s *integrationSuite) TestIntegrationStartWorkflowExecution() { +func (s *integrationSuite) TestStartWorkflowExecution() { id := "integration-start-workflow-test" wt := "integration-start-workflow-test-type" tl := "integration-start-workflow-test-tasklist" @@ -544,7 +544,6 @@ func (s *integrationSuite) TestCompleteDecisionTaskAndCreateNewOne() { s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) - workflowComplete := false decisionCount := 0 dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) { @@ -559,7 +558,6 @@ func (s *integrationSuite) TestCompleteDecisionTaskAndCreateNewOne() { }}, nil } - workflowComplete = true return nil, []*workflow.Decision{{ DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ diff --git a/service/history/handler.go b/service/history/handler.go index 67b1574cdb5..4e6488a970e 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -936,6 +936,13 @@ func (h *Handler) convertError(err error) error { return createShardOwnershipLostError(h.GetHostInfo().GetAddress(), info.GetAddress()) } return createShardOwnershipLostError(h.GetHostInfo().GetAddress(), "") + case *persistence.WorkflowExecutionAlreadyStartedError: + err := err.(*persistence.WorkflowExecutionAlreadyStartedError) + return &gen.WorkflowExecutionAlreadyStartedError{ + Message: common.StringPtr("Workflow is already running"), + StartRequestId: common.StringPtr(err.StartRequestID), + RunId: common.StringPtr(err.RunID), + } } return err diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index edde385d9bb..560730e95e0 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -2048,6 +2048,12 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(ctx context.Context return &workflow.StartWorkflowExecutionResponse{ RunId: common.StringPtr(resultRunID), }, nil + } else if alreadyStartedErr, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { + return nil, &workflow.WorkflowExecutionAlreadyStartedError{ + Message: common.StringPtr("Workflow is already running"), + StartRequestId: common.StringPtr(alreadyStartedErr.StartRequestID), + RunId: common.StringPtr(alreadyStartedErr.RunID), + } } return nil, err } diff --git a/service/history/historyReplicator.go b/service/history/historyReplicator.go index e392312a925..894a48a175b 100644 --- a/service/history/historyReplicator.go +++ b/service/history/historyReplicator.go @@ -125,6 +125,9 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE case *shared.WorkflowExecutionAlreadyStartedError: logger.Debugf("Encounter WorkflowExecutionAlreadyStartedError: %v", retError) retError = ErrRetryExecutionAlreadyStarted + case *persistence.WorkflowExecutionAlreadyStartedError: + logger.Debugf("Encounter WorkflowExecutionAlreadyStartedError: %v", retError) + retError = ErrRetryExecutionAlreadyStarted } } }()