Retry ES|QL node requests on shard level failures (#120774) (#121879)
* Retry ES|QL node requests on shard level failures (#120774)

Today, ES|QL fails fast on any failure. This PR introduces support for
retrying within a cluster when data-node requests fail.

There are two types of failures that can occur with data-node requests:
entire-request failures and individual shard failures. For individual
shard failures, we can retry against other copies of the failing shards.
For entire-request failures, we can retry every shard in the node
request, provided no pages have been received.
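
As a rough illustration of this sender-side decision, here is a minimal
sketch; NodeResult, shardsToRetry, and the other names are hypothetical,
not the actual ES|QL types:

import java.util.List;

final class RetryDecision {
    // Hypothetical summary of one data-node request's outcome.
    record NodeResult(boolean wholeRequestFailed, int pagesReceived, List<String> allShards, List<String> failedShards) {}

    // Shard IDs that are safe to retry on other copies.
    static List<String> shardsToRetry(NodeResult r) {
        if (r.wholeRequestFailed()) {
            // Retrying the whole node request is only safe when no pages
            // were received, i.e. no partial output was consumed.
            return r.pagesReceived() == 0 ? r.allShards() : List.of();
        }
        // The request as a whole succeeded; retry only the failing shards.
        return r.failedShards();
    }
}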

On the handling side, ES|QL executes against a batch of shards
concurrently, so we need to track whether any pages have been produced.
If pages have been produced, the entire request must fail. Otherwise, we
can record the failed shards and send them back to the sender for
retries.
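
A minimal sketch of that handling-side bookkeeping, assuming a
hypothetical tracker class (the real compute-engine code differs in
detail):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShardFailureTracker {
    private final AtomicBoolean pagesProduced = new AtomicBoolean();
    private final List<String> retryableShards = new CopyOnWriteArrayList<>();

    void onPageProduced() {
        pagesProduced.set(true);
    }

    // Returns true if the failure was recorded for a later retry; false
    // means the whole request must fail because pages were already emitted.
    boolean onShardFailure(String shardId) {
        if (pagesProduced.get()) {
            return false;
        }
        retryableShards.add(shardId);
        return true;
    }

    // Failed shards to report back to the sender in the final response.
    List<String> failedShards() {
        return List.copyOf(retryableShards);
    }
}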

There are two decisions around how quickly we should retry:

1. Should we notify the sender of failing shards immediately (via a
different channel) to enable quick retries, or should we accumulate
failures and return them in the final response?

2. What is the maximum number of inflight requests we should allow on
the sending side?

This PR assumes that failures often occur when the cluster is under
load or during a rolling upgrade. To prevent retries from adding more
load and to allow the cluster to stabilize, it opts to send shard
failures in the final response and limits the number of inflight
requests to one per data node.
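
A minimal sketch of that pacing policy, again with hypothetical names;
each data node gets a queue that permits a single in-flight request at a
time, so retries of failed shards simply re-enter the queue:

import java.util.ArrayDeque;
import java.util.Queue;

final class OneInFlightPerNode {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private boolean inFlight;

    // Enqueue a node request: the initial attempt or a retry of failed shards.
    synchronized void enqueue(Runnable sendRequest) {
        pending.add(sendRequest);
        maybeSendNext();
    }

    // Called when the current request completes, successfully or not.
    synchronized void onCompleted() {
        inFlight = false;
        maybeSendNext();
    }

    private void maybeSendNext() {
        if (inFlight == false && pending.isEmpty() == false) {
            inFlight = true;
            pending.poll().run();
        }
    }
}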

Includes #121999

Closes #121966
dnhatn authored Feb 15, 2025
1 parent 7727dff commit 7ee9810
Showing 8 changed files with 978 additions and 202 deletions.
docs/changelog/120774.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
pr: 120774
summary: Retry ES|QL node requests on shard level failures
area: ES|QL
type: enhancement
issues: []
@@ -182,6 +182,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED_BACKPORT_8_X = def(8_840_0_01);
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR_BACKPORT_8_X = def(8_840_0_02);
public static final TransportVersion ESQL_RETRY_ON_SHARD_LEVEL_FAILURE_BACKPORT_8_19 = def(8_840_0_03);

/*
* STOP! READ THIS FIRST! No, really,
@@ -57,7 +57,7 @@ public FailureCollector(int maxExceptions) {
}
}

-    private static Exception unwrapTransportException(TransportException te) {
+    public static Exception unwrapTransportException(TransportException te) {
final Throwable cause = te.getCause();
if (cause == null) {
return te;
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.ComputeService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.index.shard.IndexShardTestCase.closeShardNoCheck;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class EsqlRetryIT extends AbstractEsqlIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
        plugins.add(MockTransportService.TestPlugin.class);
        return plugins;
    }

    public void testRetryOnShardFailures() throws Exception {
        populateIndices();
        try {
            final AtomicBoolean relocated = new AtomicBoolean();
            for (String node : internalCluster().getNodeNames()) {
                // fail some target shards while handling the data node request
                MockTransportService.getInstance(node)
                    .addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
                        if (relocated.compareAndSet(false, true)) {
                            closeOrFailShards(node);
                        }
                        handler.messageReceived(request, channel, task);
                    });
            }
            try (var resp = run("FROM log-* | STATS COUNT(timestamp) | LIMIT 1")) {
                assertThat(EsqlTestUtils.getValuesList(resp).get(0).get(0), equalTo(7L));
            }
        } finally {
            for (String node : internalCluster().getNodeNames()) {
                MockTransportService.getInstance(node).clearAllRules();
            }
        }
    }

    private void populateIndices() {
        internalCluster().ensureAtLeastNumDataNodes(2);
        assertAcked(prepareCreate("log-index-1").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
        assertAcked(prepareCreate("log-index-2").setSettings(indexSettings(between(1, 3), 1)).setMapping("timestamp", "type=date"));
        List<IndexRequestBuilder> reqs = new ArrayList<>();
        reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2015-07-08"));
        reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2018-07-08"));
        reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2020-03-03"));
        reqs.add(prepareIndex("log-index-1").setSource("timestamp", "2020-09-09"));
        reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2019-10-12"));
        reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2020-02-02"));
        reqs.add(prepareIndex("log-index-2").setSource("timestamp", "2020-10-10"));
        indexRandom(true, reqs);
        ensureGreen("log-index-1", "log-index-2");
        indicesAdmin().prepareRefresh("log-index-1", "log-index-2").get();
    }

    private void closeOrFailShards(String nodeName) throws Exception {
        final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
        for (IndexService indexService : indicesService) {
            for (IndexShard indexShard : indexService) {
                if (randomBoolean()) {
                    indexShard.failShard("simulated", new IOException("simulated failure"));
                } else if (randomBoolean()) {
                    closeShardNoCheck(indexShard);
                }
            }
        }
    }
}