Skip to content

Commit

Permalink
Start å lytt til salesforce aktivitet topic
Browse files Browse the repository at this point in the history
Co-authored-by: Nima Jimale <[email protected]>
  • Loading branch information
klechr and nimajimale committed Feb 25, 2025
1 parent ac35174 commit 48c732b
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:
id-token: write
name: Deploy app to dev
needs: build
if: github.ref == 'refs/heads/eierskapsendring'
if: github.ref == 'refs/heads/sf-aktiviteter'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
7 changes: 7 additions & 0 deletions src/main/kotlin/no/nav/lydia/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ import no.nav.lydia.integrasjoner.journalpost.JournalpostService
import no.nav.lydia.integrasjoner.kartlegging.KartleggingSvarConsumer
import no.nav.lydia.integrasjoner.kartlegging.SpørreundersøkelseHendelseConsumer
import no.nav.lydia.integrasjoner.pdfgen.PiaPdfgenService
import no.nav.lydia.integrasjoner.salesforce.SalesforceAktivitetKonsument
import no.nav.lydia.integrasjoner.salesforce.SalesforceClient
import no.nav.lydia.integrasjoner.ssb.NæringsDownloader
import no.nav.lydia.integrasjoner.ssb.NæringsRepository
Expand Down Expand Up @@ -362,6 +363,12 @@ fun startLydiaBackend() {
run()
}.also { HelseMonitor.leggTilHelsesjekk(it) }

SalesforceAktivitetKonsument().apply {
create(kafka = naisEnv.kafka)
run()
}
// .also { HelseMonitor.leggTilHelsesjekk(it) } // TODO: uncomment

