From fdbc7f35d11fdbe4007a9659b25a30ae5a6749e5 Mon Sep 17 00:00:00 2001
From: jfoley <johannes.foley@sybrin.com>
Date: Fri, 22 Jan 2021 15:48:38 +0200
Subject: [PATCH] Cleanup to not log every result; added create date to
 typology result.

---
 typology-11/src/producer.ts         |  5 ++---
 typology-11/src/scoring-process.ts  |  9 +++++----
 typology-214/src/producer.ts        |  4 +---
 typology-214/src/scoring-process.ts |  9 +++++----
 typology-27/src/producer.ts         |  4 +---
 typology-27/src/scoring-process.ts  | 11 ++++++-----
 typology-28/src/producer.ts         |  4 +---
 typology-28/src/scoring-process.ts  |  9 +++++----
 8 files changed, 26 insertions(+), 29 deletions(-)

diff --git a/typology-11/src/producer.ts b/typology-11/src/producer.ts
index 8f274cc0c..e04219544 100644
--- a/typology-11/src/producer.ts
+++ b/typology-11/src/producer.ts
@@ -7,7 +7,7 @@ let producer: kafka.Producer;
 const initializeProducer = () => {
   producer = new kafka.Producer(new kafka.KafkaClient({
     kafkaHost: config.kafkaEndpoint,
-  }), {});
+  }), {partitionerType: 2});
   return new Promise((resolve) => {
     producer.on('ready', () => resolve(undefined));
   });
