Skip to content

Commit

Permalink
_msearch individual requests parsed by _search (elastic#4227)
Browse files Browse the repository at this point in the history
each request building from original params
  • Loading branch information
vieirajlt committed Dec 9, 2018
1 parent 9c49aac commit eb6467a
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,20 @@
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.search.RestSearchAction;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.function.IntConsumer;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringArrayValue;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringValue;
import static org.elasticsearch.rest.action.search.RestSearchAction.parseSearchRequest;

/**
* A multi search API request.
Expand Down Expand Up @@ -78,6 +78,13 @@ public MultiSearchRequest add(SearchRequest request) {
return this;
}

public MultiSearchRequest add(SearchRequest searchRequest, RestRequest request, XContentParser parser) throws IOException {
IntConsumer setSize = size -> searchRequest.source().size(size);
RestSearchAction.parseSearchRequest(searchRequest, request, parser, setSize, false);
requests.add(searchRequest);
return this;
}

/**
* Returns the amount of search requests specified in this multi search requests are allowed to be ran concurrently.
*/
Expand Down Expand Up @@ -156,16 +163,17 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
MultiSearchRequest that = (MultiSearchRequest) o;
return maxConcurrentSearchRequests == that.maxConcurrentSearchRequests &&
Objects.equals(requests, that.requests) &&
Objects.equals(indicesOptions, that.indicesOptions);
Objects.equals(requests, that.requests) &&
Objects.equals(indicesOptions, that.indicesOptions);
}

@Override
public int hashCode() {
return Objects.hash(maxConcurrentSearchRequests, requests, indicesOptions);
}

