Skip to content

Commit

Permalink
Merge pull request #28903 from yrodiere/i28579-quarkustransaction-runner
Browse files Browse the repository at this point in the history
Introduce QuarkusTransaction#runner
  • Loading branch information
yrodiere authored Dec 21, 2022
2 parents 2bb893b + 00c4167 commit 07291d9
Show file tree
Hide file tree
Showing 12 changed files with 778 additions and 55 deletions.
55 changes: 30 additions & 25 deletions docs/src/main/asciidoc/transaction.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -175,47 +175,53 @@ public class TransactionExample {
QuarkusTransaction.rollback();
}
public void lambdaExample() {
QuarkusTransaction.run(() -> {
public void runnerExample() {
QuarkusTransaction.requiringNew().run(() -> {
//do work
});
QuarkusTransaction.joiningExisting().run(() -> {
//do work
});
int result = QuarkusTransaction.call(QuarkusTransaction.runOptions()
int result = QuarkusTransaction.requiringNew()
.timeout(10)
.exceptionHandler((throwable) -> {
if (throwable instanceof SomeException) {
return RunOptions.ExceptionResult.COMMIT;
}
return RunOptions.ExceptionResult.ROLLBACK;
})
.semantic(RunOptions.Semantic.REQUIRE_NEW), () -> {
//do work
return 0;
});
.call(() -> {
//do work
return 0;
});
}
}
----

The above example shows a few different ways the API can be used. The first method simply calls begin, does some work and commits it.
The above example shows a few different ways the API can be used.

The first method simply calls begin, does some work and commits it.
This created transaction is tied to the CDI request scope, so if it is still active when the request scope is destroyed then it will
be automatically rolled back. This removes the need to explicitly catch exceptions and call `rollback`, and acts as a safety net
against inadvertent transaction leaks, however it does mean that this can only be used when the request scope is active. The second
example in the method calls begin with a timeout option, and then rolls back the transaction.

The second example shows the use of lambda scoped transactions, the first just runs a `Runnable` within a transaction, the second,
runs `Callable` with some specific options. In particular the `exceptionHandler` method can be used to control if the transaction
is rolled back or not on exception, and the `semantic` method controls the behaviour if an existing transaction is already started.
The second method shows the use of lambda scoped transactions with `QuarkusTransaction.runner(...)`;
the first example just runs a `Runnable` within a new transaction,
the second does the same but joining the existing transaction (if any),
and the third calls a `Callable` with some specific options.
In particular the `exceptionHandler` method can be used to control if the transaction is rolled back or not on exception.

The following semantics are supported:


DISALLOW_EXISTING::
`QuarkusTransaction.disallowingExisting()`/`DISALLOW_EXISTING`::

If a transaction is already associated with the current thread a `QuarkusTransactionException` will be thrown,
otherwise a new transaction is started, and follows all the normal lifecycle rules.

JOIN_EXISTING::
`QuarkusTransaction.joiningExisting()`/`JOIN_EXISTING`::

If no transaction is active then a new transaction will be started, and committed when the method ends.
If an exception is thrown the exception handler registered by `#exceptionHandler(Function)` will be called to
Expand All @@ -225,21 +231,20 @@ exception is thrown the exception handler will be called, however
a result of `ExceptionResult#ROLLBACK` will result in the TX marked as rollback only, while a result of
`ExceptionResult#COMMIT` will result in no action being taken.

REQUIRE_NEW::
`QuarkusTransaction.requiringNew()`/`REQUIRE_NEW`::

This is the default semantic.
If an existing transaction is already associated with the current thread then the transaction is suspended, and
resumed once
the current transaction is complete.
A new transaction is started after the existing transaction is suspended, and follows all the normal lifecycle rules.
If an existing transaction is already associated with the current thread then the transaction is suspended,
then a new transaction is started which follows all the normal lifecycle rules,
and when it's complete the original transaction is resumed.
Otherwise, a new transaction is started, and follows all the normal lifecycle rules.

SUSPEND_EXISTING::
`QuarkusTransaction.suspendingExisting()`/`SUSPEND_EXISTING`::

If no transaction is active then this semantic is basically a no-op.
If no transaction is active then these semantics are basically a no-op.
If a transaction is active then it is suspended, and resumed after the task is run.
The exception handler will never be consulted when this semantic is in use, specifying both an exception handler and
this semantic is considered an error.
This semantic allows for code to easily be run outside the scope of a transaction.
The exception handler will never be consulted when these semantics are in use, specifying both an exception handler and
these semantics are considered an error.
These semantics allows for code to easily be run outside the scope of a transaction.



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void testPropertiesPropagatedToRuntimeInit() {
public void testInsertsOrdered() {
var listener = new BatchCountSpyingEventListener();

QuarkusTransaction.run(() -> {
QuarkusTransaction.requiringNew().run(() -> {
em.unwrap(Session.class).addEventListeners(listener);

ParentEntity parent1 = new ParentEntity(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
package io.quarkus.narayana.quarkus;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import javax.enterprise.context.control.ActivateRequestContext;
import javax.inject.Inject;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.narayana.jta.QuarkusTransaction;
import io.quarkus.narayana.jta.QuarkusTransactionException;
import io.quarkus.narayana.jta.TransactionExceptionResult;
import io.quarkus.test.QuarkusUnitTest;

public class TransactionRunnerTest {

@Inject
TransactionManager transactionManager;

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));

@Test
public void commit() {
var sync = new TestSync();
QuarkusTransaction.requiringNew().run(() -> register(sync));
assertEquals(Status.STATUS_COMMITTED, sync.completionStatus);

assertEquals(Status.STATUS_COMMITTED,
QuarkusTransaction.requiringNew().call(this::register).completionStatus);
}

@Test
public void rollback() {
var sync1 = new TestSync();
assertThrows(QuarkusTransactionException.class,
() -> QuarkusTransaction.requiringNew().run(() -> {
register(sync1);
QuarkusTransaction.rollback();
}));
assertEquals(Status.STATUS_ROLLEDBACK, sync1.completionStatus);

var sync2 = new TestSync();
assertThrows(QuarkusTransactionException.class,
() -> QuarkusTransaction.requiringNew().call(() -> {
register(sync2);
QuarkusTransaction.rollback();
return null;
}));
assertEquals(Status.STATUS_ROLLEDBACK, sync2.completionStatus);
}

@Test
public void rollbackOnly() {
var sync1 = new TestSync();
assertThrows(QuarkusTransactionException.class,
() -> QuarkusTransaction.requiringNew().run(() -> {
register(sync1);
QuarkusTransaction.setRollbackOnly();
}));
assertEquals(Status.STATUS_ROLLEDBACK, sync1.completionStatus);

var sync2 = new TestSync();
assertThrows(QuarkusTransactionException.class,
() -> QuarkusTransaction.requiringNew().call(() -> {
register(sync2);
QuarkusTransaction.setRollbackOnly();
return null;
}));
assertEquals(Status.STATUS_ROLLEDBACK, sync2.completionStatus);
}

@Test
public void timeout() {
var sync1 = new TestSync();
assertThrows(QuarkusTransactionException.class, () -> QuarkusTransaction.requiringNew()
.timeout(1)
.run(() -> {
register(sync1);
try {
Thread.sleep(1200);
} catch (InterruptedException e) {
fail("Interrupted unexpectedly");
}
}));
assertEquals(Status.STATUS_ROLLEDBACK, sync1.completionStatus);

var sync2 = new TestSync();
assertThrows(QuarkusTransactionException.class, () -> QuarkusTransaction.requiringNew()
.timeout(1)
.call(() -> {
register(sync2);
try {
Thread.sleep(1200);
} catch (InterruptedException e) {
fail("Interrupted unexpectedly");
}
return null;
}));
assertEquals(Status.STATUS_ROLLEDBACK, sync2.completionStatus);
}

@Test
public void exception() {
var sync1 = new TestSync();
assertThrows(MyRuntimeException.class, () -> QuarkusTransaction.requiringNew()
.run(() -> {
register(sync1);
throw new MyRuntimeException();
}));
assertEquals(Status.STATUS_ROLLEDBACK, sync1.completionStatus);

var sync2 = new TestSync();
assertThrows(MyRuntimeException.class, () -> QuarkusTransaction.requiringNew()
.call(() -> {
register(sync2);
throw new MyRuntimeException();
}));
assertEquals(Status.STATUS_ROLLEDBACK, sync2.completionStatus);

var sync3 = new TestSync();
assertThrows(QuarkusTransactionException.class, () -> QuarkusTransaction.requiringNew()
.call(() -> {
register(sync3);
throw new MyCheckedException();
}));
assertEquals(Status.STATUS_ROLLEDBACK, sync3.completionStatus);
}

@Test
public void exceptionHandler() {
var sync1 = new TestSync();
assertThrows(MyRuntimeException.class, () -> QuarkusTransaction.requiringNew()
.exceptionHandler((e) -> TransactionExceptionResult.COMMIT)
.run(() -> {
register(sync1);
throw new MyRuntimeException();
}));
assertEquals(Status.STATUS_COMMITTED, sync1.completionStatus);

var sync2 = new TestSync();
assertThrows(MyRuntimeException.class, () -> QuarkusTransaction.requiringNew()
.exceptionHandler((e) -> TransactionExceptionResult.COMMIT)
.call(() -> {
register(sync2);
throw new MyRuntimeException();
}));
assertEquals(Status.STATUS_COMMITTED, sync2.completionStatus);

var sync3 = new TestSync();
assertThrows(QuarkusTransactionException.class, () -> QuarkusTransaction.requiringNew()
.exceptionHandler((e) -> TransactionExceptionResult.COMMIT)
.call(() -> {
register(sync3);
throw new MyCheckedException();
}));
assertEquals(Status.STATUS_COMMITTED, sync3.completionStatus);
}

@Test
@ActivateRequestContext
public void suspendingExisting() {
QuarkusTransaction.begin();
assertTrue(QuarkusTransaction.isActive());
QuarkusTransaction.suspendingExisting()
.run(() -> assertFalse(QuarkusTransaction.isActive()));
assertTrue(QuarkusTransaction.isActive());
QuarkusTransaction.rollback();

assertFalse(QuarkusTransaction.isActive());
QuarkusTransaction.suspendingExisting()
.run(() -> assertFalse(QuarkusTransaction.isActive()));
assertFalse(QuarkusTransaction.isActive());
}

@Test
@ActivateRequestContext
public void disallowingExisting() {
assertFalse(QuarkusTransaction.isActive());
assertEquals(Status.STATUS_COMMITTED,
QuarkusTransaction.disallowingExisting().call(this::register).completionStatus);
assertFalse(QuarkusTransaction.isActive());

QuarkusTransaction.begin();
assertTrue(QuarkusTransaction.isActive());
assertThrows(QuarkusTransactionException.class,
() -> QuarkusTransaction.disallowingExisting().call(this::register));
assertTrue(QuarkusTransaction.isActive());
QuarkusTransaction.rollback();
}

@Test
@ActivateRequestContext
public void requiringNew() throws SystemException {
assertFalse(QuarkusTransaction.isActive());
assertEquals(Status.STATUS_COMMITTED,
QuarkusTransaction.requiringNew().call(this::register).completionStatus);
assertFalse(QuarkusTransaction.isActive());

QuarkusTransaction.begin();
assertTrue(QuarkusTransaction.isActive());
var tx = transactionManager.getTransaction();
assertEquals(Status.STATUS_COMMITTED, QuarkusTransaction.requiringNew().call(() -> {
assertTrue(QuarkusTransaction.isActive());
assertNotEquals(tx, transactionManager.getTransaction());
return register();
}).completionStatus);
assertTrue(QuarkusTransaction.isActive());
QuarkusTransaction.rollback();
}

@Test
@ActivateRequestContext
public void joiningExisting() throws SystemException {
assertFalse(QuarkusTransaction.isActive());
assertEquals(Status.STATUS_COMMITTED,
QuarkusTransaction.joiningExisting().call(this::register).completionStatus);
assertFalse(QuarkusTransaction.isActive());

QuarkusTransaction.begin();
assertTrue(QuarkusTransaction.isActive());
var tx = transactionManager.getTransaction();
QuarkusTransaction.joiningExisting().call(() -> {
assertTrue(QuarkusTransaction.isActive());
assertEquals(tx, transactionManager.getTransaction());
return null;
});
assertTrue(QuarkusTransaction.isActive());
QuarkusTransaction.rollback();
}

void register(TestSync t) {
try {
transactionManager.getTransaction().registerSynchronization(t);
} catch (RollbackException | SystemException e) {
throw new RuntimeException(e);
}
}

TestSync register() {
TestSync t = new TestSync();
register(t);
return t;
}

static class TestSync implements Synchronization {

int completionStatus = -1;

@Override
public void beforeCompletion() {

}

@Override
public void afterCompletion(int status) {
this.completionStatus = status;
}
}

static class MyCheckedException extends Exception {
MyCheckedException() {
}
}

static class MyRuntimeException extends RuntimeException {
MyRuntimeException() {
}
}

}
Loading

0 comments on commit 07291d9

Please sign in to comment.