Skip to content

Commit

Permalink
[bug][storage] Enhanced the idempotency of ES-Rollover
Browse files Browse the repository at this point in the history
Signed-off-by: Manik2708 <[email protected]>
  • Loading branch information
Manik2708 committed Jan 30, 2025
1 parent 6d8c70e commit 4415f6e
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 61 deletions.
30 changes: 7 additions & 23 deletions cmd/es-rollover/app/init/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@
package init

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"

"github.com/jaegertracing/jaeger/cmd/es-rollover/app"
"github.com/jaegertracing/jaeger/pkg/es"
Expand Down Expand Up @@ -69,29 +66,16 @@ func (c Action) Do() error {
}

func createIndexIfNotExist(c client.IndexAPI, index string) error {
err := c.CreateIndex(index)
exists, err := c.IndexExists(index)
if err != nil {
var esErr client.ResponseError
if errors.As(err, &esErr) {
if esErr.StatusCode != http.StatusBadRequest || esErr.Body == nil {
return esErr.Err
}
// check for the reason of the error
jsonError := map[string]any{}
err := json.Unmarshal(esErr.Body, &jsonError)
if err != nil {
// return unmarshal error
return err
}
errorMap := jsonError["error"].(map[string]any)
// check for reason, ignore already exist error
if strings.Contains(errorMap["type"].(string), "resource_already_exists_exception") {
return nil
}
}
// Return any other error unrelated to the response
return err
}
if !exists {
err := c.CreateIndex(index)
if err != nil {
return err
}
}
return nil
}

Expand Down
67 changes: 29 additions & 38 deletions cmd/es-rollover/app/init/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package init

import (
"errors"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -18,59 +17,47 @@ import (
)

func TestIndexCreateIfNotExist(t *testing.T) {
const esErrResponse = `{"error":{"root_cause":[{"type":"resource_already_exists_exception","reason":"]"}],"type":"resource_already_exists_exception","reason":"request [/jaeger-*] contains unrecognized parameter: [help]"},"status":400}`

tests := []struct {
name string
returnErr error
expectedErr error
containsError string
name string
exists bool
indexExistsReturnError error
indexExistsExpectedError error
createIndexReturnErr error
createIndexExpectedError error
}{
{
name: "success",
name: "success",
exists: false,
},
{
name: "generic error",
returnErr: errors.New("may be an http error?"),
expectedErr: errors.New("may be an http error?"),
name: "generic error from index exists",
exists: false,
indexExistsReturnError: errors.New("may be an http error from index exists"),
indexExistsExpectedError: errors.New("may be an http error from index exists"),
},
{
name: "response error",
returnErr: client.ResponseError{
Err: errors.New("x"),
StatusCode: http.StatusForbidden,
},
expectedErr: errors.New("x"),
name: "generic error from create index",
exists: false,
createIndexReturnErr: errors.New("may be an http error from create index"),
createIndexExpectedError: errors.New("may be an http error from create index"),
},
{
name: "unmarshal error",
returnErr: client.ResponseError{
Err: errors.New("x"),
StatusCode: http.StatusBadRequest,
Body: []byte("blablabla"),
},
containsError: "invalid character",
},
{
name: "existing error",
returnErr: client.ResponseError{
Err: errors.New("x"),
StatusCode: http.StatusBadRequest,
Body: []byte(esErrResponse),
},
expectedErr: nil,
name: "index already exists",
exists: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
indexClient := &mocks.IndexAPI{}
indexClient.On("CreateIndex", "jaeger-span").Return(test.returnErr)
indexClient.On("IndexExists", "jaeger-span").Return(test.exists, test.indexExistsReturnError)
indexClient.On("CreateIndex", "jaeger-span").Return(test.createIndexReturnErr)
err := createIndexIfNotExist(indexClient, "jaeger-span")
if test.containsError != "" {
assert.ErrorContains(t, err, test.containsError)
} else {
assert.Equal(t, test.expectedErr, err)
if test.indexExistsExpectedError != nil {
assert.Equal(t, test.indexExistsExpectedError, err)
}
if test.createIndexExpectedError != nil {
assert.Equal(t, test.createIndexExpectedError, err)
}
})
}
Expand Down Expand Up @@ -157,6 +144,7 @@ func TestRolloverAction(t *testing.T) {
name: "fail to get jaeger indices",
setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, _ *mocks.IndexManagementLifecycleAPI) {
clusterClient.On("Version").Return(uint(7), nil)
indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil)
indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil)
indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, errors.New("error getting jaeger indices"))
Expand All @@ -173,6 +161,7 @@ func TestRolloverAction(t *testing.T) {
name: "fail to create alias",
setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, _ *mocks.IndexManagementLifecycleAPI) {
clusterClient.On("Version").Return(uint(7), nil)
indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil)
indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil)
indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil)
Expand All @@ -193,6 +182,7 @@ func TestRolloverAction(t *testing.T) {
name: "create rollover index",
setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, _ *mocks.IndexManagementLifecycleAPI) {
clusterClient.On("Version").Return(uint(7), nil)
indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil)
indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil)
indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil)
Expand All @@ -213,6 +203,7 @@ func TestRolloverAction(t *testing.T) {
name: "create rollover index with ilm",
setupCallExpectations: func(indexClient *mocks.IndexAPI, clusterClient *mocks.ClusterAPI, ilmClient *mocks.IndexManagementLifecycleAPI) {
clusterClient.On("Version").Return(uint(7), nil)
indexClient.On("IndexExists", "jaeger-span-archive-000001").Return(false, nil)
indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil)
indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil)
indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil)
Expand Down
11 changes: 11 additions & 0 deletions pkg/es/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"net/http"
)

