From 5705ecaec6b7a85c152691c79a4fa3526792a3eb Mon Sep 17 00:00:00 2001
From: Brendan Abolivier
Date: Wed, 2 Oct 2019 11:16:38 +0100
Subject: [PATCH 01/38] Don't 500 when trying to exchange a revoked 3PID invite

While this is not documented in the spec (but should be), Riot (and
other clients) revoke 3PID invites by sending an
m.room.third_party_invite event with an empty ({}) content to the
room's state.

When the invited 3PID gets associated with an MXID, the identity server
(which doesn't know about revocations) sends down to the MXID's
homeserver all of the undelivered invites it has for this 3PID. The
homeserver then tries to talk to the inviting homeserver in order to
exchange these invites for m.room.member events.

When one of the invites is revoked, the inviting homeserver responds
with a 500 error because it tries to extract a 'display_name' property
from the content, which is empty. This might cause the invited server
to consider that the inviting server is down and not try to exchange
other, valid invites (or at least delay it).

This fix handles the case of revoked invites by not trying to fetch a
'display_name' from the original invite's content, and instead letting
the m.room.member event fail the auth rules (because, since the
original invite's content is empty, it doesn't have public keys), which
results in sending a 403 with the correct error message to the invited
server.
---
 synapse/handlers/federation.py | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f72b81d4193f..a3d7739eadd8 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2599,8 +2599,19 @@ def add_display_name_to_third_party_invite(
             original_invite_id, allow_none=True
         )
         if original_invite:
-            display_name = original_invite.content["display_name"]
-            event_dict["content"]["third_party_invite"]["display_name"] = display_name
+            # If the m.room.third_party_invite event's content is empty, it means the
+            # invite has been revoked.
+            if original_invite.content:
+                display_name = original_invite.content["display_name"]
+                event_dict["content"]["third_party_invite"]["display_name"] = display_name
+            else:
+                # Don't discard or raise an error here because that's not the right place
+                # to do auth checks. The auth check will fail on this invite because we
+                # won't be able to fetch public keys from the m.room.third_party_invite
+                # event's content (because it's empty).
+                logger.info(
+                    "Found invite event for third_party_invite but it has been revoked"
+                )
         else:
             logger.info(
                 "Could not find invite event for third_party_invite: %r", event_dict

From 972c9f65d7ddf94ce024b57397d651d184cb2d26 Mon Sep 17 00:00:00 2001
From: Brendan Abolivier
Date: Wed, 2 Oct 2019 12:17:46 +0100
Subject: [PATCH 02/38] Lint

---
 synapse/handlers/federation.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a3d7739eadd8..75d79bb8e4be 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2603,7 +2603,9 @@ def add_display_name_to_third_party_invite(
             # invite has been revoked.
             if original_invite.content:
                 display_name = original_invite.content["display_name"]
-                event_dict["content"]["third_party_invite"]["display_name"] = display_name
+                event_dict["content"]["third_party_invite"][
+                    "display_name"
+                ] = display_name
             else:
                 # Don't discard or raise an error here because that's not the right place
                 # to do auth checks.
The auth check will fail on this invite because we From 24efea338d8643ee15f03b30c080a53320926ee8 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 2 Oct 2019 12:20:03 +0100 Subject: [PATCH 03/38] Changelog --- changelog.d/6147.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6147.bugfix diff --git a/changelog.d/6147.bugfix b/changelog.d/6147.bugfix new file mode 100644 index 000000000000..b0f936d28096 --- /dev/null +++ b/changelog.d/6147.bugfix @@ -0,0 +1 @@ +Don't 500 when trying to exchange a revoked 3PID invite. From 6527fa18c1e6f9bcb22318916b5e9534c91c84c1 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 2 Oct 2019 14:44:58 +0100 Subject: [PATCH 04/38] Add test case --- synapse/handlers/federation.py | 2 +- tests/handlers/test_federation.py | 83 +++++++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 tests/handlers/test_federation.py diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 75d79bb8e4be..91f3a6929873 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2570,7 +2570,7 @@ def on_exchange_third_party_invite_request(self, room_id, event_dict): ) try: - self.auth.check_from_context(room_version, event, context) + yield self.auth.check_from_context(room_version, event, context) except AuthError as e: logger.warn("Denying third party invite %r because %s", event, e) raise e diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py new file mode 100644 index 000000000000..20416a0142f1 --- /dev/null +++ b/tests/handlers/test_federation.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.constants import EventTypes +from synapse.api.errors import AuthError, Codes +from synapse.rest import admin +from synapse.rest.client.v1 import login, room + +from tests import unittest + + +class FederationTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver(http_client=None) + self.handler = hs.get_handlers().federation_handler + self.store = hs.get_datastore() + return hs + + def test_exchange_revoked_invite(self): + user_id = self.register_user("kermit", "test") + tok = self.login("kermit", "test") + + room_id = self.helper.create_room_as( + room_creator=user_id, tok=tok + ) + + # Send a 3PID invite event with an empty body so it's considered as a revoked one. 
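+        # For reference (illustrative values only, not part of the original
+        # test): a live, non-revoked m.room.third_party_invite event carries
+        # content along the lines of
+        #     {
+        #         "display_name": "a...e@example.org",
+        #         "key_validity_url": "https://is.example/_matrix/identity/api/v1/pubkey/isvalid",
+        #         "public_key": "<base64-encoded ed25519 public key>",
+        #         "public_keys": [...],
+        #     }
+        # whereas a revoked invite is just {}: there are no public keys left
+        # for the auth rules to verify the signed token against.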
+ invite_token = "sometoken" + self.helper.send_state( + room_id=room_id, + event_type=EventTypes.ThirdPartyInvite, + state_key=invite_token, + body={}, + tok=tok, + ) + + d = self.handler.on_exchange_third_party_invite_request( + room_id=room_id, + event_dict={ + "type": EventTypes.Member, + "room_id": room_id, + "sender": user_id, + "state_key": "@someone:example.org", + "content": { + "membership": "invite", + "third_party_invite": { + "display_name": "alice", + "signed": { + "mxid": "@alice:localhost", + "token": invite_token, + "signatures": { + "magic.forest": { + "ed25519:3": "fQpGIW1Snz+pwLZu6sTy2aHy/DYWWTspTJRPyNp0PKkymfIsNffysMl6ObMMFdIJhk6g6pwlIqZ54rxo8SLmAg" + } + } + } + } + } + } + ) + + failure = self.get_failure(d, AuthError).value + + self.assertEqual(failure.code, 403, failure) + self.assertEqual(failure.errcode, Codes.FORBIDDEN, failure) + self.assertEqual(failure.msg, "You are not invited to this room.") From ebcb6a30d7b1bdb859a1fd22d567b163a1488763 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 11:29:07 +0100 Subject: [PATCH 05/38] Lint --- tests/handlers/test_federation.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 20416a0142f1..a18dfc0e96a3 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -69,11 +69,11 @@ def test_exchange_revoked_invite(self): "magic.forest": { "ed25519:3": "fQpGIW1Snz+pwLZu6sTy2aHy/DYWWTspTJRPyNp0PKkymfIsNffysMl6ObMMFdIJhk6g6pwlIqZ54rxo8SLmAg" } - } - } - } - } - } + }, + }, + }, + }, + }, ) failure = self.get_failure(d, AuthError).value From 8a5e8e829b98687ea274fae47db3aa801b6f97d3 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 11:30:43 +0100 Subject: [PATCH 06/38] Lint (again) --- tests/handlers/test_federation.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index a18dfc0e96a3..d56220f403b7 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -37,9 +37,7 @@ def test_exchange_revoked_invite(self): user_id = self.register_user("kermit", "test") tok = self.login("kermit", "test") - room_id = self.helper.create_room_as( - room_creator=user_id, tok=tok - ) + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) # Send a 3PID invite event with an empty body so it's considered as a revoked one. 
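        # Note: the state_key of the m.room.third_party_invite event is the
        # invite token itself; the m.room.member event built below points back
        # at it via content.third_party_invite.signed.token, which is how the
        # auth rules pair the membership event with the original invite.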
invite_token = "sometoken" From c8145af8a9446911bbb52fc0114eebf4eebede4b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:11:04 +0100 Subject: [PATCH 07/38] Cache room membership lookups in _get_joined_users_from_context --- synapse/storage/roommember.py | 64 ++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4df8ebdacdbd..f11bbd05f8e0 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -16,6 +16,7 @@ import logging from collections import namedtuple +from typing import Iterable from six import iteritems, itervalues @@ -32,7 +33,7 @@ from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from synapse.util.stringutils import to_ascii logger = logging.getLogger(__name__) @@ -567,25 +568,10 @@ def _get_joined_users_from_context( missing_member_event_ids.append(event_id) if missing_member_event_ids: - rows = yield self._simple_select_many_batch( - table="room_memberships", - column="event_id", - iterable=missing_member_event_ids, - retcols=("user_id", "display_name", "avatar_url"), - keyvalues={"membership": Membership.JOIN}, - batch_size=500, - desc="_get_joined_users_from_context", - ) - - users_in_room.update( - { - to_ascii(row["user_id"]): ProfileInfo( - avatar_url=to_ascii(row["avatar_url"]), - display_name=to_ascii(row["display_name"]), - ) - for row in rows - } + event_to_memberships = yield self._get_membership_from_event_ids( + missing_member_event_ids ) + users_in_room.update((row for row in event_to_memberships.values() if row)) if event is not None and event.type == EventTypes.Member: if event.membership == Membership.JOIN: @@ -597,6 +583,46 @@ def _get_joined_users_from_context( return users_in_room + @cached(max_entries=10000) + def _get_membership_from_event_id(self, event_id): + raise NotADirectoryError() + + @cachedList( + cached_method_name="_get_membership_from_event_id", + list_name="event_ids", + inlineCallbacks=True, + ) + def _get_membership_from_event_ids(self, event_ids: Iterable[str]): + """Lookup profile info for set of member event IDs. + + Args: + event_ids: The member event IDs to lookup + + Returns: + Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID + to `user_id` and ProfileInfo (or None if couldn't find event). 
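+
+        Note: `_get_membership_from_event_id` above is a stub that exists only
+        to declare the per-event cache; `@cachedList` serves each batch from
+        that cache where it can and only queries the database for the missing
+        event IDs, so the stub itself is never actually invoked (hence its
+        unconditional raise).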
+ """ + + rows = yield self._simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=event_ids, + retcols=("user_id", "display_name", "avatar_url"), + keyvalues={"membership": Membership.JOIN}, + batch_size=500, + desc="_get_membership_from_event_ids", + ) + + return { + row["event_id"]: ( + row["user_id"], + ProfileInfo( + avatar_url=row["avatar_url"], display_name=row["display_name"] + ), + ) + for row in rows + } + @cachedInlineCallbacks(max_entries=10000) def is_host_joined(self, room_id, host): if "%" in host or "_" in host: From 0ccf0ffc855f8d12f16598af77f356f236096994 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:12:24 +0100 Subject: [PATCH 08/38] Newsfile --- changelog.d/6159.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6159.misc diff --git a/changelog.d/6159.misc b/changelog.d/6159.misc new file mode 100644 index 000000000000..06cc163f8bea --- /dev/null +++ b/changelog.d/6159.misc @@ -0,0 +1 @@ +Add more caching to `_get_joined_users_from_context` DB query. From d89ebf7c25b4f55d513da41a3ea20b9f8adc62d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:23:11 +0100 Subject: [PATCH 09/38] cachedList descriptor doesn't like typing --- synapse/storage/roommember.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index f11bbd05f8e0..ef6179cbe51c 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -592,11 +592,11 @@ def _get_membership_from_event_id(self, event_id): list_name="event_ids", inlineCallbacks=True, ) - def _get_membership_from_event_ids(self, event_ids: Iterable[str]): + def _get_membership_from_event_ids(self, event_ids): """Lookup profile info for set of member event IDs. Args: - event_ids: The member event IDs to lookup + event_ids (Iterable[str]): The member event IDs to lookup Returns: Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID From a9610cdf02e04ecd8df52ebe74b1cb1338a5be97 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:26:56 +0100 Subject: [PATCH 10/38] Fixup names and comments --- synapse/storage/roommember.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ef6179cbe51c..37c0723eb64e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -568,7 +568,7 @@ def _get_joined_users_from_context( missing_member_event_ids.append(event_id) if missing_member_event_ids: - event_to_memberships = yield self._get_membership_from_event_ids( + event_to_memberships = yield self._get_joined_profiles_from_event_ids( missing_member_event_ids ) users_in_room.update((row for row in event_to_memberships.values() if row)) @@ -584,23 +584,24 @@ def _get_joined_users_from_context( return users_in_room @cached(max_entries=10000) - def _get_membership_from_event_id(self, event_id): + def _get_joined_profile_from_event_id(self, event_id): raise NotADirectoryError() @cachedList( - cached_method_name="_get_membership_from_event_id", + cached_method_name="_get_joined_profile_from_event_id", list_name="event_ids", inlineCallbacks=True, ) - def _get_membership_from_event_ids(self, event_ids): - """Lookup profile info for set of member event IDs. 
+ def _get_joined_profiles_from_event_ids(self, event_ids): + """For given set of member event_ids check if they point to a join + event and if so return the associated user and profile info. Args: event_ids (Iterable[str]): The member event IDs to lookup Returns: Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID - to `user_id` and ProfileInfo (or None if couldn't find event). + to `user_id` and ProfileInfo (or None if not join event). """ rows = yield self._simple_select_many_batch( From 84691da6c3058f265f8b86d9a6592ba8ce90e2ed Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:27:18 +0100 Subject: [PATCH 11/38] pep8 --- synapse/storage/roommember.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 37c0723eb64e..ed7d936b3df8 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -16,7 +16,6 @@ import logging from collections import namedtuple -from typing import Iterable from six import iteritems, itervalues From 91f61fc6d74e923822eb92933e0d028848285a40 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Oct 2019 17:28:31 +0100 Subject: [PATCH 12/38] Use the right error.... --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ed7d936b3df8..1160b98ccc06 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -584,7 +584,7 @@ def _get_joined_users_from_context( @cached(max_entries=10000) def _get_joined_profile_from_event_id(self, event_id): - raise NotADirectoryError() + raise NotImplementedError() @cachedList( cached_method_name="_get_joined_profile_from_event_id", From 66537e10ce77e47fac52e3f27569ac1ef0f1aaa3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 3 Oct 2019 17:47:20 +0100 Subject: [PATCH 13/38] add some metrics on the federation sender (#6160) --- changelog.d/6160.misc | 1 + synapse/federation/sender/__init__.py | 11 ++++++----- synapse/state/__init__.py | 24 ++++++++++++++++++------ synapse/storage/roommember.py | 21 +++++++++++++++------ synapse/util/metrics.py | 6 ++++-- 5 files changed, 44 insertions(+), 19 deletions(-) create mode 100644 changelog.d/6160.misc diff --git a/changelog.d/6160.misc b/changelog.d/6160.misc new file mode 100644 index 000000000000..3d7cce00e1a2 --- /dev/null +++ b/changelog.d/6160.misc @@ -0,0 +1 @@ +Add some metrics on the federation sender. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index d46f4aaeb1a3..2b2ee8612adc 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -38,7 +38,7 @@ events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.metrics import measure_func +from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -183,8 +183,8 @@ def handle_event(event): # Otherwise if the last member on a server in a room is # banned then it won't receive the event because it won't # be in the room after the ban. 
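                    # Concretely: if this event bans the last user from another
                    # server, that server is still joined at the event's
                    # prev_events, so resolving the hosts there (rather than at
                    # the event itself) means it still receives the ban that
                    # removes it.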
- destinations = yield self.state.get_current_hosts_in_room( - event.room_id, latest_event_ids=event.prev_event_ids() + destinations = yield self.state.get_hosts_in_room_at_events( + event.room_id, event_ids=event.prev_event_ids() ) except Exception: logger.exception( @@ -207,8 +207,9 @@ def handle_event(event): @defer.inlineCallbacks def handle_room_events(events): - for event in events: - yield handle_event(event) + with Measure(self.clock, "handle_room_events"): + for event in events: + yield handle_event(event) events_by_room = {} for event in events: diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 2b0f4c79eef6..dc9f5a900841 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -33,7 +33,7 @@ from synapse.util.async_helpers import Linearizer from synapse.util.caches import get_cache_factor_for from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.metrics import Measure +from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -191,11 +191,22 @@ def get_current_users_in_room(self, room_id, latest_event_ids=None): return joined_users @defer.inlineCallbacks - def get_current_hosts_in_room(self, room_id, latest_event_ids=None): - if not latest_event_ids: - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) - logger.debug("calling resolve_state_groups from get_current_hosts_in_room") - entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids) + def get_current_hosts_in_room(self, room_id): + event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + return (yield self.get_hosts_in_room_at_events(room_id, event_ids)) + + @defer.inlineCallbacks + def get_hosts_in_room_at_events(self, room_id, event_ids): + """Get the hosts that were in a room at the given event ids + + Args: + room_id (str): + event_ids (list[str]): + + Returns: + Deferred[list[str]]: the hosts in the room at the given events + """ + entry = yield self.resolve_state_groups_for_events(room_id, event_ids) joined_hosts = yield self.store.get_joined_hosts(room_id, entry) return joined_hosts @@ -344,6 +355,7 @@ def compute_event_context(self, event, old_state=None): return context + @measure_func() @defer.inlineCallbacks def resolve_state_groups_for_events(self, room_id, event_ids): """ Given a list of event_ids this method fetches the state at each diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4df8ebdacdbd..1550d827ba1e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -33,6 +33,7 @@ from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.metrics import Measure from synapse.util.stringutils import to_ascii logger = logging.getLogger(__name__) @@ -483,6 +484,7 @@ def get_joined_users_from_context(self, event, context): ) return result + @defer.inlineCallbacks def get_joined_users_from_state(self, room_id, state_entry): state_group = state_entry.state_group if not state_group: @@ -492,9 +494,12 @@ def get_joined_users_from_state(self, room_id, state_entry): # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._get_joined_users_from_context( - room_id, state_group, state_entry.state, context=state_entry - ) + with Measure(self._clock, "get_joined_users_from_state"): + return ( + yield 
self._get_joined_users_from_context( + room_id, state_group, state_entry.state, context=state_entry + ) + ) @cachedInlineCallbacks( num_args=2, cache_context=True, iterable=True, max_entries=100000 @@ -669,6 +674,7 @@ def was_host_joined(self, room_id, host): return True + @defer.inlineCallbacks def get_joined_hosts(self, room_id, state_entry): state_group = state_entry.state_group if not state_group: @@ -678,9 +684,12 @@ def get_joined_hosts(self, room_id, state_entry): # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._get_joined_hosts( - room_id, state_group, state_entry.state, state_entry=state_entry - ) + with Measure(self._clock, "get_joined_hosts"): + return ( + yield self._get_joined_hosts( + room_id, state_group, state_entry.state, state_entry=state_entry + ) + ) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) # @defer.inlineCallbacks diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 0910930c2117..4b1bcdf23c78 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -60,12 +60,14 @@ ) -def measure_func(name): +def measure_func(name=None): def wrapper(func): + block_name = func.__name__ if name is None else name + @wraps(func) @defer.inlineCallbacks def measured_func(self, *args, **kwargs): - with Measure(self.clock, name): + with Measure(self.clock, block_name): r = yield func(self, *args, **kwargs) return r From 39b40d6d9989b09de39da5b6d3f85ee535e41138 Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Fri, 4 Oct 2019 10:34:52 +0200 Subject: [PATCH 14/38] media/thumbnailer: Better quality for 1-bit / 8-bit color palette images (#2142) Pillow will use nearest neighbour as the resampling algorithm if the source image is either 1-bit or a color palette using 8 bits. If we convert to RGB before scaling, we'll probably get a better result. --- changelog.d/2142.feature | 1 + synapse/rest/media/v1/thumbnailer.py | 14 +++++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) create mode 100644 changelog.d/2142.feature diff --git a/changelog.d/2142.feature b/changelog.d/2142.feature new file mode 100644 index 000000000000..e21e8325e1f9 --- /dev/null +++ b/changelog.d/2142.feature @@ -0,0 +1 @@ +Improve quality of thumbnails for 1-bit/8-bit color palette images. diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py index c995d7e04377..8cf415e29dee 100644 --- a/synapse/rest/media/v1/thumbnailer.py +++ b/synapse/rest/media/v1/thumbnailer.py @@ -82,13 +82,21 @@ def aspect(self, max_width, max_height): else: return (max_height * self.width) // self.height, max_height + def _resize(self, width, height): + # 1-bit or 8-bit color palette images need converting to RGB + # otherwise they will be scaled using nearest neighbour which + # looks awful + if self.image.mode in ["1", "P"]: + self.image = self.image.convert("RGB") + return self.image.resize((width, height), Image.ANTIALIAS) + def scale(self, width, height, output_type): """Rescales the image to the given dimensions. 
Returns: BytesIO: the bytes of the encoded image ready to be written to disk """ - scaled = self.image.resize((width, height), Image.ANTIALIAS) + scaled = self._resize(width, height) return self._encode_image(scaled, output_type) def crop(self, width, height, output_type): @@ -107,13 +115,13 @@ def crop(self, width, height, output_type): """ if width * self.height > height * self.width: scaled_height = (width * self.height) // self.width - scaled_image = self.image.resize((width, scaled_height), Image.ANTIALIAS) + scaled_image = self._resize(width, scaled_height) crop_top = (scaled_height - height) // 2 crop_bottom = height + crop_top cropped = scaled_image.crop((0, crop_top, width, crop_bottom)) else: scaled_width = (height * self.width) // self.height - scaled_image = self.image.resize((scaled_width, height), Image.ANTIALIAS) + scaled_image = self._resize(scaled_width, height) crop_left = (scaled_width - width) // 2 crop_right = width + crop_left cropped = scaled_image.crop((crop_left, 0, crop_right, height)) From 13c4345c844e75b0d1a4ce66e4fb2eb9820cb7f6 Mon Sep 17 00:00:00 2001 From: Alexander Maznev Date: Fri, 4 Oct 2019 04:34:16 -0500 Subject: [PATCH 15/38] Update `user_filters` table to have a unique index, and non-null columns (#1172) --- changelog.d/1172.misc | 1 + .../delta/56/unique_user_filter_index.py | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 changelog.d/1172.misc create mode 100644 synapse/storage/schema/delta/56/unique_user_filter_index.py diff --git a/changelog.d/1172.misc b/changelog.d/1172.misc new file mode 100644 index 000000000000..30b3e560825b --- /dev/null +++ b/changelog.d/1172.misc @@ -0,0 +1 @@ +Update `user_filters` table to have a unique index, and non-null columns. Thanks to @pik for contributing this. 
\ No newline at end of file diff --git a/synapse/storage/schema/delta/56/unique_user_filter_index.py b/synapse/storage/schema/delta/56/unique_user_filter_index.py new file mode 100644 index 000000000000..4efc1a586ffe --- /dev/null +++ b/synapse/storage/schema/delta/56/unique_user_filter_index.py @@ -0,0 +1,46 @@ +import logging + +from synapse.storage.engines import PostgresEngine + +logger = logging.getLogger(__name__) + + +def run_upgrade(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + select_clause = """ + CREATE TEMPORARY TABLE user_filters_migration AS + SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json + FROM user_filters; + """ + else: + select_clause = """ + CREATE TEMPORARY TABLE user_filters_migration AS + SELECT * FROM user_filters GROUP BY user_id, filter_id; + """ + sql = ( + """ + BEGIN; + %s + DROP INDEX user_filters_by_user_id_filter_id; + DELETE FROM user_filters; + ALTER TABLE user_filters + ALTER COLUMN user_id SET NOT NULL + ALTER COLUMN filter_id SET NOT NULL + ALTER COLUMN filter_json SET NOT NULL; + INSERT INTO user_filters(user_id, filter_id, filter_json) + SELECT * FROM user_filters_migration; + DROP TABLE user_filters_migration; + CREATE UNIQUE INDEX user_filters_by_user_id_filter_id_unique + ON user_filters(user_id, filter_id); + END; + """ + % select_clause + ) + if isinstance(database_engine, PostgresEngine): + cur.execute(sql) + else: + cur.executescript(sql) + + +def run_create(cur, database_engine, *args, **kwargs): + pass From 81d51ce48b449ee55bdcc17b76050283d848d405 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 4 Oct 2019 11:16:19 +0100 Subject: [PATCH 16/38] Incorporate review --- synapse/handlers/federation.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 91f3a6929873..58a165488500 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2600,20 +2600,14 @@ def add_display_name_to_third_party_invite( ) if original_invite: # If the m.room.third_party_invite event's content is empty, it means the - # invite has been revoked. - if original_invite.content: - display_name = original_invite.content["display_name"] - event_dict["content"]["third_party_invite"][ - "display_name" - ] = display_name - else: - # Don't discard or raise an error here because that's not the right place - # to do auth checks. The auth check will fail on this invite because we - # won't be able to fetch public keys from the m.room.third_party_invite - # event's content (because it's empty). - logger.info( - "Found invite event for third_party_invite but it has been revoked" - ) + # invite has been revoked. In this case, we don't have to raise an error here + # because the auth check will fail on the invite (because it's not able to + # fetch public keys from the m.room.third_party_invite event's content, which + # is empty. 
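+            # Using .get() rather than ["display_name"] means a revoked
+            # (empty-content) invite produces display_name = None instead of
+            # raising a KeyError, so the request reaches the auth check and is
+            # rejected with a 403 rather than crashing with a 500.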
+ display_name = original_invite.content.get("display_name") + event_dict["content"]["third_party_invite"][ + "display_name" + ] = display_name else: logger.info( "Could not find invite event for third_party_invite: %r", event_dict From 4676732ca061ede3089b9c9b97a6d6b523b8c8e0 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 4 Oct 2019 11:18:28 +0100 Subject: [PATCH 17/38] Lint --- synapse/handlers/federation.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 58a165488500..b2b3a7e22170 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2605,9 +2605,7 @@ def add_display_name_to_third_party_invite( # fetch public keys from the m.room.third_party_invite event's content, which # is empty. display_name = original_invite.content.get("display_name") - event_dict["content"]["third_party_invite"][ - "display_name" - ] = display_name + event_dict["content"]["third_party_invite"]["display_name"] = display_name else: logger.info( "Could not find invite event for third_party_invite: %r", event_dict From 21d51ab59852d6a4d504a6ccd79ad82070c03a12 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 4 Oct 2019 11:21:24 +0100 Subject: [PATCH 18/38] Typo --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b2b3a7e22170..50fc0fde2af5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2603,7 +2603,7 @@ def add_display_name_to_third_party_invite( # invite has been revoked. In this case, we don't have to raise an error here # because the auth check will fail on the invite (because it's not able to # fetch public keys from the m.room.third_party_invite event's content, which - # is empty. + # is empty). 
display_name = original_invite.content.get("display_name") event_dict["content"]["third_party_invite"]["display_name"] = display_name else: From 5119a4cac7f42691beb455eedbefd99a39d64897 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 7 Oct 2019 12:21:17 +0100 Subject: [PATCH 19/38] Fix bug where we didn't pull out event ID --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 81a9ab6dc8dd..59a89fad60f6 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -612,7 +612,7 @@ def _get_joined_profiles_from_event_ids(self, event_ids): table="room_memberships", column="event_id", iterable=event_ids, - retcols=("user_id", "display_name", "avatar_url"), + retcols=("user_id", "display_name", "avatar_url", "event_id"), keyvalues={"membership": Membership.JOIN}, batch_size=500, desc="_get_membership_from_event_ids", From c8e6c308c6358f20b1d0ef1292f8e9e2f36a549e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 7 Oct 2019 13:15:35 +0100 Subject: [PATCH 20/38] Fix unique_user_filter_index schema update --- synapse/storage/schema/delta/56/unique_user_filter_index.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/schema/delta/56/unique_user_filter_index.py b/synapse/storage/schema/delta/56/unique_user_filter_index.py index 4efc1a586ffe..60031f23ca6d 100644 --- a/synapse/storage/schema/delta/56/unique_user_filter_index.py +++ b/synapse/storage/schema/delta/56/unique_user_filter_index.py @@ -24,8 +24,8 @@ def run_upgrade(cur, database_engine, *args, **kwargs): DROP INDEX user_filters_by_user_id_filter_id; DELETE FROM user_filters; ALTER TABLE user_filters - ALTER COLUMN user_id SET NOT NULL - ALTER COLUMN filter_id SET NOT NULL + ALTER COLUMN user_id SET NOT NULL, + ALTER COLUMN filter_id SET NOT NULL, ALTER COLUMN filter_json SET NOT NULL; INSERT INTO user_filters(user_id, filter_id, filter_json) SELECT * FROM user_filters_migration; From aa7a003074e4e42c4ac8a571d2cd18ecfea3990f Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 7 Oct 2019 13:16:54 +0100 Subject: [PATCH 21/38] Changelog --- changelog.d/6175.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6175.bugfix diff --git a/changelog.d/6175.bugfix b/changelog.d/6175.bugfix new file mode 100644 index 000000000000..3cd9a99edf95 --- /dev/null +++ b/changelog.d/6175.bugfix @@ -0,0 +1 @@ +Fix syntax error in unique_user_filter_index schema update. From 276ae5c63eaef656d486e190298f7a5ec99a7a5b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 7 Oct 2019 14:41:39 +0100 Subject: [PATCH 22/38] add some logging to the rooms stats updates, to try to track down a flaky test (#6167) --- changelog.d/6167.misc | 1 + synapse/handlers/stats.py | 1 + synapse/storage/stats.py | 3 +++ 3 files changed, 5 insertions(+) create mode 100644 changelog.d/6167.misc diff --git a/changelog.d/6167.misc b/changelog.d/6167.misc new file mode 100644 index 000000000000..32c96b36819c --- /dev/null +++ b/changelog.d/6167.misc @@ -0,0 +1 @@ +Add some logging to the rooms stats updates, to try to track down a flaky test. 
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index cbac7c347aab..c62b113115f5 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -293,6 +293,7 @@ def _handle_deltas(self, deltas): room_state["guest_access"] = event_content.get("guest_access") for room_id, state in room_to_state_updates.items(): + logger.info("Updating room_stats_state for %s: %s", room_id, state) yield self.store.update_room_state(room_id, state) return room_to_stats_deltas, user_to_stats_deltas diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 09190d684e96..7c224cd3d9d8 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -332,6 +332,9 @@ def bulk_update_stats_delta(self, ts, updates, stream_id): def _bulk_update_stats_delta_txn(txn): for stats_type, stats_updates in updates.items(): for stats_id, fields in stats_updates.items(): + logger.info( + "Updating %s stats for %s: %s", stats_type, stats_id, fields + ) self._update_stats_delta_txn( txn, ts=ts, From 1992f21a9fa00a37963bb6ac11d0e678cc08557e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 7 Oct 2019 14:54:36 +0100 Subject: [PATCH 23/38] Fix changelog for PR #6175 --- changelog.d/6175.bugfix | 1 - changelog.d/6175.misc | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 changelog.d/6175.bugfix create mode 100644 changelog.d/6175.misc diff --git a/changelog.d/6175.bugfix b/changelog.d/6175.bugfix deleted file mode 100644 index 3cd9a99edf95..000000000000 --- a/changelog.d/6175.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix syntax error in unique_user_filter_index schema update. diff --git a/changelog.d/6175.misc b/changelog.d/6175.misc new file mode 100644 index 000000000000..5bb24f02fc68 --- /dev/null +++ b/changelog.d/6175.misc @@ -0,0 +1 @@ +Update `user_filters` table to have a unique index, and non-null columns. 
Thanks to @pik for contributing this From 88957199e7edbd33a66d419224df9ba0eb9e604d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:16:39 +0100 Subject: [PATCH 24/38] Move client_ips's bg updates to a dedicated store --- synapse/storage/client_ips.py | 200 ++++++++++++++++++---------------- 1 file changed, 106 insertions(+), 94 deletions(-) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index bb135166ced0..1d89b50f5705 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -33,14 +33,14 @@ LAST_SEEN_GRANULARITY = 120 * 1000 -class ClientIpStore(background_updates.BackgroundUpdateStore): +class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore): def __init__(self, db_conn, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR ) - super(ClientIpStore, self).__init__(db_conn, hs) + super(ClientIpBackgroundUpdateStore, self).__init__(db_conn, hs) self.user_ips_max_age = hs.config.user_ips_max_age @@ -92,19 +92,6 @@ def __init__(self, db_conn, hs): "devices_last_seen", self._devices_last_seen_update ) - # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen) - self._batch_row_update = {} - - self._client_ip_looper = self._clock.looping_call( - self._update_client_ips_batch, 5 * 1000 - ) - self.hs.get_reactor().addSystemEventTrigger( - "before", "shutdown", self._update_client_ips_batch - ) - - if self.user_ips_max_age: - self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) - @defer.inlineCallbacks def _remove_user_ip_nonunique(self, progress, batch_size): def f(conn): @@ -303,6 +290,110 @@ def remove(txn): return batch_size + @defer.inlineCallbacks + def _devices_last_seen_update(self, progress, batch_size): + """Background update to insert last seen info into devices table + """ + + last_user_id = progress.get("last_user_id", "") + last_device_id = progress.get("last_device_id", "") + + def _devices_last_seen_update_txn(txn): + # This consists of two queries: + # + # 1. The sub-query searches for the next N devices and joins + # against user_ips to find the max last_seen associated with + # that device. + # 2. The outer query then joins again against user_ips on + # user/device/last_seen. This *should* hopefully only + # return one row, but if it does return more than one then + # we'll just end up updating the same device row multiple + # times, which is fine. + + if self.database_engine.supports_tuple_comparison: + where_clause = "(user_id, device_id) > (?, ?)" + where_args = [last_user_id, last_device_id] + else: + # We explicitly do a `user_id >= ? AND (...)` here to ensure + # that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)` + # makes it hard for query optimiser to tell that it can use the + # index on user_id + where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)" + where_args = [last_user_id, last_user_id, last_device_id] + + sql = """ + SELECT + last_seen, ip, user_agent, user_id, device_id + FROM ( + SELECT + user_id, device_id, MAX(u.last_seen) AS last_seen + FROM devices + INNER JOIN user_ips AS u USING (user_id, device_id) + WHERE %(where_clause)s + GROUP BY user_id, device_id + ORDER BY user_id ASC, device_id ASC + LIMIT ? 
+ ) c + INNER JOIN user_ips AS u USING (user_id, device_id, last_seen) + """ % { + "where_clause": where_clause + } + txn.execute(sql, where_args + [batch_size]) + + rows = txn.fetchall() + if not rows: + return 0 + + sql = """ + UPDATE devices + SET last_seen = ?, ip = ?, user_agent = ? + WHERE user_id = ? AND device_id = ? + """ + txn.execute_batch(sql, rows) + + _, _, _, user_id, device_id = rows[-1] + self._background_update_progress_txn( + txn, + "devices_last_seen", + {"last_user_id": user_id, "last_device_id": device_id}, + ) + + return len(rows) + + updated = yield self.runInteraction( + "_devices_last_seen_update", _devices_last_seen_update_txn + ) + + if not updated: + yield self._end_background_update("devices_last_seen") + + return updated + + +class ClientIpStore(ClientIpBackgroundUpdateStore): + def __init__(self, db_conn, hs): + + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR + ) + + super(ClientIpStore, self).__init__(db_conn, hs) + + self.user_ips_max_age = hs.config.user_ips_max_age + + # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen) + self._batch_row_update = {} + + self._client_ip_looper = self._clock.looping_call( + self._update_client_ips_batch, 5 * 1000 + ) + self.hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", self._update_client_ips_batch + ) + + if self.user_ips_max_age: + self._clock.looping_call(self._prune_old_user_ips, 5 * 1000) + @defer.inlineCallbacks def insert_client_ip( self, user_id, access_token, ip, user_agent, device_id, now=None @@ -454,85 +545,6 @@ def get_user_ip_and_agents(self, user): for (access_token, ip), (user_agent, last_seen) in iteritems(results) ) - @defer.inlineCallbacks - def _devices_last_seen_update(self, progress, batch_size): - """Background update to insert last seen info into devices table - """ - - last_user_id = progress.get("last_user_id", "") - last_device_id = progress.get("last_device_id", "") - - def _devices_last_seen_update_txn(txn): - # This consists of two queries: - # - # 1. The sub-query searches for the next N devices and joins - # against user_ips to find the max last_seen associated with - # that device. - # 2. The outer query then joins again against user_ips on - # user/device/last_seen. This *should* hopefully only - # return one row, but if it does return more than one then - # we'll just end up updating the same device row multiple - # times, which is fine. - - if self.database_engine.supports_tuple_comparison: - where_clause = "(user_id, device_id) > (?, ?)" - where_args = [last_user_id, last_device_id] - else: - # We explicitly do a `user_id >= ? AND (...)` here to ensure - # that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)` - # makes it hard for query optimiser to tell that it can use the - # index on user_id - where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)" - where_args = [last_user_id, last_user_id, last_device_id] - - sql = """ - SELECT - last_seen, ip, user_agent, user_id, device_id - FROM ( - SELECT - user_id, device_id, MAX(u.last_seen) AS last_seen - FROM devices - INNER JOIN user_ips AS u USING (user_id, device_id) - WHERE %(where_clause)s - GROUP BY user_id, device_id - ORDER BY user_id ASC, device_id ASC - LIMIT ? 
- ) c - INNER JOIN user_ips AS u USING (user_id, device_id, last_seen) - """ % { - "where_clause": where_clause - } - txn.execute(sql, where_args + [batch_size]) - - rows = txn.fetchall() - if not rows: - return 0 - - sql = """ - UPDATE devices - SET last_seen = ?, ip = ?, user_agent = ? - WHERE user_id = ? AND device_id = ? - """ - txn.execute_batch(sql, rows) - - _, _, _, user_id, device_id = rows[-1] - self._background_update_progress_txn( - txn, - "devices_last_seen", - {"last_user_id": user_id, "last_device_id": device_id}, - ) - - return len(rows) - - updated = yield self.runInteraction( - "_devices_last_seen_update", _devices_last_seen_update_txn - ) - - if not updated: - yield self._end_background_update("devices_last_seen") - - return updated - @wrap_as_background_process("prune_old_user_ips") async def _prune_old_user_ips(self): """Removes entries in user IPs older than the configured period. From 2d3b4f42f029da54132b26119e9ec0d902505eef Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:19:25 +0100 Subject: [PATCH 25/38] Move deviceinbox's bg updates to a dedicated store --- synapse/storage/deviceinbox.py | 37 ++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 6b7458304e69..70bc2bb2cce9 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -208,11 +208,11 @@ def delete_messages_for_remote_destination_txn(txn): ) -class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore): +class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" def __init__(self, db_conn, hs): - super(DeviceInboxStore, self).__init__(db_conn, hs) + super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_index_update( "device_inbox_stream_index", @@ -225,6 +225,26 @@ def __init__(self, db_conn, hs): self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) + @defer.inlineCallbacks + def _background_drop_index_device_inbox(self, progress, batch_size): + def reindex_txn(conn): + txn = conn.cursor() + txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") + txn.close() + + yield self.runWithConnection(reindex_txn) + + yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID) + + return 1 + + +class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): + DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" + + def __init__(self, db_conn, hs): + super(DeviceInboxStore, self).__init__(db_conn, hs) + # Map of (user_id, device_id) to the last stream_id that has been # deleted up to. This is so that we can no op deletions. 
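        # e.g. once everything up to stream_id 500 has been deleted for a
        # (user_id, device_id), a later request to delete up to 400 can be
        # short-circuited without touching the database.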
self._last_device_delete_cache = ExpiringCache( @@ -435,16 +455,3 @@ def get_all_new_device_messages_txn(txn): return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn ) - - @defer.inlineCallbacks - def _background_drop_index_device_inbox(self, progress, batch_size): - def reindex_txn(conn): - txn = conn.cursor() - txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id") - txn.close() - - yield self.runWithConnection(reindex_txn) - - yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID) - - return 1 From cef9f6753e51c1fb7b24f2122b0d0ada767d6e08 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:24:03 +0100 Subject: [PATCH 26/38] Move devices's bg updates to a dedicated store --- synapse/storage/devices.py | 49 +++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 79a58df59164..111bfb3d6490 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -512,17 +512,9 @@ def get_device_list_last_stream_id_for_remotes(self, user_ids): return results -class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore): +class DeviceBackgroundUpdateStore(BackgroundUpdateStore): def __init__(self, db_conn, hs): - super(DeviceStore, self).__init__(db_conn, hs) - - # Map of (user_id, device_id) -> bool. If there is an entry that implies - # the device exists. - self.device_id_exists_cache = Cache( - name="device_id_exists", keylen=2, max_entries=10000 - ) - - self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) + super(DeviceBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_index_update( "device_lists_stream_idx", @@ -555,6 +547,31 @@ def __init__(self, db_conn, hs): self._drop_device_list_streams_non_unique_indexes, ) + @defer.inlineCallbacks + def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): + def f(conn): + txn = conn.cursor() + txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id") + txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id") + txn.close() + + yield self.runWithConnection(f) + yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES) + return 1 + + +class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): + def __init__(self, db_conn, hs): + super(DeviceStore, self).__init__(db_conn, hs) + + # Map of (user_id, device_id) -> bool. If there is an entry that implies + # the device exists. 
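+        # (A cache miss only means "unknown", not "device absent"; the caller
+        # presumably falls back to the database and fills the cache in.)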
+ self.device_id_exists_cache = Cache( + name="device_id_exists", keylen=2, max_entries=10000 + ) + + self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000) + @defer.inlineCallbacks def store_device(self, user_id, device_id, initial_device_display_name): """Ensure the given device is known; add it to the store if not @@ -910,15 +927,3 @@ def _prune_txn(txn): "_prune_old_outbound_device_pokes", _prune_txn, ) - - @defer.inlineCallbacks - def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): - def f(conn): - txn = conn.cursor() - txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id") - txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id") - txn.close() - - yield self.runWithConnection(f) - yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES) - return 1 From 54f87e07342ddbf82e9b8ee7c7ce227624c2f97b Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:30:22 +0100 Subject: [PATCH 27/38] Move media_repository's bg updates to a dedicated store --- synapse/storage/media_repository.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 6b1238ce4ae2..2eb2740d07c7 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -15,11 +15,10 @@ from synapse.storage.background_updates import BackgroundUpdateStore -class MediaRepositoryStore(BackgroundUpdateStore): - """Persistence for attachments and avatars""" +class MediaRepositoryBackgroundUpdateStore(BackgroundUpdateStore): def __init__(self, db_conn, hs): - super(MediaRepositoryStore, self).__init__(db_conn, hs) + super(MediaRepositoryBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_index_update( update_name="local_media_repository_url_idx", @@ -29,6 +28,13 @@ def __init__(self, db_conn, hs): where_clause="url_cache IS NOT NULL", ) + +class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): + """Persistence for attachments and avatars""" + + def __init__(self, db_conn, hs): + super(MediaRepositoryStore, self).__init__(db_conn, hs) + def get_local_media(self, media_id): """Get the metadata for a local piece of media Returns: From 81e6ffb536b19c662a81736f88ef2f243d425532 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:44:26 +0100 Subject: [PATCH 28/38] Move registration's bg updates to a dedicated store --- synapse/storage/registration.py | 198 +++++++++++++++++--------------- 1 file changed, 103 insertions(+), 95 deletions(-) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 1a859352b649..1f6c93b73b76 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -37,7 +37,57 @@ logger = logging.getLogger(__name__) -class RegistrationWorkerStore(SQLBaseStore): +class RegistrationDeactivationStore(SQLBaseStore): + @cachedInlineCallbacks() + def get_user_deactivated_status(self, user_id): + """Retrieve the value for the `deactivated` property for the provided user. + + Args: + user_id (str): The ID of the user to retrieve the status for. + + Returns: + defer.Deferred(bool): The requested value. + """ + + res = yield self._simple_select_one_onecol( + table="users", + keyvalues={"name": user_id}, + retcol="deactivated", + desc="get_user_deactivated_status", + ) + + # Convert the integer into a boolean. 
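+        # (The users.deactivated column is written as 0/1 by
+        # set_user_deactivated_status_txn below, hence the integer comparison.)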
+ return res == 1 + + @defer.inlineCallbacks + def set_user_deactivated_status(self, user_id, deactivated): + """Set the `deactivated` property for the provided user to the provided value. + + Args: + user_id (str): The ID of the user to set the status for. + deactivated (bool): The value to set for `deactivated`. + """ + + yield self.runInteraction( + "set_user_deactivated_status", + self.set_user_deactivated_status_txn, + user_id, + deactivated, + ) + + def set_user_deactivated_status_txn(self, txn, user_id, deactivated): + self._simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"deactivated": 1 if deactivated else 0}, + ) + self._invalidate_cache_and_stream( + txn, self.get_user_deactivated_status, (user_id,) + ) + + +class RegistrationWorkerStore(RegistrationDeactivationStore): def __init__(self, db_conn, hs): super(RegistrationWorkerStore, self).__init__(db_conn, hs) @@ -673,27 +723,6 @@ def get_id_servers_user_bound(self, user_id, medium, address): desc="get_id_servers_user_bound", ) - @cachedInlineCallbacks() - def get_user_deactivated_status(self, user_id): - """Retrieve the value for the `deactivated` property for the provided user. - - Args: - user_id (str): The ID of the user to retrieve the status for. - - Returns: - defer.Deferred(bool): The requested value. - """ - - res = yield self._simple_select_one_onecol( - table="users", - keyvalues={"name": user_id}, - retcol="deactivated", - desc="get_user_deactivated_status", - ) - - # Convert the integer into a boolean. - return res == 1 - def get_threepid_validation_session( self, medium, client_secret, address=None, sid=None, validated=True ): @@ -787,13 +816,14 @@ def delete_threepid_session_txn(txn): ) -class RegistrationStore( - RegistrationWorkerStore, background_updates.BackgroundUpdateStore +class RegistrationBackgroundUpdateStore( + RegistrationDeactivationStore, background_updates.BackgroundUpdateStore ): def __init__(self, db_conn, hs): - super(RegistrationStore, self).__init__(db_conn, hs) + super(RegistrationBackgroundUpdateStore, self).__init__(db_conn, hs) self.clock = hs.get_clock() + self.config = hs.config self.register_background_index_update( "access_tokens_device_index", @@ -809,8 +839,6 @@ def __init__(self, db_conn, hs): columns=["creation_ts"], ) - self._account_validity = hs.config.account_validity - # we no longer use refresh tokens, but it's possible that some people # might have a background update queued to build this index. Just # clear the background update. 
@@ -824,17 +852,6 @@ def __init__(self, db_conn, hs): "users_set_deactivated_flag", self._background_update_set_deactivated_flag ) - # Create a background job for culling expired 3PID validity tokens - def start_cull(): - # run as a background process to make sure that the database transactions - # have a logcontext to report to - return run_as_background_process( - "cull_expired_threepid_validation_tokens", - self.cull_expired_threepid_validation_tokens, - ) - - hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) - @defer.inlineCallbacks def _background_update_set_deactivated_flag(self, progress, batch_size): """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1 @@ -896,6 +913,54 @@ def _background_update_set_deactivated_flag_txn(txn): return nb_processed + @defer.inlineCallbacks + def _bg_user_threepids_grandfather(self, progress, batch_size): + """We now track which identity servers a user binds their 3PID to, so + we need to handle the case of existing bindings where we didn't track + this. + + We do this by grandfathering in existing user threepids assuming that + they used one of the server configured trusted identity servers. + """ + id_servers = set(self.config.trusted_third_party_id_servers) + + def _bg_user_threepids_grandfather_txn(txn): + sql = """ + INSERT INTO user_threepid_id_server + (user_id, medium, address, id_server) + SELECT user_id, medium, address, ? + FROM user_threepids + """ + + txn.executemany(sql, [(id_server,) for id_server in id_servers]) + + if id_servers: + yield self.runInteraction( + "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn + ) + + yield self._end_background_update("user_threepids_grandfather") + + return 1 + + +class RegistrationStore(RegistrationWorkerStore, RegistrationBackgroundUpdateStore): + def __init__(self, db_conn, hs): + super(RegistrationStore, self).__init__(db_conn, hs) + + self._account_validity = hs.config.account_validity + + # Create a background job for culling expired 3PID validity tokens + def start_cull(): + # run as a background process to make sure that the database transactions + # have a logcontext to report to + return run_as_background_process( + "cull_expired_threepid_validation_tokens", + self.cull_expired_threepid_validation_tokens, + ) + + hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) + @defer.inlineCallbacks def add_access_token_to_user(self, user_id, token, device_id, valid_until_ms): """Adds an access token for the given user. @@ -1244,36 +1309,6 @@ def get_user_pending_deactivation(self): desc="get_users_pending_deactivation", ) - @defer.inlineCallbacks - def _bg_user_threepids_grandfather(self, progress, batch_size): - """We now track which identity servers a user binds their 3PID to, so - we need to handle the case of existing bindings where we didn't track - this. - - We do this by grandfathering in existing user threepids assuming that - they used one of the server configured trusted identity servers. - """ - id_servers = set(self.config.trusted_third_party_id_servers) - - def _bg_user_threepids_grandfather_txn(txn): - sql = """ - INSERT INTO user_threepid_id_server - (user_id, medium, address, id_server) - SELECT user_id, medium, address, ? 
- FROM user_threepids - """ - - txn.executemany(sql, [(id_server,) for id_server in id_servers]) - - if id_servers: - yield self.runInteraction( - "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn - ) - - yield self._end_background_update("user_threepids_grandfather") - - return 1 - def validate_threepid_session(self, session_id, client_secret, token, current_ts): """Attempt to validate a threepid session using a token @@ -1464,30 +1499,3 @@ def cull_expired_threepid_validation_tokens_txn(txn, ts): cull_expired_threepid_validation_tokens_txn, self.clock.time_msec(), ) - - def set_user_deactivated_status_txn(self, txn, user_id, deactivated): - self._simple_update_one_txn( - txn=txn, - table="users", - keyvalues={"name": user_id}, - updatevalues={"deactivated": 1 if deactivated else 0}, - ) - self._invalidate_cache_and_stream( - txn, self.get_user_deactivated_status, (user_id,) - ) - - @defer.inlineCallbacks - def set_user_deactivated_status(self, user_id, deactivated): - """Set the `deactivated` property for the provided user to the provided value. - - Args: - user_id (str): The ID of the user to set the status for. - deactivated (bool): The value to set for `deactivated`. - """ - - yield self.runInteraction( - "set_user_deactivated_status", - self.set_user_deactivated_status_txn, - user_id, - deactivated, - ) From 841054ad96b44161d7a990bc1349ecc70fcd736c Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 17:47:42 +0100 Subject: [PATCH 29/38] Move search's bg updates to a dedicated store --- synapse/storage/search.py | 56 ++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index df87ab6a6dcb..9a41e780029f 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -36,7 +36,7 @@ ) -class SearchStore(BackgroundUpdateStore): +class SearchBackgroundUpdateStore(BackgroundUpdateStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" @@ -44,7 +44,7 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin" def __init__(self, db_conn, hs): - super(SearchStore, self).__init__(db_conn, hs) + super(SearchBackgroundUpdateStore, self).__init__(db_conn, hs) if not hs.config.enable_search: return @@ -289,29 +289,6 @@ def reindex_search_txn(txn): return num_rows - def store_event_search_txn(self, txn, event, key, value): - """Add event to the search table - - Args: - txn (cursor): - event (EventBase): - key (str): - value (str): - """ - self.store_search_entries_txn( - txn, - ( - SearchEntry( - key=key, - value=value, - event_id=event.event_id, - room_id=event.room_id, - stream_ordering=event.internal_metadata.stream_ordering, - origin_server_ts=event.origin_server_ts, - ), - ), - ) - def store_search_entries_txn(self, txn, entries): """Add entries to the search table @@ -358,6 +335,35 @@ def store_search_entries_txn(self, txn, entries): # This should be unreachable. 
            raise Exception("Unrecognized database engine")

+
+class SearchStore(SearchBackgroundUpdateStore):
+
+    def __init__(self, db_conn, hs):
+        super(SearchStore, self).__init__(db_conn, hs)
+
+    def store_event_search_txn(self, txn, event, key, value):
+        """Add event to the search table
+
+        Args:
+            txn (cursor):
+            event (EventBase):
+            key (str):
+            value (str):
+        """
+        self.store_search_entries_txn(
+            txn,
+            (
+                SearchEntry(
+                    key=key,
+                    value=value,
+                    event_id=event.event_id,
+                    room_id=event.room_id,
+                    stream_ordering=event.internal_metadata.stream_ordering,
+                    origin_server_ts=event.origin_server_ts,
+                ),
+            ),
+        )
+
     @defer.inlineCallbacks
     def search_msgs(self, room_ids, search_term, keys):
         """Performs a full text search over events with given keys.

From cfccd2d78a0b8338f33a5631d8f0637d4ed07e7e Mon Sep 17 00:00:00 2001
From: Brendan Abolivier
Date: Thu, 3 Oct 2019 17:56:16 +0100
Subject: [PATCH 30/38] Move state's bg updates to a dedicated store

---
 synapse/storage/state.py | 394 ++++++++++++++++++++-------------------
 1 file changed, 204 insertions(+), 190 deletions(-)

diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 1980a871087a..71b533c006d3 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -353,8 +353,158 @@ def get_member_split(self):

         return member_filter, non_member_filter

+class StateGroupBackgroundUpdateStore(SQLBaseStore):
+    """Defines functions related to state groups needed to run the state background
+    updates.
+    """
+
+    def _count_state_group_hops_txn(self, txn, state_group):
+        """Given a state group, count how many hops there are in the tree.
+
+        This is used to ensure the delta chains don't get too long.
+        """
+        if isinstance(self.database_engine, PostgresEngine):
+            sql = """
+                WITH RECURSIVE state(state_group) AS (
+                    VALUES(?::bigint)
+                    UNION ALL
+                    SELECT prev_state_group FROM state_group_edges e, state s
+                    WHERE s.state_group = e.state_group
+                )
+                SELECT count(*) FROM state;
+            """
+
+            txn.execute(sql, (state_group,))
+            row = txn.fetchone()
+            if row and row[0]:
+                return row[0]
+            else:
+                return 0
+        else:
+            # We don't use WITH RECURSIVE on sqlite3 as there are distributions
+            # that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
+            next_group = state_group
+            count = 0
+
+            while next_group:
+                next_group = self._simple_select_one_onecol_txn(
+                    txn,
+                    table="state_group_edges",
+                    keyvalues={"state_group": next_group},
+                    retcol="prev_state_group",
+                    allow_none=True,
+                )
+                if next_group:
+                    count += 1
+
+            return count
+
+    def _get_state_groups_from_groups_txn(
+        self, txn, groups, state_filter=StateFilter.all()
+    ):
+        results = {group: {} for group in groups}
+
+        where_clause, where_args = state_filter.make_sql_filter_clause()
+
+        # Unless the filter clause is empty, we're going to append it after an
+        # existing where clause
+        if where_clause:
+            where_clause = " AND (%s)" % (where_clause,)
+
+        if isinstance(self.database_engine, PostgresEngine):
+            # Temporarily disable sequential scans in this transaction. This is
+            # a temporary hack until we can add the right indices in
+            txn.execute("SET LOCAL enable_seqscan=off")
+
+            # The below query walks the state_group tree so that the "state"
+            # table includes all state_groups in the tree. It then joins
+            # against `state_groups_state` to fetch the latest state.
+            # It assumes that previous state groups are always numerically
+            # lesser.
+            # The PARTITION is used to get the event_id in the greatest state
+            # group for the given type, state_key.
+ # This may return multiple rows per (type, state_key), but last_value + # should be the same. + sql = """ + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT DISTINCT type, state_key, last_value(event_id) OVER ( + PARTITION BY type, state_key ORDER BY state_group ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING + ) AS event_id FROM state_groups_state + WHERE state_group IN ( + SELECT state_group FROM state + ) + """ + + for group in groups: + args = [group] + args.extend(where_args) + + txn.execute(sql + where_clause, args) + for row in txn: + typ, state_key, event_id = row + key = (typ, state_key) + results[group][key] = event_id + else: + max_entries_returned = state_filter.max_entries_returned() + + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) + for group in groups: + next_group = group + + while next_group: + # We did this before by getting the list of group ids, and + # then passing that list to sqlite to get latest event for + # each (type, state_key). However, that was terribly slow + # without the right indices (which we can't add until + # after we finish deduping state, which requires this func) + args = [next_group] + args.extend(where_args) + + txn.execute( + "SELECT type, state_key, event_id FROM state_groups_state" + " WHERE state_group = ? " + where_clause, + args, + ) + results[group].update( + ((typ, state_key), event_id) + for typ, state_key, event_id in txn + if (typ, state_key) not in results[group] + ) + + # If the number of entries in the (type,state_key)->event_id dict + # matches the number of (type,state_keys) types we were searching + # for, then we must have found them all, so no need to go walk + # further down the tree... UNLESS our types filter contained + # wildcards (i.e. Nones) in which case we have to do an exhaustive + # search + if ( + max_entries_returned is not None + and len(results[group]) == max_entries_returned + ): + break + + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + + return results + + # this inherits from EventsWorkerStore because it calls self.get_events -class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): +class StateGroupWorkerStore( + EventsWorkerStore, StateGroupBackgroundUpdateStore, SQLBaseStore +): """The parts of StateGroupStore that can be called from workers. """ @@ -694,107 +844,6 @@ def _get_state_groups_from_groups(self, groups, state_filter): return results - def _get_state_groups_from_groups_txn( - self, txn, groups, state_filter=StateFilter.all() - ): - results = {group: {} for group in groups} - - where_clause, where_args = state_filter.make_sql_filter_clause() - - # Unless the filter clause is empty, we're going to append it after an - # existing where clause - if where_clause: - where_clause = " AND (%s)" % (where_clause,) - - if isinstance(self.database_engine, PostgresEngine): - # Temporarily disable sequential scans in this transaction. This is - # a temporary hack until we can add the right indices in - txn.execute("SET LOCAL enable_seqscan=off") - - # The below query walks the state_group tree so that the "state" - # table includes all state_groups in the tree. 
It then joins - # against `state_groups_state` to fetch the latest state. - # It assumes that previous state groups are always numerically - # lesser. - # The PARTITION is used to get the event_id in the greatest state - # group for the given type, state_key. - # This may return multiple rows per (type, state_key), but last_value - # should be the same. - sql = """ - WITH RECURSIVE state(state_group) AS ( - VALUES(?::bigint) - UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s - WHERE s.state_group = e.state_group - ) - SELECT DISTINCT type, state_key, last_value(event_id) OVER ( - PARTITION BY type, state_key ORDER BY state_group ASC - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING - ) AS event_id FROM state_groups_state - WHERE state_group IN ( - SELECT state_group FROM state - ) - """ - - for group in groups: - args = [group] - args.extend(where_args) - - txn.execute(sql + where_clause, args) - for row in txn: - typ, state_key, event_id = row - key = (typ, state_key) - results[group][key] = event_id - else: - max_entries_returned = state_filter.max_entries_returned() - - # We don't use WITH RECURSIVE on sqlite3 as there are distributions - # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) - for group in groups: - next_group = group - - while next_group: - # We did this before by getting the list of group ids, and - # then passing that list to sqlite to get latest event for - # each (type, state_key). However, that was terribly slow - # without the right indices (which we can't add until - # after we finish deduping state, which requires this func) - args = [next_group] - args.extend(where_args) - - txn.execute( - "SELECT type, state_key, event_id FROM state_groups_state" - " WHERE state_group = ? " + where_clause, - args, - ) - results[group].update( - ((typ, state_key), event_id) - for typ, state_key, event_id in txn - if (typ, state_key) not in results[group] - ) - - # If the number of entries in the (type,state_key)->event_id dict - # matches the number of (type,state_keys) types we were searching - # for, then we must have found them all, so no need to go walk - # further down the tree... UNLESS our types filter contained - # wildcards (i.e. Nones) in which case we have to do an exhaustive - # search - if ( - max_entries_returned is not None - and len(results[group]) == max_entries_returned - ): - break - - next_group = self._simple_select_one_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"state_group": next_group}, - retcol="prev_state_group", - allow_none=True, - ) - - return results - @defer.inlineCallbacks def get_state_for_events(self, event_ids, state_filter=StateFilter.all()): """Given a list of event_ids and type tuples, return a list of state @@ -1238,66 +1287,8 @@ def _store_state_group_txn(txn): return self.runInteraction("store_state_group", _store_state_group_txn) - def _count_state_group_hops_txn(self, txn, state_group): - """Given a state group, count how many hops there are in the tree. - - This is used to ensure the delta chains don't get too long. 
- """ - if isinstance(self.database_engine, PostgresEngine): - sql = """ - WITH RECURSIVE state(state_group) AS ( - VALUES(?::bigint) - UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s - WHERE s.state_group = e.state_group - ) - SELECT count(*) FROM state; - """ - - txn.execute(sql, (state_group,)) - row = txn.fetchone() - if row and row[0]: - return row[0] - else: - return 0 - else: - # We don't use WITH RECURSIVE on sqlite3 as there are distributions - # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) - next_group = state_group - count = 0 - - while next_group: - next_group = self._simple_select_one_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"state_group": next_group}, - retcol="prev_state_group", - allow_none=True, - ) - if next_group: - count += 1 - - return count - -class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): - """ Keeps track of the state at a given event. - - This is done by the concept of `state groups`. Every event is a assigned - a state group (identified by an arbitrary string), which references a - collection of state events. The current state of an event is then the - collection of state events referenced by the event's state group. - - Hence, every change in the current state causes a new state group to be - generated. However, if no change happens (e.g., if we get a message event - with only one parent it inherits the state group from its parent.) - - There are three tables: - * `state_groups`: Stores group name, first event with in the group and - room id. - * `event_to_state_groups`: Maps events to state groups. - * `state_groups_state`: Maps state group to state events. - """ +class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore, BackgroundUpdateStore): STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" @@ -1305,7 +1296,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index" def __init__(self, db_conn, hs): - super(StateStore, self).__init__(db_conn, hs) + super(StateBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_update_handler( self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, self._background_deduplicate_state, @@ -1327,34 +1318,6 @@ def __init__(self, db_conn, hs): columns=["state_group"], ) - def _store_event_state_mappings_txn(self, txn, events_and_contexts): - state_groups = {} - for event, context in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue - - # if the event was rejected, just give it the same state as its - # predecessor. 
- if context.rejected: - state_groups[event.event_id] = context.prev_group - continue - - state_groups[event.event_id] = context.state_group - - self._simple_insert_many_txn( - txn, - table="event_to_state_groups", - values=[ - {"state_group": state_group_id, "event_id": event_id} - for event_id, state_group_id in iteritems(state_groups) - ], - ) - - for event_id, state_group_id in iteritems(state_groups): - txn.call_after( - self._get_state_group_for_event.prefill, (event_id,), state_group_id - ) - @defer.inlineCallbacks def _background_deduplicate_state(self, progress, batch_size): """This background update will slowly deduplicate state by reencoding @@ -1527,3 +1490,54 @@ def reindex_txn(conn): yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME) return 1 + + +class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore): + """ Keeps track of the state at a given event. + + This is done by the concept of `state groups`. Every event is a assigned + a state group (identified by an arbitrary string), which references a + collection of state events. The current state of an event is then the + collection of state events referenced by the event's state group. + + Hence, every change in the current state causes a new state group to be + generated. However, if no change happens (e.g., if we get a message event + with only one parent it inherits the state group from its parent.) + + There are three tables: + * `state_groups`: Stores group name, first event with in the group and + room id. + * `event_to_state_groups`: Maps events to state groups. + * `state_groups_state`: Maps state group to state events. + """ + + def __init__(self, db_conn, hs): + super(StateStore, self).__init__(db_conn, hs) + + def _store_event_state_mappings_txn(self, txn, events_and_contexts): + state_groups = {} + for event, context in events_and_contexts: + if event.internal_metadata.is_outlier(): + continue + + # if the event was rejected, just give it the same state as its + # predecessor. + if context.rejected: + state_groups[event.event_id] = context.prev_group + continue + + state_groups[event.event_id] = context.state_group + + self._simple_insert_many_txn( + txn, + table="event_to_state_groups", + values=[ + {"state_group": state_group_id, "event_id": event_id} + for event_id, state_group_id in iteritems(state_groups) + ], + ) + + for event_id, state_group_id in iteritems(state_groups): + txn.call_after( + self._get_state_group_for_event.prefill, (event_id,), state_group_id + ) From e106a0e4db23fa1fedcce1c169985a8c91181c86 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 18:19:08 +0100 Subject: [PATCH 31/38] Move user_directory's bg updates to a dedicated store --- synapse/storage/user_directory.py | 178 ++++++++++++++++-------------- 1 file changed, 94 insertions(+), 84 deletions(-) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index b5188d9bee03..1b1e4751b9b2 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -32,14 +32,14 @@ TEMP_TABLE = "_temp_populate_user_directory" -class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore): +class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore): # How many records do we calculate before sending it to # add_users_who_share_private_rooms? 
SHARE_PRIVATE_WORKING_SET = 500 def __init__(self, db_conn, hs): - super(UserDirectoryStore, self).__init__(db_conn, hs) + super(UserDirectoryBackgroundUpdateStore, self).__init__(db_conn, hs) self.server_name = hs.hostname @@ -452,55 +452,6 @@ def _update_profile_in_user_dir_txn(txn): "update_profile_in_user_dir", _update_profile_in_user_dir_txn ) - def remove_from_user_dir(self, user_id): - def _remove_from_user_dir_txn(txn): - self._simple_delete_txn( - txn, table="user_directory", keyvalues={"user_id": user_id} - ) - self._simple_delete_txn( - txn, table="user_directory_search", keyvalues={"user_id": user_id} - ) - self._simple_delete_txn( - txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} - ) - self._simple_delete_txn( - txn, - table="users_who_share_private_rooms", - keyvalues={"user_id": user_id}, - ) - self._simple_delete_txn( - txn, - table="users_who_share_private_rooms", - keyvalues={"other_user_id": user_id}, - ) - txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) - - return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn) - - @defer.inlineCallbacks - def get_users_in_dir_due_to_room(self, room_id): - """Get all user_ids that are in the room directory because they're - in the given room_id - """ - user_ids_share_pub = yield self._simple_select_onecol( - table="users_in_public_rooms", - keyvalues={"room_id": room_id}, - retcol="user_id", - desc="get_users_in_dir_due_to_room", - ) - - user_ids_share_priv = yield self._simple_select_onecol( - table="users_who_share_private_rooms", - keyvalues={"room_id": room_id}, - retcol="other_user_id", - desc="get_users_in_dir_due_to_room", - ) - - user_ids = set(user_ids_share_pub) - user_ids.update(user_ids_share_priv) - - return user_ids - def add_users_who_share_private_room(self, room_id, user_id_tuples): """Insert entries into the users_who_share_private_rooms table. The first user should be a local user. @@ -551,6 +502,98 @@ def _add_users_in_public_rooms_txn(txn): "add_users_in_public_rooms", _add_users_in_public_rooms_txn ) + def delete_all_from_user_dir(self): + """Delete the entire user directory + """ + + def _delete_all_from_user_dir_txn(txn): + txn.execute("DELETE FROM user_directory") + txn.execute("DELETE FROM user_directory_search") + txn.execute("DELETE FROM users_in_public_rooms") + txn.execute("DELETE FROM users_who_share_private_rooms") + txn.call_after(self.get_user_in_directory.invalidate_all) + + return self.runInteraction( + "delete_all_from_user_dir", _delete_all_from_user_dir_txn + ) + + @cached() + def get_user_in_directory(self, user_id): + return self._simple_select_one( + table="user_directory", + keyvalues={"user_id": user_id}, + retcols=("display_name", "avatar_url"), + allow_none=True, + desc="get_user_in_directory", + ) + + def update_user_directory_stream_pos(self, stream_id): + return self._simple_update_one( + table="user_directory_stream_pos", + keyvalues={}, + updatevalues={"stream_id": stream_id}, + desc="update_user_directory_stream_pos", + ) + + +class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): + + # How many records do we calculate before sending it to + # add_users_who_share_private_rooms? 
+ SHARE_PRIVATE_WORKING_SET = 500 + + def __init__(self, db_conn, hs): + super(UserDirectoryStore, self).__init__(db_conn, hs) + + def remove_from_user_dir(self, user_id): + def _remove_from_user_dir_txn(txn): + self._simple_delete_txn( + txn, table="user_directory", keyvalues={"user_id": user_id} + ) + self._simple_delete_txn( + txn, table="user_directory_search", keyvalues={"user_id": user_id} + ) + self._simple_delete_txn( + txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} + ) + self._simple_delete_txn( + txn, + table="users_who_share_private_rooms", + keyvalues={"user_id": user_id}, + ) + self._simple_delete_txn( + txn, + table="users_who_share_private_rooms", + keyvalues={"other_user_id": user_id}, + ) + txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) + + return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn) + + @defer.inlineCallbacks + def get_users_in_dir_due_to_room(self, room_id): + """Get all user_ids that are in the room directory because they're + in the given room_id + """ + user_ids_share_pub = yield self._simple_select_onecol( + table="users_in_public_rooms", + keyvalues={"room_id": room_id}, + retcol="user_id", + desc="get_users_in_dir_due_to_room", + ) + + user_ids_share_priv = yield self._simple_select_onecol( + table="users_who_share_private_rooms", + keyvalues={"room_id": room_id}, + retcol="other_user_id", + desc="get_users_in_dir_due_to_room", + ) + + user_ids = set(user_ids_share_pub) + user_ids.update(user_ids_share_priv) + + return user_ids + def remove_user_who_share_room(self, user_id, room_id): """ Deletes entries in the users_who_share_*_rooms table. The first @@ -637,31 +680,6 @@ def get_rooms_in_common_for_users(self, user_id, other_user_id): return [room_id for room_id, in rows] - def delete_all_from_user_dir(self): - """Delete the entire user directory - """ - - def _delete_all_from_user_dir_txn(txn): - txn.execute("DELETE FROM user_directory") - txn.execute("DELETE FROM user_directory_search") - txn.execute("DELETE FROM users_in_public_rooms") - txn.execute("DELETE FROM users_who_share_private_rooms") - txn.call_after(self.get_user_in_directory.invalidate_all) - - return self.runInteraction( - "delete_all_from_user_dir", _delete_all_from_user_dir_txn - ) - - @cached() - def get_user_in_directory(self, user_id): - return self._simple_select_one( - table="user_directory", - keyvalues={"user_id": user_id}, - retcols=("display_name", "avatar_url"), - allow_none=True, - desc="get_user_in_directory", - ) - def get_user_directory_stream_pos(self): return self._simple_select_one_onecol( table="user_directory_stream_pos", @@ -670,14 +688,6 @@ def get_user_directory_stream_pos(self): desc="get_user_directory_stream_pos", ) - def update_user_directory_stream_pos(self, stream_id): - return self._simple_update_one( - table="user_directory_stream_pos", - keyvalues={}, - updatevalues={"stream_id": stream_id}, - desc="update_user_directory_stream_pos", - ) - @defer.inlineCallbacks def search_user_dir(self, user_id, search_term, limit): """Searches for users in directory From 0496eafbf4277523430114cf965a254241b290e7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 19:00:55 +0100 Subject: [PATCH 32/38] Move roommember's bg updates to a dedicated store --- synapse/storage/roommember.py | 222 +++++++++++++++++----------------- 1 file changed, 114 insertions(+), 108 deletions(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 59a89fad60f6..4e606a838049 100644 --- 
a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -27,6 +27,7 @@ from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import LoggingTransaction +from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import Sqlite3Engine from synapse.storage.events_worker import EventsWorkerStore from synapse.types import get_domain_from_id @@ -820,9 +821,9 @@ def get_rooms_user_has_been_in(self, user_id): return set(room_ids) -class RoomMemberStore(RoomMemberWorkerStore): +class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore): def __init__(self, db_conn, hs): - super(RoomMemberStore, self).__init__(db_conn, hs) + super(RoomMemberBackgroundUpdateStore, self).__init__(db_conn, hs) self.register_background_update_handler( _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile ) @@ -838,112 +839,6 @@ def __init__(self, db_conn, hs): where_clause="forgotten = 1", ) - def _store_room_members_txn(self, txn, events, backfilled): - """Store a room member in the database. - """ - self._simple_insert_many_txn( - txn, - table="room_memberships", - values=[ - { - "event_id": event.event_id, - "user_id": event.state_key, - "sender": event.user_id, - "room_id": event.room_id, - "membership": event.membership, - "display_name": event.content.get("displayname", None), - "avatar_url": event.content.get("avatar_url", None), - } - for event in events - ], - ) - - for event in events: - txn.call_after( - self._membership_stream_cache.entity_has_changed, - event.state_key, - event.internal_metadata.stream_ordering, - ) - txn.call_after( - self.get_invited_rooms_for_user.invalidate, (event.state_key,) - ) - - # We update the local_invites table only if the event is "current", - # i.e., its something that has just happened. If the event is an - # outlier it is only current if its an "out of band membership", - # like a remote invite or a rejection of a remote invite. - is_new_state = not backfilled and ( - not event.internal_metadata.is_outlier() - or event.internal_metadata.is_out_of_band_membership() - ) - is_mine = self.hs.is_mine_id(event.state_key) - if is_new_state and is_mine: - if event.membership == Membership.INVITE: - self._simple_insert_txn( - txn, - table="local_invites", - values={ - "event_id": event.event_id, - "invitee": event.state_key, - "inviter": event.sender, - "room_id": event.room_id, - "stream_id": event.internal_metadata.stream_ordering, - }, - ) - else: - sql = ( - "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - txn.execute( - sql, - ( - event.internal_metadata.stream_ordering, - event.event_id, - event.room_id, - event.state_key, - ), - ) - - @defer.inlineCallbacks - def locally_reject_invite(self, user_id, room_id): - sql = ( - "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" - " room_id = ? AND invitee = ? 
AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - def f(txn, stream_ordering): - txn.execute(sql, (stream_ordering, True, room_id, user_id)) - - with self._stream_id_gen.get_next() as stream_ordering: - yield self.runInteraction("locally_reject_invite", f, stream_ordering) - - def forget(self, user_id, room_id): - """Indicate that user_id wishes to discard history for room_id.""" - - def f(txn): - sql = ( - "UPDATE" - " room_memberships" - " SET" - " forgotten = 1" - " WHERE" - " user_id = ?" - " AND" - " room_id = ?" - ) - txn.execute(sql, (user_id, room_id)) - - self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) - self._invalidate_cache_and_stream( - txn, self.get_forgotten_rooms_for_user, (user_id,) - ) - - return self.runInteraction("forget_membership", f) - @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( @@ -1078,6 +973,117 @@ def _background_current_state_membership_txn(txn, last_processed_room): return row_count +class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): + def __init__(self, db_conn, hs): + super(RoomMemberStore, self).__init__(db_conn, hs) + + def _store_room_members_txn(self, txn, events, backfilled): + """Store a room member in the database. + """ + self._simple_insert_many_txn( + txn, + table="room_memberships", + values=[ + { + "event_id": event.event_id, + "user_id": event.state_key, + "sender": event.user_id, + "room_id": event.room_id, + "membership": event.membership, + "display_name": event.content.get("displayname", None), + "avatar_url": event.content.get("avatar_url", None), + } + for event in events + ], + ) + + for event in events: + txn.call_after( + self._membership_stream_cache.entity_has_changed, + event.state_key, + event.internal_metadata.stream_ordering, + ) + txn.call_after( + self.get_invited_rooms_for_user.invalidate, (event.state_key,) + ) + + # We update the local_invites table only if the event is "current", + # i.e., its something that has just happened. If the event is an + # outlier it is only current if its an "out of band membership", + # like a remote invite or a rejection of a remote invite. + is_new_state = not backfilled and ( + not event.internal_metadata.is_outlier() + or event.internal_metadata.is_out_of_band_membership() + ) + is_mine = self.hs.is_mine_id(event.state_key) + if is_new_state and is_mine: + if event.membership == Membership.INVITE: + self._simple_insert_txn( + txn, + table="local_invites", + values={ + "event_id": event.event_id, + "invitee": event.state_key, + "inviter": event.sender, + "room_id": event.room_id, + "stream_id": event.internal_metadata.stream_ordering, + }, + ) + else: + sql = ( + "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE" + " room_id = ? AND invitee = ? AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + txn.execute( + sql, + ( + event.internal_metadata.stream_ordering, + event.event_id, + event.room_id, + event.state_key, + ), + ) + + @defer.inlineCallbacks + def locally_reject_invite(self, user_id, room_id): + sql = ( + "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" + " room_id = ? AND invitee = ? 
AND locally_rejected is NULL" + " AND replaced_by is NULL" + ) + + def f(txn, stream_ordering): + txn.execute(sql, (stream_ordering, True, room_id, user_id)) + + with self._stream_id_gen.get_next() as stream_ordering: + yield self.runInteraction("locally_reject_invite", f, stream_ordering) + + def forget(self, user_id, room_id): + """Indicate that user_id wishes to discard history for room_id.""" + + def f(txn): + sql = ( + "UPDATE" + " room_memberships" + " SET" + " forgotten = 1" + " WHERE" + " user_id = ?" + " AND" + " room_id = ?" + ) + txn.execute(sql, (user_id, room_id)) + + self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id)) + self._invalidate_cache_and_stream( + txn, self.get_forgotten_rooms_for_user, (user_id,) + ) + + return self.runInteraction("forget_membership", f) + + class _JoinedHostsCache(object): """Cache for joined hosts in a room that is optimised to handle updates via state deltas. From bb05e4b4ec870a48f18ab4f9ecc36fc4ecc60e86 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 18:39:50 +0100 Subject: [PATCH 33/38] Use new stores instead of the DataStore --- scripts/synapse_port_db | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 3e5c47526656..486c78354b66 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -33,10 +33,18 @@ from twisted.internet import defer, reactor from synapse.config.homeserver import HomeServerConfig from synapse.logging.context import PreserveLoggingContext from synapse.server import HomeServer -from synapse.storage import DataStore from synapse.storage._base import LoggingTransaction +from synapse.storage.client_ips import ClientIpBackgroundUpdateStore +from synapse.storage.deviceinbox import DeviceInboxBackgroundUpdateStore +from synapse.storage.devices import DeviceBackgroundUpdateStore from synapse.storage.engines import create_engine +from synapse.storage.events_bg_updates import EventsBackgroundUpdatesStore +from synapse.storage.media_repository import MediaRepositoryBackgroundUpdateStore from synapse.storage.prepare_database import prepare_database +from synapse.storage.registration import RegistrationBackgroundUpdateStore +from synapse.storage.search import SearchBackgroundUpdateStore +from synapse.storage.state import StateBackgroundUpdateStore +from synapse.storage.user_directory import UserDirectoryBackgroundUpdateStore logger = logging.getLogger("synapse_port_db") @@ -102,8 +110,17 @@ APPEND_ONLY_TABLES = [ end_error_exec_info = None - -class Store(DataStore): +class Store( + ClientIpBackgroundUpdateStore, + DeviceInboxBackgroundUpdateStore, + DeviceBackgroundUpdateStore, + EventsBackgroundUpdatesStore, + MediaRepositoryBackgroundUpdateStore, + RegistrationBackgroundUpdateStore, + SearchBackgroundUpdateStore, + StateBackgroundUpdateStore, + UserDirectoryBackgroundUpdateStore, +): def __init__(self, db_conn, hs): super(Store, self).__init__(db_conn, hs) self.db_pool = hs.get_db_pool() From 65885b09b254baf4838d04b9af79a69a6e62d328 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 19:02:35 +0100 Subject: [PATCH 34/38] Add missing stores to the synapse_port_db store The StatsStore class hasn't been split because it's tied to a lot of code, including code outside of its own file (such as get_event()), and doesn't register any looping call, so it should be fine to leave it as is. 
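
To make the pattern these patches rely on concrete, here is a minimal,
hypothetical sketch: FooBackgroundUpdateStore and BarBackgroundUpdateStore
stand in for the real per-file stores, and the base class is reduced to the
two methods that matter here. Each store registers its background updates in
__init__ and chains up with a cooperative super() call, so a class that mixes
them all together collects every registration without dragging in the rest of
the DataStore:

    class BackgroundUpdateStore(object):
        def __init__(self, db_conn, hs):
            # Registry of update name -> handler, filled in by subclasses.
            self._background_update_handlers = {}

        def register_background_update_handler(self, update_name, update_handler):
            self._background_update_handlers[update_name] = update_handler


    class FooBackgroundUpdateStore(BackgroundUpdateStore):
        def __init__(self, db_conn, hs):
            super(FooBackgroundUpdateStore, self).__init__(db_conn, hs)
            self.register_background_update_handler("foo_index", self._foo_index)

        def _foo_index(self, progress, batch_size):
            pass  # would build an index in batches


    class BarBackgroundUpdateStore(BackgroundUpdateStore):
        def __init__(self, db_conn, hs):
            super(BarBackgroundUpdateStore, self).__init__(db_conn, hs)
            self.register_background_update_handler("bar_flag", self._bar_flag)

        def _bar_flag(self, progress, batch_size):
            pass  # would backfill a flag in batches


    class Store(FooBackgroundUpdateStore, BarBackgroundUpdateStore):
        pass

Because every __init__ in the MRO runs exactly once, the handler dict of
Store(db_conn, hs) ends up holding both registrations, which is all the port
script needs from the storage layer.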
--- scripts/synapse_port_db | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 486c78354b66..4d811593ba61 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -42,8 +42,10 @@ from synapse.storage.events_bg_updates import EventsBackgroundUpdatesStore from synapse.storage.media_repository import MediaRepositoryBackgroundUpdateStore from synapse.storage.prepare_database import prepare_database from synapse.storage.registration import RegistrationBackgroundUpdateStore +from synapse.storage.roommember import RoomMemberBackgroundUpdateStore from synapse.storage.search import SearchBackgroundUpdateStore from synapse.storage.state import StateBackgroundUpdateStore +from synapse.storage.stats import StatsStore from synapse.storage.user_directory import UserDirectoryBackgroundUpdateStore logger = logging.getLogger("synapse_port_db") @@ -117,9 +119,11 @@ class Store( EventsBackgroundUpdatesStore, MediaRepositoryBackgroundUpdateStore, RegistrationBackgroundUpdateStore, + RoomMemberBackgroundUpdateStore, SearchBackgroundUpdateStore, StateBackgroundUpdateStore, UserDirectoryBackgroundUpdateStore, + StatsStore, ): def __init__(self, db_conn, hs): super(Store, self).__init__(db_conn, hs) From ce339e66cb5fc59fa7678866c2a5b6394b54666f Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 4 Oct 2019 16:17:19 +0100 Subject: [PATCH 35/38] Don't inherit from HomeServer --- scripts/synapse_port_db | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 4d811593ba61..b6d73f65609e 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -32,7 +32,6 @@ from twisted.internet import defer, reactor from synapse.config.homeserver import HomeServerConfig from synapse.logging.context import PreserveLoggingContext -from synapse.server import HomeServer from synapse.storage._base import LoggingTransaction from synapse.storage.client_ips import ClientIpBackgroundUpdateStore from synapse.storage.deviceinbox import DeviceInboxBackgroundUpdateStore @@ -47,6 +46,7 @@ from synapse.storage.search import SearchBackgroundUpdateStore from synapse.storage.state import StateBackgroundUpdateStore from synapse.storage.stats import StatsStore from synapse.storage.user_directory import UserDirectoryBackgroundUpdateStore +from synapse.util import Clock logger = logging.getLogger("synapse_port_db") @@ -182,24 +182,24 @@ class Store( raise -class MockHomeserver(HomeServer): - DATASTORE_CLASS = Store - +class MockHomeserver: def __init__(self, config, database_engine, db_conn, db_pool): - super(MockHomeserver, self).__init__( - config.server_name, - reactor=reactor, - config=config, - database_engine=database_engine, - ) - self.database_engine = database_engine self.db_conn = db_conn self.db_pool = db_pool + self.clock = Clock(reactor) + self.config = config + self.hostname = config.server_name def get_db_conn(self): return self.db_conn + def get_db_pool(self): + return self.db_pool + + def get_clock(self): + return self.clock + class Porter(object): def __init__(self, **kwargs): @@ -490,8 +490,7 @@ class Porter(object): sqlite_db_pool, ) - hs.setup() - return hs.get_datastore() + return Store(sqlite_conn, hs) @defer.inlineCallbacks def get_postgres_store(self): @@ -510,8 +509,7 @@ class Porter(object): postgres_db_pool, ) - hs.setup() - postgres_store = hs.get_datastore() + postgres_store = Store(postgres_conn, hs) yield postgres_store.runInteraction( 
"postgres_engine.check_database", From 66ebea17235d9d3988d56cd1355656bbb508b3be Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 18:23:05 +0100 Subject: [PATCH 36/38] Lint --- synapse/storage/media_repository.py | 1 - synapse/storage/search.py | 1 - synapse/storage/state.py | 4 +++- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 2eb2740d07c7..84b5f3ad5ecc 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -16,7 +16,6 @@ class MediaRepositoryBackgroundUpdateStore(BackgroundUpdateStore): - def __init__(self, db_conn, hs): super(MediaRepositoryBackgroundUpdateStore, self).__init__(db_conn, hs) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 9a41e780029f..6ba4190f1ae8 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -337,7 +337,6 @@ def store_search_entries_txn(self, txn, entries): class SearchStore(SearchBackgroundUpdateStore): - def __init__(self, db_conn, hs): super(SearchStore, self).__init__(db_conn, hs) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 71b533c006d3..a941a5ae3fcd 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1288,7 +1288,9 @@ def _store_state_group_txn(txn): return self.runInteraction("store_state_group", _store_state_group_txn) -class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore, BackgroundUpdateStore): +class StateBackgroundUpdateStore( + StateGroupBackgroundUpdateStore, BackgroundUpdateStore +): STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" From 53f7d03da39c7c9eee212c5173b746fd63564262 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 3 Oct 2019 19:07:25 +0100 Subject: [PATCH 37/38] Lint --- scripts/synapse_port_db | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index b6d73f65609e..07ad80e31c3e 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -112,6 +112,7 @@ APPEND_ONLY_TABLES = [ end_error_exec_info = None + class Store( ClientIpBackgroundUpdateStore, DeviceInboxBackgroundUpdateStore, From 21b5d8b1076354c7c6ee8849491f3fc886cc8189 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 7 Oct 2019 18:00:31 +0100 Subject: [PATCH 38/38] Changelog --- changelog.d/6178.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6178.bugfix diff --git a/changelog.d/6178.bugfix b/changelog.d/6178.bugfix new file mode 100644 index 000000000000..cd288c2a44d8 --- /dev/null +++ b/changelog.d/6178.bugfix @@ -0,0 +1 @@ +Make the `synapse_port_db` script create the right indexes on a new PostgreSQL database.