From 38a3e0203bbe79b47a4e402e1a111742ee2dd907 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jul 2020 11:05:48 +0100 Subject: [PATCH 1/4] Fix typing replication not being handled on master Handling of incoming typing stream updates from replication was not hooked up on master, effecting set ups where typing was handled on a different worker. This is really only a problem if the master process is also handling sync requests, which is unlikely for those that are at the stage of moving typing off. The other observable effect is that if a worker restarts or a replication connect drops then the typing worker will issue a `POSITION typing`, triggering master process to try and stream *all* typing updates from position 0. Fixes #7907 --- synapse/app/generic_worker.py | 7 ------- synapse/replication/tcp/client.py | 8 ++++++++ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index c1b76d827b3c..ec0dbddb8ce3 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -87,7 +87,6 @@ ReceiptsStream, TagAccountDataStream, ToDeviceStream, - TypingStream, ) from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client.v1 import events @@ -644,7 +643,6 @@ def __init__(self, hs): super(GenericWorkerReplicationHandler, self).__init__(hs) self.store = hs.get_datastore() - self.typing_handler = hs.get_typing_handler() self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence self.notifier = hs.get_notifier() @@ -681,11 +679,6 @@ async def _process_and_notify(self, stream_name, instance_name, token, rows): await self.pusher_pool.on_new_receipts( token, token, {row.room_id for row in rows} ) - elif stream_name == TypingStream.NAME: - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows] - ) elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 4985e40b1ff4..fcf8ebf1e74f 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -24,6 +24,7 @@ from synapse.api.constants import EventTypes from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol +from synapse.replication.tcp.streams import TypingStream from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, @@ -104,6 +105,7 @@ def __init__(self, hs: "HomeServer"): self._clock = hs.get_clock() self._streams = hs.get_replication_streams() self._instance_name = hs.get_instance_name() + self._typing_handler = hs.get_typing_handler() # Map from stream to list of deferreds waiting for the stream to # arrive at a particular position. The lists are sorted by stream position. @@ -127,6 +129,12 @@ async def on_rdata( """ self.store.process_replication_rows(stream_name, instance_name, token, rows) + if stream_name == TypingStream.NAME: + self._typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows] + ) + if stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. From 06bb22e29a9789a02349ba682294f049f58ac59e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jul 2020 11:20:14 +0100 Subject: [PATCH 2/4] Newsfile --- changelog.d/7959.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7959.bugfix diff --git a/changelog.d/7959.bugfix b/changelog.d/7959.bugfix new file mode 100644 index 000000000000..241c93c47dc4 --- /dev/null +++ b/changelog.d/7959.bugfix @@ -0,0 +1 @@ +Fix typing notifications on master process when using another worker for typing stream. From c16efb5ee1dd8c8b810d7efdec5b0c083f1ef7c9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jul 2020 11:26:44 +0100 Subject: [PATCH 3/4] Fix typing --- synapse/server.pyi | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/server.pyi b/synapse/server.pyi index 90a673778f8e..1aba408c2164 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -31,6 +31,7 @@ import synapse.server_notices.server_notices_sender import synapse.state import synapse.storage from synapse.events.builder import EventBuilderFactory +from synapse.handlers.typing import FollowerTypingHandler from synapse.replication.tcp.streams import Stream class HomeServer(object): @@ -150,3 +151,5 @@ class HomeServer(object): pass def should_send_federation(self) -> bool: pass + def get_typing_handler(self) -> FollowerTypingHandler: + pass From f76184c656a21b5a574ebca5ceef66382de45e4a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Jul 2020 13:14:48 +0100 Subject: [PATCH 4/4] Update changelog.d/7959.bugfix Co-authored-by: Patrick Cloke --- changelog.d/7959.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/7959.bugfix b/changelog.d/7959.bugfix index 241c93c47dc4..1982049a52ee 100644 --- a/changelog.d/7959.bugfix +++ b/changelog.d/7959.bugfix @@ -1 +1 @@ -Fix typing notifications on master process when using another worker for typing stream. +Add experimental support for moving typing off master.