Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cloud Security] Metering integration tests #187219

Merged
merged 25 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
58a8a1a
debug
CohenIdo Jul 1, 2024
15c8fe0
working version
CohenIdo Jul 1, 2024
d92716b
working and clean version
CohenIdo Jul 1, 2024
e8f9e85
working and clean version
CohenIdo Jul 1, 2024
1afecf5
working and clean version
CohenIdo Jul 1, 2024
e6fcd85
cleaning
CohenIdo Jul 2, 2024
92ab18d
Merge remote-tracking branch 'upstream/main' into billing-ftr
CohenIdo Jul 2, 2024
faeced0
code review comments
CohenIdo Jul 2, 2024
a1150c1
working and clean version
CohenIdo Jul 3, 2024
023204d
Merge remote-tracking branch 'upstream/main' into billing-ftr
CohenIdo Jul 3, 2024
cdc7eed
code review comments
CohenIdo Jul 4, 2024
942e92c
code review comments
CohenIdo Jul 4, 2024
871337b
Merge remote-tracking branch 'upstream/main' into billing-ftr
CohenIdo Jul 4, 2024
1d49d3e
Merge remote-tracking branch 'upstream/main' into billing-ftr
CohenIdo Jul 4, 2024
e31300b
fix flakiness
CohenIdo Jul 4, 2024
48836e3
Merge remote-tracking branch 'upstream/main' into billing-ftr
CohenIdo Jul 4, 2024
ebf18b8
tech debt
CohenIdo Jul 5, 2024
97e0862
lint
CohenIdo Jul 5, 2024
b3b5948
ci failire fix try
CohenIdo Jul 8, 2024
422c37a
Merge remote-tracking branch 'upstream/main' into billing-ftr
CohenIdo Jul 8, 2024
d4aa42a
Merge branch 'main' into billing-ftr
CohenIdo Jul 8, 2024
11f04ed
ci failire fix try
CohenIdo Jul 9, 2024
3105dd5
Merge remote-tracking branch 'upstream/main' into billing-ftr
CohenIdo Jul 9, 2024
dabd9f4
[CI] Auto-commit changed files from 'node scripts/eslint --no-cache -…
kibanamachine Jul 9, 2024
38ebd50
Merge branch 'main' into billing-ftr
CohenIdo Jul 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,7 @@
"@mapbox/mapbox-gl-rtl-text": "0.2.3",
"@mapbox/mapbox-gl-supported": "2.0.1",
"@mapbox/vector-tile": "1.3.1",
"@mswjs/http-middleware": "^0.10.1",
"@opentelemetry/api": "^1.1.0",
"@opentelemetry/api-metrics": "^0.31.0",
"@opentelemetry/exporter-metrics-otlp-grpc": "^0.34.0",
Expand Down Expand Up @@ -1758,4 +1759,4 @@
"zod-to-json-schema": "^3.22.3"
},
"packageManager": "[email protected]"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,72 +15,119 @@ import {
CSPM,
KSPM,
METERING_CONFIGS,
THRESHOLD_MINUTES,
BILLABLE_ASSETS_CONFIG,
} from './constants';
import type { Tier, UsageRecord } from '../types';
import type {
CloudSecurityMeteringCallbackInput,
CloudSecuritySolutions,
AssetCountAggregation,
ResourceSubtypeAggregationBucket,
ResourceSubtypeCounter,
} from './types';

