Skip to content

Commit

Permalink
test: verify column family 50 moves
Browse files Browse the repository at this point in the history
  • Loading branch information
korthout committed Jan 25, 2024
1 parent 26cb24b commit 9c281e5
Showing 1 changed file with 214 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@
import io.camunda.zeebe.db.impl.DbNil;
import io.camunda.zeebe.db.impl.DbString;
import io.camunda.zeebe.engine.state.message.DbMessageState;
import io.camunda.zeebe.engine.state.migration.MigrationTaskState;
import io.camunda.zeebe.engine.state.migration.MigrationTaskState.State;
import io.camunda.zeebe.engine.state.migration.to_8_4.corrections.ColumnFamily48Corrector;
import io.camunda.zeebe.engine.state.migration.to_8_4.corrections.ColumnFamily49Corrector;
import io.camunda.zeebe.engine.state.migration.to_8_4.corrections.ColumnFamily50Corrector;
import io.camunda.zeebe.engine.state.mutable.MutableProcessingState;
import io.camunda.zeebe.engine.state.signal.SignalSubscription;
import io.camunda.zeebe.engine.util.ProcessingStateExtension;
import io.camunda.zeebe.protocol.ZbColumnFamilies;
import io.camunda.zeebe.protocol.impl.record.value.signal.SignalSubscriptionRecord;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
Expand All @@ -30,6 +37,9 @@
@SuppressWarnings("deprecation") // we need to use deprecated column families
public class ColumnFamilyPrefixCorrectionMigrationTest {

public static final String EXAMPLE_IDENTIFIER =
new ColumnFamilyPrefixCorrectionMigration().getIdentifier();

/**
* Test correction from DEPRECATED_DMN_DECISION_KEY_BY_DECISION_ID_AND_VERSION -> MESSAGE_STATS
*/
Expand Down Expand Up @@ -290,4 +300,208 @@ void shouldIgnoreProcessInstanceKeyByDefinitionKeyEntries() {
.isTrue();
}
}

