diff --git a/src/PolykeyAgent.ts b/src/PolykeyAgent.ts index 88bb1e3a2..4377051fc 100644 --- a/src/PolykeyAgent.ts +++ b/src/PolykeyAgent.ts @@ -573,6 +573,7 @@ class PolykeyAgent { PolykeyAgent.eventSymbols.Proxy, async (data: ConnectionData) => { if (data.type === 'reverse') { + if (this.keyManager.getNodeId().equals(data.remoteNodeId)) return; const address = networkUtils.buildAddress( data.remoteHost, data.remotePort, diff --git a/src/agent/GRPCClientAgent.ts b/src/agent/GRPCClientAgent.ts index 4b3996a07..c6083699a 100644 --- a/src/agent/GRPCClientAgent.ts +++ b/src/agent/GRPCClientAgent.ts @@ -72,8 +72,12 @@ class GRPCClientAgent extends GRPCClient { return grpcClientAgent; } - public async destroy() { - await super.destroy(); + public async destroy({ + timeout, + }: { + timeout?: number; + } = {}) { + await super.destroy({ timeout }); } @ready(new agentErrors.ErrorAgentClientDestroyed()) diff --git a/src/agent/service/nodesHolePunchMessageSend.ts b/src/agent/service/nodesHolePunchMessageSend.ts index 33ffe8603..bf5d4c33f 100644 --- a/src/agent/service/nodesHolePunchMessageSend.ts +++ b/src/agent/service/nodesHolePunchMessageSend.ts @@ -71,7 +71,7 @@ function nodesHolePunchMessageSend({ call.request.getProxyAddress(), ); logger.debug( - `Received signalling message to target ${call.request.getSrcId()}@${host}:${port}`, + `Received signaling message to target ${call.request.getSrcId()}@${host}:${port}`, ); // Ignore failure try { @@ -81,7 +81,7 @@ function nodesHolePunchMessageSend({ } } else { logger.error( - 'Received signalling message, target information was missing, skipping reverse hole punch', + 'Received signaling message, target information was missing, skipping reverse hole punch', ); } } else if (await nodeManager.knowsNode(sourceId, tran)) { @@ -92,15 +92,22 @@ function nodesHolePunchMessageSend({ connectionInfo!.remoteHost, connectionInfo!.remotePort, ); + // Checking if the source and destination are the same + if (sourceId?.equals(targetId)) { + // Logging and silently dropping operation + logger.warn('Signaling relay message requested signal to itself'); + callback(null, response); + return; + } call.request.setProxyAddress(proxyAddress); logger.debug( - `Relaying signalling message from ${srcNodeId}@${ + `Relaying signaling message from ${srcNodeId}@${ connectionInfo!.remoteHost }:${ connectionInfo!.remotePort } to ${targetNodeId} with information ${proxyAddress}`, ); - await nodeConnectionManager.relaySignallingMessage(call.request, { + await nodeConnectionManager.relaySignalingMessage(call.request, { host: connectionInfo!.remoteHost, port: connectionInfo!.remotePort, }); diff --git a/src/bin/CommandPolykey.ts b/src/bin/CommandPolykey.ts index a80c2bd28..5b6ff353d 100644 --- a/src/bin/CommandPolykey.ts +++ b/src/bin/CommandPolykey.ts @@ -1,6 +1,11 @@ import type { FileSystem } from '../types'; import commander from 'commander'; -import Logger, { StreamHandler, formatting } from '@matrixai/logger'; +import Logger, { + StreamHandler, + formatting, + levelToString, + evalLogDataValue, +} from '@matrixai/logger'; import * as binUtils from './utils'; import * as binOptions from './utils/options'; import * as binErrors from './errors'; @@ -68,8 +73,21 @@ class CommandPolykey extends commander.Command { // Set the logger formatter according to the format if (opts.format === 'json') { this.logger.handlers.forEach((handler) => - handler.setFormatter(formatting.jsonFormatter), + handler.setFormatter((record) => { + return JSON.stringify( + { + level: levelToString(record.level), + keys: record.keys, + msg: record.msg, + ...record.data, + }, + evalLogDataValue, + ); + }), ); + } else { + const format = formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`; + this.logger.handlers.forEach((handler) => handler.setFormatter(format)); } // Set the global upstream GRPC logger grpcSetLogger(this.logger.getChild('grpc')); diff --git a/src/config.ts b/src/config.ts index d72b4e2a1..126945d56 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,8 +1,30 @@ import type { Host, Port } from './network/types'; +import type { NodeAddress } from 'nodes/types'; import { getDefaultNodePath } from './utils'; // @ts-ignore package.json is outside rootDir import { version } from '../package.json'; +/** + * Configuration for testnet node addresses. + * Extracted here to enforce types properly. + */ +const testnet: Record = { + vg9a9e957878s2qgtbdmu2atvli8ms7muukb1dk4dpbm4llkki3h0: { + host: 'testnet.polykey.io' as Host, + port: 1314 as Port, + }, + vh9oqtvct10eaiv3cl4ebm0ko33sl0qqpvb59vud8cngfvqs4p4ng: { + host: 'testnet.polykey.io' as Host, + port: 1314 as Port, + }, +}; + +/** + * Configuration for main net node addresses. + * Extracted here to enforce types properly. + */ +const mainnet: Record = {}; + /** * Polykey static configuration * This is intended only for static properties @@ -96,8 +118,8 @@ const config = { }, // This is not used by the `PolykeyAgent` which defaults to `{}` network: { - mainnet: {}, - testnet: {}, + mainnet: mainnet, + testnet: testnet, }, }, }; diff --git a/src/contexts/functions/timedCancellable.ts b/src/contexts/functions/timedCancellable.ts index 4b0fa7f5d..4b5bc4a17 100644 --- a/src/contexts/functions/timedCancellable.ts +++ b/src/contexts/functions/timedCancellable.ts @@ -104,6 +104,8 @@ function setupTimedCancellable, R>( ); ctx.signal = abortController.signal; teardownContext = () => { + // The timer is not cancelled here because + // it was not created in this scope finished = true; }; } else { diff --git a/src/grpc/GRPCClient.ts b/src/grpc/GRPCClient.ts index d601cf17c..8db5f4b20 100644 --- a/src/grpc/GRPCClient.ts +++ b/src/grpc/GRPCClient.ts @@ -152,7 +152,7 @@ abstract class GRPCClient { const socket = session.socket as TLSSocket; serverCertChain = networkUtils.getCertificateChain(socket); try { - networkUtils.verifyServerCertificateChain(nodeId, serverCertChain); + networkUtils.verifyServerCertificateChain([nodeId], serverCertChain); } catch (e) { const e_ = e; if (e instanceof networkErrors.ErrorCertChain) { diff --git a/src/network/ConnectionForward.ts b/src/network/ConnectionForward.ts index 1aed3d013..43094a957 100644 --- a/src/network/ConnectionForward.ts +++ b/src/network/ConnectionForward.ts @@ -25,8 +25,8 @@ type ConnectionsForward = { interface ConnectionForward extends StartStop {} @StartStop() class ConnectionForward extends Connection { - public readonly nodeId: NodeId; - + protected nodeId_: NodeId; + protected nodeIds: Array; protected connections: ConnectionsForward; protected pingInterval: ReturnType; protected utpConn: UTPConnection; @@ -98,15 +98,15 @@ class ConnectionForward extends Connection { }; public constructor({ - nodeId, + nodeIds, connections, ...rest }: { - nodeId: NodeId; + nodeIds: Array; connections: ConnectionsForward; } & AbstractConstructorParameters[0]) { super(rest); - this.nodeId = nodeId; + this.nodeIds = nodeIds; this.connections = connections; } @@ -137,6 +137,10 @@ class ConnectionForward extends Connection { } else { ctx.signal.addEventListener('abort', () => resolveAbortedP()); } + void ctx.timer.then( + () => resolveAbortedP(), + () => {}, + ); this.resolveReadyP = resolveReadyP; this.utpSocket.on('message', this.handleMessage); const handleStartError = (e) => { @@ -211,7 +215,10 @@ class ConnectionForward extends Connection { } const serverCertChain = networkUtils.getCertificateChain(this.tlsSocket); try { - networkUtils.verifyServerCertificateChain(this.nodeId, serverCertChain); + this.nodeId_ = networkUtils.verifyServerCertificateChain( + this.nodeIds, + serverCertChain, + ); } catch (e) { this.logger.debug( `Failed to start Connection Forward: verification failed`, @@ -259,6 +266,11 @@ class ConnectionForward extends Connection { this.logger.info('Stopped Connection Forward'); } + @ready(new networkErrors.ErrorConnectionNotRunning()) + get nodeId() { + return this.nodeId_; + } + @ready(new networkErrors.ErrorConnectionNotRunning()) public compose(clientSocket: Socket): void { try { diff --git a/src/network/Proxy.ts b/src/network/Proxy.ts index 11dbbdbc6..625c6f209 100644 --- a/src/network/Proxy.ts +++ b/src/network/Proxy.ts @@ -26,6 +26,8 @@ import * as nodesUtils from '../nodes/utils'; import { promisify } from '../utils'; import { timedCancellable, context } from '../contexts'; +const clientConnectionClosedReason = Symbol('clientConnectionClosedReason'); + interface Proxy extends StartStop {} @StartStop() class Proxy { @@ -316,23 +318,32 @@ class Proxy { * Set timer to `null` explicitly to wait forever */ public openConnectionForward( - nodeId: NodeId, + nodeIds: Array, proxyHost: Host, proxyPort: Port, ctx?: Partial, - ): PromiseCancellable; + ): PromiseCancellable; @ready(new networkErrors.ErrorProxyNotRunning(), true) @timedCancellable(true, (proxy: Proxy) => proxy.connConnectTime) public async openConnectionForward( - nodeId: NodeId, + nodeId: Array, proxyHost: Host, proxyPort: Port, @context ctx: ContextTimed, - ): Promise { + ): Promise { const proxyAddress = networkUtils.buildAddress(proxyHost, proxyPort); - await this.connectionLocksForward.withF([proxyAddress, Lock], async () => { - await this.establishConnectionForward(nodeId, proxyHost, proxyPort, ctx); - }); + return await this.connectionLocksForward.withF( + [proxyAddress, Lock], + async () => { + const connectionForward = await this.establishConnectionForward( + nodeId, + proxyHost, + proxyPort, + ctx, + ); + return connectionForward.nodeId; + }, + ); } @ready(new networkErrors.ErrorProxyNotRunning(), true) @@ -409,10 +420,22 @@ class Proxy { } await this.connectionLocksForward.withF([proxyAddress, Lock], async () => { const timer = new Timer({ delay: this.connConnectTime }); + let cleanUpConnectionListener = () => {}; try { - await this.connectForward(nodeId, proxyHost, proxyPort, clientSocket, { - timer, - }); + const connectForwardProm = this.connectForward( + [nodeId], + proxyHost, + proxyPort, + clientSocket, + { + timer, + }, + ); + cleanUpConnectionListener = () => { + connectForwardProm.cancel(clientConnectionClosedReason); + }; + clientSocket.addListener('close', cleanUpConnectionListener); + await connectForwardProm; } catch (e) { if (e instanceof networkErrors.ErrorProxyConnectInvalidUrl) { if (!clientSocket.destroyed) { @@ -471,6 +494,7 @@ class Proxy { return; } finally { timer.cancel(); + clientSocket.removeListener('close', cleanUpConnectionListener); } // After composing, switch off this error handler clientSocket.off('error', handleConnectError); @@ -482,22 +506,22 @@ class Proxy { }; protected connectForward( - nodeId: NodeId, + nodeIds: Array, proxyHost: Host, proxyPort: Port, clientSocket: Socket, ctx?: Partial, - ): PromiseCancellable; + ): PromiseCancellable; @timedCancellable(true, (proxy: Proxy) => proxy.connConnectTime) protected async connectForward( - nodeId: NodeId, + nodeIds: Array, proxyHost: Host, proxyPort: Port, clientSocket: Socket, @context ctx: ContextTimed, - ): Promise { + ): Promise { const conn = await this.establishConnectionForward( - nodeId, + nodeIds, proxyHost, proxyPort, ctx, @@ -511,10 +535,11 @@ class Proxy { remotePort: conn.port, type: 'forward', }); + return conn.nodeId; } protected async establishConnectionForward( - nodeId: NodeId, + nodeIds: Array, proxyHost: Host, proxyPort: Port, ctx: ContextTimed, @@ -530,7 +555,7 @@ class Proxy { return conn; } conn = new ConnectionForward({ - nodeId, + nodeIds, connections: this.connectionsForward, utpSocket: this.utpSocket, host: proxyHost, diff --git a/src/network/errors.ts b/src/network/errors.ts index 851bd65b8..76c7ef485 100644 --- a/src/network/errors.ts +++ b/src/network/errors.ts @@ -57,6 +57,11 @@ class ErrorConnectionEndTimeout extends ErrorConnection { exitCode = sysexits.UNAVAILABLE; } +class ErrorConnectionNodesEmpty extends ErrorConnection { + static description = 'Nodes list to verify against was empty'; + exitCode = sysexits.USAGE; +} + /** * Used by ConnectionForward and ConnectionReverse */ @@ -129,9 +134,9 @@ class ErrorCertChainSignatureInvalid extends ErrorCertChain { exitCode = sysexits.PROTOCOL; } -class ErrorHostnameResolutionFailed extends ErrorNetwork { - static description = 'Unable to resolve hostname'; - exitCode = sysexits.USAGE; +class ErrorDNSResolver extends ErrorNetwork { + static description = 'DNS resolution failed'; + exitCode = sysexits.SOFTWARE; } export { @@ -148,6 +153,7 @@ export { ErrorConnectionMessageParse, ErrorConnectionTimeout, ErrorConnectionEndTimeout, + ErrorConnectionNodesEmpty, ErrorConnectionStart, ErrorConnectionStartTimeout, ErrorConnectionStartTimeoutMax, @@ -161,5 +167,5 @@ export { ErrorCertChainNameInvalid, ErrorCertChainKeyInvalid, ErrorCertChainSignatureInvalid, - ErrorHostnameResolutionFailed, + ErrorDNSResolver, }; diff --git a/src/network/utils.ts b/src/network/utils.ts index dc2e31ca2..603f3e952 100644 --- a/src/network/utils.ts +++ b/src/network/utils.ts @@ -1,15 +1,19 @@ import type { Socket } from 'net'; import type { TLSSocket } from 'tls'; +import type { PromiseCancellable } from '@matrixai/async-cancellable'; import type { Host, Hostname, Port, Address, NetworkMessage } from './types'; import type { Certificate, PublicKey } from '../keys/types'; import type { NodeId } from '../ids/types'; +import type { ContextTimed } from '../contexts/types'; +import type { NodeAddress } from 'nodes/types'; import { Buffer } from 'buffer'; import dns from 'dns'; import { IPv4, IPv6, Validator } from 'ip-num'; import * as networkErrors from './errors'; +import timedCancellable from '../contexts/functions/timedCancellable'; import * as keysUtils from '../keys/utils'; import * as nodesUtils from '../nodes/utils'; -import { isEmptyObject, promisify } from '../utils'; +import * as utils from '../utils'; const pingBuffer = serializeNetworkMessage({ type: 'ping', @@ -90,25 +94,118 @@ function parseAddress(address: string): [Host, Port] { } /** - * Resolves a provided hostname to its respective IP address (type Host) + * Checks if error is software error. + * These error codes would mean there's something broken with DNS. */ -async function resolveHost(host: Host | Hostname): Promise { - // If already IPv4/IPv6 address, return it - if (isHost(host)) { - return host as Host; - } - const lookup = promisify(dns.lookup).bind(dns); - let resolvedHost; - try { - // Resolve the hostname and get the IPv4 address - resolvedHost = await lookup(host, 4); - } catch (e) { - throw new networkErrors.ErrorHostnameResolutionFailed(e.message, { - cause: e, +function isDNSError(e: { code: string }): boolean { + return ( + e.code === dns.EOF || + e.code === dns.FILE || + e.code === dns.NOMEM || + e.code === dns.DESTRUCTION || + e.code === dns.BADFLAGS || + e.code === dns.BADHINTS || + e.code === dns.NOTINITIALIZED || + e.code === dns.LOADIPHLPAPI || + e.code === dns.ADDRGETNETWORKPARAMS + ); +} + +/** + * Resolve a hostname to all IPv4 and IPv6 hosts. + * It does an iterative BFS over any CNAME records. + * This performs proper DNS lookup, it does not use the operating system's + * resolver. However the default set of DNS servers is inherited from the + * operating system configuration. + * The default time limit is practically infinity. + * This means if the DNS server doesn't respond, this function could take + * a very long time. + */ +function resolveHostname( + hostname: Hostname, + servers?: Array, + ctx?: Partial, +): PromiseCancellable> { + const f = async (ctx: ContextTimed) => { + const hosts: Array = []; + if (ctx.signal.aborted) { + return hosts; + } + // These settings here practically ensure an infinite resolver + // The `timeout` is the timeout per DNS packet + // The default of `-1` is an exponential backoff starting at 5s + // It doubles from there + // The maximum timeout is `Math.pow(2, 31) - 1` + // The maximum number of tries is `Math.pow(2, 31) - 1` + const resolver = new dns.promises.Resolver({ + timeout: -1, + tries: Math.pow(2, 31) - 1, }); - } - // Returns an array of [ resolved address, family (4 or 6) ] - return resolvedHost[0] as Host; + // Even if you set a custom set of servers + // it is possible for it retrieve cached results + // Note that you should use `dns.getServers()` to get + // the default set to be used + // Servers will be tried in array-order + if (servers != null) { + resolver.setServers(servers); + } + // The default DNS servers are inherited from the OS + ctx.signal.addEventListener('abort', () => { + // This will trigger `dns.CANCELLED` error + // This will result in just returning whatever is in the hosts + resolver.cancel(); + }); + // Breadth first search through the CNAME records + const queue = [hostname]; + while (queue.length > 0) { + const target = queue.shift()!; + let cnames: Array; + try { + cnames = (await resolver.resolveCname(target)) as Array; + } catch (e) { + if (e.code === dns.CANCELLED || e.code === dns.TIMEOUT) { + return hosts; + } else if (isDNSError(e)) { + throw new networkErrors.ErrorDNSResolver(undefined, { cause: e }); + } else { + cnames = []; + } + } + if (cnames.length > 0) { + // Usually only 1 CNAME is used + // but here we can support multiple CNAMEs + queue.push(...cnames); + } else { + let ipv4Hosts: Array; + try { + ipv4Hosts = (await resolver.resolve4(hostname)) as Array; + } catch (e) { + if (e.code === dns.CANCELLED || e.code === dns.TIMEOUT) { + return hosts; + } else if (isDNSError(e)) { + throw new networkErrors.ErrorDNSResolver(undefined, { cause: e }); + } else { + ipv4Hosts = []; + } + } + let ipv6Hosts: Array; + try { + ipv6Hosts = (await resolver.resolve6(hostname)) as Array; + } catch (e) { + if (e.code === dns.CANCELLED || e.code === dns.TIMEOUT) { + return hosts; + } else if (isDNSError(e)) { + throw new networkErrors.ErrorDNSResolver(undefined, { cause: e }); + } else { + ipv6Hosts = []; + } + } + hosts.push(...ipv4Hosts, ...ipv6Hosts); + } + } + return hosts; + }; + return timedCancellable(f, true)(ctx); } /** @@ -153,7 +250,7 @@ function getCertificateChain(socket: TLSSocket): Array { // The order of certificates is always leaf to root const certs: Array = []; let cert_ = socket.getPeerCertificate(true); - if (isEmptyObject(cert_)) { + if (utils.isEmptyObject(cert_)) { return certs; } while (true) { @@ -188,17 +285,23 @@ function isTLSSocket(socket: Socket | TLSSocket): socket is TLSSocket { * verify that the new NodeId is the true descendant of the target NodeId. */ function verifyServerCertificateChain( - nodeId: NodeId, + nodeIds: Array, certChain: Array, -): void { +): NodeId { if (!certChain.length) { throw new networkErrors.ErrorCertChainEmpty( 'No certificates available to verify', ); } + if (!nodeIds.length) { + throw new networkErrors.ErrorConnectionNodesEmpty( + 'No nodes were provided to verify against', + ); + } const now = new Date(); - let certClaim: Certificate | undefined; - let certClaimIndex: number | undefined; + let certClaim: Certificate | null = null; + let certClaimIndex: number | null = null; + let verifiedNodeId: NodeId | null = null; for (let certIndex = 0; certIndex < certChain.length; certIndex++) { const cert = certChain[certIndex]; if (now < cert.validity.notBefore || now > cert.validity.notAfter) { @@ -252,18 +355,22 @@ function verifyServerCertificateChain( }, ); } - if (commonName.value === nodesUtils.encodeNodeId(nodeId)) { - // Found the certificate claiming the nodeId - certClaim = cert; - certClaimIndex = certIndex; - break; + for (const nodeId of nodeIds) { + if (commonName.value === nodesUtils.encodeNodeId(nodeId)) { + // Found the certificate claiming the nodeId + certClaim = cert; + certClaimIndex = certIndex; + verifiedNodeId = nodeId; + } } + // If cert is found then break out of loop + if (verifiedNodeId != null) break; } - if (certClaimIndex == null || certClaim == null) { + if (certClaimIndex == null || certClaim == null || verifiedNodeId == null) { throw new networkErrors.ErrorCertChainUnclaimed( - 'Node ID is not claimed by any certificate', + 'Node IDs is not claimed by any certificate', { - data: { nodeId }, + data: { nodeIds }, }, ); } @@ -290,6 +397,7 @@ function verifyServerCertificateChain( } } } + return verifiedNodeId; } /** @@ -377,6 +485,35 @@ function verifyClientCertificateChain(certChain: Array): void { } } +/** + * Takes an array of host or hostnames and resolves them to the host addresses. + * It will also filter out any duplicates or IPV6 addresses. + * @param addresses + */ +async function resolveHostnames( + addresses: Array, +): Promise> { + const existingAddresses: Set = new Set(); + const final: Array<{ host: Host; port: Port }> = []; + for (const address of addresses) { + if (isHost(address.host)) { + if (existingAddresses.has(`${address.host}|${address.port}`)) continue; + final.push({ host: address.host, port: address.port }); + existingAddresses.add(`${address.host}|${address.port}`); + continue; + } + const resolvedAddresses = await resolveHostname(address.host); + for (const resolvedHost of resolvedAddresses) { + const newAddress = { host: resolvedHost, port: address.port }; + if (!Validator.isValidIPv4String(resolvedHost)[0]) continue; + if (existingAddresses.has(`${resolvedHost}|${address.port}`)) continue; + final.push(newAddress); + existingAddresses.add(`${resolvedHost}|${address.port}`); + } + } + return final; +} + export { pingBuffer, pongBuffer, @@ -387,7 +524,8 @@ export { toAuthToken, buildAddress, parseAddress, - resolveHost, + isDNSError, + resolveHostname, resolvesZeroIP, serializeNetworkMessage, unserializeNetworkMessage, @@ -395,4 +533,5 @@ export { getCertificateChain, verifyServerCertificateChain, verifyClientCertificateChain, + resolveHostnames, }; diff --git a/src/nodes/NodeConnection.ts b/src/nodes/NodeConnection.ts index adf0495ce..142531914 100644 --- a/src/nodes/NodeConnection.ts +++ b/src/nodes/NodeConnection.ts @@ -1,10 +1,8 @@ import type { NodeId } from './types'; import type { Host, Hostname, Port } from '../network/types'; -import type KeyManager from '../keys/KeyManager'; import type { Certificate, PublicKey, PublicKeyPem } from '../keys/types'; import type Proxy from '../network/Proxy'; import type GRPCClient from '../grpc/GRPCClient'; -import type NodeConnectionManager from './NodeConnectionManager'; import type { ContextTimed } from '../contexts/types'; import type { PromiseCancellable } from '@matrixai/async-cancellable'; import Logger from '@matrixai/logger'; @@ -44,10 +42,9 @@ class NodeConnection { targetPort, targetHostname, proxy, - keyManager, clientFactory, - nodeConnectionManager, destroyCallback = async () => {}, + destroyTimeout, logger = new Logger(this.name), }: { targetNodeId: NodeId; @@ -55,10 +52,9 @@ class NodeConnection { targetPort: Port; targetHostname?: Hostname; proxy: Proxy; - keyManager: KeyManager; clientFactory: (...args) => Promise; - nodeConnectionManager: NodeConnectionManager; destroyCallback?: () => Promise; + destroyTimeout?: number; logger?: Logger; }, ctx?: Partial, @@ -71,10 +67,9 @@ class NodeConnection { targetPort, targetHostname, proxy, - keyManager, clientFactory, - nodeConnectionManager, destroyCallback = async () => {}, + destroyTimeout = 2000, logger = new Logger(this.name), }: { targetNodeId: NodeId; @@ -82,10 +77,9 @@ class NodeConnection { targetPort: Port; targetHostname?: Hostname; proxy: Proxy; - keyManager: KeyManager; clientFactory: (...args) => Promise; - nodeConnectionManager: NodeConnectionManager; destroyCallback?: () => Promise; + destroyTimeout?: number; logger?: Logger; }, @context ctx: ContextTimed, @@ -116,39 +110,23 @@ class NodeConnection { }); let client: T; try { - // Start the hole punching only if we are not connecting to seed nodes - const seedNodes = nodeConnectionManager.getSeedNodes(); - const isSeedNode = !!seedNodes.find((nodeId) => { - return nodeId.equals(targetNodeId); - }); - if (!isSeedNode) { - // FIXME: this needs to be cancellable. - // It needs to timeout as well as abort for cleanup - void Array.from(seedNodes, (seedNodeId) => { - return nodeConnectionManager.sendSignallingMessage( - seedNodeId, - keyManager.getNodeId(), - targetNodeId, - undefined, - ctx, - ); - }); - } // TODO: this needs to be updated to take a context, // still uses old timer style. + const clientLogger = logger.getChild(clientFactory.name); client = await clientFactory({ nodeId: targetNodeId, host: targetHost, port: targetPort, proxyConfig: proxyConfig, // Think about this - logger: logger.getChild(clientFactory.name), + logger: clientLogger, destroyCallback: async () => { + clientLogger.debug(`GRPC client triggered destroyedCallback`); if ( nodeConnection[asyncInit.status] !== 'destroying' && !nodeConnection[asyncInit.destroyed] ) { - await nodeConnection.destroy(); + await nodeConnection.destroy({ timeout: destroyTimeout }); } }, // FIXME: this needs to be replaced with @@ -158,7 +136,7 @@ class NodeConnection { // 5. When finished, you have a connection to other node // The GRPCClient is ready to be used for requests } catch (e) { - await nodeConnection.destroy(); + await nodeConnection.destroy({ timeout: destroyTimeout }); // If the connection times out, re-throw this with a higher level nodes exception if (e instanceof grpcErrors.ErrorGRPCClientTimeout) { throw new nodesErrors.ErrorNodeConnectionTimeout(e.message, { @@ -198,15 +176,20 @@ class NodeConnection { this.destroyCallback = destroyCallback; } - public async destroy() { + public async destroy({ + timeout, + }: { + timeout?: number; + } = {}) { this.logger.info(`Destroying ${this.constructor.name}`); if ( this.client != null && this.client[asyncInit.status] !== 'destroying' && !this.client[asyncInit.destroyed] ) { - await this.client.destroy(); + await this.client.destroy({ timeout }); } + this.logger.debug(`${this.constructor.name} triggered destroyedCallback`); await this.destroyCallback(); this.logger.info(`Destroyed ${this.constructor.name}`); } diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 67e493a8a..aa5ae536e 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -14,12 +14,13 @@ import type { import type NodeManager from './NodeManager'; import type { ContextTimed } from 'contexts/types'; import type { PromiseCancellable } from '@matrixai/async-cancellable'; +import type { PromiseDeconstructed } from '../types'; import { withF } from '@matrixai/resources'; import Logger from '@matrixai/logger'; import { ready, StartStop } from '@matrixai/async-init/dist/StartStop'; import { IdInternal } from '@matrixai/id'; import { status } from '@matrixai/async-init'; -import { LockBox, RWLockWriter } from '@matrixai/async-locks'; +import { LockBox, Lock, Semaphore } from '@matrixai/async-locks'; import { Timer } from '@matrixai/timer'; import NodeConnection from './NodeConnection'; import * as nodesUtils from './utils'; @@ -29,13 +30,15 @@ import GRPCClientAgent from '../agent/GRPCClientAgent'; import * as validationUtils from '../validation/utils'; import * as networkUtils from '../network/utils'; import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb'; -import { never } from '../utils'; +import { never, promise } from '../utils'; +import { resolveHostnames } from '../network/utils'; // TODO: check all locking and add cancellation for it. type ConnectionAndTimer = { connection: NodeConnection; - timer: NodeJS.Timer; + timer: Timer | null; + usageCount: number; }; interface NodeConnectionManager extends StartStop {} @@ -81,7 +84,7 @@ class NodeConnectionManager { * NodeIds can't be used to properly retrieve a value from the map. */ protected connections: Map = new Map(); - protected connectionLocks: LockBox = new LockBox(); + protected connectionLocks: LockBox = new LockBox(); // Tracks the backoff period for offline nodes protected nodesBackoffMap: Map< string, @@ -89,6 +92,7 @@ class NodeConnectionManager { > = new Map(); protected backoffDefault: number = 300; // 5 min protected backoffMultiplier: number = 2; // Doubles every failure + protected ncDestroyTimeout: number; public constructor({ keyManager, @@ -100,6 +104,7 @@ class NodeConnectionManager { connConnectTime = 2000, connTimeoutTime = 60000, pingTimeout = 2000, + ncDestroyTimeout = 2000, logger, }: { nodeGraph: NodeGraph; @@ -111,6 +116,7 @@ class NodeConnectionManager { connConnectTime?: number; connTimeoutTime?: number; pingTimeout?: number; + ncDestroyTimeout?: number; logger?: Logger; }) { this.logger = logger ?? new Logger(NodeConnectionManager.name); @@ -118,11 +124,14 @@ class NodeConnectionManager { this.nodeGraph = nodeGraph; this.proxy = proxy; this.taskManager = taskManager; + const localNodeIdEncoded = nodesUtils.encodeNodeId(keyManager.getNodeId()); + delete seedNodes[localNodeIdEncoded]; this.seedNodes = seedNodes; this.initialClosestNodes = initialClosestNodes; this.connConnectTime = connConnectTime; this.connTimeoutTime = connTimeoutTime; this.pingTimeout = pingTimeout; + this.ncDestroyTimeout = ncDestroyTimeout; } public async start({ nodeManager }: { nodeManager: NodeManager }) { @@ -167,30 +176,36 @@ class NodeConnectionManager { targetNodeId: NodeId, ctx?: Partial, ): Promise>> { + if (this.keyManager.getNodeId().equals(targetNodeId)) { + this.logger.warn('Attempting connection to our own NodeId'); + } return async () => { - const { connection, timer: timeToLiveTimer } = await this.getConnection( - targetNodeId, - ctx, - ); - // Acquire the read lock and the release function - // FIXME: race the abortion - const [release] = await this.connectionLocks.lock([ - targetNodeId.toString(), - RWLockWriter, - 'write', - ])(); - // Resetting TTL timer - timeToLiveTimer?.refresh(); + const connectionAndTimer = await this.getConnection(targetNodeId, ctx); + // Increment usage count, and cancel timer + connectionAndTimer.usageCount += 1; + connectionAndTimer.timer?.cancel(); + connectionAndTimer.timer = null; // Return tuple of [ResourceRelease, Resource] return [ async (e) => { - await release(); if (nodesUtils.isConnectionError(e)) { // Error with connection, shutting connection down await this.destroyConnection(targetNodeId); } + // Decrement usage count and set up TTL if needed. + // We're only setting up TTLs for non-seed nodes. + connectionAndTimer.usageCount -= 1; + if ( + connectionAndTimer.usageCount <= 0 && + !this.isSeedNode(targetNodeId) + ) { + connectionAndTimer.timer = new Timer({ + handler: async () => await this.destroyConnection(targetNodeId), + delay: this.connTimeoutTime, + }); + } }, - connection, + connectionAndTimer.connection, ]; }; } @@ -279,24 +294,136 @@ class NodeConnectionManager { @context ctx: ContextTimed, ): Promise { const targetNodeIdString = targetNodeId.toString() as NodeIdString; + const targetNodeIdEncoded = nodesUtils.encodeNodeId(targetNodeId); + this.logger.debug(`Getting NodeConnection for ${targetNodeIdEncoded}`); return await this.connectionLocks.withF( - [targetNodeIdString, RWLockWriter, 'write'], + [targetNodeIdString, Lock], async () => { const connAndTimer = this.connections.get(targetNodeIdString); - if (connAndTimer != null) return connAndTimer; + if (connAndTimer != null) { + this.logger.debug( + `Found existing NodeConnection for ${targetNodeIdEncoded}`, + ); + return connAndTimer; + } // Creating the connection and set in map + this.logger.debug(`Finding address for ${targetNodeIdEncoded}`); const targetAddress = await this.findNode(targetNodeId); if (targetAddress == null) { throw new nodesErrors.ErrorNodeGraphNodeIdNotFound(); } + this.logger.debug( + `Found address for ${targetNodeIdEncoded} at ${targetAddress.host}:${targetAddress.port}`, + ); // If the stored host is not a valid host (IP address), // then we assume it to be a hostname const targetHostname = !networkUtils.isHost(targetAddress.host) ? (targetAddress.host as string as Hostname) : undefined; - const targetHost = await networkUtils.resolveHost(targetAddress.host); - // Creating the destroyCallback - const destroyCallback = async () => { + const targetAddresses = await networkUtils.resolveHostnames([ + targetAddress, + ]); + this.logger.debug(`Creating NodeConnection for ${targetNodeIdEncoded}`); + // Start the hole punching only if we are not connecting to seed nodes + const seedNodes = this.getSeedNodes(); + if (this.isSeedNode(targetNodeId)) { + // FIXME: this needs to be cancellable. + // It needs to timeout as well as abort for cleanup + void Array.from(seedNodes, (seedNodeId) => { + return ( + this.sendSignalingMessage( + seedNodeId, + this.keyManager.getNodeId(), + targetNodeId, + undefined, + ctx, + ) + // Ignore results + .then( + () => {}, + () => {}, + ) + ); + }); + } + const destroyCallbackProms: Array> = []; + const errors: Array = []; + const firstConnectionIndexProm = promise(); + const nodeConnectionProms = targetAddresses.map((address, index) => { + const destroyCallbackProm = promise(); + destroyCallbackProms[index] = destroyCallbackProm; + const nodeConnectionProm = NodeConnection.createNodeConnection( + { + targetNodeId: targetNodeId, + targetHost: address.host, + targetHostname: targetHostname, + targetPort: address.port, + proxy: this.proxy, + destroyTimeout: this.ncDestroyTimeout, + destroyCallback: async () => { + destroyCallbackProm.resolveP(); + }, + logger: this.logger.getChild( + `${NodeConnection.name} [${nodesUtils.encodeNodeId( + targetNodeId, + )}@${address.host}:${address.port}]`, + ), + clientFactory: async (args) => + GRPCClientAgent.createGRPCClientAgent(args), + }, + ctx, + ); + void nodeConnectionProm.then( + () => firstConnectionIndexProm.resolveP(index), + (e) => { + this.logger.debug( + `Creating NodeConnection failed for ${targetNodeIdEncoded}`, + ); + // Disable destroyCallback clean up + void destroyCallbackProm.p.then(() => {}); + errors.push(e); + if (errors.length === targetAddresses.length) { + firstConnectionIndexProm.resolveP(null); + } + }, + ); + return nodeConnectionProm; + }); + let newConnection: NodeConnection; + try { + newConnection = await Promise.any(nodeConnectionProms); + } catch (e) { + // All connections failed to establish + this.logger.debug( + `Failed NodeConnection for ${targetNodeIdEncoded} with ${errors}`, + ); + if (errors.length === 1) { + throw errors[0]; + } else { + throw new nodesErrors.ErrorNodeConnectionMultiConnectionFailed( + 'failed to establish connection with multiple hosts', + { + cause: new AggregateError(errors), + }, + ); + } + } + // Cleaning up other connections + const successfulIndex = await firstConnectionIndexProm.p; + if (successfulIndex === null) never(); + const cleanUpReason = Symbol('cleanUpReason'); + await Promise.allSettled( + nodeConnectionProms.map(async (nodeConnectionProm, index) => { + if (index === successfulIndex) return; + nodeConnectionProm.cancel(cleanUpReason); + return nodeConnectionProm.then(async (nodeConnection) => { + await nodeConnection.destroy({ timeout: this.ncDestroyTimeout }); + }); + }), + ); + // Final set up + void destroyCallbackProms[successfulIndex].p.then(async () => { + this.logger.debug('DestroyedCallback was called'); // To avoid deadlock only in the case where this is called // we want to check for destroying connection and read lock const connAndTimer = this.connections.get(targetNodeIdString); @@ -308,39 +435,26 @@ class NodeConnectionManager { // Connection is already destroying if (connAndTimer?.connection?.[status] === 'destroying') return; await this.destroyConnection(targetNodeId); - }; - // Creating new connection - const newConnection = await NodeConnection.createNodeConnection( - { - targetNodeId: targetNodeId, - targetHost: targetHost, - targetHostname: targetHostname, - targetPort: targetAddress.port, - proxy: this.proxy, - keyManager: this.keyManager, - nodeConnectionManager: this, - destroyCallback, - logger: this.logger.getChild( - `${NodeConnection.name} ${targetHost}:${targetAddress.port}`, - ), - clientFactory: async (args) => - GRPCClientAgent.createGRPCClientAgent(args), - }, - ctx, - ); + }); // We can assume connection was established and destination was valid, // we can add the target to the nodeGraph await this.nodeManager?.setNode(targetNodeId, targetAddress); - // Creating TTL timeout - const timeToLiveTimer = setTimeout(async () => { - await this.destroyConnection(targetNodeId); - }, this.connTimeoutTime); - + // Creating TTL timeout. + // We don't create a TTL for seed nodes. + const timeToLiveTimer = !this.isSeedNode(targetNodeId) + ? new Timer({ + handler: async () => await this.destroyConnection(targetNodeId), + delay: this.connTimeoutTime, + }) + : null; const newConnAndTimer: ConnectionAndTimer = { - connection: newConnection, + connection: newConnection!, timer: timeToLiveTimer, + usageCount: 0, }; this.connections.set(targetNodeIdString, newConnAndTimer); + // Enable destroyCallback clean up + this.logger.debug(`Created NodeConnection for ${targetNodeIdEncoded}`); return newConnAndTimer; }, ); @@ -353,13 +467,20 @@ class NodeConnectionManager { protected async destroyConnection(targetNodeId: NodeId): Promise { const targetNodeIdString = targetNodeId.toString() as NodeIdString; return await this.connectionLocks.withF( - [targetNodeIdString, RWLockWriter, 'write'], + [targetNodeIdString, Lock], async () => { const connAndTimer = this.connections.get(targetNodeIdString); if (connAndTimer?.connection == null) return; - await connAndTimer.connection.destroy(); + this.logger.debug( + `Destroying NodeConnection for ${nodesUtils.encodeNodeId( + targetNodeId, + )}`, + ); + await connAndTimer.connection.destroy({ + timeout: this.ncDestroyTimeout, + }); // Destroying TTL timer - if (connAndTimer.timer != null) clearTimeout(connAndTimer.timer); + if (connAndTimer.timer != null) connAndTimer.timer.cancel(); // Updating the connection map this.connections.delete(targetNodeIdString); }, @@ -406,7 +527,7 @@ class NodeConnectionManager { proxyPort: Port, ctx?: ContextTimed, ): Promise { - await this.proxy.openConnectionForward(nodeId, proxyHost, proxyPort, ctx); + await this.proxy.openConnectionForward([nodeId], proxyHost, proxyPort, ctx); } /** @@ -642,12 +763,14 @@ class NodeConnectionManager { }, ctx, ); + const localNodeId = this.keyManager.getNodeId(); const nodes: Array<[NodeId, NodeData]> = []; // Loop over each map element (from the returned response) and populate nodes response.getNodeTableMap().forEach((address, nodeIdString: string) => { const nodeId = nodesUtils.decodeNodeId(nodeIdString); // If the nodeId is not valid we don't add it to the list of nodes - if (nodeId != null) { + // Our own nodeId is considered not valid here + if (nodeId != null && !localNodeId.equals(nodeId)) { nodes.push([ nodeId, { @@ -681,7 +804,7 @@ class NodeConnectionManager { * @param proxyAddress string of address in the form `proxyHost:proxyPort` * @param ctx */ - public sendSignallingMessage( + public sendSignalingMessage( relayNodeId: NodeId, sourceNodeId: NodeId, targetNodeId: NodeId, @@ -694,18 +817,26 @@ class NodeConnectionManager { (nodeConnectionManager: NodeConnectionManager) => nodeConnectionManager.connConnectTime, ) - public async sendSignallingMessage( + public async sendSignalingMessage( relayNodeId: NodeId, sourceNodeId: NodeId, targetNodeId: NodeId, proxyAddress: string | undefined, @context ctx: ContextTimed, ): Promise { + if ( + this.keyManager.getNodeId().equals(relayNodeId) || + this.keyManager.getNodeId().equals(targetNodeId) + ) { + // Logging and silently dropping operation + this.logger.warn('Attempted to send signaling message to our own NodeId'); + return; + } const rlyNode = nodesUtils.encodeNodeId(relayNodeId); const srcNode = nodesUtils.encodeNodeId(sourceNodeId); const tgtNode = nodesUtils.encodeNodeId(targetNodeId); this.logger.debug( - `sendSignallingMessage sending Signalling message relay: ${rlyNode}, source: ${srcNode}, target: ${tgtNode}, proxy: ${proxyAddress}`, + `sendSignalingMessage sending Signaling message relay: ${rlyNode}, source: ${srcNode}, target: ${tgtNode}, proxy: ${proxyAddress}`, ); const relayMsg = new nodesPB.Relay(); relayMsg.setSrcId(srcNode); @@ -732,7 +863,7 @@ class NodeConnectionManager { * @param sourceAddress * @param ctx */ - public relaySignallingMessage( + public relaySignalingMessage( message: nodesPB.Relay, sourceAddress: NodeAddress, ctx?: Partial, @@ -743,7 +874,7 @@ class NodeConnectionManager { (nodeConnectionManager: NodeConnectionManager) => nodeConnectionManager.connConnectTime, ) - public async relaySignallingMessage( + public async relaySignalingMessage( message: nodesPB.Relay, sourceAddress: NodeAddress, @context ctx: ContextTimed, @@ -752,7 +883,7 @@ class NodeConnectionManager { // If we're relaying then we trust our own node graph records over // what was provided in the message const sourceNode = validationUtils.parseNodeId(message.getSrcId()); - await this.sendSignallingMessage( + await this.sendSignalingMessage( validationUtils.parseNodeId(message.getTargetId()), sourceNode, validationUtils.parseNodeId(message.getTargetId()), @@ -773,6 +904,17 @@ class NodeConnectionManager { }); } + /** + * Returns true if the given node is a seed node. + */ + @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) + public isSeedNode(nodeId: NodeId): boolean { + const seedNodes = this.getSeedNodes(); + return !!seedNodes.find((seedNode) => { + return nodeId.equals(seedNode); + }); + } + /** * Checks if a connection can be made to the target. Returns true if the * connection can be authenticated, it's certificate matches the nodeId and @@ -796,11 +938,10 @@ class NodeConnectionManager { ) public async pingNode( nodeId: NodeId, - host: Host | Hostname, + host: Host, port: Port, @context ctx: ContextTimed, ): Promise { - host = await networkUtils.resolveHost(host); const seedNodes = this.getSeedNodes(); const isSeedNode = !!seedNodes.find((seedNodeId) => { return nodeId.equals(seedNodeId); @@ -808,7 +949,7 @@ class NodeConnectionManager { if (!isSeedNode) { void Array.from(this.getSeedNodes(), async (seedNodeId) => { // FIXME: this needs to handle aborting - void this.sendSignallingMessage( + void this.sendSignalingMessage( seedNodeId, this.keyManager.getNodeId(), nodeId, @@ -824,6 +965,73 @@ class NodeConnectionManager { return true; } + public establishMultiConnection( + nodeIds: Array, + addresses: Array, + connectionTimeout?: number, + limit?: number, + ctx?: Partial, + ): PromiseCancellable>; + @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) + @timedCancellable(true) + public async establishMultiConnection( + nodeIds: Array, + addresses: Array, + connectionTimeout: number = 2000, + limit: number | undefined, + @context ctx: ContextTimed, + ): Promise> { + // Get the full list of addresses by flattening results + const addresses_ = await resolveHostnames(addresses); + // We want to establish forward connections to each address + const pendingConnectionProms: Array> = []; + const semaphore = limit != null ? new Semaphore(limit) : null; + const establishedMap: Map = new Map(); + const cleanUpReason = Symbol('CleanUp'); + const abortController = new AbortController(); + const abort = () => abortController.abort(ctx.signal.reason); + ctx.signal.addEventListener('abort', abort); + const signal = abortController.signal; + for (const address of addresses_) { + if (semaphore != null) await semaphore.waitForUnlock(); + if (signal.aborted) break; + const [semaphoreReleaser] = + semaphore != null ? await semaphore.lock()() : [() => {}]; + const timer = new Timer({ delay: connectionTimeout }); + const connectionProm = this.proxy + .openConnectionForward(nodeIds, address.host as Host, address.port, { + signal, + timer, + }) + .then( + (nodeId) => { + // Connection established, add it to the map + establishedMap.set(nodeId, address); + // Check if all nodes are established and trigger clean up + if (establishedMap.size >= nodeIds.length) { + abortController.abort(cleanUpReason); + } + }, + () => { + // Connection failed, ignore error + }, + ) + .finally(async () => { + // Clean up + await semaphoreReleaser(); + timer.cancel(cleanUpReason); + }); + pendingConnectionProms.push(connectionProm); + } + await Promise.all(pendingConnectionProms); + ctx.signal.removeEventListener('abort', abort); + return establishedMap; + } + + public hasConnection(nodeId: NodeId): boolean { + return this.connections.has(nodeId.toString() as NodeIdString); + } + protected hasBackoff(nodeId: NodeId): boolean { const backoff = this.nodesBackoffMap.get(nodeId.toString()); if (backoff == null) return false; diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index be701e9ef..8e51174ad 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -5,7 +5,13 @@ import type KeyManager from '../keys/KeyManager'; import type { PublicKeyPem } from '../keys/types'; import type Sigchain from '../sigchain/Sigchain'; import type { ChainData, ChainDataEncoded } from '../sigchain/types'; -import type { NodeId, NodeAddress, NodeBucket, NodeBucketIndex } from './types'; +import type { + NodeId, + NodeAddress, + NodeBucket, + NodeBucketIndex, + NodeData, +} from './types'; import type { ClaimEncoded } from '../claims/types'; import type TaskManager from '../tasks/TaskManager'; import type { TaskHandler, TaskHandlerId, Task } from '../tasks/types'; @@ -21,7 +27,6 @@ import * as nodesErrors from './errors'; import * as nodesUtils from './utils'; import * as tasksErrors from '../tasks/errors'; import { timedCancellable, context } from '../contexts'; -import * as networkUtils from '../network/utils'; import * as validationUtils from '../validation/utils'; import * as utilsPB from '../proto/js/polykey/v1/utils/utils_pb'; import * as claimsErrors from '../claims/errors'; @@ -44,6 +49,7 @@ class NodeManager { protected taskManager: TaskManager; protected refreshBucketDelay: number; protected refreshBucketDelayJitter: number; + protected retrySeedConnectionsDelay: number; protected pendingNodes: Map> = new Map(); public readonly basePath = this.constructor.name; @@ -107,22 +113,66 @@ class NodeManager { ); never(); } - const host_ = await networkUtils.resolveHost(host); - if ( - await this.pingNode(nodeId, { host: host_, port }, { signal: ctx.signal }) - ) { - await this.setNode( - nodeId, - { host: host_, port }, - false, - false, - 2000, - ctx, - ); + if (await this.pingNode(nodeId, { host, port }, { signal: ctx.signal })) { + await this.setNode(nodeId, { host, port }, false, false, 2000, ctx); } }; public readonly pingAndSetNodeHandlerId: TaskHandlerId = `${this.basePath}.${this.pingAndSetNodeHandler.name}.pingAndSetNodeHandlerId` as TaskHandlerId; + protected checkSeedConnectionsHandler: TaskHandler = async ( + ctx, + taskInfo, + ) => { + this.logger.debug('Checking seed connections'); + // Check for existing seed node connections + const seedNodes = this.nodeConnectionManager.getSeedNodes(); + const allInactive = !seedNodes + .map((nodeId) => this.nodeConnectionManager.hasConnection(nodeId)) + .reduce((a, b) => a || b); + try { + if (allInactive) { + this.logger.debug( + 'No active seed connections were found, retrying network entry', + ); + // If no seed node connections exist then we redo syncNodeGraph + await this.syncNodeGraph(true, undefined, ctx); + } else { + // Doing this concurrently, we don't care about the results + await Promise.allSettled( + seedNodes.map((nodeId) => { + // Retry any failed seed node connections + if (!this.nodeConnectionManager.hasConnection(nodeId)) { + this.logger.debug( + `Re-establishing seed connection for ${nodesUtils.encodeNodeId( + nodeId, + )}`, + ); + return this.nodeConnectionManager.withConnF( + nodeId, + async () => { + // Do nothing, we just want to establish a connection + }, + ctx, + ); + } + }), + ); + } + } finally { + this.logger.debug('Checked seed connections'); + // Re-schedule this task + await this.taskManager.scheduleTask({ + delay: taskInfo.delay, + deadline: taskInfo.deadline, + handlerId: this.checkSeedConnectionsHandlerId, + lazy: true, + path: [this.basePath, this.checkSeedConnectionsHandlerId], + priority: taskInfo.priority, + }); + } + }; + public readonly checkSeedConnectionsHandlerId: TaskHandlerId = + `${this.basePath}.${this.checkSeedConnectionsHandler.name}.checkSeedConnectionsHandler` as TaskHandlerId; constructor({ db, @@ -133,6 +183,7 @@ class NodeManager { taskManager, refreshBucketDelay = 3600000, // 1 hour in milliseconds refreshBucketDelayJitter = 0.5, // Multiple of refreshBucketDelay to jitter by + retrySeedConnectionsDelay = 120000, // 2 minuets logger, }: { db: DB; @@ -143,6 +194,7 @@ class NodeManager { taskManager: TaskManager; refreshBucketDelay?: number; refreshBucketDelayJitter?: number; + retrySeedConnectionsDelay?: number; longTaskTimeout?: number; logger?: Logger; }) { @@ -159,6 +211,7 @@ class NodeManager { 0, Math.min(refreshBucketDelayJitter, 1), ); + this.retrySeedConnectionsDelay = retrySeedConnectionsDelay; } public async start() { @@ -176,7 +229,17 @@ class NodeManager { this.pingAndSetNodeHandlerId, this.pingAndSetNodeHandler, ); + this.taskManager.registerHandler( + this.checkSeedConnectionsHandlerId, + this.checkSeedConnectionsHandler, + ); await this.setupRefreshBucketTasks(); + await this.taskManager.scheduleTask({ + delay: this.retrySeedConnectionsDelay, + handlerId: this.checkSeedConnectionsHandlerId, + lazy: true, + path: [this.basePath, this.checkSeedConnectionsHandlerId], + }); this.logger.info(`Started ${this.constructor.name}`); } @@ -200,6 +263,7 @@ class NodeManager { this.taskManager.deregisterHandler(this.refreshBucketHandlerId); this.taskManager.deregisterHandler(this.gcBucketHandlerId); this.taskManager.deregisterHandler(this.pingAndSetNodeHandlerId); + this.taskManager.deregisterHandler(this.checkSeedConnectionsHandlerId); this.logger.info(`Stopped ${this.constructor.name}`); } @@ -237,10 +301,9 @@ class NodeManager { if (targetAddress == null) { return false; } - const targetHost = await networkUtils.resolveHost(targetAddress.host); return await this.nodeConnectionManager.pingNode( nodeId, - targetHost, + targetAddress.host, targetAddress.port, ctx, ); @@ -696,6 +759,8 @@ class NodeManager { const unsetLock = new Lock(); const pendingPromises: Array> = []; for (const nodeId of bucket) { + // We want to retain seed nodes regardless of state, so skip them + if (this.nodeConnectionManager.isSeedNode(nodeId)) continue; if (removedNodes >= pendingNodes.size) break; await semaphore.waitForUnlock(); if (ctx.signal?.aborted === true) break; @@ -1044,71 +1109,144 @@ class NodeManager { pingTimeout: number | undefined, @context ctx: ContextTimed, ): Promise { - this.logger.info('Syncing nodeGraph'); - for (const seedNodeId of this.nodeConnectionManager.getSeedNodes()) { - // Check if the connection is viable - if ( - (await this.pingNode(seedNodeId, undefined, { - timer: new Timer({ - delay: pingTimeout ?? this.nodeConnectionManager.pingTimeout, - }), - signal: ctx.signal, - })) === false - ) { - continue; - } + const logger = this.logger.getChild('syncNodeGraph'); + logger.info('Syncing nodeGraph'); + // Getting the seed node connection information + const seedNodes = this.nodeConnectionManager.getSeedNodes(); + const addresses = await Promise.all( + await this.db.withTransactionF(async (tran) => + seedNodes.map( + async (seedNode) => + ( + await this.nodeGraph.getNode(seedNode, tran) + )?.address, + ), + ), + ); + const filteredAddresses = addresses.filter( + (address) => address != null, + ) as Array; + logger.debug( + `establishing multi-connection to the following seed nodes ${seedNodes.map( + (nodeId) => nodesUtils.encodeNodeId(nodeId), + )}`, + ); + logger.debug( + `and addresses ${filteredAddresses.map( + (address) => `${address.host}:${address.port}`, + )}`, + ); + // Establishing connections to the seed nodes + const connections = + await this.nodeConnectionManager.establishMultiConnection( + seedNodes, + filteredAddresses, + pingTimeout, + undefined, + { signal: ctx.signal }, + ); + logger.debug(`Multi-connection established for`); + connections.forEach((address, key) => { + logger.debug( + `${nodesUtils.encodeNodeId(key)}@${address.host}:${address.port}`, + ); + }); + if (connections.size === 0) { + // Not explicitly a failure but we do want to stop here + this.logger.warn( + 'Failed to connect to any seed nodes when syncing node graph', + ); + return; + } + // Using a map to avoid duplicates + const closestNodesAll: Map = new Map(); + const localNodeId = this.keyManager.getNodeId(); + let closestNode: NodeId | null = null; + logger.debug('Getting closest nodes'); + for (const [nodeId] of connections) { const closestNodes = await this.nodeConnectionManager.getRemoteNodeClosestNodes( - seedNodeId, - this.keyManager.getNodeId(), + nodeId, + localNodeId, { signal: ctx.signal }, ); - const localNodeId = this.keyManager.getNodeId(); - for (const [nodeId, nodeData] of closestNodes) { - if (!localNodeId.equals(nodeId)) { - const pingAndSetTask = await this.taskManager.scheduleTask({ - delay: 0, - handlerId: this.pingAndSetNodeHandlerId, - lazy: !block, - parameters: [ - nodesUtils.encodeNodeId(nodeId), - nodeData.address.host, - nodeData.address.port, - ], - path: [this.basePath, this.pingAndSetNodeHandlerId], - // Need to be somewhat active so high priority - priority: 100, - }); - if (block) { - try { - await pingAndSetTask.promise(); - } catch (e) { - if (!(e instanceof nodesErrors.ErrorNodeGraphSameNodeId)) throw e; - } - } - } - } - // Refreshing every bucket above the closest node - let closestNodeInfo = closestNodes.pop(); - if ( - closestNodeInfo != null && - this.keyManager.getNodeId().equals(closestNodeInfo[0]) - ) { - // Skip our nodeId if it exists - closestNodeInfo = closestNodes.pop(); - } - let index = this.nodeGraph.nodeIdBits; - if (closestNodeInfo != null) { - const [closestNode] = closestNodeInfo; - const [bucketIndex] = this.nodeGraph.bucketIndex(closestNode); - index = bucketIndex; + // Setting node information into the map, filtering out local node + closestNodes.forEach(([nodeId, address]) => { + if (!localNodeId.equals(nodeId)) closestNodesAll.set(nodeId, address); + }); + + // Getting the closest node + let closeNodeInfo = closestNodes.pop(); + if (closeNodeInfo != null && localNodeId.equals(closeNodeInfo[0])) { + closeNodeInfo = closestNodes.pop(); } - const refreshBuckets: Array> = []; - for (let i = index; i < this.nodeGraph.nodeIdBits; i++) { - const task = await this.updateRefreshBucketDelay(i, 0, !block); - refreshBuckets.push(task.promise()); + if (closeNodeInfo == null) continue; + const [closeNode] = closeNodeInfo; + if (closestNode == null) closestNode = closeNode; + const distA = nodesUtils.nodeDistance(localNodeId, closeNode); + const distB = nodesUtils.nodeDistance(localNodeId, closestNode); + if (distA < distB) closestNode = closeNode; + } + logger.debug('Starting pingsAndSet tasks'); + const pingTasks: Array = []; + for (const [nodeId, nodeData] of closestNodesAll) { + if (!localNodeId.equals(nodeId)) { + logger.debug( + `pingAndSetTask for ${nodesUtils.encodeNodeId(nodeId)}@${ + nodeData.address.host + }:${nodeData.address.port}`, + ); + const pingAndSetTask = await this.taskManager.scheduleTask({ + delay: 0, + handlerId: this.pingAndSetNodeHandlerId, + lazy: !block, + parameters: [ + nodesUtils.encodeNodeId(nodeId), + nodeData.address.host, + nodeData.address.port, + ], + path: [this.basePath, this.pingAndSetNodeHandlerId], + // Need to be somewhat active so high priority + priority: 100, + deadline: pingTimeout, + }); + pingTasks.push(pingAndSetTask); } - if (block) await Promise.all(refreshBuckets); + } + if (block) { + // We want to wait for all the tasks + logger.debug('Awaiting all pingAndSetTasks'); + await Promise.all( + pingTasks.map((task) => { + const prom = task.promise(); + // Hook on cancellation + if (ctx.signal.aborted) { + prom.cancel(ctx.signal.reason); + } else { + ctx.signal.addEventListener('abort', () => + prom.cancel(ctx.signal.reason), + ); + } + // Ignore errors + return task.promise().catch(() => {}); + }), + ); + } + // Refreshing every bucket above the closest node + logger.debug(`Triggering refreshBucket tasks`); + let index = this.nodeGraph.nodeIdBits; + if (closestNode != null) { + const [bucketIndex] = this.nodeGraph.bucketIndex(closestNode); + index = bucketIndex; + } + const refreshBuckets: Array> = []; + for (let i = index; i < this.nodeGraph.nodeIdBits; i++) { + const task = await this.updateRefreshBucketDelay(i, 0, !block); + refreshBuckets.push(task.promise()); + } + if (block) { + logger.debug(`Awaiting refreshBucket tasks`); + await Promise.all(refreshBuckets); } } } diff --git a/src/nodes/errors.ts b/src/nodes/errors.ts index d2f905804..237f5159f 100644 --- a/src/nodes/errors.ts +++ b/src/nodes/errors.ts @@ -62,6 +62,11 @@ class ErrorNodeConnectionTimeout extends ErrorNodes { exitCode = sysexits.UNAVAILABLE; } +class ErrorNodeConnectionMultiConnectionFailed extends ErrorNodes { + static description: 'Could not establish connection when multiple resolved hosts were involved'; + exitCode = sysexits.UNAVAILABLE; +} + class ErrorNodeConnectionInfoNotExist extends ErrorNodes { static description: 'NodeConnection info was not found'; exitCode = sysexits.UNAVAILABLE; @@ -81,6 +86,13 @@ class ErrorNodeConnectionHostWildcard extends ErrorNodes { static description = 'An IP wildcard was provided for the target host'; exitCode = sysexits.USAGE; } + +class ErrorNodeConnectionSameNodeId extends ErrorNodes { + static description = + 'Provided NodeId is the same as this agent, attempts to connect is improper usage'; + exitCode = sysexits.USAGE; +} + class ErrorNodePingFailed extends ErrorNodes { static description = 'Failed to ping the node when attempting to authenticate'; @@ -101,9 +113,11 @@ export { ErrorNodeGraphBucketIndex, ErrorNodeConnectionDestroyed, ErrorNodeConnectionTimeout, + ErrorNodeConnectionMultiConnectionFailed, ErrorNodeConnectionInfoNotExist, ErrorNodeConnectionPublicKeyNotFound, ErrorNodeConnectionManagerNotRunning, ErrorNodeConnectionHostWildcard, + ErrorNodeConnectionSameNodeId, ErrorNodePingFailed, }; diff --git a/src/nodes/utils.ts b/src/nodes/utils.ts index 579e568b4..57ab67e3c 100644 --- a/src/nodes/utils.ts +++ b/src/nodes/utils.ts @@ -298,7 +298,8 @@ function isConnectionError(e): boolean { e instanceof nodesErrors.ErrorNodeConnectionDestroyed || e instanceof grpcErrors.ErrorGRPC || e instanceof agentErrors.ErrorAgentClientDestroyed || - e instanceof nodesErrors.ErrorNodeConnectionTimeout + e instanceof nodesErrors.ErrorNodeConnectionTimeout || + e instanceof nodesErrors.ErrorNodeConnectionMultiConnectionFailed ); } diff --git a/tests/agent/service/nodesHolePunchMessage.test.ts b/tests/agent/service/nodesHolePunchMessage.test.ts index 9f016ec74..d2b10b231 100644 --- a/tests/agent/service/nodesHolePunchMessage.test.ts +++ b/tests/agent/service/nodesHolePunchMessage.test.ts @@ -47,7 +47,12 @@ describe('nodesHolePunchMessage', () => { nodeConnectionManager: pkAgent.nodeConnectionManager, nodeManager: pkAgent.nodeManager, db: pkAgent.db, - connectionInfoGet: () => ({} as ConnectionInfo), + connectionInfoGet: () => + ({ + remoteHost: '127.0.0.1' as Host, + remotePort: 55555 as Port, + remoteNodeId: pkAgent.keyManager.getNodeId(), + } as ConnectionInfo), logger, }), }; diff --git a/tests/bin/agent/start.test.ts b/tests/bin/agent/start.test.ts index 3a4047b63..39cc198d6 100644 --- a/tests/bin/agent/start.test.ts +++ b/tests/bin/agent/start.test.ts @@ -517,7 +517,7 @@ describe('start', () => { // This line is brittle // It may change if the log format changes // Make sure to keep it updated at the exact point when the DB is created - if (l === 'INFO:DB:Created DB') { + if (l === 'INFO:polykey.PolykeyAgent.DB:Created DB') { agentProcess1.kill('SIGINT'); resolve(); } diff --git a/tests/bin/bootstrap.test.ts b/tests/bin/bootstrap.test.ts index fde83c7d4..17b6c11d4 100644 --- a/tests/bin/bootstrap.test.ts +++ b/tests/bin/bootstrap.test.ts @@ -279,7 +279,7 @@ describe('bootstrap', () => { // This line is brittle // It may change if the log format changes // Make sure to keep it updated at the exact point when the root key pair is generated - if (l === 'INFO:KeyManager:Generating root key pair') { + if (l === 'INFO:polykey.KeyManager:Generating root key pair') { bootstrapProcess1.kill('SIGINT'); resolve(); } diff --git a/tests/bin/polykey.test.ts b/tests/bin/polykey.test.ts index 5a4103de4..1f49d2986 100644 --- a/tests/bin/polykey.test.ts +++ b/tests/bin/polykey.test.ts @@ -64,7 +64,7 @@ describe('polykey', () => { const stderrParsed = JSON.parse(stderrStart); expect(stderrParsed).toMatchObject({ level: expect.stringMatching(/INFO|WARN|ERROR|DEBUG/), - key: expect.any(String), + keys: expect.any(String), msg: expect.any(String), }); agentProcess.kill('SIGTERM'); diff --git a/tests/network/Proxy.test.ts b/tests/network/Proxy.test.ts index f2d77b71f..e7e67c144 100644 --- a/tests/network/Proxy.test.ts +++ b/tests/network/Proxy.test.ts @@ -259,7 +259,7 @@ describe(Proxy.name, () => { }); // Cannot open connection to port 0 await expect(() => - proxy.openConnectionForward(nodeIdABC, localHost, 0 as Port), + proxy.openConnectionForward([nodeIdABC], localHost, 0 as Port), ).rejects.toThrow(networkErrors.ErrorConnectionStart); await expect(() => httpConnect( @@ -305,7 +305,7 @@ describe(Proxy.name, () => { const utpSocketHangPort = utpSocketHang.address().port; await expect(() => proxy.openConnectionForward( - nodeIdABC, + [nodeIdABC], localHost, utpSocketHangPort as Port, ), @@ -315,7 +315,7 @@ describe(Proxy.name, () => { const timer = new Timer({ delay: 2000 }); await expect(() => proxy.openConnectionForward( - nodeIdABC, + [nodeIdABC], localHost, utpSocketHangPort as Port, { timer }, @@ -342,6 +342,7 @@ describe(Proxy.name, () => { const proxy = new Proxy({ authToken, logger: logger.getChild('Proxy connection reset'), + connConnectTime: 10000, }); await proxy.start({ tlsConfig: { @@ -368,7 +369,7 @@ describe(Proxy.name, () => { const utpSocketEndPort = utpSocketEnd.address().port; await expect(() => proxy.openConnectionForward( - nodeIdABC, + [nodeIdABC], localHost, utpSocketEndPort as Port, ), @@ -377,11 +378,11 @@ describe(Proxy.name, () => { // The actual error is UTP_ECONNRESET to be precise await expect(() => proxy.openConnectionForward( - nodeIdABC, + [nodeIdABC], localHost, utpSocketEndPort as Port, ), - ).rejects.toThrow(/UTP_ECONNRESET/); + ).rejects.toThrow(networkErrors.ErrorConnectionStart); expect(receivedCount).toBe(2); // 502 Bad Gateway on HTTP Connect await expect(() => @@ -483,7 +484,7 @@ describe(Proxy.name, () => { // This is a TLS handshake failure await expect(() => proxy.openConnectionForward( - nodeIdRandom, + [nodeIdRandom], utpSocketHost as Host, utpSocketPort as Port, ), @@ -722,7 +723,7 @@ describe(Proxy.name, () => { expect(proxy.getConnectionForwardCount()).toBe(0); await expect(() => proxy.openConnectionForward( - nodeIdRandom, + [nodeIdRandom], utpSocketHost as Host, utpSocketPort as Port, ), @@ -983,7 +984,7 @@ describe(Proxy.name, () => { const utpSocketPort = utpSocket.address().port; expect(proxy.getConnectionForwardCount()).toBe(0); await proxy.openConnectionForward( - serverNodeId, + [serverNodeId], utpSocketHost as Host, utpSocketPort as Port, ); @@ -991,7 +992,7 @@ describe(Proxy.name, () => { await expect(remoteSecureP).resolves.toBeUndefined(); // Opening a duplicate connection is noop await proxy.openConnectionForward( - serverNodeId, + [serverNodeId], utpSocketHost as Host, utpSocketPort as Port, ); @@ -1115,7 +1116,7 @@ describe(Proxy.name, () => { const utpSocketPort = utpSocket.address().port; expect(proxy.getConnectionForwardCount()).toBe(0); await proxy.openConnectionForward( - serverNodeId, + [serverNodeId], utpSocketHost as Host, utpSocketPort as Port, ); @@ -1123,7 +1124,7 @@ describe(Proxy.name, () => { await expect(remoteSecureP).resolves.toBeUndefined(); // Opening a duplicate connection is noop await proxy.openConnectionForward( - serverNodeId, + [serverNodeId], utpSocketHost as Host, utpSocketPort as Port, ); @@ -1300,7 +1301,7 @@ describe(Proxy.name, () => { }); // Opening a duplicate connection is noop await proxy.openConnectionForward( - serverNodeId, + [serverNodeId], utpSocketHost as Host, utpSocketPort as Port, ); @@ -1454,7 +1455,7 @@ describe(Proxy.name, () => { }); // Opening a duplicate connection is noop await proxy.openConnectionForward( - serverNodeId, + [serverNodeId], utpSocketHost as Host, utpSocketPort as Port, ); @@ -1614,7 +1615,7 @@ describe(Proxy.name, () => { }); // Opening a duplicate connection is noop await proxy.openConnectionForward( - serverNodeId, + [serverNodeId], utpSocketHost as Host, utpSocketPort as Port, ); @@ -1752,7 +1753,7 @@ describe(Proxy.name, () => { const utpSocketHost = utpSocket.address().address; const utpSocketPort = utpSocket.address().port; await proxy.openConnectionForward( - serverNodeId, + [serverNodeId], utpSocketHost as Host, utpSocketPort as Port, ); @@ -1888,7 +1889,7 @@ describe(Proxy.name, () => { const utpSocketPort = utpSocket.address().port; expect(proxy.getConnectionForwardCount()).toBe(0); await proxy.openConnectionForward( - serverNodeId, + [serverNodeId], utpSocketHost as Host, utpSocketPort as Port, ); @@ -2134,7 +2135,7 @@ describe(Proxy.name, () => { const utpSocketPort = utpSocket.address().port; expect(proxy.getConnectionForwardCount()).toBe(0); await proxy.openConnectionForward( - serverNodeId, + [serverNodeId], utpSocketHost as Host, utpSocketPort as Port, ); @@ -2285,12 +2286,12 @@ describe(Proxy.name, () => { const utpSocketPort2 = utpSocket2.address().port; expect(proxy.getConnectionForwardCount()).toBe(0); await proxy.openConnectionForward( - serverNodeId1, + [serverNodeId1], utpSocketHost1 as Host, utpSocketPort1 as Port, ); await proxy.openConnectionForward( - serverNodeId2, + [serverNodeId2], utpSocketHost2 as Host, utpSocketPort2 as Port, ); diff --git a/tests/network/utils.test.ts b/tests/network/utils.test.ts index d1a26f6aa..f21867443 100644 --- a/tests/network/utils.test.ts +++ b/tests/network/utils.test.ts @@ -1,6 +1,5 @@ -import type { Host, Port } from '@/network/types'; +import type { Host, Hostname, Port } from '@/network/types'; import * as networkUtils from '@/network/utils'; -import * as networkErrors from '@/network/errors'; describe('utils', () => { test('building addresses', async () => { @@ -25,12 +24,14 @@ describe('utils', () => { }); test('resolving hostnames', async () => { await expect( - networkUtils.resolveHost('www.google.com' as Host), + networkUtils.resolveHostname('www.google.com' as Hostname), ).resolves.toBeDefined(); - const host = await networkUtils.resolveHost('www.google.com' as Host); - expect(networkUtils.isHost(host)).toBeTruthy(); + const hosts = await networkUtils.resolveHostname( + 'www.google.com' as Hostname, + ); + expect(hosts.length).toBeGreaterThan(0); await expect( - networkUtils.resolveHost('invalidHostname' as Host), - ).rejects.toThrow(networkErrors.ErrorHostnameResolutionFailed); + networkUtils.resolveHostname('invalidHostname' as Hostname), + ).resolves.toHaveLength(0); }); }); diff --git a/tests/nodes/NodeConnection.test.ts b/tests/nodes/NodeConnection.test.ts index c275e7e5f..b57e859b6 100644 --- a/tests/nodes/NodeConnection.test.ts +++ b/tests/nodes/NodeConnection.test.ts @@ -42,22 +42,6 @@ import * as testUtils from '../utils'; const destroyCallback = async () => {}; -// Dummy nodeConnectionManager -// We only need the hole punch function, and frankly it's not used in testing here -// This is really dirty so don't do this outside of testing EVER -const dummyNodeConnectionManager = { - openConnection: async (_host, _port) => { - throw Error('This is a dummy function, should not be called'); - }, - withConnF: async () => { - throw Error('Test, please ignore'); - }, - getSeedNodes: () => [], - sendHolePunchMessage: async () => { - throw Error('Test, please ignore'); - }, -} as unknown as NodeConnectionManager; - const mockedGenerateDeterministicKeyPair = jest.spyOn( keysUtils, 'generateDeterministicKeyPair', @@ -98,6 +82,7 @@ describe(`${NodeConnection.name} test`, () => { let clientDataDir: string; let sourceNodeId: NodeId; let clientKeyManager: KeyManager; + let clientNodeConnectionManager: NodeConnectionManager; const authToken = 'AUTH'; let clientProxy: Proxy; @@ -266,7 +251,7 @@ describe(`${NodeConnection.name} test`, () => { serverVaultManager = await VaultManager.createVaultManager({ keyManager: serverKeyManager, vaultsPath: serverVaultsPath, - nodeConnectionManager: dummyNodeConnectionManager, + nodeConnectionManager: serverNodeConnectionManager, notificationsManager: serverNotificationsManager, db: serverDb, acl: serverACL, @@ -288,7 +273,7 @@ describe(`${NodeConnection.name} test`, () => { db: serverDb, keyManager: serverKeyManager, vaultManager: serverVaultManager, - nodeConnectionManager: dummyNodeConnectionManager, + nodeConnectionManager: serverNodeConnectionManager, nodeManager: serverNodeManager, nodeGraph: serverNodeGraph, sigchain: serverSigchain, @@ -338,6 +323,15 @@ describe(`${NodeConnection.name} test`, () => { }); sourcePort = clientProxy.getProxyPort(); + clientNodeConnectionManager = new NodeConnectionManager({ + keyManager: clientKeyManager, + nodeGraph: {} as NodeGraph, + proxy: clientProxy, + taskManager: {} as TaskManager, + logger: logger, + }); + await clientNodeConnectionManager.start({ nodeManager: {} as NodeManager }); + // Other setup const privateKey = keysUtils.privateKeyFromPem(globalRootKeyPems[0]); const publicKey = keysUtils.publicKeyFromPrivateKey(privateKey); @@ -395,8 +389,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: (args) => GRPCClientAgent.createGRPCClientAgent(args), @@ -419,8 +411,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -450,7 +440,7 @@ describe(`${NodeConnection.name} test`, () => { }); test('connects to its target but proxies connect first', async () => { await clientProxy.openConnectionForward( - targetNodeId, + [targetNodeId], localHost, targetPort, ); @@ -459,8 +449,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -508,9 +496,7 @@ describe(`${NodeConnection.name} test`, () => { nodeConnection = await NodeConnection.createNodeConnection( { proxy: clientProxy, - keyManager: clientKeyManager, logger: logger, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback: killSelf, targetHost: polykeyAgent.proxy.getProxyHost(), targetNodeId: polykeyAgent.keyManager.getNodeId(), @@ -545,8 +531,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: '128.0.0.1' as Host, targetPort: 12345 as Port, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: (args) => GRPCClientAgent.createGRPCClientAgent(args), @@ -560,17 +544,20 @@ describe(`${NodeConnection.name} test`, () => { const connProm = NodeConnection.createNodeConnection( { targetNodeId: targetNodeId, - targetHost: '128.0.0.1' as Host, - targetPort: 12345 as Port, + targetHost: localHost, + targetPort: 55556 as Port, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: (args) => GRPCClientAgent.createGRPCClientAgent(args), }, { timer: new Timer({ delay: 1000 }) }, - ).catch(() => (timedOut = true)); + ).then( + () => {}, + () => { + timedOut = true; + }, + ); expect(timedOut).toBeFalse(); await sleep(500); expect(timedOut).toBeFalse(); @@ -586,8 +573,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -607,8 +592,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -645,9 +628,7 @@ describe(`${NodeConnection.name} test`, () => { const nodeConnectionP = NodeConnection.createNodeConnection( { proxy: clientProxy, - keyManager: clientKeyManager, logger: logger, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback: killSelf, targetHost: proxy.getProxyHost(), targetNodeId: targetNodeId, @@ -690,9 +671,7 @@ describe(`${NodeConnection.name} test`, () => { const nodeConnectionP = NodeConnection.createNodeConnection( { proxy: clientProxy, - keyManager: clientKeyManager, logger: logger, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback: killSelf, targetHost: proxy.getProxyHost(), targetNodeId: targetNodeId, @@ -733,9 +712,7 @@ describe(`${NodeConnection.name} test`, () => { nodeConnection = await NodeConnection.createNodeConnection( { proxy: clientProxy, - keyManager: clientKeyManager, logger: logger, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback: killSelf, targetHost: polykeyAgent.proxy.getProxyHost(), targetNodeId: polykeyAgent.keyManager.getNodeId(), @@ -804,9 +781,7 @@ describe(`${NodeConnection.name} test`, () => { nodeConnection = await NodeConnection.createNodeConnection( { proxy: clientProxy, - keyManager: clientKeyManager, logger: logger, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback: async () => { await killSelfCheck(); killSelfP.resolveP(null); @@ -883,9 +858,7 @@ describe(`${NodeConnection.name} test`, () => { nodeConnection = await NodeConnection.createNodeConnection( { proxy: clientProxy, - keyManager: clientKeyManager, logger: logger, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback: async () => { await killSelfCheck(); killSelfP.resolveP(null); @@ -930,8 +903,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -961,8 +932,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -992,8 +961,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -1023,8 +990,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -1054,8 +1019,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -1085,8 +1048,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -1120,8 +1081,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -1149,8 +1108,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -1178,8 +1135,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -1205,8 +1160,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -1227,8 +1180,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -1253,8 +1204,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => @@ -1282,8 +1231,6 @@ describe(`${NodeConnection.name} test`, () => { targetHost: localHost, targetPort: targetPort, proxy: clientProxy, - keyManager: clientKeyManager, - nodeConnectionManager: dummyNodeConnectionManager, destroyCallback, logger: logger, clientFactory: async (args) => diff --git a/tests/nodes/NodeConnectionManager.general.test.ts b/tests/nodes/NodeConnectionManager.general.test.ts index 1a65cc258..f9fcb5655 100644 --- a/tests/nodes/NodeConnectionManager.general.test.ts +++ b/tests/nodes/NodeConnectionManager.general.test.ts @@ -476,7 +476,7 @@ describe(`${NodeConnectionManager.name} general test`, () => { // 3. check that the relevant call was made. const sourceNodeId = testNodesUtils.generateRandomNodeId(); const targetNodeId = testNodesUtils.generateRandomNodeId(); - await nodeConnectionManager.sendSignallingMessage( + await nodeConnectionManager.sendSignalingMessage( remoteNodeId1, sourceNodeId, targetNodeId, @@ -516,7 +516,10 @@ describe(`${NodeConnectionManager.name} general test`, () => { relayMessage.setSrcId(nodesUtils.encodeNodeId(sourceNodeId)); relayMessage.setTargetId(nodesUtils.encodeNodeId(remoteNodeId1)); relayMessage.setProxyAddress(''); - await nodeConnectionManager.relaySignallingMessage(relayMessage); + await nodeConnectionManager.relaySignalingMessage(relayMessage, { + host: '' as Host, + port: 0 as Port, + }); expect(mockedNodesHolePunchMessageSend).toHaveBeenCalled(); } finally { diff --git a/tests/nodes/NodeConnectionManager.lifecycle.test.ts b/tests/nodes/NodeConnectionManager.lifecycle.test.ts index 4453d41dc..92680df4f 100644 --- a/tests/nodes/NodeConnectionManager.lifecycle.test.ts +++ b/tests/nodes/NodeConnectionManager.lifecycle.test.ts @@ -19,12 +19,13 @@ import * as nodesUtils from '@/nodes/utils'; import * as nodesErrors from '@/nodes/errors'; import * as keysUtils from '@/keys/utils'; import * as grpcUtils from '@/grpc/utils'; +import * as networkUtils from '@/network/utils'; import { globalRootKeyPems } from '../fixtures/globalRootKeyPems'; describe(`${NodeConnectionManager.name} lifecycle test`, () => { const logger = new Logger( `${NodeConnectionManager.name} test`, - LogLevel.WARN, + LogLevel.INFO, [new StreamHandler()], ); grpcUtils.setLogger(logger.getChild('grpc')); @@ -85,6 +86,8 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { let remoteNodeIdString1: NodeIdString; let remoteNodeId2: NodeId; + const resolveHostnameMock = jest.spyOn(networkUtils, 'resolveHostnames'); + const dummyNodeManager = { setNode: jest.fn() } as unknown as NodeManager; beforeAll(async () => { @@ -128,6 +131,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { }); beforeEach(async () => { + resolveHostnameMock.mockRestore(); dataDir = await fs.promises.mkdtemp( path.join(os.tmpdir(), 'polykey-test-'), ); @@ -238,8 +242,6 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { await taskManager.startProcessing(); // @ts-ignore: kidnap connections const connections = nodeConnectionManager.connections; - // @ts-ignore: kidnap connectionLocks - const connectionLocks = nodeConnectionManager.connectionLocks; const initialConnection = connections.get(remoteNodeIdString1); expect(initialConnection).toBeUndefined(); await withF( @@ -248,18 +250,16 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { expect(conn).toBeDefined(); const intermediaryConnection = connections.get(remoteNodeIdString1); expect(intermediaryConnection).toBeDefined(); - expect(connectionLocks.isLocked(remoteNodeIdString1)).toBeTruthy(); }, ); const finalConnection = connections.get(remoteNodeIdString1); expect(finalConnection).toBeDefined(); // Neither write nor read lock should be locked now - expect(connectionLocks.isLocked(remoteNodeIdString1)).toBeFalsy(); } finally { await nodeConnectionManager?.stop(); } }); - test('withConnF should create connection and hold lock', async () => { + test('withConnF should create connection', async () => { // NodeConnectionManager under test let nodeConnectionManager: NodeConnectionManager | undefined; try { @@ -274,22 +274,19 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { await taskManager.startProcessing(); // @ts-ignore: kidnap connections const connections = nodeConnectionManager.connections; - // @ts-ignore: kidnap connectionLocks - const connectionLocks = nodeConnectionManager.connectionLocks; const initialConnection = connections.get(remoteNodeIdString1); expect(initialConnection).toBeUndefined(); await nodeConnectionManager.withConnF(remoteNodeId1, async () => { - expect(connectionLocks.isLocked(remoteNodeIdString1)).toBe(true); + // Do nothing }); const finalConnection = connections.get(remoteNodeIdString1); - // Check entry is in map and lock is released + // Check entry is in map expect(finalConnection).toBeDefined(); - expect(connectionLocks.isLocked(remoteNodeIdString1)).toBeFalsy(); } finally { await nodeConnectionManager?.stop(); } }); - test('withConnG should create connection and hold lock', async () => { + test('withConnG should create connection', async () => { // NodeConnectionManager under test let nodeConnectionManager: NodeConnectionManager | undefined; try { @@ -304,8 +301,6 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { await taskManager.startProcessing(); // @ts-ignore: kidnap connections const connections = nodeConnectionManager.connections; - // @ts-ignore: kidnap connectionLocks - const connectionLocks = nodeConnectionManager.connectionLocks; const initialConnection = connections.get(remoteNodeIdString1); expect(initialConnection).toBeUndefined(); @@ -323,21 +318,17 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { }, ); - // Connection is not created yet, no locking applied + // Connection is not created yet expect(connections.get(remoteNodeIdString1)).not.toBeDefined(); // Iterating over generator for await (const _ of gen) { - // Should be locked for duration of stream - expect(connectionLocks.isLocked(remoteNodeIdString1)).toBe(true); + // Do nothing } - // Unlocked after stream finished - expect(connectionLocks.isLocked(remoteNodeIdString1)).toBe(false); const finalConnection = connections.get(remoteNodeIdString1); - // Check entry is in map and lock is released + // Check entry is in map expect(finalConnection).toBeDefined(); - expect(connectionLocks.isLocked(remoteNodeIdString1)).toBe(false); } finally { await nodeConnectionManager?.stop(); } diff --git a/tests/nodes/NodeConnectionManager.seednodes.test.ts b/tests/nodes/NodeConnectionManager.seednodes.test.ts index b7b01a7e8..54560d610 100644 --- a/tests/nodes/NodeConnectionManager.seednodes.test.ts +++ b/tests/nodes/NodeConnectionManager.seednodes.test.ts @@ -453,8 +453,8 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { await nodeConnectionManager.start({ nodeManager }); await taskManager.startProcessing(); // This should complete without error - await nodeManager.syncNodeGraph(true, 5000, { - timer: new Timer({ delay: 20000 }), + await nodeManager.syncNodeGraph(true, 2000, { + timer: new Timer({ delay: 15000 }), }); // Information on remotes are found expect(await nodeGraph.getNode(nodeId1)).toBeDefined(); diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index 4b786d7c5..0d4a82833 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -51,14 +51,16 @@ describe(`${NodeManager.name} test`, () => { const serverPort = 0 as Port; const externalPort = 0 as Port; const mockedPingNode = jest.fn(); // Jest.spyOn(NodeManager.prototype, 'pingNode'); + const mockedIsSeedNode = jest.fn(); const dummyNodeConnectionManager = { connConnectTime: 5000, pingNode: mockedPingNode, + isSeedNode: mockedIsSeedNode, } as unknown as NodeConnectionManager; beforeEach(async () => { - mockedPingNode.mockClear(); mockedPingNode.mockImplementation(async (_) => true); + mockedIsSeedNode.mockReturnValue(false); dataDir = await fs.promises.mkdtemp( path.join(os.tmpdir(), 'polykey-test-'),