Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prep work for removing outlier from internal_metadata #9411

Merged
merged 5 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9411.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Preparatory steps for removing redundant `outlier` data from `event_json.internal_metadata` column.
9 changes: 6 additions & 3 deletions synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def __get__(self, instance, owner=None):


class _EventInternalMetadata:
__slots__ = ["_dict", "stream_ordering"]
__slots__ = ["_dict", "stream_ordering", "outlier"]

def __init__(self, internal_metadata_dict: JsonDict):
# we have to copy the dict, because it turns out that the same dict is
Expand All @@ -108,7 +108,10 @@ def __init__(self, internal_metadata_dict: JsonDict):
# the stream ordering of this event. None, until it has been persisted.
self.stream_ordering = None # type: Optional[int]

outlier = DictProperty("outlier") # type: bool
# whether this event is an outlier (ie, whether we have the state at that point
# in the DAG)
self.outlier = False

out_of_band_membership = DictProperty("out_of_band_membership") # type: bool
send_on_behalf_of = DictProperty("send_on_behalf_of") # type: str
recheck_redaction = DictProperty("recheck_redaction") # type: bool
Expand All @@ -129,7 +132,7 @@ def get_dict(self) -> JsonDict:
return dict(self._dict)

def is_outlier(self) -> bool:
return self._dict.get("outlier", False)
return self.outlier

def is_out_of_band_membership(self) -> bool:
"""Whether this is an out of band membership, like an invite or an invite
Expand Down
2 changes: 2 additions & 0 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def prune_event(event: EventBase) -> EventBase:
event.internal_metadata.stream_ordering
)

pruned_event.internal_metadata.outlier = event.internal_metadata.outlier

# Mark the event as redacted
pruned_event.internal_metadata.redacted = True

Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
// containing the event
"event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. },
"outlier": true|false,
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
}],
Expand Down Expand Up @@ -84,6 +85,7 @@ async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
"room_version": event.room_version.identifier,
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"outlier": event.internal_metadata.is_outlier(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
}
Expand Down Expand Up @@ -116,6 +118,7 @@ async def _handle_request(self, request):
event = make_event_from_dict(
event_dict, room_ver, internal_metadata, rejected_reason
)
event.internal_metadata.outlier = event_payload["outlier"]

context = EventContext.deserialize(
self.storage, event_payload["context"]
Expand Down
4 changes: 4 additions & 0 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ async def _serialize_payload(
extra_users (list(UserID)): Any extra users to notify about event
"""

# we only expect this interface to be used for local events, so they shouldn't
# be outliers.
assert not event.internal_metadata.is_outlier()

serialized_context = await context.serialize(event, store)

payload = {
Expand Down
19 changes: 17 additions & 2 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,8 +1270,10 @@ def _update_outliers_txn(self, txn, events_and_contexts):
logger.exception("")
raise

# update the stored internal_metadata to update the "outlier" flag.
# TODO: This is unused as of Synapse 1.30. Remove it once we are happy
# to drop backwards-compatibility with 1.29.
metadata_json = json_encoder.encode(event.internal_metadata.get_dict())

sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id))

Expand Down Expand Up @@ -1319,6 +1321,19 @@ def event_dict(event):
d.pop("redacted_because", None)
return d

def get_internal_metadata(event):
im = event.internal_metadata.get_dict()

# temporary hack for database compatibility with Synapse 1.29 and earlier:
# store the `outlier` flag inside the internal_metadata json as well as in
# the `events` table, so that if anyone rolls back to an older Synapse,
# things keep working. This can be removed once we are happy to drop support
# for that
if event.internal_metadata.is_outlier():
im["outlier"] = True

return im

self.db_pool.simple_insert_many_txn(
txn,
table="event_json",
Expand All @@ -1327,7 +1342,7 @@ def event_dict(event):
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": json_encoder.encode(
event.internal_metadata.get_dict()
get_internal_metadata(event)
),
"json": json_encoder.encode(event_dict(event)),
"format_version": event.format_version,
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ async def _get_events_from_db(self, event_ids, allow_rejected=False):
rejected_reason=rejected_reason,
)
original_ev.internal_metadata.stream_ordering = row["stream_ordering"]
original_ev.internal_metadata.outlier = row["outlier"]

event_map[event_id] = original_ev

Expand Down Expand Up @@ -905,7 +906,8 @@ def _fetch_event_rows(self, txn, event_ids):
ej.json,
ej.format_version,
r.room_version,
rej.reason
rej.reason,
e.outlier
FROM events AS e
JOIN event_json AS ej USING (event_id)
LEFT JOIN rooms r ON r.room_id = e.room_id
Expand All @@ -929,6 +931,7 @@ def _fetch_event_rows(self, txn, event_ids):
"room_version_id": row[5],
"rejected_reason": row[6],
"redactions": [],
"outlier": row[7],
}

# check for redactions
Expand Down