diff --git a/src/core/server/elasticsearch/client/index.ts b/src/core/server/elasticsearch/client/index.ts index 18e84482024ca..b8125de2ee498 100644 --- a/src/core/server/elasticsearch/client/index.ts +++ b/src/core/server/elasticsearch/client/index.ts @@ -22,3 +22,4 @@ export { IScopedClusterClient, ScopedClusterClient } from './scoped_cluster_clie export { ElasticsearchClientConfig } from './client_config'; export { IClusterClient, ICustomClusterClient, ClusterClient } from './cluster_client'; export { configureClient } from './configure_client'; +export { retryCallCluster, migrationRetryCallCluster } from './retry_call_cluster'; diff --git a/src/core/server/elasticsearch/client/retry_call_cluster.test.ts b/src/core/server/elasticsearch/client/retry_call_cluster.test.ts new file mode 100644 index 0000000000000..a7177c0b29047 --- /dev/null +++ b/src/core/server/elasticsearch/client/retry_call_cluster.test.ts @@ -0,0 +1,283 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { errors } from '@elastic/elasticsearch'; +import { elasticsearchClientMock } from './mocks'; +import { loggingSystemMock } from '../../logging/logging_system.mock'; +import { retryCallCluster, migrationRetryCallCluster } from './retry_call_cluster'; + +const dummyBody = { foo: 'bar' }; +const createErrorReturn = (err: any) => elasticsearchClientMock.createClientError(err); + +describe('retryCallCluster', () => { + let client: ReturnType; + + beforeEach(() => { + client = elasticsearchClientMock.createElasticSearchClient(); + }); + + it('returns response from ES API call in case of success', async () => { + const successReturn = elasticsearchClientMock.createClientResponse({ ...dummyBody }); + + client.asyncSearch.get.mockReturnValue(successReturn); + + const result = await retryCallCluster(() => client.asyncSearch.get()); + expect(result.body).toEqual(dummyBody); + }); + + it('retries ES API calls that rejects with `NoLivingConnectionsError`', async () => { + const successReturn = elasticsearchClientMock.createClientResponse({ ...dummyBody }); + + client.asyncSearch.get + .mockImplementationOnce(() => + createErrorReturn(new errors.NoLivingConnectionsError('no living connections', {} as any)) + ) + .mockImplementationOnce(() => successReturn); + + const result = await retryCallCluster(() => client.asyncSearch.get()); + expect(result.body).toEqual(dummyBody); + }); + + it('rejects when ES API calls reject with other errors', async () => { + client.ping + .mockImplementationOnce(() => createErrorReturn(new Error('unknown error'))) + .mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody })); + + await expect(retryCallCluster(() => client.ping())).rejects.toMatchInlineSnapshot( + `[Error: unknown error]` + ); + }); + + it('stops retrying when ES API calls reject with other errors', async () => { + client.ping + .mockImplementationOnce(() => + createErrorReturn(new errors.NoLivingConnectionsError('no living connections', {} as any)) + ) + .mockImplementationOnce(() => + createErrorReturn(new errors.NoLivingConnectionsError('no living connections', {} as any)) + ) + .mockImplementationOnce(() => createErrorReturn(new Error('unknown error'))) + .mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody })); + + await expect(retryCallCluster(() => client.ping())).rejects.toMatchInlineSnapshot( + `[Error: unknown error]` + ); + }); +}); + +describe('migrationRetryCallCluster', () => { + let client: ReturnType; + let logger: ReturnType; + + beforeEach(() => { + client = elasticsearchClientMock.createElasticSearchClient(); + logger = loggingSystemMock.createLogger(); + }); + + const mockClientPingWithErrorBeforeSuccess = (error: any) => { + client.ping + .mockImplementationOnce(() => createErrorReturn(error)) + .mockImplementationOnce(() => createErrorReturn(error)) + .mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody })); + }; + + it('retries ES API calls that rejects with `NoLivingConnectionsError`', async () => { + mockClientPingWithErrorBeforeSuccess( + new errors.NoLivingConnectionsError('no living connections', {} as any) + ); + + const result = await migrationRetryCallCluster(() => client.ping(), logger, 1); + expect(result.body).toEqual(dummyBody); + }); + + it('retries ES API calls that rejects with `ConnectionError`', async () => { + mockClientPingWithErrorBeforeSuccess(new errors.ConnectionError('connection error', {} as any)); + + const result = await migrationRetryCallCluster(() => client.ping(), logger, 1); + expect(result.body).toEqual(dummyBody); + }); + + it('retries ES API calls that rejects with `TimeoutError`', async () => { + mockClientPingWithErrorBeforeSuccess(new errors.TimeoutError('timeout error', {} as any)); + + const result = await migrationRetryCallCluster(() => client.ping(), logger, 1); + expect(result.body).toEqual(dummyBody); + }); + + it('retries ES API calls that rejects with 503 `ResponseError`', async () => { + mockClientPingWithErrorBeforeSuccess( + new errors.ResponseError({ + statusCode: 503, + } as any) + ); + + const result = await migrationRetryCallCluster(() => client.ping(), logger, 1); + expect(result.body).toEqual(dummyBody); + }); + + it('retries ES API calls that rejects 401 `ResponseError`', async () => { + mockClientPingWithErrorBeforeSuccess( + new errors.ResponseError({ + statusCode: 401, + } as any) + ); + + const result = await migrationRetryCallCluster(() => client.ping(), logger, 1); + expect(result.body).toEqual(dummyBody); + }); + + it('retries ES API calls that rejects with 403 `ResponseError`', async () => { + mockClientPingWithErrorBeforeSuccess( + new errors.ResponseError({ + statusCode: 403, + } as any) + ); + + const result = await migrationRetryCallCluster(() => client.ping(), logger, 1); + expect(result.body).toEqual(dummyBody); + }); + + it('retries ES API calls that rejects with 408 `ResponseError`', async () => { + mockClientPingWithErrorBeforeSuccess( + new errors.ResponseError({ + statusCode: 408, + } as any) + ); + + const result = await migrationRetryCallCluster(() => client.ping(), logger, 1); + expect(result.body).toEqual(dummyBody); + }); + + it('retries ES API calls that rejects with 410 `ResponseError`', async () => { + mockClientPingWithErrorBeforeSuccess( + new errors.ResponseError({ + statusCode: 410, + } as any) + ); + + const result = await migrationRetryCallCluster(() => client.ping(), logger, 1); + expect(result.body).toEqual(dummyBody); + }); + + it('retries ES API calls that rejects with `snapshot_in_progress_exception` `ResponseError`', async () => { + mockClientPingWithErrorBeforeSuccess( + new errors.ResponseError({ + statusCode: 500, + body: { + error: { + type: 'snapshot_in_progress_exception', + }, + }, + } as any) + ); + + const result = await migrationRetryCallCluster(() => client.ping(), logger, 1); + expect(result.body).toEqual(dummyBody); + }); + + it('logs only once for each unique error message', async () => { + client.ping + .mockImplementationOnce(() => + createErrorReturn( + new errors.ResponseError({ + statusCode: 503, + } as any) + ) + ) + .mockImplementationOnce(() => + createErrorReturn(new errors.ConnectionError('connection error', {} as any)) + ) + .mockImplementationOnce(() => + createErrorReturn( + new errors.ResponseError({ + statusCode: 503, + } as any) + ) + ) + .mockImplementationOnce(() => + createErrorReturn(new errors.ConnectionError('connection error', {} as any)) + ) + .mockImplementationOnce(() => + createErrorReturn( + new errors.ResponseError({ + statusCode: 500, + body: { + error: { + type: 'snapshot_in_progress_exception', + }, + }, + } as any) + ) + ) + .mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody })); + + await migrationRetryCallCluster(() => client.ping(), logger, 1); + + expect(loggingSystemMock.collect(logger).warn).toMatchInlineSnapshot(` + Array [ + Array [ + "Unable to connect to Elasticsearch. Error: Response Error", + ], + Array [ + "Unable to connect to Elasticsearch. Error: connection error", + ], + Array [ + "Unable to connect to Elasticsearch. Error: snapshot_in_progress_exception", + ], + ] + `); + }); + + it('rejects when ES API calls reject with other errors', async () => { + client.ping + .mockImplementationOnce(() => + createErrorReturn( + new errors.ResponseError({ + statusCode: 418, + body: { + error: { + type: `I'm a teapot`, + }, + }, + } as any) + ) + ) + .mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody })); + + await expect( + migrationRetryCallCluster(() => client.ping(), logger, 1) + ).rejects.toMatchInlineSnapshot(`[ResponseError: I'm a teapot]`); + }); + + it('stops retrying when ES API calls reject with other errors', async () => { + client.ping + .mockImplementationOnce(() => + createErrorReturn(new errors.TimeoutError('timeout error', {} as any)) + ) + .mockImplementationOnce(() => + createErrorReturn(new errors.TimeoutError('timeout error', {} as any)) + ) + .mockImplementationOnce(() => createErrorReturn(new Error('unknown error'))) + .mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody })); + + await expect( + migrationRetryCallCluster(() => client.ping(), logger, 1) + ).rejects.toMatchInlineSnapshot(`[Error: unknown error]`); + }); +}); diff --git a/src/core/server/elasticsearch/client/retry_call_cluster.ts b/src/core/server/elasticsearch/client/retry_call_cluster.ts new file mode 100644 index 0000000000000..1ad039e512215 --- /dev/null +++ b/src/core/server/elasticsearch/client/retry_call_cluster.ts @@ -0,0 +1,103 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { defer, throwError, iif, timer } from 'rxjs'; +import { concatMap, retryWhen } from 'rxjs/operators'; +import { Logger } from '../../logging'; + +const retryResponseStatuses = [ + 503, // ServiceUnavailable + 401, // AuthorizationException + 403, // AuthenticationException + 408, // RequestTimeout + 410, // Gone +]; + +/** + * Retries the provided Elasticsearch API call when a `NoLivingConnectionsError` error is + * encountered. The API call will be retried once a second, indefinitely, until + * a successful response or a different error is received. + * + * @example + * ```ts + * const response = await retryCallCluster(() => client.ping()); + * ``` + * + * @internal + */ +export const retryCallCluster = >(apiCaller: () => T): T => { + return defer(() => apiCaller()) + .pipe( + retryWhen((errors) => + errors.pipe( + concatMap((error) => + iif(() => error.name === 'NoLivingConnectionsError', timer(1000), throwError(error)) + ) + ) + ) + ) + .toPromise() as T; +}; + +/** + * Retries the provided Elasticsearch API call when an error such as + * `AuthenticationException` `NoConnections`, `ConnectionFault`, + * `ServiceUnavailable` or `RequestTimeout` are encountered. The API call will + * be retried once a second, indefinitely, until a successful response or a + * different error is received. + * + * @example + * ```ts + * const response = await migrationRetryCallCluster(() => client.ping(), logger); + * ``` + * + * @internal + */ +export const migrationRetryCallCluster = >( + apiCaller: () => T, + log: Logger, + delay: number = 2500 +): T => { + const previousErrors: string[] = []; + return defer(() => apiCaller()) + .pipe( + retryWhen((errors) => + errors.pipe( + concatMap((error) => { + if (!previousErrors.includes(error.message)) { + log.warn(`Unable to connect to Elasticsearch. Error: ${error.message}`); + previousErrors.push(error.message); + } + return iif( + () => + error.name === 'NoLivingConnectionsError' || + error.name === 'ConnectionError' || + error.name === 'TimeoutError' || + (error.name === 'ResponseError' && + retryResponseStatuses.includes(error.statusCode)) || + error?.body?.error?.type === 'snapshot_in_progress_exception', + timer(delay), + throwError(error) + ); + }) + ) + ) + ) + .toPromise() as T; +}; diff --git a/src/core/server/elasticsearch/legacy/retry_call_cluster.ts b/src/core/server/elasticsearch/legacy/retry_call_cluster.ts index 475a76d406017..1b05cb2bf13cd 100644 --- a/src/core/server/elasticsearch/legacy/retry_call_cluster.ts +++ b/src/core/server/elasticsearch/legacy/retry_call_cluster.ts @@ -53,7 +53,7 @@ export function migrationsRetryCallCluster( .pipe( retryWhen((error$) => error$.pipe( - concatMap((error, i) => { + concatMap((error) => { if (!previousErrors.includes(error.message)) { log.warn(`Unable to connect to Elasticsearch. Error: ${error.message}`); previousErrors.push(error.message); @@ -100,7 +100,7 @@ export function retryCallCluster(apiCaller: LegacyAPICaller) { .pipe( retryWhen((errors) => errors.pipe( - concatMap((error, i) => + concatMap((error) => iif( () => error instanceof legacyElasticsearch.errors.NoConnections, timer(1000),