-
Notifications
You must be signed in to change notification settings - Fork 470
/
Copy pathpeer-routing.ts
185 lines (158 loc) · 5.13 KB
/
peer-routing.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import { logger } from '@libp2p/logger'
import errCode from 'err-code'
import { codes, messages } from './errors.js'
import {
storeAddresses,
uniquePeers,
requirePeers
} from './content-routing/utils.js'
import { TimeoutController } from 'timeout-abort-controller'
import merge from 'it-merge'
import { pipe } from 'it-pipe'
import first from 'it-first'
import drain from 'it-drain'
import filter from 'it-filter'
import {
setDelayedInterval,
clearDelayedInterval
// @ts-expect-error module with no types
} from 'set-delayed-interval'
import { setMaxListeners } from 'events'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerRouting } from '@libp2p/interface-peer-routing'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Startable } from '@libp2p/interfaces/startable'
import type { PeerInfo } from '@libp2p/interface-peer-info'
import type { Components } from '@libp2p/components'
// Module-scoped logger under the 'libp2p:peer-routing' namespace; the class
// below uses it to report errors from routers and from the refresh task
const log = logger('libp2p:peer-routing')
/**
 * Options for the periodic "closest peers" refresh task that
 * DefaultPeerRouting schedules when it is started
 */
export interface RefreshManagerInit {
/**
 * Whether to enable the Refresh manager. DefaultPeerRouting only skips
 * scheduling the refresh task when this is explicitly `false`
 */
enabled?: boolean
/**
 * Boot delay to start the Refresh Manager (in ms). Passed through to
 * `setDelayedInterval` as its delay argument
 */
bootDelay?: number
/**
 * Interval between each Refresh Manager run (in ms). Passed through to
 * `setDelayedInterval` as its interval argument
 */
interval?: number
/**
 * How long to let each refresh run (in ms) before its query is aborted.
 * DefaultPeerRouting falls back to 10 seconds when this is not set
 */
timeout?: number
}
/**
 * Init options accepted by the DefaultPeerRouting constructor
 */
export interface PeerRoutingInit {
/**
 * Routers that `findPeer` and `getClosestPeers` queries are delegated to.
 * With an empty list both queries throw and `start()` does nothing
 */
routers: PeerRouting[]
/**
 * Settings for the periodic refresh task; an empty object is used when
 * this is omitted
 */
refreshManager?: RefreshManagerInit
}
/**
 * Peer routing that fans every query out to all configured routers and,
 * unless disabled, periodically runs a "closest peers to self" query in the
 * background (the refresh manager).
 */
export class DefaultPeerRouting implements PeerRouting, Startable {
  private readonly components: Components
  private readonly routers: PeerRouting[]
  private readonly refreshManagerInit: RefreshManagerInit
  /** Handle returned by `setDelayedInterval`; undefined while no refresh is scheduled */
  private timeoutId?: ReturnType<typeof setTimeout>
  private started: boolean
  /** Set only while a refresh query is in flight; doubles as the "already running" flag */
  private abortController?: TimeoutController

  /**
   * @param components - gives access to this node's peer id and peer store
   * @param init - the routers to query and optional refresh manager settings
   */
  constructor (components: Components, init: PeerRoutingInit) {
    this.components = components
    this.routers = init.routers
    this.refreshManagerInit = init.refreshManager ?? {}
    this.started = false

    // the task is handed to setDelayedInterval as a bare function reference,
    // so it has to be bound to keep `this` pointing at the instance
    this._findClosestPeersTask = this._findClosestPeersTask.bind(this)
  }

  /**
   * @returns true once `start()` has scheduled the refresh task and `stop()`
   * has not been called since
   */
  isStarted (): boolean {
    return this.started
  }

  /**
   * Start peer routing service.
   *
   * Schedules the recurrent refresh task. This is a no-op when the service is
   * already started, when there are no routers, when a refresh is already
   * scheduled, or when the refresh manager was explicitly disabled.
   */
  async start (): Promise<void> {
    if (this.started || this.routers.length === 0 || this.timeoutId != null || this.refreshManagerInit.enabled === false) {
      return
    }

    this.timeoutId = setDelayedInterval(
      this._findClosestPeersTask, this.refreshManagerInit.interval, this.refreshManagerInit.bootDelay
    )

    this.started = true
  }

