Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't go into federation catch up mode so easily #9561

Merged
merged 9 commits into from
Mar 15, 2021
73 changes: 33 additions & 40 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,46 +337,42 @@ async def _transaction_transmission_loop(self) -> None:
if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))

success = await self._transaction_manager.send_new_transaction(
await self._transaction_manager.send_new_transaction(
self._destination, pending_pdus, pending_edus
)
if success:
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
await self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)

# also mark the device updates as sent
if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
await self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)
sent_transactions_counter.inc()
sent_edus_counter.inc(len(pending_edus))
for edu in pending_edus:
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if to_device_edus:
await self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)

self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id

if pending_pdus:
# we sent some PDUs and it was successful, so update our
# last_successful_stream_ordering in the destinations table.
final_pdu = pending_pdus[-1]
last_successful_stream_ordering = (
final_pdu.internal_metadata.stream_ordering
)
assert last_successful_stream_ordering
await self._store.set_destination_last_successful_stream_ordering(
self._destination, last_successful_stream_ordering
)
else:
break
# also mark the device updates as sent
if device_update_edus:
logger.info("Marking as sent %r %r", self._destination, dev_list_id)
await self._store.mark_as_sent_devices_by_remote(
self._destination, dev_list_id
)

self._last_device_stream_id = device_stream_id
self._last_device_list_stream_id = dev_list_id

if pending_pdus:
# we sent some PDUs and it was successful, so update our
# last_successful_stream_ordering in the destinations table.
final_pdu = pending_pdus[-1]
last_successful_stream_ordering = (
final_pdu.internal_metadata.stream_ordering
)
assert last_successful_stream_ordering
await self._store.set_destination_last_successful_stream_ordering(
self._destination, last_successful_stream_ordering
)
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
Expand Down Expand Up @@ -502,13 +498,10 @@ async def _catch_up_transmission_loop(self) -> None:
rooms = [p.room_id for p in catchup_pdus]
logger.info("Catching up rooms to %s: %r", self._destination, rooms)

success = await self._transaction_manager.send_new_transaction(
await self._transaction_manager.send_new_transaction(
self._destination, catchup_pdus, []
)

if not success:
return

sent_transactions_counter.inc()
final_pdu = catchup_pdus[-1]
self._last_successful_stream_ordering = cast(
Expand Down
48 changes: 14 additions & 34 deletions synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,12 @@ async def send_new_transaction(
destination: str,
pdus: List[EventBase],
edus: List[Edu],
) -> bool:
) -> None:
"""
Args:
destination: The destination to send to (e.g. 'example.org')
pdus: In-order list of PDUs to send
edus: List of EDUs to send

Returns:
True iff the transaction was successful
Comment on lines -79 to -80
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth mentioning what this raises while we're here? 😄

"""

# Make a transaction-sending opentracing span. This span follows on from
Expand All @@ -96,8 +93,6 @@ async def send_new_transaction(
edu.strip_context()

with start_active_span_follows_from("send_transaction", span_contexts):
success = True

logger.debug("TX [%s] _attempt_new_transaction", destination)

txn_id = str(self._next_txn_id)
Expand Down Expand Up @@ -152,44 +147,29 @@ def json_data_cb():
response = await self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response

if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)
raise e

logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)

if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warning(
"TX [%s] {%s} Remote returned error for %s: %s",
destination,
txn_id,
e_id,
r,
)
else:
for p in pdus:
set_tag(tags.ERROR, True)

logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
raise

logger.info("TX [%s] {%s} got 200 response", destination, txn_id)

for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warning(
"TX [%s] {%s} Failed to send event %s",
"TX [%s] {%s} Remote returned error for %s: %s",
destination,
txn_id,
p.event_id,
e_id,
r,
)
success = False

if success and pdus and destination in self._federation_metrics_domains:
if pdus and destination in self._federation_metrics_domains:
last_pdu = pdus[-1]
last_pdu_ts_metric.labels(server_name=destination).set(
last_pdu.origin_server_ts / 1000
)

set_tag(tags.ERROR, not success)
return success