From eceb500e78646813278262912c6a353469860658 Mon Sep 17 00:00:00 2001 From: Dario Gieselaar Date: Thu, 27 Aug 2020 09:55:22 +0200 Subject: [PATCH] [APM] Use observer.hostname instead of observer.name `observer.name` is not guaranteed to be set. Using `observer.hostname` _should_ allow us to approximate the amount of APM Server instances. Additionally, some of the logic was rewritten to A) reflect updates to APM Server's implementation, and B) make it more robust by querying a static date range. --- .../elasticsearch_fieldnames.test.ts.snap | 12 +- .../apm/common/elasticsearch_fieldnames.ts | 2 +- .../collect_data_telemetry/tasks.test.ts | 135 +++++------ .../collect_data_telemetry/tasks.ts | 226 ++++++++++-------- .../apm/typings/elasticsearch/aggregations.ts | 7 +- 5 files changed, 193 insertions(+), 189 deletions(-) diff --git a/x-pack/plugins/apm/common/__snapshots__/elasticsearch_fieldnames.test.ts.snap b/x-pack/plugins/apm/common/__snapshots__/elasticsearch_fieldnames.test.ts.snap index 7c42fb6f12a54..d9e9fe4a827d7 100644 --- a/x-pack/plugins/apm/common/__snapshots__/elasticsearch_fieldnames.test.ts.snap +++ b/x-pack/plugins/apm/common/__snapshots__/elasticsearch_fieldnames.test.ts.snap @@ -68,9 +68,9 @@ exports[`Error METRIC_SYSTEM_FREE_MEMORY 1`] = `undefined`; exports[`Error METRIC_SYSTEM_TOTAL_MEMORY 1`] = `undefined`; -exports[`Error OBSERVER_LISTENING 1`] = `undefined`; +exports[`Error OBSERVER_HOSTNAME 1`] = `undefined`; -exports[`Error OBSERVER_NAME 1`] = `"an observer"`; +exports[`Error OBSERVER_LISTENING 1`] = `undefined`; exports[`Error OBSERVER_VERSION_MAJOR 1`] = `8`; @@ -216,9 +216,9 @@ exports[`Span METRIC_SYSTEM_FREE_MEMORY 1`] = `undefined`; exports[`Span METRIC_SYSTEM_TOTAL_MEMORY 1`] = `undefined`; -exports[`Span OBSERVER_LISTENING 1`] = `undefined`; +exports[`Span OBSERVER_HOSTNAME 1`] = `undefined`; -exports[`Span OBSERVER_NAME 1`] = `"an observer"`; +exports[`Span OBSERVER_LISTENING 1`] = `undefined`; exports[`Span OBSERVER_VERSION_MAJOR 1`] = `8`; @@ -364,9 +364,9 @@ exports[`Transaction METRIC_SYSTEM_FREE_MEMORY 1`] = `undefined`; exports[`Transaction METRIC_SYSTEM_TOTAL_MEMORY 1`] = `undefined`; -exports[`Transaction OBSERVER_LISTENING 1`] = `undefined`; +exports[`Transaction OBSERVER_HOSTNAME 1`] = `undefined`; -exports[`Transaction OBSERVER_NAME 1`] = `"an observer"`; +exports[`Transaction OBSERVER_LISTENING 1`] = `undefined`; exports[`Transaction OBSERVER_VERSION_MAJOR 1`] = `8`; diff --git a/x-pack/plugins/apm/common/elasticsearch_fieldnames.ts b/x-pack/plugins/apm/common/elasticsearch_fieldnames.ts index 610a32e8e9b99..084fdabe1e297 100644 --- a/x-pack/plugins/apm/common/elasticsearch_fieldnames.ts +++ b/x-pack/plugins/apm/common/elasticsearch_fieldnames.ts @@ -31,7 +31,7 @@ export const USER_AGENT_NAME = 'user_agent.name'; export const DESTINATION_ADDRESS = 'destination.address'; -export const OBSERVER_NAME = 'observer.name'; +export const OBSERVER_HOSTNAME = 'observer.hostname'; export const OBSERVER_VERSION_MAJOR = 'observer.version_major'; export const OBSERVER_LISTENING = 'observer.listening'; export const PROCESSOR_EVENT = 'processor.event'; diff --git a/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.test.ts b/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.test.ts index 9d06fc2ad9309..00eac42b8b2f3 100644 --- a/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.test.ts +++ b/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.test.ts @@ -4,7 +4,6 @@ * you may not use this file except in compliance with the Elastic License. */ -import { AGENT_NAME } from '../../../../common/elasticsearch_fieldnames'; import { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices'; import { tasks } from './tasks'; @@ -21,100 +20,76 @@ describe('data telemetry collection tasks', () => { describe('aggregated_transactions', () => { const task = tasks.find((t) => t.name === 'aggregated_transactions'); - it('returns aggregated transaction counts', async () => { - // This mock implementation returns different values based on the parameters, - // which should simulate all the queries that are done. For most of them we'll - // simulate the number of buckets by using the length of the key, but for a - // couple we'll simulate being paginated by returning an after_key. - const search = jest.fn().mockImplementation((params) => { - const isRumResult = - params.body.query.bool.filter && - params.body.query.bool.filter.some( - (filter: any) => - filter.terms && filter.terms[AGENT_NAME]?.includes('rum-js') - ); - const isNonRumResult = - params.body.query.bool.filter && - params.body.query.bool.filter.some( - (filter: any) => - filter.terms && !filter.terms[AGENT_NAME]?.includes('rum-js') - ); - const isPagedResult = - !!params.body.aggs?.current_implementation?.composite.after || - !!params.body.aggs?.no_observer_name?.composite.after; - const isTotalResult = 'track_total_hits' in params.body; - const key = Object.keys(params.body.aggs ?? [])[0]; - - if (isRumResult) { - if (isTotalResult) { - return Promise.resolve({ hits: { total: { value: 3000 } } }); - } - } - - if (isNonRumResult) { - if (isTotalResult) { - return Promise.resolve({ hits: { total: { value: 2000 } } }); - } - } + describe('without transactions', () => { + it('returns an empty result', async () => { + const search = jest.fn().mockReturnValueOnce({ + hits: { + hits: [], + total: { + value: 0, + }, + }, + }); - if (isPagedResult && key) { - return Promise.resolve({ - hits: { total: { value: key.length } }, - aggregations: { [key]: { buckets: [{}] } }, - }); - } + expect(await task?.executor({ indices, search } as any)).toEqual({}); + }); + }); - if (isTotalResult) { - return Promise.resolve({ hits: { total: { value: 1000 } } }); - } + it('returns aggregated transaction counts', async () => { + const search = jest + .fn() + // The first call to `search` asks for a transaction to get + // a fixed date range. + .mockReturnValueOnce({ + hits: { + hits: [{ _source: { '@timestamp': new Date().toISOString() } }], + }, + total: { + value: 1, + }, + }) + // Later calls are all composite aggregations. We return 2 pages of + // results to test if scrolling works. + .mockImplementation((params) => { + let arrayLength = 1000; + let nextAfter: Record = { after_key: {} }; + + if (params.body.aggs.transaction_metric_groups.composite.after) { + arrayLength = 250; + nextAfter = {}; + } - if ( - key === 'current_implementation' || - (key === 'no_observer_name' && !isPagedResult) - ) { return Promise.resolve({ - hits: { total: { value: key.length } }, - aggregations: { - [key]: { after_key: {}, buckets: key.split('').map((_) => ({})) }, + hits: { + total: { + value: 5000, + }, }, - }); - } - - if (key) { - return Promise.resolve({ - hits: { total: { value: key.length } }, aggregations: { - [key]: { buckets: key.split('').map((_) => ({})) }, + transaction_metric_groups: { + buckets: new Array(arrayLength), + ...nextAfter, + }, }, }); - } - }); + }); expect(await task?.executor({ indices, search } as any)).toEqual({ aggregated_transactions: { current_implementation: { - expected_metric_document_count: 23, - transaction_count: 1000, + expected_metric_document_count: 1250, + transaction_count: 5000, + ratio: 0.25, }, no_observer_name: { - expected_metric_document_count: 17, - transaction_count: 1000, - }, - no_rum: { - expected_metric_document_count: 6, - transaction_count: 2000, - }, - no_rum_no_observer_name: { - expected_metric_document_count: 23, - transaction_count: 2000, - }, - only_rum: { - expected_metric_document_count: 8, - transaction_count: 3000, + expected_metric_document_count: 1250, + transaction_count: 5000, + ratio: 0.25, }, - only_rum_no_observer_name: { - expected_metric_document_count: 25, - transaction_count: 3000, + with_country: { + expected_metric_document_count: 1250, + transaction_count: 5000, + ratio: 0.25, }, }, }); diff --git a/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.ts b/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.ts index 840f47b043418..dc20252b19167 100644 --- a/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.ts +++ b/x-pack/plugins/apm/server/lib/apm_telemetry/collect_data_telemetry/tasks.ts @@ -3,7 +3,10 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ +import { ValuesType } from 'utility-types'; import { flatten, merge, sortBy, sum } from 'lodash'; +import { AggregationOptionsByType } from '../../../../typings/elasticsearch/aggregations'; +import { ProcessorEvent } from '../../../../common/processor_event'; import { TelemetryTask } from '.'; import { AGENT_NAMES, RUM_AGENTS } from '../../../../common/agent_name'; import { @@ -16,7 +19,7 @@ import { CONTAINER_ID, ERROR_GROUP_ID, HOST_NAME, - OBSERVER_NAME, + OBSERVER_HOSTNAME, PARENT_ID, POD_NAME, PROCESSOR_EVENT, @@ -32,10 +35,8 @@ import { TRANSACTION_NAME, TRANSACTION_RESULT, TRANSACTION_TYPE, - USER_AGENT_NAME, USER_AGENT_ORIGINAL, } from '../../../../common/elasticsearch_fieldnames'; -import { ESFilter } from '../../../../typings/elasticsearch'; import { APMError } from '../../../../typings/es_schemas/ui/apm_error'; import { AgentName } from '../../../../typings/es_schemas/ui/fields/agent'; import { Span } from '../../../../typings/es_schemas/ui/span'; @@ -57,79 +58,114 @@ export const tasks: TelemetryTask[] = [ // the transaction count for that time range. executor: async ({ indices, search }) => { async function getBucketCountFromPaginatedQuery( - key: string, - filter: ESFilter[], - count: number = 0, + sources: Array< + ValuesType[string] + >, + prevResult?: { + transaction_count: number; + expected_metric_document_count: number; + }, after?: any - ) { + ): Promise<{ + transaction_count: number; + expected_metric_document_count: number; + ratio: number; + }> { + // eslint-disable-next-line @typescript-eslint/naming-convention + let { expected_metric_document_count } = prevResult ?? { + transaction_count: 0, + expected_metric_document_count: 0, + }; + const params = { index: [indices['apm_oss.transactionIndices']], body: { size: 0, timeout, - query: { bool: { filter } }, + query: { + bool: { + filter: [ + { term: { [PROCESSOR_EVENT]: 'transaction' } }, + { range: { '@timestamp': { gte: start, lt: end } } }, + ], + }, + }, + track_total_hits: true, aggs: { - [key]: { + transaction_metric_groups: { composite: { ...(after ? { after } : {}), size: 10000, - sources: fieldMap[key].map((field) => ({ - [field]: { terms: { field, missing_bucket: true } }, - })), + sources: sources.map((source, index) => { + return { + [index]: source, + }; + }), }, }, }, }, }; + const result = await search(params); + let nextAfter: any; if (result.aggregations) { - nextAfter = result.aggregations[key].after_key; - count += result.aggregations[key].buckets.length; + nextAfter = result.aggregations.transaction_metric_groups.after_key; + expected_metric_document_count += + result.aggregations.transaction_metric_groups.buckets.length; } if (nextAfter) { - count = await getBucketCountFromPaginatedQuery( - key, - filter, - count, + return await getBucketCountFromPaginatedQuery( + sources, + { + expected_metric_document_count, + transaction_count: result.hits.total.value, + }, nextAfter ); } - return count; + return { + expected_metric_document_count, + transaction_count: result.hits.total.value, + ratio: expected_metric_document_count / result.hits.total.value, + }; } - async function totalSearch(filter: ESFilter[]) { - const result = await search({ - index: [indices['apm_oss.transactionIndices']], + // fixed date range for reliable results + const lastTransaction = ( + await search({ + index: indices['apm_oss.transactionIndices'], body: { - size: 0, - timeout, - query: { bool: { filter } }, - track_total_hits: true, + query: { + bool: { + filter: [ + { term: { [PROCESSOR_EVENT]: ProcessorEvent.transaction } }, + ], + }, + }, + size: 1, + sort: { + '@timestamp': 'desc', + }, }, - }); + }) + ).hits.hits[0] as { _source: { '@timestamp': string } }; - return result.hits.total.value; + if (!lastTransaction) { + return {}; } - const nonRumAgentNames = AGENT_NAMES.filter( - (name) => !RUM_AGENTS.includes(name) - ); + const end = + new Date(lastTransaction._source['@timestamp']).getTime() - + 5 * 60 * 1000; - const filter: ESFilter[] = [ - { term: { [PROCESSOR_EVENT]: 'transaction' } }, - { range: { '@timestamp': { gte: 'now-1m' } } }, - ]; - const noRumFilter = [ - ...filter, - { terms: { [AGENT_NAME]: nonRumAgentNames } }, - ]; - const rumFilter = [...filter, { terms: { [AGENT_NAME]: RUM_AGENTS } }]; + const start = end - 60 * 1000; - const baseFields = [ + const simpleTermFields = [ TRANSACTION_NAME, TRANSACTION_RESULT, TRANSACTION_TYPE, @@ -139,73 +175,61 @@ export const tasks: TelemetryTask[] = [ HOST_NAME, CONTAINER_ID, POD_NAME, - ]; + ].map((field) => ({ terms: { field, missing_bucket: true } })); - const fieldMap: Record = { - current_implementation: [OBSERVER_NAME, ...baseFields, USER_AGENT_NAME], - no_observer_name: [...baseFields, USER_AGENT_NAME], - no_rum: [OBSERVER_NAME, ...baseFields], - no_rum_no_observer_name: baseFields, - only_rum: [OBSERVER_NAME, ...baseFields, USER_AGENT_NAME], - only_rum_no_observer_name: [...baseFields, USER_AGENT_NAME], + const observerHostname = { + terms: { field: OBSERVER_HOSTNAME, missing_bucket: true }, }; - // It would be more performant to do these in parallel, but we have different filters and keys and it's easier to - // understand if we make the code slower and longer - const countMap: Record = { - current_implementation: await getBucketCountFromPaginatedQuery( - 'current_implementation', - filter - ), - no_observer_name: await getBucketCountFromPaginatedQuery( - 'no_observer_name', - filter - ), - no_rum: await getBucketCountFromPaginatedQuery('no_rum', noRumFilter), - no_rum_no_observer_name: await getBucketCountFromPaginatedQuery( - 'no_rum_no_observer_name', - noRumFilter - ), - only_rum: await getBucketCountFromPaginatedQuery('only_rum', rumFilter), - only_rum_no_observer_name: await getBucketCountFromPaginatedQuery( - 'only_rum_no_observer_name', - rumFilter - ), - }; - - const [allCount, noRumCount, rumCount] = await Promise.all([ - totalSearch(filter), - totalSearch(noRumFilter), - totalSearch(rumFilter), - ]); + const baseFields = [ + ...simpleTermFields, + // user_agent.name only for page-load transactions + { + terms: { + script: ` + if (doc['transaction.type'].value == 'page-load' && doc['user_agent.name'].size() > 0) { + return doc['user_agent.name'].value; + } - return { - aggregated_transactions: { - current_implementation: { - transaction_count: allCount, - expected_metric_document_count: countMap.current_implementation, - }, - no_observer_name: { - transaction_count: allCount, - expected_metric_document_count: countMap.no_observer_name, - }, - no_rum: { - transaction_count: noRumCount, - expected_metric_document_count: countMap.no_rum, - }, - no_rum_no_observer_name: { - transaction_count: noRumCount, - expected_metric_document_count: countMap.no_rum_no_observer_name, + return null; + `, + missing_bucket: true, }, - only_rum: { - transaction_count: rumCount, - expected_metric_document_count: countMap.only_rum, - }, - only_rum_no_observer_name: { - transaction_count: rumCount, - expected_metric_document_count: countMap.only_rum_no_observer_name, + }, + // transaction.root + { + terms: { + script: `return doc['parent.id'].size() == 0`, + missing_bucket: true, }, }, + ]; + + const results = { + current_implementation: await getBucketCountFromPaginatedQuery([ + ...baseFields, + observerHostname, + ]), + with_country: await getBucketCountFromPaginatedQuery([ + ...baseFields, + observerHostname, + { + terms: { + script: ` + if (doc['transaction.type'].value == 'page-load' && doc['client.geo.country_iso_code'].size() > 0) { + return doc['client.geo.country_iso_code'].value; + } + return null; + `, + missing_bucket: true, + }, + }, + ]), + no_observer_name: await getBucketCountFromPaginatedQuery(baseFields), + }; + + return { + aggregated_transactions: results, }; }, }, diff --git a/x-pack/plugins/apm/typings/elasticsearch/aggregations.ts b/x-pack/plugins/apm/typings/elasticsearch/aggregations.ts index d25ec8709e3be..8a62e36ef2e9d 100644 --- a/x-pack/plugins/apm/typings/elasticsearch/aggregations.ts +++ b/x-pack/plugins/apm/typings/elasticsearch/aggregations.ts @@ -51,7 +51,12 @@ type GetCompositeKeys< type CompositeOptionsSource = Record< string, - { terms: { field: string; missing_bucket?: boolean } } | undefined + | { + terms: ({ field: string } | { script: Script }) & { + missing_bucket?: boolean; + }; + } + | undefined >; export interface AggregationOptionsByType {