From 496a2ebbdaf44ad2279e4486eb1aec95e2cbe3df Mon Sep 17 00:00:00 2001 From: jia wei Date: Wed, 7 Jul 2021 19:40:47 +0800 Subject: [PATCH] feat(bulk): add bulk --- examples/bulk.ts | 78 +++++++++++++++++++++++ src/client.ts | 59 +++++++++-------- src/indices.ts | 45 +++++++++++++ src/types.ts | 19 ++++++ src/utils/ajax.ts | 6 ++ src/utils/errors.ts | 138 ++++++++++++++++++++++++++++++++++++++++ src/utils/serializer.ts | 75 ++++++++++++++++++++++ 7 files changed, 394 insertions(+), 26 deletions(-) create mode 100644 examples/bulk.ts create mode 100644 src/indices.ts create mode 100644 src/utils/errors.ts create mode 100644 src/utils/serializer.ts diff --git a/examples/bulk.ts b/examples/bulk.ts new file mode 100644 index 0000000..9e2ac4c --- /dev/null +++ b/examples/bulk.ts @@ -0,0 +1,78 @@ +import { Client } from "../mod.ts"; + +const client = new Client(); +await client.connect("http://localhost:9200/"); + +async function run() { + // await client.indices.create({ + // index: "tweets", + // body: { + // mappings: { + // properties: { + // id: { type: "integer" }, + // text: { type: "text" }, + // user: { type: "keyword" }, + // time: { type: "date" }, + // }, + // }, + // }, + // }, { ignore: [400] }); + + const dataset = [{ + id: 1, + text: "If I fall, don't bring me back.", + user: "jon", + date: new Date(), + }, { + id: 2, + text: "Winter is coming", + user: "ned", + date: new Date(), + }, { + id: 3, + text: "A Lannister always pays his debts.", + user: "tyrion", + date: new Date(), + }, { + id: 4, + text: "I am the blood of the dragon.", + user: "daenerys", + date: new Date(), + }, { + id: 5, // change this value to a string to see the bulk response with errors + text: "A girl is Arya Stark of Winterfell. 
And I'm going home.", + user: "arya", + date: new Date(), + }]; + + const body = dataset.flatMap((doc) => [{ index: { _index: "tweets" } }, doc]); + + const bulkResponse = await client.bulk({ refresh: true, body }); + + if (bulkResponse.errors) { + const erroredDocuments: any[] = []; + // The items array has the same order of the dataset we just indexed. + // The presence of the `error` key indicates that the operation + // that we did for the document has failed. + bulkResponse.items.forEach((action: any, i: number) => { + const operation = Object.keys(action)[0]; + if (action[operation].error) { + erroredDocuments.push({ + // If the status is 429 it means that you can retry the document, + // otherwise it's very likely a mapping error, and you should + // fix the document before to try it again. + status: action[operation].status, + error: action[operation].error, + operation: body[i * 2], + document: body[i * 2 + 1], + }); + } + }); + console.log(erroredDocuments); + } + + const { count } = await client.count({ index: "tweets" }); + console.log(count); +} + +run().catch(console.log); diff --git a/src/client.ts b/src/client.ts index 61e5ed2..f357d47 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,5 +1,7 @@ import { assert, urlParse } from "../deps.ts"; +import { Indices } from "./indices.ts"; import { + BulkInfo, CountInfo, CreatedInfo, DeleteByQueryInfo, @@ -7,10 +9,10 @@ import { DeleteIndexInfo, ReIndexInfo, SearchInfo, - StatInfo, UpdatedInfo, } from "./types.ts"; import { Ajax, ajax, Method } from "./utils/ajax.ts"; +import { serializer } from "./utils/serializer.ts"; import { generateId } from "./utils/tools.ts"; const DENO_DRIVER_VERSION = "0.0.3"; @@ -29,6 +31,8 @@ export class Client { connectedCount = 0; + indices = new Indices(); + private connectDB(db: string) { this.db = db; Ajax.defaults.baseURL = db; @@ -207,32 +211,8 @@ export class Client { }); } - indicesStats(params: { - index?: string; - method?: Method; - metric?: string; - }): Promise { 
-    const { index, method = "get", metric } = params;
-    let path = "";
-
-    if (index != null && metric != null) {
-      path = "/" + encodeURIComponent(index) + "/" + "_stats" + "/" +
-        encodeURIComponent(metric);
-    } else if (metric != null) {
-      path = "/" + "_stats" + "/" + encodeURIComponent(metric);
-    } else if (index != null) {
-      path = "/" + encodeURIComponent(index) + "/" + "_stats";
-    } else {
-      path = "/" + "_stats";
-    } // build request object
-    return ajax({
-      url: path,
-      method,
-    });
-  }
-
   async getAllIndices(): Promise {
-    const result = await this.indicesStats({});
+    const result = await this.indices.stats({});
     return Object.keys(result.indices);
   }
 
