Skip to content

Commit 2852263

Browse files
Issue #LR-491 merge: ML Program UserInfoExhaust Data Product (#40)
1 parent caa091d commit 2852263

File tree

12 files changed

+1437
-2
lines changed

12 files changed

+1437
-2
lines changed

ansible/roles/lern-data-products-deploy/defaults/main.yml

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ hierarchySearchServicEndpoint: /v3/hierarchy/
3232

3333
user_table_keyspace: "sunbird"
3434
course_keyspace: "sunbird_courses"
35+
program_keyspace: "sunbird_programs"
3536
hierarchy_store_keyspace: "{{ env }}_hierarchy_store"
3637
job_request_table: "{{ env }}_job_request"
3738
dataset_metadata_table: "{{ env }}_dataset_metadata"

ansible/roles/lern-data-products-deploy/templates/common.conf.j2

+3
Original file line numberDiff line numberDiff line change
@@ -260,12 +260,15 @@ druid.report.default.container="report-verification"
260260
sunbird.user.keyspace="{{ user_table_keyspace }}"
261261
sunbird.courses.keyspace="{{ course_keyspace }}"
262262
sunbird.content.hierarchy.keyspace="{{ cassandra_hierarchy_store_keyspace }}"
263+
sunbird.program.report.keyspace="{{ program_keyspace }}"
263264
sunbird.user.cluster.host="{{ core_cassandra_host }}"
265+
sunbird.program.report.host="{{ core_cassandra_host }}"
264266
sunbird.courses.cluster.host="{{ core_cassandra_host }}"
265267
sunbird.content.cluster.host="{{ core_cassandra_host }}"
266268
sunbird.report.cluster.host="{{ report_cassandra_cluster_host }}"
267269
sunbird.user.report.keyspace="{{ report_user_table_keyspace }}"
268270
collection.exhaust.store.prefix=""
271+
ml.exhaust.store.prefix="ml_reports"
269272
postgres.table.job_request="{{ job_request_table }}"
270273
postgres.table.dataset_metadata="{{ dataset_metadata_table }}"
271274

ansible/roles/lern-data-products-deploy/templates/lern-model-config.j2

+3
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ config() {
9393
"old-certificate-migration-job")
9494
echo '{"search":{"type":"none"},"model":"org.sunbird.lms.audit.OldCertificateMigrationJob","modelParams":{"mode":"execute","store":"azure","sparkCassandraConnectionHost":"{{ core_cassandra_host }}", "cert_base_path": "https://staging.sunbirded.org/", "cloud_storage_base_url": "https://sunbirdstaging.blob.core.windows.net", "cloud_store_base_path_placeholder": "CLOUD_BASE_PATH","content_cloud_storage_container": "sunbird-content-staging", "cloud_storage_cname_url": "https://obj.dev.sunbirded.org", "batchId": "0134278454483681283", "kafka_broker": "localhost:29092"}}'
9595
;;
96+
"program-user-exhaust")
97+
echo '{"search":{"type":"none"},"model":"org.sunbird.ml.exhaust.ProgramUserInfoExhaustJob","modelParams":{"store":"azure","mode":"OnDemand","authorizedRoles":["PROGRAM_MANAGER"],"id":"ml-program-user-exhaust","keyspace_name":"sunbird_programs","table":[{"columns":["user_id","program_name","program_externalId","user_locations","user_type","user_sub_type","organisation_name","pii_consent_required"],"name":"program_enrollment","user_locations_columns":["state_name","district_name","block_name","cluster_name","school_code","school_name"]},{"name":"user","columns":["userid","firstname","lastname","email","phone","username"],"encrypted_columns":["email","phone"],"final_columns":["email","phone","username"]}],"label_mapping":{"user_id":"User UUID","username":"User Name(On user consent)","phone":"Mobile number(On user consent)","email":"Email ID(On user consent)","consentflag":"Consent Provided","consentprovideddate":"Consent Provided Date","program_name":"Program Name","program_externalId":"Program ID","state_name":"State","district_name":"District","block_name":"Block","cluster_name":"Cluster","school_code":"School Id","school_name":"School Name","user_type":"Usertype","user_sub_type":"Usersubtype","organisation_name":"Org Name"},"order_of_csv_column":["User UUID","User Name(On user consent)","Mobile number(On user consent)","Email ID(On user consent)","Consent Provided","Consent Provided Date","Program Name","Program ID","State","District","Block","Cluster","School Id","School Name","Usertype","Usersubtype","Org Name"],"sort":["District","Block","Cluster","School Id","User UUID"],"quote_column":["User Name(On user consent)","Program Name"],"sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkCassandraConnectionHost":"{{ core_cassandra_host }}","sparkUserDbRedisPort":"{{ user_port }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')","key":"ml_reports/","format":"csv"},"output":[{"params":{"file":"ml_reports/"},"to":"file"}],"parallelization":8,"appName":"Program UserInfo Exhaust"}'
98+
;;
9699
"*")
97100
echo "Unknown model code"
98101
exit 1 # Command to come out of the program with status 1

