diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java index d0b06942c1348..79e28d955c6f9 100644 --- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java +++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java @@ -89,8 +89,7 @@ public void skipOnAborted() { */ public void testSortByManyLongsSuccess() throws IOException { initManyLongs(); - Response response = sortByManyLongs(500); - Map map = responseAsMap(response); + Map response = sortByManyLongs(500); ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long")) .item(matchesMap().entry("name", "b").entry("type", "long")); ListMatcher values = matchesList(); @@ -99,7 +98,7 @@ public void testSortByManyLongsSuccess() throws IOException { values = values.item(List.of(0, b)); } } - assertResultMap(map, columns, values); + assertResultMap(response, columns, values); } /** @@ -107,7 +106,8 @@ public void testSortByManyLongsSuccess() throws IOException { */ public void testSortByManyLongsTooMuchMemory() throws IOException { initManyLongs(); - assertCircuitBreaks(() -> sortByManyLongs(5000)); + // 5000 is plenty to break on most nodes + assertCircuitBreaks(attempt -> sortByManyLongs(attempt * 5000)); } /** @@ -191,26 +191,42 @@ public void testSortByManyLongsTooMuchMemoryAsync() throws IOException { ); } - private void assertCircuitBreaks(ThrowingRunnable r) throws IOException { - ResponseException e = expectThrows(ResponseException.class, r); - Map map = responseAsMap(e.getResponse()); - logger.info("expected circuit breaker {}", map); - assertMap( - map, + private static final int MAX_ATTEMPTS = 5; + + interface TryCircuitBreaking { + Map attempt(int attempt) throws IOException; + } + + private void assertCircuitBreaks(TryCircuitBreaking tryBreaking) throws IOException { + assertCircuitBreaks( + tryBreaking, matchesMap().entry("status", 429).entry("error", matchesMap().extraOk().entry("type", "circuit_breaking_exception")) ); } - private void assertFoldCircuitBreaks(ThrowingRunnable r) throws IOException { - ResponseException e = expectThrows(ResponseException.class, r); - Map map = responseAsMap(e.getResponse()); - logger.info("expected fold circuit breaking {}", map); - assertMap( - map, + private void assertFoldCircuitBreaks(TryCircuitBreaking tryBreaking) throws IOException { + assertCircuitBreaks( + tryBreaking, matchesMap().entry("status", 400).entry("error", matchesMap().extraOk().entry("type", "fold_too_much_memory_exception")) ); } + private void assertCircuitBreaks(TryCircuitBreaking tryBreaking, MapMatcher responseMatcher) throws IOException { + int attempt = 1; + while (attempt <= MAX_ATTEMPTS) { + try { + Map response = tryBreaking.attempt(attempt); + logger.warn("{}: should circuit broken but got {}", attempt, response); + attempt++; + } catch (ResponseException e) { + Map map = responseAsMap(e.getResponse()); + assertMap(map, responseMatcher); + return; + } + } + fail("giving up circuit breaking after " + attempt + " attempts"); + } + private void assertParseFailure(ThrowingRunnable r) throws IOException { ResponseException e = expectThrows(ResponseException.class, r); Map map = responseAsMap(e.getResponse()); @@ -218,9 +234,9 @@ private void assertParseFailure(ThrowingRunnable r) throws IOException { assertMap(map, matchesMap().entry("status", 400).entry("error", matchesMap().extraOk().entry("type", "parsing_exception"))); } - private Response sortByManyLongs(int count) throws IOException { + private Map sortByManyLongs(int count) throws IOException { logger.info("sorting by {} longs", count); - return query(makeSortByManyLongs(count).toString(), null); + return responseAsMap(query(makeSortByManyLongs(count).toString(), null)); } private StringBuilder makeSortByManyLongs(int count) { @@ -318,8 +334,7 @@ private Response concat(int evals) throws IOException { public void testManyConcat() throws IOException { int strings = 300; initManyLongs(); - Response resp = manyConcat("FROM manylongs", strings); - assertManyStrings(resp, strings); + assertManyStrings(manyConcat("FROM manylongs", strings), strings); } /** @@ -327,7 +342,8 @@ public void testManyConcat() throws IOException { */ public void testHugeManyConcat() throws IOException { initManyLongs(); - assertCircuitBreaks(() -> manyConcat("FROM manylongs", 2000)); + // 2000 is plenty to break on most nodes + assertCircuitBreaks(attempt -> manyConcat("FROM manylongs", attempt * 2000)); } /** @@ -335,18 +351,18 @@ public void testHugeManyConcat() throws IOException { */ public void testManyConcatFromRow() throws IOException { int strings = 2000; - Response resp = manyConcat("ROW a=9999, b=9999, c=9999, d=9999, e=9999", strings); - assertManyStrings(resp, strings); + assertManyStrings(manyConcat("ROW a=9999, b=9999, c=9999, d=9999, e=9999", strings), strings); } /** * Hits a circuit breaker by building many moderately long strings. */ public void testHugeManyConcatFromRow() throws IOException { + // 5000 is plenty to break on most nodes assertFoldCircuitBreaks( - () -> manyConcat( + attempt -> manyConcat( "ROW a=9999999999999, b=99999999999999999, c=99999999999999999, d=99999999999999999, e=99999999999999999", - 5000 + attempt * 5000 ) ); } @@ -361,7 +377,7 @@ public void testHugeHugeManyConcatFromRow() throws IOException { /** * Tests that generate many moderately long strings. */ - private Response manyConcat(String init, int strings) throws IOException { + private Map manyConcat(String init, int strings) throws IOException { StringBuilder query = startQuery(); query.append(init).append(" | EVAL str = CONCAT("); query.append( @@ -388,7 +404,7 @@ private Response manyConcat(String init, int strings) throws IOException { query.append("str").append(s); } query.append("\"}"); - return query(query.toString(), "columns"); + return responseAsMap(query(query.toString(), "columns")); } /** @@ -397,8 +413,7 @@ private Response manyConcat(String init, int strings) throws IOException { public void testManyRepeat() throws IOException { int strings = 30; initManyLongs(); - Response resp = manyRepeat("FROM manylongs", strings); - assertManyStrings(resp, 30); + assertManyStrings(manyRepeat("FROM manylongs", strings), 30); } /** @@ -406,7 +421,8 @@ public void testManyRepeat() throws IOException { */ public void testHugeManyRepeat() throws IOException { initManyLongs(); - assertCircuitBreaks(() -> manyRepeat("FROM manylongs", 75)); + // 75 is plenty to break on most nodes + assertCircuitBreaks(attempt -> manyRepeat("FROM manylongs", attempt * 75)); } /** @@ -414,15 +430,15 @@ public void testHugeManyRepeat() throws IOException { */ public void testManyRepeatFromRow() throws IOException { int strings = 300; - Response resp = manyRepeat("ROW a = 99", strings); - assertManyStrings(resp, strings); + assertManyStrings(manyRepeat("ROW a = 99", strings), strings); } /** * Hits a circuit breaker by building many moderately long strings. */ public void testHugeManyRepeatFromRow() throws IOException { - assertFoldCircuitBreaks(() -> manyRepeat("ROW a = 99", 400)); + // 400 is enough to break on most nodes + assertFoldCircuitBreaks(attempt -> manyRepeat("ROW a = 99", attempt * 400)); } /** @@ -435,7 +451,7 @@ public void testHugeHugeManyRepeatFromRow() throws IOException { /** * Tests that generate many moderately long strings. */ - private Response manyRepeat(String init, int strings) throws IOException { + private Map manyRepeat(String init, int strings) throws IOException { StringBuilder query = startQuery(); query.append(init).append(" | EVAL str = TO_STRING(a)"); for (int s = 0; s < strings; s++) { @@ -449,23 +465,21 @@ private Response manyRepeat(String init, int strings) throws IOException { query.append("str").append(s); } query.append("\"}"); - return query(query.toString(), "columns"); + return responseAsMap(query(query.toString(), "columns")); } - private void assertManyStrings(Response resp, int strings) throws IOException { - Map map = responseAsMap(resp); + private void assertManyStrings(Map resp, int strings) throws IOException { ListMatcher columns = matchesList(); for (int s = 0; s < strings; s++) { columns = columns.item(matchesMap().entry("name", "str" + s).entry("type", "keyword")); } MapMatcher mapMatcher = matchesMap(); - assertMap(map, mapMatcher.entry("columns", columns)); + assertMap(resp, mapMatcher.entry("columns", columns)); } public void testManyEval() throws IOException { initManyLongs(); - Response resp = manyEval(1); - Map map = responseAsMap(resp); + Map response = manyEval(1); ListMatcher columns = matchesList(); columns = columns.item(matchesMap().entry("name", "a").entry("type", "long")); columns = columns.item(matchesMap().entry("name", "b").entry("type", "long")); @@ -475,15 +489,16 @@ public void testManyEval() throws IOException { for (int i = 0; i < 20; i++) { columns = columns.item(matchesMap().entry("name", "i0" + i).entry("type", "long")); } - assertResultMap(map, columns, hasSize(10_000)); + assertResultMap(response, columns, hasSize(10_000)); } public void testTooManyEval() throws IOException { initManyLongs(); - assertCircuitBreaks(() -> manyEval(490)); + // 490 is plenty to fail on most nodes + assertCircuitBreaks(attempt -> manyEval(attempt * 490)); } - private Response manyEval(int evalLines) throws IOException { + private Map manyEval(int evalLines) throws IOException { StringBuilder query = startQuery(); query.append("FROM manylongs"); for (int e = 0; e < evalLines; e++) { @@ -496,7 +511,7 @@ private Response manyEval(int evalLines) throws IOException { } } query.append("\n| LIMIT 10000\"}"); - return query(query.toString(), null); + return responseAsMap(query(query.toString(), null)); } private Response query(String query, String filterPath) throws IOException { @@ -554,76 +569,75 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE public void testFetchManyBigFields() throws IOException { initManyBigFieldsIndex(100); - fetchManyBigFields(100); + Map response = fetchManyBigFields(100); + ListMatcher columns = matchesList(); + for (int f = 0; f < 1000; f++) { + columns = columns.item(matchesMap().entry("name", "f" + String.format(Locale.ROOT, "%03d", f)).entry("type", "keyword")); + } + assertMap(response, matchesMap().entry("columns", columns)); } public void testFetchTooManyBigFields() throws IOException { initManyBigFieldsIndex(500); - assertCircuitBreaks(() -> fetchManyBigFields(500)); + // 500 docs is plenty to circuit break on most nodes + assertCircuitBreaks(attempt -> fetchManyBigFields(attempt * 500)); } /** * Fetches documents containing 1000 fields which are {@code 1kb} each. */ - private void fetchManyBigFields(int docs) throws IOException { + private Map fetchManyBigFields(int docs) throws IOException { StringBuilder query = startQuery(); query.append("FROM manybigfields | SORT f000 | LIMIT " + docs + "\"}"); - Response response = query(query.toString(), "columns"); - Map map = responseAsMap(response); - ListMatcher columns = matchesList(); - for (int f = 0; f < 1000; f++) { - columns = columns.item(matchesMap().entry("name", "f" + String.format(Locale.ROOT, "%03d", f)).entry("type", "keyword")); - } - assertMap(map, matchesMap().entry("columns", columns)); + return responseAsMap(query(query.toString(), "columns")); } public void testAggMvLongs() throws IOException { int fieldValues = 100; initMvLongsIndex(1, 3, fieldValues); - Response response = aggMvLongs(3); - Map map = responseAsMap(response); + Map response = aggMvLongs(3); ListMatcher columns = matchesList().item(matchesMap().entry("name", "MAX(f00)").entry("type", "long")) .item(matchesMap().entry("name", "f00").entry("type", "long")) .item(matchesMap().entry("name", "f01").entry("type", "long")) .item(matchesMap().entry("name", "f02").entry("type", "long")); - assertMap(map, matchesMap().entry("columns", columns)); + assertMap(response, matchesMap().entry("columns", columns)); } public void testAggTooManyMvLongs() throws IOException { initMvLongsIndex(1, 3, 1000); - assertCircuitBreaks(() -> aggMvLongs(3)); + // 3 fields is plenty on most nodes + assertCircuitBreaks(attempt -> aggMvLongs(attempt * 3)); } - private Response aggMvLongs(int fields) throws IOException { + private Map aggMvLongs(int fields) throws IOException { StringBuilder query = startQuery(); query.append("FROM mv_longs | STATS MAX(f00) BY f00"); for (int f = 1; f < fields; f++) { query.append(", f").append(String.format(Locale.ROOT, "%02d", f)); } - return query(query.append("\"}").toString(), "columns"); + return responseAsMap(query(query.append("\"}").toString(), "columns")); } public void testFetchMvLongs() throws IOException { int fields = 100; initMvLongsIndex(100, fields, 1000); - Response response = fetchMvLongs(); - Map map = responseAsMap(response); + Map response = fetchMvLongs(); ListMatcher columns = matchesList(); for (int f = 0; f < fields; f++) { columns = columns.item(matchesMap().entry("name", String.format(Locale.ROOT, "f%02d", f)).entry("type", "long")); } - assertMap(map, matchesMap().entry("columns", columns)); + assertMap(response, matchesMap().entry("columns", columns)); } public void testFetchTooManyMvLongs() throws IOException { initMvLongsIndex(500, 100, 1000); - assertCircuitBreaks(() -> fetchMvLongs()); + assertCircuitBreaks(attempt -> fetchMvLongs()); } - private Response fetchMvLongs() throws IOException { + private Map fetchMvLongs() throws IOException { StringBuilder query = startQuery(); query.append("FROM mv_longs\"}"); - return query(query.toString(), "columns"); + return responseAsMap(query(query.toString(), "columns")); } public void testLookupExplosion() throws IOException { @@ -634,11 +648,8 @@ public void testLookupExplosion() throws IOException { } public void testLookupExplosionManyMatches() throws IOException { - assertCircuitBreaks(() -> { - // 1500, 10000 is enough locally, but some CI machines need more. - Map result = lookupExplosion(2000, 10000); - logger.error("should have failed but got {}", result); - }); + // 1500, 10000 is enough locally, but some CI machines need more. + assertCircuitBreaks(attempt -> lookupExplosion(attempt * 1500, 10000)); } public void testLookupExplosionNoFetch() throws IOException { @@ -649,10 +660,8 @@ public void testLookupExplosionNoFetch() throws IOException { } public void testLookupExplosionNoFetchManyMatches() throws IOException { - assertCircuitBreaks(() -> { - Map result = lookupExplosionNoFetch(8500, 10000); - logger.error("should have failed but got {}", result); - }); + // 8500 is plenty on most nodes + assertCircuitBreaks(attempt -> lookupExplosionNoFetch(attempt * 8500, 10000)); } public void testLookupExplosionBigString() throws IOException { @@ -663,25 +672,31 @@ public void testLookupExplosionBigString() throws IOException { } public void testLookupExplosionBigStringManyMatches() throws IOException { - assertCircuitBreaks(() -> { - // 500, 1 is enough to make it fail locally but some CI needs more - Map result = lookupExplosionBigString(800, 1); - logger.error("should have failed but got {}", result); - }); + // 500, 1 is enough to make it fail locally but some CI needs more + assertCircuitBreaks(attempt -> lookupExplosionBigString(attempt * 500, 1)); } - private Map lookupExplosion(int sensorDataCount, int lookupEntries) throws IOException { - lookupExplosionData(sensorDataCount, lookupEntries); - StringBuilder query = startQuery(); - query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(location)\"}"); - return responseAsMap(query(query.toString(), null)); + private Map lookupExplosion(int sensorDataCount, int lookupEntries) throws IOException { + try { + lookupExplosionData(sensorDataCount, lookupEntries); + StringBuilder query = startQuery(); + query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(location)\"}"); + return responseAsMap(query(query.toString(), null)); + } finally { + deleteIndex("sensor_data"); + } } - private Map lookupExplosionNoFetch(int sensorDataCount, int lookupEntries) throws IOException { - lookupExplosionData(sensorDataCount, lookupEntries); - StringBuilder query = startQuery(); - query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(*)\"}"); - return responseAsMap(query(query.toString(), null)); + private Map lookupExplosionNoFetch(int sensorDataCount, int lookupEntries) throws IOException { + try { + lookupExplosionData(sensorDataCount, lookupEntries); + StringBuilder query = startQuery(); + query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(*)\"}"); + return responseAsMap(query(query.toString(), null)); + } finally { + deleteIndex("sensor_data"); + deleteIndex("sensor_lookup"); + } } private void lookupExplosionData(int sensorDataCount, int lookupEntries) throws IOException { @@ -689,20 +704,25 @@ private void lookupExplosionData(int sensorDataCount, int lookupEntries) throws initSensorLookup(lookupEntries, 1, i -> "73.9857 40.7484"); } - private Map lookupExplosionBigString(int sensorDataCount, int lookupEntries) throws IOException { - initSensorData(sensorDataCount, 1); - initSensorLookupString(lookupEntries, 1, i -> { - int target = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes()); - StringBuilder str = new StringBuilder(Math.toIntExact(ByteSizeValue.ofMb(2).getBytes())); - while (str.length() < target) { - str.append("Lorem ipsum dolor sit amet, consectetur adipiscing elit."); - } - logger.info("big string is {} characters", str.length()); - return str.toString(); - }); - StringBuilder query = startQuery(); - query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(string)\"}"); - return responseAsMap(query(query.toString(), null)); + private Map lookupExplosionBigString(int sensorDataCount, int lookupEntries) throws IOException { + try { + initSensorData(sensorDataCount, 1); + initSensorLookupString(lookupEntries, 1, i -> { + int target = Math.toIntExact(ByteSizeValue.ofMb(1).getBytes()); + StringBuilder str = new StringBuilder(Math.toIntExact(ByteSizeValue.ofMb(2).getBytes())); + while (str.length() < target) { + str.append("Lorem ipsum dolor sit amet, consectetur adipiscing elit."); + } + logger.info("big string is {} characters", str.length()); + return str.toString(); + }); + StringBuilder query = startQuery(); + query.append("FROM sensor_data | LOOKUP JOIN sensor_lookup ON id | STATS COUNT(string)\"}"); + return responseAsMap(query(query.toString(), null)); + } finally { + deleteIndex("sensor_data"); + deleteIndex("sensor_lookup"); + } } public void testEnrichExplosion() throws IOException { @@ -713,22 +733,25 @@ public void testEnrichExplosion() throws IOException { } public void testEnrichExplosionManyMatches() throws IOException { - assertCircuitBreaks(() -> { - Map result = enrichExplosion(3000, 10000); - logger.error("should have failed but got {}", result); - }); + // 1000, 10000 is enough on most nodes + assertCircuitBreaks(attempt -> enrichExplosion(1000, attempt * 5000)); } - private Map enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException { - initSensorData(sensorDataCount, 1); - initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484"); + private Map enrichExplosion(int sensorDataCount, int lookupEntries) throws IOException { try { - StringBuilder query = startQuery(); - query.append("FROM sensor_data | ENRICH sensor ON id | STATS COUNT(*)\"}"); - return responseAsMap(query(query.toString(), null)); + initSensorData(sensorDataCount, 1); + initSensorEnrich(lookupEntries, 1, i -> "73.9857 40.7484"); + try { + StringBuilder query = startQuery(); + query.append("FROM sensor_data | ENRICH sensor ON id | STATS COUNT(*)\"}"); + return responseAsMap(query(query.toString(), null)); + } finally { + Request delete = new Request("DELETE", "/_enrich/policy/sensor"); + assertMap(responseAsMap(client().performRequest(delete)), matchesMap().entry("acknowledged", true)); + } } finally { - Request delete = new Request("DELETE", "/_enrich/policy/sensor"); - assertMap(responseAsMap(client().performRequest(delete)), matchesMap().entry("acknowledged", true)); + deleteIndex("sensor_data"); + deleteIndex("sensor_lookup"); } }