Skip to content

Commit

Permalink
feat(bulk): add bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
jiawei397 committed Jul 7, 2021
1 parent 973b56d commit 496a2eb
Show file tree
Hide file tree
Showing 7 changed files with 394 additions and 26 deletions.
78 changes: 78 additions & 0 deletions examples/bulk.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { Client } from "../mod.ts";

const client = new Client();
await client.connect("http://localhost:9200/");

async function run() {
// await client.indices.create({
// index: "tweets",
// body: {
// mappings: {
// properties: {
// id: { type: "integer" },
// text: { type: "text" },
// user: { type: "keyword" },
// time: { type: "date" },
// },
// },
// },
// }, { ignore: [400] });

const dataset = [{
id: 1,
text: "If I fall, don't bring me back.",
user: "jon",
date: new Date(),
}, {
id: 2,
text: "Winter is coming",
user: "ned",
date: new Date(),
}, {
id: 3,
text: "A Lannister always pays his debts.",
user: "tyrion",
date: new Date(),
}, {
id: 4,
text: "I am the blood of the dragon.",
user: "daenerys",
date: new Date(),
}, {
id: 5, // change this value to a string to see the bulk response with errors
text: "A girl is Arya Stark of Winterfell. And I'm going home.",
user: "arya",
date: new Date(),
}];

const body = dataset.flatMap((doc) => [{ index: { _index: "tweets" } }, doc]);

const bulkResponse = await client.bulk({ refresh: true, body });

if (bulkResponse.errors) {
const erroredDocuments: any[] = [];
// The items array has the same order of the dataset we just indexed.
// The presence of the `error` key indicates that the operation
// that we did for the document has failed.
bulkResponse.items.forEach((action: any, i: number) => {
const operation = Object.keys(action)[0];
if (action[operation].error) {
erroredDocuments.push({
// If the status is 429 it means that you can retry the document,
// otherwise it's very likely a mapping error, and you should
// fix the document before to try it again.
status: action[operation].status,
error: action[operation].error,
operation: body[i * 2],
document: body[i * 2 + 1],
});
}
});
console.log(erroredDocuments);
}

const { count } = await client.count({ index: "tweets" });
console.log(count);
}

run().catch(console.log);
59 changes: 33 additions & 26 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { assert, urlParse } from "../deps.ts";
import { Indices } from "./indices.ts";
import {
BulkInfo,
CountInfo,
CreatedInfo,
DeleteByQueryInfo,
DeletedInfo,
DeleteIndexInfo,
ReIndexInfo,
SearchInfo,
StatInfo,
UpdatedInfo,
} from "./types.ts";
import { Ajax, ajax, Method } from "./utils/ajax.ts";
import { serializer } from "./utils/serializer.ts";
import { generateId } from "./utils/tools.ts";

const DENO_DRIVER_VERSION = "0.0.3";
Expand All @@ -29,6 +31,8 @@ export class Client {

connectedCount = 0;

indices = new Indices();

private connectDB(db: string) {
this.db = db;
Ajax.defaults.baseURL = db;
Expand Down Expand Up @@ -207,32 +211,8 @@ export class Client {
});
}

indicesStats(params: {
index?: string;
method?: Method;
metric?: string;
}): Promise<StatInfo> {
const { index, method = "get", metric } = params;
let path = "";

if (index != null && metric != null) {
path = "/" + encodeURIComponent(index) + "/" + "_stats" + "/" +
encodeURIComponent(metric);
} else if (metric != null) {
path = "/" + "_stats" + "/" + encodeURIComponent(metric);
} else if (index != null) {
path = "/" + encodeURIComponent(index) + "/" + "_stats";
} else {
path = "/" + "_stats";
} // build request object
return ajax<StatInfo>({
url: path,
method,
});
}

async getAllIndices(): Promise<string[]> {
const result = await this.indicesStats({});
const result = await this.indices.stats({});
return Object.keys(result.indices);
}

Expand All @@ -256,4 +236,31 @@ export class Client {
data: body,
});
}

/**
* https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/7.x/bulk_examples.html
* @param params
* @returns
*/
bulk(params: {
index?: string;
method?: Method;
body?: any;
refresh?: boolean;
}): Promise<BulkInfo> {
const { index, method = "post", body, refresh } = params; // TODO use refesh
let path = "";
if (index != null) {
path = "/" + encodeURIComponent(index) + "/" + encodeURIComponent(type) +
"/" + "_bulk";
} else {
path = "/" + "_bulk";
}
return ajax<BulkInfo>({
url: path,
method,
data: serializer.ndserialize(body),
query: refresh !== undefined ? { refresh } : undefined,
});
}
}
45 changes: 45 additions & 0 deletions src/indices.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { StatInfo } from "./types.ts";
import { ajax, Method } from "./utils/ajax.ts";

export class Indices {
// TODO use options
create(params: {
index: string;
body: any;
method?: Method;
}, options?: {
ignore: number[];
}) {
const { index, method = "put", body } = params;
const path = "/" + encodeURIComponent(index);
return ajax<StatInfo>({
url: path,
method,
data: body,
});
}

stats(params: {
index?: string;
method?: Method;
metric?: string;
}): Promise<StatInfo> {
const { index, method = "get", metric } = params;
let path = "";

if (index != null && metric != null) {
path = "/" + encodeURIComponent(index) + "/" + "_stats" + "/" +
encodeURIComponent(metric);
} else if (metric != null) {
path = "/" + "_stats" + "/" + encodeURIComponent(metric);
} else if (index != null) {
path = "/" + encodeURIComponent(index) + "/" + "_stats";
} else {
path = "/" + "_stats";
} // build request object
return ajax<StatInfo>({
url: path,
method,
});
}
}
19 changes: 19 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,22 @@ export interface DeletedInfo extends CreatedInfo {

export interface DeleteByQueryInfo extends ReIndexInfo {
}

export interface BulkInfo {
took: number;
errors: boolean;
items: {
index: {
result: string;
_shards: any;
_seq_no: number;
_index: string;
forced_refresh: boolean;
_type: string;
_id: string;
_version: number;
_primary_term: number;
status: number;
};
}[];
}
6 changes: 6 additions & 0 deletions src/utils/ajax.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ export interface AjaxConfig extends AjaxExConfig {
url: string;
method: Method;
data?: FormData | any;

query?: any;
}

type RequestCallback = (config: AjaxConfig) => AjaxConfig;
Expand Down Expand Up @@ -261,6 +263,7 @@ export class BaseAjax {
url,
baseURL, //接着的前缀url
data,
query,
headers = {},
method,
credentials,
Expand All @@ -276,6 +279,9 @@ export class BaseAjax {
body = null; //get请求不能有body
tempUrl = this.handleGetUrl(tempUrl, data, isEncodeUrl);
} else {
if (query) {
tempUrl = this.handleGetUrl(tempUrl, query, isEncodeUrl);
}
body = this.handlePostData(data, isFile);
if (isFile) {
if (!headers["Content-Type"]) {
Expand Down
Loading

0 comments on commit 496a2eb

Please sign in to comment.