diff --git a/dp-core/src/main/resources/base-config.conf b/dp-core/src/main/resources/base-config.conf
index 655281703..a0d2e4ec2 100644
--- a/dp-core/src/main/resources/base-config.conf
+++ b/dp-core/src/main/resources/base-config.conf
@@ -50,4 +50,10 @@ lms-cassandra {
host = "localhost"
port = "9042"
isMultiDCEnabled = false
-}
\ No newline at end of file
+}
+
+ml-cassandra {
+ host = "localhost"
+ port = "9042"
+ isMultiDCEnabled = false
+}
diff --git a/dp-core/src/test/resources/base-test.conf b/dp-core/src/test/resources/base-test.conf
index bb9c37a0c..40033c209 100644
--- a/dp-core/src/test/resources/base-test.conf
+++ b/dp-core/src/test/resources/base-test.conf
@@ -56,3 +56,9 @@ lms-cassandra {
port = 9142
isMultiDCEnabled = false
}
+
+ml-cassandra {
+ host = "localhost"
+ port = 9142
+ isMultiDCEnabled = false
+}
diff --git a/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml b/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml
index 46df288ff..0cbc79a44 100644
--- a/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml
+++ b/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml
@@ -120,6 +120,13 @@ middleware_cassandra_user_activity_agg_table: user_activity_agg
user_cache_updater_job_consumer_parallelism: 1
user_cache_updater_job_parallelism: 1
+### Program-User-Info Job vars
+programuserinfo_consumer_parallelism: 1
+programuserinfo_downstream_parallelism: 1
+programuserinfo_programuser_parallelism: 1
+middleware_cassandra_programuserinfo_keyspace: sunbird_programs
+middleware_cassandra_programuserinfo_table: program_enrollment
+
### to be removed
job_classname: ""
@@ -194,6 +201,16 @@ flink_job_names:
taskslots: 1
cpu_requests: 0.3
scale_enabled: false
+ program-user-info:
+ job_class_name: 'org.sunbird.dp.userinfo.task.ProgramUserInfoStreamTask'
+ replica: 1
+ jobmanager_memory: 1024m
+ taskmanager_memory: 1024m
+ taskmanager_process_memory: 1700m
+ jobmanager_process_memory: 1600m
+ taskslots: 1
+ cpu_requests: 0.3
+ scale_enabled: false
### Global vars
@@ -205,3 +222,4 @@ service_monitor_enabled: true
### controlling the flink jobs log level
flink_jobs_console_log_level: INFO
flink_libraries_log_level: ERROR
+
diff --git a/kubernetes/helm_charts/datapipeline_jobs/values.j2 b/kubernetes/helm_charts/datapipeline_jobs/values.j2
index c58630bd4..e57c455be 100644
--- a/kubernetes/helm_charts/datapipeline_jobs/values.j2
+++ b/kubernetes/helm_charts/datapipeline_jobs/values.j2
@@ -20,6 +20,11 @@ scale_properties:
scale_target_value: {{ flink_job_names['assessment-aggregator'].scale_target_value | default(0) }}
min_replica: {{ flink_job_names['assessment-aggregator'].min_replica | default(1) }}
max_replica: {{ flink_job_names['assessment-aggregator'].max_replica | default(2) }}
+ program-user-info:
+ enabled: {{ flink_job_names['program-user-info'].scale_enabled | lower}}
+ scale_target_value: {{ flink_job_names['program-user-info'].scale_target_value | default(0) }}
+ min_replica: {{ flink_job_names['program-user-info'].min_replica | default(1) }}
+ max_replica: {{ flink_job_names['program-user-info'].max_replica | default(2) }}
jobmanager:
rpc_port: {{ jobmanager_rpc_port }}
@@ -534,3 +539,34 @@ user-cache-updater-v2:
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.fraction: 0.1
+program-user-info:
+ program-user-info: |+
+ include file("/data/flink/conf/base-config.conf")
+ kafka {
+ producer.broker-servers = "{{ kafka_brokers }}"
+ consumer.broker-servers = "{{ kafka_brokers }}"
+ zookeeper = "{{ zookeepers }}"
+ input.topic = {{ kafka_topic_programuser_info }}
+ groupId = {{ kafka_group_programuser_info }}
+ }
+ task {
+ consumer.parallelism = {{ programuserinfo_consumer_parallelism }}
+ downstream.parallelism = {{ programuserinfo_downstream_parallelism }}
+ programuser {
+ parallelism = {{ programuserinfo_programuser_parallelism }}
+ }
+
+ }
+ ml-cassandra {
+ keyspace = "{{ middleware_cassandra_programuserinfo_keyspace }}"
+ table = "{{ middleware_cassandra_programuserinfo_table }}"
+ }
+
+ flink-conf: |+
+ jobmanager.memory.flink.size: {{ flink_job_names['program-user-info'].jobmanager_memory }}
+ taskmanager.memory.flink.size: {{ flink_job_names['program-user-info'].taskmanager_memory }}
+ taskmanager.numberOfTaskSlots: {{ flink_job_names['program-user-info'].taskslots }}
+ parallelism.default: 1
+ jobmanager.execution.failover-strategy: region
+ taskmanager.memory.network.fraction: 0.1
+
diff --git a/ml-jobs/pom.xml b/ml-jobs/pom.xml
new file mode 100644
index 000000000..bc0a21858
--- /dev/null
+++ b/ml-jobs/pom.xml
@@ -0,0 +1,51 @@
+
+ 4.0.0
+
+
+ org.sunbird
+ data-pipeline
+ 1.0
+ ../pom.xml
+
+
+ ml-jobs
+ pom
+ ml-jobs
+
+
+
+ program-user-info
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ 11
+ 11
+
+
+
+ org.scoverage
+ scoverage-maven-plugin
+ ${scoverage.plugin.version}
+
+ ${scala.version}
+ true
+ true
+ org.sunbird.incredible
+
+
+
+
+
+
+
+
+
diff --git a/ml-jobs/program-user-info/pom.xml b/ml-jobs/program-user-info/pom.xml
new file mode 100644
index 000000000..995174d31
--- /dev/null
+++ b/ml-jobs/program-user-info/pom.xml
@@ -0,0 +1,221 @@
+
+
+
+ 4.0.0
+
+ org.sunbird
+ data-pipeline
+ 1.0
+ ../../pom.xml
+
+ program-user-info
+ 1.0.0
+ jar
+ ProgramUserInfo
+
+ Job to insert user pii events details to cassandra
+
+
+
+ UTF-8
+ 1.4.0
+
+
+
+
+ org.apache.flink
+ flink-clients_${scala.version}
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-streaming-scala_${scala.version}
+ ${flink.version}
+ provided
+
+
+ org.sunbird
+ dp-core
+ 1.0.0
+
+
+ org.sunbird
+ dp-core
+ 1.0.0
+ test-jar
+ test
+
+
+ org.apache.flink
+ flink-test-utils_${scala.version}
+ ${flink.version}
+ test
+
+
+ org.apache.flink
+ flink-streaming-java_${scala.version}
+ ${flink.version}
+ test
+ tests
+
+
+ org.scalatest
+ scalatest_${scala.version}
+ 3.0.6
+ test
+
+
+ org.mockito
+ mockito-core
+ 3.3.3
+ test
+
+
+ com.opentable.components
+ otj-pg-embedded
+ 0.13.3
+ test
+
+
+ com.fiftyonred
+ mock-jedis
+ 0.4.0
+ test
+
+
+ org.cassandraunit
+ cassandra-unit
+ 3.11.2.0
+ test
+
+
+
+
+ src/main/scala
+ src/test/scala
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ 11
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.1
+
+
+
+ package
+
+ shade
+
+
+
+
+ com.google.code.findbugs:jsr305
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+ org.sunbird.dp.userinfo.task.ProgramUserInfoStreamTask
+
+
+
+ reference.conf
+
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 4.4.0
+
+ ${java.target.runtime}
+ ${java.target.runtime}
+ ${scala.maj.version}
+ false
+
+
+
+ scala-compile-first
+ process-resources
+
+ add-source
+ compile
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+
+
+ maven-surefire-plugin
+ 2.22.2
+
+ true
+
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+ 1.0
+
+ ${project.build.directory}/surefire-reports
+ .
+ program-user-info-testsuite.txt
+
+
+
+ test
+
+ test
+
+
+
+
+
+ org.scoverage
+ scoverage-maven-plugin
+ ${scoverage.plugin.version}
+
+ ${scala.version}
+ true
+ true
+
+
+
+
+
+
+
diff --git a/ml-jobs/program-user-info/src/main/resources/program-user-info.conf b/ml-jobs/program-user-info/src/main/resources/program-user-info.conf
new file mode 100644
index 000000000..8a811d27e
--- /dev/null
+++ b/ml-jobs/program-user-info/src/main/resources/program-user-info.conf
@@ -0,0 +1,20 @@
+include "base-config.conf"
+
+kafka {
+ input.topic = ${job.env}".programuser.info"
+ groupId = ${job.env}"-programuser-group"
+}
+
+task {
+ consumer.parallelism = 1
+ downstream.parallelism = 1
+ programuser{
+ parallelism = 1
+ }
+}
+
+ml-cassandra {
+ keyspace = "sunbird_programs"
+ table = "program_enrollment"
+}
+
diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala
new file mode 100644
index 000000000..11ff07165
--- /dev/null
+++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala
@@ -0,0 +1,52 @@
+package org.sunbird.dp.userinfo.domain
+
+import java.util
+import org.sunbird.dp.core.domain.Events
+
+/**
+ * Typed accessors over a program-user event payload.
+ *
+ * String accessors return `null` when the field is missing, null or empty; collection
+ * accessors return `null` when the path cannot be read.
+ *
+ * @param eventMap raw event key/values handed to the base [[Events]] class
+ */
+class Event(eventMap: util.Map[String, Any]) extends Events(eventMap) {
+
+  /** Reads a string field by key, collapsing a missing, null or empty value to `null`. */
+  private def nonEmptyStringOrNull(key: String): String = {
+    val raw = telemetry.readOrDefault[String](key, null)
+    if (raw == null || raw.isEmpty) null else raw
+  }
+
+  /** Value of the `_id` field. */
+  def _id: String = nonEmptyStringOrNull("_id")
+
+  /** Value of the `programId` field. */
+  def program_id: String = nonEmptyStringOrNull("programId")
+
+  /** Value of the `programExternalId` field. */
+  def program_externalId: String = nonEmptyStringOrNull("programExternalId")
+
+  /** Value of the `programName` field. */
+  def program_name: String = nonEmptyStringOrNull("programName")
+
+  /** Value of the `requestForPIIConsent` flag; `false` is the default passed to the reader. */
+  def pii_consent_required: Boolean = {
+    val consentRequired = telemetry.readOrDefault[Boolean]("requestForPIIConsent", false)
+    consentRequired
+  }
+
+  /** Value of the `userId` field. */
+  def user_id: String = nonEmptyStringOrNull("userId")
+
+  /** Raw `createdAt` timestamp string. */
+  def created_at_string: String = nonEmptyStringOrNull("createdAt")
+
+  /** Raw `updatedAt` timestamp string. */
+  def updated_at_string: String = nonEmptyStringOrNull("updatedAt")
+
+  /** Entries under `userProfile.userLocations`, or `null` when unreadable. */
+  def user_Location: util.ArrayList[util.Map[String, Any]] = {
+    val locations = telemetry.read[util.ArrayList[util.Map[String, Any]]]("userProfile.userLocations")
+    locations.getOrElse(null)
+  }
+
+  /** Entries under `userProfile.profileUserTypes`, or `null` when unreadable. */
+  def user_Types: util.ArrayList[util.Map[String, Any]] = {
+    val userTypes = telemetry.read[util.ArrayList[util.Map[String, Any]]]("userProfile.profileUserTypes")
+    userTypes.getOrElse(null)
+  }
+
+  /** Map under `userProfile.rootOrg`, or `null` when unreadable. */
+  def organisations: util.Map[String, Any] = {
+    val rootOrg = telemetry.read[util.Map[String, Any]]("userProfile.rootOrg")
+    rootOrg.getOrElse(null)
+  }
+}
+
diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala
new file mode 100644
index 000000000..2060d90ed
--- /dev/null
+++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala
@@ -0,0 +1,168 @@
+package org.sunbird.dp.userinfo.functions
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.slf4j.LoggerFactory
+import org.sunbird.dp.core.job.{BaseProcessFunction, Metrics}
+import org.sunbird.dp.core.util.CassandraUtil
+import org.sunbird.dp.userinfo.domain.Event
+import org.sunbird.dp.userinfo.task.ProgramUserInfoConfig
+import org.sunbird.dp.userinfo.util.Commons
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+
+/**
+ * Flattened view of one program-user event, built inside
+ * `ProgramUserInfoFunction.processElement` before the values are copied into the
+ * Cassandra insert map.
+ *
+ * All string fields may be `null` when the corresponding value is absent from the event.
+ *
+ * @param program_id           taken from the event's `program_id` accessor
+ * @param program_externalid   taken from the event's `program_externalId` accessor
+ * @param program_name         taken from the event's `program_name` accessor
+ * @param pii_consent_required taken from the event's `pii_consent_required` accessor
+ * @param user_id              taken from the event's `user_id` accessor
+ * @param organisation_id      `id` of the event's organisation map
+ * @param organisation_name    `orgName` of the event's organisation map
+ * @param user_sub_type        `subType` of the event's user-type entries
+ * @param user_type            `type` of the event's user-type entries
+ * @param created_at           creation date rendered as a string
+ * @param updated_at           last-update date rendered as a string
+ */
+case class Data(
+ program_id: String,
+ program_externalid: String,
+ program_name: String,
+ pii_consent_required: Boolean,
+ user_id: String,
+ organisation_id: String,
+ organisation_name: String,
+ user_sub_type: String,
+ user_type: String,
+ created_at: String,
+ updated_at: String
+ )
+
+/**
+ * Process function that flattens a program-user event and upserts it into the
+ * configured Cassandra keyspace/table.
+ *
+ * Events without a `programId` or `userId` are skipped (they form the primary key of
+ * the target row). Optional sections (locations, user types, root organisation,
+ * timestamps) are only written when present in the event.
+ *
+ * @param config        job configuration (Cassandra host/port/keyspace/table)
+ * @param cassandraUtil Cassandra helper; it is (re)created in `open`
+ */
+class ProgramUserInfoFunction(config: ProgramUserInfoConfig,
+                              @transient var cassandraUtil: CassandraUtil = null
+                             ) extends BaseProcessFunction[Event, Event](config) {
+
+  private[this] val logger = LoggerFactory.getLogger(classOf[ProgramUserInfoFunction])
+
+  override def metricsList() = List()
+
+  /** Opens the Cassandra connection used by this task instance. */
+  override def open(parameters: Configuration): Unit = {
+    super.open(parameters)
+    // NOTE(review): isMultiDCEnabled is not declared in ProgramUserInfoConfig; presumably it is
+    // inherited from the base job config - confirm it reflects the ml-cassandra settings.
+    cassandraUtil = new CassandraUtil(config.dbHost, config.dbPort, config.isMultiDCEnabled)
+  }
+
+  /** Releases the Cassandra connection created in `open` so that task restarts do not leak sessions. */
+  override def close(): Unit = {
+    try {
+      if (cassandraUtil != null) cassandraUtil.close()
+    } finally {
+      super.close()
+    }
+  }
+
+  /**
+   * This method will convert the incoming date-time string to required Date format "yyyy-MM-dd"
+   *
+   * @param dateStr date will be passed as a string in this format "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
+   * @return the calendar date written in `dateStr`
+   * @throws java.text.ParseException when `dateStr` does not match the expected format
+   * @throws java.lang.NullPointerException when `dateStr` is null
+   */
+  def getDateOnly(dateStr: String): java.sql.Date = {
+    val inputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+    val date = inputFormat.parse(dateStr)
+    val outputFormat = new SimpleDateFormat("yyyy-MM-dd")
+    val formattedDate = outputFormat.format(date)
+    java.sql.Date.valueOf(formattedDate)
+  }
+
+  /** Renders an optional timestamp as "yyyy-MM-dd"; a null timestamp yields null instead of throwing. */
+  private def toDateOnlyString(dateStr: String): String =
+    Option(dateStr).map(value => getDateOnly(value).toString).orNull
+
+  /** Returns the string form of `value` unless it is null or blank. */
+  private def nonBlank(value: Any): Option[String] =
+    Option(value).map(_.toString).filter(text => !text.isBlank)
+
+  /**
+   * Flattens the user location entries into "<type>_name", "<type>_id" and "<type>_code" keys.
+   * Entries without a usable `type`, and blank/null attributes, are ignored.
+   */
+  private def flattenUserLocations(locations: util.ArrayList[util.Map[String, Any]]): util.HashMap[String, String] = {
+    val flattened = new util.HashMap[String, String]
+    if (locations != null) {
+      locations.forEach(location => {
+        if (location != null) {
+          nonBlank(location.get("type")).foreach(locationType => {
+            nonBlank(location.get("name")).foreach(name => flattened.put(locationType + "_name", name))
+            nonBlank(location.get("id")).foreach(id => flattened.put(locationType + "_id", id))
+            nonBlank(location.get("code")).foreach(code => flattened.put(locationType + "_code", code))
+          })
+        }
+      })
+    }
+    flattened
+  }
+
+  /**
+   * Extracts "type" and "sub_type" from the user type entries; when several entries exist the
+   * last one wins. Empty values are stored as null. Null entries are skipped.
+   */
+  private def flattenUserTypes(userTypes: util.ArrayList[util.Map[String, Any]]): util.HashMap[String, String] = {
+    val flattened = new util.HashMap[String, String]
+    if (userTypes != null) {
+      userTypes.forEach(userType => {
+        if (userType != null) {
+          flattened.put("type", Option(userType.get("type")).map(_.toString).filter(_.nonEmpty).orNull)
+          flattened.put("sub_type", Option(userType.get("subType")).map(_.toString).filter(_.nonEmpty).orNull)
+        }
+      })
+    }
+    flattened
+  }
+
+  /** Returns `value` when it is a string with non-whitespace content, otherwise null. */
+  private def trimmedStringOrNull(value: Any): String = value match {
+    case s: String if s.trim.nonEmpty => s
+    case _ => null
+  }
+
+  /** Extracts "organisation_id" and "organisation_name" from the root organisation map. */
+  private def flattenOrganisation(rootOrg: util.Map[String, Any]): util.HashMap[String, String] = {
+    val flattened = new util.HashMap[String, String]
+    if (rootOrg != null) {
+      flattened.put("organisation_id", trimmedStringOrNull(rootOrg.get("id")))
+      flattened.put("organisation_name", trimmedStringOrNull(rootOrg.get("orgName")))
+    }
+    flattened
+  }
+
+  /**
+   * Flattens one event and upserts it into Cassandra.
+   *
+   * @param event   incoming program-user event
+   * @param context Flink process-function context (unused)
+   * @param metrics job metrics (unused)
+   */
+  override def processElement(event: Event,
+                              context: ProcessFunction[Event, Event]#Context,
+                              metrics: Metrics): Unit = {
+
+    // Check the primary-key fields before any parsing so that events which are going to be
+    // skipped anyway can never fail the job on an unrelated field.
+    if (event.program_id == null || event.user_id == null) {
+      logger.debug("This Event is skipped due to ProgramId or userId missing DocumentId:" + event._id)
+    } else {
+      // Read each optional section once; the accessors re-read the payload on every call.
+      val userLocations = event.user_Location
+      val userTypes = event.user_Types
+      val rootOrg = event.organisations
+
+      val userLocationData = flattenUserLocations(userLocations)
+      val userTypeData = flattenUserTypes(userTypes)
+      val organisationsData = flattenOrganisation(rootOrg)
+
+      /**
+       * Storing the parsed JSON event in to a single variable with the help of case class
+       */
+      val userData = Data(
+        program_id = event.program_id,
+        program_externalid = event.program_externalId,
+        program_name = event.program_name,
+        pii_consent_required = event.pii_consent_required,
+        user_id = event.user_id,
+        organisation_id = organisationsData.getOrDefault("organisation_id", null),
+        organisation_name = organisationsData.getOrDefault("organisation_name", null),
+        user_sub_type = userTypeData.getOrDefault("sub_type", null),
+        user_type = userTypeData.getOrDefault("type", null),
+        // A missing timestamp becomes null here instead of a NullPointerException.
+        created_at = toDateOnlyString(event.created_at_string),
+        updated_at = toDateOnlyString(event.updated_at_string)
+      )
+      logger.info("Successfully parsed JSON and stored back the required fields to single case class")
+
+      val insertData = new util.HashMap[String, AnyRef]
+      insertData.put("program_id", userData.program_id)
+      insertData.put("program_externalid", userData.program_externalid)
+      insertData.put("program_name", userData.program_name)
+      insertData.put("pii_consent_required", Boolean.box(userData.pii_consent_required))
+      insertData.put("user_id", userData.user_id)
+      if (userLocations != null) {
+        insertData.put("user_locations", userLocationData)
+      }
+      if (rootOrg != null) {
+        insertData.put("organisation_id", userData.organisation_id)
+        insertData.put("organisation_name", userData.organisation_name)
+      }
+      if (userTypes != null) {
+        insertData.put("user_sub_type", userData.user_sub_type)
+        insertData.put("user_type", userData.user_type)
+      }
+      // Only write the date columns that were actually present in the event.
+      if (userData.created_at != null) {
+        insertData.put("created_at", userData.created_at)
+      }
+      if (userData.updated_at != null) {
+        insertData.put("updated_at", userData.updated_at)
+      }
+
+      /**
+       * Insert flattened data into cassandra database
+       */
+      val commons = new Commons(cassandraUtil)
+      commons.insertDbRecord(config.dbKeyspace, config.dbTable, insertData)
+      logger.info("Successfully inserted the parsed events to Database with program_Id = " + userData.program_id)
+    }
+  }
+
+}
diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala
new file mode 100644
index 000000000..19c25f08a
--- /dev/null
+++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala
@@ -0,0 +1,28 @@
+package org.sunbird.dp.userinfo.task
+
+import com.typesafe.config.Config
+import org.sunbird.dp.core.job.BaseJobConfig
+
+/**
+ * Configuration for the ProgramUserInfo job: operator names plus the Kafka, parallelism
+ * and Cassandra settings read from the supplied config.
+ *
+ * @param config resolved job configuration
+ */
+class ProgramUserInfoConfig(override val config: Config) extends BaseJobConfig(config, jobName = "ProgramUserInfo") {
+
+  // Operator names / uids
+  val programUserConsumer = "program-user-consumer"
+  val programUserInfoFunction = "ProgramUserInfoFunction"
+
+  // Kafka input
+  val kafkaInputTopic: String = config.getString("kafka.input.topic")
+
+  // Parallelism of the Kafka source and of the processing function
+  override val kafkaConsumerParallelism: Int = config.getInt("task.consumer.parallelism")
+  val programUserParallelism: Int = config.getInt("task.programuser.parallelism")
+
+  // Cassandra connection and target table
+  // NOTE(review): ml-cassandra.isMultiDCEnabled is not read here - confirm the inherited flag is the intended one.
+  val dbHost: String = config.getString("ml-cassandra.host")
+  val dbPort: Int = config.getInt("ml-cassandra.port")
+  val dbKeyspace: String = config.getString("ml-cassandra.keyspace")
+  val dbTable: String = config.getString("ml-cassandra.table")
+
+}
diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoStreamTask.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoStreamTask.scala
new file mode 100644
index 000000000..6ea547263
--- /dev/null
+++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoStreamTask.scala
@@ -0,0 +1,47 @@
+package org.sunbird.dp.userinfo.task
+
+import org.sunbird.dp.core.job.FlinkKafkaConnector
+import com.typesafe.config.ConfigFactory
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.sunbird.dp.core.util.FlinkUtil
+import org.sunbird.dp.userinfo.domain.Event
+import org.sunbird.dp.userinfo.functions.ProgramUserInfoFunction
+
+import java.io.File
+
+/**
+ * Wires the ProgramUserInfo pipeline: Kafka source -> rebalance -> [[ProgramUserInfoFunction]].
+ *
+ * @param config         job configuration
+ * @param kafkaConnector factory for the Kafka event source
+ */
+class ProgramUserInfoStreamTask(config: ProgramUserInfoConfig, kafkaConnector: FlinkKafkaConnector) {
+
+  private val serialVersionUID = -7729362727131516112L
+
+  /** Builds the job graph and submits it under the configured job name. */
+  def process(): Unit = {
+    implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config)
+
+    val kafkaSource = kafkaConnector.kafkaEventSource[Event](config.kafkaInputTopic)
+
+    // Source stage: named/uid'd consumer, redistributed evenly across the downstream operator.
+    val programUserEvents = env
+      .addSource(kafkaSource, config.programUserConsumer)
+      .uid(config.programUserConsumer)
+      .setParallelism(config.kafkaConsumerParallelism)
+      .rebalance()
+
+    // Processing stage: flatten each event and write it out.
+    programUserEvents
+      .process(new ProgramUserInfoFunction(config))
+      .name(config.programUserInfoFunction)
+      .uid(config.programUserInfoFunction)
+      .setParallelism(config.programUserParallelism)
+
+    env.execute(config.jobName)
+  }
+}
+
+/** Entry point of the ProgramUserInfo job. */
+object ProgramUserInfoStreamTask {
+
+  /**
+   * Loads the job configuration and starts the stream task.
+   *
+   * @param args program arguments; `config.file.path` selects an external config file,
+   *             otherwise the `program-user-info` resource is loaded
+   */
+  def main(args: Array[String]): Unit = {
+    // $COVERAGE-OFF$
+    val config = Option(ParameterTool.fromArgs(args).get("config.file.path")) match {
+      case Some(path) =>
+        ConfigFactory.parseFile(new File(path)).resolve()
+      case None =>
+        ConfigFactory.load("program-user-info").withFallback(ConfigFactory.systemEnvironment())
+    }
+    val programUserInfoConfig = new ProgramUserInfoConfig(config)
+    val kafkaUtil = new FlinkKafkaConnector(programUserInfoConfig)
+    new ProgramUserInfoStreamTask(programUserInfoConfig, kafkaUtil).process()
+    // $COVERAGE-ON$
+  }
+
+}
diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/util/Commons.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/util/Commons.scala
new file mode 100644
index 000000000..2b1eb4285
--- /dev/null
+++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/util/Commons.scala
@@ -0,0 +1,30 @@
+package org.sunbird.dp.userinfo.util
+
+import com.datastax.driver.core.querybuilder.QueryBuilder
+import org.apache.commons.collections.MapUtils
+import org.sunbird.dp.core.util.CassandraUtil
+
+import java.util
+import java.util.Map
+
+/**
+ * Small Cassandra helper for the ProgramUserInfo job.
+ *
+ * @param cassandraUtil connection wrapper used to execute the generated statements
+ */
+class Commons (@transient var cassandraUtil: CassandraUtil){
+
+  /**
+   * Builds an INSERT for `keyspace.table` from the given column/value map (column names are
+   * lower-cased first) and executes it through `cassandraUtil.upsert`.
+   *
+   * @param keyspace target keyspace
+   * @param table    target table
+   * @param keys     column name to value map
+   * @return whatever `cassandraUtil.upsert` returns
+   */
+  def insertDbRecord(keyspace: String, table: String, keys: Map[String, AnyRef]) = {
+    val statement = QueryBuilder.insertInto(keyspace, table)
+    val columns = convertKeyCase(keys).entrySet.iterator
+    while (columns.hasNext) {
+      val column = columns.next()
+      statement.value(column.getKey, column.getValue)
+    }
+    cassandraUtil.upsert(statement.toString)
+  }
+
+  /**
+   * Copies `properties` into a new map with lower-cased keys; entries with a null key are
+   * dropped and a null or empty input yields an empty map.
+   */
+  private def convertKeyCase(properties: util.Map[String, AnyRef]) = {
+    val lowerCased = new util.HashMap[String, AnyRef]
+    if (MapUtils.isNotEmpty(properties)) {
+      val entries = properties.entrySet.iterator
+      while (entries.hasNext) {
+        val entry = entries.next()
+        if (entry != null && entry.getKey != null) {
+          lowerCased.put(entry.getKey.toLowerCase, entry.getValue)
+        }
+      }
+    }
+    lowerCased
+  }
+
+}
diff --git a/ml-jobs/program-user-info/src/test/resources/test.conf b/ml-jobs/program-user-info/src/test/resources/test.conf
new file mode 100644
index 000000000..eea6ed618
--- /dev/null
+++ b/ml-jobs/program-user-info/src/test/resources/test.conf
@@ -0,0 +1,19 @@
+include "base-test.conf"
+
+kafka {
+ input.topic = "localhost.programuser.info"
+ groupId = "localhost-programuser-group"
+}
+
+task {
+ consumer.parallelism = 1
+ downstream.parallelism = 1
+ programuser{
+ parallelism = 1
+ }
+}
+
+ml-cassandra {
+ keyspace = "sunbird_programs"
+ table = "program_enrollment"
+}
diff --git a/ml-jobs/program-user-info/src/test/resources/test.cql b/ml-jobs/program-user-info/src/test/resources/test.cql
new file mode 100644
index 000000000..421265fda
--- /dev/null
+++ b/ml-jobs/program-user-info/src/test/resources/test.cql
@@ -0,0 +1,34 @@
+CREATE KEYSPACE IF NOT EXISTS sunbird_programs WITH replication = {
+ 'class': 'SimpleStrategy',
+ 'replication_factor': '1'
+ };
+
+CREATE TABLE IF NOT EXISTS sunbird_programs.program_enrollment (
+ program_id text,
+ program_externalId text,
+ program_name text,
+ pii_consent_required boolean,
+ user_id text,
+ user_locations map<text, text>,
+ organisation_id text,
+ organisation_name text,
+ user_sub_type text,
+ user_type text,
+ created_at date,
+ updated_at date,
+ PRIMARY KEY (program_id,user_id)
+ ) WITH bloom_filter_fp_chance = 0.01
+ AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
+ AND comment = ''
+ AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
+ AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
+ AND crc_check_chance = 1.0
+ AND dclocal_read_repair_chance = 0.1
+ AND default_time_to_live = 0
+ AND gc_grace_seconds = 864000
+ AND max_index_interval = 2048
+ AND memtable_flush_period_in_ms = 0
+ AND min_index_interval = 128
+ AND read_repair_chance = 0.0
+ AND speculative_retry = '99PERCENTILE';
+
diff --git a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala
new file mode 100644
index 000000000..1f4a3c203
--- /dev/null
+++ b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala
@@ -0,0 +1,18 @@
+package org.sunbird.dp.fixture
+
+object EventFixture {
+
+ val MODIFIED_EVENT = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"MODIFIED_EVENT","programName":"mno","programExternalId":"$programExternalId","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"ba9aa220-ff1b-4717-b6ea-ace55f04f11","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"28226200816","name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":"deo","type":"administrator"}],"framework":{"board":["CBSE"]},"rootOrg": {"id":"0126796199493140480", "orgName":"Staging Custodian Organization"}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}"""
+
+ val NO_DATA = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":null,"programName":"mno","programExternalId":"$programExternalId","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":null,"appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"28226200816","name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":"deo","type":"administrator"}],"framework":{"board":["CBSE"]},"rootOrg": {"id":"0126796199493140480", "orgName":"Staging Custodian Organization"}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}"""
+
+ val USERLOCATION_MISSING = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"USERLOCATION_MISSING","programName":"mno","programExternalId":"$programExternalId","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"ba9aa220-ff1b-4717-b6ea-ace55f04f11","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"profileUserTypes":[{"subType":"deo","type":"administrator"}],"framework":{"board":["CBSE"]},"rootOrg": {"id":"0126796199493140480", "orgName":"Staging Custodian Organization"}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}"""
+
+ val ROOTORG_MISSING = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"ROOTORG_MISSING","programName":"mno","programExternalId":"$programExternalId","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"ba9aa220-ff1b-4717-b6ea-ace55f04f11","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":null,"name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"framework":{"board":["CBSE"]}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}"""
+
+ val WHEN_VALUES_ARE_EMPTY_STRING = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"WHEN_VALUES_ARE_EMPTY_STRING","programName":"","programExternalId":"","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"5678","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"28226200816","name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":"","type":""}],"framework":{"board":["CBSE"]},"rootOrg": {"id":"", "orgName":""}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}"""
+
+ val WHEN_VALUES_ARE_NULL = """{"_id":"Whitespace_Tab","deleted":false,"programId":"WHEN_VALUES_ARE_NULL","programName":null,"programExternalId":null,"requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"5678","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"28226200816","name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":null,"type":null}],"framework":{"board":["CBSE"]},"rootOrg": {"id":null, "orgName":null}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}"""
+
+}
+
diff --git a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala
new file mode 100644
index 000000000..d85bf2876
--- /dev/null
+++ b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala
@@ -0,0 +1,120 @@
+package org.sunbird.dp.spec
+
+
+import com.google.gson.{Gson, JsonParser}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.test.util.MiniClusterWithClientResource
+import org.cassandraunit.CQLDataLoader
+import org.cassandraunit.dataset.cql.FileCQLDataSet
+import org.cassandraunit.utils.EmbeddedCassandraServerHelper
+import org.mockito.Mockito
+import org.mockito.Mockito.when
+import org.sunbird.dp.userinfo.domain.Event
+import org.sunbird.dp.core.job.FlinkKafkaConnector
+import org.sunbird.dp.core.util.{CassandraUtil, JSONUtil}
+import org.sunbird.dp.fixture.EventFixture
+import org.sunbird.dp.userinfo.task.{ProgramUserInfoConfig, ProgramUserInfoStreamTask}
+import org.sunbird.dp.{BaseMetricsReporter, BaseTestSpec}
+
+
+import java.util
+
+class ProgramUserInfoTaskTestSpec extends BaseTestSpec {
+
+  /** Implicit Flink TypeInformation for java.util.Map[String, AnyRef], obtained via TypeExtractor. */
+  implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]])
+
+  /** Flink mini cluster with one task manager and one slot per task manager. */
+  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+    .setConfiguration(testConfiguration())
+    .setNumberSlotsPerTaskManager(1)
+    .setNumberTaskManagers(1)
+    .build)
+
+  /** Job configuration built from the test.conf resource. */
+  val config: Config = ConfigFactory.load("test.conf")
+  val programUserConfig: ProgramUserInfoConfig = new ProgramUserInfoConfig(config)
+  /** Serializable Mockito mock of the Kafka connector; its event source is stubbed in the test below. */
+  val mockKafkaUtil: FlinkKafkaConnector = mock[FlinkKafkaConnector](Mockito.withSettings().serializable())
+  val gson = new Gson()
+
+  /** Cassandra helper; assigned in beforeAll once the embedded server has been started. */
+  var cassandraUtil: CassandraUtil = _
+
+  /**
+   * Starts the embedded Cassandra server, creates a CassandraUtil from programUserConfig,
+   * loads the /test.cql data set, checks the connection through testCassandraUtil,
+   * clears the gauge metrics and finally starts the Flink mini cluster.
+   */
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    EmbeddedCassandraServerHelper.startEmbeddedCassandra(80000L)
+    cassandraUtil = new CassandraUtil(programUserConfig.dbHost, programUserConfig.dbPort, programUserConfig.isMultiDCEnabled)
+    val session = cassandraUtil.session
+    val dataLoader = new CQLDataLoader(session)
+    dataLoader.load(new FileCQLDataSet(getClass.getResource("/test.cql").getPath, true, true))
+    testCassandraUtil(cassandraUtil)
+    // Clear the metrics
+    BaseMetricsReporter.gaugeMetrics.clear()
+
+    flinkCluster.before()
+  }
+
+  /**
+   * Cleans the embedded Cassandra server and stops the Flink mini cluster.
+   * The Cassandra cleanup stays best-effort, but a failure is reported on stderr instead of
+   * being dropped silently, and the cluster is stopped even when the cleanup throws.
+   */
+  override protected def afterAll(): Unit = {
+    super.afterAll()
+    try {
+      EmbeddedCassandraServerHelper.cleanEmbeddedCassandra()
+    } catch {
+      case ex: Exception =>
+        Console.err.println(s"Embedded Cassandra cleanup failed: $ex")
+    } finally {
+      flinkCluster.after()
+    }
+  }
+
+  // Runs the job against the stubbed event source.
+  // NOTE(review): nothing is asserted on the output; this only checks that process() does not throw.
+  it should "get data from Kafka" in {
+    when(mockKafkaUtil.kafkaEventSource[Event](programUserConfig.kafkaInputTopic)).thenReturn(new ProgramUserInfoEventSource)
+    val task = new ProgramUserInfoStreamTask(programUserConfig, mockKafkaUtil)
+    task.process()
+  }
+
+  /** Reconnects the given CassandraUtil and checks that querying program_enrollment yields a non-null response. */
+  def testCassandraUtil(cassandraUtil: CassandraUtil): Unit = {
+    cassandraUtil.reconnect()
+    val response = cassandraUtil.find("SELECT * FROM sunbird_programs.program_enrollment;")
+    response should not be (null)
+  }
+}
+
+
+/**
+ * Test-only Flink source used in place of the Kafka event source: it emits one [[Event]]
+ * per JSON fixture taken from [[EventFixture]] and then returns.
+ */
+class ProgramUserInfoEventSource extends SourceFunction[Event] {
+  /** Deserializes every fixture first, then emits the resulting events in declaration order. */
+  override def run(ctx: SourceContext[Event]): Unit = {
+    val fixtures = Seq(
+      EventFixture.MODIFIED_EVENT, EventFixture.WHEN_VALUES_ARE_EMPTY_STRING,
+      EventFixture.WHEN_VALUES_ARE_NULL, EventFixture.NO_DATA,
+      EventFixture.USERLOCATION_MISSING, EventFixture.ROOTORG_MISSING)
+    val eventMaps = fixtures.map(json => JSONUtil.deserialize[util.HashMap[String, Any]](json))
+    eventMaps.foreach(eventMap => ctx.collect(new Event(eventMap)))
+  }
+
+  /** No-op: run() emits a fixed set of events and returns by itself. */
+  override def cancel(): Unit = {}
+}
+
diff --git a/pom.xml b/pom.xml
index 5e03b8dc0..10b5aa26e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,6 +25,7 @@
dp-core
lms-jobs
user-org-jobs
+ ml-jobs