From 028735095ca03b77521cff8e4991c4628bbfcc97 Mon Sep 17 00:00:00 2001 From: "Andrey.Tarashevskiy" Date: Mon, 10 May 2021 01:19:39 +0300 Subject: [PATCH] Using Entity.flush does not alert EntityHook subscribers #1225 --- .../org/jetbrains/exposed/dao/Entity.kt | 5 +- .../org/jetbrains/exposed/dao/EntityCache.kt | 1 + .../org/jetbrains/exposed/dao/EntityHook.kt | 6 ++- .../exposed/dao/EntityLifecycleInterceptor.kt | 8 +++- .../tests/shared/entities/EntityHookTest.kt | 46 +++++++++++++++---- 5 files changed, 52 insertions(+), 14 deletions(-) diff --git a/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/Entity.kt b/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/Entity.kt index d30c8a860e..980de99643 100644 --- a/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/Entity.kt +++ b/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/Entity.kt @@ -158,11 +158,11 @@ open class Entity>(val id: EntityID) { */ open fun delete() { val table = klass.table + TransactionManager.current().registerChange(klass, id, EntityChangeType.Removed) executeAsPartOfEntityLifecycle { table.deleteWhere { table.id eq id } } klass.removeFromCache(this) - TransactionManager.current().registerChange(klass, id, EntityChangeType.Removed) } open fun flush(batch: EntityBatchUpdate? = null): Boolean { @@ -176,6 +176,8 @@ open class Entity>(val id: EntityID) { // Store values before update to prevent flush inside UpdateStatement val _writeValues = writeValues.toMap() storeWrittenValues() + // In case of batch all changes will be registered after all entities flushed + TransactionManager.current().registerChange(klass, id, EntityChangeType.Updated) executeAsPartOfEntityLifecycle { table.update({ table.id eq id }) { for ((c, v) in _writeValues) { @@ -191,7 +193,6 @@ open class Entity>(val id: EntityID) { storeWrittenValues() } - TransactionManager.current().registerChange(klass, id, EntityChangeType.Updated) return true } return false diff --git a/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityCache.kt b/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityCache.kt index 6038cab441..33ac359fe0 100644 --- a/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityCache.kt +++ b/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityCache.kt @@ -143,6 +143,7 @@ class EntityCache(private val transaction: Transaction) { toFlush = partition.second } while (toFlush.isNotEmpty()) } + transaction.alertSubscribers() } fun clearReferrersCache() { diff --git a/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityHook.kt b/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityHook.kt index 933087084b..18dd887e05 100644 --- a/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityHook.kt +++ b/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityHook.kt @@ -24,6 +24,7 @@ fun , T : Entity> EntityChange.toEntity(klass: EntityCla return toEntity() } +private val Transaction.unprocessedEvents: Deque by transactionScope { ConcurrentLinkedDeque() } private val Transaction.entityEvents: Deque by transactionScope { ConcurrentLinkedDeque() } private val entitySubscribers = ConcurrentLinkedQueue<(EntityChange) -> Unit>() @@ -40,7 +41,8 @@ object EntityHook { fun Transaction.registerChange(entityClass: EntityClass<*, Entity<*>>, entityId: EntityID<*>, changeType: EntityChangeType) { EntityChange(entityClass, entityId, changeType, id).let { - if (entityEvents.peekLast() != it) { + if (unprocessedEvents.peekLast() != it) { + unprocessedEvents.addLast(it) entityEvents.addLast(it) } } @@ -48,7 +50,7 @@ fun Transaction.registerChange(entityClass: EntityClass<*, Entity<*>>, entityId: fun Transaction.alertSubscribers() { while (true) { - val event = entityEvents.pollFirst() ?: break + val event = unprocessedEvents.pollFirst() ?: break entitySubscribers.forEach { it(event) } } } diff --git a/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityLifecycleInterceptor.kt b/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityLifecycleInterceptor.kt index 78bb72a8d4..07e400aa42 100644 --- a/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityLifecycleInterceptor.kt +++ b/exposed-dao/src/main/kotlin/org/jetbrains/exposed/dao/EntityLifecycleInterceptor.kt @@ -4,17 +4,19 @@ import org.jetbrains.exposed.dao.id.IdTable import org.jetbrains.exposed.sql.Query import org.jetbrains.exposed.sql.Transaction import org.jetbrains.exposed.sql.statements.* +import org.jetbrains.exposed.sql.statements.api.PreparedStatementApi import org.jetbrains.exposed.sql.targetTables import org.jetbrains.exposed.sql.transactions.transactionScope private var isExecutedWithinEntityLifecycle by transactionScope { false } internal fun executeAsPartOfEntityLifecycle(body: () -> T): T { + val currentExecutionState = isExecutedWithinEntityLifecycle return try { isExecutedWithinEntityLifecycle = true body() } finally { - isExecutedWithinEntityLifecycle = false + isExecutedWithinEntityLifecycle = currentExecutionState } } @@ -58,6 +60,10 @@ class EntityLifecycleInterceptor : GlobalStatementInterceptor { } } + override fun afterExecution(transaction: Transaction, contexts: List, executedStatement: PreparedStatementApi) { + transaction.alertSubscribers() + } + override fun beforeCommit(transaction: Transaction) { val created = transaction.flushCache() transaction.alertSubscribers() diff --git a/exposed-tests/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/entities/EntityHookTest.kt b/exposed-tests/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/entities/EntityHookTest.kt index a1f366b347..4c304e2b62 100644 --- a/exposed-tests/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/entities/EntityHookTest.kt +++ b/exposed-tests/src/test/kotlin/org/jetbrains/exposed/sql/tests/shared/entities/EntityHookTest.kt @@ -120,19 +120,19 @@ class EntityHookTest : DatabaseTestsBase() { @Test fun testModifiedSimple01() { withTables(*EntityHookTestData.allTables) { - transaction { + val (_, events1, _) = trackChanges { val ru = EntityHookTestData.Country.new { name = "RU" } - val x = EntityHookTestData.City.new { + EntityHookTestData.City.new { name = "St. Petersburg" country = ru } - - flushCache() } - val (_, events, txId) = trackChanges { + assertEquals(2, events1.count()) + + val (_, events2, txId) = trackChanges { val de = EntityHookTestData.Country.new { name = "DE" } @@ -141,10 +141,10 @@ class EntityHookTest : DatabaseTestsBase() { x.country = de } // TODO: one may expect change for RU but we do not send it due to performance reasons - assertEquals(2, events.count()) - assertEqualCollections(events.mapNotNull { it.toEntity(EntityHookTestData.City)?.name }, "Munich") - assertEqualCollections(events.mapNotNull { it.toEntity(EntityHookTestData.Country)?.name }, "DE") - events.forEach { + assertEquals(2, events2.count()) + assertEqualCollections(events2.mapNotNull { it.toEntity(EntityHookTestData.City)?.name }, "Munich") + assertEqualCollections(events2.mapNotNull { it.toEntity(EntityHookTestData.Country)?.name }, "DE") + events2.forEach { assertEquals(txId, it.transactionId) } } @@ -297,4 +297,32 @@ class EntityHookTest : DatabaseTestsBase() { assertEquals(EntityChangeType.Updated, updateEvent.changeType) } } + + @Test + fun `calling flush notifies EntityHook subscribers`() { + withTables(EntityHookTestData.User.table) { + var hookCalls = 0 + val user = EntityHookTestData.User.new { + name = "1@test.local" + age = 30 + } + user.flush() + + EntityHook.subscribe { + hookCalls++ + } + + user.name = "2@test.local" + assertEquals(0, hookCalls) + + user.flush() + assertEquals(1, hookCalls) + + user.name = "3@test.local" + assertEquals(1, hookCalls) + + commit() + assertEquals(2, hookCalls) + } + } }