Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Scan API to have no Order(ES) #4134

Merged
merged 4 commits into from
Apr 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ var defaultSorter = []elastic.Sorter{
elastic.NewFieldSort(searchattribute.StartTime).Desc().Missing("_first"),
}

var docSorter = []elastic.Sorter{
elastic.SortByDoc{},
}

type (
visibilityStore struct {
esClient client.Client
Expand All @@ -87,6 +91,10 @@ type (
// For ES>=7.10.0 and "default" flavor.
PointInTimeID string
}

searchParameterBuilder struct {
store *visibilityStore
}
Comment on lines +94 to +97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used anywhere. Remove it.

)

var _ store.VisibilityStore = (*visibilityStore)(nil)
Expand Down Expand Up @@ -439,7 +447,7 @@ func (s *visibilityStore) ListWorkflowExecutions(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*store.InternalListWorkflowExecutionsResponse, error) {
p, err := s.buildSearchParametersV2(request)
p, err := s.buildSearchParametersV2(request, s.getListFieldSorter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -478,7 +486,7 @@ func (s *visibilityStore) scanWorkflowExecutionsWithPit(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*store.InternalListWorkflowExecutionsResponse, error) {
p, err := s.buildSearchParametersV2(request)
p, err := s.buildSearchParametersV2(request, s.getScanFieldSorter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -527,7 +535,7 @@ func (s *visibilityStore) scanWorkflowExecutionsWithScroll(ctx context.Context,
// First call doesn't have token with ScrollID.
if len(request.NextPageToken) == 0 {
// First page.
p, err := s.buildSearchParametersV2(request)
p, err := s.buildSearchParametersV2(request, s.getScanFieldSorter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -663,6 +671,7 @@ func (s *visibilityStore) buildSearchParameters(

func (s *visibilityStore) buildSearchParametersV2(
request *manager.ListWorkflowExecutionsRequestV2,
getFieldSorter func([]*elastic.FieldSort) ([]elastic.Sorter, error),
) (*client.SearchParameters, error) {

boolQuery, fieldSorts, err := s.convertQuery(
Expand All @@ -683,11 +692,16 @@ func (s *visibilityStore) buildSearchParametersV2(
return nil, serviceerror.NewInvalidArgument("ORDER BY clause is not supported")
}

sorter, err := getFieldSorter(fieldSorts)
if err != nil {
return nil, err
}

params := &client.SearchParameters{
Index: s.index,
Query: boolQuery,
PageSize: request.PageSize,
Sorter: s.setDefaultFieldSort(fieldSorts),
Sorter: sorter,
}

return params, nil
Expand Down Expand Up @@ -730,11 +744,23 @@ func (s *visibilityStore) convertQuery(
return namespaceFilterQuery, fieldSorts, nil
}

func (s *visibilityStore) setDefaultFieldSort(fieldSorts []*elastic.FieldSort) []elastic.Sorter {
func (s *visibilityStore) getScanFieldSorter(fieldSorts []*elastic.FieldSort) ([]elastic.Sorter, error) {
// custom order is not supported by Scan API
if len(fieldSorts) > 0 {
return nil, serviceerror.NewInvalidArgument("ORDER BY clause is not supported")
}

return docSorter, nil
}

func (s *visibilityStore) getListFieldSorter(fieldSorts []*elastic.FieldSort) ([]elastic.Sorter, error) {
if len(fieldSorts) == 0 {
return defaultSorter
return defaultSorter, nil
}
return s.getFieldSorter(fieldSorts), nil
}

func (s *visibilityStore) getFieldSorter(fieldSorts []*elastic.FieldSort) []elastic.Sorter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used by getListFieldSorter, no need to be a separate function.

s.metricsHandler.Counter(metrics.ElasticsearchCustomOrderByClauseCount.GetMetricName()).Record(1)
res := make([]elastic.Sorter, len(fieldSorts)+1)
for i, fs := range fieldSorts {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,53 @@ func (s *ESVisibilitySuite) TestBuildSearchParameters() {
}, p)
}

func (s *ESVisibilitySuite) TestGetListFieldSorter() {

// test defaultSorter is returned when fieldSorts is empty
fieldSorts := make([]*elastic.FieldSort, 0)
sorter, err := s.visibilityStore.getListFieldSorter(fieldSorts)
s.NoError(err)
s.Equal(defaultSorter, sorter)

// test passing non-empty fieldSorts
testFieldSorts := [2]*elastic.FieldSort{elastic.NewFieldSort("_test"), elastic.NewFieldSort("_second_tes")}
s.mockMetricsHandler.EXPECT().Counter(metrics.ElasticsearchCustomOrderByClauseCount.GetMetricName()).Return(metrics.NoopCounterMetricFunc)
sorter, err = s.visibilityStore.getListFieldSorter(testFieldSorts[:])
expectedSorter := make([]elastic.Sorter, len(testFieldSorts)+1)
expectedSorter[0] = testFieldSorts[0]
expectedSorter[1] = testFieldSorts[1]
expectedSorter[2] = elastic.NewFieldSort(searchattribute.RunID).Desc()
s.NoError(err)
s.Equal(expectedSorter, sorter)

}

func (s *ESVisibilitySuite) TestGetScanFieldSorter() {
// test docSorter is returned when fieldSorts is empty
fieldSorts := make([]*elastic.FieldSort, 0)
sorter, err := s.visibilityStore.getScanFieldSorter(fieldSorts)
s.NoError(err)
s.Equal(docSorter, sorter)

// test error is returned if fieldSorts is not empty
testFieldSorts := [2]*elastic.FieldSort{elastic.NewFieldSort("_test"), elastic.NewFieldSort("_second_tes")}
sorter, err = s.visibilityStore.getScanFieldSorter(testFieldSorts[:])
s.Error(err)
s.Nil(sorter)
}
func (s *ESVisibilitySuite) TestGetFieldSorter() {

// test passing non-empty fieldSorts and not sortByDoc
testFieldSorts := [2]*elastic.FieldSort{elastic.NewFieldSort("_test"), elastic.NewFieldSort("_second_tes")}
s.mockMetricsHandler.EXPECT().Counter(metrics.ElasticsearchCustomOrderByClauseCount.GetMetricName()).Return(metrics.NoopCounterMetricFunc)
sorter := s.visibilityStore.getFieldSorter(testFieldSorts[:])
expectedSorter := make([]elastic.Sorter, len(testFieldSorts)+1)
expectedSorter[0] = testFieldSorts[0]
expectedSorter[1] = testFieldSorts[1]
expectedSorter[2] = elastic.NewFieldSort(searchattribute.RunID).Desc()
s.Equal(expectedSorter, sorter)
}

func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
request := &manager.ListWorkflowExecutionsRequestV2{
NamespaceID: testNamespaceID,
Expand All @@ -428,7 +475,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
request.Query = `WorkflowId="guid-2208"`
filterQuery := elastic.NewBoolQuery().Filter(elastic.NewMatchQuery(searchattribute.WorkflowID, "guid-2208"))
boolQuery := elastic.NewBoolQuery().Filter(matchNamespaceQuery, filterQuery).MustNot(namespaceDivisionExists)
p, err := s.visibilityStore.buildSearchParametersV2(request)
p, err := s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.NoError(err)
s.Equal(&client.SearchParameters{
Index: testIndex,
Expand All @@ -445,7 +492,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
// note namespace division appears in the filterQuery, not the boolQuery like the negative version
filterQuery = elastic.NewBoolQuery().Filter(elastic.NewMatchQuery(searchattribute.WorkflowID, "guid-2208"), matchNSDivision)
boolQuery = elastic.NewBoolQuery().Filter(matchNamespaceQuery, filterQuery)
p, err = s.visibilityStore.buildSearchParametersV2(request)
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.NoError(err)
s.Equal(&client.SearchParameters{
Index: testIndex,
Expand All @@ -461,7 +508,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
request.Query = `Order bY WorkflowId`
boolQuery = elastic.NewBoolQuery().Filter(matchNamespaceQuery).MustNot(namespaceDivisionExists)
s.mockMetricsHandler.EXPECT().Counter(metrics.ElasticsearchCustomOrderByClauseCount.GetMetricName()).Return(metrics.NoopCounterMetricFunc)
p, err = s.visibilityStore.buildSearchParametersV2(request)
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.NoError(err)
s.Equal(&client.SearchParameters{
Index: testIndex,
Expand All @@ -476,9 +523,32 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
}, p)
request.Query = ""

// test with Scan API
request.Query = `WorkflowId="guid-2208"`
filterQuery = elastic.NewBoolQuery().Filter(elastic.NewMatchQuery(searchattribute.WorkflowID, "guid-2208"))
boolQuery = elastic.NewBoolQuery().Filter(matchNamespaceQuery, filterQuery).MustNot(namespaceDivisionExists)
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getScanFieldSorter)
s.NoError(err)
s.Equal(&client.SearchParameters{
Index: testIndex,
Query: boolQuery,
SearchAfter: nil,
PointInTime: nil,
PageSize: testPageSize,
Sorter: docSorter,
}, p)
request.Query = ""

// test with Scan API with custom sort
request.Query = `Order bY WorkflowId`
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getScanFieldSorter)
s.Error(err)
s.Nil(p)
request.Query = ""

// test for wrong query
request.Query = "invalid query"
p, err = s.visibilityStore.buildSearchParametersV2(request)
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.Nil(p)
s.Error(err)
request.Query = ""
Expand All @@ -500,7 +570,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2DisableOrderByClause() {
request.Query = `WorkflowId="guid-2208"`
filterQuery := elastic.NewBoolQuery().Filter(elastic.NewMatchQuery(searchattribute.WorkflowID, "guid-2208"))
boolQuery := elastic.NewBoolQuery().Filter(matchNamespaceQuery, filterQuery).MustNot(namespaceDivisionExists)
p, err := s.visibilityStore.buildSearchParametersV2(request)
p, err := s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.NoError(err)
s.Equal(&client.SearchParameters{
Index: testIndex,
Expand All @@ -514,7 +584,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2DisableOrderByClause() {

// test invalid query with ORDER BY
request.Query = `ORDER BY WorkflowId`
p, err = s.visibilityStore.buildSearchParametersV2(request)
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.Nil(p)
s.Error(err)
var invalidArgumentErr *serviceerror.InvalidArgument
Expand Down