@@ -256,4 +236,31 @@ export class Client {
       data: body,
     });
   }
+
+  /**
+   * Sends `body` as newline-delimited JSON to `_bulk`: https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/7.x/bulk_examples.html
+   * @param params `index` scopes the call to `/<index>/_bulk`; `method` defaults to "post"; `refresh`, when defined, is forwarded to `ajax` as `query`
+   * @returns the promise returned by `ajax` for the `_bulk` request
+   */
+  bulk(params: {
+    index?: string;
+    method?: Method;
+    body?: any;
+    refresh?: boolean;
+  }): Promise<BulkInfo> {
+    const { index, method = "post", body, refresh } = params;
+    let path = "";
+    if (index != null) {
+      // Index-scoped endpoint: /<index>/_bulk (no mapping-type segment).
+      path = "/" + encodeURIComponent(index) + "/" + "_bulk";
+    } else {
+      path = "/" + "_bulk";
+    }
+    return ajax({
+      url: path,
+      method,
+      data: serializer.ndserialize(body),
+      query: refresh !== undefined ? { refresh } : undefined,
+    });
+  }
 }
diff --git a/src/indices.ts b/src/indices.ts
new file mode 100644
index 0000000..2a08dc7
--- /dev/null
+++ b/src/indices.ts
@@ -0,0 +1,45 @@
+import { StatInfo } from "./types.ts";
+import { ajax, Method } from "./utils/ajax.ts";
+
+export class Indices {
+  // TODO use options
+  create(params: {
+    index: string;
+    body: any;
+    method?: Method;
+  }, options?: {
+    ignore: number[];
+  }) {
+    const { index, method = "put", body } = params;
+    const path = "/" + encodeURIComponent(index);
+    return ajax({
+      url: path,
+      method,
+      data: body,
+    });
+  }
+
+  stats(params: {
+    index?: string;
+    method?: Method;
+    metric?: string;
+  }): Promise<StatInfo> {
+    const { index, method = "get", metric } = params;
+    let path = "";
+
+    if (index != null && metric != null) {
+      path = "/" + encodeURIComponent(index) + "/" + "_stats" + "/" +
+        encodeURIComponent(metric);
+    } else if (metric != null) {
+      path = "/" + "_stats" + "/" + encodeURIComponent(metric);
+    } else if (index != null) {
+      path = "/" + encodeURIComponent(index) + "/" + "_stats";
+    } else {
+      path = "/" + "_stats";
+    } // build request object
+    return ajax({
+      url: path,
+      method,
+    });
+  }
+}
diff --git a/src/types.ts b/src/types.ts
index 22dbdf5..5ba944e 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -88,3 +88,22 @@ export interface DeletedInfo extends CreatedInfo {
 
 export interface DeleteByQueryInfo extends ReIndexInfo {
 }
+
+export interface BulkInfo {
+  took: number;
+  errors: boolean;
+  items: {
+    index: {
+      result: string;
+      _shards: any;
+      _seq_no: number;
+      _index: string;
+      forced_refresh: boolean;
+      _type: string;
+      _id: string;
+      _version: number;
+      _primary_term: number;
+      status: number;
+    };
+  }[];
+}
diff --git a/src/utils/ajax.ts b/src/utils/ajax.ts
index 4a794b0..4ce1410 100644
--- a/src/utils/ajax.ts
+++ b/src/utils/ajax.ts
@@ -95,6 +95,8 @@ export interface AjaxConfig extends AjaxExConfig {
   url: string;
   method: Method;
   data?: FormData | any;
+
+  query?: any;
 }
 
 type
RequestCallback = (config: AjaxConfig) => AjaxConfig; @@ -261,6 +263,7 @@ export class BaseAjax { url, baseURL, //接着的前缀url data, + query, headers = {}, method, credentials, @@ -276,6 +279,9 @@ export class BaseAjax { body = null; //get请求不能有body tempUrl = this.handleGetUrl(tempUrl, data, isEncodeUrl); } else { + if (query) { + tempUrl = this.handleGetUrl(tempUrl, query, isEncodeUrl); + } body = this.handlePostData(data, isFile); if (isFile) { if (!headers["Content-Type"]) { diff --git a/src/utils/errors.ts b/src/utils/errors.ts new file mode 100644 index 0000000..d841e23 --- /dev/null +++ b/src/utils/errors.ts @@ -0,0 +1,138 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +export class ElasticsearchClientError extends Error { + meta: any; + data: any; + + constructor(message: string) { + super(message); + this.name = "ElasticsearchClientError"; + } +} + +export class TimeoutError extends ElasticsearchClientError { + constructor(message: string, meta: any) { + super(message); + Error.captureStackTrace(this, TimeoutError); + this.name = "TimeoutError"; + this.message = message || "Timeout Error"; + this.meta = meta; + } +} + +export class ConnectionError extends ElasticsearchClientError { + constructor(message: string, meta: any) { + super(message); + Error.captureStackTrace(this, ConnectionError); + this.name = "ConnectionError"; + this.message = message || "Connection Error"; + this.meta = meta; + } +} + +export class NoLivingConnectionsError extends ElasticsearchClientError { + constructor(message: string, meta: any) { + super(message); + Error.captureStackTrace(this, NoLivingConnectionsError); + this.name = "NoLivingConnectionsError"; + this.message = message || + "Given the configuration, the ConnectionPool was not able to find a usable Connection for this request."; + this.meta = meta; + } +} + +export class SerializationError extends ElasticsearchClientError { + constructor(message: string, data?: any) { + super(message); + Error.captureStackTrace(this, SerializationError); + this.name = "SerializationError"; + this.message = message || "Serialization Error"; + this.data = data; + } +} + +export class DeserializationError extends ElasticsearchClientError { + constructor(message: string, data: any) { + super(message); + Error.captureStackTrace(this, DeserializationError); + this.name = "DeserializationError"; + this.message = message || "Deserialization Error"; + this.data = data; + } +} + +export class ConfigurationError extends ElasticsearchClientError { + constructor(message: string) { + super(message); + Error.captureStackTrace(this, ConfigurationError); + this.name = "ConfigurationError"; + this.message = message || 
"Configuration Error"; + } +} + +export class ResponseError extends ElasticsearchClientError { + constructor(meta: any) { + super("Response Error"); + Error.captureStackTrace(this, ResponseError); + this.name = "ResponseError"; + if (meta.body && meta.body.error && meta.body.error.type) { + if (Array.isArray(meta.body.error.root_cause)) { + this.message = meta.body.error.type + ": "; + this.message += meta.body.error.root_cause.map((entry: any) => + `[${entry.type}] Reason: ${entry.reason}` + ).join("; "); + } else { + this.message = meta.body.error.type; + } + } else { + this.message = "Response Error"; + } + this.meta = meta; + } + + get body() { + return this.meta.body; + } + + get statusCode() { + if (this.meta.body && typeof this.meta.body.status === "number") { + return this.meta.body.status; + } + return this.meta.statusCode; + } + + get headers() { + return this.meta.headers; + } + + toString() { + return JSON.stringify(this.meta.body); + } +} + +export class RequestAbortedError extends ElasticsearchClientError { + constructor(message: string, meta: any) { + super(message); + Error.captureStackTrace(this, RequestAbortedError); + this.name = "RequestAbortedError"; + this.message = message || "Request aborted"; + this.meta = meta; + } +} diff --git a/src/utils/serializer.ts b/src/utils/serializer.ts new file mode 100644 index 0000000..8702e2f --- /dev/null +++ b/src/utils/serializer.ts @@ -0,0 +1,75 @@ +import { SerializationError } from "./errors.ts"; + +interface SerializerOptions { + disablePrototypePoisoningProtection: boolean | "proto" | "constructor"; +} + +const kJsonOptions = Symbol("secure json parse options"); + +export class Serializer { + constructor(opts: SerializerOptions) { + // const disable = opts.disablePrototypePoisoningProtection; + // this[kJsonOptions] = { + // protoAction: disable === true || disable === "proto" ? "ignore" : "error", + // constructorAction: disable === true || disable === "constructor" + // ? 
"ignore" + // : "error", + // }; + } + + serialize(object: any) { + let json; + try { + json = JSON.stringify(object); + } catch (err) { + throw new SerializationError(err.message, object); + } + return json; + } + + // deserialize(json) { + // let object; + // try { + // object = sjson.parse(json, this[kJsonOptions]); + // } catch (err) { + // throw new DeserializationError(err.message, json); + // } + // return object; + // } + + ndserialize(array: any) { + if (Array.isArray(array) === false) { + throw new SerializationError("The argument provided is not an array"); + } + let ndjson = ""; + for (let i = 0, len = array.length; i < len; i++) { + if (typeof array[i] === "string") { + ndjson += array[i] + "\n"; + } else { + ndjson += this.serialize(array[i]) + "\n"; + } + } + return ndjson; + } + + qserialize(object: any) { + if (object == null) return ""; + if (typeof object === "string") return object; + // arrays should be serialized as comma separated list + const keys = Object.keys(object); + for (let i = 0, len = keys.length; i < len; i++) { + const key = keys[i]; + // elasticsearch will complain for keys without a value + if (object[key] === undefined) { + delete object[key]; + } else if (Array.isArray(object[key]) === true) { + object[key] = object[key].join(","); + } + } + return JSON.stringify(object); + } +} + +export const serializer = new Serializer({ + disablePrototypePoisoningProtection: true, +});