LR-546 PII code implementation for data-products #38

Merged: 55 commits merged into release-5.3.0 from Hari-stackroute:LR-546 on May 21, 2023
Changes from all commits

Commits (55)
a6f39ae
LR-546 PII code implementation for data-products
Hari-stackroute May 5, 2023
d5231e7
Merge remote-tracking branch 'upstream/release-5.3.0' into LR-546
Hari-stackroute May 5, 2023
43aa464
LR-546 code changes for PII in reports
Hari-stackroute May 6, 2023
e10bf7f
LR-546 code changes for PII in reports
Hari-stackroute May 8, 2023
59bf476
LR-546 removed pom changes
Hari-stackroute May 8, 2023
0a88c78
LR-546 removed unnecessary test file changes
Hari-stackroute May 8, 2023
d36c914
LR-546 removed duplicate code
Hari-stackroute May 8, 2023
8accd97
LR-546 tested zip functionality
Hari-stackroute May 8, 2023
cafd0b6
LR-546 removed token details from conf
Hari-stackroute May 9, 2023
5f28dfd
LR-546 added test cases
Hari-stackroute May 10, 2023
c0dcb4b
LR-546 removed access-token from request
Hari-stackroute May 11, 2023
60d804d
LR-546 removed unused methods from HttpUtil
Hari-stackroute May 11, 2023
c686cfb
Merge remote-tracking branch 'upstream/release-5.3.0' into LR-546
Hari-stackroute May 12, 2023
212cae2
LR-546 code changes against review points
Hari-stackroute May 13, 2023
48d3faf
LR-546 code changes against review points
Hari-stackroute May 15, 2023
cb93e78
LR-546 code changes against review points
Hari-stackroute May 15, 2023
ee9f682
LR-546 code changes against review points
Hari-stackroute May 15, 2023
3adf311
LR-546 test-case fix
Hari-stackroute May 15, 2023
ebcc36f
LR-546 test-case fix-1
Hari-stackroute May 15, 2023
31b4951
LR-546 test-case fix-2
Hari-stackroute May 15, 2023
92b54eb
LR-546 code fixes for PII changes
Hari-stackroute May 16, 2023
49ce24d
LR-546 code fixes for PII changes-1
Hari-stackroute May 16, 2023
942dce4
LR-546 code fixes for PII changes-2
Hari-stackroute May 16, 2023
aec99b0
LR-546 level codes changed
Hari-stackroute May 16, 2023
aef04d0
LR-546 removed level and orgid details from JobRequest
Hari-stackroute May 16, 2023
9ee8739
LR-546 removed level and orgid details from JobRequest-1
Hari-stackroute May 16, 2023
fdbb791
LR-546 ignoring test-cases
Hari-stackroute May 16, 2023
b789597
LR-546 modified Encrypt util
Hari-stackroute May 17, 2023
0954e49
LR-546 exception with tenant preference endpoint configuration
Hari-stackroute May 17, 2023
c1c50bf
LR-546 exception with tenant preference endpoint configuration-1
Hari-stackroute May 17, 2023
e07afeb
LR-546 exception with tenant preference endpoint configuration-2
Hari-stackroute May 17, 2023
8ee1d96
LR-546 exception with tenant preference endpoint configuration-3
Hari-stackroute May 17, 2023
fcdbef1
LR-546 dev env configuration changes for spark
Hari-stackroute May 18, 2023
f0bd47a
LR-546 added logs
Hari-stackroute May 18, 2023
b3d08bb
LR-546 added logs-1
Hari-stackroute May 18, 2023
71fd18e
LR-546 added logs-2
Hari-stackroute May 18, 2023
a01d01b
LR-546 added logs-3
Hari-stackroute May 18, 2023
31f15d2
LR-546 added logs-4
Hari-stackroute May 18, 2023
6108409
LR-546 added logs-5
Hari-stackroute May 18, 2023
f4ab3c2
LR-546 added logs-6
Hari-stackroute May 18, 2023
ab105b5
LR-546 added logs-7
Hari-stackroute May 18, 2023
cdd503c
LR-546 added logs-8
Hari-stackroute May 18, 2023
6feefe6
LR-546 L3 level encryption throwing errors
Hari-stackroute May 19, 2023
8735fec
LR-546 L3 level encryption throwing errors-1
Hari-stackroute May 19, 2023
ce2357a
LR-546 L3 level encryption throwing errors-2
Hari-stackroute May 19, 2023
d09d5a1
LR-546 config changes
Hari-stackroute May 19, 2023
2905ddf
LR-546 commented test-cases
Hari-stackroute May 21, 2023
865efe9
Delete BaseCollectionExhaustJob1.scala
Hari-stackroute May 21, 2023
c416c1a
LR-546 ignored test-cases
Hari-stackroute May 21, 2023
73dd8ca
Merge branch 'LR-546' of https://github.com/Hari-stackroute/data-prod…
Hari-stackroute May 21, 2023
bc801ab
LR-546 ignored test-cases-1
Hari-stackroute May 21, 2023
beaca03
LR-546 ignored test-cases-2
Hari-stackroute May 21, 2023
b380282
LR-546 ignored test-cases-2
Hari-stackroute May 21, 2023
0516264
LR-546 ignored test-cases-3
Hari-stackroute May 21, 2023
e87750c
LR-546 ignored test-cases-4
Hari-stackroute May 21, 2023
3 changes: 3 additions & 0 deletions ansible/inventory/env/group_vars/all.yml
@@ -101,3 +101,6 @@ s3_path_style_access: false
s3_https_only: false
s3_default_bucket_location: ""
s3_storage_container: ""

org_search_service_private_endpoint: "{{sunbird_learner_service_url}}/private/v2/org/search"
tenant_preferance_read_private_service_endpoint: "{{sunbird_learner_service_url}}/private/v2/org/preferences/read"
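
These two group_vars defaults feed the org.search.private.api.url and tenant.pref.read.private.api.url keys templated into the service configuration below.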
Data-products common configuration template (ansible)
@@ -73,7 +73,6 @@ azure_token_client_secret="{{ media_service_azure_token_client_secret }}"
elasticsearch.service.endpoint="http://{{groups['composite-search-cluster'][0]}}:9200"
elasticsearch.index.compositesearch.name="{{ es_search_index }}"

org.search.api.url="{{ channelSearchServiceEndpoint }}"
org.search.api.key="{{ searchServiceAuthorizationToken }}"

hierarchy.search.api.url="{{ hierarchySearchServiceUrl }}"
@@ -311,3 +310,6 @@ sunbird.course.optionalnodes="optionalnodes"
sunbird.course.redis.host={{ groups['redisall'][0] }}
sunbird.course.redis.port=6379
sunbird.course.redis.relationCache.id=5

org.search.private.api.url="{{ org_search_service_private_endpoint }}"
tenant.pref.read.private.api.url="{{ tenant_preferance_read_private_service_endpoint }}"
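
Downstream Scala code reads these templated keys through the analytics framework's AppConf wrapper, as the Constants diff further below shows. A minimal sketch of that lookup, assuming the keys are present in the loaded configuration:

import org.ekstep.analytics.framework.conf.AppConf

object PrivateEndpoints {
  // Resolve the two new private endpoints at job start-up.
  val orgSearchUrl: String  = AppConf.getConfig("org.search.private.api.url")
  val tenantPrefUrl: String = AppConf.getConfig("tenant.pref.read.private.api.url")
}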
Spark job-submission script template (ansible)
@@ -9,6 +9,7 @@ source lern-model-config.sh
today=$(date "+%Y-%m-%d")

libs_path="{{ analytics.home }}/models-{{ model_version }}/lern-data-products-1.0"
file_path="lern{{ env }}.conf"

get_report_job_model_name(){
case "$1" in
@@ -72,6 +73,6 @@ echo "Starting the job - $1" >> "$DP_LOGS/$today-job-execution.log"

echo "Job modelName - $job_id" >> "$DP_LOGS/$today-job-execution.log"

nohup $SPARK_HOME/bin/spark-submit --master local[*] --jars $(echo ${libs_path}/lib/*.jar | tr ' ' ','),$MODELS_HOME/analytics-framework-2.0.jar,$MODELS_HOME/scruid_2.12-2.5.0.jar,$MODELS_HOME/batch-models-2.0.jar --class org.ekstep.analytics.job.JobExecutor $MODELS_HOME/batch-models-2.0.jar --model "$job_id" --config "$job_config$batchIds" >> "$DP_LOGS/$today-job-execution.log" 2>&1
nohup $SPARK_HOME/bin/spark-submit --conf spark.driver.extraJavaOptions="-Dconfig.file=$MODELS_HOME/$file_path" --conf spark.executor.extraJavaOptions="-Dconfig.file=$MODELS_HOME/$file_path" --master local[*] --jars $(echo ${libs_path}/lib/*.jar | tr ' ' ','),$MODELS_HOME/analytics-framework-2.0.jar,$MODELS_HOME/scruid_2.12-2.5.0.jar,$MODELS_HOME/batch-models-2.0.jar --class org.ekstep.analytics.job.JobExecutor $MODELS_HOME/batch-models-2.0.jar --model "$job_id" --config "$job_config$batchIds" >> "$DP_LOGS/$today-job-execution.log" 2>&1

echo "Job execution completed - $1" >> "$DP_LOGS/$today-job-execution.log"
echo "Job execution completed - $1" >> "$DP_LOGS/$today-job-execution.log"
16 changes: 15 additions & 1 deletion lern-data-products/pom.xml
@@ -351,6 +351,12 @@
<version>0.7.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.moparisthebest</groupId>
<artifactId>junidecode</artifactId>
<version>0.1.1</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
@@ -461,6 +467,14 @@
</filters>
</configuration>
</plugin>
</plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
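
The new junidecode dependency transliterates Unicode text to ASCII, presumably to normalise user-derived strings (for example report file names or zip passwords) in the PII flow. A hedged sketch; the net.sf.junidecode package and the unidecode signature are assumptions about this fork, not confirmed by the diff:

import net.sf.junidecode.Junidecode

object Transliterate {
  def main(args: Array[String]): Unit = {
    // Assumed API: maps non-ASCII characters to close ASCII equivalents.
    println(Junidecode.unidecode("São Paulo préférence"))
  }
}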
7 changes: 4 additions & 3 deletions lern-data-products/src/main/resources/application.conf
@@ -143,8 +143,6 @@ druid.deletesegment.path="/druid/coordinator/v1/datasources/"
druid.content.consumption.query="{\"query\":\"SELECT COUNT(*) as \\\"play_sessions_count\\\", SUM(total_time_spent) as \\\"total_time_spent\\\", dimensions_pdata_id, object_id\\nFROM \\\"summary-events\\\"\\nWHERE \\\"dimensions_mode\\\" = 'play' AND \\\"dimensions_type\\\" ='content'\\nGROUP BY object_id, dimensions_pdata_id\"}"

// TPD Configurations
org.search.api.url="https://dev.sunbirded.org/api"
org.search.api.path="/org/v1/search"
druid.host="http://localhost:8082/druid/v2"
elasticsearch.index.coursebatch.name="course-batch"
//ETB Configurations
@@ -202,4 +200,7 @@ redis.user.index.source.key="id" # this will be used as key for redis
cassandra.read.timeoutMS="500000"
cassandra.query.retry.count="100"
cassandra.input.consistency.level="LOCAL_QUORUM"
## user cache indexer job Configuration - end ##
## user cache indexer job Configuration - end ##

org.search.private.api.url="{{sunbird_learner_service_url}}/private/v2/org/search"
tenant.pref.read.private.api.url="{{sunbird_learner_service_url}}/private/v2/org/preferences/read"
New file: org/sunbird/core/exception/APIException.scala
@@ -0,0 +1,5 @@
package org.sunbird.core.exception

class APIException(message: String, cause: Throwable) extends Exception(message, cause)

class ServerException(code: String, msg: String, cause: Throwable = null) extends Exception(msg, cause)
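
These two exception types let the new PII code separate server/transport failures from general API errors. A hypothetical usage sketch; the helper, the endpoint handling, and the error code are illustrative, not from the PR:

import org.sunbird.core.exception.{APIException, ServerException}

object TenantPrefClient {
  // Hypothetical stand-in for the real HTTP call.
  private def httpPost(url: String, body: String): String = ???

  def readTenantPreference(url: String, orgId: String): String =
    try httpPost(url, s"""{"request":{"orgId":"$orgId"}}""")
    catch {
      case e: java.net.SocketTimeoutException =>
        throw new ServerException("ERR_TENANT_PREF_READ", "Timed out reading tenant preference", e)
      case e: Exception =>
        throw new APIException(s"Tenant preference read failed for $orgId", e)
    }
}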
org/sunbird/core/exhaust/OnDemandExhaustJob.scala
@@ -1,8 +1,5 @@
package org.sunbird.core.exhaust

import net.lingala.zip4j.ZipFile
import net.lingala.zip4j.model.ZipParameters
import net.lingala.zip4j.model.enums.EncryptionMethod
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.functions._
@@ -11,9 +8,8 @@ import org.ekstep.analytics.framework.Level.INFO
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.{CommonUtil, JobLogger}
import org.ekstep.analytics.framework.{FrameworkContext, StorageConfig}
import org.sunbird.core.util.DataSecurityUtil.{zipAndPasswordProtect}

import java.io.File
import java.nio.file.Paths
import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.util.Properties
import java.util.concurrent.CompletableFuture
@@ -22,6 +18,7 @@ import java.util.function.Supplier
case class JobRequest(tag: String, request_id: String, job_id: String, var status: String, request_data: String, requested_by: String, requested_channel: String,
dt_job_submitted: Long, var download_urls: Option[List[String]], var dt_file_created: Option[Long], var dt_job_completed: Option[Long],
var execution_time: Option[Long], var err_message: Option[String], var iteration: Option[Int], encryption_key: Option[String], var processed_batches : Option[String] = None) {

def this() = this("", "", "", "", "", "", "", 0, None, None, None, None, None, None, None, None)
}
case class RequestStatus(channel: String, batchLimit: Long, fileLimit: Long)
@@ -114,32 +111,35 @@ trait OnDemandExhaustJob {

}

def saveRequests(storageConfig: StorageConfig, requests: Array[JobRequest])(implicit conf: Configuration, fc: FrameworkContext) = {
val zippedRequests = for (request <- requests) yield processRequestEncryption(storageConfig, request)
def saveRequests(storageConfig: StorageConfig, requests: Array[JobRequest], reqOrgAndLevelDtl: List[(String, String, String)])(implicit conf: Configuration, fc: FrameworkContext) = {
val zippedRequests = for (request <- requests) yield {
val reqOrgAndLevel = reqOrgAndLevelDtl.filter(_._1 == request.request_id).headOption
processRequestEncryption(storageConfig, request, reqOrgAndLevel.getOrElse("", "", ""))
}
updateRequests(zippedRequests)
}

def saveRequestAsync(storageConfig: StorageConfig, request: JobRequest)(implicit conf: Configuration, fc: FrameworkContext): CompletableFuture[JobRequest] = {
def saveRequestAsync(storageConfig: StorageConfig, request: JobRequest, reqOrgAndLevel: (String, String, String))(implicit conf: Configuration, fc: FrameworkContext): CompletableFuture[JobRequest] = {

CompletableFuture.supplyAsync(new Supplier[JobRequest]() {
override def get() : JobRequest = {
val res = CommonUtil.time(saveRequest(storageConfig, request))
val res = CommonUtil.time(saveRequest(storageConfig, request, reqOrgAndLevel))
JobLogger.log("Request is zipped", Some(Map("requestId" -> request.request_id, "timeTakenForZip" -> res._1)), INFO)
request
}
})

}

def saveRequest(storageConfig: StorageConfig, request: JobRequest)(implicit conf: Configuration, fc: FrameworkContext): Boolean = {
updateRequest(processRequestEncryption(storageConfig, request))
def saveRequest(storageConfig: StorageConfig, request: JobRequest, reqOrgAndLevel: (String, String, String))(implicit conf: Configuration, fc: FrameworkContext): Boolean = {
updateRequest(processRequestEncryption(storageConfig, request, reqOrgAndLevel))
}

def processRequestEncryption(storageConfig: StorageConfig, request: JobRequest)(implicit conf: Configuration, fc: FrameworkContext): JobRequest = {
def processRequestEncryption(storageConfig: StorageConfig, request: JobRequest, reqOrgAndLevel: (String, String, String))(implicit conf: Configuration, fc: FrameworkContext): JobRequest = {
val downloadURLs = CommonUtil.time(for (url <- request.download_urls.getOrElse(List())) yield {
if (zipEnabled())
try {
zipAndEncrypt(url, storageConfig, request)
zipAndPasswordProtect(url, storageConfig, request, null, reqOrgAndLevel._3)
url.replace(".csv", ".zip")
} catch {
case ex: Exception => ex.printStackTrace();
@@ -161,60 +161,6 @@

def canZipExceptionBeIgnored(): Boolean = true

@throws(classOf[Exception])
private def zipAndEncrypt(url: String, storageConfig: StorageConfig, request: JobRequest)(implicit conf: Configuration, fc: FrameworkContext): String = {

val path = Paths.get(url);
val storageService = fc.getStorageService(storageConfig.store, storageConfig.accountKey.getOrElse(""), storageConfig.secretKey.getOrElse(""));
val tempDir = AppConf.getConfig("spark_output_temp_dir") + request.request_id + "/"
val localPath = tempDir + path.getFileName;
fc.getHadoopFileUtil().delete(conf, tempDir);
val filePrefix = storageConfig.store.toLowerCase() match {
// $COVERAGE-OFF$ Disabling scoverage
case "s3" =>
CommonUtil.getS3File(storageConfig.container, "")
case "azure" =>
CommonUtil.getAzureFile(storageConfig.container, "", storageConfig.accountKey.getOrElse("azure_storage_key"))
case "gcloud" =>
CommonUtil.getGCloudFile(storageConfig.container, "")
// $COVERAGE-ON$ for case: local
case _ =>
storageConfig.fileName
}
val objKey = url.replace(filePrefix, "");
if (storageConfig.store.equals("local")) {
fc.getHadoopFileUtil().copy(filePrefix, localPath, conf)
}
// $COVERAGE-OFF$ Disabling scoverage
else {
storageService.download(storageConfig.container, objKey, tempDir, Some(false));
}
// $COVERAGE-ON$
val zipPath = localPath.replace("csv", "zip")
val zipObjectKey = objKey.replace("csv", "zip")
val zipLocalObjKey = url.replace("csv", "zip")

request.encryption_key.map(key => {
val zipParameters = new ZipParameters();
zipParameters.setEncryptFiles(true);
zipParameters.setEncryptionMethod(EncryptionMethod.ZIP_STANDARD); // AES encryption is not supported by default with various OS.
val zipFile = new ZipFile(zipPath, key.toCharArray());
zipFile.addFile(localPath, zipParameters)
}).getOrElse({
new ZipFile(zipPath).addFile(new File(localPath));
})
val resultFile = if (storageConfig.store.equals("local")) {
fc.getHadoopFileUtil().copy(zipPath, zipLocalObjKey, conf)
}
// $COVERAGE-OFF$ Disabling scoverage
else {
storageService.upload(storageConfig.container, zipPath, zipObjectKey, Some(false), Some(0), Some(3), None);
}
// $COVERAGE-ON$
fc.getHadoopFileUtil().delete(conf, tempDir);
resultFile;
}

def markRequestAsFailed(request: JobRequest, failedMsg: String, completed_Batches: Option[String] = None): JobRequest = {
request.status = "FAILED";
request.dt_job_completed = Option(System.currentTimeMillis());
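
Net effect of the signature changes: callers now pass (request_id, orgId, securityLevel) triples alongside the requests so each request's org and PII level can be looked up when files are zipped and password-protected. A minimal sketch of the new call shape from inside a job that mixes in OnDemandExhaustJob; the two resolve helpers are hypothetical:

// Inside a job that mixes in OnDemandExhaustJob:
def resolveOrg(r: JobRequest): String = ???   // hypothetical lookup of the requesting org
def resolveLevel(r: JobRequest): String = ??? // hypothetical lookup of its PII level

def saveAll(storageConfig: StorageConfig, requests: Array[JobRequest])
           (implicit conf: Configuration, fc: FrameworkContext): Unit = {
  val reqOrgAndLevelDtl: List[(String, String, String)] =
    requests.map(r => (r.request_id, resolveOrg(r), resolveLevel(r))).toList
  saveRequests(storageConfig, requests, reqOrgAndLevelDtl)
}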
Constants.scala
@@ -33,9 +33,11 @@ object Constants {
val LP_URL = AppConf.getConfig("lp.url")
val SEARCH_SERVICE_URL = AppConf.getConfig("service.search.url")
val COMPOSITE_SEARCH_URL = s"$SEARCH_SERVICE_URL" + AppConf.getConfig("service.search.path")
val ORG_SEARCH_URL: String = AppConf.getConfig("org.search.api.url") + AppConf.getConfig("org.search.api.path")
val ORG_SEARCH_API_KEY: String = AppConf.getConfig("org.search.api.key")
val USER_SEARCH_URL : String = AppConf.getConfig("user.search.api.url")
val TENANT_PREFERENCE_PRIVATE_READ_URL = AppConf.getConfig("tenant.pref.read.private.api.url")
val ORG_PRIVATE_SEARCH_URL: String = AppConf.getConfig("org.search.private.api.url")
val TEMP_DIR = AppConf.getConfig("spark_output_temp_dir")

val HIERARCHY_STORE_KEY_SPACE_NAME = AppConf.getConfig("cassandra.hierarchy_store_prefix")+"hierarchy_store"
val CONTENT_HIERARCHY_TABLE = "content_hierarchy"
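
Taken together with the application.conf changes above, org search and tenant-preference reads now go through the private endpoints (ORG_PRIVATE_SEARCH_URL, TENANT_PREFERENCE_PRIVATE_READ_URL), consistent with the commits that removed the access token from requests and token details from the conf.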