From bb5094714ff1ef329beca757545fd9c9be279685 Mon Sep 17 00:00:00 2001 From: Rodrigo Zhou Date: Wed, 19 Jul 2023 16:31:52 -0500 Subject: [PATCH] Add support for ES Scroll for Scan API (#4614) **What changed?** Add support for ES Scroll for Scan API. This is a revert of https://github.com/temporalio/temporal/pull/4223 and https://github.com/temporalio/temporal/pull/4249. **Why?** ES 7.10 hosted in AWS is the OSS flavor, not the default one which includes support for PIT. **How did you test it?** Add unit tests. **Potential risks** No. **Is hotfix candidate?** --- .../store/elasticsearch/client/client.go | 6 + .../store/elasticsearch/client/client_mock.go | 116 ++++++++++++ .../store/elasticsearch/client/client_v7.go | 63 +++++++ .../store/elasticsearch/visibility_store.go | 72 +++++++- .../visibility_store_read_test.go | 166 +++++++++++++----- 5 files changed, 374 insertions(+), 49 deletions(-) diff --git a/common/persistence/visibility/store/elasticsearch/client/client.go b/common/persistence/visibility/store/elasticsearch/client/client.go index 048eb64bf5bd..504d7ead8e29 100644 --- a/common/persistence/visibility/store/elasticsearch/client/client.go +++ b/common/persistence/visibility/store/elasticsearch/client/client.go @@ -52,6 +52,11 @@ type ( WaitForYellowStatus(ctx context.Context, index string) (string, error) GetMapping(ctx context.Context, index string) (map[string]string, error) + OpenScroll(ctx context.Context, p *SearchParameters, keepAliveInterval string) (*elastic.SearchResult, error) + Scroll(ctx context.Context, id string, keepAliveInterval string) (*elastic.SearchResult, error) + CloseScroll(ctx context.Context, id string) error + + IsPointInTimeSupported(ctx context.Context) bool OpenPointInTime(ctx context.Context, index string, keepAliveInterval string) (string, error) ClosePointInTime(ctx context.Context, id string) (bool, error) } @@ -79,6 +84,7 @@ type ( Sorter []elastic.Sorter SearchAfter []interface{} + ScrollID string PointInTime *elastic.PointInTime } ) diff --git a/common/persistence/visibility/store/elasticsearch/client/client_mock.go b/common/persistence/visibility/store/elasticsearch/client/client_mock.go index 8312bfa63957..243576d5f6be 100644 --- a/common/persistence/visibility/store/elasticsearch/client/client_mock.go +++ b/common/persistence/visibility/store/elasticsearch/client/client_mock.go @@ -75,6 +75,20 @@ func (mr *MockClientMockRecorder) ClosePointInTime(ctx, id interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClosePointInTime", reflect.TypeOf((*MockClient)(nil).ClosePointInTime), ctx, id) } +// CloseScroll mocks base method. +func (m *MockClient) CloseScroll(ctx context.Context, id string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseScroll", ctx, id) + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseScroll indicates an expected call of CloseScroll. +func (mr *MockClientMockRecorder) CloseScroll(ctx, id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseScroll", reflect.TypeOf((*MockClient)(nil).CloseScroll), ctx, id) +} + // Count mocks base method. func (m *MockClient) Count(ctx context.Context, index string, query v7.Query) (int64, error) { m.ctrl.T.Helper() @@ -120,6 +134,20 @@ func (mr *MockClientMockRecorder) GetMapping(ctx, index interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMapping", reflect.TypeOf((*MockClient)(nil).GetMapping), ctx, index) } +// IsPointInTimeSupported mocks base method. +func (m *MockClient) IsPointInTimeSupported(ctx context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsPointInTimeSupported", ctx) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsPointInTimeSupported indicates an expected call of IsPointInTimeSupported. +func (mr *MockClientMockRecorder) IsPointInTimeSupported(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsPointInTimeSupported", reflect.TypeOf((*MockClient)(nil).IsPointInTimeSupported), ctx) +} + // OpenPointInTime mocks base method. func (m *MockClient) OpenPointInTime(ctx context.Context, index, keepAliveInterval string) (string, error) { m.ctrl.T.Helper() @@ -135,6 +163,21 @@ func (mr *MockClientMockRecorder) OpenPointInTime(ctx, index, keepAliveInterval return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenPointInTime", reflect.TypeOf((*MockClient)(nil).OpenPointInTime), ctx, index, keepAliveInterval) } +// OpenScroll mocks base method. +func (m *MockClient) OpenScroll(ctx context.Context, p *SearchParameters, keepAliveInterval string) (*v7.SearchResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OpenScroll", ctx, p, keepAliveInterval) + ret0, _ := ret[0].(*v7.SearchResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OpenScroll indicates an expected call of OpenScroll. +func (mr *MockClientMockRecorder) OpenScroll(ctx, p, keepAliveInterval interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenScroll", reflect.TypeOf((*MockClient)(nil).OpenScroll), ctx, p, keepAliveInterval) +} + // PutMapping mocks base method. func (m *MockClient) PutMapping(ctx context.Context, index string, mapping map[string]v1.IndexedValueType) (bool, error) { m.ctrl.T.Helper() @@ -165,6 +208,21 @@ func (mr *MockClientMockRecorder) RunBulkProcessor(ctx, p interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunBulkProcessor", reflect.TypeOf((*MockClient)(nil).RunBulkProcessor), ctx, p) } +// Scroll mocks base method. +func (m *MockClient) Scroll(ctx context.Context, id, keepAliveInterval string) (*v7.SearchResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Scroll", ctx, id, keepAliveInterval) + ret0, _ := ret[0].(*v7.SearchResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Scroll indicates an expected call of Scroll. +func (mr *MockClientMockRecorder) Scroll(ctx, id, keepAliveInterval interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Scroll", reflect.TypeOf((*MockClient)(nil).Scroll), ctx, id, keepAliveInterval) +} + // Search mocks base method. func (m *MockClient) Search(ctx context.Context, p *SearchParameters) (*v7.SearchResult, error) { m.ctrl.T.Helper() @@ -233,6 +291,20 @@ func (mr *MockCLIClientMockRecorder) ClosePointInTime(ctx, id interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClosePointInTime", reflect.TypeOf((*MockCLIClient)(nil).ClosePointInTime), ctx, id) } +// CloseScroll mocks base method. +func (m *MockCLIClient) CloseScroll(ctx context.Context, id string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseScroll", ctx, id) + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseScroll indicates an expected call of CloseScroll. +func (mr *MockCLIClientMockRecorder) CloseScroll(ctx, id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseScroll", reflect.TypeOf((*MockCLIClient)(nil).CloseScroll), ctx, id) +} + // Count mocks base method. func (m *MockCLIClient) Count(ctx context.Context, index string, query v7.Query) (int64, error) { m.ctrl.T.Helper() @@ -292,6 +364,20 @@ func (mr *MockCLIClientMockRecorder) GetMapping(ctx, index interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMapping", reflect.TypeOf((*MockCLIClient)(nil).GetMapping), ctx, index) } +// IsPointInTimeSupported mocks base method. +func (m *MockCLIClient) IsPointInTimeSupported(ctx context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsPointInTimeSupported", ctx) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsPointInTimeSupported indicates an expected call of IsPointInTimeSupported. +func (mr *MockCLIClientMockRecorder) IsPointInTimeSupported(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsPointInTimeSupported", reflect.TypeOf((*MockCLIClient)(nil).IsPointInTimeSupported), ctx) +} + // OpenPointInTime mocks base method. func (m *MockCLIClient) OpenPointInTime(ctx context.Context, index, keepAliveInterval string) (string, error) { m.ctrl.T.Helper() @@ -307,6 +393,21 @@ func (mr *MockCLIClientMockRecorder) OpenPointInTime(ctx, index, keepAliveInterv return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenPointInTime", reflect.TypeOf((*MockCLIClient)(nil).OpenPointInTime), ctx, index, keepAliveInterval) } +// OpenScroll mocks base method. +func (m *MockCLIClient) OpenScroll(ctx context.Context, p *SearchParameters, keepAliveInterval string) (*v7.SearchResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OpenScroll", ctx, p, keepAliveInterval) + ret0, _ := ret[0].(*v7.SearchResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OpenScroll indicates an expected call of OpenScroll. +func (mr *MockCLIClientMockRecorder) OpenScroll(ctx, p, keepAliveInterval interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenScroll", reflect.TypeOf((*MockCLIClient)(nil).OpenScroll), ctx, p, keepAliveInterval) +} + // PutMapping mocks base method. func (m *MockCLIClient) PutMapping(ctx context.Context, index string, mapping map[string]v1.IndexedValueType) (bool, error) { m.ctrl.T.Helper() @@ -337,6 +438,21 @@ func (mr *MockCLIClientMockRecorder) RunBulkProcessor(ctx, p interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunBulkProcessor", reflect.TypeOf((*MockCLIClient)(nil).RunBulkProcessor), ctx, p) } +// Scroll mocks base method. +func (m *MockCLIClient) Scroll(ctx context.Context, id, keepAliveInterval string) (*v7.SearchResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Scroll", ctx, id, keepAliveInterval) + ret0, _ := ret[0].(*v7.SearchResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Scroll indicates an expected call of Scroll. +func (mr *MockCLIClientMockRecorder) Scroll(ctx, id, keepAliveInterval interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Scroll", reflect.TypeOf((*MockCLIClient)(nil).Scroll), ctx, id, keepAliveInterval) +} + // Search mocks base method. func (m *MockCLIClient) Search(ctx context.Context, p *SearchParameters) (*v7.SearchResult, error) { m.ctrl.T.Helper() diff --git a/common/persistence/visibility/store/elasticsearch/client/client_v7.go b/common/persistence/visibility/store/elasticsearch/client/client_v7.go index 0eae3d6e594e..24a8f7f6db94 100644 --- a/common/persistence/visibility/store/elasticsearch/client/client_v7.go +++ b/common/persistence/visibility/store/elasticsearch/client/client_v7.go @@ -30,8 +30,10 @@ import ( "net/http" "net/url" "strings" + "sync" "time" + "github.com/blang/semver/v4" "github.com/olivere/elastic/v7" "github.com/olivere/elastic/v7/uritemplates" enumspb "go.temporal.io/api/enums/v1" @@ -44,9 +46,20 @@ type ( clientImpl struct { esClient *elastic.Client url url.URL + + initIsPointInTimeSupported sync.Once + isPointInTimeSupported bool } ) +const ( + pointInTimeSupportedFlavor = "default" // the other flavor is "oss" +) + +var ( + pointInTimeSupportedIn = semver.MustParseRange(">=7.10.0") +) + var _ Client = (*clientImpl)(nil) // newClient create a ES client @@ -138,6 +151,56 @@ func (c *clientImpl) Search(ctx context.Context, p *SearchParameters) (*elastic. return searchService.Do(ctx) } +func (c *clientImpl) OpenScroll( + ctx context.Context, + p *SearchParameters, + keepAliveInterval string, +) (*elastic.SearchResult, error) { + scrollService := elastic.NewScrollService(c.esClient). + Index(p.Index). + Query(p.Query). + SortBy(p.Sorter...). + KeepAlive(keepAliveInterval) + if p.PageSize != 0 { + scrollService.Size(p.PageSize) + } + return scrollService.Do(ctx) +} + +func (c *clientImpl) Scroll( + ctx context.Context, + id string, + keepAliveInterval string, +) (*elastic.SearchResult, error) { + return elastic.NewScrollService(c.esClient).ScrollId(id).KeepAlive(keepAliveInterval).Do(ctx) +} + +func (c *clientImpl) CloseScroll(ctx context.Context, id string) error { + return elastic.NewScrollService(c.esClient).ScrollId(id).Clear(ctx) +} + +func (c *clientImpl) IsPointInTimeSupported(ctx context.Context) bool { + c.initIsPointInTimeSupported.Do(func() { + c.isPointInTimeSupported = c.queryPointInTimeSupported(ctx) + }) + return c.isPointInTimeSupported +} + +func (c *clientImpl) queryPointInTimeSupported(ctx context.Context) bool { + result, _, err := c.esClient.Ping(c.url.String()).Do(ctx) + if err != nil { + return false + } + if result == nil || result.Version.BuildFlavor != pointInTimeSupportedFlavor { + return false + } + esVersion, err := semver.ParseTolerant(result.Version.Number) + if err != nil { + return false + } + return pointInTimeSupportedIn(esVersion) +} + func (c *clientImpl) OpenPointInTime(ctx context.Context, index string, keepAliveInterval string) (string, error) { resp, err := c.esClient.OpenPointInTime(index).KeepAlive(keepAliveInterval).Do(ctx) if err != nil { diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store.go b/common/persistence/visibility/store/elasticsearch/visibility_store.go index 76993a8769f1..412d9a64fcbb 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store.go @@ -31,6 +31,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "math" "strconv" "strings" @@ -55,6 +56,7 @@ const ( PersistenceName = "elasticsearch" delimiter = "~" + scrollKeepAliveInterval = "1m" pointInTimeKeepAliveInterval = "1m" ) @@ -72,7 +74,12 @@ type ( } visibilityPageToken struct { - SearchAfter []interface{} + SearchAfter []interface{} + + // For ScanWorkflowExecutions API. + // For ES<7.10.0 and "oss" flavor. + ScrollID string + // For ES>=7.10.0 and "default" flavor. PointInTimeID string } @@ -478,6 +485,55 @@ func (s *visibilityStore) ListWorkflowExecutions( func (s *visibilityStore) ScanWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequestV2, +) (*store.InternalListWorkflowExecutionsResponse, error) { + // Point in time is only supported in Elasticsearch 7.10+ in default flavor. + if s.esClient.IsPointInTimeSupported(ctx) { + return s.scanWorkflowExecutionsWithPit(ctx, request) + } + return s.scanWorkflowExecutionsWithScroll(ctx, request) +} + +func (s *visibilityStore) scanWorkflowExecutionsWithScroll( + ctx context.Context, + request *manager.ListWorkflowExecutionsRequestV2, +) (*store.InternalListWorkflowExecutionsResponse, error) { + var ( + searchResult *elastic.SearchResult + scrollErr error + ) + + p, err := s.buildSearchParametersV2(request, s.getScanFieldSorter) + if err != nil { + return nil, err + } + + if len(request.NextPageToken) == 0 { + searchResult, scrollErr = s.esClient.OpenScroll(ctx, p, scrollKeepAliveInterval) + } else if p.ScrollID != "" { + searchResult, scrollErr = s.esClient.Scroll(ctx, p.ScrollID, scrollKeepAliveInterval) + } else { + return nil, serviceerror.NewInvalidArgument("scrollId must present in pagination token") + } + + if scrollErr != nil && scrollErr != io.EOF { + return nil, convertElasticsearchClientError("ScanWorkflowExecutions failed", scrollErr) + } + + // Both io.IOF and empty hits list indicate that this is a last page. + if (searchResult.Hits != nil && len(searchResult.Hits.Hits) < request.PageSize) || + scrollErr == io.EOF { + err := s.esClient.CloseScroll(ctx, searchResult.ScrollId) + if err != nil { + return nil, convertElasticsearchClientError("Unable to close scroll", err) + } + } + + return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize) +} + +func (s *visibilityStore) scanWorkflowExecutionsWithPit( + ctx context.Context, + request *manager.ListWorkflowExecutionsRequestV2, ) (*store.InternalListWorkflowExecutionsResponse, error) { p, err := s.buildSearchParametersV2(request, s.getScanFieldSorter) if err != nil { @@ -491,6 +547,8 @@ func (s *visibilityStore) ScanWorkflowExecutions( return nil, convertElasticsearchClientError("Unable to create point in time", err) } p.PointInTime = elastic.NewPointInTimeWithKeepAlive(pitID, pointInTimeKeepAliveInterval) + } else if p.PointInTime == nil { + return nil, serviceerror.NewInvalidArgument("pointInTimeId must present in pagination token") } searchResult, err := s.esClient.Search(ctx, p) @@ -667,7 +725,14 @@ func (s *visibilityStore) processPageToken( pageToken *visibilityPageToken, namespaceName namespace.Name, ) error { - if pageToken == nil || len(pageToken.SearchAfter) == 0 { + if pageToken == nil { + return nil + } + if pageToken.ScrollID != "" { + params.ScrollID = pageToken.ScrollID + return nil + } + if len(pageToken.SearchAfter) == 0 { return nil } if pageToken.PointInTimeID != "" { @@ -805,9 +870,10 @@ func (s *visibilityStore) getListWorkflowExecutionsResponse( lastHitSort = hit.Sort } - if len(searchResult.Hits.Hits) == pageSize && lastHitSort != nil { // this means the response is not the last page + if len(searchResult.Hits.Hits) == pageSize { // this means the response might not the last page response.NextPageToken, err = s.serializePageToken(&visibilityPageToken{ SearchAfter: lastHitSort, + ScrollID: searchResult.ScrollId, PointInTimeID: searchResult.PitId, }) if err != nil { diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go b/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go index 873bf4f5ae6d..7a157d6aaca8 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go @@ -1119,13 +1119,12 @@ func (s *ESVisibilitySuite) TestListWorkflowExecutions_Error() { s.Equal("ListWorkflowExecutions failed: elastic: Error 500 (Internal Server Error): error reason [type=]", unavailableErr.Message) } -func (s *ESVisibilitySuite) TestScanWorkflowExecutions() { - pitID := "pitID" +func (s *ESVisibilitySuite) TestScanWorkflowExecutions_Scroll() { + scrollID := "scrollID" request := &manager.ListWorkflowExecutionsRequestV2{ NamespaceID: testNamespaceID, Namespace: testNamespace, PageSize: 1, - Query: `ExecutionStatus = "Terminated"`, } data := []byte(`{"ExecutionStatus": "Running", @@ -1144,57 +1143,70 @@ func (s *ESVisibilitySuite) TestScanWorkflowExecutions() { Hits: []*elastic.SearchHit{ { Source: source, - Sort: []interface{}{json.Number("123")}, }, }, }, - PitId: pitID, + ScrollId: scrollID, } - s.mockESClient.EXPECT().Search(gomock.Any(), gomock.Any()).Return(searchResult, nil) - s.mockESClient.EXPECT().OpenPointInTime(gomock.Any(), testIndex, gomock.Any()).Return(pitID, nil) - _, err := s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) - s.NoError(err) + + s.mockESClient.EXPECT().IsPointInTimeSupported(gomock.Any()).Return(false).AnyTimes() // test bad request request.Query = `invalid query` - _, err = s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) + _, err := s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) s.Error(err) _, ok := err.(*serviceerror.InvalidArgument) s.True(ok) s.True(strings.HasPrefix(err.Error(), "invalid query")) // test search - request.Query = `ExecutionStatus = "Terminated"` - s.mockESClient.EXPECT().Search(gomock.Any(), gomock.Any()).Return(searchResult, nil) + request.Query = `ExecutionStatus = "Running"` + s.mockESClient.EXPECT().OpenScroll( + gomock.Any(), + &client.SearchParameters{ + Index: testIndex, + Query: elastic.NewBoolQuery(). + Filter( + elastic.NewTermQuery(searchattribute.NamespaceID, testNamespaceID.String()), + elastic.NewBoolQuery().Filter( + elastic.NewMatchQuery( + searchattribute.ExecutionStatus, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING.String(), + ), + ), + ). + MustNot(elastic.NewExistsQuery(searchattribute.TemporalNamespaceDivision)), + PageSize: 1, + Sorter: docSorter, + }, + gomock.Any(), + ).Return(searchResult, nil) - token := &visibilityPageToken{ - SearchAfter: []interface{}{json.Number("1528358645123456789")}, - PointInTimeID: pitID, - } + token := &visibilityPageToken{ScrollID: scrollID} tokenBytes, err := s.visibilityStore.serializePageToken(token) s.NoError(err) - request.NextPageToken = tokenBytes + result, err := s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) s.NoError(err) - responseToken, err := s.visibilityStore.deserializePageToken(result.NextPageToken) - s.NoError(err) - s.Equal([]interface{}{json.Number("123")}, responseToken.SearchAfter) - s.Equal(pitID, responseToken.PointInTimeID) + s.Equal(tokenBytes, result.NextPageToken) // test last page + request.NextPageToken = tokenBytes searchResult = &elastic.SearchResult{ Hits: &elastic.SearchHits{ Hits: []*elastic.SearchHit{}, }, - PitId: pitID, + ScrollId: scrollID, } - s.mockESClient.EXPECT().Search(gomock.Any(), gomock.Any()).Return(searchResult, nil) - s.mockESClient.EXPECT().ClosePointInTime(gomock.Any(), pitID).Return(true, nil) - _, err = s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) + s.mockESClient.EXPECT().Scroll(gomock.Any(), scrollID, gomock.Any()).Return(searchResult, nil) + s.mockESClient.EXPECT().CloseScroll(gomock.Any(), scrollID).Return(nil) + result, err = s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) s.NoError(err) + s.Nil(result.NextPageToken) // test unavailable error - s.mockESClient.EXPECT().Search(gomock.Any(), gomock.Any()).Return(nil, errTestESSearch) + request.NextPageToken = nil + s.mockESClient.EXPECT().OpenScroll(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errTestESSearch) _, err = s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) s.Error(err) _, ok = err.(*serviceerror.Unavailable) @@ -1202,13 +1214,12 @@ func (s *ESVisibilitySuite) TestScanWorkflowExecutions() { s.Contains(err.Error(), "ScanWorkflowExecutions failed") } -func (s *ESVisibilitySuite) TestScanWorkflowExecutions_OldPageToken() { +func (s *ESVisibilitySuite) TestScanWorkflowExecutions_Pit() { pitID := "pitID" request := &manager.ListWorkflowExecutionsRequestV2{ NamespaceID: testNamespaceID, Namespace: testNamespace, PageSize: 1, - Query: `ExecutionStatus = "Terminated"`, } data := []byte(`{"ExecutionStatus": "Running", @@ -1222,41 +1233,104 @@ func (s *ESVisibilitySuite) TestScanWorkflowExecutions_OldPageToken() { "WorkflowId": "6bfbc1e5-6ce4-4e22-bbfb-e0faa9a7a604-1-2256", "WorkflowType": "basic.stressWorkflowExecute"}`) source := json.RawMessage(data) + searchAfter := []any{json.Number("123")} searchResult := &elastic.SearchResult{ Hits: &elastic.SearchHits{ Hits: []*elastic.SearchHit{ { Source: source, - Sort: []interface{}{json.Number("123")}, + Sort: searchAfter, }, }, }, PitId: pitID, } - s.mockESClient.EXPECT().Search(gomock.Any(), gomock.Any()).Return(searchResult, nil) - s.mockESClient.EXPECT().OpenPointInTime(gomock.Any(), testIndex, gomock.Any()).Return(pitID, nil) + + s.mockESClient.EXPECT().IsPointInTimeSupported(gomock.Any()).Return(true).AnyTimes() + + // test bad request + request.Query = `invalid query` _, err := s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) - s.NoError(err) + s.Error(err) + _, ok := err.(*serviceerror.InvalidArgument) + s.True(ok) + s.True(strings.HasPrefix(err.Error(), "invalid query")) - // test search - token := struct { - SearchAfter []interface{} - ScrollID string - PointInTimeID string - }{ - SearchAfter: []interface{}{json.Number("1528358645123456789")}, - ScrollID: "random-scroll", - PointInTimeID: "random-pit", + request.Query = `ExecutionStatus = "Running"` + s.mockESClient.EXPECT().OpenPointInTime(gomock.Any(), testIndex, gomock.Any()).Return(pitID, nil) + s.mockESClient.EXPECT().Search( + gomock.Any(), + &client.SearchParameters{ + Index: testIndex, + Query: elastic.NewBoolQuery(). + Filter( + elastic.NewTermQuery(searchattribute.NamespaceID, testNamespaceID.String()), + elastic.NewBoolQuery().Filter( + elastic.NewMatchQuery( + searchattribute.ExecutionStatus, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING.String(), + ), + ), + ). + MustNot(elastic.NewExistsQuery(searchattribute.TemporalNamespaceDivision)), + PageSize: 1, + Sorter: docSorter, + PointInTime: elastic.NewPointInTimeWithKeepAlive(pitID, pointInTimeKeepAliveInterval), + }, + ).Return(searchResult, nil) + + token := &visibilityPageToken{ + SearchAfter: searchAfter, + PointInTimeID: pitID, } - tokenBytes, err := json.Marshal(token) + tokenBytes, err := s.visibilityStore.serializePageToken(token) s.NoError(err) - request.NextPageToken = tokenBytes - s.mockESClient.EXPECT().Search(gomock.Any(), gomock.Any()).Return(searchResult, nil) + result, err := s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) s.NoError(err) - responseToken, err := s.visibilityStore.deserializePageToken(result.NextPageToken) + s.Equal(tokenBytes, result.NextPageToken) + + // test last page + request.NextPageToken = tokenBytes + searchResult = &elastic.SearchResult{ + Hits: &elastic.SearchHits{ + Hits: []*elastic.SearchHit{}, + }, + PitId: pitID, + } + s.mockESClient.EXPECT().ClosePointInTime(gomock.Any(), pitID).Return(true, nil) + s.mockESClient.EXPECT().Search( + gomock.Any(), + &client.SearchParameters{ + Index: testIndex, + Query: elastic.NewBoolQuery(). + Filter( + elastic.NewTermQuery(searchattribute.NamespaceID, testNamespaceID.String()), + elastic.NewBoolQuery().Filter( + elastic.NewMatchQuery( + searchattribute.ExecutionStatus, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING.String(), + ), + ), + ). + MustNot(elastic.NewExistsQuery(searchattribute.TemporalNamespaceDivision)), + PageSize: 1, + Sorter: docSorter, + SearchAfter: token.SearchAfter, + PointInTime: elastic.NewPointInTimeWithKeepAlive(pitID, pointInTimeKeepAliveInterval), + }, + ).Return(searchResult, nil) + result, err = s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) s.NoError(err) - s.Equal([]interface{}{json.Number("123")}, responseToken.SearchAfter) + s.Nil(result.NextPageToken) + + // test unavailable error + s.mockESClient.EXPECT().Search(gomock.Any(), gomock.Any()).Return(nil, errTestESSearch) + _, err = s.visibilityStore.ScanWorkflowExecutions(context.Background(), request) + s.Error(err) + _, ok = err.(*serviceerror.Unavailable) + s.True(ok) + s.Contains(err.Error(), "ScanWorkflowExecutions failed") } func (s *ESVisibilitySuite) TestCountWorkflowExecutions() {