Skip to content

Commit

Permalink
Merge pull request #137 from mojaloop/cleanup-add-create-date
Browse files Browse the repository at this point in the history
Cleanup to not log every result; added create date to typology result.
  • Loading branch information
aaronreynoza authored Jan 22, 2021
2 parents ce38715 + fdbc7f3 commit d941b72
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 29 deletions.
5 changes: 2 additions & 3 deletions typology-11/src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@ let producer: kafka.Producer;
const initializeProducer = () => {
producer = new kafka.Producer(new kafka.KafkaClient({
kafkaHost: config.kafkaEndpoint,
}), {});
}), {partitionerType: 2});
return new Promise((resolve) => {
producer.on('ready', () => resolve(undefined));
});
};

const publish = (topic: string, message: string) => {
const result = `{"topic":"${topic}",${message}`;
log(result, topic);
return new Promise((resolve) => {
producer.send(
[{
topic: config.resultTopic,
messages: [result]
messages: [result],
}],
(err) => {
if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); }
Expand Down
9 changes: 5 additions & 4 deletions typology-11/src/scoring-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ class typology11Type {
}

// https://lextego.atlassian.net/browse/ACTIO-198
const handleScores = (scores: any, topic: string, TransactionID: string) => {
const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => {
const score =
(scores.rule17 ? 0.25 : 0)
+ (scores.rule27 ? 0.25 : 0)
+ (scores.rule86 ? 0.25 : 0)
+ (scores.rule87 ? 0.25 : 0);

publish(topic, `"typology":"typology-11","transactionID":"${TransactionID}","score":"${score}","textResult":"Typology 11 score is ${score}, Reason: ${
publish(topic, `"typology":"typology-11","transactionID":"${TransactionID}","score":"${score}","createDate":"${transactionDate}",
"textResult":"Typology 11 score is ${score}, Reason: ${
+ (scores.rule17 ? 'Transaction Divergence, ' : '')
+ (scores.rule27 ? 'Transaction Mirroring, ' : '')
+ (scores.rule86 ? 'Transaction Between Parties, ' : '')
Expand All @@ -40,7 +41,7 @@ const handleQuoteMessage = async (
) => {
try {
const transfer = JSON.parse(message.value.toString());
const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress } = transfer;
const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress, HTTPTransactionDate } = transfer;
const sourceHistoricalSendDataJSON = await get(senderClient, ILPSourceAccountAddress);
const payeeHistoricalReceiveDataJSON = await get(receiverClient, ILPDestinationAccountAddress);
const payeeHistoricalSendDataJSON = await get(senderClient, ILPDestinationAccountAddress);
Expand Down Expand Up @@ -68,7 +69,7 @@ const handleQuoteMessage = async (
log(`Error while handling Co-located Parties ${TransactionID}, with message: \r\n${error}`, topic)
}

handleScores(scores, topic, TransactionID);
handleScores(scores, topic, TransactionID, HTTPTransactionDate);
} catch (e) {
console.error(e);
}
Expand Down
4 changes: 1 addition & 3 deletions typology-214/src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@ let producer: kafka.Producer;
const initializeProducer = () => {
producer = new kafka.Producer(new kafka.KafkaClient({
kafkaHost: config.kafkaEndpoint,
}), {});
}), {partitionerType: 2});
return new Promise((resolve) => {
producer.on('ready', () => resolve(undefined));
});
};

const publish = (topic: string, message: string) => {
const result = `{"topic":"${topic}",${message}`;
log(result, topic);
return new Promise((resolve) => {
producer.send(
[{
topic: config.resultTopic,
messages: [result],
partition: config.partition,
}],
(err) => {
if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); }
Expand Down
9 changes: 5 additions & 4 deletions typology-214/src/scoring-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ class typology214Type {
}

// Composed probability for typology 214 = (012.p+027.p+030.p+048.p+078.p)
const handleScores = (scores: any, topic: string, TransactionID: string) => {
const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => {
const score =
(scores.rule12 ? 0.2 : 0)
+ (scores.rule27 ? 0.2 : 0)
+ (scores.rule30 ? 0.2 : 0)
+ (scores.rule48 ? 0.2 : 0)
+ (scores.rule78 ? 0.2 : 0);

publish(topic, `"typology":"typology-214","transactionID":"${TransactionID}","score":"${score}","textResult":"Typology 214 score is ${score}, Reason: ${(scores.rule3 ? 'Account Dormancy - Payee, ' : '')
publish(topic, `"typology":"typology-214","transactionID":"${TransactionID}","score":"${score}","createDate":"${transactionDate}",
"textResult":"Typology 214 score is ${score}, Reason: ${(scores.rule3 ? 'Account Dormancy - Payee, ' : '')
+ (scores.rule12 ? 'Party Type Individual, ' : '')
+ (scores.rule27 ? 'Transaction Mirroring, ' : '')
+ (scores.rule30 ? 'New Payee, ' : '')
Expand All @@ -40,7 +41,7 @@ const handleQuoteMessage = async (
) => {
try {
const transfer = JSON.parse(message.value.toString());
const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress } = transfer;
const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress, HTTPTransactionDate } = transfer;
const sourceHistoricalSendDataJSON = await get(senderClient, ILPSourceAccountAddress);
const payeeHistoricalReceiveDataJSON = await get(receiverClient, ILPDestinationAccountAddress);
const payeeHistoricalSendDataJSON = await get(senderClient, ILPDestinationAccountAddress);
Expand Down Expand Up @@ -72,7 +73,7 @@ const handleQuoteMessage = async (
log(`Error while handling Cash Withdraw ${TransactionID}, with message: \r\n${error}`, topic)
}

handleScores(scores, topic, TransactionID);
handleScores(scores, topic, TransactionID, HTTPTransactionDate);
} catch (e) {
console.error(e);
}
Expand Down
4 changes: 1 addition & 3 deletions typology-27/src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@ let producer: kafka.Producer;
const initializeProducer = () => {
producer = new kafka.Producer(new kafka.KafkaClient({
kafkaHost: config.kafkaEndpoint,
}), {});
}), {partitionerType: 2});
return new Promise((resolve) => {
producer.on('ready', () => resolve(undefined));
});
};

const publish = (topic: string, message: string) => {
const result = `{"topic":"${topic}",${message}`;
log(result, topic);
return new Promise((resolve) => {
producer.send(
[{
topic: config.resultTopic,
messages: [result],
partition: config.partition,
}],
(err) => {
if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); }
Expand Down
11 changes: 6 additions & 5 deletions typology-27/src/scoring-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class typology27Type {
}

// Composed probability for typology 27 = (009.p)*(012.p)*(014.p+018.p+030.p+032.p+078.p)
const handleScores = (scores: any, topic: string, TransactionID: string) => {
const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => {
const score =
(scores.rule9 ? 1 : 0)
* (scores.rule12 ? 1 : 0)
Expand All @@ -29,7 +29,8 @@ const handleScores = (scores: any, topic: string, TransactionID: string) => {
+ (scores.rule78 ? 0.2 : 0)
);

publish(topic, `"typology":"typology-27","transactionID":"${TransactionID}","score":${score},"textResult":"Typology 27 score is ${score}, Reason: ${(scores.rule9 ? 'Recent Sim Swap, ' : '')
publish(topic, `"typology":"typology-27","transactionID":"${TransactionID}","score":${score},"createDate":"${transactionDate}",
"textResult":"Typology 27 score is ${score}, Reason: ${(scores.rule9 ? 'Recent Sim Swap, ' : '')
+ (scores.rule12 ? 'Party Type Individual, ' : '')
+ (scores.rule14 ? 'Recent Password Reset, ' : '')
+ (scores.rule18 ? 'Exceptionally Large Transfer, ' : '')
Expand All @@ -47,10 +48,10 @@ const handleQuoteMessage = async (
) => {
try {
const transfer = JSON.parse(message.value.toString());
const { TransactionID, ILPSourceAccountAddress } = transfer;
const { TransactionID, ILPSourceAccountAddress, HTTPTransactionDate } = transfer;

const ILPList = await get(client, ILPSourceAccountAddress);
const historicalData = JSON.parse(ILPList);
const historicalData = ILPList == undefined ? undefined : JSON.parse(ILPList);
const scores: typology27Type = new typology27Type();

try { scores.rule9 = rules.handleRecentSimSwap({ transfer, historicalData }); }
Expand All @@ -75,7 +76,7 @@ const handleQuoteMessage = async (
try { scores.rule78 = rules.handleCashWithdraw(transfer); } catch (error) {
log(`Error while handling Cash Withdraw for transaction ${TransactionID}, with message: \r\n${error}`, topic)
}
handleScores(scores, topic, TransactionID);
handleScores(scores, topic, TransactionID, HTTPTransactionDate);
} catch (e) {
console.error(e);
}
Expand Down
4 changes: 1 addition & 3 deletions typology-28/src/producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@ let producer: kafka.Producer;
const initializeProducer = () => {
producer = new kafka.Producer(new kafka.KafkaClient({
kafkaHost: config.kafkaEndpoint,
}), {});
}), {partitionerType: 2});
return new Promise((resolve) => {
producer.on('ready', () => resolve(undefined));
});
};

const publish = (topic: string, message: string) => {
const result = `{"topic":"${topic}",${message}`;
log(result, topic);
return new Promise((resolve) => {
producer.send(
[{
topic: config.resultTopic,
messages: [result],
partition: config.partition,
}],
(err) => {
if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); }
Expand Down
9 changes: 5 additions & 4 deletions typology-28/src/scoring-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class typology28Type {
}

// Composed probability for typology 28 = (009.p)*(012.p)*(014.p+018.p+030.p+032.p+078.p)
const handleScores = (scores: any, topic: string, TransactionID: string) => {
const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => {
const score =
(scores.rule2 ? 0.14 : 0)
+ (scores.rule12 ? 0.14 : 0)
Expand All @@ -30,7 +30,8 @@ const handleScores = (scores: any, topic: string, TransactionID: string) => {
+ (scores.rule64 ? 0.15 : 0)
;

publish(topic, `"typology":"typology-28","transactionID":"${TransactionID}","score":${score},"textResult":"Typology 28 score is ${score}, Reason: ${(scores.rule2 ? 'Velocity (incoming), ' : '')
publish(topic, `"typology":"typology-28","transactionID":"${TransactionID}","createDate":"${transactionDate}",
"score":${score},"textResult":"Typology 28 score is ${score}, Reason: ${(scores.rule2 ? 'Velocity (incoming), ' : '')
+ (scores.rule12 ? 'Party Type Individual, ' : '')
+ (scores.rule16 ? 'Transaction Convergence, ' : '')
+ (scores.rule27 ? 'Transaction Mirroring, ' : '')
Expand All @@ -49,7 +50,7 @@ const handleQuoteMessage = async (
) => {
try {
const transfer = JSON.parse(message.value.toString());
const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress } = transfer;
const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress, HTTPTransactionDate } = transfer;
const sourceHistoricalSendDataJSON = await get(senderClient, ILPSourceAccountAddress);
const payeeHistoricalReceiveDataJSON = await get(receiverClient, ILPDestinationAccountAddress);
const payeeHistoricalSendDataJSON = await get(senderClient, ILPDestinationAccountAddress);
Expand Down Expand Up @@ -78,7 +79,7 @@ const handleQuoteMessage = async (
log(`Error while handling Benford's Law ${TransactionID}, with message: \r\n${error}`, topic)
}

handleScores(scores, topic, TransactionID);
handleScores(scores, topic, TransactionID, HTTPTransactionDate);
} catch (e) {
console.error(e);
}
Expand Down

0 comments on commit d941b72

Please sign in to comment.