diff --git a/tests/xdc/integration_failover_test.go b/tests/xdc/integration_failover_test.go index bc4d6c4b3a2..c6ad48e9129 100644 --- a/tests/xdc/integration_failover_test.go +++ b/tests/xdc/integration_failover_test.go @@ -1569,6 +1569,119 @@ func (s *integrationClustersTestSuite) TestUserTimerFailover() { } } +func (s *integrationClustersTestSuite) TestForceWorkflowTaskClose() { + namespace := "test-force-workflow-task-close-" + common.GenerateRandomString(5) + client1 := s.cluster1.GetFrontendClient() // active + regReq := &workflowservice.RegisterNamespaceRequest{ + Namespace: namespace, + IsGlobalNamespace: true, + Clusters: clusterReplicationConfig, + ActiveClusterName: clusterName[0], + WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24), + } + _, err := client1.RegisterNamespace(tests.NewContext(), regReq) + s.NoError(err) + // Wait for namespace cache to pick the change + time.Sleep(cacheRefreshInterval) + + descReq := &workflowservice.DescribeNamespaceRequest{ + Namespace: namespace, + } + resp, err := client1.DescribeNamespace(tests.NewContext(), descReq) + s.NoError(err) + s.NotNil(resp) + + client2 := s.cluster2.GetFrontendClient() // standby + + // Start a workflow + id := "test-force-workflow-task-close-test" + wt := "test-force-workflow-task-close-test-type" + tl := "test-force-workflow-task-close-test-taskqueue" + identity := "worker1" + workflowType := &commonpb.WorkflowType{Name: wt} + taskQueue := &taskqueuepb.TaskQueue{Name: tl} + startReq := &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.New(), + Namespace: namespace, + WorkflowId: id, + WorkflowType: workflowType, + TaskQueue: taskQueue, + Input: nil, + WorkflowRunTimeout: timestamp.DurationPtr(300 * time.Second), + WorkflowTaskTimeout: timestamp.DurationPtr(60 * time.Second), + Identity: identity, + } + var we *workflowservice.StartWorkflowExecutionResponse + for i := 0; i < 10; i++ { + we, err = client1.StartWorkflowExecution(tests.NewContext(), startReq) + if err == nil { + break + } + time.Sleep(1 * time.Second) + } + s.NoError(err) + s.NotNil(we.GetRunId()) + + s.logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.GetRunId())) + + workflowFinished := false + wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, + previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) { + + workflowFinished = true + return []*commandpb.Command{{ + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ + Result: payloads.EncodeString("Done"), + }}, + }}, nil + } + + poller1 := &tests.TaskPoller{ + Engine: client1, + Namespace: namespace, + TaskQueue: taskQueue, + Identity: identity, + WorkflowTaskHandler: wtHandler, + Logger: s.logger, + T: s.T(), + } + + poller2 := &tests.TaskPoller{ + Engine: client2, + Namespace: namespace, + TaskQueue: taskQueue, + Identity: identity, + WorkflowTaskHandler: wtHandler, + Logger: s.logger, + T: s.T(), + } + + // this will fail the workflow task + _, err = poller1.PollAndProcessWorkflowTask(false, true) + s.NoError(err) + + s.failover(namespace, clusterName[1], int64(2), client1) + + // Send a signal in cluster + signalName := "my signal" + signalInput := payloads.EncodeString("my signal input") + _, err = client2.SignalWorkflowExecution(tests.NewContext(), &workflowservice.SignalWorkflowExecutionRequest{ + Namespace: namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: id, + RunId: we.GetRunId(), + }, + SignalName: signalName, + Input: signalInput, + }) + s.NoError(err) + + _, err = poller2.PollAndProcessWorkflowTaskWithAttempt(false, false, false, false, 1) + s.NoError(err) + s.True(workflowFinished) +} + func (s *integrationClustersTestSuite) TestTransientWorkflowTaskFailover() { namespace := "test-transient-workflow-task-workflow-failover-" + common.GenerateRandomString(5) client1 := s.cluster1.GetFrontendClient() // active