Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate retryCallCluster for new ES client #71412

Merged
1 change: 1 addition & 0 deletions src/core/server/elasticsearch/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ export { IScopedClusterClient, ScopedClusterClient } from './scoped_cluster_clie
export { ElasticsearchClientConfig } from './client_config';
export { IClusterClient, ICustomClusterClient, ClusterClient } from './cluster_client';
export { configureClient } from './configure_client';
export { retryCallCluster, migrationRetryCallCluster } from './retry_call_cluster';
283 changes: 283 additions & 0 deletions src/core/server/elasticsearch/client/retry_call_cluster.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { errors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from './mocks';
import { loggingSystemMock } from '../../logging/logging_system.mock';
import { retryCallCluster, migrationRetryCallCluster } from './retry_call_cluster';

// Payload every mocked successful response in this file carries.
const dummyBody = { foo: 'bar' };

// Delegates to the client mock's `createClientError` to build the failure outcome of a mocked call.
function createErrorReturn(err: any) {
  return elasticsearchClientMock.createClientError(err);
}

describe('retryCallCluster', () => {
  let client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;

  // Builders for the outcomes queued on the mocked client. Failures are built lazily, inside the
  // mock implementation, exactly when the mocked API is invoked.
  const successResponse = () => elasticsearchClientMock.createClientResponse({ ...dummyBody });
  const noLivingConnectionsFailure = () =>
    createErrorReturn(new errors.NoLivingConnectionsError('no living connections', {} as any));
  const unknownFailure = () => createErrorReturn(new Error('unknown error'));

  beforeEach(() => {
    client = elasticsearchClientMock.createElasticSearchClient();
  });

  it('returns response from ES API call in case of success', async () => {
    client.asyncSearch.get.mockReturnValue(successResponse());

    const { body } = await retryCallCluster(() => client.asyncSearch.get());
    expect(body).toEqual(dummyBody);
  });

  it('retries ES API calls that rejects with `NoLivingConnectionsError`', async () => {
    const eventualResponse = successResponse();
    client.asyncSearch.get.mockImplementationOnce(noLivingConnectionsFailure);
    client.asyncSearch.get.mockImplementationOnce(() => eventualResponse);

    const { body } = await retryCallCluster(() => client.asyncSearch.get());
    expect(body).toEqual(dummyBody);
  });

  it('rejects when ES API calls reject with other errors', async () => {
    client.ping.mockImplementationOnce(unknownFailure);
    client.ping.mockImplementationOnce(successResponse);

    const attempt = retryCallCluster(() => client.ping());
    await expect(attempt).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
  });

  it('stops retrying when ES API calls reject with other errors', async () => {
    // Two retryable failures, then a non-retryable one; the trailing success must never be reached.
    client.ping.mockImplementationOnce(noLivingConnectionsFailure);
    client.ping.mockImplementationOnce(noLivingConnectionsFailure);
    client.ping.mockImplementationOnce(unknownFailure);
    client.ping.mockImplementationOnce(successResponse);

    const attempt = retryCallCluster(() => client.ping());
    await expect(attempt).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
  });
});

describe('migrationRetryCallCluster', () => {
  let client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;
  let logger: ReturnType<typeof loggingSystemMock.createLogger>;

  beforeEach(() => {
    client = elasticsearchClientMock.createElasticSearchClient();
    logger = loggingSystemMock.createLogger();
  });

  // Error / response builders shared by the cases below.
  const responseError = (meta: Record<string, unknown>) => new errors.ResponseError(meta as any);
  const snapshotInProgressError = () =>
    responseError({
      statusCode: 500,
      body: {
        error: {
          type: 'snapshot_in_progress_exception',
        },
      },
    });
  const successResponse = () => elasticsearchClientMock.createClientResponse({ ...dummyBody });

  // Makes `client.ping` fail twice with `error` before answering successfully.
  const failTwiceThenSucceed = (error: any) => {
    const fail = () => createErrorReturn(error);
    client.ping.mockImplementationOnce(fail);
    client.ping.mockImplementationOnce(fail);
    client.ping.mockImplementationOnce(successResponse);
  };

  // Runs the retry helper with a 1ms delay and checks that the successful payload comes back.
  const expectPingToEventuallySucceed = async () => {
    const { body } = await migrationRetryCallCluster(() => client.ping(), logger, 1);
    expect(body).toEqual(dummyBody);
  };

  it('retries ES API calls that rejects with `NoLivingConnectionsError`', async () => {
    failTwiceThenSucceed(new errors.NoLivingConnectionsError('no living connections', {} as any));
    await expectPingToEventuallySucceed();
  });

  it('retries ES API calls that rejects with `ConnectionError`', async () => {
    failTwiceThenSucceed(new errors.ConnectionError('connection error', {} as any));
    await expectPingToEventuallySucceed();
  });

  it('retries ES API calls that rejects with `TimeoutError`', async () => {
    failTwiceThenSucceed(new errors.TimeoutError('timeout error', {} as any));
    await expectPingToEventuallySucceed();
  });

  it('retries ES API calls that rejects with 503 `ResponseError`', async () => {
    failTwiceThenSucceed(responseError({ statusCode: 503 }));
    await expectPingToEventuallySucceed();
  });

  it('retries ES API calls that rejects 401 `ResponseError`', async () => {
    failTwiceThenSucceed(responseError({ statusCode: 401 }));
    await expectPingToEventuallySucceed();
  });

  it('retries ES API calls that rejects with 403 `ResponseError`', async () => {
    failTwiceThenSucceed(responseError({ statusCode: 403 }));
    await expectPingToEventuallySucceed();
  });

  it('retries ES API calls that rejects with 408 `ResponseError`', async () => {
    failTwiceThenSucceed(responseError({ statusCode: 408 }));
    await expectPingToEventuallySucceed();
  });

  it('retries ES API calls that rejects with 410 `ResponseError`', async () => {
    failTwiceThenSucceed(responseError({ statusCode: 410 }));
    await expectPingToEventuallySucceed();
  });

  it('retries ES API calls that rejects with `snapshot_in_progress_exception` `ResponseError`', async () => {
    failTwiceThenSucceed(snapshotInProgressError());
    await expectPingToEventuallySucceed();
  });

  it('logs only once for each unique error message', async () => {
    const serviceUnavailable = () => responseError({ statusCode: 503 });
    const connectionError = () => new errors.ConnectionError('connection error', {} as any);

    // Failures in the order the client reports them; the first two messages each occur twice.
    const failureSequence = [
      serviceUnavailable,
      connectionError,
      serviceUnavailable,
      connectionError,
      snapshotInProgressError,
    ];
    for (const buildError of failureSequence) {
      client.ping.mockImplementationOnce(() => createErrorReturn(buildError()));
    }
    client.ping.mockImplementationOnce(successResponse);

    await migrationRetryCallCluster(() => client.ping(), logger, 1);

    const warnings = loggingSystemMock.collect(logger).warn;
    expect(warnings).toMatchInlineSnapshot(`
Array [
Array [
"Unable to connect to Elasticsearch. Error: Response Error",
],
Array [
"Unable to connect to Elasticsearch. Error: connection error",
],
Array [
"Unable to connect to Elasticsearch. Error: snapshot_in_progress_exception",
],
]
`);
  });

  it('rejects when ES API calls reject with other errors', async () => {
    client.ping.mockImplementationOnce(() =>
      createErrorReturn(
        responseError({
          statusCode: 418,
          body: {
            error: {
              type: `I'm a teapot`,
            },
          },
        })
      )
    );
    client.ping.mockImplementationOnce(successResponse);

    const attempt = migrationRetryCallCluster(() => client.ping(), logger, 1);
    await expect(attempt).rejects.toMatchInlineSnapshot(`[ResponseError: I'm a teapot]`);
  });

  it('stops retrying when ES API calls reject with other errors', async () => {
    const timeoutFailure = () =>
      createErrorReturn(new errors.TimeoutError('timeout error', {} as any));

    // Two retryable failures, then a non-retryable one; the trailing success must never be reached.
    client.ping.mockImplementationOnce(timeoutFailure);
    client.ping.mockImplementationOnce(timeoutFailure);
    client.ping.mockImplementationOnce(() => createErrorReturn(new Error('unknown error')));
    client.ping.mockImplementationOnce(successResponse);

    const attempt = migrationRetryCallCluster(() => client.ping(), logger, 1);
    await expect(attempt).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
  });
});
103 changes: 103 additions & 0 deletions src/core/server/elasticsearch/client/retry_call_cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { defer, throwError, iif, timer } from 'rxjs';
import { concatMap, retryWhen } from 'rxjs/operators';
import { Logger } from '../../logging';

// HTTP status codes for which `migrationRetryCallCluster` retries a `ResponseError`.
const retryResponseStatuses = [
503, // ServiceUnavailable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, if the status code of a response is 502, 503 or 504, the client performs an automatic failover internally, so have it here could be a duplication.
Regarding 4xx status codes, the client does not perform any automatic failover/retry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know. Having this feature for our 'end' consumers is great. For the migration needs, we need to perform an infinity of retries, so keeping the logic in a single place is probably better. But we don't need to disable the internal failover here though, the duplication is probably fine.

401, // AuthenticationException (HTTP 401 Unauthorized)
403, // AuthorizationException (HTTP 403 Forbidden)
408, // RequestTimeout
410, // Gone
];

/**
 * Retries the provided Elasticsearch API call when a `NoLivingConnectionsError` error is
 * encountered. The API call will be retried every `delay` milliseconds (once a second by
 * default), indefinitely, until a successful response or a different error is received.
 *
 * @example
 * ```ts
 * const response = await retryCallCluster(() => client.ping());
 * ```
 *
 * @param apiCaller - function performing the API call; it is invoked again for every retry.
 * @param delay - time to wait, in milliseconds, before retrying. Defaults to 1000.
 * @returns a promise that settles with the first successful response, or rejects with the first
 * error that is not a `NoLivingConnectionsError`.
 *
 * @internal
 */
export const retryCallCluster = <T extends Promise<unknown>>(
  apiCaller: () => T,
  delay: number = 1000
): T => {
  // `defer` re-runs `apiCaller` on every (re)subscription, which is what makes the retry
  // issue a fresh request instead of replaying the failed promise.
  return defer(() => apiCaller())
    .pipe(
      retryWhen((errors) =>
        errors.pipe(
          concatMap((error) =>
            iif(
              // Optional chaining so that a nullish rejection is rethrown untouched instead of
              // being replaced by a `TypeError` raised while reading `name`.
              () => error?.name === 'NoLivingConnectionsError',
              timer(delay),
              throwError(error)
            )
          )
        )
      )
    )
    .toPromise() as T;
};

/**
 * Retries the provided Elasticsearch API call when it rejects with a
 * `NoLivingConnectionsError`, a `ConnectionError` or a `TimeoutError`, with a
 * `ResponseError` whose status code is listed in `retryResponseStatuses`, or
 * with an error whose body reports a `snapshot_in_progress_exception`. The API
 * call will be retried every `delay` milliseconds, indefinitely, until a
 * successful response or a different error is received.
 *
 * Every error is logged at `warn` level the first time its message is seen.
 * This happens before the retry decision, so the error that ends the retries
 * is logged as well unless an identical message was already logged.
 *
 * @example
 * ```ts
 * const response = await migrationRetryCallCluster(() => client.ping(), logger);
 * ```
 *
 * @param apiCaller - function performing the API call; it is invoked again for every retry.
 * @param log - logger receiving the warnings.
 * @param delay - time to wait, in milliseconds, before retrying. Defaults to 2500.
 *
 * @internal
 */
export const migrationRetryCallCluster = <T extends Promise<unknown>>(
apiCaller: () => T,
log: Logger,
delay: number = 2500
): T => {
// Messages that were already logged, so that each distinct message is only warned about once.
const previousErrors: string[] = [];
return defer(() => apiCaller())
.pipe(
retryWhen((errors) =>
errors.pipe(
concatMap((error) => {
// Logging is not conditional on the error being retryable.
if (!previousErrors.includes(error.message)) {
log.warn(`Unable to connect to Elasticsearch. Error: ${error.message}`);
previousErrors.push(error.message);
}
// Retryable errors wait `delay` ms and then trigger a new attempt; anything else is rethrown,
// which ends the retries and rejects the returned promise.
return iif(
() =>
error.name === 'NoLivingConnectionsError' ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: it's not type safe anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, however the unit test are asserting behavior against concrete error from the library, so I guess this is alright. But I can revert the change if we prefer instanceof based checks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, that optional. Feel free to merge

error.name === 'ConnectionError' ||
error.name === 'TimeoutError' ||
(error.name === 'ResponseError' &&
retryResponseStatuses.includes(error.statusCode)) ||
error?.body?.error?.type === 'snapshot_in_progress_exception',
timer(delay),
throwError(error)
);
})
)
)
)
.toPromise() as T;
};
Loading