From 48c732b6f9487facdc060206d3eef82c9a2c1c89 Mon Sep 17 00:00:00 2001 From: Christian Klem Date: Tue, 25 Feb 2025 12:07:16 +0100 Subject: [PATCH] =?UTF-8?q?Start=20=C3=A5=20lytt=20til=20salesforce=20akti?= =?UTF-8?q?vitet=20topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Nima Jimale --- .github/workflows/main.yaml | 2 +- src/main/kotlin/no/nav/lydia/App.kt | 7 ++ .../kotlin/no/nav/lydia/NaisEnvironment.kt | 1 + .../salesforce/SalesforceAktivitet.kt | 25 ++++ .../SalesforceAktivitetKonsument.kt | 107 ++++++++++++++++++ .../SalesforceAktivitetKonsumentTest.kt | 77 +++++++++++++ 6 files changed, 218 insertions(+), 1 deletion(-) create mode 100644 src/main/kotlin/no/nav/lydia/integrasjoner/salesforce/SalesforceAktivitet.kt create mode 100644 src/main/kotlin/no/nav/lydia/integrasjoner/salesforce/SalesforceAktivitetKonsument.kt create mode 100644 src/test/kotlin/no/nav/lydia/container/integrasjoner/salesforce/SalesforceAktivitetKonsumentTest.kt diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 82c193b7..cd191064 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -67,7 +67,7 @@ jobs: id-token: write name: Deploy app to dev needs: build - if: github.ref == 'refs/heads/eierskapsendring' + if: github.ref == 'refs/heads/sf-aktiviteter' runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/src/main/kotlin/no/nav/lydia/App.kt b/src/main/kotlin/no/nav/lydia/App.kt index b88ae4d8..13d9d13d 100644 --- a/src/main/kotlin/no/nav/lydia/App.kt +++ b/src/main/kotlin/no/nav/lydia/App.kt @@ -92,6 +92,7 @@ import no.nav.lydia.integrasjoner.journalpost.JournalpostService import no.nav.lydia.integrasjoner.kartlegging.KartleggingSvarConsumer import no.nav.lydia.integrasjoner.kartlegging.SpørreundersøkelseHendelseConsumer import no.nav.lydia.integrasjoner.pdfgen.PiaPdfgenService +import no.nav.lydia.integrasjoner.salesforce.SalesforceAktivitetKonsument import no.nav.lydia.integrasjoner.salesforce.SalesforceClient import no.nav.lydia.integrasjoner.ssb.NæringsDownloader import no.nav.lydia.integrasjoner.ssb.NæringsRepository @@ -362,6 +363,12 @@ fun startLydiaBackend() { run() }.also { HelseMonitor.leggTilHelsesjekk(it) } + SalesforceAktivitetKonsument().apply { + create(kafka = naisEnv.kafka) + run() + } +// .also { HelseMonitor.leggTilHelsesjekk(it) } // TODO: uncomment + embeddedServer(Netty, port = 8080) { lydiaRestApi( naisEnv = naisEnv, diff --git a/src/main/kotlin/no/nav/lydia/NaisEnvironment.kt b/src/main/kotlin/no/nav/lydia/NaisEnvironment.kt index 88dd1d25..fdd9cfce 100644 --- a/src/main/kotlin/no/nav/lydia/NaisEnvironment.kt +++ b/src/main/kotlin/no/nav/lydia/NaisEnvironment.kt @@ -199,6 +199,7 @@ enum class Topic( "lydia-api-statistikk-virksomhet-gradering-consumer", ), JOBBLYTTER_TOPIC("pia.jobblytter-v1", "lydia-api-jobblytter-consumer"), + SALESFORCE_AKTIVITET_TOPIC("team-dialog.employer-activity", "lydia-api-salesforce-aktivitet-consumer"), ; val konsumentGruppe diff --git a/src/main/kotlin/no/nav/lydia/integrasjoner/salesforce/SalesforceAktivitet.kt b/src/main/kotlin/no/nav/lydia/integrasjoner/salesforce/SalesforceAktivitet.kt new file mode 100644 index 00000000..bd00c783 --- /dev/null +++ b/src/main/kotlin/no/nav/lydia/integrasjoner/salesforce/SalesforceAktivitet.kt @@ -0,0 +1,25 @@ +package no.nav.lydia.integrasjoner.salesforce + +import kotlinx.serialization.Serializable + +/** + * Dokumentert på confluence https://confluence.adeo.no/display/PTC/Deling+av+data+via+Kafka + */ +@Serializable +data class SalesforceAktivitet( + val Id__c: String, // -- Id til aktivitet i SF + val ActivityCreatedDate__c: String, // -- Når aktiviteten ble opprettet i SF + val EventObject__c: String, // -- Objekttype i SF (Task ; Event) + val TaskEvent__c: String, // -- Hva slags aktivitet (Møte ; Oppgave) + val EventType__c: String, // -- Type hendelse (Created ; Updated ; Deleted ; Undeleted) + val Type__c: String?, // -- Hvilken kanal er brukt (Call ; Email ; SMS ; Meeting ; Other) + val ActivityDate__c: String?, // -- Planlagt tid for aktivitet + val Status__c: String?, // -- Status på aktivitet (Åpen ; Fullført) +// val Subject__c: String?, // -- Fritekst emne for aktivitet + val AccountOrgNumber__c: String?, // -- Orgnummer aktiviteten gjelder + val ActivityType__c: String?, // -- Aktivitetstype + val IACaseNumber__c: String?, // -- IA saksnummer + val IACooperationId__c: String?, // -- Samarbeidsid + val Service__c: String?, // -- Plan - Tema + val IASubtheme__c: String?, // -- Plan - Undertema +) diff --git a/src/main/kotlin/no/nav/lydia/integrasjoner/salesforce/SalesforceAktivitetKonsument.kt b/src/main/kotlin/no/nav/lydia/integrasjoner/salesforce/SalesforceAktivitetKonsument.kt new file mode 100644 index 00000000..7eb355a7 --- /dev/null +++ b/src/main/kotlin/no/nav/lydia/integrasjoner/salesforce/SalesforceAktivitetKonsument.kt @@ -0,0 +1,107 @@ +package no.nav.lydia.integrasjoner.salesforce + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.json.Json +import no.nav.lydia.Kafka +import no.nav.lydia.Topic +import no.nav.lydia.appstatus.Helse +import no.nav.lydia.appstatus.Helsesjekk +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.errors.RetriableException +import org.apache.kafka.common.errors.WakeupException +import org.apache.kafka.common.serialization.StringDeserializer +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.time.Duration +import kotlin.coroutines.CoroutineContext + +class SalesforceAktivitetKonsument : + CoroutineScope, + Helsesjekk { + private lateinit var job: Job + private lateinit var kafka: Kafka + private lateinit var kafkaConsumer: KafkaConsumer + private val topic = Topic.SALESFORCE_AKTIVITET_TOPIC + private val json = Json { + ignoreUnknownKeys = true + } + + override val coroutineContext: CoroutineContext + get() = Dispatchers.IO + job + + init { + Runtime.getRuntime().addShutdownHook(Thread(this::cancel)) + } + + fun create(kafka: Kafka) { + logger.info("Creating kafka consumer job for topic '${topic.navn}' i groupId '${topic.konsumentGruppe}'") + this.job = Job() + this.kafka = kafka + this.kafkaConsumer = KafkaConsumer( + this.kafka.consumerProperties(consumerGroupId = topic.konsumentGruppe), + StringDeserializer(), + StringDeserializer(), + ) + logger.info("Created kafka consumer job for topic '${topic.navn}' i groupId '${topic.konsumentGruppe}'") + } + + fun run() { + launch { + kafkaConsumer.use { consumer -> + try { + consumer.subscribe(listOf(topic.navn)) + logger.info("Kafka consumer subscribed to topic '${topic.navn}' of groupId '${topic.konsumentGruppe}' )' in $consumer") + + while (job.isActive) { + try { + val records = consumer.poll(Duration.ofSeconds(1)) + if (!records.isEmpty) { + val aktiviteter = records.map { + json.decodeFromString(it.value()) + }.filter { + !it.IACaseNumber__c.isNullOrBlank() + } + + aktiviteter.forEach { + logger.info("Hentet aktivitet. id: '${it.Id__c}', type: '${it.TaskEvent__c}', saksnummer: '${it.IACaseNumber__c}'") + } + logger.info("Behandlet ${records.count()} meldinger i topic '${topic.navn}'). ${aktiviteter.size} er knyttet til et saksnr") +// consumer.commitSync() + } + } catch (e: RetriableException) { + logger.warn("Had a retriable exception in $consumer (topic '${topic.navn}'), retrying", e) + } + delay(kafka.consumerLoopDelay) + } + } catch (e: WakeupException) { + logger.info("$consumer (topic '${topic.navn}') is shutting down...") + } catch (e: Exception) { + logger.error("Exception is shutting down kafka listner i $consumer (topic '${topic.navn}')", e) + throw e + } + } + } + } + + private fun cancel() = + runBlocking { + logger.info("Stopping kafka consumer job for topic '${topic.navn}'") + kafkaConsumer.wakeup() + job.cancelAndJoin() + logger.info("Stopped kafka consumer job for topic '${topic.navn}'") + } + + private fun isRunning() = job.isActive + + override fun helse() = if (isRunning()) Helse.UP else Helse.DOWN + + companion object { + private val logger: Logger = LoggerFactory.getLogger(this::class.java) + } +} diff --git a/src/test/kotlin/no/nav/lydia/container/integrasjoner/salesforce/SalesforceAktivitetKonsumentTest.kt b/src/test/kotlin/no/nav/lydia/container/integrasjoner/salesforce/SalesforceAktivitetKonsumentTest.kt new file mode 100644 index 00000000..3590ddb7 --- /dev/null +++ b/src/test/kotlin/no/nav/lydia/container/integrasjoner/salesforce/SalesforceAktivitetKonsumentTest.kt @@ -0,0 +1,77 @@ +package no.nav.lydia.container.integrasjoner.salesforce + +import no.nav.lydia.Topic +import no.nav.lydia.helper.TestContainerHelper +import no.nav.lydia.helper.TestContainerHelper.Companion.shouldContainLog +import no.nav.lydia.helper.TestContainerHelper.Companion.shouldNotContainLog +import kotlin.test.Test + +class SalesforceAktivitetKonsumentTest { + @Test + fun `konsumerer meldinger på sf-aktivitet topic`() { + TestContainerHelper.kafkaContainerHelper.sendOgVentTilKonsumert( + "nøkkel", + melding(id = "id", saksnummer = "saksnummer"), + Topic.SALESFORCE_AKTIVITET_TOPIC, + ) + + TestContainerHelper.lydiaApiContainer shouldContainLog "Hentet aktivitet. id: 'id', type: 'Oppgave', saksnummer: 'saksnummer'".toRegex() + } + + @Test + fun `skal ikke behandle meldinger uten ia-saksnummer`() { + TestContainerHelper.kafkaContainerHelper.sendOgVentTilKonsumert( + "nøkkel", + melding(id = "id2", saksnummer = null), + Topic.SALESFORCE_AKTIVITET_TOPIC, + ) + + TestContainerHelper.lydiaApiContainer shouldNotContainLog "Hentet aktivitet. id: 'id2', type: 'oppgave'".toRegex() + } + + private fun melding( + id: String, + saksnummer: String? = "saksnummer", + ) = """ + { + "CreatedDate": "2025-02-24T18:07:47.467Z", + "CreatedById": "createdbyid", + "EventObject__c": "Task", + "EventType__c": "Created", + "AccountNavUnit__c": "", + "AccountOrgNumber__c": "123456789", + "AccountOrgType__c": "", + "AccountParentId__c": "", + "AccountParentOrgNumber__c": "987654321", + "ActivityDate__c": "2025-02-26T00:00:00Z", + "ActivityType__c": "Prioritert IA (Fia)", + "DurationInMinutes__c": null, + "EndDateTime__c": null, + "IACaseNumber__c": "$saksnummer", + "IACooperationId__c": "", + "IASubtheme__c": "Sykefraværsrutiner", + "Id__c": "$id", + "LastModifiedDate__c": "2025-02-24T18:07:47Z", + "Priority__c": "Normal", + "RecordTypeId__c": "rectypid", + "RecordTypeName__c": "IA_task", + "Region__c": "Oslo", + "ReminderDateTime__c": "2025-02-26T07:00:00Z", + "Service__c": "Sykefraværsarbeid", + "StartDateTime__c": null, + "Status__c": "Fullført", + "Subject__c": "Test for Pia", + "TaskEvent__c": "Oppgave", + "CompletedDate__c": null, + "Type__c": "", + "Unit__c": "NAV IT", + "UserNavUnit__c": "2940", + "WhatId__c": "what", + "WhoId__c": "", + "ActivityCreatedDate__c": "2025-02-24T18:07:44Z", + "DeletedDate__c": null, + "LastModifiedById__c": "00xxx", + "OwnerId__c": "00xxx" + } + """.trimIndent() +}