Skip to content

Commit

Permalink
Initial implimentation
Browse files Browse the repository at this point in the history
  • Loading branch information
esatterwhite committed May 21, 2017
1 parent 7fe9fd1 commit 512e7a0
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 0 deletions.
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
'use strict'

module.exports = require('./lib/tcp')
70 changes: 70 additions & 0 deletions lib/tcp.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
'use strict'
const Url = require('url')
, net = require('net')
, Pool = require('generic-pool')
, debug = require('debug')('skyring:transports:tcp')
, connections = new Map()
;

module.exports = function tcp( method, url, payload, id, cache ){
const pool = getPool(url)
pool.acquire().then((conn) => {
const out = typeof payload === 'object' ? JSON.stringify(payload) : payload
cache.remove(id)
conn.write(out + '\n', 'utf8',() => {
pool.release(conn)
})
})
.catch((e) => {
debug('error', e)
const err = new Error(`Unable to exeute tcp transport for timer ${id}`)
err.name = 'ETCPERR'
console.error(err)
})
}

function getPool(addr, opts) {
if (connections.has(addr)) return connections.get(addr)
const pool = Pool.createPool({
create: () => {
debug("creating")
const url = Url.parse(addr)
let conn = net.connect(url.port, url.hostname)
conn.setNoDelay(true)
conn.setKeepAlive(true)
conn.on('error', (err) => {
debug("destroying connection", err.message)
pool.destroy(conn).catch(()=>{})
connections.delete(addr)
})
return conn
}
, destroy: (client, cb) => {
client.on('end', () => {
debug('connection closed')
})
client.destroy()
}
, validate: (conn) => {
return Promise.resolve(conn.destroyed)
}
}, {
min: 1
, max: 100
, testOnBorrow: true
})
connections.set(addr, pool)
return pool
}

function onSignal() {
debug('open connections', connections.size)
for (var [key, value] of connections.entries()) {
debug('disconnecting %s', key)
value.drain().then(() => {
value.clear()
})
}
}
process.once('SIGINT', onSignal)
process.once('SIGTERM', onSignal)
35 changes: 35 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"name": "skyring-tcp-transport",
"version": "1.0.0",
"description": "A TCP transport for skyring service",
"main": "index.js",
"scripts": {
"test": "tap --cov -Rtap test"
},
"keywords": [
"skyring",
"timers",
"distributed",
"tcp",
"webhook"
],
"author": "Eric Satterwhite <[email protected]>",
"license": "MIT",
"repository": {
"type":"git",
"url": "https://github.com/esatterwhite/skyring-tcp-transport"
},
"bugs": {
"email": "[email protected]",
"url":"https://github.com/esatterwhite/skyring-tcp-transport/issues"
},
"dependencies": {
"debug": "^2.6.8",
"generic-pool": "^3.1.7"
},
"devDependencies": {
"skyring": "^4.0.4",
"supertest": "^3.0.0",
"tap": "^10.3.2"
}
}
33 changes: 33 additions & 0 deletions test/transport.tcp.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const net = require('net')
, path = require('path')
, tap = require('tap')
, supertest = require('supertest')
, Skyring = require('skyring')
, test = tap.test
;

let hostname = null;

if(!process.env.TEST_HOST) {
hostname = os.hostname()
console.log(`env variable TEST_HOST not set. using ${hostname} as hostname`)
} else {
hostname = process.env.TEST_HOST;
}

test('transports:tcp', (t) => {
let request, server
t.test('set up skyring server', (tt) => {
server = new Skyring({
transports: [path.resolve('../')]
});
request = supertest('http://localhost:3333')
server.load().listen(3333, null, null, tt.end)
})

tt.

})

0 comments on commit 512e7a0

Please sign in to comment.