Skip to content

Commit

Permalink
Server instance uses the internal name of it's ring node for nats groups
Browse files Browse the repository at this point in the history
Segregate the nats queue group by related node names
  • Loading branch information
esatterwhite committed Dec 29, 2016
1 parent 9b4dd90 commit 5d65f00
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 10 deletions.
21 changes: 12 additions & 9 deletions lib/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ const http = require('http')
, mock = require('./mock')
, Node = require('./node')
, Router = exports.Router = require('./router')
, timer = require('../timer')
, Timer = require('../timer')
, debug = Debug('skyring:server')
;

/**
* Description
* @constructor
* @extends http.Server
* @alias module:skyring/lib/server
Expand All @@ -40,6 +39,7 @@ class Server extends http.Server {
this.closed = false;
this.options = Object.assign({}, {
seeds: null
, nats: null
}, opts)
this.loaded = false;
if( opts.node ){
Expand All @@ -54,10 +54,11 @@ class Server extends http.Server {
} else {
this._node = new Node()
}

this._router = new Router(this._node);
this._group = this._node.name
this._timers = new Timer(this.options.nats);
this._router = new Router(this._node, this._timers);
this._node.on('ringchange', (evt) => {
timer.rebalance(evt, this._node, (data) => {
this._timers.rebalance(evt, this._node, (data) => {
this.proxy(data);
});
});
Expand All @@ -70,7 +71,6 @@ class Server extends http.Server {
 **/
load() {
if( this.loaded ) return this;

const routes = require('./api')
Object.keys(routes)
.forEach((name) => {
Expand Down Expand Up @@ -99,11 +99,14 @@ class Server extends http.Server {
listen(port, host, backlog, callback) {
debug('seed nodes', this.options.seeds);
this._node.join(this.options.seeds, (err) => {
if (err) return callback(err)
if (err) {
console.error(err)
return callback && callback(err)
}
this._node.handle(( req, res ) => {
this._router.handle( req, res );
})
timer.watch('skyring', (err, data) => {
this._timers.watch(`skyring:${this._group}`, (err, data) => {
this.proxy(data)
});
super.listen(port, host, backlog, callback)
Expand Down Expand Up @@ -135,7 +138,7 @@ class Server extends http.Server {
if(this.closed) return cb && setImmediate(cb);
super.close(()=>{
this._node.close(() => {
timer.shutdown(() => {
this._timers.shutdown(() => {
debug('closing server')
this.closed = true
cb && cb()
Expand Down
7 changes: 7 additions & 0 deletions lib/server/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ if (!handle) return;
this._ring.destroy();
})
}


}

Object.defineProperty(Node.prototype, 'name', {
get: function() {
return this._app;
}
})
module.exports = Node;
5 changes: 4 additions & 1 deletion lib/server/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ const Route = require('./route')
* @param {module:skyring/lib/server/node} node The node linked to the application hashring
* @example var x = new Router(node)
*/
function Router( node, opts ) {
function Router( node, timers ) {
this.routes = new Map();
this.route_options = new Map();
this.node = node;
this.timers = timers
};

/**
Expand Down Expand Up @@ -125,6 +126,8 @@ http.createServer((req, res) => {
Router.prototype.handle = function handle( req, res ) {
req.$ = new Request( req );
res.$ = new Response( res );
req.$.timers = this.timers;

const path = req.$.path;
const method = req.method.toUpperCase();
const map = this.routes.get( method );
Expand Down

0 comments on commit 5d65f00

Please sign in to comment.