export const getUsageRecords = (
assetCountAggregations: AssetCountAggregation[],
assetCountAggregation: AssetCountAggregation,
cloudSecuritySolution: CloudSecuritySolutions,
taskId: string,
tier: Tier,
projectId: string,
periodSeconds: number,
logger: Logger
): UsageRecord[] => {
const usageRecords = assetCountAggregations.map((assetCountAggregation) => {
const assetCount = assetCountAggregation.unique_assets.value;

if (assetCount > AGGREGATION_PRECISION_THRESHOLD) {
logger.warn(
`The number of unique resources for {${cloudSecuritySolution}} is ${assetCount}, which is higher than the AGGREGATION_PRECISION_THRESHOLD of ${AGGREGATION_PRECISION_THRESHOLD}.`
);
}

const minTimestamp = new Date(
assetCountAggregation.min_timestamp.value_as_string
).toISOString();

const creationTimestamp = new Date();
const minutes = creationTimestamp.getMinutes();
if (minutes >= 30) {
creationTimestamp.setMinutes(30, 0, 0);
} else {
creationTimestamp.setMinutes(0, 0, 0);
}
const roundedCreationTimestamp = creationTimestamp.toISOString();

const usageRecord: UsageRecord = {
id: `${CLOUD_SECURITY_TASK_TYPE}_${cloudSecuritySolution}_${projectId}_${roundedCreationTimestamp}`,
usage_timestamp: minTimestamp,
creation_timestamp: creationTimestamp.toISOString(),
usage: {
type: CLOUD_SECURITY_TASK_TYPE,
sub_type: cloudSecuritySolution,
quantity: assetCount,
period_seconds: periodSeconds,
},
source: {
id: taskId,
instance_group_id: projectId,
metadata: { tier },
): UsageRecord => {
let assetCount;
let resourceSubtypeCounter;

if (cloudSecuritySolution === CSPM || cloudSecuritySolution === KSPM) {
const resourceSubtypeBuckets: ResourceSubtypeAggregationBucket[] =
assetCountAggregation.resource_sub_type.buckets;

const billableAssets = BILLABLE_ASSETS_CONFIG[cloudSecuritySolution].values;
assetCount = resourceSubtypeBuckets
.filter((bucket) => billableAssets.includes(bucket.key))
.reduce((acc, bucket) => acc + bucket.unique_assets.value, 0);

resourceSubtypeCounter = assetCountAggregation.resource_sub_type.buckets.reduce(
(resourceMap, item) => {
resourceMap[item.key] = {
doc_count: item.doc_count as number,
unique_assets: item.unique_assets.value as number,
};
return resourceMap;
},
};
{} as ResourceSubtypeCounter
);
} else {
assetCount = assetCountAggregation.unique_assets.value;
}

if (assetCount > AGGREGATION_PRECISION_THRESHOLD) {
logger.warn(
`The number of unique resources for {${cloudSecuritySolution}} is ${assetCount}, which is higher than the AGGREGATION_PRECISION_THRESHOLD of ${AGGREGATION_PRECISION_THRESHOLD}.`
);
}

const minTimestamp = new Date(assetCountAggregation.min_timestamp.value_as_string).toISOString();

return usageRecord;
});
return usageRecords;
const creationTimestamp = new Date();
const minutes = creationTimestamp.getMinutes();
if (minutes >= 30) {
creationTimestamp.setMinutes(30, 0, 0);
} else {
creationTimestamp.setMinutes(0, 0, 0);
}
const roundedCreationTimestamp = creationTimestamp.toISOString();

const metadata = resourceSubtypeCounter
? { tier, resource_sub_type_count: resourceSubtypeCounter }
: { tier };

const usageRecord: UsageRecord = {
id: `${CLOUD_SECURITY_TASK_TYPE}_${cloudSecuritySolution}_${projectId}_${roundedCreationTimestamp}`,
usage_timestamp: minTimestamp,
creation_timestamp: creationTimestamp.toISOString(),
usage: {
type: CLOUD_SECURITY_TASK_TYPE,
sub_type: cloudSecuritySolution,
quantity: assetCount,
period_seconds: periodSeconds,
},
source: {
id: taskId,
instance_group_id: projectId,
// metadata: { tier },
metadata,
},
};

return usageRecord;
};

export const getAggregationByCloudSecuritySolution = (
cloudSecuritySolution: CloudSecuritySolutions
) => {
if (cloudSecuritySolution === CSPM || cloudSecuritySolution === KSPM)
return {
resource_sub_type: {
terms: {
field: BILLABLE_ASSETS_CONFIG[cloudSecuritySolution].filter_attribute,
},
aggs: {
unique_assets: {
cardinality: {
field: METERING_CONFIGS[cloudSecuritySolution].assets_identifier,
precision_threshold: AGGREGATION_PRECISION_THRESHOLD,
},
},
},
},
min_timestamp: {
min: {
field: '@timestamp',
},
},
};

return {
unique_assets: {
cardinality: {
Expand All @@ -97,8 +144,7 @@ export const getAggregationByCloudSecuritySolution = (
};

export const getSearchQueryByCloudSecuritySolution = (
cloudSecuritySolution: CloudSecuritySolutions,
searchFrom: Date
cloudSecuritySolution: CloudSecuritySolutions
) => {
const mustFilters = [];

Expand All @@ -117,20 +163,11 @@ export const getSearchQueryByCloudSecuritySolution = (
}

if (cloudSecuritySolution === CSPM || cloudSecuritySolution === KSPM) {
const billableAssetsConfig = BILLABLE_ASSETS_CONFIG[cloudSecuritySolution];

mustFilters.push({
term: {
'rule.benchmark.posture_type': cloudSecuritySolution,
},
});

// filter in only billable assets
mustFilters.push({
terms: {
[billableAssetsConfig.filter_attribute]: billableAssetsConfig.values,
},
});
}

return {
Expand All @@ -141,10 +178,9 @@ export const getSearchQueryByCloudSecuritySolution = (
};

export const getAssetAggQueryByCloudSecuritySolution = (
cloudSecuritySolution: CloudSecuritySolutions,
searchFrom: Date
cloudSecuritySolution: CloudSecuritySolutions
) => {
const query = getSearchQueryByCloudSecuritySolution(cloudSecuritySolution, searchFrom);
const query = getSearchQueryByCloudSecuritySolution(cloudSecuritySolution);
const aggs = getAggregationByCloudSecuritySolution(cloudSecuritySolution);

return {
Expand All @@ -157,51 +193,34 @@ export const getAssetAggQueryByCloudSecuritySolution = (

export const getAssetAggByCloudSecuritySolution = async (
esClient: ElasticsearchClient,
cloudSecuritySolution: CloudSecuritySolutions,
searchFrom: Date
): Promise<AssetCountAggregation[]> => {
const assetsAggQuery = getAssetAggQueryByCloudSecuritySolution(cloudSecuritySolution, searchFrom);
cloudSecuritySolution: CloudSecuritySolutions
): Promise<AssetCountAggregation | undefined> => {
const assetsAggQuery = getAssetAggQueryByCloudSecuritySolution(cloudSecuritySolution);

const response = await esClient.search<unknown, AssetCountAggregation>(assetsAggQuery);
if (!response.aggregations) return [];

return [response.aggregations];
if (!response.aggregations) return;

return response.aggregations;
};

const indexHasDataInDateRange = async (
esClient: ElasticsearchClient,
cloudSecuritySolution: CloudSecuritySolutions,
searchFrom: Date
cloudSecuritySolution: CloudSecuritySolutions
) => {
const response = await esClient.search(
{
index: METERING_CONFIGS[cloudSecuritySolution].index,
size: 1,
_source: false,
query: getSearchQueryByCloudSecuritySolution(cloudSecuritySolution, searchFrom),
query: getSearchQueryByCloudSecuritySolution(cloudSecuritySolution),
},
{ ignore: [404] }
);

return response.hits?.hits.length > 0;
};

const getSearchStartDate = (lastSuccessfulReport: Date): Date => {
const initialDate = new Date();
const thresholdDate = new Date(initialDate.getTime() - THRESHOLD_MINUTES * 60 * 1000);

if (lastSuccessfulReport) {
const lastSuccessfulReportDate = new Date(lastSuccessfulReport);

const searchFrom =
lastSuccessfulReport && lastSuccessfulReportDate > thresholdDate
? lastSuccessfulReportDate
: thresholdDate;
return searchFrom;
}
return thresholdDate;
};

export const getCloudSecurityUsageRecord = async ({
esClient,
projectId,
Expand All @@ -212,21 +231,20 @@ export const getCloudSecurityUsageRecord = async ({
logger,
}: CloudSecurityMeteringCallbackInput): Promise<UsageRecord[] | undefined> => {
try {
const searchFrom = getSearchStartDate(lastSuccessfulReport);

if (!(await indexHasDataInDateRange(esClient, cloudSecuritySolution, searchFrom))) return;
if (!(await indexHasDataInDateRange(esClient, cloudSecuritySolution))) return;

// const periodSeconds = Math.floor((new Date().getTime() - searchFrom.getTime()) / 1000);
const periodSeconds = 1800; // Workaround to prevent overbilling by charging for a constant time window. The issue should be resolved in https://github.com/elastic/security-team/issues/9424.

const assetCountAggregations = await getAssetAggByCloudSecuritySolution(
const assetCountAggregation = await getAssetAggByCloudSecuritySolution(
esClient,
cloudSecuritySolution,
searchFrom
cloudSecuritySolution
);

if (!assetCountAggregation) return [];

const usageRecords = await getUsageRecords(
assetCountAggregations,
assetCountAggregation,
cloudSecuritySolution,
taskId,
tier,
Expand All @@ -235,7 +253,7 @@ export const getCloudSecurityUsageRecord = async ({
logger
);

return usageRecords;
return [usageRecords];
} catch (err) {
logger.error(`Failed to fetch ${cloudSecuritySolution} metering data ${err}`);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ export const getUsageRecords = async (
{
range: {
'event.ingested': {
gt: searchFrom.toISOString(),
// gt: searchFrom.toISOString()
gte: `now-30m`,
},
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ export class UsageReportingService {
records: UsageRecord[],
url = USAGE_SERVICE_USAGE_URL
): Promise<Response> {
const isHttps = url.includes('https');

return fetch(url, {
method: 'post',
body: JSON.stringify(records),
headers: { 'Content-Type': 'application/json' },
agent,
agent: isHttps ? agent : undefined, // Conditionally add agent if URL is HTTPS for supporting integration tests.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc: @joeypoon

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious why we used fetch instead of the core HTTP service?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the reason is that the agent attribute is not supported in the core fetch. @joeypoon, please correct me if I'm wrong.

});
}
}
Expand Down
8 changes: 7 additions & 1 deletion x-pack/plugins/security_solution_serverless/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: false }),
productTypes,
/**
* Usage Reporting: the interval between runs of the task
* Usage Reporting: the interval between runs of the endpoint task
*/

usageReportingTaskInterval: schema.string({ defaultValue: '5m' }),

/**
* Usage Reporting: the interval between runs of the cloud security task
*/

cloudSecurityUsageReportingTaskInterval: schema.string({ defaultValue: '30m' }),

/**
* Usage Reporting: timeout value for how long the task should run.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ export class SecuritySolutionServerlessPlugin
this.cloudSecurityUsageReportingTask
?.start({
taskManager: pluginsSetup.taskManager,
interval: cloudSecurityMetringTaskProperties.interval,
interval: this.config.cloudSecurityUsageReportingTaskInterval,
})
.catch(() => {});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export default function ({ loadTestFile }: FtrProviderContext) {
loadTestFile(require.resolve('./benchmark/v2'));
loadTestFile(require.resolve('./find_csp_benchmark_rule'));
loadTestFile(require.resolve('./telemetry'));
loadTestFile(require.resolve('./serverless_metering/cloud_security_metering'));

// TODO: migrate status_unprivileged tests from stateful, if it feasible in serverless with the new security model
// loadTestFile(require.resolve('./status/status_unprivileged'));
Expand Down
Loading