Skip to content

Commit

Permalink
Merge pull request #6844 from Bargs/ingest/bulkAPI
Browse files Browse the repository at this point in the history
[API] Add CSV bulk indexing support to Kibana API
  • Loading branch information
Matt Bargar committed May 11, 2016
2 parents 1372cae + 475913d commit 5918737
Show file tree
Hide file tree
Showing 13 changed files with 2,309 additions and 1 deletion.
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"@spalger/numeral": "^2.0.0",
"@spalger/test-subj-selector": "0.2.1",
"@spalger/ui-ace": "0.2.3",
"JSONStream": "1.1.1",
"angular": "1.4.7",
"angular-bootstrap-colorpicker": "3.0.19",
"angular-elastic": "2.5.0",
Expand All @@ -95,6 +96,7 @@
"clipboard": "1.5.5",
"commander": "2.8.1",
"css-loader": "0.17.0",
"csv-parse": "1.1.0",
"d3": "3.5.6",
"elasticsearch": "10.1.2",
"elasticsearch-browser": "10.1.2",
Expand All @@ -109,6 +111,7 @@
"good-squeeze": "2.1.0",
"gridster": "0.5.6",
"hapi": "8.8.1",
"highland": "2.7.2",
"httpolyglot": "0.1.1",
"imports-loader": "0.6.4",
"jade": "1.11.0",
Expand Down
3 changes: 3 additions & 0 deletions src/cli/cluster/base_path_proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ export default class BasePathProxy {
config.set('server.basePath', this.basePath);
}

const ONE_GIGABYTE = 1024 * 1024 * 1024;
config.set('server.maxPayloadBytes', ONE_GIGABYTE);

setupLogging(null, this.server, config);
setupConnection(null, this.server, config);
this.setupRoutes();
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/kibana/server/routes/api/ingest/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import { registerPost } from './register_post';
import { registerDelete } from './register_delete';
import { registerProcessors } from './register_processors';
import { registerSimulate } from './register_simulate';
import { registerData } from './register_data';

export default function (server) {
registerPost(server);
registerDelete(server);
registerProcessors(server);
registerSimulate(server);
registerData(server);
}
99 changes: 99 additions & 0 deletions src/plugins/kibana/server/routes/api/ingest/register_data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { Promise } from 'bluebird';
import parse from 'csv-parse';
import _ from 'lodash';
import hi from 'highland';
import { patternToIngest } from '../../../../common/lib/convert_pattern_and_ingest_name';
import { PassThrough } from 'stream';
import JSONStream from 'JSONStream';

const ONE_GIGABYTE = 1024 * 1024 * 1024;

