Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Correctly read to-device stream pos on SQLite #16682

Merged
merged 3 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
device_inbox_id_gen -> to_device_msg_id_gen
  • Loading branch information
David Robertson committed Nov 23, 2023
commit 530337d1d8514af173070407115fc2ab73bd1b59
2 changes: 1 addition & 1 deletion synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(
hs.get_instance_name(),
store.get_all_new_device_messages,
store._device_inbox_id_gen,
store._to_device_msg_id_gen,
)


Expand Down
20 changes: 10 additions & 10 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(
self._instance_name in hs.config.worker.writers.to_device
)

self._device_inbox_id_gen: AbstractStreamIdGenerator = (
self._to_device_msg_id_gen: AbstractStreamIdGenerator = (
MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
Expand All @@ -104,15 +104,15 @@ def __init__(
)
else:
self._can_write_to_device = True
self._device_inbox_id_gen = StreamIdGenerator(
self._to_device_msg_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"device_inbox",
"stream_id",
extra_tables=[("device_federation_outbox", "stream_id")],
)

max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
max_device_inbox_id = self._to_device_msg_id_gen.get_current_token()
device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
db_conn,
"device_inbox",
Expand Down Expand Up @@ -152,8 +152,8 @@ def process_replication_rows(
) -> None:
if stream_name == ToDeviceStream.NAME:
# If replication is happening than postgres must be being used.
assert isinstance(self._device_inbox_id_gen, MultiWriterIdGenerator)
self._device_inbox_id_gen.advance(instance_name, token)
assert isinstance(self._to_device_msg_id_gen, MultiWriterIdGenerator)
self._to_device_msg_id_gen.advance(instance_name, token)
for row in rows:
if row.entity.startswith("@"):
self._device_inbox_stream_cache.entity_has_changed(
Expand All @@ -169,11 +169,11 @@ def process_replication_position(
self, stream_name: str, instance_name: str, token: int
) -> None:
if stream_name == ToDeviceStream.NAME:
self._device_inbox_id_gen.advance(instance_name, token)
self._to_device_msg_id_gen.advance(instance_name, token)
super().process_replication_position(stream_name, instance_name, token)

def get_to_device_stream_token(self) -> int:
return self._device_inbox_id_gen.get_current_token()
return self._to_device_msg_id_gen.get_current_token()

async def get_messages_for_user_devices(
self,
Expand Down Expand Up @@ -808,7 +808,7 @@ def add_messages_txn(
msg.get(EventContentFields.TO_DEVICE_MSGID),
)

async with self._device_inbox_id_gen.get_next() as stream_id:
async with self._to_device_msg_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
Expand All @@ -820,7 +820,7 @@ def add_messages_txn(
destination, stream_id
)

return self._device_inbox_id_gen.get_current_token()
return self._to_device_msg_id_gen.get_current_token()

async def add_messages_from_remote_to_device_inbox(
self,
Expand Down Expand Up @@ -864,7 +864,7 @@ def add_messages_txn(
txn, stream_id, local_messages_by_user_then_device
)

async with self._device_inbox_id_gen.get_next() as stream_id:
async with self._to_device_msg_id_gen.get_next() as stream_id:
now_ms = self._clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_from_remote_to_device_inbox",
Expand Down