From 3c78b6b1eff6796956c5832ee0b174c8adc872d2 Mon Sep 17 00:00:00 2001 From: Ashwiniev95 Date: Fri, 10 Mar 2023 20:09:33 +0530 Subject: [PATCH 1/6] Addition of ml-jobs for PII data --- .../roles/flink-jobs-deploy/defaults/main.yml | 17 ++ .../helm_charts/datapipeline_jobs/values.j2 | 35 +++ ml-jobs/pom.xml | 50 ++++ ml-jobs/program-user-info/pom.xml | 220 ++++++++++++++++++ .../src/main/resources/program-user-info.conf | 21 ++ .../sunbird/dp/userinfo/domain/Event.scala | 51 ++++ .../functions/ProgramUserInfoFunction.scala | 186 +++++++++++++++ .../userinfo/task/ProgramUserInfoConfig.scala | 28 +++ .../task/ProgramUserInfoStreamTask.scala | 47 ++++ .../src/test/resources/test.conf | 21 ++ .../src/test/resources/test.cql | 47 ++++ .../org/sunbird/dp/fixture/EventFixture.scala | 11 + .../dp/spec/ProgramUserInfoTaskTestSpec.scala | 114 +++++++++ pom.xml | 1 + 14 files changed, 849 insertions(+) create mode 100644 ml-jobs/pom.xml create mode 100644 ml-jobs/program-user-info/pom.xml create mode 100644 ml-jobs/program-user-info/src/main/resources/program-user-info.conf create mode 100644 ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala create mode 100644 ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala create mode 100644 ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala create mode 100644 ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoStreamTask.scala create mode 100644 ml-jobs/program-user-info/src/test/resources/test.conf create mode 100644 ml-jobs/program-user-info/src/test/resources/test.cql create mode 100644 ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala create mode 100644 ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala diff --git a/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml b/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml index 46df288ff..18fa68c0c 100644 --- a/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml +++ b/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml @@ -120,6 +120,13 @@ middleware_cassandra_user_activity_agg_table: user_activity_agg user_cache_updater_job_consumer_parallelism: 1 user_cache_updater_job_parallelism: 1 +### Program-User-Info Job vars +programuserinfo_consumer_parallelism: 1 +programuserinfo_downstream_parallelism: 1 +programuserinfo_programuser_parallelism: 1 +middleware_cassandra_programuserinfo_keyspace: sunbird_programs +middleware_cassandra_programuserinfo_table: program_enrollment + ### to be removed job_classname: "" @@ -194,6 +201,16 @@ flink_job_names: taskslots: 1 cpu_requests: 0.3 scale_enabled: false + program-user-info: + job_class_name: 'org.sunbird.dp.userinfo.task.ProgramUserInfoStreamTask' + replica: 1 + jobmanager_memory: 1024m + taskmanager_memory: 1024m + taskmanager_process_memory: 1700m + jobmanager_process_memory: 1600m + taskslots: 1 + cpu_requests: 0.3 + scale_enabled: false ### Global vars diff --git a/kubernetes/helm_charts/datapipeline_jobs/values.j2 b/kubernetes/helm_charts/datapipeline_jobs/values.j2 index c58630bd4..b2599a363 100644 --- a/kubernetes/helm_charts/datapipeline_jobs/values.j2 +++ b/kubernetes/helm_charts/datapipeline_jobs/values.j2 @@ -20,6 +20,11 @@ scale_properties: scale_target_value: {{ flink_job_names['assessment-aggregator'].scale_target_value | default(0) }} min_replica: {{ flink_job_names['assessment-aggregator'].min_replica | default(1) }} max_replica: {{ flink_job_names['assessment-aggregator'].max_replica | default(2) }} + program-user-info: + enabled: {{ flink_job_names['program-user-info'].scale_enabled | lower}} + scale_target_value: {{ flink_job_names['program-user-info'].scale_target_value | default(0) }} + min_replica: {{ flink_job_names['program-user-info'].min_replica | default(1) }} + max_replica: {{ flink_job_names['program-user-info'].max_replica | default(2) }} jobmanager: rpc_port: {{ jobmanager_rpc_port }} @@ -534,3 +539,33 @@ user-cache-updater-v2: jobmanager.execution.failover-strategy: region taskmanager.memory.network.fraction: 0.1 +program-user-info: + program-user-info: |+ + include file("/data/flink/conf/base-config.conf") + kafka { + producer.broker-servers = "{{ kafka_brokers }}" + consumer.broker-servers = "{{ kafka_brokers }}" + zookeeper = "{{ zookeepers }}" + input.topic = {{ kafka_topic_programuser_info }} + groupId = {{ kafka_group_programuser_info }} + } + task { + consumer.parallelism = {{ programuserinfo_consumer_parallelism }} + downstream.parallelism = {{ programuserinfo_downstream_parallelism }} + programuser { + parallelism = {{ programuserinfo_programuser_parallelism }} + } + + } + ml-cassandra { + keyspace = "{{ middleware_cassandra_programuserinfo_keyspace }}" + table = "{{ middleware_cassandra_programuserinfo_table }}" + } + + flink-conf: |+ + jobmanager.memory.flink.size: {{ flink_job_names['program-user-info'].jobmanager_memory }} + taskmanager.memory.flink.size: {{ flink_job_names['program-user-info'].taskmanager_memory }} + taskmanager.numberOfTaskSlots: {{ flink_job_names['program-user-info'].taskslots }} + parallelism.default: 1 + jobmanager.execution.failover-strategy: region + taskmanager.memory.network.fraction: 0.1 \ No newline at end of file diff --git a/ml-jobs/pom.xml b/ml-jobs/pom.xml new file mode 100644 index 000000000..306c477cc --- /dev/null +++ b/ml-jobs/pom.xml @@ -0,0 +1,50 @@ + + 4.0.0 + + + org.sunbird + data-pipeline + 1.0 + ../pom.xml + + + ml-jobs + pom + ml-jobs + + + + program-user-info + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + 11 + + + + org.scoverage + scoverage-maven-plugin + ${scoverage.plugin.version} + + ${scala.version} + true + true + org.sunbird.incredible + + + + + + + + \ No newline at end of file diff --git a/ml-jobs/program-user-info/pom.xml b/ml-jobs/program-user-info/pom.xml new file mode 100644 index 000000000..7ce8eae2a --- /dev/null +++ b/ml-jobs/program-user-info/pom.xml @@ -0,0 +1,220 @@ + + + + 4.0.0 + + org.sunbird + data-pipeline + 1.0 + ../../pom.xml + + program-user-info + 1.0.0 + jar + ProgramUserInfo + + Job to insert user pii events details to cassandra + + + + UTF-8 + 1.4.0 + + + + + org.apache.flink + flink-clients_${scala.version} + ${flink.version} + + + org.apache.flink + flink-streaming-scala_${scala.version} + ${flink.version} + provided + + + org.sunbird + dp-core + 1.0.0 + + + org.sunbird + dp-core + 1.0.0 + test-jar + test + + + org.apache.flink + flink-test-utils_${scala.version} + ${flink.version} + test + + + org.apache.flink + flink-streaming-java_${scala.version} + ${flink.version} + test + tests + + + org.scalatest + scalatest_${scala.version} + 3.0.6 + test + + + org.mockito + mockito-core + 3.3.3 + test + + + com.opentable.components + otj-pg-embedded + 0.13.3 + test + + + com.fiftyonred + mock-jedis + 0.4.0 + test + + + org.cassandraunit + cassandra-unit + 3.11.2.0 + test + + + + + src/main/scala + src/test/scala + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + + + com.google.code.findbugs:jsr305 + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + org.sunbird.dp.userinfo.task.ProgramUserInfoStreamTask + + + + reference.conf + + + + + + + + + net.alchim31.maven + scala-maven-plugin + 4.4.0 + + ${java.target.runtime} + ${java.target.runtime} + ${scala.maj.version} + false + + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + + maven-surefire-plugin + 2.22.2 + + true + + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + . + program-user-info-testsuite.txt + + + + test + + test + + + + + + org.scoverage + scoverage-maven-plugin + ${scoverage.plugin.version} + + ${scala.version} + true + true + + + + + + \ No newline at end of file diff --git a/ml-jobs/program-user-info/src/main/resources/program-user-info.conf b/ml-jobs/program-user-info/src/main/resources/program-user-info.conf new file mode 100644 index 000000000..49c948faf --- /dev/null +++ b/ml-jobs/program-user-info/src/main/resources/program-user-info.conf @@ -0,0 +1,21 @@ +include "base-config.conf" + +kafka { + input.topic = ${job.env}".programuser.info" + groupId = ${job.env}"-programuser-group" +} + +task { + consumer.parallelism = 1 + downstream.parallelism = 1 + programuser{ + parallelism = 1 + } +} + +ml-cassandra { + keyspace = "sunbird_programs" + table = "program_enrollment" + host = "localhost" + port = "9042" +} \ No newline at end of file diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala new file mode 100644 index 000000000..d2ca299aa --- /dev/null +++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala @@ -0,0 +1,51 @@ +package org.sunbird.dp.userinfo.domain + +import java.util +import org.sunbird.dp.core.domain.Events + +class Event(eventMap: util.Map[String, Any]) extends Events(eventMap) { + + def _id: String = { + Option(telemetry.readOrDefault[String]("_id", null)).filter(_.nonEmpty).orNull + } + + def program_id: String = { + Option(telemetry.readOrDefault[String]("programId", null)).filter(_.nonEmpty).orNull + } + + def program_externalId: String = { + Option(telemetry.readOrDefault[String]("programExternalId", null)).filter(_.nonEmpty).orNull + } + + def program_name: String = { + Option(telemetry.readOrDefault[String]("programName", null)).filter(_.nonEmpty).orNull + } + + def pii_consent_required: Boolean = { + telemetry.readOrDefault[Boolean]("requestForPIIConsent", false) + } + + def user_id: String = { + Option(telemetry.readOrDefault[String]("userId", null)).filter(_.nonEmpty).orNull + } + + def created_at_string: String = { + Option(telemetry.readOrDefault[String]("createdAt", null)).filter(_.nonEmpty).orNull + } + + def updated_at_string: String = { + Option(telemetry.readOrDefault[String]("updatedAt", null)).filter(_.nonEmpty).orNull + } + + def user_Location: util.ArrayList[util.Map[String, Any]] = { + telemetry.getMap.get("userProfile").asInstanceOf[util.Map[String, Any]].get("userLocations").asInstanceOf[util.ArrayList[util.Map[String, Any]]] + } + + def user_Types: util.ArrayList[util.Map[String, Any]] = { + telemetry.getMap.get("userProfile").asInstanceOf[util.Map[String, Any]].get("profileUserTypes").asInstanceOf[util.ArrayList[util.Map[String, Any]]] + } + + def organisations: util.ArrayList[util.Map[String, Any]] = { + telemetry.getMap.get("userProfile").asInstanceOf[util.Map[String, Any]].get("organisations").asInstanceOf[util.ArrayList[util.Map[String, Any]]] + } +} \ No newline at end of file diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala new file mode 100644 index 000000000..e0c529bca --- /dev/null +++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala @@ -0,0 +1,186 @@ +package org.sunbird.dp.userinfo.functions + +import com.datastax.driver.core.querybuilder.QueryBuilder +import com.google.gson.reflect.TypeToken +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.slf4j.LoggerFactory +import org.sunbird.dp.core.job.{BaseProcessFunction, Metrics} +import org.sunbird.dp.core.util.CassandraUtil +import org.sunbird.dp.userinfo.domain.Event +import org.sunbird.dp.userinfo.task.ProgramUserInfoConfig +import java.lang.reflect.Type +import java.text.SimpleDateFormat +import java.util +import java.util.Date + +case class Data( + program_id: String, + program_externalId: String, + program_name: String, + pii_consent_required: Boolean, + user_id: String, + state_code: String, + state_id: String, + state_name: String, + district_code: String, + district_id: String, + district_name: String, + block_code: String, + block_id: String, + block_name: String, + cluster_code: String, + cluster_id: String, + cluster_name: String, + school_code: String, + school_id: String, + school_name: String, + organisation_id: String, + organisation_name: String, + user_sub_type: String, + user_type: String, + created_at: String, + updated_at: String + ) + +class ProgramUserInfoFunction(config: ProgramUserInfoConfig, + @transient var cassandraUtil: CassandraUtil = null + ) extends BaseProcessFunction[Event, Event](config) { + + val mapType: Type = new TypeToken[util.Map[String, AnyRef]]() {}.getType + private[this] val logger = LoggerFactory.getLogger(classOf[ProgramUserInfoFunction]) + + override def metricsList() = List() + + override def open(parameters: Configuration): Unit = { + super.open(parameters) + cassandraUtil = new CassandraUtil(config.dbHost, config.dbPort, config.isMultiDCEnabled) + } + + override def close(): Unit = { + super.close() + } + + /** + * This method will convert the incoming date-time string to required Date format "yyyy-MM-dd" + * @param dateStr date will be passed as a string in this format "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" + * @return + */ + def getDateOnly(dateStr: String): java.sql.Date = { + val inputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") + val date = inputFormat.parse(dateStr) + val outputFormat = new SimpleDateFormat("yyyy-MM-dd") + val formattedDate = outputFormat.format(date) + java.sql.Date.valueOf(formattedDate) + } + + override def processElement(event: Event, + context: ProcessFunction[Event, Event]#Context, + metrics: Metrics): Unit = { + + val created_at: Date = getDateOnly(event.created_at_string) + val updated_at: Date = getDateOnly(event.updated_at_string) + + val userProfileData = scala.collection.mutable.Map[String, String]() + + /** + * To get the key-values related to user location + */ + event.user_Location.forEach(f => { + userProfileData.put(f.get("type") + "_name", Option(f.get("name")).map(_.toString).filter(_.nonEmpty).orNull) + userProfileData.put(f.get("type") + "_id", Option(f.get("id")).map(_.toString).filter(_.nonEmpty).orNull) + userProfileData.put(f.get("type") + "_code", Option(f.get("code")).map(_.toString).filter(_.nonEmpty).orNull) + }) + + /** + * To get the key-values related to user type + */ + event.user_Types.forEach(f => { + userProfileData.put("type", Option(f.get("type")).map(_.toString).filter(_.nonEmpty).orNull) + userProfileData.put("sub_type", Option(f.get("subType")).map(_.toString).filter(_.nonEmpty).orNull) + }) + + /** + * To get the key-values related to organisation when is_school = false + */ + event.organisations.forEach(f => { + if (f.get("isSchool") == false) { + userProfileData.put("organisation_name", Option(f.get("orgName")).map(_.toString).filter(_.nonEmpty).orNull) + userProfileData.put("organisation_id", Option(f.get("organisationId")).map(_.toString).filter(_.nonEmpty).orNull) + } + }) + + /** + * Storing the parsed JSON event in to a single variable with the help of case class + */ + val UserData = Data( + program_id = event.program_id, + program_externalId = event.program_externalId, + program_name = event.program_name, + pii_consent_required = event.pii_consent_required, + user_id = event.user_id, + state_code = userProfileData.getOrElse("state_code", null), + state_id = userProfileData.getOrElse("state_id", null), + state_name = userProfileData.getOrElse("state_name", null), + district_code = userProfileData.getOrElse("district_code", null), + district_id = userProfileData.getOrElse("district_id", null), + district_name = userProfileData.getOrElse("district_name", null), + block_code = userProfileData.getOrElse("block_code", null), + block_id = userProfileData.getOrElse("block_id", null), + block_name = userProfileData.getOrElse("block_name", null), + cluster_code = userProfileData.getOrElse("cluster_code", null), + cluster_id = userProfileData.getOrElse("cluster_id", null), + cluster_name = userProfileData.getOrElse("cluster_name", null), + school_code = userProfileData.getOrElse("school_code", null), + school_id = userProfileData.getOrElse("school_id", null), + school_name = userProfileData.getOrElse("school_name", null), + organisation_id = userProfileData.getOrElse("organisation_id", null), + organisation_name = userProfileData.getOrElse("organisation_name", null), + user_sub_type = userProfileData.getOrElse("sub_type", null), + user_type = userProfileData.getOrElse("type", null), + created_at = created_at.toString, + updated_at = updated_at.toString + ) + logger.info("Successfully parsed JSON and stored back the required fields to single case class") + + if (UserData.program_id != null && UserData.user_id != null) { + val query = QueryBuilder.insertInto(config.dbKeyspace, config.dbTable) + .value("program_id", UserData.program_id) + .value("program_externalId", UserData.program_externalId) + .value("program_name", UserData.program_name) + .value("pii_consent_required", UserData.pii_consent_required) + .value("user_id", UserData.user_id) + .value("state_code", UserData.state_code) + .value("state_id", UserData.state_id) + .value("state_name", UserData.state_name) + .value("district_code", UserData.district_code) + .value("district_id", UserData.district_id) + .value("district_name", UserData.district_name) + .value("block_code", UserData.block_code) + .value("block_id", UserData.block_id) + .value("block_name", UserData.block_name) + .value("cluster_code", UserData.cluster_code) + .value("cluster_id", UserData.cluster_id) + .value("cluster_name", UserData.cluster_name) + .value("school_code", UserData.school_code) + .value("school_id", UserData.school_id) + .value("school_name", UserData.school_name) + .value("organisation_id", UserData.organisation_id) + .value("organisation_name", UserData.organisation_name) + .value("user_sub_type", UserData.user_sub_type) + .value("user_type", UserData.user_type) + .value("created_at", UserData.created_at) + .value("updated_at", UserData.updated_at).toString + + cassandraUtil.upsert(query) + + logger.info("Successfully inserted the parsed events to Database with program_Id = " + UserData.program_id) + } + else { + logger.debug("This Event is skipped due to ProgramId or userId missing DocumentId:" + event._id) + } + } + + + +} diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala new file mode 100644 index 000000000..f0ba8eb13 --- /dev/null +++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala @@ -0,0 +1,28 @@ +package org.sunbird.dp.userinfo.task + +import com.typesafe.config.Config +import org.sunbird.dp.core.job.BaseJobConfig + +class ProgramUserInfoConfig(override val config: Config) extends BaseJobConfig(config, jobName = "ProgramUserInfo") { + + //Kafka + override val kafkaConsumerParallelism: Int = config.getInt("task.consumer.parallelism") + + val programUserParallelism: Int = config.getInt("task.programUser.parallelism") + + val kafkaInputTopic: String = config.getString("kafka.input.topic") + + // Consumer + val programUserConsumer = "program-user-consumer" + + //Cassandra + val dbTable: String = config.getString("ml-cassandra.table") + val dbKeyspace: String = config.getString("ml-cassandra.keyspace") + val dbHost: String = config.getString("ml-cassandra.host") + val dbPort: Int = config.getInt("ml-cassandra.port") + + + // Functions + val programUserInfoFunction = "ProgramUserInfoFunction" + +} diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoStreamTask.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoStreamTask.scala new file mode 100644 index 000000000..6ea547263 --- /dev/null +++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoStreamTask.scala @@ -0,0 +1,47 @@ +package org.sunbird.dp.userinfo.task + +import org.sunbird.dp.core.job.FlinkKafkaConnector +import com.typesafe.config.ConfigFactory +import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.sunbird.dp.core.util.FlinkUtil +import org.sunbird.dp.userinfo.domain.Event +import org.sunbird.dp.userinfo.functions.ProgramUserInfoFunction + +import java.io.File + +class ProgramUserInfoStreamTask(config: ProgramUserInfoConfig, kafkaConnector: FlinkKafkaConnector) { + + private val serialVersionUID = -7729362727131516112L + + def process(): Unit = { + + implicit val env: StreamExecutionEnvironment = FlinkUtil.getExecutionContext(config) + val source = kafkaConnector.kafkaEventSource[Event](config.kafkaInputTopic) + + env.addSource(source, config.programUserConsumer) + .uid(config.programUserConsumer).setParallelism(config.kafkaConsumerParallelism).rebalance() + .process(new ProgramUserInfoFunction(config)) + .name(config.programUserInfoFunction).uid(config.programUserInfoFunction) + .setParallelism(config.programUserParallelism) + + env.execute(config.jobName) + } +} + +object ProgramUserInfoStreamTask { + + def main(args: Array[String]): Unit = { + // $COVERAGE-OFF$ + val configFilePath = Option(ParameterTool.fromArgs(args).get("config.file.path")) + val config = configFilePath.map { + path => ConfigFactory.parseFile(new File(path)).resolve() + }.getOrElse(ConfigFactory.load("program-user-info").withFallback(ConfigFactory.systemEnvironment())) + val programUserInfoConfig = new ProgramUserInfoConfig(config) + val kafkaUtil = new FlinkKafkaConnector(programUserInfoConfig) + val task = new ProgramUserInfoStreamTask(programUserInfoConfig, kafkaUtil) + task.process() + // $COVERAGE-ON$ + } + +} diff --git a/ml-jobs/program-user-info/src/test/resources/test.conf b/ml-jobs/program-user-info/src/test/resources/test.conf new file mode 100644 index 000000000..154369a55 --- /dev/null +++ b/ml-jobs/program-user-info/src/test/resources/test.conf @@ -0,0 +1,21 @@ +include "base-config.conf" + +kafka { + input.topic = "localhost.programuser.info" + groupId = "localhost-programuser-group" +} + +task { + consumer.parallelism = 1 + downstream.parallelism = 1 + programUser{ + parallelism = 1 + } +} + +ml-cassandra { + keyspace = "sunbird_programs" + table = "program_enrollment" + host = "localhost" + port = "9042" +} diff --git a/ml-jobs/program-user-info/src/test/resources/test.cql b/ml-jobs/program-user-info/src/test/resources/test.cql new file mode 100644 index 000000000..8576e431c --- /dev/null +++ b/ml-jobs/program-user-info/src/test/resources/test.cql @@ -0,0 +1,47 @@ +CREATE KEYSPACE IF NOT EXISTS sunbird_programs WITH replication = { + 'class': 'SimpleStrategy', + 'replication_factor': '1' + }; + + CREATE TABLE IF NOT EXISTS sunbird_programs.program_enrollment ( + program_id text, + program_externalId text, + program_name text, + pii_consent_required boolean, + user_id text, + state_code text, + state_id text, + state_name text, + district_code text, + district_id text, + district_name text, + block_code text, + block_id text, + block_name text, + cluster_code text, + cluster_id text, + cluster_name text, + school_code text, + school_id text, + school_name text, + organisation_id text, + organisation_name text, + user_sub_type text, + user_type text, + created_at date, + updated_at date, + PRIMARY KEY (program_id,user_id) + ) WITH bloom_filter_fp_chance = 0.01 + AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'} + AND comment = '' + AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'} + AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'} + AND crc_check_chance = 1.0 + AND dclocal_read_repair_chance = 0.1 + AND default_time_to_live = 0 + AND gc_grace_seconds = 864000 + AND max_index_interval = 2048 + AND memtable_flush_period_in_ms = 0 + AND min_index_interval = 128 + AND read_repair_chance = 0.0 + AND speculative_retry = '99PERCENTILE'; \ No newline at end of file diff --git a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala new file mode 100644 index 000000000..acb1b1948 --- /dev/null +++ b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala @@ -0,0 +1,11 @@ +package org.sunbird.dp.fixture + +object EventFixture { + + val EVENT = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"5f362b78af0a4decfa9a106f","programName":"mno","programExternalId":"$programExternalId","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"ba9aa220-ff1b-4717-b6ea-ace55f04f11","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"28226200816","name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":"deo","type":"administrator"}],"organisations":[{"organisationId":"0126796199493140480","orgName":"Pre-prod Custodian Organization","isSchool":false},{"organisationId":"01337588247832985613211","orgName":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","isSchool":true}],"framework":{"board":["CBSE"]}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}""" + + val WHEN_VALUES_ARE_EMPTY = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"98765","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"","name":"","id":"","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"","name":"","id":"","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"","name":"","id":"","type":"state","parentId":null},{"code":"","name":"","id":"","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"","name":"","id":"","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":"","type":""}],"organisations":[{"organisationId":"","orgName":"","isSchool":false},{"organisationId":"01337588247832985613211","orgName":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","isSchool":true}],"framework":{"board":["CBSE"]}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}""" + + val WHEN_VALUES_ARE_NULL = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"67890","programName":null,"programExternalId":null,"requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"54321","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"","name":"","id":"","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":null,"name":null,"id":null,"type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":null,"name":null,"id":null,"type":"state","parentId":null},{"code":null,"name":null,"id":null,"type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":null,"name":null,"id":null,"type":"school","parentId":"g"}],"profileUserTypes":[{"subType":null,"type":null}],"organisations":[{"organisationId":null,"orgName":null,"isSchool":false},{"organisationId":"01337588247832985613211","orgName":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","isSchool":true}],"framework":{"board":["CBSE"]}},"noOfResourcesStarted":3,"updatedAt":"2040-01-12T06:30:56.829Z","createdAt":"2050-01-12T06:30:09.476Z","__v":0}""" + +} \ No newline at end of file diff --git a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala new file mode 100644 index 000000000..883166bdd --- /dev/null +++ b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala @@ -0,0 +1,114 @@ +package org.sunbird.dp.spec + + +import com.google.gson.{Gson, JsonParser} +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.test.util.MiniClusterWithClientResource +import org.cassandraunit.CQLDataLoader +import org.cassandraunit.dataset.cql.FileCQLDataSet +import org.cassandraunit.utils.EmbeddedCassandraServerHelper +import org.mockito.Mockito +import org.mockito.Mockito.when +import org.sunbird.dp.userinfo.domain.Event +import org.sunbird.dp.core.job.FlinkKafkaConnector +import org.sunbird.dp.core.util.{CassandraUtil, JSONUtil} +import org.sunbird.dp.fixture.EventFixture +import org.sunbird.dp.userinfo.task.{ProgramUserInfoConfig, ProgramUserInfoStreamTask} +import org.sunbird.dp.{BaseMetricsReporter, BaseTestSpec} + + +import java.util + +class ProgramUserInfoTaskTestSpec extends BaseTestSpec { + + /** + * The TypeInformation interface is part of the Flink API for working with data streams and datasets, It is capable of describing the data type + */ + implicit val mapTypeInfo: TypeInformation[util.Map[String, AnyRef]] = TypeExtractor.getForClass(classOf[util.Map[String, AnyRef]]) + + /** + * This creates a Apache Flink cluster with one task manager and one slot per task manager, using the MiniClusterWithClientResource class. + */ + val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder() + .setConfiguration(testConfiguration()) + .setNumberSlotsPerTaskManager(1) + .setNumberTaskManagers(1) + .build) + + /** + * mock[FlinkKafkaConnector] creates a mock instance of FlinkKafkaConnector. + * new Gson() creates a new instance of the Gson library, which is used for converting between Java objects and JSON. + */ + val config: Config = ConfigFactory.load("test.conf") + val programUserConfig: ProgramUserInfoConfig = new ProgramUserInfoConfig(config) + val mockKafkaUtil: FlinkKafkaConnector = mock[FlinkKafkaConnector](Mockito.withSettings().serializable()) + val gson = new Gson() + + var cassandraUtil: CassandraUtil = _ + + /** + * The startEmbeddedCassandra method of EmbeddedCassandraServerHelper is called to start an embedded Cassandra server. + * An instance of CassandraUtil is created using the configuration properties from programUserConfig. + * The test data is loaded into the embedded Cassandra server using the CQLDataLoader from the cassandra-unit library. + * The testCassandraUtil method is called to verify the CassandraUtil instance and clear any metrics. + * The before method of the flinkCluster is called to start the Flink cluster. + */ + override protected def beforeAll(): Unit = { + super.beforeAll() + EmbeddedCassandraServerHelper.startEmbeddedCassandra(80000L) + cassandraUtil = new CassandraUtil(programUserConfig.dbHost, programUserConfig.dbPort, programUserConfig.isMultiDCEnabled) + val session = cassandraUtil.session + val dataLoader = new CQLDataLoader(session) + dataLoader.load(new FileCQLDataSet(getClass.getResource("/test.cql").getPath, true, true)); + // Clear the metrics + testCassandraUtil(cassandraUtil) + BaseMetricsReporter.gaugeMetrics.clear() + + flinkCluster.before() + } + + override protected def afterAll(): Unit = { + super.afterAll() + try { + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra() + } catch { + case ex: Exception => { + + } + } + flinkCluster.after() + } + + + it should "get data from Kafka" in { + when(mockKafkaUtil.kafkaEventSource[Event](programUserConfig.kafkaInputTopic)).thenReturn(new ProgramUserInfoEventSource) + val task = new ProgramUserInfoStreamTask(programUserConfig,mockKafkaUtil) + task.process() + } + + def testCassandraUtil(cassandraUtil: CassandraUtil): Unit = { + cassandraUtil.reconnect() + val response = cassandraUtil.find("SELECT * FROM sunbird_programs.program_enrollment;") + response should not be (null) + } +} + + +class ProgramUserInfoEventSource extends SourceFunction[Event] { + override def run(ctx: SourceContext[Event]) { + val eventMap1 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.EVENT) + val eventMap2 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.WHEN_VALUES_ARE_EMPTY) + val eventMap3 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.WHEN_VALUES_ARE_NULL) + ctx.collect(new Event(eventMap1)) + ctx.collect(new Event(eventMap2)) + ctx.collect(new Event(eventMap3)) + + } + + override def cancel() = {} +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5e03b8dc0..10b5aa26e 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,7 @@ dp-core lms-jobs user-org-jobs + ml-jobs From 70a5167cdca170fddb29f50b79f076f019e925f1 Mon Sep 17 00:00:00 2001 From: Ashwiniev95 Date: Fri, 10 Mar 2023 20:18:13 +0530 Subject: [PATCH 2/6] Add new line --- kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml | 1 + kubernetes/helm_charts/datapipeline_jobs/values.j2 | 3 ++- ml-jobs/pom.xml | 3 ++- ml-jobs/program-user-info/pom.xml | 3 ++- .../src/main/resources/program-user-info.conf | 3 ++- .../src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala | 3 ++- .../dp/userinfo/functions/ProgramUserInfoFunction.scala | 1 + ml-jobs/program-user-info/src/test/resources/test.cql | 3 ++- .../src/test/scala/org/sunbird/dp/fixture/EventFixture.scala | 3 ++- .../org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala | 3 ++- 10 files changed, 18 insertions(+), 8 deletions(-) diff --git a/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml b/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml index 18fa68c0c..0cbc79a44 100644 --- a/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml +++ b/kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml @@ -222,3 +222,4 @@ service_monitor_enabled: true ### controlling the flink jobs log level flink_jobs_console_log_level: INFO flink_libraries_log_level: ERROR + diff --git a/kubernetes/helm_charts/datapipeline_jobs/values.j2 b/kubernetes/helm_charts/datapipeline_jobs/values.j2 index b2599a363..e57c455be 100644 --- a/kubernetes/helm_charts/datapipeline_jobs/values.j2 +++ b/kubernetes/helm_charts/datapipeline_jobs/values.j2 @@ -568,4 +568,5 @@ program-user-info: taskmanager.numberOfTaskSlots: {{ flink_job_names['program-user-info'].taskslots }} parallelism.default: 1 jobmanager.execution.failover-strategy: region - taskmanager.memory.network.fraction: 0.1 \ No newline at end of file + taskmanager.memory.network.fraction: 0.1 + diff --git a/ml-jobs/pom.xml b/ml-jobs/pom.xml index 306c477cc..bc0a21858 100644 --- a/ml-jobs/pom.xml +++ b/ml-jobs/pom.xml @@ -47,4 +47,5 @@ - \ No newline at end of file + + diff --git a/ml-jobs/program-user-info/pom.xml b/ml-jobs/program-user-info/pom.xml index 7ce8eae2a..995174d31 100644 --- a/ml-jobs/program-user-info/pom.xml +++ b/ml-jobs/program-user-info/pom.xml @@ -217,4 +217,5 @@ - \ No newline at end of file + + diff --git a/ml-jobs/program-user-info/src/main/resources/program-user-info.conf b/ml-jobs/program-user-info/src/main/resources/program-user-info.conf index 49c948faf..8408b7ffc 100644 --- a/ml-jobs/program-user-info/src/main/resources/program-user-info.conf +++ b/ml-jobs/program-user-info/src/main/resources/program-user-info.conf @@ -18,4 +18,5 @@ ml-cassandra { table = "program_enrollment" host = "localhost" port = "9042" -} \ No newline at end of file +} + diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala index d2ca299aa..a2a5343ad 100644 --- a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala +++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala @@ -48,4 +48,5 @@ class Event(eventMap: util.Map[String, Any]) extends Events(eventMap) { def organisations: util.ArrayList[util.Map[String, Any]] = { telemetry.getMap.get("userProfile").asInstanceOf[util.Map[String, Any]].get("organisations").asInstanceOf[util.ArrayList[util.Map[String, Any]]] } -} \ No newline at end of file +} + diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala index e0c529bca..cd3a66bce 100644 --- a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala +++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala @@ -184,3 +184,4 @@ class ProgramUserInfoFunction(config: ProgramUserInfoConfig, } + diff --git a/ml-jobs/program-user-info/src/test/resources/test.cql b/ml-jobs/program-user-info/src/test/resources/test.cql index 8576e431c..1391a28e6 100644 --- a/ml-jobs/program-user-info/src/test/resources/test.cql +++ b/ml-jobs/program-user-info/src/test/resources/test.cql @@ -44,4 +44,5 @@ CREATE KEYSPACE IF NOT EXISTS sunbird_programs WITH replication = { AND memtable_flush_period_in_ms = 0 AND min_index_interval = 128 AND read_repair_chance = 0.0 - AND speculative_retry = '99PERCENTILE'; \ No newline at end of file + AND speculative_retry = '99PERCENTILE'; + diff --git a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala index acb1b1948..72f3c33ba 100644 --- a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala +++ b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala @@ -8,4 +8,5 @@ object EventFixture { val WHEN_VALUES_ARE_NULL = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"67890","programName":null,"programExternalId":null,"requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"54321","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"","name":"","id":"","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":null,"name":null,"id":null,"type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":null,"name":null,"id":null,"type":"state","parentId":null},{"code":null,"name":null,"id":null,"type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":null,"name":null,"id":null,"type":"school","parentId":"g"}],"profileUserTypes":[{"subType":null,"type":null}],"organisations":[{"organisationId":null,"orgName":null,"isSchool":false},{"organisationId":"01337588247832985613211","orgName":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","isSchool":true}],"framework":{"board":["CBSE"]}},"noOfResourcesStarted":3,"updatedAt":"2040-01-12T06:30:56.829Z","createdAt":"2050-01-12T06:30:09.476Z","__v":0}""" -} \ No newline at end of file +} + diff --git a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala index 883166bdd..17237c486 100644 --- a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala +++ b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala @@ -111,4 +111,5 @@ class ProgramUserInfoEventSource extends SourceFunction[Event] { } override def cancel() = {} -} \ No newline at end of file +} + From c01e864cc6c3e978447184057ea682b53ffcce06 Mon Sep 17 00:00:00 2001 From: Ashwiniev95 Date: Fri, 10 Mar 2023 21:31:26 +0530 Subject: [PATCH 3/6] Add update config --- .../program-user-info/src/main/resources/program-user-info.conf | 2 -- .../org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala | 2 +- ml-jobs/program-user-info/src/test/resources/test.conf | 2 -- 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/ml-jobs/program-user-info/src/main/resources/program-user-info.conf b/ml-jobs/program-user-info/src/main/resources/program-user-info.conf index 8408b7ffc..8a811d27e 100644 --- a/ml-jobs/program-user-info/src/main/resources/program-user-info.conf +++ b/ml-jobs/program-user-info/src/main/resources/program-user-info.conf @@ -16,7 +16,5 @@ task { ml-cassandra { keyspace = "sunbird_programs" table = "program_enrollment" - host = "localhost" - port = "9042" } diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala index f0ba8eb13..19c25f08a 100644 --- a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala +++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/task/ProgramUserInfoConfig.scala @@ -8,7 +8,7 @@ class ProgramUserInfoConfig(override val config: Config) extends BaseJobConfig(c //Kafka override val kafkaConsumerParallelism: Int = config.getInt("task.consumer.parallelism") - val programUserParallelism: Int = config.getInt("task.programUser.parallelism") + val programUserParallelism: Int = config.getInt("task.programuser.parallelism") val kafkaInputTopic: String = config.getString("kafka.input.topic") diff --git a/ml-jobs/program-user-info/src/test/resources/test.conf b/ml-jobs/program-user-info/src/test/resources/test.conf index 154369a55..c65d82500 100644 --- a/ml-jobs/program-user-info/src/test/resources/test.conf +++ b/ml-jobs/program-user-info/src/test/resources/test.conf @@ -16,6 +16,4 @@ task { ml-cassandra { keyspace = "sunbird_programs" table = "program_enrollment" - host = "localhost" - port = "9042" } From c2358fcef8ddfb2634da814a945dbca5e86f3026 Mon Sep 17 00:00:00 2001 From: Ashwiniev95 Date: Sat, 11 Mar 2023 12:12:09 +0530 Subject: [PATCH 4/6] Update test conf --- ml-jobs/program-user-info/src/test/resources/test.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ml-jobs/program-user-info/src/test/resources/test.conf b/ml-jobs/program-user-info/src/test/resources/test.conf index c65d82500..18cb25627 100644 --- a/ml-jobs/program-user-info/src/test/resources/test.conf +++ b/ml-jobs/program-user-info/src/test/resources/test.conf @@ -8,7 +8,7 @@ kafka { task { consumer.parallelism = 1 downstream.parallelism = 1 - programUser{ + programuser{ parallelism = 1 } } From 2b89c99064791bf5d50304bc20d2799c5e2241f1 Mon Sep 17 00:00:00 2001 From: Ashwiniev95 Date: Mon, 13 Mar 2023 13:21:20 +0530 Subject: [PATCH 5/6] Update cassandra related conf --- dp-core/src/main/resources/base-config.conf | 8 +++++++- dp-core/src/test/resources/base-test.conf | 6 ++++++ ml-jobs/program-user-info/src/test/resources/test.conf | 2 +- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/dp-core/src/main/resources/base-config.conf b/dp-core/src/main/resources/base-config.conf index 655281703..a0d2e4ec2 100644 --- a/dp-core/src/main/resources/base-config.conf +++ b/dp-core/src/main/resources/base-config.conf @@ -50,4 +50,10 @@ lms-cassandra { host = "localhost" port = "9042" isMultiDCEnabled = false -} \ No newline at end of file +} + +ml-cassandra { + host = "localhost" + port = "9042" + isMultiDCEnabled = false +} diff --git a/dp-core/src/test/resources/base-test.conf b/dp-core/src/test/resources/base-test.conf index bb9c37a0c..40033c209 100644 --- a/dp-core/src/test/resources/base-test.conf +++ b/dp-core/src/test/resources/base-test.conf @@ -56,3 +56,9 @@ lms-cassandra { port = 9142 isMultiDCEnabled = false } + +ml-cassandra { + host = "localhost" + port = 9142 + isMultiDCEnabled = false +} diff --git a/ml-jobs/program-user-info/src/test/resources/test.conf b/ml-jobs/program-user-info/src/test/resources/test.conf index 18cb25627..eea6ed618 100644 --- a/ml-jobs/program-user-info/src/test/resources/test.conf +++ b/ml-jobs/program-user-info/src/test/resources/test.conf @@ -1,4 +1,4 @@ -include "base-config.conf" +include "base-test.conf" kafka { input.topic = "localhost.programuser.info" From 2acedb1b9ee0e579914006429fb9a466b4a205d6 Mon Sep 17 00:00:00 2001 From: Ashwiniev95 Date: Thu, 30 Mar 2023 20:51:49 +0530 Subject: [PATCH 6/6] Modify userProfile logic --- .../sunbird/dp/userinfo/domain/Event.scala | 8 +- .../functions/ProgramUserInfoFunction.scala | 163 ++++++++---------- .../sunbird/dp/userinfo/util/Commons.scala | 30 ++++ .../src/test/resources/test.cql | 18 +- .../org/sunbird/dp/fixture/EventFixture.scala | 12 +- .../dp/spec/ProgramUserInfoTaskTestSpec.scala | 11 +- 6 files changed, 125 insertions(+), 117 deletions(-) create mode 100644 ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/util/Commons.scala diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala index a2a5343ad..11ff07165 100644 --- a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala +++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/domain/Event.scala @@ -38,15 +38,15 @@ class Event(eventMap: util.Map[String, Any]) extends Events(eventMap) { } def user_Location: util.ArrayList[util.Map[String, Any]] = { - telemetry.getMap.get("userProfile").asInstanceOf[util.Map[String, Any]].get("userLocations").asInstanceOf[util.ArrayList[util.Map[String, Any]]] + telemetry.read[util.ArrayList[util.Map[String, Any]]]("userProfile.userLocations").getOrElse(null) } def user_Types: util.ArrayList[util.Map[String, Any]] = { - telemetry.getMap.get("userProfile").asInstanceOf[util.Map[String, Any]].get("profileUserTypes").asInstanceOf[util.ArrayList[util.Map[String, Any]]] + telemetry.read[util.ArrayList[util.Map[String, Any]]]("userProfile.profileUserTypes").getOrElse(null) } - def organisations: util.ArrayList[util.Map[String, Any]] = { - telemetry.getMap.get("userProfile").asInstanceOf[util.Map[String, Any]].get("organisations").asInstanceOf[util.ArrayList[util.Map[String, Any]]] + def organisations: util.Map[String, Any] = { + telemetry.read[util.Map[String, Any]]("userProfile.rootOrg").getOrElse(null) } } diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala index cd3a66bce..2060d90ed 100644 --- a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala +++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/functions/ProgramUserInfoFunction.scala @@ -1,7 +1,5 @@ package org.sunbird.dp.userinfo.functions -import com.datastax.driver.core.querybuilder.QueryBuilder -import com.google.gson.reflect.TypeToken import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.slf4j.LoggerFactory @@ -9,32 +7,18 @@ import org.sunbird.dp.core.job.{BaseProcessFunction, Metrics} import org.sunbird.dp.core.util.CassandraUtil import org.sunbird.dp.userinfo.domain.Event import org.sunbird.dp.userinfo.task.ProgramUserInfoConfig -import java.lang.reflect.Type +import org.sunbird.dp.userinfo.util.Commons + import java.text.SimpleDateFormat import java.util import java.util.Date case class Data( program_id: String, - program_externalId: String, + program_externalid: String, program_name: String, pii_consent_required: Boolean, user_id: String, - state_code: String, - state_id: String, - state_name: String, - district_code: String, - district_id: String, - district_name: String, - block_code: String, - block_id: String, - block_name: String, - cluster_code: String, - cluster_id: String, - cluster_name: String, - school_code: String, - school_id: String, - school_name: String, organisation_id: String, organisation_name: String, user_sub_type: String, @@ -47,7 +31,6 @@ class ProgramUserInfoFunction(config: ProgramUserInfoConfig, @transient var cassandraUtil: CassandraUtil = null ) extends BaseProcessFunction[Event, Event](config) { - val mapType: Type = new TypeToken[util.Map[String, AnyRef]]() {}.getType private[this] val logger = LoggerFactory.getLogger(classOf[ProgramUserInfoFunction]) override def metricsList() = List() @@ -63,6 +46,7 @@ class ProgramUserInfoFunction(config: ProgramUserInfoConfig, /** * This method will convert the incoming date-time string to required Date format "yyyy-MM-dd" + * * @param dateStr date will be passed as a string in this format "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" * @return */ @@ -81,107 +65,104 @@ class ProgramUserInfoFunction(config: ProgramUserInfoConfig, val created_at: Date = getDateOnly(event.created_at_string) val updated_at: Date = getDateOnly(event.updated_at_string) - val userProfileData = scala.collection.mutable.Map[String, String]() - /** * To get the key-values related to user location */ - event.user_Location.forEach(f => { - userProfileData.put(f.get("type") + "_name", Option(f.get("name")).map(_.toString).filter(_.nonEmpty).orNull) - userProfileData.put(f.get("type") + "_id", Option(f.get("id")).map(_.toString).filter(_.nonEmpty).orNull) - userProfileData.put(f.get("type") + "_code", Option(f.get("code")).map(_.toString).filter(_.nonEmpty).orNull) - }) + val userLocationData = new util.HashMap[String, String] + if(event.user_Location!= null && event.user_Location.isEmpty==false) { + event.user_Location.forEach(f => { + if (f != null && f.get("type") != null && !f.get("type").toString().isBlank()) { + if (f != null && f.get("name") != null && !f.get("name").toString().isBlank()) { + userLocationData.put(f.get("type") + "_name", f.get("name").toString) + } + if (f != null && f.get("id") != null && !f.get("id").toString().isBlank()) { + userLocationData.put(f.get("type") + "_id", f.get("id").toString) + } + if (f != null && f.get("code") != null && !f.get("code").toString().isBlank()) { + userLocationData.put(f.get("type") + "_code", f.get("code").toString) + } + } + }) + } /** * To get the key-values related to user type */ - event.user_Types.forEach(f => { - userProfileData.put("type", Option(f.get("type")).map(_.toString).filter(_.nonEmpty).orNull) - userProfileData.put("sub_type", Option(f.get("subType")).map(_.toString).filter(_.nonEmpty).orNull) - }) + val userTypeData = new util.HashMap[String, String] + if(event.user_Types!= null && event.user_Types.isEmpty==false) { + event.user_Types.forEach(f => { + userTypeData.put("type", Option(f.get("type")).map(_.toString).filter(_.nonEmpty).orNull) + userTypeData.put("sub_type", Option(f.get("subType")).map(_.toString).filter(_.nonEmpty).orNull) + }) + } /** - * To get the key-values related to organisation when is_school = false + * To get the key-values related to user organisation */ - event.organisations.forEach(f => { - if (f.get("isSchool") == false) { - userProfileData.put("organisation_name", Option(f.get("orgName")).map(_.toString).filter(_.nonEmpty).orNull) - userProfileData.put("organisation_id", Option(f.get("organisationId")).map(_.toString).filter(_.nonEmpty).orNull) - } - }) + val organisationsData = new util.HashMap[String, String] + if(event.organisations!= null) { + organisationsData.put("organisation_id", Option(event.organisations.get("id")) match { + case Some(s: String) if s.trim.nonEmpty => s + case _ => null + }) + organisationsData.put("organisation_name", Option(event.organisations.get("orgName")) match { + case Some(s: String) if s.trim.nonEmpty => s + case _ => null + }) + } /** * Storing the parsed JSON event in to a single variable with the help of case class */ val UserData = Data( program_id = event.program_id, - program_externalId = event.program_externalId, + program_externalid = event.program_externalId, program_name = event.program_name, pii_consent_required = event.pii_consent_required, user_id = event.user_id, - state_code = userProfileData.getOrElse("state_code", null), - state_id = userProfileData.getOrElse("state_id", null), - state_name = userProfileData.getOrElse("state_name", null), - district_code = userProfileData.getOrElse("district_code", null), - district_id = userProfileData.getOrElse("district_id", null), - district_name = userProfileData.getOrElse("district_name", null), - block_code = userProfileData.getOrElse("block_code", null), - block_id = userProfileData.getOrElse("block_id", null), - block_name = userProfileData.getOrElse("block_name", null), - cluster_code = userProfileData.getOrElse("cluster_code", null), - cluster_id = userProfileData.getOrElse("cluster_id", null), - cluster_name = userProfileData.getOrElse("cluster_name", null), - school_code = userProfileData.getOrElse("school_code", null), - school_id = userProfileData.getOrElse("school_id", null), - school_name = userProfileData.getOrElse("school_name", null), - organisation_id = userProfileData.getOrElse("organisation_id", null), - organisation_name = userProfileData.getOrElse("organisation_name", null), - user_sub_type = userProfileData.getOrElse("sub_type", null), - user_type = userProfileData.getOrElse("type", null), + organisation_id = organisationsData.getOrDefault("organisation_id", null), + organisation_name = organisationsData.getOrDefault("organisation_name", null), + user_sub_type = userTypeData.getOrDefault("sub_type", null), + user_type = userTypeData.getOrDefault("type", null), created_at = created_at.toString, updated_at = updated_at.toString ) logger.info("Successfully parsed JSON and stored back the required fields to single case class") - if (UserData.program_id != null && UserData.user_id != null) { - val query = QueryBuilder.insertInto(config.dbKeyspace, config.dbTable) - .value("program_id", UserData.program_id) - .value("program_externalId", UserData.program_externalId) - .value("program_name", UserData.program_name) - .value("pii_consent_required", UserData.pii_consent_required) - .value("user_id", UserData.user_id) - .value("state_code", UserData.state_code) - .value("state_id", UserData.state_id) - .value("state_name", UserData.state_name) - .value("district_code", UserData.district_code) - .value("district_id", UserData.district_id) - .value("district_name", UserData.district_name) - .value("block_code", UserData.block_code) - .value("block_id", UserData.block_id) - .value("block_name", UserData.block_name) - .value("cluster_code", UserData.cluster_code) - .value("cluster_id", UserData.cluster_id) - .value("cluster_name", UserData.cluster_name) - .value("school_code", UserData.school_code) - .value("school_id", UserData.school_id) - .value("school_name", UserData.school_name) - .value("organisation_id", UserData.organisation_id) - .value("organisation_name", UserData.organisation_name) - .value("user_sub_type", UserData.user_sub_type) - .value("user_type", UserData.user_type) - .value("created_at", UserData.created_at) - .value("updated_at", UserData.updated_at).toString - - cassandraUtil.upsert(query) + val insertData = new util.HashMap[String, AnyRef] + insertData.put("program_id", UserData.program_id) + insertData.put("program_externalid", UserData.program_externalid) + insertData.put("program_name", UserData.program_name) + insertData.put("pii_consent_required", UserData.pii_consent_required.asInstanceOf[AnyRef]) + insertData.put("user_id", UserData.user_id) + if(event.user_Location!= null) { + insertData.put("user_locations", userLocationData) + } + if(event.organisations!=null) { + insertData.put("organisation_id", UserData.organisation_id) + insertData.put("organisation_name", UserData.organisation_name) + } + if(event.user_Types!=null) { + insertData.put("user_sub_type", UserData.user_sub_type) + insertData.put("user_type", UserData.user_type) + } + insertData.put("created_at", UserData.created_at) + insertData.put("updated_at", UserData.updated_at) + + /** + * Insert flattened data into cassandra database + */ + + if (UserData.program_id != null && UserData.user_id != null) { + val commons = new Commons(cassandraUtil) + commons.insertDbRecord(config.dbKeyspace, config.dbTable, insertData) logger.info("Successfully inserted the parsed events to Database with program_Id = " + UserData.program_id) + } else { logger.debug("This Event is skipped due to ProgramId or userId missing DocumentId:" + event._id) } } - - } - diff --git a/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/util/Commons.scala b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/util/Commons.scala new file mode 100644 index 000000000..2b1eb4285 --- /dev/null +++ b/ml-jobs/program-user-info/src/main/scala/org/sunbird/dp/userinfo/util/Commons.scala @@ -0,0 +1,30 @@ +package org.sunbird.dp.userinfo.util + +import com.datastax.driver.core.querybuilder.QueryBuilder +import org.apache.commons.collections.MapUtils +import org.sunbird.dp.core.util.CassandraUtil + +import java.util +import java.util.Map + +class Commons (@transient var cassandraUtil: CassandraUtil){ + + def insertDbRecord(keyspace: String, table: String, keys: Map[String, AnyRef]) = { + val insertQuery = QueryBuilder.insertInto(keyspace, table) + convertKeyCase(keys).entrySet.forEach((entry: util.Map.Entry[String, AnyRef]) => insertQuery.value(entry.getKey, entry.getValue)) + cassandraUtil.upsert(insertQuery.toString) + } + + private def convertKeyCase(properties: util.Map[String, AnyRef]) = { + val keyLowerCaseMap = new util.HashMap[String, AnyRef] + if (MapUtils.isNotEmpty(properties)) { + properties.entrySet.stream() + .filter(e => e != null && e.getKey != null) + .forEach((entry: util.Map.Entry[String, AnyRef]) => { + keyLowerCaseMap.put(entry.getKey.toLowerCase, entry.getValue) + }) + } + keyLowerCaseMap + } + +} diff --git a/ml-jobs/program-user-info/src/test/resources/test.cql b/ml-jobs/program-user-info/src/test/resources/test.cql index 1391a28e6..421265fda 100644 --- a/ml-jobs/program-user-info/src/test/resources/test.cql +++ b/ml-jobs/program-user-info/src/test/resources/test.cql @@ -3,27 +3,13 @@ CREATE KEYSPACE IF NOT EXISTS sunbird_programs WITH replication = { 'replication_factor': '1' }; - CREATE TABLE IF NOT EXISTS sunbird_programs.program_enrollment ( +CREATE TABLE IF NOT EXISTS sunbird_programs.program_enrollment ( program_id text, program_externalId text, program_name text, pii_consent_required boolean, user_id text, - state_code text, - state_id text, - state_name text, - district_code text, - district_id text, - district_name text, - block_code text, - block_id text, - block_name text, - cluster_code text, - cluster_id text, - cluster_name text, - school_code text, - school_id text, - school_name text, + user_locations map, organisation_id text, organisation_name text, user_sub_type text, diff --git a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala index 72f3c33ba..1f4a3c203 100644 --- a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala +++ b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/fixture/EventFixture.scala @@ -2,11 +2,17 @@ package org.sunbird.dp.fixture object EventFixture { - val EVENT = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"5f362b78af0a4decfa9a106f","programName":"mno","programExternalId":"$programExternalId","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"ba9aa220-ff1b-4717-b6ea-ace55f04f11","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"28226200816","name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":"deo","type":"administrator"}],"organisations":[{"organisationId":"0126796199493140480","orgName":"Pre-prod Custodian Organization","isSchool":false},{"organisationId":"01337588247832985613211","orgName":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","isSchool":true}],"framework":{"board":["CBSE"]}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}""" + val MODIFIED_EVENT = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"MODIFIED_EVENT","programName":"mno","programExternalId":"$programExternalId","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"ba9aa220-ff1b-4717-b6ea-ace55f04f11","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"28226200816","name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":"deo","type":"administrator"}],"framework":{"board":["CBSE"]},"rootOrg": {"id":"0126796199493140480", "orgName":"Staging Custodian Organization"}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}""" - val WHEN_VALUES_ARE_EMPTY = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"98765","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"","name":"","id":"","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"","name":"","id":"","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"","name":"","id":"","type":"state","parentId":null},{"code":"","name":"","id":"","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"","name":"","id":"","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":"","type":""}],"organisations":[{"organisationId":"","orgName":"","isSchool":false},{"organisationId":"01337588247832985613211","orgName":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","isSchool":true}],"framework":{"board":["CBSE"]}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}""" + val NO_DATA = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":null,"programName":"mno","programExternalId":"$programExternalId","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":null,"appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"28226200816","name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":"deo","type":"administrator"}],"framework":{"board":["CBSE"]},"rootOrg": {"id":"0126796199493140480", "orgName":"Staging Custodian Organization"}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}""" - val WHEN_VALUES_ARE_NULL = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"67890","programName":null,"programExternalId":null,"requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"54321","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"","name":"","id":"","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":null,"name":null,"id":null,"type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":null,"name":null,"id":null,"type":"state","parentId":null},{"code":null,"name":null,"id":null,"type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":null,"name":null,"id":null,"type":"school","parentId":"g"}],"profileUserTypes":[{"subType":null,"type":null}],"organisations":[{"organisationId":null,"orgName":null,"isSchool":false},{"organisationId":"01337588247832985613211","orgName":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","isSchool":true}],"framework":{"board":["CBSE"]}},"noOfResourcesStarted":3,"updatedAt":"2040-01-12T06:30:56.829Z","createdAt":"2050-01-12T06:30:09.476Z","__v":0}""" + val USERLOCATION_MISSING = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"USERLOCATION_MISSING","programName":"mno","programExternalId":"$programExternalId","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"ba9aa220-ff1b-4717-b6ea-ace55f04f11","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"profileUserTypes":[{"subType":"deo","type":"administrator"}],"framework":{"board":["CBSE"]},"rootOrg": {"id":"0126796199493140480", "orgName":"Staging Custodian Organization"}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}""" + + val ROOTORG_MISSING = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"ROOTORG_MISSING","programName":"mno","programExternalId":"$programExternalId","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"ba9aa220-ff1b-4717-b6ea-ace55f04f11","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":null,"name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"framework":{"board":["CBSE"]}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}""" + + val WHEN_VALUES_ARE_EMPTY_STRING = """{"_id":"63bfa8f173f6368ebde21bbe","deleted":false,"programId":"WHEN_VALUES_ARE_EMPTY_STRING","programName":"","programExternalId":"","requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"5678","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"28226200816","name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":"","type":""}],"framework":{"board":["CBSE"]},"rootOrg": {"id":"", "orgName":""}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}""" + + val WHEN_VALUES_ARE_NULL = """{"_id":"Whitespace_Tab","deleted":false,"programId":"WHEN_VALUES_ARE_NULL","programName":null,"programExternalId":null,"requestForPIIConsent":true,"userRoleInformation":{"role":"HM,DEO","state":"db331a8c-b9e2-45f8-b3c0-7ec1e826b6df","district":"1dcbc362-ec4c-4559-9081-e0c2864c2931","school":"c5726207-4f9f-4f45-91f1-3e9e8e84d824"},"userId":"5678","appInformation":{"appName":"Diksha","appVersion":"5.2"},"userProfile":{"userLocations":[{"code":"2822","name":"ANANTAPUR","id":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03","type":"district","parentId":"bc75cc99-9205-463e-a722-5326857838f8"},{"code":"282262","name":"AGALI","id":"966c3be4-c125-467d-aaff-1eb1cd525923","type":"block","parentId":"2f76dcf5-e43b-4f71-a3f2-c8f19e1fce03"},{"code":"28","name":"Andhra Pradesh","id":"bc75cc99-9205-463e-a722-5326857838f8","type":"state","parentId":null},{"code":"2822620004","name":"ZPHS AGALI","id":"beb0bcf4-d7cd-4a72-8f35-be8e5b03c0d1","type":"cluster","parentId":"966c3be4-c125-467d-aaff-1eb1cd525923"},{"code":"28226200816","name":"SMT PRAMEELAMMA AND SRI KGA GUPTA EM UP SCHOOL","id":"01337588247832985613211","type":"school","parentId":"g"}],"profileUserTypes":[{"subType":null,"type":null}],"framework":{"board":["CBSE"]},"rootOrg": {"id":null, "orgName":null}},"noOfResourcesStarted":3,"updatedAt":"2023-01-12T06:30:56.829Z","createdAt":"2022-01-12T06:30:09.476Z","__v":0}""" } diff --git a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala index 17237c486..d85bf2876 100644 --- a/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala +++ b/ml-jobs/program-user-info/src/test/scala/org/sunbird/dp/spec/ProgramUserInfoTaskTestSpec.scala @@ -101,13 +101,18 @@ class ProgramUserInfoTaskTestSpec extends BaseTestSpec { class ProgramUserInfoEventSource extends SourceFunction[Event] { override def run(ctx: SourceContext[Event]) { - val eventMap1 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.EVENT) - val eventMap2 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.WHEN_VALUES_ARE_EMPTY) + val eventMap1 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.MODIFIED_EVENT) + val eventMap2 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.WHEN_VALUES_ARE_EMPTY_STRING) val eventMap3 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.WHEN_VALUES_ARE_NULL) + val eventMap4 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.NO_DATA) + val eventMap5 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.USERLOCATION_MISSING) + val eventMap6 = JSONUtil.deserialize[util.HashMap[String, Any]](EventFixture.ROOTORG_MISSING) ctx.collect(new Event(eventMap1)) ctx.collect(new Event(eventMap2)) ctx.collect(new Event(eventMap3)) - + ctx.collect(new Event(eventMap4)) + ctx.collect(new Event(eventMap5)) + ctx.collect(new Event(eventMap6)) } override def cancel() = {}