merge: #15432
15432: [Backport stable/8.3] Do not delete jobs from backoff cf r=Zelldon a=backport-action

# Description
Backport of #15419 to `stable/8.3`.

relates to #14329
original author: `@Zelldon`

Co-authored-by: Christopher Zell <[email protected]>
zeebe-bors-camunda[bot] and ChrisKujawa authored Nov 30, 2023
2 parents e7bbcf2 + f6f2b03 commit b244aeb
Showing 3 changed files with 133 additions and 29 deletions.
@@ -243,7 +243,8 @@ public void cleanupBackoffsWithoutJobs() {
final var jobKey = key.second().inner();
final var backoff = key.first().getValue();
final var job = jobsColumnFamily.get(jobKey);
- if (job == null || job.getRecord().getRetryBackoff() != backoff) {
+ if (job == null || job.getRecord().getRecurringTime() != backoff) {
+ LOG.debug("Deleting orphaned job with key {}", key);
backoffColumnFamily.deleteExisting(key);
}
return true;
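
Note on the fix above: entries in the backoff column family are keyed by a composite of a timestamp and the job key, and the timestamp written when a job fails is the job's recurring time, i.e. the absolute point at which it becomes activatable again. The removed condition compared that timestamp with the job's retry backoff, a relative duration, so the comparison practically never matched and valid entries were deleted as orphans. Below is a minimal sketch of the corrected cleanup pass; it uses plain java.util collections in place of Zeebe's column-family API, and the class and method names (BackoffCleanupSketch, fail, BackoffKey) are illustrative, not the actual implementation.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

final class BackoffCleanupSketch {

  record BackoffKey(long backoff, long jobKey) implements Comparable<BackoffKey> {
    @Override
    public int compareTo(final BackoffKey other) {
      final int byBackoff = Long.compare(backoff, other.backoff);
      return byBackoff != 0 ? byBackoff : Long.compare(jobKey, other.jobKey);
    }
  }

  // jobKey -> the job's recurring time (absolute timestamp at which it
  // becomes activatable again); stands in for the jobs column family
  final Map<Long, Long> jobs = new HashMap<>();

  // stands in for the backoff column family, keyed by (timestamp, jobKey)
  final TreeMap<BackoffKey, Boolean> backoffs = new TreeMap<>();

  /** Bookkeeping of a job failure with retries left and a retry backoff. */
  void fail(final long jobKey, final long recurringTime) {
    jobs.put(jobKey, recurringTime);
    backoffs.put(new BackoffKey(recurringTime, jobKey), Boolean.TRUE);
  }

  /** The corrected cleanup: compare against recurring time, not retry backoff. */
  void cleanupBackoffsWithoutJobs() {
    final Iterator<BackoffKey> it = backoffs.keySet().iterator();
    while (it.hasNext()) {
      final BackoffKey key = it.next();
      final Long recurringTime = jobs.get(key.jobKey());
      // An entry is orphaned if its job is gone, or if the job has since been
      // rescheduled to a different recurring time (e.g. after a second fail).
      if (recurringTime == null || recurringTime != key.backoff()) {
        it.remove();
      }
    }
  }
}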
@@ -17,10 +17,12 @@
import io.camunda.zeebe.db.impl.DbLong;
import io.camunda.zeebe.db.impl.DbNil;
import io.camunda.zeebe.engine.state.instance.JobRecordValue;
+ import io.camunda.zeebe.engine.state.mutable.MutableJobState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
import io.camunda.zeebe.engine.util.ProcessingStateExtension;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
+ import java.util.ArrayList;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -59,60 +61,71 @@ public void setup() {
jobKey.wrapLong(1);
}

+ // regression test of https://github.com/camunda/zeebe/issues/14329
@Test
- public void afterCleanupValidTimeoutIsStillPresent() {
+ public void shoulCleanOrphanBackoffEntries() {
// given
- final int deadline = 123;
- jobsColumnFamily.upsert(jobKey, createJobRecordValue(deadline));
- backoffKey.wrapLong(deadline);
- backoffColumnFamily.upsert(backoffJobKey, DbNil.INSTANCE);
+ final MutableJobState jobState = processingState.getJobState();
+ final JobRecord record = createJobRecord(1000);
+ jobState.create(jobKey.getValue(), record);
+ jobState.fail(jobKey.getValue(), record);
+ jobsColumnFamily.deleteExisting(jobKey);

// when
jobBackoffCleanupMigration.runMigration(processingState);

// then
- assertThat(backoffColumnFamily.exists(backoffJobKey)).isTrue();
+ assertThat(backoffColumnFamily.isEmpty()).isTrue();
}

+ // regression test of https://github.com/camunda/zeebe/issues/14329
@Test
- public void afterCleanupOrphanedBackoffIsDeleted() {
+ public void shouldNotCleanUpFailedJobs() {
// given
- jobsColumnFamily.upsert(jobKey, new JobRecordValue());
- backoffKey.wrapLong(123);
- backoffColumnFamily.upsert(backoffJobKey, DbNil.INSTANCE);
- jobsColumnFamily.deleteExisting(jobKey);
+ final MutableJobState jobState = processingState.getJobState();
+ final JobRecord record = createJobRecord(1000);
+ jobState.create(jobKey.getValue(), record);
+ jobState.fail(jobKey.getValue(), record);

// when
jobBackoffCleanupMigration.runMigration(processingState);

// then
- assertThat(backoffColumnFamily.exists(backoffJobKey)).isFalse();
+ assertThat(backoffColumnFamily.isEmpty()).isFalse();
}

+ // regression test of https://github.com/camunda/zeebe/issues/14329
@Test
- public void afterCleanupTimeoutWithNonMatchingRetryBackoffIsDeleted() {
+ public void shoulCleanDuplicatedBackoffEntries() {
// given
- final int firstRetryBackoff = 123;
- final int secondRetryBackoff = 456;
- jobsColumnFamily.upsert(jobKey, createJobRecordValue(secondRetryBackoff));
- backoffKey.wrapLong(firstRetryBackoff);
- backoffColumnFamily.upsert(backoffJobKey, DbNil.INSTANCE);
- backoffKey.wrapLong(secondRetryBackoff);
- backoffColumnFamily.upsert(backoffJobKey, DbNil.INSTANCE);
+ final MutableJobState jobState = processingState.getJobState();
+ final JobRecord record = createJobRecord(1000);
+ jobState.create(jobKey.getValue(), record);
+ jobState.fail(jobKey.getValue(), record);
+
+ // second fail will cause duplicate entry and orphan the first backoff
+ record.setRecurringTime(System.currentTimeMillis() + 1001);
+ jobState.fail(jobKey.getValue(), record);

// when
jobBackoffCleanupMigration.runMigration(processingState);

// then
- backoffKey.wrapLong(firstRetryBackoff);
- assertThat(backoffColumnFamily.exists(backoffJobKey)).isFalse();
- backoffKey.wrapLong(secondRetryBackoff);
- assertThat(backoffColumnFamily.exists(backoffJobKey)).isTrue();
+ assertThat(backoffColumnFamily.isEmpty()).isFalse();
+ final var keys = new ArrayList<DbCompositeKey<DbLong, DbForeignKey<DbLong>>>();
+ backoffColumnFamily.forEach((k, v) -> keys.add(k));
+ assertThat(keys).hasSize(1);
+ assertThat(keys)
+     .extracting(DbCompositeKey::second)
+     .extracting(DbForeignKey::inner)
+     .contains(jobKey);
}

- private static JobRecordValue createJobRecordValue(final long retryBackoff) {
-   final JobRecordValue jobRecordValue = new JobRecordValue();
-   jobRecordValue.setRecordWithoutVariables(new JobRecord().setRetryBackoff(retryBackoff));
-   return jobRecordValue;
+ private static JobRecord createJobRecord(final long retryBackoff) {
+   return new JobRecord()
+       .setType("test")
+       .setRetries(3)
+       .setRetryBackoff(retryBackoff)
+       .setRecurringTime(System.currentTimeMillis() + retryBackoff);
}
}
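
To make the duplicated-entries test above concrete: since the column family key is the composite (timestamp, jobKey), failing the same job a second time with a new recurring time leaves two entries for one job, and only the entry matching the job's current recurring time should survive the migration. A hypothetical walk-through using the sketch from above (all values illustrative):

final BackoffCleanupSketch state = new BackoffCleanupSketch();

state.fail(1L, 1_000L); // first failure writes entry (1_000, job 1)
state.fail(1L, 2_001L); // second failure writes (2_001, job 1); (1_000, job 1) is now orphaned

state.cleanupBackoffsWithoutJobs();

// only the entry matching the job's current recurring time survives
assert state.backoffs.size() == 1;
assert state.backoffs.containsKey(new BackoffCleanupSketch.BackoffKey(2_001L, 1L));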
@@ -0,0 +1,90 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.it.clustering;

import io.camunda.zeebe.it.util.GrpcClientRule;
import io.netty.util.NetUtil;
import java.time.Duration;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.springframework.util.unit.DataSize;

public class StateMigrationTest {

private static final DataSize ATOMIX_SEGMENT_SIZE = DataSize.ofMegabytes(2);
private static final Duration SNAPSHOT_PERIOD = Duration.ofMinutes(5);
private final ClusteringRule clusteringRule =
new ClusteringRule(
1,
3,
3,
cfg -> {
cfg.getData().setSnapshotPeriod(SNAPSHOT_PERIOD);
cfg.getData().setLogSegmentSize(ATOMIX_SEGMENT_SIZE);
cfg.getData().setLogIndexDensity(1);
cfg.getNetwork().setMaxMessageSize(ATOMIX_SEGMENT_SIZE);
});
private final GrpcClientRule clientRule =
new GrpcClientRule(
config ->
config
.gatewayAddress(NetUtil.toSocketAddressString(clusteringRule.getGatewayAddress()))
.defaultRequestTimeout(Duration.ofMinutes(1))
.usePlaintext());

@Rule public RuleChain ruleChain = RuleChain.outerRule(clusteringRule).around(clientRule);

// regression test for https://github.com/camunda/zeebe/issues/14329
@Test
public void shouldMakeJobActivatableAfterMigrationAndBackoff() {
// given
final String jobType = "test";
clientRule.createSingleJob(jobType);

final var activateResponse =
clientRule
.getClient()
.newActivateJobsCommand()
.jobType(jobType)
.maxJobsToActivate(1)
.send()
.join();
final var jobKey = activateResponse.getJobs().get(0).getKey();

final Duration backoffTimeout = Duration.ofDays(1);
clientRule
.getClient()
.newFailCommand(jobKey)
.retries(1)
.retryBackoff(backoffTimeout)
.send()
.join();

// when
// we restart the leader - and expect another node takes over
// new leader has to run migration first before starting processing
clusteringRule.restartBroker(clusteringRule.getLeaderForPartition(1).getNodeId());
// increasing time so after job backoff timeout job should be marked activatable again
clusteringRule.getClock().addTime(backoffTimeout.plus(backoffTimeout));

// then
Awaitility.await()
.until(
() ->
clientRule
.getClient()
.newActivateJobsCommand()
.jobType(jobType)
.maxJobsToActivate(1)
.send()
.join(),
r -> !r.getJobs().isEmpty());
}
}
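
The clock jump in the test above works because pending backoff entries are processed by a periodic due-date check that compares each entry's timestamp with the current time and makes the job activatable once that timestamp has elapsed; had the migration deleted the entry, the trigger would never fire and the final activation would time out. A rough sketch of that trigger under the same simplified model as before (the real logic lives in Zeebe's job backoff checker; none of these names are its actual API):

// Rough sketch: make every job whose backoff timestamp has elapsed activatable
// again, reusing the illustrative BackoffCleanupSketch model from above.
static void activateElapsedBackoffs(final BackoffCleanupSketch state, final long now) {
  // Keys are ordered by timestamp first, so all due entries form a head map.
  final var due = state.backoffs.headMap(new BackoffCleanupSketch.BackoffKey(now, Long.MAX_VALUE), true);
  for (final var key : due.keySet()) {
    System.out.println("job " + key.jobKey() + " is activatable again");
  }
  due.clear(); // consumed entries are removed so jobs are not re-activated
}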
