Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LR-285 User detail ml-jobs for ProgramUser Info #103

Merged
merged 7 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion dp-core/src/main/resources/base-config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,10 @@ lms-cassandra {
host = "localhost"
port = "9042"
isMultiDCEnabled = false
}
}

# Default connection settings for the "ml-cassandra" config section.
# Uses the same keys as the lms-cassandra section above (host, port, isMultiDCEnabled).
# NOTE(review): port is a quoted string here, while base-test.conf uses a bare number;
# presumably the config loader coerces it - confirm.
ml-cassandra {
host = "localhost"
port = "9042"
isMultiDCEnabled = false
}
6 changes: 6 additions & 0 deletions dp-core/src/test/resources/base-test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ lms-cassandra {
port = 9142
isMultiDCEnabled = false
}

# Test-time connection settings for the "ml-cassandra" config section.
# Same keys and same port (9142) as the lms-cassandra test section above.
# NOTE(review): 9142 is presumably the embedded test Cassandra port - confirm.
ml-cassandra {
host = "localhost"
port = 9142
isMultiDCEnabled = false
}
18 changes: 18 additions & 0 deletions kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ middleware_cassandra_user_activity_agg_table: user_activity_agg
user_cache_updater_job_consumer_parallelism: 1
user_cache_updater_job_parallelism: 1

### Program-User-Info Job vars
# Parallelism values; values.j2 renders them into the job's task section
# (consumer.parallelism, downstream.parallelism, programuser.parallelism).
programuserinfo_consumer_parallelism: 1
programuserinfo_downstream_parallelism: 1
programuserinfo_programuser_parallelism: 1
# Cassandra keyspace and table; values.j2 renders them into the job's ml-cassandra section.
middleware_cassandra_programuserinfo_keyspace: sunbird_programs
middleware_cassandra_programuserinfo_table: program_enrollment

### to be removed
job_classname: ""

Expand Down Expand Up @@ -194,6 +201,16 @@ flink_job_names:
taskslots: 1
cpu_requests: 0.3
scale_enabled: false
# Deployment settings for the program-user-info Flink job.
program-user-info:
# Entry-point class; the same class is configured as mainClass in the job's shaded-jar manifest.
job_class_name: 'org.sunbird.dp.userinfo.task.ProgramUserInfoStreamTask'
replica: 1
# Rendered by values.j2 into flink-conf as jobmanager.memory.flink.size and taskmanager.memory.flink.size.
jobmanager_memory: 1024m
taskmanager_memory: 1024m
# NOTE(review): the two process_memory values are not referenced by the values.j2 section
# for this job - confirm where they are consumed.
taskmanager_process_memory: 1700m
jobmanager_process_memory: 1600m
# Rendered by values.j2 into flink-conf as taskmanager.numberOfTaskSlots.
taskslots: 1
cpu_requests: 0.3
# Read (lower-cased) by scale_properties in values.j2 to set "enabled".
scale_enabled: false


### Global vars
Expand All @@ -205,3 +222,4 @@ service_monitor_enabled: true
### controlling the flink jobs log level
flink_jobs_console_log_level: INFO
flink_libraries_log_level: ERROR

36 changes: 36 additions & 0 deletions kubernetes/helm_charts/datapipeline_jobs/values.j2
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ scale_properties:
scale_target_value: {{ flink_job_names['assessment-aggregator'].scale_target_value | default(0) }}
min_replica: {{ flink_job_names['assessment-aggregator'].min_replica | default(1) }}
max_replica: {{ flink_job_names['assessment-aggregator'].max_replica | default(2) }}
# Autoscaling settings for the program-user-info job, read from flink_job_names.
# scale_target_value, min_replica and max_replica fall back to 0, 1 and 2 when not set there;
# scale_enabled has no fallback and must be defined.
program-user-info:
enabled: {{ flink_job_names['program-user-info'].scale_enabled | lower}}
scale_target_value: {{ flink_job_names['program-user-info'].scale_target_value | default(0) }}
min_replica: {{ flink_job_names['program-user-info'].min_replica | default(1) }}
max_replica: {{ flink_job_names['program-user-info'].max_replica | default(2) }}

jobmanager:
rpc_port: {{ jobmanager_rpc_port }}
Expand Down Expand Up @@ -534,3 +539,34 @@ user-cache-updater-v2:
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.fraction: 0.1

# Config for the program-user-info job. Two block scalars follow:
#   program-user-info: job config that includes base-config.conf and then sets the
#                      kafka, task and ml-cassandra (keyspace, table) values.
#   flink-conf:        Flink memory sizes and task slots taken from flink_job_names,
#                      plus fixed parallelism, failover-strategy and network-fraction values.
# NOTE(review): kafka_topic_programuser_info and kafka_group_programuser_info are not among
# the Program-User-Info vars added to defaults/main.yml - confirm they are defined elsewhere.
program-user-info:
program-user-info: |+
include file("/data/flink/conf/base-config.conf")
kafka {
producer.broker-servers = "{{ kafka_brokers }}"
consumer.broker-servers = "{{ kafka_brokers }}"
zookeeper = "{{ zookeepers }}"
input.topic = {{ kafka_topic_programuser_info }}
groupId = {{ kafka_group_programuser_info }}
}
task {
consumer.parallelism = {{ programuserinfo_consumer_parallelism }}
downstream.parallelism = {{ programuserinfo_downstream_parallelism }}
programuser {
parallelism = {{ programuserinfo_programuser_parallelism }}
}

}
ml-cassandra {
keyspace = "{{ middleware_cassandra_programuserinfo_keyspace }}"
table = "{{ middleware_cassandra_programuserinfo_table }}"
}

