diff --git a/tests/xdc/integration_failover_test.go b/tests/xdc/integration_failover_test.go index 0bfe2ea5e21..df19a1aae47 100644 --- a/tests/xdc/integration_failover_test.go +++ b/tests/xdc/integration_failover_test.go @@ -663,7 +663,7 @@ func (s *integrationClustersTestSuite) TestStickyWorkflowTaskFailover() { s.failover(namespace, clusterName[0], int64(11), client2) - _, err = poller1.PollAndProcessWorkflowTask(true, false) + _, err = poller1.PollAndProcessWorkflowTask(false, false) s.logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) s.NoError(err) s.True(workflowCompleted) @@ -1562,7 +1562,7 @@ func (s *integrationClustersTestSuite) TestUserTimerFailover() { for i := 1; i < 20; i++ { if !workflowCompleted { - _, err = poller2.PollAndProcessWorkflowTask(true, false) + _, err = poller2.PollAndProcessWorkflowTask(false, false) s.NoError(err) time.Sleep(time.Second) } @@ -1828,8 +1828,8 @@ func (s *integrationClustersTestSuite) TestTransientWorkflowTaskFailover() { s.True(workflowFinished) } -func (s *integrationClustersTestSuite) TestCronWorkflowFailover() { - namespace := "test-cron-workflow-failover-" + common.GenerateRandomString(5) +func (s *integrationClustersTestSuite) TestCronWorkflowStartAndFailover() { + namespace := "test-cron-workflow-start-and-failover-" + common.GenerateRandomString(5) client1 := s.cluster1.GetFrontendClient() // active regReq := &workflowservice.RegisterNamespaceRequest{ Namespace: namespace, @@ -1853,9 +1853,9 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() { client2 := s.cluster2.GetFrontendClient() // standby // start a workflow - id := "integration-cron-workflow-failover-test" - wt := "integration-cron-workflow-failover-test-type" - tl := "integration-cron-workflow-failover-test-taskqueue" + id := "integration-cron-workflow-start-and-failover-test" + wt := "integration-cron-workflow-start-and-failover-test-type" + tl := "integration-cron-workflow-start-and-failover-test-taskqueue" identity := "worker1" workflowType := &commonpb.WorkflowType{Name: wt} taskQueue := &taskqueuepb.TaskQueue{Name: tl} @@ -1875,8 +1875,12 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() { s.NoError(err) s.NotNil(we.GetRunId()) + wfCompleted := false + var executions []*commonpb.WorkflowExecution wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) { + executions = append(executions, execution) + wfCompleted = true return []*commandpb.Command{ { CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, @@ -1898,12 +1902,120 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() { s.failover(namespace, clusterName[1], int64(2), client1) - // Run twice to make sure cron schedule is passed to standby. - for i := 0; i < 2; i++ { - _, err = poller2.PollAndProcessWorkflowTask(false, false) - s.NoError(err) + _, err = poller2.PollAndProcessWorkflowTask(false, false) + s.NoError(err) + s.True(wfCompleted) + events := s.getHistory(client2, namespace, executions[0]) + s.Equal(int64(2), events[len(events)-1].GetVersion()) + + // terminate the remaining cron + _, err = client2.TerminateWorkflowExecution(tests.NewContext(), &workflowservice.TerminateWorkflowExecutionRequest{ + Namespace: namespace, + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: id, + }, + }) + s.NoError(err) +} + +func (s *integrationClustersTestSuite) TestCronWorkflowCompleteAndFailover() { + namespace := "test-cron-workflow-complete-and-failover-" + common.GenerateRandomString(5) + client1 := s.cluster1.GetFrontendClient() // active + regReq := &workflowservice.RegisterNamespaceRequest{ + Namespace: namespace, + IsGlobalNamespace: true, + Clusters: clusterReplicationConfig, + ActiveClusterName: clusterName[0], + WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24), + } + _, err := client1.RegisterNamespace(tests.NewContext(), regReq) + s.NoError(err) + // Wait for namespace cache to pick the change + time.Sleep(cacheRefreshInterval) + + descReq := &workflowservice.DescribeNamespaceRequest{ + Namespace: namespace, + } + resp, err := client1.DescribeNamespace(tests.NewContext(), descReq) + s.NoError(err) + s.NotNil(resp) + + client2 := s.cluster2.GetFrontendClient() // standby + + // start a workflow + id := "integration-cron-workflow-complete-andfailover-test" + wt := "integration-cron-workflow-complete-andfailover-test-type" + tl := "integration-cron-workflow-complete-andfailover-test-taskqueue" + identity := "worker1" + workflowType := &commonpb.WorkflowType{Name: wt} + taskQueue := &taskqueuepb.TaskQueue{Name: tl} + startReq := &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.New(), + Namespace: namespace, + WorkflowId: id, + WorkflowType: workflowType, + TaskQueue: taskQueue, + Input: nil, + WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second), + WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second), + Identity: identity, + CronSchedule: "@every 5s", + } + we, err := client1.StartWorkflowExecution(tests.NewContext(), startReq) + s.NoError(err) + s.NotNil(we.GetRunId()) + + wfCompletionCount := 0 + var executions []*commonpb.WorkflowExecution + wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, + previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) { + wfCompletionCount += 1 + executions = append(executions, execution) + return []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ + Result: payloads.EncodeString("cron-test-result"), + }}, + }}, nil + } + + poller1 := tests.TaskPoller{ + Engine: client1, + Namespace: namespace, + TaskQueue: taskQueue, + Identity: identity, + WorkflowTaskHandler: wtHandler, + Logger: s.logger, + T: s.T(), + } + + poller2 := tests.TaskPoller{ + Engine: client2, + Namespace: namespace, + TaskQueue: taskQueue, + Identity: identity, + WorkflowTaskHandler: wtHandler, + Logger: s.logger, + T: s.T(), } + _, err = poller1.PollAndProcessWorkflowTask(false, false) + s.NoError(err) + s.Equal(1, wfCompletionCount) + events := s.getHistory(client1, namespace, executions[0]) + s.Equal(int64(1), events[0].GetVersion()) + s.Equal(int64(1), events[len(events)-1].GetVersion()) + + s.failover(namespace, clusterName[1], int64(2), client1) + + _, err = poller2.PollAndProcessWorkflowTask(false, false) + s.NoError(err) + s.Equal(2, wfCompletionCount) + events = s.getHistory(client2, namespace, executions[1]) + s.Equal(int64(1), events[0].GetVersion()) + s.Equal(int64(2), events[len(events)-1].GetVersion()) + _, err = client2.TerminateWorkflowExecution(tests.NewContext(), &workflowservice.TerminateWorkflowExecutionRequest{ Namespace: namespace, WorkflowExecution: &commonpb.WorkflowExecution{ @@ -1913,8 +2025,8 @@ func (s *integrationClustersTestSuite) TestCronWorkflowFailover() { s.NoError(err) } -func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() { - namespace := "test-workflow-retry-failover-" + common.GenerateRandomString(5) +func (s *integrationClustersTestSuite) TestWorkflowRetryStartAndFailover() { + namespace := "test-workflow-retry-start-and-failover-" + common.GenerateRandomString(5) client1 := s.cluster1.GetFrontendClient() // active regReq := &workflowservice.RegisterNamespaceRequest{ Namespace: namespace, @@ -1938,9 +2050,9 @@ func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() { client2 := s.cluster2.GetFrontendClient() // standby // start a workflow - id := "integration-workflow-retry-failover-test" - wt := "integration-workflow-retry-failover-test-type" - tl := "integration-workflow-retry-failover-test-taskqueue" + id := "integration-workflow-retry-start-and-failover-test" + wt := "integration-workflow-retry-start-and-failover-test-type" + tl := "integration-workflow-retry-start-and-failover-test-taskqueue" identity := "worker1" workflowType := &commonpb.WorkflowType{Name: wt} taskQueue := &taskqueuepb.TaskQueue{Name: tl} @@ -1995,22 +2107,124 @@ func (s *integrationClustersTestSuite) TestWorkflowRetryFailover() { _, err = poller2.PollAndProcessWorkflowTask(false, false) s.NoError(err) events := s.getHistory(client2, namespace, executions[0]) - s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, events[len(events)-1].GetEventType()) + s.Equal(int64(1), events[0].GetVersion()) + s.Equal(int64(2), events[len(events)-1].GetVersion()) s.Equal(int32(1), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt()) + s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, events[len(events)-1].GetEventType()) // second attempt _, err = poller2.PollAndProcessWorkflowTask(false, false) s.NoError(err) events = s.getHistory(client2, namespace, executions[1]) + s.Equal(int64(2), events[0].GetVersion()) + s.Equal(int64(2), events[len(events)-1].GetVersion()) s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, events[len(events)-1].GetEventType()) s.Equal(int32(2), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt()) +} + +func (s *integrationClustersTestSuite) TestWorkflowRetryFailAndFailover() { + namespace := "test-workflow-retry-fail-and-failover-" + common.GenerateRandomString(5) + client1 := s.cluster1.GetFrontendClient() // active + regReq := &workflowservice.RegisterNamespaceRequest{ + Namespace: namespace, + IsGlobalNamespace: true, + Clusters: clusterReplicationConfig, + ActiveClusterName: clusterName[0], + WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24), + } + _, err := client1.RegisterNamespace(tests.NewContext(), regReq) + s.NoError(err) + // Wait for namespace cache to pick the change + time.Sleep(cacheRefreshInterval) + + descReq := &workflowservice.DescribeNamespaceRequest{ + Namespace: namespace, + } + resp, err := client1.DescribeNamespace(tests.NewContext(), descReq) + s.NoError(err) + s.NotNil(resp) + + client2 := s.cluster2.GetFrontendClient() // standby + + // start a workflow + id := "integration-workflow-retry-fail-and-failover-test" + wt := "integration-workflow-retry-fail-and-failover-test-type" + tl := "integration-workflow-retry-fail-and-failover-test-taskqueue" + identity := "worker1" + workflowType := &commonpb.WorkflowType{Name: wt} + taskQueue := &taskqueuepb.TaskQueue{Name: tl} + startReq := &workflowservice.StartWorkflowExecutionRequest{ + RequestId: uuid.New(), + Namespace: namespace, + WorkflowId: id, + WorkflowType: workflowType, + TaskQueue: taskQueue, + Input: nil, + WorkflowRunTimeout: timestamp.DurationPtr(100 * time.Second), + WorkflowTaskTimeout: timestamp.DurationPtr(1 * time.Second), + Identity: identity, + RetryPolicy: &commonpb.RetryPolicy{ + InitialInterval: timestamp.DurationPtr(1 * time.Second), + MaximumAttempts: 3, + MaximumInterval: timestamp.DurationPtr(1 * time.Second), + NonRetryableErrorTypes: []string{"bad-bug"}, + BackoffCoefficient: 1, + }, + } + we, err := client1.StartWorkflowExecution(tests.NewContext(), startReq) + s.NoError(err) + s.NotNil(we.GetRunId()) + + var executions []*commonpb.WorkflowExecution + wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType, + previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) { + executions = append(executions, execution) + return []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_FailWorkflowExecutionCommandAttributes{FailWorkflowExecutionCommandAttributes: &commandpb.FailWorkflowExecutionCommandAttributes{ + Failure: failure.NewServerFailure("retryable-error", false), + }}, + }}, nil + } + + poller1 := tests.TaskPoller{ + Engine: client1, + Namespace: namespace, + TaskQueue: taskQueue, + Identity: identity, + WorkflowTaskHandler: wtHandler, + Logger: s.logger, + T: s.T(), + } + + poller2 := tests.TaskPoller{ + Engine: client2, + Namespace: namespace, + TaskQueue: taskQueue, + Identity: identity, + WorkflowTaskHandler: wtHandler, + Logger: s.logger, + T: s.T(), + } + + _, err = poller1.PollAndProcessWorkflowTask(false, false) + s.NoError(err) + events := s.getHistory(client1, namespace, executions[0]) + s.Equal(int64(1), events[0].GetVersion()) + s.Equal(int64(1), events[len(events)-1].GetVersion()) + s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, events[len(events)-1].GetEventType()) + s.Equal(int32(1), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt()) + + s.failover(namespace, clusterName[1], int64(2), client1) - // third attempt. Still failing, should stop retry. _, err = poller2.PollAndProcessWorkflowTask(false, false) s.NoError(err) - events = s.getHistory(client2, namespace, executions[2]) + events = s.getHistory(client2, namespace, executions[1]) + s.Equal(int64(1), events[0].GetVersion()) + s.Equal(int64(2), events[len(events)-1].GetVersion()) s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, events[len(events)-1].GetEventType()) - s.Equal(int32(3), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt()) + s.Equal(int32(2), events[0].GetWorkflowExecutionStartedEventAttributes().GetAttempt()) } func (s *integrationClustersTestSuite) TestActivityHeartbeatFailover() {