Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix typing replication not being handled on master #7959

Merged
merged 4 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7959.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for moving typing off master.
7 changes: 0 additions & 7 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
ReceiptsStream,
TagAccountDataStream,
ToDeviceStream,
TypingStream,
)
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events
Expand Down Expand Up @@ -644,7 +643,6 @@ def __init__(self, hs):
super(GenericWorkerReplicationHandler, self).__init__(hs)

self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
self.presence_handler = hs.get_presence_handler() # type: GenericWorkerPresence
self.notifier = hs.get_notifier()

Expand Down Expand Up @@ -681,11 +679,6 @@ async def _process_and_notify(self, stream_name, instance_name, token, rows):
await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows}
)
elif stream_name == TypingStream.NAME:
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows]
)
elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
Expand Down
8 changes: 8 additions & 0 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from synapse.api.constants import EventTypes
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.streams import TypingStream
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
Expand Down Expand Up @@ -104,6 +105,7 @@ def __init__(self, hs: "HomeServer"):
self._clock = hs.get_clock()
self._streams = hs.get_replication_streams()
self._instance_name = hs.get_instance_name()
self._typing_handler = hs.get_typing_handler()

# Map from stream to list of deferreds waiting for the stream to
# arrive at a particular position. The lists are sorted by stream position.
Expand All @@ -127,6 +129,12 @@ async def on_rdata(
"""
self.store.process_replication_rows(stream_name, instance_name, token, rows)

if stream_name == TypingStream.NAME:
self._typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows]
)

if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
Expand Down
3 changes: 3 additions & 0 deletions synapse/server.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import synapse.server_notices.server_notices_sender
import synapse.state
import synapse.storage
from synapse.events.builder import EventBuilderFactory
from synapse.handlers.typing import FollowerTypingHandler
from synapse.replication.tcp.streams import Stream

class HomeServer(object):
Expand Down Expand Up @@ -150,3 +151,5 @@ class HomeServer(object):
pass
def should_send_federation(self) -> bool:
pass
def get_typing_handler(self) -> FollowerTypingHandler:
pass