Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: port control client and service from ml connector #308

Merged
merged 8 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ TEST_LISTEN_PORT=4002
# environment, i.e. when you're running it locally against your own implementation.
INBOUND_MUTUAL_TLS_ENABLED=false
OUTBOUND_MUTUAL_TLS_ENABLED=false
TEST_MUTUAL_TLS_ENABLED=false

# Enable verification or incoming JWS signatures
# Note that signatures will be required on incoming messages
Expand Down Expand Up @@ -42,6 +43,10 @@ OUT_CA_CERT_PATH=./secrets/cacert.pem
OUT_CLIENT_CERT_PATH=./secrets/servercert.pem
OUT_CLIENT_KEY_PATH=./secrets/serverkey.pem

TEST_CA_CERT_PATH=./secrets/cacert.pem
TEST_CLIENT_CERT_PATH=./secrets/servercert.pem
TEST_CLIENT_KEY_PATH=./secrets/serverkey.pem

# The number of space characters by which to indent pretty-printed logs. If set to zero, log events
# will each be printed on a single line.
LOG_INDENT=0
Expand Down Expand Up @@ -138,3 +143,11 @@ RESERVE_NOTIFICATION=true

# resources API versions should be string in format: "resouceOneName=1.0,resourceTwoName=1.1"
RESOURCE_VERSIONS="transfers=1.1,participants=1.1"

# Management API websocket connection settings.
# The Management API uses this for exchanging connector management messages.
MGMT_API_WS_URL=127.0.0.1
MGMT_API_WS_PORT=4005
# Set to true to enable the use of PM4ML-related services e.g MCM, Management API service
# when running the scheme-adapter as a mojaloop connector component within Payment Manager for Mojaloop.
PM4ML_ENABLED=false
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
.idea
node_modules/
.swp
src/junit.xml
junit.xml
.vscode
secrets/*.pem
coverage
src/.env
.env

# https://devspace.sh/
devspace*
Expand Down
20 changes: 20 additions & 0 deletions audit-resolve.json
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,26 @@
"decision": "ignore",
"madeAt": 1649898257344,
"expiresAt": 1652490250295
},
"1070030|@mojaloop/central-services-shared>widdershins>markdown-it": {
"decision": "ignore",
"madeAt": 1650460940438,
"expiresAt": 1653052932045
},
"1070030|@mojaloop/central-services-shared>shins>markdown-it": {
"decision": "ignore",
"madeAt": 1650459472663,
"expiresAt": 1653051469252
},
"1068154|@mojaloop/central-services-shared>shins>sanitize-html": {
"decision": "ignore",
"madeAt": 1650459474362,
"expiresAt": 1653051469252
},
"1068155|@mojaloop/central-services-shared>shins>sanitize-html": {
"decision": "ignore",
"madeAt": 1650459475376,
"expiresAt": 1653051469252
}
},
"rules": {},
Expand Down
515 changes: 0 additions & 515 deletions junit.xml

This file was deleted.

11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
"engines": {
"node": "=16.x"
},
"_moduleAliases": {
"~": "src"
},
"scripts": {
"audit:resolve": "SHELL=sh resolve-audit --production",
"audit:check": "SHELL=sh check-audit --production",
Expand Down Expand Up @@ -65,6 +68,7 @@
"dotenv": "^10.0.0",
"env-var": "^7.0.1",
"express": "^4.17.2",
"fast-json-patch": "^3.1.1",
"javascript-state-machine": "^3.1.0",
"js-yaml": "^4.1.0",
"json-schema-ref-parser": "^9.0.9",
Expand Down
221 changes: 221 additions & 0 deletions src/ControlAgent/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/**************************************************************************
* (C) Copyright ModusBox Inc. 2020 - All rights reserved. *
* *
* This file is made available under the terms of the license agreement *
* specified in the corresponding source code repository. *
* *
* ORIGINAL AUTHOR: *
* Matt Kingston - [email protected] *
**************************************************************************/

// This server has deliberately been written separate from any other server in the SDK. There is
// some reasonable argument that it could be part of the outbound or test server. It has not been
// incorporated in either as, at the time of writing, it is intended to be maintained in a
// proprietary fork. Therefore, keeping it independent of other servers will avoid the maintenance
// burden that would otherwise be associated with incorporating it with those.
//
// It inherits from the Server class from the 'ws' websocket library for Node, which in turn
// inherits from EventEmitter. We exploit this to emit an event when a reconfigure message is sent
// to this server. Then, when this server's reconfigure method is called, it reconfigures itself
// and sends a message to all clients notifying them of the new application configuration.
//
// It expects new configuration to be supplied as an array of JSON patches. It therefore exposes
// the current configuration to

const assert = require('assert').strict;
const ws = require('ws');
const jsonPatch = require('fast-json-patch');
const randomPhrase = require('~/lib/randomphrase');


/**************************************************************************
* The message protocol messages, verbs, and errors
*************************************************************************/
const MESSAGE = {
CONFIGURATION: 'CONFIGURATION',
ERROR: 'ERROR',
};

const VERB = {
READ: 'READ',
NOTIFY: 'NOTIFY',
PATCH: 'PATCH'
};

