Skip to content

Commit

Permalink
Add more test cases for XDC (#4009)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed Apr 14, 2023
1 parent e23208e commit 9363bb7
Showing 1 changed file with 113 additions and 0 deletions.
113 changes: 113 additions & 0 deletions tests/xdc/integration_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,119 @@ func (s *integrationClustersTestSuite) TestUserTimerFailover() {
}
}

func (s *integrationClustersTestSuite) TestForceWorkflowTaskClose() {
namespace := "test-force-workflow-task-close-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
regReq := &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
IsGlobalNamespace: true,
Clusters: clusterReplicationConfig,
ActiveClusterName: clusterName[0],
WorkflowExecutionRetentionPeriod: timestamp.DurationPtr(1 * time.Hour * 24),
}
_, err := client1.RegisterNamespace(tests.NewContext(), regReq)
s.NoError(err)
// Wait for namespace cache to pick the change
time.Sleep(cacheRefreshInterval)

descReq := &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
}
resp, err := client1.DescribeNamespace(tests.NewContext(), descReq)
s.NoError(err)
s.NotNil(resp)

client2 := s.cluster2.GetFrontendClient() // standby

// Start a workflow
id := "test-force-workflow-task-close-test"
wt := "test-force-workflow-task-close-test-type"
tl := "test-force-workflow-task-close-test-taskqueue"
identity := "worker1"
workflowType := &commonpb.WorkflowType{Name: wt}
taskQueue := &taskqueuepb.TaskQueue{Name: tl}
startReq := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: namespace,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
Input: nil,
WorkflowRunTimeout: timestamp.DurationPtr(300 * time.Second),
WorkflowTaskTimeout: timestamp.DurationPtr(60 * time.Second),
Identity: identity,
}
var we *workflowservice.StartWorkflowExecutionResponse
for i := 0; i < 10; i++ {
we, err = client1.StartWorkflowExecution(tests.NewContext(), startReq)
if err == nil {
break
}
time.Sleep(1 * time.Second)
}
s.NoError(err)
s.NotNil(we.GetRunId())

s.logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.GetRunId()))

workflowFinished := false
wtHandler := func(execution *commonpb.WorkflowExecution, wt *commonpb.WorkflowType,
previousStartedEventID, startedEventID int64, history *historypb.History) ([]*commandpb.Command, error) {

workflowFinished = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
}},
}}, nil
}

poller1 := &tests.TaskPoller{
Engine: client1,
Namespace: namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
Logger: s.logger,
T: s.T(),
}

poller2 := &tests.TaskPoller{
Engine: client2,
Namespace: namespace,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
Logger: s.logger,
T: s.T(),
}

// this will fail the workflow task
_, err = poller1.PollAndProcessWorkflowTask(false, true)
s.NoError(err)

s.failover(namespace, clusterName[1], int64(2), client1)

// Send a signal in cluster
signalName := "my signal"
signalInput := payloads.EncodeString("my signal input")
_, err = client2.SignalWorkflowExecution(tests.NewContext(), &workflowservice.SignalWorkflowExecutionRequest{
Namespace: namespace,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
RunId: we.GetRunId(),
},
SignalName: signalName,
Input: signalInput,
})
s.NoError(err)

_, err = poller2.PollAndProcessWorkflowTaskWithAttempt(false, false, false, false, 1)
s.NoError(err)
s.True(workflowFinished)
}

func (s *integrationClustersTestSuite) TestTransientWorkflowTaskFailover() {
namespace := "test-transient-workflow-task-workflow-failover-" + common.GenerateRandomString(5)
client1 := s.cluster1.GetFrontendClient() // active
Expand Down

0 comments on commit 9363bb7

Please sign in to comment.