From 51342c393cb4a8270f0c0e257f428d9492db633f Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 25 Apr 2018 18:30:09 +0100 Subject: [PATCH 1/3] [ML] Account for gaps in data counts after job is reopened This commit fixes an issue with the data diagnostics were empty buckets are not reported even though they should. Once a job is reopened, the diagnostics do not get initialized from the current data counts (especially the latest record timestamp). The result is that if the data that is sent have a time gap compared to the previous ones, that gap is not accounted for in the empty bucket count. This commit fixes that by initializing the diagnostics with the current data counts. Closes #30080 --- .../ml/job/process/DataCountsReporter.java | 2 +- .../diagnostics/BucketDiagnostics.java | 10 +- .../diagnostics/DataStreamDiagnostics.java | 5 +- .../DataStreamDiagnosticsTests.java | 31 ++++--- .../ml/integration/ReopenJobWithGapIT.java | 93 +++++++++++++++++++ 5 files changed, 123 insertions(+), 18 deletions(-) create mode 100644 x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java index 80223027e8ee0..d906ccf2f7a9b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java @@ -82,7 +82,7 @@ public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobData totalRecordStats = counts; incrementalRecordStats = new DataCounts(job.getId()); - diagnostics = new DataStreamDiagnostics(job); + diagnostics = new DataStreamDiagnostics(job, counts); acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings); acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java index c61926dfb0426..a4497653497ea 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/BucketDiagnostics.java @@ -6,8 +6,11 @@ package org.elasticsearch.xpack.ml.job.process.diagnostics; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.utils.Intervals; +import java.util.Date; + /** * A moving window of buckets that allow keeping * track of some statistics like the bucket count, @@ -33,12 +36,17 @@ class BucketDiagnostics { private long latestFlushedBucketStartMs = -1; private final BucketFlushListener bucketFlushListener; - BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) { + BucketDiagnostics(Job job, DataCounts dataCounts, BucketFlushListener bucketFlushListener) { bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis(); latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis(); maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS); buckets = new long[maxSize]; this.bucketFlushListener = bucketFlushListener; + + Date latestRecordTimestamp = dataCounts.getLatestRecordTimeStamp(); + if (latestRecordTimestamp != null) { + addRecord(latestRecordTimestamp.getTime()); + } } void addRecord(long recordTimestampMs) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java index a19f6eba02367..a225587d0bb75 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnostics.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import java.util.Date; @@ -32,8 +33,8 @@ public class DataStreamDiagnostics { private long sparseBucketCount = 0; private long latestSparseBucketTime = -1; - public DataStreamDiagnostics(Job job) { - bucketDiagnostics = new BucketDiagnostics(job, createBucketFlushListener()); + public DataStreamDiagnostics(Job job, DataCounts dataCounts) { + bucketDiagnostics = new BucketDiagnostics(job, dataCounts, createBucketFlushListener()); } private BucketDiagnostics.BucketFlushListener createBucketFlushListener() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java index 19f7f88c38fef..0d9c52a28bd33 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/diagnostics/DataStreamDiagnosticsTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.junit.Before; import java.util.Arrays; @@ -20,6 +21,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase { private static final long BUCKET_SPAN = 60000; private Job job; + private DataCounts dataCounts; @Before public void setUpMocks() { @@ -32,10 +34,11 @@ public void setUpMocks() { builder.setAnalysisConfig(acBuilder); builder.setDataDescription(new DataDescription.Builder()); job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), null); + dataCounts = new DataCounts(job.getId()); } public void testIncompleteBuckets() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(1000); d.checkRecord(2000); @@ -81,7 +84,7 @@ public void testIncompleteBuckets() { } public void testSimple() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(70000); d.checkRecord(130000); @@ -103,7 +106,7 @@ public void testSimple() { } public void testSimpleReverse() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(610000); d.checkRecord(550000); @@ -126,7 +129,7 @@ public void testSimpleReverse() { public void testWithLatencyLessThanTenBuckets() { job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(3 * BUCKET_SPAN)); - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); long timestamp = 70000; while (timestamp < 70000 + 20 * BUCKET_SPAN) { @@ -141,7 +144,7 @@ public void testWithLatencyLessThanTenBuckets() { public void testWithLatencyGreaterThanTenBuckets() { job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(13 * BUCKET_SPAN + 10000)); - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); long timestamp = 70000; while (timestamp < 70000 + 20 * BUCKET_SPAN) { @@ -155,7 +158,7 @@ public void testWithLatencyGreaterThanTenBuckets() { } public void testEmptyBuckets() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(10000); d.checkRecord(70000); @@ -177,7 +180,7 @@ public void testEmptyBuckets() { } public void testEmptyBucketsStartLater() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(1110000); d.checkRecord(1170000); @@ -199,7 +202,7 @@ public void testEmptyBucketsStartLater() { } public void testSparseBuckets() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -227,7 +230,7 @@ public void testSparseBuckets() { * signal */ public void testSparseBucketsLast() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -255,7 +258,7 @@ public void testSparseBucketsLast() { * signal on the 2nd to last */ public void testSparseBucketsLastTwo() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -280,7 +283,7 @@ public void testSparseBucketsLastTwo() { } public void testMixedEmptyAndSparseBuckets() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -308,7 +311,7 @@ public void testMixedEmptyAndSparseBuckets() { * whether counts are right. */ public void testEmptyBucketsLongerOutage() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.checkRecord(10000); d.checkRecord(70000); @@ -336,7 +339,7 @@ public void testEmptyBucketsLongerOutage() { * The number of sparse buckets should not be to much, it could be normal. */ public void testSparseBucketsLongerPeriod() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); sendManyDataPoints(d, 10000, 69000, 1000); sendManyDataPoints(d, 70000, 129000, 1200); @@ -374,7 +377,7 @@ private static Job createJob(TimeValue bucketSpan, TimeValue latency) { } public void testFlushAfterZeroRecords() { - DataStreamDiagnostics d = new DataStreamDiagnostics(job); + DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts); d.flush(); assertEquals(0, d.getBucketCount()); } diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java new file mode 100644 index 0000000000000..993f370723774 --- /dev/null +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ReopenJobWithGapIT.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.junit.After; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests that after reopening a job and sending more + * data after a gap, data counts are reported correctly. + */ +public class ReopenJobWithGapIT extends MlNativeAutodetectIntegTestCase { + + private static final String JOB_ID = "reopen-job-with-gap-test"; + private static final long BUCKET_SPAN_SECONDS = 3600; + + @After + public void cleanUpTest() { + cleanUp(); + } + + public void test() throws Exception { + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder( + Collections.singletonList(new Detector.Builder("count", null).build())); + analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS)); + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeFormat("epoch"); + Job.Builder job = new Job.Builder(JOB_ID); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(dataDescription); + + registerJob(job); + putJob(job); + openJob(job.getId()); + + long timestamp = 1483228800L; // 2017-01-01T00:00:00Z + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + data.add(createJsonRecord(createRecord(timestamp))); + timestamp += BUCKET_SPAN_SECONDS; + } + + postData(job.getId(), data.stream().collect(Collectors.joining())); + flushJob(job.getId(), true); + closeJob(job.getId()); + + GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId()); + request.setExcludeInterim(true); + assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(9L)); + assertThat(getJobStats(job.getId()).get(0).getDataCounts().getBucketCount(), equalTo(9L)); + + timestamp += 10 * BUCKET_SPAN_SECONDS; + data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + data.add(createJsonRecord(createRecord(timestamp))); + timestamp += BUCKET_SPAN_SECONDS; + } + + openJob(job.getId()); + postData(job.getId(), data.stream().collect(Collectors.joining())); + flushJob(job.getId(), true); + closeJob(job.getId()); + + assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(29L)); + DataCounts dataCounts = getJobStats(job.getId()).get(0).getDataCounts(); + assertThat(dataCounts.getBucketCount(), equalTo(29L)); + assertThat(dataCounts.getEmptyBucketCount(), equalTo(10L)); + } + + private static Map createRecord(long timestamp) { + Map record = new HashMap<>(); + record.put("time", timestamp); + return record; + } +} From 505bfb6c0dd7d1195f2fdc7b4676d75941e00e71 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 1 May 2018 12:53:06 +0100 Subject: [PATCH 2/3] Fix MlBasicMultiNodeIT --- .../xpack/ml/integration/MlBasicMultiNodeIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java b/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java index 3b84994f5acca..e7381050260c4 100644 --- a/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java +++ b/x-pack/qa/ml-basic-multi-node/src/test/java/org/elasticsearch/xpack/ml/integration/MlBasicMultiNodeIT.java @@ -241,7 +241,7 @@ public void testMiniFarequoteReopen() throws Exception { assertEquals(0, responseBody.get("invalid_date_count")); assertEquals(0, responseBody.get("missing_field_count")); assertEquals(0, responseBody.get("out_of_order_timestamp_count")); - assertEquals(0, responseBody.get("bucket_count")); + assertEquals(1000, responseBody.get("bucket_count")); // unintuitive: should return the earliest record timestamp of this feed??? assertEquals(null, responseBody.get("earliest_record_timestamp")); @@ -266,7 +266,7 @@ public void testMiniFarequoteReopen() throws Exception { assertEquals(0, dataCountsDoc.get("invalid_date_count")); assertEquals(0, dataCountsDoc.get("missing_field_count")); assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count")); - assertEquals(0, dataCountsDoc.get("bucket_count")); + assertEquals(1000, dataCountsDoc.get("bucket_count")); assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp")); assertEquals(1407082000000L, dataCountsDoc.get("latest_record_timestamp")); From afb93bfed8f4bede0806aaee1615c8c2e0fd92a0 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 2 May 2018 14:35:59 +0100 Subject: [PATCH 3/3] Add to changelog --- docs/CHANGELOG.asciidoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 0a04cc950e388..01f41bf626677 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -104,6 +104,10 @@ Do not ignore request analysis/similarity settings on index resize operations wh Fix NPE when CumulativeSum agg encounters null value/empty bucket ({pull}29641[#29641]) +Machine Learning:: + +* Account for gaps in data counts after job is reopened ({pull}30294[#30294]) + //[float] //=== Regressions