Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-Host DNS and Multi NodeID resolution - for network entry and general usage #491

Merged
merged 17 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ class PolykeyAgent {
PolykeyAgent.eventSymbols.Proxy,
async (data: ConnectionData) => {
if (data.type === 'reverse') {
if (this.keyManager.getNodeId().equals(data.remoteNodeId)) return;
const address = networkUtils.buildAddress(
data.remoteHost,
data.remotePort,
Expand Down
8 changes: 6 additions & 2 deletions src/agent/GRPCClientAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ class GRPCClientAgent extends GRPCClient<AgentServiceClient> {
return grpcClientAgent;
}

public async destroy() {
await super.destroy();
public async destroy({
timeout,
}: {
timeout?: number;
} = {}) {
await super.destroy({ timeout });
}

@ready(new agentErrors.ErrorAgentClientDestroyed())
Expand Down
7 changes: 7 additions & 0 deletions src/agent/service/nodesHolePunchMessageSend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ function nodesHolePunchMessageSend({
connectionInfo!.remoteHost,
connectionInfo!.remotePort,
);
// Checking if the source and destination are the same
if (sourceId?.equals(targetId)) {
// Logging and silently dropping operation
logger.warn('Signalling relay message requested signal to itself');
callback(null, response);
return;
}
call.request.setProxyAddress(proxyAddress);
logger.debug(
`Relaying signalling message from ${srcNodeId}@${
Expand Down
22 changes: 20 additions & 2 deletions src/bin/CommandPolykey.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import type { FileSystem } from '../types';
import commander from 'commander';
import Logger, { StreamHandler, formatting } from '@matrixai/logger';
import Logger, {
StreamHandler,
formatting,
levelToString,
evalLogDataValue,
} from '@matrixai/logger';
import * as binUtils from './utils';
import * as binOptions from './utils/options';
import * as binErrors from './errors';
Expand Down Expand Up @@ -68,8 +73,21 @@ class CommandPolykey extends commander.Command {
// Set the logger formatter according to the format
if (opts.format === 'json') {
this.logger.handlers.forEach((handler) =>
handler.setFormatter(formatting.jsonFormatter),
handler.setFormatter((record) => {
return JSON.stringify(
{
level: levelToString(record.level),
keys: record.keys,
msg: record.msg,
...record.data,
},
evalLogDataValue,
);
}),
);
} else {
const format = formatting.format`${formatting.level}:${formatting.keys}:${formatting.msg}`;
this.logger.handlers.forEach((handler) => handler.setFormatter(format));
}
// Set the global upstream GRPC logger
grpcSetLogger(this.logger.getChild('grpc'));
Expand Down
11 changes: 10 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,16 @@ const config = {
// This is not used by the `PolykeyAgent` which defaults to `{}`
network: {
mainnet: {},
testnet: {},
testnet: {
vg9a9e957878s2qgtbdmu2atvli8ms7muukb1dk4dpbm4llkki3h0: {
host: 'testnet.polykey.io' as Host,
port: 1314 as Port,
},
vh9oqtvct10eaiv3cl4ebm0ko33sl0qqpvb59vud8cngfvqs4p4ng: {
host: 'testnet.polykey.io' as Host,
port: 1314 as Port,
},
},
},
},
};
Expand Down
2 changes: 2 additions & 0 deletions src/contexts/functions/timedCancellable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ function setupTimedCancellable<C extends ContextTimed, P extends Array<any>, R>(
);
ctx.signal = abortController.signal;
teardownContext = () => {
// The timer is not cancelled here because
// it was not created in this scope
finished = true;
};
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/grpc/GRPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ abstract class GRPCClient<T extends Client = Client> {
const socket = session.socket as TLSSocket;
serverCertChain = networkUtils.getCertificateChain(socket);
try {
networkUtils.verifyServerCertificateChain(nodeId, serverCertChain);
networkUtils.verifyServerCertificateChain([nodeId], serverCertChain);
} catch (e) {
const e_ = e;
if (e instanceof networkErrors.ErrorCertChain) {
Expand Down
32 changes: 22 additions & 10 deletions src/network/ConnectionForward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type ConnectionsForward = {
interface ConnectionForward extends StartStop {}
@StartStop()
class ConnectionForward extends Connection {
public readonly nodeId: NodeId;

protected nodeId_: NodeId;
protected nodeIds: Array<NodeId>;
protected connections: ConnectionsForward;
protected pingInterval: ReturnType<typeof setInterval>;
protected utpConn: UTPConnection;
Expand Down Expand Up @@ -98,15 +98,15 @@ class ConnectionForward extends Connection {
};

public constructor({
nodeId,
nodeIds,
connections,
...rest
}: {
nodeId: NodeId;
nodeIds: Array<NodeId>;
connections: ConnectionsForward;
} & AbstractConstructorParameters<typeof Connection>[0]) {
super(rest);
this.nodeId = nodeId;
this.nodeIds = nodeIds;
this.connections = connections;
}

Expand All @@ -125,7 +125,7 @@ class ConnectionForward extends Connection {
// Promise for abortion and timeout
const { p: abortedP, resolveP: resolveAbortedP } = promise<void>();
if (ctx.signal.aborted) {
this.logger.debug(`Failed to start Connection Forward: aborted`);
this.logger.info(`Failed to start Connection Forward: aborted`);
// This is for arbitrary abortion reason provided by the caller
// Re-throw the default timeout error as a network timeout error
if (
Expand All @@ -137,6 +137,10 @@ class ConnectionForward extends Connection {
} else {
ctx.signal.addEventListener('abort', () => resolveAbortedP());
}
void ctx.timer.then(
() => resolveAbortedP(),
() => {},
);
this.resolveReadyP = resolveReadyP;
this.utpSocket.on('message', this.handleMessage);
const handleStartError = (e) => {
Expand Down Expand Up @@ -182,7 +186,7 @@ class ConnectionForward extends Connection {
this.tlsSocket.destroy();
}
this.utpSocket.off('message', this.handleMessage);
this.logger.debug(`Failed to start Connection Forward: ${e.message}`);
this.logger.info(`Failed to start Connection Forward: ${e.message}`);
throw new networkErrors.ErrorConnectionStart(undefined, {
cause: e,
});
Expand All @@ -192,7 +196,7 @@ class ConnectionForward extends Connection {
this.tlsSocket.on('error', this.handleError);
this.tlsSocket.off('error', handleStartError);
if (ctx.signal.aborted) {
this.logger.debug(`Failed to start Connection Forward: aborted`);
this.logger.info(`Failed to start Connection Forward: aborted`);
// Clean up partial start
// TLSSocket isn't established yet, so it is destroyed
if (!this.tlsSocket.destroyed) {
Expand All @@ -211,9 +215,12 @@ class ConnectionForward extends Connection {
}
const serverCertChain = networkUtils.getCertificateChain(this.tlsSocket);
try {
networkUtils.verifyServerCertificateChain(this.nodeId, serverCertChain);
this.nodeId_ = networkUtils.verifyServerCertificateChain(
this.nodeIds,
serverCertChain,
);
} catch (e) {
this.logger.debug(
this.logger.info(
`Failed to start Connection Forward: verification failed`,
);
// Clean up partial start
Expand Down Expand Up @@ -259,6 +266,11 @@ class ConnectionForward extends Connection {
this.logger.info('Stopped Connection Forward');
}

@ready(new networkErrors.ErrorConnectionNotRunning())
get nodeId() {
return this.nodeId_;
}

@ready(new networkErrors.ErrorConnectionNotRunning())
public compose(clientSocket: Socket): void {
try {
Expand Down
59 changes: 42 additions & 17 deletions src/network/Proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import * as nodesUtils from '../nodes/utils';
import { promisify } from '../utils';
import { timedCancellable, context } from '../contexts';

const clientConnectionClosedReason = Symbol('clientConnectionClosedReason');

interface Proxy extends StartStop {}
@StartStop()
class Proxy {
Expand Down Expand Up @@ -316,23 +318,32 @@ class Proxy {
* Set timer to `null` explicitly to wait forever
*/
public openConnectionForward(
nodeId: NodeId,
nodeIds: Array<NodeId>,
proxyHost: Host,
proxyPort: Port,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<void>;
): PromiseCancellable<NodeId>;
@ready(new networkErrors.ErrorProxyNotRunning(), true)
@timedCancellable(true, (proxy: Proxy) => proxy.connConnectTime)
public async openConnectionForward(
nodeId: NodeId,
nodeId: Array<NodeId>,
proxyHost: Host,
proxyPort: Port,
@context ctx: ContextTimed,
): Promise<void> {
): Promise<NodeId> {
const proxyAddress = networkUtils.buildAddress(proxyHost, proxyPort);
await this.connectionLocksForward.withF([proxyAddress, Lock], async () => {
await this.establishConnectionForward(nodeId, proxyHost, proxyPort, ctx);
});
return await this.connectionLocksForward.withF(
[proxyAddress, Lock],
async () => {
const connectionForward = await this.establishConnectionForward(
nodeId,
proxyHost,
proxyPort,
ctx,
);
return connectionForward.nodeId;
},
);
}

@ready(new networkErrors.ErrorProxyNotRunning(), true)
Expand Down Expand Up @@ -409,10 +420,22 @@ class Proxy {
}
await this.connectionLocksForward.withF([proxyAddress, Lock], async () => {
const timer = new Timer({ delay: this.connConnectTime });
let cleanUpConnectionListener = () => {};
try {
await this.connectForward(nodeId, proxyHost, proxyPort, clientSocket, {
timer,
});
const connectForwardProm = this.connectForward(
[nodeId],
proxyHost,
proxyPort,
clientSocket,
{
timer,
},
);
cleanUpConnectionListener = () => {
connectForwardProm.cancel(clientConnectionClosedReason);
};
clientSocket.addListener('close', cleanUpConnectionListener);
await connectForwardProm;
} catch (e) {
if (e instanceof networkErrors.ErrorProxyConnectInvalidUrl) {
if (!clientSocket.destroyed) {
Expand Down Expand Up @@ -471,6 +494,7 @@ class Proxy {
return;
} finally {
timer.cancel();
clientSocket.removeListener('close', cleanUpConnectionListener);
}
// After composing, switch off this error handler
clientSocket.off('error', handleConnectError);
Expand All @@ -482,22 +506,22 @@ class Proxy {
};

protected connectForward(
nodeId: NodeId,
nodeIds: Array<NodeId>,
proxyHost: Host,
proxyPort: Port,
clientSocket: Socket,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<void>;
): PromiseCancellable<NodeId>;
@timedCancellable(true, (proxy: Proxy) => proxy.connConnectTime)
protected async connectForward(
nodeId: NodeId,
nodeIds: Array<NodeId>,
proxyHost: Host,
proxyPort: Port,
clientSocket: Socket,
@context ctx: ContextTimed,
): Promise<void> {
): Promise<NodeId> {
const conn = await this.establishConnectionForward(
nodeId,
nodeIds,
proxyHost,
proxyPort,
ctx,
Expand All @@ -511,10 +535,11 @@ class Proxy {
remotePort: conn.port,
type: 'forward',
});
return conn.nodeId;
}

protected async establishConnectionForward(
nodeId: NodeId,
nodeIds: Array<NodeId>,
proxyHost: Host,
proxyPort: Port,
ctx: ContextTimed,
Expand All @@ -530,7 +555,7 @@ class Proxy {
return conn;
}
conn = new ConnectionForward({
nodeId,
nodeIds,
connections: this.connectionsForward,
utpSocket: this.utpSocket,
host: proxyHost,
Expand Down
14 changes: 10 additions & 4 deletions src/network/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class ErrorConnectionEndTimeout<T> extends ErrorConnection<T> {
exitCode = sysexits.UNAVAILABLE;
}

class ErrorConnectionNodesEmpty<T> extends ErrorConnection<T> {
static description = 'Nodes list to verify against was empty';
exitCode = sysexits.USAGE;
}

/**
* Used by ConnectionForward and ConnectionReverse
*/
Expand Down Expand Up @@ -129,9 +134,9 @@ class ErrorCertChainSignatureInvalid<T> extends ErrorCertChain<T> {
exitCode = sysexits.PROTOCOL;
}

class ErrorHostnameResolutionFailed<T> extends ErrorNetwork<T> {
static description = 'Unable to resolve hostname';
exitCode = sysexits.USAGE;
class ErrorDNSResolver<T> extends ErrorNetwork<T> {
static description = 'DNS resolution failed';
exitCode = sysexits.SOFTWARE;
}

export {
Expand All @@ -148,6 +153,7 @@ export {
ErrorConnectionMessageParse,
ErrorConnectionTimeout,
ErrorConnectionEndTimeout,
ErrorConnectionNodesEmpty,
ErrorConnectionStart,
ErrorConnectionStartTimeout,
ErrorConnectionStartTimeoutMax,
Expand All @@ -161,5 +167,5 @@ export {
ErrorCertChainNameInvalid,
ErrorCertChainKeyInvalid,
ErrorCertChainSignatureInvalid,
ErrorHostnameResolutionFailed,
ErrorDNSResolver,
};
Loading