Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added soft limit to open scroll contexts #25244 #36009

Merged
merged 4 commits into from
Dec 3, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.MAX_KEEPALIVE_SETTING,
MultiBucketConsumerService.MAX_BUCKET_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Node.NODE_DATA_SETTING,
Expand Down
15 changes: 15 additions & 0 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -145,6 +146,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public static final Setting<Boolean> DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS =
Setting.boolSetting("search.default_allow_partial_results", true, Property.Dynamic, Property.NodeScope);

public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT =
Setting.intSetting("search.max_open_scroll_context", 150, 0, Property.NodeScope);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This setting should be dynamic, you should extract the value in the SearchService ctr and register an update consumer. DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS is a good example of how to do it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be dynamic now. Please review the changes.



private final ThreadPool threadPool;

Expand Down Expand Up @@ -182,6 +186,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private final MultiBucketConsumerService multiBucketConsumerService;

private final AtomicInteger openScrollContexts = new AtomicInteger();

public SearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService) {
Expand Down Expand Up @@ -592,11 +598,19 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear
}

final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException {
if (request.scroll() != null && openScrollContexts.get() >= MAX_OPEN_SCROLL_CONTEXT.get(clusterService.getSettings())) {
throw new ElasticsearchException(
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
MAX_OPEN_SCROLL_CONTEXT.get(clusterService.getSettings()) + "]. " + "This limit can be set by changing the ["
+ MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting.");
}

SearchContext context = createContext(request);
boolean success = false;
try {
putContext(context);
if (request.scroll() != null) {
openScrollContexts.incrementAndGet();
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
}
context.indexShard().getSearchOperationListener().onNewContext(context);
Expand Down Expand Up @@ -696,6 +710,7 @@ public boolean freeContext(long id) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also check the scroll contexts in freeAllContextForIndex above ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't freeAllContextForIndex simply call freeContext ? Am I supposed to do any extra processing here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry my bad, it does call freeContext

context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@

import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
Expand Down Expand Up @@ -75,6 +77,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -416,6 +419,44 @@ searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_A
}
}

/**
* test that creating more than the allowed number of scroll contexts throws an exception
*/
public void testMaxOpenScrollContexts() throws RuntimeException {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();

final SearchService service = getInstanceFromNode(SearchService.class);
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
final IndexShard indexShard = indexService.getShard(0);

// Open all possible scrolls, clear some of them, then open more until the limit is reached
LinkedList<String> clearScrollIds = new LinkedList<>();

for (int i = 0; i < SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY); i++) {
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();

if (randomInt(4) == 0) clearScrollIds.addLast(searchResponse.getScrollId());
}

ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.setScrollIds(clearScrollIds);
client().clearScroll(clearScrollRequest);

for (int i = 0; i < clearScrollIds.size(); i++) {
client().prepareSearch("index").setSize(1).setScroll("1m").get();
}

ElasticsearchException ex = expectThrows(ElasticsearchException.class,
() -> service.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId())));
assertEquals(
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " +
"This limit can be set by changing the [search.max_open_scroll_context] setting.",
ex.getMessage());
}

public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin {
@Override
public List<QuerySpec<?>> getQueries() {
Expand Down Expand Up @@ -471,6 +512,22 @@ public String getWriteableName() {
}
}

public static class ShardScrollRequestTest extends ShardSearchLocalRequest {
private Scroll scroll;

ShardScrollRequestTest(ShardId shardId) {
super(shardId, 1, SearchType.DEFAULT, new SearchSourceBuilder(),
new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, true, null, null);

this.scroll = new Scroll(TimeValue.timeValueMinutes(1));
}

@Override
public Scroll scroll() {
return this.scroll;
}
}

public void testCanMatch() throws IOException {
createIndex("index");
final SearchService service = getInstanceFromNode(SearchService.class);
Expand Down