export function registerData(server) {
server.route({
path: '/api/kibana/{id}/_data',
method: 'POST',
config: {
payload: {
output: 'stream',
maxBytes: ONE_GIGABYTE
}
},
handler: function (req, reply) {
const boundCallWithRequest = _.partial(server.plugins.elasticsearch.callWithRequest, req);
const indexPattern = req.params.id;
const usePipeline = req.query.pipeline === 'true';
const delimiter = _.get(req.query, 'csv_delimiter', ',');
const responseStream = new PassThrough();
const parser = parse({
columns: true,
auto_parse: true,
delimiter: delimiter,
skip_empty_lines: true
});

const csv = req.payload.csv ? req.payload.csv : req.payload;
const fileName = req.payload.csv ? csv.hapi.filename : '';

let currentLine = 2; // Starts at 2 since we parse the header separately

csv.pipe(parser);

hi(parser)
.consume((err, doc, push, next) => {
if (err) {
push(err, null);
next();
}
else if (doc === hi.nil) {
// pass nil (end event) along the stream
push(null, doc);
}
else {
push(null, {index: _.isEmpty(fileName) ? {} : {_id: `${fileName}:${currentLine}`}});
push(null, doc);
currentLine++;
next();
}
})
.batch(200)
.map((bulkBody) => {
const bulkParams = {
index: indexPattern,
type: 'default',
body: bulkBody
};

if (usePipeline) {
bulkParams.pipeline = patternToIngest(indexPattern);
}

return hi(boundCallWithRequest('bulk', bulkParams));
})
.parallel(2)
.map((response) => {
return _.reduce(response.items, (memo, docResponse) => {
const indexResult = docResponse.index;
if (indexResult.error) {
const hasIndexingErrors = _.isUndefined(_.get(memo, 'errors.index'));
if (hasIndexingErrors) {
_.set(memo, 'errors.index', []);
}
memo.errors.index.push(_.pick(indexResult, ['_id', 'error']));
}
else {
memo.created++;
}

return memo;
}, {created: 0});
})
.stopOnError((err, push) => {
push(null, {created: 0, errors: {other: [err.message]}});
})
.pipe(JSONStream.stringify())
.pipe(responseStream);

reply(responseStream).type('application/json');
}
});
}
3 changes: 2 additions & 1 deletion tasks/config/simplemocha.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ module.exports = {
timeout: 10000,
slow: 5000,
ignoreLeaks: false,
reporter: 'dot'
reporter: 'dot',
globals: ['nil']
},
all: {
src: [
Expand Down
193 changes: 193 additions & 0 deletions test/unit/api/ingest/_data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
define(function (require) {
var Promise = require('bluebird');
var _ = require('intern/dojo/node!lodash');
var expect = require('intern/dojo/node!expect.js');
var fakeNamesIndexTemplate = require('intern/dojo/node!../../fixtures/fake_names_index_template.json');
var fs = require('intern/dojo/node!fs');

return function (bdd, scenarioManager, request) {
const es = scenarioManager.client;
bdd.describe('_data', function () {

bdd.beforeEach(function () {
return es.indices.putTemplate({
name: 'names',
body: fakeNamesIndexTemplate
});
});

bdd.afterEach(function () {
return es.indices.delete({
index: 'names',
ignore: 404
})
.then(() => {
return es.indices.deleteTemplate({name: 'names'});
});
});

bdd.it('should accept a multipart/form-data request with a csv file attached', function () {
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/fixtures/fake_names.csv')
.expect(200);
});

bdd.it('should also accept the raw csv data in the payload body', function () {
var csvData = fs.readFileSync('test/unit/fixtures/fake_names_big.csv', {encoding: 'utf8'});

return request.post('/kibana/names/_data')
.send(csvData)
.expect(200);
});

bdd.it('should return JSON results', function () {
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/fixtures/fake_names.csv')
.expect('Content-Type', /json/)
.expect(200);
});

bdd.it('should index one document per row in the csv', function () {
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/fixtures/fake_names.csv')
.expect(200)
.then(() => {
return es.indices.refresh()
.then(() => {
return es.count({ index: 'names' })
.then((res) => {
expect(res.count).to.be(100);
});
});
});
});

bdd.it('should stream a chunked response', function () {
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/fixtures/fake_names.csv')
.expect('Transfer-Encoding', 'chunked')
.expect(200);
});

bdd.it('should respond with an array of one or more "result objects"', function () {
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/fixtures/fake_names_big.csv')
.expect(200)
.then((dataResponse) => {
expect(dataResponse.body.length).to.be(14);
});
});

bdd.describe('result objects', function () {

bdd.it('should include a count of created documents', function () {
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/fixtures/fake_names.csv')
.expect(200)
.then((dataResponse) => {
expect(dataResponse.body[0]).to.have.property('created');
expect(dataResponse.body[0].created).to.be(100);
});
});

bdd.it('should report any indexing errors per document under an "errors.index" key', function () {
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/fixtures/fake_names_with_mapping_errors.csv')
.expect(200)
.then((dataResponse) => {
expect(dataResponse.body[0]).to.have.property('created');
expect(dataResponse.body[0].created).to.be(98);
expect(dataResponse.body[0]).to.have.property('errors');
expect(dataResponse.body[0].errors).to.have.property('index');
expect(dataResponse.body[0].errors.index.length).to.be(2);
});
});

bdd.it('should report any csv parsing errors under an "errors.other" key', function () {
return request.post('/kibana/names/_data')
.attach('csv', 'test/unit/fixtures/fake_names_with_parse_errors.csv')
.expect(200)
.then((dataResponse) => {
// parse errors immediately abort indexing
expect(dataResponse.body[0]).to.have.property('created');
expect(dataResponse.body[0].created).to.be(0);

expect(dataResponse.body[0]).to.have.property('errors');
expect(dataResponse.body[0].errors).to.have.property('other');
expect(dataResponse.body[0].errors.other.length).to.be(1);
});
});

});

bdd.describe('optional parameters', function () {
bdd.it('should accept a custom csv_delimiter query string param for parsing the CSV', function () {
return request.post('/kibana/names/_data?csv_delimiter=|')
.attach('csv', 'test/unit/fixtures/fake_names_pipe_delimited.csv')
.expect(200)
.then((dataResponse) => {
expect(dataResponse.body[0]).to.have.property('created');
expect(dataResponse.body[0].created).to.be(2);
expect(dataResponse.body[0]).to.not.have.property('errors');

return es.indices.refresh();
})
.then(() => {
return es.search({
index: 'names'
});
})
.then((searchResponse) => {
const doc = _.get(searchResponse, 'hits.hits[0]._source');
expect(doc).to.only.have.keys('Number', 'Gender', 'NameSet');
});
});

bdd.it('should accept a boolean pipeline query string parameter enabling use of the index pattern\'s associated pipeline',
function () {
return es.transport.request({
path: '_ingest/pipeline/kibana-names',
method: 'put',
body: {
processors: [
{
set: {
field: 'foo',
value: 'bar'
}
}
]
}
})
.then((res) => {
return request.post('/kibana/names/_data?pipeline=true')
.attach('csv', 'test/unit/fixtures/fake_names.csv')
.expect(200);
})
.then(() => {
return es.indices.refresh();
})
.then(() => {
return es.search({
index: 'names'
});
})
.then((searchResponse) => {
_.forEach(searchResponse.hits.hits, (doc) => {
expect(doc._source).to.have.property('foo');
expect(doc._source.foo).to.be('bar');
});
return searchResponse;
})
.finally(() => {
return es.transport.request({
path: '_ingest/pipeline/kibana-names',
method: 'delete'
});
});
});
});

});
};
});
2 changes: 2 additions & 0 deletions test/unit/api/ingest/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ define(function (require) {
var expect = require('intern/dojo/node!expect.js');
var post = require('./_post');
var del = require('./_del');
var data = require('./_data');
var simulate = require('./_simulate');
var processors = require('./_processors');
var processorTypes = require('./processors/index');
Expand All @@ -26,6 +27,7 @@ define(function (require) {

post(bdd, scenarioManager, request);
del(bdd, scenarioManager, request);
data(bdd, scenarioManager, request);
simulate(bdd, scenarioManager, request);
processors(bdd, scenarioManager, request);
processorTypes(bdd, scenarioManager, request);
Expand Down
Loading

0 comments on commit 5918737

Please sign in to comment.