From d054ab3b3720e426238a4ee4210ec0508c968012 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 12 Feb 2019 15:04:10 -0600 Subject: [PATCH 1/2] ML refactor DatafeedsConfig(Update) so defaults are not populated in queries or aggs --- .../core/ml/datafeed/DatafeedConfig.java | 120 +++++++++--------- .../core/ml/datafeed/DatafeedUpdate.java | 103 +++++++++------ .../xpack/core/ml/job/messages/Messages.java | 4 +- .../core/ml/datafeed/DatafeedConfigTests.java | 59 +++++---- .../core/ml/datafeed/DatafeedUpdateTests.java | 65 ++++++---- .../deprecation/MlDeprecationChecksTests.java | 2 +- .../ml/integration/DelayedDataDetectorIT.java | 4 +- .../rest-api-spec/test/ml/datafeeds_crud.yml | 6 +- 8 files changed, 206 insertions(+), 157 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index ed858b58dd484..e5c7f5be7f6ab 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.core.ml.datafeed; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; @@ -18,11 +20,9 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParseException; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.query.AbstractQueryBuilder; +import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; @@ -71,19 +71,12 @@ public class DatafeedConfig extends AbstractDiffable implements (objectMap, id, warnings) -> { try { return QUERY_TRANSFORMER.fromMap(objectMap, warnings); - } catch (IOException | XContentParseException exception) { + } catch (Exception exception) { // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user if (exception.getCause() instanceof IllegalArgumentException) { - throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, - id, - exception.getCause().getMessage()), - exception.getCause()); - } else { - throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, exception, id), - exception); + exception = (Exception)exception.getCause(); } + throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id), exception); } }; @@ -92,22 +85,17 @@ public class DatafeedConfig extends AbstractDiffable implements (objectMap, id, warnings) -> { try { return AGG_TRANSFORMER.fromMap(objectMap, warnings); - } catch (IOException | XContentParseException exception) { + } catch (Exception exception) { // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine 
cause for the user if (exception.getCause() instanceof IllegalArgumentException) { - throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, - id, - exception.getCause().getMessage()), - exception.getCause()); - } else { - throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, exception.getMessage(), id), - exception); + exception = (Exception)exception.getCause(); } + throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id), exception); } }; + private static final Logger logger = LogManager.getLogger(DatafeedConfig.class); + // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("datafeeds"); public static String TYPE = "datafeed"; @@ -164,15 +152,9 @@ private static ObjectParser createParser(boolean ignoreUnknownFie builder.setQueryDelay(TimeValue.parseTimeValue(val, QUERY_DELAY.getPreferredName())), QUERY_DELAY); parser.declareString((builder, val) -> builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY); - if (ignoreUnknownFields) { - parser.declareObject(Builder::setQuery, (p, c) -> p.mapOrdered(), QUERY); - parser.declareObject(Builder::setAggregations, (p, c) -> p.mapOrdered(), AGGREGATIONS); - parser.declareObject(Builder::setAggregations, (p, c) -> p.mapOrdered(), AGGS); - } else { - parser.declareObject(Builder::setParsedQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY); - parser.declareObject(Builder::setParsedAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS); - parser.declareObject(Builder::setParsedAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS); - } + parser.declareObject((builder, val) -> builder.setQuery(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(), QUERY); + parser.declareObject((builder, val) -> builder.setAggregations(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(), AGGREGATIONS); + parser.declareObject((builder, val) -> builder.setAggregations(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(), AGGS); parser.declareObject(Builder::setScriptFields, (p, c) -> { List parsedScriptFields = new ArrayList<>(); while (p.nextToken() != XContentParser.Token.END_OBJECT) { @@ -582,7 +564,7 @@ public static class Builder { private TimeValue queryDelay; private TimeValue frequency; private List indices = Collections.emptyList(); - private Map query; + private Map query = Collections.singletonMap(MatchAllQueryBuilder.NAME, Collections.emptyMap()); private Map aggregations; private List scriptFields; private Integer scrollSize = DEFAULT_SCROLL_SIZE; @@ -590,11 +572,7 @@ public static class Builder { private Map headers = Collections.emptyMap(); private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); - public Builder() { - try { - this.query = QUERY_TRANSFORMER.toMap(QueryBuilders.matchAllQuery()); - } catch (IOException ex) { /*Should never happen*/ } - } + public Builder() { } public Builder(String id, String jobId) { this(); @@ -647,48 +625,64 @@ public void setFrequency(TimeValue frequency) { this.frequency = frequency; } - public void setParsedQuery(QueryBuilder query) { + public void setQuery(Map query) { + setQuery(query, true); + } + + public void setQuery(Map query, boolean lenient) { + this.query = ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName()); try { - 
setQuery(QUERY_TRANSFORMER.toMap(ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName()))); - } catch (IOException | XContentParseException exception) { - if (exception.getCause() instanceof IllegalArgumentException) { - // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user - throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, - id, - exception.getCause().getMessage()), - exception.getCause()); + QUERY_TRANSFORMER.fromMap(query); + } catch(Exception ex) { + String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id); + + if (ex.getCause() instanceof IllegalArgumentException) { + ex = (Exception)ex.getCause(); + } + + if (lenient) { + logger.warn(msg, ex); } else { - throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id, exception.getMessage()), exception); + throw ExceptionsHelper.badRequestException(msg, ex); } } } - public void setQuery(Map query) { - this.query = ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName()); - } - + // Kept for easier testing public void setParsedAggregations(AggregatorFactories.Builder aggregations) { try { setAggregations(AGG_TRANSFORMER.toMap(aggregations)); - } catch (IOException | XContentParseException exception) { + } catch (Exception exception) { // Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user if (exception.getCause() instanceof IllegalArgumentException) { - throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, - id, - exception.getCause().getMessage()), - exception.getCause()); - } else { - throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id, exception.getMessage()), exception); + exception = (Exception)exception.getCause(); } + throw ExceptionsHelper.badRequestException( + Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id), exception); } } void setAggregations(Map aggregations) { + setAggregations(aggregations, true); + } + + void setAggregations(Map aggregations, boolean lenient) { this.aggregations = aggregations; + try { + AGG_TRANSFORMER.fromMap(aggregations); + } catch (Exception ex) { + String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id); + + if (ex.getCause() instanceof IllegalArgumentException) { + ex = (Exception)ex.getCause(); + } + + if (lenient) { + logger.warn(msg, ex); + } else { + throw ExceptionsHelper.badRequestException(msg, ex); + } + } } public void setScriptFields(List scriptFields) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index 23c2eeccc6a59..ab686e757d7c7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -16,13 +16,12 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; import 
org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -34,6 +33,11 @@ import java.util.Objects; import java.util.stream.Collectors; +import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.AGG_TRANSFORMER; +import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.QUERY_TRANSFORMER; +import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.lazyAggParser; +import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.lazyQueryParser; + /** * A datafeed update contains partial properties to update a {@link DatafeedConfig}. * The main difference between this class and {@link DatafeedConfig} is that here all @@ -52,12 +56,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { TimeValue.parseTimeValue(val, DatafeedConfig.QUERY_DELAY.getPreferredName())), DatafeedConfig.QUERY_DELAY); PARSER.declareString((builder, val) -> builder.setFrequency( TimeValue.parseTimeValue(val, DatafeedConfig.FREQUENCY.getPreferredName())), DatafeedConfig.FREQUENCY); - PARSER.declareObject(Builder::setQuery, - (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), DatafeedConfig.QUERY); - PARSER.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), - DatafeedConfig.AGGREGATIONS); - PARSER.declareObject(Builder::setAggregations,(p, c) -> AggregatorFactories.parseAggregators(p), - DatafeedConfig.AGGS); + PARSER.declareObject(Builder::setQuery, (p, c) -> p.mapOrdered(), DatafeedConfig.QUERY); + PARSER.declareObject(Builder::setAggregations, (p, c) -> p.mapOrdered(), DatafeedConfig.AGGREGATIONS); + PARSER.declareObject(Builder::setAggregations,(p, c) -> p.mapOrdered(), DatafeedConfig.AGGS); PARSER.declareObject(Builder::setScriptFields, (p, c) -> { List parsedScriptFields = new ArrayList<>(); while (p.nextToken() != XContentParser.Token.END_OBJECT) { @@ -78,16 +79,16 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { private final TimeValue queryDelay; private final TimeValue frequency; private final List indices; - private final QueryBuilder query; - private final AggregatorFactories.Builder aggregations; + private final Map query; + private final Map aggregations; private final List scriptFields; private final Integer scrollSize; private final ChunkingConfig chunkingConfig; private final DelayedDataCheckConfig delayedDataCheckConfig; - private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, QueryBuilder query, - AggregatorFactories.Builder aggregations, List scriptFields, Integer scrollSize, - ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) { + private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, + Map query, Map aggregations, List scriptFields, + Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -117,8 +118,17 @@ public DatafeedUpdate(StreamInput in) throws IOException { in.readStringList(); } } - this.query = in.readOptionalNamedWriteable(QueryBuilder.class); - this.aggregations = 
in.readOptionalWriteable(AggregatorFactories.Builder::new); + if (in.getVersion().before(Version.V_7_1_0)) { + this.query = QUERY_TRANSFORMER.toMap(in.readNamedWriteable(QueryBuilder.class)); + this.aggregations = AGG_TRANSFORMER.toMap(in.readOptionalWriteable(AggregatorFactories.Builder::new)); + } else { + this.query = in.readMap(); + if (in.readBoolean()) { + this.aggregations = in.readMap(); + } else { + this.aggregations = null; + } + } if (in.readBoolean()) { this.scriptFields = in.readList(SearchSourceBuilder.ScriptField::new); } else { @@ -158,8 +168,16 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(true); out.writeStringCollection(Collections.emptyList()); } - out.writeOptionalNamedWriteable(query); - out.writeOptionalWriteable(aggregations); + if (out.getVersion().before(Version.V_7_1_0)) { + out.writeNamedWriteable(lazyQueryParser.apply(query, id, new ArrayList<>())); + out.writeOptionalWriteable(lazyAggParser.apply(aggregations, id, new ArrayList<>())); + } else { + out.writeMap(query); + out.writeBoolean(aggregations != null); + if (aggregations != null) { + out.writeMap(aggregations); + } + } if (scriptFields != null) { out.writeBoolean(true); out.writeList(scriptFields); @@ -227,27 +245,20 @@ Integer getScrollSize() { return scrollSize; } - QueryBuilder getQuery() { + Map getQuery() { return query; } - AggregatorFactories.Builder getAggregations() { + Map getAggregations() { return aggregations; } - /** - * Returns the histogram's interval as epoch millis. - */ - long getHistogramIntervalMillis() { - return ExtractorUtils.getHistogramIntervalMillis(aggregations); - } - /** * @return {@code true} when there are non-empty aggregations, {@code false} * otherwise */ boolean hasAggregations() { - return aggregations != null && aggregations.count() > 0; + return aggregations != null && aggregations.size() > 0; } List getScriptFields() { @@ -285,11 +296,11 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map h builder.setIndices(indices); } if (query != null) { - builder.setParsedQuery(query); + builder.setQuery(query); } if (aggregations != null) { - DatafeedConfig.validateAggregations(aggregations); - builder.setParsedAggregations(aggregations); + DatafeedConfig.validateAggregations(lazyAggParser.apply(aggregations, id, new ArrayList<>())); + builder.setAggregations(aggregations); } if (scriptFields != null) { builder.setScriptFields(scriptFields); @@ -360,9 +371,9 @@ boolean isNoop(DatafeedConfig datafeed) { return (frequency == null || Objects.equals(frequency, datafeed.getFrequency())) && (queryDelay == null || Objects.equals(queryDelay, datafeed.getQueryDelay())) && (indices == null || Objects.equals(indices, datafeed.getIndices())) - && (query == null || Objects.equals(query, datafeed.getParsedQuery())) + && (query == null || Objects.equals(query, datafeed.getQuery())) && (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay())) - && (aggregations == null || Objects.equals(aggregations, datafeed.getParsedAggregations())) + && (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations())) && (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields())) && (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig())) && (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig())); @@ -375,8 +386,8 @@ public static class Builder { private TimeValue queryDelay; private TimeValue frequency; private 
List indices; - private QueryBuilder query; - private AggregatorFactories.Builder aggregations; + private Map query; + private Map aggregations; private List scriptFields; private Integer scrollSize; private ChunkingConfig chunkingConfig; @@ -423,12 +434,32 @@ public void setFrequency(TimeValue frequency) { this.frequency = frequency; } - public void setQuery(QueryBuilder query) { + public void setQuery(Map query) { this.query = query; + try { + QUERY_TRANSFORMER.fromMap(query); + } catch(Exception ex) { + String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id); + + if (ex.getCause() instanceof IllegalArgumentException) { + ex = (Exception)ex.getCause(); + } + throw ExceptionsHelper.badRequestException(msg, ex); + } } - public void setAggregations(AggregatorFactories.Builder aggregations) { + public void setAggregations(Map aggregations) { this.aggregations = aggregations; + try { + AGG_TRANSFORMER.fromMap(aggregations); + } catch(Exception ex) { + String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id); + + if (ex.getCause() instanceof IllegalArgumentException) { + ex = (Exception)ex.getCause(); + } + throw ExceptionsHelper.badRequestException(msg, ex); + } } public void setScriptFields(List scriptFields) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 77ae8cb26eae9..09874ec611b22 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -26,8 +26,8 @@ public final class Messages { "delayed_data_check_config: check_window [{0}] must be greater than the bucket_span [{1}]"; public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS = "delayed_data_check_config: check_window [{0}] must be less than 10,000x the bucket_span [{1}]"; - public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed [{0}] query is not parsable: {1}"; - public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed [{0}] aggregations are not parsable: {1}"; + public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed [{0}] query is not parsable"; + public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed [{0}] aggregations are not parsable"; public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency"; public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists"; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 788870013885e..c09634e82e117 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -26,8 +26,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.script.Script; import 
org.elasticsearch.search.SearchModule; @@ -58,6 +56,8 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.QUERY_TRANSFORMER; +import static org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig.lazyQueryParser; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -89,7 +89,8 @@ private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(Stri DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId); builder.setIndices(randomStringList(1, 10)); if (randomBoolean()) { - builder.setParsedQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10))); + builder.setQuery(Collections.singletonMap(TermQueryBuilder.NAME, + Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)))); } boolean addScriptFields = randomBoolean(); if (addScriptFields) { @@ -228,7 +229,7 @@ public void testPastQueryConfigParse() throws IOException { DatafeedConfig config = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build(); ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> config.getParsedQuery()); - assertEquals("[match] query doesn't support multiple fields, found [query] and [type]", e.getMessage()); + assertEquals("[match] query doesn't support multiple fields, found [query] and [type]", e.getCause().getMessage()); } try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) @@ -236,7 +237,7 @@ public void testPastQueryConfigParse() throws IOException { XContentParseException e = expectThrows(XContentParseException.class, () -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build()); - assertEquals("[6:25] [datafeed_config] failed to parse field [query]", e.getMessage()); + assertEquals("[6:64] [datafeed_config] failed to parse field [query]", e.getMessage()); } } @@ -247,8 +248,8 @@ public void testPastAggConfigParse() throws IOException { DatafeedConfig.Builder configBuilder = DatafeedConfig.LENIENT_PARSER.apply(parser, null); ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> configBuilder.build()); assertEquals( - "Datafeed [farequote-datafeed] aggregations are not parsable: [size] must be greater than 0. Found [0] in [airline]", - e.getMessage()); + "[size] must be greater than 0. 
Found [0] in [airline]", + e.getCause().getMessage()); } try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) @@ -256,7 +257,7 @@ public void testPastAggConfigParse() throws IOException { XContentParseException e = expectThrows(XContentParseException.class, () -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build()); - assertEquals("[8:25] [datafeed_config] failed to parse field [aggregations]", e.getMessage()); + assertEquals("[25:3] [datafeed_config] failed to parse field [aggregations]", e.getMessage()); } } @@ -443,7 +444,7 @@ public void testBuild_GivenHistogramWithDefaultInterval() { ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build); - assertThat(e.getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]")); + assertThat(e.getCause().getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]")); } public void testBuild_GivenDateHistogramWithInvalidTimeZone() { @@ -636,7 +637,7 @@ public void testGetQueryDeprecations() { DatafeedConfig spiedConfig = spy(datafeed); spiedConfig.getQueryDeprecations(); - verify(spiedConfig).getQueryDeprecations(DatafeedConfig.lazyQueryParser); + verify(spiedConfig).getQueryDeprecations(lazyQueryParser); } public void testSerializationOfComplexAggs() throws IOException { @@ -656,9 +657,11 @@ public void testSerializationOfComplexAggs() throws IOException { .subAggregation(derivativePipelineAggregationBuilder) .subAggregation(bucketScriptPipelineAggregationBuilder); DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram); - QueryBuilder terms = - new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))); - datafeedConfigBuilder.setParsedQuery(terms); + Map terms = Collections.singletonMap(BoolQueryBuilder.NAME, + Collections.singletonMap("filter", + Collections.singletonMap(TermQueryBuilder.NAME, + Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))))); + datafeedConfigBuilder.setQuery(terms); DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder().addAggregator(dateHistogram); @@ -675,7 +678,7 @@ public void testSerializationOfComplexAggs() throws IOException { // Assert that the parsed versions of our aggs and queries work as well assertEquals(aggBuilder, parsedDatafeedConfig.getParsedAggregations()); - assertEquals(terms, parsedDatafeedConfig.getParsedQuery()); + assertEquals(terms, parsedDatafeedConfig.getQuery()); try(BytesStreamOutput output = new BytesStreamOutput()) { datafeedConfig.writeTo(output); @@ -685,7 +688,7 @@ public void testSerializationOfComplexAggs() throws IOException { // Assert that the parsed versions of our aggs and queries work as well assertEquals(aggBuilder, streamedDatafeedConfig.getParsedAggregations()); - assertEquals(terms, streamedDatafeedConfig.getParsedQuery()); + assertEquals(terms, streamedDatafeedConfig.getQuery()); } } } @@ -707,9 +710,15 @@ public void testSerializationOfComplexAggsBetweenVersions() throws IOException { .subAggregation(derivativePipelineAggregationBuilder) .subAggregation(bucketScriptPipelineAggregationBuilder); DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilderWithDateHistogram(dateHistogram); - QueryBuilder terms = - new BoolQueryBuilder().filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))); - 
datafeedConfigBuilder.setParsedQuery(terms); + Map terms = Collections.singletonMap(BoolQueryBuilder.NAME, + Collections.singletonMap("filter", + Collections.singletonList( + Collections.singletonMap(TermQueryBuilder.NAME, + Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)))))); + // So equality check between the streamed and current passes + // Streamed DatafeedConfigs when they are before 6.6.0 require a parsed object for aggs and queries, consequently all the default + // values are added between them + datafeedConfigBuilder.setQuery(QUERY_TRANSFORMER.toMap(QUERY_TRANSFORMER.fromMap(terms))); DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); @@ -726,7 +735,7 @@ public void testSerializationOfComplexAggsBetweenVersions() throws IOException { // Assert that the parsed versions of our aggs and queries work as well assertEquals(new AggregatorFactories.Builder().addAggregator(dateHistogram), streamedDatafeedConfig.getParsedAggregations()); - assertEquals(terms, streamedDatafeedConfig.getParsedQuery()); + assertEquals(datafeedConfig.getParsedQuery(), streamedDatafeedConfig.getParsedQuery()); } } } @@ -800,12 +809,14 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept builder.setIndices(indices); break; case 5: - BoolQueryBuilder query = new BoolQueryBuilder(); - if (instance.getParsedQuery() != null) { - query.must(instance.getParsedQuery()); + Map query = new HashMap<>(); + if (instance.getQuery() != null) { + query.put("must", instance.getQuery()); } - query.filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))); - builder.setParsedQuery(query); + query.put("filter", Collections.singletonList( + Collections.singletonMap(TermQueryBuilder.NAME, + Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))))); + builder.setQuery(query); break; case 6: if (instance.hasAggregations()) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java index 302bfefc7c42a..60e73cd56a884 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java @@ -12,15 +12,9 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchModule; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField; import org.elasticsearch.test.AbstractSerializingTestCase; @@ -29,7 +23,9 @@ import java.util.ArrayList; import java.util.Collections; 
+import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -61,7 +57,8 @@ public static DatafeedUpdate createRandomized(String datafeedId, @Nullable Dataf builder.setIndices(DatafeedConfigTests.randomStringList(1, 10)); } if (randomBoolean()) { - builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10))); + builder.setQuery(Collections.singletonMap(TermQueryBuilder.NAME, + Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)))); } if (randomBoolean()) { int scriptsSize = randomInt(3); @@ -75,10 +72,9 @@ public static DatafeedUpdate createRandomized(String datafeedId, @Nullable Dataf if (randomBoolean() && datafeed == null) { // can only test with a single agg as the xcontent order gets randomized by test base class and then // the actual xcontent isn't the same and test fail. - // Testing with a single agg is ok as we don't have special list writeable / xconent logic - AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); - aggs.addAggregator(AggregationBuilders.avg(randomAlphaOfLength(10)).field(randomAlphaOfLength(10))); - builder.setAggregations(aggs); + // Testing with a single agg is ok as we don't have special list writeable / xcontent logic + builder.setAggregations(Collections.singletonMap(randomAlphaOfLength(10), + Collections.singletonMap("avg", Collections.singletonMap("field", randomAlphaOfLength(10))))); } if (randomBoolean()) { builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE)); @@ -149,7 +145,7 @@ public void testApply_givenFullUpdateNoAggregations() { update.setIndices(Collections.singletonList("i_2")); update.setQueryDelay(TimeValue.timeValueSeconds(42)); update.setFrequency(TimeValue.timeValueSeconds(142)); - update.setQuery(QueryBuilders.termQuery("a", "b")); + update.setQuery(Collections.singletonMap(TermQueryBuilder.NAME, Collections.singletonMap("a", "b"))); update.setScriptFields(Collections.singletonList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false))); update.setScrollSize(8000); update.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1))); @@ -161,7 +157,8 @@ public void testApply_givenFullUpdateNoAggregations() { assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_2"))); assertThat(updatedDatafeed.getQueryDelay(), equalTo(TimeValue.timeValueSeconds(42))); assertThat(updatedDatafeed.getFrequency(), equalTo(TimeValue.timeValueSeconds(142))); - assertThat(updatedDatafeed.getParsedQuery(), equalTo(QueryBuilders.termQuery("a", "b"))); + assertThat(updatedDatafeed.getQuery(), + equalTo(Collections.singletonMap(TermQueryBuilder.NAME, Collections.singletonMap("a", "b")))); assertThat(updatedDatafeed.hasAggregations(), is(false)); assertThat(updatedDatafeed.getScriptFields(), equalTo(Collections.singletonList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false)))); @@ -177,16 +174,21 @@ public void testApply_givenAggregations() { DatafeedConfig datafeed = datafeedBuilder.build(); DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId()); - MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); - update.setAggregations(new AggregatorFactories.Builder().addAggregator( - AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime))); + Map maxTime = Collections.singletonMap("time", + Collections.singletonMap("max", 
Collections.singletonMap("field", "time"))); + Map histoDefinition = new HashMap<>(); + histoDefinition.put("interval", 300000); + histoDefinition.put("field", "time"); + Map aggBody = new HashMap<>(); + aggBody.put("histogram", histoDefinition); + aggBody.put("aggs", maxTime); + Map aggMap = Collections.singletonMap("a", aggBody); + update.setAggregations(aggMap); DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap()); assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_1"))); - assertThat(updatedDatafeed.getParsedAggregations(), - equalTo(new AggregatorFactories.Builder().addAggregator( - AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime)))); + assertThat(updatedDatafeed.getAggregations(), equalTo(aggMap)); } public void testApply_GivenRandomUpdates_AssertImmutability() { @@ -243,22 +245,31 @@ protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) { builder.setIndices(indices); break; case 5: - BoolQueryBuilder query = new BoolQueryBuilder(); + Map boolQuery = new HashMap<>(); if (instance.getQuery() != null) { - query.must(instance.getQuery()); + boolQuery.put("must", instance.getQuery()); } - query.filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))); - builder.setQuery(query); + boolQuery.put("filter", + Collections.singletonList( + Collections.singletonMap(TermQueryBuilder.NAME, + Collections.singletonMap(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10))))); + builder.setQuery(Collections.singletonMap("bool", boolQuery)); break; case 6: if (instance.hasAggregations()) { builder.setAggregations(null); } else { - AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder(); String timeField = randomAlphaOfLength(10); - aggBuilder.addAggregator(new DateHistogramAggregationBuilder(timeField).field(timeField).interval(between(10000, 3600000)) - .subAggregation(new MaxAggregationBuilder(timeField).field(timeField))); - builder.setAggregations(aggBuilder); + Map maxTime = Collections.singletonMap(timeField, + Collections.singletonMap("max", Collections.singletonMap("field", timeField))); + Map histoDefinition = new HashMap<>(); + histoDefinition.put("interval", between(10000, 3600000)); + histoDefinition.put("field", timeField); + Map aggBody = new HashMap<>(); + aggBody.put("aggs", maxTime); + aggBody.put("date_histogram", histoDefinition); + Map aggMap = Collections.singletonMap(timeField, aggBody); + builder.setAggregations(aggMap); if (instance.getScriptFields().isEmpty() == false) { builder.setScriptFields(Collections.emptyList()); } diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/MlDeprecationChecksTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/MlDeprecationChecksTests.java index 6d93ed1873184..bf868c86bae88 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/MlDeprecationChecksTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/MlDeprecationChecksTests.java @@ -22,7 +22,7 @@ protected boolean enableWarningsCheck() { public void testCheckDataFeedQuery() { DatafeedConfig.Builder goodDatafeed = new DatafeedConfig.Builder("good-df", "job-id"); goodDatafeed.setIndices(Collections.singletonList("some-index")); - goodDatafeed.setParsedQuery(new TermQueryBuilder("foo", "bar")); + goodDatafeed.setQuery(Collections.singletonMap(TermQueryBuilder.NAME, 
Collections.singletonMap("foo", "bar"))); assertNull(MlDeprecationChecks.checkDataFeedQuery(goodDatafeed.build())); DatafeedConfig.Builder deprecatedDatafeed = new DatafeedConfig.Builder("df-with-deprecated-query", "job-id"); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java index ddebbe6038f19..aa25cb0619377 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DelayedDataDetectorIT.java @@ -159,7 +159,9 @@ public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception .subAggregation(avgAggregationBuilder) .field("time") .interval(TimeValue.timeValueMinutes(5).millis()))); - datafeedConfigBuilder.setParsedQuery(new RangeQueryBuilder("value").gte(numDocs/2)); + datafeedConfigBuilder.setQuery(Collections.singletonMap(RangeQueryBuilder.NAME, + Collections.singletonMap("value", + Collections.singletonMap(RangeQueryBuilder.GTE_FIELD.getPreferredName(), numDocs/2)))); datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5)); datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12))); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml index 742fc00beda74..5dda4f3def672 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/datafeeds_crud.yml @@ -356,9 +356,9 @@ setup: datafeed_id: test-datafeed-aggs-1 - match: { datafeeds.0.datafeed_id: "test-datafeed-aggs-1" } - match: { datafeeds.0.aggregations.histogram_buckets.date_histogram.field: "@timestamp" } - - match: { datafeeds.0.aggregations.histogram_buckets.aggregations.@timestamp.max.field: "@timestamp" } - - match: { datafeeds.0.aggregations.histogram_buckets.aggregations.bytes_in_avg.avg.field: "system.network.in.bytes" } - - match: { datafeeds.0.aggregations.histogram_buckets.aggregations.non_negative_bytes.bucket_script.buckets_path.bytes: "bytes_in_derivative" } + - match: { datafeeds.0.aggregations.histogram_buckets.aggs.@timestamp.max.field: "@timestamp" } + - match: { datafeeds.0.aggregations.histogram_buckets.aggs.bytes_in_avg.avg.field: "system.network.in.bytes" } + - match: { datafeeds.0.aggregations.histogram_buckets.aggs.non_negative_bytes.bucket_script.buckets_path.bytes: "bytes_in_derivative" } --- "Test delete datafeed": From e1628b821e6e85500535a75c7de6b0f6176d260a Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Wed, 13 Feb 2019 11:05:57 -0600 Subject: [PATCH 2/2] Addressing pr feedback --- .../core/ml/datafeed/DatafeedConfig.java | 16 +++- .../core/ml/datafeed/DatafeedUpdate.java | 18 ++++- .../core/ml/datafeed/DatafeedConfigTests.java | 75 +++++++++++++++++++ .../core/ml/datafeed/DatafeedUpdateTests.java | 69 +++++++++++++++++ 4 files changed, 172 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index e5c7f5be7f6ab..597edd3675270 100644 --- 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -153,8 +153,10 @@ private static ObjectParser createParser(boolean ignoreUnknownFie parser.declareString((builder, val) -> builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY); parser.declareObject((builder, val) -> builder.setQuery(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(), QUERY); - parser.declareObject((builder, val) -> builder.setAggregations(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(), AGGREGATIONS); - parser.declareObject((builder, val) -> builder.setAggregations(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(), AGGS); + parser.declareObject((builder, val) -> builder.setAggregationsSafe(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(), + AGGREGATIONS); + parser.declareObject((builder, val) -> builder.setAggregationsSafe(val, ignoreUnknownFields), (p, c) -> p.mapOrdered(), + AGGS); parser.declareObject(Builder::setScriptFields, (p, c) -> { List parsedScriptFields = new ArrayList<>(); while (p.nextToken() != XContentParser.Token.END_OBJECT) { @@ -662,6 +664,13 @@ public void setParsedAggregations(AggregatorFactories.Builder aggregations) { } } + private void setAggregationsSafe(Map aggregations, boolean lenient) { + if (this.aggregations != null) { + throw ExceptionsHelper.badRequestException("Found two aggregation definitions: [aggs] and [aggregations]"); + } + setAggregations(aggregations, lenient); + } + void setAggregations(Map aggregations) { setAggregations(aggregations, true); } @@ -669,6 +678,9 @@ void setAggregations(Map aggregations) { void setAggregations(Map aggregations, boolean lenient) { this.aggregations = aggregations; try { + if (aggregations != null && aggregations.isEmpty()) { + throw new Exception("[aggregations] are empty"); + } AGG_TRANSFORMER.fromMap(aggregations); } catch (Exception ex) { String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index ab686e757d7c7..5468ea1ee2688 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -57,8 +57,8 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { PARSER.declareString((builder, val) -> builder.setFrequency( TimeValue.parseTimeValue(val, DatafeedConfig.FREQUENCY.getPreferredName())), DatafeedConfig.FREQUENCY); PARSER.declareObject(Builder::setQuery, (p, c) -> p.mapOrdered(), DatafeedConfig.QUERY); - PARSER.declareObject(Builder::setAggregations, (p, c) -> p.mapOrdered(), DatafeedConfig.AGGREGATIONS); - PARSER.declareObject(Builder::setAggregations,(p, c) -> p.mapOrdered(), DatafeedConfig.AGGS); + PARSER.declareObject(Builder::setAggregationsSafe, (p, c) -> p.mapOrdered(), DatafeedConfig.AGGREGATIONS); + PARSER.declareObject(Builder::setAggregationsSafe,(p, c) -> p.mapOrdered(), DatafeedConfig.AGGS); PARSER.declareObject(Builder::setScriptFields, (p, c) -> { List parsedScriptFields = new ArrayList<>(); while (p.nextToken() != XContentParser.Token.END_OBJECT) { @@ -119,7 +119,7 @@ public DatafeedUpdate(StreamInput in) throws IOException { } } if 
(in.getVersion().before(Version.V_7_1_0)) { - this.query = QUERY_TRANSFORMER.toMap(in.readNamedWriteable(QueryBuilder.class)); + this.query = QUERY_TRANSFORMER.toMap(in.readOptionalNamedWriteable(QueryBuilder.class)); this.aggregations = AGG_TRANSFORMER.toMap(in.readOptionalWriteable(AggregatorFactories.Builder::new)); } else { this.query = in.readMap(); @@ -169,7 +169,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringCollection(Collections.emptyList()); } if (out.getVersion().before(Version.V_7_1_0)) { - out.writeNamedWriteable(lazyQueryParser.apply(query, id, new ArrayList<>())); + out.writeOptionalNamedWriteable(lazyQueryParser.apply(query, id, new ArrayList<>())); out.writeOptionalWriteable(lazyAggParser.apply(aggregations, id, new ArrayList<>())); } else { out.writeMap(query); @@ -448,9 +448,19 @@ public void setQuery(Map query) { } } + private void setAggregationsSafe(Map aggregations) { + if (this.aggregations != null) { + throw ExceptionsHelper.badRequestException("Found two aggregation definitions: [aggs] and [aggregations]"); + } + setAggregations(aggregations); + } + public void setAggregations(Map aggregations) { this.aggregations = aggregations; try { + if (aggregations != null && aggregations.isEmpty()) { + throw new Exception("[aggregations] are empty"); + } AGG_TRANSFORMER.fromMap(aggregations); } catch(Exception ex) { String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index c09634e82e117..40b7ce88df0a8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -7,6 +7,7 @@ import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -27,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -215,6 +217,41 @@ protected DatafeedConfig doParseInstance(XContentParser parser) { " }\n" + "}"; + private static final String MULTIPLE_AGG_DEF_DATAFEED = "{\n" + + " \"datafeed_id\": \"farequote-datafeed\",\n" + + " \"job_id\": \"farequote\",\n" + + " \"frequency\": \"1h\",\n" + + " \"indices\": [\"farequote1\", \"farequote2\"],\n" + + " \"aggregations\": {\n" + + " \"buckets\": {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"time\",\n" + + " \"interval\": \"360s\",\n" + + " \"time_zone\": \"UTC\"\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"time\": {\n" + + " \"max\": {\"field\": \"time\"}\n" + + " }\n" + + " }\n" + + " }\n" + + " }," + + " \"aggs\": {\n" + + " \"buckets2\": {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"time\",\n" + + " \"interval\": \"360s\",\n" + + " \"time_zone\": \"UTC\"\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"time\": {\n" + + 
" \"max\": {\"field\": \"time\"}\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + public void testFutureConfigParse() throws IOException { XContentParser parser = XContentFactory.xContent(XContentType.JSON) .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED); @@ -229,6 +266,7 @@ public void testPastQueryConfigParse() throws IOException { DatafeedConfig config = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build(); ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> config.getParsedQuery()); + assertNotNull(e.getCause()); assertEquals("[match] query doesn't support multiple fields, found [query] and [type]", e.getCause().getMessage()); } @@ -247,6 +285,7 @@ public void testPastAggConfigParse() throws IOException { DatafeedConfig.Builder configBuilder = DatafeedConfig.LENIENT_PARSER.apply(parser, null); ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> configBuilder.build()); + assertNotNull(e.getCause()); assertEquals( "[size] must be greater than 0. Found [0] in [airline]", e.getCause().getMessage()); @@ -268,6 +307,25 @@ public void testFutureMetadataParse() throws IOException { assertNotNull(DatafeedConfig.LENIENT_PARSER.apply(parser, null).build()); } + public void testMultipleDefinedAggParse() throws IOException { + try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, MULTIPLE_AGG_DEF_DATAFEED)) { + XContentParseException ex = expectThrows(XContentParseException.class, + () -> DatafeedConfig.LENIENT_PARSER.apply(parser, null)); + assertThat(ex.getMessage(), equalTo("[32:3] [datafeed_config] failed to parse field [aggs]")); + assertNotNull(ex.getCause()); + assertThat(ex.getCause().getMessage(), equalTo("Found two aggregation definitions: [aggs] and [aggregations]")); + } + try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, MULTIPLE_AGG_DEF_DATAFEED)) { + XContentParseException ex = expectThrows(XContentParseException.class, + () -> DatafeedConfig.STRICT_PARSER.apply(parser, null)); + assertThat(ex.getMessage(), equalTo("[32:3] [datafeed_config] failed to parse field [aggs]")); + assertNotNull(ex.getCause()); + assertThat(ex.getCause().getMessage(), equalTo("Found two aggregation definitions: [aggs] and [aggregations]")); + } + } + public void testToXContentForInternalStorage() throws IOException { DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", 300); @@ -444,6 +502,7 @@ public void testBuild_GivenHistogramWithDefaultInterval() { ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build); + assertNotNull(e.getCause()); assertThat(e.getCause().getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]")); } @@ -747,6 +806,22 @@ public void testCopyingDatafeedDoesNotCauseStackOverflow() { } } + public void testEmptyQueryMap() { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("empty_query_map", "job1"); + ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, + () -> builder.setQuery(Collections.emptyMap(), false)); + assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(ex.getMessage(), equalTo("Datafeed [empty_query_map] query is not parsable")); + } + + public void testEmptyAggMap() { + 
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("empty_agg_map", "job1"); + ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, + () -> builder.setAggregations(Collections.emptyMap(), false)); + assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(ex.getMessage(), equalTo("Datafeed [empty_agg_map] aggregations are not parsable")); + } + public static String randomValidDatafeedId() { CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); return generator.ofCodePointsLength(random(), 10, 10); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java index 60e73cd56a884..96798b251d345 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java @@ -5,14 +5,20 @@ */ package org.elasticsearch.xpack.core.ml.datafeed; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParseException; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -21,6 +27,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig.Mode; import org.elasticsearch.xpack.core.ml.job.config.JobTests; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -110,6 +117,52 @@ protected NamedXContentRegistry xContentRegistry() { return new NamedXContentRegistry(searchModule.getNamedXContents()); } + private static final String MULTIPLE_AGG_DEF_DATAFEED = "{\n" + + " \"datafeed_id\": \"farequote-datafeed\",\n" + + " \"job_id\": \"farequote\",\n" + + " \"frequency\": \"1h\",\n" + + " \"indices\": [\"farequote1\", \"farequote2\"],\n" + + " \"aggregations\": {\n" + + " \"buckets\": {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"time\",\n" + + " \"interval\": \"360s\",\n" + + " \"time_zone\": \"UTC\"\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"time\": {\n" + + " \"max\": {\"field\": \"time\"}\n" + + " }\n" + + " }\n" + + " }\n" + + " }," + + " \"aggs\": {\n" + + " \"buckets2\": {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"time\",\n" + + " \"interval\": \"360s\",\n" + + " \"time_zone\": \"UTC\"\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"time\": {\n" + + " \"max\": {\"field\": \"time\"}\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + public void testMultipleDefinedAggParse() throws IOException { + try(XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, 
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, MULTIPLE_AGG_DEF_DATAFEED)) { + XContentParseException ex = expectThrows(XContentParseException.class, + () -> DatafeedUpdate.PARSER.apply(parser, null)); + assertThat(ex.getMessage(), equalTo("[32:3] [datafeed_update] failed to parse field [aggs]")); + assertNotNull(ex.getCause()); + assertThat(ex.getCause().getMessage(), equalTo("Found two aggregation definitions: [aggs] and [aggregations]")); + } + } + public void testApply_failBecauseTargetDatafeedHasDifferentId() { DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo"); expectThrows(IllegalArgumentException.class, () -> createRandomized(datafeed.getId() + "_2").apply(datafeed, null)); @@ -210,6 +263,22 @@ public void testApply_GivenRandomUpdates_AssertImmutability() { } } + public void testEmptyQueryMap() { + DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder("empty_query_map"); + ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, + () -> builder.setQuery(Collections.emptyMap())); + assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(ex.getMessage(), equalTo("Datafeed [empty_query_map] query is not parsable")); + } + + public void testEmptyAggMap() { + DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder("empty_agg_map"); + ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, + () -> builder.setAggregations(Collections.emptyMap())); + assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(ex.getMessage(), equalTo("Datafeed [empty_agg_map] aggregations are not parsable")); + } + @Override protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) { DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance);
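
For context, a minimal usage sketch of the map-based setters this patch introduces on DatafeedConfig.Builder. It is illustrative only, not part of the patch: the class name DatafeedConfigUsageSketch, the datafeed id, job id, index name, and the term query values are made-up examples, and it assumes only the builder methods shown in the diff above (the two-arg setQuery overload, setIndices, and build).

import java.util.Collections;
import java.util.Map;

import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;

public class DatafeedConfigUsageSketch {
    public static void main(String[] args) {
        // Queries and aggregations are now carried as raw maps rather than
        // parsed builder objects, so serializing the config back out no
        // longer injects parser defaults into the stored JSON.
        Map<String, Object> query = Collections.singletonMap("term",
                Collections.singletonMap("airline", "AAL"));

        DatafeedConfig.Builder builder = new DatafeedConfig.Builder("farequote-datafeed", "farequote");
        builder.setIndices(Collections.singletonList("farequote"));

        // Strict validation (lenient == false), as wired up by STRICT_PARSER
        // for REST requests: an unparsable query throws a 400 with the
        // DATAFEED_CONFIG_QUERY_BAD_FORMAT message.
        builder.setQuery(query, false);

        // Lenient validation (the one-arg overload passes lenient == true),
        // as wired up by LENIENT_PARSER for configs already persisted in the
        // cluster state or the config index: an unparsable query is only
        // logged as a warning so that previously stored configs still load.
        builder.setQuery(query);

        DatafeedConfig config = builder.build();
    }
}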