Skip to content

Commit

Permalink
chore: fix up floating promises
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Dec 13, 2021
1 parent f950067 commit 5e38d87
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 37 deletions.
15 changes: 7 additions & 8 deletions src/circuit/auto-relay.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const log = Object.assign(debug('libp2p:auto-relay'), {
const { fromString: uint8ArrayFromString } = require('uint8arrays/from-string')
const { toString: uint8ArrayToString } = require('uint8arrays/to-string')
const { Multiaddr } = require('multiaddr')
const PeerId = require('peer-id')

const { relay: multicodec } = require('./multicodec')
const { canHop } = require('./circuit/hop')
Expand All @@ -23,6 +22,7 @@ const {
/**
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
* @typedef {import('../peer-store/types').Address} Address
* @typedef {import('peer-id')} PeerId
*/

/**
Expand Down Expand Up @@ -91,7 +91,7 @@ class AutoRelay {

// If no protocol, check if we were keeping the peer before as a listenRelay
if (!hasProtocol && this._listenRelays.has(id)) {
this._removeListenRelay(id)
await this._removeListenRelay(id)
return
} else if (!hasProtocol || this._listenRelays.has(id)) {
return
Expand Down Expand Up @@ -125,7 +125,6 @@ class AutoRelay {
* Peer disconnects.
*
* @param {Connection} connection - connection to the peer
* @returns {void}
*/
_onPeerDisconnected (connection) {
const peerId = connection.remotePeer
Expand All @@ -136,7 +135,9 @@ class AutoRelay {
return
}

this._removeListenRelay(id)
this._removeListenRelay(id).catch(err => {
log.error(err)
})
}

/**
Expand Down Expand Up @@ -180,12 +181,11 @@ class AutoRelay {
*
* @private
* @param {string} id - peer identifier string.
* @returns {void}
*/
_removeListenRelay (id) {
async _removeListenRelay (id) {
if (this._listenRelays.delete(id)) {
// TODO: this should be responsibility of the connMgr
this._listenOnAvailableHopRelays([id])
await this._listenOnAvailableHopRelays([id])
}
}

Expand All @@ -197,7 +197,6 @@ class AutoRelay {
* 3. Search the network.
*
* @param {string[]} [peersToIgnore]
* @returns {Promise<void>}
*/
async _listenOnAvailableHopRelays (peersToIgnore = []) {
// TODO: The peer redial issue on disconnect should be handled by connection gating
Expand Down
4 changes: 2 additions & 2 deletions src/connection-manager/auto-dialler.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ class AutoDialler {
/**
* Starts the auto dialer
*/
start () {
async start () {
if (!this._options.enabled) {
log('not enabled')
return
}

this._running = true
this._autoDial()
await this._autoDial()
log('started')
}

Expand Down
27 changes: 15 additions & 12 deletions src/connection-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,19 +183,22 @@ class ConnectionManager extends EventEmitter {
*
* @private
*/
_checkMetrics () {
async _checkMetrics () {
if (this._libp2p.metrics) {
const movingAverages = this._libp2p.metrics.global.movingAverages
// @ts-ignore moving averages object types
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
this._checkMaxLimit('maxReceivedData', received)
// @ts-ignore moving averages object types
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
this._checkMaxLimit('maxSentData', sent)
const total = received + sent
this._checkMaxLimit('maxData', total)
log('metrics update', total)
this._timer = retimer(this._checkMetrics, this._options.pollInterval)
try {
const movingAverages = this._libp2p.metrics.global.movingAverages
// @ts-ignore moving averages object types
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
await this._checkMaxLimit('maxReceivedData', received)
// @ts-ignore moving averages object types
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
await this._checkMaxLimit('maxSentData', sent)
const total = received + sent
await this._checkMaxLimit('maxData', total)
log('metrics update', total)
} finally {
this._timer = retimer(this._checkMetrics, this._options.pollInterval)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/identify/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class IdentifyService {
}
}

this.push(connections)
await this.push(connections)
}

/**
Expand Down
8 changes: 5 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ class Libp2p extends EventEmitter {

this.relay && this.relay.stop()
this.peerRouting.stop()
this._autodialler.stop()
await this._autodialler.stop()
await (this._dht && this._dht.stop())

for (const service of this._discovery.values()) {
Expand Down Expand Up @@ -643,7 +643,9 @@ class Libp2p extends EventEmitter {

this.peerStore.on('peer', peerId => {
this.emit('peer:discovery', peerId)
this._maybeConnect(peerId)
this._maybeConnect(peerId).catch(err => {
log.error(err)
})
})

// Once we start, emit any peers we may have already discovered
Expand All @@ -653,7 +655,7 @@ class Libp2p extends EventEmitter {
}

this.connectionManager.start()
this._autodialler.start()
await this._autodialler.start()

// Peer discovery
await this._setupPeerDiscovery()
Expand Down
2 changes: 1 addition & 1 deletion src/peer-store/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const Store = require('./store')
* @typedef {import('peer-id')} PeerId
*/

const log = Object.assign(debug('libp2p:peer-store'), {
const log = Object.assign(debug('libp2p:peer-store'), {
error: debug('libp2p:peer-store:err')
})

Expand Down
1 change: 0 additions & 1 deletion src/peer-store/key-book.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ class PeerStoreKeyBook {

log(`stored provided public key for ${peerId.toB58String()}`)
}

} finally {
log('set release write lock')
release()
Expand Down
4 changes: 2 additions & 2 deletions src/peer-store/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class PersistentStore {
/**
* @param {Peer} peer
*/
async save (peer) {
async save (peer) {
const buf = PeerPB.encode({
...peer,
pubKey: peer.pubKey ? marshalPublicKey(peer.pubKey) : undefined,
Expand All @@ -124,7 +124,7 @@ class PersistentStore {
* @param {PeerId} peerId
* @param {Partial<Peer>} data
*/
async merge (peerId, data) {
async merge (peerId, data) {
const peer = await this.load(peerId)
const merged = {
...peer,
Expand Down
6 changes: 6 additions & 0 deletions src/ping/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ async function ping (node, peer) {
*/
function mount (node) {
node.handle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`, ({ stream }) => pipe(stream, stream))
.catch(err => {
log.error(err)
})
}

/**
Expand All @@ -72,6 +75,9 @@ function mount (node) {
*/
function unmount (node) {
node.unhandle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`)
.catch(err => {
log.error(err)
})
}

exports = module.exports = ping
Expand Down
4 changes: 3 additions & 1 deletion src/upgrader.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ class Upgrader {
} finally {
this.onConnectionEnd(connection)
}
})()
})().catch(err => {
log.error(err)
})
}

return Reflect.set(...args)
Expand Down
8 changes: 2 additions & 6 deletions test/content-routing/dht/operation.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('DHT subsystem operates correctly', () => {
])

await libp2p.peerStore.addressBook.set(remotePeerId, [remoteListenAddr]);
[ remAddr ] = await libp2p.peerStore.addressBook.getMultiaddrsForPeer(remotePeerId)
[remAddr] = await libp2p.peerStore.addressBook.getMultiaddrsForPeer(remotePeerId)
})

afterEach(() => Promise.all([
Expand All @@ -65,22 +65,18 @@ describe('DHT subsystem operates correctly', () => {
])
})

it.only('should put on a peer and get from the other', async () => {
it('should put on a peer and get from the other', async () => {
const key = uint8ArrayFromString('hello')
const value = uint8ArrayFromString('world')

console.info('dial protocol')
await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
console.info('wait for routing table')
await Promise.all([
pWaitFor(() => libp2p._dht._lan._routingTable.size === 1),
pWaitFor(() => remoteLibp2p._dht._lan._routingTable.size === 1)
])

console.info('put a thing')
await libp2p.contentRouting.put(key, value)

console.info('get a thing')
const fetchedValue = await remoteLibp2p.contentRouting.get(key)
expect(fetchedValue).to.have.property('val').that.equalBytes(value)
})
Expand Down

0 comments on commit 5e38d87

Please sign in to comment.