diff --git a/package.json b/package.json index 58e2808391..3956f2db4b 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,6 @@ }, "devDependencies": { "aegir": "^3.2.0", - "bl": "^1.1.2", "buffer-loader": "0.0.1", "chai": "^3.5.0", "gulp": "^3.9.1", @@ -54,15 +53,17 @@ }, "dependencies": { "babel-runtime": "^6.6.1", + "bl": "^1.1.2", "browserify-zlib": "github:ipfs/browserify-zlib", "duplexify": "^3.4.3", "ip-address": "^5.8.0", + "length-prefixed-stream": "^1.5.0", "lodash.contains": "^2.4.3", "multiaddr": "^2.0.0", "multistream-select": "^0.9.0", "peer-id": "^0.7.0", "peer-info": "^0.7.0", - "protocol-buffers-stream": "^1.3.1", + "protocol-buffers": "^3.1.6", "run-parallel": "^1.1.6" }, "contributors": [ @@ -73,4 +74,4 @@ "Richard Littauer ", "dignifiedquire " ] -} \ No newline at end of file +} diff --git a/src/identify.js b/src/identify.js index 76adbef26c..f500038535 100644 --- a/src/identify.js +++ b/src/identify.js @@ -10,18 +10,20 @@ const multistream = require('multistream-select') const fs = require('fs') const path = require('path') -const Info = require('peer-info') -const Id = require('peer-id') +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') const multiaddr = require('multiaddr') +const bl = require('bl') -const identity = fs.readFileSync(path.join(__dirname, 'identify.proto')) - -const pbStream = require('protocol-buffers-stream')(identity) +const lpstream = require('length-prefixed-stream') +const protobuf = require('protocol-buffers') +const schema = fs.readFileSync(path.join(__dirname, 'identify.proto')) +const idPb = protobuf(schema) exports = module.exports exports.multicodec = '/ipfs/id/1.0.0' -exports.exec = (rawConn, muxer, peerInfo, callback) => { +exports.exec = (rawConn, muxer, pInfo, callback) => { // 1. open a stream // 2. multistream into identify // 3. send what I see from this other peer (extract fro conn) @@ -36,78 +38,96 @@ exports.exec = (rawConn, muxer, peerInfo, callback) => { return callback(err) } - ms.select(exports.multicodec, (err, ds) => { + ms.select(exports.multicodec, (err, conn) => { if (err) { return callback(err) } - var pbs = pbStream() - - pbs.on('identify', (msg) => { - if (msg.observedAddr.length > 0) { - peerInfo.multiaddr.addSafe(multiaddr(msg.observedAddr)) - } - - const peerId = Id.createFromPubKey(msg.publicKey) - const otherPeerInfo = new Info(peerId) - msg.listenAddrs.forEach((ma) => { - otherPeerInfo.multiaddr.add(multiaddr(ma)) - }) - - callback(null, otherPeerInfo) - }) + const encode = lpstream.encode() + const decode = lpstream.decode() + + encode + .pipe(conn) + .pipe(decode) + .pipe(bl((err, data) => { + if (err) { + return callback(err) + } + const msg = idPb.Identify.decode(data) + if (msg.observedAddr.length > 0) { + pInfo.multiaddr.addSafe(multiaddr(msg.observedAddr)) + } + + const pId = PeerId.createFromPubKey(msg.publicKey) + const otherPInfo = new PeerInfo(pId) + msg.listenAddrs.forEach((ma) => { + otherPInfo.multiaddr.add(multiaddr(ma)) + }) + callback(null, otherPInfo) + })) const obsMultiaddr = rawConn.getObservedAddrs()[0] let publicKey = new Buffer(0) - if (peerInfo.id.pubKey) { - publicKey = peerInfo.id.pubKey.bytes + if (pInfo.id.pubKey) { + publicKey = pInfo.id.pubKey.bytes } - pbs.identify({ + const msg = idPb.Identify.encode({ protocolVersion: 'na', agentVersion: 'na', publicKey: publicKey, - listenAddrs: peerInfo.multiaddrs.map((mh) => mh.buffer), + listenAddrs: pInfo.multiaddrs.map((mh) => mh.buffer), observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('') }) - pbs.pipe(ds).pipe(pbs) - pbs.finalize() + encode.write(msg) + encode.end() }) }) } -exports.handler = (peerInfo, swarm) => { +exports.handler = (pInfo, swarm) => { return (conn) => { // 1. receive incoming observed info about me // 2. update my own information (on peerInfo) // 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs() - var pbs = pbStream() - pbs.on('identify', (msg) => { - if (msg.observedAddr.length > 0) { - peerInfo.multiaddr.addSafe(multiaddr(msg.observedAddr)) - } - const peerId = Id.createFromPubKey(msg.publicKey) - const conn = swarm.muxedConns[peerId.toB58String()].conn - const obsMultiaddr = conn.getObservedAddrs()[0] + const encode = lpstream.encode() + const decode = lpstream.decode() - let publicKey = new Buffer(0) - if (peerInfo.id.pubKey) { - publicKey = peerInfo.id.pubKey.bytes - } + encode + .pipe(conn) + .pipe(decode) + .pipe(bl((err, data) => { + if (err) { + console.log(new Error('Failed to decode lpm from identify')) + return + } + const msg = idPb.Identify.decode(data) + if (msg.observedAddr.length > 0) { + pInfo.multiaddr.addSafe(multiaddr(msg.observedAddr)) + } - pbs.identify({ - protocolVersion: 'na', - agentVersion: 'na', - publicKey: publicKey, - listenAddrs: peerInfo.multiaddrs.map((ma) => ma.buffer), - observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('') - }) - pbs.finalize() - }) + const pId = PeerId.createFromPubKey(msg.publicKey) + const conn = swarm.muxedConns[pId.toB58String()].conn + const obsMultiaddr = conn.getObservedAddrs()[0] + + let publicKey = new Buffer(0) + if (pInfo.id.pubKey) { + publicKey = pInfo.id.pubKey.bytes + } + + const msgSend = idPb.Identify.encode({ + protocolVersion: 'na', + agentVersion: 'na', + publicKey: publicKey, + listenAddrs: pInfo.multiaddrs.map((ma) => ma.buffer), + observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('') + }) - pbs.pipe(conn).pipe(pbs) + encode.write(msgSend) + encode.end() + })) } }