Skip to content

Commit

Permalink
Clean up exchanges in EsqlNodeFailureIT (#121633) (#121690)
Browse files Browse the repository at this point in the history
If the query hits the failing index first, we will cancel the request,
preventing exchange-sink requests and data-node requests from reaching
another data node. As a result, exchange sinks could stay for 30
seconds.
  • Loading branch information
dnhatn authored Feb 4, 2025
1 parent 7990781 commit 3f7e23c
Showing 1 changed file with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.FailingFieldPlugin;
Expand All @@ -27,9 +29,23 @@
*/
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class EsqlNodeFailureIT extends AbstractEsqlIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), FailingFieldPlugin.class);
var plugins = new ArrayList<>(super.nodePlugins());
plugins.add(FailingFieldPlugin.class);
plugins.add(InternalExchangePlugin.class);
return plugins;
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(3000, 4000)))
.build();
logger.info("settings {}", settings);
return settings;
}

/**
Expand All @@ -49,7 +65,7 @@ public void testFailureLoadingFields() throws IOException {
mapping.endObject();
client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get();

int docCount = 100;
int docCount = 50;
List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
for (int d = 0; d < docCount; d++) {
docs.add(client().prepareIndex("ok").setSource("foo", d));
Expand Down

0 comments on commit 3f7e23c

Please sign in to comment.