Skip to content

Commit d190415

Browse files
committed
fix(elasticsearch sink): Elasticsearch sink with api_version set to auto does not recognize the API version of ES6 as V6 (vectordotdev#17226)
1 parent 7a9bdf0 commit d190415

File tree

1 file changed

+25
-16
lines changed

1 file changed

+25
-16
lines changed

src/sinks/elasticsearch/common.rs

+25-16
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,10 @@ impl ElasticsearchCommon {
145145
)
146146
.await
147147
{
148-
Ok(version) => version,
148+
Ok(version) => {
149+
debug!(message = "Auto-detected Elasticsearch API version.", %version);
150+
version
151+
}
149152
// This error should be fatal, but for now we only emit it as a warning
150153
// to make the transition smoother.
151154
Err(error) => {
@@ -277,28 +280,34 @@ async fn get_version(
277280
proxy_config: &ProxyConfig,
278281
) -> crate::Result<usize> {
279282
#[derive(Deserialize)]
280-
struct ClusterState {
281-
version: Option<usize>,
283+
struct Version {
284+
number: Option<String>,
285+
}
286+
#[derive(Deserialize)]
287+
struct ResponsePayload {
288+
version: Option<Version>,
282289
}
283290

284291
let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
285-
let response = get(
286-
base_url,
287-
http_auth,
288-
aws_auth,
289-
region,
290-
request,
291-
client,
292-
"/_cluster/state/version",
293-
)
294-
.await
295-
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
292+
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
293+
.await
294+
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
296295

297296
let (_, body) = response.into_parts();
298297
let mut body = body::aggregate(body).await?;
299298
let body = body.copy_to_bytes(body.remaining());
300-
let ClusterState { version } = serde_json::from_slice(&body)?;
301-
version.ok_or_else(||"Unexpected response from Elasticsearch endpoint `/_cluster/state/version`. Missing `version`. Consider setting `api_version` option.".into())
299+
let ResponsePayload { version } = serde_json::from_slice(&body)?;
300+
if let Some(version) = version {
301+
if let Some(number) = version.number {
302+
let v: Vec<&str> = number.split('.').collect();
303+
if !v.is_empty() {
304+
if let Ok(major_version) = v[0].parse::<usize>() {
305+
return Ok(major_version);
306+
}
307+
}
308+
}
309+
}
310+
Err("Unexpected response from Elasticsearch endpoint `/`. Missing `version`. Consider setting `api_version` option.".into())
302311
}
303312

304313
async fn get(

0 commit comments

Comments
 (0)