diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 8e1f24646efd..cad9597882b0 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -477,14 +477,11 @@ async def delete_messages_for_device( log_kv({"message": "No changes in cache since last check"}) return 0 - ROW_ID_LITERAL = "ctid" if isinstance(self.database_engine, PostgresEngine) else "rowid" - def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: sql = ( - f"DELETE FROM device_inbox WHERE {ROW_ID_LITERAL} IN (" - f"SELECT {ROW_ID_LITERAL} FROM device_inbox" - " WHERE user_id = ? AND device_id = ?" - " AND stream_id <= ?" + f"DELETE FROM device_inbox WHERE {self.database_engine.row_id_name} IN (" + f"SELECT {self.database_engine.row_id_name} FROM device_inbox" + " WHERE user_id = ? AND device_id = ? AND stream_id <= ?" ) if limit: sql += f" LIMIT {limit}" diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 5ee5c7ad9f14..ba694bf2c0d6 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -939,11 +939,6 @@ async def _background_receipts_linearized_unique_index( receipts.""" def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: - if isinstance(self.database_engine, PostgresEngine): - ROW_ID_NAME = "ctid" - else: - ROW_ID_NAME = "rowid" - # Identify any duplicate receipts arising from # https://github.com/matrix-org/synapse/issues/14406. # The following query takes less than a minute on matrix.org. @@ -962,7 +957,7 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: # `stream_id`, we delete by the ctid instead. for stream_id, room_id, receipt_type, user_id in duplicate_keys: sql = f""" - SELECT {ROW_ID_NAME} + SELECT {self.database_engine.row_id_name} FROM receipts_linearized WHERE room_id = ? AND @@ -982,7 +977,7 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: receipt_type = ? AND user_id = ? AND thread_id IS NULL AND - {ROW_ID_NAME} != ? + {self.database_engine.row_id_name} != ? """ txn.execute(sql, (room_id, receipt_type, user_id, row_id)) diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 0b5b3bf03e1a..b1a2418cbdea 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -100,6 +100,12 @@ def server_version(self) -> str: """Gets a string giving the server version. For example: '3.22.0'""" ... + @property + @abc.abstractmethod + def row_id_name(self) -> str: + """Gets the literal name representing a row id for this engine.""" + ... + @abc.abstractmethod def in_transaction(self, conn: ConnectionType) -> bool: """Whether the connection is currently in a transaction.""" diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 05a72dc55435..6309363217a8 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -211,6 +211,10 @@ def server_version(self) -> str: else: return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100) + @property + def row_id_name(self) -> str: + return "ctid" + def in_transaction(self, conn: psycopg2.extensions.connection) -> bool: return conn.status != psycopg2.extensions.STATUS_READY diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index ca8c59297c42..802069e1e1cb 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -123,6 +123,10 @@ def server_version(self) -> str: """Gets a string giving the server version. For example: '3.22.0'.""" return "%i.%i.%i" % sqlite3.sqlite_version_info + @property + def row_id_name(self) -> str: + return "rowid" + def in_transaction(self, conn: sqlite3.Connection) -> bool: return conn.in_transaction diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py index ad2da4c8af84..622686d28f11 100644 --- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py +++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py @@ -14,7 +14,7 @@ from synapse.storage.database import LoggingTransaction -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.prepare_database import get_statements FIX_INDEXES = """ @@ -37,7 +37,7 @@ def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: - rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" + rowid = database_engine.row_id_name # remove duplicates from group_users & group_invites tables cur.execute(