diff --git a/docs/reference/aggregations/bucket/composite-aggregation.asciidoc b/docs/reference/aggregations/bucket/composite-aggregation.asciidoc index 87d3fcb379312..16a5250ab0269 100644 --- a/docs/reference/aggregations/bucket/composite-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/composite-aggregation.asciidoc @@ -848,6 +848,7 @@ GET /_search -------------------------------------------------- // TESTRESPONSE[s/\.\.\.//] +[[search-aggregations-bucket-composite-aggregation-pipeline-aggregations]] ==== Pipeline aggregations The composite agg is not currently compatible with pipeline aggregations, nor does it make sense in most cases. diff --git a/docs/reference/ml/anomaly-detection/ml-configuring-aggregations.asciidoc b/docs/reference/ml/anomaly-detection/ml-configuring-aggregations.asciidoc index bdd59e3dde9d2..155b97d0f36a5 100644 --- a/docs/reference/ml/anomaly-detection/ml-configuring-aggregations.asciidoc +++ b/docs/reference/ml/anomaly-detection/ml-configuring-aggregations.asciidoc @@ -11,57 +11,61 @@ distributes these calculations across your cluster. You can then feed this aggregated data into the {ml-features} instead of raw results, which reduces the volume of data that must be considered while detecting anomalies. -TIP: If you use a terms aggregation and the cardinality of a term is high, the -aggregation might not be effective and you might want to just use the default -search and scroll behavior. +TIP: If you use a terms aggregation and the cardinality of a term is high but +still significantly less than your total number of documents, use +{ref}/search-aggregations-bucket-composite-aggregation.html[composite aggregations] +experimental:[Support for composite aggregations inside datafeeds is currently experimental]. [discrete] [[aggs-limits-dfeeds]] == Requirements and limitations -There are some limitations to using aggregations in {dfeeds}. Your aggregation -must include a `date_histogram` aggregation, which in turn must contain a `max` -aggregation on the time field. This requirement ensures that the aggregated data -is a time series and the timestamp of each bucket is the time of the last record -in the bucket. +There are some limitations to using aggregations in {dfeeds}. -IMPORTANT: The name of the aggregation and the name of the field that the agg -operates on need to match, otherwise the aggregation doesn't work. For example, -if you use a `max` aggregation on a time field called `responsetime`, the name +Your aggregation must include a `date_histogram` aggregation or a top level `composite` aggregation, +which in turn must contain a `max` aggregation on the time field. +This requirement ensures that the aggregated data is a time series and the timestamp +of each bucket is the time of the last record in the bucket. + +IMPORTANT: The name of the aggregation and the name of the field that it +operates on need to match, otherwise the aggregation doesn't work. For example, +if you use a `max` aggregation on a time field called `responsetime`, the name of the aggregation must be also `responsetime`. -You must also consider the interval of the date histogram aggregation carefully. -The bucket span of your {anomaly-job} must be divisible by the value of the -`calendar_interval` or `fixed_interval` in your aggregation (with no remainder). -If you specify a `frequency` for your {dfeed}, it must also be divisible by this -interval. {anomaly-jobs-cap} cannot use date histograms with an interval -measured in months because the length of the month is not fixed. 
{dfeeds-cap} -tolerate weeks or smaller units. +You must consider the interval of the `date_histogram` or `composite` +aggregation carefully. The bucket span of your {anomaly-job} must be divisible +by the value of the `calendar_interval` or `fixed_interval` in your aggregation +(with no remainder). If you specify a `frequency` for your {dfeed}, +it must also be divisible by this interval. {anomaly-jobs-cap} cannot use +`date_histogram` or `composite` aggregations with an interval measured in months +because the length of the month is not fixed; they can use weeks or smaller units. TIP: As a rule of thumb, if your detectors use <> or -<> analytical functions, set the date histogram +<> analytical functions, set the `date_histogram` or `composite` aggregation interval to a tenth of the bucket span. This suggestion creates finer, more granular time buckets, which are ideal for this type of analysis. If your detectors use <> or <> functions, set the interval to the same value as the bucket span. -If your <> and -model plot is not enabled for the {anomaly-job}, neither the **Single Metric -Viewer** nor the **Anomaly Explorer** can plot and display an anomaly -chart for the job. In these cases, the charts are not visible and an explanatory +If your <> and +model plot is not enabled for the {anomaly-job}, neither the **Single Metric +Viewer** nor the **Anomaly Explorer** can plot and display an anomaly +chart for the job. In these cases, the charts are not visible and an explanatory message is shown. -When the aggregation interval of the {dfeed} and the bucket span of the job -don't match, the values of the chart plotted in both the **Single Metric -Viewer** and the **Anomaly Explorer** differ from the actual values of the job. -To avoid this behavior, make sure that the aggregation interval in the {dfeed} -configuration and the bucket span in the {anomaly-job} configuration have the +When the aggregation interval of the {dfeed} and the bucket span of the job +don't match, the values of the chart plotted in both the **Single Metric +Viewer** and the **Anomaly Explorer** differ from the actual values of the job. +To avoid this behavior, make sure that the aggregation interval in the {dfeed} +configuration and the bucket span in the {anomaly-job} configuration have the same values. +Your {dfeed} can contain multiple aggregations, but only the ones with names +that match values in the job configuration are fed to the job. [discrete] -[[aggs-include-jobs]] -== Including aggregations in {anomaly-jobs} +[[aggs-using-date-histogram]] +=== Including aggregations in {anomaly-jobs} When you create or update an {anomaly-job}, you can include the names of aggregations, for example: @@ -86,8 +90,8 @@ PUT _ml/anomaly_detectors/farequote ---------------------------------- // TEST[skip:setup:farequote_data] -<1> The `airline`, `responsetime`, and `time` fields are aggregations. Only the -aggregated fields defined in the `analysis_config` object are analyzed by the +<1> The `airline`, `responsetime`, and `time` fields are aggregations. Only the +aggregated fields defined in the `analysis_config` object are analyzed by the {anomaly-job}. NOTE: When the `summary_count_field_name` property is set to a non-null value, @@ -134,25 +138,135 @@ PUT _ml/datafeeds/datafeed-farequote ---------------------------------- // TEST[skip:setup:farequote_job] -<1> The aggregations have names that match the fields that they operate on. The +<1> The aggregations have names that match the fields that they operate on. 
The `max` aggregation is named `time` and its field also needs to be `time`. -<2> The `term` aggregation is named `airline` and its field is also named +<2> The `term` aggregation is named `airline` and its field is also named `airline`. -<3> The `avg` aggregation is named `responsetime` and its field is also named +<3> The `avg` aggregation is named `responsetime` and its field is also named `responsetime`. -Your {dfeed} can contain multiple aggregations, but only the ones with names -that match values in the job configuration are fed to the job.
+TIP: If you are using a `term` aggregation to gather influencer or partition +field information, consider using a `composite` aggregation. It performs +better than a `date_histogram` with a nested `term` aggregation and also includes +all the values of the field instead of the top values per bucket. + +[discrete] +[[aggs-using-composite]] +=== Using composite aggregations in {anomaly-jobs} + +experimental::[] +
+For `composite` aggregation support, there must be exactly one `date_histogram` value +source. That value source must not be sorted in descending order. Additional +`composite` aggregation value sources are allowed, such as `terms`. +
+NOTE: A {dfeed} that uses composite aggregations may not be as performant as datafeeds that use scrolling or +date histogram aggregations. Composite aggregations are optimized +for queries that are either `match_all` or `range` filters. Other types of +queries may cause the `composite` aggregation to be inefficient. +
+Here is an example that uses a `composite` aggregation instead of a +`date_histogram`. The job configuration is essentially the same as the one above: +
+[source,console] +---------------------------------- +PUT _ml/anomaly_detectors/farequote-composite +{ + "analysis_config": { + "bucket_span": "60m", + "detectors": [{ + "function": "mean", + "field_name": "responsetime", + "by_field_name": "airline" + }], + "summary_count_field_name": "doc_count" + }, + "data_description": { + "time_field":"time" + } +} +---------------------------------- +// TEST[skip:setup:farequote_data] +
+This is an example of a datafeed that uses a `composite` aggregation to bucket +the metrics based on time and terms: +
+[source,console] +---------------------------------- +PUT _ml/datafeeds/datafeed-farequote-composite +{ + "job_id": "farequote-composite", + "indices": [ + "farequote" + ], + "aggregations": { + "buckets": { + "composite": { + "size": 1000, <1> + "sources": [ + { + "time_bucket": { <2> + "date_histogram": { + "field": "time", + "fixed_interval": "360s", + "time_zone": "UTC" + } + } + }, + { + "airline": { <3> + "terms": { + "field": "airline" + } + } + } + ] + }, + "aggregations": { + "time": { <4> + "max": { + "field": "time" + } + }, + "responsetime": { <5> + "avg": { + "field": "responsetime" + } + } + } + } + } +} +---------------------------------- +// TEST[skip:setup:farequote_job] +<1> Provide the `size` to the composite agg to control how many resources +are used when aggregating the data. A larger `size` means a faster datafeed but +more cluster resources are used when searching. +<2> The required `date_histogram` composite aggregation source. Make sure it +is named differently than your desired time field. +<3> Instead of using a regular `term` aggregation, add a composite +aggregation `term` source with the name `airline`. Note that its name +is the same as the field. +<4> The required `max` aggregation whose name is the time field in the +job analysis config.
+<5> The `avg` aggregation is named `responsetime` and its field is also named +`responsetime`. [discrete] [[aggs-dfeeds]] == Nested aggregations in {dfeeds} -{dfeeds-cap} support complex nested aggregations. This example uses the -`derivative` pipeline aggregation to find the first order derivative of the +{dfeeds-cap} support complex nested aggregations. This example uses the +`derivative` pipeline aggregation to find the first order derivative of the counter `system.network.out.bytes` for each value of the field `beat.name`.
+NOTE: `derivative` or other pipeline aggregations may not work within `composite` +aggregations. See +{ref}/search-aggregations-bucket-composite-aggregation.html#search-aggregations-bucket-composite-aggregation-pipeline-aggregations[composite aggregations and pipeline aggregations]. + [source,js] ---------------------------------- "aggregations": { @@ -247,8 +361,9 @@ number of unique entries for the `error` field. [[aggs-define-dfeeds]] == Defining aggregations in {dfeeds} -When you define an aggregation in a {dfeed}, it must have the following form: +When you define an aggregation in a {dfeed}, it must have one of the following forms: + +When using a `date_histogram` aggregation to bucket by time: [source,js] ---------------------------------- "aggregations": { @@ -282,36 +397,75 @@ When you define an aggregation in a {dfeed}, it must have the following form: ---------------------------------- // NOTCONSOLE -The top level aggregation must be either a -{ref}/search-aggregations-bucket.html[bucket aggregation] containing as single -sub-aggregation that is a `date_histogram` or the top level aggregation is the -required `date_histogram`. There must be exactly one `date_histogram` -aggregation. For more information, see -{ref}/search-aggregations-bucket-datehistogram-aggregation.html[Date histogram aggregation].
+When using a `composite` aggregation: + +[source,js] +---------------------------------- +"aggregations": { + "composite_agg": { + "sources": [ + { + "date_histogram_agg": { + "field": "time", + ...settings... + } + }, + ...other valid sources... + ], + ...composite agg settings..., + "aggregations": { + "timestamp": { + "max": { + "field": "time" + } + }, + ...other aggregations... + [ + [,"aggregations" : { + [<sub_aggregation>]+ + } ] + }] + } + } +} +---------------------------------- +// NOTCONSOLE +
+The top level aggregation must be exclusively one of the following: + +* A {ref}/search-aggregations-bucket.html[bucket aggregation] containing a single +sub-aggregation that is a `date_histogram` +* A top level aggregation that is a `date_histogram` +* A top level aggregation that is a `composite` aggregation. +
+There must be exactly one `date_histogram` or `composite` aggregation. For more information, see +{ref}/search-aggregations-bucket-datehistogram-aggregation.html[Date histogram aggregation] and +{ref}/search-aggregations-bucket-composite-aggregation.html[Composite aggregation]. NOTE: The `time_zone` parameter in the date histogram aggregation must be set to `UTC`, which is the default value. -Each histogram bucket has a key, which is the bucket start time. This key cannot -be used for aggregations in {dfeeds}, however, because they need to know the -time of the latest record within a bucket. Otherwise, when you restart a -{dfeed}, it continues from the start time of the histogram bucket and possibly -fetches the same data twice. The max aggregation for the time field is therefore -necessary to provide the time of the latest record within a bucket.
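To make the role of that `max` aggregation concrete, here is an illustrative sketch (the names and timestamps are invented, not taken from this change) of a single date histogram or composite bucket as it could appear in a search response. The bucket key only carries the interval start time; the nested `time` max aggregation carries the timestamp of the latest record in the bucket, which is what the {dfeed} needs when it restarts:

[source,js]
----------------------------------
{
  "key": { "time_bucket": 1587501000000, "airline": "JZA" },
  "doc_count": 42,
  "time": { "value": 1587501859000 }
}
----------------------------------
// NOTCONSOLE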
+Each histogram or composite bucket has a key, which is the bucket start time. +This key cannot be used for aggregations in {dfeeds}, however, because +they need to know the time of the latest record within a bucket. +Otherwise, when you restart a {dfeed}, it continues from the start time of the +histogram or composite bucket and possibly fetches the same data twice. +The max aggregation for the time field is therefore necessary to provide +the time of the latest record within a bucket. You can optionally specify a terms aggregation, which creates buckets for different values of a field. IMPORTANT: If you use a terms aggregation, by default it returns buckets for the top ten terms. Thus if the cardinality of the term is greater than 10, not -all terms are analyzed. +all terms are analyzed. In this case, consider using `composite` aggregations +experimental:[Support for composite aggregations inside datafeeds is currently experimental]. You can change this behavior by setting the `size` parameter. To determine the cardinality of your data, you can run searches such as: [source,js] -------------------------------------------------- -GET .../_search +GET .../_search { "aggs": { "service_cardinality": { @@ -324,10 +478,11 @@ GET .../_search -------------------------------------------------- // NOTCONSOLE + By default, {es} limits the maximum number of terms returned to 10000. For high cardinality fields, the query might not run. It might return errors related to circuit breaking exceptions that indicate that the data is too large. In such -cases, do not use aggregations in your {dfeed}. For more information, see +cases, use `composite` aggregations in your {dfeed}. For more information, see {ref}/search-aggregations-bucket-terms-aggregation.html[Terms aggregation]. You can also optionally specify multiple sub-aggregations. The sub-aggregations diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java index 767e6c5f153de..129f25446fbb9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java @@ -212,7 +212,10 @@ public DateHistogramValuesSourceBuilder fixedInterval(DateHistogramInterval inte * {@code null} then it means that the interval is expressed as a fixed * {@link TimeValue} and may be accessed via {@link #getIntervalAsFixed()} ()}. */ public DateHistogramInterval getIntervalAsCalendar() { - return dateHistogramInterval.getAsCalendarInterval(); + if (dateHistogramInterval.getIntervalType().equals(DateIntervalWrapper.IntervalTypeEnum.CALENDAR)) { + return dateHistogramInterval.getAsCalendarInterval(); + } + return null; } /** @@ -220,7 +223,10 @@ public DateHistogramInterval getIntervalAsCalendar() { * the interval cannot be parsed as a fixed time. 
*/ public DateHistogramInterval getIntervalAsFixed() { - return dateHistogramInterval.getAsFixedInterval(); + if (dateHistogramInterval.getIntervalType().equals(DateIntervalWrapper.IntervalTypeEnum.FIXED)) { + return dateHistogramInterval.getAsFixedInterval(); + } + return null; } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/ChunkingConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/ChunkingConfig.java index cf213c0bd6a4a..97690008247cb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/ChunkingConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/ChunkingConfig.java @@ -86,6 +86,10 @@ public boolean isEnabled() { return mode != Mode.OFF; } + public boolean isManual() { + return mode == Mode.MANUAL; + } + Mode getMode() { return mode; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 361118a58add7..51647375cf42f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -28,13 +28,16 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.xpack.core.common.time.TimeUtils; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.ml.utils.QueryProvider; @@ -56,6 +59,20 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.core.ClientHelper.assertNoAuthorizationHeader; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_AGGREGATIONS_COMPOSITE_AGG_DATE_HISTOGRAM_SORT; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_AGGREGATIONS_COMPOSITE_AGG_DATE_HISTOGRAM_SOURCE_MISSING_BUCKET; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_AGGREGATIONS_COMPOSITE_AGG_MUST_BE_TOP_LEVEL_AND_ALONE; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_AGGREGATIONS_COMPOSITE_AGG_MUST_HAVE_SINGLE_DATE_SOURCE; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_AGGREGATIONS_MAX_ONE_DATE_HISTOGRAM; +import static 
org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.INVALID_ID; +import static org.elasticsearch.xpack.core.ml.job.messages.Messages.getMessage; import static org.elasticsearch.xpack.core.ml.utils.ToXContentParams.EXCLUDE_GENERATED; /** @@ -119,11 +136,19 @@ public static void validateAggregations(AggregatorFactories.Builder aggregations } Collection aggregatorFactories = aggregations.getAggregatorFactories(); if (aggregatorFactories.isEmpty()) { - throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM); + throw ExceptionsHelper.badRequestException(DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM); } + Builder.checkForOnlySingleTopLevelCompositeAggAndValidate(aggregations.getAggregatorFactories()); AggregationBuilder histogramAggregation = ExtractorUtils.getHistogramAggregation(aggregatorFactories); + if (histogramAggregation instanceof CompositeAggregationBuilder + && aggregations.getPipelineAggregatorFactories().isEmpty() == false) { + throw ExceptionsHelper.badRequestException( + "when using composite aggregations, top level pipeline aggregations are not supported" + ); + } Builder.checkNoMoreHistogramAggregations(histogramAggregation.getSubAggregations()); + Builder.checkNoMoreCompositeAggregations(histogramAggregation.getSubAggregations()); Builder.checkHistogramAggregationHasChildMaxTimeAgg(histogramAggregation); Builder.checkHistogramIntervalIsPositive(histogramAggregation); } @@ -141,7 +166,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie parser.declareString((builder, val) -> builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY); parser.declareObject(Builder::setQueryProvider, - (p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT), + (p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, DATAFEED_CONFIG_QUERY_BAD_FORMAT), QUERY); parser.declareObject(Builder::setAggregationsSafe, (p, c) -> AggProvider.fromXContent(p, ignoreUnknownFields), @@ -321,7 +346,7 @@ private QueryBuilder parseQuery(NamedXContentRegistry namedXContentRegistry, Lis if (exception.getCause() instanceof IllegalArgumentException) { exception = (Exception)exception.getCause(); } - throw ExceptionsHelper.badRequestException(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, exception); + throw ExceptionsHelper.badRequestException(DATAFEED_CONFIG_QUERY_BAD_FORMAT, exception); } } @@ -369,7 +394,7 @@ private AggregatorFactories.Builder parseAggregations(NamedXContentRegistry name if (exception.getCause() instanceof IllegalArgumentException) { exception = (Exception)exception.getCause(); } - throw ExceptionsHelper.badRequestException(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, exception); + throw ExceptionsHelper.badRequestException(DATAFEED_CONFIG_AGG_BAD_FORMAT, exception); } } @@ -402,6 +427,21 @@ public 
long getHistogramIntervalMillis(NamedXContentRegistry namedXContentRegist return ExtractorUtils.getHistogramIntervalMillis(getParsedAggregations(namedXContentRegistry)); } + /** + * Indicates if the datafeed is using composite aggs. + * @param namedXContentRegistry XContent registry to transform the lazily parsed aggregations + * @return If the datafeed utilizes composite aggs or not + */ + public boolean hasCompositeAgg(NamedXContentRegistry namedXContentRegistry) { + if (hasAggregations() == false) { + return false; + } + AggregationBuilder maybeComposite = ExtractorUtils.getHistogramAggregation( + getParsedAggregations(namedXContentRegistry).getAggregatorFactories() + ); + return maybeComposite instanceof CompositeAggregationBuilder; + } + /** * @return {@code true} when there are non-empty aggregations, {@code false} otherwise */ @@ -552,9 +592,14 @@ private static ChunkingConfig defaultChunkingConfig(@Nullable AggProvider aggPro if (aggProvider == null || aggProvider.getParsedAggs() == null) { return ChunkingConfig.newAuto(); } else { - long histogramIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(aggProvider.getParsedAggs()); + AggregationBuilder histogram = ExtractorUtils.getHistogramAggregation(aggProvider.getParsedAggs().getAggregatorFactories()); + if (histogram instanceof CompositeAggregationBuilder) { + // Allow composite aggs to handle the underlying chunking and searching + return ChunkingConfig.newOff(); + } + long histogramIntervalMillis = ExtractorUtils.getHistogramIntervalMillis(histogram); if (histogramIntervalMillis <= 0) { - throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO); + throw ExceptionsHelper.badRequestException(DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO); } return ChunkingConfig.newManual(TimeValue.timeValueMillis(DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis)); } @@ -785,8 +830,7 @@ public Builder setScriptFields(List scriptField public Builder setScrollSize(int scrollSize) { if (scrollSize < 0) { - String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, - DatafeedConfig.SCROLL_SIZE.getPreferredName(), scrollSize); + String msg = getMessage(DATAFEED_CONFIG_INVALID_OPTION_VALUE, DatafeedConfig.SCROLL_SIZE.getPreferredName(), scrollSize); throw ExceptionsHelper.badRequestException(msg); } this.scrollSize = scrollSize; @@ -807,8 +851,11 @@ public Builder setMaxEmptySearches(int maxEmptySearches) { if (maxEmptySearches == -1) { this.maxEmptySearches = null; } else if (maxEmptySearches <= 0) { - String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, - DatafeedConfig.MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches); + String msg = getMessage( + DATAFEED_CONFIG_INVALID_OPTION_VALUE, + DatafeedConfig.MAX_EMPTY_SEARCHES.getPreferredName(), + maxEmptySearches + ); throw ExceptionsHelper.badRequestException(msg); } else { this.maxEmptySearches = maxEmptySearches; @@ -835,7 +882,7 @@ public DatafeedConfig build() { ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); if (MlStrings.isValidId(id) == false) { - throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.INVALID_ID, ID.getPreferredName(), id)); + throw ExceptionsHelper.badRequestException(getMessage(INVALID_ID, ID.getPreferredName(), id)); } if (indices == null || indices.isEmpty() || indices.contains(null) || indices.contains("")) { throw 
invalidOptionValue(INDICES.getPreferredName(), indices); @@ -858,15 +905,14 @@ void validateScriptFields() { return; } if (scriptFields != null && scriptFields.isEmpty() == false) { - throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS)); + throw ExceptionsHelper.badRequestException(getMessage(DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS)); } } private static void checkNoMoreHistogramAggregations(Collection aggregations) { for (AggregationBuilder agg : aggregations) { if (ExtractorUtils.isHistogram(agg)) { - throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_MAX_ONE_DATE_HISTOGRAM); + throw ExceptionsHelper.badRequestException(DATAFEED_AGGREGATIONS_MAX_ONE_DATE_HISTOGRAM); } checkNoMoreHistogramAggregations(agg.getSubAggregations()); } @@ -875,7 +921,13 @@ private static void checkNoMoreHistogramAggregations(Collection) histogramAggregation).field(); + } + if (histogramAggregation instanceof CompositeAggregationBuilder) { + DateHistogramValuesSourceBuilder valueSource = ExtractorUtils.getDateHistogramValuesSource( + (CompositeAggregationBuilder) histogramAggregation + ); + timeField = valueSource.field(); } for (AggregationBuilder agg : histogramAggregation.getSubAggregations()) { @@ -887,17 +939,86 @@ static void checkHistogramAggregationHasChildMaxTimeAgg(AggregationBuilder histo } } - throw ExceptionsHelper.badRequestException( - Messages.getMessage(Messages.DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION, timeField)); + throw ExceptionsHelper.badRequestException(getMessage(DATAFEED_DATA_HISTOGRAM_MUST_HAVE_NESTED_MAX_AGGREGATION, timeField)); } private static void checkHistogramIntervalIsPositive(AggregationBuilder histogramAggregation) { long interval = ExtractorUtils.getHistogramIntervalMillis(histogramAggregation); if (interval <= 0) { - throw ExceptionsHelper.badRequestException(Messages.DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO); + throw ExceptionsHelper.badRequestException(DATAFEED_AGGREGATIONS_INTERVAL_MUST_BE_GREATER_THAN_ZERO); + } + } + + static void validateCompositeAggregationSources(CompositeAggregationBuilder histogramAggregation) { + boolean hasDateValueSource = false; + DateHistogramValuesSourceBuilder foundBuilder = null; + for (CompositeValuesSourceBuilder valueSource : histogramAggregation.sources()) { + if (valueSource instanceof DateHistogramValuesSourceBuilder) { + if (hasDateValueSource) { + throw ExceptionsHelper.badRequestException( + getMessage( + DATAFEED_AGGREGATIONS_COMPOSITE_AGG_MUST_HAVE_SINGLE_DATE_SOURCE, + histogramAggregation.getName() + ) + ); + } + hasDateValueSource = true; + foundBuilder = (DateHistogramValuesSourceBuilder) valueSource; + } + } + if (foundBuilder == null) { + throw ExceptionsHelper.badRequestException( + getMessage( + DATAFEED_AGGREGATIONS_COMPOSITE_AGG_MUST_HAVE_SINGLE_DATE_SOURCE, + histogramAggregation.getName() + ) + ); + } + if (foundBuilder.missingBucket()) { + throw ExceptionsHelper.badRequestException( + getMessage( + DATAFEED_AGGREGATIONS_COMPOSITE_AGG_DATE_HISTOGRAM_SOURCE_MISSING_BUCKET, + histogramAggregation.getName(), + foundBuilder.name() + ) + ); + } + if (foundBuilder.order() != SortOrder.ASC) { + throw ExceptionsHelper.badRequestException( + getMessage( + DATAFEED_AGGREGATIONS_COMPOSITE_AGG_DATE_HISTOGRAM_SORT, + histogramAggregation.getName(), + foundBuilder.name() + ) + ); } } + private static void checkForOnlySingleTopLevelCompositeAggAndValidate(Collection aggregationBuilders) { + 
Optional maybeComposite = aggregationBuilders.stream() + .filter(agg -> agg instanceof CompositeAggregationBuilder) + .findFirst(); + if (maybeComposite.isEmpty() == false) { + CompositeAggregationBuilder composite = (CompositeAggregationBuilder) maybeComposite.get(); + if (aggregationBuilders.size() > 1) { + throw ExceptionsHelper.badRequestException( + getMessage(DATAFEED_AGGREGATIONS_COMPOSITE_AGG_MUST_BE_TOP_LEVEL_AND_ALONE, composite.getName()) + ); + } + validateCompositeAggregationSources(composite); + } + } + + private static void checkNoMoreCompositeAggregations(Collection aggregations) { + for (AggregationBuilder agg : aggregations) { + if (agg instanceof CompositeAggregationBuilder) { + throw ExceptionsHelper.badRequestException( + getMessage(DATAFEED_AGGREGATIONS_COMPOSITE_AGG_MUST_BE_TOP_LEVEL_AND_ALONE, agg.getName()) + ); + } + checkNoMoreCompositeAggregations(agg.getSubAggregations()); + } + } private void setDefaultChunkingConfig() { if (chunkingConfig == null) { chunkingConfig = defaultChunkingConfig(aggProvider); @@ -911,7 +1032,7 @@ private void setDefaultQueryDelay() { } private static ElasticsearchException invalidOptionValue(String fieldName, Object value) { - String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, fieldName, value); + String msg = getMessage(DATAFEED_CONFIG_INVALID_OPTION_VALUE, fieldName, value); throw ExceptionsHelper.badRequestException(msg); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java index 191d740f170c2..c833374ae2d13 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java @@ -24,12 +24,14 @@ private DatafeedJobValidator() {} * Validates a datafeedConfig in relation to the job it refers to * @param datafeedConfig the datafeed config * @param job the job + * @param xContentRegistry the named xcontent registry for parsing datafeed aggs */ public static void validate(DatafeedConfig datafeedConfig, Job job, NamedXContentRegistry xContentRegistry) { AnalysisConfig analysisConfig = job.getAnalysisConfig(); if (analysisConfig.getLatency() != null && analysisConfig.getLatency().seconds() > 0) { throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY)); } + // TODO should we validate that the aggs define the fields requested in the analysis config? 
if (datafeedConfig.hasAggregations()) { checkSummaryCountFieldNameIsSet(analysisConfig); checkValidHistogramInterval(datafeedConfig, analysisConfig, xContentRegistry); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/ExtractorUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/ExtractorUtils.java index 4a21faac4ca4c..5e8a049a3d91b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/ExtractorUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/extractor/ExtractorUtils.java @@ -14,11 +14,16 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Collection; import java.util.concurrent.TimeUnit; @@ -75,7 +80,24 @@ public static AggregationBuilder getHistogramAggregation(Collection valuesSourceBuilder : compositeAggregationBuilder.sources()) { + if (valuesSourceBuilder instanceof DateHistogramValuesSourceBuilder) { + return (DateHistogramValuesSourceBuilder)valuesSourceBuilder; + } + } + throw ExceptionsHelper.badRequestException("[composite] aggregations require exactly one [date_histogram] value source"); } /** @@ -91,7 +113,13 @@ public static long getHistogramIntervalMillis(AggregationBuilder histogramAggreg if (histogramAggregation instanceof HistogramAggregationBuilder) { return (long) ((HistogramAggregationBuilder) histogramAggregation).interval(); } else if (histogramAggregation instanceof DateHistogramAggregationBuilder) { - return validateAndGetDateHistogramInterval((DateHistogramAggregationBuilder) histogramAggregation); + return validateAndGetDateHistogramInterval( + DateHistogramAggOrValueSource.fromAgg((DateHistogramAggregationBuilder) histogramAggregation) + ); + } else if (histogramAggregation instanceof CompositeAggregationBuilder) { + return validateAndGetDateHistogramInterval( + DateHistogramAggOrValueSource.fromCompositeAgg((CompositeAggregationBuilder)histogramAggregation) + ); } else { throw new IllegalStateException("Invalid histogram aggregation [" + histogramAggregation.getName() + "]"); } @@ -101,7 +129,7 @@ public static long getHistogramIntervalMillis(AggregationBuilder histogramAggreg * Returns the date histogram interval as epoch millis if valid, or throws * an {@link ElasticsearchException} with the validation error */ - private static long validateAndGetDateHistogramInterval(DateHistogramAggregationBuilder dateHistogram) { + private static long validateAndGetDateHistogramInterval(DateHistogramAggOrValueSource dateHistogram) { if (dateHistogram.timeZone() != null && 
dateHistogram.timeZone().normalized().equals(ZoneOffset.UTC) == false) { throw ExceptionsHelper.badRequestException("ML requires date_histogram.time_zone to be UTC"); } @@ -116,7 +144,7 @@ private static long validateAndGetDateHistogramInterval(DateHistogramAggregation } else if (dateHistogram.interval() != 0) { return dateHistogram.interval(); } else { - throw new IllegalArgumentException("Must specify an interval for DateHistogram"); + throw new IllegalArgumentException("Must specify an interval for date_histogram"); } } @@ -148,7 +176,7 @@ public static long validateAndGetCalendarInterval(String calendarInterval) { throw ExceptionsHelper.badRequestException("Unexpected dateTimeUnit [" + dateTimeUnit + "]"); } } else { - interval = TimeValue.parseTimeValue(calendarInterval, "date_histogram.interval"); + interval = TimeValue.parseTimeValue(calendarInterval, "date_histogram.calendar_interval"); } if (interval.days() > 7) { throw ExceptionsHelper.badRequestException(invalidDateHistogramCalendarIntervalMessage(calendarInterval)); @@ -162,4 +190,56 @@ private static String invalidDateHistogramCalendarIntervalMessage(String interva "variable lengths of periods greater than a week"); } + private static class DateHistogramAggOrValueSource { + + static DateHistogramAggOrValueSource fromAgg(DateHistogramAggregationBuilder agg) { + return new DateHistogramAggOrValueSource(agg, null); + } + + static DateHistogramAggOrValueSource fromCompositeAgg(CompositeAggregationBuilder compositeAggregationBuilder) { + return new DateHistogramAggOrValueSource(null, getDateHistogramValuesSource(compositeAggregationBuilder)); + } + + private final DateHistogramAggregationBuilder agg; + private final DateHistogramValuesSourceBuilder sourceBuilder; + + private DateHistogramAggOrValueSource(DateHistogramAggregationBuilder agg, DateHistogramValuesSourceBuilder sourceBuilder) { + assert agg != null || sourceBuilder != null; + this.agg = agg; + this.sourceBuilder = sourceBuilder; + } + + private ZoneId timeZone() { + return agg != null ? + agg.timeZone() : + sourceBuilder.timeZone(); + } + + private DateHistogramInterval getFixedInterval() { + return agg != null ? + agg.getFixedInterval() : + sourceBuilder.getIntervalAsFixed(); + } + + private DateHistogramInterval getCalendarInterval() { + return agg != null ? + agg.getCalendarInterval() : + sourceBuilder.getIntervalAsCalendar(); + } + + @Deprecated + private DateHistogramInterval dateHistogramInterval() { + return agg != null ? + agg.dateHistogramInterval() : + sourceBuilder.dateHistogramInterval(); + } + + @Deprecated + private long interval() { + return agg != null ? 
+ agg.interval() : + sourceBuilder.interval(); + } + } + } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 401e6660f8917..294d7560ef1a3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -50,6 +50,14 @@ public final class Messages { public static final String DATAFEED_MISSING_MAX_AGGREGATION_FOR_TIME_FIELD = "Missing max aggregation for time_field [{0}]"; public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL = "Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]"; + public static final String DATAFEED_AGGREGATIONS_COMPOSITE_AGG_MUST_HAVE_SINGLE_DATE_SOURCE = + "Composite aggregation [{0}] must have exactly one date_histogram source"; + public static final String DATAFEED_AGGREGATIONS_COMPOSITE_AGG_DATE_HISTOGRAM_SOURCE_MISSING_BUCKET = + "Datafeed composite aggregation [{0}] date_histogram [{1}] source does not support missing_buckets"; + public static final String DATAFEED_AGGREGATIONS_COMPOSITE_AGG_DATE_HISTOGRAM_SORT = + "Datafeed composite aggregation [{0}] date_histogram [{1}] must be sorted in ascending order"; + public static final String DATAFEED_AGGREGATIONS_COMPOSITE_AGG_MUST_BE_TOP_LEVEL_AND_ALONE = + "Composite aggregation [{0}] must be the only composite agg and should be the only top level aggregation"; public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists"; public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {1}" + " but the current node [{2}] is not allowed to connect to remote clusters." 
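// Illustrative sketch (not part of this change): the new composite-aggregation message
// constants above are plain MessageFormat patterns, so they render through the existing
// Messages.getMessage helper like any other datafeed validation message. Assuming a
// composite aggregation named "buckets":
String msg = Messages.getMessage(
    Messages.DATAFEED_AGGREGATIONS_COMPOSITE_AGG_MUST_HAVE_SINGLE_DATE_SOURCE, "buckets");
// msg is "Composite aggregation [buckets] must have exactly one date_histogram source"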
+ diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 00c0b62adce1d..902ddfa0c7aaa 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -35,9 +35,13 @@ import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.script.Script; import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; @@ -58,6 +62,7 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -121,8 +126,19 @@ private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(Stri aggHistogramInterval = aggHistogramInterval> bucketSpanMillis ? bucketSpanMillis : aggHistogramInterval; aggHistogramInterval = aggHistogramInterval <= 0 ? 1 : aggHistogramInterval; MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); - aggs.addAggregator(AggregationBuilders.dateHistogram("buckets") - .fixedInterval(new DateHistogramInterval(aggHistogramInterval + "ms")).subAggregation(maxTime).field("time")); + AggregationBuilder topAgg = randomBoolean() ? 
+ AggregationBuilders.dateHistogram("buckets") + .field("time") + .fixedInterval(new DateHistogramInterval(aggHistogramInterval + "ms")) : + AggregationBuilders.composite( + "buckets", + Collections.singletonList( + new DateHistogramValuesSourceBuilder("time") + .field("time") + .fixedInterval(new DateHistogramInterval(aggHistogramInterval + "ms")) + ) + ); + aggs.addAggregator(topAgg.subAggregation(maxTime)); builder.setParsedAggregations(aggs); } if (randomBoolean()) { @@ -659,6 +675,59 @@ public void testCheckHistogramAggregationHasChildMaxTimeAgg() { assertThat(e.getMessage(), containsString("Date histogram must have nested max aggregation for time_field [max_time]")); } + public void testValidateCompositeAggValueSources_MustHaveExactlyOneDateValue() { + CompositeAggregationBuilder aggregationBuilder = AggregationBuilders.composite( + "buckets", + Arrays.asList(new TermsValuesSourceBuilder("foo").field("bar")) + ); + ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, + () -> DatafeedConfig.Builder.validateCompositeAggregationSources(aggregationBuilder)); + assertThat(ex.getMessage(), containsString("must have exactly one date_histogram source")); + + CompositeAggregationBuilder aggregationBuilderWithMoreDateHisto = AggregationBuilders.composite( + "buckets", + Arrays.asList( + new TermsValuesSourceBuilder("foo").field("bar"), + new DateHistogramValuesSourceBuilder("date1").field("time").fixedInterval(DateHistogramInterval.days(1)), + new DateHistogramValuesSourceBuilder("date2").field("time").fixedInterval(DateHistogramInterval.days(1)) + ) + ); + ex = expectThrows(ElasticsearchStatusException.class, + () -> DatafeedConfig.Builder.validateCompositeAggregationSources(aggregationBuilderWithMoreDateHisto)); + assertThat(ex.getMessage(), containsString("must have exactly one date_histogram source")); + } + public void testValidateCompositeAggValueSources_DateHistoWithMissingBucket() { + CompositeAggregationBuilder aggregationBuilder = AggregationBuilders.composite( + "buckets", + Arrays.asList( + new TermsValuesSourceBuilder("foo").field("bar"), + new DateHistogramValuesSourceBuilder("date1") + .field("time") + .fixedInterval(DateHistogramInterval.days(1)) + .missingBucket(true) + ) + ); + ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, + () -> DatafeedConfig.Builder.validateCompositeAggregationSources(aggregationBuilder)); + assertThat(ex.getMessage(), containsString("does not support missing_buckets")); + } + + public void testValidateCompositeAggValueSources_DateHistoBadOrder() { + CompositeAggregationBuilder aggregationBuilder = AggregationBuilders.composite( + "buckets", + Arrays.asList( + new TermsValuesSourceBuilder("foo").field("bar"), + new DateHistogramValuesSourceBuilder("date1") + .field("time") + .fixedInterval(DateHistogramInterval.days(1)) + .order("desc") + ) + ); + ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, + () -> DatafeedConfig.Builder.validateCompositeAggregationSources(aggregationBuilder)); + assertThat(ex.getMessage(), containsString("must be sorted in ascending order")); + } + public void testValidateAggregations_GivenMulitpleHistogramAggs() { DateHistogramAggregationBuilder nestedDateHistogram = AggregationBuilders.dateHistogram("nested_time"); AvgAggregationBuilder avg = AggregationBuilders.avg("avg").subAggregation(nestedDateHistogram); @@ -713,8 +782,10 @@ public void testDefaultFrequency_GivenNoAggregations() { assertEquals(TimeValue.timeValueHours(1), 
datafeed.defaultFrequency(TimeValue.timeValueHours(48), xContentRegistry())); } - public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Second() { - DatafeedConfig datafeed = createDatafeedWithDateHistogram("1s"); + public void testDefaultFrequency_GivenAggregationsWithHistogramOrCompositeInterval_1_Second() { + DatafeedConfig datafeed = randomBoolean() ? + createDatafeedWithDateHistogram("1s") : + createDatafeedWithCompositeAgg("1s"); assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(60), xContentRegistry())); assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(90), xContentRegistry())); @@ -726,8 +797,10 @@ public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Second assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(13), xContentRegistry())); } - public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Minute() { - DatafeedConfig datafeed = createDatafeedWithDateHistogram("1m"); + public void testDefaultFrequency_GivenAggregationsWithHistogramOrCompositeInterval_1_Minute() { + DatafeedConfig datafeed = randomBoolean() ? + createDatafeedWithDateHistogram("1m") : + createDatafeedWithCompositeAgg("1m"); assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(60), xContentRegistry())); assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(90), xContentRegistry())); @@ -745,9 +818,10 @@ public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Minute assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(72), xContentRegistry())); } - public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_10_Minutes() { - DatafeedConfig datafeed = createDatafeedWithDateHistogram("10m"); - + public void testDefaultFrequency_GivenAggregationsWithHistogramOrCompositeInterval_10_Minutes() { + DatafeedConfig datafeed = randomBoolean() ? + createDatafeedWithDateHistogram("10m") : + createDatafeedWithCompositeAgg("10m"); assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(10), xContentRegistry())); assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(20), xContentRegistry())); assertEquals(TimeValue.timeValueMinutes(10), datafeed.defaultFrequency(TimeValue.timeValueMinutes(30), xContentRegistry())); @@ -755,9 +829,10 @@ public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_10_Minut assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueMinutes(13 * 60), xContentRegistry())); } - public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Hour() { - DatafeedConfig datafeed = createDatafeedWithDateHistogram("1h"); - + public void testDefaultFrequency_GivenAggregationsWithHistogramOrCompositeInterval_1_Hour() { + DatafeedConfig datafeed = randomBoolean() ? 
+ createDatafeedWithDateHistogram("1h") : + createDatafeedWithCompositeAgg("1h"); assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(1), xContentRegistry())); assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(3601), xContentRegistry())); assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(2), xContentRegistry())); @@ -873,6 +948,24 @@ public static String randomValidDatafeedId() { return generator.ofCodePointsLength(random(), 10, 10); } + private static DatafeedConfig createDatafeedWithCompositeAgg(String interval) { + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + DateHistogramValuesSourceBuilder sourceBuilder = new DateHistogramValuesSourceBuilder("time"); + sourceBuilder.field("time"); + if (interval != null) { + if (DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(interval) != null) { + sourceBuilder.calendarInterval(new DateHistogramInterval(interval)); + } else { + sourceBuilder.fixedInterval(new DateHistogramInterval(interval)); + } + } + CompositeAggregationBuilder composite = AggregationBuilders.composite( + "buckets", + Arrays.asList(sourceBuilder) + ).subAggregation(maxTime); + return createDatafeedWithComposite(composite); + } + private static DatafeedConfig createDatafeedWithDateHistogram(String interval) { MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); DateHistogramAggregationBuilder dateHistogram = AggregationBuilders.dateHistogram("buckets").subAggregation(maxTime).field("time"); @@ -908,6 +1001,19 @@ private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggre return createDatafeedBuilderWithDateHistogram(dateHistogram).build(); } + private static DatafeedConfig.Builder createDatafeedBuilderWithComposite(CompositeAggregationBuilder compositeAggregationBuilder) { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1"); + builder.setIndices(Collections.singletonList("myIndex")); + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(compositeAggregationBuilder); + DatafeedConfig.validateAggregations(aggs); + builder.setParsedAggregations(aggs); + return builder; + } + + private static DatafeedConfig createDatafeedWithComposite(CompositeAggregationBuilder dateHistogram) { + return createDatafeedBuilderWithComposite(dateHistogram).build(); + } + @Override protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOException { DatafeedConfig.Builder builder = new DatafeedConfig.Builder(instance); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java index d24b8bc44e41d..697b4daf65d9b 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java @@ -6,16 +6,26 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads; import 
org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; @@ -32,9 +42,9 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.hamcrest.Matcher; import org.junit.After; -import org.junit.Before; import java.time.Duration; import java.time.Instant; @@ -54,11 +64,19 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.oneOf; public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase { @After public void cleanup() { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .putNull("logger.org.elasticsearch.xpack.ml.datafeed") + .build()).get(); cleanUp(); } @@ -287,6 +305,146 @@ public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset() thro assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L)); } + public void testStopAndRestartCompositeDatafeed() throws Exception { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .put("logger.org.elasticsearch.xpack.ml.datafeed", "TRACE") + .build()).get(); + String indexName = "stop-restart-data"; + client().admin().indices().prepareCreate("stop-restart-data") + .setMapping("time", "type=date") + .get(); + long numDocs = randomIntBetween(32, 2048); + long now = System.currentTimeMillis(); + long oneWeekAgo = now - 604800000; + long twoWeeksAgo = oneWeekAgo - 604800000; + indexDocs(logger, indexName, numDocs, twoWeeksAgo, oneWeekAgo); + long numDocs2 = randomIntBetween(32, 2048); + indexDocs(logger, indexName, numDocs2, oneWeekAgo, now); + client().admin().cluster().prepareHealth(indexName).setWaitForYellowStatus().get(); + + String scrollJobId = "stop-restart-scroll"; + Job.Builder scrollJob = createScheduledJob(scrollJobId); + registerJob(scrollJob); + putJob(scrollJob); + openJob(scrollJobId); + assertBusy(() -> assertEquals(getJobStats(scrollJobId).get(0).getState(), JobState.OPENED)); + + DatafeedConfig datafeedConfig = createDatafeedBuilder(scrollJobId+ "-datafeed", scrollJobId, 
Collections.singletonList(indexName)) + .setChunkingConfig(ChunkingConfig.newManual(new TimeValue(1, TimeUnit.SECONDS))) + .build(); + registerDatafeed(datafeedConfig); + putDatafeed(datafeedConfig); + startDatafeed(datafeedConfig.getId(), 0L, null); + + // Wait until we have processed data + assertBusy(() -> assertThat(getDataCounts(scrollJobId).getProcessedRecordCount(), greaterThan(0L))); + stopDatafeed(datafeedConfig.getId()); + assertBusy(() -> assertThat(getJobStats(scrollJobId).get(0).getState(), is(oneOf(JobState.CLOSED, JobState.OPENED)))); + // If we are not OPENED, then we are closed and shouldn't restart as the datafeed finished running through the data + if (getJobStats(scrollJobId).get(0).getState().equals(JobState.OPENED)) { + updateDatafeed(new DatafeedUpdate.Builder().setId(datafeedConfig.getId()).setChunkingConfig(ChunkingConfig.newAuto()).build()); + startDatafeed( + datafeedConfig.getId(), + randomLongBetween(0, getDataCounts(scrollJobId).getLatestRecordTimeStamp().getTime()), + now + ); + waitUntilJobIsClosed(scrollJobId); + } + + assertBusy(() -> { + DataCounts dataCounts = getDataCounts(scrollJobId); + assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2)); + assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L)); + }, 60, TimeUnit.SECONDS); + + String compositeJobId = "stop-restart-composite"; + Job.Builder compositeJob = createScheduledJob(compositeJobId); + compositeJob.setAnalysisConfig( + new AnalysisConfig.Builder(compositeJob.getAnalysisConfig()).setSummaryCountFieldName("doc_count") + ); + registerJob(compositeJob); + putJob(compositeJob); + openJob(compositeJobId); + assertBusy(() -> assertEquals(getJobStats(compositeJobId).get(0).getState(), JobState.OPENED)); + + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + aggs.addAggregator( + AggregationBuilders.composite( + "buckets", + Collections.singletonList( + new DateHistogramValuesSourceBuilder("timebucket") + .fixedInterval(new DateHistogramInterval("1h")) + .field("time") + ) + ).subAggregation(AggregationBuilders.max("time").field("time")) + ); + DatafeedConfig compositeDatafeedConfig = createDatafeedBuilder( + compositeJobId + "-datafeed", + compositeJobId, + Collections.singletonList(indexName)) + .setParsedAggregations(aggs) + .setFrequency(TimeValue.timeValueHours(1)) + // Start off chunking at an hour so that it runs more slowly and the test has time to stop it in the middle of processing + .setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1))) + .build(); + registerDatafeed(compositeDatafeedConfig); + putDatafeed(compositeDatafeedConfig); + startDatafeed(compositeDatafeedConfig.getId(), 0L, null); + + // Wait until we have processed data + assertBusy(() -> assertThat(getDataCounts(compositeJobId).getProcessedRecordCount(), greaterThan(0L))); + stopDatafeed(compositeDatafeedConfig.getId()); + assertBusy(() -> + assertThat(getJobStats(compositeJobId).get(0).getState(), is(oneOf(JobState.CLOSED, JobState.OPENED))) + ); + // If we are not OPENED, then we are closed and shouldn't restart as the datafeed finished running through the data + if (getJobStats(compositeJobId).get(0).getState().equals(JobState.OPENED)) { + updateDatafeed(new DatafeedUpdate.Builder() + .setId(compositeDatafeedConfig.getId()) + // Set to auto to speed up and finish the job + .setChunkingConfig(ChunkingConfig.newAuto()) + .build()); + startDatafeed( + compositeDatafeedConfig.getId(), + randomLongBetween(0, 
getDataCounts(compositeJobId).getLatestRecordTimeStamp().getTime()), + now + ); + waitUntilJobIsClosed(compositeJobId); + } + + List scrollBuckets = getBuckets(scrollJobId); + List compositeBuckets = getBuckets(compositeJobId); + for (int i = 0; i < scrollBuckets.size(); i++) { + Bucket scrollBucket = scrollBuckets.get(i); + Bucket compositeBucket = compositeBuckets.get(i); + try { + assertThat( + "composite bucket [" + compositeBucket.getTimestamp() + "] [" + compositeBucket.getEventCount() + "] does not equal" + + " scroll bucket [" + scrollBucket.getTimestamp() + "] [" + scrollBucket.getEventCount() + "]", + compositeBucket.getEventCount(), + equalTo(scrollBucket.getEventCount()) + ); + } catch (AssertionError ae) { + String originalMessage = ae.getMessage(); + try { + SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.rangeQuery("time") + .gte(scrollBucket.getTimestamp().getTime()) + .lte(scrollBucket.getTimestamp().getTime() + TimeValue.timeValueHours(1).getMillis())) + .size(10_000); + SearchHits hits = client().search(new SearchRequest() + .indices(indexName) + .source(builder)).actionGet().getHits(); + fail("Hits: " + Strings.arrayToDelimitedString(hits.getHits(), "\n") + " \n failure: " + originalMessage); + } catch (ElasticsearchException ee) { + fail("could not search indices for better info. Original failure: " + originalMessage); + } + } + } + } + private void assertDatafeedStats(String datafeedId, DatafeedState state, String jobId, Matcher searchCountMatcher) { GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java index a6ef700fa3e30..813855412814c 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DatafeedWithAggsIT.java @@ -13,6 +13,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; @@ -26,6 +28,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.junit.After; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -42,14 +45,41 @@ public void cleanup(){ } public void testRealtime() throws Exception { - String dataIndex = "datafeed-with-aggs-rt-data"; + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + aggs.addAggregator(AggregationBuilders.dateHistogram("time").field("time") + .fixedInterval(new DateHistogramInterval("1000ms")) + .subAggregation(AggregationBuilders.max("time").field("time"))); + 
testDfWithAggs( + aggs, + new Detector.Builder("count", null), + "datafeed-with-aggs-rt-job", + "datafeed-with-aggs-rt-data" + ); + } + + public void testRealtimeComposite() throws Exception { + AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); + aggs.addAggregator(AggregationBuilders.composite("buckets", + Arrays.asList( + new DateHistogramValuesSourceBuilder("time").field("time").fixedInterval(new DateHistogramInterval("1000ms")), + new TermsValuesSourceBuilder("field").field("field") + )) + .size(1000) + .subAggregation(AggregationBuilders.max("time").field("time"))); + testDfWithAggs( + aggs, + new Detector.Builder("count", null).setByFieldName("field"), + "datafeed-with-composite-aggs-rt-job", + "datafeed-with-composite-aggs-rt-data" + ); + } + + private void testDfWithAggs(AggregatorFactories.Builder aggs, Detector.Builder detector, String jobId, String dfId) throws Exception { // A job with a bucket_span of 2s - String jobId = "datafeed-with-aggs-rt-job"; DataDescription.Builder dataDescription = new DataDescription.Builder(); - Detector.Builder d = new Detector.Builder("count", null); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build())); analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(2)); analysisConfig.setSummaryCountFieldName("doc_count"); @@ -64,12 +94,8 @@ public void testRealtime() throws Exception { DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder(datafeedId, jobId); datafeedBuilder.setQueryDelay(TimeValue.timeValueMillis(100)); datafeedBuilder.setFrequency(TimeValue.timeValueSeconds(1)); - datafeedBuilder.setIndices(Collections.singletonList(dataIndex)); + datafeedBuilder.setIndices(Collections.singletonList(dfId)); - AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); - aggs.addAggregator(AggregationBuilders.dateHistogram("time").field("time") - .fixedInterval(new DateHistogramInterval("1000ms")) - .subAggregation(AggregationBuilders.max("time").field("time"))); datafeedBuilder.setParsedAggregations(aggs); DatafeedConfig datafeed = datafeedBuilder.build(); @@ -82,8 +108,8 @@ public void testRealtime() throws Exception { openJob(jobId); // Now let's index the data - client().admin().indices().prepareCreate(dataIndex) - .setMapping("time", "type=date") + client().admin().indices().prepareCreate(dfId) + .setMapping("time", "type=date", "field", "type=keyword") .get(); // Index a doc per second from a minute ago to a minute later @@ -93,8 +119,8 @@ public void testRealtime() throws Exception { long curTime = aMinuteAgo; BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); while (curTime < aMinuteLater) { - IndexRequest indexRequest = new IndexRequest(dataIndex); - indexRequest.source("time", curTime); + IndexRequest indexRequest = new IndexRequest(dfId); + indexRequest.source("time", curTime, "field", randomFrom("foo", "bar", "baz")); bulkRequestBuilder.add(indexRequest); curTime += TimeValue.timeValueSeconds(1).millis(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 3127680551eb2..271aba0191b42 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -88,6 +88,7 @@ To ensure that a subsequent stop datafeed call will see that same task status (a */ public class TransportStartDatafeedAction extends TransportMasterNodeAction { + private static final Version COMPOSITE_AGG_SUPPORT = Version.V_7_13_0; private static final Logger logger = LogManager.getLogger(TransportStartDatafeedAction.class); private final Client client; @@ -251,6 +252,21 @@ public void onFailure(Exception e) { params.setJobId(datafeedConfig.getJobId()); params.setIndicesOptions(datafeedConfig.getIndicesOptions()); datafeedConfigHolder.set(datafeedConfig); + if (datafeedConfig.hasCompositeAgg(xContentRegistry)) { + if (state.nodes() + .mastersFirstStream() + .filter(MachineLearning::isMlNode) + .map(DiscoveryNode::getVersion) + .anyMatch(COMPOSITE_AGG_SUPPORT::after)) { + listener.onFailure(ExceptionsHelper.badRequestException( + "cannot start datafeed [{}] as [{}] requires all machine learning nodes to be at least version [{}]", + datafeedConfig.getId(), + "composite aggs", + COMPOSITE_AGG_SUPPORT + )); + return; + } + } jobConfigProvider.getJob(datafeedConfig.getJobId(), jobListener); } catch (Exception e) { listener.onFailure(e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 62360c71a8f8b..620a690ff307b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -16,6 +17,8 @@ import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -34,6 +37,7 @@ import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -49,6 +53,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; + public class TransportStopDatafeedAction extends TransportTasksAction { @@ -58,17 +64,20 @@ public class TransportStopDatafeedAction extends TransportTasksAction startedDatafeeds, List stoppingDatafeeds) { final Set executorNodes = new HashSet<>(); + final List startedDatafeedsJobs = new ArrayList<>(); for (String datafeedId : startedDatafeeds) { PersistentTasksCustomMetadata.PersistentTask datafeedTask = 
MlTasks.getDatafeedTask(datafeedId, tasks); if (datafeedTask == null) { @@ -171,6 +181,7 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A assert datafeedTask != null : msg; logger.error(msg); } else if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) { + startedDatafeedsJobs.add(((StartDatafeedAction.DatafeedParams) datafeedTask.getParams()).getJobId()); executorNodes.add(datafeedTask.getExecutorNode()); } else { // This is the easy case - the datafeed is not currently assigned to a valid node, @@ -186,7 +197,7 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A } } - request.setNodes(executorNodes.toArray(new String[executorNodes.size()])); + request.setNodes(executorNodes.toArray(new String[0])); // wait for started and stopping datafeeds // Map datafeedId -> datafeed task Id. @@ -196,7 +207,32 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A .collect(Collectors.toList()); ActionListener finalListener = ActionListener.wrap( - r -> waitForDatafeedStopped(allDataFeedsToWaitFor, request, r, listener), + r -> waitForDatafeedStopped(allDataFeedsToWaitFor, request, r, ActionListener.wrap( + finished -> { + if (startedDatafeedsJobs.isEmpty()) { + listener.onResponse(finished); + return; + } + client.admin().indices().prepareRefresh(startedDatafeedsJobs + .stream() + .map(AnomalyDetectorsIndex::jobResultsAliasedName) + .toArray(String[]::new)) + .execute(ActionListener.wrap( + _unused -> listener.onResponse(finished), + ex -> { + logger.warn( + () -> new ParameterizedMessage( + "failed to refresh job [{}] results indices when stopping datafeeds [{}]", + startedDatafeedsJobs, + startedDatafeeds + ), + ex); + listener.onResponse(finished); + } + )); + }, + listener::onFailure + )), e -> { if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) { // A node has dropped out of the cluster since we started executing the requests. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 780f8e5adc935..9e06afb7ea84f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -345,7 +345,10 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro DataCounts counts; try (InputStream in = extractedData.get()) { counts = postData(in, XContentType.JSON); - LOGGER.trace("[{}] Processed another {} records", jobId, counts.getProcessedRecordCount()); + LOGGER.trace(() -> new ParameterizedMessage("[{}] Processed another {} records with latest timestamp [{}]", + jobId, + counts.getProcessedRecordCount(), + counts.getLatestRecordTimeStamp())); timingStatsReporter.reportDataCounts(counts); } catch (Exception e) { if (e instanceof InterruptedException) { @@ -354,7 +357,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro if (isIsolated) { return; } - LOGGER.error(new ParameterizedMessage("[{}] error while posting data", jobId), e); + LOGGER.error(new ParameterizedMessage("[{}] error while posting data", jobId), e); // a conflict exception means the job state is not open any more. // we should therefore stop the datafeed. 
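The minimum-version gate added to `TransportStartDatafeedAction` above is easy to misread because it is phrased as `anyMatch(COMPOSITE_AGG_SUPPORT::after)` rather than as a direct comparison on each node version. Below is a standalone sketch of the same predicate logic using a simplified stand-in `Version` type; the class, method names, and values here are illustrative only and are not part of the change. The datafeed is rejected when the minimum supported version is after (newer than) any ML node in the cluster.

[source,java]
----
import java.util.List;

public class CompositeAggVersionGateSketch {

    // Simplified stand-in for org.elasticsearch.Version; illustrative only.
    record Version(int major, int minor) {
        boolean after(Version other) {
            return major > other.major || (major == other.major && minor > other.minor);
        }
    }

    static final Version COMPOSITE_AGG_SUPPORT = new Version(7, 13);

    // anyMatch(COMPOSITE_AGG_SUPPORT::after) reads as "7.13 is after some ML node's version",
    // i.e. at least one node is too old, so starting the composite-agg datafeed must fail.
    static boolean mustReject(List<Version> mlNodeVersions) {
        return mlNodeVersions.stream().anyMatch(COMPOSITE_AGG_SUPPORT::after);
    }

    public static void main(String[] args) {
        System.out.println(mustReject(List.of(new Version(7, 13), new Version(7, 12)))); // true
        System.out.println(mustReject(List.of(new Version(7, 13), new Version(7, 14)))); // false
    }
}
----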
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java index 86a67d7a94dd7..3fc779bd36b5f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java @@ -8,6 +8,7 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.index.IndexNotFoundException; @@ -19,7 +20,9 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction; import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; +import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregatedSearchRequestBuilder; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.CompositeAggregationDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; @@ -36,6 +39,8 @@ static void create(Client client, NamedXContentRegistry xContentRegistry, DatafeedTimingStatsReporter timingStatsReporter, ActionListener listener) { + final boolean hasAggs = datafeed.hasAggregations(); + final boolean isComposite = hasAggs && datafeed.hasCompositeAgg(xContentRegistry); ActionListener factoryHandler = ActionListener.wrap( factory -> listener.onResponse(datafeed.getChunkingConfig().isEnabled() ? new ChunkedDataExtractorFactory(client, datafeed, job, xContentRegistry, factory, timingStatsReporter) : factory) @@ -44,20 +49,49 @@ static void create(Client client, ActionListener getRollupIndexCapsActionHandler = ActionListener.wrap( response -> { - if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config - if (datafeed.hasAggregations()) { - factoryHandler.onResponse( - new AggregationDataExtractorFactory(client, datafeed, job, xContentRegistry, timingStatsReporter)); + final boolean hasRollup = response.getJobs().isEmpty() == false; + if (hasRollup && hasAggs == false) { + listener.onFailure(new IllegalArgumentException("Aggregations are required when using Rollup indices")); + return; + } + if (hasAggs == false) { + ScrollDataExtractorFactory.create(client, datafeed, job, xContentRegistry, timingStatsReporter, factoryHandler); + return; + } + if (hasRollup && datafeed.getRuntimeMappings().isEmpty() == false) { + // TODO Rollup V2 will support runtime fields + listener.onFailure(new IllegalArgumentException("The datafeed has runtime_mappings defined, " + + "runtime fields are not supported in rollup searches")); + return; + } + if (isComposite) { + String[] indices = datafeed.getIndices().toArray(new String[0]); + IndicesOptions indicesOptions = datafeed.getIndicesOptions(); + AggregatedSearchRequestBuilder aggregatedSearchRequestBuilder = hasRollup ? 
+ RollupDataExtractorFactory.requestBuilder(client, indices, indicesOptions) : + AggregationDataExtractorFactory.requestBuilder(client, indices, indicesOptions); + final DataExtractorFactory dataExtractorFactory = new CompositeAggregationDataExtractorFactory( + client, + datafeed, + job, + xContentRegistry, + timingStatsReporter, + aggregatedSearchRequestBuilder + ); + if (datafeed.getChunkingConfig().isManual()) { + factoryHandler.onResponse(dataExtractorFactory); } else { - ScrollDataExtractorFactory.create(client, datafeed, job, xContentRegistry, timingStatsReporter, factoryHandler); + listener.onResponse(dataExtractorFactory); } + return; + } + + if (hasRollup) { + RollupDataExtractorFactory.create( + client, datafeed, job, response.getJobs(), xContentRegistry, timingStatsReporter, factoryHandler); } else { - if (datafeed.hasAggregations()) { // Rollup indexes require aggregations - RollupDataExtractorFactory.create( - client, datafeed, job, response.getJobs(), xContentRegistry, timingStatsReporter, factoryHandler); - } else { - listener.onFailure(new IllegalArgumentException("Aggregations are required when using Rollup indices")); - } + factoryHandler.onResponse( + new AggregationDataExtractorFactory(client, datafeed, job, xContentRegistry, timingStatsReporter)); } }, e -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java index de2c273ea4d15..36907a2b096c0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AbstractAggregationDataExtractor.java @@ -41,20 +41,11 @@ abstract class AbstractAggregationDataExtractor isCancelled, outputStream); + // We process the whole search. So, if we are chunking or not, we have nothing more to process given the current query + hasNext = false; return new ByteArrayInputStream(outputStream.toByteArray()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregatedSearchRequestBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregatedSearchRequestBuilder.java new file mode 100644 index 0000000000000..a6c6c3fb27a4f --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregatedSearchRequestBuilder.java @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.builder.SearchSourceBuilder; + + +/** + * This is used when building search actions for aggregated data. + * + * Implementations can be found for regular searches and rollup searches. 
+ */ +public interface AggregatedSearchRequestBuilder { + ActionRequestBuilder build(SearchSourceBuilder searchSourceBuilder); +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java index 97796f90f9107..bfcf4a8cbd98b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java @@ -36,7 +36,7 @@ class AggregationDataExtractorContext { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); this.fields = Objects.requireNonNull(fields); - this.indices = indices.toArray(new String[indices.size()]); + this.indices = indices.toArray(new String[0]); this.query = Objects.requireNonNull(query); this.aggs = Objects.requireNonNull(aggs); this.start = start; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java index d4f7764e9852b..c4a6d5a866e44 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java @@ -6,6 +6,9 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; @@ -25,6 +28,19 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory { private final NamedXContentRegistry xContentRegistry; private final DatafeedTimingStatsReporter timingStatsReporter; + public static AggregatedSearchRequestBuilder requestBuilder( + Client client, + String[] indices, + IndicesOptions indicesOptions + ) { + return (searchSourceBuilder) -> + new SearchRequestBuilder(client, SearchAction.INSTANCE) + .setSource(searchSourceBuilder) + .setIndicesOptions(indicesOptions) + .setAllowPartialSearchResults(false) + .setIndices(indices); + } + public AggregationDataExtractorFactory( Client client, DatafeedConfig datafeedConfig, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java index c900890f2d2f2..5b9fb1d0f999f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java @@ -8,13 +8,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.Nullable; 
+import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.metrics.GeoCentroid; import org.elasticsearch.search.aggregations.metrics.Max; @@ -28,6 +31,7 @@ import java.io.OutputStream; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; @@ -37,6 +41,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -55,6 +60,7 @@ class AggregationToJsonProcessor { private long keyValueWrittenCount; private final SortedMap>> docsByBucketTimestamp; private final long startTime; + private final String compositeAggDateValueSourceName; /** * Constructs a processor that processes aggregations into JSON @@ -63,8 +69,13 @@ class AggregationToJsonProcessor { * @param fields the fields to convert into JSON * @param includeDocCount whether to include the doc_count * @param startTime buckets with a timestamp before this time are discarded + * @param compositeAggDateValueSourceName the value source for the date_histogram source in the composite agg, if it exists */ - AggregationToJsonProcessor(String timeField, Set fields, boolean includeDocCount, long startTime) { + AggregationToJsonProcessor(String timeField, + Set fields, + boolean includeDocCount, + long startTime, + @Nullable String compositeAggDateValueSourceName) { this.timeField = Objects.requireNonNull(timeField); this.fields = Objects.requireNonNull(fields); this.includeDocCount = includeDocCount; @@ -72,6 +83,7 @@ class AggregationToJsonProcessor { docsByBucketTimestamp = new TreeMap<>(); keyValueWrittenCount = 0; this.startTime = startTime; + this.compositeAggDateValueSourceName = compositeAggDateValueSourceName; } public void process(Aggregations aggs) throws IOException { @@ -149,6 +161,9 @@ private void processAggs(long docCount, List aggregations) throws I MultiBucketsAggregation bucketAgg = bucketAggregations.get(0); if (bucketAgg instanceof Histogram) { processDateHistogram((Histogram) bucketAgg); + } else if (bucketAgg instanceof CompositeAggregation) { + // This indicates that our composite agg contains our date histogram bucketing via one of its sources + processCompositeAgg((CompositeAggregation) bucketAgg); } else { // Ignore bucket aggregations that don't contain a field we // are interested in. This avoids problems with pipeline bucket @@ -185,8 +200,8 @@ private void processAggs(long docCount, List aggregations) throws I private void processDateHistogram(Histogram agg) throws IOException { if (keyValuePairs.containsKey(timeField)) { - throw new IllegalArgumentException("More than one Date histogram cannot be used in the aggregation. " + - "[" + agg.getName() + "] is another instance of a Date histogram"); + throw new IllegalArgumentException("More than one composite or date_histogram cannot be used in the aggregation. 
" + + "[" + agg.getName() + "] is another instance of a composite or date_histogram aggregation"); } // buckets are ordered by time, once we get to a bucket past the @@ -210,6 +225,62 @@ private void processDateHistogram(Histogram agg) throws IOException { } } + private void processCompositeAgg(CompositeAggregation agg) throws IOException { + if (keyValuePairs.containsKey(timeField)) { + throw new IllegalArgumentException("More than one composite or date_histogram cannot be used in the aggregation. " + + "[" + agg.getName() + "] is another instance of a composite or date_histogram aggregation"); + } + // Shouldn't ever happen + if (compositeAggDateValueSourceName == null) { + throw new IllegalArgumentException("attempted to process composite agg [" + + agg.getName() + + "] but does not contain date_histogram value source"); + } + + // Composite aggs have multiple items in the bucket. It is possible that within the current + // date_histogram interval, there are still unprocessed terms, so we shouldn't skip forward past those buckets + // Instead, we skip according to the `max(timeField)` agg. + boolean checkBucketTime = true; + for (CompositeAggregation.Bucket bucket : agg.getBuckets()) { + if (checkBucketTime) { + + long bucketTime = toHistogramKeyToEpoch(bucket.getKey().get(compositeAggDateValueSourceName)); + if (bucketTime < startTime) { + LOGGER.debug(() -> new ParameterizedMessage("Skipping bucket at [{}], startTime is [{}]", bucketTime, startTime)); + continue; + } else { + checkBucketTime = false; + } + } + + Collection addedFields = processCompositeAggBucketKeys(bucket.getKey()); + List childAggs = bucket.getAggregations().asList(); + processAggs(bucket.getDocCount(), childAggs); + keyValuePairs.remove(timeField); + for (String fieldName : addedFields) { + keyValuePairs.remove(fieldName); + } + } + } + + /** + * It is possible that the key values in composite agg bucket contain field values we care about + * Make sure if they do, they get processed + * @param bucketKeys the composite agg bucket keys + * @return The field names we added to the key value pairs + */ + private Collection processCompositeAggBucketKeys(Map bucketKeys) { + List addedFieldValues = new ArrayList<>(); + for (Map.Entry bucketKey : bucketKeys.entrySet()) { + if (bucketKey.getKey().equals(compositeAggDateValueSourceName) == false && fields.contains(bucketKey.getKey())) { + // TODO any validations or processing??? + keyValuePairs.put(bucketKey.getKey(), bucketKey.getValue()); + addedFieldValues.add(bucketKey.getKey()); + } + } + return addedFieldValues; + } + /* * Date Histograms have a {@link ZonedDateTime} object as the key, * Histograms have either a Double or Long. 
@@ -239,6 +310,10 @@ boolean bucketAggContainsRequiredAgg(MultiBucketsAggregation aggregation) { if (fields.contains(aggregation.getName())) { return true; } + if (aggregation instanceof CompositeAggregation + && Sets.haveNonEmptyIntersection(((CompositeAggregation) aggregation).afterKey().keySet(), fields)) { + return true; + } if (aggregation.getBuckets().isEmpty()) { return false; @@ -265,12 +340,17 @@ boolean bucketAggContainsRequiredAgg(MultiBucketsAggregation aggregation) { private void processBucket(MultiBucketsAggregation bucketAgg, boolean addField) throws IOException { for (MultiBucketsAggregation.Bucket bucket : bucketAgg.getBuckets()) { + List addedFields = new ArrayList<>(); if (addField) { + addedFields.add(bucketAgg.getName()); keyValuePairs.put(bucketAgg.getName(), bucket.getKey()); } + if (bucket instanceof CompositeAggregation.Bucket) { + addedFields.addAll(processCompositeAggBucketKeys(((CompositeAggregation.Bucket)bucket).getKey())); + } processAggs(bucket.getDocCount(), asList(bucket.getAggregations())); - if (addField) { - keyValuePairs.remove(bucketAgg.getName()); + for (String fieldName : addedFields) { + keyValuePairs.remove(fieldName); } } } @@ -338,41 +418,38 @@ private void queueDocToWrite(Map doc, long docCount) { } /** - * Write the aggregated documents one bucket at a time until {@code batchSize} - * key-value pairs have been written. Buckets are written in their entirety and - * the check on {@code batchSize} run after the bucket has been written so more - * than {@code batchSize} key-value pairs could be written. - * The function should be called repeatedly until it returns false, at that point - * there are no more documents to write. + * This writes ALL the documents stored within the processor object unless indicated otherwise by the `shouldCancel` predicate * - * @param batchSize The number of key-value pairs to write. - * @return True if there are any more documents to write after the call. - * False if there are no documents to write. - * @throws IOException If an error occurs serialising the JSON + * This returns `true` if it is safe to cancel the overall process as the current `date_histogram` bucket has finished. + * + * For a simple `date_histogram` this is guaranteed. But, for a `composite` agg, it is possible that the current page is in the + * middle of a bucket. If you are writing with `composite` aggs, don't cancel the processing until this method returns true. 
+ * + * @param shouldCancel determines if a given timestamp indicates that the processing stream should be cancelled + * @param outputStream where to write the aggregated data + * @return true if it is acceptable for the caller to close the process and cancel the stream + * @throws IOException if there is a parsing exception */ - boolean writeDocs(int batchSize, OutputStream outputStream) throws IOException { - + boolean writeAllDocsCancellable(Predicate shouldCancel, OutputStream outputStream) throws IOException { if (docsByBucketTimestamp.isEmpty()) { - return false; + return true; } try (XContentBuilder jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream)) { - long previousWrittenCount = keyValueWrittenCount; Iterator>>> iterator = docsByBucketTimestamp.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry>> entry = iterator.next(); + if (shouldCancel.test(entry.getKey())) { + return true; + } for (Map map : entry.getValue()) { writeJsonObject(jsonBuilder, map); } iterator.remove(); - - if (keyValueWrittenCount - previousWrittenCount >= batchSize) { - break; - } } } - return docsByBucketTimestamp.isEmpty() == false; + return false; } private void writeJsonObject(XContentBuilder jsonBuilder, Map record) throws IOException { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java new file mode 100644 index 0000000000000..52b6d404a1469 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java @@ -0,0 +1,215 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; +import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; +import org.elasticsearch.xpack.core.ml.utils.Intervals; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; + +/** + * An implementation that extracts data from elasticsearch using search with composite aggregations on a client. + * The first time {@link #next()} is called, the search is executed. 
All the aggregated buckets from the composite agg are then + * returned. Subsequent calls to {@link #next()} execute additional searches, moving forward the composite agg `afterKey`. + * + * It's like scroll, but with aggs. + * + * It is not thread-safe, we reuse underlying objects without synchronization (like a pre-constructed composite agg object) + */ +class CompositeAggregationDataExtractor implements DataExtractor { + + private static final Logger LOGGER = LogManager.getLogger(CompositeAggregationDataExtractor.class); + + private volatile Map afterKey = null; + private final CompositeAggregationBuilder compositeAggregationBuilder; + private final Client client; + private final CompositeAggregationDataExtractorContext context; + private final DatafeedTimingStatsReporter timingStatsReporter; + private final AggregatedSearchRequestBuilder requestBuilder; + private final long interval; + private volatile boolean isCancelled; + private volatile long nextBucketOnCancel; + private boolean hasNext; + + CompositeAggregationDataExtractor( + CompositeAggregationBuilder compositeAggregationBuilder, + Client client, + CompositeAggregationDataExtractorContext dataExtractorContext, + DatafeedTimingStatsReporter timingStatsReporter, + AggregatedSearchRequestBuilder requestBuilder + ) { + this.compositeAggregationBuilder = Objects.requireNonNull(compositeAggregationBuilder); + this.client = Objects.requireNonNull(client); + this.context = Objects.requireNonNull(dataExtractorContext); + this.timingStatsReporter = Objects.requireNonNull(timingStatsReporter); + this.requestBuilder = Objects.requireNonNull(requestBuilder); + this.interval = ExtractorUtils.getHistogramIntervalMillis(compositeAggregationBuilder); + this.hasNext = true; + } + + @Override + public boolean hasNext() { + return hasNext; + } + + @Override + public boolean isCancelled() { + return isCancelled; + } + + @Override + public void cancel() { + LOGGER.debug(() -> new ParameterizedMessage("[{}] Data extractor received cancel request", context.jobId)); + isCancelled = true; + } + + @Override + public long getEndTime() { + return context.end; + } + + @Override + public Optional next() throws IOException { + if (hasNext() == false) { + throw new NoSuchElementException(); + } + + Aggregations aggs = search(); + if (aggs == null) { + LOGGER.trace(() -> new ParameterizedMessage("[{}] extraction finished", context.jobId)); + hasNext = false; + afterKey = null; + return Optional.empty(); + } + return Optional.of(processAggs(aggs)); + } + + private Aggregations search() { + // Compare to the normal aggregation implementation, this search does not search for the previous bucket's data. + // For composite aggs, since it is scrolling, it is not really possible to know the previous pages results in the current page. + // Aggregations like derivative cannot work within composite aggs, for now. + // Also, it doesn't make sense to have a derivative when grouping by time AND by some other criteria. 
+ + LOGGER.trace( + () -> new ParameterizedMessage( + "[{}] Executing composite aggregated search from [{}] to [{}]", + context.jobId, + context.start, + context.end + ) + ); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .size(0) + .query(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, context.start, context.end)); + + if (context.runtimeMappings.isEmpty() == false) { + searchSourceBuilder.runtimeMappings(context.runtimeMappings); + } + if (afterKey != null) { + compositeAggregationBuilder.aggregateAfter(afterKey); + } + searchSourceBuilder.aggregation(compositeAggregationBuilder); + ActionRequestBuilder searchRequest = requestBuilder.build(searchSourceBuilder); + SearchResponse searchResponse = executeSearchRequest(searchRequest); + LOGGER.trace(() -> new ParameterizedMessage("[{}] Search composite response was obtained", context.jobId)); + timingStatsReporter.reportSearchDuration(searchResponse.getTook()); + Aggregations aggregations = searchResponse.getAggregations(); + if (aggregations == null) { + return null; + } + CompositeAggregation compositeAgg = aggregations.get(compositeAggregationBuilder.getName()); + if (compositeAgg == null || compositeAgg.getBuckets().isEmpty()) { + return null; + } + return aggregations; + } + + protected SearchResponse executeSearchRequest(ActionRequestBuilder searchRequestBuilder) { + return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); + } + + private InputStream processAggs(Aggregations aggs) throws IOException { + AggregationToJsonProcessor aggregationToJsonProcessor = new AggregationToJsonProcessor( + context.timeField, + context.fields, + context.includeDocCount, + context.start, + context.compositeAggDateHistogramGroupSourceName + ); + LOGGER.trace(() -> new ParameterizedMessage( + "[{}] got [{}] composite buckets", + context.jobId, + ((CompositeAggregation)aggs.get(compositeAggregationBuilder.getName())).getBuckets().size() + )); + aggregationToJsonProcessor.process(aggs); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final boolean hasAfterKey = afterKey != null && (afterKey.get(context.compositeAggDateHistogramGroupSourceName) instanceof Long); + boolean cancellable = aggregationToJsonProcessor.writeAllDocsCancellable( + timestamp -> { + if (isCancelled) { + // If we have not processed a single composite agg page yet and we are cancelled + // We should not process anything + if (hasAfterKey == false) { + return true; + } + if (nextBucketOnCancel == 0L) { + // If we have been cancelled, record the bucket above our latest timestamp + // This indicates when we have completed the current bucket of this timestamp and thus will move to the next + // date_histogram bucket + nextBucketOnCancel = Intervals.alignToCeil(timestamp, interval); + LOGGER.debug(() -> new ParameterizedMessage( + "[{}] set future timestamp cancel to [{}] via timestamp [{}]", + context.jobId, + nextBucketOnCancel, + timestamp + )); + } + return timestamp >= nextBucketOnCancel; + } + return false; + }, outputStream); + // If the process is canceled and cancelable, then we can indicate that there are no more buckets to process. + if (isCancelled && cancellable) { + LOGGER.debug( + () -> new ParameterizedMessage( + "[{}] cancelled before bucket [{}] on date_histogram page [{}]", + context.jobId, + nextBucketOnCancel, + hasAfterKey ? 
afterKey.get(context.compositeAggDateHistogramGroupSourceName) : "__null__" + ) + ); + hasNext = false; + } + // Only set the after key once we have processed the search, allows us to cancel on the first page + CompositeAggregation compositeAgg = aggs.get(compositeAggregationBuilder.getName()); + afterKey = compositeAgg.afterKey(); + + return new ByteArrayInputStream(outputStream.toByteArray()); + } + +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorContext.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorContext.java new file mode 100644 index 0000000000000..9650fd949526b --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorContext.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +class CompositeAggregationDataExtractorContext { + + final String jobId; + final String timeField; + final Set fields; + final String[] indices; + final QueryBuilder query; + final CompositeAggregationBuilder compositeAggregationBuilder; + final long start; + final long end; + final boolean includeDocCount; + final Map headers; + final IndicesOptions indicesOptions; + final Map runtimeMappings; + final String compositeAggDateHistogramGroupSourceName; + + CompositeAggregationDataExtractorContext(String jobId, + String timeField, + Set fields, + List indices, + QueryBuilder query, + CompositeAggregationBuilder compositeAggregationBuilder, + String compositeAggDateHistogramGroupSourceName, + long start, + long end, + boolean includeDocCount, + Map headers, + IndicesOptions indicesOptions, + Map runtimeMappings) { + this.jobId = Objects.requireNonNull(jobId); + this.timeField = Objects.requireNonNull(timeField); + this.fields = Objects.requireNonNull(fields); + this.indices = indices.toArray(new String[0]); + this.query = Objects.requireNonNull(query); + this.compositeAggregationBuilder = Objects.requireNonNull(compositeAggregationBuilder); + this.compositeAggDateHistogramGroupSourceName = Objects.requireNonNull(compositeAggDateHistogramGroupSourceName); + this.start = start; + this.end = end; + this.includeDocCount = includeDocCount; + this.headers = headers; + this.indicesOptions = Objects.requireNonNull(indicesOptions); + this.runtimeMappings = Objects.requireNonNull(runtimeMappings); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorFactory.java new file mode 100644 index 0000000000000..be423ec6cdfc1 --- /dev/null +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
+import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
+import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.utils.Intervals;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
+import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+
+public class CompositeAggregationDataExtractorFactory implements DataExtractorFactory {
+
+    private final Client client;
+    private final DatafeedConfig datafeedConfig;
+    private final Job job;
+    private final DatafeedTimingStatsReporter timingStatsReporter;
+    private final String compositeAggName;
+    private final Collection<AggregationBuilder> subAggs;
+    private final Collection<PipelineAggregationBuilder> subPipelineAggs;
+    private final String dateHistogramGroupSourceName;
+    private final AggregatedSearchRequestBuilder requestBuilder;
+    private final int numBuckets;
+    private final List<CompositeValuesSourceBuilder<?>> compositeValuesSourceBuilders;
+    private final QueryBuilder parsedQuery;
+
+    public CompositeAggregationDataExtractorFactory(
+        Client client,
+        DatafeedConfig datafeedConfig,
+        Job job,
+        NamedXContentRegistry xContentRegistry,
+        DatafeedTimingStatsReporter timingStatsReporter,
+        AggregatedSearchRequestBuilder requestBuilder
+    ) {
+        this.client = Objects.requireNonNull(client);
+        this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
+        this.job = Objects.requireNonNull(job);
+        this.timingStatsReporter = Objects.requireNonNull(timingStatsReporter);
+        this.parsedQuery = datafeedConfig.getParsedQuery(xContentRegistry);
+        AggregationBuilder aggregationBuilder = ExtractorUtils.getHistogramAggregation(
+            datafeedConfig.getParsedAggregations(xContentRegistry).getAggregatorFactories()
+        );
+        if (aggregationBuilder instanceof CompositeAggregationBuilder == false) {
+            throw new IllegalArgumentException(
+                "top level aggregation must be a composite agg ["
+                    + aggregationBuilder.getName()
+                    + "] is a ["
+                    + aggregationBuilder.getType()
+                    + "]"
+            );
+        }
+        CompositeAggregationBuilder compositeAggregationBuilder = (CompositeAggregationBuilder) aggregationBuilder;
+        this.numBuckets = compositeAggregationBuilder.size();
+        this.compositeAggName = compositeAggregationBuilder.getName();
+        this.subAggs = compositeAggregationBuilder.getSubAggregations();
+        this.subPipelineAggs = compositeAggregationBuilder.getPipelineAggregations();
+        // We want to make sure our date_histogram source is first. This way we order at the top level by the timestamp
+        this.compositeValuesSourceBuilders = new ArrayList<>(compositeAggregationBuilder.sources().size());
+        List<CompositeValuesSourceBuilder<?>> others = new ArrayList<>(compositeAggregationBuilder.sources().size() - 1);
+        String dateHistoGroupName = null;
+        for (CompositeValuesSourceBuilder<?> sourceBuilder : compositeAggregationBuilder.sources()) {
+            if (sourceBuilder instanceof DateHistogramValuesSourceBuilder) {
+                this.compositeValuesSourceBuilders.add(sourceBuilder);
+                dateHistoGroupName = sourceBuilder.name();
+            } else {
+                others.add(sourceBuilder);
+            }
+        }
+        this.dateHistogramGroupSourceName = dateHistoGroupName;
+        this.compositeValuesSourceBuilders.addAll(others);
+        this.requestBuilder = requestBuilder;
+    }
+
+    @Override
+    public DataExtractor newExtractor(long start, long end) {
+        CompositeAggregationBuilder compositeAggregationBuilder = new CompositeAggregationBuilder(
+            compositeAggName,
+            compositeValuesSourceBuilders
+        );
+        compositeAggregationBuilder.size(numBuckets);
+        subAggs.forEach(compositeAggregationBuilder::subAggregation);
+        subPipelineAggs.forEach(compositeAggregationBuilder::subAggregation);
+        long histogramInterval = ExtractorUtils.getHistogramIntervalMillis(compositeAggregationBuilder);
+        CompositeAggregationDataExtractorContext dataExtractorContext = new CompositeAggregationDataExtractorContext(
+            job.getId(),
+            job.getDataDescription().getTimeField(),
+            job.getAnalysisConfig().analysisFields(),
+            datafeedConfig.getIndices(),
+            parsedQuery,
+            compositeAggregationBuilder,
+            this.dateHistogramGroupSourceName,
+            Intervals.alignToCeil(start, histogramInterval),
+            Intervals.alignToFloor(end, histogramInterval),
+            job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT),
+            datafeedConfig.getHeaders(),
+            datafeedConfig.getIndicesOptions(),
+            datafeedConfig.getRuntimeMappings());
+        return new CompositeAggregationDataExtractor(
+            compositeAggregationBuilder,
+            client,
+            dataExtractorContext,
+            timingStatsReporter,
+            requestBuilder
+        );
+    }
+}
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java
index 6a6ee90ee071f..35c66f83128d4 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactory.java
@@ -8,6 +8,8 @@
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
@@ -21,6 +23,7 @@
 import org.elasticsearch.xpack.core.ml.utils.Intervals;
 import org.elasticsearch.xpack.core.rollup.action.RollableIndexCaps;
 import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps.RollupFieldCaps;
+import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
 import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -62,6 +65,20 @@ private RollupDataExtractorFactory(
         this.timingStatsReporter = Objects.requireNonNull(timingStatsReporter);
     }
 
+    public static AggregatedSearchRequestBuilder requestBuilder(
+        Client client,
+        String[] indices,
+        IndicesOptions indicesOptions
+    ) {
+        return (searchSourceBuilder) -> {
+            SearchRequest searchRequest = new SearchRequest().indices(indices)
+                .indicesOptions(indicesOptions)
+                .allowPartialSearchResults(false)
+                .source(searchSourceBuilder);
+            return new RollupSearchAction.RequestBuilder(client, searchRequest);
+        };
+    }
+
     @Override
     public DataExtractor newExtractor(long start, long end) {
         long histogramInterval = datafeedConfig.getHistogramIntervalMillis(xContentRegistry);
@@ -89,13 +106,6 @@ public static void create(Client client,
                               DatafeedTimingStatsReporter timingStatsReporter,
                               ActionListener<DataExtractorFactory> listener) {
-        if (datafeed.getRuntimeMappings().isEmpty() == false) {
-            // TODO Rollup V2 will support runtime fields
-            listener.onFailure(new IllegalArgumentException("The datafeed has runtime_mappings defined, "
-                + "runtime fields are not supported in rollup searches"));
-            return;
-        }
-
         final AggregationBuilder datafeedHistogramAggregation = getHistogramAggregation(
             datafeed.getParsedAggregations(xContentRegistry).getAggregatorFactories());
         if ((datafeedHistogramAggregation instanceof DateHistogramAggregationBuilder) == false) {
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java
index 61263a48e77ee..0b675b171acba 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.java
@@ -265,9 +265,9 @@ private RollupSearchAction.RequestBuilder rollupRangeSearchRequest() {
 
     private class ScrolledDataSummary implements DataSummary {
 
-        private long earliestTime;
-        private long latestTime;
-        private long totalHits;
+        private final long earliestTime;
+        private final long latestTime;
+        private final long totalHits;
 
         private ScrolledDataSummary(long earliestTime, long latestTime, long totalHits) {
             this.earliestTime = earliestTime;
@@ -309,7 +309,7 @@ public boolean hasData() {
         }
     }
 
-    private class AggregatedDataSummary implements DataSummary {
+    private static class AggregatedDataSummary implements DataSummary {
 
         private final double earliestTime;
         private final double latestTime;
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java
index 32b1a71ccfe75..a868b0447df01 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java
@@ -209,6 +209,44 @@ public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk()
             client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
     }
 
+    public void testCreateDataExtractorFactoryGivenRollupAndValidAggregationAndRuntimeFields() {
+
givenAggregatableRollup("myField", "max", 5, "termField"); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField"); + TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField); + Map settings = new HashMap<>(); + settings.put("type", "keyword"); + settings.put("script", ""); + Map field = new HashMap<>(); + field.put("runtime_field_bar", settings); + datafeedConfig.setParsedAggregations(AggregatorFactories.builder().addAggregator( + AggregationBuilders.dateHistogram("time") + .fixedInterval(new DateHistogramInterval("600000ms")) + .subAggregation(maxTime) + .subAggregation(myTerm) + .field("time"))) + .setRuntimeMappings(field); + ActionListener listener = ActionListener.wrap( + dataExtractorFactory -> fail(), + e -> { + assertThat( + e.getMessage(), + equalTo("The datafeed has runtime_mappings defined, runtime fields are not supported in rollup searches") + ); + assertThat(e, instanceOf(IllegalArgumentException.class)); + } + ); + DataExtractorFactory.create( + client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener); + } + + public void testCreateDataExtractorFactoryGivenRollupAndValidAggregation() { givenAggregatableRollup("myField", "max", 5, "termField"); DataDescription.Builder dataDescription = new DataDescription.Builder(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java index 6fe60925ab2bb..5f923eff7ca02 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java @@ -146,38 +146,6 @@ public void testExtraction() throws IOException { stringContainsInOrder(Arrays.asList("aggregations", "histogram", "time", "terms", "airline", "avg", "responsetime"))); } - public void testExtractionGivenMultipleBatches() throws IOException { - // Each bucket is 4 key-value pairs and there are 2 terms, thus 600 buckets will be 600 * 4 * 2 = 4800 - // key-value pairs. They should be processed in 5 batches. 
- int buckets = 600; - List histogramBuckets = new ArrayList<>(buckets); - long timestamp = 1000; - for (int i = 0; i < buckets; i++) { - histogramBuckets.add(createHistogramBucket(timestamp, 3, Arrays.asList(createMax("time", timestamp), - createTerms("airline", new Term("c", 4, "responsetime", 31.0), new Term("b", 3, "responsetime", 32.0))))); - timestamp += 1000L; - } - - TestDataExtractor extractor = new TestDataExtractor(1000L, timestamp + 1); - - SearchResponse response = createSearchResponse("time", histogramBuckets); - extractor.setNextResponse(response); - - assertThat(extractor.hasNext(), is(true)); - assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L)); - assertThat(extractor.hasNext(), is(true)); - assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L)); - assertThat(extractor.hasNext(), is(true)); - assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L)); - assertThat(extractor.hasNext(), is(true)); - assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L)); - assertThat(extractor.hasNext(), is(true)); - assertThat(countMatches('{', asString(extractor.next().get())), equalTo(200L)); - assertThat(extractor.hasNext(), is(false)); - - assertThat(capturedSearchRequests.size(), equalTo(1)); - } - public void testExtractionGivenResponseHasNullAggs() throws IOException { TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L); @@ -246,13 +214,16 @@ public void testExtractionGivenCancelHalfWay() throws IOException { extractor.setNextResponse(response); assertThat(extractor.hasNext(), is(true)); - assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L)); - assertThat(extractor.hasNext(), is(true)); - assertThat(countMatches('{', asString(extractor.next().get())), equalTo(250L)); - assertThat(extractor.hasNext(), is(true)); - + assertThat(countMatches('{', asString(extractor.next().get())), equalTo(2400L)); + histogramBuckets = new ArrayList<>(buckets); + for (int i = 0; i < buckets; i++) { + histogramBuckets.add(createHistogramBucket(timestamp, 3, Arrays.asList(createMax("time", timestamp), + createTerms("airline", new Term("c", 4, "responsetime", 31.0), new Term("b", 3, "responsetime", 32.0))))); + timestamp += 1000L; + } + response = createSearchResponse("time", histogramBuckets); + extractor.setNextResponse(response); extractor.cancel(); - assertThat(extractor.hasNext(), is(false)); assertThat(extractor.isCancelled(), is(true)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java index a19985e3aa1e6..b809ac19cae18 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java @@ -6,13 +6,16 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import 
org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.Avg; import org.elasticsearch.search.aggregations.metrics.GeoCentroid; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.Max; @@ -41,6 +44,24 @@ static Histogram.Bucket createHistogramBucket(long timestamp, long docCount, Lis return bucket; } + static CompositeAggregation.Bucket createCompositeBucket(long timestamp, + String dateValueSource, + long docCount, + List subAggregations, + List> termValues) { + CompositeAggregation.Bucket bucket = mock(CompositeAggregation.Bucket.class); + when(bucket.getDocCount()).thenReturn(docCount); + Aggregations aggs = createAggs(subAggregations); + when(bucket.getAggregations()).thenReturn(aggs); + Map bucketKey = new HashMap<>(); + bucketKey.put(dateValueSource, timestamp); + for (Tuple termValue : termValues) { + bucketKey.put(termValue.v1(), termValue.v2()); + } + when(bucket.getKey()).thenReturn(bucketKey); + return bucket; + } + static SingleBucketAggregation createSingleBucketAgg(String name, long docCount, List subAggregations) { SingleBucketAggregation singleBucketAggregation = mock(SingleBucketAggregation.class); when(singleBucketAggregation.getName()).thenReturn(name); @@ -65,6 +86,15 @@ static Histogram createHistogramAggregation(String name, List return histogram; } + @SuppressWarnings("unchecked") + static CompositeAggregation createCompositeAggregation(String name, List buckets) { + CompositeAggregation compositeAggregation = mock(CompositeAggregation.class); + when((List)compositeAggregation.getBuckets()).thenReturn(buckets); + when(compositeAggregation.getName()).thenReturn(name); + return compositeAggregation; + + } + static Max createMax(String name, double value) { Max max = mock(Max.class); when(max.getName()).thenReturn(name); @@ -73,6 +103,14 @@ static Max createMax(String name, double value) { return max; } + static Avg createAvg(String name, double value) { + Avg avg = mock(Avg.class); + when(avg.getName()).thenReturn(name); + when(avg.value()).thenReturn(value); + when(avg.getValue()).thenReturn(value); + return avg; + } + static GeoCentroid createGeoCentroid(String name, long count, double lat, double lon) { GeoCentroid centroid = mock(GeoCentroid.class); when(centroid.count()).thenReturn(count); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java index 474545339ad27..7e2284f684bf4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java @@ -6,14 +6,17 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import 
org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.Max; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -28,6 +31,8 @@ import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs; +import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createCompositeAggregation; +import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createCompositeBucket; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createGeoCentroid; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramAggregation; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket; @@ -47,25 +52,52 @@ public class AggregationToJsonProcessorTests extends ESTestCase { private String timeField = "time"; private boolean includeDocCount = true; private long startTime = 0; + private String compositeAggValueSource = "time"; - public void testProcessGivenMultipleDateHistograms() { - List nestedHistogramBuckets = Arrays.asList( + @Before + public void setValues() { + compositeAggValueSource = "time"; + } + + public void testProcessGivenMultipleDateHistogramsOrComposite() { + Aggregation nestedBucket; + if (randomBoolean()) { + List nestedHistogramBuckets = Arrays.asList( createHistogramBucket(1000L, 3, Collections.singletonList(createMax("metric1", 1200))), createHistogramBucket(2000L, 5, Collections.singletonList(createMax("metric1", 2800))) - ); - Histogram histogram = createHistogramAggregation("buckets", nestedHistogramBuckets); + ); + nestedBucket = createHistogramAggregation("buckets", nestedHistogramBuckets); + } else { + List nestedCompositebuckets = Arrays.asList( + createCompositeBucket( + 1000L, + "time", + 3, + Collections.singletonList(createMax("metric1", 1200)), + Collections.emptyList() + ), + createCompositeBucket( + 2000L, + "time", + 5, + Collections.singletonList(createMax("metric1", 2800)), + Collections.emptyList() + ) + ); + nestedBucket = createCompositeAggregation("buckets", nestedCompositebuckets); + } List histogramBuckets = Arrays.asList( - createHistogramBucket(1000L, 3, Arrays.asList(createMax("time", 1000L), histogram)) + createHistogramBucket(1000L, 3, Arrays.asList(createMax("time", 1000L), nestedBucket)) ); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(Sets.newHashSet("my_field"), histogramBuckets)); - assertThat(e.getMessage(), containsString("More than one Date histogram cannot be used in the aggregation. " + - "[buckets] is another instance of a Date histogram")); + assertThat(e.getMessage(), containsString("More than one composite or date_histogram cannot be used in the aggregation." 
+ + " [buckets] is another instance of a composite or date_histogram aggregation")); } - public void testProcessGivenMaxTimeIsMissing() throws IOException { + public void testProcessGivenMaxTimeIsMissing() { List histogramBuckets = Arrays.asList( createHistogramBucket(1000L, 3), createHistogramBucket(2000L, 5) @@ -74,17 +106,37 @@ public void testProcessGivenMaxTimeIsMissing() throws IOException { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(Collections.emptySet(), histogramBuckets)); assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]")); + + List compositeBuckets = Arrays.asList( + createCompositeBucket(1000L, "time",3, Collections.emptyList(), Collections.emptyList()), + createCompositeBucket(2000L, "time",5, Collections.emptyList(), Collections.emptyList()) + ); + + e = expectThrows(IllegalArgumentException.class, + () -> aggToStringComposite(Collections.emptySet(), compositeBuckets)); + assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]")); } - public void testProcessGivenNonMaxTimeAgg() throws IOException { + public void testProcessGivenNonMaxTimeAgg() { + List aggs = Collections.singletonList(createTerms("time", new Term("a", 1), new Term("b", 2))); List histogramBuckets = Arrays.asList( - createHistogramBucket(1000L, 3, Collections.singletonList(createTerms("time", new Term("a", 1), new Term("b", 2)))), - createHistogramBucket(2000L, 5, Collections.singletonList(createTerms("time", new Term("a", 1), new Term("b", 2)))) + createHistogramBucket(1000L, 3, aggs), + createHistogramBucket(2000L, 5, aggs) ); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(Collections.emptySet(), histogramBuckets)); assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]")); + + + List compositeBuckets = Arrays.asList( + createCompositeBucket(1000L, "time", 3, aggs, Collections.emptyList()), + createCompositeBucket(2000L, "time",5, aggs, Collections.emptyList()) + ); + + e = expectThrows(IllegalArgumentException.class, + () -> aggToStringComposite(Collections.emptySet(), compositeBuckets)); + assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]")); } public void testProcessGivenHistogramOnly() throws IOException { @@ -113,6 +165,63 @@ public void testProcessGivenHistogramOnlyAndNoDocCount() throws IOException { assertThat(keyValuePairsWritten, equalTo(2L)); } + + public void testProcessGivenCompositeOnly() throws IOException { + compositeAggValueSource = "timestamp"; + List compositeBuckets = Arrays.asList( + createCompositeBucket(1000L, "timestamp", 3, Collections.singletonList(createMax("timestamp", 1200)), Collections.emptyList()), + createCompositeBucket(2000L, "timestamp", 5, Collections.singletonList(createMax("timestamp", 2800)), Collections.emptyList()) + ); + + timeField = "timestamp"; + String json = aggToStringComposite(Collections.emptySet(), compositeBuckets); + + assertThat(json, equalTo("{\"timestamp\":1200,\"doc_count\":3} {\"timestamp\":2800,\"doc_count\":5}")); + assertThat(keyValuePairsWritten, equalTo(4L)); + } + + public void testProcessGivenCompositeOnlyAndNoDocCount() throws IOException { + List compositeBuckets = Arrays.asList( + createCompositeBucket(1000L, "time", 3, Collections.singletonList(createMax("time", 1000)), Collections.emptyList()), + createCompositeBucket(2000L, "time", 5, Collections.singletonList(createMax("time", 2000)), 
Collections.emptyList()) + ); + + includeDocCount = false; + String json = aggToStringComposite(Collections.emptySet(), compositeBuckets); + + assertThat(json, equalTo("{\"time\":1000} {\"time\":2000}")); + assertThat(keyValuePairsWritten, equalTo(2L)); + } + + public void testProcessGivenCompositeWithDocAndTerms() throws IOException { + compositeAggValueSource = "timestamp"; + List compositeBuckets = Arrays.asList( + createCompositeBucket(1000L, + "timestamp", + 3, + Collections.singletonList(createMax("timestamp", 1200)), + Arrays.asList(Tuple.tuple("foo", "value1"), Tuple.tuple("bar", "value1")) + ), + createCompositeBucket(2000L, + "timestamp", + 5, + Collections.singletonList(createMax("timestamp", 2800)), + Arrays.asList(Tuple.tuple("foo", "value2"), Tuple.tuple("bar", "value2")) + ) + ); + + timeField = "timestamp"; + String json = aggToStringComposite(Sets.newHashSet("foo", "bar"), compositeBuckets); + + assertThat(json, + equalTo( + "{\"bar\":\"value1\",\"foo\":\"value1\",\"timestamp\":1200,\"doc_count\":3}" + + " {\"bar\":\"value2\",\"foo\":\"value2\",\"timestamp\":2800,\"doc_count\":5}" + ) + ); + assertThat(keyValuePairsWritten, equalTo(8L)); + } + public void testProcessGivenTopLevelAggIsNotHistogram() throws IOException { List histogramABuckets = Arrays.asList( @@ -265,7 +374,7 @@ public void testProcessGivenMultipleSingleMetricPerSingleTermsPerHistogram() thr "{\"time\":4000,\"my_field\":\"b\",\"my_value\":421.0,\"my_value2\":422.0}")); } - public void testProcessGivenUnsupportedAggregationUnderHistogram() throws IOException { + public void testProcessGivenUnsupportedAggregationUnderHistogram() { Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2); Aggregation anotherHistogram = mock(Aggregation.class); when(anotherHistogram.getName()).thenReturn("nested-agg"); @@ -277,7 +386,7 @@ public void testProcessGivenUnsupportedAggregationUnderHistogram() throws IOExce assertThat(e.getMessage(), containsString("Unsupported aggregation type [nested-agg]")); } - public void testProcessGivenMultipleBucketAggregations() throws IOException { + public void testProcessGivenMultipleBucketAggregations() { Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2); Terms terms1 = mock(Terms.class); when(terms1.getName()).thenReturn("terms_1"); @@ -355,7 +464,7 @@ public void testProcessGivenSinglePercentilesPerHistogram() throws IOException { "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}")); } - public void testProcessGivenMultiplePercentilesPerHistogram() throws IOException { + public void testProcessGivenMultiplePercentilesPerHistogram() { List histogramBuckets = Arrays.asList( createHistogramBucket(1000L, 4, Arrays.asList( createMax("time", 1000), createPercentiles("my_field", 1.0))), @@ -373,10 +482,10 @@ public void testProcessGivenMultiplePercentilesPerHistogram() throws IOException } @SuppressWarnings("unchecked") - public void testBucketAggContainsRequiredAgg() throws IOException { + public void testBucketAggContainsRequiredAgg() { Set fields = new HashSet<>(); fields.add("foo"); - AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false, 0L); + AggregationToJsonProcessor processor = new AggregationToJsonProcessor("time", fields, false, 0L, null); Terms termsAgg = mock(Terms.class); when(termsAgg.getBuckets()).thenReturn(Collections.emptyList()); @@ -459,7 +568,7 @@ public void testSingleBucketAgg() throws IOException { " {\"time\":2000,\"field2\":1.0,\"field1\":7.0,\"doc_count\":7}")); } - public void 
testSingleBucketAgg_failureWithSubMultiBucket() throws IOException { + public void testSingleBucketAgg_failureWithSubMultiBucket() { List histogramBuckets = Collections.singletonList( createHistogramBucket(1000L, 4, Arrays.asList( @@ -500,10 +609,22 @@ private String aggToString(Set fields, List buckets) t private String aggToString(Set fields, Aggregations aggregations) throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, fields, includeDocCount, startTime); + AggregationToJsonProcessor processor = new AggregationToJsonProcessor( + timeField, + fields, + includeDocCount, + startTime, + compositeAggValueSource + ); processor.process(aggregations); - processor.writeDocs(10000, outputStream); + processor.writeAllDocsCancellable(_timestamp -> false, outputStream); keyValuePairsWritten = processor.getKeyValueCount(); return outputStream.toString(StandardCharsets.UTF_8.name()); } + + private String aggToStringComposite(Set fields, List buckets) throws IOException { + CompositeAggregation compositeAggregation = createCompositeAggregation("buckets", buckets); + return aggToString(fields, createAggs(Collections.singletonList(compositeAggregation))); + } + } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java new file mode 100644 index 0000000000000..d53048db54977 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java @@ -0,0 +1,397 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; +import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister; +import org.junit.Before; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAvg; +import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createCompositeBucket; +import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.stringContainsInOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CompositeAggregationDataExtractorTests extends ESTestCase { + + private Client testClient; + private List capturedSearchRequests; + private String jobId; + private String timeField; + private Set fields; + private List indices; + private QueryBuilder query; + private DatafeedTimingStatsReporter timingStatsReporter; + private CompositeAggregationBuilder compositeAggregationBuilder; + private AggregatedSearchRequestBuilder aggregatedSearchRequestBuilder; + private Map runtimeMappings; + + private class TestDataExtractor extends CompositeAggregationDataExtractor { + + private SearchResponse nextResponse; + private SearchPhaseExecutionException ex; + + TestDataExtractor(long start, long end) { + super(compositeAggregationBuilder, testClient, createContext(start, end), 
timingStatsReporter, aggregatedSearchRequestBuilder); + } + + @Override + protected SearchResponse executeSearchRequest(ActionRequestBuilder searchRequestBuilder) { + capturedSearchRequests.add(searchRequestBuilder.request()); + if (ex != null) { + throw ex; + } + return nextResponse; + } + + void setNextResponse(SearchResponse searchResponse) { + nextResponse = searchResponse; + } + + void setNextResponseToError(SearchPhaseExecutionException ex) { + this.ex = ex; + } + } + + @Before + public void setUpTests() { + testClient = mock(Client.class); + capturedSearchRequests = new ArrayList<>(); + jobId = "test-job"; + timeField = "time"; + fields = new HashSet<>(); + fields.addAll(Arrays.asList("time", "airline", "responsetime")); + indices = Arrays.asList("index-1", "index-2"); + query = QueryBuilders.matchAllQuery(); + compositeAggregationBuilder = AggregationBuilders.composite( + "buckets", + Arrays.asList( + new DateHistogramValuesSourceBuilder("time_bucket") + .field("time") + .fixedInterval(new DateHistogramInterval("1000ms")), + new TermsValuesSourceBuilder("airline").field("airline"))) + .size(10) + .subAggregation(AggregationBuilders.max("time").field("time")) + .subAggregation(AggregationBuilders.avg("responsetime").field("responsetime")); + runtimeMappings = Collections.emptyMap(); + timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(DatafeedTimingStatsPersister.class)); + aggregatedSearchRequestBuilder = (searchSourceBuilder) -> new SearchRequestBuilder(testClient, SearchAction.INSTANCE) + .setSource(searchSourceBuilder) + .setAllowPartialSearchResults(false) + .setIndices(indices.toArray(String[]::new)); + } + + public void testExtraction() throws IOException { + List compositeBucket = Arrays.asList( + createCompositeBucket( + 1000L, + "time_bucket", + 1, + Arrays.asList(createMax("time", 1999), createAvg("responsetime", 11.0)), + Collections.singletonList(Tuple.tuple("airline", "a")) + ), + createCompositeBucket( + 1000L, + "time_bucket", + 2, + Arrays.asList(createMax("time", 1999), createAvg("responsetime", 12.0)), + Collections.singletonList(Tuple.tuple("airline", "b")) + ), + createCompositeBucket( + 2000L, + "time_bucket", + 0, + Collections.emptyList(), + Collections.emptyList() + ), + createCompositeBucket( + 3000L, + "time_bucket", + 4, + Arrays.asList(createMax("time", 3999), createAvg("responsetime", 31.0)), + Collections.singletonList(Tuple.tuple("airline", "c")) + ), + createCompositeBucket( + 3000L, + "time_bucket", + 3, + Arrays.asList(createMax("time", 3999), createAvg("responsetime", 32.0)), + Collections.singletonList(Tuple.tuple("airline", "b")) + )); + + TestDataExtractor extractor = new TestDataExtractor(1000L, 4000L); + + SearchResponse response = createSearchResponse("buckets", + compositeBucket, + MapBuilder.newMapBuilder() + .put("time_bucket", 4000L) + .put("airline", "d") + .map() + ); + extractor.setNextResponse(response); + + assertThat(extractor.hasNext(), is(true)); + Optional stream = extractor.next(); + assertThat(stream.isPresent(), is(true)); + String expectedStream = "{\"airline\":\"a\",\"time\":1999,\"responsetime\":11.0,\"doc_count\":1} " + + "{\"airline\":\"b\",\"time\":1999,\"responsetime\":12.0,\"doc_count\":2} " + + "{\"airline\":\"c\",\"time\":3999,\"responsetime\":31.0,\"doc_count\":4} " + + "{\"airline\":\"b\",\"time\":3999,\"responsetime\":32.0,\"doc_count\":3}"; + assertThat(asString(stream.get()), equalTo(expectedStream)); + assertThat(capturedSearchRequests.size(), equalTo(1)); + + String 
searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", ""); + assertThat(searchRequest, containsString("\"size\":0")); + assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," + + "{\"range\":{\"time\":{\"from\":1000,\"to\":4000,\"include_lower\":true,\"include_upper\":false," + + "\"format\":\"epoch_millis\",\"boost\":1.0}}}]")); + assertThat(searchRequest, + stringContainsInOrder(Arrays.asList("aggregations", "composite", "time", "terms", "airline", "avg", "responsetime"))); + } + + public void testExtractionGivenResponseHasNullAggs() throws IOException { + TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L); + + SearchResponse response = createSearchResponse(null); + extractor.setNextResponse(response); + + assertThat(extractor.hasNext(), is(true)); + assertThat(extractor.next().isPresent(), is(false)); + assertThat(extractor.hasNext(), is(false)); + + assertThat(capturedSearchRequests.size(), equalTo(1)); + } + + public void testExtractionGivenResponseHasEmptyAggs() throws IOException { + TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L); + Aggregations emptyAggs = AggregationTestUtils.createAggs(Collections.emptyList()); + SearchResponse response = createSearchResponse(emptyAggs); + extractor.setNextResponse(response); + + assertThat(extractor.hasNext(), is(true)); + assertThat(extractor.next().isPresent(), is(false)); + assertThat(extractor.hasNext(), is(false)); + + assertThat(capturedSearchRequests.size(), equalTo(1)); + } + + public void testExtractionGivenCancelBeforeNext() { + TestDataExtractor extractor = new TestDataExtractor(1000L, 4000L); + SearchResponse response = createSearchResponse("time", Collections.emptyList(), Collections.emptyMap()); + extractor.setNextResponse(response); + + extractor.cancel(); + // Composite aggs should be true because we need to make sure the first search has occurred or not + assertThat(extractor.hasNext(), is(true)); + } + + public void testExtractionCancelOnFirstPage() throws IOException { + int numBuckets = 10; + List buckets = new ArrayList<>(numBuckets); + long timestamp = 1000; + for (int i = 0; i < numBuckets; i++) { + buckets.add( + createCompositeBucket( + timestamp, + "time_bucket", + 3, + Arrays.asList(createMax("time", randomLongBetween(timestamp, timestamp + 1000)), createAvg("responsetime", 32.0)), + Collections.singletonList(Tuple.tuple("airline", "c")) + ) + ); + } + + TestDataExtractor extractor = new TestDataExtractor(1000L, timestamp + 1000 + 1); + + SearchResponse response = createSearchResponse( + "buckets", + buckets, + MapBuilder.newMapBuilder() + .put("time_bucket", 1000L) + .put("airline", "d") + .map() + ); + extractor.setNextResponse(response); + extractor.cancel(); + // We should have next right now as we have not yet determined if we have handled a page or not + assertThat(extractor.hasNext(), is(true)); + // Should be empty + assertThat(countMatches('{', asString(extractor.next().get())), equalTo(0L)); + // Determined that we were on the first page and ended + assertThat(extractor.hasNext(), is(false)); + } + + public void testExtractionGivenCancelHalfWay() throws IOException { + int numBuckets = 10; + List buckets = new ArrayList<>(numBuckets); + long timestamp = 1000; + for (int i = 0; i < numBuckets; i++) { + buckets.add( + createCompositeBucket( + timestamp, + "time_bucket", + 3, + Arrays.asList(createMax("time", randomLongBetween(timestamp, timestamp + 1000)), createAvg("responsetime", 32.0)), + 
Collections.singletonList(Tuple.tuple("airline", "c")) + ) + ); + } + + TestDataExtractor extractor = new TestDataExtractor(1000L, timestamp + 1000 + 1); + + SearchResponse response = createSearchResponse( + "buckets", + buckets, + MapBuilder.newMapBuilder() + .put("time_bucket", 1000L) + .put("airline", "d") + .map() + ); + extractor.setNextResponse(response); + + assertThat(extractor.hasNext(), is(true)); + assertThat(countMatches('{', asString(extractor.next().get())), equalTo(10L)); + buckets = new ArrayList<>(numBuckets); + for (int i = 0; i < 6; i++) { + buckets.add( + createCompositeBucket( + timestamp, + "time_bucket", + 3, + Arrays.asList(createMax("time", randomLongBetween(timestamp, timestamp + 1000)), createAvg("responsetime", 32.0)), + Collections.singletonList(Tuple.tuple("airline", "c")) + ) + ); + } + timestamp += 1000; + for (int i = 0; i < 4; i++) { + buckets.add( + createCompositeBucket( + timestamp, + "time_bucket", + 3, + Arrays.asList(createMax("time", randomLongBetween(timestamp, timestamp + 1000)), + createAvg("responsetime", 32.0)), + Collections.singletonList(Tuple.tuple("airline", "c")) + ) + ); + } + response = createSearchResponse("buckets", + buckets, + MapBuilder.newMapBuilder() + .put("time_bucket", 3000L) + .put("airline", "a") + .map()); + extractor.setNextResponse(response); + extractor.cancel(); + assertThat(extractor.hasNext(), is(true)); + assertThat(extractor.isCancelled(), is(true)); + // Only the docs in the previous bucket before cancelling + assertThat(countMatches('{', asString(extractor.next().get())), equalTo(6L)); + + // Once we have handled the 6 remaining in that time bucket, we shouldn't finish the page and the extractor should end + assertThat(extractor.hasNext(), is(false)); + assertThat(capturedSearchRequests.size(), equalTo(2)); + } + + public void testExtractionGivenSearchResponseHasError() { + TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L); + extractor.setNextResponseToError(new SearchPhaseExecutionException("phase 1", "boom", ShardSearchFailure.EMPTY_ARRAY)); + + assertThat(extractor.hasNext(), is(true)); + expectThrows(SearchPhaseExecutionException.class, extractor::next); + } + + private CompositeAggregationDataExtractorContext createContext(long start, long end) { + return new CompositeAggregationDataExtractorContext( + jobId, + timeField, + fields, + indices, + query, + compositeAggregationBuilder, + "time_bucket", + start, + end, + true, + Collections.emptyMap(), + SearchRequest.DEFAULT_INDICES_OPTIONS, + runtimeMappings); + } + + @SuppressWarnings("unchecked") + private SearchResponse createSearchResponse(String aggName, List buckets, Map afterKey) { + CompositeAggregation compositeAggregation = mock(CompositeAggregation.class); + when(compositeAggregation.getName()).thenReturn(aggName); + when(compositeAggregation.afterKey()).thenReturn(afterKey); + when((List)compositeAggregation.getBuckets()).thenReturn(buckets); + + Aggregations searchAggs = AggregationTestUtils.createAggs(Collections.singletonList(compositeAggregation)); + return createSearchResponse(searchAggs); + } + + private SearchResponse createSearchResponse(Aggregations aggregations) { + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.status()).thenReturn(RestStatus.OK); + when(searchResponse.getScrollId()).thenReturn(randomAlphaOfLength(1000)); + when(searchResponse.getAggregations()).thenReturn(aggregations); + when(searchResponse.getTook()).thenReturn(TimeValue.timeValueMillis(randomNonNegativeLong())); + 
return searchResponse; + } + + private static String asString(InputStream inputStream) throws IOException { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + return reader.lines().collect(Collectors.joining("\n")); + } + } + + private static long countMatches(char c, String text) { + return text.chars().filter(current -> current == c).count(); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactoryTests.java deleted file mode 100644 index 2c6ae2b5a6a63..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/RollupDataExtractorFactoryTests.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.Client; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; -import org.elasticsearch.xpack.core.ml.job.config.DataDescription; -import org.elasticsearch.xpack.core.ml.job.config.Detector; -import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter; -import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; - -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.mockito.Mockito.mock; - -public class RollupDataExtractorFactoryTests extends ESTestCase { - - public void testCreateWithRuntimeFields() { - String jobId = "foojob"; - - Detector.Builder detectorBuilder = new Detector.Builder(); - detectorBuilder.setFunction("sum"); - detectorBuilder.setFieldName("value"); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detectorBuilder.build())); - Job.Builder jobBuilder = new Job.Builder(jobId); - jobBuilder.setDataDescription(new DataDescription.Builder()); - jobBuilder.setAnalysisConfig(analysisConfig); - - DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobId); - datafeedConfigBuilder.setIndices(Collections.singletonList("my_index")); - - Map settings = new HashMap<>(); - settings.put("type", "keyword"); - settings.put("script", ""); - Map field = new HashMap<>(); - field.put("runtime_field_bar", settings); - datafeedConfigBuilder.setRuntimeMappings(field); - - AtomicReference exceptionRef = new AtomicReference<>(); - ActionListener listener = ActionListener.wrap( - r -> fail("unexpected response"), - exceptionRef::set - ); - - RollupDataExtractorFactory.create(mock(Client.class), datafeedConfigBuilder.build(), jobBuilder.build(new Date()), - Collections.emptyMap(), xContentRegistry(), mock(DatafeedTimingStatsReporter.class), listener); - - assertNotNull(exceptionRef.get()); - Exception e = 
exceptionRef.get(); - assertThat(e, instanceOf(IllegalArgumentException.class)); - assertThat(e.getMessage(), equalTo("The datafeed has runtime_mappings defined, " + - "runtime fields are not supported in rollup searches")); - } -}
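
For readers following the tests above, here is a condensed sketch of the composite aggregation shape a composite-capable datafeed would declare: a top-level `composite` aggregation whose sources include a `date_histogram` value source on the time field, plus a `max` sub-aggregation named after the time field so each bucket carries the timestamp of its last document. This is only an illustrative sketch assembled from the test fixtures; the class name `CompositeDatafeedAggExample` and the identifiers `buckets`, `time_bucket`, `time`, `airline`, and `responsetime` are illustrative rather than required names.

[source,java]
----------------------------------
import java.util.Arrays;

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

public final class CompositeDatafeedAggExample {

    // Sketch of the top-level composite aggregation a composite-capable datafeed
    // would declare: one date_histogram value source on the time field keeps the
    // buckets ordered by time, and the max sub-aggregation named after the time
    // field records the timestamp of the last document in each bucket. The terms
    // source and avg sub-aggregation are optional extras mirroring the tests.
    public static CompositeAggregationBuilder datafeedComposite() {
        return AggregationBuilders.composite(
                "buckets",
                Arrays.asList(
                    new DateHistogramValuesSourceBuilder("time_bucket")
                        .field("time")
                        .fixedInterval(new DateHistogramInterval("1000ms")),
                    new TermsValuesSourceBuilder("airline").field("airline")))
            .size(1000)
            .subAggregation(AggregationBuilders.max("time").field("time"))
            .subAggregation(AggregationBuilders.avg("responsetime").field("responsetime"));
    }
}
----------------------------------

Under these assumptions, `CompositeAggregationDataExtractorFactory` would accept such a builder, move the `time_bucket` source to the front so results stay ordered by time, and the extractor would page through buckets using the returned `after_key`, as exercised by `CompositeAggregationDataExtractorTests` above.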