Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Move pulling from queues to a async context manager
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Mar 10, 2021
1 parent 4f04ecc commit d6effcf
Showing 1 changed file with 151 additions and 108 deletions.
259 changes: 151 additions & 108 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import logging
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast

import attr
from prometheus_client import Counter

from synapse.api.errors import (
Expand Down Expand Up @@ -260,119 +261,39 @@ async def _transaction_transmission_loop(self) -> None:
while True:
self._new_data_to_send = False

# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2

device_update_edus, dev_list_id = await self._get_device_update_edus(
limit
)

limit -= len(device_update_edus)

(
to_device_edus,
device_stream_id,
) = await self._get_to_device_message_edus(limit)

pending_edus = device_update_edus + to_device_edus

pending_pdus = self._pending_pdus

# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]

pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
pending_edus.append(
Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self._clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)

pending_edus.extend(
self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
and self._pending_edus_keyed
async with _TransactionQueueManager(self) as (
pending_pdus,
pending_edus,
):
_, val = self._pending_edus_keyed.popitem()
pending_edus.append(val)

if pending_pdus:
logger.debug(
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination,
len(pending_pdus),
)

if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)
self._last_device_stream_id = device_stream_id

# If we've gotten told about new things to send during
# checking for things to send, we try looking again.
# Otherwise new PDUs or EDUs might arrive in the meantime,
# but not get sent because we hold the
# `transmission_loop_running` flag.
if self._new_data_to_send:
continue
else:
return

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))

await self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)

sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
await self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)
if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", self._destination)

# If we've gotten told about new things to send during
# checking for things to send, we try looking again.
# Otherwise new PDUs or EDUs might arrive in the meantime,
# but not get sent because we hold the
# `transmission_loop_running` flag.
if self._new_data_to_send:
continue
else:
return

if pending_pdus:
logger.debug(
"TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination,
len(pending_pdus),
)

# also mark the device updates as sent
if device_update_edus:
logger.info("Marking as sent %r %r", self._destination, dev_list_id)
await self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
await self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)

self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()

if pending_pdus:
# we sent some PDUs and it was successful, so update our
# last_successful_stream_ordering in the destinations table.
final_pdu = pending_pdus[-1]
last_successful_stream_ordering = (
final_pdu.internal_metadata.stream_ordering
)
assert last_successful_stream_ordering
await self._store.set_destination_last_successful_stream_ordering(
self._destination, last_successful_stream_ordering
)
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
Expand Down Expand Up @@ -580,3 +501,125 @@ def _start_catching_up(self) -> None:
"""
self._catching_up = True
self._pending_pdus = []


@attr.s(slots=True)
class _TransactionQueueManager:
"""A helper async context manager for pulling stuff off the queues and
tracking what was last successfully sent, etc.
"""

queue = attr.ib(type=PerDestinationQueue)

_device_stream_id = attr.ib(type=Optional[int], default=None)
_device_list_id = attr.ib(type=Optional[int], default=None)
_last_stream_ordering = attr.ib(type=Optional[int], default=None)

async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2

device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
limit
)

if device_update_edus:
self._device_list_id = dev_list_id
else:
self.queue._last_device_list_stream_id = dev_list_id

limit -= len(device_update_edus)

(
to_device_edus,
device_stream_id,
) = await self.queue._get_to_device_message_edus(limit)

if to_device_edus:
self._device_stream_id = device_stream_id
else:
self.queue._last_device_stream_id = device_stream_id

pending_edus = device_update_edus + to_device_edus

pending_pdus = self.queue._pending_pdus

# We can only include at most 50 PDUs per transactions
pending_pdus, self.queue._pending_pdus = pending_pdus[:50], pending_pdus[50:]

pending_edus.extend(self.queue._get_rr_edus(force_flush=False))
pending_presence = self.queue._pending_presence
self.queue._pending_presence = {}
if pending_presence:
pending_edus.append(
Edu(
origin=self.queue._server_name,
destination=self.queue._destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self.queue._clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)

pending_edus.extend(
self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
while (
len(pending_edus) < MAX_EDUS_PER_TRANSACTION
and self.queue._pending_edus_keyed
):
_, val = self.queue._pending_edus_keyed.popitem()
pending_edus.append(val)

if not pending_pdus and not pending_edus:
return [], []

# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self.queue._get_rr_edus(force_flush=True))

if pending_pdus:
self._last_stream_ordering = pending_pdus[
-1
].internal_metadata.stream_ordering
assert self._last_stream_ordering

return pending_pdus, pending_edus

async def __aexit__(self, exc_type, exc, tb):
if exc_type is not None:
# Failed to send transaction, nothing to do.
return

# Succeeded to send the transaction so we record where we have sent up
# to in the various streams

if self._device_stream_id:
await self.queue._store.delete_device_msgs_for_remote(
self.queue._destination, self._device_stream_id
)
self.queue._last_device_stream_id = self._device_stream_id

# also mark the device updates as sent
if self._device_list_id:
logger.info(
"Marking as sent %r %r", self.queue._destination, self._device_list_id
)
await self.queue._store.mark_as_sent_devices_by_remote(
self.queue._destination, self._device_list_id
)
self.queue._last_device_list_stream_id = self._device_list_id

if self._last_stream_ordering:
# we sent some PDUs and it was successful, so update our
# last_successful_stream_ordering in the destinations table.
await self.queue._store.set_destination_last_successful_stream_ordering(
self.queue._destination, self._last_stream_ordering
)

0 comments on commit d6effcf

Please sign in to comment.