Skip to content

Commit

Permalink
merge: #13256 #13275
Browse files Browse the repository at this point in the history
13256: Invalidate process cache on deployment rejection r=korthout a=korthout

## Description

<!-- Please explain the changes you made here. -->

This resolves a critical bug where process instances can be created for processes from rejected deployments. This could lead to unrecoverable partitions.

The problem was that a cache is built up for each of the processes during the processing of the `Deployment:CREATE` command, but this cache was not invalidated when that command was rejected.

This solution only provides a quick fix by completely clearing the cache, which comes at some performance loss when deployments are rejected.

In a future iteration, we'll need to investigate other solutions for the long term. We should also remove the cache entirely from any operations in the immutable ProcessState.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #13254



13275: [Backport stable/8.2] Prevent StackOverFlowError during termination of children r=remcowesterhoud a=backport-action

# Description
Backport of #13258 to `stable/8.2`.

relates to #8955

Co-authored-by: Nico Korthout <[email protected]>
Co-authored-by: Nico Korthout <[email protected]>
Co-authored-by: Remco Westerhoud <[email protected]>
  • Loading branch information
4 people authored Jul 3, 2023
3 parents 120c9b7 + afafed9 + be8e387 commit 13c59c4
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.function.Function;

public final class BpmnStateTransitionBehavior {

private static final String ALREADY_MIGRATED_ERROR_MSG =
"The Processor for the element type %s is already migrated no need to call %s again this is already done in the BpmnStreamProcessor for you. Happy to help :) ";
private static final String NO_PROCESS_FOUND_MESSAGE =
Expand Down Expand Up @@ -421,7 +420,19 @@ public void onElementTerminated(
element,
childContext,
(containerProcessor, containerScope, containerContext) -> {
containerProcessor.onChildTerminated(containerScope, containerContext, childContext);
try {
containerProcessor.onChildTerminated(containerScope, containerContext, childContext);
} catch (final StackOverflowError stackOverFlow) {
// This is a dirty quick "fix" for https://github.com/camunda/zeebe/issues/8955
// It's done so a cluster doesn't die when a user encounters this.
final var message =
String.format(
"""
Process instance `%d` has too many nested child instances and could not be terminated. \
The deepest nested child instance has been banned as a result.""",
containerContext.getProcessInstanceKey());
throw new ChildTerminationStackOverflowException(message);
}
return Either.right(null);
});
}
Expand Down Expand Up @@ -515,6 +526,13 @@ public <T extends ExecutableFlowElement> void terminateChildProcessInstance(
() -> containerProcessor.onChildTerminated(element, context, null));
}

/**
 * Signals that notifying a container about a terminated child ran into a {@link
 * StackOverflowError}. It is thrown in place of the error itself so that the failure surfaces as a
 * regular runtime exception carrying a descriptive message.
 */
private static final class ChildTerminationStackOverflowException extends RuntimeException {

  /**
   * @param description human readable explanation, used as the exception message
   */
  public ChildTerminationStackOverflowException(final String description) {
    super(description);
  }
}