embeddedServer(Netty, port = 8080) {
lydiaRestApi(
naisEnv = naisEnv,
Expand Down
1 change: 1 addition & 0 deletions src/main/kotlin/no/nav/lydia/NaisEnvironment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ enum class Topic(
"lydia-api-statistikk-virksomhet-gradering-consumer",
),
JOBBLYTTER_TOPIC("pia.jobblytter-v1", "lydia-api-jobblytter-consumer"),
SALESFORCE_AKTIVITET_TOPIC("team-dialog.employer-activity", "lydia-api-salesforce-aktivitet-consumer"),
;

val konsumentGruppe
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package no.nav.lydia.integrasjoner.salesforce

import kotlinx.serialization.Serializable

/**
* Dokumentert på confluence https://confluence.adeo.no/display/PTC/Deling+av+data+via+Kafka
*/
@Serializable
data class SalesforceAktivitet(
val Id__c: String, // -- Id til aktivitet i SF
val ActivityCreatedDate__c: String, // -- Når aktiviteten ble opprettet i SF
val EventObject__c: String, // -- Objekttype i SF (Task ; Event)
val TaskEvent__c: String, // -- Hva slags aktivitet (Møte ; Oppgave)
val EventType__c: String, // -- Type hendelse (Created ; Updated ; Deleted ; Undeleted)
val Type__c: String?, // -- Hvilken kanal er brukt (Call ; Email ; SMS ; Meeting ; Other)
val ActivityDate__c: String?, // -- Planlagt tid for aktivitet
val Status__c: String?, // -- Status på aktivitet (Åpen ; Fullført)
// val Subject__c: String?, // -- Fritekst emne for aktivitet
val AccountOrgNumber__c: String?, // -- Orgnummer aktiviteten gjelder
val ActivityType__c: String?, // -- Aktivitetstype
val IACaseNumber__c: String?, // -- IA saksnummer
val IACooperationId__c: String?, // -- Samarbeidsid
val Service__c: String?, // -- Plan - Tema
val IASubtheme__c: String?, // -- Plan - Undertema
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package no.nav.lydia.integrasjoner.salesforce

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.Json
import no.nav.lydia.Kafka
import no.nav.lydia.Topic
import no.nav.lydia.appstatus.Helse
import no.nav.lydia.appstatus.Helsesjekk
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.RetriableException
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import kotlin.coroutines.CoroutineContext

class SalesforceAktivitetKonsument :
CoroutineScope,
Helsesjekk {
private lateinit var job: Job
private lateinit var kafka: Kafka
private lateinit var kafkaConsumer: KafkaConsumer<String, String>
private val topic = Topic.SALESFORCE_AKTIVITET_TOPIC
private val json = Json {
ignoreUnknownKeys = true
}

override val coroutineContext: CoroutineContext
get() = Dispatchers.IO + job

init {
Runtime.getRuntime().addShutdownHook(Thread(this::cancel))
}

fun create(kafka: Kafka) {
logger.info("Creating kafka consumer job for topic '${topic.navn}' i groupId '${topic.konsumentGruppe}'")
this.job = Job()
this.kafka = kafka
this.kafkaConsumer = KafkaConsumer(
this.kafka.consumerProperties(consumerGroupId = topic.konsumentGruppe),
StringDeserializer(),
StringDeserializer(),
)
logger.info("Created kafka consumer job for topic '${topic.navn}' i groupId '${topic.konsumentGruppe}'")
}

fun run() {
launch {
kafkaConsumer.use { consumer ->
try {
consumer.subscribe(listOf(topic.navn))
logger.info("Kafka consumer subscribed to topic '${topic.navn}' of groupId '${topic.konsumentGruppe}' )' in $consumer")

while (job.isActive) {
try {
val records = consumer.poll(Duration.ofSeconds(1))
if (!records.isEmpty) {
val aktiviteter = records.map {
json.decodeFromString<SalesforceAktivitet>(it.value())
}.filter {
!it.IACaseNumber__c.isNullOrBlank()
}

aktiviteter.forEach {
logger.info("Hentet aktivitet. id: '${it.Id__c}', type: '${it.TaskEvent__c}', saksnummer: '${it.IACaseNumber__c}'")
}
logger.info("Behandlet ${records.count()} meldinger i topic '${topic.navn}'). ${aktiviteter.size} er knyttet til et saksnr")
// consumer.commitSync()
}
} catch (e: RetriableException) {
logger.warn("Had a retriable exception in $consumer (topic '${topic.navn}'), retrying", e)
}
delay(kafka.consumerLoopDelay)
}
} catch (e: WakeupException) {
logger.info("$consumer (topic '${topic.navn}') is shutting down...")
} catch (e: Exception) {
logger.error("Exception is shutting down kafka listner i $consumer (topic '${topic.navn}')", e)
throw e
}
}
}
}

private fun cancel() =
runBlocking {
logger.info("Stopping kafka consumer job for topic '${topic.navn}'")
kafkaConsumer.wakeup()
job.cancelAndJoin()
logger.info("Stopped kafka consumer job for topic '${topic.navn}'")
}

private fun isRunning() = job.isActive

override fun helse() = if (isRunning()) Helse.UP else Helse.DOWN

companion object {
private val logger: Logger = LoggerFactory.getLogger(this::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package no.nav.lydia.container.integrasjoner.salesforce

import no.nav.lydia.Topic
import no.nav.lydia.helper.TestContainerHelper
import no.nav.lydia.helper.TestContainerHelper.Companion.shouldContainLog
import no.nav.lydia.helper.TestContainerHelper.Companion.shouldNotContainLog
import kotlin.test.Test

class SalesforceAktivitetKonsumentTest {
@Test
fun `konsumerer meldinger på sf-aktivitet topic`() {
TestContainerHelper.kafkaContainerHelper.sendOgVentTilKonsumert(
"nøkkel",
melding(id = "id", saksnummer = "saksnummer"),
Topic.SALESFORCE_AKTIVITET_TOPIC,
)

TestContainerHelper.lydiaApiContainer shouldContainLog "Hentet aktivitet. id: 'id', type: 'Oppgave', saksnummer: 'saksnummer'".toRegex()
}

@Test
fun `skal ikke behandle meldinger uten ia-saksnummer`() {
TestContainerHelper.kafkaContainerHelper.sendOgVentTilKonsumert(
"nøkkel",
melding(id = "id2", saksnummer = null),
Topic.SALESFORCE_AKTIVITET_TOPIC,
)

TestContainerHelper.lydiaApiContainer shouldNotContainLog "Hentet aktivitet. id: 'id2', type: 'oppgave'".toRegex()
}

private fun melding(
id: String,
saksnummer: String? = "saksnummer",
) = """
{
"CreatedDate": "2025-02-24T18:07:47.467Z",
"CreatedById": "createdbyid",
"EventObject__c": "Task",
"EventType__c": "Created",
"AccountNavUnit__c": "",
"AccountOrgNumber__c": "123456789",
"AccountOrgType__c": "",
"AccountParentId__c": "",
"AccountParentOrgNumber__c": "987654321",
"ActivityDate__c": "2025-02-26T00:00:00Z",
"ActivityType__c": "Prioritert IA (Fia)",
"DurationInMinutes__c": null,
"EndDateTime__c": null,
"IACaseNumber__c": "$saksnummer",
"IACooperationId__c": "",
"IASubtheme__c": "Sykefraværsrutiner",
"Id__c": "$id",
"LastModifiedDate__c": "2025-02-24T18:07:47Z",
"Priority__c": "Normal",
"RecordTypeId__c": "rectypid",
"RecordTypeName__c": "IA_task",
"Region__c": "Oslo",
"ReminderDateTime__c": "2025-02-26T07:00:00Z",
"Service__c": "Sykefraværsarbeid",
"StartDateTime__c": null,
"Status__c": "Fullført",
"Subject__c": "Test for Pia",
"TaskEvent__c": "Oppgave",
"CompletedDate__c": null,
"Type__c": "",
"Unit__c": "NAV IT",
"UserNavUnit__c": "2940",
"WhatId__c": "what",
"WhoId__c": "",
"ActivityCreatedDate__c": "2025-02-24T18:07:44Z",
"DeletedDate__c": null,
"LastModifiedById__c": "00xxx",
"OwnerId__c": "00xxx"
}
""".trimIndent()
}

0 comments on commit 48c732b

Please sign in to comment.