Skip to content

Commit

Permalink
feat: concurrent mult-host connection
Browse files Browse the repository at this point in the history
[ci skip]
  • Loading branch information
tegefaulkes committed Nov 8, 2022
1 parent 48b27c9 commit 72da5e2
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 138 deletions.
4 changes: 3 additions & 1 deletion src/network/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,9 @@ function verifyClientCertificateChain(certChain: Array<Certificate>): void {
* It will also filter out any duplicates or IPV6 addresses.
* @param addresses
*/
async function resolveHostnames(addresses: Array<NodeAddress>) {
async function resolveHostnames(
addresses: Array<NodeAddress>,
): Promise<Array<{ host: Host; port: Port }>> {
const existingAddresses: Set<string> = new Set();
const final: Array<{ host: Host; port: Port }> = [];
for (const address of addresses) {
Expand Down
2 changes: 2 additions & 0 deletions src/nodes/NodeConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class NodeConnection<T extends GRPCClient> {
proxy,
clientFactory,
destroyCallback = async () => {},
destroyTimeout,
logger = new Logger(this.name),
}: {
targetNodeId: NodeId;
Expand All @@ -53,6 +54,7 @@ class NodeConnection<T extends GRPCClient> {
proxy: Proxy;
clientFactory: (...args) => Promise<T>;
destroyCallback?: () => Promise<void>;
destroyTimeout?: number;
logger?: Logger;
},
ctx?: Partial<ContextTimed>,
Expand Down
196 changes: 101 additions & 95 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type {
import type NodeManager from './NodeManager';
import type { ContextTimed } from 'contexts/types';
import type { PromiseCancellable } from '@matrixai/async-cancellable';
import type { PromiseDeconstructed } from '../types';
import { withF } from '@matrixai/resources';
import Logger from '@matrixai/logger';
import { ready, StartStop } from '@matrixai/async-init/dist/StartStop';
Expand All @@ -24,7 +25,6 @@ import { Timer } from '@matrixai/timer';
import NodeConnection from './NodeConnection';
import * as nodesUtils from './utils';
import * as nodesErrors from './errors';
import * as contextsErrors from '../contexts/errors';
import { context, timedCancellable } from '../contexts';
import GRPCClientAgent from '../agent/GRPCClientAgent';
import * as validationUtils from '../validation/utils';
Expand Down Expand Up @@ -346,110 +346,116 @@ class NodeConnectionManager {
);
});
}
const destroyCallbackProms: Array<PromiseDeconstructed<void>> = [];
const errors: Array<any> = [];
let connectionsCount = targetAddresses.length;
for (const address of targetAddresses) {
// Creating new connection
const firstConnectionIndexProm = promise<number | null>();
const nodeConnectionProms = targetAddresses.map((address, index) => {
const destroyCallbackProm = promise<void>();
const abortController = new AbortController();
const timer = new Timer({
handler: () => {
abortController.abort(contextsErrors.ErrorContextsTimedTimeOut);
destroyCallbackProms[index] = destroyCallbackProm;
const nodeConnectionProm = NodeConnection.createNodeConnection(
{
targetNodeId: targetNodeId,
targetHost: address.host,
targetHostname: targetHostname,
targetPort: address.port,
proxy: this.proxy,
destroyTimeout: this.ncDestroyTimeout,
destroyCallback: async () => {
destroyCallbackProm.resolveP();
},
logger: this.logger.getChild(
`${NodeConnection.name} [${nodesUtils.encodeNodeId(
targetNodeId,
)}@${address.host}:${address.port}]`,
),
clientFactory: async (args) =>
GRPCClientAgent.createGRPCClientAgent(args),
},
delay: ctx.timer.getTimeout() / connectionsCount,
});
connectionsCount -= 1;
const eventListener = () => {
abortController.abort(ctx.signal.reason);
};
if (ctx.signal.aborted) {
abortController.abort(ctx.signal.reason);
ctx,
);
void nodeConnectionProm.then(
() => firstConnectionIndexProm.resolveP(index),
(e) => {
this.logger.info(
`Creating NodeConnection failed for ${targetNodeIdEncoded}`,
);
// Disable destroyCallback clean up
void destroyCallbackProm.p.then(() => {});
errors.push(e);
if (errors.length === targetAddresses.length) {
firstConnectionIndexProm.resolveP(null);
}
},
);
return nodeConnectionProm;
});
let newConnection: NodeConnection<GRPCClientAgent>;
try {
newConnection = await Promise.any(nodeConnectionProms);
} catch (e) {
// All connections failed to establish
this.logger.info(
`Failed NodeConnection for ${targetNodeIdEncoded} with ${errors}`,
);
if (errors.length === 1) {
throw errors[0];
} else {
ctx.signal.addEventListener('abort', eventListener);
}
try {
const newConnection = await NodeConnection.createNodeConnection(
throw new nodesErrors.ErrorNodeConnectionMultiConnectionFailed(
'failed to establish connection with multiple hosts',
{
targetNodeId: targetNodeId,
targetHost: address.host,
targetHostname: targetHostname,
targetPort: address.port,
proxy: this.proxy,
destroyCallback: async () => {
destroyCallbackProm.resolveP();
},
logger: this.logger.getChild(
`${NodeConnection.name} [${nodesUtils.encodeNodeId(
targetNodeId,
)}@${address.host}:${address.port}]`,
),
clientFactory: async (args) =>
GRPCClientAgent.createGRPCClientAgent(args),
cause: new AggregateError(errors),
},
{ signal: abortController.signal, timer },
);
void destroyCallbackProm.p.then(async () => {
this.logger.info('DestroyedCallback was called');
// To avoid deadlock only in the case where this is called
// we want to check for destroying connection and read lock
const connAndTimer = this.connections.get(targetNodeIdString);
// If the connection is calling destroyCallback then it SHOULD
// exist in the connection map
if (connAndTimer == null) return;
// Already locked so already destroying
if (this.connectionLocks.isLocked(targetNodeIdString)) return;
// Connection is already destroying
if (connAndTimer?.connection?.[status] === 'destroying') return;
await this.destroyConnection(targetNodeId);
});
// We can assume connection was established and destination was valid,
// we can add the target to the nodeGraph
await this.nodeManager?.setNode(targetNodeId, targetAddress);
// Creating TTL timeout.
// We don't create a TTL for seed nodes.
const timeToLiveTimer = !this.isSeedNode(targetNodeId)
? new Timer({
handler: async () =>
await this.destroyConnection(targetNodeId),
delay: this.connTimeoutTime,
})
: null;
const newConnAndTimer: ConnectionAndTimer = {
connection: newConnection,
timer: timeToLiveTimer,
usageCount: 0,
};
this.connections.set(targetNodeIdString, newConnAndTimer);
// Enable destroyCallback clean up
this.logger.info(
`Created NodeConnection for ${targetNodeIdEncoded}`,
);
return newConnAndTimer;
} catch (e) {
this.logger.info(
`Creating NodeConnection failed for ${targetNodeIdEncoded}`,
);
// Disable destroyCallback clean up
void destroyCallbackProm.p.then(() => {});
errors.push(e);
} finally {
abortController.signal.removeEventListener('abort', eventListener);
timer.cancel();
}
}
this.logger.info(
`Failed NodeConnection for ${targetNodeIdEncoded} with ${errors}`,
// Cleaning up other connections
const successfulIndex = await firstConnectionIndexProm.p;
if (successfulIndex === null) never();
const cleanUpReason = Symbol('cleanUpReason');
await Promise.allSettled(
nodeConnectionProms.map(async (nodeConnectionProm, index) => {
if (index === successfulIndex) return;
nodeConnectionProm.cancel(cleanUpReason);
return nodeConnectionProm.then(async (nodeConnection) => {
await nodeConnection.destroy({ timeout: this.ncDestroyTimeout });
});
}),
);
if (errors.length === 1) {
throw errors[0];
} else {
throw new nodesErrors.ErrorNodeConnectionMultiConnectionFailed(
'failed to establish connection with multiple hosts',
{
cause: new AggregateError(errors),
},
);
}
// Final set up
void destroyCallbackProms[successfulIndex].p.then(async () => {
this.logger.info('DestroyedCallback was called');
// To avoid deadlock only in the case where this is called
// we want to check for destroying connection and read lock
const connAndTimer = this.connections.get(targetNodeIdString);
// If the connection is calling destroyCallback then it SHOULD
// exist in the connection map
if (connAndTimer == null) return;
// Already locked so already destroying
if (this.connectionLocks.isLocked(targetNodeIdString)) return;
// Connection is already destroying
if (connAndTimer?.connection?.[status] === 'destroying') return;
await this.destroyConnection(targetNodeId);
});
// We can assume connection was established and destination was valid,
// we can add the target to the nodeGraph
await this.nodeManager?.setNode(targetNodeId, targetAddress);
// Creating TTL timeout.
// We don't create a TTL for seed nodes.
const timeToLiveTimer = !this.isSeedNode(targetNodeId)
? new Timer({
handler: async () => await this.destroyConnection(targetNodeId),
delay: this.connTimeoutTime,
})
: null;
const newConnAndTimer: ConnectionAndTimer = {
connection: newConnection!,
timer: timeToLiveTimer,
usageCount: 0,
};
this.connections.set(targetNodeIdString, newConnAndTimer);
// Enable destroyCallback clean up
this.logger.info(`Created NodeConnection for ${targetNodeIdEncoded}`);
return newConnAndTimer;
},
);
}
Expand Down
46 changes: 5 additions & 41 deletions tests/nodes/NodeConnectionManager.lifecycle.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { NodeId, NodeIdString, SeedNodes } from '@/nodes/types';
import type { Host, Hostname, Port } from '@/network/types';
import type { Host, Port } from '@/network/types';
import type NodeManager from 'nodes/NodeManager';
import fs from 'fs';
import path from 'path';
Expand All @@ -25,7 +25,7 @@ import { globalRootKeyPems } from '../fixtures/globalRootKeyPems';
describe(`${NodeConnectionManager.name} lifecycle test`, () => {
const logger = new Logger(
`${NodeConnectionManager.name} test`,
LogLevel.WARN,
LogLevel.INFO,
[new StreamHandler()],
);
grpcUtils.setLogger(logger.getChild('grpc'));
Expand Down Expand Up @@ -86,6 +86,8 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => {
let remoteNodeIdString1: NodeIdString;
let remoteNodeId2: NodeId;

const resolveHostnameMock = jest.spyOn(networkUtils, 'resolveHostnames');

const dummyNodeManager = { setNode: jest.fn() } as unknown as NodeManager;

beforeAll(async () => {
Expand Down Expand Up @@ -129,6 +131,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => {
});

beforeEach(async () => {
resolveHostnameMock.mockRestore();
dataDir = await fs.promises.mkdtemp(
path.join(os.tmpdir(), 'polykey-test-'),
);
Expand Down Expand Up @@ -225,45 +228,6 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => {
await nodeConnectionManager?.stop();
}
});
test('should create connection when resolving hostName with multiple results', async () => {
// Setting new information in the node graph
await nodeGraph.setNode(remoteNodeId1, {
host: 'test.com' as Hostname,
port: remoteNode1.proxy.getProxyPort(),
});
// Resolving hostname results in our target and two other targets
const resolveHostnameMock = jest.spyOn(networkUtils, 'resolveHostname');
resolveHostnameMock.mockResolvedValue([
remoteNode1.proxy.getProxyHost(),
'192.168.0.123' as Host,
]);
// NodeConnectionManager under test
let nodeConnectionManager: NodeConnectionManager | undefined;
try {
nodeConnectionManager = new NodeConnectionManager({
keyManager,
nodeGraph,
proxy,
taskManager,
logger: nodeConnectionManagerLogger,
});
await nodeConnectionManager.start({ nodeManager: dummyNodeManager });
await taskManager.startProcessing();
// @ts-ignore: kidnap connections
const connections = nodeConnectionManager.connections;
// @ts-ignore: kidnap connectionLocks
const connectionLocks = nodeConnectionManager.connectionLocks;
const initialConnection = connections.get(remoteNodeIdString1);
expect(initialConnection).toBeUndefined();
await nodeConnectionManager.withConnF(remoteNodeId1, nop);
const finalConnection = connections.get(remoteNodeIdString1);
// Check entry is in map and lock is released
expect(finalConnection).toBeDefined();
expect(connectionLocks.isLocked(remoteNodeIdString1)).toBeFalsy();
} finally {
await nodeConnectionManager?.stop();
}
});
test('acquireConnection should create connection', async () => {
let nodeConnectionManager: NodeConnectionManager | undefined;
try {
Expand Down
4 changes: 3 additions & 1 deletion tests/nodes/NodeManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ describe(`${NodeManager.name} test`, () => {
const serverPort = 0 as Port;
const externalPort = 0 as Port;
const mockedPingNode = jest.fn(); // Jest.spyOn(NodeManager.prototype, 'pingNode');
const mockedIsSeedNode = jest.fn();
const dummyNodeConnectionManager = {
connConnectTime: 5000,
pingNode: mockedPingNode,
isSeedNode: mockedIsSeedNode,
} as unknown as NodeConnectionManager;

beforeEach(async () => {
mockedPingNode.mockClear();
mockedPingNode.mockImplementation(async (_) => true);
mockedIsSeedNode.mockReturnValue(false);

dataDir = await fs.promises.mkdtemp(
path.join(os.tmpdir(), 'polykey-test-'),
Expand Down

0 comments on commit 72da5e2

Please sign in to comment.