diff --git a/pom.xml b/pom.xml index 428b3ee366..f955249d0e 100755 --- a/pom.xml +++ b/pom.xml @@ -149,6 +149,7 @@ process-infinispan-persistence-springboot process-kafka-persistence-quarkus process-kafka-quickstart-quarkus + process-kafka-multi-quarkus process-kafka-quickstart-springboot process-knative-quickstart-quarkus process-mongodb-persistence-quarkus diff --git a/process-kafka-multi-quarkus/README.md b/process-kafka-multi-quarkus/README.md new file mode 100644 index 0000000000..63444c5682 --- /dev/null +++ b/process-kafka-multi-quarkus/README.md @@ -0,0 +1,253 @@ +# Process with Kafka + +## Description + +A quickstart project that deals with traveller processing carried by rules. It illustrates +how easy it is to make the Kogito processes and rules to work with Apache Kafka sending messages to different topics. + +This example shows + +* consuming events from a Kafka topic and for each event start new process instance +* each process instance is expecting a traveller information in JSON format +* traveller is then processed by rules and based on the outcome of the processing (processed or not) traveller is + * if successfully processed traveller information is logged and then updated information is send to another Kafka topic + * if not processed traveller info is logged and then process instance finishes sending reply to a different Kafka topic + + +

+ +* Diagram Properties (top) +

+ +* Diagram Properties (bottom) +

+ +* Diagram Properties (process variables) +

+ +* Start Message +

+ +* Start Message (Assignments) +

+ +* Process Traveler Business Rule (top) +

+ +* Process Traveler Business Rule (bottom) +

+ +* Process Traveler Business Rule (Assignments) +

+ +* Process Traveler Gateway +

+ +* Process Traveler Gateway Yes Connector +

+ +* Process Traveler Gateway No Connector +

+ +* Log Traveler Script Task +

+ +* Skip Traveler Script Task +

+ +* Processed Traveler End Message +

+ +* Processed Traveler End Message (Assignments) +

+ +* Skip Traveler End +

