From 72da5e20b55c4ccfbd9ab24e954c32ddfc4e7d6f Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 7 Nov 2022 19:35:08 +1100 Subject: [PATCH] feat: concurrent mult-host connection [ci skip] --- src/network/utils.ts | 4 +- src/nodes/NodeConnection.ts | 2 + src/nodes/NodeConnectionManager.ts | 196 +++++++++--------- .../NodeConnectionManager.lifecycle.test.ts | 46 +--- tests/nodes/NodeManager.test.ts | 4 +- 5 files changed, 114 insertions(+), 138 deletions(-) diff --git a/src/network/utils.ts b/src/network/utils.ts index 55f9a0bfce..603f3e952b 100644 --- a/src/network/utils.ts +++ b/src/network/utils.ts @@ -490,7 +490,9 @@ function verifyClientCertificateChain(certChain: Array): void { * It will also filter out any duplicates or IPV6 addresses. * @param addresses */ -async function resolveHostnames(addresses: Array) { +async function resolveHostnames( + addresses: Array, +): Promise> { const existingAddresses: Set = new Set(); const final: Array<{ host: Host; port: Port }> = []; for (const address of addresses) { diff --git a/src/nodes/NodeConnection.ts b/src/nodes/NodeConnection.ts index d28e55eef0..c9311c81d1 100644 --- a/src/nodes/NodeConnection.ts +++ b/src/nodes/NodeConnection.ts @@ -44,6 +44,7 @@ class NodeConnection { proxy, clientFactory, destroyCallback = async () => {}, + destroyTimeout, logger = new Logger(this.name), }: { targetNodeId: NodeId; @@ -53,6 +54,7 @@ class NodeConnection { proxy: Proxy; clientFactory: (...args) => Promise; destroyCallback?: () => Promise; + destroyTimeout?: number; logger?: Logger; }, ctx?: Partial, diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 728e8b9919..c5bcc7894c 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -14,6 +14,7 @@ import type { import type NodeManager from './NodeManager'; import type { ContextTimed } from 'contexts/types'; import type { PromiseCancellable } from '@matrixai/async-cancellable'; +import type { PromiseDeconstructed } from '../types'; import { withF } from '@matrixai/resources'; import Logger from '@matrixai/logger'; import { ready, StartStop } from '@matrixai/async-init/dist/StartStop'; @@ -24,7 +25,6 @@ import { Timer } from '@matrixai/timer'; import NodeConnection from './NodeConnection'; import * as nodesUtils from './utils'; import * as nodesErrors from './errors'; -import * as contextsErrors from '../contexts/errors'; import { context, timedCancellable } from '../contexts'; import GRPCClientAgent from '../agent/GRPCClientAgent'; import * as validationUtils from '../validation/utils'; @@ -346,110 +346,116 @@ class NodeConnectionManager { ); }); } + const destroyCallbackProms: Array> = []; const errors: Array = []; - let connectionsCount = targetAddresses.length; - for (const address of targetAddresses) { - // Creating new connection + const firstConnectionIndexProm = promise(); + const nodeConnectionProms = targetAddresses.map((address, index) => { const destroyCallbackProm = promise(); - const abortController = new AbortController(); - const timer = new Timer({ - handler: () => { - abortController.abort(contextsErrors.ErrorContextsTimedTimeOut); + destroyCallbackProms[index] = destroyCallbackProm; + const nodeConnectionProm = NodeConnection.createNodeConnection( + { + targetNodeId: targetNodeId, + targetHost: address.host, + targetHostname: targetHostname, + targetPort: address.port, + proxy: this.proxy, + destroyTimeout: this.ncDestroyTimeout, + destroyCallback: async () => { + destroyCallbackProm.resolveP(); + }, + logger: this.logger.getChild( + `${NodeConnection.name} [${nodesUtils.encodeNodeId( + targetNodeId, + )}@${address.host}:${address.port}]`, + ), + clientFactory: async (args) => + GRPCClientAgent.createGRPCClientAgent(args), }, - delay: ctx.timer.getTimeout() / connectionsCount, - }); - connectionsCount -= 1; - const eventListener = () => { - abortController.abort(ctx.signal.reason); - }; - if (ctx.signal.aborted) { - abortController.abort(ctx.signal.reason); + ctx, + ); + void nodeConnectionProm.then( + () => firstConnectionIndexProm.resolveP(index), + (e) => { + this.logger.info( + `Creating NodeConnection failed for ${targetNodeIdEncoded}`, + ); + // Disable destroyCallback clean up + void destroyCallbackProm.p.then(() => {}); + errors.push(e); + if (errors.length === targetAddresses.length) { + firstConnectionIndexProm.resolveP(null); + } + }, + ); + return nodeConnectionProm; + }); + let newConnection: NodeConnection; + try { + newConnection = await Promise.any(nodeConnectionProms); + } catch (e) { + // All connections failed to establish + this.logger.info( + `Failed NodeConnection for ${targetNodeIdEncoded} with ${errors}`, + ); + if (errors.length === 1) { + throw errors[0]; } else { - ctx.signal.addEventListener('abort', eventListener); - } - try { - const newConnection = await NodeConnection.createNodeConnection( + throw new nodesErrors.ErrorNodeConnectionMultiConnectionFailed( + 'failed to establish connection with multiple hosts', { - targetNodeId: targetNodeId, - targetHost: address.host, - targetHostname: targetHostname, - targetPort: address.port, - proxy: this.proxy, - destroyCallback: async () => { - destroyCallbackProm.resolveP(); - }, - logger: this.logger.getChild( - `${NodeConnection.name} [${nodesUtils.encodeNodeId( - targetNodeId, - )}@${address.host}:${address.port}]`, - ), - clientFactory: async (args) => - GRPCClientAgent.createGRPCClientAgent(args), + cause: new AggregateError(errors), }, - { signal: abortController.signal, timer }, - ); - void destroyCallbackProm.p.then(async () => { - this.logger.info('DestroyedCallback was called'); - // To avoid deadlock only in the case where this is called - // we want to check for destroying connection and read lock - const connAndTimer = this.connections.get(targetNodeIdString); - // If the connection is calling destroyCallback then it SHOULD - // exist in the connection map - if (connAndTimer == null) return; - // Already locked so already destroying - if (this.connectionLocks.isLocked(targetNodeIdString)) return; - // Connection is already destroying - if (connAndTimer?.connection?.[status] === 'destroying') return; - await this.destroyConnection(targetNodeId); - }); - // We can assume connection was established and destination was valid, - // we can add the target to the nodeGraph - await this.nodeManager?.setNode(targetNodeId, targetAddress); - // Creating TTL timeout. - // We don't create a TTL for seed nodes. - const timeToLiveTimer = !this.isSeedNode(targetNodeId) - ? new Timer({ - handler: async () => - await this.destroyConnection(targetNodeId), - delay: this.connTimeoutTime, - }) - : null; - const newConnAndTimer: ConnectionAndTimer = { - connection: newConnection, - timer: timeToLiveTimer, - usageCount: 0, - }; - this.connections.set(targetNodeIdString, newConnAndTimer); - // Enable destroyCallback clean up - this.logger.info( - `Created NodeConnection for ${targetNodeIdEncoded}`, ); - return newConnAndTimer; - } catch (e) { - this.logger.info( - `Creating NodeConnection failed for ${targetNodeIdEncoded}`, - ); - // Disable destroyCallback clean up - void destroyCallbackProm.p.then(() => {}); - errors.push(e); - } finally { - abortController.signal.removeEventListener('abort', eventListener); - timer.cancel(); } } - this.logger.info( - `Failed NodeConnection for ${targetNodeIdEncoded} with ${errors}`, + // Cleaning up other connections + const successfulIndex = await firstConnectionIndexProm.p; + if (successfulIndex === null) never(); + const cleanUpReason = Symbol('cleanUpReason'); + await Promise.allSettled( + nodeConnectionProms.map(async (nodeConnectionProm, index) => { + if (index === successfulIndex) return; + nodeConnectionProm.cancel(cleanUpReason); + return nodeConnectionProm.then(async (nodeConnection) => { + await nodeConnection.destroy({ timeout: this.ncDestroyTimeout }); + }); + }), ); - if (errors.length === 1) { - throw errors[0]; - } else { - throw new nodesErrors.ErrorNodeConnectionMultiConnectionFailed( - 'failed to establish connection with multiple hosts', - { - cause: new AggregateError(errors), - }, - ); - } + // Final set up + void destroyCallbackProms[successfulIndex].p.then(async () => { + this.logger.info('DestroyedCallback was called'); + // To avoid deadlock only in the case where this is called + // we want to check for destroying connection and read lock + const connAndTimer = this.connections.get(targetNodeIdString); + // If the connection is calling destroyCallback then it SHOULD + // exist in the connection map + if (connAndTimer == null) return; + // Already locked so already destroying + if (this.connectionLocks.isLocked(targetNodeIdString)) return; + // Connection is already destroying + if (connAndTimer?.connection?.[status] === 'destroying') return; + await this.destroyConnection(targetNodeId); + }); + // We can assume connection was established and destination was valid, + // we can add the target to the nodeGraph + await this.nodeManager?.setNode(targetNodeId, targetAddress); + // Creating TTL timeout. + // We don't create a TTL for seed nodes. + const timeToLiveTimer = !this.isSeedNode(targetNodeId) + ? new Timer({ + handler: async () => await this.destroyConnection(targetNodeId), + delay: this.connTimeoutTime, + }) + : null; + const newConnAndTimer: ConnectionAndTimer = { + connection: newConnection!, + timer: timeToLiveTimer, + usageCount: 0, + }; + this.connections.set(targetNodeIdString, newConnAndTimer); + // Enable destroyCallback clean up + this.logger.info(`Created NodeConnection for ${targetNodeIdEncoded}`); + return newConnAndTimer; }, ); } diff --git a/tests/nodes/NodeConnectionManager.lifecycle.test.ts b/tests/nodes/NodeConnectionManager.lifecycle.test.ts index d7f1440a72..92680df4fa 100644 --- a/tests/nodes/NodeConnectionManager.lifecycle.test.ts +++ b/tests/nodes/NodeConnectionManager.lifecycle.test.ts @@ -1,5 +1,5 @@ import type { NodeId, NodeIdString, SeedNodes } from '@/nodes/types'; -import type { Host, Hostname, Port } from '@/network/types'; +import type { Host, Port } from '@/network/types'; import type NodeManager from 'nodes/NodeManager'; import fs from 'fs'; import path from 'path'; @@ -25,7 +25,7 @@ import { globalRootKeyPems } from '../fixtures/globalRootKeyPems'; describe(`${NodeConnectionManager.name} lifecycle test`, () => { const logger = new Logger( `${NodeConnectionManager.name} test`, - LogLevel.WARN, + LogLevel.INFO, [new StreamHandler()], ); grpcUtils.setLogger(logger.getChild('grpc')); @@ -86,6 +86,8 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { let remoteNodeIdString1: NodeIdString; let remoteNodeId2: NodeId; + const resolveHostnameMock = jest.spyOn(networkUtils, 'resolveHostnames'); + const dummyNodeManager = { setNode: jest.fn() } as unknown as NodeManager; beforeAll(async () => { @@ -129,6 +131,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { }); beforeEach(async () => { + resolveHostnameMock.mockRestore(); dataDir = await fs.promises.mkdtemp( path.join(os.tmpdir(), 'polykey-test-'), ); @@ -225,45 +228,6 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { await nodeConnectionManager?.stop(); } }); - test('should create connection when resolving hostName with multiple results', async () => { - // Setting new information in the node graph - await nodeGraph.setNode(remoteNodeId1, { - host: 'test.com' as Hostname, - port: remoteNode1.proxy.getProxyPort(), - }); - // Resolving hostname results in our target and two other targets - const resolveHostnameMock = jest.spyOn(networkUtils, 'resolveHostname'); - resolveHostnameMock.mockResolvedValue([ - remoteNode1.proxy.getProxyHost(), - '192.168.0.123' as Host, - ]); - // NodeConnectionManager under test - let nodeConnectionManager: NodeConnectionManager | undefined; - try { - nodeConnectionManager = new NodeConnectionManager({ - keyManager, - nodeGraph, - proxy, - taskManager, - logger: nodeConnectionManagerLogger, - }); - await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); - await taskManager.startProcessing(); - // @ts-ignore: kidnap connections - const connections = nodeConnectionManager.connections; - // @ts-ignore: kidnap connectionLocks - const connectionLocks = nodeConnectionManager.connectionLocks; - const initialConnection = connections.get(remoteNodeIdString1); - expect(initialConnection).toBeUndefined(); - await nodeConnectionManager.withConnF(remoteNodeId1, nop); - const finalConnection = connections.get(remoteNodeIdString1); - // Check entry is in map and lock is released - expect(finalConnection).toBeDefined(); - expect(connectionLocks.isLocked(remoteNodeIdString1)).toBeFalsy(); - } finally { - await nodeConnectionManager?.stop(); - } - }); test('acquireConnection should create connection', async () => { let nodeConnectionManager: NodeConnectionManager | undefined; try { diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index 4b786d7c5a..0d4a828333 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -51,14 +51,16 @@ describe(`${NodeManager.name} test`, () => { const serverPort = 0 as Port; const externalPort = 0 as Port; const mockedPingNode = jest.fn(); // Jest.spyOn(NodeManager.prototype, 'pingNode'); + const mockedIsSeedNode = jest.fn(); const dummyNodeConnectionManager = { connConnectTime: 5000, pingNode: mockedPingNode, + isSeedNode: mockedIsSeedNode, } as unknown as NodeConnectionManager; beforeEach(async () => { - mockedPingNode.mockClear(); mockedPingNode.mockImplementation(async (_) => true); + mockedIsSeedNode.mockReturnValue(false); dataDir = await fs.promises.mkdtemp( path.join(os.tmpdir(), 'polykey-test-'),