ansible/roles/lern-data-products-deploy/templates/lern-run-job.j2

+2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ get_report_job_model_name(){
5757
;;
5858
"old-certificate-migration-job") echo 'org.sunbird.lms.audit.OldCertificateMigrationJob'
5959
;;
60+
"program-user-exhaust") echo 'org.sunbird.ml.exhaust.ProgramUserInfoExhaustJob'
61+
;;
6062
*) echo $1
6163
;;
6264
esac

lern-data-products/src/main/resources/application.conf

+3
Original file line numberDiff line numberDiff line change
@@ -164,10 +164,13 @@ sunbird.user.keyspace="sunbird"
164164
sunbird.courses.keyspace="sunbird_courses"
165165
sunbird.user.keyspace="sunbird"
166166
sunbird.content.hierarchy.keyspace="sunbird_courses"
167+
sunbird.program.report.keyspace="sunbird_programs"
167168
sunbird.user.cluster.host=127.0.0.1
168169
sunbird.courses.cluster.host=127.0.0.1
169170
sunbird.content.cluster.host=127.0.0.1
171+
sunbird.program.report.host=127.0.0.1
170172
collection.exhaust.store.prefix="reports/"
173+
ml.exhaust.store.prefix="ml_reports"
171174
postgres.table.job_request="job_request"
172175

173176
## Collection Exhaust Jobs Configuration - End ##

lern-data-products/src/main/resources/data.cql

