Skip to content

Commit

Permalink
Support multi-tenancy per worker (#614)
Browse files Browse the repository at this point in the history
* formatter

* formatter

* formatter

* introduced configurable tenant ids for workers

* remove adding default tenant ids to job worker configuration

* update workflow to verify format

* added a test to prove a zeebe worker value picks up the tenant ids

(cherry picked from commit a780472)
  • Loading branch information
jonathanlukas authored and megglos committed Jan 2, 2025
1 parent 7a2ff55 commit b0a4c7b
Show file tree
Hide file tree
Showing 15 changed files with 404 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,31 @@
import io.camunda.zeebe.spring.client.event.ZeebeClientCreatedEvent;
import io.camunda.zeebe.spring.test.proxy.ZeebeClientProxy;
import io.camunda.zeebe.spring.test.proxy.ZeebeTestEngineProxy;
import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.context.TestContext;

import java.lang.invoke.MethodHandles;

/**
* Base class for the two different ZeebeTestExecutionListener classes provided for in-memory vs Testcontainer tests
* Base class for the two different ZeebeTestExecutionListener classes provided for in-memory vs
* Testcontainer tests
*/
public class AbstractZeebeTestExecutionListener {

private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final Logger LOGGER =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private ZeebeClient zeebeClient;

/**
* Registers the ZeebeEngine for test case in relevant places and creates the ZeebeClient
*/
/** Registers the ZeebeEngine for test case in relevant places and creates the ZeebeClient */
public void setupWithZeebeEngine(TestContext testContext, ZeebeTestEngine zeebeEngine) {

testContext.getApplicationContext().getBean(ZeebeTestEngineProxy.class).swapZeebeEngine(zeebeEngine);
testContext
.getApplicationContext()
.getBean(ZeebeTestEngineProxy.class)
.swapZeebeEngine(zeebeEngine);

BpmnAssert.initRecordStream(
RecordStream.of(zeebeEngine.getRecordStreamSource()));
BpmnAssert.initRecordStream(RecordStream.of(zeebeEngine.getRecordStreamSource()));

ZeebeTestThreadSupport.setEngineForCurrentThread(zeebeEngine);

Expand All @@ -42,17 +43,25 @@ public void setupWithZeebeEngine(TestContext testContext, ZeebeTestEngine zeebeE
// Not using zeebeEngine.createClient(); to be able to set JsonMapper
zeebeClient = createClient(testContext, zeebeEngine);

testContext.getApplicationContext().getBean(ZeebeClientProxy.class).swapZeebeClient(zeebeClient);
testContext.getApplicationContext().publishEvent(new ZeebeClientCreatedEvent(this, zeebeClient));
testContext
.getApplicationContext()
.getBean(ZeebeClientProxy.class)
.swapZeebeClient(zeebeClient);
testContext
.getApplicationContext()
.publishEvent(new ZeebeClientCreatedEvent(this, zeebeClient));

LOGGER.info("...deployments and workers started.");
}

public ZeebeClient createClient(TestContext testContext, ZeebeTestEngine zeebeEngine) {
// Maybe use more of the normal config properties (https://github.com/camunda-community-hub/spring-zeebe/blob/11966be454cc76f3966fb2c0e4114a35487946fc/client/spring-zeebe-starter/src/main/java/io/camunda/zeebe/spring/client/config/ZeebeClientStarterAutoConfiguration.java#L30)?
ZeebeClientBuilder builder = ZeebeClient.newClientBuilder()
.gatewayAddress(zeebeEngine.getGatewayAddress()).usePlaintext();
if (testContext.getApplicationContext().getBeanNamesForType(JsonMapper.class).length>0) {
// Maybe use more of the normal config properties
// (https://github.com/camunda-community-hub/spring-zeebe/blob/11966be454cc76f3966fb2c0e4114a35487946fc/client/spring-zeebe-starter/src/main/java/io/camunda/zeebe/spring/client/config/ZeebeClientStarterAutoConfiguration.java#L30)?
ZeebeClientBuilder builder =
ZeebeClient.newClientBuilder()
.gatewayAddress(zeebeEngine.getGatewayAddress())
.usePlaintext();
if (testContext.getApplicationContext().getBeanNamesForType(JsonMapper.class).length > 0) {
JsonMapper jsonMapper = testContext.getApplicationContext().getBean(JsonMapper.class);
builder.withJsonMapper(jsonMapper);
}
Expand All @@ -62,21 +71,34 @@ public ZeebeClient createClient(TestContext testContext, ZeebeTestEngine zeebeEn

public void cleanup(TestContext testContext, ZeebeTestEngine zeebeEngine) {

if (testContext.getTestException()!=null) {
LOGGER.warn("Test failure on '"+testContext.getTestMethod()+"'. Tracing workflow engine internals on INFO for debugging purposes:");
if (testContext.getTestException() != null) {
LOGGER.warn(
"Test failure on '"
+ testContext.getTestMethod()
+ "'. Tracing workflow engine internals on INFO for debugging purposes:");
RecordStream recordStream = RecordStream.of(zeebeEngine.getRecordStreamSource());
recordStream.print(true);

if (recordStream.incidentRecords().iterator().hasNext()) {
LOGGER.warn("There were incidents in Zeebe during '"+testContext.getTestMethod()+"', maybe they caused some unexpected behavior for you? Please check below:");
recordStream.incidentRecords().forEach( record -> {LOGGER.warn(". " + record.getValue());});
LOGGER.warn(
"There were incidents in Zeebe during '"
+ testContext.getTestMethod()
+ "', maybe they caused some unexpected behavior for you? Please check below:");
recordStream
.incidentRecords()
.forEach(
record -> {
LOGGER.warn(". " + record.getValue());
});
}
}

BpmnAssert.resetRecordStream();
ZeebeTestThreadSupport.cleanupEngineForCurrentThread();

testContext.getApplicationContext().publishEvent(new ZeebeClientClosingEvent(this, zeebeClient));
testContext
.getApplicationContext()
.publishEvent(new ZeebeClientClosingEvent(this, zeebeClient));
testContext.getApplicationContext().getBean(ZeebeClientProxy.class).removeZeebeClient();
zeebeClient.close();
testContext.getApplicationContext().getBean(ZeebeTestEngineProxy.class).removeZeebeEngine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;


@TestConfiguration
@ImportAutoConfiguration({
TestProxyConfiguration.class,
Expand All @@ -24,5 +23,4 @@ public SpringZeebeTestContext enableTestContext() {
// add marker bean to Spring context that we are running in a test case
return new SpringZeebeTestContext();
}

}
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
package io.camunda.zeebe.spring.test;

import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat;

import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.assertions.BpmnAssert;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.process.test.inspections.model.InspectedProcessInstance;
import org.awaitility.Awaitility;


import java.time.Duration;
import java.util.Objects;
import org.awaitility.Awaitility;

import static io.camunda.zeebe.process.test.assertions.BpmnAssert.assertThat;

/**
* Helper to wait in the multithreaded environment for the worker to execute.
*/
/** Helper to wait in the multithreaded environment for the worker to execute. */
public class ZeebeTestThreadSupport {

private final static ThreadLocal<ZeebeTestEngine> ENGINES = new ThreadLocal<>();
private final static Duration DEFAULT_DURATION = Duration.ofMillis(5000);
private final static Integer DEFAULT_TIMES_PASSED = 1;
private final static Long DEFAULT_INTERVAL_MILLIS = 100L;
private static final ThreadLocal<ZeebeTestEngine> ENGINES = new ThreadLocal<>();
private static final Duration DEFAULT_DURATION = Duration.ofMillis(5000);
private static final Integer DEFAULT_TIMES_PASSED = 1;
private static final Long DEFAULT_INTERVAL_MILLIS = 100L;

public static void setEngineForCurrentThread(ZeebeTestEngine engine) {
ENGINES.set(engine);
Expand All @@ -35,78 +31,105 @@ public static void waitForProcessInstanceCompleted(ProcessInstanceEvent processI
waitForProcessInstanceCompleted(processInstance.getProcessInstanceKey(), DEFAULT_DURATION);
}

public static void waitForProcessInstanceCompleted(ProcessInstanceEvent processInstance, Duration duration) {
public static void waitForProcessInstanceCompleted(
ProcessInstanceEvent processInstance, Duration duration) {
waitForProcessInstanceCompleted(processInstance.getProcessInstanceKey(), duration);
}

public static void waitForProcessInstanceCompleted(long processInstanceKey) {
waitForProcessInstanceCompleted(new InspectedProcessInstance(processInstanceKey), DEFAULT_DURATION);
waitForProcessInstanceCompleted(
new InspectedProcessInstance(processInstanceKey), DEFAULT_DURATION);
}

public static void waitForProcessInstanceCompleted(long processInstanceKey, Duration duration) {
waitForProcessInstanceCompleted(new InspectedProcessInstance(processInstanceKey), duration);
}

public static void waitForProcessInstanceCompleted(InspectedProcessInstance inspectedProcessInstance) {
public static void waitForProcessInstanceCompleted(
InspectedProcessInstance inspectedProcessInstance) {
waitForProcessInstanceCompleted(inspectedProcessInstance, DEFAULT_DURATION);
}

public static void waitForProcessInstanceCompleted(InspectedProcessInstance inspectedProcessInstance, Duration duration) {
public static void waitForProcessInstanceCompleted(
InspectedProcessInstance inspectedProcessInstance, Duration duration) {
// get it in the thread of the test
final ZeebeTestEngine engine = ENGINES.get();
if (engine == null) {
throw new IllegalStateException("No Zeebe engine is initialized for the current thread, annotate the test with @ZeebeSpringTest");
throw new IllegalStateException(
"No Zeebe engine is initialized for the current thread, annotate the test with @ZeebeSpringTest");
}
if (duration == null) {
duration = DEFAULT_DURATION;
}
Awaitility.await().atMost(duration).untilAsserted(() -> {
// allow the worker to work
Thread.sleep(DEFAULT_INTERVAL_MILLIS);
BpmnAssert.initRecordStream(
RecordStream.of(Objects.requireNonNull(engine).getRecordStreamSource()));
// use inside the awaitility thread
assertThat(inspectedProcessInstance).isCompleted();
});
Awaitility.await()
.atMost(duration)
.untilAsserted(
() -> {
// allow the worker to work
Thread.sleep(DEFAULT_INTERVAL_MILLIS);
BpmnAssert.initRecordStream(
RecordStream.of(Objects.requireNonNull(engine).getRecordStreamSource()));
// use inside the awaitility thread
assertThat(inspectedProcessInstance).isCompleted();
});
}

public static void waitForProcessInstanceHasPassedElement(ProcessInstanceEvent processInstance, String elementId) {
waitForProcessInstanceHasPassedElement(processInstance.getProcessInstanceKey(), elementId, DEFAULT_DURATION);
public static void waitForProcessInstanceHasPassedElement(
ProcessInstanceEvent processInstance, String elementId) {
waitForProcessInstanceHasPassedElement(
processInstance.getProcessInstanceKey(), elementId, DEFAULT_DURATION);
}

public static void waitForProcessInstanceHasPassedElement(ProcessInstanceEvent processInstance, String elementId, Duration duration) {
waitForProcessInstanceHasPassedElement(processInstance.getProcessInstanceKey(), elementId, duration);
public static void waitForProcessInstanceHasPassedElement(
ProcessInstanceEvent processInstance, String elementId, Duration duration) {
waitForProcessInstanceHasPassedElement(
processInstance.getProcessInstanceKey(), elementId, duration);
}

public static void waitForProcessInstanceHasPassedElement(long processInstanceKey, String elementId) {
waitForProcessInstanceHasPassedElement(new InspectedProcessInstance(processInstanceKey), elementId, DEFAULT_DURATION);
public static void waitForProcessInstanceHasPassedElement(
long processInstanceKey, String elementId) {
waitForProcessInstanceHasPassedElement(
new InspectedProcessInstance(processInstanceKey), elementId, DEFAULT_DURATION);
}

public static void waitForProcessInstanceHasPassedElement(long processInstanceKey, String elementId, Duration duration) {
waitForProcessInstanceHasPassedElement(new InspectedProcessInstance(processInstanceKey), elementId, duration);
public static void waitForProcessInstanceHasPassedElement(
long processInstanceKey, String elementId, Duration duration) {
waitForProcessInstanceHasPassedElement(
new InspectedProcessInstance(processInstanceKey), elementId, duration);
}

public static void waitForProcessInstanceHasPassedElement(InspectedProcessInstance inspectedProcessInstance, String elementId) {
public static void waitForProcessInstanceHasPassedElement(
InspectedProcessInstance inspectedProcessInstance, String elementId) {
waitForProcessInstanceHasPassedElement(inspectedProcessInstance, elementId, DEFAULT_DURATION);
}

public static void waitForProcessInstanceHasPassedElement(InspectedProcessInstance inspectedProcessInstance, String elementId, Duration duration) {
waitForProcessInstanceHasPassedElement(inspectedProcessInstance, elementId, duration, DEFAULT_TIMES_PASSED);
public static void waitForProcessInstanceHasPassedElement(
InspectedProcessInstance inspectedProcessInstance, String elementId, Duration duration) {
waitForProcessInstanceHasPassedElement(
inspectedProcessInstance, elementId, duration, DEFAULT_TIMES_PASSED);
}

public static void waitForProcessInstanceHasPassedElement(InspectedProcessInstance inspectedProcessInstance, String elementId, Duration duration, final int times) {
public static void waitForProcessInstanceHasPassedElement(
InspectedProcessInstance inspectedProcessInstance,
String elementId,
Duration duration,
final int times) {
final ZeebeTestEngine engine = ENGINES.get();
if (engine == null) {
throw new IllegalStateException("No Zeebe engine is initialized for the current thread, annotate the test with @ZeebeSpringTest");
throw new IllegalStateException(
"No Zeebe engine is initialized for the current thread, annotate the test with @ZeebeSpringTest");
}
if (duration == null) {
duration = DEFAULT_DURATION;
}
Awaitility.await().atMost(duration).untilAsserted(() -> {
Thread.sleep(DEFAULT_INTERVAL_MILLIS);
BpmnAssert.initRecordStream(
RecordStream.of(Objects.requireNonNull(engine).getRecordStreamSource()));
assertThat(inspectedProcessInstance).hasPassedElement(elementId, times);
});
Awaitility.await()
.atMost(duration)
.untilAsserted(
() -> {
Thread.sleep(DEFAULT_INTERVAL_MILLIS);
BpmnAssert.initRecordStream(
RecordStream.of(Objects.requireNonNull(engine).getRecordStreamSource()));
assertThat(inspectedProcessInstance).hasPassedElement(elementId, times);
});
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package io.camunda.zeebe.spring.test.configuration;

import static io.camunda.zeebe.spring.client.CamundaAutoConfiguration.DEFAULT_OBJECT_MAPPER;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.client.api.JsonMapper;
import io.camunda.zeebe.client.impl.ZeebeObjectMapper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;

import static io.camunda.zeebe.spring.client.CamundaAutoConfiguration.DEFAULT_OBJECT_MAPPER;


/**
* Fallback values if certain beans are missing
*/
/** Fallback values if certain beans are missing */
public class ZeebeTestDefaultConfiguration {

@Bean(name = "zeebeJsonMapper")
Expand All @@ -25,5 +22,4 @@ public JsonMapper jsonMapper(ObjectMapper objectMapper) {
public ObjectMapper objectMapper() {
return DEFAULT_OBJECT_MAPPER;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ public abstract class AbstractInvocationHandler implements InvocationHandler {
@Override
@CheckForNull
public final Object invoke(Object proxy, Method method, @CheckForNull @Nullable Object[] args)
throws Throwable {
throws Throwable {
if (args == null) {
args = NO_ARGS;
}
if (args.length == 0 && method.getName().equals("hashCode")) {
return hashCode();
}
if (args.length == 1
&& method.getName().equals("equals")
&& method.getParameterTypes()[0] == Object.class) {
&& method.getName().equals("equals")
&& method.getParameterTypes()[0] == Object.class) {
Object arg = args[0];
if (arg == null) {
return false;
Expand All @@ -32,7 +32,7 @@ public final Object invoke(Object proxy, Method method, @CheckForNull @Nullable
return true;
}
return isProxyOfSameInterfaces(arg, proxy.getClass())
&& equals(Proxy.getInvocationHandler(arg));
&& equals(Proxy.getInvocationHandler(arg));
}
if (args.length == 0 && method.getName().equals("toString")) {
return toString();
Expand All @@ -50,7 +50,7 @@ public final Object invoke(Object proxy, Method method, @CheckForNull @Nullable
*/
@CheckForNull
protected abstract Object handleInvocation(Object proxy, Method method, @Nullable Object[] args)
throws Throwable;
throws Throwable;

/**
* By default delegates to {@link Object#equals} so instances are only equal if they are
Expand Down Expand Up @@ -89,12 +89,12 @@ public String toString() {

private static boolean isProxyOfSameInterfaces(Object arg, Class<?> proxyClass) {
return proxyClass.isInstance(arg)
// Equal proxy instances should mostly be instance of proxyClass
// Under some edge cases (such as the proxy of JDK types serialized and then deserialized)
// the proxy type may not be the same.
// We first check isProxyClass() so that the common case of comparing with non-proxy objects
// is efficient.
|| (Proxy.isProxyClass(arg.getClass())
&& Arrays.equals(arg.getClass().getInterfaces(), proxyClass.getInterfaces()));
// Equal proxy instances should mostly be instance of proxyClass
// Under some edge cases (such as the proxy of JDK types serialized and then deserialized)
// the proxy type may not be the same.
// We first check isProxyClass() so that the common case of comparing with non-proxy objects
// is efficient.
|| (Proxy.isProxyClass(arg.getClass())
&& Arrays.equals(arg.getClass().getInterfaces(), proxyClass.getInterfaces()));
}
}
Loading

0 comments on commit b0a4c7b

Please sign in to comment.