Skip to content

Commit

Permalink
switch to lpm stream to match go
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddias committed Jun 4, 2016
1 parent ce86b7b commit cb82275
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 54 deletions.
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
},
"devDependencies": {
"aegir": "^3.2.0",
"bl": "^1.1.2",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"gulp": "^3.9.1",
Expand All @@ -54,15 +53,17 @@
},
"dependencies": {
"babel-runtime": "^6.6.1",
"bl": "^1.1.2",
"browserify-zlib": "github:ipfs/browserify-zlib",
"duplexify": "^3.4.3",
"ip-address": "^5.8.0",
"length-prefixed-stream": "^1.5.0",
"lodash.contains": "^2.4.3",
"multiaddr": "^2.0.0",
"multistream-select": "^0.9.0",
"peer-id": "^0.7.0",
"peer-info": "^0.7.0",
"protocol-buffers-stream": "^1.3.1",
"protocol-buffers": "^3.1.6",
"run-parallel": "^1.1.6"
},
"contributors": [
Expand All @@ -73,4 +74,4 @@
"Richard Littauer <[email protected]>",
"dignifiedquire <[email protected]>"
]
}
}
122 changes: 71 additions & 51 deletions src/identify.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@
const multistream = require('multistream-select')
const fs = require('fs')
const path = require('path')
const Info = require('peer-info')
const Id = require('peer-id')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const bl = require('bl')

const identity = fs.readFileSync(path.join(__dirname, 'identify.proto'))

const pbStream = require('protocol-buffers-stream')(identity)
const lpstream = require('length-prefixed-stream')
const protobuf = require('protocol-buffers')
const schema = fs.readFileSync(path.join(__dirname, 'identify.proto'))
const idPb = protobuf(schema)

exports = module.exports
exports.multicodec = '/ipfs/id/1.0.0'

exports.exec = (rawConn, muxer, peerInfo, callback) => {
exports.exec = (rawConn, muxer, pInfo, callback) => {
// 1. open a stream
// 2. multistream into identify
// 3. send what I see from this other peer (extract fro conn)
Expand All @@ -36,78 +38,96 @@ exports.exec = (rawConn, muxer, peerInfo, callback) => {
return callback(err)
}

ms.select(exports.multicodec, (err, ds) => {
ms.select(exports.multicodec, (err, conn) => {
if (err) {
return callback(err)
}

var pbs = pbStream()

pbs.on('identify', (msg) => {
if (msg.observedAddr.length > 0) {
peerInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
}

const peerId = Id.createFromPubKey(msg.publicKey)
const otherPeerInfo = new Info(peerId)
msg.listenAddrs.forEach((ma) => {
otherPeerInfo.multiaddr.add(multiaddr(ma))
})

callback(null, otherPeerInfo)
})
const encode = lpstream.encode()
const decode = lpstream.decode()

encode
.pipe(conn)
.pipe(decode)
.pipe(bl((err, data) => {
if (err) {
return callback(err)
}
const msg = idPb.Identify.decode(data)
if (msg.observedAddr.length > 0) {
pInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
}

const pId = PeerId.createFromPubKey(msg.publicKey)
const otherPInfo = new PeerInfo(pId)
msg.listenAddrs.forEach((ma) => {
otherPInfo.multiaddr.add(multiaddr(ma))
})
callback(null, otherPInfo)
}))

const obsMultiaddr = rawConn.getObservedAddrs()[0]

let publicKey = new Buffer(0)
if (peerInfo.id.pubKey) {
publicKey = peerInfo.id.pubKey.bytes
if (pInfo.id.pubKey) {
publicKey = pInfo.id.pubKey.bytes
}

pbs.identify({
const msg = idPb.Identify.encode({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: peerInfo.multiaddrs.map((mh) => mh.buffer),
listenAddrs: pInfo.multiaddrs.map((mh) => mh.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})

pbs.pipe(ds).pipe(pbs)
pbs.finalize()
encode.write(msg)
encode.end()
})
})
}

exports.handler = (peerInfo, swarm) => {
exports.handler = (pInfo, swarm) => {
return (conn) => {
// 1. receive incoming observed info about me
// 2. update my own information (on peerInfo)
// 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs()
var pbs = pbStream()
pbs.on('identify', (msg) => {
if (msg.observedAddr.length > 0) {
peerInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
}

const peerId = Id.createFromPubKey(msg.publicKey)
const conn = swarm.muxedConns[peerId.toB58String()].conn
const obsMultiaddr = conn.getObservedAddrs()[0]
const encode = lpstream.encode()
const decode = lpstream.decode()

let publicKey = new Buffer(0)
if (peerInfo.id.pubKey) {
publicKey = peerInfo.id.pubKey.bytes
}
encode
.pipe(conn)
.pipe(decode)
.pipe(bl((err, data) => {
if (err) {
console.log(new Error('Failed to decode lpm from identify'))
return
}
const msg = idPb.Identify.decode(data)
if (msg.observedAddr.length > 0) {
pInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
}

pbs.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: peerInfo.multiaddrs.map((ma) => ma.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})
pbs.finalize()
})
const pId = PeerId.createFromPubKey(msg.publicKey)
const conn = swarm.muxedConns[pId.toB58String()].conn
const obsMultiaddr = conn.getObservedAddrs()[0]

let publicKey = new Buffer(0)
if (pInfo.id.pubKey) {
publicKey = pInfo.id.pubKey.bytes
}

const msgSend = idPb.Identify.encode({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: publicKey,
listenAddrs: pInfo.multiaddrs.map((ma) => ma.buffer),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})

pbs.pipe(conn).pipe(pbs)
encode.write(msgSend)
encode.end()
}))
}
}

0 comments on commit cb82275

Please sign in to comment.