Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

callbacks -> async / await #44

Merged
merged 7 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 73 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ The primary goal of this module is to enable developers to pick and swap their t

Publishing a test suite as a module lets multiple modules all ensure compatibility since they use the same test suite.

The purpose of this interface is not to reinvent any wheels when it comes to dialing and listening to transports. Instead, it tries to uniform several transports through a shimmed interface.
The purpose of this interface is not to reinvent any wheels when it comes to dialing and listening to transports. Instead, it tries to provide a uniform API for several transports through a shimmed interface.

The API is presented with both Node.js and Go primitives, however, there are no actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks.
The API is presented with both Node.js and Go primitives, however there are no actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks.

## Lead Maintainer

Expand Down Expand Up @@ -48,16 +48,32 @@ const YourTransport = require('../src')

describe('compliance', () => {
tests({
setup (cb) {
let t = new YourTransport()
setup () {
let transport = new YourTransport()

const addrs = [
multiaddr('valid-multiaddr-for-your-transport'),
multiaddr('valid-multiaddr2-for-your-transport')
]
cb(null, t, addrs)

const network = require('my-network-lib')
const connect = network.connect
const connector = {
delay (delayMs) {
// Add a delay in the connection mechanism for the transport
// (this is used by the dial tests)
network.connect = (...args) => setTimeout(() => connect(...args), 100)
},
restore () {
// Restore the connection mechanism to normal
network.connect = connect
}
}

return { transport, addrs, connector }
},
teardown (cb) {
cb()
teardown () {
// Clean up any resources created by setup()
}
})
})
Expand All @@ -69,88 +85,107 @@ describe('compliance', () => {

# API

A valid (read: that follows the interface defined) transport, must implement the following API.
A valid transport (one that follows the interface defined) must implement the following API:

**Table of contents:**

- type: `Transport`
- `new Transport([options])`
- `transport.dial(multiaddr, [options, callback])`
- `<Promise> transport.dial(multiaddr, [options])`
- `transport.createListener([options], handlerFunction)`
- type: `transport.Listener`
- event: 'listening'
- event: 'close'
- event: 'connection'
- event: 'error'
- `listener.listen(multiaddr, [callback])`
- `listener.getAddrs(callback)`
- `listener.close([options])`
- `<Promise> listener.listen(multiaddr)`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this should be a follow up PR instead, but since we're doing a breaking change it might be worth making listen support an array of multiaddrs now https://github.com/libp2p/interface-transport/issues/41.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense... should listen require an array, or should it also support single multiaddr?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense for it to just support an array. In the majority of cases the switch is going to be handling the .listen call, so most users shouldn't notice the change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we can leverage this breaking change and change the multiaddr param to an array. Should we do this in a separate PR, but only release a new version of interface-transport once that is done?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dirkmc I'm holding on the release until we've added the listening array support. Are you working on that? If not I can start it, just avoiding the potential duplicate effort :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jacobheun please go ahead - I have some other things going on at the moment so only able to snatch a bit of time here and there to work

- `listener.getAddrs()`
- `<Promise> listener.close([options])`

### Creating a transport instance

- `JavaScript` - `var transport = new Transport([options])`

Creates a new Transport instance. `options` is a optional JavaScript object, might include the necessary parameters for the transport instance.
Creates a new Transport instance. `options` is an optional JavaScript object that should include the necessary parameters for the transport instance.

**Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. One example is a libp2p-webrtc-star (or pretty much any other WebRTC flavour transport), where that, in order to dial, a peer needs to be part of some signalling network that is shared also with the listener.
**Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. For example with libp2p-webrtc-star, in order to dial a peer, the peer must be part of some signaling network that is shared with the listener.

### Dial to another peer

- `JavaScript` - `var conn = transport.dial(multiaddr, [options, callback])`
- `JavaScript` - `const conn = await transport.dial(multiaddr, [options])`

This method dials a transport to the Peer listening on `multiaddr`.
This method uses a transport to dial a Peer listening on `multiaddr`.

`multiaddr` must be of the type [`multiaddr`](https://www.npmjs.com/multiaddr).

`stream` must implements the [interface-connection](https://github.com/libp2p/interface-connection) interface.
`[options]` the options that may be passed to the dial. Must support the `signal` option (see below)

`conn` must implement the [interface-connection](https://github.com/libp2p/interface-connection) interface.

The dial may throw an `Error` instance if there was a problem connecting to the `multiaddr`.

`[options]` is an optional argument, which can be used by some implementations
### Canceling a dial

`callback` should follow the `function (err)` signature.
Dials may be cancelled using an `AbortController`:

`err` is an `Error` instance to signal that the dial was unsuccessful, this error can be a 'timeout' or simply 'error'.
```Javascript
const AbortController = require('abort-controller')
const { AbortError } = require('interface-transport')
const controller = new AbortController()
try {
const conn = await mytransport.dial(ma, { signal: controller.signal })
// Do stuff with conn here ...
} catch (err) {
if(err.code === AbortError.code) {
// Dial was aborted, just bail out
return
}
throw err
}

// ----
// In some other part of the code:
controller.abort()
// ----
```

### Create a listener

- `JavaScript` - `var listener = transport.createListener([options], handlerFunction)`
- `JavaScript` - `const listener = transport.createListener([options], handlerFunction)`

This method creates a listener on the transport.

`options` is an optional object that contains the properties the listener must have, in order to properly listen on a given transport/socket.

`handlerFunction` is a function called each time a new connection is received. It must follow the following signature: `function (conn) {}`, where `conn` is a connection that follows the [`interface-connection`](https://github.com/diasdavid/interface-connection).

The listener object created, can emit the following events:
The listener object created may emit the following events:

- `listening` -
- `close` -
- `connection` -
- `error` -
- `listening` - when the listener is ready for incoming connections
- `close` - when the listener is closed
- `connection` - (`conn`) each time an incoming connection is received
- `error` - (`err`) each time there is an error on the connection

### Start a listener

- `JavaScript` - `listener.listen(multiaddr, [callback])`
- `JavaScript` - `await listener.listen(multiaddr)`

This method puts the listener in `listening` mode, waiting for incoming connections.

`multiaddr` is the address where the listener should bind to.

`callback` is a function called once the listener is ready.
`multiaddr` is the address that the listener should bind to.

### Get listener addrs

- `JavaScript` - `listener.getAddrs(callback)`
- `JavaScript` - `listener.getAddrs()`

This method retrieves the addresses in which this listener is listening. Useful for when listening on port 0 or any interface (0.0.0.0).
This method returns the addresses on which this listener is listening. Useful when listening on port 0 or any interface (0.0.0.0).

### Stop a listener

- `JavaScript` - `listener.close([options, callback])`

This method closes the listener so that no more connections can be open on this transport instance.
- `JavaScript` - `await listener.close([options])`

`options` is an optional object that might contain the following properties:
This method closes the listener so that no more connections can be opened on this transport instance.

- `timeout` - A timeout value (in ms) that fires and destroys all the connections on this transport if the transport is not able to close graciously. (e.g { timeout: 1000 })
`options` is an optional object that may contain the following properties:

`callback` is function that gets called when the listener is closed. It is optional.
- `timeout` - A timeout value (in ms) after which all connections on this transport will be destroyed if the transport is not able to close gracefully. (e.g { timeout: 1000 })
10 changes: 7 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@
"dirty-chai": "^2.0.1"
},
"dependencies": {
"abort-controller": "^3.0.0",
"async-iterator-to-pull-stream": "^1.3.0",
"chai": "^4.2.0",
"interface-connection": "~0.3.3",
"it-goodbye": "^2.0.0",
"it-pipe": "^1.0.0",
"multiaddr": "^5.0.2",
"pull-goodbye": "~0.0.2",
"pull-serializer": "~0.3.2",
"pull-stream": "^3.6.9"
"pull-stream": "^3.6.9",
"streaming-iterables": "^4.0.2"
},
"contributors": [
"David Dias <[email protected]>",
Expand Down
80 changes: 80 additions & 0 deletions src/adapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
'use strict'

const { Connection } = require('interface-connection')
const toPull = require('async-iterator-to-pull-stream')
const error = require('pull-stream/sources/error')
const drain = require('pull-stream/sinks/drain')
const noop = () => {}

function callbackify (fn) {
return async function (...args) {
let cb = args.pop()
if (typeof cb !== 'function') {
args.push(cb)
cb = noop
}
let res
try {
res = await fn(...args)
} catch (err) {
return cb(err)
}
cb(null, res)
}
}

// Legacy adapter to old transport & connection interface
class Adapter {
constructor (transport) {
this.transport = transport
}

dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}

callback = callback || noop

const conn = new Connection()

this.transport.dial(ma, options)
.then(socket => {
conn.setInnerConn(toPull.duplex(socket))
conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket))
conn.close = callbackify(socket.close.bind(socket))
callback(null, conn)
})
.catch(err => {
conn.setInnerConn({ sink: drain(), source: error(err) })
callback(err)
})

return conn
}

createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}

const server = this.transport.createListener(options, socket => {
const conn = new Connection(toPull.duplex(socket))
conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket))
handler(conn)
})

const proxy = {
listen: callbackify(server.listen.bind(server)),
close: callbackify(server.close.bind(server)),
getAddrs: callbackify(server.getAddrs.bind(server)),
getObservedAddrs: callbackify(() => server.getObservedAddrs())
}

return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] })
}
}

module.exports = Adapter
Loading