diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index ccd8fea012035..678155c656170 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -16,7 +16,6 @@
-
@@ -428,7 +427,6 @@
-
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
index fa9980977f4f1..a354bdfb7ba5a 100644
--- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
+++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java
@@ -371,14 +371,14 @@ public void clearScrollAsync(ClearScrollRequest clearScrollRequest, ActionListen
listener, emptySet(), headers);
}
- private <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request,
+ protected <Req extends ActionRequest, Resp> Resp performRequestAndParseEntity(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser,
Set<Integer> ignores, Header... headers) throws IOException {
return performRequest(request, requestConverter, (response) -> parseEntity(response.getEntity(), entityParser), ignores, headers);
}
- <Req extends ActionRequest, Resp> Resp performRequest(Req request,
+ protected <Req extends ActionRequest, Resp> Resp performRequest(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<Response, Resp, IOException> responseConverter,
Set<Integer> ignores, Header... headers) throws IOException {
@@ -408,7 +408,7 @@ Resp performRequest(Req request,
}
}
- private <Req extends ActionRequest, Resp> void performRequestAsyncAndParseEntity(Req request,
+ protected <Req extends ActionRequest, Resp> void performRequestAsyncAndParseEntity(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<XContentParser, Resp, IOException> entityParser,
ActionListener<Resp> listener, Set<Integer> ignores, Header... headers) {
@@ -416,7 +416,7 @@ private void performRequestAsyncAndParseEntity
listener, ignores, headers);
}
- <Req extends ActionRequest, Resp> void performRequestAsync(Req request,
+ protected <Req extends ActionRequest, Resp> void performRequestAsync(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
CheckedFunction<Response, Resp, IOException> responseConverter,
ActionListener<Resp> listener, Set<Integer> ignores, Header... headers) {
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java
new file mode 100644
index 0000000000000..8ad42c2232020
--- /dev/null
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CustomRestHighLevelClientTests.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.RequestLine;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.message.BasicHttpResponse;
+import org.apache.http.message.BasicRequestLine;
+import org.apache.http.message.BasicStatusLine;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.Build;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.main.MainRequest;
+import org.elasticsearch.action.main.MainResponse;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static org.elasticsearch.client.ESRestHighLevelClientTestCase.execute;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMapOf;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyVararg;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests and demonstrates how {@link RestHighLevelClient} can be extended to support custom endpoints.
+ */
+public class CustomRestHighLevelClientTests extends ESTestCase {
+
+ private static final String ENDPOINT = "/_custom";
+
+ private CustomRestClient restHighLevelClient;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void initClients() throws IOException {
+ if (restHighLevelClient == null) {
+ final RestClient restClient = mock(RestClient.class);
+ restHighLevelClient = new CustomRestClient(restClient);
+
+ doAnswer(mock -> mockPerformRequest((Header) mock.getArguments()[4]))
+ .when(restClient)
+ .performRequest(eq(HttpGet.METHOD_NAME), eq(ENDPOINT), anyMapOf(String.class, String.class), anyObject(), anyVararg());
+
+ doAnswer(mock -> mockPerformRequestAsync((Header) mock.getArguments()[5], (ResponseListener) mock.getArguments()[4]))
+ .when(restClient)
+ .performRequestAsync(eq(HttpGet.METHOD_NAME), eq(ENDPOINT), anyMapOf(String.class, String.class),
+ any(HttpEntity.class), any(ResponseListener.class), anyVararg());
+ }
+ }
+
+ public void testCustomEndpoint() throws IOException {
+ final MainRequest request = new MainRequest();
+ final Header header = new BasicHeader("node_name", randomAlphaOfLengthBetween(1, 10));
+
+ MainResponse response = execute(request, restHighLevelClient::custom, restHighLevelClient::customAsync, header);
+ assertEquals(header.getValue(), response.getNodeName());
+
+ response = execute(request, restHighLevelClient::customAndParse, restHighLevelClient::customAndParseAsync, header);
+ assertEquals(header.getValue(), response.getNodeName());
+ }
+
+ /**
+ * The {@link RestHighLevelClient} must declare the following execution methods using the protected modifier
+ * so that they can be used by subclasses to implement custom logic.
+ */
+ @SuppressForbidden(reason = "We're forced to use Class#getDeclaredMethods() here because this test checks protected methods")
+ public void testMethodsVisibility() throws ClassNotFoundException {
+ String[] methodNames = new String[]{"performRequest", "performRequestAndParseEntity", "performRequestAsync",
+ "performRequestAsyncAndParseEntity"};
+ for (String methodName : methodNames) {
+ boolean found = false;
+ for (Method method : RestHighLevelClient.class.getDeclaredMethods()) {
+ if (method.getName().equals(methodName)) {
+ assertTrue("Method " + methodName + " must be protected", Modifier.isProtected(method.getModifiers()));
+ found = true;
+ }
+ }
+ assertTrue("Failed to find method " + methodName, found);
+ }
+ }
+
+ /**
+ * Mocks the asynchronous request execution by calling the {@link #mockPerformRequest(Header)} method.
+ */
+ private Void mockPerformRequestAsync(Header httpHeader, ResponseListener responseListener) {
+ try {
+ responseListener.onSuccess(mockPerformRequest(httpHeader));
+ } catch (IOException e) {
+ responseListener.onFailure(e);
+ }
+ return null;
+ }
+
+ /**
+ * Mocks the synchronous request execution as if it were executed by Elasticsearch.
+ */
+ private Response mockPerformRequest(Header httpHeader) throws IOException {
+ ProtocolVersion protocol = new ProtocolVersion("HTTP", 1, 1);
+ HttpResponse httpResponse = new BasicHttpResponse(new BasicStatusLine(protocol, 200, "OK"));
+
+ MainResponse response = new MainResponse(httpHeader.getValue(), Version.CURRENT, ClusterName.DEFAULT, "_na", Build.CURRENT, true);
+ BytesRef bytesRef = XContentHelper.toXContent(response, XContentType.JSON, false).toBytesRef();
+ httpResponse.setEntity(new ByteArrayEntity(bytesRef.bytes, ContentType.APPLICATION_JSON));
+
+ RequestLine requestLine = new BasicRequestLine(HttpGet.METHOD_NAME, ENDPOINT, protocol);
+ return new Response(requestLine, new HttpHost("localhost", 9200), httpResponse);
+ }
+
+ /**
+ * A custom high-level client that provides custom methods to execute a request and get its associated response back.
+ */
+ static class CustomRestClient extends RestHighLevelClient {
+
+ private CustomRestClient(RestClient restClient) {
+ super(restClient);
+ }
+
+ MainResponse custom(MainRequest mainRequest, Header... headers) throws IOException {
+ return performRequest(mainRequest, this::toRequest, this::toResponse, emptySet(), headers);
+ }
+
+ MainResponse customAndParse(MainRequest mainRequest, Header... headers) throws IOException {
+ return performRequestAndParseEntity(mainRequest, this::toRequest, MainResponse::fromXContent, emptySet(), headers);
+ }
+
+ void customAsync(MainRequest mainRequest, ActionListener<MainResponse> listener, Header... headers) {
+ performRequestAsync(mainRequest, this::toRequest, this::toResponse, listener, emptySet(), headers);
+ }
+
+ void customAndParseAsync(MainRequest mainRequest, ActionListener<MainResponse> listener, Header... headers) {
+ performRequestAsyncAndParseEntity(mainRequest, this::toRequest, MainResponse::fromXContent, listener, emptySet(), headers);
+ }
+
+ Request toRequest(MainRequest mainRequest) throws IOException {
+ return new Request(HttpGet.METHOD_NAME, ENDPOINT, emptyMap(), null);
+ }
+
+ MainResponse toResponse(Response response) throws IOException {
+ return parseEntity(response.getEntity(), MainResponse::fromXContent);
+ }
+ }
+}
\ No newline at end of file
diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java
index 5667053d914b6..328f2ee32f557 100644
--- a/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java
+++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java
@@ -174,7 +174,6 @@ public void testSearchWithRangeAgg() throws IOException {
assertSearchHeader(searchResponse);
assertNull(searchResponse.getSuggest());
assertEquals(Collections.emptyMap(), searchResponse.getProfileResults());
- assertThat(searchResponse.getTook().nanos(), greaterThan(0L));
assertEquals(5, searchResponse.getHits().totalHits);
assertEquals(0, searchResponse.getHits().getHits().length);
assertEquals(0f, searchResponse.getHits().getMaxScore(), 0f);
@@ -257,7 +256,6 @@ public void testSearchWithMatrixStats() throws IOException {
assertSearchHeader(searchResponse);
assertNull(searchResponse.getSuggest());
assertEquals(Collections.emptyMap(), searchResponse.getProfileResults());
- assertThat(searchResponse.getTook().nanos(), greaterThan(0L));
assertEquals(5, searchResponse.getHits().totalHits);
assertEquals(0, searchResponse.getHits().getHits().length);
assertEquals(0f, searchResponse.getHits().getMaxScore(), 0f);
@@ -337,7 +335,6 @@ public void testSearchWithParentJoin() throws IOException {
assertSearchHeader(searchResponse);
assertNull(searchResponse.getSuggest());
assertEquals(Collections.emptyMap(), searchResponse.getProfileResults());
- assertThat(searchResponse.getTook().nanos(), greaterThan(0L));
assertEquals(3, searchResponse.getHits().totalHits);
assertEquals(0, searchResponse.getHits().getHits().length);
assertEquals(0f, searchResponse.getHits().getMaxScore(), 0f);
diff --git a/core/build.gradle b/core/build.gradle
index 230fb5a731913..2e2a7fc2fde57 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -81,7 +81,7 @@ dependencies {
compile "com.vividsolutions:jts:${versions.jts}", optional
// logging
- compile "org.apache.logging.log4j:log4j-api:${versions.log4j}", optional
+ compile "org.apache.logging.log4j:log4j-api:${versions.log4j}"
compile "org.apache.logging.log4j:log4j-core:${versions.log4j}", optional
// to bridge dependencies that are still on Log4j 1 to Log4j 2
compile "org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}", optional
diff --git a/core/src/main/java/org/apache/lucene/queries/BlendedTermQuery.java b/core/src/main/java/org/apache/lucene/queries/BlendedTermQuery.java
index dbad7e0bf721b..cd5da674b8e71 100644
--- a/core/src/main/java/org/apache/lucene/queries/BlendedTermQuery.java
+++ b/core/src/main/java/org/apache/lucene/queries/BlendedTermQuery.java
@@ -296,27 +296,6 @@ public int hashCode() {
return Objects.hash(classHash(), Arrays.hashCode(equalsTerms()));
}
- public static BlendedTermQuery booleanBlendedQuery(Term[] terms) {
- return booleanBlendedQuery(terms, null);
- }
-
- public static BlendedTermQuery booleanBlendedQuery(Term[] terms, final float[] boosts) {
- return new BlendedTermQuery(terms, boosts) {
- @Override
- protected Query topLevelQuery(Term[] terms, TermContext[] ctx, int[] docFreqs, int maxDoc) {
- BooleanQuery.Builder booleanQueryBuilder = new BooleanQuery.Builder();
- for (int i = 0; i < terms.length; i++) {
- Query query = new TermQuery(terms[i], ctx[i]);
- if (boosts != null && boosts[i] != 1f) {
- query = new BoostQuery(query, boosts[i]);
- }
- booleanQueryBuilder.add(query, BooleanClause.Occur.SHOULD);
- }
- return booleanQueryBuilder.build();
- }
- };
- }
-
public static BlendedTermQuery commonTermsBlendedQuery(Term[] terms, final float[] boosts, final float maxTermFrequency) {
return new BlendedTermQuery(terms, boosts) {
@Override
diff --git a/core/src/main/java/org/apache/lucene/queryparser/classic/MapperQueryParser.java b/core/src/main/java/org/apache/lucene/queryparser/classic/MapperQueryParser.java
index 947c7cf3ccd0a..07f646a89d1cc 100644
--- a/core/src/main/java/org/apache/lucene/queryparser/classic/MapperQueryParser.java
+++ b/core/src/main/java/org/apache/lucene/queryparser/classic/MapperQueryParser.java
@@ -26,6 +26,7 @@
import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BoostQuery;
import org.apache.lucene.search.DisjunctionMaxQuery;
import org.apache.lucene.search.FuzzyQuery;
@@ -155,31 +156,20 @@ public Query getFieldQuery(String field, String queryText, boolean quoted) throw
// if there is no match in the mappings.
return new MatchNoDocsQuery("empty fields");
}
- if (settings.useDisMax()) {
- List<Query> queries = new ArrayList<>();
- boolean added = false;
- for (String mField : fields) {
- Query q = getFieldQuerySingle(mField, queryText, quoted);
- if (q != null) {
- added = true;
- queries.add(applyBoost(mField, q));
- }
- }
- if (!added) {
- return null;
- }
- return new DisjunctionMaxQuery(queries, settings.tieBreaker());
- } else {
- List<BooleanClause> clauses = new ArrayList<>();
- for (String mField : fields) {
- Query q = getFieldQuerySingle(mField, queryText, quoted);
- if (q != null) {
- clauses.add(new BooleanClause(applyBoost(mField, q), BooleanClause.Occur.SHOULD));
- }
+ float tiebreaker = settings.useDisMax() ? settings.tieBreaker() : 1.0f;
+ List<Query> queries = new ArrayList<>();
+ boolean added = false;
+ for (String mField : fields) {
+ Query q = getFieldQuerySingle(mField, queryText, quoted);
+ if (q != null) {
+ added = true;
+ queries.add(applyBoost(mField, q));
}
- if (clauses.isEmpty()) return null; // happens for stopwords
- return getBooleanQuery(clauses);
}
+ if (!added) {
+ return null;
+ }
+ return new DisjunctionMaxQuery(queries, tiebreaker);
} else {
return getFieldQuerySingle(field, queryText, quoted);
}
@@ -255,33 +245,21 @@ private Query getFieldQuerySingle(String field, String queryText, boolean quoted
protected Query getFieldQuery(String field, String queryText, int slop) throws ParseException {
Collection<String> fields = extractMultiFields(field);
if (fields != null) {
- if (settings.useDisMax()) {
- List<Query> queries = new ArrayList<>();
- boolean added = false;
- for (String mField : fields) {
- Query q = super.getFieldQuery(mField, queryText, slop);
- if (q != null) {
- added = true;
- q = applySlop(q, slop);
- queries.add(applyBoost(mField, q));
- }
- }
- if (!added) {
- return null;
- }
- return new DisjunctionMaxQuery(queries, settings.tieBreaker());
- } else {
- List<BooleanClause> clauses = new ArrayList<>();
- for (String mField : fields) {
- Query q = super.getFieldQuery(mField, queryText, slop);
- if (q != null) {
- q = applySlop(q, slop);
- clauses.add(new BooleanClause(applyBoost(mField, q), BooleanClause.Occur.SHOULD));
- }
+ float tiebreaker = settings.useDisMax() ? settings.tieBreaker() : 1.0f;
+ List<Query> queries = new ArrayList<>();
+ boolean added = false;
+ for (String mField : fields) {
+ Query q = super.getFieldQuery(mField, queryText, slop);
+ if (q != null) {
+ added = true;
+ q = applySlop(q, slop);
+ queries.add(applyBoost(mField, q));
}
- if (clauses.isEmpty()) return null; // happens for stopwords
- return getBooleanQuery(clauses);
}
+ if (!added) {
+ return null;
+ }
+ return new DisjunctionMaxQuery(queries, tiebreaker);
} else {
return super.getFieldQuery(field, queryText, slop);
}
@@ -308,31 +286,20 @@ protected Query getRangeQuery(String field, String part1, String part2,
return getRangeQuerySingle(fields.iterator().next(), part1, part2, startInclusive, endInclusive, context);
}
- if (settings.useDisMax()) {
- List<Query> queries = new ArrayList<>();
- boolean added = false;
- for (String mField : fields) {
- Query q = getRangeQuerySingle(mField, part1, part2, startInclusive, endInclusive, context);
- if (q != null) {
- added = true;
- queries.add(applyBoost(mField, q));
- }
+ float tiebreaker = settings.useDisMax() ? settings.tieBreaker() : 1.0f;
+ List<Query> queries = new ArrayList<>();
+ boolean added = false;
+ for (String mField : fields) {
+ Query q = getRangeQuerySingle(mField, part1, part2, startInclusive, endInclusive, context);
+ if (q != null) {
+ added = true;
+ queries.add(applyBoost(mField, q));
}
- if (!added) {
- return null;
- }
- return new DisjunctionMaxQuery(queries, settings.tieBreaker());
- } else {
- List<BooleanClause> clauses = new ArrayList<>();
- for (String mField : fields) {
- Query q = getRangeQuerySingle(mField, part1, part2, startInclusive, endInclusive, context);
- if (q != null) {
- clauses.add(new BooleanClause(applyBoost(mField, q), BooleanClause.Occur.SHOULD));
- }
- }
- if (clauses.isEmpty()) return null; // happens for stopwords
- return getBooleanQuery(clauses);
}
+ if (!added) {
+ return null;
+ }
+ return new DisjunctionMaxQuery(queries, tiebreaker);
}
private Query getRangeQuerySingle(String field, String part1, String part2,
@@ -367,30 +334,20 @@ protected Query getFuzzyQuery(String field, String termStr, String minSimilarity
if (fields.size() == 1) {
return getFuzzyQuerySingle(fields.iterator().next(), termStr, minSimilarity);
}
- if (settings.useDisMax()) {
- List<Query> queries = new ArrayList<>();
- boolean added = false;
- for (String mField : fields) {
- Query q = getFuzzyQuerySingle(mField, termStr, minSimilarity);
- if (q != null) {
- added = true;
- queries.add(applyBoost(mField, q));
- }
- }
- if (!added) {
- return null;
- }
- return new DisjunctionMaxQuery(queries, settings.tieBreaker());
- } else {
- List<BooleanClause> clauses = new ArrayList<>();
- for (String mField : fields) {
- Query q = getFuzzyQuerySingle(mField, termStr, minSimilarity);
- if (q != null) {
- clauses.add(new BooleanClause(applyBoost(mField, q), BooleanClause.Occur.SHOULD));
- }
+ float tiebreaker = settings.useDisMax() ? settings.tieBreaker() : 1.0f;
+ List<Query> queries = new ArrayList<>();
+ boolean added = false;
+ for (String mField : fields) {
+ Query q = getFuzzyQuerySingle(mField, termStr, minSimilarity);
+ if (q != null) {
+ added = true;
+ queries.add(applyBoost(mField, q));
}
- return getBooleanQuery(clauses);
}
+ if (!added) {
+ return null;
+ }
+ return new DisjunctionMaxQuery(queries, tiebreaker);
} else {
return getFuzzyQuerySingle(field, termStr, minSimilarity);
}
@@ -430,31 +387,20 @@ protected Query getPrefixQuery(String field, String termStr) throws ParseExcepti
if (fields.size() == 1) {
return getPrefixQuerySingle(fields.iterator().next(), termStr);
}
- if (settings.useDisMax()) {
- List<Query> queries = new ArrayList<>();
- boolean added = false;
- for (String mField : fields) {
- Query q = getPrefixQuerySingle(mField, termStr);
- if (q != null) {
- added = true;
- queries.add(applyBoost(mField, q));
- }
- }
- if (!added) {
- return null;
- }
- return new DisjunctionMaxQuery(queries, settings.tieBreaker());
- } else {
- List<BooleanClause> clauses = new ArrayList<>();
- for (String mField : fields) {
- Query q = getPrefixQuerySingle(mField, termStr);
- if (q != null) {
- clauses.add(new BooleanClause(applyBoost(mField, q), BooleanClause.Occur.SHOULD));
- }
+ float tiebreaker = settings.useDisMax() ? settings.tieBreaker() : 1.0f;
+ List<Query> queries = new ArrayList<>();
+ boolean added = false;
+ for (String mField : fields) {
+ Query q = getPrefixQuerySingle(mField, termStr);
+ if (q != null) {
+ added = true;
+ queries.add(applyBoost(mField, q));
}
- if (clauses.isEmpty()) return null; // happens for stopwords
- return getBooleanQuery(clauses);
}
+ if (!added) {
+ return null;
+ }
+ return new DisjunctionMaxQuery(queries, tiebreaker);
} else {
return getPrefixQuerySingle(field, termStr);
}
@@ -592,31 +538,20 @@ protected Query getWildcardQuery(String field, String termStr) throws ParseExcep
if (fields.size() == 1) {
return getWildcardQuerySingle(fields.iterator().next(), termStr);
}
- if (settings.useDisMax()) {
- List<Query> queries = new ArrayList<>();
- boolean added = false;
- for (String mField : fields) {
- Query q = getWildcardQuerySingle(mField, termStr);
- if (q != null) {
- added = true;
- queries.add(applyBoost(mField, q));
- }
- }
- if (!added) {
- return null;
- }
- return new DisjunctionMaxQuery(queries, settings.tieBreaker());
- } else {
- List<BooleanClause> clauses = new ArrayList<>();
- for (String mField : fields) {
- Query q = getWildcardQuerySingle(mField, termStr);
- if (q != null) {
- clauses.add(new BooleanClause(applyBoost(mField, q), BooleanClause.Occur.SHOULD));
- }
+ float tiebreaker = settings.useDisMax() ? settings.tieBreaker() : 1.0f;
+ List<Query> queries = new ArrayList<>();
+ boolean added = false;
+ for (String mField : fields) {
+ Query q = getWildcardQuerySingle(mField, termStr);
+ if (q != null) {
+ added = true;
+ queries.add(applyBoost(mField, q));
}
- if (clauses.isEmpty()) return null; // happens for stopwords
- return getBooleanQuery(clauses);
}
+ if (!added) {
+ return null;
+ }
+ return new DisjunctionMaxQuery(queries, tiebreaker);
} else {
return getWildcardQuerySingle(field, termStr);
}
@@ -656,31 +591,20 @@ protected Query getRegexpQuery(String field, String termStr) throws ParseExcepti
if (fields.size() == 1) {
return getRegexpQuerySingle(fields.iterator().next(), termStr);
}
- if (settings.useDisMax()) {
- List<Query> queries = new ArrayList<>();
- boolean added = false;
- for (String mField : fields) {
- Query q = getRegexpQuerySingle(mField, termStr);
- if (q != null) {
- added = true;
- queries.add(applyBoost(mField, q));
- }
- }
- if (!added) {
- return null;
- }
- return new DisjunctionMaxQuery(queries, settings.tieBreaker());
- } else {
- List<BooleanClause> clauses = new ArrayList<>();
- for (String mField : fields) {
- Query q = getRegexpQuerySingle(mField, termStr);
- if (q != null) {
- clauses.add(new BooleanClause(applyBoost(mField, q), BooleanClause.Occur.SHOULD));
- }
+ float tiebreaker = settings.useDisMax() ? settings.tieBreaker() : 1.0f;
+ List<Query> queries = new ArrayList<>();
+ boolean added = false;
+ for (String mField : fields) {
+ Query q = getRegexpQuerySingle(mField, termStr);
+ if (q != null) {
+ added = true;
+ queries.add(applyBoost(mField, q));
}
- if (clauses.isEmpty()) return null; // happens for stopwords
- return getBooleanQuery(clauses);
}
+ if (!added) {
+ return null;
+ }
+ return new DisjunctionMaxQuery(queries, tiebreaker);
} else {
return getRegexpQuerySingle(field, termStr);
}
diff --git a/core/src/main/java/org/apache/lucene/search/grouping/CollapsingDocValuesSource.java b/core/src/main/java/org/apache/lucene/search/grouping/CollapsingDocValuesSource.java
index e48773389021c..cbcd1e3a4117d 100644
--- a/core/src/main/java/org/apache/lucene/search/grouping/CollapsingDocValuesSource.java
+++ b/core/src/main/java/org/apache/lucene/search/grouping/CollapsingDocValuesSource.java
@@ -40,7 +40,7 @@
abstract class CollapsingDocValuesSource<T> extends GroupSelector<T> {
protected final String field;
- CollapsingDocValuesSource(String field) throws IOException {
+ CollapsingDocValuesSource(String field) {
this.field = field;
}
@@ -58,7 +58,7 @@ static class Numeric extends CollapsingDocValuesSource {
private long value;
private boolean hasValue;
- Numeric(String field) throws IOException {
+ Numeric(String field) {
super(field);
}
@@ -148,7 +148,7 @@ static class Keyword extends CollapsingDocValuesSource {
private SortedDocValues values;
private int ord;
- Keyword(String field) throws IOException {
+ Keyword(String field) {
super(field);
}
diff --git a/core/src/main/java/org/apache/lucene/search/grouping/CollapsingTopDocsCollector.java b/core/src/main/java/org/apache/lucene/search/grouping/CollapsingTopDocsCollector.java
index b5cb02bcd6536..fedda3ead596b 100644
--- a/core/src/main/java/org/apache/lucene/search/grouping/CollapsingTopDocsCollector.java
+++ b/core/src/main/java/org/apache/lucene/search/grouping/CollapsingTopDocsCollector.java
@@ -46,7 +46,7 @@ public final class CollapsingTopDocsCollector extends FirstPassGroupingCollec
private final boolean trackMaxScore;
CollapsingTopDocsCollector(GroupSelector<T> groupSelector, String collapseField, Sort sort,
- int topN, boolean trackMaxScore) throws IOException {
+ int topN, boolean trackMaxScore) {
super(groupSelector, sort, topN);
this.collapseField = collapseField;
this.trackMaxScore = trackMaxScore;
@@ -60,7 +60,7 @@ public final class CollapsingTopDocsCollector extends FirstPassGroupingCollec
/**
* Transform {@link FirstPassGroupingCollector#getTopGroups(int, boolean)} output in
- * {@link CollapseTopFieldDocs}. The collapsing needs only one pass so we can create the final top docs at the end
+ * {@link CollapseTopFieldDocs}. The collapsing needs only one pass so we can get the final top docs at the end
* of the first pass.
*/
public CollapseTopFieldDocs getTopDocs() throws IOException {
@@ -132,10 +132,9 @@ public void collect(int doc) throws IOException {
* This must be non-null, ie, if you want to groupSort by relevance
* use Sort.RELEVANCE.
* @param topN How many top groups to keep.
- * @throws IOException When I/O related errors occur
*/
public static CollapsingTopDocsCollector<?> createNumeric(String collapseField, Sort sort,
- int topN, boolean trackMaxScore) throws IOException {
+ int topN, boolean trackMaxScore) {
return new CollapsingTopDocsCollector<>(new CollapsingDocValuesSource.Numeric(collapseField),
collapseField, sort, topN, trackMaxScore);
}
@@ -152,12 +151,10 @@ public static CollapsingTopDocsCollector> createNumeric(String collapseField,
* document per collapsed key.
* This must be non-null, ie, if you want to groupSort by relevance use Sort.RELEVANCE.
* @param topN How many top groups to keep.
- * @throws IOException When I/O related errors occur
*/
public static CollapsingTopDocsCollector<?> createKeyword(String collapseField, Sort sort,
- int topN, boolean trackMaxScore) throws IOException {
+ int topN, boolean trackMaxScore) {
return new CollapsingTopDocsCollector<>(new CollapsingDocValuesSource.Keyword(collapseField),
collapseField, sort, topN, trackMaxScore);
}
}
-
diff --git a/core/src/main/java/org/apache/lucene/search/postingshighlight/CustomPassageFormatter.java b/core/src/main/java/org/apache/lucene/search/postingshighlight/CustomPassageFormatter.java
deleted file mode 100644
index a33bf16dee4c7..0000000000000
--- a/core/src/main/java/org/apache/lucene/search/postingshighlight/CustomPassageFormatter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.lucene.search.postingshighlight;
-
-import org.apache.lucene.search.highlight.Snippet;
-import org.apache.lucene.search.highlight.Encoder;
-import org.elasticsearch.search.fetch.subphase.highlight.HighlightUtils;
-
-/**
-Custom passage formatter that allows us to:
-1) extract different snippets (instead of a single big string) together with their scores ({@link Snippet})
-2) use the {@link Encoder} implementations that are already used with the other highlighters
- */
-public class CustomPassageFormatter extends PassageFormatter {
-
- private final String preTag;
- private final String postTag;
- private final Encoder encoder;
-
- public CustomPassageFormatter(String preTag, String postTag, Encoder encoder) {
- this.preTag = preTag;
- this.postTag = postTag;
- this.encoder = encoder;
- }
-
- @Override
- public Snippet[] format(Passage[] passages, String content) {
- Snippet[] snippets = new Snippet[passages.length];
- int pos;
- for (int j = 0; j < passages.length; j++) {
- Passage passage = passages[j];
- StringBuilder sb = new StringBuilder();
- pos = passage.getStartOffset();
- for (int i = 0; i < passage.getNumMatches(); i++) {
- int start = passage.getMatchStarts()[i];
- int end = passage.getMatchEnds()[i];
- // its possible to have overlapping terms
- if (start > pos) {
- append(sb, content, pos, start);
- }
- if (end > pos) {
- sb.append(preTag);
- append(sb, content, Math.max(pos, start), end);
- sb.append(postTag);
- pos = end;
- }
- }
- // its possible a "term" from the analyzer could span a sentence boundary.
- append(sb, content, pos, Math.max(pos, passage.getEndOffset()));
- //we remove the paragraph separator if present at the end of the snippet (we used it as separator between values)
- if (sb.charAt(sb.length() - 1) == HighlightUtils.PARAGRAPH_SEPARATOR) {
- sb.deleteCharAt(sb.length() - 1);
- } else if (sb.charAt(sb.length() - 1) == HighlightUtils.NULL_SEPARATOR) {
- sb.deleteCharAt(sb.length() - 1);
- }
- //and we trim the snippets too
- snippets[j] = new Snippet(sb.toString().trim(), passage.getScore(), passage.getNumMatches() > 0);
- }
- return snippets;
- }
-
- protected void append(StringBuilder dest, String content, int start, int end) {
- dest.append(encoder.encodeText(content.substring(start, end)));
- }
-}
diff --git a/core/src/main/java/org/apache/lucene/search/postingshighlight/CustomPostingsHighlighter.java b/core/src/main/java/org/apache/lucene/search/postingshighlight/CustomPostingsHighlighter.java
deleted file mode 100644
index ac90a3e57aee7..0000000000000
--- a/core/src/main/java/org/apache/lucene/search/postingshighlight/CustomPostingsHighlighter.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.lucene.search.postingshighlight;
-
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.highlight.Snippet;
-
-import java.io.IOException;
-import java.text.BreakIterator;
-import java.util.Map;
-
-/**
- * Subclass of the {@link PostingsHighlighter} that works for a single field in a single document.
- * Uses a custom {@link PassageFormatter}. Accepts field content as a constructor argument, given that loading
- * is custom and can be done reading from _source field. Supports using different {@link BreakIterator} to break
- * the text into fragments. Considers every distinct field value as a discrete passage for highlighting (unless
- * the whole content needs to be highlighted). Supports both returning empty snippets and non highlighted snippets
- * when no highlighting can be performed.
- *
- * The use that we make of the postings highlighter is not optimal. It would be much better to highlight
- * multiple docs in a single call, as we actually lose its sequential IO. That would require to
- * refactor the elasticsearch highlight api which currently works per hit.
- */
-public final class CustomPostingsHighlighter extends PostingsHighlighter {
-
- private static final Snippet[] EMPTY_SNIPPET = new Snippet[0];
- private static final Passage[] EMPTY_PASSAGE = new Passage[0];
-
- private final Analyzer analyzer;
- private final CustomPassageFormatter passageFormatter;
- private final BreakIterator breakIterator;
- private final boolean returnNonHighlightedSnippets;
- private final String fieldValue;
-
- /**
- * Creates a new instance of {@link CustomPostingsHighlighter}
- *
- * @param analyzer the analyzer used for the field at index time, used for multi term queries internally
- * @param passageFormatter our own {@link PassageFormatter} which generates snippets in forms of {@link Snippet} objects
- * @param fieldValue the original field values as constructor argument, loaded from te _source field or the relevant stored field.
- * @param returnNonHighlightedSnippets whether non highlighted snippets should be returned rather than empty snippets when
- * no highlighting can be performed
- */
- public CustomPostingsHighlighter(Analyzer analyzer, CustomPassageFormatter passageFormatter, String fieldValue, boolean returnNonHighlightedSnippets) {
- this(analyzer, passageFormatter, null, fieldValue, returnNonHighlightedSnippets);
- }
-
- /**
- * Creates a new instance of {@link CustomPostingsHighlighter}
- *
- * @param analyzer the analyzer used for the field at index time, used for multi term queries internally
- * @param passageFormatter our own {@link PassageFormatter} which generates snippets in forms of {@link Snippet} objects
- * @param breakIterator an instance {@link BreakIterator} selected depending on the highlighting options
- * @param fieldValue the original field values as constructor argument, loaded from te _source field or the relevant stored field.
- * @param returnNonHighlightedSnippets whether non highlighted snippets should be returned rather than empty snippets when
- * no highlighting can be performed
- */
- public CustomPostingsHighlighter(Analyzer analyzer, CustomPassageFormatter passageFormatter, BreakIterator breakIterator, String fieldValue, boolean returnNonHighlightedSnippets) {
- this.analyzer = analyzer;
- this.passageFormatter = passageFormatter;
- this.breakIterator = breakIterator;
- this.returnNonHighlightedSnippets = returnNonHighlightedSnippets;
- this.fieldValue = fieldValue;
- }
-
- /**
- * Highlights terms extracted from the provided query within the content of the provided field name
- */
- public Snippet[] highlightField(String field, Query query, IndexSearcher searcher, int docId, int maxPassages) throws IOException {
- Map<String, Object[]> fieldsAsObjects = super.highlightFieldsAsObjects(new String[]{field}, query, searcher, new int[]{docId}, new int[]{maxPassages});
- Object[] snippetObjects = fieldsAsObjects.get(field);
- if (snippetObjects != null) {
- //one single document at a time
- assert snippetObjects.length == 1;
- Object snippetObject = snippetObjects[0];
- if (snippetObject != null && snippetObject instanceof Snippet[]) {
- return (Snippet[]) snippetObject;
- }
- }
- return EMPTY_SNIPPET;
- }
-
- @Override
- protected PassageFormatter getFormatter(String field) {
- return passageFormatter;
- }
-
- @Override
- protected BreakIterator getBreakIterator(String field) {
- if (breakIterator == null) {
- return super.getBreakIterator(field);
- }
- return breakIterator;
- }
-
- /*
- By default the postings highlighter returns non highlighted snippet when there are no matches.
- We want to return no snippets by default, unless no_match_size is greater than 0
- */
- @Override
- protected Passage[] getEmptyHighlight(String fieldName, BreakIterator bi, int maxPassages) {
- if (returnNonHighlightedSnippets) {
- //we want to return the first sentence of the first snippet only
- return super.getEmptyHighlight(fieldName, bi, 1);
- }
- return EMPTY_PASSAGE;
- }
-
- @Override
- protected Analyzer getIndexAnalyzer(String field) {
- return analyzer;
- }
-
- @Override
- protected String[][] loadFieldValues(IndexSearcher searcher, String[] fields, int[] docids, int maxLength) throws IOException {
- //we only highlight one field, one document at a time
- return new String[][]{new String[]{fieldValue}};
- }
-}
diff --git a/core/src/main/java/org/apache/lucene/search/uhighlight/CustomPassageFormatter.java b/core/src/main/java/org/apache/lucene/search/uhighlight/CustomPassageFormatter.java
index 7a34a805db623..52eee559c6888 100644
--- a/core/src/main/java/org/apache/lucene/search/uhighlight/CustomPassageFormatter.java
+++ b/core/src/main/java/org/apache/lucene/search/uhighlight/CustomPassageFormatter.java
@@ -20,7 +20,6 @@
package org.apache.lucene.search.uhighlight;
import org.apache.lucene.search.highlight.Encoder;
-import org.apache.lucene.search.highlight.Snippet;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightUtils;
/**
diff --git a/core/src/main/java/org/apache/lucene/search/uhighlight/CustomUnifiedHighlighter.java b/core/src/main/java/org/apache/lucene/search/uhighlight/CustomUnifiedHighlighter.java
index b6d6f1d1a4dae..ebc13298202a6 100644
--- a/core/src/main/java/org/apache/lucene/search/uhighlight/CustomUnifiedHighlighter.java
+++ b/core/src/main/java/org/apache/lucene/search/uhighlight/CustomUnifiedHighlighter.java
@@ -27,7 +27,6 @@
import org.apache.lucene.search.PrefixQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.search.highlight.Snippet;
import org.apache.lucene.search.spans.SpanMultiTermQueryWrapper;
import org.apache.lucene.search.spans.SpanNearQuery;
import org.apache.lucene.search.spans.SpanOrQuery;
diff --git a/core/src/main/java/org/apache/lucene/search/highlight/Snippet.java b/core/src/main/java/org/apache/lucene/search/uhighlight/Snippet.java
similarity index 90%
rename from core/src/main/java/org/apache/lucene/search/highlight/Snippet.java
rename to core/src/main/java/org/apache/lucene/search/uhighlight/Snippet.java
index 81a3d406ea346..b7490c55feffa 100644
--- a/core/src/main/java/org/apache/lucene/search/highlight/Snippet.java
+++ b/core/src/main/java/org/apache/lucene/search/uhighlight/Snippet.java
@@ -17,11 +17,11 @@
* under the License.
*/
-package org.apache.lucene.search.highlight;
+package org.apache.lucene.search.uhighlight;
/**
* Represents a scored highlighted snippet.
- * It's our own arbitrary object that we get back from the postings highlighter when highlighting a document.
+ * It's our own arbitrary object that we get back from the unified highlighter when highlighting a document.
* Every snippet contains its formatted text and its score.
* The score is needed in case we want to sort snippets by score, they get sorted by position in the text by default.
*/
diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
index ae006045e3d47..7c20ed7d2c482 100644
--- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java
+++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
@@ -829,8 +829,7 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.transport.SendRequestTransportException::new, 58, UNKNOWN_VERSION_ADDED),
ES_REJECTED_EXECUTION_EXCEPTION(org.elasticsearch.common.util.concurrent.EsRejectedExecutionException.class,
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException::new, 59, UNKNOWN_VERSION_ADDED),
- EARLY_TERMINATION_EXCEPTION(org.elasticsearch.common.lucene.Lucene.EarlyTerminationException.class,
- org.elasticsearch.common.lucene.Lucene.EarlyTerminationException::new, 60, UNKNOWN_VERSION_ADDED),
+ // 60 used to be for EarlyTerminationException
// 61 used to be for RoutingValidationException
NOT_SERIALIZABLE_EXCEPTION_WRAPPER(org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper.class,
org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper::new, 62, UNKNOWN_VERSION_ADDED),
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/storedscripts/GetStoredScriptResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/storedscripts/GetStoredScriptResponse.java
index 2dc0ed870c025..d543ac67e1d91 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/storedscripts/GetStoredScriptResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/storedscripts/GetStoredScriptResponse.java
@@ -81,7 +81,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_5_3_0)) {
source.writeTo(out);
} else {
- out.writeString(source.getCode());
+ out.writeString(source.getSource());
}
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java
index 35dd53276cd6d..7d948e7137ebf 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java
@@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.create;
+import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -32,14 +33,16 @@
public class CreateIndexResponse extends AcknowledgedResponse {
private boolean shardsAcked;
+ private String index;
protected CreateIndexResponse() {
}
- protected CreateIndexResponse(boolean acknowledged, boolean shardsAcked) {
+ protected CreateIndexResponse(boolean acknowledged, boolean shardsAcked, String index) {
super(acknowledged);
assert acknowledged || shardsAcked == false; // if its not acknowledged, then shards acked should be false too
this.shardsAcked = shardsAcked;
+ this.index = index;
}
@Override
@@ -47,6 +50,9 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
shardsAcked = in.readBoolean();
+ if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
+ index = in.readString();
+ }
}
@Override
@@ -54,6 +60,9 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
out.writeBoolean(shardsAcked);
+ if (out.getVersion().onOrAfter(Version.V_5_6_0)) {
+ out.writeString(index);
+ }
}
/**
@@ -65,7 +74,12 @@ public boolean isShardsAcked() {
return shardsAcked;
}
+ public String index() {
+ return index;
+ }
+
public void addCustomFields(XContentBuilder builder) throws IOException {
builder.field("shards_acknowledged", isShardsAcked());
+ builder.field("index", index());
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java
index 354dcf2387345..0ac8d02f97760 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java
@@ -79,7 +79,7 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt
.waitForActiveShards(request.waitForActiveShards());
createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
- listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcked())),
+ listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcked(), indexName)),
listener::onFailure));
}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java
index e7ad0afe3aa17..0c5149f6bf353 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java
@@ -25,7 +25,7 @@ public final class ShrinkResponse extends CreateIndexResponse {
ShrinkResponse() {
}
- ShrinkResponse(boolean acknowledged, boolean shardsAcked) {
- super(acknowledged, shardsAcked);
+ ShrinkResponse(boolean acknowledged, boolean shardsAcked, String index) {
+ super(acknowledged, shardsAcked, index);
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java
index 8c482eac10cfc..2555299709cda 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java
@@ -91,8 +91,13 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, indexNameExpressionResolver);
- createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
- listener.onResponse(new ShrinkResponse(response.isAcknowledged(), response.isShardsAcked())), listener::onFailure));
+ createIndexService.createIndex(
+ updateRequest,
+ ActionListener.wrap(response ->
+ listener.onResponse(new ShrinkResponse(response.isAcknowledged(), response.isShardsAcked(), updateRequest.index())),
+ listener::onFailure
+ )
+ );
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
index 597b27eae4bd2..7a2c5eb02222a 100644
--- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
+++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -477,7 +477,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned";
- operationResult = executeFailureNoOpOnReplica(failure, replica);
+ operationResult = executeFailureNoOpOnReplica(failure, primaryTerm, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
@@ -673,9 +673,10 @@ private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteRespons
return replica.delete(delete);
}
- private static Engine.NoOpResult executeFailureNoOpOnReplica(BulkItemResponse.Failure primaryFailure, IndexShard replica) throws IOException {
- final Engine.NoOp noOp = replica.prepareMarkingSeqNoAsNoOp(
- primaryFailure.getSeqNo(), primaryFailure.getMessage());
+ private static Engine.NoOpResult executeFailureNoOpOnReplica(BulkItemResponse.Failure primaryFailure, long primaryTerm,
+ IndexShard replica) throws IOException {
+ final Engine.NoOp noOp = replica.prepareMarkingSeqNoAsNoOpOnReplica(
+ primaryFailure.getSeqNo(), primaryTerm, primaryFailure.getMessage());
return replica.markSeqNoAsNoOp(noOp);
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java
new file mode 100644
index 0000000000000..d94fe1a2bbe6b
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/action/search/ClearScrollController.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.action.search;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.apache.logging.log4j.util.Supplier;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.util.concurrent.CountDown;
+import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.TransportResponse;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;
+
+final class ClearScrollController implements Runnable {
+ private final DiscoveryNodes nodes;
+ private final SearchTransportService searchTransportService;
+ private final CountDown expectedOps;
+ private final ActionListener<ClearScrollResponse> listener;
+ private final AtomicBoolean hasFailed = new AtomicBoolean(false);
+ private final AtomicInteger freedSearchContexts = new AtomicInteger(0);
+ private final Logger logger;
+ private final Runnable runner;
+
+ ClearScrollController(ClearScrollRequest request, ActionListener<ClearScrollResponse> listener, DiscoveryNodes nodes, Logger logger,
+ SearchTransportService searchTransportService) {
+ this.nodes = nodes;
+ this.logger = logger;
+ this.searchTransportService = searchTransportService;
+ this.listener = listener;
+ List<String> scrollIds = request.getScrollIds();
+ final int expectedOps;
+ if (scrollIds.size() == 1 && "_all".equals(scrollIds.get(0))) {
+ expectedOps = nodes.getSize();
+ runner = this::cleanAllScrolls;
+ } else {
+ List<ScrollIdForNode> parsedScrollIds = new ArrayList<>();
+ for (String parsedScrollId : request.getScrollIds()) {
+ ScrollIdForNode[] context = parseScrollId(parsedScrollId).getContext();
+ for (ScrollIdForNode id : context) {
+ parsedScrollIds.add(id);
+ }
+ }
+ if (parsedScrollIds.isEmpty()) {
+ expectedOps = 0;
+ runner = () -> listener.onResponse(new ClearScrollResponse(true, 0));
+ } else {
+ expectedOps = parsedScrollIds.size();
+ runner = () -> cleanScrollIds(parsedScrollIds);
+ }
+ }
+ this.expectedOps = new CountDown(expectedOps);
+
+ }
+
+ @Override
+ public void run() {
+ runner.run();
+ }
+
+ void cleanAllScrolls() {
+ for (final DiscoveryNode node : nodes) {
+ try {
+ Transport.Connection connection = searchTransportService.getConnection(null, node);
+ searchTransportService.sendClearAllScrollContexts(connection, new ActionListener<TransportResponse>() {
+ @Override
+ public void onResponse(TransportResponse response) {
+ onFreedContext(true);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ onFailedFreedContext(e, node);
+ }
+ });
+ } catch (Exception e) {
+ onFailedFreedContext(e, node);
+ }
+ }
+ }
+
+ void cleanScrollIds(List<ScrollIdForNode> parsedScrollIds) {
+ for (ScrollIdForNode target : parsedScrollIds) {
+ final DiscoveryNode node = nodes.get(target.getNode());
+ if (node == null) {
+ onFreedContext(false);
+ } else {
+ try {
+ Transport.Connection connection = searchTransportService.getConnection(null, node);
+ searchTransportService.sendFreeContext(connection, target.getScrollId(),
+ ActionListener.wrap(freed -> onFreedContext(freed.isFreed()),
+ e -> onFailedFreedContext(e, node)));
+ } catch (Exception e) {
+ onFailedFreedContext(e, node);
+ }
+ }
+ }
+ }
+
+ private void onFreedContext(boolean freed) {
+ if (freed) {
+ freedSearchContexts.incrementAndGet();
+ }
+ if (expectedOps.countDown()) {
+ boolean succeeded = hasFailed.get() == false;
+ listener.onResponse(new ClearScrollResponse(succeeded, freedSearchContexts.get()));
+ }
+ }
+
+ private void onFailedFreedContext(Throwable e, DiscoveryNode node) {
+ logger.warn((Supplier<?>) () -> new ParameterizedMessage("Clear SC failed on node[{}]", node), e);
+ if (expectedOps.countDown()) {
+ listener.onResponse(new ClearScrollResponse(false, freedSearchContexts.get()));
+ } else {
+ hasFailed.set(true);
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
index e1e0205e7e518..879607d059e80 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -405,9 +405,18 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
* @param queryResults a list of non-null query shard results
*/
public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) {
- return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(), 0, isScrollRequest);
+ return reducedQueryPhase(queryResults, isScrollRequest, true);
}
+ /**
+ * Reduces the given query results and consumes all aggregations and profile results.
+ * @param queryResults a list of non-null query shard results
+ */
+ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest, boolean trackTotalHits) {
+ return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest);
+ }
+
+
/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
@@ -711,6 +720,7 @@ InitialSearchPhase.SearchPhaseResults newSearchPhaseResults(S
boolean isScrollRequest = request.scroll() != null;
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
+ final boolean trackTotalHits = source == null || source.trackTotalHits();
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
@@ -722,18 +732,30 @@ InitialSearchPhase.SearchPhaseResults newSearchPhaseResults(S
return new InitialSearchPhase.SearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
public ReducedQueryPhase reduce() {
- return reducedQueryPhase(results.asList(), isScrollRequest);
+ return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);
}
};
}
static final class TopDocsStats {
+ final boolean trackTotalHits;
long totalHits;
long fetchHits;
float maxScore = Float.NEGATIVE_INFINITY;
+ TopDocsStats() {
+ this(true);
+ }
+
+ TopDocsStats(boolean trackTotalHits) {
+ this.trackTotalHits = trackTotalHits;
+ this.totalHits = trackTotalHits ? 0 : -1;
+ }
+
void add(TopDocs topDocs) {
- totalHits += topDocs.totalHits;
+ if (trackTotalHits) {
+ totalHits += topDocs.totalHits;
+ }
fetchHits += topDocs.scoreDocs.length;
if (!Float.isNaN(topDocs.getMaxScore())) {
maxScore = Math.max(maxScore, topDocs.getMaxScore());
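
To make the TopDocsStats change concrete, a standalone sketch (a hypothetical class, not the Elasticsearch one) of the same bookkeeping: with hit counting disabled the running total stays at the -1 sentinel while fetch hits and max score are still accumulated per shard.

    // Hypothetical illustration of the trackTotalHits bookkeeping above.
    public final class HitStatsSketch {
        final boolean trackTotalHits;
        long totalHits;
        long fetchHits;
        float maxScore = Float.NEGATIVE_INFINITY;

        HitStatsSketch(boolean trackTotalHits) {
            this.trackTotalHits = trackTotalHits;
            this.totalHits = trackTotalHits ? 0 : -1; // -1 signals "not tracked"
        }

        void add(long shardTotalHits, int shardReturnedDocs, float shardMaxScore) {
            if (trackTotalHits) {
                totalHits += shardTotalHits;
            }
            fetchHits += shardReturnedDocs;
            if (Float.isNaN(shardMaxScore) == false) {
                maxScore = Math.max(maxScore, shardMaxScore);
            }
        }

        public static void main(String[] args) {
            HitStatsSketch tracked = new HitStatsSketch(true);
            HitStatsSketch untracked = new HitStatsSketch(false);
            tracked.add(10, 5, 1.2f);
            untracked.add(10, 5, 1.2f);
            System.out.println(tracked.totalHits);   // 10
            System.out.println(untracked.totalHits); // -1, hit counting disabled
        }
    }
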
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
index 9e35cca05b94f..01a3e94620a46 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java
@@ -39,6 +39,8 @@
import java.util.Collections;
import java.util.Objects;
+import static org.elasticsearch.action.ValidateActions.addValidationError;
+
/**
* A request to execute search against one or more indices (or all). Best created using
* {@link org.elasticsearch.client.Requests#searchRequest(String...)}.
@@ -102,7 +104,12 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
@Override
public ActionRequestValidationException validate() {
- return null;
+ ActionRequestValidationException validationException = null;
+ if (source != null && source.trackTotalHits() == false && scroll() != null) {
+ validationException =
+ addValidationError("disabling [track_total_hits] is not allowed in a scroll context", validationException);
+ }
+ return validationException;
}
/**
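
A minimal sketch (not part of this patch) of what the new validation rejects; the index name is a placeholder, and SearchSourceBuilder#trackTotalHits is the source-level setter this change introduces alongside the builder method in the next file:

    import org.elasticsearch.action.ActionRequestValidationException;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    public final class TrackTotalHitsValidationExample {
        public static void main(String[] args) {
            SearchSourceBuilder source = new SearchSourceBuilder();
            source.trackTotalHits(false); // setter added by this change
            SearchRequest request = new SearchRequest("my-index")
                .source(source)
                .scroll(TimeValue.timeValueMinutes(1));
            ActionRequestValidationException e = request.validate();
            // prints the "disabling [track_total_hits] is not allowed in a scroll context" error
            System.out.println(e == null ? "valid" : e.getMessage());
        }
    }
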
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
index ffe2c1b20c516..0333092b91755 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java
@@ -363,14 +363,21 @@ public SearchRequestBuilder slice(SliceBuilder builder) {
}
/**
- * Applies when sorting, and controls if scores will be tracked as well. Defaults to
- * false.
+ * Applies when sorting, and controls if scores will be tracked as well. Defaults to false.
*/
public SearchRequestBuilder setTrackScores(boolean trackScores) {
sourceBuilder().trackScores(trackScores);
return this;
}
+ /**
+ * Indicates if the total hit count for the query should be tracked. Defaults to true
+ */
+ public SearchRequestBuilder setTrackTotalHits(boolean trackTotalHits) {
+ sourceBuilder().trackTotalHits(trackTotalHits);
+ return this;
+ }
+
/**
* Adds stored fields to load and return (note, it must be stored) as part of the search request.
* To disable the stored fields entirely (source and metadata fields) use {@code storedField("_none_")}.
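
A minimal usage sketch (not part of this patch) of the new builder setter; index and field names are placeholders, and the -1 total assumes counting was disabled as wired up in TopDocsStats above:

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.index.query.QueryBuilders;

    public final class TrackTotalHitsBuilderExample {
        public static long search(Client client) {
            SearchResponse response = client.prepareSearch("my-index")
                .setQuery(QueryBuilders.matchQuery("body", "elasticsearch"))
                .setTrackTotalHits(false) // new setter above
                .get();
            // with counting disabled the coordinating node reports -1 instead of an exact total
            return response.getHits().getTotalHits();
        }
    }
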
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java
index 90ee1dfd434c5..3aa5e3a2adbc6 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java
@@ -45,8 +45,6 @@
import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
-import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField;
-import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownToken;
/**
@@ -245,7 +243,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept
} else if (NUM_REDUCE_PHASES.match(currentFieldName)) {
numReducePhases = parser.intValue();
} else {
- throwUnknownField(currentFieldName, parser.getTokenLocation());
+ parser.skipChildren();
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (SearchHits.Fields.HITS.equals(currentFieldName)) {
@@ -268,7 +266,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept
} else if (RestActions.TOTAL_FIELD.match(currentFieldName)) {
totalShards = parser.intValue();
} else {
- throwUnknownField(currentFieldName, parser.getTokenLocation());
+ parser.skipChildren();
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (RestActions.FAILURES_FIELD.match(currentFieldName)) {
@@ -276,14 +274,14 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept
failures.add(ShardSearchFailure.fromXContent(parser));
}
} else {
- throwUnknownField(currentFieldName, parser.getTokenLocation());
+ parser.skipChildren();
}
} else {
- throwUnknownToken(token, parser.getTokenLocation());
+ parser.skipChildren();
}
}
} else {
- throwUnknownField(currentFieldName, parser.getTokenLocation());
+ parser.skipChildren();
}
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
index 9dd2125d5e2fe..2d20d383288f4 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
@@ -98,14 +98,14 @@ public void onFailure(Exception e) {
}, SearchFreeContextResponse::new));
}
- public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener<SearchFreeContextResponse> listener) {
- transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId),
- new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new));
+ public void sendFreeContext(Transport.Connection connection, long contextId, final ActionListener<SearchFreeContextResponse> listener) {
+ transportService.sendRequest(connection, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId),
+ TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new));
}
- public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener listener) {
- transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE,
- new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
+ public void sendClearAllScrollContexts(Transport.Connection connection, final ActionListener listener) {
+ transportService.sendRequest(connection, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE,
+ TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
}
public void sendExecuteDfs(Transport.Connection connection, final ShardSearchTransportRequest request, SearchTask task,
diff --git a/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java b/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java
index 8fed61af29412..7eb939ca8274e 100644
--- a/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java
+++ b/core/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java
@@ -38,8 +38,6 @@
import java.io.IOException;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
-import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField;
-import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownToken;
/**
* Represents a failure to search on a specific shard.
@@ -200,16 +198,16 @@ public static ShardSearchFailure fromXContent(XContentParser parser) throws IOEx
} else if (NODE_FIELD.equals(currentFieldName)) {
nodeId = parser.text();
} else {
- throwUnknownField(currentFieldName, parser.getTokenLocation());
+ parser.skipChildren();
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (REASON_FIELD.equals(currentFieldName)) {
exception = ElasticsearchException.fromXContent(parser);
} else {
- throwUnknownField(currentFieldName, parser.getTokenLocation());
+ parser.skipChildren();
}
} else {
- throwUnknownToken(token, parser.getTokenLocation());
+ parser.skipChildren();
}
}
return new ShardSearchFailure(exception,
diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java
index 716077c915d6b..d9afbdacafe3c 100644
--- a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java
@@ -19,30 +19,16 @@
package org.elasticsearch.action.search;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
-import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;
-
public class TransportClearScrollAction extends HandledTransportAction<ClearScrollRequest, ClearScrollResponse> {
private final ClusterService clusterService;
@@ -53,105 +39,16 @@ public TransportClearScrollAction(Settings settings, TransportService transportS
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchTransportService searchTransportService) {
- super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, ClearScrollRequest::new);
+ super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
+ ClearScrollRequest::new);
this.clusterService = clusterService;
this.searchTransportService = searchTransportService;
}
@Override
protected void doExecute(ClearScrollRequest request, final ActionListener<ClearScrollResponse> listener) {
- new Async(request, listener, clusterService.state()).run();
- }
-
- private class Async {
- final DiscoveryNodes nodes;
- final CountDown expectedOps;
- final List<ScrollIdForNode[]> contexts = new ArrayList<>();
- final ActionListener<ClearScrollResponse> listener;
- final AtomicReference<Throwable> expHolder;
- final AtomicInteger numberOfFreedSearchContexts = new AtomicInteger(0);
-
- private Async(ClearScrollRequest request, ActionListener<ClearScrollResponse> listener, ClusterState clusterState) {
- int expectedOps = 0;
- this.nodes = clusterState.nodes();
- if (request.getScrollIds().size() == 1 && "_all".equals(request.getScrollIds().get(0))) {
- expectedOps = nodes.getSize();
- } else {
- for (String parsedScrollId : request.getScrollIds()) {
- ScrollIdForNode[] context = parseScrollId(parsedScrollId).getContext();
- expectedOps += context.length;
- this.contexts.add(context);
- }
- }
- this.listener = listener;
- this.expHolder = new AtomicReference<>();
- this.expectedOps = new CountDown(expectedOps);
- }
-
- public void run() {
- if (expectedOps.isCountedDown()) {
- listener.onResponse(new ClearScrollResponse(true, 0));
- return;
- }
-
- if (contexts.isEmpty()) {
- for (final DiscoveryNode node : nodes) {
- searchTransportService.sendClearAllScrollContexts(node, new ActionListener<TransportResponse>() {
- @Override
- public void onResponse(TransportResponse response) {
- onFreedContext(true);
- }
-
- @Override
- public void onFailure(Exception e) {
- onFailedFreedContext(e, node);
- }
- });
- }
- } else {
- for (ScrollIdForNode[] context : contexts) {
- for (ScrollIdForNode target : context) {
- final DiscoveryNode node = nodes.get(target.getNode());
- if (node == null) {
- onFreedContext(false);
- continue;
- }
-
- searchTransportService.sendFreeContext(node, target.getScrollId(), new ActionListener<SearchTransportService.SearchFreeContextResponse>() {
- @Override
- public void onResponse(SearchTransportService.SearchFreeContextResponse freed) {
- onFreedContext(freed.isFreed());
- }
-
- @Override
- public void onFailure(Exception e) {
- onFailedFreedContext(e, node);
- }
- });
- }
- }
- }
- }
-
- void onFreedContext(boolean freed) {
- if (freed) {
- numberOfFreedSearchContexts.incrementAndGet();
- }
- if (expectedOps.countDown()) {
- boolean succeeded = expHolder.get() == null;
- listener.onResponse(new ClearScrollResponse(succeeded, numberOfFreedSearchContexts.get()));
- }
- }
-
- void onFailedFreedContext(Throwable e, DiscoveryNode node) {
- logger.warn((Supplier<?>) () -> new ParameterizedMessage("Clear SC failed on node[{}]", node), e);
- if (expectedOps.countDown()) {
- listener.onResponse(new ClearScrollResponse(false, numberOfFreedSearchContexts.get()));
- } else {
- expHolder.set(e);
- }
- }
-
+ Runnable runnable = new ClearScrollController(request, listener, clusterService.state().nodes(), logger, searchTransportService);
+ runnable.run();
}
}
diff --git a/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java b/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java
index 85b418e046cc0..ed9b7c8d15d60 100644
--- a/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java
+++ b/core/src/main/java/org/elasticsearch/action/support/GroupedActionListener.java
@@ -37,7 +37,7 @@
public final class GroupedActionListener<T> implements ActionListener<T> {
private final CountDown countDown;
private final AtomicInteger pos = new AtomicInteger();
- private final AtomicArray<T> roles;
+ private final AtomicArray<T> results;
private final ActionListener<Collection<T>> delegate;
private final Collection<T> defaults;
private final AtomicReference<Exception> failure = new AtomicReference<>();
@@ -49,7 +49,7 @@ public final class GroupedActionListener<T> implements ActionListener<T> {
*/
public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSize,
Collection<T> defaults) {
- roles = new AtomicArray<>(groupSize);
+ results = new AtomicArray<>(groupSize);
countDown = new CountDown(groupSize);
this.delegate = delegate;
this.defaults = defaults;
@@ -57,12 +57,12 @@ public GroupedActionListener(ActionListener<Collection<T>> delegate, int groupSi
@Override
public void onResponse(T element) {
- roles.set(pos.incrementAndGet() - 1, element);
+ results.setOnce(pos.incrementAndGet() - 1, element);
if (countDown.countDown()) {
if (failure.get() != null) {
delegate.onFailure(failure.get());
} else {
- List<T> collect = this.roles.asList();
+ List<T> collect = this.results.asList();
collect.addAll(defaults);
delegate.onResponse(Collections.unmodifiableList(collect));
}
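
A minimal sketch (not part of this patch) of how a GroupedActionListener is driven; the delegate fires once the configured number of responses, plus any defaults, has been collected:

    import java.util.Collection;
    import java.util.Collections;
    import org.elasticsearch.action.ActionListener;
    import org.elasticsearch.action.support.GroupedActionListener;

    public final class GroupedListenerExample {
        public static void main(String[] args) {
            ActionListener<Collection<Integer>> delegate = new ActionListener<Collection<Integer>>() {
                @Override
                public void onResponse(Collection<Integer> results) {
                    System.out.println("all done: " + results);
                }

                @Override
                public void onFailure(Exception e) {
                    e.printStackTrace();
                }
            };
            GroupedActionListener<Integer> grouped =
                new GroupedActionListener<>(delegate, 3, Collections.emptyList());
            grouped.onResponse(1);
            grouped.onResponse(2);
            grouped.onResponse(3); // third response completes the group and notifies the delegate
        }
    }
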
diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
index 8973890021f7a..543118a172f95 100644
--- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
+++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
@@ -375,7 +375,7 @@ private static class ShardStartedTransportHandler implements TransportRequestHan
public void messageReceived(ShardEntry request, TransportChannel channel) throws Exception {
logger.debug("{} received shard started for [{}]", request.shardId, request);
clusterService.submitStateUpdateTask(
- "shard-started",
+ "shard-started " + request,
request,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateTaskExecutor,
diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
index e47585356a01b..fcc0fdebdd4bc 100644
--- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
+++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
@@ -243,7 +243,8 @@ public SortedMap<String, AliasOrIndex> getAliasAndIndexLookup() {
*
* @param aliases The names of the index aliases to find
* @param concreteIndices The concrete indexes the index aliases must point to order to be returned.
- * @return the found index aliases grouped by index
+ * @return a map of index to a list of alias metadata, the list corresponding to a concrete index will be empty if no aliases are
+ * present for that index
*/
public ImmutableOpenMap<String, List<AliasMetaData>> findAliases(final String[] aliases, String[] concreteIndices) {
assert aliases != null;
@@ -273,8 +274,8 @@ public int compare(AliasMetaData o1, AliasMetaData o2) {
return o1.alias().compareTo(o2.alias());
}
});
- mapBuilder.put(index, Collections.unmodifiableList(filteredValues));
}
+ mapBuilder.put(index, Collections.unmodifiableList(filteredValues));
}
return mapBuilder.build();
}
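
A small sketch (not part of this patch) of the documented return shape; the alias pattern and index names are placeholders:

    import java.util.List;
    import org.elasticsearch.cluster.metadata.AliasMetaData;
    import org.elasticsearch.cluster.metadata.MetaData;
    import org.elasticsearch.common.collect.ImmutableOpenMap;

    public final class FindAliasesExample {
        static void show(MetaData metaData) {
            ImmutableOpenMap<String, List<AliasMetaData>> found =
                metaData.findAliases(new String[] { "alias-*" }, new String[] { "index-1", "index-2" });
            // after this change an index with no matching aliases maps to an empty list
            // instead of being left out of the result entirely
            List<AliasMetaData> index2Aliases = found.get("index-2");
            System.out.println("index-2 aliases: " + index2Aliases);
        }
    }
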
diff --git a/core/src/main/java/org/elasticsearch/common/geo/GeoUtils.java b/core/src/main/java/org/elasticsearch/common/geo/GeoUtils.java
index a586414631805..aed72f502bfe9 100644
--- a/core/src/main/java/org/elasticsearch/common/geo/GeoUtils.java
+++ b/core/src/main/java/org/elasticsearch/common/geo/GeoUtils.java
@@ -82,7 +82,7 @@ public static boolean isValidLatitude(double latitude) {
/** Returns true if longitude is actually a valid longitude value. */
public static boolean isValidLongitude(double longitude) {
- if (Double.isNaN(longitude) || Double.isNaN(longitude) || longitude < GeoUtils.MIN_LON || longitude > GeoUtils.MAX_LON) {
+ if (Double.isNaN(longitude) || Double.isInfinite(longitude) || longitude < GeoUtils.MIN_LON || longitude > GeoUtils.MAX_LON) {
return false;
}
return true;
diff --git a/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java
index c213c384611f5..52550f1ba67df 100644
--- a/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java
+++ b/core/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -49,6 +49,7 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.SimpleCollector;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
@@ -245,20 +246,6 @@ protected Object doBody(String segmentFileName) throws IOException {
}.run();
}
- /**
- * Wraps <code>delegate</code> with count based early termination collector with a threshold of <code>maxCountHits</code>
- */
- public static final EarlyTerminatingCollector wrapCountBasedEarlyTerminatingCollector(final Collector delegate, int maxCountHits) {
- return new EarlyTerminatingCollector(delegate, maxCountHits);
- }
-
- /**
- * Wraps <code>delegate</code> with a time limited collector with a timeout of <code>timeoutInMillis</code>
- */
- public static final TimeLimitingCollector wrapTimeLimitingCollector(final Collector delegate, final Counter counter, long timeoutInMillis) {
- return new TimeLimitingCollector(delegate, counter, timeoutInMillis);
- }
-
/**
* Check whether there is one or more documents matching the provided query.
*/
@@ -617,71 +604,6 @@ public static void writeExplanation(StreamOutput out, Explanation explanation) t
}
}
- /**
- * This exception is thrown when {@link org.elasticsearch.common.lucene.Lucene.EarlyTerminatingCollector}
- * reaches early termination
- * */
- public static final class EarlyTerminationException extends ElasticsearchException {
-
- public EarlyTerminationException(String msg) {
- super(msg);
- }
-
- public EarlyTerminationException(StreamInput in) throws IOException{
- super(in);
- }
- }
-
- /**
- * A collector that terminates early by throwing {@link org.elasticsearch.common.lucene.Lucene.EarlyTerminationException}
- * when count of matched documents has reached maxCountHits
- */
- public static final class EarlyTerminatingCollector extends SimpleCollector {
-
- private final int maxCountHits;
- private final Collector delegate;
-
- private int count = 0;
- private LeafCollector leafCollector;
-
- EarlyTerminatingCollector(final Collector delegate, int maxCountHits) {
- this.maxCountHits = maxCountHits;
- this.delegate = Objects.requireNonNull(delegate);
- }
-
- public int count() {
- return count;
- }
-
- public boolean exists() {
- return count > 0;
- }
-
- @Override
- public void setScorer(Scorer scorer) throws IOException {
- leafCollector.setScorer(scorer);
- }
-
- @Override
- public void collect(int doc) throws IOException {
- leafCollector.collect(doc);
-
- if (++count >= maxCountHits) {
- throw new EarlyTerminationException("early termination [CountBased]");
- }
- }
-
- @Override
- public void doSetNextReader(LeafReaderContext atomicReaderContext) throws IOException {
- leafCollector = delegate.getLeafCollector(atomicReaderContext);
- }
-
- @Override
- public boolean needsScores() {
- return delegate.needsScores();
- }
- }
-
private Lucene() {
}
@@ -838,14 +760,16 @@ public void delete() {
}
/**
- * Given a {@link Scorer}, return a {@link Bits} instance that will match
+ * Given a {@link ScorerSupplier}, return a {@link Bits} instance that will match
* all documents contained in the set. Note that the returned {@link Bits}
* instance MUST be consumed in order.
*/
- public static Bits asSequentialAccessBits(final int maxDoc, @Nullable Scorer scorer) throws IOException {
- if (scorer == null) {
+ public static Bits asSequentialAccessBits(final int maxDoc, @Nullable ScorerSupplier scorerSupplier) throws IOException {
+ if (scorerSupplier == null) {
return new Bits.MatchNoBits(maxDoc);
}
+ // Since we want bits, we need random-access
+ final Scorer scorer = scorerSupplier.get(true); // this never returns null
final TwoPhaseIterator twoPhase = scorer.twoPhaseIterator();
final DocIdSetIterator iterator;
if (twoPhase == null) {
diff --git a/core/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java b/core/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java
index 3a5d71d1fcd5c..e9db2928ca724 100644
--- a/core/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java
+++ b/core/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java
@@ -22,7 +22,7 @@
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FilterLeafCollector;
import org.apache.lucene.search.LeafCollector;
-import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lucene.Lucene;
@@ -41,9 +41,9 @@ public FilteredCollector(Collector collector, Weight filter) {
@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
- final Scorer filterScorer = filter.scorer(context);
+ final ScorerSupplier filterScorerSupplier = filter.scorerSupplier(context);
final LeafCollector in = collector.getLeafCollector(context);
- final Bits bits = Lucene.asSequentialAccessBits(context.reader().maxDoc(), filterScorer);
+ final Bits bits = Lucene.asSequentialAccessBits(context.reader().maxDoc(), filterScorerSupplier);
return new FilterLeafCollector(in) {
@Override
diff --git a/core/src/main/java/org/elasticsearch/common/lucene/search/MultiPhrasePrefixQuery.java b/core/src/main/java/org/elasticsearch/common/lucene/search/MultiPhrasePrefixQuery.java
index dbfc1f0af11b6..b8e1039b2df1d 100644
--- a/core/src/main/java/org/elasticsearch/common/lucene/search/MultiPhrasePrefixQuery.java
+++ b/core/src/main/java/org/elasticsearch/common/lucene/search/MultiPhrasePrefixQuery.java
@@ -164,6 +164,11 @@ public Query rewrite(IndexReader reader) throws IOException {
}
}
if (terms.isEmpty()) {
+ if (sizeMinus1 == 0) {
+ // no prefix and the phrase query is empty
+ return Queries.newMatchNoDocsQuery("No terms supplied for " + MultiPhrasePrefixQuery.class.getName());
+ }
+
// if the terms does not exist we could return a MatchNoDocsQuery but this would break the unified highlighter
// which rewrites query with an empty reader.
return new BooleanQuery.Builder()
diff --git a/core/src/main/java/org/elasticsearch/common/lucene/search/function/FiltersFunctionScoreQuery.java b/core/src/main/java/org/elasticsearch/common/lucene/search/function/FiltersFunctionScoreQuery.java
index 2f2a70537c03d..40465dc6ece07 100644
--- a/core/src/main/java/org/elasticsearch/common/lucene/search/function/FiltersFunctionScoreQuery.java
+++ b/core/src/main/java/org/elasticsearch/common/lucene/search/function/FiltersFunctionScoreQuery.java
@@ -27,6 +27,7 @@
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -174,8 +175,8 @@ private FiltersFunctionFactorScorer functionScorer(LeafReaderContext context) th
for (int i = 0; i < filterFunctions.length; i++) {
FilterFunction filterFunction = filterFunctions[i];
functions[i] = filterFunction.function.getLeafScoreFunction(context);
- Scorer filterScorer = filterWeights[i].scorer(context);
- docSets[i] = Lucene.asSequentialAccessBits(context.reader().maxDoc(), filterScorer);
+ ScorerSupplier filterScorerSupplier = filterWeights[i].scorerSupplier(context);
+ docSets[i] = Lucene.asSequentialAccessBits(context.reader().maxDoc(), filterScorerSupplier);
}
return new FiltersFunctionFactorScorer(this, subQueryScorer, scoreMode, filterFunctions, maxBoost, functions, docSets, combineFunction, needsScores);
}
@@ -200,7 +201,7 @@ public Explanation explain(LeafReaderContext context, int doc) throws IOExceptio
List<Explanation> filterExplanations = new ArrayList<>();
for (int i = 0; i < filterFunctions.length; ++i) {
Bits docSet = Lucene.asSequentialAccessBits(context.reader().maxDoc(),
- filterWeights[i].scorer(context));
+ filterWeights[i].scorerSupplier(context));
if (docSet.get(doc)) {
FilterFunction filterFunction = filterFunctions[i];
Explanation functionExplanation = filterFunction.function.getLeafScoreFunction(context).explainScore(doc, expl);
diff --git a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
index 68e2865e28452..69173cc4216cd 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
@@ -83,4 +83,8 @@ private void setLastCommittedTranslogGeneration(List<? extends IndexCommit> comm
public SnapshotDeletionPolicy getIndexDeletionPolicy() {
return indexDeletionPolicy;
}
+
+ public TranslogDeletionPolicy getTranslogDeletionPolicy() {
+ return translogDeletionPolicy;
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 7763c8d04a4e7..6e93d1feed5f8 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1102,8 +1102,8 @@ public static class Delete extends Operation {
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
Origin origin, long startTime) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
- this.type = type;
- this.id = id;
+ this.type = Objects.requireNonNull(type);
+ this.id = Objects.requireNonNull(id);
}
public Delete(String type, String id, Term uid) {
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 8c0481d686f41..f84f76b537e0d 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -305,7 +305,8 @@ private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
try {
- Translog.Snapshot snapshot = translog.newSnapshot();
+ final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
+ Translog.Snapshot snapshot = translog.newSnapshot(translogGen);
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
@@ -321,6 +322,8 @@ private void recoverFromTranslogInternal() throws IOException {
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
}
+ // clean up what's not needed
+ translog.trimUnreferencedReaders();
}
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) throws IOException {
@@ -1772,7 +1775,7 @@ protected void doRun() throws Exception {
* @param syncId the sync flush ID ({@code null} if not committing a synced flush)
* @throws IOException if an I/O exception occurs committing the specified writer
*/
- private void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
+ protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = seqNoService().getLocalCheckpoint();
diff --git a/core/src/main/java/org/elasticsearch/index/fielddata/ScriptDocValues.java b/core/src/main/java/org/elasticsearch/index/fielddata/ScriptDocValues.java
index 339e70c50b1c8..3ae4e6ebc8123 100644
--- a/core/src/main/java/org/elasticsearch/index/fielddata/ScriptDocValues.java
+++ b/core/src/main/java/org/elasticsearch/index/fielddata/ScriptDocValues.java
@@ -85,76 +85,6 @@ public final void sort(Comparator<? super T> c) {
throw new UnsupportedOperationException("doc values are unmodifiable");
}
- public static final class Strings extends ScriptDocValues<String> {
-
- private final SortedBinaryDocValues in;
- private BytesRefBuilder[] values = new BytesRefBuilder[0];
- private int count;
-
- public Strings(SortedBinaryDocValues in) {
- this.in = in;
- }
-
- @Override
- public void setNextDocId(int docId) throws IOException {
- if (in.advanceExact(docId)) {
- resize(in.docValueCount());
- for (int i = 0; i < count; i++) {
- values[i].copyBytes(in.nextValue());
- }
- } else {
- resize(0);
- }
- }
-
- /**
- * Set the {@link #size()} and ensure that the {@link #values} array can
- * store at least that many entries.
- */
- protected void resize(int newSize) {
- count = newSize;
- if (newSize > values.length) {
- final int oldLength = values.length;
- values = ArrayUtil.grow(values, count);
- for (int i = oldLength; i < values.length; ++i) {
- values[i] = new BytesRefBuilder();
- }
- }
- }
-
- public SortedBinaryDocValues getInternalValues() {
- return this.in;
- }
-
- public BytesRef getBytesValue() {
- if (size() > 0) {
- return values[0].get();
- } else {
- return null;
- }
- }
-
- public String getValue() {
- BytesRef value = getBytesValue();
- if (value == null) {
- return null;
- } else {
- return value.utf8ToString();
- }
- }
-
- @Override
- public String get(int index) {
- return values[index].get().utf8ToString();
- }
-
- @Override
- public int size() {
- return count;
- }
-
- }
-
public static final class Longs extends ScriptDocValues<Long> {
protected static final DeprecationLogger deprecationLogger = new DeprecationLogger(ESLoggerFactory.getLogger(Longs.class));
@@ -570,13 +500,13 @@ private static boolean[] grow(boolean[] array, int minSize) {
}
- public static final class BytesRefs extends ScriptDocValues<BytesRef> {
+ abstract static class BinaryScriptDocValues<T> extends ScriptDocValues<T> {
private final SortedBinaryDocValues in;
- private BytesRef[] values;
- private int count;
+ protected BytesRefBuilder[] values = new BytesRefBuilder[0];
+ protected int count;
- public BytesRefs(SortedBinaryDocValues in) {
+ BinaryScriptDocValues(SortedBinaryDocValues in) {
this.in = in;
}
@@ -585,7 +515,10 @@ public void setNextDocId(int docId) throws IOException {
if (in.advanceExact(docId)) {
resize(in.docValueCount());
for (int i = 0; i < count; i++) {
- values[i] = in.nextValue();
+ // We need to make a copy here, because BytesBinaryDVAtomicFieldData's SortedBinaryDocValues
+ // implementation reuses the returned BytesRef. Otherwise we would end up with the same BytesRef
+ // instance for all slots in the values array.
+ values[i].copyBytes(in.nextValue());
}
} else {
resize(0);
@@ -598,32 +531,69 @@ public void setNextDocId(int docId) throws IOException {
*/
protected void resize(int newSize) {
count = newSize;
- if (values == null) {
- values = new BytesRef[newSize];
- } else {
+ if (newSize > values.length) {
+ final int oldLength = values.length;
values = ArrayUtil.grow(values, count);
+ for (int i = oldLength; i < values.length; ++i) {
+ values[i] = new BytesRefBuilder();
+ }
}
}
- public SortedBinaryDocValues getInternalValues() {
- return this.in;
+ @Override
+ public int size() {
+ return count;
}
- public BytesRef getValue() {
- if (count == 0) {
- return new BytesRef();
+ }
+
+ public static final class Strings extends BinaryScriptDocValues<String> {
+
+ public Strings(SortedBinaryDocValues in) {
+ super(in);
+ }
+
+ @Override
+ public String get(int index) {
+ return values[index].get().utf8ToString();
+ }
+
+ public BytesRef getBytesValue() {
+ if (size() > 0) {
+ return values[0].get();
+ } else {
+ return null;
+ }
+ }
+
+ public String getValue() {
+ BytesRef value = getBytesValue();
+ if (value == null) {
+ return null;
+ } else {
+ return value.utf8ToString();
}
- return values[0];
+ }
+
+ }
+
+ public static final class BytesRefs extends BinaryScriptDocValues<BytesRef> {
+
+ public BytesRefs(SortedBinaryDocValues in) {
+ super(in);
}
@Override
public BytesRef get(int index) {
- return values[index];
+ return values[index].get();
}
- @Override
- public int size() {
- return count;
+ public BytesRef getValue() {
+ if (count == 0) {
+ return new BytesRef();
+ }
+ return values[0].get();
}
+
}
}
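
A standalone sketch (not part of this patch) of why the copy in setNextDocId matters: when a doc-values producer reuses a single BytesRef instance, only a deep copy keeps earlier slots intact.

    import org.apache.lucene.util.BytesRef;
    import org.apache.lucene.util.BytesRefBuilder;

    public final class BytesRefCopyExample {
        public static void main(String[] args) {
            BytesRef reused = new BytesRef(new byte[] { 'a' });
            BytesRefBuilder copied = new BytesRefBuilder();
            copied.copyBytes(reused);  // deep copy, as BinaryScriptDocValues now does
            reused.bytes[0] = 'b';     // the producer overwrites its reused buffer
            System.out.println(copied.get().utf8ToString()); // prints "a": the copy is unaffected
            System.out.println(reused.utf8ToString());       // prints "b"
        }
    }
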
diff --git a/core/src/main/java/org/elasticsearch/index/mapper/AllFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/AllFieldMapper.java
index f3001db39260a..0f88d3223edce 100644
--- a/core/src/main/java/org/elasticsearch/index/mapper/AllFieldMapper.java
+++ b/core/src/main/java/org/elasticsearch/index/mapper/AllFieldMapper.java
@@ -133,6 +133,7 @@ public MetadataFieldMapper.Builder<?,?> parse(String name, Map<String, Object> n
}
parseTextField(builder, builder.name, node, parserContext);
+ boolean enabledSet = false;
for (Iterator<Map.Entry<String, Object>> iterator = node.entrySet().iterator(); iterator.hasNext();) {
Map.Entry<String, Object> entry = iterator.next();
String fieldName = entry.getKey();
@@ -140,9 +141,16 @@ public MetadataFieldMapper.Builder<?,?> parse(String name, Map<String, Object> n
if (fieldName.equals("enabled")) {
boolean enabled = TypeParsers.nodeBooleanValueLenient(name, "enabled", fieldNode);
builder.enabled(enabled ? EnabledAttributeMapper.ENABLED : EnabledAttributeMapper.DISABLED);
+ enabledSet = true;
iterator.remove();
}
}
+ if (enabledSet == false && parserContext.indexVersionCreated().before(Version.V_6_0_0_alpha1)) {
+ // So there is no "enabled" field, however, the index was created prior to 6.0,
+ // and therefore the default for this particular index should be "true" for
+ // enabling _all
+ builder.enabled(EnabledAttributeMapper.ENABLED);
+ }
return builder;
}
@@ -150,7 +158,13 @@ public MetadataFieldMapper.Builder<?,?> parse(String name, Map<String, Object> n
public MetadataFieldMapper getDefault(MappedFieldType fieldType, ParserContext context) {
final Settings indexSettings = context.mapperService().getIndexSettings().getSettings();
if (fieldType != null) {
- return new AllFieldMapper(indexSettings, fieldType);
+ if (context.indexVersionCreated().before(Version.V_6_0_0_alpha1)) {
+ // The index was created prior to 6.0, and therefore the default for this
+ // particular index should be "true" for enabling _all
+ return new AllFieldMapper(fieldType.clone(), EnabledAttributeMapper.ENABLED, indexSettings);
+ } else {
+ return new AllFieldMapper(indexSettings, fieldType);
+ }
} else {
return parse(NAME, Collections.emptyMap(), context)
.build(new BuilderContext(indexSettings, new ContentPath(1)));
@@ -197,7 +211,6 @@ private AllFieldMapper(Settings indexSettings, MappedFieldType existing) {
private AllFieldMapper(MappedFieldType fieldType, EnabledAttributeMapper enabled, Settings indexSettings) {
super(NAME, fieldType, Defaults.FIELD_TYPE, indexSettings);
this.enabledState = enabled;
-
}
public boolean enabled() {
diff --git a/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java
index 489f4702bc36c..c2de26c96b38f 100644
--- a/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java
+++ b/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java
@@ -424,15 +424,33 @@ private static ParseContext nestedContext(ParseContext context, ObjectMapper map
context = context.createNestedContext(mapper.fullPath());
ParseContext.Document nestedDoc = context.doc();
ParseContext.Document parentDoc = nestedDoc.getParent();
- // pre add the uid field if possible (id was already provided)
- IndexableField uidField = parentDoc.getField(UidFieldMapper.NAME);
- if (uidField != null) {
- // we don't need to add it as a full uid field in nested docs, since we don't need versioning
- // we also rely on this for UidField#loadVersion
- // this is a deeply nested field
- nestedDoc.add(new Field(UidFieldMapper.NAME, uidField.stringValue(), UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
+ // We need to add the uid or id to this nested Lucene document too.
+ // If we do not do this then when a document gets deleted only the root Lucene document gets deleted and
+ // not the nested Lucene documents! Besides the fact that we would have zombie Lucene documents, the ordering of
+ // documents inside the Lucene index (document blocks) will be incorrect, as nested documents of different root
+ // documents are then aligned with other root documents. This will lead to the nested query, sorting, aggregations
+ // and inner hits to fail or yield incorrect results.
+ if (context.mapperService().getIndexSettings().isSingleType()) {
+ IndexableField idField = parentDoc.getField(IdFieldMapper.NAME);
+ if (idField != null) {
+ // We just need to store the id as indexed field, so that IndexWriter#deleteDocuments(term) can then
+ // delete it when the root document is deleted too.
+ nestedDoc.add(new Field(IdFieldMapper.NAME, idField.stringValue(), IdFieldMapper.Defaults.NESTED_FIELD_TYPE));
+ } else {
+ throw new IllegalStateException("The root document of a nested document should have an id field");
+ }
+ } else {
+ IndexableField uidField = parentDoc.getField(UidFieldMapper.NAME);
+ if (uidField != null) {
+ // We just need to store the uid as indexed field, so that IndexWriter#deleteDocuments(term) can then
+ // delete it when the root document is deleted too.
+ nestedDoc.add(new Field(UidFieldMapper.NAME, uidField.stringValue(), UidFieldMapper.Defaults.NESTED_FIELD_TYPE));
+ } else {
+ throw new IllegalStateException("The root document of a nested document should have an uid field");
+ }
}
+
// the type of the nested doc starts with __, so we can identify that its a nested one in filters
// note, we don't prefix it with the type of the doc since it allows us to execute a nested query
// across types (for example, with similar nested objects)
diff --git a/core/src/main/java/org/elasticsearch/index/mapper/IdFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/IdFieldMapper.java
index a9a765f1c3a0e..813a546aaed36 100644
--- a/core/src/main/java/org/elasticsearch/index/mapper/IdFieldMapper.java
+++ b/core/src/main/java/org/elasticsearch/index/mapper/IdFieldMapper.java
@@ -52,6 +52,7 @@ public static class Defaults {
public static final String NAME = IdFieldMapper.NAME;
public static final MappedFieldType FIELD_TYPE = new IdFieldType();
+ public static final MappedFieldType NESTED_FIELD_TYPE;
static {
FIELD_TYPE.setTokenized(false);
@@ -62,6 +63,10 @@ public static class Defaults {
FIELD_TYPE.setSearchAnalyzer(Lucene.KEYWORD_ANALYZER);
FIELD_TYPE.setName(NAME);
FIELD_TYPE.freeze();
+
+ NESTED_FIELD_TYPE = FIELD_TYPE.clone();
+ NESTED_FIELD_TYPE.setStored(false);
+ NESTED_FIELD_TYPE.freeze();
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/mapper/TokenCountFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/TokenCountFieldMapper.java
index 2ed6658e87c65..c18b66cf61855 100644
--- a/core/src/main/java/org/elasticsearch/index/mapper/TokenCountFieldMapper.java
+++ b/core/src/main/java/org/elasticsearch/index/mapper/TokenCountFieldMapper.java
@@ -134,6 +134,10 @@ protected void parseCreateField(ParseContext context, List<IndexableField> field
value = context.parser().textOrNull();
}
+ if (value == null && fieldType().nullValue() == null) {
+ return;
+ }
+
final int tokenCount;
if (value == null) {
tokenCount = (Integer) fieldType().nullValue();
diff --git a/core/src/main/java/org/elasticsearch/index/query/SimpleQueryParser.java b/core/src/main/java/org/elasticsearch/index/query/SimpleQueryParser.java
index 9899ba9a748cc..f6e7dd32eb233 100644
--- a/core/src/main/java/org/elasticsearch/index/query/SimpleQueryParser.java
+++ b/core/src/main/java/org/elasticsearch/index/query/SimpleQueryParser.java
@@ -27,6 +27,7 @@
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BoostQuery;
+import org.apache.lucene.search.DisjunctionMaxQuery;
import org.apache.lucene.search.FuzzyQuery;
import org.apache.lucene.search.PrefixQuery;
import org.apache.lucene.search.Query;
@@ -36,6 +37,7 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.List;
@@ -79,18 +81,21 @@ protected Query newTermQuery(Term term) {
@Override
public Query newDefaultQuery(String text) {
- BooleanQuery.Builder bq = new BooleanQuery.Builder();
+ List<Query> disjuncts = new ArrayList<>();
for (Map.Entry<String, Float> entry : weights.entrySet()) {
try {
Query q = createBooleanQuery(entry.getKey(), text, super.getDefaultOperator());
if (q != null) {
- bq.add(wrapWithBoost(q, entry.getValue()), BooleanClause.Occur.SHOULD);
+ disjuncts.add(wrapWithBoost(q, entry.getValue()));
}
} catch (RuntimeException e) {
rethrowUnlessLenient(e);
}
}
- return super.simplify(bq.build());
+ if (disjuncts.size() == 1) {
+ return disjuncts.get(0);
+ }
+ return new DisjunctionMaxQuery(disjuncts, 1.0f);
}
/**
@@ -99,23 +104,26 @@ public Query newDefaultQuery(String text) {
*/
@Override
public Query newFuzzyQuery(String text, int fuzziness) {
- BooleanQuery.Builder bq = new BooleanQuery.Builder();
+ List<Query> disjuncts = new ArrayList<>();
for (Map.Entry<String, Float> entry : weights.entrySet()) {
final String fieldName = entry.getKey();
try {
final BytesRef term = getAnalyzer().normalize(fieldName, text);
Query query = new FuzzyQuery(new Term(fieldName, term), fuzziness);
- bq.add(wrapWithBoost(query, entry.getValue()), BooleanClause.Occur.SHOULD);
+ disjuncts.add(wrapWithBoost(query, entry.getValue()));
} catch (RuntimeException e) {
rethrowUnlessLenient(e);
}
}
- return super.simplify(bq.build());
+ if (disjuncts.size() == 1) {
+ return disjuncts.get(0);
+ }
+ return new DisjunctionMaxQuery(disjuncts, 1.0f);
}
@Override
public Query newPhraseQuery(String text, int slop) {
- BooleanQuery.Builder bq = new BooleanQuery.Builder();
+ List<Query> disjuncts = new ArrayList<>();
for (Map.Entry<String, Float> entry : weights.entrySet()) {
try {
String field = entry.getKey();
@@ -129,13 +137,16 @@ public Query newPhraseQuery(String text, int slop) {
Float boost = entry.getValue();
Query q = createPhraseQuery(field, text, slop);
if (q != null) {
- bq.add(wrapWithBoost(q, boost), BooleanClause.Occur.SHOULD);
+ disjuncts.add(wrapWithBoost(q, boost));
}
} catch (RuntimeException e) {
rethrowUnlessLenient(e);
}
}
- return super.simplify(bq.build());
+ if (disjuncts.size() == 1) {
+ return disjuncts.get(0);
+ }
+ return new DisjunctionMaxQuery(disjuncts, 1.0f);
}
/**
@@ -144,25 +155,28 @@ public Query newPhraseQuery(String text, int slop) {
*/
@Override
public Query newPrefixQuery(String text) {
- BooleanQuery.Builder bq = new BooleanQuery.Builder();
+ List<Query> disjuncts = new ArrayList<>();
for (Map.Entry<String, Float> entry : weights.entrySet()) {
final String fieldName = entry.getKey();
try {
if (settings.analyzeWildcard()) {
Query analyzedQuery = newPossiblyAnalyzedQuery(fieldName, text);
if (analyzedQuery != null) {
- bq.add(wrapWithBoost(analyzedQuery, entry.getValue()), BooleanClause.Occur.SHOULD);
+ disjuncts.add(wrapWithBoost(analyzedQuery, entry.getValue()));
}
} else {
Term term = new Term(fieldName, getAnalyzer().normalize(fieldName, text));
Query query = new PrefixQuery(term);
- bq.add(wrapWithBoost(query, entry.getValue()), BooleanClause.Occur.SHOULD);
+ disjuncts.add(wrapWithBoost(query, entry.getValue()));
}
} catch (RuntimeException e) {
return rethrowUnlessLenient(e);
}
}
- return super.simplify(bq.build());
+ if (disjuncts.size() == 1) {
+ return disjuncts.get(0);
+ }
+ return new DisjunctionMaxQuery(disjuncts, 1.0f);
}
/**
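
For illustration (not part of this patch), the shape of the per-field query the parser now builds; field and term values are placeholders:

    import java.util.Arrays;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.DisjunctionMaxQuery;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.TermQuery;

    public final class DismaxShapeExample {
        public static Query perFieldDismax() {
            Query title = new TermQuery(new Term("title", "elasticsearch"));
            Query body = new TermQuery(new Term("body", "elasticsearch"));
            // a tie breaker of 1.0 adds up the per-field scores, much like the old
            // SHOULD clauses, while keeping one clause per field in a dismax structure
            return new DisjunctionMaxQuery(Arrays.asList(title, body), 1.0f);
        }
    }
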
diff --git a/core/src/main/java/org/elasticsearch/index/search/MultiMatchQuery.java b/core/src/main/java/org/elasticsearch/index/search/MultiMatchQuery.java
index 7c1f91d3587d5..944e27ed2f59d 100644
--- a/core/src/main/java/org/elasticsearch/index/search/MultiMatchQuery.java
+++ b/core/src/main/java/org/elasticsearch/index/search/MultiMatchQuery.java
@@ -22,9 +22,6 @@
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.Term;
import org.apache.lucene.queries.BlendedTermQuery;
-import org.apache.lucene.search.BooleanClause;
-import org.apache.lucene.search.BooleanClause.Occur;
-import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BoostQuery;
import org.apache.lucene.search.DisjunctionMaxQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
@@ -84,7 +81,7 @@ public Query parse(MultiMatchQueryBuilder.Type type, Map<String, Float> fieldNam
queryBuilder = new QueryBuilder(tieBreaker);
break;
case CROSS_FIELDS:
- queryBuilder = new CrossFieldsQueryBuilder(tieBreaker);
+ queryBuilder = new CrossFieldsQueryBuilder();
break;
default:
throw new IllegalStateException("No such type: " + type);
@@ -99,15 +96,9 @@ public Query parse(MultiMatchQueryBuilder.Type type, Map<String, Float> fieldNam
private QueryBuilder queryBuilder;
public class QueryBuilder {
- protected final boolean groupDismax;
protected final float tieBreaker;
public QueryBuilder(float tieBreaker) {
- this(tieBreaker != 1.0f, tieBreaker);
- }
-
- public QueryBuilder(boolean groupDismax, float tieBreaker) {
- this.groupDismax = groupDismax;
this.tieBreaker = tieBreaker;
}
@@ -134,19 +125,11 @@ private Query combineGrouped(List<? extends Query> groupQuery) {
if (groupQuery.size() == 1) {
return groupQuery.get(0);
}
- if (groupDismax) {
- List<Query> queries = new ArrayList<>();
- for (Query query : groupQuery) {
- queries.add(query);
- }
- return new DisjunctionMaxQuery(queries, tieBreaker);
- } else {
- final BooleanQuery.Builder booleanQuery = new BooleanQuery.Builder();
- for (Query query : groupQuery) {
- booleanQuery.add(query, BooleanClause.Occur.SHOULD);
- }
- return booleanQuery.build();
+ List<Query> queries = new ArrayList<>();
+ for (Query query : groupQuery) {
+ queries.add(query);
}
+ return new DisjunctionMaxQuery(queries, tieBreaker);
}
public Query blendTerm(Term term, MappedFieldType fieldType) {
@@ -165,8 +148,8 @@ public Query termQuery(MappedFieldType fieldType, Object value) {
final class CrossFieldsQueryBuilder extends QueryBuilder {
private FieldAndFieldType[] blendedFields;
- CrossFieldsQueryBuilder(float tieBreaker) {
- super(false, tieBreaker);
+ CrossFieldsQueryBuilder() {
+ super(0.0f);
}
@Override
@@ -306,8 +289,6 @@ static Query blendTerms(QueryShardContext context, BytesRef[] values, Float comm
blendedBoost = Arrays.copyOf(blendedBoost, i);
if (commonTermsCutoff != null) {
queries.add(BlendedTermQuery.commonTermsBlendedQuery(terms, blendedBoost, commonTermsCutoff));
- } else if (tieBreaker == 1.0f) {
- queries.add(BlendedTermQuery.booleanBlendedQuery(terms, blendedBoost));
} else {
queries.add(BlendedTermQuery.dismaxBlendedQuery(terms, blendedBoost, tieBreaker));
}
@@ -318,11 +299,7 @@ static Query blendTerms(QueryShardContext context, BytesRef[] values, Float comm
// best effort: add clauses that are not term queries so that they have an opportunity to match
// however their score contribution will be different
// TODO: can we improve this?
- BooleanQuery.Builder bq = new BooleanQuery.Builder();
- for (Query query : queries) {
- bq.add(query, Occur.SHOULD);
- }
- return bq.build();
+ return new DisjunctionMaxQuery(queries, 1.0f);
}
}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index e287278e1c519..90d127c4a5cb3 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -642,10 +642,11 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
return result;
}
- public Engine.NoOp prepareMarkingSeqNoAsNoOp(long seqNo, String reason) {
+ public Engine.NoOp prepareMarkingSeqNoAsNoOpOnReplica(long seqNo, long opPrimaryTerm, String reason) {
verifyReplicationTarget();
+ assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
long startTime = System.nanoTime();
- return new Engine.NoOp(seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason);
+ return new Engine.NoOp(seqNo, opPrimaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason);
}
public Engine.NoOpResult markSeqNoAsNoOp(Engine.NoOp noOp) throws IOException {
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java
index ce5cc8e76010b..547d5aa499fb3 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Checkpoint.java
@@ -44,6 +44,7 @@ final class Checkpoint {
final long minSeqNo;
final long maxSeqNo;
final long globalCheckpoint;
+ final long minTranslogGeneration;
private static final int INITIAL_VERSION = 1; // start with 1, just to recognize there was some magic serialization logic before
private static final int CURRENT_VERSION = 2; // introduction of global checkpoints
@@ -58,6 +59,7 @@ final class Checkpoint {
+ Long.BYTES // minimum sequence number, introduced in 6.0.0
+ Long.BYTES // maximum sequence number, introduced in 6.0.0
+ Long.BYTES // global checkpoint, introduced in 6.0.0
+ + Long.BYTES // minimum translog generation in the translog - introduced in 6.0.0
+ CodecUtil.footerLength();
// size of 5.0.0 checkpoint
@@ -76,15 +78,19 @@ final class Checkpoint {
* @param minSeqNo the current minimum sequence number of all operations in the translog
* @param maxSeqNo the current maximum sequence number of all operations in the translog
* @param globalCheckpoint the last-known global checkpoint
+ * @param minTranslogGeneration the minimum generation referenced by the translog at this moment.
*/
- Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint) {
- assert minSeqNo <= maxSeqNo;
+ Checkpoint(long offset, int numOps, long generation, long minSeqNo, long maxSeqNo, long globalCheckpoint, long minTranslogGeneration) {
+ assert minSeqNo <= maxSeqNo : "minSeqNo [" + minSeqNo + "] is higher than maxSeqNo [" + maxSeqNo + "]";
+ assert minTranslogGeneration <= generation :
+ "minTranslogGen [" + minTranslogGeneration + "] is higher than generation [" + generation + "]";
this.offset = offset;
this.numOps = numOps;
this.generation = generation;
this.minSeqNo = minSeqNo;
this.maxSeqNo = maxSeqNo;
this.globalCheckpoint = globalCheckpoint;
+ this.minTranslogGeneration = minTranslogGeneration;
}
private void write(DataOutput out) throws IOException {
@@ -94,16 +100,18 @@ private void write(DataOutput out) throws IOException {
out.writeLong(minSeqNo);
out.writeLong(maxSeqNo);
out.writeLong(globalCheckpoint);
+ out.writeLong(minTranslogGeneration);
}
- static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint) {
+ static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint,
+ long minTranslogGeneration) {
final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
- return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint);
+ return new Checkpoint(offset, 0, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
}
static Checkpoint readCheckpointV6_0_0(final DataInput in) throws IOException {
- return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
+ return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), in.readLong(), in.readLong(), in.readLong(), in.readLong());
}
// reads a checksummed checkpoint introduced in ES 5.0.0
@@ -111,7 +119,8 @@ static Checkpoint readCheckpointV5_0_0(final DataInput in) throws IOException {
final long minSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
final long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
final long globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
- return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint);
+ final long minTranslogGeneration = -1L;
+ return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
}
@Override
@@ -123,6 +132,7 @@ public String toString() {
", minSeqNo=" + minSeqNo +
", maxSeqNo=" + maxSeqNo +
", globalCheckpoint=" + globalCheckpoint +
+ ", minTranslogGeneration=" + minTranslogGeneration +
'}';
}
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index c351f0346236e..d4a5fe0d99fd7 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -24,6 +24,7 @@
import org.apache.lucene.index.Term;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
@@ -41,6 +42,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardComponent;
@@ -55,7 +57,9 @@
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
@@ -63,7 +67,6 @@
import java.util.function.LongSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -170,11 +173,12 @@ public Translog(
&& Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName());
}
- this.readers.addAll(recoverFromFiles(deletionPolicy.getMinTranslogGenerationForRecovery(), checkpoint));
+ this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");
}
boolean success = false;
+ current = null;
try {
current = createWriter(checkpoint.generation + 1);
success = true;
@@ -192,14 +196,13 @@ public Translog(
final long generation = deletionPolicy.getMinTranslogGenerationForRecovery();
logger.debug("wipe translog location - creating new translog, starting generation [{}]", generation);
Files.createDirectories(location);
- final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong());
+ final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, globalCheckpointSupplier.getAsLong(), generation);
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
Checkpoint.write(getChannelFactory(), checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
IOUtils.fsync(checkpointFile, false);
- current = createWriter(generation);
-
+ current = createWriter(generation, generation);
+ readers.clear();
}
- // now that we know which files are there, create a new current one.
} catch (Exception e) {
// close the opened translog files if we fail to create a new translog...
IOUtils.closeWhileHandlingException(current);
@@ -209,29 +212,46 @@ public Translog(
}
/** recover all translog files found on disk */
- private ArrayList recoverFromFiles(long translogFileGeneration, Checkpoint checkpoint) throws IOException {
+ private ArrayList recoverFromFiles(Checkpoint checkpoint) throws IOException {
boolean success = false;
ArrayList foundTranslogs = new ArrayList<>();
final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX); // a temp file to copy checkpoint to - note it must be on the same FS otherwise atomic move won't work
boolean tempFileRenamed = false;
try (ReleasableLock lock = writeLock.acquire()) {
logger.debug("open uncommitted translog checkpoint {}", checkpoint);
+
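+ // checkpoints written by older versions report -1 for minTranslogGeneration; fall back to the deletion policy in that case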
+ final long minGenerationToRecoverFrom;
+ if (checkpoint.minTranslogGeneration < 0) {
+ final Version indexVersionCreated = indexSettings().getIndexVersionCreated();
+ assert indexVersionCreated.before(Version.V_6_0_0_alpha3) :
+ "no minTranslogGeneration in checkpoint, but index was created with version [" + indexVersionCreated + "]";
+ minGenerationToRecoverFrom = deletionPolicy.getMinTranslogGenerationForRecovery();
+ } else {
+ minGenerationToRecoverFrom = checkpoint.minTranslogGeneration;
+ }
+
final String checkpointTranslogFile = getFilename(checkpoint.generation);
// we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on
// the generation id we found in the lucene commit. This gives better error messages if the wrong
// translog was found.
foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
- for (long i = checkpoint.generation - 1; i >= translogFileGeneration; i--) {
+ for (long i = checkpoint.generation - 1; i >= minGenerationToRecoverFrom; i--) {
Path committedTranslogFile = location.resolve(getFilename(i));
if (Files.exists(committedTranslogFile) == false) {
throw new IllegalStateException("translog file doesn't exist with generation: " + i + " recovering from: " +
- translogFileGeneration + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
+ minGenerationToRecoverFrom + " checkpoint: " + checkpoint.generation + " - translog ids must be consecutive");
}
final TranslogReader reader = openReader(committedTranslogFile, Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))));
foundTranslogs.add(reader);
logger.debug("recovered local translog from checkpoint {}", checkpoint);
}
Collections.reverse(foundTranslogs);
+
+ // when we clean up files, we first update the checkpoint with a new minReferencedTranslog and then delete them;
+ // if we crash just at the wrong moment, it may be that we leave one unreferenced file behind, so we delete it here if there is one
+ IOUtils.deleteFilesIgnoringExceptions(location.resolve(getFilename(minGenerationToRecoverFrom - 1)),
+ location.resolve(getCommitCheckpointFileName(minGenerationToRecoverFrom - 1)));
+
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
if (Files.exists(commitCheckpoint)) {
Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint);
@@ -332,6 +352,20 @@ public long currentFileGeneration() {
}
}
+ /**
+ * Returns the minimum file generation referenced by the translog
+ */
+ long getMinFileGeneration() {
+ try (ReleasableLock ignored = readLock.acquire()) {
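+ // with no readers left, the current writer holds the only (and therefore minimum) generation on disk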
+ if (readers.isEmpty()) {
+ return current.getGeneration();
+ } else {
+ return readers.get(0).getGeneration();
+ }
+ }
+ }
+
/**
* Returns the number of operations in the transaction files that aren't committed to Lucene.
*/
@@ -372,7 +406,6 @@ private long sizeInBytes(long minGeneration) {
}
}
-
/**
* Creates a new translog for the specified generation.
*
@@ -381,6 +414,18 @@ private long sizeInBytes(long minGeneration) {
* @throws IOException if creating the translog failed
*/
TranslogWriter createWriter(long fileGeneration) throws IOException {
+ return createWriter(fileGeneration, getMinFileGeneration());
+ }
+
+ /**
+ * creates a new writer
+ *
+ * @param fileGeneration the generation of the translog file to be created
+ * @param initialMinTranslogGen the minimum translog generation to be written in the first checkpoint. This is
+ * needed to solve an initialization problem while constructing an empty translog.
+ * With no readers and no current, a call to {@link #getMinFileGeneration()} would not work.
+ */
+ private TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen) throws IOException {
final TranslogWriter newFile;
try {
newFile = TranslogWriter.create(
@@ -390,7 +435,9 @@ TranslogWriter createWriter(long fileGeneration) throws IOException {
location.resolve(getFilename(fileGeneration)),
getChannelFactory(),
config.getBufferSize(),
- globalCheckpointSupplier);
+ globalCheckpointSupplier,
+ initialMinTranslogGen,
+ this::getMinFileGeneration);
} catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
@@ -494,12 +541,18 @@ public long getLastSyncedGlobalCheckpoint() {
* Snapshots are fixed in time and will not be updated with future operations.
*/
public Snapshot newSnapshot() {
- return createSnapshot(Long.MIN_VALUE);
+ try (ReleasableLock ignored = readLock.acquire()) {
+ return newSnapshot(getMinFileGeneration());
+ }
}
- private Snapshot createSnapshot(long minGeneration) {
+ public Snapshot newSnapshot(long minGeneration) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
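+ // fail if the requested generation is below the minimum generation still referenced on disk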
+ if (minGeneration < getMinFileGeneration()) {
+ throw new IllegalArgumentException("requested snapshot generation [" + minGeneration + "] is not available. " +
+ "Min referenced generation is [" + getMinFileGeneration() + "]");
+ }
Snapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> reader.getGeneration() >= minGeneration)
.map(BaseTranslogReader::newSnapshot).toArray(Snapshot[]::new);
@@ -673,7 +726,7 @@ public long sizeInBytes() {
/** create a snapshot from this view */
public Snapshot snapshot() {
ensureOpen();
- return Translog.this.createSnapshot(minGeneration);
+ return Translog.this.newSnapshot(minGeneration);
}
void ensureOpen() {
@@ -868,8 +921,8 @@ public static class Index implements Operation {
private final String id;
private final long autoGeneratedIdTimestamp;
private final String type;
- private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
- private long primaryTerm = 0;
+ private final long seqNo;
+ private final long primaryTerm;
private final long version;
private final VersionType versionType;
private final BytesReference source;
@@ -899,6 +952,9 @@ public Index(StreamInput in) throws IOException {
if (format >= FORMAT_SEQ_NO) {
seqNo = in.readLong();
primaryTerm = in.readLong();
+ } else {
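+ // streams from before sequence numbers carry neither a seqNo nor a primary term; use the unassigned defaults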
+ seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ primaryTerm = 0;
}
}
@@ -925,6 +981,7 @@ public Index(String type, String id, long seqNo, long version, VersionType versi
this.id = id;
this.source = new BytesArray(source);
this.seqNo = seqNo;
+ this.primaryTerm = 0;
this.version = version;
this.versionType = versionType;
this.routing = routing;
@@ -1062,27 +1119,42 @@ public long getAutoGeneratedIdTimestamp() {
public static class Delete implements Operation {
- private static final int FORMAT_5_X = 2;
- private static final int FORMAT_SEQ_NO = FORMAT_5_X + 1;
+ public static final int FORMAT_5_0 = 2; // 5.0 - 5.5
+ private static final int FORMAT_SINGLE_TYPE = FORMAT_5_0 + 1; // 5.5 - 6.0
+ private static final int FORMAT_SEQ_NO = FORMAT_SINGLE_TYPE + 1; // 6.0 - *
public static final int SERIALIZATION_FORMAT = FORMAT_SEQ_NO;
- private String type, id;
- private Term uid;
- private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
- private long primaryTerm = 0;
- private long version = Versions.MATCH_ANY;
- private VersionType versionType = VersionType.INTERNAL;
+ private final String type, id;
+ private final Term uid;
+ private final long seqNo;
+ private final long primaryTerm;
+ private final long version;
+ private final VersionType versionType;
public Delete(StreamInput in) throws IOException {
final int format = in.readVInt();// SERIALIZATION_FORMAT
- assert format >= FORMAT_5_X : "format was: " + format;
- uid = new Term(in.readString(), in.readString());
+ assert format >= FORMAT_5_0 : "format was: " + format;
+ if (format >= FORMAT_SINGLE_TYPE) {
+ type = in.readString();
+ id = in.readString();
+ uid = new Term(in.readString(), in.readString());
+ } else {
+ uid = new Term(in.readString(), in.readString());
+ // the uid was constructed from the type and id so we can
+ // extract them back
+ Uid uidObject = Uid.createUid(uid.text());
+ type = uidObject.type();
+ id = uidObject.id();
+ }
this.version = in.readLong();
this.versionType = VersionType.fromValue(in.readByte());
assert versionType.validateVersionForWrites(this.version);
if (format >= FORMAT_SEQ_NO) {
seqNo = in.readLong();
primaryTerm = in.readLong();
+ } else {
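+ // older delete operations have no sequence number or primary term, so fall back to the defaults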
+ seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
+ primaryTerm = 0;
}
}
@@ -1096,8 +1168,8 @@ public Delete(String type, String id, long seqNo, Term uid) {
}
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) {
- this.type = type;
- this.id = id;
+ this.type = Objects.requireNonNull(type);
+ this.id = Objects.requireNonNull(id);
this.uid = uid;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
@@ -1153,6 +1225,8 @@ public Source getSource() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
+ out.writeString(type);
+ out.writeString(id);
out.writeString(uid.field());
out.writeString(uid.text());
out.writeLong(version);
@@ -1442,30 +1516,58 @@ public void rollGeneration() throws IOException {
* Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum
* required generation
*/
- public void trimUnreferencedReaders() {
+ public void trimUnreferencedReaders() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
if (closed.get()) {
// we're shutdown potentially on some tragic event, don't delete anything
return;
}
long minReferencedGen = deletionPolicy.minTranslogGenRequired();
- final long minExistingGen = readers.isEmpty() ? current.getGeneration() : readers.get(0).getGeneration();
- assert minReferencedGen >= minExistingGen :
+ assert minReferencedGen >= getMinFileGeneration() :
"deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is ["
- + minExistingGen + "]";
- final List unreferenced =
- readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList());
- for (final TranslogReader unreferencedReader : unreferenced) {
- final Path translogPath = unreferencedReader.path();
+ + getMinFileGeneration() + "]";
+ assert minReferencedGen <= currentFileGeneration() :
+ "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] which is higher than the current generation ["
+ + currentFileGeneration() + "]";
+
+ for (Iterator iterator = readers.iterator(); iterator.hasNext(); ) {
+ TranslogReader reader = iterator.next();
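+ // readers are ordered by generation, so we can stop at the first reader that is still referenced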
+ if (reader.getGeneration() >= minReferencedGen) {
+ break;
+ }
+ iterator.remove();
+ IOUtils.closeWhileHandlingException(reader);
+ final Path translogPath = reader.path();
logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);
- IOUtils.closeWhileHandlingException(unreferencedReader);
- IOUtils.deleteFilesIgnoringExceptions(translogPath,
- translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
+ // The checkpoint is used when opening the translog to know which files should be recovered from.
+ // We now update the checkpoint to ignore the file we are going to remove.
+ // Note that there is a provision in recoverFromFiles to allow for the case where we synced the checkpoint
+ // but crashed before we could delete the file.
+ current.sync();
+ deleteReaderFiles(reader);
}
- readers.removeAll(unreferenced);
+ assert readers.isEmpty() == false || current.generation == minReferencedGen :
+ "all readers were cleaned but the minReferenceGen [" + minReferencedGen + "] is not the current writer's gen [" +
+ current.generation + "]";
+ } catch (Exception ex) {
+ try {
+ closeOnTragicEvent(ex);
+ } catch (final Exception inner) {
+ ex.addSuppressed(inner);
+ }
+ throw ex;
}
}
+ /**
+ * deletes all files associated with a reader. package-private to be able to simulate node failures at this point
+ */
+ void deleteReaderFiles(TranslogReader reader) {
+ IOUtils.deleteFilesIgnoringExceptions(reader.path(),
+ reader.path().resolveSibling(getCommitCheckpointFileName(reader.getGeneration())));
+ }
+
void closeFilesIfNoPendingViews() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
if (closed.get() && deletionPolicy.pendingViewsCount() == 0) {
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
index 4a98365e02fba..d637c9da79f65 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
@@ -71,6 +71,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
private volatile long maxSeqNo;
private final LongSupplier globalCheckpointSupplier;
+ private final LongSupplier minTranslogGenerationSupplier;
protected final AtomicBoolean closed = new AtomicBoolean(false);
// lock order synchronized(syncLock) -> synchronized(this)
@@ -85,10 +86,11 @@ private TranslogWriter(
final FileChannel channel,
final Path path,
final ByteSizeValue bufferSize,
- final LongSupplier globalCheckpointSupplier) throws IOException {
+ final LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier) throws IOException {
super(initialCheckpoint.generation, channel, path, channel.position());
this.shardId = shardId;
this.channelFactory = channelFactory;
+ this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channel), bufferSize.bytesAsInt());
this.lastSyncedCheckpoint = initialCheckpoint;
this.totalOffset = initialCheckpoint.offset;
@@ -121,7 +123,9 @@ public static TranslogWriter create(
Path file,
ChannelFactory channelFactory,
ByteSizeValue bufferSize,
- final LongSupplier globalCheckpointSupplier) throws IOException {
+ final LongSupplier globalCheckpointSupplier,
+ final long initialMinTranslogGen,
+ final LongSupplier minTranslogGenerationSupplier) throws IOException {
final BytesRef ref = new BytesRef(translogUUID);
final int headerLength = getHeaderLength(ref.length);
final FileChannel channel = channelFactory.open(file);
@@ -132,9 +136,11 @@ public static TranslogWriter create(
writeHeader(out, ref);
channel.force(true);
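+ // initialMinTranslogGen seeds the first checkpoint; the supplier cannot be queried yet because the owning translog may still be under construction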
final Checkpoint checkpoint =
- Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong());
+ Checkpoint.emptyTranslogCheckpoint(headerLength, fileGeneration, globalCheckpointSupplier.getAsLong(),
+ initialMinTranslogGen);
writeCheckpoint(channelFactory, file.getParent(), checkpoint);
- return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier);
+ return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, globalCheckpointSupplier,
+ minTranslogGenerationSupplier);
} catch (Exception exception) {
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition
@@ -242,7 +248,9 @@ public void sync() throws IOException {
* checkpoint has not yet been fsynced
*/
public boolean syncNeeded() {
- return totalOffset != lastSyncedCheckpoint.offset || globalCheckpointSupplier.getAsLong() != lastSyncedCheckpoint.globalCheckpoint;
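+ // a change in the minimum referenced generation also requires a sync so that trimming is reflected in the on-disk checkpoint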
+ return totalOffset != lastSyncedCheckpoint.offset ||
+ globalCheckpointSupplier.getAsLong() != lastSyncedCheckpoint.globalCheckpoint ||
+ minTranslogGenerationSupplier.getAsLong() != lastSyncedCheckpoint.minTranslogGeneration;
}
@Override
@@ -330,6 +338,7 @@ public boolean syncUpTo(long offset) throws IOException {
final long currentMinSeqNo;
final long currentMaxSeqNo;
final long currentGlobalCheckpoint;
+ final long currentMinTranslogGeneration;
synchronized (this) {
ensureOpen();
try {
@@ -339,6 +348,7 @@ public boolean syncUpTo(long offset) throws IOException {
currentMinSeqNo = minSeqNo;
currentMaxSeqNo = maxSeqNo;
currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
+ currentMinTranslogGeneration = minTranslogGenerationSupplier.getAsLong();
} catch (Exception ex) {
try {
closeWithTragicEvent(ex);
@@ -354,7 +364,8 @@ public boolean syncUpTo(long offset) throws IOException {
try {
channel.force(false);
checkpoint =
- writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo, currentGlobalCheckpoint, path.getParent(), generation);
+ writeCheckpoint(channelFactory, offsetToSync, opsCounter, currentMinSeqNo, currentMaxSeqNo,
+ currentGlobalCheckpoint, currentMinTranslogGeneration, path.getParent(), generation);
} catch (Exception ex) {
try {
closeWithTragicEvent(ex);
@@ -398,9 +409,11 @@ private static Checkpoint writeCheckpoint(
long minSeqNo,
long maxSeqNo,
long globalCheckpoint,
+ long minTranslogGeneration,
Path translogFile,
long generation) throws IOException {
- final Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation, minSeqNo, maxSeqNo, globalCheckpoint);
+ final Checkpoint checkpoint =
+ new Checkpoint(syncPosition, numOperations, generation, minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
writeCheckpoint(channelFactory, translogFile, checkpoint);
return checkpoint;
}
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
index ea1f4c13dfd6a..408691692cacf 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java
@@ -168,8 +168,8 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
/** Write a checkpoint file to the given location with the given generation */
public static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration) throws IOException {
- Checkpoint emptyCheckpoint =
- Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, SequenceNumbersService.UNASSIGNED_SEQ_NO);
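+ // the truncated translog references only its own generation, so that generation is also the minimum translog generation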
+ Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration,
+ SequenceNumbersService.UNASSIGNED_SEQ_NO, translogGeneration);
Checkpoint.write(FileChannel::open, filename, emptyCheckpoint,
StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
// fsync with metadata here to make sure.
diff --git a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
index 08669188a9fdb..25772d2d9a4d0 100644
--- a/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
+++ b/core/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
@@ -22,6 +22,10 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.script.TemplateScript;
import java.util.ArrayList;
import java.util.Arrays;
@@ -29,6 +33,8 @@
import java.util.List;
import java.util.Map;
+import static org.elasticsearch.script.Script.DEFAULT_TEMPLATE_LANG;
+
public final class ConfigurationUtils {
public static final String TAG_KEY = "tag";
@@ -265,10 +271,24 @@ public static List readProcessorConfigs(List