diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java index 4cede421a112b..0a7cd82e38179 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java @@ -197,8 +197,7 @@ protected SearchHit nextDoc(int doc) throws IOException { BytesReference sourceRef = hit.hit().getSourceRef(); if (sourceRef != null) { - int sourceLength = sourceRef.length(); - this.accumulatedBytesInLeaf += sourceLength; + this.accumulatedBytesInLeaf += sourceRef.length(); } success = true; return hit.hit(); diff --git a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java index fd60621c7e400..8b54f67348a87 100644 --- a/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/FetchSearchPhaseTests.java @@ -9,6 +9,9 @@ package org.elasticsearch.action.search; import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.Query; @@ -20,16 +23,19 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.store.MockDirectoryWrapper; import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; @@ -55,6 +61,7 @@ import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.search.lookup.Source; import org.elasticsearch.search.profile.ProfileResult; import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult; import org.elasticsearch.search.profile.SearchProfileShardResult; @@ -72,10 +79,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; +import java.util.stream.IntStream; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; public class FetchSearchPhaseTests extends ESTestCase { @@ -820,6 +829,57 @@ public void testFetchTimeoutNoPartialResults() throws IOException { } } + public void testFetchPhaseChecksMemoryBreaker() throws IOException { + Directory dir = newDirectory(); + RandomIndexWriter w = new RandomIndexWriter(random(), dir); + + // we're indexing 100 documents with a field that is 48KB long so the fetch phase should check the memory breaker 5 times + // (every 22 documents that accumulate 1MiB in source sizes, and then a final time when we finished processing the one segment) + + String body = "{ \"thefield\": \" " + randomAlphaOfLength(48_000) + "\" }"; + for (int i = 0; i < 100; i++) { + Document document = new Document(); + document.add(new StringField("id", Integer.toString(i), Field.Store.YES)); + document.add(new StoredField("_source", new BytesRef(body))); + w.addDocument(document); + } + w.forceMerge(1); + IndexReader r = w.getReader(); + w.close(); + ContextIndexSearcher contextIndexSearcher = createSearcher(r); + AtomicInteger breakerCalledCount = new AtomicInteger(0); + NoopCircuitBreaker breakingCircuitBreaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST) { + @Override + public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { + breakerCalledCount.incrementAndGet(); + } + }; + try (SearchContext searchContext = createSearchContext(contextIndexSearcher, true, breakingCircuitBreaker)) { + FetchPhase fetchPhase = new FetchPhase(List.of(fetchContext -> new FetchSubPhaseProcessor() { + @Override + public void setNextReader(LeafReaderContext readerContext) throws IOException { + + } + + @Override + public void process(FetchSubPhase.HitContext hitContext) throws IOException { + Source source = hitContext.source(); + hitContext.hit().sourceRef(source.internalSourceRef()); + } + + @Override + public StoredFieldsSpec storedFieldsSpec() { + return StoredFieldsSpec.NEEDS_SOURCE; + } + })); + fetchPhase.execute(searchContext, IntStream.range(0, 100).toArray(), null); + assertThat(breakerCalledCount.get(), is(5)); + } finally { + r.close(); + dir.close(); + } + } + private static ContextIndexSearcher createSearcher(IndexReader reader) throws IOException { return new ContextIndexSearcher(reader, null, null, new QueryCachingPolicy() { @Override @@ -857,6 +917,14 @@ public StoredFieldsSpec storedFieldsSpec() { } private static SearchContext createSearchContext(ContextIndexSearcher contextIndexSearcher, boolean allowPartialResults) { + return createSearchContext(contextIndexSearcher, allowPartialResults, null); + } + + private static SearchContext createSearchContext( + ContextIndexSearcher contextIndexSearcher, + boolean allowPartialResults, + @Nullable CircuitBreaker circuitBreaker + ) { IndexSettings indexSettings = new IndexSettings( IndexMetadata.builder("index") .settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())) @@ -929,6 +997,15 @@ public FetchSearchResult fetchResult() { public ShardSearchRequest request() { return request; } + + @Override + public CircuitBreaker circuitBreaker() { + if (circuitBreaker != null) { + return circuitBreaker; + } else { + return super.circuitBreaker(); + } + } }; searchContext.addReleasable(searchContext.fetchResult()::decRef); searchContext.setTask(new SearchShardTask(-1, "type", "action", "description", null, Collections.emptyMap()));