Skip to content

Commit

Permalink
Change Scan API to have no Order(ES) (temporalio#4134)
Browse files Browse the repository at this point in the history
* Change Scan API to have no Order(ES)

* Refactoring based on PR comments

* PR comments

* Fix return order
  • Loading branch information
samanbarghi committed May 2, 2023
1 parent b488c47 commit f59bc18
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ var defaultSorter = []elastic.Sorter{
elastic.NewFieldSort(searchattribute.StartTime).Desc().Missing("_first"),
}

var docSorter = []elastic.Sorter{
elastic.SortByDoc{},
}

type (
visibilityStore struct {
esClient client.Client
Expand All @@ -87,6 +91,10 @@ type (
// For ES>=7.10.0 and "default" flavor.
PointInTimeID string
}

searchParameterBuilder struct {
store *visibilityStore
}
)

var _ store.VisibilityStore = (*visibilityStore)(nil)
Expand Down Expand Up @@ -439,7 +447,7 @@ func (s *visibilityStore) ListWorkflowExecutions(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*store.InternalListWorkflowExecutionsResponse, error) {
p, err := s.buildSearchParametersV2(request)
p, err := s.buildSearchParametersV2(request, s.getListFieldSorter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -478,7 +486,7 @@ func (s *visibilityStore) scanWorkflowExecutionsWithPit(
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*store.InternalListWorkflowExecutionsResponse, error) {
p, err := s.buildSearchParametersV2(request)
p, err := s.buildSearchParametersV2(request, s.getScanFieldSorter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -527,7 +535,7 @@ func (s *visibilityStore) scanWorkflowExecutionsWithScroll(ctx context.Context,
// First call doesn't have token with ScrollID.
if len(request.NextPageToken) == 0 {
// First page.
p, err := s.buildSearchParametersV2(request)
p, err := s.buildSearchParametersV2(request, s.getScanFieldSorter)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -663,6 +671,7 @@ func (s *visibilityStore) buildSearchParameters(

func (s *visibilityStore) buildSearchParametersV2(
request *manager.ListWorkflowExecutionsRequestV2,
getFieldSorter func([]*elastic.FieldSort) ([]elastic.Sorter, error),
) (*client.SearchParameters, error) {

boolQuery, fieldSorts, err := s.convertQuery(
Expand All @@ -683,11 +692,16 @@ func (s *visibilityStore) buildSearchParametersV2(
return nil, serviceerror.NewInvalidArgument("ORDER BY clause is not supported")
}

sorter, err := getFieldSorter(fieldSorts)
if err != nil {
return nil, err
}

params := &client.SearchParameters{
Index: s.index,
Query: boolQuery,
PageSize: request.PageSize,
Sorter: s.setDefaultFieldSort(fieldSorts),
Sorter: sorter,
}

return params, nil
Expand Down Expand Up @@ -730,11 +744,23 @@ func (s *visibilityStore) convertQuery(
return namespaceFilterQuery, fieldSorts, nil
}

func (s *visibilityStore) setDefaultFieldSort(fieldSorts []*elastic.FieldSort) []elastic.Sorter {
func (s *visibilityStore) getScanFieldSorter(fieldSorts []*elastic.FieldSort) ([]elastic.Sorter, error) {
// custom order is not supported by Scan API
if len(fieldSorts) > 0 {
return nil, serviceerror.NewInvalidArgument("ORDER BY clause is not supported")
}

return docSorter, nil
}

func (s *visibilityStore) getListFieldSorter(fieldSorts []*elastic.FieldSort) ([]elastic.Sorter, error) {
if len(fieldSorts) == 0 {
return defaultSorter
return defaultSorter, nil
}
return s.getFieldSorter(fieldSorts), nil
}

func (s *visibilityStore) getFieldSorter(fieldSorts []*elastic.FieldSort) []elastic.Sorter {
s.metricsHandler.Counter(metrics.ElasticsearchCustomOrderByClauseCount.GetMetricName()).Record(1)
res := make([]elastic.Sorter, len(fieldSorts)+1)
for i, fs := range fieldSorts {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,53 @@ func (s *ESVisibilitySuite) TestBuildSearchParameters() {
}, p)
}

func (s *ESVisibilitySuite) TestGetListFieldSorter() {

// test defaultSorter is returned when fieldSorts is empty
fieldSorts := make([]*elastic.FieldSort, 0)
sorter, err := s.visibilityStore.getListFieldSorter(fieldSorts)
s.NoError(err)
s.Equal(defaultSorter, sorter)

// test passing non-empty fieldSorts
testFieldSorts := [2]*elastic.FieldSort{elastic.NewFieldSort("_test"), elastic.NewFieldSort("_second_tes")}
s.mockMetricsHandler.EXPECT().Counter(metrics.ElasticsearchCustomOrderByClauseCount.GetMetricName()).Return(metrics.NoopCounterMetricFunc)
sorter, err = s.visibilityStore.getListFieldSorter(testFieldSorts[:])
expectedSorter := make([]elastic.Sorter, len(testFieldSorts)+1)
expectedSorter[0] = testFieldSorts[0]
expectedSorter[1] = testFieldSorts[1]
expectedSorter[2] = elastic.NewFieldSort(searchattribute.RunID).Desc()
s.NoError(err)
s.Equal(expectedSorter, sorter)

}

func (s *ESVisibilitySuite) TestGetScanFieldSorter() {
// test docSorter is returned when fieldSorts is empty
fieldSorts := make([]*elastic.FieldSort, 0)
sorter, err := s.visibilityStore.getScanFieldSorter(fieldSorts)
s.NoError(err)
s.Equal(docSorter, sorter)

// test error is returned if fieldSorts is not empty
testFieldSorts := [2]*elastic.FieldSort{elastic.NewFieldSort("_test"), elastic.NewFieldSort("_second_tes")}
sorter, err = s.visibilityStore.getScanFieldSorter(testFieldSorts[:])
s.Error(err)
s.Nil(sorter)
}
func (s *ESVisibilitySuite) TestGetFieldSorter() {

// test passing non-empty fieldSorts and not sortByDoc
testFieldSorts := [2]*elastic.FieldSort{elastic.NewFieldSort("_test"), elastic.NewFieldSort("_second_tes")}
s.mockMetricsHandler.EXPECT().Counter(metrics.ElasticsearchCustomOrderByClauseCount.GetMetricName()).Return(metrics.NoopCounterMetricFunc)
sorter := s.visibilityStore.getFieldSorter(testFieldSorts[:])
expectedSorter := make([]elastic.Sorter, len(testFieldSorts)+1)
expectedSorter[0] = testFieldSorts[0]
expectedSorter[1] = testFieldSorts[1]
expectedSorter[2] = elastic.NewFieldSort(searchattribute.RunID).Desc()
s.Equal(expectedSorter, sorter)
}

func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
request := &manager.ListWorkflowExecutionsRequestV2{
NamespaceID: testNamespaceID,
Expand All @@ -428,7 +475,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
request.Query = `WorkflowId="guid-2208"`
filterQuery := elastic.NewBoolQuery().Filter(elastic.NewMatchQuery(searchattribute.WorkflowID, "guid-2208"))
boolQuery := elastic.NewBoolQuery().Filter(matchNamespaceQuery, filterQuery).MustNot(namespaceDivisionExists)
p, err := s.visibilityStore.buildSearchParametersV2(request)
p, err := s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.NoError(err)
s.Equal(&client.SearchParameters{
Index: testIndex,
Expand All @@ -445,7 +492,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
// note namespace division appears in the filterQuery, not the boolQuery like the negative version
filterQuery = elastic.NewBoolQuery().Filter(elastic.NewMatchQuery(searchattribute.WorkflowID, "guid-2208"), matchNSDivision)
boolQuery = elastic.NewBoolQuery().Filter(matchNamespaceQuery, filterQuery)
p, err = s.visibilityStore.buildSearchParametersV2(request)
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.NoError(err)
s.Equal(&client.SearchParameters{
Index: testIndex,
Expand All @@ -461,7 +508,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
request.Query = `Order bY WorkflowId`
boolQuery = elastic.NewBoolQuery().Filter(matchNamespaceQuery).MustNot(namespaceDivisionExists)
s.mockMetricsHandler.EXPECT().Counter(metrics.ElasticsearchCustomOrderByClauseCount.GetMetricName()).Return(metrics.NoopCounterMetricFunc)
p, err = s.visibilityStore.buildSearchParametersV2(request)
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.NoError(err)
s.Equal(&client.SearchParameters{
Index: testIndex,
Expand All @@ -476,9 +523,32 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2() {
}, p)
request.Query = ""

// test with Scan API
request.Query = `WorkflowId="guid-2208"`
filterQuery = elastic.NewBoolQuery().Filter(elastic.NewMatchQuery(searchattribute.WorkflowID, "guid-2208"))
boolQuery = elastic.NewBoolQuery().Filter(matchNamespaceQuery, filterQuery).MustNot(namespaceDivisionExists)
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getScanFieldSorter)
s.NoError(err)
s.Equal(&client.SearchParameters{
Index: testIndex,
Query: boolQuery,
SearchAfter: nil,
PointInTime: nil,
PageSize: testPageSize,
Sorter: docSorter,
}, p)
request.Query = ""

// test with Scan API with custom sort
request.Query = `Order bY WorkflowId`
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getScanFieldSorter)
s.Error(err)
s.Nil(p)
request.Query = ""

// test for wrong query
request.Query = "invalid query"
p, err = s.visibilityStore.buildSearchParametersV2(request)
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.Nil(p)
s.Error(err)
request.Query = ""
Expand All @@ -500,7 +570,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2DisableOrderByClause() {
request.Query = `WorkflowId="guid-2208"`
filterQuery := elastic.NewBoolQuery().Filter(elastic.NewMatchQuery(searchattribute.WorkflowID, "guid-2208"))
boolQuery := elastic.NewBoolQuery().Filter(matchNamespaceQuery, filterQuery).MustNot(namespaceDivisionExists)
p, err := s.visibilityStore.buildSearchParametersV2(request)
p, err := s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.NoError(err)
s.Equal(&client.SearchParameters{
Index: testIndex,
Expand All @@ -514,7 +584,7 @@ func (s *ESVisibilitySuite) TestBuildSearchParametersV2DisableOrderByClause() {

// test invalid query with ORDER BY
request.Query = `ORDER BY WorkflowId`
p, err = s.visibilityStore.buildSearchParametersV2(request)
p, err = s.visibilityStore.buildSearchParametersV2(request, s.visibilityStore.getListFieldSorter)
s.Nil(p)
s.Error(err)
var invalidArgumentErr *serviceerror.InvalidArgument
Expand Down

0 comments on commit f59bc18

Please sign in to comment.