Skip to content

Commit

Permalink
Add support for ES Scroll for Scan API (#4614)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Add support for ES Scroll for Scan API.

This is a revert of #4223 and
#4249.

<!-- Tell your future self why have you made these changes -->
**Why?**
ES 7.10 hosted in AWS is the OSS flavor, not the default one which
includes support for PIT.

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Add unit tests.

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**
No.

<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
rodrigozhou authored and dnr committed Jul 21, 2023
1 parent 1a44972 commit bb50947
Show file tree
Hide file tree
Showing 5 changed files with 374 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ type (
WaitForYellowStatus(ctx context.Context, index string) (string, error)
GetMapping(ctx context.Context, index string) (map[string]string, error)

OpenScroll(ctx context.Context, p *SearchParameters, keepAliveInterval string) (*elastic.SearchResult, error)
Scroll(ctx context.Context, id string, keepAliveInterval string) (*elastic.SearchResult, error)
CloseScroll(ctx context.Context, id string) error

IsPointInTimeSupported(ctx context.Context) bool
OpenPointInTime(ctx context.Context, index string, keepAliveInterval string) (string, error)
ClosePointInTime(ctx context.Context, id string) (bool, error)
}
Expand Down Expand Up @@ -79,6 +84,7 @@ type (
Sorter []elastic.Sorter

SearchAfter []interface{}
ScrollID string
PointInTime *elastic.PointInTime
}
)

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/blang/semver/v4"
"github.com/olivere/elastic/v7"
"github.com/olivere/elastic/v7/uritemplates"
enumspb "go.temporal.io/api/enums/v1"
Expand All @@ -44,9 +46,20 @@ type (
clientImpl struct {
esClient *elastic.Client
url url.URL

initIsPointInTimeSupported sync.Once
isPointInTimeSupported bool
}
)

const (
pointInTimeSupportedFlavor = "default" // the other flavor is "oss"
)

var (
pointInTimeSupportedIn = semver.MustParseRange(">=7.10.0")
)

var _ Client = (*clientImpl)(nil)

// newClient create a ES client
Expand Down Expand Up @@ -138,6 +151,56 @@ func (c *clientImpl) Search(ctx context.Context, p *SearchParameters) (*elastic.
return searchService.Do(ctx)
}

func (c *clientImpl) OpenScroll(
ctx context.Context,
p *SearchParameters,
keepAliveInterval string,
) (*elastic.SearchResult, error) {
scrollService := elastic.NewScrollService(c.esClient).
Index(p.Index).
Query(p.Query).
SortBy(p.Sorter...).
KeepAlive(keepAliveInterval)
if p.PageSize != 0 {
scrollService.Size(p.PageSize)
}
return scrollService.Do(ctx)
}

func (c *clientImpl) Scroll(
ctx context.Context,
id string,
keepAliveInterval string,
) (*elastic.SearchResult, error) {
return elastic.NewScrollService(c.esClient).ScrollId(id).KeepAlive(keepAliveInterval).Do(ctx)
}

func (c *clientImpl) CloseScroll(ctx context.Context, id string) error {
return elastic.NewScrollService(c.esClient).ScrollId(id).Clear(ctx)
}

func (c *clientImpl) IsPointInTimeSupported(ctx context.Context) bool {
c.initIsPointInTimeSupported.Do(func() {
c.isPointInTimeSupported = c.queryPointInTimeSupported(ctx)
})
return c.isPointInTimeSupported
}

func (c *clientImpl) queryPointInTimeSupported(ctx context.Context) bool {
result, _, err := c.esClient.Ping(c.url.String()).Do(ctx)
if err != nil {
return false
}
if result == nil || result.Version.BuildFlavor != pointInTimeSupportedFlavor {
return false
}
esVersion, err := semver.ParseTolerant(result.Version.Number)
if err != nil {
return false
}
return pointInTimeSupportedIn(esVersion)
}

func (c *clientImpl) OpenPointInTime(ctx context.Context, index string, keepAliveInterval string) (string, error) {
resp, err := c.esClient.OpenPointInTime(index).KeepAlive(keepAliveInterval).Do(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit bb50947

Please sign in to comment.