-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
115 lines (112 loc) · 3.65 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
const fp = require("fastify-plugin");
const globby = require("globby");
const protoLoader = require("@grpc/proto-loader");
const grpc = require("grpc");
const { resolve } = require("path");
const _ = require("lodash");
const Reach = require("dlv");
const fs = require("fs-extra");
/**
 * Base options handed to `protoLoader.loadSync` for every .proto file.
 * The plugin copies this object and adds `includeDirs` per file.
 */
const opt = {
  // Keep field names exactly as written in the .proto file.
  keepCase: true,
  // Represent 64-bit integer values as strings.
  longs: String,
  // Represent enum values by their string names.
  enums: String,
  // Populate default values on decoded messages.
  defaults: true,
  // Expose virtual oneof properties naming the field that is set.
  oneofs: true
};
module.exports = fp(async (fastify, config = {}, next) => {
const { pbPathsRegExp, services, pemPath, options } = config;
if (pbPathsRegExp && services && pemPath && options) {
const pbPaths = await globby.sync(pbPathsRegExp);
let protos = {};
pbPaths.forEach(path => {
const pbRoot = resolve(path, "../../");
const packageDefinition = protoLoader.loadSync(
path,
Object.assign({}, opt, { includeDirs: [pbRoot] })
);
const packageObject = grpc.loadPackageDefinition(packageDefinition);
protos = _.defaultsDeep(protos, packageObject);
});
fastify.decorate("protos", protos);
let _grpc = {};
for (let key in services) {
const [url, serviceName, version] = services[key].split(":");
_grpc[key] = proxyHandler(
fastify,
`${serviceName}:${version}`,
Reach(protos, url),
config
);
}
fastify.decorate("grpc", _grpc);
} else {
console.log(
"缺少grpc的必要配置参数(pbPathsRegExp, services, pemPath, options),跳过grpc插件"
);
}
next();
});
/**
 * Turns the configured retry setting into the maximum number of retries.
 * Unset / empty / non-numeric values fall back to 5; an explicit 0 is kept,
 * which disables retries.
 *
 * @param {*} configured - Raw `retry` value from the plugin configuration.
 * @returns {number} Maximum number of retries after the first attempt.
 */
function resolveMaxRetry(configured) {
  const fallback = 5;
  if (configured === undefined || configured === null || configured === "") {
    return fallback;
  }
  const parsed = Number(configured);
  return Number.isNaN(parsed) ? fallback : parsed;
}

/**
 * Builds the channel credentials used for every new client: SSL credentials
 * read from `pemPath`, combined with a metadata generator that attaches the
 * fixed `plugin_key` entry.
 *
 * @param {string} pemPath - Path of the PEM file passed to `createSsl`.
 * @returns {object} Combined channel credentials.
 */
function createChannelCredentials(pemPath) {
  const sslCreds = grpc.credentials.createSsl(fs.readFileSync(pemPath));
  const callCreds = grpc.credentials.createFromMetadataGenerator(
    (callContext, callback) => {
      const metadata = new grpc.Metadata();
      metadata.set("plugin_key", "plugin_value");
      callback(null, metadata);
    }
  );
  return grpc.credentials.combineChannelCredentials(sslCreds, callCreds);
}

/**
 * Creates a proxy whose properties are functions that call the same-named
 * method on a gRPC client.
 *
 * `proxy.someMethod(params, metaData)` picks a host from
 * `fastify.eureka[eurekaServiceName]`, reuses or creates a `PB` client for
 * that host, and calls `client.someMethod(params, metadata, callback)`.
 * A failed call is retried on the same client up to the configured number of
 * retries (`grpcConfig.retry`, default 5) before the promise rejects with the
 * last error.
 *
 * @param {object} fastify - Fastify instance; `fastify.eureka` is read on every call.
 * @param {string} eurekaServiceName - Key into `fastify.eureka` listing candidate hosts.
 * @param {Function} PB - Client constructor, called as `new PB(host, credentials, options)`.
 * @param {object} grpcConfig - Plugin configuration (`pemPath`, `options`, `retry`).
 * @returns {object} Proxy exposing one promise-returning function per method name.
 */
function proxyHandler(fastify, eurekaServiceName, PB, grpcConfig) {
  // One client per host. TODO: evict expired clients.
  const clientsByHost = new Map();

  // Returns the cached client for `host`, creating and caching it on first use.
  function clientFor(host) {
    if (clientsByHost.has(host)) {
      console.info("client from cache !");
      return clientsByHost.get(host);
    }
    const client = new PB(
      host,
      createChannelCredentials(grpcConfig.pemPath),
      grpcConfig.options
    );
    clientsByHost.set(host, client);
    return client;
  }

  return new Proxy(
    {},
    {
      get(target, key) {
        // Symbol lookups (inspection, coercion) and `then` must not look like
        // RPC methods; otherwise awaiting the proxy treats it as a thenable.
        if (typeof key === "symbol" || key === "then") {
          return undefined;
        }

        return function(params, metaData = {}) {
          // All setup runs inside the executor so that any synchronous failure
          // surfaces as a rejection of the returned promise.
          return new Promise((resolvePromise, rejectPromise) => {
            const { eureka } = fastify;
            const host = _.sample(eureka && eureka[eurekaServiceName]);
            if (!host) {
              throw new Error(
                `No available instance for gRPC service "${eurekaServiceName}"`
              );
            }

            const client = clientFor(host);
            const maxRetry = resolveMaxRetry(grpcConfig.retry);

            const customMetadata = new grpc.Metadata();
            const extraMetadata = metaData || {};
            Object.keys(extraMetadata).forEach(name =>
              customMetadata.set(name, extraMetadata[name])
            );

            let retry = 0;
            const attempt = () => {
              try {
                client[key](params, customMetadata, (err, response) => {
                  if (!err) {
                    resolvePromise(response);
                    return;
                  }
                  console.warn(`methods:${key}出现错误,错误原因:${err}`);
                  if (++retry > maxRetry) {
                    console.error(`重试超过${maxRetry}次,放弃!`);
                    // Retry budget exhausted: give up with the last error.
                    rejectPromise(err);
                    return;
                  }
                  console.warn(`准备重试-->第${retry}次`);
                  attempt();
                });
              } catch (err) {
                // Retries run from inside the gRPC callback, where a throw
                // would otherwise go uncaught.
                rejectPromise(err);
              }
            };
            attempt();
          });
        };
      }
    }
  );
}