Skip to content

Commit

Permalink
[ML] Add runtime support for anomaly charts & add composite validatio…
Browse files Browse the repository at this point in the history
…ns (#96348)
  • Loading branch information
qn895 authored Apr 12, 2021
1 parent 92b98e7 commit e7f5d07
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 92 deletions.
14 changes: 2 additions & 12 deletions x-pack/plugins/ml/common/types/anomaly_detection_jobs/datafeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { estypes } from '@elastic/elasticsearch';
import type { estypes } from '@elastic/elasticsearch';
// import { IndexPatternTitle } from '../kibana';
// import { RuntimeMappings } from '../fields';
// import { JobId } from './job';
Expand Down Expand Up @@ -41,17 +41,7 @@ export type ChunkingConfig = estypes.ChunkingConfig;
// time_span?: string;
// }

export type Aggregation = Record<
string,
{
date_histogram: {
field: string;
fixed_interval: string;
};
aggregations?: { [key: string]: any };
aggs?: { [key: string]: any };
}
>;
export type Aggregation = Record<string, estypes.AggregationContainer>;

export type IndicesOptions = estypes.IndicesOptions;
// export interface IndicesOptions {
Expand Down
7 changes: 0 additions & 7 deletions x-pack/plugins/ml/common/util/datafeed_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,3 @@ export const getDatafeedAggregations = (
): Aggregation | undefined => {
return getAggregations<Aggregation>(datafeedConfig);
};

export const getAggregationBucketsName = (aggregations: any): string | undefined => {
if (aggregations !== null && typeof aggregations === 'object') {
const keys = Object.keys(aggregations);
return keys.length > 0 ? keys[0] : undefined;
}
};
117 changes: 72 additions & 45 deletions x-pack/plugins/ml/common/util/job_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import { each, isEmpty, isEqual, pick } from 'lodash';
import semverGte from 'semver/functions/gte';
import moment, { Duration } from 'moment';
import type { estypes } from '@elastic/elasticsearch';
// @ts-ignore
import numeral from '@elastic/numeral';

import { i18n } from '@kbn/i18n';
import { ALLOWED_DATA_UNITS, JOB_ID_MAX_LENGTH } from '../constants/validation';
import { parseInterval } from './parse_interval';
Expand All @@ -22,13 +22,9 @@ import { MlServerLimits } from '../types/ml_server_info';
import { JobValidationMessage, JobValidationMessageId } from '../constants/messages';
import { ES_AGGREGATION, ML_JOB_AGGREGATION } from '../constants/aggregation_types';
import { MLCATEGORY } from '../constants/field_types';
import {
getAggregationBucketsName,
getAggregations,
getDatafeedAggregations,
} from './datafeed_utils';
import { getAggregations, getDatafeedAggregations } from './datafeed_utils';
import { findAggField } from './validation_utils';
import { isPopulatedObject } from './object_utils';
import { getFirstKeyInObject, isPopulatedObject } from './object_utils';
import { isDefined } from '../types/guards';

export interface ValidationResults {
Expand All @@ -52,14 +48,6 @@ export function calculateDatafeedFrequencyDefaultSeconds(bucketSpanSeconds: numb
return freq;
}

export function hasRuntimeMappings(job: CombinedJob): boolean {
const hasDatafeed = isPopulatedObject(job.datafeed_config);
if (hasDatafeed) {
return isPopulatedObject(job.datafeed_config.runtime_mappings);
}
return false;
}

export function isTimeSeriesViewJob(job: CombinedJob): boolean {
return getSingleMetricViewerJobErrorMessage(job) === undefined;
}
Expand All @@ -85,6 +73,34 @@ export function isMappableJob(job: CombinedJob, detectorIndex: number): boolean
return isMappable;
}

/**
* Validates that composite definition only have sources that are only terms and date_histogram
* if composite is defined.
* @param buckets
*/
export function hasValidComposite(buckets: estypes.AggregationContainer) {
if (
isPopulatedObject(buckets, ['composite']) &&
isPopulatedObject(buckets.composite, ['sources']) &&
Array.isArray(buckets.composite.sources)
) {
const sources = buckets.composite.sources;
return !sources.some((source) => {
const sourceName = getFirstKeyInObject(source);
if (sourceName !== undefined && isPopulatedObject(source[sourceName])) {
const sourceTypes = Object.keys(source[sourceName]);
return (
sourceTypes.length === 1 &&
sourceTypes[0] !== 'date_histogram' &&
sourceTypes[0] !== 'terms'
);
}
return false;
});
}
return true;
}

// Returns a flag to indicate whether the source data can be plotted in a time
// series chart for the specified detector.
export function isSourceDataChartableForDetector(job: CombinedJob, detectorIndex: number): boolean {
Expand All @@ -105,42 +121,42 @@ export function isSourceDataChartableForDetector(job: CombinedJob, detectorIndex
dtr.partition_field_name !== MLCATEGORY &&
dtr.over_field_name !== MLCATEGORY;

// If the datafeed uses script fields, we can only plot the time series if
// model plot is enabled. Without model plot it will be very difficult or impossible
// to invert to a reverse search of the underlying metric data.
if (
isSourceDataChartable === true &&
job.datafeed_config?.script_fields !== null &&
typeof job.datafeed_config?.script_fields === 'object'
) {
const hasDatafeed = isPopulatedObject(job.datafeed_config);

if (isSourceDataChartable && hasDatafeed) {
// Perform extra check to see if the detector is using a scripted field.
const scriptFields = Object.keys(job.datafeed_config.script_fields);
isSourceDataChartable =
scriptFields.indexOf(dtr.partition_field_name!) === -1 &&
scriptFields.indexOf(dtr.by_field_name!) === -1 &&
scriptFields.indexOf(dtr.over_field_name!) === -1;
}
if (isPopulatedObject(job.datafeed_config.script_fields)) {
// If the datafeed uses script fields, we can only plot the time series if
// model plot is enabled. Without model plot it will be very difficult or impossible
// to invert to a reverse search of the underlying metric data.

const scriptFields = Object.keys(job.datafeed_config.script_fields);
return (
scriptFields.indexOf(dtr.partition_field_name!) === -1 &&
scriptFields.indexOf(dtr.by_field_name!) === -1 &&
scriptFields.indexOf(dtr.over_field_name!) === -1
);
}

const hasDatafeed = isPopulatedObject(job.datafeed_config);
if (hasDatafeed) {
// We cannot plot the source data for some specific aggregation configurations
const aggs = getDatafeedAggregations(job.datafeed_config);
if (aggs !== undefined) {
const aggBucketsName = getAggregationBucketsName(aggs);
if (isPopulatedObject(aggs)) {
const aggBucketsName = getFirstKeyInObject(aggs);
if (aggBucketsName !== undefined) {
// if fieldName is a aggregated field under nested terms using bucket_script
const aggregations = getAggregations<{ [key: string]: any }>(aggs[aggBucketsName]) ?? {};
// if fieldName is an aggregated field under nested terms using bucket_script
const aggregations =
getAggregations<estypes.AggregationContainer>(aggs[aggBucketsName]) ?? {};
const foundField = findAggField(aggregations, dtr.field_name, false);
if (foundField?.bucket_script !== undefined) {
return false;
}

// composite sources should be terms and date_histogram only for now
return hasValidComposite(aggregations);
}
}

// We also cannot plot the source data if they datafeed uses any field defined by runtime_mappings
if (hasRuntimeMappings(job)) {
return false;
}
return true;
}
}

Expand Down Expand Up @@ -180,11 +196,22 @@ export function isModelPlotChartableForDetector(job: Job, detectorIndex: number)
// Returns a reason to indicate why the job configuration is not supported
// if the result is undefined, that means the single metric job should be viewable
export function getSingleMetricViewerJobErrorMessage(job: CombinedJob): string | undefined {
// if job has runtime mappings with no model plot
if (hasRuntimeMappings(job) && !job.model_plot_config?.enabled) {
return i18n.translate('xpack.ml.timeSeriesJob.jobWithRunTimeMessage', {
defaultMessage: 'the datafeed contains runtime fields and model plot is disabled',
});
// if job has at least one composite source that is not terms or date_histogram
const aggs = getDatafeedAggregations(job.datafeed_config);
if (isPopulatedObject(aggs)) {
const aggBucketsName = getFirstKeyInObject(aggs);
if (aggBucketsName !== undefined && aggs[aggBucketsName] !== undefined) {
// if fieldName is an aggregated field under nested terms using bucket_script

if (!hasValidComposite(aggs[aggBucketsName])) {
return i18n.translate(
'xpack.ml.timeSeriesJob.jobWithUnsupportedCompositeAggregationMessage',
{
defaultMessage: 'Disabled because the datafeed contains unsupported composite sources.',
}
);
}
}
}
// only allow jobs with at least one detector whose function corresponds to
// an ES aggregation which can be viewed in the single metric view and which
Expand All @@ -196,7 +223,7 @@ export function getSingleMetricViewerJobErrorMessage(job: CombinedJob): string |

if (isChartableTimeSeriesViewJob === false) {
return i18n.translate('xpack.ml.timeSeriesJob.notViewableTimeSeriesJobMessage', {
defaultMessage: 'not a viewable time series job',
defaultMessage: 'Disabled because not a viewable time series job.',
});
}
}
Expand Down
16 changes: 15 additions & 1 deletion x-pack/plugins/ml/common/util/object_utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { isPopulatedObject } from './object_utils';
import { getFirstKeyInObject, isPopulatedObject } from './object_utils';

describe('object_utils', () => {
describe('isPopulatedObject()', () => {
Expand Down Expand Up @@ -47,4 +47,18 @@ describe('object_utils', () => {
).toBe(false);
});
});

describe('getFirstKeyInObject()', () => {
it('gets the first key in object', () => {
expect(getFirstKeyInObject({ attribute1: 'value', attribute2: 'value2' })).toBe('attribute1');
});

it('returns undefined with invalid argument', () => {
expect(getFirstKeyInObject(undefined)).toBe(undefined);
expect(getFirstKeyInObject(null)).toBe(undefined);
expect(getFirstKeyInObject({})).toBe(undefined);
expect(getFirstKeyInObject('value')).toBe(undefined);
expect(getFirstKeyInObject(5)).toBe(undefined);
});
});
});
11 changes: 11 additions & 0 deletions x-pack/plugins/ml/common/util/object_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,14 @@ export const isPopulatedObject = <U extends string = string>(
requiredAttributes.every((d) => ({}.hasOwnProperty.call(arg, d))))
);
};

/**
* Get the first key in the object
* getFirstKeyInObject({ firstKey: {}, secondKey: {}}) -> firstKey
*/
export const getFirstKeyInObject = (arg: unknown): string | undefined => {
if (isPopulatedObject(arg)) {
const keys = Object.keys(arg);
return keys.length > 0 ? keys[0] : undefined;
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ export function ResultLinks({ jobs }) {
const singleMetricDisabledMessage =
jobs.length === 1 && jobs[0].isNotSingleMetricViewerJobMessage;

const singleMetricDisabledMessageText =
singleMetricDisabledMessage !== undefined
? i18n.translate('xpack.ml.jobsList.resultActions.singleMetricDisabledMessageText', {
defaultMessage: 'Disabled because {reason}.',
values: {
reason: singleMetricDisabledMessage,
},
})
: undefined;

const jobActionsDisabled = jobs.length === 1 && jobs[0].deleting === true;
const { createLinkWithUserDefaults } = useCreateADLinks();
const timeSeriesExplorerLink = useMemo(
Expand All @@ -62,7 +52,7 @@ export function ResultLinks({ jobs }) {
{singleMetricVisible && (
<EuiToolTip
position="bottom"
content={singleMetricDisabledMessageText ?? openJobsInSingleMetricViewerText}
content={singleMetricDisabledMessage ?? openJobsInSingleMetricViewerText}
>
<EuiButtonIcon
href={timeSeriesExplorerLink}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@ import { parseInterval } from '../../../../../../common/util/parse_interval';
import { Calendar } from '../../../../../../common/types/calendars';
import { mlCalendarService } from '../../../../services/calendar_service';
import { IndexPattern } from '../../../../../../../../../src/plugins/data/public';
import {
getAggregationBucketsName,
getDatafeedAggregations,
} from '../../../../../../common/util/datafeed_utils';
import { getDatafeedAggregations } from '../../../../../../common/util/datafeed_utils';
import { getFirstKeyInObject } from '../../../../../../common/util/object_utils';

export class JobCreator {
protected _type: JOB_TYPE = JOB_TYPE.SINGLE_METRIC;
Expand Down Expand Up @@ -786,7 +784,7 @@ export class JobCreator {
this._aggregationFields = [];
const aggs = getDatafeedAggregations(this._datafeed_config);
if (aggs !== undefined) {
const aggBucketsName = getAggregationBucketsName(aggs);
const aggBucketsName = getFirstKeyInObject(aggs);
if (aggBucketsName !== undefined && aggs[aggBucketsName] !== undefined) {
const buckets = aggs[aggBucketsName];
collectAggs(buckets, this._aggregationFields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,9 +749,11 @@ export class AnomalyExplorerChartsService {
// plus anomalyScore for points with anomaly markers.
let chartData: ChartPoint[] = [];
if (metricData !== undefined) {
if (eventDistribution.length > 0 && records.length > 0) {
if (records.length > 0) {
const filterField = records[0].by_field_value || records[0].over_field_value;
chartData = eventDistribution.filter((d: { entity: any }) => d.entity !== filterField);
if (eventDistribution.length > 0) {
chartData = eventDistribution.filter((d: { entity: any }) => d.entity !== filterField);
}
map(metricData, (value, time) => {
// The filtering for rare/event_distribution charts needs to be handled
// differently because of how the source data is structured.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { ES_AGGREGATION } from '../../../../common/constants/aggregation_types';
import { isPopulatedObject } from '../../../../common/util/object_utils';
import { InfluencersFilterQuery } from '../../../../common/types/es_client';
import { RecordForInfluencer } from './results_service';
import { isRuntimeMappings } from '../../../../common';

interface ResultResponse {
success: boolean;
Expand Down Expand Up @@ -140,9 +141,7 @@ export function resultsServiceRxProvider(mlApiServices: MlApiServices) {
},
},
size: 0,
_source: {
excludes: [],
},
_source: false,
aggs: {
byTime: {
date_histogram: {
Expand All @@ -152,6 +151,9 @@ export function resultsServiceRxProvider(mlApiServices: MlApiServices) {
},
},
},
...(isRuntimeMappings(datafeedConfig?.runtime_mappings)
? { runtime_mappings: datafeedConfig?.runtime_mappings }
: {}),
};

if (shouldCriteria.length > 0) {
Expand Down
4 changes: 1 addition & 3 deletions x-pack/plugins/translations/translations/ja-JP.json
Original file line number Diff line number Diff line change
Expand Up @@ -14207,7 +14207,6 @@
"xpack.ml.jobsList.refreshButtonLabel": "更新",
"xpack.ml.jobsList.resultActions.openJobsInAnomalyExplorerText": "{jobsCount, plural, one {{jobId}} other {# 件のジョブ}} を異常エクスプローラーで開く",
"xpack.ml.jobsList.resultActions.openJobsInSingleMetricViewerText": "シングルメトリックビューアーで {jobsCount, plural, one {{jobId}} other {# 件のジョブ}} を開く",
"xpack.ml.jobsList.resultActions.singleMetricDisabledMessageText": "{reason}のため無効です。",
"xpack.ml.jobsList.selectRowForJobMessage": "ジョブID {jobId} の行を選択",
"xpack.ml.jobsList.showDetailsColumn.screenReaderDescription": "このカラムには各ジョブの詳細を示すクリック可能なコントロールが含まれます",
"xpack.ml.jobsList.spacesLabel": "スペース",
Expand Down Expand Up @@ -15074,7 +15073,6 @@
"xpack.ml.timeSeriesExplorer.timeSeriesChart.zoomLabel": "ズーム:",
"xpack.ml.timeSeriesExplorer.tryWideningTheTimeSelectionDescription": "時間範囲を広げるか、さらに過去に遡ってみてください。",
"xpack.ml.timeSeriesExplorer.youCanViewOneJobAtTimeWarningMessage": "このダッシュボードでは 1 度に 1 つのジョブしか表示できません",
"xpack.ml.timeSeriesJob.jobWithRunTimeMessage": "データフィードにはランタイムフィールドが含まれ、モデルプロットが無効です",
"xpack.ml.timeSeriesJob.notViewableTimeSeriesJobMessage": "表示可能な時系列ジョブではありません",
"xpack.ml.timeSeriesJob.sourceDataModelPlotNotChartableMessage": "この検出器ではソースデータとモデルプロットの両方をグラフ化できません",
"xpack.ml.timeSeriesJob.sourceDataNotChartableWithDisabledModelPlotMessage": "この検出器ではソースデータを表示できません。モデルプロットが無効です",
Expand Down Expand Up @@ -23570,4 +23568,4 @@
"xpack.watcher.watchEdit.thresholdWatchExpression.aggType.fieldIsRequiredValidationMessage": "フィールドを選択してください。",
"xpack.watcher.watcherDescription": "アラートの作成、管理、監視によりデータへの変更を検知します。"
}
}
}
4 changes: 1 addition & 3 deletions x-pack/plugins/translations/translations/zh-CN.json
Original file line number Diff line number Diff line change
Expand Up @@ -14404,7 +14404,6 @@
"xpack.ml.jobsList.refreshButtonLabel": "刷新",
"xpack.ml.jobsList.resultActions.openJobsInAnomalyExplorerText": "在 Anomaly Explorer 中打开 {jobsCount, plural, one {{jobId}} other {# 个作业}}",
"xpack.ml.jobsList.resultActions.openJobsInSingleMetricViewerText": "在 Single Metric Viewer 中打开 {jobsCount, plural, one {{jobId}} other {# 个作业}}",
"xpack.ml.jobsList.resultActions.singleMetricDisabledMessageText": "由于{reason},已禁用。",
"xpack.ml.jobsList.selectRowForJobMessage": "选择作业 ID {jobId} 的行",
"xpack.ml.jobsList.showDetailsColumn.screenReaderDescription": "此列包含可单击控件,用于显示每个作业的更多详情",
"xpack.ml.jobsList.spacesLabel": "工作区",
Expand Down Expand Up @@ -15292,7 +15291,6 @@
"xpack.ml.timeSeriesExplorer.timeSeriesChart.zoomLabel": "缩放:",
"xpack.ml.timeSeriesExplorer.tryWideningTheTimeSelectionDescription": "请尝试扩大时间选择范围或进一步向后追溯。",
"xpack.ml.timeSeriesExplorer.youCanViewOneJobAtTimeWarningMessage": "在此仪表板中,一次仅可以查看一个作业",
"xpack.ml.timeSeriesJob.jobWithRunTimeMessage": "数据馈送包含运行时字段,模型绘图已禁用",
"xpack.ml.timeSeriesJob.notViewableTimeSeriesJobMessage": "不是可查看的时间序列作业",
"xpack.ml.timeSeriesJob.sourceDataModelPlotNotChartableMessage": "此检测器的源数据和模型绘图均无法绘制",
"xpack.ml.timeSeriesJob.sourceDataNotChartableWithDisabledModelPlotMessage": "此检测器的源数据无法查看,且模型绘图处于禁用状态",
Expand Down Expand Up @@ -23939,4 +23937,4 @@
"xpack.watcher.watchEdit.thresholdWatchExpression.aggType.fieldIsRequiredValidationMessage": "此字段必填。",
"xpack.watcher.watcherDescription": "通过创建、管理和监测警报来检测数据中的更改。"
}
}
}

0 comments on commit e7f5d07

Please sign in to comment.