+ + +## Infrastructure requirements + +This quickstart requires an Apache Kafka to be available and by default expects it to be on default port and localhost. + +* Install and Startup Kafka Server / Zookeeper + +https://kafka.apache.org/quickstart + +## Build and run + +### Prerequisites + +You will need: + - Java 11+ installed + - Environment variable JAVA_HOME set accordingly + - Maven 3.6.2+ installed + +When using native image compilation, you will also need: + - GraalVM 19.3+ installed + - Environment variable GRAALVM_HOME set accordingly + - GraalVM native image needs as well native-image extension: https://www.graalvm.org/docs/reference-manual/native-image/ + - Note that GraalVM native image compilation typically requires other packages (glibc-devel, zlib-devel and gcc) to be installed too, please refer to GraalVM installation documentation for more details. + +### Compile and Run in Local Dev Mode + +```sh +mvn clean compile quarkus:dev +``` + +NOTE: With dev mode of Quarkus you can take advantage of hot reload for business assets like processes, rules, decision tables and java code. No need to redeploy or restart your running application. + +### Package and Run in JVM mode + +```sh +mvn clean package +java -jar target/quarkus-app/quarkus-run.jar +``` + +or on windows + +```sh +mvn clean package +java -jar target\quarkus-app\quarkus-run.jar +``` + +### Package and Run using Local Native Image +Note that this requires GRAALVM_HOME to point to a valid GraalVM installation + +``` +mvn clean package -Pnative +``` + +To run the generated native executable, generated in `target/`, execute + +``` +./target/process-kafka-quickstart-quarkus-runner +``` + +### OpenAPI (Swagger) documentation +[Specification at swagger.io](https://swagger.io/docs/specification/about/) + +You can take a look at the [OpenAPI definition](http://localhost:8080/openapi?format=json) - automatically generated and included in this service - to determine all available operations exposed by this service. For easy readability you can visualize the OpenAPI definition file using a UI tool like for example available [Swagger UI](https://editor.swagger.io). + +In addition, various clients to interact with this service can be easily generated using this OpenAPI definition. + +When running in either Quarkus Development or Native mode, we also leverage the [Quarkus OpenAPI extension](https://quarkus.io/guides/openapi-swaggerui#use-swagger-ui-for-development) that exposes [Swagger UI](http://localhost:8080/swagger-ui/) that you can use to look at available REST endpoints and send test requests. + +### Use the application + +To make use of this application it is as simple as putting a message on `travellers` topic with following content (cloud event format) + +* To examine ProcessedTravellers topic and verify upcoming messages will be processed + +Execute in a separate terminal session + +```sh +bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic processedtravellers +``` + +Execute in a separate terminal session + +```sh +bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cancelledtravellers +``` + + +* Send message that should be processed to Topic + +```sh +bin/kafka-console-producer.sh --broker-list localhost:9092 --topic travellers +``` + +Content (cloud event format) + +```json +{ + "specversion": "0.3", + "id": "21627e26-31eb-43e7-8343-92a696fd96b1", + "source": "", + "type": "TravellersMessageDataEvent_3", + "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]", + "data": { + "firstName" : "Jan", + "lastName" : "Kowalski", + "email" : "jan.kowalski@example.com", + "nationality" : "Polish" + } +} +``` +One liner + +```json +{"specversion": "0.3","id": "21627e26-31eb-43e7-8343-92a696fd96b1","source": "","type": "travellers", "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]","data": { "firstName" : "Jan", "lastName" : "Kowalski", "email" : "jan.kowalski@example.com", "nationality" : "Polish"}} +``` + + +this will then trigger the successful processing of the traveller and put another message on `processedtravellers` topic with following content (cloud event format) + +```json +{ + "specversion": "0.3", + "id": "86f69dd6-7145-4188-aeaa-e44622eeec86", + "source": "", + "type": "TravellersMessageDataEvent_3", + "time": "2019-10-03T16:22:40.373523+02:00[Europe/Warsaw]", + "data": { + "firstName": "Jan", + "lastName": "Kowalski", + "email": "jan.kowalski@example.com", + "nationality": "Polish", + "processed": true + }, + "kogitoProcessinstanceId": "4fb091c2-82f7-4655-8687-245a4ab07483", + "kogitoParentProcessinstanceId": null, + "kogitoRootProcessinstanceId": null, + "kogitoProcessId": "Travellers", + "kogitoRootProcessId": null, + "kogitoProcessinstanceState": "1", + "kogitoReferenceId": null +} +``` + +there are bunch of extension attributes that starts with `kogito` to provide some context of the execution and the event producer. + +To take the other path of the process put following message on `travellers` topic + +* Send Message to Topic + +```sh +bin/kafka-console-producer.sh --broker-list localhost:9092 --topic travellers +``` + +With the following content (Cloud Event Format) + +```json +{ + "specversion": "0.3", + "id": "31627e26-31eb-43e7-8343-92a696fd96b1", + "source": "", + "type": "travellers", + "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]", + "data": { + "firstName" : "John", + "lastName" : "Doe", + "email" : "john.doe@example.com", + "nationality" : "American" + } +} +``` + +One Liner + +```json +{"specversion": "0.3","id": "31627e26-31eb-43e7-8343-92a696fd96b1","source": "","type": "travellers", "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]","data": { "firstName" : "John", "lastName" : "Doe", "email" : "john.doe@example.com", "nationality" : "American"}} +``` + +this will result in message being send to `cancelledtravelers` topic, according to this configuration + +``` +mp.messaging.outgoing.no\u0020travel.connector=smallrye-kafka +mp.messaging.outgoing.no\u0020travel.topic=cancelledtravellers +mp.messaging.outgoing.no\u0020travel.value.serializer=org.apache.kafka.common.serialization.StringSerializer +``` +## Deploying with Kogito Operator + +In the [`operator`](operator) directory you'll find the custom resources needed to deploy this example on OpenShift with the [Kogito Operator](https://docs.jboss.org/kogito/release/latest/html_single/#chap_kogito-deploying-on-openshift). diff --git a/process-kafka-multi-quarkus/docs/images/diagramProperties.png b/process-kafka-multi-quarkus/docs/images/diagramProperties.png new file mode 100644 index 0000000000..734ee934fb Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/diagramProperties.png differ diff --git a/process-kafka-multi-quarkus/docs/images/diagramProperties2.png b/process-kafka-multi-quarkus/docs/images/diagramProperties2.png new file mode 100644 index 0000000000..c1616eec89 Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/diagramProperties2.png differ diff --git a/process-kafka-multi-quarkus/docs/images/diagramProperties3.png b/process-kafka-multi-quarkus/docs/images/diagramProperties3.png new file mode 100644 index 0000000000..559b69c77c Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/diagramProperties3.png differ diff --git a/process-kafka-multi-quarkus/docs/images/logTravelerScriptTask.png b/process-kafka-multi-quarkus/docs/images/logTravelerScriptTask.png new file mode 100644 index 0000000000..f1c5a745c2 Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/logTravelerScriptTask.png differ diff --git a/process-kafka-multi-quarkus/docs/images/process.png b/process-kafka-multi-quarkus/docs/images/process.png new file mode 100644 index 0000000000..20ae77a77e Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/process.png differ diff --git a/process-kafka-multi-quarkus/docs/images/processTravelerBusinessRule.png b/process-kafka-multi-quarkus/docs/images/processTravelerBusinessRule.png new file mode 100644 index 0000000000..30d848d521 Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/processTravelerBusinessRule.png differ diff --git a/process-kafka-multi-quarkus/docs/images/processTravelerBusinessRule2.png b/process-kafka-multi-quarkus/docs/images/processTravelerBusinessRule2.png new file mode 100644 index 0000000000..af27fecfaf Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/processTravelerBusinessRule2.png differ diff --git a/process-kafka-multi-quarkus/docs/images/processTravelerBusinessRuleAssignments.png b/process-kafka-multi-quarkus/docs/images/processTravelerBusinessRuleAssignments.png new file mode 100644 index 0000000000..d94af89f3b Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/processTravelerBusinessRuleAssignments.png differ diff --git a/process-kafka-multi-quarkus/docs/images/processedTravelerEndMessage.png b/process-kafka-multi-quarkus/docs/images/processedTravelerEndMessage.png new file mode 100644 index 0000000000..6d896e3b1d Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/processedTravelerEndMessage.png differ diff --git a/process-kafka-multi-quarkus/docs/images/processedTravelerEndMessageAssignments.png b/process-kafka-multi-quarkus/docs/images/processedTravelerEndMessageAssignments.png new file mode 100644 index 0000000000..c2107a3209 Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/processedTravelerEndMessageAssignments.png differ diff --git a/process-kafka-multi-quarkus/docs/images/processedTravelerGateway.png b/process-kafka-multi-quarkus/docs/images/processedTravelerGateway.png new file mode 100644 index 0000000000..a099b4d47e Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/processedTravelerGateway.png differ diff --git a/process-kafka-multi-quarkus/docs/images/processedTravelerNoConnector.png b/process-kafka-multi-quarkus/docs/images/processedTravelerNoConnector.png new file mode 100644 index 0000000000..dde668ef4f Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/processedTravelerNoConnector.png differ diff --git a/process-kafka-multi-quarkus/docs/images/processedTravelerYesConnector.png b/process-kafka-multi-quarkus/docs/images/processedTravelerYesConnector.png new file mode 100644 index 0000000000..f1fba68fcb Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/processedTravelerYesConnector.png differ diff --git a/process-kafka-multi-quarkus/docs/images/skipTraveler.png b/process-kafka-multi-quarkus/docs/images/skipTraveler.png new file mode 100644 index 0000000000..57856f882e Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/skipTraveler.png differ diff --git a/process-kafka-multi-quarkus/docs/images/skipTravelerScriptTask.png b/process-kafka-multi-quarkus/docs/images/skipTravelerScriptTask.png new file mode 100644 index 0000000000..8c3509ea68 Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/skipTravelerScriptTask.png differ diff --git a/process-kafka-multi-quarkus/docs/images/startMessage.png b/process-kafka-multi-quarkus/docs/images/startMessage.png new file mode 100644 index 0000000000..8812df7947 Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/startMessage.png differ diff --git a/process-kafka-multi-quarkus/docs/images/startMessageAssignments.png b/process-kafka-multi-quarkus/docs/images/startMessageAssignments.png new file mode 100644 index 0000000000..a6ef0067fd Binary files /dev/null and b/process-kafka-multi-quarkus/docs/images/startMessageAssignments.png differ diff --git a/process-kafka-multi-quarkus/operator/process-kafka-multi-quarkus.yaml b/process-kafka-multi-quarkus/operator/process-kafka-multi-quarkus.yaml new file mode 100644 index 0000000000..5ff0f4b426 --- /dev/null +++ b/process-kafka-multi-quarkus/operator/process-kafka-multi-quarkus.yaml @@ -0,0 +1,33 @@ +#Strimzi operator should be pre-installed in namespace +apiVersion: app.kiegroup.org/v1beta1 +kind: KogitoInfra +metadata: + name: kogito-kafka-infra +spec: + resource: + apiVersion: kafka.strimzi.io/v1beta1 + kind: Kafka +--- +apiVersion: app.kiegroup.org/v1beta1 +kind: KogitoBuild +metadata: + name: process-kafka-quickstart-quarkus +spec: + type: RemoteSource + #env: + # env can be used to set variables during build + #- name: MY_CUSTOM_ENV + # value: "my value" + gitSource: + contextDir: process-kafka-multi-quickstart-quarkus + uri: 'https://github.com/kiegroup/kogito-examples' + # set your maven nexus repository to speed up the build time + #mavenMirrorURL: +--- +apiVersion: app.kiegroup.org/v1beta1 +kind: KogitoRuntime +metadata: + name: process-kafka-multi-quickstart-quarkus +spec: + infra: + - kogito-kafka-infra diff --git a/process-kafka-multi-quarkus/pom.xml b/process-kafka-multi-quarkus/pom.xml new file mode 100644 index 0000000000..df572bd2ac --- /dev/null +++ b/process-kafka-multi-quarkus/pom.xml @@ -0,0 +1,104 @@ + + + 4.0.0 + + org.kie.kogito + kogito-examples + 2.0.0-SNAPSHOT + + process-kafka-multi-quarkus + Kogito Example :: Process with Kafka and Quarkus, multiple channels + Kogito with Kafka - Quarkus, using one channel per message name + + + + org.kie.kogito + kogito-quarkus-bom + ${kogito.version} + pom + import + + + + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-smallrye-reactive-messaging-kafka + + + org.kie.kogito + kogito-cloudevents-quarkus-multi-addon + + + org.kie.kogito + kogito-quarkus + + + io.quarkus + quarkus-smallrye-openapi + + + io.quarkus + quarkus-kafka-client + + + org.kie.kogito + kogito-api + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.kie.kogito + kogito-test-utils + test + + + org.apache.kafka + kafka-clients + test + + + io.cloudevents + cloudevents-api + test + + + io.cloudevents + cloudevents-json-jackson + test + + + io.quarkus + quarkus-smallrye-health + + + + ${project.artifactId} + + + io.quarkus + quarkus-maven-plugin + + + + build + + + + + + + diff --git a/process-kafka-multi-quarkus/src/main/docker/Dockerfile.jvm b/process-kafka-multi-quarkus/src/main/docker/Dockerfile.jvm new file mode 100644 index 0000000000..2d9e7f1325 --- /dev/null +++ b/process-kafka-multi-quarkus/src/main/docker/Dockerfile.jvm @@ -0,0 +1,26 @@ +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in JVM mode +# +# Before building the docker image run: +# +# mvn package +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.jvm -t quarkus/process-kafka-multi-quarkus-jvm . +# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/process-kafka-multi-quarkus-jvm +# +### +FROM fabric8/java-alpine-openjdk8-jre +ENV JAVA_OPTIONS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager" +ENV AB_ENABLED=jmx_exporter + +COPY target/quarkus-app/lib/ /deployments/lib/ +COPY target/quarkus-app/*.jar /deployments/ +COPY target/quarkus-app/app/ /deployments/app/ +COPY target/quarkus-app/quarkus/ /deployments/quarkus/ + +ENTRYPOINT [ "/deployments/run-java.sh" ] diff --git a/process-kafka-multi-quarkus/src/main/docker/Dockerfile.native b/process-kafka-multi-quarkus/src/main/docker/Dockerfile.native new file mode 100644 index 0000000000..80ff691a65 --- /dev/null +++ b/process-kafka-multi-quarkus/src/main/docker/Dockerfile.native @@ -0,0 +1,22 @@ +#### +# This Dockerfile is used in order to build a container that runs the Quarkus application in native (no JVM) mode +# +# Before building the docker image run: +# +# mvn package -Pnative -Dnative-image.docker-build=true +# +# Then, build the image with: +# +# docker build -f src/main/docker/Dockerfile.native -t quarkus/process-kafka-multi-quarkus . +# +# Then run the container using: +# +# docker run -i --rm -p 8080:8080 quarkus/process-kafka-multi-quarkus +# +### +FROM registry.access.redhat.com/ubi8/ubi-minimal +WORKDIR /work/ +COPY target/*-runner /work/application +RUN chmod 775 /work +EXPOSE 8080 +CMD ["./application", "-Dquarkus.http.host=0.0.0.0"] diff --git a/process-kafka-multi-quarkus/src/main/java/org/acme/travel/Traveller.java b/process-kafka-multi-quarkus/src/main/java/org/acme/travel/Traveller.java new file mode 100644 index 0000000000..75e93ab4ba --- /dev/null +++ b/process-kafka-multi-quarkus/src/main/java/org/acme/travel/Traveller.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.travel; + +public class Traveller { + + private String firstName; + private String lastName; + private String email; + private String nationality; + + private boolean processed; + + public Traveller() { + + } + + public Traveller(String firstName, String lastName, String email, String nationality) { + super(); + this.firstName = firstName; + this.lastName = lastName; + this.email = email; + this.nationality = nationality; + } + + public String getFirstName() { + return firstName; + } + + public void setFirstName(String firstName) { + this.firstName = firstName; + } + + public String getLastName() { + return lastName; + } + + public void setLastName(String lastName) { + this.lastName = lastName; + } + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email; + } + + public String getNationality() { + return nationality; + } + + public void setNationality(String nationality) { + this.nationality = nationality; + } + + public boolean isProcessed() { + return processed; + } + + public void setProcessed(boolean processed) { + this.processed = processed; + } + + @Override + public String toString() { + return "Traveller [firstName=" + firstName + ", lastName=" + lastName + ", email=" + email + ", nationality=" + + nationality + ", processed=" + processed + "]"; + } + +} diff --git a/process-kafka-multi-quarkus/src/main/java/org/acme/travel/TravellerValidationService.java b/process-kafka-multi-quarkus/src/main/java/org/acme/travel/TravellerValidationService.java new file mode 100644 index 0000000000..0120d15609 --- /dev/null +++ b/process-kafka-multi-quarkus/src/main/java/org/acme/travel/TravellerValidationService.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.travel; + +import org.kie.kogito.rules.*; + +public class TravellerValidationService implements RuleUnitData { + private final SingletonStore traveller = DataSource.createSingleton(); + + public SingletonStore getTraveller() { + return traveller; + } +} diff --git a/process-kafka-multi-quarkus/src/main/resources/META-INF/resources/index.html b/process-kafka-multi-quarkus/src/main/resources/META-INF/resources/index.html new file mode 100644 index 0000000000..94a44ee8ab --- /dev/null +++ b/process-kafka-multi-quarkus/src/main/resources/META-INF/resources/index.html @@ -0,0 +1,152 @@ + + + + + kafka-quickstart - 1.0-SNAPSHOT + + + + + + +
+
+

