Commit 13b458e

Issue #LR-588 merge: checking PII fields with respect to security levels (#50)
1 parent 2852263 commit 13b458e

16 files changed, +280 -116 lines

ansible/roles/lern-data-products-deploy/templates/lern-model-config.j2

+4-4
@@ -55,16 +55,16 @@ config() {
     echo '{"search":{"type":"none"},"model":"org.sunbird.lms.audit.CollectionSummaryJobV2","modelParams":{"storageKeyConfig":"druid_storage_account_key","storageSecretConfig":"druid_storage_account_secret","batchSize":50,"generateForAllBatches":true,"contentFields":["identifier","name","organisation","channel","status","keywords","createdFor","medium","subject"],"contentStatus":["Live","Unlisted","Retired"],"store":"azure","specPath":"/mount/data/analytics/scripts/collection-summary-ingestion-spec.json","druidIngestionUrl":"'$druidIngestionURL'","sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkUserDbRedisPort":"{{ user_port }}","sparkCassandraConnectionHost":"{{ core_cassandra_host }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Collection Summary Report V2"}'
     ;;
     "userinfo-exhaust")
-    echo '{"search":{"type":"none"},"model":"org.sunbird.lms.exhaust.collection.UserInfoExhaustJob","modelParams":{"store":"azure","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{}, "sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkUserDbRedisPort":"{{ user_port }}", "sparkCassandraConnectionHost":"{{ core_cassandra_host }}", "fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"UserInfo Exhaust"}'
+    echo '{"search":{"type":"none"},"model":"org.sunbird.lms.exhaust.collection.UserInfoExhaustJob","modelParams":{"store":"azure","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{}, "sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkUserDbRedisPort":"{{ user_port }}", "sparkCassandraConnectionHost":"{{ core_cassandra_host }}", "fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')","csvColumns":["courseid", "collectionName", "batchid", "batchName", "userid", "username", "state", "district", "orgname", "email", "phone","consentflag", "consentprovideddate", "block", "cluster", "usertype", "usersubtype", "schooludisecode", "schoolname"]},"parallelization":8,"appName":"UserInfo Exhaust"}'
     ;;
     "response-exhaust")
-    echo '{"search":{"type":"none"},"model":"org.sunbird.lms.exhaust.collection.ResponseExhaustJob","modelParams":{"store":"azure","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{}, "sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkUserDbRedisPort":"{{ user_port }}", "sparkCassandraConnectionHost":"{{ core_cassandra_host }}", "fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Response Exhaust"}'
+    echo '{"search":{"type":"none"},"model":"org.sunbird.lms.exhaust.collection.ResponseExhaustJob","modelParams":{"store":"azure","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{}, "sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkUserDbRedisPort":"{{ user_port }}", "sparkCassandraConnectionHost":"{{ core_cassandra_host }}", "fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')","csvColumns":["courseid", "collectionName", "batchid", "batchName", "userid", "content_id", "contentname", "attempt_id", "last_attempted_on", "questionid","questiontype", "questiontitle", "questiondescription", "questionduration", "questionscore", "questionmaxscore", "questionoption", "questionresponse"]},"parallelization":8,"appName":"Response Exhaust"}'
     ;;
     "response-exhaust-v2")
-    echo '{"search":{"type":"none"},"model":"org.sunbird.lms.exhaust.collection.ResponseExhaustJobV2","modelParams":{"store":"azure","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{}, "sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkUserDbRedisPort":"{{ user_port }}", "sparkCassandraConnectionHost":"{{ core_cassandra_host }}", "fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Response Exhaust V2"}'
+    echo '{"search":{"type":"none"},"model":"org.sunbird.lms.exhaust.collection.ResponseExhaustJobV2","modelParams":{"store":"azure","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{}, "sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkUserDbRedisPort":"{{ user_port }}", "sparkCassandraConnectionHost":"{{ core_cassandra_host }}", "fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')","csvColumns":["courseid", "collectionName", "batchid", "batchName", "userid", "content_id", "contentname", "attempt_id", "last_attempted_on", "questionid", "questiontype", "questiontitle", "questiondescription", "questionduration", "questionscore", "questionmaxscore", "questionoption", "questionresponse"]},"parallelization":8,"appName":"Response Exhaust V2"}'
     ;;
     "progress-exhaust")
-    echo '{"search":{"type":"none"},"model":"org.sunbird.lms.exhaust.collection.ProgressExhaustJob","modelParams":{"store":"azure","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{}, "sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkUserDbRedisPort":"{{ user_port }}", "sparkCassandraConnectionHost":"{{ core_cassandra_host }}", "fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Progress Exhaust"}'
+    echo '{"search":{"type":"none"},"model":"org.sunbird.lms.exhaust.collection.ProgressExhaustJob","modelParams":{"store":"azure","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{}, "sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkUserDbRedisPort":"{{ user_port }}", "sparkCassandraConnectionHost":"{{ core_cassandra_host }}", "fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')","csvColumns":["courseid", "collectionName", "batchid", "batchName", "userid", "state", "district", "orgname", "schooludisecode", "schoolname", "board", "block", "cluster", "usertype", "usersubtype", "enrolleddate", "completedon", "certificatestatus", "completionPercentage"]},"parallelization":8,"appName":"Progress Exhaust"}'
     ;;
     "progress-exhaust-v2")
     echo '{"search":{"type":"none"},"model":"org.sunbird.lms.exhaust.collection.ProgressExhaustJobV2","modelParams":{"store":"azure","mode":"OnDemand","batchFilters":["TPD"],"searchFilter":{}, "sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkUserDbRedisPort":"{{ user_port }}", "sparkCassandraConnectionHost":"{{ core_cassandra_host }}", "fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')"},"parallelization":8,"appName":"Progress Exhaust V2"}'

lern-data-products/pom.xml

+1-1
@@ -500,4 +500,4 @@
     </plugin>
   </plugins>
 </build>
-</project>
+</project>

lern-data-products/src/main/scala/org/sunbird/core/exhaust/BaseReportsJob.scala

+2
@@ -81,4 +81,6 @@ trait BaseReportsJob {
     StorageConfig(store, container, key, Option(storageKey), Option(storageSecret));
   }
 
+  def validateCsvColumns(piiFields: List[String], csvColumns: List[String], level: String): Boolean = true
+
 }
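
validateCsvColumns is introduced here as a stub that always returns true, so the new gate in BaseCollectionExhaustJob is effectively a no-op for now. Purely as a hypothetical sketch of the kind of check the signature suggests, and not something this commit implements: allow PII columns in csvColumns only when the tenant's security level is in a "secured" set. The level names below are placeholders, not values confirmed by this commit.

object ValidateCsvColumnsSketch {
  // Placeholder level names; the real values come from the tenant's data security policy.
  private val securedLevels = Set("PASSWORD_PROTECTED_DATASET", "PUBLIC_KEY_ENCRYPTED_DATASET")

  def validateCsvColumns(piiFields: List[String], csvColumns: List[String], level: String): Boolean = {
    val requestedPii = csvColumns.map(_.toLowerCase).toSet.intersect(piiFields.map(_.toLowerCase).toSet)
    // Valid when no PII column is requested, or the level is strong enough to carry PII.
    requestedPii.isEmpty || securedLevels.contains(level)
  }

  def main(args: Array[String]): Unit = {
    val pii = List("email", "phone")
    println(validateCsvColumns(pii, List("courseid", "userid"), "PLAIN_DATASET"))             // true
    println(validateCsvColumns(pii, List("courseid", "email"), "PLAIN_DATASET"))              // false
    println(validateCsvColumns(pii, List("courseid", "email"), "PASSWORD_PROTECTED_DATASET")) // true
  }
}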

lern-data-products/src/main/scala/org/sunbird/core/util/DataSecurityUtil.scala

+58-16
@@ -28,23 +28,55 @@ object DataSecurityUtil {
    */
   def getSecurityLevel(jobId: String, orgId: String): String = {
     JobLogger.log(s"getSecurityLevel jobID:: $jobId orgid:: $orgId", None, INFO)(new String())
-    val requestBody = Map("request" -> Map("orgId" -> orgId, "key" -> "dataSecurityPolicy"))
+    val httpResponseBody = getTenantPreferanceDetails(orgId, "dataSecurityPolicy")
+    val responseBody = JSONUtils.deserialize[Map[String, AnyRef]](httpResponseBody)
+    val data = responseBody.getOrElse("result", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
+      .getOrElse("response", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
+      .getOrElse("data", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
+    val globalLevel = data.getOrElse("level", "").asInstanceOf[String]
+    val jobDetail = data.getOrElse("job", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
+      .getOrElse(jobId, Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
+    val jobLevel = jobDetail.getOrElse("level", "").asInstanceOf[String]
+    if (!StringUtils.isEmpty(jobLevel)) jobLevel else globalLevel
+  }
+
+  /**
+   * fetch the PII fields by calling tenant preference read API using orgId
+   *
+   * @param jobId
+   * @param orgId
+   * @return
+   */
+  def getPIIFieldDetails(jobId: String, orgId: String): List[String] = {
+    JobLogger.log(s"getSecurityLevel jobID:: $jobId orgid:: $orgId", None, INFO)(new String())
+    val httpResponseBody = getTenantPreferanceDetails(orgId, "userPrivateFields")
+    val responseBody = JSONUtils.deserialize[Map[String, AnyRef]](httpResponseBody)
+    val data = responseBody.getOrElse("result", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
+      .getOrElse("response", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
+      .getOrElse("data", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
+    val piiFields = data.getOrElse("piiFields", List[String]()).asInstanceOf[List[String]]
+    piiFields
+  }
+
+  /**
+   * fetch the job security level by calling tenant preference read API using orgId
+   *
+   * @param jobId
+   * @param orgId
+   * @return
+   */
+  def getTenantPreferanceDetails(orgId: String, key: String): String = {
+    JobLogger.log(s"getTenantPreferanceDetails orgid:: $orgId", None, INFO)(new String())
+    val requestBody = Map("request" -> Map("orgId" -> orgId, "key" -> key))
     val request = JSONUtils.serialize(requestBody)
     val headers: Map[String, String] = Map("Content-Type" -> "application/json")
     val readTenantPrefURL = Constants.TENANT_PREFERENCE_PRIVATE_READ_URL
-    JobLogger.log(s"getSecurityLevel readTenantPrefURL:: $readTenantPrefURL", None, INFO)(new String())
+    JobLogger.log(s"getTenantPreferanceDetails readTenantPrefURL:: $readTenantPrefURL", None, INFO)(new String())
     val httpResponse = httpUtil.post(readTenantPrefURL, request, headers)
     if (httpResponse.status == 200) {
-      JobLogger.log(s"dataSecurityPolicy for org=$orgId, response body=${httpResponse.body}", None, INFO)(new String())
-      val responseBody = JSONUtils.deserialize[Map[String, AnyRef]](httpResponse.body)
-      val data = responseBody.getOrElse("result", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
-        .getOrElse("response", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
-        .getOrElse("data", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
-      val globalLevel = data.getOrElse("level", "").asInstanceOf[String]
-      val jobDetail = data.getOrElse("job", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
-        .getOrElse(jobId, Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
-      val jobLevel = jobDetail.getOrElse("level", "").asInstanceOf[String]
-      if (!StringUtils.isEmpty(jobLevel)) jobLevel else globalLevel
+      JobLogger.log(s"getTenantPreferanceDetails for org=$orgId, response body=${httpResponse.body}", None, INFO)(new String())
+      val responseBody = httpResponse.body
+      responseBody
     } else {
       JobLogger.log(s"Error response from Tenant Preferance read API for request :: $requestBody :: response is :: ${httpResponse.status} :: ${httpResponse.body}", None, ERROR)(new String())
       ""
@@ -169,6 +201,7 @@ object DataSecurityUtil {
     var tempDir = ""
     if (level.nonEmpty) {
       val storageService = fc.getStorageService(storageConfig.store, storageConfig.accountKey.getOrElse(""), storageConfig.secretKey.getOrElse(""));
+      val path = Paths.get(url)
       val filePrefix = storageConfig.store.toLowerCase() match {
         // $COVERAGE-OFF$ Disabling scoverage
         case "s3" =>
@@ -179,19 +212,26 @@
           CommonUtil.getGCloudFile(storageConfig.container, "")
         // $COVERAGE-ON$ for case: local
         case _ =>
-          storageConfig.fileName
+          val filePath = path.toString
+          if (filePath.contains(storageConfig.fileName)){
+            filePath
+          } else {
+            storageConfig.fileName + "/" + filePath
+          }
+
       }
 
       if (!url.isEmpty ) {
         if(request != null) {
           tempDir = AppConf.getConfig("spark_output_temp_dir") + request.request_id + "/"
         } else {
+          val urlSplitArr = url.split("/")
           if (!storageConfig.store.equals("local")) {
-            val urlSplitArr = url.split("/")
             tempDir = AppConf.getConfig("spark_output_temp_dir") + urlSplitArr(3) + "/"
+          } else {
+            tempDir = AppConf.getConfig("spark_output_temp_dir") + urlSplitArr(4) + "/"
           }
         }
-        val path = Paths.get(url)
         objKey = url.replace(filePrefix, "")
         localPath = tempDir + path.getFileName
         fc.getHadoopFileUtil().delete(conf, tempDir)
@@ -237,9 +277,11 @@
       if(request != null) {
         tempDir = AppConf.getConfig("spark_output_temp_dir") + request.request_id + "/"
       } else {
+        val urlSplitArr = url.split("/")
         if (!storageConfig.store.equals("local")) {
-          val urlSplitArr = url.split("/")
           tempDir = AppConf.getConfig("spark_output_temp_dir") + urlSplitArr(3) + "/"
+        } else {
+          tempDir = AppConf.getConfig("spark_output_temp_dir") + urlSplitArr(4) + "/"
         }
       }
       val path = Paths.get(url)
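
After this change, getSecurityLevel and getPIIFieldDetails both unwrap the same result -> response -> data envelope returned by the tenant-preference read API; only the preference key differs (dataSecurityPolicy vs userPrivateFields). The standalone sketch below illustrates that shape and the job-level-overrides-global rule, using invented values; only the key names come from the code above, and in practice the two preference keys are fetched in separate calls rather than one map as shown here.

object TenantPreferenceShapeSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative response body; real values come from the tenant preference private read API.
    val responseBody: Map[String, AnyRef] = Map(
      "result" -> Map(
        "response" -> Map(
          "data" -> Map(
            "level" -> "PLAIN_DATASET", // tenant-wide default level (placeholder value)
            "job" -> Map("userinfo-exhaust" -> Map("level" -> "PASSWORD_PROTECTED_DATASET")),
            "piiFields" -> List("email", "phone", "username") // what the userPrivateFields key would return
          )
        )
      )
    )

    // Same getOrElse chain as DataSecurityUtil above.
    val data = responseBody.getOrElse("result", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
      .getOrElse("response", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
      .getOrElse("data", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
    val globalLevel = data.getOrElse("level", "").asInstanceOf[String]
    val jobLevel = data.getOrElse("job", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
      .getOrElse("userinfo-exhaust", Map[String, AnyRef]()).asInstanceOf[Map[String, AnyRef]]
      .getOrElse("level", "").asInstanceOf[String]
    // A job-specific level wins over the tenant-wide level when present.
    println(if (jobLevel.nonEmpty) jobLevel else globalLevel) // PASSWORD_PROTECTED_DATASET
  }
}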

lern-data-products/src/main/scala/org/sunbird/lms/exhaust/collection/BaseCollectionExhaustJob.scala

+7-1
@@ -19,7 +19,7 @@ import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
 import org.joda.time.{DateTime, DateTimeZone}
 import org.sunbird.core.util.{DecryptUtil, RedisConnect}
 import org.sunbird.core.exhaust.{BaseReportsJob, JobRequest, OnDemandExhaustJob}
-import org.sunbird.core.util.DataSecurityUtil.{getOrgId, getSecuredExhaustFile, getSecurityLevel}
+import org.sunbird.core.util.DataSecurityUtil.{getOrgId, getPIIFieldDetails, getSecuredExhaustFile, getSecurityLevel}
 import org.sunbird.lms.exhaust.collection.ResponseExhaustJobV2.Question
 
 import java.security.MessageDigest
@@ -151,6 +151,8 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh
       JobLogger.log(s"executeOnDemand for channel= "+ request.requested_channel, None, INFO)
       val orgId = request.requested_channel//getOrgId("", request.requested_channel)
       val level = getSecurityLevel(jobId(), orgId)
+      val piiFields = getPIIFieldDetails(jobId(), orgId)
+      val csvColumns = modelParams.getOrElse("csvColumns", List[String]()).asInstanceOf[List[String]]
       JobLogger.log(s"executeOnDemand for url = $orgId and level = $level and channel= $request.requested_channel", None, INFO)
       val reqOrgAndLevel = (request.request_id, orgId, level)
       reqOrgAndLevelDtl :+= reqOrgAndLevel
@@ -161,6 +163,10 @@ trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExh
       JobLogger.log("Channel details at executeOnDemand", Some(Map("channel" -> request.requested_channel, "file size" -> processedSize, "completed batches" -> processedCount)), INFO)
 
       if (checkRequestProcessCriteria(processedCount, processedSize)) {
+        if(!validateCsvColumns(piiFields, csvColumns, level)) {
+          JobLogger.log("Request should have either of batchId, batchFilter, searchFilter or encrption key", Some(Map("requestId" -> request.request_id, "remainingRequest" -> totalRequests.getAndDecrement())), INFO)
+          markRequestAsFailed(request, "Request Security Level is not matching with PII fields or csvColumns CSV columns configured. Please check.")
+        }
         if (validateRequest(request)) {
           val res = processRequest(request, custodianOrgId, userCachedDF, storageConfig, requestsCompleted, orgId, level)
           requestsCompleted.++=(JSONUtils.deserialize[ListBuffer[ProcessedRequest]](res.processed_batches.getOrElse("[]")))
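
executeOnDemand now derives three inputs per request, the security level, the tenant's PII field list and the job's configured csvColumns, and is wired to fail the request when validateCsvColumns rejects the combination. The compact sketch below captures that intent only; it is not the job's actual control flow (the real code calls markRequestAsFailed on the JobRequest, and validateCsvColumns is still the always-true stub from BaseReportsJob).

object ExhaustGateSketch {
  // Stub behaviour copied from BaseReportsJob in this commit.
  def validateCsvColumns(piiFields: List[String], csvColumns: List[String], level: String): Boolean = true

  // Either stands in for markRequestAsFailed vs continuing with the request.
  def gateRequest(piiFields: List[String], csvColumns: List[String], level: String): Either[String, Unit] =
    if (!validateCsvColumns(piiFields, csvColumns, level))
      Left("Request Security Level is not matching with PII fields or csvColumns CSV columns configured. Please check.") // message verbatim from the commit
    else
      Right(())

  def main(args: Array[String]): Unit = {
    // While the stub returns true, every request passes the gate.
    println(gateRequest(List("email", "phone"), List("courseid", "email"), "PLAIN_DATASET")) // Right(())
  }
}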

lern-data-products/src/main/scala/org/sunbird/lms/exhaust/collection/ProgressExhaustJob.scala

+4-3
@@ -5,8 +5,9 @@ import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.ekstep.analytics.framework.Level.INFO
 import org.ekstep.analytics.framework.conf.AppConf
-import org.ekstep.analytics.framework.util.JSONUtils
+import org.ekstep.analytics.framework.util.{JSONUtils, JobLogger}
 import org.ekstep.analytics.framework.{FrameworkContext, JobConfig}
 
 import scala.collection.immutable.Set
@@ -42,8 +43,7 @@ object ProgressExhaustJob extends BaseCollectionExhaustJob {
   private val activityAggDBSettings = Map("table" -> "user_activity_agg", "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"), "cluster" -> "LMSCluster");
   private val assessmentAggDBSettings = Map("table" -> "assessment_aggregator", "keyspace" -> AppConf.getConfig("sunbird.courses.keyspace"), "cluster" -> "LMSCluster");
   private val contentHierarchyDBSettings = Map("table" -> "content_hierarchy", "keyspace" -> AppConf.getConfig("sunbird.content.hierarchy.keyspace"), "cluster" -> "ContentCluster");
-
-  private val filterColumns = Seq("courseid", "collectionName", "batchid", "batchName", "userid", "state", "district", "orgname", "schooludisecode", "schoolname", "board", "block", "cluster", "usertype", "usersubtype", "enrolleddate", "completedon", "certificatestatus", "completionPercentage");
+  //private val filterColumns = Seq("courseid", "collectionName", "batchid", "batchName", "userid", "state", "district", "orgname", "schooludisecode", "schoolname", "board", "block", "cluster", "usertype", "usersubtype", "enrolleddate", "completedon", "certificatestatus", "completionPercentage");
   private val columnsOrder = List("Collection Id", "Collection Name", "Batch Id", "Batch Name", "User UUID", "State", "District", "Org Name", "School Id",
     "School Name", "Block Name", "Cluster Name", "User Type", "User Sub Type", "Declared Board", "Enrolment Date", "Completion Date", "Certificate Status", "Progress", "Total Score")
   private val columnMapping = Map("courseid" -> "Collection Id", "collectionName" -> "Collection Name", "batchid" -> "Batch Id", "batchName" -> "Batch Name", "userid" -> "User UUID",
@@ -52,6 +52,7 @@ object ProgressExhaustJob extends BaseCollectionExhaustJob {
     "completionPercentage" -> "Progress", "total_sum_score" -> "Total Score", "certificatestatus" -> "Certificate Status")
 
   override def processBatch(userEnrolmentDF: DataFrame, collectionBatch: CollectionBatch)(implicit spark: SparkSession, fc: FrameworkContext, config: JobConfig): DataFrame = {
+    val filterColumns : List[String] = config.modelParams.get.getOrElse("csvColumns", List[String]()).asInstanceOf[List[String]]
     val hierarchyData = loadCollectionHierarchy(collectionBatch.collectionId)
     //val collectionAggDF = getCollectionAggWithModuleData(collectionBatch, hierarchyData).withColumn("batchid", lit(collectionBatch.batchId));
     //val enrolledUsersToBatch = updateCertificateStatus(userEnrolmentDF).select(filterColumns.head, filterColumns.tail: _*)
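
With filterColumns now read from modelParams("csvColumns"), a deployment whose template omits the new key would end up selecting an empty column list. A defensive variant, offered only as a suggestion and not something this commit does, could fall back to the previously hard-coded list; the default below is copied from the Seq the commit comments out.

object ProgressColumnsSketch {
  // Copied from the commented-out filterColumns Seq above.
  private val defaultProgressColumns = List("courseid", "collectionName", "batchid", "batchName", "userid",
    "state", "district", "orgname", "schooludisecode", "schoolname", "board", "block", "cluster", "usertype",
    "usersubtype", "enrolleddate", "completedon", "certificatestatus", "completionPercentage")

  def resolveFilterColumns(modelParams: Map[String, AnyRef]): List[String] = {
    val configured = modelParams.getOrElse("csvColumns", List[String]()).asInstanceOf[List[String]]
    if (configured.nonEmpty) configured else defaultProgressColumns
  }

  def main(args: Array[String]): Unit = {
    println(resolveFilterColumns(Map("store" -> "azure")).size)                     // 19, falls back to the default
    println(resolveFilterColumns(Map("csvColumns" -> List("courseid", "userid"))))  // List(courseid, userid)
  }
}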
