[ML] Module setup with dynamic model memory estimation (elastic#60656)
* [ML] add estimateModelMemory to the setup endpoint

* [ML] wip caching cardinality checks

* [ML] refactor

* [ML] fix a fallback time range

* [ML] fix typing issue

* [ML] fields_aggs_cache.ts as part of fields_service

* [ML] fix types, add comments

* [ML] check for MML overrides

* [ML] disable estimateModelMemory

* [ML] fix typing

* [ML] check for empty max mml

* [ML] refactor, update types, fix jobsForModelMemoryEstimation

* [ML] fix override lookup

* [ML] resolve nit comments

* [ML] init jobsForModelMemoryEstimation
darnautov committed Mar 24, 2020
1 parent 908fb78 commit b45ff40
Showing 10 changed files with 443 additions and 194 deletions.
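
Not all ten files are expanded below; in particular the new fields_aggs_cache.ts that the commit message folds into fields_service is collapsed. A minimal sketch of that caching idea, with illustrative names and shapes throughout (not the actual implementation):

```ts
// Hypothetical sketch: memoize per-field cardinality results keyed by index,
// time field, and time range, so repeated model memory estimations during
// module setup don't re-run identical aggregations.
type CardinalityEntry = { [fieldName: string]: number };

const cache = new Map<string, CardinalityEntry>();

const keyOf = (indexPattern: string, timeFieldName: string, earliestMs: number, latestMs: number) =>
  [indexPattern, timeFieldName, earliestMs, latestMs].join('|');

// Return cached values for the requested fields and list the fields that
// still need a real aggregation.
function getCachedCardinalities(
  indexPattern: string,
  fieldNames: string[],
  timeFieldName: string,
  earliestMs: number,
  latestMs: number
): { cached: CardinalityEntry; missing: string[] } {
  const entry = cache.get(keyOf(indexPattern, timeFieldName, earliestMs, latestMs)) ?? {};
  const cached: CardinalityEntry = {};
  const missing: string[] = [];
  for (const field of fieldNames) {
    if (field in entry) {
      cached[field] = entry[field];
    } else {
      missing.push(field);
    }
  }
  return { cached, missing };
}
```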
x-pack/plugins/ml/common/types/modules.ts (8 additions, 2 deletions)
@@ -90,8 +90,14 @@ export interface DataRecognizerConfigResponse {
   };
 }
 
-export type GeneralOverride = any;
-
 export type JobOverride = Partial<Job>;
+export type GeneralJobsOverride = Omit<JobOverride, 'job_id'>;
+export type JobSpecificOverride = JobOverride & { job_id: Job['job_id'] };
+
+export function isGeneralJobOverride(override: JobOverride): override is GeneralJobsOverride {
+  return override.job_id === undefined;
+}
+
+export type GeneralDatafeedsOverride = Partial<Omit<Datafeed, 'job_id' | 'datafeed_id'>>;
 
 export type DatafeedOverride = Partial<Datafeed>;
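
The new isGeneralJobOverride type guard lets callers split a mixed override payload into general overrides (no job_id, applied to every module job) and job-specific ones. A small usage sketch; the import path and helper name are assumptions:

```ts
import { isGeneralJobOverride, JobOverride, JobSpecificOverride } from './modules';

// Partition a mixed payload of job overrides coming from a setup request.
function partitionOverrides(overrides: JobOverride[]) {
  // Narrowed to GeneralJobsOverride[] by the type guard.
  const generalOverrides = overrides.filter(isGeneralJobOverride);
  // Everything else must carry a job_id.
  const jobSpecificOverrides = overrides.filter(
    (o): o is JobSpecificOverride => !isGeneralJobOverride(o)
  );
  return { generalOverrides, jobSpecificOverrides };
}
```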
@@ -172,6 +172,7 @@ export const Page: FC<PageProps> = ({ moduleId, existingGroupIds }) => {
       startDatafeed: startDatafeedAfterSave,
       ...(jobOverridesPayload !== null ? { jobOverrides: jobOverridesPayload } : {}),
       ...resultTimeRange,
+      estimateModelMemory: false,
     });
     const { datafeeds: datafeedsResponse, jobs: jobsResponse, kibana: kibanaResponse } = response;
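
This caller opts out of dynamic estimation explicitly. For callers that opt in, the commit message's "check for MML overrides" and "jobsForModelMemoryEstimation" bullets suggest the server estimates only for jobs whose model_memory_limit is not already overridden; a rough sketch of that selection, with assumed types and helper name:

```ts
// Hypothetical shapes; the actual data recognizer types are not shown in this diff.
interface ModuleJob {
  id: string;
  config: { analysis_limits?: { model_memory_limit?: string } };
}

type MlJobOverride = {
  job_id?: string;
  analysis_limits?: { model_memory_limit?: string };
};

// Select the jobs that still need a dynamic model memory estimate: any job
// whose general or job-specific override already pins model_memory_limit
// is skipped.
function jobsForModelMemoryEstimation(jobs: ModuleJob[], overrides: MlJobOverride[]): ModuleJob[] {
  const generalMmlOverride = overrides.some(
    o => o.job_id === undefined && o.analysis_limits?.model_memory_limit !== undefined
  );
  if (generalMmlOverride) {
    return []; // a general MML override applies to every job
  }
  const jobSpecificMml = new Set(
    overrides
      .filter(o => o.job_id !== undefined && o.analysis_limits?.model_memory_limit !== undefined)
      .map(o => o.job_id as string)
  );
  return jobs.filter(job => !jobSpecificMml.has(job.id));
}
```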
@@ -367,6 +367,7 @@ export const ml = {
     start,
     end,
     jobOverrides,
+    estimateModelMemory,
   }: {
     moduleId: string;
     prefix?: string;
@@ -378,6 +379,7 @@ export const ml = {
     start?: number;
     end?: number;
     jobOverrides?: Array<Partial<Job>>;
+    estimateModelMemory?: boolean;
   }) {
     const body = JSON.stringify({
       prefix,
@@ -389,6 +391,7 @@ export const ml = {
       start,
       end,
       jobOverrides,
+      estimateModelMemory,
     });
 
     return http<DataRecognizerConfigResponse>({
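
Client-side usage of the new flag might look like this; the method name setupDataRecognizerConfig and the module/index names are assumptions, since they sit outside the visible hunk:

```ts
import { ml } from './ml_api_service'; // assumed import site

// Set up a module and ask the server to size each job's model_memory_limit
// dynamically from field cardinalities in the target index.
async function setupWeblogsModule() {
  return ml.setupDataRecognizerConfig({
    moduleId: 'sample_data_weblogs', // illustrative module ID
    prefix: 'test_',
    indexPatternName: 'kibana_sample_data_logs', // illustrative index pattern
    startDatafeed: false,
    estimateModelMemory: true, // the new flag added by this commit
  });
}
```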
@@ -6,6 +6,7 @@
 
 import numeral from '@elastic/numeral';
 import { APICaller } from 'kibana/server';
+import { MLCATEGORY } from '../../../common/constants/field_types';
 import { AnalysisConfig } from '../../../common/types/anomaly_detection_jobs';
 import { fieldsServiceProvider } from '../fields_service';
@@ -34,92 +35,96 @@ export interface ModelMemoryEstimate {
 /**
  * Retrieves overall and max bucket cardinalities.
  */
-async function getCardinalities(
-  callAsCurrentUser: APICaller,
-  analysisConfig: AnalysisConfig,
-  indexPattern: string,
-  query: any,
-  timeFieldName: string,
-  earliestMs: number,
-  latestMs: number
-): Promise<{
-  overallCardinality: { [key: string]: number };
-  maxBucketCardinality: { [key: string]: number };
-}> {
-  /**
-   * Fields not involved in cardinality check
-   */
-  const excludedKeywords = new Set<string>(
-    /**
-     * The keyword which is used to mean the output of categorization,
-     * so it will have cardinality zero in the actual input data.
-     */
-    'mlcategory'
-  );
-
-  const fieldsService = fieldsServiceProvider(callAsCurrentUser);
-
-  const { detectors, influencers, bucket_span: bucketSpan } = analysisConfig;
-
-  let overallCardinality = {};
-  let maxBucketCardinality = {};
-  const overallCardinalityFields: Set<string> = detectors.reduce(
-    (
-      acc,
-      {
-        by_field_name: byFieldName,
-        partition_field_name: partitionFieldName,
-        over_field_name: overFieldName,
-      }
-    ) => {
-      [byFieldName, partitionFieldName, overFieldName]
-        .filter(field => field !== undefined && field !== '' && !excludedKeywords.has(field))
-        .forEach(key => {
-          acc.add(key as string);
-        });
-      return acc;
-    },
-    new Set<string>()
-  );
-
-  const maxBucketFieldCardinalities: string[] = influencers.filter(
-    influencerField =>
-      typeof influencerField === 'string' &&
-      !excludedKeywords.has(influencerField) &&
-      !!influencerField &&
-      !overallCardinalityFields.has(influencerField)
-  ) as string[];
-
-  if (overallCardinalityFields.size > 0) {
-    overallCardinality = await fieldsService.getCardinalityOfFields(
-      indexPattern,
-      [...overallCardinalityFields],
-      query,
-      timeFieldName,
-      earliestMs,
-      latestMs
-    );
-  }
-
-  if (maxBucketFieldCardinalities.length > 0) {
-    maxBucketCardinality = await fieldsService.getMaxBucketCardinalities(
-      indexPattern,
-      maxBucketFieldCardinalities,
-      query,
-      timeFieldName,
-      earliestMs,
-      latestMs,
-      bucketSpan
-    );
-  }
-
-  return {
-    overallCardinality,
-    maxBucketCardinality,
-  };
-}
+const cardinalityCheckProvider = (callAsCurrentUser: APICaller) => {
+  const fieldsService = fieldsServiceProvider(callAsCurrentUser);
+
+  return async (
+    analysisConfig: AnalysisConfig,
+    indexPattern: string,
+    query: any,
+    timeFieldName: string,
+    earliestMs: number,
+    latestMs: number
+  ): Promise<{
+    overallCardinality: { [key: string]: number };
+    maxBucketCardinality: { [key: string]: number };
+  }> => {
+    /**
+     * Fields not involved in cardinality check
+     */
+    const excludedKeywords = new Set<string>(
+      /**
+       * The keyword which is used to mean the output of categorization,
+       * so it will have cardinality zero in the actual input data.
+       */
+      MLCATEGORY
+    );
+
+    const { detectors, influencers, bucket_span: bucketSpan } = analysisConfig;
+
+    let overallCardinality = {};
+    let maxBucketCardinality = {};
+
+    // Get fields required for the model memory estimation
+    const overallCardinalityFields: Set<string> = detectors.reduce(
+      (
+        acc,
+        {
+          by_field_name: byFieldName,
+          partition_field_name: partitionFieldName,
+          over_field_name: overFieldName,
+        }
+      ) => {
+        [byFieldName, partitionFieldName, overFieldName]
+          .filter(field => field !== undefined && field !== '' && !excludedKeywords.has(field))
+          .forEach(key => {
+            acc.add(key as string);
+          });
+        return acc;
+      },
+      new Set<string>()
+    );
+
+    const maxBucketFieldCardinalities: string[] = influencers.filter(
+      influencerField =>
+        !!influencerField &&
+        !excludedKeywords.has(influencerField) &&
+        !overallCardinalityFields.has(influencerField)
+    ) as string[];
+
+    if (overallCardinalityFields.size > 0) {
+      overallCardinality = await fieldsService.getCardinalityOfFields(
+        indexPattern,
+        [...overallCardinalityFields],
+        query,
+        timeFieldName,
+        earliestMs,
+        latestMs
+      );
+    }
+
+    if (maxBucketFieldCardinalities.length > 0) {
+      maxBucketCardinality = await fieldsService.getMaxBucketCardinalities(
+        indexPattern,
+        maxBucketFieldCardinalities,
+        query,
+        timeFieldName,
+        earliestMs,
+        latestMs,
+        bucketSpan
+      );
+    }
+
+    return {
+      overallCardinality,
+      maxBucketCardinality,
+    };
+  };
+};
 
 export function calculateModelMemoryLimitProvider(callAsCurrentUser: APICaller) {
+  const getCardinalities = cardinalityCheckProvider(callAsCurrentUser);
+
   /**
    * Retrieves an estimated size of the model memory limit used in the job config
    * based on the cardinality of the fields being used to split the data
@@ -145,7 +150,6 @@ export function calculateModelMemoryLimitProvider(callAsCurrentUser: APICaller)
   }
 
   const { overallCardinality, maxBucketCardinality } = await getCardinalities(
-    callAsCurrentUser,
     analysisConfig,
     indexPattern,
     query,
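
The refactor binds the APICaller once per provider and returns a closure that takes only per-request arguments, so the fields service (and its cache) can be shared by both cardinality calls. A hedged usage sketch; the result shape (`modelMemoryLimit`) and the exact trailing parameters of the returned function are assumptions, since its full signature is collapsed in the diff above:

```ts
import { APICaller } from 'kibana/server';
import { AnalysisConfig } from '../../../common/types/anomaly_detection_jobs';

// Bind the scoped APICaller once, then reuse the returned estimator per job.
async function estimateMmlForJob(
  callAsCurrentUser: APICaller,
  analysisConfig: AnalysisConfig,
  earliestMs: number,
  latestMs: number
): Promise<string> {
  const calculateModelMemoryLimit = calculateModelMemoryLimitProvider(callAsCurrentUser);
  const { modelMemoryLimit } = await calculateModelMemoryLimit(
    analysisConfig,
    'kibana_sample_data_logs', // illustrative index pattern
    { match_all: {} }, // datafeed query
    '@timestamp', // time field of the index
    earliestMs,
    latestMs
  );
  return modelMemoryLimit;
}
```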