Skip to content

Commit

Permalink
Merge pull request #328 from camunda-community-hub/55-process-subscri…
Browse files Browse the repository at this point in the history
…ption

feat: Add subscriptions for process and process instance updates
  • Loading branch information
saig0 authored Jan 27, 2023
2 parents e7c9b68 + 81bce32 commit fedaf80
Show file tree
Hide file tree
Showing 23 changed files with 1,139 additions and 578 deletions.
5 changes: 0 additions & 5 deletions app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@
<artifactId>hazelcast-importer</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
Expand Down
6 changes: 4 additions & 2 deletions app/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ spring:
hibernate:
ddl-auto: create


# enable GraphiQL inspection tool by default
graphql:
websocket:
path: /graphql

# enable GraphiQL inspection tool by default
graphiql:
enabled: true
186 changes: 96 additions & 90 deletions data/pom.xml
Original file line number Diff line number Diff line change
@@ -1,93 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.zeebe.zeeqs</groupId>
<artifactId>zeeqs-root</artifactId>
<version>2.6.1-SNAPSHOT</version>
</parent>

<artifactId>data</artifactId>
<name>ZeeQS - Data</name>

<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-bpmn-model</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<id>default</id>
<goals>
<goal>test</goal>
</goals>
</execution>
<!-- run the tests for each supported database -->
<execution>
<id>postgres</id>
<goals>
<goal>test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<spring.profiles.active>postgres-docker</spring.profiles.active>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.zeebe.zeeqs</groupId>
<artifactId>zeeqs-root</artifactId>
<version>2.6.1-SNAPSHOT</version>
</parent>

<artifactId>data</artifactId>
<name>ZeeQS - Data</name>

<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-bpmn-model</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<id>default</id>
<goals>
<goal>test</goal>
</goals>
</execution>
<!-- run the tests for each supported database -->
<execution>
<id>postgres</id>
<goals>
<goal>test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<spring.profiles.active>postgres-docker</spring.profiles.active>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Incident(
@Lob val errorMessage: String,
val processInstanceKey: Long,
val elementInstanceKey: Long,
val processDefinitionKey: Long,
val jobKey: Long?
) {

Expand Down
3 changes: 2 additions & 1 deletion data/src/main/kotlin/io/zeebe/zeeqs/data/entity/Job.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ data class Job(
val position: Long,
val jobType: String,
val processInstanceKey: Long,
val elementInstanceKey: Long
val elementInstanceKey: Long,
val processDefinitionKey: Long
) {

@Enumerated(EnumType.STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class Variable(
val position: Long,
val name: String,
val processInstanceKey: Long,
val processDefinitionKey: Long,
val scopeKey: Long,
@Lob @Column(name = "value_") var value: String,
var timestamp: Long
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.zeebe.zeeqs.data.reactive

import io.zeebe.zeeqs.data.entity.*
import org.springframework.stereotype.Component
import java.util.function.Consumer

@Component
class DataUpdatesPublisher {

private val processListeners = mutableListOf<Consumer<Process>>()
private val processInstanceListeners = mutableListOf<Consumer<ProcessInstance>>()
private val elementInstanceListeners = mutableListOf<Consumer<ElementInstance>>()
private val variableListeners = mutableListOf<Consumer<Variable>>()
private val incidentListeners = mutableListOf<Consumer<Incident>>()
private val jobListeners = mutableListOf<Consumer<Job>>()

fun onProcessUpdated(process: Process) {
processListeners.forEach { it.accept(process) }
}

fun onProcessInstanceUpdated(processInstance: ProcessInstance) {
processInstanceListeners.forEach { it.accept(processInstance) }
}

fun onElementInstanceUpdated(elementInstance: ElementInstance) {
elementInstanceListeners.forEach { it.accept(elementInstance) }
}

fun onVariableUpdated(variable: Variable) {
variableListeners.forEach { it.accept(variable) }
}

fun onIncidentUpdated(incident: Incident) {
incidentListeners.forEach { it.accept(incident) }
}

fun onJobUpdated(job: Job) {
jobListeners.forEach { it.accept(job) }
}

fun registerProcessListener(listener: Consumer<Process>) {
processListeners.add(listener)
}

fun registerProcessInstanceListener(listener: Consumer<ProcessInstance>) {
processInstanceListeners.add(listener)
}

fun registerElementInstanceListener(listener: Consumer<ElementInstance>) {
elementInstanceListeners.add(listener)
}

fun registerVariableListener(listener: Consumer<Variable>) {
variableListeners.add(listener)
}

fun registerIncidentListener(listener: Consumer<Incident>) {
incidentListeners.add(listener)
}

fun registerJobListener(listener: Consumer<Job>) {
jobListeners.add(listener)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.zeebe.zeeqs.data.reactive

import io.zeebe.zeeqs.data.entity.Process
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux

@Component
class DataUpdatesSubscription(private val publisher: DataUpdatesPublisher) {

fun processSubscription(): Flux<Process> {
return Flux.create { sink ->
publisher.registerProcessListener { sink.next(it) }
}
}

fun processInstanceUpdateSubscription(): Flux<ProcessInstanceUpdate> {
return Flux.create { sink ->
publisher.registerProcessInstanceListener {
sink.next(ProcessInstanceUpdate(
processInstanceKey = it.key,
processKey = it.processDefinitionKey,
updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE
))
}

publisher.registerElementInstanceListener {
sink.next(ProcessInstanceUpdate(
processInstanceKey = it.processInstanceKey,
processKey = it.processDefinitionKey,
updateType = ProcessInstanceUpdateType.ELEMENT_INSTANCE
))
}

publisher.registerVariableListener {
sink.next(ProcessInstanceUpdate(
processInstanceKey = it.processInstanceKey,
processKey = it.processDefinitionKey,
updateType = ProcessInstanceUpdateType.VARIABLE
))
}

publisher.registerIncidentListener {
sink.next(ProcessInstanceUpdate(
processInstanceKey = it.processInstanceKey,
processKey = it.processDefinitionKey,
updateType = ProcessInstanceUpdateType.INCIDENT
))
}

publisher.registerJobListener {
sink.next(ProcessInstanceUpdate(
processInstanceKey = it.processInstanceKey,
processKey = it.processDefinitionKey,
updateType = ProcessInstanceUpdateType.JOB
))
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.zeebe.zeeqs.data.reactive

data class ProcessInstanceUpdate(
val processInstanceKey: Long,
val processKey: Long,
val updateType: ProcessInstanceUpdateType
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.zeebe.zeeqs.data.reactive

enum class ProcessInstanceUpdateType {
PROCESS_INSTANCE_STATE,
ELEMENT_INSTANCE,
VARIABLE,
INCIDENT,
JOB
}
Loading

0 comments on commit fedaf80

Please sign in to comment.