Skip to content

Commit

Permalink
Remove support for ES PIT and scroll in ES client (temporalio#4223)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou authored and samanbarghi committed May 5, 2023
1 parent a1f5bbd commit 14996c5
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,10 @@ type (
Count(ctx context.Context, index string, query elastic.Query) (int64, error)
RunBulkProcessor(ctx context.Context, p *BulkProcessorParameters) (BulkProcessor, error)

OpenScroll(ctx context.Context, p *SearchParameters, keepAliveInterval string) (*elastic.SearchResult, error)
Scroll(ctx context.Context, scrollID string, keepAliveInterval string) (*elastic.SearchResult, error)
CloseScroll(ctx context.Context, id string) error

// TODO (alex): move this to some admin client (and join with IntegrationTestsClient)
PutMapping(ctx context.Context, index string, mapping map[string]enumspb.IndexedValueType) (bool, error)
WaitForYellowStatus(ctx context.Context, index string) (string, error)
GetMapping(ctx context.Context, index string) (map[string]string, error)

IsPointInTimeSupported(ctx context.Context) bool
OpenPointInTime(ctx context.Context, index string, keepAliveInterval string) (string, error)
ClosePointInTime(ctx context.Context, id string) (bool, error)
}

CLIClient interface {
Expand All @@ -84,6 +76,5 @@ type (
Sorter []elastic.Sorter

SearchAfter []interface{}
PointInTime *elastic.PointInTime
}
)

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/blang/semver/v4"
"github.com/olivere/elastic/v7"
"github.com/olivere/elastic/v7/uritemplates"
enumspb "go.temporal.io/api/enums/v1"
Expand All @@ -46,20 +44,9 @@ type (
clientImpl struct {
esClient *elastic.Client
url url.URL

initIsPointInTimeSupported sync.Once
isPointInTimeSupported bool
}
)

const (
pointInTimeSupportedFlavor = "default" // the other flavor is "oss".
)

var (
pointInTimeSupportedIn = semver.MustParseRange(">=7.10.0")
)

var _ Client = (*clientImpl)(nil)

// newClient create a ES client
Expand Down Expand Up @@ -129,10 +116,6 @@ func (c *clientImpl) Search(ctx context.Context, p *SearchParameters) (*elastic.
Query(p.Query).
SortBy(p.Sorter...)

if p.PointInTime != nil {
searchSource.PointInTime(p.PointInTime)
}

if p.PageSize != 0 {
searchSource.Size(p.PageSize)
}
Expand All @@ -141,78 +124,10 @@ func (c *clientImpl) Search(ctx context.Context, p *SearchParameters) (*elastic.
searchSource.SearchAfter(p.SearchAfter...)
}

searchService := c.esClient.Search().SearchSource(searchSource)
// When pit.id is specified index must not be used.
if p.PointInTime == nil {
searchService.Index(p.Index)
}

searchService := c.esClient.Search(p.Index).SearchSource(searchSource)
return searchService.Do(ctx)
}

func (c *clientImpl) OpenScroll(ctx context.Context, p *SearchParameters, keepAliveInterval string) (*elastic.SearchResult, error) {
scrollService := elastic.NewScrollService(c.esClient).
Index(p.Index).
Query(p.Query).
SortBy(p.Sorter...).
KeepAlive(keepAliveInterval)

if p.PageSize != 0 {
scrollService.Size(p.PageSize)
}

searchResult, err := scrollService.Do(ctx)
return searchResult, err
}

func (c *clientImpl) Scroll(ctx context.Context, scrollID string, keepAliveInterval string) (*elastic.SearchResult, error) {
scrollService := elastic.NewScrollService(c.esClient)
result, err := scrollService.ScrollId(scrollID).KeepAlive(keepAliveInterval).Do(ctx)
return result, err
}

func (c *clientImpl) CloseScroll(ctx context.Context, id string) error {
return elastic.NewScrollService(c.esClient).ScrollId(id).Clear(ctx)
}

func (c *clientImpl) IsPointInTimeSupported(ctx context.Context) bool {
c.initIsPointInTimeSupported.Do(func() {
c.isPointInTimeSupported = c.queryPointInTimeSupported(ctx)
})
return c.isPointInTimeSupported
}

func (c *clientImpl) queryPointInTimeSupported(ctx context.Context) bool {
result, _, err := c.esClient.Ping(c.url.String()).Do(ctx)
if err != nil {
return false
}
if result == nil || result.Version.BuildFlavor != pointInTimeSupportedFlavor {
return false
}
esVersion, err := semver.ParseTolerant(result.Version.Number)
if err != nil {
return false
}
return pointInTimeSupportedIn(esVersion)
}

func (c *clientImpl) OpenPointInTime(ctx context.Context, index string, keepAliveInterval string) (string, error) {
resp, err := c.esClient.OpenPointInTime(index).KeepAlive(keepAliveInterval).Do(ctx)
if err != nil {
return "", err
}
return resp.Id, nil
}

func (c *clientImpl) ClosePointInTime(ctx context.Context, id string) (bool, error) {
resp, err := c.esClient.ClosePointInTime(id).Do(ctx)
if err != nil {
return false, err
}
return resp.Succeeded, nil
}

func (c *clientImpl) Count(ctx context.Context, index string, query elastic.Query) (int64, error) {
return c.esClient.Count(index).Query(query).Do(ctx)
}
Expand Down

0 comments on commit 14996c5

Please sign in to comment.