diff --git a/typology-11/src/producer.ts b/typology-11/src/producer.ts index 8f274cc0c..e04219544 100644 --- a/typology-11/src/producer.ts +++ b/typology-11/src/producer.ts @@ -7,7 +7,7 @@ let producer: kafka.Producer; const initializeProducer = () => { producer = new kafka.Producer(new kafka.KafkaClient({ kafkaHost: config.kafkaEndpoint, - }), {}); + }), {partitionerType: 2}); return new Promise((resolve) => { producer.on('ready', () => resolve(undefined)); }); @@ -15,12 +15,11 @@ const initializeProducer = () => { const publish = (topic: string, message: string) => { const result = `{"topic":"${topic}",${message}`; - log(result, topic); return new Promise((resolve) => { producer.send( [{ topic: config.resultTopic, - messages: [result] + messages: [result], }], (err) => { if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); } diff --git a/typology-11/src/scoring-process.ts b/typology-11/src/scoring-process.ts index c143dc812..5e5eaa1d7 100644 --- a/typology-11/src/scoring-process.ts +++ b/typology-11/src/scoring-process.ts @@ -16,14 +16,15 @@ class typology11Type { } // https://lextego.atlassian.net/browse/ACTIO-198 -const handleScores = (scores: any, topic: string, TransactionID: string) => { +const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => { const score = (scores.rule17 ? 0.25 : 0) + (scores.rule27 ? 0.25 : 0) + (scores.rule86 ? 0.25 : 0) + (scores.rule87 ? 0.25 : 0); - publish(topic, `"typology":"typology-11","transactionID":"${TransactionID}","score":"${score}","textResult":"Typology 11 score is ${score}, Reason: ${ + publish(topic, `"typology":"typology-11","transactionID":"${TransactionID}","score":"${score}","createDate":"${transactionDate}", + "textResult":"Typology 11 score is ${score}, Reason: ${ + (scores.rule17 ? 'Transaction Divergence, ' : '') + (scores.rule27 ? 'Transaction Mirroring, ' : '') + (scores.rule86 ? 'Transaction Between Parties, ' : '') @@ -40,7 +41,7 @@ const handleQuoteMessage = async ( ) => { try { const transfer = JSON.parse(message.value.toString()); - const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress } = transfer; + const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress, HTTPTransactionDate } = transfer; const sourceHistoricalSendDataJSON = await get(senderClient, ILPSourceAccountAddress); const payeeHistoricalReceiveDataJSON = await get(receiverClient, ILPDestinationAccountAddress); const payeeHistoricalSendDataJSON = await get(senderClient, ILPDestinationAccountAddress); @@ -68,7 +69,7 @@ const handleQuoteMessage = async ( log(`Error while handling Co-located Parties ${TransactionID}, with message: \r\n${error}`, topic) } - handleScores(scores, topic, TransactionID); + handleScores(scores, topic, TransactionID, HTTPTransactionDate); } catch (e) { console.error(e); } diff --git a/typology-214/src/producer.ts b/typology-214/src/producer.ts index e0d65b3ed..e04219544 100644 --- a/typology-214/src/producer.ts +++ b/typology-214/src/producer.ts @@ -7,7 +7,7 @@ let producer: kafka.Producer; const initializeProducer = () => { producer = new kafka.Producer(new kafka.KafkaClient({ kafkaHost: config.kafkaEndpoint, - }), {}); + }), {partitionerType: 2}); return new Promise((resolve) => { producer.on('ready', () => resolve(undefined)); }); @@ -15,13 +15,11 @@ const initializeProducer = () => { const publish = (topic: string, message: string) => { const result = `{"topic":"${topic}",${message}`; - log(result, topic); return new Promise((resolve) => { producer.send( [{ topic: config.resultTopic, messages: [result], - partition: config.partition, }], (err) => { if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); } diff --git a/typology-214/src/scoring-process.ts b/typology-214/src/scoring-process.ts index f2df166b9..baf25a084 100644 --- a/typology-214/src/scoring-process.ts +++ b/typology-214/src/scoring-process.ts @@ -14,7 +14,7 @@ class typology214Type { } // Composed probability for typology 214 = (012.p+027.p+030.p+048.p+078.p) -const handleScores = (scores: any, topic: string, TransactionID: string) => { +const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => { const score = (scores.rule12 ? 0.2 : 0) + (scores.rule27 ? 0.2 : 0) @@ -22,7 +22,8 @@ const handleScores = (scores: any, topic: string, TransactionID: string) => { + (scores.rule48 ? 0.2 : 0) + (scores.rule78 ? 0.2 : 0); - publish(topic, `"typology":"typology-214","transactionID":"${TransactionID}","score":"${score}","textResult":"Typology 214 score is ${score}, Reason: ${(scores.rule3 ? 'Account Dormancy - Payee, ' : '') + publish(topic, `"typology":"typology-214","transactionID":"${TransactionID}","score":"${score}","createDate":"${transactionDate}", + "textResult":"Typology 214 score is ${score}, Reason: ${(scores.rule3 ? 'Account Dormancy - Payee, ' : '') + (scores.rule12 ? 'Party Type Individual, ' : '') + (scores.rule27 ? 'Transaction Mirroring, ' : '') + (scores.rule30 ? 'New Payee, ' : '') @@ -40,7 +41,7 @@ const handleQuoteMessage = async ( ) => { try { const transfer = JSON.parse(message.value.toString()); - const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress } = transfer; + const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress, HTTPTransactionDate } = transfer; const sourceHistoricalSendDataJSON = await get(senderClient, ILPSourceAccountAddress); const payeeHistoricalReceiveDataJSON = await get(receiverClient, ILPDestinationAccountAddress); const payeeHistoricalSendDataJSON = await get(senderClient, ILPDestinationAccountAddress); @@ -72,7 +73,7 @@ const handleQuoteMessage = async ( log(`Error while handling Cash Withdraw ${TransactionID}, with message: \r\n${error}`, topic) } - handleScores(scores, topic, TransactionID); + handleScores(scores, topic, TransactionID, HTTPTransactionDate); } catch (e) { console.error(e); } diff --git a/typology-27/src/producer.ts b/typology-27/src/producer.ts index e0d65b3ed..e04219544 100644 --- a/typology-27/src/producer.ts +++ b/typology-27/src/producer.ts @@ -7,7 +7,7 @@ let producer: kafka.Producer; const initializeProducer = () => { producer = new kafka.Producer(new kafka.KafkaClient({ kafkaHost: config.kafkaEndpoint, - }), {}); + }), {partitionerType: 2}); return new Promise((resolve) => { producer.on('ready', () => resolve(undefined)); }); @@ -15,13 +15,11 @@ const initializeProducer = () => { const publish = (topic: string, message: string) => { const result = `{"topic":"${topic}",${message}`; - log(result, topic); return new Promise((resolve) => { producer.send( [{ topic: config.resultTopic, messages: [result], - partition: config.partition, }], (err) => { if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); } diff --git a/typology-27/src/scoring-process.ts b/typology-27/src/scoring-process.ts index be4bb93e7..15d3d0790 100644 --- a/typology-27/src/scoring-process.ts +++ b/typology-27/src/scoring-process.ts @@ -17,7 +17,7 @@ class typology27Type { } // Composed probability for typology 27 = (009.p)*(012.p)*(014.p+018.p+030.p+032.p+078.p) -const handleScores = (scores: any, topic: string, TransactionID: string) => { +const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => { const score = (scores.rule9 ? 1 : 0) * (scores.rule12 ? 1 : 0) @@ -29,7 +29,8 @@ const handleScores = (scores: any, topic: string, TransactionID: string) => { + (scores.rule78 ? 0.2 : 0) ); - publish(topic, `"typology":"typology-27","transactionID":"${TransactionID}","score":${score},"textResult":"Typology 27 score is ${score}, Reason: ${(scores.rule9 ? 'Recent Sim Swap, ' : '') + publish(topic, `"typology":"typology-27","transactionID":"${TransactionID}","score":${score},"createDate":"${transactionDate}", + "textResult":"Typology 27 score is ${score}, Reason: ${(scores.rule9 ? 'Recent Sim Swap, ' : '') + (scores.rule12 ? 'Party Type Individual, ' : '') + (scores.rule14 ? 'Recent Password Reset, ' : '') + (scores.rule18 ? 'Exceptionally Large Transfer, ' : '') @@ -47,10 +48,10 @@ const handleQuoteMessage = async ( ) => { try { const transfer = JSON.parse(message.value.toString()); - const { TransactionID, ILPSourceAccountAddress } = transfer; + const { TransactionID, ILPSourceAccountAddress, HTTPTransactionDate } = transfer; const ILPList = await get(client, ILPSourceAccountAddress); - const historicalData = JSON.parse(ILPList); + const historicalData = ILPList == undefined ? undefined : JSON.parse(ILPList); const scores: typology27Type = new typology27Type(); try { scores.rule9 = rules.handleRecentSimSwap({ transfer, historicalData }); } @@ -75,7 +76,7 @@ const handleQuoteMessage = async ( try { scores.rule78 = rules.handleCashWithdraw(transfer); } catch (error) { log(`Error while handling Cash Withdraw for transaction ${TransactionID}, with message: \r\n${error}`, topic) } - handleScores(scores, topic, TransactionID); + handleScores(scores, topic, TransactionID, HTTPTransactionDate); } catch (e) { console.error(e); } diff --git a/typology-28/src/producer.ts b/typology-28/src/producer.ts index e0d65b3ed..e04219544 100644 --- a/typology-28/src/producer.ts +++ b/typology-28/src/producer.ts @@ -7,7 +7,7 @@ let producer: kafka.Producer; const initializeProducer = () => { producer = new kafka.Producer(new kafka.KafkaClient({ kafkaHost: config.kafkaEndpoint, - }), {}); + }), {partitionerType: 2}); return new Promise((resolve) => { producer.on('ready', () => resolve(undefined)); }); @@ -15,13 +15,11 @@ const initializeProducer = () => { const publish = (topic: string, message: string) => { const result = `{"topic":"${topic}",${message}`; - log(result, topic); return new Promise((resolve) => { producer.send( [{ topic: config.resultTopic, messages: [result], - partition: config.partition, }], (err) => { if (err) { log(`Error while sending result of blocking with message: \r\n${err}`, topic); } diff --git a/typology-28/src/scoring-process.ts b/typology-28/src/scoring-process.ts index 4bc3d89f5..71cab3d70 100644 --- a/typology-28/src/scoring-process.ts +++ b/typology-28/src/scoring-process.ts @@ -19,7 +19,7 @@ class typology28Type { } // Composed probability for typology 28 = (009.p)*(012.p)*(014.p+018.p+030.p+032.p+078.p) -const handleScores = (scores: any, topic: string, TransactionID: string) => { +const handleScores = (scores: any, topic: string, TransactionID: string, transactionDate: string) => { const score = (scores.rule2 ? 0.14 : 0) + (scores.rule12 ? 0.14 : 0) @@ -30,7 +30,8 @@ const handleScores = (scores: any, topic: string, TransactionID: string) => { + (scores.rule64 ? 0.15 : 0) ; - publish(topic, `"typology":"typology-28","transactionID":"${TransactionID}","score":${score},"textResult":"Typology 28 score is ${score}, Reason: ${(scores.rule2 ? 'Velocity (incoming), ' : '') + publish(topic, `"typology":"typology-28","transactionID":"${TransactionID}","createDate":"${transactionDate}", + "score":${score},"textResult":"Typology 28 score is ${score}, Reason: ${(scores.rule2 ? 'Velocity (incoming), ' : '') + (scores.rule12 ? 'Party Type Individual, ' : '') + (scores.rule16 ? 'Transaction Convergence, ' : '') + (scores.rule27 ? 'Transaction Mirroring, ' : '') @@ -49,7 +50,7 @@ const handleQuoteMessage = async ( ) => { try { const transfer = JSON.parse(message.value.toString()); - const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress } = transfer; + const { TransactionID, ILPSourceAccountAddress, ILPDestinationAccountAddress, HTTPTransactionDate } = transfer; const sourceHistoricalSendDataJSON = await get(senderClient, ILPSourceAccountAddress); const payeeHistoricalReceiveDataJSON = await get(receiverClient, ILPDestinationAccountAddress); const payeeHistoricalSendDataJSON = await get(senderClient, ILPDestinationAccountAddress); @@ -78,7 +79,7 @@ const handleQuoteMessage = async ( log(`Error while handling Benford's Law ${TransactionID}, with message: \r\n${error}`, topic) } - handleScores(scores, topic, TransactionID); + handleScores(scores, topic, TransactionID, HTTPTransactionDate); } catch (e) { console.error(e); }