Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Add connection type param to hangUp() #298

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions src/connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,23 @@ const BaseConnection = require('./base')
const observeConnection = require('../observe-connection')
const Errors = require('../errors')

/**
* ConnectionType indicates whether the connection is incoming or outgoing
* @readonly
* @enum {string}
*/
const ConnectionType = {
Incoming: 'Incoming',
Outgoing: 'Outgoing'
}

/**
* @typedef {Object} ConnectionOptions
* @property {Switch} _switch Our switch instance
* @property {PeerInfo} peerInfo The PeerInfo of the peer to dial
* @property {Muxer} muxer Optional - A muxed connection
* @property {Connection} conn Optional - The base connection
* @property {string} type Optional - identify the connection as incoming or outgoing. Defaults to out.
* @property {ConnectionType} type Optional - identify the connection as Incoming or Outgoing. Defaults to Outgoing.
*/

/**
Expand All @@ -30,7 +40,7 @@ class ConnectionFSM extends BaseConnection {
* @param {ConnectionOptions} param0
* @constructor
*/
constructor ({ _switch, peerInfo, muxer, conn, type = 'out' }) {
constructor ({ _switch, peerInfo, muxer, conn, type = ConnectionType.Outgoing }) {
super({
_switch,
name: `${type}:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
Expand All @@ -42,6 +52,8 @@ class ConnectionFSM extends BaseConnection {
this.conn = conn // The base connection
this.muxer = muxer // The upgraded/muxed connection

this._type = type

let startState = 'DISCONNECTED'
if (this.muxer) {
startState = 'MUXED'
Expand Down Expand Up @@ -93,8 +105,7 @@ class ConnectionFSM extends BaseConnection {
disconnect: 'DISCONNECTING'
},
DISCONNECTING: { // Shutting down the connection
done: 'DISCONNECTED',
disconnect: 'DISCONNECTING'
done: 'DISCONNECTED'
},
ABORTED: { }, // A severe event occurred
ERRORED: { // An error occurred, but future dials may be allowed
Expand Down Expand Up @@ -276,12 +287,12 @@ class ConnectionFSM extends BaseConnection {
if (this.conn) {
this.conn.source(true, () => {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
this.switch.emit('peer-mux-closed', this.theirPeerInfo, { type: this._type })
delete this.conn
})
} else {
this._state('done')
this.switch.emit('peer-mux-closed', this.theirPeerInfo)
this.switch.emit('peer-mux-closed', this.theirPeerInfo, { type: this._type })
}
}

Expand Down Expand Up @@ -375,7 +386,7 @@ class ConnectionFSM extends BaseConnection {
this.switch.protocolMuxer(null)(conn)
})

this.switch.emit('peer-mux-established', this.theirPeerInfo)
this.switch.emit('peer-mux-established', this.theirPeerInfo, { type: this._type })

this._didUpgrade(null)
})
Expand Down Expand Up @@ -447,8 +458,14 @@ class ConnectionFSM extends BaseConnection {
this.emit('error', Errors.INVALID_STATE_TRANSITION(err))
this.log(err)
}

get type () {
return this._type
}
}

ConnectionFSM.ConnectionType = ConnectionType

module.exports = withIs(ConnectionFSM, {
className: 'ConnectionFSM',
symbolName: 'libp2p-switch/ConnectionFSM'
Expand Down
12 changes: 6 additions & 6 deletions src/connection/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ class ConnectionManager {
* Gets a connection associated with the given peer
* @private
* @param {string} peerId The peers id
* @param {ConnectionType} type Optional Incoming / Outgoing
* @returns {ConnectionFSM|null} The found connection or null
*/
getOne (peerId) {
getOne (peerId, type = null) {
if (this.connections[peerId]) {
// TODO: Maybe select the best?
return this.connections[peerId][0]
return this.connections[peerId].find(conn => !type || conn.type === type)
}
return null
}
Expand Down Expand Up @@ -171,18 +172,17 @@ class ConnectionManager {
return log('identify not successful')
}
const b58Str = peerInfo.id.toB58String()

const connection = new ConnectionFSM({
_switch: this.switch,
peerInfo,
muxer: muxedConn,
conn: conn,
type: 'inc'
type: ConnectionFSM.ConnectionType.Incoming
})
this.switch.connection.add(connection)

if (peerInfo.multiaddrs.size > 0) {
// with incomming conn and through identify, going to pick one
// with incoming conn and through identify, going to pick one
// of the available multiaddrs from the other peer as the one
// I'm connected to as we really can't be sure at the moment
// TODO add this consideration to the connection abstraction!
Expand All @@ -198,7 +198,7 @@ class ConnectionManager {
connection.close()
})

this.switch.emit('peer-mux-established', peerInfo)
this.switch.emit('peer-mux-established', peerInfo, { type: ConnectionFSM.ConnectionType.Incoming })
})
})
}
Expand Down
3 changes: 1 addition & 2 deletions src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ function dial (_switch, returnFSM) {
const b58Id = peerInfo.id.toB58String()

log(`dialing to ${b58Id.slice(0, 8)} with protocol ${protocol || 'unknown'}`)

let connection = _switch.connection.getOne(b58Id)
let connection = _switch.connection.getOne(b58Id, ConnectionFSM.ConnectionType.Outgoing)

if (!ConnectionFSM.isConnectionFSM(connection)) {
connection = new ConnectionFSM({
Expand Down
18 changes: 15 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,29 @@ class Switch extends EventEmitter {
}

/**
* If a muxed Connection exists for the given peer, it will be closed
* and its reference on the Switch will be removed.
* If a muxed Connection exists for the given peer (optionally of the given
* connection type: Incoming / Outgoing), it will be closed and its reference
* on the Switch will be removed.
*
* @param {PeerInfo|Multiaddr|PeerId} peer
* @param {ConnectionType} connectionType Optional
* @param {function()} callback
* @returns {void}
*/
hangUp (peer, callback) {
hangUp (peer, connectionType, callback) {
if (typeof connectionType === 'function') {
callback = connectionType
connectionType = null
}

const peerInfo = getPeerInfo(peer, this._peerBook)
const key = peerInfo.id.toB58String()
const conns = [...this.connection.getAllById(key)]
each(conns, (conn, cb) => {
if (connectionType && conn.type !== connectionType) {
return setImmediate(cb)
}

conn.once('close', cb)
conn.close()
}, callback)
Expand Down Expand Up @@ -262,3 +273,4 @@ class Switch extends EventEmitter {

module.exports = Switch
module.exports.errors = Errors
module.exports.ConnectionType = require('./connection').ConnectionType
8 changes: 4 additions & 4 deletions src/observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ module.exports = (swtch) => {
outgoing: observe('out')
})

swtch.on('peer-mux-established', (peerInfo) => {
observer.emit('peer:connected', peerInfo.id.toB58String())
swtch.on('peer-mux-established', (peerInfo, meta) => {
observer.emit('peer:connected', peerInfo.id.toB58String(), meta)
})

swtch.on('peer-mux-closed', (peerInfo) => {
observer.emit('peer:closed', peerInfo.id.toB58String())
swtch.on('peer-mux-closed', (peerInfo, meta) => {
observer.emit('peer:closed', peerInfo.id.toB58String(), meta)
})

return observer
Expand Down
114 changes: 113 additions & 1 deletion test/dial-fsm.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,112 @@ describe('dialFSM', () => {
})
})

it('parallel dials to one another can disconnect only outgoing on hangup', function (done) {
this.timeout(10e3)

switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })

// 2 close checks (outgoing) and 1 hangup check
expect(3).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')

expect(switchA.connection.getAll()).to.have.length(1)
expect(switchB.connection.getAll()).to.have.length(1)

// Hangup remaining connections
switchA.hangUp(switchB._peerInfo, done)
})

switchA.on('peer-mux-closed', (peerInfo, meta) => {
expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark()
expect(meta.type).to.eql(Switch.ConnectionType.Outgoing)
})
switchB.on('peer-mux-closed', (peerInfo, meta) => {
expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark()
expect(meta.type).to.eql(Switch.ConnectionType.Incoming)
})

// Dial from A -> B and wait for connections to be established in both
// directions
utils.awaitEvents([
[switchA, 'peer-mux-established', 2],
[switchB, 'peer-mux-established', 2]
], () => {
// Both should have 1 incoming / 1 outgoing connection
expect(switchA.connection.getAll()).to.have.length(2)
expect(switchB.connection.getAll()).to.have.length(2)

// Hangup Outbound only and verify the connections are closed
switchA.hangUp(switchB._peerInfo, Switch.ConnectionType.Outgoing, (err) => {
expect(err).to.not.exist().mark()
})
})
const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0')

// Hold the dial from A, until switch B is done dialing to ensure
// we have both incoming and outgoing connections
conn._state.on('DIALING:enter', (cb) => {
switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => {
cb()
})
})
})

it('parallel dials to one another can disconnect only incoming on hangup', function (done) {
this.timeout(10e3)

switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })

// 2 close checks (incoming) and 1 hangup check
expect(3).checks(() => {
switchA.removeAllListeners('peer-mux-closed')
switchB.removeAllListeners('peer-mux-closed')

expect(switchA.connection.getAll()).to.have.length(1)
expect(switchB.connection.getAll()).to.have.length(1)

// Hangup remaining connections
switchA.hangUp(switchB._peerInfo, done)
})

switchA.on('peer-mux-closed', (peerInfo, meta) => {
expect(peerInfo.id.toB58String()).to.eql(switchB._peerInfo.id.toB58String()).mark()
expect(meta.type).to.eql(Switch.ConnectionType.Incoming)
})
switchB.on('peer-mux-closed', (peerInfo, meta) => {
expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark()
expect(meta.type).to.eql(Switch.ConnectionType.Outgoing)
})

// Dial from A -> B and wait for connections to be established in both
// directions
utils.awaitEvents([
[switchA, 'peer-mux-established', 2],
[switchB, 'peer-mux-established', 2]
], () => {
// Both should have 1 incoming / 1 outgoing connection
expect(switchA.connection.getAll()).to.have.length(2)
expect(switchB.connection.getAll()).to.have.length(2)

// Hangup Inbound only and verify the connections are closed
switchA.hangUp(switchB._peerInfo, Switch.ConnectionType.Incoming, (err) => {
expect(err).to.not.exist().mark()
})
})
const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0')

// Hold the dial from A, until switch B is done dialing to ensure
// we have both incoming and outgoing connections
conn._state.on('DIALING:enter', (cb) => {
switchB.dialFSM(switchA._peerInfo, '/parallel/1.0.0', () => {
cb()
})
})
})

it('parallel dials to one another should disconnect on stop', (done) => {
switchA.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
switchB.handle('/parallel/1.0.0', (_, conn) => { pull(conn, conn) })
Expand All @@ -187,12 +293,18 @@ describe('dialFSM', () => {
expect(peerInfo.id.toB58String()).to.eql(switchA._peerInfo.id.toB58String()).mark()
})

const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0', () => {
// Dial from A -> B and wait for connections to be established in both
// directions
utils.awaitEvents([
[switchA, 'peer-mux-established', 2],
[switchB, 'peer-mux-established', 2]
], () => {
// Hangup and verify the connections are closed
switchA.stop((err) => {
expect(err).to.not.exist().mark()
})
})
const conn = switchA.dialFSM(switchB._peerInfo, '/parallel/1.0.0')

// Hold the dial from A, until switch B is done dialing to ensure
// we have both incoming and outgoing connections
Expand Down
Loading