Commit 1718981

Merge remote-tracking branch 'origin/develop' into fix-matrix-org#9348

JohannesKleine committed Jun 1, 2021
2 parents af12176 + 3ff6fe2
Showing 17 changed files with 261 additions and 27 deletions.
23 changes: 23 additions & 0 deletions CHANGES.md
@@ -1,3 +1,26 @@
Synapse 1.35.0 (2021-06-01)
===========================

Note that [the tag](https://github.com/matrix-org/synapse/releases/tag/v1.35.0rc3) and [docker images](https://hub.docker.com/layers/matrixdotorg/synapse/v1.35.0rc3/images/sha256-34ccc87bd99a17e2cbc0902e678b5937d16bdc1991ead097eee6096481ecf2c4?context=explore) for `v1.35.0rc3` were incorrectly built. If you are experiencing issues with either, it is recommended to upgrade to the equivalent tag or docker image for the `v1.35.0` release.

Deprecations and Removals
-------------------------

- The core Synapse development team plan to drop support for the [unstable API of MSC2858](https://github.com/matrix-org/matrix-doc/blob/master/proposals/2858-Multiple-SSO-Identity-Providers.md#unstable-prefix), including the undocumented `experimental.msc2858_enabled` config option, in August 2021. Client authors should ensure that their clients are updated to use the stable API (which has been supported since Synapse 1.30) well before that time, to give their users time to upgrade. ([\#10101](https://github.com/matrix-org/synapse/issues/10101))

Bugfixes
--------

- Fixed a bug causing replication requests to fail when receiving a lot of events via federation. Introduced in v1.33.0. ([\#10082](https://github.com/matrix-org/synapse/issues/10082))
- Fix HTTP response size limit to allow joining very large rooms over federation. Introduced in v1.33.0. ([\#10093](https://github.com/matrix-org/synapse/issues/10093))


Internal Changes
----------------

- Log method and path when dropping request due to size limit. ([\#10091](https://github.com/matrix-org/synapse/issues/10091))


Synapse 1.35.0rc2 (2021-05-27)
==============================

1 change: 1 addition & 0 deletions changelog.d/10074.misc
@@ -0,0 +1 @@
Update opentracing to inject the right context into the carrier.
1 change: 1 addition & 0 deletions changelog.d/9953.feature
@@ -0,0 +1 @@
Improve performance of incoming federation transactions in large rooms.
1 change: 1 addition & 0 deletions changelog.d/9973.feature
@@ -0,0 +1 @@
Improve performance of incoming federation transactions in large rooms.
1 change: 0 additions & 1 deletion changelog.d/9973.misc

This file was deleted.

6 changes: 6 additions & 0 deletions debian/changelog
@@ -1,3 +1,9 @@
matrix-synapse-py3 (1.35.0) stable; urgency=medium

* New synapse release 1.35.0.

-- Synapse Packaging team <[email protected]> Tue, 01 Jun 2021 13:23:35 +0100

matrix-synapse-py3 (1.34.0) stable; urgency=medium

* New synapse release 1.34.0.
2 changes: 1 addition & 1 deletion synapse/__init__.py
@@ -47,7 +47,7 @@
except ImportError:
pass

__version__ = "1.35.0rc2"
__version__ = "1.35.0"

if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
7 changes: 7 additions & 0 deletions synapse/federation/transport/client.py
@@ -35,6 +35,11 @@

logger = logging.getLogger(__name__)

# Send join responses can be huge, so we set a separate limit here. The response
# is parsed in a streaming manner, which helps alleviate the issue of memory
# usage a bit.
MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024


class TransportLayerClient:
"""Sends federation HTTP requests to other servers"""
@@ -261,6 +266,7 @@ async def send_join_v1(
path=path,
data=content,
parser=SendJoinParser(room_version, v1_api=True),
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
)

return response
@@ -276,6 +282,7 @@ async def send_join_v2(
path=path,
data=content,
parser=SendJoinParser(room_version, v1_api=False),
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
)

return response
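
The new `MAX_RESPONSE_SIZE_SEND_JOIN` constant raises the body-size ceiling only for the two send-join requests above, and the accompanying comment relies on `SendJoinParser` consuming the body incrementally, so the larger limit does not mean buffering 500 MB in memory. Below is a minimal standalone sketch of that idea; `CountingParser` and `read_with_limit` are invented names for illustration and are not Synapse's `ByteParser` or `read_body_with_max_size`.

```python
import io

MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024  # same value as the new constant


class CountingParser:
    """Toy incremental parser: it consumes chunks as they arrive instead of
    buffering the whole body, so peak memory stays proportional to the chunk
    size rather than the response size."""

    def __init__(self) -> None:
        self.bytes_seen = 0

    def write(self, chunk: bytes) -> int:
        self.bytes_seen += len(chunk)
        return len(chunk)

    def finish(self) -> int:
        return self.bytes_seen


def read_with_limit(stream: io.BufferedIOBase, parser: CountingParser, max_size: int) -> int:
    """Feed the stream into the parser chunk by chunk, aborting once the
    configured ceiling is exceeded."""
    total = 0
    while True:
        chunk = stream.read(64 * 1024)
        if not chunk:
            return parser.finish()
        total += len(chunk)
        if total > max_size:
            raise ValueError("response exceeded %d bytes" % max_size)
        parser.write(chunk)


if __name__ == "__main__":
    body = io.BytesIO(b"x" * 1024)
    print(read_with_limit(body, CountingParser(), MAX_RESPONSE_SIZE_SEND_JOIN))  # 1024
```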
12 changes: 7 additions & 5 deletions synapse/handlers/federation.py
@@ -577,7 +577,9 @@ async def _get_state_for_room(

# Fetch the state events from the DB, and check we have the auth events.
event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
auth_events_in_store = await self.store.have_seen_events(auth_event_ids)
auth_events_in_store = await self.store.have_seen_events(
room_id, auth_event_ids
)

# Check for missing events. We handle state and auth event separately,
# as we want to pull the state from the DB, but we don't for the auth
@@ -610,7 +612,7 @@ async def _get_state_for_room(

if missing_auth_events:
auth_events_in_store = await self.store.have_seen_events(
missing_auth_events
room_id, missing_auth_events
)
missing_auth_events.difference_update(auth_events_in_store)

@@ -710,7 +712,7 @@ async def _get_state_after_missing_prev_event(

missing_auth_events = set(auth_event_ids) - fetched_events.keys()
missing_auth_events.difference_update(
await self.store.have_seen_events(missing_auth_events)
await self.store.have_seen_events(room_id, missing_auth_events)
)
logger.debug("We are also missing %i auth events", len(missing_auth_events))

@@ -2475,7 +2477,7 @@ async def _update_auth_events_and_context_for_auth(
#
# we start by checking if they are in the store, and then try calling /event_auth/.
if missing_auth:
have_events = await self.store.have_seen_events(missing_auth)
have_events = await self.store.have_seen_events(event.room_id, missing_auth)
logger.debug("Events %s are in the store", have_events)
missing_auth.difference_update(have_events)

@@ -2494,7 +2496,7 @@ async def _update_auth_events_and_context_for_auth(
return context

seen_remotes = await self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
event.room_id, [e.event_id for e in remote_auth_chain]
)

for e in remote_auth_chain:
14 changes: 13 additions & 1 deletion synapse/http/matrixfederationclient.py
@@ -205,6 +205,7 @@ async def _handle_response(
response: IResponse,
start_ms: int,
parser: ByteParser[T],
max_response_size: Optional[int] = None,
) -> T:
"""
Reads the body of a response with a timeout and sends it to a parser
@@ -216,15 +217,20 @@
response: response to the request
start_ms: Timestamp when request was made
parser: The parser for the response
max_response_size: The maximum size to read from the response, if None
uses the default.
Returns:
The parsed response
"""

if max_response_size is None:
max_response_size = MAX_RESPONSE_SIZE

try:
check_content_type_is(response.headers, parser.CONTENT_TYPE)

d = read_body_with_max_size(response, parser, MAX_RESPONSE_SIZE)
d = read_body_with_max_size(response, parser, max_response_size)
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)

length = await make_deferred_yieldable(d)
@@ -735,6 +741,7 @@ async def put_json(
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Literal[None] = None,
max_response_size: Optional[int] = None,
) -> Union[JsonDict, list]:
...

@@ -752,6 +759,7 @@ async def put_json(
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser[T]] = None,
max_response_size: Optional[int] = None,
) -> T:
...

@@ -768,6 +776,7 @@ async def put_json(
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser] = None,
max_response_size: Optional[int] = None,
):
"""Sends the specified json data using PUT
@@ -803,6 +812,8 @@ async def put_json(
enabled.
parser: The parser to use to decode the response. Defaults to
parsing as JSON.
max_response_size: The maximum size to read from the response, if None
uses the default.
Returns:
Succeeds when we get a 2xx HTTP response. The
@@ -853,6 +864,7 @@ async def put_json(
response,
start_ms,
parser=parser,
max_response_size=max_response_size,
)

return body
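
The `put_json` changes thread an optional `max_response_size` through the existing `@overload` stubs down to `_handle_response`, which falls back to the module-wide `MAX_RESPONSE_SIZE` when the argument is `None`. A rough sketch of that shape follows; `fetch`, `Parser`, and `DEFAULT_MAX_RESPONSE_SIZE` are hypothetical stand-ins for the real methods and constants.

```python
import json
from typing import Generic, Literal, Optional, TypeVar, Union, overload

T = TypeVar("T")
JsonDict = dict

# Hypothetical module-wide default, standing in for MAX_RESPONSE_SIZE.
DEFAULT_MAX_RESPONSE_SIZE = 10 * 1024 * 1024


class Parser(Generic[T]):
    """Stand-in for Synapse's ByteParser, only here to keep the sketch self-contained."""

    def __init__(self, fn):
        self.fn = fn

    def parse(self, raw: bytes) -> T:
        return self.fn(raw)


@overload
def fetch(parser: Literal[None] = None, max_response_size: Optional[int] = None) -> Union[JsonDict, list]:
    ...


@overload
def fetch(parser: Parser[T], max_response_size: Optional[int] = None) -> T:
    ...


def fetch(parser: Optional[Parser] = None, max_response_size: Optional[int] = None):
    """No parser -> decoded JSON; a parser -> whatever that parser produces.
    A missing max_response_size falls back to the module default, mirroring
    the new `if max_response_size is None` branch in _handle_response."""
    if max_response_size is None:
        max_response_size = DEFAULT_MAX_RESPONSE_SIZE

    raw = b'{"ok": true}'
    if len(raw) > max_response_size:
        raise ValueError("response too large")
    if parser is None:
        return json.loads(raw)
    return parser.parse(raw)


assert fetch() == {"ok": True}
assert fetch(Parser(len), max_response_size=500 * 1024 * 1024) == 12
```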
10 changes: 5 additions & 5 deletions synapse/logging/opentracing.py
@@ -594,7 +594,7 @@ def inject_active_span_twisted_headers(headers, destination, check_destination=T

span = opentracing.tracer.active_span
carrier = {} # type: Dict[str, str]
opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier)
opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)

for key, value in carrier.items():
headers.addRawHeaders(key, value)
@@ -631,7 +631,7 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
span = opentracing.tracer.active_span

carrier = {} # type: Dict[str, str]
opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier)
opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)

for key, value in carrier.items():
headers[key.encode()] = [value.encode()]
@@ -665,7 +665,7 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
return

opentracing.tracer.inject(
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier
)


@@ -687,7 +687,7 @@ def get_active_span_text_map(destination=None):

carrier = {} # type: Dict[str, str]
opentracing.tracer.inject(
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier
)

return carrier
@@ -702,7 +702,7 @@ def active_span_context_as_string():
carrier = {} # type: Dict[str, str]
if opentracing:
opentracing.tracer.inject(
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier
)
return json_encoder.encode(carrier)

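Each hunk above makes the same one-token fix: opentracing's `Tracer.inject` expects a `SpanContext`, not a `Span`, so the code now passes `span.context`. A small example of the corrected usage against the public opentracing-python API; note that with the default no-op tracer the carrier stays empty, whereas a configured tracer (e.g. a Jaeger client) would write propagation headers into it.

```python
import opentracing

tracer = opentracing.tracer  # global tracer; a no-op implementation by default

with tracer.start_span("outbound-request") as span:
    carrier: dict = {}
    # inject() is documented to take a SpanContext, not a Span - hence the
    # span.context fix in the hunks above.
    tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)

    # On the receiving side the context is recovered with extract() and used
    # as the parent of the server-side span.
    parent_ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, carrier)
    child = tracer.start_span("handle-request", child_of=parent_ctx)
    child.finish()
```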
1 change: 1 addition & 0 deletions synapse/storage/databases/main/cache.py
@@ -168,6 +168,7 @@ def _invalidate_caches_for_event(
backfilled,
):
self._invalidate_get_event_cache(event_id)
self.have_seen_event.invalidate((room_id, event_id))

self.get_latest_event_ids_in_room.invalidate((room_id,))

61 changes: 52 additions & 9 deletions synapse/storage/databases/main/events_worker.py
@@ -22,6 +22,7 @@
Iterable,
List,
Optional,
Set,
Tuple,
overload,
)
@@ -55,7 +56,7 @@
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.caches.descriptors import cached
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
@@ -1045,32 +1046,74 @@ async def have_events_in_timeline(self, event_ids):

return {r["event_id"] for r in rows}

async def have_seen_events(self, event_ids):
async def have_seen_events(
self, room_id: str, event_ids: Iterable[str]
) -> Set[str]:
"""Given a list of event ids, check if we have already processed them.
The room_id is only used to structure the cache (so that it can later be
invalidated by room_id) - there is no guarantee that the events are actually
in the room in question.
Args:
event_ids (iterable[str]):
room_id: Room we are polling
event_ids: events we are looking for
Returns:
set[str]: The events we have already seen.
"""
res = await self._have_seen_events_dict(
(room_id, event_id) for event_id in event_ids
)
return {eid for ((_rid, eid), have_event) in res.items() if have_event}

@cachedList("have_seen_event", "keys")
async def _have_seen_events_dict(
self, keys: Iterable[Tuple[str, str]]
) -> Dict[Tuple[str, str], bool]:
"""Helper for have_seen_events
Returns:
a dict {(room_id, event_id)-> bool}
"""
# if the event cache contains the event, obviously we've seen it.
results = {x for x in event_ids if self._get_event_cache.contains(x)}

def have_seen_events_txn(txn, chunk):
sql = "SELECT event_id FROM events as e WHERE "
cache_results = {
(rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
}
results = {x: True for x in cache_results}

def have_seen_events_txn(txn, chunk: Tuple[Tuple[str, str], ...]):
# we deliberately do *not* query the database for room_id, to make the
# query an index-only lookup on `events_event_id_key`.
#
# We therefore pull the events from the database into a set...

sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause(
txn.database_engine, "e.event_id", chunk
txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk]
)
txn.execute(sql + clause, args)
results.update(row[0] for row in txn)
found_events = {eid for eid, in txn}

for chunk in batch_iter((x for x in event_ids if x not in results), 100):
# ... and then we can update the results for each row in the batch
results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk})

# each batch requires its own index scan, so we make the batches as big as
# possible.
for chunk in batch_iter((k for k in keys if k not in cache_results), 500):
await self.db_pool.runInteraction(
"have_seen_events", have_seen_events_txn, chunk
)

return results

@cached(max_entries=100000, tree=True)
async def have_seen_event(self, room_id: str, event_id: str):
# this only exists for the benefit of the @cachedList descriptor on
# _have_seen_events_dict
raise NotImplementedError()

def _get_current_state_event_counts_txn(self, txn, room_id):
"""
See get_current_state_event_counts.
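
The reworked `have_seen_events` now keys its cache by `(room_id, event_id)` via the new `have_seen_event` entry and the `_have_seen_events_dict` `@cachedList` helper: it answers what it can from cache and batches the misses into database lookups. Below is a simplified, self-contained illustration of that lookup pattern, without Synapse's cache descriptors; `_cache`, `_db_events`, and `_batch_iter` are stand-ins invented for this sketch.

```python
from typing import Dict, Iterable, Set, Tuple

# Simplified stand-ins: a per-key cache and a "database" of known event IDs.
_cache: Dict[Tuple[str, str], bool] = {}
_db_events: Set[str] = {"$event1", "$event2"}


def _batch_iter(iterable, size):
    """Yield tuples of at most `size` items, like synapse.util.iterutils.batch_iter."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield tuple(batch)
            batch = []
    if batch:
        yield tuple(batch)


def have_seen_events(room_id: str, event_ids: Iterable[str]) -> Set[str]:
    """Return the subset of event_ids we have already seen.

    room_id only shapes the cache key (so a whole room can be invalidated at
    once); it does not filter the lookup itself, matching the docstring of the
    real method."""
    keys = [(room_id, eid) for eid in event_ids]

    # Answer what we can from the per-key cache first.
    results: Dict[Tuple[str, str], bool] = {}
    for key in keys:
        cached = _cache.get(key)
        if cached is not None:
            results[key] = cached

    # Only the cache misses go to the "database", in bounded batches.
    misses = [k for k in keys if k not in results]
    for chunk in _batch_iter(misses, 500):
        found = {eid for (_rid, eid) in chunk if eid in _db_events}
        for rid, eid in chunk:
            seen = eid in found
            results[(rid, eid)] = seen
            _cache[(rid, eid)] = seen

    return {eid for (_rid, eid), seen in results.items() if seen}


print(have_seen_events("!room:example.org", ["$event1", "$unknown"]))  # {'$event1'}
```

Keeping `room_id` in the cache key (together with `tree=True` on the new `have_seen_event` cache) is what makes the per-room, per-event invalidation added in `cache.py` above possible, even though the SQL lookup itself filters on `event_id` alone.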