-
Notifications
You must be signed in to change notification settings - Fork 8.3k
/
Copy pathcluster_client.ts
260 lines (234 loc) · 8.75 KB
/
cluster_client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Client } from 'elasticsearch';
import { get } from 'lodash';
import { ElasticsearchErrorHelpers } from './errors';
import { GetAuthHeaders, isRealRequest, LegacyRequest } from '../http';
import { filterHeaders, Headers, KibanaRequest, ensureRawRequest } from '../http/router';
import { Logger } from '../logging';
import {
ElasticsearchClientConfig,
parseElasticsearchClientConfig,
} from './elasticsearch_client_config';
import { ScopedClusterClient, IScopedClusterClient } from './scoped_cluster_client';
import { CallAPIOptions, APICaller } from './api_types';
/**
 * Do-nothing function that always yields `undefined`.
 * Used in this file as the default `getAuthHeaders` provider of `ClusterClient`.
 */
function noop(): undefined {
  return undefined;
}
/**
 * Calls the Elasticsearch API endpoint with the specified parameters.
 *
 * The endpoint is resolved as a dot-separated path on `client` (e.g. `cluster.getSettings`)
 * and invoked with its parent object as `this`.
 *
 * @param client Raw Elasticsearch JS client instance to use.
 * @param endpoint Name of the API endpoint to call.
 * @param clientParams Parameters that will be directly passed to the
 * Elasticsearch JS client.
 * @param options Options that affect the way we call the API and process the result.
 * The `{ wrap401Errors: true }` default applies only when `options` is omitted entirely.
 * @returns The value the client request resolves with.
 * @throws Error if `endpoint` does not resolve to anything on `client`, or if
 * `options.signal` is already aborted or becomes aborted while the request is pending.
 * @throws The error the client request rejects with; when `options.wrap401Errors` is truthy
 * and the error's `statusCode` is 401, it is first passed through
 * `ElasticsearchErrorHelpers.decorateNotAuthorizedError`.
 */
const callAPI = async (
  client: Client,
  endpoint: string,
  clientParams: Record<string, any> = {},
  options: CallAPIOptions = { wrap401Errors: true }
) => {
  const clientPath = endpoint.split('.');
  const api: any = get(client, clientPath);
  if (!api) {
    throw new Error(`called with an invalid endpoint: ${endpoint}`);
  }

  // The API function must be invoked with the object it lives on as `this`.
  const apiContext = clientPath.length === 1 ? client : get(client, clientPath.slice(0, -1));
  const { signal } = options;

  try {
    return await new Promise((resolve, reject) => {
      // An already-aborted signal never emits `abort` again, so a listener alone would
      // let the request run to completion. Bail out before issuing it.
      if (signal && signal.aborted) {
        reject(new Error('Request was aborted'));
        return;
      }

      const request = api.call(apiContext, clientParams);
      if (!signal) {
        request.then(resolve, reject);
        return;
      }

      const onAbort = () => {
        request.abort();
        reject(new Error('Request was aborted'));
      };
      // Detach the listener once the request settles so a long-lived signal does not
      // keep references to finished requests.
      const detach = () => signal.removeEventListener('abort', onAbort);

      signal.addEventListener('abort', onAbort);
      request.then(
        (result: unknown) => {
          detach();
          resolve(result);
        },
        (error: unknown) => {
          detach();
          reject(error);
        }
      );
    });
  } catch (err) {
    if (!options.wrap401Errors || err.statusCode !== 401) {
      throw err;
    }

    throw ElasticsearchErrorHelpers.decorateNotAuthorizedError(err);
  }
};
/**
 * Fake request object created manually by Kibana plugins.
 *
 * It is one of the request shapes accepted by `ClusterClient.asScoped` (see
 * `ScopeableRequest`) and carries nothing but a `headers` map.
 *
 * @public
 */
export interface FakeRequest {
/** Headers used for authentication against Elasticsearch */
headers: Headers;
}
/**
 * Represents an Elasticsearch cluster API client created by the platform.
 * It allows calling the API on behalf of the internal Kibana user and
 * the actual user that is derived from the request headers (via `asScoped(...)`).
 *
 * Only `callAsInternalUser` and `asScoped` are exposed; `close` is not part of this type.
 *
 * See {@link ClusterClient}.
 *
 * @public
 */
export type IClusterClient = Pick<ClusterClient, 'callAsInternalUser' | 'asScoped'>;
/**
 * Represents an Elasticsearch cluster API client created by a plugin.
 * It allows calling the API on behalf of the internal Kibana user and
 * the actual user that is derived from the request headers (via `asScoped(...)`).
 *
 * Exposes `callAsInternalUser`, `asScoped` and, in addition, `close`.
 *
 * See {@link ClusterClient}.
 *
 * @public
 */
export type ICustomClusterClient = Pick<ClusterClient, 'callAsInternalUser' | 'close' | 'asScoped'>;
/**
 * A user credentials container.
 * It accommodates the necessary auth credentials to impersonate the current user.
 *
 * This is the union of request shapes accepted by `ClusterClient.asScoped`.
 *
 * See {@link KibanaRequest}.
 *
 * @public
 */
export type ScopeableRequest = KibanaRequest | LegacyRequest | FakeRequest;
/**
 * {@inheritDoc IClusterClient}
 * @public
 */
export class ClusterClient implements IClusterClient {
  /**
   * Raw Elasticsearch JS client that acts on behalf of the Kibana internal user.
   */
  private readonly client: Client;

  /**
   * Optional raw Elasticsearch JS client that is shared between all the scoped clients created
   * from this cluster client. It is created lazily by the first `asScoped(...)` call, from the
   * same config but with `auth: false`, and is closed together with `client` in `close()`.
   */
  private scopedClient?: Client;

