Skip to content

Commit

Permalink
Fix flaky elasticsearch tests and possible duplicate records in searc…
Browse files Browse the repository at this point in the history
…h after (#1945)
  • Loading branch information
vancexu authored and longquanzheng committed Jun 12, 2019
1 parent b3f82d5 commit eff01a2
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
1 change: 1 addition & 0 deletions common/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func NewClient(config *Config) (Client, error) {
client, err := elastic.NewClient(
elastic.SetURL(config.URL.String()),
elastic.SetRetrier(elastic.NewBackoffRetrier(elastic.NewExponentialBackoff(128*time.Millisecond, 513*time.Millisecond))),
elastic.SetDecoder(&elastic.NumberDecoder{}), // critical to ensure decode of int64 won't lose precise
)
if err != nil {
return nil, err
Expand Down
21 changes: 14 additions & 7 deletions host/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ import (
)

const (
numOfRetry = 50
waitTimeInMs = 400
numOfRetry = 50
waitTimeInMs = 400
waitForESToSettle = 4 * time.Second // wait es shards for some time ensure data consistent
)

type elasticsearchIntegrationSuite struct {
Expand Down Expand Up @@ -183,7 +184,6 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_SearchAttribute() {
}

func (s *elasticsearchIntegrationSuite) TestListWorkflow_PageToken() {
s.T().Skip("fixme: flaky test")
id := "es-integration-list-workflow-token-test"
wt := "es-integration-list-workflow-token-test-type"
tl := "es-integration-list-workflow-token-test-tasklist"
Expand All @@ -196,7 +196,6 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_PageToken() {
}

func (s *elasticsearchIntegrationSuite) TestListWorkflow_SearchAfter() {
s.T().Skip("fixme: flaky test")
id := "es-integration-list-workflow-searchAfter-test"
wt := "es-integration-list-workflow-searchAfter-test-type"
tl := "es-integration-list-workflow-searchAfter-test-tasklist"
Expand Down Expand Up @@ -240,6 +239,8 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_OrQuery() {
we3, err := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err)

time.Sleep(waitForESToSettle)

// query 1 workflow with search attr
query1 := fmt.Sprintf(`CustomIntField = %d`, 1)
var openExecution *workflow.WorkflowExecutionInfo
Expand Down Expand Up @@ -315,7 +316,6 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_OrQuery() {

// To test last page search trigger max window size error
func (s *elasticsearchIntegrationSuite) TestListWorkflow_MaxWindowSize() {
s.T().Skip("fixme: flaky test")
// set es index index settings
indexName := s.testClusterConfig.ESConfig.Indices[common.VisibilityAppName]
_, err := s.esClient.IndexPutSettings(indexName).
Expand All @@ -335,6 +335,8 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_MaxWindowSize() {
s.Nil(err)
}

time.Sleep(waitForESToSettle)

var listResp *workflow.ListWorkflowExecutionsResponse
var nextPageToken []byte

Expand All @@ -354,6 +356,7 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_MaxWindowSize() {
}
time.Sleep(waitTimeInMs * time.Millisecond)
}
s.NotNil(listResp)
s.True(len(listResp.GetNextPageToken()) != 0)

// the last request
Expand Down Expand Up @@ -402,6 +405,8 @@ func (s *elasticsearchIntegrationSuite) TestListWorkflow_OrderBy() {
s.Nil(err)
}

time.Sleep(waitForESToSettle)

desc := "desc"
asc := "asc"
queryTemplate := `WorkflowType = "%s" order by %s %s`
Expand Down Expand Up @@ -512,6 +517,8 @@ func (s *elasticsearchIntegrationSuite) testListWorkflowHelper(numOfWorkflows, p
s.Nil(err)
}

time.Sleep(waitForESToSettle)

var openExecutions []*workflow.WorkflowExecutionInfo
var nextPageToken []byte

Expand All @@ -534,7 +541,7 @@ func (s *elasticsearchIntegrationSuite) testListWorkflowHelper(numOfWorkflows, p
s.Nil(err)
if len(resp.GetExecutions()) == pageSize {
openExecutions = resp.GetExecutions()
nextPageToken = resp.NextPageToken
nextPageToken = resp.GetNextPageToken()
break
}
time.Sleep(waitTimeInMs * time.Millisecond)
Expand All @@ -559,7 +566,7 @@ func (s *elasticsearchIntegrationSuite) testListWorkflowHelper(numOfWorkflows, p
if len(resp.GetExecutions()) == numOfWorkflows-pageSize {
inIf = true
openExecutions = resp.GetExecutions()
nextPageToken = resp.NextPageToken
nextPageToken = resp.GetNextPageToken()
break
}
time.Sleep(waitTimeInMs * time.Millisecond)
Expand Down

0 comments on commit eff01a2

Please sign in to comment.