fix: do not delete jobs from backoff cf
Do not delete jobs from the backoff CF if they still exist or have a correct recurring time set.

Previously we checked the wrong property, which never matched the value stored in the state. This caused us to delete jobs from the backoff column family, leaving failed jobs stuck in the failed state forever.

(cherry picked from commit 0db6579)
ChrisKujawa authored and github-actions[bot] committed Nov 30, 2023
1 parent e7bbcf2 commit 6719061
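
For context on the fix below: the backoff column family is keyed by the job's recurring time, an absolute due timestamp, while the retry backoff is the relative delay configured when the job fails. Comparing the stored key against `getRetryBackoff()` therefore effectively never matched, so every entry looked orphaned. A minimal standalone sketch of that mismatch, using hypothetical values rather than the actual Zeebe state classes:

```java
// Minimal sketch (hypothetical values, not the real state classes): the backoff
// CF key stores an absolute due timestamp (recurringTime), while retryBackoff is
// the relative delay in milliseconds configured on job failure.
public class BackoffKeyMismatchSketch {
  public static void main(String[] args) {
    final long now = System.currentTimeMillis();
    final long retryBackoff = 1000;                // relative delay: 1 second
    final long recurringTime = now + retryBackoff; // absolute due timestamp

    final long storedBackoffKey = recurringTime;   // what the cleanup reads from the CF key

    // Old check: absolute timestamp vs. relative delay -> practically always
    // "not equal", so the entry was treated as orphaned and deleted.
    System.out.println(storedBackoffKey != retryBackoff);  // true -> wrongly deleted
    // Fixed check: compares like with like, so valid entries are kept.
    System.out.println(storedBackoffKey != recurringTime); // false -> correctly kept
  }
}
```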
Showing 3 changed files with 113 additions and 1 deletion.
@@ -243,7 +243,8 @@ public void cleanupBackoffsWithoutJobs() {
       final var jobKey = key.second().inner();
       final var backoff = key.first().getValue();
       final var job = jobsColumnFamily.get(jobKey);
-      if (job == null || job.getRecord().getRetryBackoff() != backoff) {
+      if (job == null || job.getRecord().getRecurringTime() != backoff) {
+        LOG.debug("Deleting orphaned job with key {}", key);
         backoffColumnFamily.deleteExisting(key);
       }
       return true;
@@ -17,6 +17,7 @@
 import io.camunda.zeebe.db.impl.DbLong;
 import io.camunda.zeebe.db.impl.DbNil;
 import io.camunda.zeebe.engine.state.instance.JobRecordValue;
+import io.camunda.zeebe.engine.state.mutable.MutableJobState;
 import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
 import io.camunda.zeebe.engine.util.ProcessingStateExtension;
 import io.camunda.zeebe.protocol.ZbColumnFamilies;
@@ -89,6 +90,26 @@ public void afterCleanupOrphanedBackoffIsDeleted() {
     assertThat(backoffColumnFamily.exists(backoffJobKey)).isFalse();
   }

+  // regression test for https://github.com/camunda/zeebe/issues/14329
+  @Test
+  public void shouldNotCleanUpFailedJobs() {
+    // given
+    final MutableJobState jobState = processingState.getJobState();
+    final JobRecord record = new JobRecord();
+    record.setType("test");
+    jobState.create(jobKey.getValue(), record);
+    record.setRetries(3);
+    record.setRetryBackoff(1000);
+    record.setRecurringTime(System.currentTimeMillis() + 1000);
+    jobState.fail(jobKey.getValue(), record);
+
+    // when
+    jobBackoffCleanupMigration.runMigration(processingState);
+
+    // then
+    assertThat(backoffColumnFamily.isEmpty()).isFalse();
+  }
+
   @Test
   public void afterCleanupTimeoutWithNonMatchingRetryBackoffIsDeleted() {
     // given
@@ -0,0 +1,90 @@
/*
 * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
 * one or more contributor license agreements. See the NOTICE file distributed
 * with this work for additional information regarding copyright ownership.
 * Licensed under the Zeebe Community License 1.1. You may not use this file
 * except in compliance with the Zeebe Community License 1.1.
 */
package io.camunda.zeebe.it.clustering;

import io.camunda.zeebe.it.util.GrpcClientRule;
import io.netty.util.NetUtil;
import java.time.Duration;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.springframework.util.unit.DataSize;

public class StateMigrationTest {

  private static final DataSize ATOMIX_SEGMENT_SIZE = DataSize.ofMegabytes(2);
  private static final Duration SNAPSHOT_PERIOD = Duration.ofMinutes(5);
  private final ClusteringRule clusteringRule =
      new ClusteringRule(
          1,
          3,
          3,
          cfg -> {
            cfg.getData().setSnapshotPeriod(SNAPSHOT_PERIOD);
            cfg.getData().setLogSegmentSize(ATOMIX_SEGMENT_SIZE);
            cfg.getData().setLogIndexDensity(1);
            cfg.getNetwork().setMaxMessageSize(ATOMIX_SEGMENT_SIZE);
          });
  private final GrpcClientRule clientRule =
      new GrpcClientRule(
          config ->
              config
                  .gatewayAddress(NetUtil.toSocketAddressString(clusteringRule.getGatewayAddress()))
                  .defaultRequestTimeout(Duration.ofMinutes(1))
                  .usePlaintext());

  @Rule public RuleChain ruleChain = RuleChain.outerRule(clusteringRule).around(clientRule);

  // regression test for https://github.com/camunda/zeebe/issues/14329
  @Test
  public void shouldMakeJobActivatableAfterMigrationAndBackoff() {
    // given
    final String jobType = "test";
    clientRule.createSingleJob(jobType);

    final var activateResponse =
        clientRule
            .getClient()
            .newActivateJobsCommand()
            .jobType(jobType)
            .maxJobsToActivate(1)
            .send()
            .join();
    final var jobKey = activateResponse.getJobs().get(0).getKey();

    final Duration backoffTimeout = Duration.ofDays(1);
    clientRule
        .getClient()
        .newFailCommand(jobKey)
        .retries(1)
        .retryBackoff(backoffTimeout)
        .send()
        .join();

    // when
    // we restart the leader and expect another node to take over;
    // the new leader has to run the migration before it starts processing
    clusteringRule.restartBroker(clusteringRule.getLeaderForPartition(1).getNodeId());
    // advance the clock past the backoff timeout so the job is marked activatable again
    clusteringRule.getClock().addTime(backoffTimeout.plus(backoffTimeout));

    // then
    Awaitility.await()
        .until(
            () ->
                clientRule
                    .getClient()
                    .newActivateJobsCommand()
                    .jobType(jobType)
                    .maxJobsToActivate(1)
                    .send()
                    .join(),
            response -> !response.getJobs().isEmpty());
  }
}
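
A side note on the assertion at the end of this test: Awaitility's two-argument `until(supplier, predicate)` polls the supplier repeatedly until the predicate accepts the latest result, which is why the predicate must inspect the freshly polled response rather than the response captured earlier in the test. A minimal standalone sketch of that form, independent of the test above:

```java
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;

public class AwaitilityUntilSketch {
  public static void main(String[] args) {
    final AtomicInteger attempts = new AtomicInteger();
    // Awaitility polls the supplier until the predicate matches its most recent
    // result, then returns that result.
    final int result =
        Awaitility.await().until(attempts::incrementAndGet, value -> value >= 5);
    System.out.println(result); // prints 5
  }
}
```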