/** Test correction from DEPRECATED_SIGNAL_SUBSCRIPTION_BY_NAME_AND_KEY -> MIGRATIONS_STATE */
@Nested
@ExtendWith(ProcessingStateExtension.class)
class ColumnFamily50CorrectorTest {
private ZeebeDb<ZbColumnFamilies> zeebeDb;
private MutableProcessingState processingState;
private TransactionContext transactionContext;

private ColumnFamily50Corrector sut;

private DbString migrationIdentifier;
private MigrationTaskState migrationTaskState;

private ColumnFamily<DbString, MigrationTaskState> wrongMigrationStateColumnFamily;

private ColumnFamily<DbString, MigrationTaskState> correctMigrationStateColumnFamily;

private DbLong subscriptionKey;
private DbString signalName;
private DbCompositeKey<DbString, DbLong> signalNameAndSubscriptionKey;
private SignalSubscription signalSubscription;

private ColumnFamily<DbCompositeKey<DbString, DbLong>, SignalSubscription>
correctSignalSubscriptionColumnFamily;

@BeforeEach
void setup() {
sut = new ColumnFamily50Corrector(zeebeDb, transactionContext);

migrationIdentifier = new DbString();
migrationTaskState = new MigrationTaskState();
wrongMigrationStateColumnFamily =
zeebeDb.createColumnFamily(
ZbColumnFamilies.DEPRECATED_SIGNAL_SUBSCRIPTION_BY_NAME_AND_KEY,
transactionContext,
migrationIdentifier,
migrationTaskState);

correctMigrationStateColumnFamily =
zeebeDb.createColumnFamily(
ZbColumnFamilies.MIGRATIONS_STATE,
transactionContext,
migrationIdentifier,
migrationTaskState);

subscriptionKey = new DbLong();
signalName = new DbString();
signalNameAndSubscriptionKey = new DbCompositeKey<>(signalName, subscriptionKey);
signalSubscription = new SignalSubscription();

correctSignalSubscriptionColumnFamily =
zeebeDb.createColumnFamily(
ZbColumnFamilies.DEPRECATED_SIGNAL_SUBSCRIPTION_BY_NAME_AND_KEY,
transactionContext,
signalNameAndSubscriptionKey,
signalSubscription);
}

@Test
void shouldMoveMigrationStateToCorrectColumnFamily() {
// given
migrationIdentifier.wrapString(EXAMPLE_IDENTIFIER);
migrationTaskState.setState(State.FINISHED);
wrongMigrationStateColumnFamily.insert(migrationIdentifier, migrationTaskState);

// when
sut.correctColumnFamilyPrefix();

// then
Assertions.assertThat(wrongMigrationStateColumnFamily.isEmpty()).isTrue();
Assertions.assertThat(correctMigrationStateColumnFamily.get(migrationIdentifier))
.isNotNull()
.extracting(MigrationTaskState::getState)
.isEqualTo(State.FINISHED);
}

@Test
void shouldMergeWithCorrectMigrationStateOverwritingWhenFinished() {
// given
migrationIdentifier.wrapString(EXAMPLE_IDENTIFIER);
migrationTaskState.setState(State.FINISHED);
wrongMigrationStateColumnFamily.insert(migrationIdentifier, migrationTaskState);

migrationIdentifier.wrapString(EXAMPLE_IDENTIFIER);
migrationTaskState.setState(State.NOT_STARTED);
correctMigrationStateColumnFamily.insert(migrationIdentifier, migrationTaskState);

// when
sut.correctColumnFamilyPrefix();

// then
Assertions.assertThat(wrongMigrationStateColumnFamily.isEmpty()).isTrue();
Assertions.assertThat(correctMigrationStateColumnFamily.get(migrationIdentifier))
.isNotNull()
.extracting(MigrationTaskState::getState)
.isEqualTo(State.FINISHED);
}

@Test
void shouldMergeWithCorrectMigrationStateNotOverwritingAlreadyFinishedState() {
// given
migrationIdentifier.wrapString(EXAMPLE_IDENTIFIER);
migrationTaskState.setState(State.NOT_STARTED);
wrongMigrationStateColumnFamily.insert(migrationIdentifier, migrationTaskState);

migrationIdentifier.wrapString(EXAMPLE_IDENTIFIER);
migrationTaskState.setState(State.FINISHED);
correctMigrationStateColumnFamily.insert(migrationIdentifier, migrationTaskState);

// when
sut.correctColumnFamilyPrefix();

// then
Assertions.assertThat(wrongMigrationStateColumnFamily.isEmpty()).isTrue();
Assertions.assertThat(correctMigrationStateColumnFamily.get(migrationIdentifier))
.isNotNull()
.extracting(MigrationTaskState::getState)
.isEqualTo(State.FINISHED);
}

@Test
void shouldMergeWithCorrectMigrationStateNotOverwritingDifferentIdentifiers() {
// given
migrationIdentifier.wrapString(EXAMPLE_IDENTIFIER);
migrationTaskState.setState(State.NOT_STARTED);
wrongMigrationStateColumnFamily.insert(migrationIdentifier, migrationTaskState);

migrationIdentifier.wrapString(EXAMPLE_IDENTIFIER + "2");
migrationTaskState.setState(State.FINISHED);
correctMigrationStateColumnFamily.insert(migrationIdentifier, migrationTaskState);

// when
sut.correctColumnFamilyPrefix();

// then
Assertions.assertThat(wrongMigrationStateColumnFamily.isEmpty()).isTrue();
migrationIdentifier.wrapString(EXAMPLE_IDENTIFIER);
Assertions.assertThat(correctMigrationStateColumnFamily.get(migrationIdentifier))
.isNotNull()
.extracting(MigrationTaskState::getState)
.isEqualTo(State.NOT_STARTED);
migrationIdentifier.wrapString(EXAMPLE_IDENTIFIER + "2");
Assertions.assertThat(correctMigrationStateColumnFamily.get(migrationIdentifier))
.isNotNull()
.extracting(MigrationTaskState::getState)
.isEqualTo(State.FINISHED);
}

@Test
void shouldIgnoreSignalSubscriptionEntries() {
// given
signalName.wrapString("signal");
subscriptionKey.wrapLong(123);
signalSubscription
.setKey(123)
.setRecord(new SignalSubscriptionRecord().setSignalName(BufferUtil.wrapString("signal")));
correctSignalSubscriptionColumnFamily.insert(
signalNameAndSubscriptionKey, signalSubscription);

migrationIdentifier.wrapString(EXAMPLE_IDENTIFIER);
migrationTaskState.setState(State.FINISHED);
wrongMigrationStateColumnFamily.insert(migrationIdentifier, migrationTaskState);

signalName.wrapString("signal2");
subscriptionKey.wrapLong(234);
signalSubscription
.setKey(234)
.setRecord(
new SignalSubscriptionRecord().setSignalName(BufferUtil.wrapString("signal2")));
correctSignalSubscriptionColumnFamily.insert(
signalNameAndSubscriptionKey, signalSubscription);

Assertions.assertThat(correctSignalSubscriptionColumnFamily.count()).isEqualTo(3);

// when
sut.correctColumnFamilyPrefix();

// then
// we can no longer use wrongMigrationStateColumnFamily.isEmpty() as there are entries in
// there
// just no longer migration state entries, but we can simply count the entries
Assertions.assertThat(correctSignalSubscriptionColumnFamily.count()).isEqualTo(2);

signalName.wrapString("signal");
subscriptionKey.wrapLong(123);
Assertions.assertThat(correctSignalSubscriptionColumnFamily.get(signalNameAndSubscriptionKey))
.isNotNull()
.extracting(SignalSubscription::getKey, s -> s.getRecord().getSignalName())
.isEqualTo(List.of(123L, "signal"));

signalName.wrapString("signal2");
subscriptionKey.wrapLong(234);
Assertions.assertThat(correctSignalSubscriptionColumnFamily.get(signalNameAndSubscriptionKey))
.isNotNull()
.extracting(SignalSubscription::getKey, s -> s.getRecord().getSignalName())
.isEqualTo(List.of(234L, "signal2"));

Assertions.assertThat(correctMigrationStateColumnFamily.get(migrationIdentifier))
.isNotNull()
.extracting(MigrationTaskState::getState)
.isEqualTo(State.FINISHED);
}
}
}

0 comments on commit 9c281e5

Please sign in to comment.