  /**
   * Indicates whether this cluster client (and all internal raw Elasticsearch JS clients) has been closed.
   */
  private isClosed = false;

  /**
   * @param config - Elasticsearch client configuration; parsed to build the raw clients and
   * consulted for the request header whitelist and SSL settings.
   * @param log - Logger handed to the config parser.
   * @param getAuthHeaders - Provider of auth headers for real requests; defaults to a no-op
   * that yields `undefined`.
   */
  constructor(
    private readonly config: ElasticsearchClientConfig,
    private readonly log: Logger,
    private readonly getAuthHeaders: GetAuthHeaders = noop
  ) {
    this.client = new Client(parseElasticsearchClientConfig(config, log));
  }

  /**
   * Calls specified endpoint with provided clientParams on behalf of the
   * Kibana internal user.
   * See {@link APICaller}.
   *
   * @param endpoint - String descriptor of the endpoint e.g. `cluster.getSettings` or `ping`.
   * @param clientParams - A dictionary of parameters that will be passed directly to the Elasticsearch JS client.
   * @param options - Options that affect the way we call the API and process the result.
   * @throws Error if this cluster client has already been closed.
   */
  public callAsInternalUser: APICaller = async (
    endpoint: string,
    clientParams: Record<string, any> = {},
    options?: CallAPIOptions
  ) => {
    this.assertIsNotClosed();

    return await (callAPI.bind(null, this.client) as APICaller)(endpoint, clientParams, options);
  };

  /**
   * Closes the cluster client. After that client cannot be used and one should
   * create a new client instance to be able to interact with Elasticsearch API.
   * Calling it again on an already closed client is a no-op.
   */
  public close() {
    if (this.isClosed) {
      return;
    }

    this.isClosed = true;
    this.client.close();

    if (this.scopedClient !== undefined) {
      this.scopedClient.close();
    }
  }

  /**
   * Creates an instance of {@link IScopedClusterClient} based on the configuration of the
   * current cluster client that exposes additional `callAsCurrentUser` method
   * scoped to the provided req. Consumers shouldn't worry about closing
   * scoped client instances, these will be automatically closed as soon as the
   * original cluster client isn't needed anymore and closed.
   *
   * @param request - Request the `IScopedClusterClient` instance will be scoped to.
   * Supports request optionality, Legacy.Request & FakeRequest for BWC with LegacyPlatform
   */
  public asScoped(request?: ScopeableRequest): IScopedClusterClient {
    // It'd have been quite expensive to create and configure client for every incoming
    // request since it involves parsing of the config, reading of the SSL certificate and
    // key files etc. Moreover scoped client needs two Elasticsearch JS clients at the same
    // time: one to support `callAsInternalUser` and another one for `callAsCurrentUser`.
    // To reduce that overhead we create one scoped client per cluster client and share it
    // between all scoped client instances.
    //
    // Once this cluster client is closed, no raw client is created anymore: `close()` returns
    // early from then on, so a client created here could never be closed, and it could never
    // be used either because `callAsCurrentUser` throws on a closed cluster client.
    if (this.scopedClient === undefined && !this.isClosed) {
      this.scopedClient = new Client(
        parseElasticsearchClientConfig(this.config, this.log, {
          auth: false,
          // Ask the parser to ignore the certificate and key unless
          // `ssl.alwaysPresentCertificate` is set.
          ignoreCertAndKey: !this.config.ssl || !this.config.ssl.alwaysPresentCertificate,
        })
      );
    }

    return new ScopedClusterClient(
      this.callAsInternalUser,
      this.callAsCurrentUser,
      filterHeaders(this.getHeaders(request), this.config.requestHeadersWhitelist)
    );
  }

  /**
   * Calls specified endpoint with provided clientParams on behalf of the
   * user initiated request to the Kibana server (via HTTP request headers).
   * See {@link APICaller}.
   *
   * @param endpoint - String descriptor of the endpoint e.g. `cluster.getSettings` or `ping`.
   * @param clientParams - A dictionary of parameters that will be passed directly to the Elasticsearch JS client.
   * @param options - Options that affect the way we call the API and process the result.
   * @throws Error if this cluster client has already been closed.
   */
  private callAsCurrentUser: APICaller = async (
    endpoint: string,
    clientParams: Record<string, any> = {},
    options?: CallAPIOptions
  ) => {
    this.assertIsNotClosed();

    // This caller is only handed out by `asScoped`, which creates `scopedClient` while the
    // cluster client is open, hence the non-null assertion.
    return await (callAPI.bind(null, this.scopedClient!) as APICaller)(
      endpoint,
      clientParams,
      options
    );
  };

  /**
   * Guards API calls against use after `close()`.
   * @throws Error if this cluster client has been closed.
   */
  private assertIsNotClosed() {
    if (this.isClosed) {
      throw new Error('Cluster client cannot be used after it has been closed.');
    }
  }

  /**
   * Collects the headers a scoped client should be created with.
   *
   * For anything `isRealRequest` rejects (including a missing request) the request's own
   * `headers` are returned, or an empty object when there are none. For a real request the
   * raw request headers are merged with the result of `getAuthHeaders`, the latter taking
   * precedence on conflicting keys.
   *
   * @param request - Request to read the headers from, if any.
   */
  private getHeaders(
    request?: KibanaRequest | LegacyRequest | FakeRequest
  ): Record<string, string | string[] | undefined> {
    if (!isRealRequest(request)) {
      return request && request.headers ? request.headers : {};
    }

    const authHeaders = this.getAuthHeaders(request);
    const headers = ensureRawRequest(request).headers;

    return { ...headers, ...authHeaders };
  }
}