  /**
   * Recurrent task to find closest peers and add their addresses to the Address Book.
   *
   * Only one run is allowed at a time and every run is bounded by
   * `refreshManager.timeout` (10 seconds when not configured). Errors are
   * logged rather than thrown so a failed run does not reject into the scheduler.
   */
  async _findClosestPeersTask (): Promise<void> {
    if (this.abortController != null) {
      // we are already running the query
      return
    }

    try {
      this.abortController = new TimeoutController(this.refreshManagerInit.timeout ?? 10e3)

      // this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning
      // appearing in the console
      try {
        // fails on node < 15.4
        setMaxListeners?.(Infinity, this.abortController.signal)
      } catch {}

      // nb getClosestPeers adds the addresses to the address book, so the
      // yielded peers themselves can be discarded
      await drain(this.getClosestPeers(this.components.getPeerId().toBytes(), { signal: this.abortController.signal }))
    } catch (err) {
      log.error(err)
    } finally {
      // release the timeout timer and mark the task as idle again
      this.abortController?.clear()
      this.abortController = undefined
    }
  }

  /**
   * Stop peer routing service.
   *
   * Cancels the scheduled refresh task and aborts a refresh query if one is
   * in flight. The service can be started again afterwards.
   */
  async stop (): Promise<void> {
    if (this.timeoutId != null) {
      clearDelayedInterval(this.timeoutId)
      // forget the handle - start() refuses to schedule a new refresh while
      // timeoutId is set, so leaving it behind would make a restart a no-op
      this.timeoutId = undefined
    }

    // abort query if it is in-flight
    this.abortController?.abort()

    this.started = false
  }

  /**
   * Iterates over all peer routers in parallel to find the given peer
   *
   * @param id - the peer to look up; must not be this node's own peer id
   * @param options - forwarded unchanged to every router
   * @returns the first non-empty result produced by any router
   * @throws ERR_NO_ROUTERS_AVAILABLE when no routers are configured
   * @throws ERR_FIND_SELF when `id` is this node's own peer id
   * @throws ERR_NOT_FOUND when no router produced a result
   */
  async findPeer (id: PeerId, options?: AbortOptions): Promise<PeerInfo> {
    if (this.routers.length === 0) {
      throw errCode(new Error('No peer routers available'), codes.ERR_NO_ROUTERS_AVAILABLE)
    }

    if (id.toString() === this.components.getPeerId().toString()) {
      throw errCode(new Error('Should not try to find self'), codes.ERR_FIND_SELF)
    }

    const output = await pipe(
      merge(
        // one single-shot generator per router; a failing router is logged
        // and yields nothing so it cannot fail the whole lookup
        ...this.routers.map(router => (async function * () {
          try {
            yield await router.findPeer(id, options)
          } catch (err) {
            log.error(err)
          }
        })())
      ),
      // drop falsy results
      (source) => filter(source, Boolean),
      // NOTE(review): presumably records each peer's addresses in the peer
      // store - confirm against content-routing/utils
      (source) => storeAddresses(source, this.components.getPeerStore()),
      async (source) => await first(source)
    )

    if (output != null) {
      return output
    }

    throw errCode(new Error(messages.NOT_FOUND), codes.ERR_NOT_FOUND)
  }

  /**
   * Attempt to find the closest peers on the network to the given key
   *
   * @param key - the key to find the closest peers to
   * @param options - forwarded unchanged to every router
   * @returns the merged results of all routers after passing through
   * `storeAddresses`, `uniquePeers` and `requirePeers`
   * @throws ERR_NO_ROUTERS_AVAILABLE when no routers are configured
   */
  async * getClosestPeers (key: Uint8Array, options?: AbortOptions): AsyncIterable<PeerInfo> {
    if (this.routers.length === 0) {
      throw errCode(new Error('No peer routers available'), codes.ERR_NO_ROUTERS_AVAILABLE)
    }

    // NOTE(review): going by their names the helpers store addresses,
    // de-duplicate peers and require at least one result - confirm against
    // content-routing/utils
    yield * pipe(
      merge(
        ...this.routers.map(router => router.getClosestPeers(key, options))
      ),
      (source) => storeAddresses(source, this.components.getPeerStore()),
      (source) => uniquePeers(source),
      (source) => requirePeers(source)
    )
  }
}