Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Make switch a state machine #278

Merged
merged 27 commits into from
Oct 19, 2018
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c47ab13
feat: add basic state machine functionality to switch
jacobheun Sep 5, 2018
a97c386
fix: linting
jacobheun Sep 5, 2018
b4d1602
refactor: move connection.js to connection-manager.js
jacobheun Sep 5, 2018
41e4a88
feat: add outgoing connection state machine
jacobheun Sep 11, 2018
5848e6d
feat: functioning incoming connection fsm
jacobheun Sep 25, 2018
45ae65b
fix: linting
jacobheun Sep 25, 2018
78203fd
fix: stats
jacobheun Sep 26, 2018
0b9f5f1
docs: remove notes
jacobheun Sep 26, 2018
ef92ee2
test: bump circuit shutdown timeout
jacobheun Sep 26, 2018
4b62917
fix: node 8 support
jacobheun Sep 26, 2018
7ab2151
feat: add class-is support for connections
jacobheun Sep 26, 2018
674d55c
refactor: clean up some logic and make inc muxed conns FSMs
jacobheun Sep 27, 2018
f4a1806
fix: cleanup todos, logic and event handlers
jacobheun Sep 27, 2018
dd12ad1
refactor: clean up logs
jacobheun Oct 2, 2018
56ea400
feat: add dialFSM to the switch
jacobheun Oct 3, 2018
2e1f7b5
refactor: rename test file
jacobheun Oct 3, 2018
75f1370
feat: add better support for closing connections
jacobheun Oct 3, 2018
0ee8157
test: add tests for some uncovered lines
jacobheun Oct 3, 2018
d921c2d
refactor: do some cleanup
jacobheun Oct 4, 2018
6351365
feat: add additional fsm user support
jacobheun Oct 5, 2018
3d469d0
feat: add warning emitter for muxer upgrade failed
jacobheun Oct 5, 2018
662a58f
refactor: cleanup and add some tests
jacobheun Oct 5, 2018
730c2b6
test: add test for failed muxer upgrade
jacobheun Oct 5, 2018
6dd8f39
test: add more error state tests for connectionfsm
jacobheun Oct 5, 2018
a2e995e
docs: update readme
jacobheun Oct 17, 2018
ef85443
docs: fix readme link
jacobheun Oct 17, 2018
8ff0375
docs: clean up readme and jsdocs
jacobheun Oct 19, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ libp2p-switch is used by [libp2p](https://github.com/libp2p/js-libp2p) but it ca
- [Usage](#usage)
- [Create a libp2p switch](#create-a-libp2p-switch)
- [API](#api)
- [`switch.dial(peer, protocol, callback)`](#swarmdialpi-protocol-callback)
- [`switch.hangUp(peer, callback)`](#swarmhanguppi-callback)
- [`switch.handle(protocol, handler)`](#swarmhandleprotocol-handler)
- [`switch.unhandle(protocol)`](#swarmunhandleprotocol)
- [`switch.start(callback)`](#swarmlistencallback)
- [`switch.stop(callback)`](#swarmclosecallback)
- [`switch.connection`](#connection)
- [`switch.dial(peer, protocol, callback)`](#switchdialpeer-protocol-callback)
- [`switch.dialFSM(peer, protocol, callback)`](#switchdialfsmpeer-protocol-callback)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better just mimic js-libp2p and do dial and dialProtocol

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@diasdavid are you suggesting to add dialProtocol in addition to the new dialFSM or as a naming replacement? I think having dial handle calls with and without a protocol is reasonable overloading. Replacing it wouldn't be good as it has a different callback footprint to give users access to the ConnectionFSM for finer control/handling.

I've exposed the same dialFSM method in the libp2p branch

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to block this PR for this. What I mean is that given that libp2p (the module) is just an extension to the libp2p-switch (the dialing machine) that adds it other features, it would be cool if they both expose the same dialing/hangup APIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. Ideally I'd like to deprecate dialProtocol in libp2p and just use dial. Having both feel unnecessary. I'll propose the change in an issue there and if dialProtocol ends up staying we can add that in here.

- [`switch.hangUp(peer, callback)`](#switchhanguppeer-callback)
- [`switch.handle(protocol, handler)`](#switchhandleprotocol-handler)
- [`switch.unhandle(protocol)`](#switchunhandleprotocol)
- [`switch.start(callback)`](#switchstartcallback)
- [`switch.stop(callback)`](#switchstopcallback)
- [`switch.connection`](#switchconnection)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please alphabetize the API? I believe this helps when looking for documentation.

- [`switch.stats`](#stats-api)
- [Internal Transports API](#transports)
- [Design Notes](#designnotes)
- [Internal Transports API](#internal-transports-api)
- [Design Notes](#design-notes)
- [Multitransport](#multitransport)
- [Connection upgrades](#connection-upgrades)
- [Identify](#identify)
Expand Down Expand Up @@ -94,6 +95,25 @@ dial uses the best transport (whatever works first, in the future we can have so
- `protocol`
- `callback`

### `switch.dialFSM(peer, protocol, callback)`

works like dial, but calls back with a [Connection State Machine](#connection-state-machine)

- `peer`: can be an instance of [PeerInfo][], [PeerId][] or [multiaddr][]
- `protocol`: String that defines the protocol (e.g '/ipfs/bitswap/1.1.0') to be used
- `callback`: Function with signature `function (err, connFSM) {}` where `connFSM` is a [Connection State Machine](#connection-state-machine)

#### Connection State Machine
Connection state machines emit a number of events that can be used to determine the current state of the connection
and to received the underlying connection that can be used to transfer data.

##### Events
- `error`: emitted whenever a fatal error occurs with the connection; the error will be emitted.
- `error:upgrade_failed`: emitted whenever the connection fails to upgrade with a muxer, this is not fatal.
- `error:connection_attempt_failed`: emitted whenever a dial attempt fails for a given transport. An array of errors is emitted.
- `connection`: emitted whenever a useable connection has been established; the underlying [Connection](https://github.com/libp2p/interface-connection) will be emitted.
- `close`: emitted when the connection has closed.

### `switch.hangUp(peer, callback)`

Hang up the muxed connection we have with the peer.
Expand Down
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"devDependencies": {
"aegir": "^15.1.0",
"chai": "^4.1.2",
"chai-checkmark": "^1.0.1",
"dirty-chai": "^2.0.1",
"libp2p-mplex": "~0.8.2",
"libp2p-pnet": "~0.1.0",
Expand All @@ -54,7 +55,10 @@
"dependencies": {
"async": "^2.6.1",
"big.js": "^5.1.2",
"class-is": "^1.1.0",
"debug": "^3.1.0",
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"hashlru": "^2.2.1",
"interface-connection": "~0.3.2",
"ip-address": "^5.8.9",
Expand Down
103 changes: 103 additions & 0 deletions src/connection/base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
'use strict'

const EventEmitter = require('events').EventEmitter
const debug = require('debug')
const withIs = require('class-is')

class BaseConnection extends EventEmitter {
constructor ({ _switch, name }) {
super()

this.switch = _switch
this.ourPeerInfo = this.switch._peerInfo
this.log = debug(`libp2p:conn:${name}`)
}

/**
* Gets the current state of the connection
*
* @returns {string} The current state of the connection
*/
getState () {
return this._state._state
}

/**
* Puts the state into encrypting mode
*
* @returns {void}
*/
encrypt () {
this._state('encrypt')
}

/**
* Puts the state into privatizing mode
*
* @returns {void}
*/
protect () {
this._state('privatize')
}

/**
* Puts the state into muxing mode
*
* @returns {void}
*/
upgrade () {
this._state('upgrade')
}

/**
* Event handler for disconnected.
*
* @fires BaseConnection#close
* @returns {void}
*/
_onDisconnected () {
this.log(`disconnected from ${this.theirB58Id}`)
this.emit('close')
this.removeAllListeners()
}

/**
* Event handler for privatized
*
* @fires BaseConnection#private
* @returns {void}
*/
_onPrivatized () {
this.log(`successfully privatized incoming connection`)
this.emit('private', this.conn)
}

/**
* Wraps this.conn with the Switch.protector for private connections
*
* @private
* @fires ConnectionFSM#error
* @returns {void}
*/
_onPrivatizing () {
if (!this.switch.protector) {
return this._state('done')
}

this.conn = this.switch.protector.protect(this.conn, (err) => {
if (err) {
this.emit('error', err)
return this._state('disconnect')
}

this.log(`successfully privatized conn to ${this.theirB58Id}`)
this.conn.setPeerInfo(this.theirPeerInfo)
this._state('done')
})
}
}

module.exports = withIs(BaseConnection, {
className: 'BaseConnection',
symbolName: 'libp2p-switch/BaseConnection'
})
46 changes: 46 additions & 0 deletions src/connection/handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const debug = require('debug')
const IncomingConnection = require('./incoming')
const observeConn = require('../observe-connection')

function listener (_switch) {
const log = debug(`libp2p:switch:listener`)

/**
* Takes a transport key and returns a connection handler function
*
* @param {string} transportKey The key of the transport to handle connections for
* @param {function} handler A custom handler to use
* @returns {function(Connection)} A connection handler function
*/
return (transportKey, handler) => {
/**
* Takes a base connection and manages listening behavior
*
* @param {Connection} conn The connection to manage
* @returns {void}
*/
return (conn) => {
// Add a transport level observer, if needed
const connection = transportKey ? observeConn(transportKey, null, conn, _switch.observer) : conn

log('received incoming connection')
const connFSM = new IncomingConnection({ connection, _switch, transportKey })

connFSM.once('error', (err) => log(err))
connFSM.once('private', (_conn) => {
// Use the custom handler, if it was provided
if (handler) {
return handler(_conn)
}
connFSM.encrypt()
})
connFSM.once('encrypted', () => connFSM.upgrade())

connFSM.protect()
}
}
}

module.exports = listener
115 changes: 115 additions & 0 deletions src/connection/incoming.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
'use strict'

const FSM = require('fsm-event')
const multistream = require('multistream-select')
const withIs = require('class-is')

const BaseConnection = require('./base')

class IncomingConnectionFSM extends BaseConnection {
constructor ({ connection, _switch, transportKey }) {
super({
_switch,
name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
})
this.conn = connection
this.theirPeerInfo = null
this.ourPeerInfo = this.switch._peerInfo
this.transportKey = transportKey
this.protocolMuxer = this.switch.protocolMuxer(this.transportKey)
this.msListener = new multistream.Listener()

this._state = FSM('DIALED', {
DISCONNECTED: { },
DIALED: { // Base connection to peer established
privatize: 'PRIVATIZING',
encrypt: 'ENCRYPTING'
},
PRIVATIZING: { // Protecting the base connection
done: 'PRIVATIZED',
disconnect: 'DISCONNECTING'
},
PRIVATIZED: { // Base connection is protected
encrypt: 'ENCRYPTING'
},
ENCRYPTING: { // Encrypting the base connection
done: 'ENCRYPTED',
disconnect: 'DISCONNECTING'
},
ENCRYPTED: { // Upgrading could not happen, the connection is encrypted and waiting
upgrade: 'UPGRADING',
disconnect: 'DISCONNECTING'
},
UPGRADING: { // Attempting to upgrade the connection with muxers
done: 'MUXED'
},
MUXED: {
disconnect: 'DISCONNECTING'
},
DISCONNECTING: { // Shutting down the connection
done: 'DISCONNECTED'
}
})

this._state.on('PRIVATIZING', () => this._onPrivatizing())
this._state.on('PRIVATIZED', () => this._onPrivatized())
this._state.on('ENCRYPTING', () => this._onEncrypting())
this._state.on('ENCRYPTED', () => {
this.log(`successfully encrypted connection to ${this.theirB58Id || 'unknown peer'}`)
this.emit('encrypted', this.conn)
})
this._state.on('UPGRADING', () => this._onUpgrading())
this._state.on('MUXED', () => {
this.log(`successfully muxed connection to ${this.theirB58Id || 'unknown peer'}`)
this.emit('muxed', this.conn)
})
this._state.on('DISCONNECTING', () => {
if (this.theirPeerInfo) {
this.theirPeerInfo.disconnect()
}
this._state('done')
})
}

/**
* Attempts to encrypt `this.conn` with the Switch's crypto.
*
* @private
* @fires IncomingConnectionFSM#error
* @returns {void}
*/
_onEncrypting () {
this.log(`encrypting connection via ${this.switch.crypto.tag}`)

this.msListener.addHandler(this.switch.crypto.tag, (protocol, _conn) => {
this.conn = this.switch.crypto.encrypt(this.ourPeerInfo.id, _conn, undefined, (err) => {
if (err) {
this.emit('error', err)
return this._state('disconnect')
}
this.conn.getPeerInfo((_, peerInfo) => {
this.theirPeerInfo = peerInfo
this._state('done')
})
})
}, null)

// Start handling the connection
this.msListener.handle(this.conn, (err) => {
if (err) {
this.emit('crypto handshaking failed', err)
}
})
}

_onUpgrading () {
this.log('adding the protocol muxer to the connection')
this.protocolMuxer(this.conn, this.msListener)
this._state('done')
}
}

module.exports = withIs(IncomingConnectionFSM, {
className: 'IncomingConnectionFSM',
symbolName: 'libp2p-switch/IncomingConnectionFSM'
})
Loading