type notFoundError struct {
err error
}

func (e notFoundError) Error() string {
return e.err.Error()
}

// ResponseError holds information about a request error
type ResponseError struct {
// Error returned by the http client
Expand Down Expand Up @@ -79,6 +87,9 @@ func (c *Client) request(esRequest elasticRequest) ([]byte, error) {
}
defer res.Body.Close()

if res.StatusCode == http.StatusNotFound {
return []byte{}, notFoundError{err: fmt.Errorf("%s doesn't exists", esRequest.endpoint)}
}
if res.StatusCode != http.StatusOK {
return []byte{}, c.handleFailedRequest(res)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/es/client/index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@ func (i *IndicesClient) DeleteAlias(aliases []Alias) error {
return nil
}

// IndexExists check whether an index exists or not
func (i *IndicesClient) IndexExists(index string) (bool, error) {
_, err := i.request(elasticRequest{
endpoint: index,
method: http.MethodHead,
})
if err != nil {
if errors.As(err, &notFoundError{}) {
return false, nil
}
return false, fmt.Errorf("failed to check if index exists: %w", err)

Check warning on line 199 in pkg/es/client/index_client.go

View check run for this annotation

Codecov / codecov/patch

pkg/es/client/index_client.go#L199

Added line #L199 was not covered by tests
}
return true, nil
}

func (*IndicesClient) aliasesString(aliases []Alias) string {
concatAliases := ""
for _, alias := range aliases {
Expand Down
44 changes: 44 additions & 0 deletions pkg/es/client/index_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,50 @@ func TestClientDeleteIndices(t *testing.T) {
}
}

func TestClientIndexExists(t *testing.T) {
maxURLPathLength := 4000
tests := []struct {
name string
exists bool
responseCode int
}{
{
name: "exists",
responseCode: http.StatusOK,
exists: true,
},
{
name: "not exists",
responseCode: http.StatusNotFound,
exists: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
apiTriggered := false
testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
apiTriggered = true
assert.Equal(t, http.MethodHead, req.Method)
assert.Equal(t, "Basic foobar", req.Header.Get("Authorization"))
assert.LessOrEqual(t, len(req.URL.Path), maxURLPathLength)
res.WriteHeader(test.responseCode)
}))
defer testServer.Close()
c := &IndicesClient{
Client: Client{
Client: testServer.Client(),
Endpoint: testServer.URL,
BasicAuth: "foobar",
},
}
exists, err := c.IndexExists("jaeger-span")
require.NoError(t, err)
assert.True(t, apiTriggered)
assert.Equal(t, test.exists, exists)
})
}
}

func TestClientRequestError(t *testing.T) {
c := &IndicesClient{
Client: Client{
Expand Down
1 change: 1 addition & 0 deletions pkg/es/client/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package client

type IndexAPI interface {
GetJaegerIndices(prefix string) ([]Index, error)
IndexExists(index string) (bool, error)
DeleteIndices(indices []Index) error
CreateIndex(index string) error
CreateAlias(aliases []Alias) error
Expand Down
28 changes: 28 additions & 0 deletions pkg/es/client/mocks/IndexAPI.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions plugin/storage/integration/es_index_rollover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ func TestIndexRollover_FailIfILMNotPresent(t *testing.T) {
assert.Empty(t, indices)
}

func TestIndexRollover_Idempotency(t *testing.T) {
SkipUnlessEnv(t, "elasticsearch", "opensearch")
t.Cleanup(func() {
testutils.VerifyGoLeaksOnceForES(t)
})
client, err := createESClient(t, getESHttpClient(t))
require.NoError(t, err)
// Make sure that es is clean before the test!
cleanES(t, client, defaultILMPolicyName)
err = runEsRollover("init", []string{}, false)
require.NoError(t, err)
err = runEsRollover("init", []string{}, false)
require.NoError(t, err)
cleanES(t, client, defaultILMPolicyName)
}

func TestIndexRollover_CreateIndicesWithILM(t *testing.T) {
SkipUnlessEnv(t, "elasticsearch", "opensearch")
t.Cleanup(func() {
Expand Down

0 comments on commit 4415f6e

Please sign in to comment.