const ERROR = {
UNSUPPORTED_MESSAGE: 'UNSUPPORTED_MESSAGE',
UNSUPPORTED_VERB: 'UNSUPPORTED_VERB',
JSON_PARSE_ERROR: 'JSON_PARSE_ERROR',
};

/**************************************************************************
* Events emitted by the control client
*************************************************************************/
const EVENT = {
RECONFIGURE: 'RECONFIGURE',
};

/**************************************************************************
* Private convenience functions
*************************************************************************/
const serialise = JSON.stringify;
const deserialise = (msg) => {
//reviver function
return JSON.parse(msg.toString(), (k, v) => {
if (
v !== null &&
typeof v === 'object' &&
'type' in v &&
v.type === 'Buffer' &&
'data' in v &&
Array.isArray(v.data)) {
return new Buffer(v.data);
}
return v;
});
};

const buildMsg = (verb, msg, data, id = randomPhrase()) => serialise({
verb,
msg,
data,
id,
});

const buildPatchConfiguration = (oldConf, newConf, id) => {
const patches = jsonPatch.compare(oldConf, newConf);
return buildMsg(VERB.PATCH, MESSAGE.CONFIGURATION, patches, id);
};

/**************************************************************************
* build
*
* Public object exposing an API to build valid protocol messages.
* It is not the only way to build valid messages within the protocol.
*************************************************************************/
const build = {
CONFIGURATION: {
PATCH: buildPatchConfiguration,
READ: (id) => buildMsg(VERB.READ, MESSAGE.CONFIGURATION, {}, id),
NOTIFY: (config, id) => buildMsg(VERB.NOTIFY, MESSAGE.CONFIGURATION, config, id),
},
ERROR: {
NOTIFY: {
UNSUPPORTED_MESSAGE: (id) => buildMsg(VERB.NOTIFY, MESSAGE.ERROR, ERROR.UNSUPPORTED_MESSAGE, id),
UNSUPPORTED_VERB: (id) => buildMsg(VERB.NOTIFY, MESSAGE.ERROR, ERROR.UNSUPPORTED_VERB, id),
JSON_PARSE_ERROR: (id) => buildMsg(VERB.NOTIFY, MESSAGE.ERROR, ERROR.JSON_PARSE_ERROR, id),
}
},
};

/**************************************************************************
* Client
*
* The Control Client. Client for the websocket control API.
* Used to hot-restart the SDK.
*
* logger - Logger- see SDK logger used elsewhere
* address - address of control server
* port - port of control server
*************************************************************************/
class Client extends ws {
/**
* Consider this a private constructor.
* `Client` instances outside of this class should be created via the `Create(...args)` static method.
*/
constructor({ address = 'localhost', port, logger, appConfig }) {
super(`ws://${address}:${port}`);
this._logger = logger;
this._appConfig = appConfig;
}

// Really only exposed so that a user can import only the client for convenience
get Build() {
return build;
}

static Create(...args) {
return new Promise((resolve, reject) => {
const client = new Client(...args);
client.on('open', () => resolve(client));
client.on('error', (err) => reject(err));
client.on('message', client._handle);
});
}

async send(msg) {
const data = typeof msg === 'string' ? msg : serialise(msg);
this._logger.push({ data }).log('Sending message');
return new Promise((resolve) => super.send.call(this, data, resolve));
}

// Receive a single message
async receive() {
return new Promise((resolve) => this.once('message', (data) => {
const msg = deserialise(data);
this._logger.push({ msg }).log('Received');
resolve(msg);
}));
}

// Close connection
async stop() {
this._logger.log('Control client shutting down...');
this.close();
}

reconfigure({ logger = this._logger, port = 0, appConfig = this._appConfig }) {
assert(port === this._socket.remotePort, 'Cannot reconfigure running port');
return () => {
this._logger = logger;
this._appConfig = appConfig;
this._logger.log('restarted');
};
}

// Handle incoming message from the server.
_handle(data) {
// TODO: json-schema validation of received message- should be pretty straight-forward
// and will allow better documentation of the API
let msg;
try {
msg = deserialise(data);
} catch (err) {
this._logger.push({ data }).log('Couldn\'t parse received message');
this.send(build.ERROR.NOTIFY.JSON_PARSE_ERROR());
}
this._logger.push({ msg }).log('Handling received message');
switch (msg.msg) {
case MESSAGE.CONFIGURATION:
switch (msg.verb) {
case VERB.NOTIFY:
case VERB.PATCH: {
const dup = JSON.parse(JSON.stringify(this._appConfig)); // fast-json-patch explicitly mutates
jsonPatch.applyPatch(dup, msg.data);
this._logger.push({ oldConf: this._appConfig, newConf: dup }).log('Emitting new configuration');
this.emit(EVENT.RECONFIGURE, dup);
break;
}
default:
this.send(build.ERROR.NOTIFY.UNSUPPORTED_VERB(msg.id));
break;
}
break;
default:
this.send(build.ERROR.NOTIFY.UNSUPPORTED_MESSAGE(msg.id));
break;
}

}
}



module.exports = {
Client,
build,
MESSAGE,
VERB,
ERROR,
EVENT,
};
Loading