-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathinsert.js
148 lines (142 loc) · 4 KB
/
insert.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/* eslint-disable no-restricted-syntax */
const co = require('co');
const cosql = require('co-mssql');
const logger = require('@elastic.io/component-logger')();
// Prepared statement shared by the whole module: assigned in init() and
// executed for every incoming message in processAction().
let pstmt;
// Matches "@name" placeholders with an optional ":type" suffix, where type is one of
// string, boolean, date, number, bigint, float, real or money (e.g. "@id:number").
// The global flag makes String#match return every placeholder found in the query.
const VARS_REGEXP = /@([\w_$][\d\w_$]*(:(string|boolean|date|number|bigint|float|real|money))?)/g;
/**
 * This function will be called during component initialization.
 *
 * It opens the MSSQL connection and prepares the statement stored in the
 * module-level `pstmt`. Placeholders in `cfg.query` have the form `@name` or
 * `@name:type`; each one is declared as an input of the prepared statement and
 * its `:type` suffix is stripped from the SQL before the statement is prepared.
 * A placeholder without a type is declared as a string (NVarChar).
 *
 * @param cfg configuration; reads `username`, `password`, `server`, `port`,
 *   `instance`, `database`, `domain`, `encrypt` and `query`
 * @returns {Promise} resolved once the statement is prepared, rejected when the
 *   query is missing or when connecting/preparing fails
 */
function init(cfg) {
  const port = cfg.port ? `:${cfg.port}` : '';
  const instance = cfg.instance ? `/${cfg.instance}` : '';
  const options = cfg.domain
    ? `?domain=${cfg.domain}&encrypt=${cfg.encrypt}`
    : `?encrypt=${cfg.encrypt}`;
  // Only the credentials are URI-encoded; the remaining parts are inserted verbatim.
  const credentials = `${encodeURIComponent(cfg.username)}:${encodeURIComponent(cfg.password)}`;
  const conString = `mssql://${credentials}@${cfg.server}${port}${instance}/${cfg.database}${options}`;
  logger.debug('Connection string is created');
  return co(function* gen() {
    // Fail early with a clear message instead of a TypeError after connecting.
    if (typeof cfg.query !== 'string' || cfg.query.trim() === '') {
      throw new Error('SQL query is missing in the component configuration');
    }
    // Placeholder type suffix -> SQL type used for the prepared statement input.
    const sqlTypes = {
      string: cosql.NVarChar,
      number: cosql.Int,
      float: cosql.Float,
      real: cosql.Real,
      boolean: cosql.Bit,
      money: cosql.Money,
      date: cosql.DateTime2,
      bigint: cosql.BigInt,
    };
    logger.info('Connecting to the database');
    const connection = new cosql.Connection(conString);
    // Always attach an error listener
    connection.on('error', err => this.emit('error', err));
    let sql = cfg.query;
    yield connection.connect();
    logger.info('Connection established');
    logger.debug('Preparing query...');
    // String#match returns null (not an empty array) when nothing matches,
    // which is the case for a query without any @placeholders.
    const vars = sql.match(VARS_REGEXP) || [];
    logger.debug('Found prepared variable:type pairs');
    pstmt = new cosql.PreparedStatement(connection);
    for (const tuple of vars) {
      const [placeholder, type = 'string'] = tuple.split(':');
      const name = placeholder.slice(1);
      if (Object.prototype.hasOwnProperty.call(sqlTypes, type)) {
        pstmt.input(name, sqlTypes[type]);
      } else {
        logger.warn('WARNING: Can\'t figure out the type key=%s type=%s', name, type);
      }
      // Now let's reduce :string :boolean :date etc. to the placeholder name only
      sql = sql.replace(tuple, placeholder);
    }
    logger.trace('Resulting SQL is ready');
    yield pstmt.prepare(sql);
    logger.info('Preparing statement created');
  }.bind(this));
}
/**
 * This function will be called to fetch metamodel for SQL query.
 *
 * Every `@name` / `@name:type` placeholder found in `cfg.query` becomes a
 * property of the `in` schema. The types bigint, real, float and money are
 * exposed as JSON type `number`; everything else (including untyped
 * placeholders) is exposed as `string`. A missing or placeholder-free query
 * yields an `in` schema without properties.
 *
 * NOTE(review): `number` and `boolean` placeholders are also exposed as
 * `string` here, although init() declares them as Int and Bit — confirm
 * whether that is intentional before changing it.
 *
 * @param cfg configuration; only `query` is read
 * @param cb node-style callback, invoked as `cb(null, metamodel)`
 */
function getMetaModel(cfg, cb) {
  const numericTypes = ['bigint', 'real', 'float', 'money'];
  const sql = cfg.query;
  const result = {
    in: {
      type: 'object',
      properties: {},
    },
    out: {},
  };
  if (sql && sql.length > 0) {
    // String#match returns null (not an empty array) when nothing matches.
    const vars = sql.match(VARS_REGEXP) || [];
    const fields = result.in.properties;
    for (const tuple of vars) {
      const [key, type] = tuple.split(':');
      fields[key.slice(1)] = {
        type: numericTypes.includes(type) ? 'number' : 'string',
      };
    }
  }
  cb(null, result);
}
/**
 * Entry point invoked by the elastic.io platform for every incoming message.
 * Runs the statement prepared by init() with the message body as its
 * parameter values and passes the message through unchanged.
 *
 * @param msg incoming message object that contains ``body`` with payload
 * @returns {Promise} resolved with the same ``msg`` once execution completed
 */
function processAction(msg) {
  // Bound to the caller's context so ``this.logger`` is reachable inside the generator.
  const executeStatement = function* executeStatement() {
    this.logger.info('Executing statement');
    yield pstmt.execute(msg.body);
    this.logger.info('Execution completed');
    return msg;
  }.bind(this);
  return co(executeStatement);
}
// Public interface of this module: `process` handles each message,
// `init` prepares the statement, `getMetaModel` describes the query inputs.
module.exports.process = processAction;
module.exports.init = init;
module.exports.getMetaModel = getMetaModel;