Configurable tx isolation level issue1000 #1008

Merged · 4 commits · Apr 6, 2023
8 changes: 8 additions & 0 deletions engine/src/main/java/com/arcadedb/database/BasicDatabase.java
@@ -115,6 +115,14 @@ public interface BasicDatabase extends AutoCloseable {
*/
void begin();

/**
* Begins a new transaction specifying the isolation level. If a transaction is already begun, the current transaction is parked and a new sub-transaction is
* begun. The new sub-transaction cannot access the content of the previous transaction. Sub-transactions are totally isolated.
*
* @param isolationLevel Isolation level, one of the following: READ_COMMITTED, REPEATABLE_READS
*/
void begin(Database.TRANSACTION_ISOLATION_LEVEL isolationLevel);

/**
* Commits the current transaction. If it was a sub-transaction, then the previous in the stack becomes active again.
*/
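A minimal usage sketch of the new `begin(isolationLevel)` overload. The database path, the `db` handle and the `Product` type are assumptions for illustration, not part of this PR:

```java
import com.arcadedb.database.Database;
import com.arcadedb.database.DatabaseFactory;

public class IsolationLevelExample {
  public static void main(final String[] args) {
    // Assumed local database path and document type; adjust to your setup.
    try (final Database db = new DatabaseFactory("./databases/demo").create()) {
      db.getSchema().createDocumentType("Product");

      // Begin a transaction with an explicit isolation level instead of the database default.
      db.begin(Database.TRANSACTION_ISOLATION_LEVEL.REPEATABLE_READS);
      db.newDocument("Product").set("name", "keyboard").save();
      db.commit();
    }
  }
}
```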
31 changes: 31 additions & 0 deletions engine/src/main/java/com/arcadedb/database/Database.java
@@ -35,6 +35,10 @@
import java.util.concurrent.*;

public interface Database extends BasicDatabase {
enum TRANSACTION_ISOLATION_LEVEL {
READ_COMMITTED, REPEATABLE_READS
}

ContextConfiguration getConfiguration();

PaginatedFile.MODE getMode();
@@ -274,6 +278,33 @@ Edge newEdgeByKeys(Vertex sourceVertex, String destinationVertexType, String[] d
*/
Database setReadYourWrites(boolean value);

/**
* Sets the transaction isolation level among the available ones:
* <ul>
* <li><b>READ_COMMITTED</b></li>
* <li><b>REPEATABLE_READS</b></li>
* </ul>
*
* @param level The isolation level
*
* @return Current Database instance to execute setter methods in chain.
*/
Database setTransactionIsolationLevel(TRANSACTION_ISOLATION_LEVEL level);

/**
* Returns the current transaction isolation level among the available ones:
* <ul>
* <li><b>READ_COMMITTED</b></li>
* <li><b>REPEATABLE_READS</b></li>
* </ul>
*
* @return Current isolation level.
*/
TRANSACTION_ISOLATION_LEVEL getTransactionIsolationLevel();

/**
* Returns the current default edge list initial size to hold and store edges.
*/
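Continuing the sketch above with the same assumed `db` handle: the setter is chainable, and a plain `begin()` then picks up the configured default, which the `EmbeddedDatabase` change below delegates to:

```java
// Set a database-wide default; subsequent plain begin() calls use it.
db.setTransactionIsolationLevel(Database.TRANSACTION_ISOLATION_LEVEL.REPEATABLE_READS);

assert db.getTransactionIsolationLevel() == Database.TRANSACTION_ISOLATION_LEVEL.REPEATABLE_READS;

db.begin();   // starts at REPEATABLE_READS via the configured default
// ... reads and writes ...
db.commit();
```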
19 changes: 18 additions & 1 deletion engine/src/main/java/com/arcadedb/database/EmbeddedDatabase.java
@@ -125,6 +125,7 @@ public class EmbeddedDatabase extends RWLockContext implements DatabaseInternal
private FileLock lockFileLock;
private final RecordEventsRegistry events = new RecordEventsRegistry();
private final ConcurrentHashMap<String, QueryEngine> reusableQueryEngines = new ConcurrentHashMap<>();
private TRANSACTION_ISOLATION_LEVEL transactionIsolationLevel = TRANSACTION_ISOLATION_LEVEL.READ_COMMITTED;

protected EmbeddedDatabase(final String path, final PaginatedFile.MODE mode, final ContextConfiguration configuration, final SecurityManager security,
final Map<CALLBACK_EVENT, List<Callable<Void>>> callbacks) {
@@ -322,6 +323,11 @@ public TransactionContext getTransaction() {

@Override
public void begin() {
begin(transactionIsolationLevel);
}

@Override
public void begin(final TRANSACTION_ISOLATION_LEVEL isolationLevel) {
executeInReadLock(() -> {
checkDatabaseIsOpen();

@@ -334,7 +340,7 @@ public void begin() {
current.pushTransaction(tx);
}

tx.begin();
tx.begin(isolationLevel);

return null;
});
@@ -712,6 +718,17 @@ public Database setReadYourWrites(final boolean readYourWrites) {
return this;
}

@Override
public TRANSACTION_ISOLATION_LEVEL getTransactionIsolationLevel() {
return transactionIsolationLevel;
}

@Override
public Database setTransactionIsolationLevel(final TRANSACTION_ISOLATION_LEVEL level) {
transactionIsolationLevel = level;
return this;
}

@Override
public EmbeddedDatabase setUseWAL(final boolean useWAL) {
getTransaction().setUseWAL(useWAL);
engine/src/main/java/com/arcadedb/database/Transaction.java
@@ -21,7 +21,7 @@
import com.arcadedb.engine.WALFile;

public interface Transaction {
void begin();
void begin(Database.TRANSACTION_ISOLATION_LEVEL isolationLevel);

Binary commit();

70 changes: 51 additions & 19 deletions engine/src/main/java/com/arcadedb/database/TransactionContext.java
@@ -52,23 +52,25 @@
* txId:long|pages:int|&lt;segmentSize:int|fileId:int|pageNumber:long|pageModifiedFrom:int|pageModifiedTo:int|&lt;prevContent&gt;&lt;newContent&gt;segmentSize:int&gt;MagicNumber:long
*/
public class TransactionContext implements Transaction {
private final DatabaseInternal database;
private final Map<Integer, Integer> newPageCounters = new HashMap<>();
private final Map<RID, Record> immutableRecordsCache = new HashMap<>(1024);
private final Map<RID, Record> modifiedRecordsCache = new HashMap<>(1024);
private final TransactionIndexContext indexChanges;
private Map<PageId, MutablePage> modifiedPages;
private Map<PageId, MutablePage> newPages;
private boolean useWAL;
private boolean asyncFlush = true;
private WALFile.FLUSH_TYPE walFlush;
private List<Integer> lockedFiles;
private long txId = -1;
private STATUS status = STATUS.INACTIVE;
private final DatabaseInternal database;
private final Map<Integer, Integer> newPageCounters = new HashMap<>();
private final Map<RID, Record> immutableRecordsCache = new HashMap<>(1024);
private final Map<RID, Record> modifiedRecordsCache = new HashMap<>(1024);
private final TransactionIndexContext indexChanges;
private final Map<PageId, ImmutablePage> immutablePages = new HashMap<>(64);
private Map<PageId, MutablePage> modifiedPages;
private Map<PageId, MutablePage> newPages;
private boolean useWAL;
private boolean asyncFlush = true;
private WALFile.FLUSH_TYPE walFlush;
private List<Integer> lockedFiles;
private long txId = -1;
private STATUS status = STATUS.INACTIVE;
// KEEPS TRACK OF MODIFIED RECORD IN TX. AT 1ST PHASE COMMIT TIME THE RECORD ARE SERIALIZED AND INDEXES UPDATED. THIS DEFERRING IMPROVES SPEED ESPECIALLY
// WITH GRAPHS WHERE EDGES ARE CREATED AND CHUNKS ARE UPDATED MULTIPLE TIMES IN THE SAME TX
// TODO: OPTIMIZE modifiedRecordsCache STRUCTURE, MAYBE JOIN IT WITH UPDATED RECORDS?
private Map<RID, Record> updatedRecords = null;
private Map<RID, Record> updatedRecords = null;
private Database.TRANSACTION_ISOLATION_LEVEL isolationLevel = Database.TRANSACTION_ISOLATION_LEVEL.READ_COMMITTED;

public enum STATUS {INACTIVE, BEGUN, COMMIT_1ST_PHASE, COMMIT_2ND_PHASE}

@@ -90,7 +92,9 @@ public TransactionContext(final DatabaseInternal database) {
}

@Override
public void begin() {
public void begin(final Database.TRANSACTION_ISOLATION_LEVEL isolationLevel) {
this.isolationLevel = isolationLevel;

if (status != STATUS.INACTIVE)
throw new TransactionException("Transaction already begun");

@@ -255,9 +259,27 @@ public BasePage getPage(final PageId pageId, final int size) throws IOException
page = newPages.get(pageId);

if (page == null)
page = immutablePages.get(pageId);

if (page == null) {
// NOT FOUND, DELEGATES TO THE DATABASE
page = database.getPageManager().getPage(pageId, size, false, true);

if (page != null) {
switch (isolationLevel) {
case READ_COMMITTED:
break;
case REPEATABLE_READS:
final PaginatedFile file = database.getFileManager().getFile(pageId.getFileId());
final boolean isNewPage = pageId.getPageNumber() >= file.getTotalPages();
if (!isNewPage)
// CACHE THE IMMUTABLE PAGE ONLY IF IT IS NOT NEW
immutablePages.put(pageId, (ImmutablePage) page);
break;
}
}
}

return page;
}

@@ -274,8 +296,12 @@ public MutablePage getPageToModify(final PageId pageId, final int size, final bo
page = newPages.get(pageId);

if (page == null) {
// NOT FOUND, DELEGATES TO THE DATABASE
final ImmutablePage loadedPage = database.getPageManager().getPage(pageId, size, isNew, true);
// IF AVAILABLE REMOVE THE PAGE FROM IMMUTABLE PAGES TO KEEP ONLY ONE PAGE IN RAM
ImmutablePage loadedPage = immutablePages.remove(pageId);
if (loadedPage == null)
// NOT FOUND, DELEGATES TO THE DATABASE
loadedPage = database.getPageManager().getPage(pageId, size, isNew, true);

if (loadedPage != null) {
final MutablePage mutablePage = loadedPage.modify();
if (isNew)
@@ -300,9 +326,11 @@ public MutablePage getPageToModify(final BasePage page) throws IOException {

final PageId pageId = page.getPageId();
if (newPages.containsKey(pageId))
newPages.put(mutablePage.getPageId(), mutablePage);
newPages.put(pageId, mutablePage);
else
modifiedPages.put(mutablePage.getPageId(), mutablePage);
modifiedPages.put(pageId, mutablePage);

immutablePages.remove(pageId);

return mutablePage;
}
@@ -378,6 +406,7 @@ public void kill() {
newPages = null;
updatedRecords = null;
newPageCounters.clear();
immutablePages.clear();
}

/**
@@ -615,6 +644,7 @@ public void reset() {
newPageCounters.clear();
modifiedRecordsCache.clear();
immutableRecordsCache.clear();
immutablePages.clear();
txId = -1;
}

@@ -627,6 +657,8 @@ public void removePagesOfFile(final int fileId) {
if (modifiedPages != null)
modifiedPages.values().removeIf(mutablePage -> fileId == mutablePage.getPageId().getFileId());

immutablePages.values().removeIf(page -> fileId == page.getPageId().getFileId());

// IMMUTABLE RECORD, AVOID IT POINTING TO THE OLD OFFSET IN A MODIFIED PAGE
// SAME PAGE, REMOVE IT
immutableRecordsCache.values().removeIf(r -> r.getIdentity().getBucketId() == fileId);
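What the new `immutablePages` cache buys: under REPEATABLE_READS, a page loaded during the transaction is kept in the transaction context, so later reads in the same transaction see the first snapshot even if another transaction commits changes to that page in the meantime; under READ_COMMITTED pages are fetched from the page manager every time. A test-style sketch of the expected effect (the `Account` type, the `amount` property, the pre-existing `rid` and the concurrent updater are assumptions for illustration, not part of this PR):

```java
// Assumes: db is open, an "Account" document with an "amount" property exists at RID rid,
// and a second thread updates that record in its own transaction mid-way through.
db.begin(Database.TRANSACTION_ISOLATION_LEVEL.REPEATABLE_READS);

final Object first = db.lookupByRID(rid, true).asDocument().get("amount");

// ... concurrent transaction on another thread updates "amount" and commits here ...

final Object second = db.lookupByRID(rid, true).asDocument().get("amount");
// Expectation under REPEATABLE_READS: first.equals(second), because the second read is served
// from the page cached in this transaction. Under READ_COMMITTED the second read may observe
// the concurrently committed value.

db.commit();
```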
@@ -110,7 +110,7 @@ public void run() {
DatabaseContext.INSTANCE.getContext(database.getDatabasePath()).asyncMode = true;
database.getTransaction().setUseWAL(transactionUseWAL);
database.setWALFlush(transactionSync);
database.getTransaction().begin();
database.getTransaction().begin(Database.TRANSACTION_ISOLATION_LEVEL.READ_COMMITTED); // FORCE THE LOWEST LEVEL OF ISOLATION

while (!forceShutdown) {
try {
21 changes: 11 additions & 10 deletions engine/src/test/java/com/arcadedb/AsyncTest.java
@@ -60,10 +60,11 @@ public void testScan() {

@Test
public void testSyncScanAndAsyncUpdate() {
final AtomicLong callbackInvoked = new AtomicLong();
final AtomicLong updatedRecords = new AtomicLong();

database.begin();
try {
final AtomicLong callbackInvoked = new AtomicLong();
final AtomicLong updatedRecords = new AtomicLong();

database.scanType(TYPE_NAME, true, record -> {
callbackInvoked.incrementAndGet();
@@ -73,17 +74,17 @@ public void testSyncScanAndAsyncUpdate() {

database.async().waitCompletion();

Assertions.assertEquals(TOT, callbackInvoked.get());
Assertions.assertEquals(TOT, updatedRecords.get());

final ResultSet resultSet = database.query("sql", "select count(*) as count from " + TYPE_NAME + " where updated = true");

Assertions.assertTrue(resultSet.hasNext());
Assertions.assertEquals(TOT, ((Number) resultSet.next().getProperty("count")).intValue());

} finally {
database.commit();
}

Assertions.assertEquals(TOT, callbackInvoked.get());
Assertions.assertEquals(TOT, updatedRecords.get());

final ResultSet resultSet = database.query("sql", "select count(*) as count from " + TYPE_NAME + " where updated = true");

Assertions.assertTrue(resultSet.hasNext());
Assertions.assertEquals(TOT, ((Number) resultSet.next().getProperty("count")).intValue());
}

@Test
Expand Down