public static void readMultiLineFormat(BytesReference data,
public static void readMultiLineFormat(RestRequest request,
BytesReference data,
XContent xContent,
CheckedBiConsumer<SearchRequest, XContentParser, IOException> consumer,
String[] indices,
Expand All @@ -178,6 +186,7 @@ public static void readMultiLineFormat(BytesReference data,
int from = 0;
int length = data.length();
byte marker = xContent.streamSeparator();
Map<String, String> origParams = request.params();
while (true) {
int nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
Expand Down Expand Up @@ -210,44 +219,29 @@ public static void readMultiLineFormat(BytesReference data,
if (nextMarker - from > 0) {
try (InputStream stream = data.slice(from, nextMarker - from).streamInput();
XContentParser parser = xContent.createParser(registry, LoggingDeprecationHandler.INSTANCE, stream)) {
request.params(origParams);
Map<String, Object> source = parser.map();
Object expandWildcards = null;
Object ignoreUnavailable = null;
Object ignoreThrottled = null;
Object allowNoIndices = null;
Map<String, String> strSource = new HashMap<String, String>();
for (Map.Entry<String, Object> entry : source.entrySet()) {
Object value = entry.getValue();
if ("index".equals(entry.getKey()) || "indices".equals(entry.getKey())) {
if (!allowExplicitIndex) {
throw new IllegalArgumentException("explicit index in multi search is not allowed");
}
searchRequest.indices(nodeStringArrayValue(value));
} else if ("type".equals(entry.getKey()) || "types".equals(entry.getKey())) {
searchRequest.types(nodeStringArrayValue(value));
} else if ("search_type".equals(entry.getKey()) || "searchType".equals(entry.getKey())) {
searchRequest.searchType(nodeStringValue(value, null));
} else if ("request_cache".equals(entry.getKey()) || "requestCache".equals(entry.getKey())) {
searchRequest.requestCache(nodeBooleanValue(value, entry.getKey()));
} else if ("preference".equals(entry.getKey())) {
searchRequest.preference(nodeStringValue(value, null));
} else if ("routing".equals(entry.getKey())) {
searchRequest.routing(nodeStringValue(value, null));
} else if ("allow_partial_search_results".equals(entry.getKey())) {
searchRequest.allowPartialSearchResults(nodeBooleanValue(value, null));
} else if ("expand_wildcards".equals(entry.getKey()) || "expandWildcards".equals(entry.getKey())) {
expandWildcards = value;
} else if ("ignore_unavailable".equals(entry.getKey()) || "ignoreUnavailable".equals(entry.getKey())) {
ignoreUnavailable = value;
} else if ("allow_no_indices".equals(entry.getKey()) || "allowNoIndices".equals(entry.getKey())) {
allowNoIndices = value;
} else if ("ignore_throttled".equals(entry.getKey()) || "ignoreThrottled".equals(entry.getKey())) {
ignoreThrottled = value;
} else {
throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section");
//to be parsed by RestSearchAction.parseSearchSource()
//only adding non parsed entries
if (entry.getValue() instanceof String) {
strSource.put(entry.getKey(), (String) entry.getValue());
} else {
strSource.put(entry.getKey(), entry.getValue().toString());
}
}
}
defaultOptions = IndicesOptions.fromParameters(expandWildcards, ignoreUnavailable, allowNoIndices, ignoreThrottled,
defaultOptions);

//done only on this case, in order to set parameters
request.params(strSource);
}
}
searchRequest.indicesOptions(defaultOptions);
Expand Down Expand Up @@ -341,5 +335,4 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild
}
xContentBuilder.endObject();
}

}
11 changes: 10 additions & 1 deletion server/src/main/java/org/elasticsearch/rest/RestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class RestRequest implements ToXContent.Params {
private static final Pattern TCHAR_PATTERN = Pattern.compile("[a-zA-z0-9!#$%&'*+\\-.\\^_`|~]+");

private final NamedXContentRegistry xContentRegistry;
private final Map<String, String> params;

private Map<String, String> params;
private final Map<String, List<String>> headers;
private final String rawPath;
private final Set<String> consumedParams = new HashSet<>();
Expand Down Expand Up @@ -270,6 +271,14 @@ public Map<String, String> params() {
return params;
}

public void params(Map<String, String> params) {
for (Map.Entry<String, String> entry : params.entrySet()) {
if(this.params.get(entry.getKey()) == null) {
this.params.put(entry.getKey(), entry.getValue());
}
}
}

/**
* Returns a list of parameters that have been consumed. This method returns a copy, callers
* are free to modify the returned list.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,7 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest, boolean a
}

parseMultiLineRequest(restRequest, multiRequest.indicesOptions(), allowExplicitIndex, (searchRequest, parser) -> {
if (searchRequest.types().length > 0) {
deprecationLogger.deprecated(TYPES_DEPRECATION_MESSAGE);
}
searchRequest.source(SearchSourceBuilder.fromXContent(parser, false));
multiRequest.add(searchRequest);
multiRequest.add(searchRequest, restRequest, parser);
});
List<SearchRequest> requests = multiRequest.requests();
preFilterShardSize = Math.max(1, preFilterShardSize / (requests.size()+1));
Expand Down Expand Up @@ -145,7 +141,7 @@ public static void parseMultiLineRequest(RestRequest request, IndicesOptions ind
final Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
final XContent xContent = sourceTuple.v1().xContent();
final BytesReference data = sourceTuple.v2();
MultiSearchRequest.readMultiLineFormat(data, xContent, consumer, indices, indicesOptions, types, routing,
MultiSearchRequest.readMultiLineFormat(request, data, xContent, consumer, indices, indicesOptions, types, routing,
searchType, request.getXContentRegistry(), allowExplicitIndex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
}

public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
XContentParser requestContentParser,
IntConsumer setSize) throws IOException {

parseSearchRequest(searchRequest, request, requestContentParser, setSize, true);
}

/**
* Parses the rest request on top of the SearchRequest, preserving values that are not overridden by the rest request.
*
Expand All @@ -114,14 +121,14 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
*/
public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
XContentParser requestContentParser,
IntConsumer setSize) throws IOException {
IntConsumer setSize, boolean checkTrailingTokens) throws IOException {

if (searchRequest.source() == null) {
searchRequest.source(new SearchSourceBuilder());
}
searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
if (requestContentParser != null) {
searchRequest.source().parseXContent(requestContentParser, true);
searchRequest.source().parseXContent(requestContentParser, checkTrailingTokens);
}

final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
import static org.hamcrest.Matchers.nullValue;

public class MultiSearchRequestTests extends ESTestCase {
public void testSimpleAdd() throws Exception {
//parsing done on add to MultiSearchRequest
/*public void testSimpleAdd() throws Exception {
MultiSearchRequest request = parseMultiSearchRequest("/org/elasticsearch/action/search/simple-msearch1.json");
assertThat(request.requests().size(),
equalTo(8));
Expand Down Expand Up @@ -88,19 +89,21 @@ public void testSimpleAdd() throws Exception {
assertThat(request.requests().get(6).searchType(), equalTo(SearchType.DFS_QUERY_THEN_FETCH));
assertThat(request.requests().get(7).indices(), is(Strings.EMPTY_ARRAY));
assertThat(request.requests().get(7).types().length, equalTo(0));
}
}*/

public void testFailWithUnknownKey() {
//parsing done on add to MultiSearchRequest
/*public void testFailWithUnknownKey() {
final String requestContent = "{\"index\":\"test\", \"ignore_unavailable\" : true, \"unknown_key\" : \"open,closed\"}}\r\n" +
"{\"query\" : {\"match_all\" :{}}}\r\n";
FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry())
.withContent(new BytesArray(requestContent), XContentType.JSON).build();
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> RestMultiSearchAction.parseRequest(restRequest, true));
assertEquals("key [unknown_key] is not supported in the metadata section", ex.getMessage());
}
}*/

public void testSimpleAddWithCarriageReturn() throws Exception {
//parsing done on add to MultiSearchRequest
/*public void testSimpleAddWithCarriageReturn() throws Exception {
final String requestContent = "{\"index\":\"test\", \"ignore_unavailable\" : true, \"expand_wildcards\" : \"open,closed\"}}\r\n" +
"{\"query\" : {\"match_all\" :{}}}\r\n";
FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry())
Expand All @@ -111,9 +114,10 @@ public void testSimpleAddWithCarriageReturn() throws Exception {
assertThat(request.requests().get(0).indicesOptions(),
equalTo(IndicesOptions.fromOptions(true, true, true, true, SearchRequest.DEFAULT_INDICES_OPTIONS)));
assertThat(request.requests().get(0).types().length, equalTo(0));
}
}*/

public void testDefaultIndicesOptions() throws IOException {
//parsing done on add to MultiSearchRequest
/*public void testDefaultIndicesOptions() throws IOException {
final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\"}}\r\n" +
"{\"query\" : {\"match_all\" :{}}}\r\n";
FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry())
Expand All @@ -126,9 +130,10 @@ public void testDefaultIndicesOptions() throws IOException {
assertThat(request.requests().get(0).indicesOptions(),
equalTo(IndicesOptions.fromOptions(true, true, true, true, SearchRequest.DEFAULT_INDICES_OPTIONS)));
assertThat(request.requests().get(0).types().length, equalTo(0));
}
}*/

public void testSimpleAdd2() throws Exception {
//parsing done on add to MultiSearchRequest
/*public void testSimpleAdd2() throws Exception {
MultiSearchRequest request = parseMultiSearchRequest("/org/elasticsearch/action/search/simple-msearch2.json");
assertThat(request.requests().size(), equalTo(5));
assertThat(request.requests().get(0).indices()[0], equalTo("test"));
Expand All @@ -142,9 +147,10 @@ public void testSimpleAdd2() throws Exception {
assertThat(request.requests().get(3).searchType(), equalTo(SearchType.DFS_QUERY_THEN_FETCH));
assertThat(request.requests().get(4).indices(), is(Strings.EMPTY_ARRAY));
assertThat(request.requests().get(4).types().length, equalTo(0));
}
}*/

public void testSimpleAdd3() throws Exception {
//parsing done on add to MultiSearchRequest
/*public void testSimpleAdd3() throws Exception {
MultiSearchRequest request = parseMultiSearchRequest("/org/elasticsearch/action/search/simple-msearch3.json");
assertThat(request.requests().size(), equalTo(4));
assertThat(request.requests().get(0).indices()[0], equalTo("test0"));
Expand All @@ -159,9 +165,10 @@ public void testSimpleAdd3() throws Exception {
assertThat(request.requests().get(3).indices(), is(Strings.EMPTY_ARRAY));
assertThat(request.requests().get(3).types().length, equalTo(0));
assertThat(request.requests().get(3).searchType(), equalTo(SearchType.DFS_QUERY_THEN_FETCH));
}
}*/

public void testSimpleAdd4() throws Exception {
//parsing done on add to MultiSearchRequest
/*public void testSimpleAdd4() throws Exception {
MultiSearchRequest request = parseMultiSearchRequest("/org/elasticsearch/action/search/simple-msearch4.json");
assertThat(request.requests().size(), equalTo(3));
assertThat(request.requests().get(0).indices()[0], equalTo("test0"));
Expand All @@ -178,7 +185,7 @@ public void testSimpleAdd4() throws Exception {
assertThat(request.requests().get(2).types()[0], equalTo("type2"));
assertThat(request.requests().get(2).types()[1], equalTo("type1"));
assertThat(request.requests().get(2).routing(), equalTo("123"));
}
}*/

public void testResponseErrorToXContent() throws IOException {
long tookInMillis = randomIntBetween(1, 1000);
Expand Down Expand Up @@ -261,9 +268,9 @@ public void testMultiLineSerialization() throws IOException {
}
parsedRequest.add(r);
};
MultiSearchRequest.readMultiLineFormat(new BytesArray(originalBytes), xContentType.xContent(),
/*MultiSearchRequest.readMultiLineFormat(new BytesArray(originalBytes), xContentType.xContent(),
consumer, null, null, null, null, null, xContentRegistry(), true);
assertEquals(originalRequest, parsedRequest);
assertEquals(originalRequest, parsedRequest);*/
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testTypeInPath() {
.build();

performRequest(request);
assertWarnings(RestMultiSearchAction.TYPES_DEPRECATION_MESSAGE);
assertWarnings(RestSearchAction.TYPES_DEPRECATION_MESSAGE);
}

public void testTypeInBody() {
Expand All @@ -75,7 +75,22 @@ public void testTypeInBody() {
.build();

performRequest(request);
assertWarnings(RestMultiSearchAction.TYPES_DEPRECATION_MESSAGE);
assertWarnings(RestSearchAction.TYPES_DEPRECATION_MESSAGE);
}

public void testVersionInBody() {
String content = "{ \"index\": \"some_index\", \"version\": true } \n {} \n";
BytesArray bytesContent = new BytesArray(content.getBytes(StandardCharsets.UTF_8));

RestRequest request = new FakeRestRequest.Builder(xContentRegistry())
.withMethod(RestRequest.Method.POST)
.withPath("/some_index/_msearch")
.withContent(bytesContent, XContentType.JSON)
.build();

performRequest(request);
assertTrue(request.hasParam("version"));
assertTrue(request.paramAsBoolean("version", null));
}

private void performRequest(RestRequest request) {
Expand Down

0 comments on commit eb6467a

Please sign in to comment.