This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix database performance of read/write worker locks #16061

Merged
merged 2 commits on Aug 17, 2023
1 change: 1 addition & 0 deletions changelog.d/16061.misc
@@ -0,0 +1 @@
Fix database performance of read/write worker locks.
87 changes: 35 additions & 52 deletions synapse/storage/databases/main/lock.py
@@ -26,7 +26,6 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
from synapse.util import Clock
from synapse.util.stringutils import random_string

@@ -96,6 +95,10 @@ def __init__(

self._acquiring_locks: Set[Tuple[str, str]] = set()

self._clock.looping_call(
self._reap_stale_read_write_locks, _LOCK_TIMEOUT_MS / 10.0
)
Comment on lines +98 to +100
Contributor
Is every worker going to be doing this reaping? I guess that's fine since the reaps are idempotent.

Member Author
Yeah, good point but I don't think it matters in practice.
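
A minimal sketch of why concurrent reaping is harmless: the DELETE only matches rows whose last_renewed_ts has already expired, so a second worker running the same statement simply finds nothing left to delete. (This uses sqlite3 with an illustrative timeout value, not Synapse's DatabasePool.)

```python
import sqlite3

LOCK_TIMEOUT_MS = 2 * 60 * 1000  # illustrative value, not Synapse's constant

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE worker_read_write_locks (
        lock_name TEXT, lock_key TEXT, write_lock BOOLEAN,
        instance_name TEXT, token TEXT, last_renewed_ts BIGINT
    )
    """
)

now_ms = 10_000_000
# One lock that has not been renewed for two full timeouts, i.e. it is stale.
conn.execute(
    "INSERT INTO worker_read_write_locks VALUES (?, ?, ?, ?, ?, ?)",
    ("name", "key", True, "worker1", "abc123", now_ms - 2 * LOCK_TIMEOUT_MS),
)

def reap(now_ms: int) -> int:
    # Same shape as the reaper's DELETE: only rows past the timeout match.
    cur = conn.execute(
        "DELETE FROM worker_read_write_locks WHERE last_renewed_ts < ?",
        (now_ms - LOCK_TIMEOUT_MS,),
    )
    return cur.rowcount

print(reap(now_ms))  # 1: the stale lock is removed
print(reap(now_ms))  # 0: a second (concurrent) reap is a no-op
```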


@wrap_as_background_process("LockStore._on_shutdown")
async def _on_shutdown(self) -> None:
"""Called when the server is shutting down"""
@@ -216,6 +219,7 @@ async def try_acquire_read_write_lock(
lock_name,
lock_key,
write,
db_autocommit=True,
Contributor
Worth a comment? Why is it unhelpful to run this function in a transaction?

I think:

  • we're still using isolation level REPEATABLE READ
  • but we're not in a transaction, so if it fails we won't bother to retry
  • which means we'll properly wait for the lock renewal period before trying to lock again, thrashing the DB less?

Member Author
Broadly: there's not much point doing a transaction for a single query as it basically has the same semantics, and by using autocommit we reduce the work as we don't have to start/end the transaction.

Member Author
Oh: and it means the length of time we hold the lock on those rows is reduced to just the query execution time, which helps reduce the likelihood of clashes.
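
A rough driver-level sketch of that difference, using psycopg2 directly with a placeholder DSN and placeholder values (Synapse itself goes through DatabasePool rather than raw psycopg2):

```python
import psycopg2

conn = psycopg2.connect("dbname=synapse user=synapse")  # placeholder DSN

insert_sql = """
    INSERT INTO worker_read_write_locks
        (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
    VALUES (%s, %s, %s, %s, %s, %s)
"""

# Explicit transaction: psycopg2 implicitly opens a transaction, so the row
# locks taken by the INSERT are held until commit(), and the BEGIN/COMMIT
# round trips are pure overhead for a single statement.
with conn.cursor() as cur:
    cur.execute(insert_sql, ("name", "key", True, "worker1", "abc123", 1_700_000_000_000))
conn.commit()

# Autocommit: the statement commits as soon as it finishes, so the row locks
# are held only for the query's execution time and there is nothing extra to
# send before or after it.
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute(insert_sql, ("name", "key2", True, "worker1", "def456", 1_700_000_000_000))
```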

)
except self.database_engine.module.IntegrityError:
return None
@@ -233,61 +237,22 @@ def _try_acquire_read_write_lock_txn(
# `worker_read_write_locks` and seeing if that fails any
# constraints. If it doesn't then we have acquired the lock,
# otherwise we haven't.
#
# Before that though we clear the table of any stale locks.

now = self._clock.time_msec()
token = random_string(6)

delete_sql = """
DELETE FROM worker_read_write_locks
WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
"""

insert_sql = """
INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?, ?)
"""

if isinstance(self.database_engine, PostgresEngine):
# For Postgres we can send these queries at the same time.
Contributor
Is the point just that we now always send these as two separate queries (with the deletes in a background process)?

Member Author
Yup, which means they're much less likely to conflict and cause retries
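
For illustration, a sketch of the kind of conflict the combined DELETE + INSERT could run into under REPEATABLE READ when two workers race for the same lock. It uses raw psycopg2 with a placeholder DSN and assumes the table already holds a stale row for ("name", "key"); with the new standalone INSERT, a clash instead surfaces as an IntegrityError from the table's constraints, which the caller already treats as "lock not acquired".

```python
import psycopg2
from psycopg2 import errors

DSN = "dbname=synapse user=synapse"  # placeholder
DELETE_SQL = """
    DELETE FROM worker_read_write_locks
    WHERE last_renewed_ts < %s AND lock_name = %s AND lock_key = %s
"""
cutoff = 1_700_000_000_000  # assumed newer than the stale row's last_renewed_ts

conn_a = psycopg2.connect(DSN)
conn_b = psycopg2.connect(DSN)
for conn in (conn_a, conn_b):
    conn.set_session(isolation_level="REPEATABLE READ")

# Worker B starts its transaction first, fixing its snapshot.
with conn_b.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM worker_read_write_locks")

# Worker A clears the stale row and commits.
with conn_a.cursor() as cur:
    cur.execute(DELETE_SQL, (cutoff, "name", "key"))
conn_a.commit()

# Worker B now tries to delete the same row, which a concurrent committed
# transaction has already removed: Postgres raises a serialization failure,
# so B's whole acquire attempt has to be retried.
try:
    with conn_b.cursor() as cur:
        cur.execute(DELETE_SQL, (cutoff, "name", "key"))
except errors.SerializationFailure as exc:
    print("retry needed:", exc)
    conn_b.rollback()
```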

txn.execute(
delete_sql + ";" + insert_sql,
(
# DELETE args
now - _LOCK_TIMEOUT_MS,
lock_name,
lock_key,
# UPSERT args
lock_name,
lock_key,
write,
self._instance_name,
token,
now,
),
)
else:
# For SQLite these need to be two queries.
txn.execute(
delete_sql,
(
now - _LOCK_TIMEOUT_MS,
lock_name,
lock_key,
),
)
txn.execute(
insert_sql,
(
lock_name,
lock_key,
write,
self._instance_name,
token,
now,
),
)
self.db_pool.simple_insert_txn(
txn,
table="worker_read_write_locks",
values={
"lock_name": lock_name,
"lock_key": lock_key,
"write_lock": write,
"instance_name": self._instance_name,
"token": token,
"last_renewed_ts": now,
},
)

lock = Lock(
self._reactor,
@@ -351,6 +316,24 @@ def _try_acquire_multi_read_write_lock_txn(

return locks

@wrap_as_background_process("_reap_stale_read_write_locks")
async def _reap_stale_read_write_locks(self) -> None:
delete_sql = """
DELETE FROM worker_read_write_locks
WHERE last_renewed_ts < ?
"""

def reap_stale_read_write_locks_txn(txn: LoggingTransaction) -> None:
txn.execute(delete_sql, (self._clock.time_msec() - _LOCK_TIMEOUT_MS,))
if txn.rowcount:
logger.info("Reaped %d stale locks", txn.rowcount)

await self.db_pool.runInteraction(
"_reap_stale_read_write_locks",
reap_stale_read_write_locks_txn,
db_autocommit=True,
)


class Lock:
"""An async context manager that manages an acquired lock, ensuring it is
7 changes: 4 additions & 3 deletions tests/storage/databases/main/test_lock.py
@@ -12,13 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from twisted.internet import defer, reactor
from twisted.internet.base import ReactorBase
from twisted.internet.defer import Deferred
from twisted.test.proto_helpers import MemoryReactor

from synapse.server import HomeServer
from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS
from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL_MS
from synapse.util import Clock

from tests import unittest
@@ -380,8 +381,8 @@ def test_maintain_lock(self) -> None:
self.get_success(lock.__aenter__())

# Wait for ages with the lock, we should not be able to get the lock.
self.reactor.advance(5 * _LOCK_TIMEOUT_MS / 1000)
self.pump()
for _ in range(0, 10):
self.reactor.advance((_RENEWAL_INTERVAL_MS / 1000))

lock2 = self.get_success(
self.store.try_acquire_read_write_lock("name", "key", write=True)