This repository has been archived by the owner on Aug 23, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: split transport dialer and circuit logic
- Loading branch information
Showing
10 changed files
with
499 additions
and
406 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
'use strict' | ||
|
||
module.exports = { | ||
PRIOIRY: 100, | ||
DIALER: { | ||
ONION: 'onion', | ||
TELESCOPE: 'telescope' | ||
}, | ||
RESPONSE: { | ||
SUCCESS: 100, | ||
HOP: { | ||
SRC_ADDR_TOO_LONG: 220, | ||
DST_ADDR_TOO_LONG: 221, | ||
SRC_MULTIADDR_INVALID: 250, | ||
DST_MULTIADDR_INVALID: 251, | ||
NO_CONN_TO_DST: 260, | ||
CANT_DIAL_DST: 261, | ||
CANT_OPEN_DST_STREAM: 262, | ||
CANT_SPEAK_RELAY: 270, | ||
CANT_CONNECT_TO_SELF: 280 | ||
}, | ||
STOP: { | ||
SRC_ADDR_TOO_LONG: 320, | ||
DST_ADDR_TOO_LONG: 321, | ||
SRC_MULTIADDR_INVALID: 350, | ||
DST_MULTIADDR_INVALID: 351 | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,238 @@ | ||
'use strict' | ||
|
||
const pull = require('pull-stream') | ||
const handshake = require('pull-handshake') | ||
const Connection = require('interface-connection').Connection | ||
const mafmt = require('mafmt') | ||
const isFunction = require('lodash.isfunction') | ||
const multiaddr = require('multiaddr') | ||
const lp = require('pull-length-prefixed') | ||
const constants = require('./constants') | ||
const once = require('once') | ||
const utils = require('./utils') | ||
|
||
const debug = require('debug') | ||
const log = debug('libp2p:circuit:dialer') | ||
log.err = debug('libp2p:circuit:error:dialer') | ||
|
||
const multicodec = require('../multicodec') | ||
|
||
class Dialer { | ||
/** | ||
* Creates an instance of Dialer. | ||
* @param {Swarm} swarm - the swarm | ||
* @param {any} options - config options | ||
* | ||
* @memberOf Dialer | ||
*/ | ||
constructor (swarm, options) { | ||
this.swarm = swarm | ||
this.relayPeers = new Map() | ||
this.options = options | ||
// this.handler = handler // this handler is passed to the listener | ||
|
||
// get all the relay addresses for this swarm | ||
const relays = this.filter(swarm._peerInfo.multiaddrs.toArray()) | ||
|
||
// if no explicit relays, add a default relay addr | ||
if (relays.length === 0) { | ||
this.swarm | ||
._peerInfo | ||
.multiaddrs | ||
.add(`/p2p-circuit/ipfs/${this.swarm._peerInfo.id.toB58String()}`) | ||
} | ||
|
||
this.dialSwarmRelays(relays) | ||
|
||
this.swarm.on('peer-mux-established', this.dialRelay.bind(this)) | ||
this.swarm.on('peer-mux-closed', (peerInfo) => { | ||
this.relayPeers.delete(peerInfo.id.toB58String()) | ||
}) | ||
} | ||
|
||
/** | ||
* Dial the relays in the Addresses.Swarm config | ||
* | ||
* @param {Array} relays | ||
* @return {void} | ||
*/ | ||
dialSwarmRelays (relays) { | ||
// if we have relay addresses in swarm config, then dial those relays | ||
this.swarm.on('listening', () => { | ||
relays.forEach((relay) => { | ||
let relaySegments = relay | ||
.toString() | ||
.split('/p2p-circuit') | ||
.filter(segment => segment.length) | ||
|
||
relaySegments.forEach((relaySegment) => { | ||
this.dialRelay(utils.peerInfoFromMa(relaySegment)) | ||
}) | ||
}) | ||
}) | ||
} | ||
|
||
/** | ||
* Dial a peer over a relay | ||
* | ||
* @param {multiaddr} ma - the multiaddr of the peer to dial | ||
* @param {Object} options - dial options | ||
* @param {Function} cb - a callback called once dialed | ||
* @returns {Connection} - the connection | ||
* | ||
* @memberOf Dialer | ||
*/ | ||
dial (ma, options, cb) { | ||
throw new Error('abstract class, method not implemented') | ||
} | ||
|
||
/** | ||
* Dial the destination peer over a relay | ||
* | ||
* @param {multiaddr} dstMa | ||
* @param {Connection} relay | ||
* @param {Function} cb | ||
* @return {Function|void} | ||
* @private | ||
*/ | ||
dialPeer (dstMa, relay, cb) { | ||
if (isFunction(relay)) { | ||
cb = relay | ||
relay = null | ||
} | ||
|
||
if (!cb) { | ||
cb = () => {} | ||
} | ||
|
||
dstMa = multiaddr(dstMa) | ||
// if no relay provided, dial on all available relays until one succeeds | ||
if (!relay) { | ||
const relays = Array.from(this.relayPeers.values()) | ||
let next = (nextRelay) => { | ||
if (!nextRelay) { | ||
let err = `no relay peers were found` | ||
log.err(err) | ||
return cb(err) | ||
} | ||
|
||
this.dialPeer(dstMa, nextRelay, (err, conn) => { | ||
if (err) { | ||
log.err(err) | ||
return next(relays.shift()) | ||
} | ||
cb(null, new Connection(conn)) | ||
}) | ||
} | ||
next(relays.shift()) | ||
} else { | ||
this.negotiateRelay(relay, dstMa, (err, conn) => { | ||
if (err) { | ||
log.err(`An error has occurred negotiating the relay connection`, err) | ||
return cb(err) | ||
} | ||
|
||
return cb(null, conn) | ||
}) | ||
} | ||
} | ||
|
||
/** | ||
* Negotiate the relay connection | ||
* | ||
* @param {Connection} conn - a connection to the relay | ||
* @param {multiaddr} dstMa - the multiaddr of the peer to relay the connection for | ||
* @param {Function} cb - a callback with that return the negotiated relay connection | ||
* @returns {void} | ||
* | ||
* @memberOf Dialer | ||
*/ | ||
negotiateRelay (conn, dstMa, cb) { | ||
dstMa = multiaddr(dstMa) | ||
|
||
let stream = handshake({timeout: 1000 * 60}, cb) | ||
let shake = stream.handshake | ||
|
||
log(`negotiating relay for peer ${dstMa.getPeerId()}`) | ||
const values = [new Buffer(dstMa.toString())] | ||
|
||
pull( | ||
pull.values(values), | ||
lp.encode(), | ||
pull.collect((err, encoded) => { | ||
if (err) { | ||
return cb(err) | ||
} | ||
|
||
shake.write(encoded[0]) | ||
shake.read(3, (err, data) => { | ||
if (err) { | ||
log.err(err) | ||
return cb(err) | ||
} | ||
|
||
if (Number(data.toString()) !== constants.RESPONSE.SUCCESS) { | ||
cb(new Error(`Got ${data.toString()} error code trying to dial over relay`)) | ||
} | ||
|
||
cb(null, shake.rest()) | ||
}) | ||
}) | ||
) | ||
|
||
pull(stream, conn, stream) | ||
} | ||
|
||
/** | ||
* Dial a relay peer by its PeerInfo | ||
* | ||
* @param {PeerInfo} peer - the PeerInfo of the relay peer | ||
* @param {Function} cb - a callback with the connection to the relay peer | ||
* @returns {Function|void} | ||
* | ||
* @memberOf Dialer | ||
*/ | ||
dialRelay (peer, cb) { | ||
cb = once(cb || (() => {})) | ||
|
||
const b58Id = utils.getB58String(peer) | ||
const relay = this.relayPeers.get(b58Id) | ||
if (relay) { | ||
cb(null, relay) | ||
} | ||
|
||
const relayConn = new Connection() | ||
relayConn.setPeerInfo(peer) | ||
// attempt to dia the relay so that we have a connection | ||
this.swarm.dial(peer, multicodec.hop, once((err, conn) => { | ||
if (err) { | ||
log.err(err) | ||
return cb(err) | ||
} | ||
|
||
relayConn.setInnerConn(conn) | ||
this.relayPeers.set(b58Id, relayConn) | ||
cb(null, relayConn) | ||
})) | ||
} | ||
|
||
/** | ||
* Filter check for all multiaddresses | ||
* that this transport can dial on | ||
* | ||
* @param {any} multiaddrs | ||
* @returns {Array<multiaddr>} | ||
* | ||
* @memberOf Dialer | ||
*/ | ||
filter (multiaddrs) { | ||
if (!Array.isArray(multiaddrs)) { | ||
multiaddrs = [multiaddrs] | ||
} | ||
return multiaddrs.filter((ma) => { | ||
return mafmt.Circuit.matches(ma) | ||
}) | ||
} | ||
} | ||
|
||
module.exports = Dialer |
Oops, something went wrong.