Skip to content

Commit

Permalink
[Beats Management] APIs: Remove tag(s) from beat(s) (elastic#19440)
Browse files Browse the repository at this point in the history
* Using crypto.timingSafeEqual() for comparing auth tokens

* Introduce random delay after we try to find token in ES to mitigate timing attack

* Remove random delay

* Starting to implement POST /api/beats/beats_tags API

* Changing API

* Updating tests for changes to API

* Renaming

* Use destructuring

* Using crypto.timingSafeEqual() for comparing auth tokens

* Introduce random delay after we try to find token in ES to mitigate timing attack

* Implementing `POST /api/beats/agents_tags/removals` API

* Updating ES archive

* Use destructuring

* Moving start of script to own line to increase readability

* Nothing to remove if there are no existing tags!

* Updating tests to match changes in bulk update painless script

* Use destructuring
  • Loading branch information
ycombinator authored and justinkambic committed Jul 23, 2018
1 parent 2ccd1cd commit 3f440be
Show file tree
Hide file tree
Showing 7 changed files with 427 additions and 5 deletions.
2 changes: 2 additions & 0 deletions x-pack/plugins/beats/server/routes/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { registerVerifyBeatsRoute } from './register_verify_beats_route';
import { registerUpdateBeatRoute } from './register_update_beat_route';
import { registerSetTagRoute } from './register_set_tag_route';
import { registerAssignTagsToBeatsRoute } from './register_assign_tags_to_beats_route';
import { registerRemoveTagsFromBeatsRoute } from './register_remove_tags_from_beats_route';

export function registerApiRoutes(server) {
registerCreateEnrollmentTokensRoute(server);
Expand All @@ -20,4 +21,5 @@ export function registerApiRoutes(server) {
registerUpdateBeatRoute(server);
registerSetTagRoute(server);
registerAssignTagsToBeatsRoute(server);
registerRemoveTagsFromBeatsRoute(server);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@ async function getEnrollmentToken(callWithInternalUser, enrollmentToken) {
};

const response = await callWithInternalUser('get', params);
return get(response, '_source.enrollment_token', {});
const token = get(response, '_source.enrollment_token', {});

// Elasticsearch might return fast if the token is not found. OR it might return fast
// if the token *is* found. Either way, an attacker could using a timing attack to figure
// out whether a token is valid or not. So we introduce a random delay in returning from
// this function to obscure the actual time it took for Elasticsearch to find the token.
const randomDelayInMs = 25 + Math.round(Math.random() * 200); // between 25 and 225 ms
return new Promise(resolve => setTimeout(() => resolve(token), randomDelayInMs));
}

function deleteUsedEnrollmentToken(callWithInternalUser, enrollmentToken) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import Joi from 'joi';
import {
get,
flatten,
uniq
} from 'lodash';
import { INDEX_NAMES } from '../../../common/constants';
import { callWithRequestFactory } from '../../lib/client';
import { wrapEsError } from '../../lib/error_wrappers';

async function getDocs(callWithRequest, ids) {
const params = {
index: INDEX_NAMES.BEATS,
type: '_doc',
body: { ids },
_source: false
};

const response = await callWithRequest('mget', params);
return get(response, 'docs', []);
}

function getBeats(callWithRequest, beatIds) {
const ids = beatIds.map(beatId => `beat:${beatId}`);
return getDocs(callWithRequest, ids);
}

function getTags(callWithRequest, tags) {
const ids = tags.map(tag => `tag:${tag}`);
return getDocs(callWithRequest, ids);
}

async function findNonExistentItems(callWithRequest, items, getFn) {
const itemsFromEs = await getFn.call(null, callWithRequest, items);
return itemsFromEs.reduce((nonExistentItems, itemFromEs, idx) => {
if (!itemFromEs.found) {
nonExistentItems.push(items[idx]);
}
return nonExistentItems;
}, []);
}

function findNonExistentBeatIds(callWithRequest, beatIds) {
return findNonExistentItems(callWithRequest, beatIds, getBeats);
}

function findNonExistentTags(callWithRequest, tags) {
return findNonExistentItems(callWithRequest, tags, getTags);
}

async function persistRemovals(callWithRequest, removals) {
const body = flatten(removals.map(({ beatId, tag }) => {
const script = ''
+ 'def beat = ctx._source.beat; '
+ 'if (beat.tags != null) { '
+ ' beat.tags.removeAll([params.tag]); '
+ '}';

return [
{ update: { _id: `beat:${beatId}` } },
{ script: { source: script, params: { tag } } }
];
}));

const params = {
index: INDEX_NAMES.BEATS,
type: '_doc',
body,
refresh: 'wait_for'
};

const response = await callWithRequest('bulk', params);
return get(response, 'items', [])
.map((item, resultIdx) => ({
status: item.update.status,
result: item.update.result,
idxInRequest: removals[resultIdx].idxInRequest
}));
}

function addNonExistentItemRemovalsToResponse(response, removals, nonExistentBeatIds, nonExistentTags) {
removals.forEach(({ beat_id: beatId, tag }, idx) => {
const isBeatNonExistent = nonExistentBeatIds.includes(beatId);
const isTagNonExistent = nonExistentTags.includes(tag);

if (isBeatNonExistent && isTagNonExistent) {
response.removals[idx].status = 404;
response.removals[idx].result = `Beat ${beatId} and tag ${tag} not found`;
} else if (isBeatNonExistent) {
response.removals[idx].status = 404;
response.removals[idx].result = `Beat ${beatId} not found`;
} else if (isTagNonExistent) {
response.removals[idx].status = 404;
response.removals[idx].result = `Tag ${tag} not found`;
}
});
}

function addRemovalResultsToResponse(response, removalResults) {
removalResults.forEach(removalResult => {
const { idxInRequest, status, result } = removalResult;
response.removals[idxInRequest].status = status;
response.removals[idxInRequest].result = result;
});
}

// TODO: add license check pre-hook
// TODO: write to Kibana audit log file
export function registerRemoveTagsFromBeatsRoute(server) {
server.route({
method: 'POST',
path: '/api/beats/agents_tags/removals',
config: {
validate: {
payload: Joi.object({
removals: Joi.array().items(Joi.object({
beat_id: Joi.string().required(),
tag: Joi.string().required()
}))
}).required()
}
},
handler: async (request, reply) => {
const callWithRequest = callWithRequestFactory(server, request);

const { removals } = request.payload;
const beatIds = uniq(removals.map(removal => removal.beat_id));
const tags = uniq(removals.map(removal => removal.tag));

const response = {
removals: removals.map(() => ({ status: null }))
};

try {
// Handle removals containing non-existing beat IDs or tags
const nonExistentBeatIds = await findNonExistentBeatIds(callWithRequest, beatIds);
const nonExistentTags = await findNonExistentTags(callWithRequest, tags);

addNonExistentItemRemovalsToResponse(response, removals, nonExistentBeatIds, nonExistentTags);

const validRemovals = removals
.map((removal, idxInRequest) => ({
beatId: removal.beat_id,
tag: removal.tag,
idxInRequest // so we can add the result of this removal to the correct place in the response
}))
.filter((removal, idx) => response.removals[idx].status === null);

if (validRemovals.length > 0) {
const removalResults = await persistRemovals(callWithRequest, validRemovals);
addRemovalResultsToResponse(response, removalResults);
}
} catch (err) {
return reply(wrapEsError(err));
}

reply(response);
}
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export default function ({ getService }) {
});

beat = esResponse._source.beat;
expect(beat.tags).to.eql(tags);
expect(beat.tags).to.eql([...tags, 'qa']);

// Adding the existing tag
const { body: apiResponse } = await supertest
Expand All @@ -90,7 +90,7 @@ export default function ({ getService }) {
});

beat = esResponse._source.beat;
expect(beat.tags).to.eql(tags);
expect(beat.tags).to.eql([...tags, 'qa']);
});

it('should add a single tag to a multiple beats', async () => {
Expand Down Expand Up @@ -123,7 +123,7 @@ export default function ({ getService }) {
});

beat = esResponse._source.beat;
expect(beat.tags).to.eql(['production', 'development']); // as beat 'foo' already had 'production' tag attached to it
expect(beat.tags).to.eql(['production', 'qa', 'development']); // as beat 'foo' already had 'production' and 'qa' tags attached to it

// Beat bar
esResponse = await es.get({
Expand Down Expand Up @@ -195,7 +195,7 @@ export default function ({ getService }) {
});

beat = esResponse._source.beat;
expect(beat.tags).to.eql(['production', 'development']); // as beat 'foo' already had 'production' tag attached to it
expect(beat.tags).to.eql(['production', 'qa', 'development']); // as beat 'foo' already had 'production' and 'qa' tags attached to it

// Beat bar
esResponse = await es.get({
Expand Down
1 change: 1 addition & 0 deletions x-pack/test/api_integration/apis/beats/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ export default function ({ getService, loadTestFile }) {
loadTestFile(require.resolve('./update_beat'));
loadTestFile(require.resolve('./set_tag'));
loadTestFile(require.resolve('./assign_tags_to_beats'));
loadTestFile(require.resolve('./remove_tags_from_beats'));
});
}
Loading

0 comments on commit 3f440be

Please sign in to comment.