@FunctionalInterface
private interface ElementContainerProcessorFunction {
Either<Failure, ?> apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public void processRecord(final TypedRecord<DeploymentRecord> command) {
try {
createTimerIfTimerStartEvent(command);
} catch (final RuntimeException e) {
// Make sure the cache does not contain any leftovers from this run (by hard resetting)
processState.clearCache();

final String reason = String.format(COULD_NOT_CREATE_TIMER_MESSAGE, e.getMessage());
responseWriter.writeRejectionOnCommand(command, RejectionType.PROCESSING_ERROR, reason);
rejectionWriter.appendRejection(command, RejectionType.PROCESSING_ERROR, reason);
Expand All @@ -116,6 +119,9 @@ public void processRecord(final TypedRecord<DeploymentRecord> command) {
@Override
public ProcessingError tryHandleError(
final TypedRecord<DeploymentRecord> command, final Throwable error) {
// Make sure the cache does not contain any leftovers from this run (by hard resetting)
processState.clearCache();

if (error instanceof ResourceTransformationFailedException exception) {
rejectionWriter.appendRejection(
command, RejectionType.INVALID_ARGUMENT, exception.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ public <T extends ExecutableFlowElement> T getFlowElement(
return element;
}

/**
 * Hard-resets the in-memory caches of this state: the version manager's cache as well as both
 * process lookups (by key, and by process id and version).
 */
@Override
public void clearCache() {
  // NOTE(review): the three caches are assumed to be independent of each other, so the order in
  // which they are emptied should not matter - confirm
  versionManager.clear();
  processesByProcessIdAndVersion.clear();
  processesByKey.clear();
}

private DeployedProcess lookupProcessByIdAndPersistedVersion(final long latestVersion) {
processVersion.wrapLong(latestVersion);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ private long getProcessVersionFromDB() {
}
return currentValue;
}

/** Removes every entry from the version cache. */
public void clear() {
  this.versionCache.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ public interface ProcessState {

<T extends ExecutableFlowElement> T getFlowElement(
long processDefinitionKey, DirectBuffer elementId, Class<T> elementType);

/**
 * Clears the cached process data held by this state.
 *
 * <p>TODO: Remove the cache entirely from the immutable state
 */
void clearCache();
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,37 @@ public void shouldDoAtomicDeployments() {
tuple(DeploymentIntent.CREATED, RecordType.EVENT),
tuple(DeploymentDistributionIntent.DISTRIBUTING, RecordType.EVENT));
}

@Test // Regression of https://github.com/camunda/zeebe/issues/13254
public void shouldNotBeAbleToCreateInstanceWhenDeploymentIsRejected() {
  // given - one deployment that bundles an oversized process with a valid one
  final int documentationLength = (int) ByteValue.ofMegabytes(3);
  final BpmnModelInstance oversizedProcess =
      Bpmn.createExecutableProcess("too_large_process")
          .startEvent()
          // In order to cause BATCH SIZE EXCEEDING we add a big comment
          .documentation("x".repeat(documentationLength))
          .done();
  final BpmnModelInstance deployableProcess =
      Bpmn.createExecutableProcess("valid_process").startEvent().task().endEvent().done();

  // when - the deployment as a whole is expected to be rejected
  ENGINE
      .deployment()
      .withXmlResource(oversizedProcess)
      .withXmlResource(deployableProcess)
      .expectRejection()
      .deploy();

  // then - nothing was created up to (and including) the rejection record
  assertThat(
          RecordingExporter.records()
              .limit(record -> record.getRecordType() == RecordType.COMMAND_REJECTION)
              .collect(Collectors.toList()))
      .extracting(Record::getIntent, Record::getRecordType)
      .doesNotContain(
          tuple(ProcessIntent.CREATED, RecordType.EVENT),
          tuple(DeploymentIntent.CREATED, RecordType.EVENT));

  // and creating an instance of the valid process from the rejected deployment is rejected too
  ENGINE.processInstance().ofBpmnProcessId("valid_process").expectRejection().create();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.processinstance;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.test.util.Strings;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import org.assertj.core.api.Assertions;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;

public final class CancelProcessInstanceBanTest {

@ClassRule public static final EngineRule ENGINE = EngineRule.singlePartition();
@Rule public final TestWatcher recordingExporterTestWatcher = new RecordingExporterTestWatcher();

@Test // Regression of https://github.com/camunda/zeebe/issues/8955
public void shouldBanInstanceWhenTerminatingInstanceWithALotOfNestedChildInstances() {
// given
final var amountOfNestedChildInstances = 1000;
final var processId = Strings.newRandomValidBpmnId();
ENGINE
.deployment()
.withXmlResource(
Bpmn.createExecutableProcess(processId)
.startEvent()
.exclusiveGateway()
.defaultFlow()
.userTask()
.endEvent()
.moveToLastGateway()
.conditionExpression("count < " + amountOfNestedChildInstances)
.intermediateThrowEvent("preventStraightThroughLoop")
.callActivity(
"callActivity",
c -> c.zeebeProcessId(processId).zeebeInputExpression("count + 1", "count"))
.endEvent()
.done())
.deploy();

final long processInstanceKey =
ENGINE.processInstance().ofBpmnProcessId(processId).withVariable("count", 0).create();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withElementType(BpmnElementType.USER_TASK)
.getFirst();

// when
final var errorRecordValue =
ENGINE.processInstance().withInstanceKey(processInstanceKey).cancelWithError();

// then
Assertions.assertThat(errorRecordValue.getValue().getStacktrace())
.contains("ChildTerminationStackOverflowException");
Assertions.assertThat(errorRecordValue.getValue().getExceptionMessage())
.contains(
"Process instance",
"""
has too many nested child instances and could not be terminated. The deepest nested \
child instance has been banned as a result.""");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationVariableInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.ErrorIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;
import io.camunda.zeebe.protocol.record.value.ErrorRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceCreationRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceModificationRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
Expand Down Expand Up @@ -197,6 +199,10 @@ public static class ExistingInstanceClient {
.withProcessInstanceKey(processInstanceKey)
.getFirst();

// Resolves to the first exported error record with intent CREATED. The process instance key
// argument only exists to satisfy the Function shape; it is not used to filter the records.
public static final Function<Long, Record<ErrorRecordValue>> ERROR_EXPECTATION =
    ignoredProcessInstanceKey ->
        RecordingExporter.errorRecords().withIntent(ErrorIntent.CREATED).getFirst();

private static final int DEFAULT_PARTITION = -1;
private final StreamProcessorRule environmentRule;
private final long processInstanceKey;
Expand All @@ -221,6 +227,16 @@ public ExistingInstanceClient expectRejection() {
}

/**
 * Writes the cancel command for this process instance and resolves the outcome through the
 * currently configured expectation.
 *
 * @return the record produced by the configured expectation
 */
public Record<ProcessInstanceRecordValue> cancel() {
  writeCancelCommand();
  final Record<ProcessInstanceRecordValue> outcome = expectation.apply(processInstanceKey);
  return outcome;
}

/**
 * Writes the cancel command for this process instance and resolves the outcome through {@code
 * ERROR_EXPECTATION} instead of the configured expectation.
 *
 * @return the error record produced by {@code ERROR_EXPECTATION}
 */
public Record<ErrorRecordValue> cancelWithError() {
  writeCancelCommand();
  final Record<ErrorRecordValue> errorRecord = ERROR_EXPECTATION.apply(processInstanceKey);
  return errorRecord;
}

private void writeCancelCommand() {
if (partition == DEFAULT_PARTITION) {
partition =
RecordingExporter.processInstanceRecords()
Expand All @@ -234,8 +250,6 @@ public Record<ProcessInstanceRecordValue> cancel() {
processInstanceKey,
ProcessInstanceIntent.CANCEL,
new ProcessInstanceRecord().setProcessInstanceKey(processInstanceKey));

return expectation.apply(processInstanceKey);
}

public ProcessInstanceModificationClient modification() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.test.util.record;

import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.value.ErrorRecordValue;
import java.util.stream.Stream;

/** Record stream specialised for records carrying an {@link ErrorRecordValue}. */
public class ErrorRecordStream extends ExporterRecordStream<ErrorRecordValue, ErrorRecordStream> {

  /**
   * @param errorRecords the underlying stream of error records to wrap
   */
  public ErrorRecordStream(final Stream<Record<ErrorRecordValue>> errorRecords) {
    super(errorRecords);
  }

  /** Wraps the given stream in a new {@link ErrorRecordStream}. */
  @Override
  protected ErrorRecordStream supply(final Stream<Record<ErrorRecordValue>> errorRecords) {
    return new ErrorRecordStream(errorRecords);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.camunda.zeebe.protocol.record.value.DecisionEvaluationRecordValue;
import io.camunda.zeebe.protocol.record.value.DeploymentDistributionRecordValue;
import io.camunda.zeebe.protocol.record.value.DeploymentRecordValue;
import io.camunda.zeebe.protocol.record.value.ErrorRecordValue;
import io.camunda.zeebe.protocol.record.value.EscalationRecordValue;
import io.camunda.zeebe.protocol.record.value.IncidentRecordValue;
import io.camunda.zeebe.protocol.record.value.JobBatchRecordValue;
Expand Down Expand Up @@ -344,6 +345,10 @@ public static ResourceDeletionRecordStream resourceDeletionRecords(
return resourceDeletionRecords().withIntent(intent);
}

/** Returns a stream over the records of value type {@code ERROR}. */
public static ErrorRecordStream errorRecords() {
  final var errorValueRecords = records(ValueType.ERROR, ErrorRecordValue.class);
  return new ErrorRecordStream(errorValueRecords);
}

public static class AwaitingRecordIterator implements Iterator<Record<?>> {

private int nextIndex = 0;
Expand Down

0 comments on commit 13c59c4

Please sign in to comment.