Congratulations, you have created a new Quarkus application.

+ +

Why do you see this?

+ +

This page is served by Quarkus. The source is in + src/main/resources/META-INF/resources/index.html.

+ +

What can I do from here?

+ +

If not already done, run the application in dev mode using: mvn compile quarkus:dev. +

+
    +
  • Add REST resources, Servlets, functions and other services in src/main/java.
  • +
  • Your static assets are located in src/main/resources/META-INF/resources.
  • +
  • Configure your application in src/main/resources/application.properties. +
  • +
+ +

How do I get rid of this page?

+

Just delete the src/main/resources/META-INF/resources/index.html file.

+
+
+
+

Application

+
    +
  • GroupId: org.acme
  • +
  • ArtifactId: kafka-quickstart
  • +
  • Version: 1.0-SNAPSHOT
  • +
  • Quarkus Version: 0.18.0
  • +
+
+ +
+
+ + + + \ No newline at end of file diff --git a/process-kafka-multi-quarkus/src/main/resources/application.properties b/process-kafka-multi-quarkus/src/main/resources/application.properties new file mode 100644 index 0000000000..43fbbe8422 --- /dev/null +++ b/process-kafka-multi-quarkus/src/main/resources/application.properties @@ -0,0 +1,22 @@ +# Packaging +# quarkus.package.type=fast-jar + +quarkus.swagger-ui.always-include=true + +kafka.bootstrap.servers=localhost:9092 + + +mp.messaging.incoming.travellers.connector=smallrye-kafka +mp.messaging.incoming.travellers.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer + +mp.messaging.outgoing.processedtravellers.connector=smallrye-kafka +mp.messaging.outgoing.processedtravellers.value.serializer=org.apache.kafka.common.serialization.StringSerializer + + +mp.messaging.outgoing.no\u0020travel.connector=smallrye-kafka +mp.messaging.outgoing.no\u0020travel.topic=cancelledtravellers +mp.messaging.outgoing.no\u0020travel.value.serializer=org.apache.kafka.common.serialization.StringSerializer + +# Maximum Java heap to be used during the native image generation +quarkus.native.native-image-xmx=4g + \ No newline at end of file diff --git a/process-kafka-multi-quarkus/src/main/resources/handle-travellers.bpmn b/process-kafka-multi-quarkus/src/main/resources/handle-travellers.bpmn new file mode 100644 index 0000000000..5aeaea50cd --- /dev/null +++ b/process-kafka-multi-quarkus/src/main/resources/handle-travellers.bpmn @@ -0,0 +1,269 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + _61646087-8A4A-44C0-8077-6D05E69CCB46 + + + traveller + _048DFBEC-837E-4F9B-9EAC-F0B5C731C563_eventInputX + + + _048DFBEC-837E-4F9B-9EAC-F0B5C731C563_eventInputX + + + + + + + + + + _EDFD7D46-79EC-4C49-B6D8-5873E9301AC7 + _61646087-8A4A-44C0-8077-6D05E69CCB46 + System.out.println("Skipping traveller " + traveller); + + + + + + + + _C5756B00-2541-4948-A7F5-4494A10E3A31 + _6ED9B3DA-9FD9-49F3-A70E-A6AB6712A628 + System.out.println("Processing traveller " + traveller); + + + + + + + + _D15CD483-D31D-42F8-A93A-AAAF44292D84 + + + _8BEA9396-93DE-4D44-8CE2-4A146464264E_eventOutputX + traveller + + + _8BEA9396-93DE-4D44-8CE2-4A146464264E_eventOutputX + + + + + + + + + + _D15CD483-D31D-42F8-A93A-AAAF44292D84 + _96F8E118-C58C-4369-89DC-2C881614FF73 + + + + + _60FA6326-76DC-4DB2-AB06-DB8AC8EE8DC8_travellerInputX + + + _60FA6326-76DC-4DB2-AB06-DB8AC8EE8DC8_travellerOutputX + + + + traveller + _60FA6326-76DC-4DB2-AB06-DB8AC8EE8DC8_travellerInputX + + + _60FA6326-76DC-4DB2-AB06-DB8AC8EE8DC8_travellerOutputX + traveller + + + + + + + + + _96F8E118-C58C-4369-89DC-2C881614FF73 + _C5756B00-2541-4948-A7F5-4494A10E3A31 + _EDFD7D46-79EC-4C49-B6D8-5873E9301AC7 + + + + + + + + _6ED9B3DA-9FD9-49F3-A70E-A6AB6712A628 + + + traveller + _0E0784C3-1BEC-4A51-A5E6-D7E5DA3A4402_eventInputX + + + _0E0784C3-1BEC-4A51-A5E6-D7E5DA3A4402_eventInputX + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + _4xHPsIZIEDmfcZl7SK4hOQ + _4xHPsIZIEDmfcZl7SK4hOQ + + \ No newline at end of file diff --git a/process-kafka-multi-quarkus/src/main/resources/travellers.drl b/process-kafka-multi-quarkus/src/main/resources/travellers.drl new file mode 100644 index 0000000000..e2feedd297 --- /dev/null +++ b/process-kafka-multi-quarkus/src/main/resources/travellers.drl @@ -0,0 +1,40 @@ +/** + * Copyright 2020 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.travel +unit TravellerValidationService + +import org.acme.travel.Traveller; + + +rule "Process travellers" + +when + $traveller: /traveller[processed == false, nationality != 'American'] +then + System.out.println("Hello traveller " + $traveller); + $traveller.setProcessed( true ); + +end + +rule "Don't process travellers from US" + +when + $traveller: /traveller[processed == false, nationality == 'American'] +then + System.out.println("This system can't deal with " + $traveller.getNationality()); + $traveller.setProcessed( false ); + +end diff --git a/process-kafka-multi-quarkus/src/test/java/org/acme/travel/MultiMessagingIT.java b/process-kafka-multi-quarkus/src/test/java/org/acme/travel/MultiMessagingIT.java new file mode 100644 index 0000000000..1249f50d3a --- /dev/null +++ b/process-kafka-multi-quarkus/src/test/java/org/acme/travel/MultiMessagingIT.java @@ -0,0 +1,126 @@ +/* + * Copyright 2020 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.travel; + +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Arrays; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import javax.inject.Inject; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.kie.kogito.kafka.KafkaClient; +import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.cloudevents.core.builder.CloudEventBuilder; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@QuarkusTest +@QuarkusTestResource(KafkaQuarkusTestResource.class) +public class MultiMessagingIT { + + public static final String TOPIC_PRODUCER = "travellers"; + public static final String TOPIC_PROCESSED_CONSUMER = "processedtravellers"; + public static final String TOPIC_CANCEL_CONSUMER = "cancelledtravellers"; + + private static Logger LOGGER = LoggerFactory.getLogger(MultiMessagingIT.class); + + @Inject + private ObjectMapper objectMapper; + + public KafkaClient kafkaClient; + + @ConfigProperty(name = KafkaQuarkusTestResource.KOGITO_KAFKA_PROPERTY) + private String kafkaBootstrapServers; + + @Test + public void testProcess() throws InterruptedException { + objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); + kafkaClient = new KafkaClient(kafkaBootstrapServers); + + //number of generated events to test + final int count = 3; + final CountDownLatch countDownLatch = new CountDownLatch(count); + + kafkaClient.consume(Arrays.asList(TOPIC_PROCESSED_CONSUMER, TOPIC_CANCEL_CONSUMER), s -> { + LOGGER.info("Received from kafka: {}", s); + try { + JsonNode event = objectMapper.readValue(s, JsonNode.class); + Traveller traveller = objectMapper.readValue(event.get("data").toString(), Traveller.class); + assertEquals(!traveller.getNationality().equals("American"), traveller.isProcessed()); + assertTrue(traveller.getFirstName().matches("Name[0-9]+")); + assertTrue(traveller.getLastName().matches("LastName[0-9]+")); + assertTrue(traveller.getEmail().matches("email[0-9]+")); + countDownLatch.countDown(); + } catch (JsonProcessingException e) { + LOGGER.error("Error parsing {}", s, e); + fail(e); + } + }); + + IntStream.range(0, count) + .mapToObj(i -> new Traveller("Name" + i, "LastName" + i, "email" + i, getNationality(i))) + .forEach(traveller -> kafkaClient.produce(generateCloudEvent(traveller), TOPIC_PRODUCER)); + + countDownLatch.await(5, TimeUnit.SECONDS); + assertEquals(0, countDownLatch.getCount()); + } + + private String getNationality(int i) { + return i % 2 == 0 ? "American" : "Spanish"; + } + + private String generateCloudEvent(Traveller traveller) { + assertFalse(traveller.isProcessed()); + try { + return objectMapper.writeValueAsString(CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("")) + //Start message event name in handle-travellers.bpmn + .withType("travellers") + .withTime(OffsetDateTime.now()) + .withData(objectMapper.writeValueAsString(traveller).getBytes()) + .build()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterEach + public void stop() { + Optional.ofNullable(kafkaClient).ifPresent(KafkaClient::shutdown); + } +} diff --git a/process-kafka-multi-springboot/README.md b/process-kafka-multi-springboot/README.md new file mode 100644 index 0000000000..9b786afcc9 --- /dev/null +++ b/process-kafka-multi-springboot/README.md @@ -0,0 +1,196 @@ +# Process with Kafka + +## Description + +A quickstart project that deals with traveller processing carried by rules. It illustrates +how easy it is to make the Kogito processes and rules to work with Apache Kafka sending messages to different topics. + +This example shows + +* consuming events from a Kafka topic and for each event start new process instance +* each process instance is expecting a traveller information in JSON format +* traveller is then processed by rules and based on the outcome of the processing (processed or not) traveller is + * if successfully processed traveller information is logged and then updated information is send to another Kafka topic + * if not processed traveller info is logged and then process instance finishes sending reply to a different Kafka topic + +