+35-1
Original file line numberDiff line numberDiff line change
@@ -536,4 +536,38 @@ CREATE TABLE IF NOT EXISTS test_keyspace.report_user_enrolments(
536536
progress int,
537537
status int,
538538
PRIMARY KEY (batchid, courseid, userid))
539-
WITH COMPACTION = { 'class': 'SizeTieredCompactionStrategy', 'enabled': 'false' };
539+
WITH COMPACTION = { 'class': 'SizeTieredCompactionStrategy', 'enabled': 'false' };
540+
541+
CREATE KEYSPACE IF NOT EXISTS sunbird_programs WITH replication = {
542+
'class': 'SimpleStrategy',
543+
'replication_factor': '1'
544+
};
545+
546+
CREATE TABLE IF NOT EXISTS sunbird_programs.program_enrollment (
547+
program_id text,
548+
program_externalId text,
549+
program_name text,
550+
pii_consent_required boolean,
551+
user_id text,
552+
user_locations map<text,text>,
553+
organisation_id text,
554+
organisation_name text,
555+
user_sub_type text,
556+
user_type text,
557+
created_at date,
558+
updated_at date,
559+
PRIMARY KEY (program_id,user_id)
560+
) WITH bloom_filter_fp_chance = 0.01
561+
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
562+
AND comment = ''
563+
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
564+
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
565+
AND crc_check_chance = 1.0
566+
AND dclocal_read_repair_chance = 0.1
567+
AND default_time_to_live = 0
568+
AND gc_grace_seconds = 864000
569+
AND max_index_interval = 2048
570+
AND memtable_flush_period_in_ms = 0
571+
AND min_index_interval = 128
572+
AND read_repair_chance = 0.0
573+
AND speculative_retry = '99PERCENTILE';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.sunbird.core.exhaust
2+
3+
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
4+
import org.ekstep.analytics.framework.FrameworkContext
5+
import org.ekstep.analytics.framework.conf.AppConf
6+
import org.ekstep.analytics.framework.util.{CommonUtil, JobLogger}
7+
import org.apache.spark.sql._
8+
import org.apache.spark.sql.functions._
9+
import org.apache.spark.sql.types.StructType
10+
import org.sunbird.lms.exhaust.collection.UDFUtils
11+
case class UserData(userid: String, state: Option[String] = Option(""), district: Option[String] = Option(""), orgname: Option[String] = Option(""), firstname: Option[String] = Option(""), lastname: Option[String] = Option(""), email: Option[String] = Option(""),
12+
phone: Option[String] = Option(""), rootorgid: String, block: Option[String] = Option(""), schoolname: Option[String] = Option(""), schooludisecode: Option[String] = Option(""), board: Option[String] = Option(""), cluster: Option[String] = Option(""),
13+
usertype: Option[String] = Option(""), usersubtype: Option[String] = Option(""))
14+
object UserInfoUtil extends BaseReportsJob {
15+
private val userCacheDBSettings = Map("table" -> "user", "infer.schema" -> "true", "key.column" -> "userid");
16+
private val userConsentDBSettings = Map("table" -> "user_consent", "keyspace" -> AppConf.getConfig("sunbird.user.keyspace"), "cluster" -> "UserCluster");
17+
private val redisFormat = "org.apache.spark.sql.redis";
18+
val cassandraFormat = "org.apache.spark.sql.cassandra";
19+
def getUserCacheDF(cols: Seq[String], persist: Boolean)(implicit spark: SparkSession): DataFrame = {
20+
val schema = Encoders.product[UserData].schema
21+
val df = loadData(userCacheDBSettings, redisFormat, schema).withColumn("username", concat_ws(" ", col("firstname"), col("lastname"))).select(cols.head, cols.tail: _*)
22+
.repartition(AppConf.getConfig("exhaust.user.parallelism").toInt, col("userid"))
23+
if (persist) df.persist() else df
24+
}
25+
26+
def getUserConsentDF(filters: String, persist: Boolean)(implicit spark: SparkSession): DataFrame = {
27+
val df = loadData(userConsentDBSettings, cassandraFormat, new StructType())
28+
.where(s"""$filters""")
29+
.dropDuplicates("user_id", "object_id")
30+
.withColumn("consentflag", when(lower(col("status")) === "active", "true").otherwise("false"))
31+
.withColumn("last_updated_on", date_format(col("last_updated_on"), "dd/MM/yyyy"))
32+
.select(col("user_id").as("userid"), col("consentflag"), col("last_updated_on").as("consentprovideddate"));
33+
if (persist) df.persist() else df
34+
}
35+
36+
def decryptUserInfo(pgmUserDF: DataFrame, userCacheEncryptColNames: List[String])(implicit spark: SparkSession): DataFrame = {
37+
val schema = pgmUserDF.schema
38+
val decryptFields = schema.fields.filter(field => userCacheEncryptColNames.contains(field.name));
39+
var resultDF = decryptFields.foldLeft(pgmUserDF)((df, field) => {
40+
df.withColumn(field.name, when(col("consentflag") === "true", UDFUtils.toDecrypt(col(field.name))).otherwise(lit("")))
41+
})
42+
resultDF = resultDF.withColumn("username",when(col("consentflag") === "true", col("username")).otherwise(lit("")))
43+
resultDF
44+
}
45+
}

0 commit comments

Comments
 (0)