Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump Zeebe to version 8.0.0-rc1 #264

Merged
merged 2 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
*/
package io.camunda.zeebe.process.test.engine.db;

import io.camunda.zeebe.db.*;
import io.camunda.zeebe.db.ColumnFamily;
import io.camunda.zeebe.db.DbKey;
import io.camunda.zeebe.db.DbValue;
import io.camunda.zeebe.db.KeyValuePairVisitor;
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDbInconsistentException;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -50,7 +55,37 @@ private void ensureInOpenTransaction(
}

@Override
public void put(final KeyType key, final ValueType value) {
public void insert(final KeyType key, final ValueType value) {
ensureInOpenTransaction(
context,
state -> {
final FullyQualifiedKey fullyQualifiedKey = new FullyQualifiedKey(columnFamily, key);
if (state.contains(fullyQualifiedKey)) {
throw new ZeebeDbInconsistentException(
"Key " + keyInstance + " in ColumnFamily " + columnFamily + " already exists");
} else {
state.put(fullyQualifiedKey, value);
}
});
}

@Override
public void update(final KeyType key, final ValueType value) {
ensureInOpenTransaction(
context,
state -> {
final FullyQualifiedKey fullyQualifiedKey = new FullyQualifiedKey(columnFamily, key);
if (state.contains(fullyQualifiedKey)) {
state.put(fullyQualifiedKey, value);
} else {
throw new ZeebeDbInconsistentException(
"Key " + keyInstance + " in ColumnFamily " + columnFamily + " does not exist");
}
});
}

@Override
public void upsert(final KeyType key, final ValueType value) {
ensureInOpenTransaction(
context, state -> state.put(new FullyQualifiedKey(columnFamily, key), value));
}
Expand All @@ -71,17 +106,6 @@ public ValueType get(final KeyType key) {
return null;
}

private DirectBuffer getValue(final InMemoryDbState state, final DbKey key) {
final FullyQualifiedKey fullyQualifiedKey = new FullyQualifiedKey(columnFamily, key);
final byte[] value = state.get(fullyQualifiedKey);

if (value != null) {
return BufferUtil.wrapArray(value);
} else {
return null;
}
}

@Override
public void forEach(final Consumer<ValueType> consumer) {
forEach(context, (key, value) -> consumer.accept(value));
Expand All @@ -92,21 +116,11 @@ public void forEach(final BiConsumer<KeyType, ValueType> consumer) {
forEach(context, consumer);
}

private void forEach(
final TransactionContext context, final BiConsumer<KeyType, ValueType> consumer) {
whileEqualPrefix(context, keyInstance, valueInstance, consumer);
}

@Override
public void whileTrue(final KeyValuePairVisitor<KeyType, ValueType> visitor) {
whileTrue(context, visitor);
}

private void whileTrue(
final TransactionContext context, final KeyValuePairVisitor<KeyType, ValueType> visitor) {
whileEqualPrefix(context, DbNullKey.INSTANCE, keyInstance, valueInstance, visitor);
}

@Override
public void whileEqualPrefix(
final DbKey keyPrefix, final BiConsumer<KeyType, ValueType> visitor) {
Expand All @@ -119,6 +133,74 @@ public void whileEqualPrefix(
whileEqualPrefix(context, keyPrefix, keyInstance, valueInstance, visitor);
}

@Override
public void deleteExisting(final KeyType key) {
ensureInOpenTransaction(
context,
state -> {
final FullyQualifiedKey fullyQualifiedKey = new FullyQualifiedKey(columnFamily, key);
if (state.contains(fullyQualifiedKey)) {
state.delete(fullyQualifiedKey);
} else {
throw new ZeebeDbInconsistentException(
"Key " + keyInstance + " in ColumnFamily " + columnFamily + " does not exist");
}
});
}

@Override
public void deleteIfExists(final KeyType key) {
ensureInOpenTransaction(
context,
state -> {
final FullyQualifiedKey fullyQualifiedKey = new FullyQualifiedKey(columnFamily, key);
state.delete(fullyQualifiedKey);
});
}

@Override
public boolean exists(final KeyType key) {
final AtomicBoolean exists = new AtomicBoolean(true);

ensureInOpenTransaction(context, state -> exists.set(getValue(state, key) != null));

return exists.get();
}

@Override
public boolean isEmpty() {
final AtomicBoolean isEmpty = new AtomicBoolean(true);
whileEqualPrefix(
DbNullKey.INSTANCE,
(key, value) -> {
isEmpty.set(false);
return false;
});

return isEmpty.get();
}

private DirectBuffer getValue(final InMemoryDbState state, final DbKey key) {
final FullyQualifiedKey fullyQualifiedKey = new FullyQualifiedKey(columnFamily, key);
final byte[] value = state.get(fullyQualifiedKey);

if (value != null) {
return BufferUtil.wrapArray(value);
} else {
return null;
}
}

private void forEach(
final TransactionContext context, final BiConsumer<KeyType, ValueType> consumer) {
whileEqualPrefix(context, keyInstance, valueInstance, consumer);
}

private void whileTrue(
final TransactionContext context, final KeyValuePairVisitor<KeyType, ValueType> visitor) {
whileEqualPrefix(context, DbNullKey.INSTANCE, keyInstance, valueInstance, visitor);
}

private void whileEqualPrefix(
final TransactionContext context,
final KeyType keyInstance,
Expand Down Expand Up @@ -171,34 +253,6 @@ private void whileEqualPrefix(
}));
}

@Override
public void delete(final KeyType key) {
final FullyQualifiedKey fullyQualifiedKey = new FullyQualifiedKey(columnFamily, key);
ensureInOpenTransaction(context, state -> state.delete(fullyQualifiedKey));
}

@Override
public boolean exists(final KeyType key) {
final AtomicBoolean exists = new AtomicBoolean(true);

ensureInOpenTransaction(context, state -> exists.set(getValue(state, key) != null));

return exists.get();
}

@Override
public boolean isEmpty() {
final AtomicBoolean isEmpty = new AtomicBoolean(true);
whileEqualPrefix(
DbNullKey.INSTANCE,
(key, value) -> {
isEmpty.set(false);
return false;
});

return isEmpty.get();
}

private static final class Visitor<KeyType extends DbKey, ValueType extends DbValue>
implements KeyValuePairVisitor<KeyType, ValueType> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ interface InMemoryDbState {
void delete(FullyQualifiedKey fullyQualifiedKey);

InMemoryDbIterator newIterator();

boolean contains(FullyQualifiedKey fullyQualifiedKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,10 @@ public InMemoryDbIterator newIterator() {

return new InMemoryDbIterator(snapshot);
}

@Override
public boolean contains(final FullyQualifiedKey fullyQualifiedKey) {
final Bytes keyBytes = fullyQualifiedKey.getKeyBytes();
return transactionCache.containsKey(keyBytes) || database.containsKey(keyBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.TransactionOperation;
import io.camunda.zeebe.db.ZeebeDbInconsistentException;
import io.camunda.zeebe.db.ZeebeDbTransaction;
import java.util.TreeMap;

Expand All @@ -27,6 +28,8 @@ public void runInTransaction(final TransactionOperation operations) {
} else {
runInNewTransaction(operations);
}
} catch (final ZeebeDbInconsistentException e) {
throw e;
} catch (final Exception e) {
throw new RuntimeException(
"Unexpected error occurred during zeebe db transaction operation.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void shouldPutValue() {
value.wrapString("baring");

// when
columnFamily.put(key, value);
columnFamily.upsert(key, value);
value.wrapString("yes");

// then
Expand Down Expand Up @@ -223,6 +223,6 @@ void shouldThrowExceptionOnMultipleNestedWhileEqualPrefix() {
private void putKeyValuePair(final String key, final String value) {
this.key.wrapString(key);
this.value.wrapString(value);
columnFamily.put(this.key, this.value);
columnFamily.upsert(this.key, this.value);
}
}
Loading