Skip to content

Commit

Permalink
skyring, server: sets default behavior to not auto rebalance
Browse files Browse the repository at this point in the history
The previous behavior would send every server in the cluster into a
rebalance every time a server was added or removed from the cluster.
This can lead to some undesireable bahavior.

Adds a new option `autorebalance` that can be enabled if that behavior
is desired. By default a server will rebalance when it leaves the
cluster.

Additionally, the linux signal SIGUSR2 can be sent to the parent process
to trigger a specific node to rebalance

Semver: major
  • Loading branch information
Eric Satterwhite committed Apr 26, 2020
1 parent 3271ad7 commit 51f8490
Show file tree
Hide file tree
Showing 18 changed files with 1,002 additions and 338 deletions.
3 changes: 2 additions & 1 deletion packages/skyring/conf/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ module.exports = conf.defaults({
backend: "memdown"
, path: null
}
, autobalance: false
, channel: {
host: "127.0.0.1"
, port: 3455
}
, PORT: 3000
, port: 3000
, nats: {
hosts: "127.0.0.1:4222"
}
Expand Down
4 changes: 2 additions & 2 deletions packages/skyring/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ if( require.main === module ){

const server = new Server();

server.listen(conf.get('PORT'), (err) => {
server.listen(conf.get('port'), (err) => {
if(err) {
process.exitCode = 1;
console.error(err);
throw err;
}
debug('server listening', conf.get('PORT'));
debug('server listening', conf.get('port'));
});

function onSignal() {
Expand Down
2 changes: 1 addition & 1 deletion packages/skyring/lib/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Object.defineProperty(exports, 'client', {
function createClient(options) {
const hosts = (options && options.hosts) || nats_hosts;
const servers = Array.isArray(hosts) ? hosts : parse(hosts);
const opts = Object.assign({}, options, {servers});
const opts = Object.assign({json: true}, options, {servers});
debug('creating nats client', opts);
const client = nats.connect(opts);
client.on('error', (err) => {
Expand Down
36 changes: 19 additions & 17 deletions packages/skyring/lib/server/api/middleware/proxy.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
/*jshint laxcomma: true, smarttabs: true, esnext: true, node: true*/

/**
* Middleware that determines if the current server should handle the request, and will proxy
* it to the appropriate node if it isn't
Expand All @@ -13,10 +13,9 @@
*/

const uuid = require('uuid')
, body = require('body')
, debug = require('debug')('skyring:proxy')
, json = require('../../../json')
;
const body = require('body')
const debug = require('debug')('skyring:proxy')
const json = require('../../../json')

/**
* @function
Expand All @@ -27,21 +26,24 @@ const uuid = require('uuid')
* @param {Function} next
**/
module.exports = function proxy(req, res, node, cb) {
const timer_id = req.$.headers['x-timer-id'] || uuid.v4();
req.headers['x-timer-id'] = timer_id;
res.setHeader('location', `/timer/${timer_id}`);
const do_handle = node.handleOrProxy(timer_id, req, res);
if (!do_handle) return debug('proxing', timer_id);
const timer_id = req.$.headers['x-timer-id'] || uuid.v4()
req.headers['x-timer-id'] = timer_id
res.setHeader('location', `/timer/${timer_id}`)
const do_handle = node.handleOrProxy(timer_id, req, res)
if (!do_handle) return debug('proxing', timer_id)

debug('handle request')
body(req, res, (err, data) => {
if (err) return cb(err);
if (err) return cb(err)

const {error, value} = json.parse(data);
const {error, value} = json.parse(data)

if (error) return cb(error);
if (error) {
error.statusCode = 400
return cb(error)
}

req.$.body = value;
cb();
});
};
req.$.body = value
cb()
})
}
2 changes: 1 addition & 1 deletion packages/skyring/lib/server/api/middleware/validate.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
const validate = require('../validators/timer');

module.exports = function validatePayload( req, res, node, next ) {
validate(req.$.body || undefined, next);
validate(req.$.body, next);
};
64 changes: 32 additions & 32 deletions packages/skyring/lib/server/api/validators/timer.js
Original file line number Diff line number Diff line change
@@ -1,61 +1,61 @@
'use strict';
'use strict'

const type_exp = /^\[object (.*)\]$/;
const transports = require('../../../transports');
const type_exp = /^\[object (.*)\]$/
const transports = require('../../../transports')

function typeOf(obj) {
if (obj === null) return 'Null';
if (obj === undefined) return 'Undefined';
return type_exp.exec( Object.prototype.toString.call(obj) )[1];
if (obj === null) return 'Null'
if (obj === undefined) return 'Undefined'
return type_exp.exec(Object.prototype.toString.call(obj))[1]
}

// 2^32 - 1
const MAX_TIMEOUT_VALUE = 2147483647;
const MAX_TIMEOUT_VALUE = 2147483647

module.exports = function(data = {}, cb) {
if (isNaN(data.timeout) || data.timeout < 1) {
const err = new TypeError('timeout is required and must be a positive number');
err.statusCode = 400;
return setImmediate(cb, err);
const err = new TypeError('timeout is required and must be a positive number')
err.statusCode = 400
return setImmediate(cb, err)
}

if (data.timeout > MAX_TIMEOUT_VALUE) {
const err = new TypeError(`timeout must be less than or equal to 2147483647 milliseconds`);
err.statusCode = 400;
return setImmediate(cb, err);
const err = new TypeError(`timeout must be less than or equal to 2147483647 milliseconds`)
err.statusCode = 400
return setImmediate(cb, err)
}

if (data.data) {
const type = typeOf(data.data);
const type = typeOf(data.data)
if (data.data != null) {
if (type !== 'String' && type !== 'Object') {
const err = new TypeError('data is required and must be a string or object');
err.statusCode = 400;
return setImmediate(cb, err);
const err = new TypeError(`data is required and must be a string or object. Got ${type}`)
err.statusCode = 400
return setImmediate(cb, err)
}
}

if (typeOf(data.callback) !== 'Object') {
const err = new TypeError('callback is required and must be an object');
err.statusCode = 400;
return setImmediate(cb, err);
const err = new TypeError('callback is required and must be an object')
err.statusCode = 400
return setImmediate(cb, err)
}

if (typeOf(data.callback.transport) !== 'String') {
const err = new TypeError('callback.transport is required and must be a string');
err.statusCode = 400;
return setImmediate(cb, err);
const err = new TypeError(`callback.transport is required and must be a string. Got ${type}`)
err.statusCode = 400
return setImmediate(cb, err)
}

if (typeOf(data.callback.uri) !== 'String') {
const err = new TypeError('callback.uri is required and must be a string');
err.statusCode = 400;
return setImmediate(cb, err);
const err = new TypeError(`callback.uri is required and must be a string. Got ${type}`)
err.statusCode = 400
return setImmediate(cb, err)
}

if (typeOf(data.callback.method) !== 'String') {
const err = new TypeError('callback.method is required and must be a string');
err.statusCode = 400;
return setImmediate(cb, err);
const err = new TypeError(`callback.method is required and must be a string. Got ${type}`)
err.statusCode = 400
return setImmediate(cb, err)
}
setImmediate(cb, null, data);
};
setImmediate(cb, null, data)
}
Loading

0 comments on commit 51f8490

Please sign in to comment.