Skip to content

Commit

Permalink
SearchResponseMerger now limits the total number of shard failures
Browse files Browse the repository at this point in the history
SearchResponseMergerTests modified to match new functionality.
Fixed JdbcShardFailureIT and RestSqlTestCase to no longer expect
suppressed header warnings, since we are limiting the number of failures returned.
  • Loading branch information
quux00 committed Mar 8, 2024
1 parent 756f9d1 commit 94ba5c7
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -144,15 +143,14 @@ public SearchResponse getMergedResponse(Clusters clusters) {
failedShards += searchResponse.getFailedShards();
numReducePhases += searchResponse.getNumReducePhases();

Collections.addAll(failures, searchResponse.getShardFailures());
// if (failures.size() < AbstractSearchAsyncAction.MAX_FAILURES_IN_RESPONSE) {
// for (ShardSearchFailure shardFailure : searchResponse.getShardFailures()) {
// failures.add(shardFailure);
// if (failures.size() >= AbstractSearchAsyncAction.MAX_FAILURES_IN_RESPONSE) {
// break;
// }
// }
// }
if (failures.size() < AbstractSearchAsyncAction.MAX_FAILURES_IN_RESPONSE) {
for (ShardSearchFailure shardFailure : searchResponse.getShardFailures()) {
failures.add(shardFailure);
if (failures.size() >= AbstractSearchAsyncAction.MAX_FAILURES_IN_RESPONSE) {
break;
}
}
}

profileResults.putAll(searchResponse.getProfileResults());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,19 @@ public void testMergeShardFailures() throws InterruptedException {
assertEquals(numResponses, mergedResponse.getTotalShards());
assertEquals(numResponses, mergedResponse.getSuccessfulShards());
assertEquals(0, mergedResponse.getSkippedShards());
int numExpectedFailures = priorityQueue.size();
assertEquals(numExpectedFailures, mergedResponse.getFailedShards());
assertEquals(priorityQueue.size(), mergedResponse.getFailedShards());
ShardSearchFailure[] shardFailures = mergedResponse.getShardFailures();
assertEquals(priorityQueue.size(), shardFailures.length);
for (ShardSearchFailure shardFailure : shardFailures) {
ShardSearchFailure expected = priorityQueue.poll().v2();
assertSame(expected, shardFailure);
if (numExpectedFailures > AbstractSearchAsyncAction.MAX_FAILURES_IN_RESPONSE) {
numExpectedFailures = AbstractSearchAsyncAction.MAX_FAILURES_IN_RESPONSE;
} else {
for (ShardSearchFailure shardFailure : shardFailures) {
ShardSearchFailure expected = priorityQueue.poll().v2();
assertSame(expected, shardFailure);
}
}
assertEquals(numExpectedFailures, shardFailures.length);
} finally {
mergedResponse.decRef();
}
Expand Down Expand Up @@ -254,12 +260,17 @@ public void testMergeShardFailuresNullShardTarget() throws InterruptedException
assertEquals(numResponses, mergedResponse.getTotalShards());
assertEquals(numResponses, mergedResponse.getSuccessfulShards());
assertEquals(0, mergedResponse.getSkippedShards());
assertEquals(priorityQueue.size(), mergedResponse.getFailedShards());
int numExpectedFailures = priorityQueue.size();
assertEquals(numExpectedFailures, mergedResponse.getFailedShards());
ShardSearchFailure[] shardFailures = mergedResponse.getShardFailures();
assertEquals(priorityQueue.size(), shardFailures.length);
for (ShardSearchFailure shardFailure : shardFailures) {
ShardSearchFailure expected = priorityQueue.poll().v2();
assertSame(expected, shardFailure);
if (numExpectedFailures > AbstractSearchAsyncAction.MAX_FAILURES_IN_RESPONSE) {
assertEquals(AbstractSearchAsyncAction.MAX_FAILURES_IN_RESPONSE, shardFailures.length);
} else {
assertEquals(priorityQueue.size(), shardFailures.length);
for (ShardSearchFailure shardFailure : shardFailures) {
ShardSearchFailure expected = priorityQueue.poll().v2();
assertSame(expected, shardFailure);
}
}
} finally {
mergedResponse.decRef();
Expand Down Expand Up @@ -308,8 +319,13 @@ public void testMergeShardFailuresNullShardId() throws InterruptedException {
assertEquals(numResponses, merger.numResponses());
var mergedResponse = merger.getMergedResponse(SearchResponse.Clusters.EMPTY);
try {
assertEquals(expectedFailures.size(), mergedResponse.getFailedShards());
ShardSearchFailure[] shardFailures = mergedResponse.getShardFailures();
assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY)));
if (expectedFailures.size() > AbstractSearchAsyncAction.MAX_FAILURES_IN_RESPONSE) {
assertEquals(AbstractSearchAsyncAction.MAX_FAILURES_IN_RESPONSE, shardFailures.length);
} else {
assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY)));
}
} finally {
mergedResponse.decRef();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Properties;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

public class JdbcShardFailureIT extends JdbcIntegrationTestCase {
Expand Down Expand Up @@ -141,19 +142,15 @@ public void testAllowPartialSearchResults() throws Exception {
properties.setProperty("allow.partial.search.results", "true"); // org.elasticsearch.xpack.sql.client package not available here
try (Connection c = esJdbc(properties); Statement s = c.createStatement(); ResultSet rs = s.executeQuery(query)) {
int failedShards = 0;
boolean hasSupressMessage = false;

SQLWarning warns = rs.getWarnings();
do {
if (warns.getMessage().contains(warnMessage)) {
failedShards++;
} else if (warns.getMessage().contains(suppressMessage)) {
hasSupressMessage = true;
}
} while ((warns = warns.getNextWarning()) != null);

assertEquals(maxWarningHeaders - 1, failedShards);
assertTrue(hasSupressMessage);
assertThat(failedShards, greaterThan(0));

int rows = 0;
while (rs.next()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import static org.elasticsearch.xpack.sql.proto.CoreProtocol.URL_PARAM_FORMAT;
import static org.elasticsearch.xpack.sql.proto.CoreProtocol.WAIT_FOR_COMPLETION_TIMEOUT_NAME;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

Expand Down Expand Up @@ -1848,7 +1849,6 @@ public void testAllowPartialSearchResults() throws IOException {
final int extraBadShards = randomIntBetween(1, 5);
final int okShards = randomIntBetween(1, 5);

final String suppressMessage = " remaining shard failure" + (extraBadShards > 1 ? "s" : "") + " suppressed";
final String reason = "Cannot search on field [bool] since it is not indexed nor has doc values";
final String warnMessage = "org.elasticsearch.index.query.QueryShardException: failed to create query: " + reason;

Expand All @@ -1875,19 +1875,16 @@ public void testAllowPartialSearchResults() throws IOException {
assertOK(response);

int failedShards = 0;
boolean hasSupressMessage = false;
for (Header header : response.getHeaders()) {
if (header.getName().toLowerCase(Locale.ROOT).equals("warning")) {
String headerVal = header.getValue();
assertThat(headerVal, containsString(reason));
if (headerVal.contains(warnMessage)) {
failedShards++;
} else if (headerVal.contains(extraBadShards + suppressMessage)) {
hasSupressMessage = true;
}
}
}
assertEquals(maxWarningHeaders - 1, failedShards);
assertTrue(hasSupressMessage);
assertThat(failedShards, greaterThan(0));
}

static Map<String, Object> runSql(RequestObjectBuilder builder, String mode) throws IOException {
Expand Down

0 comments on commit 94ba5c7

Please sign in to comment.