Skip to content

Commit

Permalink
Make transports a class.
Browse files Browse the repository at this point in the history
Need to be able to load and append customer transports via server
config.
  • Loading branch information
esatterwhite committed May 24, 2017
1 parent e43917e commit 9ce47fb
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 44 deletions.
5 changes: 0 additions & 5 deletions lib/server/api/validators/timer.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ module.exports = function(data = {}, cb) {
return setImmediate(cb, err);
}

if (typeOf(transports[data.callback.transport]) !== 'Function') {
const err = new Error(`unknown transport ${data.callback.transport}`);
err.statusCode = 400;
return setImmediate(cb, err);
}
if (typeOf(data.callback.uri) !== 'String') {
const err = new TypeError('callback.uri is required and must be a string');
err.statusCode = 400;
Expand Down
9 changes: 7 additions & 2 deletions lib/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var server = new Server({ node })
server.load().listen(5000)
*/
class Server extends http.Server {
constructor( opts={} ){
constructor(opts = {}){
super((req, res) => {
this._router.handle(req, res);
});
Expand All @@ -66,6 +66,7 @@ class Server extends http.Server {
seeds: null
, nats: null
, storage: null
, transports: []
}, opts);
this.loaded = false;
if( opts.node ){
Expand All @@ -81,7 +82,11 @@ class Server extends http.Server {
this._node = new Node();
}
this._group = this._node.name;
this._timers = new Timer({ nats: this.options.nats, storage: this.options.storage });
this._timers = new Timer({
nats: this.options.nats
, storage: this.options.storage
, transports: this.options.transports
});
this._router = new Router(this._node, this._timers);
this._node.on('ringchange', (evt) => {
this._timers.rebalance(evt, this._node, (data) => {
Expand Down
9 changes: 6 additions & 3 deletions lib/timer.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const os = require('os')
, crypto = require('crypto')
, path = require('path')
, levelup = require('levelup')
, transports = require('./transports')
, Transports = require('./transports')
, nats = require('./nats')
, json = require('./json')
, conf = require('../conf')
Expand Down Expand Up @@ -52,10 +52,13 @@ class Timer extends Map {
this.options = Object.assign({}, {
nats: null
, storage: null
, transports: []
}, options);
this._sid = null;
this._bail = false;
this.nats = nats.createClient( this.options.nats );
this.transports = new Transports(this.options.transports)
console.log(this.transports)
const store_opts = conf.get('storage');
const opts = Object.assign(store_opts, this.options.storage);
store(opts);
Expand Down Expand Up @@ -120,9 +123,9 @@ timers.create(id, options, (err) => {
**/
create(id, body, cb) {
const payload = body;
const transport = transports[payload.callback.transport];
const transport = this.transports[payload.callback.transport];
if ( this.has( id ) ) {
const err = new Error( 'Key exists' );
const err = new Error(`Timer with id ${id} already exists` );
err.code = 'EKEYEXISTS';
return setImmediate(cb, err);
}
Expand Down
83 changes: 49 additions & 34 deletions lib/transports/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,68 @@
* @author Eric Satterwhite
* @since 1.0.0
* @requires skyring/lib/transports/http
* @requires skyring/conf
*/

const debug = require('debug')('skyring:transports:tcp')
const conf = require('../../conf');
const callback = require('./callback');
const ENV = conf.get('NODE_ENV');
const transports = new Set(toArray(conf.get('with-transport')))

/**
* Primary http transport
* @memberof module:skyring/lib/transports
* @property {Object} http The default HTTP transport
**/
exports.http = require('./http');

debug('to load', transports)
for (const path of transports) {
const transport = require(path)
if (typeof transport !== 'function') {
throw new TypeError('A Transport must export a function')
}
const debug = require('debug')('skyring:transports')
, conf = require('../../conf')
, callback = require('./callback')
, http = require('./http')
, kLoad = Symbol('kLoad')
, ENV = conf.get('NODE_ENV')
, defaults = toArray(conf.get('transport'))
;

if(transport.length !== 5) {
throw new Error('Transports must accept five parameters')
module.exports = class Transports {
constructor(transports) {
/**
* Primary http transport
* @memberof module:skyring/lib/transports
* @property {Object} http The default HTTP transport
**/
this.http = http
if(ENV === 'development' || ENV === 'test') {
this.callback = callback;
}
this[kLoad](toArray(transports))
}

if(typeof transport.name !== 'string' && transport.name.length <= 0) {
throw new TypeError('transports.name is required and must be a string')
}
[kLoad](paths) {
const transports = new Set(defaults.concat(toArray(paths)))

for (const path of transports) {
const transport = typeof path === 'string' ? require(path) : path
if (typeof transport !== 'function') {
throw new TypeError('A Transport must export a function')
}

if (transport.length !== 5) {
throw new Error('Transports must accept five parameters')
}

debug('loading %s transport', transport.name)
Object.defineProperty(exports, transport.name, {
configrable: false
, enumerable: true
, get: () => {
return transport
if (typeof transport.name !== 'string' && transport.name.length <= 0) {
throw new TypeError('transports.name is required and must be a string')
}

if (hasOwn(exports, transport.name)) {
const error = new Error(`A transport with name ${transport.name} is already defined`)
error.name = 'EEXIST'
throw error
}

debug('loading %s transport', transport.name)
this[transport.name] = transport
}
})
}
}

if(ENV === 'development' || ENV === 'test') {
exports.callback = callback;
}

function toArray(item) {
if (!item) return []
if (Array.isArray(item)) return item
return typeof item === 'string' ? item.split(',') : [item]
}

function hasOwn(obj, prop) {
return Object.prototype.hasOwnProperty.call(obj, prop)
}

0 comments on commit 9ce47fb

Please sign in to comment.