Skip to content

Commit

Permalink
package: initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
esatterwhite committed Jun 4, 2017
1 parent 6ee8341 commit 3fc93d6
Show file tree
Hide file tree
Showing 4 changed files with 273 additions and 0 deletions.
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
'use strict'

module.exports = require('./lib/zmq')
97 changes: 97 additions & 0 deletions lib/zmq.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
'use strict'
const Zmq = require('zmq')
const debug = require('debug')('skyring:transports:zmq')
const connections = new Map()
const METHODS = new Set(['push', 'pub'])
const ZMQ_DEBUG = !!process.env.ZMQ_DEBUG
const ZMQ_BIND = !!process.env.ZMQ_BIND
const MONITOR_EVENTS = new Set([
'connect'
, 'connect_delay'
, 'connect_retry'
, 'listen'
, 'bind_error'
, 'accept'
, 'accept_error'
, 'close'
, 'close_error'
, 'disconnect'
])

module.exports = function zmq(method, url, payload, id, storage) {
const conn = getConnection(url, method)
if(!conn) {
const err = new Error(`unable to create connection for ${method} - ${url}`)
err.code = 'ENOZMQCONN'
throw err
}

debug('execut zmq timer', 'timeout', payload)
conn
.send('timeout', Zmq.ZMQ_SNDMORE)
.send(payload)
storage.remove(id)
}

module.exports.shutdown = shutdown

function getConnection(addr, type) {
if (connections.has(addr)) return connections.get(addr)
if (!METHODS.has(type)) {
console.error('zmq error: unsupported transport method %s', type)
return null
}

debug(`creating ${type} socket to ${addr}`)
const socket = Zmq.socket(type)
if (ZMQ_BIND) {
debug('binding to %s', addr)
const err = tryBind(socket, addr)
if (err) throw err
} else {
debug('connecting to %s', addr)
socket.connect(addr)
}
if (ZMQ_DEBUG) startMonitor(socket)
socket.on('error', (err) => {
console.error('zmq error: destroying socket %s', addr, err.message)
socket.removeAllListeners()
socket.disconnect(addr)
socket.close()
connections.delete(addr)
})
connections.set(addr, socket)
return socket
}

function startMonitor(socket) {
for (const evt of MONITOR_EVENTS) {
debug(`adding monitor event ${evt} for socket`)
socket.on(evt, (fd, addr) => {
debug(`socket ${evt}: ${addr}`)
})
}
}

function shutdown(cb = () =>{}) {
for (const [addr, socket] of connections.entries()) {
debug('shutdown - %s', addr)
socket.removeAllListeners()
socket.disconnect(addr)
socket.close()
connections.delete(addr)
}
cb()
}

function tryBind(socket, addr){
try{
socket.bindSync(addr)
} catch(e) {
e.meta = {
address: addr
, type: type
}
return e
}
}
20 changes: 20 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"name": "skyring-zmq-transport",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "ZMQ_DEBUG=1 tap --cov -Rtap test"
},
"author": "",
"license": "ISC",
"dependencies": {
"debug": "^2.6.8",
"zmq": "^2.15.3"
},
"devDependencies": {
"skyring": "^4.2.0",
"supertest": "^3.0.0",
"tap": "^10.3.3"
}
}
153 changes: 153 additions & 0 deletions test/transport.zmq.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
'use strict'

const zmq = require('zmq')
, os = require('os')
, path = require('path')
, tap = require('tap')
, supertest = require('supertest')
, Skyring = require('skyring')
, test = tap.test
, ZMQ_BIND = !!process.env.ZMQ_BIND
;

let hostname = null;

if(!process.env.TEST_HOST) {
hostname = os.hostname()
console.log(`env variable TEST_HOST not set. using ${hostname} as hostname`)
} else {
hostname = process.env.TEST_HOST;
}

test('zmq:push', (t) => {
let request, server, handler
t.on('end', () => {
handler.removeAllListeners()
handler.disconnect('tcp://0.0.0.0:5555')
handler.close()
server.close()
})

t.test('set up skyring server', (tt) => {
server = new Skyring({
transports: [path.resolve(__dirname, '../')]
, seeds: [`${hostname}:3455`]
});
request = supertest('http://localhost:3333')
server.load().listen(3333, null, null, tt.end)
})

t.test('success - should deliver payload', (tt) => {
tt.plan(3)
handler = zmq.socket('pull')
if (ZMQ_BIND) {
handler.connect('tcp://0.0.0.0:5555')
} else {
handler.bindSync('tcp://0.0.0.0:5555')
}
handler.on('message', (evt, data) => {
const payload = JSON.parse(data)
tt.match(payload, {
text: 'hello world'
, status: 200
})
tt.pass('timeout executed')
})

request
.post('/timer')
.send({
timeout: 500
, data: JSON.stringify({
text: 'hello world'
, status: 200
})
, callback: {
uri: `tcp://0.0.0.0:5555`
, method: 'push'
, transport: 'zmq'
}
})
.expect(201)
.end((err, res) => {
tt.error(err)
})
})

t.end()
})

test('error case', (t) => {
t.throws(() => {
const transport = require('../lib/zmq')
transport('fake', 'zmq://0.0.0.0:3333', '{}', '1', null)
}, /unable to create connection for fake/)
t.end()
})

test('zmq:pub', (t) => {
let request, server, handler
const req = supertest('http://localhost:3333')
function doRequest(t) {
req
.post('/timer')
.send({
timeout: 500
, data: 'fake'
, callback: {
transport: 'zmq'
, uri: `tcp://0.0.0.0:5555`
, method: 'pub'
}
})
.expect(201)
.end((err, res) => {
t.error(err)
})
}
t.on('end', (done) => {
handler.removeAllListeners()
handler.disconnect('tcp://0.0.0.0:5555')
handler.close()
server.close()
})
t.test('set up skyring server', (tt) => {
server = new Skyring({
transports: [require(path.resolve(__dirname, '../'))]
, seeds: [`${hostname}:3455`]
});
request = supertest('http://localhost:3333')
server.load().listen(3333, null, null, tt.end)
})

t.test('start saturate pool - no connection', (tt) => {
tt.plan(151)
for (var x = 0; x < 150; x++ ) {
doRequest(tt)
}
setTimeout(() => {
tt.pass('saturated')
}, 2000)
})
t.test('success - should deliver payload', (tt) => {
tt.plan(150)
handler = zmq.socket('sub')
handler.subscribe('timeout')
if (ZMQ_BIND) {
handler.connect('tcp://0.0.0.0:5555')
} else {
handler.bindSync('tcp://0.0.0.0:5555')
}
handler.on('message', (evt, data) => {
tt.match(data, /fake/)
})
for (var x = 0; x < 150; x++ ) {
doRequest({error: () => {}})
}
})
t.end()
})




0 comments on commit 3fc93d6

Please sign in to comment.