flink-conf: |+
jobmanager.memory.flink.size: {{ flink_job_names['program-user-info'].jobmanager_memory }}
taskmanager.memory.flink.size: {{ flink_job_names['program-user-info'].taskmanager_memory }}
taskmanager.numberOfTaskSlots: {{ flink_job_names['program-user-info'].taskslots }}
parallelism.default: 1
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.fraction: 0.1

51 changes: 51 additions & 0 deletions ml-jobs/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Aggregator POM for the ml-jobs group: pom packaging, inherits from data-pipeline, builds the modules listed below. -->
<parent>
<groupId>org.sunbird</groupId>
<artifactId>data-pipeline</artifactId>
<version>1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>ml-jobs</artifactId>
<packaging>pom</packaging>
<name>ml-jobs</name>


<!-- Jobs built as part of ml-jobs. -->
<modules>
<module>program-user-info</module>
</modules>

<build>
<!-- Plugin defaults for POMs that declare ml-jobs as their parent.
NOTE(review): program-user-info declares data-pipeline as its parent, so it presumably
does not inherit this section; confirm that is intended. -->
<pluginManagement>
<plugins>
<!-- Java source and target level 11. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<!-- Scoverage coverage settings.
NOTE(review): scoverage.plugin.version is not defined in this POM; presumably it comes from the parent - confirm.
NOTE(review): org.sunbird.incredible does not match the package of the job listed above
(org.sunbird.dp.userinfo); confirm this exclusion is still needed. -->
<plugin>
<groupId>org.scoverage</groupId>
<artifactId>scoverage-maven-plugin</artifactId>
<version>${scoverage.plugin.version}</version>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<aggregate>true</aggregate>
<highlighting>true</highlighting>
<excludedPackages>org.sunbird.incredible</excludedPackages>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>


</project>

221 changes: 221 additions & 0 deletions ml-jobs/program-user-info/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.sunbird</groupId>
<artifactId>data-pipeline</artifactId>
<version>1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>program-user-info</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>ProgramUserInfo</name>
<description>
Job to insert user pii events details to cassandra
</description>

<properties>
<encoding>UTF-8</encoding>
<scoverage.plugin.version>1.4.0</scoverage.plugin.version>
</properties>

<dependencies>
<!-- Flink dependencies. flink-streaming-scala is marked provided;
flink-clients declares no scope, so it uses Maven's default (compile) scope. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Shared dp-core library, followed by its test-jar for test scope. -->
<dependency>
<groupId>org.sunbird</groupId>
<artifactId>dp-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.sunbird</groupId>
<artifactId>dp-core</artifactId>
<version>1.0.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- Test-only dependencies from here on. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
<version>3.0.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.3.3</version>
<scope>test</scope>
</dependency>
<!-- NOTE(review): otj-pg-embedded and mock-jedis are presumably embedded-Postgres and mock-Redis test helpers;
this job's visible config only references Kafka and Cassandra - confirm the tests actually use them. -->
<dependency>
<groupId>com.opentable.components</groupId>
<artifactId>otj-pg-embedded</artifactId>
<version>0.13.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fiftyonred</groupId>
<artifactId>mock-jedis</artifactId>
<version>0.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<version>3.11.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<!-- Sources and tests live under the Scala directories. -->
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- Java compilation targets release 11. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<!-- Builds the shaded jar for the job. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Leave jsr305 out of the shaded jar. -->
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- Sets the jar's main class; this is the same class named as job_class_name in the deployment vars. -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.sunbird.dp.userinfo.task.ProgramUserInfoStreamTask</mainClass>
</transformer>
<!-- append default configs -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

<!-- Scala compilation: main sources are compiled in process-resources and
test sources in process-test-resources, as the execution ids and phases below state. -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<configuration>
<source>${java.target.runtime}</source>
<target>${java.target.runtime}</target>
<scalaVersion>${scala.maj.version}</scalaVersion>
<checkMultipleScalaVersions>false</checkMultipleScalaVersions>
</configuration>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Surefire is told to skip tests; the scalatest plugin below declares the test goal instead. -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>

<!-- Runs the ScalaTest suites; reports are written under the build directory's surefire-reports folder. -->
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>program-user-info-testsuite.txt</filereports>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Scoverage coverage settings; the plugin version comes from the scoverage.plugin.version property of this POM. -->
<plugin>
<groupId>org.scoverage</groupId>
<artifactId>scoverage-maven-plugin</artifactId>
<version>${scoverage.plugin.version}</version>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<aggregate>true</aggregate>
<highlighting>true</highlighting>
</configuration>
</plugin>
</plugins>
</build>

</project>

Loading