Skip to content

Commit

Permalink
enable state-based replication for some xdc tests (temporalio#6958)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
enable state-based replication for some xdc tests

## Why?
<!-- Tell your future self why have you made these changes -->

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
hai719 authored Dec 11, 2024
1 parent 154b40d commit 09a61bf
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 55 deletions.
67 changes: 54 additions & 13 deletions tests/xdc/advanced_visibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
commandpb "go.temporal.io/api/command/v1"
Expand All @@ -46,8 +47,10 @@ import (
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/searchattribute"
Expand Down Expand Up @@ -77,6 +80,9 @@ type AdvVisCrossDCTestSuite struct {
clusterConfigs []*testcore.TestClusterConfig
isElasticsearchEnabled bool

dynamicConfigOverrides map[dynamicconfig.Key]interface{}
enableTransitionHistory bool

testSearchAttributeKey string
testSearchAttributeVal string

Expand All @@ -86,7 +92,26 @@ type AdvVisCrossDCTestSuite struct {

func TestAdvVisCrossDCTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(AdvVisCrossDCTestSuite))
for _, tc := range []struct {
name string
enableTransitionHistory bool
}{
{
name: "EnableTransitionHistory",
enableTransitionHistory: true,
},
{
name: "DisableTransitionHistory",
enableTransitionHistory: false,
},
} {
t.Run(tc.name, func(t *testing.T) {
s := &AdvVisCrossDCTestSuite{
enableTransitionHistory: tc.enableTransitionHistory,
}
suite.Run(t, s)
})
}
}

var (
Expand All @@ -105,6 +130,10 @@ func (s *AdvVisCrossDCTestSuite) SetupSuite() {
s.logger = log.NewTestLogger()
s.testClusterFactory = testcore.NewTestClusterFactory()

s.dynamicConfigOverrides = map[dynamicconfig.Key]any{
dynamicconfig.EnableTransitionHistory.Key(): s.enableTransitionHistory,
}

var fileName string
if testcore.UsingSQLAdvancedVisibility() {
// NOTE: can't use xdc_clusters.yaml here because it somehow interferes with the other xDC tests.
Expand All @@ -130,6 +159,10 @@ func (s *AdvVisCrossDCTestSuite) SetupSuite() {
s.Require().NoError(yaml.Unmarshal(confContent, &clusterConfigs))
s.clusterConfigs = clusterConfigs

for _, config := range clusterConfigs {
config.DynamicConfigOverrides = s.dynamicConfigOverrides
}

c, err := s.testClusterFactory.NewCluster(s.T(), clusterConfigs[0], log.With(s.logger, tag.ClusterName(clusterNameAdvVis[0])))
s.Require().NoError(err)
s.cluster1 = c
Expand Down Expand Up @@ -178,10 +211,10 @@ func (s *AdvVisCrossDCTestSuite) TearDownSuite() {
}

func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
namespace := "test-xdc-search-attr-" + common.GenerateRandomString(5)
ns := "test-xdc-search-attr-" + common.GenerateRandomString(5)
client1 := s.cluster1.FrontendClient() // active
regReq := &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
Namespace: ns,
Clusters: clusterReplicationConfigAdvVis,
ActiveClusterName: clusterNameAdvVis[0],
IsGlobalNamespace: true,
Expand All @@ -195,7 +228,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
if !s.isElasticsearchEnabled {
// When Elasticsearch is enabled, the search attribute aliases are not used.
_, err = client1.UpdateNamespace(testcore.NewContext(), &workflowservice.UpdateNamespaceRequest{
Namespace: namespace,
Namespace: ns,
Config: &namespacepb.NamespaceConfig{
CustomSearchAttributeAliases: map[string]string{
"Bool01": "CustomBoolField",
Expand All @@ -212,8 +245,16 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
time.Sleep(cacheRefreshInterval) // nolint:forbidigo
}

s.EventuallyWithT(func(t *assert.CollectT) {
// Wait for namespace record to be replicated and loaded into memory.
for _, r := range s.cluster2.Host().FrontendNamespaceRegistries() {
_, err := r.GetNamespace(namespace.Name(ns))
assert.NoError(t, err)
}
}, 15*time.Second, 500*time.Millisecond)

descReq := &workflowservice.DescribeNamespaceRequest{
Namespace: namespace,
Namespace: ns,
}
resp, err := client1.DescribeNamespace(testcore.NewContext(), descReq)
s.NoError(err)
Expand All @@ -239,7 +280,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
}
startReq := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: namespace,
Namespace: ns,
WorkflowId: id,
WorkflowType: workflowType,
TaskQueue: taskQueue,
Expand All @@ -259,7 +300,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
startFilter := &filterpb.StartTimeFilter{}
startFilter.EarliestTime = timestamppb.New(startTime)
saListRequest := &workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Namespace: ns,
PageSize: 5,
Query: fmt.Sprintf(`WorkflowId = "%s" and %s = "%s"`, id, s.testSearchAttributeKey, s.testSearchAttributeVal),
}
Expand Down Expand Up @@ -307,7 +348,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {

poller := testcore.TaskPoller{
Client: client1,
Namespace: namespace,
Namespace: ns,
TaskQueue: taskQueue,
Identity: identity,
WorkflowTaskHandler: wtHandler,
Expand Down Expand Up @@ -354,7 +395,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
}

saListRequest = &workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Namespace: ns,
PageSize: int32(2),
Query: fmt.Sprintf(`WorkflowId = "%s" and %s = "another string"`, id, s.testSearchAttributeKey),
}
Expand All @@ -365,7 +406,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
testListResult(engine2, saListRequest)

runningListRequest := &workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Namespace: ns,
PageSize: int32(2),
Query: fmt.Sprintf(`WorkflowType = '%s' and ExecutionStatus = 'Running'`, wt),
}
Expand All @@ -378,7 +419,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
terminateReason := "force terminate to make sure standby process tasks"
terminateDetails := payloads.EncodeString("terminate details")
_, err = client1.TerminateWorkflowExecution(testcore.NewContext(), &workflowservice.TerminateWorkflowExecutionRequest{
Namespace: namespace,
Namespace: ns,
WorkflowExecution: &commonpb.WorkflowExecution{
WorkflowId: id,
},
Expand All @@ -390,7 +431,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {

// check terminate done
getHistoryReq := &workflowservice.GetWorkflowExecutionHistoryRequest{
Namespace: namespace,
Namespace: ns,
Execution: &commonpb.WorkflowExecution{
WorkflowId: id,
},
Expand Down Expand Up @@ -426,7 +467,7 @@ func (s *AdvVisCrossDCTestSuite) TestSearchAttributes() {
}, waitTimeInMs*numOfRetry*time.Millisecond, waitTimeInMs*time.Millisecond)

terminatedListRequest := &workflowservice.ListWorkflowExecutionsRequest{
Namespace: namespace,
Namespace: ns,
PageSize: int32(2),
Query: fmt.Sprintf(`WorkflowType = '%s' and ExecutionStatus = 'Terminated'`, wt),
}
Expand Down
5 changes: 5 additions & 0 deletions tests/xdc/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/sdk/converter"
"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -93,6 +94,10 @@ func (s *xdcBaseSuite) setupSuite(clusterNames []string, opts ...testcore.Option
params := testcore.ApplyTestClusterParams(opts)

s.clusterNames = clusterNames
for idx, clusterName := range s.clusterNames {
s.clusterNames[idx] = clusterName + "_" + common.GenerateRandomString(5)
}

if s.logger == nil {
s.logger = log.NewTestLogger()
}
Expand Down
30 changes: 24 additions & 6 deletions tests/xdc/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,25 @@ type (

func TestFuncClustersTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(FunctionalClustersTestSuite))
for _, tc := range []struct {
name string
enableTransitionHistory bool
}{
{
name: "EnableTransitionHistory",
enableTransitionHistory: true,
},
{
name: "DisableTransitionHistory",
enableTransitionHistory: false,
},
} {
t.Run(tc.name, func(t *testing.T) {
s := &FunctionalClustersTestSuite{}
s.enableTransitionHistory = tc.enableTransitionHistory
suite.Run(t, s)
})
}
}

func (s *FunctionalClustersTestSuite) SetupSuite() {
Expand Down Expand Up @@ -480,11 +498,11 @@ func (s *FunctionalClustersTestSuite) TestStickyWorkflowTaskFailover() {
client2 := s.cluster2.FrontendClient() // standby

// Start a workflow
id := "functional-sticky-workflow-task-workflow-failover-test"
wt := "functional-sticky-workflow-task-workflow-failover-test-type"
tq := "functional-sticky-workflow-task-workflow-failover-test-taskqueue"
stq1 := "functional-sticky-workflow-task-workflow-failover-test-taskqueue-sticky1"
stq2 := "functional-sticky-workflow-task-workflow-failover-test-taskqueue-sticky2"
id := "functional-sticky-workflow-task-workflow-failover-test-" + "TransitionHistory" + strconv.FormatBool(s.enableTransitionHistory)
wt := id + "-type"
tq := id + "-taskqueue"
stq1 := id + "-taskqueue-sticky1"
stq2 := id + "-taskqueue-sticky2"
identity1 := "worker1"
identity2 := "worker2"

Expand Down
Loading

0 comments on commit 09a61bf

Please sign in to comment.