+ +* Diagram Properties (top) +

+ +* Diagram Properties (bottom) +

+ +* Diagram Properties (process variables) +

+ +* Start Message +

+ +* Start Message (Assignments) +

+ +* Process Traveler Business Rule (top) +

+ +* Process Traveler Business Rule (bottom) +

+ +* Process Traveler Business Rule (Assignments) +

+ +* Process Traveler Gateway +

+ +* Process Traveler Gateway Yes Connector +

+ +* Process Traveler Gateway No Connector +

+ +* Log Traveler Script Task +

+ +* Skip Traveler Script Task +

+ +* Processed Traveler End Message +

+ +* Processed Traveler End Message (Assignments) +

+ +* Skip Traveler End +

+ + +## Infrastructure requirements + +This quickstart requires an Apache Kafka to be available and by default expects it to be on default port and localhost. + +* Install and Startup Kafka Server / Zookeeper + +https://kafka.apache.org/quickstart + +## Build and run + +### Prerequisites + +You will need: + - Java 11+ installed + - Environment variable JAVA_HOME set accordingly + - Maven 3.6.2+ installed + +### Compile and Run in Local Dev Mode + +```sh +mvn clean compile spring-boot:run +``` + + +### Package and Run using uberjar + +```sh +mvn clean package +``` + +To run the generated native executable, generated in `target/`, execute + +```sh +java -jar target/process-kafka-quickstart-springboot.jar +``` + +### OpenAPI (Swagger) documentation +[Specification at swagger.io](https://swagger.io/docs/specification/about/) + +You can take a look at the [OpenAPI definition](http://localhost:8080/v3/api-docs) - automatically generated and included in this service - to determine all available operations exposed by this service. For easy readability you can visualize the OpenAPI definition file using a UI tool like for example available [Swagger UI](https://editor.swagger.io). + +In addition, various clients to interact with this service can be easily generated using this OpenAPI definition. + +### Use the application + +To make use of this application it is as simple as putting a message on `travellers` topic with following content + +* To examine ProcessedTravellers topic and verify upcoming messages will be processed + +Execute in a separate terminal session + +```sh +bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic processedtravellers +``` + +Execute in a separate terminal session + +```sh +bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cancelledtravellers +``` + + + +* Send message that should be processed to Topic + +```sh +bin/kafka-console-producer.sh --broker-list localhost:9092 --topic travellers +``` + +Content (cloud event format) + +```json +{ + "specversion": "0.3", + "id": "21627e26-31eb-43e7-8343-92a696fd96b1", + "source": "", + "type": "travellers", + "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]", + "data": { + "firstName" : "Jan", + "lastName" : "Kowalski", + "email" : "jan.kowalski@example.com", + "nationality" : "Polish" + } +} +``` +One liner + +```json +{"specversion": "0.3","id": "21627e26-31eb-43e7-8343-92a696fd96b1","source": "","type": "travellers", "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]","data": { "firstName" : "Jan", "lastName" : "Kowalski", "email" : "jan.kowalski@example.com", "nationality" : "Polish"}} +``` + +this will then trigger the successful processing of the traveller and put another message on `processedtravellers` topic. + +To take the other path of the process put following message on `travellers` topic + +* Send Message to Topic + +```sh +bin/kafka-console-producer.sh --broker-list localhost:9092 --topic travellers +``` +With the following content (Cloud Event Format) + +```json +{ + "specversion": "0.3", + "id": "31627e26-31eb-43e7-8343-92a696fd96b1", + "source": "", + "type": "travellers", + "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]", + "data": { + "firstName" : "John", + "lastName" : "Doe", + "email" : "john.doe@example.com", + "nationality" : "American" + } +} +``` + +One Liner + +```json +{"specversion": "0.3","id": "31627e26-31eb-43e7-8343-92a696fd96b1","source": "","type": "travellers", "time": "2019-10-01T12:02:23.812262+02:00[Europe/Warsaw]","data": { "firstName" : "John", "lastName" : "Doe", "email" : "john.doe@example.com", "nationality" : "American"}} +``` + +this will result in message being send to `cancelledtravelers` topic. + +## Deploying with Kogito Operator + +In the [`operator`](operator) directory you'll find the custom resources needed to deploy this example on OpenShift with the [Kogito Operator](https://docs.jboss.org/kogito/release/latest/html_single/#chap_kogito-deploying-on-openshift). diff --git a/process-kafka-multi-springboot/docs/images/diagramProperties.png b/process-kafka-multi-springboot/docs/images/diagramProperties.png new file mode 100644 index 0000000000..734ee934fb Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/diagramProperties.png differ diff --git a/process-kafka-multi-springboot/docs/images/diagramProperties2.png b/process-kafka-multi-springboot/docs/images/diagramProperties2.png new file mode 100644 index 0000000000..c1616eec89 Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/diagramProperties2.png differ diff --git a/process-kafka-multi-springboot/docs/images/diagramProperties3.png b/process-kafka-multi-springboot/docs/images/diagramProperties3.png new file mode 100644 index 0000000000..559b69c77c Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/diagramProperties3.png differ diff --git a/process-kafka-multi-springboot/docs/images/logTravelerScriptTask.png b/process-kafka-multi-springboot/docs/images/logTravelerScriptTask.png new file mode 100644 index 0000000000..f1c5a745c2 Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/logTravelerScriptTask.png differ diff --git a/process-kafka-multi-springboot/docs/images/process.png b/process-kafka-multi-springboot/docs/images/process.png new file mode 100644 index 0000000000..20ae77a77e Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/process.png differ diff --git a/process-kafka-multi-springboot/docs/images/processTravelerBusinessRule.png b/process-kafka-multi-springboot/docs/images/processTravelerBusinessRule.png new file mode 100644 index 0000000000..30d848d521 Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/processTravelerBusinessRule.png differ diff --git a/process-kafka-multi-springboot/docs/images/processTravelerBusinessRule2.png b/process-kafka-multi-springboot/docs/images/processTravelerBusinessRule2.png new file mode 100644 index 0000000000..af27fecfaf Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/processTravelerBusinessRule2.png differ diff --git a/process-kafka-multi-springboot/docs/images/processTravelerBusinessRuleAssignments.png b/process-kafka-multi-springboot/docs/images/processTravelerBusinessRuleAssignments.png new file mode 100644 index 0000000000..d94af89f3b Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/processTravelerBusinessRuleAssignments.png differ diff --git a/process-kafka-multi-springboot/docs/images/processedTravelerEndMessage.png b/process-kafka-multi-springboot/docs/images/processedTravelerEndMessage.png new file mode 100644 index 0000000000..6d896e3b1d Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/processedTravelerEndMessage.png differ diff --git a/process-kafka-multi-springboot/docs/images/processedTravelerEndMessageAssignments.png b/process-kafka-multi-springboot/docs/images/processedTravelerEndMessageAssignments.png new file mode 100644 index 0000000000..c2107a3209 Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/processedTravelerEndMessageAssignments.png differ diff --git a/process-kafka-multi-springboot/docs/images/processedTravelerGateway.png b/process-kafka-multi-springboot/docs/images/processedTravelerGateway.png new file mode 100644 index 0000000000..a099b4d47e Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/processedTravelerGateway.png differ diff --git a/process-kafka-multi-springboot/docs/images/processedTravelerNoConnector.png b/process-kafka-multi-springboot/docs/images/processedTravelerNoConnector.png new file mode 100644 index 0000000000..dde668ef4f Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/processedTravelerNoConnector.png differ diff --git a/process-kafka-multi-springboot/docs/images/processedTravelerYesConnector.png b/process-kafka-multi-springboot/docs/images/processedTravelerYesConnector.png new file mode 100644 index 0000000000..f1fba68fcb Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/processedTravelerYesConnector.png differ diff --git a/process-kafka-multi-springboot/docs/images/skipTraveler.png b/process-kafka-multi-springboot/docs/images/skipTraveler.png new file mode 100644 index 0000000000..57856f882e Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/skipTraveler.png differ diff --git a/process-kafka-multi-springboot/docs/images/skipTravelerScriptTask.png b/process-kafka-multi-springboot/docs/images/skipTravelerScriptTask.png new file mode 100644 index 0000000000..8c3509ea68 Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/skipTravelerScriptTask.png differ diff --git a/process-kafka-multi-springboot/docs/images/startMessage.png b/process-kafka-multi-springboot/docs/images/startMessage.png new file mode 100644 index 0000000000..8812df7947 Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/startMessage.png differ diff --git a/process-kafka-multi-springboot/docs/images/startMessageAssignments.png b/process-kafka-multi-springboot/docs/images/startMessageAssignments.png new file mode 100644 index 0000000000..a6ef0067fd Binary files /dev/null and b/process-kafka-multi-springboot/docs/images/startMessageAssignments.png differ diff --git a/process-kafka-multi-springboot/operator/process-kafka-multi-springboot.yaml b/process-kafka-multi-springboot/operator/process-kafka-multi-springboot.yaml new file mode 100644 index 0000000000..20c67ade50 --- /dev/null +++ b/process-kafka-multi-springboot/operator/process-kafka-multi-springboot.yaml @@ -0,0 +1,35 @@ +#Strimzi operator should be pre-installed in namespace +apiVersion: app.kiegroup.org/v1beta1 +kind: KogitoInfra +metadata: + name: kogito-kafka-infra +spec: + resource: + apiVersion: kafka.strimzi.io/v1beta1 + kind: Kafka +--- +apiVersion: app.kiegroup.org/v1beta1 +kind: KogitoBuild +metadata: + name: process-kafka-quickstart-springboot +spec: + type: RemoteSource + runtime: springboot + #env: + # env can be used to set variables during build + #- name: MY_CUSTOM_ENV + # value: "my value" + gitSource: + contextDir: process-kafka-multi-springboot + uri: 'https://github.com/kiegroup/kogito-examples' + # set your maven nexus repository to speed up the build time + #mavenMirrorURL: +--- +apiVersion: app.kiegroup.org/v1beta1 +kind: KogitoRuntime +metadata: + name: process-kafka-multi-springboot +spec: + runtime: springboot + infra: + - kogito-kafka-infra diff --git a/process-kafka-multi-springboot/pom.xml b/process-kafka-multi-springboot/pom.xml new file mode 100644 index 0000000000..b365628b2b --- /dev/null +++ b/process-kafka-multi-springboot/pom.xml @@ -0,0 +1,116 @@ + + + 4.0.0 + + org.kie.kogito + kogito-examples + 2.0.0-SNAPSHOT + + + process-kafka-multi-springboot + Kogito Example :: Process with Kafka and Spring Boot, multiple channels + Kogito with Kafka - Spring Boot, using multiple channels + + + + + org.springframework.boot + spring-boot-dependencies + ${version.org.springframework.boot} + pom + import + + + org.kie.kogito + kogito-bom + ${kogito.version} + pom + import + + + + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.kafka + spring-kafka + + + org.kie.kogito + kogito-cloudevents-spring-boot-addon + + + com.fasterxml.jackson.core + jackson-databind + + + + + org.kie.kogito + kogito-springboot-starter + + + org.springdoc + springdoc-openapi-ui + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.kie.kogito + kogito-test-utils + test + + + io.cloudevents + cloudevents-api + test + + + io.cloudevents + cloudevents-json-jackson + test + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + org.kie.kogito + kogito-maven-plugin + true + + + + + diff --git a/process-kafka-multi-springboot/src/main/java/org/acme/travel/Traveller.java b/process-kafka-multi-springboot/src/main/java/org/acme/travel/Traveller.java new file mode 100644 index 0000000000..75e93ab4ba --- /dev/null +++ b/process-kafka-multi-springboot/src/main/java/org/acme/travel/Traveller.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.travel; + +public class Traveller { + + private String firstName; + private String lastName; + private String email; + private String nationality; + + private boolean processed; + + public Traveller() { + + } + + public Traveller(String firstName, String lastName, String email, String nationality) { + super(); + this.firstName = firstName; + this.lastName = lastName; + this.email = email; + this.nationality = nationality; + } + + public String getFirstName() { + return firstName; + } + + public void setFirstName(String firstName) { + this.firstName = firstName; + } + + public String getLastName() { + return lastName; + } + + public void setLastName(String lastName) { + this.lastName = lastName; + } + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email; + } + + public String getNationality() { + return nationality; + } + + public void setNationality(String nationality) { + this.nationality = nationality; + } + + public boolean isProcessed() { + return processed; + } + + public void setProcessed(boolean processed) { + this.processed = processed; + } + + @Override + public String toString() { + return "Traveller [firstName=" + firstName + ", lastName=" + lastName + ", email=" + email + ", nationality=" + + nationality + ", processed=" + processed + "]"; + } + +} diff --git a/process-kafka-multi-springboot/src/main/java/org/kie/kogito/tests/KogitoKafkaMultiSpringbootApplication.java b/process-kafka-multi-springboot/src/main/java/org/kie/kogito/tests/KogitoKafkaMultiSpringbootApplication.java new file mode 100644 index 0000000000..a612a99eaa --- /dev/null +++ b/process-kafka-multi-springboot/src/main/java/org/kie/kogito/tests/KogitoKafkaMultiSpringbootApplication.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kie.kogito.tests; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication(scanBasePackages = { "org.kie.kogito.**", "org.kie.kogito.tests.**", "org.drools.project.model.**" }) +public class KogitoKafkaMultiSpringbootApplication { + + public static void main(String[] args) { + SpringApplication.run(KogitoKafkaMultiSpringbootApplication.class, args); + } + +} diff --git a/process-kafka-multi-springboot/src/main/resources/META-INF/kmodule.xml b/process-kafka-multi-springboot/src/main/resources/META-INF/kmodule.xml new file mode 100644 index 0000000000..2ba4076c16 --- /dev/null +++ b/process-kafka-multi-springboot/src/main/resources/META-INF/kmodule.xml @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/process-kafka-multi-springboot/src/main/resources/application.properties b/process-kafka-multi-springboot/src/main/resources/application.properties new file mode 100644 index 0000000000..430aa2ad83 --- /dev/null +++ b/process-kafka-multi-springboot/src/main/resources/application.properties @@ -0,0 +1,5 @@ +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=travellers-group +kogito.addon.cloudevents.kafka.kogito_incoming_stream=travellers +kogito.addon.cloudevents.kafka.kogito_outgoing_stream=processedtravellers +kogito.addon.cloudevents.kafka.kogito_outgoing_stream.no\u0020travel=processedtravellers \ No newline at end of file diff --git a/process-kafka-multi-springboot/src/main/resources/handle-travellers.bpmn b/process-kafka-multi-springboot/src/main/resources/handle-travellers.bpmn new file mode 100644 index 0000000000..f09b64f4f3 --- /dev/null +++ b/process-kafka-multi-springboot/src/main/resources/handle-travellers.bpmn @@ -0,0 +1,296 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + _148A7959-371E-4EC5-9676-1A98C95BA7E8 + + + traveller + _67233131-DCD2-4EFC-9B7A-0754E12375AC_eventInputX + + + _67233131-DCD2-4EFC-9B7A-0754E12375AC_eventInputX + + + + + + + + + + _EDFD7D46-79EC-4C49-B6D8-5873E9301AC7 + _148A7959-371E-4EC5-9676-1A98C95BA7E8 + System.out.println("Skipping traveller " + traveller); + + + + + + + + _C5756B00-2541-4948-A7F5-4494A10E3A31 + _6ED9B3DA-9FD9-49F3-A70E-A6AB6712A628 + System.out.println("Processing traveller " + traveller); + + + + + + + + _D15CD483-D31D-42F8-A93A-AAAF44292D84 + + + _8BEA9396-93DE-4D44-8CE2-4A146464264E_eventOutputX + traveller + + + _8BEA9396-93DE-4D44-8CE2-4A146464264E_eventOutputX + + + + + + + + + + + + + _D15CD483-D31D-42F8-A93A-AAAF44292D84 + _96F8E118-C58C-4369-89DC-2C881614FF73 + + + + + _60FA6326-76DC-4DB2-AB06-DB8AC8EE8DC8_travellerInputX + + + _60FA6326-76DC-4DB2-AB06-DB8AC8EE8DC8_travellerOutputX + + + + traveller + _60FA6326-76DC-4DB2-AB06-DB8AC8EE8DC8_travellerInputX + + + _60FA6326-76DC-4DB2-AB06-DB8AC8EE8DC8_travellerOutputX + traveller + + + + + + + + + _96F8E118-C58C-4369-89DC-2C881614FF73 + _EDFD7D46-79EC-4C49-B6D8-5873E9301AC7 + _C5756B00-2541-4948-A7F5-4494A10E3A31 + + + + + + + + _6ED9B3DA-9FD9-49F3-A70E-A6AB6712A628 + + + traveller + _0E0784C3-1BEC-4A51-A5E6-D7E5DA3A4402_eventInputX + + + _0E0784C3-1BEC-4A51-A5E6-D7E5DA3A4402_eventInputX + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + _h2cAMJFrEDmQPdE0CWoRjQ + _h2cAMJFrEDmQPdE0CWoRjQ + + \ No newline at end of file diff --git a/process-kafka-multi-springboot/src/main/resources/travellers.drl b/process-kafka-multi-springboot/src/main/resources/travellers.drl new file mode 100644 index 0000000000..7bee0205bc --- /dev/null +++ b/process-kafka-multi-springboot/src/main/resources/travellers.drl @@ -0,0 +1,39 @@ +/** + * Copyright 2020 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package defaultPackage + +import org.acme.travel.Traveller; + + +rule "Process travellers" ruleflow-group "travellers" + +when + $traveller: Traveller(processed == false, nationality != 'American') +then + System.out.println("Hello traveller " + $traveller); + $traveller.setProcessed( true ); + +end + +rule "Don't process travellers from US" ruleflow-group "travellers" + +when + $traveller: Traveller(processed == false, nationality == 'American') +then + System.out.println("This system can't deal with " + $traveller.getNationality()); + $traveller.setProcessed( false ); + +end diff --git a/process-kafka-multi-springboot/src/test/java/org/acme/travel/MultiMessagingIT.java b/process-kafka-multi-springboot/src/test/java/org/acme/travel/MultiMessagingIT.java new file mode 100644 index 0000000000..19fa398987 --- /dev/null +++ b/process-kafka-multi-springboot/src/test/java/org/acme/travel/MultiMessagingIT.java @@ -0,0 +1,122 @@ +/* + * Copyright 2020 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme.travel; + +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Arrays; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.kie.kogito.kafka.KafkaClient; +import org.kie.kogito.testcontainers.springboot.KafkaSpringBootTestResource; +import org.kie.kogito.tests.KogitoKafkaMultiSpringbootApplication; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ContextConfiguration; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.cloudevents.core.builder.CloudEventBuilder; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +@SpringBootTest(classes = KogitoKafkaMultiSpringbootApplication.class) +@ContextConfiguration(initializers = KafkaSpringBootTestResource.class) +public class MultiMessagingIT { + + public static final String TOPIC_PRODUCER = "travellers"; + public static final String TOPIC_PROCESSED_CONSUMER = "processedtravellers"; + public static final String TOPIC_CANCEL_CONSUMER = "cancelledtravellers"; + + private static Logger LOGGER = LoggerFactory.getLogger(MultiMessagingIT.class); + + @Autowired + private ObjectMapper objectMapper; + + @Autowired + private KafkaClient kafkaClient; + + @Test + public void testProcess() throws InterruptedException { + objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); + + //number of generated events to test + final int count = 3; + final CountDownLatch countDownLatch = new CountDownLatch(count); + + kafkaClient.consume(Arrays.asList(TOPIC_PROCESSED_CONSUMER, TOPIC_CANCEL_CONSUMER), s -> { + LOGGER.info("Received from kafka: {}", s); + try { + JsonNode event = objectMapper.readValue(s, JsonNode.class); + Traveller traveller = objectMapper.readValue(event.get("data").toString(), Traveller.class); + assertEquals(!traveller.getNationality().equals("American"), traveller.isProcessed()); + assertTrue(traveller.getFirstName().matches("Name[0-9]+")); + assertTrue(traveller.getLastName().matches("LastName[0-9]+")); + assertTrue(traveller.getEmail().matches("email[0-9]+")); + countDownLatch.countDown(); + } catch (JsonProcessingException e) { + LOGGER.error("Error parsing {}", s, e); + fail(e); + } + }); + + IntStream.range(0, count) + .mapToObj(i -> new Traveller("Name" + i, "LastName" + i, "email" + i, getNationality(i))) + .forEach(traveller -> kafkaClient.produce(generateCloudEvent(traveller), TOPIC_PRODUCER)); + + countDownLatch.await(5, TimeUnit.SECONDS); + assertEquals(0, countDownLatch.getCount()); + } + + private String getNationality(int i) { + return i % 2 == 0 ? "American" : "Spanish"; + } + + private String generateCloudEvent(Traveller traveller) { + assertFalse(traveller.isProcessed()); + try { + return objectMapper.writeValueAsString(CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("")) + //Start message event name in handle-travellers.bpmn + .withType("travellers") + .withTime(OffsetDateTime.now()) + .withData(objectMapper.writeValueAsString(traveller).getBytes()) + .build()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterEach + public void stop() { + Optional.ofNullable(kafkaClient).ifPresent(KafkaClient::shutdown); + } +}