@@ -15,12 +15,11 @@ const initializeProducer = () => {
 
 const publish = (topic: string, message: string) => {
   const result = `{"topic":"${topic}",${message}`;
-  log(result, topic);
   return new Promise((resolve) => {
     producer.send(
       [{
         topic: config.resultTopic,
-        messages: [result]
+        messages: [result],
       }],
       (err) => {
         if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); }
diff --git a/typology-11/src/scoring-process.ts b/typology-11/src/scoring-process.ts
index c143dc812..5e5eaa1d7 100644
--- a/typology-11/src/scoring-process.ts
+++ b/typology-11/src/scoring-process.ts
@@ -16,14 +16,15 @@ class typology11Type {
 }
 
 // https://lextego.atlassian.net/browse/ACTIO-198
-const handleScores = (scores: any, topic: string, TransactionID: string) => {
+const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => {
   const score =
     (scores.rule17 ? 0.25 : 0)
     + (scores.rule27 ? 0.25 : 0)
     + (scores.rule86 ? 0.25 : 0)
     + (scores.rule87 ? 0.25 : 0);
 
-  publish(topic, `"typology":"typology-11","transactionID":"${TransactionID}","score":"${score}","textResult":"Typology 11 score is ${score}, Reason: ${
+  publish(topic, `"typology":"typology-11","transactionID":"${TransactionID}","score":"${score}","createDate":"${transactionDate}",
+    "textResult":"Typology 11 score is ${score}, Reason: ${
     + (scores.rule17 ? 'Transaction Divergence, ' : '')
     + (scores.rule27 ? 'Transaction Mirroring, ' : '')
     + (scores.rule86 ? 'Transaction Between Parties, ' : '')
@@ -40,7 +41,7 @@ const handleQuoteMessage = async (
 ) => {
   try {
     const transfer = JSON.parse(message.value.toString());
-    const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress } = transfer;
+    const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress, HTTPTransactionDate } = transfer;
     const sourceHistoricalSendDataJSON = await get(senderClient, ILPSourceAccountAddress);
     const payeeHistoricalReceiveDataJSON = await get(receiverClient, ILPDestinationAccountAddress);
     const payeeHistoricalSendDataJSON = await get(senderClient, ILPDestinationAccountAddress);
@@ -68,7 +69,7 @@ const handleQuoteMessage = async (
       log(`Error while handling Co-located Parties ${TransactionID}, with message: \r\n${error}`, topic)
     }
 
-    handleScores(scores, topic, TransactionID);
+    handleScores(scores, topic, TransactionID, HTTPTransactionDate);
   } catch (e) {
     console.error(e);
   }
diff --git a/typology-214/src/producer.ts b/typology-214/src/producer.ts
index e0d65b3ed..e04219544 100644
--- a/typology-214/src/producer.ts
+++ b/typology-214/src/producer.ts
@@ -7,7 +7,7 @@ let producer: kafka.Producer;
 const initializeProducer = () => {
   producer = new kafka.Producer(new kafka.KafkaClient({
     kafkaHost: config.kafkaEndpoint,
-  }), {});
+  }), {partitionerType: 2});
   return new Promise((resolve) => {
     producer.on('ready', () => resolve(undefined));
   });
@@ -15,13 +15,11 @@ const initializeProducer = () => {
 
 const publish = (topic: string, message: string) => {
   const result = `{"topic":"${topic}",${message}`;
-  log(result, topic);
   return new Promise((resolve) => {
     producer.send(
       [{
         topic: config.resultTopic,
         messages: [result],
-        partition: config.partition,
       }],
       (err) => {
         if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); }
diff --git a/typology-214/src/scoring-process.ts b/typology-214/src/scoring-process.ts
index f2df166b9..baf25a084 100644
--- a/typology-214/src/scoring-process.ts
+++ b/typology-214/src/scoring-process.ts
@@ -14,7 +14,7 @@ class typology214Type {
 }
 
 // Composed probability for typology 214 = (012.p+027.p+030.p+048.p+078.p)
-const handleScores = (scores: any, topic: string, TransactionID: string) => {
+const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => {
   const score =
     (scores.rule12 ? 0.2 : 0)
     + (scores.rule27 ? 0.2 : 0)
@@ -22,7 +22,8 @@ const handleScores = (scores: any, topic: string, TransactionID: string) => {
     + (scores.rule48 ? 0.2 : 0)
     + (scores.rule78 ? 0.2 : 0);
 
-  publish(topic, `"typology":"typology-214","transactionID":"${TransactionID}","score":"${score}","textResult":"Typology 214 score is ${score}, Reason: ${(scores.rule3 ? 'Account Dormancy - Payee, ' : '')
+  publish(topic, `"typology":"typology-214","transactionID":"${TransactionID}","score":"${score}","createDate":"${transactionDate}",
+  "textResult":"Typology 214 score is ${score}, Reason: ${(scores.rule3 ? 'Account Dormancy - Payee, ' : '')
     + (scores.rule12 ? 'Party Type Individual, ' : '')
     + (scores.rule27 ? 'Transaction Mirroring, ' : '')
     + (scores.rule30 ? 'New Payee, ' : '')
@@ -40,7 +41,7 @@ const handleQuoteMessage = async (
 ) => {
   try {
     const transfer = JSON.parse(message.value.toString());
-    const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress } = transfer;
+    const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress, HTTPTransactionDate } = transfer;
     const sourceHistoricalSendDataJSON = await get(senderClient, ILPSourceAccountAddress);
     const payeeHistoricalReceiveDataJSON = await get(receiverClient, ILPDestinationAccountAddress);
     const payeeHistoricalSendDataJSON = await get(senderClient, ILPDestinationAccountAddress);
@@ -72,7 +73,7 @@ const handleQuoteMessage = async (
       log(`Error while handling Cash Withdraw ${TransactionID}, with message: \r\n${error}`, topic)
     }
 
-    handleScores(scores, topic, TransactionID);
+    handleScores(scores, topic, TransactionID, HTTPTransactionDate);
   } catch (e) {
     console.error(e);
   }
diff --git a/typology-27/src/producer.ts b/typology-27/src/producer.ts
index e0d65b3ed..e04219544 100644
--- a/typology-27/src/producer.ts
+++ b/typology-27/src/producer.ts
@@ -7,7 +7,7 @@ let producer: kafka.Producer;
 const initializeProducer = () => {
   producer = new kafka.Producer(new kafka.KafkaClient({
     kafkaHost: config.kafkaEndpoint,
-  }), {});
+  }), {partitionerType: 2});
   return new Promise((resolve) => {
     producer.on('ready', () => resolve(undefined));
   });
@@ -15,13 +15,11 @@ const initializeProducer = () => {
 
 const publish = (topic: string, message: string) => {
   const result = `{"topic":"${topic}",${message}`;
-  log(result, topic);
   return new Promise((resolve) => {
     producer.send(
       [{
         topic: config.resultTopic,
         messages: [result],
-        partition: config.partition,
       }],
       (err) => {
         if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); }
diff --git a/typology-27/src/scoring-process.ts b/typology-27/src/scoring-process.ts
index be4bb93e7..15d3d0790 100644
--- a/typology-27/src/scoring-process.ts
+++ b/typology-27/src/scoring-process.ts
@@ -17,7 +17,7 @@ class typology27Type {
 }
 
 // Composed probability for typology 27 = (009.p)*(012.p)*(014.p+018.p+030.p+032.p+078.p)
-const handleScores = (scores: any, topic: string, TransactionID: string) => {
+const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => {
   const score =
     (scores.rule9 ? 1 : 0)
     * (scores.rule12 ? 1 : 0)
@@ -29,7 +29,8 @@ const handleScores = (scores: any, topic: string, TransactionID: string) => {
       + (scores.rule78 ? 0.2 : 0)
     );
 
-  publish(topic, `"typology":"typology-27","transactionID":"${TransactionID}","score":${score},"textResult":"Typology 27 score is ${score}, Reason: ${(scores.rule9 ? 'Recent Sim Swap, ' : '')
+  publish(topic, `"typology":"typology-27","transactionID":"${TransactionID}","score":${score},"createDate":"${transactionDate}",
+  "textResult":"Typology 27 score is ${score}, Reason: ${(scores.rule9 ? 'Recent Sim Swap, ' : '')
     + (scores.rule12 ? 'Party Type Individual, ' : '')
     + (scores.rule14 ? 'Recent Password Reset, ' : '')
     + (scores.rule18 ? 'Exceptionally Large Transfer, ' : '')
@@ -47,10 +48,10 @@ const handleQuoteMessage = async (
 ) => {
   try {
     const transfer = JSON.parse(message.value.toString());
-    const { TransactionID, ILPSourceAccountAddress } = transfer;
+    const { TransactionID, ILPSourceAccountAddress, HTTPTransactionDate } = transfer;
 
     const ILPList = await get(client, ILPSourceAccountAddress);
-    const historicalData = JSON.parse(ILPList);
+    const historicalData = ILPList == undefined ? undefined : JSON.parse(ILPList);
     const scores: typology27Type = new typology27Type();
 
     try { scores.rule9 = rules.handleRecentSimSwap({ transfer, historicalData }); }
@@ -75,7 +76,7 @@ const handleQuoteMessage = async (
     try { scores.rule78 = rules.handleCashWithdraw(transfer); } catch (error) {
       log(`Error while handling Cash Withdraw for transaction ${TransactionID}, with message: \r\n${error}`, topic)
     }
-    handleScores(scores, topic, TransactionID);
+    handleScores(scores, topic, TransactionID, HTTPTransactionDate);
   } catch (e) {
     console.error(e);
   }
diff --git a/typology-28/src/producer.ts b/typology-28/src/producer.ts
index e0d65b3ed..e04219544 100644
--- a/typology-28/src/producer.ts
+++ b/typology-28/src/producer.ts
@@ -7,7 +7,7 @@ let producer: kafka.Producer;
 const initializeProducer = () => {
   producer = new kafka.Producer(new kafka.KafkaClient({
     kafkaHost: config.kafkaEndpoint,
-  }), {});
+  }), {partitionerType: 2});
   return new Promise((resolve) => {
     producer.on('ready', () => resolve(undefined));
   });
@@ -15,13 +15,11 @@ const initializeProducer = () => {
 
 const publish = (topic: string, message: string) => {
   const result = `{"topic":"${topic}",${message}`;
-  log(result, topic);
   return new Promise((resolve) => {
     producer.send(
       [{
         topic: config.resultTopic,
         messages: [result],
-        partition: config.partition,
       }],
       (err) => {
         if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); }
diff --git a/typology-28/src/scoring-process.ts b/typology-28/src/scoring-process.ts
index 4bc3d89f5..71cab3d70 100644
--- a/typology-28/src/scoring-process.ts
+++ b/typology-28/src/scoring-process.ts
@@ -19,7 +19,7 @@ class typology28Type {
 }
 
 // Composed probability for typology 28 = (009.p)*(012.p)*(014.p+018.p+030.p+032.p+078.p)
-const handleScores = (scores: any, topic: string, TransactionID: string) => {
+const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => {
   const score =
     (scores.rule2 ? 0.14 : 0)
     + (scores.rule12 ? 0.14 : 0)
@@ -30,7 +30,8 @@ const handleScores = (scores: any, topic: string, TransactionID: string) => {
     + (scores.rule64 ? 0.15 : 0)
     ;
 
-  publish(topic, `"typology":"typology-28","transactionID":"${TransactionID}","score":${score},"textResult":"Typology 28 score is ${score}, Reason: ${(scores.rule2 ? 'Velocity (incoming), ' : '')
+  publish(topic, `"typology":"typology-28","transactionID":"${TransactionID}","createDate":"${transactionDate}",
+  "score":${score},"textResult":"Typology 28 score is ${score}, Reason: ${(scores.rule2 ? 'Velocity (incoming), ' : '')
     + (scores.rule12 ? 'Party Type Individual, ' : '')
     + (scores.rule16 ? 'Transaction Convergence, ' : '')
     + (scores.rule27 ? 'Transaction Mirroring, ' : '')
@@ -49,7 +50,7 @@ const handleQuoteMessage = async (
 ) => {
   try {
     const transfer = JSON.parse(message.value.toString());
-    const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress } = transfer;
+    const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress, HTTPTransactionDate } = transfer;
     const sourceHistoricalSendDataJSON = await get(senderClient, ILPSourceAccountAddress);
     const payeeHistoricalReceiveDataJSON = await get(receiverClient, ILPDestinationAccountAddress);
     const payeeHistoricalSendDataJSON = await get(senderClient, ILPDestinationAccountAddress);
@@ -78,7 +79,7 @@ const handleQuoteMessage = async (
       log(`Error while handling Benford's Law ${TransactionID}, with message: \r\n${error}`, topic)
     }
 
-    handleScores(scores, topic, TransactionID);
+    handleScores(scores, topic, TransactionID, HTTPTransactionDate);
   } catch (e) {
     console.error(e);
   }