Skip to content

Commit

Permalink
Merge pull request #468 from MatrixAI/feature-proxy_cancellability
Browse files Browse the repository at this point in the history
Feature: network cancellability and deadlines
  • Loading branch information
tegefaulkes authored Oct 5, 2022
2 parents bc995aa + 6d0b677 commit 06df14a
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 255 deletions.
2 changes: 1 addition & 1 deletion src/network/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ abstract class Connection {
tlsConfig,
keepAliveTimeoutTime = 20000,
endTime = 1000,
punchIntervalTime = 1000,
punchIntervalTime = 50,
keepAliveIntervalTime = 1000,
logger,
}: {
Expand Down
54 changes: 35 additions & 19 deletions src/network/ConnectionForward.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import type { Socket, AddressInfo } from 'net';
import type { TLSSocket } from 'tls';
import type UTPConnection from 'utp-native/lib/connection';
import type { PromiseCancellable } from '@matrixai/async-cancellable';
import type { Certificate } from '../keys/types';
import type { Address, Host, NetworkMessage, Port } from './types';
import type { NodeId } from '../ids/types';
import type { AbstractConstructorParameters, Timer } from '../types';
import type { AbstractConstructorParameters } from '../types';
import type { ContextTimed } from '../contexts/types';
import tls from 'tls';
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
import Connection from './Connection';
import * as networkUtils from './utils';
import * as networkErrors from './errors';
import * as keysUtils from '../keys/utils';
import { promise, timerStart, timerStop } from '../utils';
import { promise } from '../utils';
import * as contextsErrors from '../contexts/errors';
import { timedCancellable, context } from '../contexts/index';

type ConnectionsForward = {
proxy: Map<Address, ConnectionForward>;
Expand Down Expand Up @@ -106,11 +110,7 @@ class ConnectionForward extends Connection {
this.connections = connections;
}

public async start({
timer,
}: {
timer?: Timer;
} = {}): Promise<void> {
public async start({ ctx }: { ctx: ContextTimed }): Promise<void> {
this.logger.info('Starting Connection Forward');
// Promise for ready
const { p: readyP, resolveP: resolveReadyP } = promise<void>();
Expand All @@ -119,6 +119,9 @@ class ConnectionForward extends Connection {
// Promise for secure connection
const { p: secureConnectP, resolveP: resolveSecureConnectP } =
promise<void>();
// Promise for abortion and timeout
const { p: abortedP, resolveP: resolveAbortedP } = promise<void>();
ctx.signal.addEventListener('abort', () => resolveAbortedP());
this.resolveReadyP = resolveReadyP;
this.utpSocket.on('message', this.handleMessage);
const handleStartError = (e) => {
Expand Down Expand Up @@ -154,7 +157,7 @@ class ConnectionForward extends Connection {
await Promise.race([
Promise.all([readyP, secureConnectP]).then(() => {}),
errorP,
...(timer != null ? [timer.timerP] : []),
abortedP,
]);
} catch (e) {
// Clean up partial start
Expand All @@ -177,15 +180,20 @@ class ConnectionForward extends Connection {
}
this.tlsSocket.on('error', this.handleError);
this.tlsSocket.off('error', handleStartError);
if (timer?.timedOut) {
if (ctx.signal.aborted) {
// Clean up partial start
// TLSSocket isn't established yet, so it is destroyed
if (!this.tlsSocket.destroyed) {
this.tlsSocket.end();
this.tlsSocket.destroy();
}
this.utpSocket.off('message', this.handleMessage);
throw new networkErrors.ErrorConnectionStartTimeout();
if (
ctx.signal.reason instanceof contextsErrors.ErrorContextsTimedTimeOut
) {
throw new networkErrors.ErrorConnectionStartTimeout();
}
throw ctx.signal.reason;
}
const serverCertChain = networkUtils.getCertificateChain(this.tlsSocket);
try {
Expand All @@ -197,7 +205,7 @@ class ConnectionForward extends Connection {
this.logger.debug('Sends tlsSocket ending');
// Graceful exit has its own end handler
this.tlsSocket.removeAllListeners('end');
await this.endGracefully(this.tlsSocket, this.endTime);
await this.endGracefully(this.tlsSocket);
throw e;
}
await this.startKeepAliveInterval();
Expand All @@ -219,14 +227,14 @@ class ConnectionForward extends Connection {
this.tlsSocket.unpipe();
// Graceful exit has its own end handler
this.tlsSocket.removeAllListeners('end');
endPs.push(this.endGracefully(this.tlsSocket, this.endTime));
endPs.push(this.endGracefully(this.tlsSocket));
}
if (this.clientSocket != null && !this.clientSocket.destroyed) {
this.logger.debug('Sends clientSocket ending');
this.clientSocket.unpipe();
// Graceful exit has its own end handler
this.clientSocket.removeAllListeners('end');
endPs.push(this.endGracefully(this.clientSocket, this.endTime));
endPs.push(this.endGracefully(this.clientSocket));
}
await Promise.all(endPs);
this.connections.proxy.delete(this.address);
Expand Down Expand Up @@ -320,17 +328,25 @@ class ConnectionForward extends Connection {
clearTimeout(this.timeout);
}

protected async endGracefully(socket: Socket, timeout: number) {
protected endGracefully(
socket: Socket,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<void>;
@timedCancellable(
true,
(connectionForward: ConnectionForward) => connectionForward.endTime,
)
protected async endGracefully(socket: Socket, @context ctx: ContextTimed) {
const { p: endP, resolveP: resolveEndP } = promise<void>();
socket.once('end', resolveEndP);
socket.end();
const timer = timerStart(timeout);
await Promise.race([endP, timer.timerP]);
// Promise for abortion and timeout
const { p: abortedP, resolveP: resolveAbortedP } = promise<void>();
ctx.signal.addEventListener('abort', () => resolveAbortedP());
await Promise.race([endP, abortedP]);
socket.removeListener('end', resolveEndP);
if (timer.timedOut) {
if (ctx.signal.aborted) {
socket.emit('error', new networkErrors.ErrorConnectionEndTimeout());
} else {
timerStop(timer);
}
// Must be destroyed if timed out
// If not timed out, force destroy the socket due to buggy tlsSocket and utpConn
Expand Down
81 changes: 47 additions & 34 deletions src/network/ConnectionReverse.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import type { Socket, AddressInfo } from 'net';
import type { TLSSocket } from 'tls';
import type UTPConnection from 'utp-native/lib/connection';
import type { PromiseCancellable } from '@matrixai/async-cancellable';
import type { Host, Port, Address, NetworkMessage } from './types';
import type { NodeId } from '../ids/types';
import type { Certificate } from '../keys/types';
import type { AbstractConstructorParameters, Timer } from '../types';
import type { AbstractConstructorParameters } from '../types';
import type { ContextTimed } from '../contexts/types';
import net from 'net';
import tls from 'tls';
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
import Connection from './Connection';
import * as networkUtils from './utils';
import * as networkErrors from './errors';
import * as keysUtils from '../keys/utils';
import { promise, timerStart, timerStop } from '../utils';
import { promise } from '../utils';
import * as contextsErrors from '../contexts/errors';
import { timedCancellable, context } from '../contexts';

type ConnectionsReverse = {
proxy: Map<Address, ConnectionReverse>;
Expand Down Expand Up @@ -104,18 +108,17 @@ class ConnectionReverse extends Connection {
this.connections = connections;
}

public async start({
timer,
}: {
timer?: Timer;
} = {}): Promise<void> {
public async start({ ctx }: { ctx: ContextTimed }): Promise<void> {
this.logger.info('Starting Connection Reverse');
// Promise for ready
const { p: readyP, resolveP: resolveReadyP } = promise<void>();
// Promise for server connection
const { p: socketP, resolveP: resolveSocketP } = promise<void>();
// Promise for start errors
const { p: errorP, rejectP: rejectErrorP } = promise<void>();
// Promise for abortion and timeout
const { p: abortedP, resolveP: resolveAbortedP } = promise<void>();
ctx.signal.addEventListener('abort', () => resolveAbortedP());
this.resolveReadyP = resolveReadyP;
this.utpSocket.on('message', this.handleMessage);
this.serverSocket = net.connect(this.serverPort, this.serverHost, () => {
Expand All @@ -136,21 +139,13 @@ class ConnectionReverse extends Connection {
this.serverSocket.on('close', this.handleClose);
let punchInterval;
try {
await Promise.race([
socketP,
errorP,
...(timer != null ? [timer.timerP] : []),
]);
await Promise.race([socketP, errorP, abortedP]);
// Send punch & ready signal
await this.send(networkUtils.pingBuffer);
punchInterval = setInterval(async () => {
await this.send(networkUtils.pingBuffer);
}, this.punchIntervalTime);
await Promise.race([
readyP,
errorP,
...(timer != null ? [timer.timerP] : []),
]);
await Promise.race([readyP, errorP, abortedP]);
} catch (e) {
// Clean up partial start
// Socket isn't established yet, so it is destroyed
Expand All @@ -169,7 +164,7 @@ class ConnectionReverse extends Connection {
}
this.serverSocket.on('error', this.handleError);
this.serverSocket.off('error', handleStartError);
if (timer?.timedOut) {
if (ctx.signal.aborted) {
// Clean up partial start
// Socket isn't established yet, so it is destroyed
this.serverSocket.destroy();
Expand All @@ -196,14 +191,14 @@ class ConnectionReverse extends Connection {
this.serverSocket.unpipe();
// Graceful exit has its own end handler
this.serverSocket.removeAllListeners('end');
endPs.push(this.endGracefully(this.serverSocket, this.endTime));
endPs.push(this.endGracefully(this.serverSocket));
}
if (this.tlsSocket != null && !this.tlsSocket.destroyed) {
this.logger.debug('Sends tlsSocket ending');
this.tlsSocket.unpipe();
// Graceful exit has its own end handler
this.tlsSocket.removeAllListeners('end');
endPs.push(this.endGracefully(this.tlsSocket, this.endTime));
endPs.push(this.endGracefully(this.tlsSocket));
}
await Promise.all(endPs);
this.connections.proxy.delete(this.address);
Expand All @@ -212,7 +207,10 @@ class ConnectionReverse extends Connection {
}

@ready(new networkErrors.ErrorConnectionNotRunning(), true)
public async compose(utpConn: UTPConnection, timer?: Timer): Promise<void> {
public async compose(
utpConn: UTPConnection,
ctx: ContextTimed,
): Promise<void> {
try {
if (this._composed) {
throw new networkErrors.ErrorConnectionComposed();
Expand All @@ -225,6 +223,9 @@ class ConnectionReverse extends Connection {
const handleComposeError = (e) => {
rejectErrorP(e);
};
// Promise for abortion and timeout
const { p: abortedP, resolveP: resolveAbortedP } = promise<void>();
ctx.signal.addEventListener('abort', () => resolveAbortedP());
const tlsSocket = new tls.TLSSocket(utpConn, {
key: Buffer.from(this.tlsConfig.keyPrivatePem, 'ascii'),
cert: Buffer.from(this.tlsConfig.certChainPem, 'ascii'),
Expand All @@ -237,11 +238,7 @@ class ConnectionReverse extends Connection {
});
tlsSocket.once('error', handleComposeError);
try {
await Promise.race([
secureP,
errorP,
...(timer != null ? [timer.timerP] : []),
]);
await Promise.race([secureP, errorP, abortedP]);
} catch (e) {
// Clean up partial compose
if (!tlsSocket.destroyed) {
Expand All @@ -262,13 +259,18 @@ class ConnectionReverse extends Connection {
await this.stop();
});
tlsSocket.off('error', handleComposeError);
if (timer?.timedOut) {
if (ctx.signal.aborted) {
// Clean up partial compose
if (!tlsSocket.destroyed) {
tlsSocket.end();
tlsSocket.destroy();
}
throw new networkErrors.ErrorConnectionComposeTimeout();
if (
ctx.signal.reason instanceof contextsErrors.ErrorContextsTimedTimeOut
) {
throw new networkErrors.ErrorConnectionComposeTimeout();
}
throw ctx.signal.reason;
}
const clientCertChain = networkUtils.getCertificateChain(tlsSocket);
try {
Expand Down Expand Up @@ -360,17 +362,28 @@ class ConnectionReverse extends Connection {
clearTimeout(this.timeout);
}

protected async endGracefully(socket: Socket, timeout: number) {
protected endGracefully(
socket: Socket,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<void>;
@timedCancellable(
true,
(connectionReverse: ConnectionReverse) => connectionReverse.endTime,
)
protected async endGracefully(
socket: Socket,
@context ctx: ContextTimed,
): Promise<void> {
const { p: endP, resolveP: resolveEndP } = promise<void>();
// Promise for abortion and timeout
const { p: abortedP, resolveP: resolveAbortedP } = promise<void>();
ctx.signal.addEventListener('abort', () => resolveAbortedP());
socket.once('end', resolveEndP);
socket.end();
const timer = timerStart(timeout);
await Promise.race([endP, timer.timerP]);
await Promise.race([endP, abortedP]);
socket.removeListener('end', resolveEndP);
if (timer.timedOut) {
if (ctx.signal.aborted) {
socket.emit('error', new networkErrors.ErrorConnectionEndTimeout());
} else {
timerStop(timer);
}
// Must be destroyed if timed out
// If not timed out, force destroy the socket due to buggy tlsSocket and utpConn
Expand Down
Loading

0 comments on commit 06df14a

Please sign in to comment.