Skip to content

Commit

Permalink
Using Entity.flush does not alert EntityHook subscribers #1225
Browse files Browse the repository at this point in the history
  • Loading branch information
Tapac committed May 9, 2021
1 parent 9ad54b4 commit 0287350
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ open class Entity<ID : Comparable<ID>>(val id: EntityID<ID>) {
*/
open fun delete() {
val table = klass.table
TransactionManager.current().registerChange(klass, id, EntityChangeType.Removed)
executeAsPartOfEntityLifecycle {
table.deleteWhere { table.id eq id }
}
klass.removeFromCache(this)
TransactionManager.current().registerChange(klass, id, EntityChangeType.Removed)
}

open fun flush(batch: EntityBatchUpdate? = null): Boolean {
Expand All @@ -176,6 +176,8 @@ open class Entity<ID : Comparable<ID>>(val id: EntityID<ID>) {
// Store values before update to prevent flush inside UpdateStatement
val _writeValues = writeValues.toMap()
storeWrittenValues()
// In case of batch all changes will be registered after all entities flushed
TransactionManager.current().registerChange(klass, id, EntityChangeType.Updated)
executeAsPartOfEntityLifecycle {
table.update({ table.id eq id }) {
for ((c, v) in _writeValues) {
Expand All @@ -191,7 +193,6 @@ open class Entity<ID : Comparable<ID>>(val id: EntityID<ID>) {
storeWrittenValues()
}

TransactionManager.current().registerChange(klass, id, EntityChangeType.Updated)
return true
}
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class EntityCache(private val transaction: Transaction) {
toFlush = partition.second
} while (toFlush.isNotEmpty())
}
transaction.alertSubscribers()
}

fun clearReferrersCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ fun <ID : Comparable<ID>, T : Entity<ID>> EntityChange.toEntity(klass: EntityCla
return toEntity<ID, T>()
}

private val Transaction.unprocessedEvents: Deque<EntityChange> by transactionScope { ConcurrentLinkedDeque() }
private val Transaction.entityEvents: Deque<EntityChange> by transactionScope { ConcurrentLinkedDeque() }
private val entitySubscribers = ConcurrentLinkedQueue<(EntityChange) -> Unit>()

Expand All @@ -40,15 +41,16 @@ object EntityHook {

fun Transaction.registerChange(entityClass: EntityClass<*, Entity<*>>, entityId: EntityID<*>, changeType: EntityChangeType) {
EntityChange(entityClass, entityId, changeType, id).let {
if (entityEvents.peekLast() != it) {
if (unprocessedEvents.peekLast() != it) {
unprocessedEvents.addLast(it)
entityEvents.addLast(it)
}
}
}

fun Transaction.alertSubscribers() {
while (true) {
val event = entityEvents.pollFirst() ?: break
val event = unprocessedEvents.pollFirst() ?: break
entitySubscribers.forEach { it(event) }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import org.jetbrains.exposed.dao.id.IdTable
import org.jetbrains.exposed.sql.Query
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.statements.*
import org.jetbrains.exposed.sql.statements.api.PreparedStatementApi
import org.jetbrains.exposed.sql.targetTables
import org.jetbrains.exposed.sql.transactions.transactionScope

private var isExecutedWithinEntityLifecycle by transactionScope { false }

internal fun <T> executeAsPartOfEntityLifecycle(body: () -> T): T {
val currentExecutionState = isExecutedWithinEntityLifecycle
return try {
isExecutedWithinEntityLifecycle = true
body()
} finally {
isExecutedWithinEntityLifecycle = false
isExecutedWithinEntityLifecycle = currentExecutionState
}
}

Expand Down Expand Up @@ -58,6 +60,10 @@ class EntityLifecycleInterceptor : GlobalStatementInterceptor {
}
}

override fun afterExecution(transaction: Transaction, contexts: List<StatementContext>, executedStatement: PreparedStatementApi) {
transaction.alertSubscribers()
}

override fun beforeCommit(transaction: Transaction) {
val created = transaction.flushCache()
transaction.alertSubscribers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,19 @@ class EntityHookTest : DatabaseTestsBase() {

@Test fun testModifiedSimple01() {
withTables(*EntityHookTestData.allTables) {
transaction {
val (_, events1, _) = trackChanges {
val ru = EntityHookTestData.Country.new {
name = "RU"
}
val x = EntityHookTestData.City.new {
EntityHookTestData.City.new {
name = "St. Petersburg"
country = ru
}

flushCache()
}

val (_, events, txId) = trackChanges {
assertEquals(2, events1.count())

val (_, events2, txId) = trackChanges {
val de = EntityHookTestData.Country.new {
name = "DE"
}
Expand All @@ -141,10 +141,10 @@ class EntityHookTest : DatabaseTestsBase() {
x.country = de
}
// TODO: one may expect change for RU but we do not send it due to performance reasons
assertEquals(2, events.count())
assertEqualCollections(events.mapNotNull { it.toEntity(EntityHookTestData.City)?.name }, "Munich")
assertEqualCollections(events.mapNotNull { it.toEntity(EntityHookTestData.Country)?.name }, "DE")
events.forEach {
assertEquals(2, events2.count())
assertEqualCollections(events2.mapNotNull { it.toEntity(EntityHookTestData.City)?.name }, "Munich")
assertEqualCollections(events2.mapNotNull { it.toEntity(EntityHookTestData.Country)?.name }, "DE")
events2.forEach {
assertEquals(txId, it.transactionId)
}
}
Expand Down Expand Up @@ -297,4 +297,32 @@ class EntityHookTest : DatabaseTestsBase() {
assertEquals(EntityChangeType.Updated, updateEvent.changeType)
}
}

@Test
fun `calling flush notifies EntityHook subscribers`() {
withTables(EntityHookTestData.User.table) {
var hookCalls = 0
val user = EntityHookTestData.User.new {
name = "[email protected]"
age = 30
}
user.flush()

EntityHook.subscribe {
hookCalls++
}

user.name = "[email protected]"
assertEquals(0, hookCalls)

user.flush()
assertEquals(1, hookCalls)

user.name = "[email protected]"
assertEquals(1, hookCalls)

commit()
assertEquals(2, hookCalls)
}
}
}

0 comments on commit 0287350

Please sign in to comment.