diff --git a/package.json b/package.json index d85da8ccda..4afda0afa3 100644 --- a/package.json +++ b/package.json @@ -79,11 +79,12 @@ "scripts": { "clean": "aegir clean", "lint": "aegir lint", - "dep-check": "aegir dep-check", + "dep-check": "aegir dep-check -i protons", "prepublishOnly": "node scripts/update-version.js", "build": "aegir build", "docs": "aegir docs", "generate": "run-s generate:proto:*", + "generate:proto:autonat": "protons ./src/autonat/pb/index.proto", "generate:proto:circuit": "protons ./src/circuit/pb/index.proto", "generate:proto:fetch": "protons ./src/fetch/pb/proto.proto", "generate:proto:identify": "protons ./src/identify/pb/message.proto", @@ -143,6 +144,7 @@ "it-drain": "^2.0.0", "it-filter": "^2.0.0", "it-first": "^2.0.0", + "it-parallel": "^3.0.0", "it-handshake": "^4.1.2", "it-length-prefixed": "^8.0.2", "it-map": "^2.0.0", @@ -201,6 +203,7 @@ "p-event": "^5.0.1", "p-times": "^4.0.0", "p-wait-for": "^5.0.0", + "protons": "^7.0.2", "sinon": "^15.0.1", "sinon-ts": "^1.0.0" }, diff --git a/src/address-manager/index.ts b/src/address-manager/index.ts index 39ba371e23..35834b3b0c 100644 --- a/src/address-manager/index.ts +++ b/src/address-manager/index.ts @@ -44,12 +44,35 @@ export interface AddressFilter { const defaultAddressFilter = (addrs: Multiaddr[]): Multiaddr[] => addrs +interface ObservedAddressMetadata { + confident: boolean +} + +/** + * If the passed multiaddr contains the passed peer id, remove it + */ +function stripPeerId (ma: Multiaddr, peerId: PeerId): Multiaddr { + const observedPeerIdStr = ma.getPeerId() + + // strip our peer id if it has been passed + if (observedPeerIdStr != null) { + const observedPeerId = peerIdFromString(observedPeerIdStr) + + // use same encoding for comparison + if (observedPeerId.equals(peerId)) { + ma = ma.decapsulate(multiaddr(`/p2p/${peerId.toString()}`)) + } + } + + return ma +} + export class DefaultAddressManager extends EventEmitter { private readonly components: DefaultAddressManagerComponents // this is an array to allow for duplicates, e.g. multiples of `/ip4/0.0.0.0/tcp/0` private readonly listen: string[] private readonly announce: Set - private readonly observed: Set + private readonly observed: Map private readonly announceFilter: AddressFilter /** @@ -66,7 +89,7 @@ export class DefaultAddressManager extends EventEmitter { this.components = components this.listen = listen.map(ma => ma.toString()) this.announce = new Set(announce.map(ma => ma.toString())) - this.observed = new Set() + this.observed = new Map() this.announceFilter = init.announceFilter ?? defaultAddressFilter } @@ -88,52 +111,51 @@ export class DefaultAddressManager extends EventEmitter { * Get observed multiaddrs */ getObservedAddrs (): Multiaddr[] { - return Array.from(this.observed).map((a) => multiaddr(a)) + return Array.from(this.observed).map(([a]) => multiaddr(a)) } /** * Add peer observed addresses - * Signal that we have confidence an observed multiaddr is publicly dialable - - * this will make it appear in the output of getAddresses() */ - confirmObservedAddr (addr: Multiaddr): void { - - } + addObservedAddr (addr: Multiaddr): void { + addr = stripPeerId(addr, this.components.peerId) + const addrString = addr.toString() - /** - * Signal that we do not have confidence an observed multiaddr is publicly dialable - - * this will remove it from the output of getObservedAddrs() - */ - removeObservedAddr (addr: Multiaddr): void { + // do not trigger the change:addresses event if we already know about this address + if (this.observed.has(addrString)) { + return + } + this.observed.set(addrString, { + confident: false + }) } - /** - * Add peer observed addresses - */ - addObservedAddr (addr: string | Multiaddr): void { - let ma = multiaddr(addr) - const remotePeer = ma.getPeerId() - - // strip our peer id if it has been passed - if (remotePeer != null) { - const remotePeerId = peerIdFromString(remotePeer) - - // use same encoding for comparison - if (remotePeerId.equals(this.components.peerId)) { - ma = ma.decapsulate(multiaddr(`/p2p/${this.components.peerId.toString()}`)) - } + confirmObservedAddr (addr: Multiaddr): void { + addr = stripPeerId(addr, this.components.peerId) + const addrString = addr.toString() + + const metadata = this.observed.get(addrString) ?? { + confident: false } - const addrString = ma.toString() + const startingConfidence = metadata.confident - // do not trigger the change:addresses event if we already know about this address - if (this.observed.has(addrString)) { - return + this.observed.set(addrString, { + confident: true + }) + + // only trigger the change:addresses event if our confidence in an address has changed + if (!startingConfidence) { + this.dispatchEvent(new CustomEvent('change:addresses')) } + } + + removeObservedAddr (addr: Multiaddr): void { + addr = stripPeerId(addr, this.components.peerId) + const addrString = addr.toString() - this.observed.add(addrString) - this.dispatchEvent(new CustomEvent('change:addresses')) + this.observed.delete(addrString) } getAddresses (): Multiaddr[] { @@ -144,7 +166,12 @@ export class DefaultAddressManager extends EventEmitter { addrs = this.components.transportManager.getAddrs().map(ma => ma.toString()) } - addrs = addrs.concat(this.getObservedAddrs().map(ma => ma.toString())) + // add observed addresses we are confident in + addrs = addrs.concat( + Array.from(this.observed) + .filter(([ma, metadata]) => metadata.confident) + .map(([ma]) => ma) + ) // dedupe multiaddrs const addrSet = new Set(addrs) diff --git a/src/autonat/constants.ts b/src/autonat/constants.ts new file mode 100644 index 0000000000..d76113de8f --- /dev/null +++ b/src/autonat/constants.ts @@ -0,0 +1,4 @@ + +export const PROTOCOL = '/libp2p/autonat/1.0.0' +export const PROTOCOL_VERSION = '1.0.0' +export const PROTOCOL_NAME = 'autonat' diff --git a/src/autonat/index.ts b/src/autonat/index.ts new file mode 100644 index 0000000000..62003c97e3 --- /dev/null +++ b/src/autonat/index.ts @@ -0,0 +1,567 @@ +import type { AddressManager } from '@libp2p/interface-address-manager' +import type { Connection } from '@libp2p/interface-connection' +import type { ConnectionManager, Dialer } from '@libp2p/interface-connection-manager' +import type { PeerId } from '@libp2p/interface-peer-id' +import type { PeerInfo } from '@libp2p/interface-peer-info' +import type { PeerRouting } from '@libp2p/interface-peer-routing' +import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar' +import type { TransportManager } from '@libp2p/interface-transport' +import type { Startable } from '@libp2p/interfaces/startable' +import { logger } from '@libp2p/logger' +import { peerIdFromBytes } from '@libp2p/peer-id' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { multiaddr, protocols } from '@multiformats/multiaddr' +import { abortableDuplex } from 'abortable-iterator' +import { setMaxListeners } from 'events' +import first from 'it-first' +import * as lp from 'it-length-prefixed' +import map from 'it-map' +import parallel from 'it-parallel' +import { pipe } from 'it-pipe' +import isPrivateIp from 'private-ip' +import { TimeoutController } from 'timeout-abort-controller' +import { + PROTOCOL +} from './constants.js' +import { Message } from './pb/index.js' + +const log = logger('libp2p:autonat') + +// if more than 3 peers manage to dial us on what we believe to be our external +// IP then we are convinced that it is, in fact, our external IP +// https://github.com/libp2p/specs/blob/master/autonat/README.md#autonat-protocol +const REQUIRED_SUCCESSFUL_DIALS = 4 + +// Wait this long before we start to query autonat nodes +const AUTONAT_STARTUP_DELAY = 5000 + +// Only try to verify our external address this often +const AUTONAT_REFRESH_INTERVAL = 60000 + +export interface AutonatServiceInit { + /** + * Allows overriding the protocol prefix used + */ + protocolPrefix: string + + /** + * How long we should wait for a remote peer to verify our external address + */ + timeout: number + + /** + * How long to wait after startup before trying to verify our external address + */ + startupDelay: number + + /** + * Verify our external addresses this often + */ + refreshInterval: number + + /** + * How many parallel inbound autonat streams we allow per-connection + */ + maxInboundStreams: number + + /** + * How many parallel outbound autonat streams we allow per-connection + */ + maxOutboundStreams: number +} + +export interface DefaultAutonatComponents { + registrar: Registrar + addressManager: AddressManager + transportManager: TransportManager + dialer: Dialer + peerId: PeerId + connectionManager: ConnectionManager + peerRouting: PeerRouting +} + +export class AutonatService implements Startable { + private readonly components: DefaultAutonatComponents + private readonly _init: AutonatServiceInit + private readonly startupDelay: number + private readonly refreshInterval: number + private verifyAddressTimeout?: ReturnType + private started: boolean + + constructor (components: DefaultAutonatComponents, init: AutonatServiceInit) { + this.components = components + this.started = false + this._init = init + this.startupDelay = init.startupDelay ?? AUTONAT_STARTUP_DELAY + this.refreshInterval = init.refreshInterval ?? AUTONAT_REFRESH_INTERVAL + + this._verifyExternalAddresses = this._verifyExternalAddresses.bind(this) + } + + isStarted (): boolean { + return this.started + } + + async start (): Promise { + if (this.started) { + return + } + + await this.components.registrar.handle(PROTOCOL, (data) => { + void this.handleIncomingAutonatStream(data) + .catch(err => { + log.error(err) + }) + }, { + maxInboundStreams: this._init.maxInboundStreams, + maxOutboundStreams: this._init.maxOutboundStreams + }) + + this.verifyAddressTimeout = setTimeout(this._verifyExternalAddresses, this.startupDelay) + + this.started = true + } + + async stop (): Promise { + await this.components.registrar.unhandle(PROTOCOL) + clearTimeout(this.verifyAddressTimeout) + + this.started = false + } + + /** + * Handle an incoming autonat request + */ + async handleIncomingAutonatStream (data: IncomingStreamData): Promise { + const controller = new TimeoutController(this._init.timeout) + + // this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning + // appearing in the console + try { + // fails on node < 15.4 + setMaxListeners?.(Infinity, controller.signal) + } catch {} + + const ourHosts = this.components.addressManager.getAddresses() + .map(ma => ma.toOptions().host) + + try { + const source = abortableDuplex(data.stream, controller.signal) + const self = this + + await pipe( + source, + lp.decode(), + async function * (stream) { + const buf = await first(stream) + + if (buf == null) { + log('No message received') + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'No message was sent' + } + }) + + return + } + + let request: Message + + try { + request = Message.decode(buf) + } catch (err) { + log.error('Could not decode message', err) + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'Could not decode message' + } + }) + + return + } + + const dialRequest = request.dial + + if (dialRequest == null) { + log.error('Dial was missing from message') + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'No Dial message found in message' + } + }) + + return + } + + let peerId: PeerId + const peer = dialRequest.peer + + if (peer == null || peer.id == null) { + log.error('PeerId missing from message') + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'missing peer info' + } + }) + + return + } + + try { + peerId = peerIdFromBytes(peer.id) + } catch (err) { + log.error('Invalid PeerId', err) + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'bad peer id' + } + }) + + return + } + + log('Incoming request from %p', peerId) + + // reject any dial requests that arrive via relays + if (!data.connection.remotePeer.equals(peerId)) { + log('Target peer %p did not equal sending peer %p', peerId, data.connection.remotePeer) + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_BAD_REQUEST, + statusText: 'peer id mismatch' + } + }) + + return + } + + // get a list of multiaddrs to dial + const multiaddrs = peer.addrs + .map(buf => multiaddr(buf)) + .filter(ma => { + const isFromSameHost = ma.toOptions().host === data.connection.remoteAddr.toOptions().host + + log.trace('Request to dial %s was sent from %s is same host %s', ma, data.connection.remoteAddr, isFromSameHost) + // skip any Multiaddrs where the target node's IP does not match the sending node's IP + return isFromSameHost + }) + .filter(ma => { + const host = ma.toOptions().host + const isPublicIp = !(isPrivateIp(host) ?? false) + + log.trace('Host %s was public %s', host, isPublicIp) + // don't try to dial private addresses + return isPublicIp + }) + .filter(ma => { + const host = ma.toOptions().host + const isNotOurHost = !ourHosts.includes(host) + + log.trace('Host %s was not our host %s', host, isNotOurHost) + // don't try to dial nodes on the same host as us + return isNotOurHost + }) + .filter(ma => { + const isSupportedTransport = Boolean(self.components.transportManager.transportForMultiaddr(ma)) + + log.trace('Transport for %s is supported %s', ma, isSupportedTransport) + // skip any Multiaddrs that have transports we do not support + return isSupportedTransport + }) + .map(ma => { + if (ma.getPeerId() == null) { + // make sure we have the PeerId as part of the Multiaddr + ma = ma.encapsulate(`/p2p/${peerId.toString()}`) + } + + return ma + }) + + // make sure we have something to dial + if (multiaddrs.length === 0) { + log('No valid multiaddrs for %p in message', peerId) + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_DIAL_REFUSED, + statusText: 'no dialable addresses' + } + }) + + return + } + + log('Dial multiaddrs %s for peer %p', multiaddrs.map(ma => ma.toString()).join(', '), peerId) + + let errorMessage = '' + let lastMultiaddr = multiaddrs[0] + + for await (const multiaddr of multiaddrs) { + let connection: Connection | undefined + lastMultiaddr = multiaddr + + try { + // use the dialer so we can dial a specific multiaddr instead of every known + // multiaddr for the peer + connection = await self.components.dialer.dial(multiaddr, { + signal: controller.signal + }) + + if (!connection.remoteAddr.equals(multiaddr)) { + log.error('Tried to dial %s but dialed %s', multiaddr, connection.remoteAddr) + throw new Error('Unexpected remote address') + } + + log('Success %p', peerId) + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.OK, + addr: connection.remoteAddr.decapsulateCode(protocols('p2p').code).bytes + } + }) + + return + } catch (err: any) { + log('Could not dial %p', peerId, err) + errorMessage = err.message + } finally { + if (connection != null) { + await connection.close() + } + } + } + + yield Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse: { + status: Message.ResponseStatus.E_DIAL_ERROR, + statusText: errorMessage, + addr: lastMultiaddr.bytes + } + }) + }, + lp.encode(), + // pipe to the stream, not the abortable source other wise we + // can't tell the remote when a dial timed out.. + data.stream + ) + } finally { + controller.clear() + } + } + + _verifyExternalAddresses (): void { + void this.verifyExternalAddresses() + .catch(err => { + log.error(err) + }) + } + + /** + * Our multicodec topology noticed a new peer that supports autonat + */ + async verifyExternalAddresses (): Promise { + clearTimeout(this.verifyAddressTimeout) + + // Do not try to push if we are not running + if (!this.isStarted()) { + return + } + + const addressManager = this.components.addressManager + + const multiaddrs = addressManager.getObservedAddrs() + .filter(ma => { + const options = ma.toOptions() + + return !(isPrivateIp(options.host) ?? false) + }) + + if (multiaddrs.length === 0) { + log('No public addresses found, not requesting verification') + this.verifyAddressTimeout = setTimeout(this._verifyExternalAddresses, this.refreshInterval) + + return + } + + const controller = new TimeoutController(this._init.timeout) + + // this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning + // appearing in the console + try { + // fails on node < 15.4 + setMaxListeners?.(Infinity, controller.signal) + } catch {} + + const self = this + + try { + log('verify multiaddrs %s', multiaddrs.map(ma => ma.toString()).join(', ')) + + const request = Message.encode({ + type: Message.MessageType.DIAL, + dial: { + peer: { + id: this.components.peerId.toBytes(), + addrs: multiaddrs.map(map => map.bytes) + } + } + }) + // find some random peers + const randomPeer = await createEd25519PeerId() + const randomCid = randomPeer.toBytes() + + const results: Record = {} + const networkSegments: string[] = [] + + async function verifyAddress (peer: PeerInfo): Promise { + try { + log('Asking %p to verify multiaddr', peer.id) + + const connection = await self.components.connectionManager.openConnection(peer.id, { + signal: controller.signal + }) + + const stream = await connection.newStream(PROTOCOL, { + signal: controller.signal + }) + const source = abortableDuplex(stream, controller.signal) + + const buf = await pipe( + [request], + lp.encode(), + source, + lp.decode(), + async (stream) => await first(stream) + ) + if (buf == null) { + log('No response received from %s', connection.remotePeer) + return undefined + } + const response = Message.decode(buf) + + if (response.type !== Message.MessageType.DIAL_RESPONSE || response.dialResponse == null) { + log('Invalid autonat response from %s', connection.remotePeer) + return undefined + } + + if (response.dialResponse.status === Message.ResponseStatus.OK) { + // make sure we use different network segments + const options = connection.remoteAddr.toOptions() + let segment: string + + if (options.family === 4) { + const octets = options.host.split('.') + segment = octets[0] + } else if (options.family === 6) { + const octets = options.host.split(':') + segment = octets[0] + } else { + log('Remote address "%s" was not IP4 or IP6?', options.host) + return undefined + } + + if (networkSegments.includes(segment)) { + log('Already have response from network segment %d - %s', segment, options.host) + return undefined + } + + networkSegments.push(segment) + } + + return response.dialResponse + } catch (err) { + log.error(err) + } + } + + for await (const dialResponse of parallel(map(this.components.peerRouting.getClosestPeers(randomCid, { + signal: controller.signal + }), (peer) => async () => await verifyAddress(peer)), { + concurrency: REQUIRED_SUCCESSFUL_DIALS + })) { + try { + if (dialResponse == null) { + continue + } + + // they either told us which address worked/didn't work, or we only sent them one address + const addr = dialResponse.addr == null ? multiaddrs[0] : multiaddr(dialResponse.addr) + + log('Autonat response for %s is %s', addr, dialResponse.status) + + if (dialResponse.status === Message.ResponseStatus.E_BAD_REQUEST) { + // the remote could not parse our request + continue + } + + if (dialResponse.status === Message.ResponseStatus.E_DIAL_REFUSED) { + // the remote could not honour our request + continue + } + + if (dialResponse.addr == null && multiaddrs.length > 1) { + // we sent the remote multiple addrs but they didn't tell us which ones worked/didn't work + continue + } + + if (!multiaddrs.some(ma => ma.equals(addr))) { + log('Peer reported %s as %s but it was not in our observed address list', addr, dialResponse.status) + continue + } + + const addrStr = addr.toString() + + if (results[addrStr] == null) { + results[addrStr] = { success: 0, failure: 0 } + } + + if (dialResponse.status === Message.ResponseStatus.OK) { + results[addrStr].success++ + } else if (dialResponse.status === Message.ResponseStatus.E_DIAL_ERROR) { + results[addrStr].failure++ + } + + if (results[addrStr].success === REQUIRED_SUCCESSFUL_DIALS) { + // we are now convinced + log('%s is externally dialable', addr) + addressManager.confirmObservedAddr(addr) + return + } + + if (results[addrStr].failure === REQUIRED_SUCCESSFUL_DIALS) { + // we are now unconvinced + log('%s is not externally dialable', addr) + addressManager.removeObservedAddr(addr) + return + } + } catch (err) { + log.error('Could not verify external address', err) + } + } + } finally { + controller.clear() + this.verifyAddressTimeout = setTimeout(this._verifyExternalAddresses, this.refreshInterval) + } + } +} diff --git a/src/autonat/pb/index.proto b/src/autonat/pb/index.proto new file mode 100644 index 0000000000..3e71e23197 --- /dev/null +++ b/src/autonat/pb/index.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; + +message Message { + enum MessageType { + DIAL = 0; + DIAL_RESPONSE = 1; + } + + enum ResponseStatus { + OK = 0; + E_DIAL_ERROR = 100; + E_DIAL_REFUSED = 101; + E_BAD_REQUEST = 200; + E_INTERNAL_ERROR = 300; + } + + message PeerInfo { + optional bytes id = 1; + repeated bytes addrs = 2; + } + + message Dial { + optional PeerInfo peer = 1; + } + + message DialResponse { + optional ResponseStatus status = 1; + optional string statusText = 2; + optional bytes addr = 3; + } + + optional MessageType type = 1; + optional Dial dial = 2; + optional DialResponse dialResponse = 3; +} diff --git a/src/autonat/pb/index.ts b/src/autonat/pb/index.ts new file mode 100644 index 0000000000..0d9e840f03 --- /dev/null +++ b/src/autonat/pb/index.ts @@ -0,0 +1,320 @@ +/* eslint-disable import/export */ +/* eslint-disable complexity */ +/* eslint-disable @typescript-eslint/no-namespace */ +/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */ +/* eslint-disable @typescript-eslint/no-empty-interface */ + +import { enumeration, encodeMessage, decodeMessage, message } from 'protons-runtime' +import type { Codec } from 'protons-runtime' +import type { Uint8ArrayList } from 'uint8arraylist' + +export interface Message { + type?: Message.MessageType + dial?: Message.Dial + dialResponse?: Message.DialResponse +} + +export namespace Message { + export enum MessageType { + DIAL = 'DIAL', + DIAL_RESPONSE = 'DIAL_RESPONSE' + } + + enum __MessageTypeValues { + DIAL = 0, + DIAL_RESPONSE = 1 + } + + export namespace MessageType { + export const codec = (): Codec => { + return enumeration(__MessageTypeValues) + } + } + + export enum ResponseStatus { + OK = 'OK', + E_DIAL_ERROR = 'E_DIAL_ERROR', + E_DIAL_REFUSED = 'E_DIAL_REFUSED', + E_BAD_REQUEST = 'E_BAD_REQUEST', + E_INTERNAL_ERROR = 'E_INTERNAL_ERROR' + } + + enum __ResponseStatusValues { + OK = 0, + E_DIAL_ERROR = 100, + E_DIAL_REFUSED = 101, + E_BAD_REQUEST = 200, + E_INTERNAL_ERROR = 300 + } + + export namespace ResponseStatus { + export const codec = (): Codec => { + return enumeration(__ResponseStatusValues) + } + } + + export interface PeerInfo { + id?: Uint8Array + addrs: Uint8Array[] + } + + export namespace PeerInfo { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if (obj.id != null) { + w.uint32(10) + w.bytes(obj.id) + } + + if (obj.addrs != null) { + for (const value of obj.addrs) { + w.uint32(18) + w.bytes(value) + } + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length) => { + const obj: any = { + addrs: [] + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.id = reader.bytes() + break + case 2: + obj.addrs.push(reader.bytes()) + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, PeerInfo.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList): PeerInfo => { + return decodeMessage(buf, PeerInfo.codec()) + } + } + + export interface Dial { + peer?: Message.PeerInfo + } + + export namespace Dial { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if (obj.peer != null) { + w.uint32(10) + Message.PeerInfo.codec().encode(obj.peer, w) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.peer = Message.PeerInfo.codec().decode(reader, reader.uint32()) + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, Dial.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList): Dial => { + return decodeMessage(buf, Dial.codec()) + } + } + + export interface DialResponse { + status?: Message.ResponseStatus + statusText?: string + addr?: Uint8Array + } + + export namespace DialResponse { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if (obj.status != null) { + w.uint32(8) + Message.ResponseStatus.codec().encode(obj.status, w) + } + + if (obj.statusText != null) { + w.uint32(18) + w.string(obj.statusText) + } + + if (obj.addr != null) { + w.uint32(26) + w.bytes(obj.addr) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.status = Message.ResponseStatus.codec().decode(reader) + break + case 2: + obj.statusText = reader.string() + break + case 3: + obj.addr = reader.bytes() + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, DialResponse.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList): DialResponse => { + return decodeMessage(buf, DialResponse.codec()) + } + } + + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if (obj.type != null) { + w.uint32(8) + Message.MessageType.codec().encode(obj.type, w) + } + + if (obj.dial != null) { + w.uint32(18) + Message.Dial.codec().encode(obj.dial, w) + } + + if (obj.dialResponse != null) { + w.uint32(26) + Message.DialResponse.codec().encode(obj.dialResponse, w) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.type = Message.MessageType.codec().decode(reader) + break + case 2: + obj.dial = Message.Dial.codec().decode(reader, reader.uint32()) + break + case 3: + obj.dialResponse = Message.DialResponse.codec().decode(reader, reader.uint32()) + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, Message.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList): Message => { + return decodeMessage(buf, Message.codec()) + } +} diff --git a/src/config.ts b/src/config.ts index 16c1775f5b..767f04bea3 100644 --- a/src/config.ts +++ b/src/config.ts @@ -58,7 +58,8 @@ const DefaultConfig: Partial = { maxInboundStreams: 1, maxOutboundStreams: 1, maxPushIncomingStreams: 1, - maxPushOutgoingStreams: 1 + maxPushOutgoingStreams: 1, + maxObservedAddresses: 10 }, ping: { protocolPrefix: 'ipfs', @@ -77,6 +78,14 @@ const DefaultConfig: Partial = { maxInboundStreams: 1, maxOutboundStreams: 1, timeout: 10000 + }, + autonat: { + protocolPrefix: 'libp2p', + maxInboundStreams: 1, + maxOutboundStreams: 1, + timeout: 30000, + startupDelay: 5000, + refreshInterval: 60000 } } diff --git a/src/identify/index.ts b/src/identify/index.ts index 410b69ad5f..591bf5795d 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -65,6 +65,7 @@ export interface IdentifyServiceInit { maxPushIncomingStreams: number maxPushOutgoingStreams: number + maxObservedAddresses?: number } export interface IdentifyServiceComponents { @@ -378,10 +379,13 @@ export class IdentifyService implements Startable { } log('identify completed for peer %p and protocols %o', id, protocols) + log('our observed address is %s', cleanObservedAddr) - // TODO: Add and score our observed addr - log('received observed address of %s', cleanObservedAddr?.toString()) - // this.components.addressManager.addObservedAddr(observedAddr) + if (cleanObservedAddr != null && + this.components.addressManager.getObservedAddrs().length < (this.init.maxObservedAddresses ?? Infinity)) { + log('storing our observed address %s', cleanObservedAddr?.toString()) + this.components.addressManager.addObservedAddr(cleanObservedAddr) + } } /** diff --git a/src/index.ts b/src/index.ts index afb45b5b03..ebe9b60b2a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -35,6 +35,7 @@ import type { Metrics } from '@libp2p/interface-metrics' import type { PeerInfo } from '@libp2p/interface-peer-info' import type { PingServiceInit } from './ping/index.js' import type { FetchServiceInit } from './fetch/index.js' +import type { AutonatServiceInit } from './autonat/index.js' import type { Components } from './components.js' import type { Libp2p } from '@libp2p/interface-libp2p' import type { KeyChainInit } from '@libp2p/keychain' @@ -107,20 +108,25 @@ export interface Libp2pInit { relay: (components: Components) => CircuitRelayService /** - * libp2p identify protocol options + * identify protocol options */ identify: IdentifyServiceInit /** - * libp2p ping protocol options + * ping protocol options */ ping: PingServiceInit /** - * libp2p fetch protocol options + * fetch protocol options */ fetch: FetchServiceInit + /** + * autonat protocol options + */ + autonat: AutonatServiceInit + /** * An array that must include at least 1 compliant transport */ diff --git a/src/libp2p.ts b/src/libp2p.ts index f586dc84f5..4ddcafec2a 100644 --- a/src/libp2p.ts +++ b/src/libp2p.ts @@ -22,6 +22,7 @@ import { PeerRecordUpdater } from './peer-record-updater.js' import { DHTPeerRouting } from './dht/dht-peer-routing.js' import { PersistentPeerStore } from '@libp2p/peer-store' import { DHTContentRouting } from './dht/dht-content-routing.js' +import { AutonatService } from './autonat/index.js' import { DefaultComponents } from './components.js' import type { Components } from './components.js' import type { PeerId } from '@libp2p/interface-peer-id' @@ -62,6 +63,7 @@ export class Libp2pNode extends EventEmitter implements Libp2p { public circuitService?: CircuitRelayService public fetchService: FetchService public pingService: PingService + public autonatService: AutonatService public components: Components public peerStore: PeerStore public contentRouting: ContentRouting @@ -230,6 +232,16 @@ export class Libp2pNode extends EventEmitter implements Libp2p { ...init.ping })) + this.autonatService = this.configureComponent(new AutonatService(this.components, { + ...init.autonat + })) + + this.configureComponent(new AutoDialler(this.components, { + enabled: init.connectionManager.autoDial, + minConnections: init.connectionManager.minConnections, + autoDialInterval: init.connectionManager.autoDialInterval + })) + if (init.relay != null) { this.circuitService = this.configureComponent(init.relay(this.components)) } diff --git a/test/addresses/address-manager.spec.ts b/test/addresses/address-manager.spec.ts index 2ae83038f3..41274f2da9 100644 --- a/test/addresses/address-manager.spec.ts +++ b/test/addresses/address-manager.spec.ts @@ -79,7 +79,7 @@ describe('Address Manager', () => { expect(am.getObservedAddrs()).to.be.empty() - am.addObservedAddr('/ip4/123.123.123.123/tcp/39201') + am.addObservedAddr(multiaddr('/ip4/123.123.123.123/tcp/39201')) expect(am.getObservedAddrs()).to.have.lengthOf(1) }) @@ -104,7 +104,7 @@ describe('Address Manager', () => { }) it('should dedupe added observed addresses', () => { - const ma = '/ip4/123.123.123.123/tcp/39201' + const ma = multiaddr('/ip4/123.123.123.123/tcp/39201') const am = new DefaultAddressManager({ peerId, transportManager: stubInterface() @@ -119,7 +119,7 @@ describe('Address Manager', () => { am.addObservedAddr(ma) expect(am.getObservedAddrs()).to.have.lengthOf(1) - expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma) + expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma.toString()) }) it('should only emit one change:addresses event', () => { @@ -136,16 +136,16 @@ describe('Address Manager', () => { eventCount++ }) - am.addObservedAddr(ma) - am.addObservedAddr(ma) - am.addObservedAddr(ma) - am.addObservedAddr(`${ma}/p2p/${peerId.toString()}`) + am.confirmObservedAddr(multiaddr(ma)) + am.confirmObservedAddr(multiaddr(ma)) + am.confirmObservedAddr(multiaddr(ma)) + am.confirmObservedAddr(multiaddr(`${ma.toString()}/p2p/${peerId.toString()}`)) expect(eventCount).to.equal(1) }) it('should strip our peer address from added observed addresses', () => { - const ma = '/ip4/123.123.123.123/tcp/39201' + const ma = multiaddr('/ip4/123.123.123.123/tcp/39201') const am = new DefaultAddressManager({ peerId, transportManager: stubInterface() @@ -156,14 +156,14 @@ describe('Address Manager', () => { expect(am.getObservedAddrs()).to.be.empty() am.addObservedAddr(ma) - am.addObservedAddr(`${ma}/p2p/${peerId.toString()}`) + am.addObservedAddr(multiaddr(`${ma.toString()}/p2p/${peerId.toString()}`)) expect(am.getObservedAddrs()).to.have.lengthOf(1) - expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma) + expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma.toString()) }) it('should strip our peer address from added observed addresses in difference formats', () => { - const ma = '/ip4/123.123.123.123/tcp/39201' + const ma = multiaddr('/ip4/123.123.123.123/tcp/39201') const am = new DefaultAddressManager({ peerId, transportManager: stubInterface() @@ -174,10 +174,10 @@ describe('Address Manager', () => { expect(am.getObservedAddrs()).to.be.empty() am.addObservedAddr(ma) - am.addObservedAddr(`${ma}/p2p/${peerId.toString()}`) + am.addObservedAddr(multiaddr(`${ma.toString()}/p2p/${peerId.toString()}`)) expect(am.getObservedAddrs()).to.have.lengthOf(1) - expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma) + expect(am.getObservedAddrs().map(ma => ma.toString())).to.include(ma.toString()) }) it('should not add our peer id to path multiaddrs', () => { diff --git a/test/addresses/addresses.node.ts b/test/addresses/addresses.node.ts index c5fc5005a7..4d1a493991 100644 --- a/test/addresses/addresses.node.ts +++ b/test/addresses/addresses.node.ts @@ -165,7 +165,7 @@ describe('libp2p.multiaddrs', () => { expect(libp2p.components.addressManager.getAddresses()).to.have.lengthOf(listenAddresses.length) - libp2p.components.addressManager.addObservedAddr(multiaddr(ma)) + libp2p.components.addressManager.confirmObservedAddr(multiaddr(ma)) expect(libp2p.components.addressManager.getAddresses()).to.have.lengthOf(listenAddresses.length + 1) expect(libp2p.components.addressManager.getAddresses().map(ma => ma.decapsulateCode(protocols('p2p').code).toString())).to.include(ma) diff --git a/test/autonat/index.spec.ts b/test/autonat/index.spec.ts new file mode 100644 index 0000000000..4f2f133dd4 --- /dev/null +++ b/test/autonat/index.spec.ts @@ -0,0 +1,670 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ + +import { expect } from 'aegir/chai' +import sinon from 'sinon' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { start, stop } from '@libp2p/interfaces/startable' +import { AutonatService, AutonatServiceInit } from '../../src/autonat/index.js' +import { StubbedInstance, stubInterface } from 'sinon-ts' +import type { PeerRouting } from '@libp2p/interface-peer-routing' +import { Multiaddr, multiaddr } from '@multiformats/multiaddr' +import type { Registrar } from '@libp2p/interface-registrar' +import type { AddressManager } from '@libp2p/interface-address-manager' +import type { Connection, Stream } from '@libp2p/interface-connection' +import { PROTOCOL } from '../../src/autonat/constants.js' +import { Message } from '../../src/autonat/pb/index.js' +import type { PeerId } from '@libp2p/interface-peer-id' +import { pushable } from 'it-pushable' +import type { Transport, TransportManager } from '@libp2p/interface-transport' +import type { AddressBook, PeerStore } from '@libp2p/interface-peer-store' +import type { DefaultConnectionManager } from '../../src/connection-manager/index.js' +import * as lp from 'it-length-prefixed' +import all from 'it-all' +import { pipe } from 'it-pipe' +import { Components, DefaultComponents } from '../../src/components.js' +import type { Dialer } from '@libp2p/interface-connection-manager' +import { Uint8ArrayList } from 'uint8arraylist' +import type { PeerInfo } from '@libp2p/interface-peer-info' + +const defaultInit: AutonatServiceInit = { + protocolPrefix: 'libp2p', + maxInboundStreams: 1, + maxOutboundStreams: 1, + timeout: 100, + startupDelay: 120000, + refreshInterval: 120000 +} + +describe('autonat', () => { + let service: AutonatService + let components: Components + let peerRouting: StubbedInstance + let registrar: StubbedInstance + let addressManager: StubbedInstance + let connectionManager: StubbedInstance + let dialer: StubbedInstance + let transportManager: StubbedInstance + let peerStore: StubbedInstance + + beforeEach(async () => { + peerRouting = stubInterface() + registrar = stubInterface() + addressManager = stubInterface() + addressManager.getAddresses.returns([]) + + dialer = stubInterface() + connectionManager = stubInterface() + // @ts-expect-error read-only property + connectionManager.dialer = dialer + transportManager = stubInterface() + peerStore = stubInterface() + peerStore.addressBook = stubInterface() + + components = new DefaultComponents({ + peerId: await createEd25519PeerId(), + dialer, + peerRouting, + registrar, + addressManager, + connectionManager, + transportManager, + peerStore + }) + + service = new AutonatService(components, defaultInit) + + await start(components) + await start(service) + }) + + afterEach(async () => { + sinon.restore() + + await stop(service) + await stop(components) + }) + + describe('verify our observed addresses', () => { + async function stubPeerResponse (host: string, dialResponse: Message.DialResponse, peerId?: PeerId): Promise { + // stub random peer lookup + const peer = { + id: peerId ?? await createEd25519PeerId(), + multiaddrs: [], + protocols: [] + } + + // stub connection to remote peer + const connection = stubInterface() + connection.remoteAddr = multiaddr(`/ip4/${host}/tcp/28319/p2p/${peer.id.toString()}`) + connectionManager.openConnection.withArgs(peer.id).resolves(connection) + + // stub autonat protocol stream + const stream = stubInterface() + connection.newStream.withArgs(PROTOCOL).resolves(stream) + + // stub autonat response + const response = Message.encode({ + type: Message.MessageType.DIAL_RESPONSE, + dialResponse + }) + stream.source = (async function * () { + yield lp.encode.single(response) + }()) + stream.sink.returns(Promise.resolve()) + + return peer + } + + it('should request peers verify our observed address', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + // The network says OK + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('125.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('126.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('127.124.124.124', { + status: Message.ResponseStatus.OK + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.confirmObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not confirm observed multiaddr') + }) + + it('should mark observed address as low confidence when dialing fails', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + // The network says ERROR + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('125.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('126.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('127.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.removeObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not verify external multiaddr') + }) + + it('should ignore non error or success statuses', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + // Mix of responses, mostly OK + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('125.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('126.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('127.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('128.124.124.124', { + status: Message.ResponseStatus.E_DIAL_REFUSED + }), + await stubPeerResponse('129.124.124.124', { + status: Message.ResponseStatus.E_INTERNAL_ERROR + }), + await stubPeerResponse('139.124.124.124', { + status: Message.ResponseStatus.OK + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.confirmObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not confirm external multiaddr') + + expect(connectionManager.openConnection.callCount) + .to.equal(peers.length, 'Did not open connections to all peers') + }) + + it('should require confirmation from diverse networks', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + // an attacker says OK, the rest of the network says ERROR + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('124.124.124.125', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('124.124.124.126', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('124.124.124.127', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('127.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('128.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('129.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('130.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.removeObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not verify external multiaddr') + + expect(connectionManager.openConnection.callCount) + .to.equal(peers.length, 'Did not open connections to all peers') + }) + + it('should require confirmation from diverse peers', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + const peerId = await createEd25519PeerId() + + // an attacker says OK, the rest of the network says ERROR + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK + }, peerId), + await stubPeerResponse('125.124.124.125', { + status: Message.ResponseStatus.OK + }, peerId), + await stubPeerResponse('126.124.124.126', { + status: Message.ResponseStatus.OK + }, peerId), + await stubPeerResponse('127.124.124.127', { + status: Message.ResponseStatus.OK + }, peerId), + await stubPeerResponse('128.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('129.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('130.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('131.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.removeObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not verify external multiaddr') + + expect(connectionManager.openConnection.callCount) + .to.equal(peers.length, 'Did not open connections to all peers') + }) + + it('should only accept observed addresses', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + const reportedAddress = multiaddr('/ip4/100.100.100.100/tcp/28319') + + // our observed addresses + addressManager.getObservedAddrs.returns([observedAddress]) + + // an attacker says OK, the rest of the network says ERROR + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK, + addr: reportedAddress.bytes + }), + await stubPeerResponse('125.124.124.125', { + status: Message.ResponseStatus.OK, + addr: reportedAddress.bytes + }), + await stubPeerResponse('126.124.124.126', { + status: Message.ResponseStatus.OK, + addr: reportedAddress.bytes + }), + await stubPeerResponse('127.124.124.127', { + status: Message.ResponseStatus.OK, + addr: reportedAddress.bytes + }), + await stubPeerResponse('128.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('129.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('130.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }), + await stubPeerResponse('131.124.124.124', { + status: Message.ResponseStatus.E_DIAL_ERROR + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + await service.verifyExternalAddresses() + + expect(addressManager.removeObservedAddr.calledWith(observedAddress)) + .to.be.true('Did not verify external multiaddr') + + expect(connectionManager.openConnection.callCount) + .to.equal(peers.length, 'Did not open connections to all peers') + }) + + it('should time out when verifying an observed address', async () => { + const observedAddress = multiaddr('/ip4/123.123.123.123/tcp/28319') + addressManager.getObservedAddrs.returns([observedAddress]) + + // The network says OK + const peers = [ + await stubPeerResponse('124.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('125.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('126.124.124.124', { + status: Message.ResponseStatus.OK + }), + await stubPeerResponse('127.124.124.124', { + status: Message.ResponseStatus.OK + }) + ] + + peerRouting.getClosestPeers.returns(async function * () { + yield * peers + }()) + + connectionManager.openConnection.reset() + connectionManager.openConnection.callsFake(async (peer, options = {}) => { + return await Promise.race([ + new Promise((resolve, reject) => { + options.signal?.addEventListener('abort', () => { + reject(new Error('Dial aborted!')) + }) + }), + new Promise((resolve, reject) => { + // longer than the timeout + setTimeout(() => { + reject(new Error('Dial Timeout!')) + }, 1000) + }) + ]) + }) + + await service.verifyExternalAddresses() + + expect(addressManager.addObservedAddr.called) + .to.be.false('Verify external multiaddr when we should have timed out') + }) + }) + + describe('verify others observed addresses', () => { + async function stubIncomingStream (opts: { + requestingPeer?: PeerId + remotePeer?: PeerId + observedAddress?: Multiaddr + remoteAddr?: Multiaddr + message?: Message | Uint8Array | boolean + transportSupported?: boolean + canDial?: boolean + } = {}): Promise { + const requestingPeer = opts.requestingPeer ?? await createEd25519PeerId() + const remotePeer = opts.remotePeer ?? requestingPeer + const observedAddress = opts.observedAddress ?? multiaddr('/ip4/124.124.124.124/tcp/28319') + const remoteAddr = opts.remoteAddr ?? observedAddress.encapsulate(`/p2p/${remotePeer.toString()}`) + const source = pushable() + const sink = pushable() + const stream: Stream = { + ...stubInterface(), + source, + sink: async (stream) => { + for await (const buf of stream) { + sink.push(new Uint8ArrayList(buf)) + } + + sink.end() + } + } + const connection = { + ...stubInterface(), + remotePeer, + remoteAddr + } + + // we might support this transport + transportManager.transportForMultiaddr.withArgs(observedAddress) + .returns(opts.transportSupported === false ? undefined : stubInterface()) + + // we might open a new connection + const newConnection = stubInterface() + newConnection.remotePeer = remotePeer + newConnection.remoteAddr = remoteAddr + + if (opts.canDial === false) { + dialer.dial.rejects(new Error('Could not dial')) + } else if (opts.canDial === true) { + dialer.dial.resolves(newConnection) + } + + let buf: Uint8Array | undefined + + if (opts.message instanceof Uint8Array) { + buf = opts.message + } else if (opts.message == null) { + buf = Message.encode({ + type: Message.MessageType.DIAL, + dial: { + peer: { + id: requestingPeer.toBytes(), + addrs: [ + observedAddress.bytes + ] + } + } + }) + } else if (opts.message !== false && opts.message !== true) { + buf = Message.encode(opts.message) + } + + if (buf != null) { + source.push(lp.encode.single(buf)) + } + + source.end() + + await service.handleIncomingAutonatStream({ + stream, + connection + }) + + const slice = await pipe( + sink, + lp.decode(), + async source => await all(source) + ) + + if (slice.length !== 1) { + throw new Error('Response was not length encoded') + } + + const message = Message.decode(slice[0]) + + if (message.dialResponse?.status === Message.ResponseStatus.OK) { + expect(newConnection.close.called).to.be.true('Did not close connection after dial') + } + + return message + } + + it('should dial a requested address', async () => { + const message = await stubIncomingStream({ + canDial: true + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.OK) + }) + + it('should expect a message', async () => { + const message = await stubIncomingStream({ + message: false + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'No message was sent') + }) + + it('should expect a valid message', async () => { + const message = await stubIncomingStream({ + message: Uint8Array.from([3, 2, 1, 0]) + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'Could not decode message') + }) + + it('should expect a dial message', async () => { + const message = await stubIncomingStream({ + message: {} + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'No Dial message found in message') + }) + + it('should expect a message with a peer id', async () => { + const observedAddress = multiaddr('/ip4/124.124.124.124/tcp/28319') + const message = await stubIncomingStream({ + observedAddress, + message: { + type: Message.MessageType.DIAL, + dial: { + peer: { + addrs: [ + observedAddress.bytes + ] + } + } + } + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'missing peer info') + }) + + it('should expect a message with a valid peer id', async () => { + const observedAddress = multiaddr('/ip4/124.124.124.124/tcp/28319') + const message = await stubIncomingStream({ + observedAddress, + message: { + type: Message.MessageType.DIAL, + dial: { + peer: { + id: Uint8Array.from([0, 1, 2, 3]), + addrs: [ + observedAddress.bytes + ] + } + } + } + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'bad peer id') + }) + + it('should fail to dial a requested address when it arrives via a relay', async () => { + const remotePeer = await createEd25519PeerId() + const requestingPeer = await createEd25519PeerId() + + const message = await stubIncomingStream({ + remotePeer, + remoteAddr: multiaddr(`/ip4/223.223.223.223/tcp/27132/p2p/${remotePeer.toString()}/p2p-circuit/p2p/${requestingPeer.toString()}`), + requestingPeer + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_BAD_REQUEST) + expect(message).to.have.nested.property('dialResponse.statusText', 'peer id mismatch') + }) + + it('should refuse to dial a requested address when it is from a different host', async () => { + const requestingPeer = await createEd25519PeerId() + const observedAddress = multiaddr('/ip4/10.10.10.10/tcp/27132') + const remoteAddr = multiaddr(`/ip4/129.129.129.129/tcp/27132/p2p/${requestingPeer.toString()}`) + + const message = await stubIncomingStream({ + requestingPeer, + remoteAddr, + observedAddress + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_DIAL_REFUSED) + expect(message).to.have.nested.property('dialResponse.statusText', 'no dialable addresses') + }) + + it('should refuse to dial a requested address when it is on an unsupported transport', async () => { + const message = await stubIncomingStream({ + transportSupported: false + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_DIAL_REFUSED) + expect(message).to.have.nested.property('dialResponse.statusText', 'no dialable addresses') + }) + + it('should error when to dialing a requested address', async () => { + const message = await stubIncomingStream({ + canDial: false + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_DIAL_ERROR) + expect(message).to.have.nested.property('dialResponse.statusText', 'Could not dial') + }) + + it('should time out when dialing a requested address', async () => { + dialer.dial.callsFake(async function (ma, options = {}) { + return await Promise.race([ + new Promise((resolve, reject) => { + options.signal?.addEventListener('abort', () => { + reject(new Error('Dial aborted!')) + }) + }), + new Promise((resolve, reject) => { + // longer than the timeout + setTimeout(() => { + reject(new Error('Dial Timeout!')) + }, 1000) + }) + ]) + }) + + const message = await stubIncomingStream({ + canDial: undefined + }) + + expect(message).to.have.property('type', Message.MessageType.DIAL_RESPONSE) + expect(message).to.have.nested.property('dialResponse.status', Message.ResponseStatus.E_DIAL_ERROR) + expect(message).to.have.nested.property('dialResponse.statusText', 'Dial aborted!') + }) + }) +}) diff --git a/test/core/consume-peer-record.spec.ts b/test/core/consume-peer-record.spec.ts index e84f597035..7f5e304a4e 100644 --- a/test/core/consume-peer-record.spec.ts +++ b/test/core/consume-peer-record.spec.ts @@ -28,7 +28,7 @@ describe('Consume peer record', () => { await libp2p.stop() }) - it('should consume peer record when observed addrs are added', async () => { + it('should consume peer record when observed addrs are confirmed', async () => { let done: () => void libp2p.components.peerStore.addressBook.consumePeerRecord = async () => { @@ -42,7 +42,7 @@ describe('Consume peer record', () => { await libp2p.start() - libp2p.components.addressManager.addObservedAddr(multiaddr('/ip4/123.123.123.123/tcp/3983')) + libp2p.components.addressManager.confirmObservedAddr(multiaddr('/ip4/123.123.123.123/tcp/3983')) await p diff --git a/test/nat-manager/nat-manager.node.ts b/test/nat-manager/nat-manager.node.ts index d2322d6990..6da477b8ed 100644 --- a/test/nat-manager/nat-manager.node.ts +++ b/test/nat-manager/nat-manager.node.ts @@ -14,6 +14,7 @@ import { createFromJSON } from '@libp2p/peer-id-factory' import type { NatAPI } from '@achingbrain/nat-port-mapper' import { StubbedInstance, stubInterface } from 'sinon-ts' import { start, stop } from '@libp2p/interfaces/startable' +import { multiaddr } from '@multiformats/multiaddr' import { DefaultComponents } from '../../src/components.js' const DEFAULT_ADDRESSES = [ @@ -100,6 +101,9 @@ describe('Nat Manager (TCP)', () => { }) }) + // simulate autonat having run + components.addressManager.confirmObservedAddr(multiaddr('/ip4/82.3.1.5/tcp/4002')) + expect(addressChangedEventFired).to.be.true() })