Prep work for removing outlier from internal_metadata (#9411)

* Populate `internal_metadata.outlier` based on `events` table

Rather than relying on `outlier` being in the `internal_metadata` column,
populate it based on the `events.outlier` column.

* Move `outlier` out of InternalMetadata._dict

Ultimately, this will allow us to stop writing it to the database. For now, we
have to grandfather it back in so as to maintain compatibility with older
versions of Synapse.
This commit is contained in:
Richard van der Hoff 2021-03-17 12:33:18 +00:00 committed by GitHub
parent b449af0379
commit 567f88f835
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 36 additions and 7 deletions

1
changelog.d/9411.misc Normal file
View File

@ -0,0 +1 @@
Preparatory steps for removing redundant `outlier` data from `event_json.internal_metadata` column.

View File

@ -98,7 +98,7 @@ class DefaultDictProperty(DictProperty):
class _EventInternalMetadata: class _EventInternalMetadata:
__slots__ = ["_dict", "stream_ordering"] __slots__ = ["_dict", "stream_ordering", "outlier"]
def __init__(self, internal_metadata_dict: JsonDict): def __init__(self, internal_metadata_dict: JsonDict):
# we have to copy the dict, because it turns out that the same dict is # we have to copy the dict, because it turns out that the same dict is
@ -108,7 +108,10 @@ class _EventInternalMetadata:
# the stream ordering of this event. None, until it has been persisted. # the stream ordering of this event. None, until it has been persisted.
self.stream_ordering = None # type: Optional[int] self.stream_ordering = None # type: Optional[int]
outlier = DictProperty("outlier") # type: bool # whether this event is an outlier (ie, whether we have the state at that point
# in the DAG)
self.outlier = False
out_of_band_membership = DictProperty("out_of_band_membership") # type: bool out_of_band_membership = DictProperty("out_of_band_membership") # type: bool
send_on_behalf_of = DictProperty("send_on_behalf_of") # type: str send_on_behalf_of = DictProperty("send_on_behalf_of") # type: str
recheck_redaction = DictProperty("recheck_redaction") # type: bool recheck_redaction = DictProperty("recheck_redaction") # type: bool
@ -129,7 +132,7 @@ class _EventInternalMetadata:
return dict(self._dict) return dict(self._dict)
def is_outlier(self) -> bool: def is_outlier(self) -> bool:
return self._dict.get("outlier", False) return self.outlier
def is_out_of_band_membership(self) -> bool: def is_out_of_band_membership(self) -> bool:
"""Whether this is an out of band membership, like an invite or an invite """Whether this is an out of band membership, like an invite or an invite

View File

@ -54,6 +54,8 @@ def prune_event(event: EventBase) -> EventBase:
event.internal_metadata.stream_ordering event.internal_metadata.stream_ordering
) )
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier
# Mark the event as redacted # Mark the event as redacted
pruned_event.internal_metadata.redacted = True pruned_event.internal_metadata.redacted = True

View File

@ -40,6 +40,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
// containing the event // containing the event
"event_format_version": .., // 1,2,3 etc: the event format version "event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. }, "internal_metadata": { .. serialized internal_metadata .. },
"outlier": true|false,
"rejected_reason": .., // The event.rejected_reason field "rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. }, "context": { .. serialized event context .. },
}], }],
@ -84,6 +85,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"room_version": event.room_version.identifier, "room_version": event.room_version.identifier,
"event_format_version": event.format_version, "event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(), "internal_metadata": event.internal_metadata.get_dict(),
"outlier": event.internal_metadata.is_outlier(),
"rejected_reason": event.rejected_reason, "rejected_reason": event.rejected_reason,
"context": serialized_context, "context": serialized_context,
} }
@ -116,6 +118,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event = make_event_from_dict( event = make_event_from_dict(
event_dict, room_ver, internal_metadata, rejected_reason event_dict, room_ver, internal_metadata, rejected_reason
) )
event.internal_metadata.outlier = event_payload["outlier"]
context = EventContext.deserialize( context = EventContext.deserialize(
self.storage, event_payload["context"] self.storage, event_payload["context"]

View File

@ -40,6 +40,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
// containing the event // containing the event
"event_format_version": .., // 1,2,3 etc: the event format version "event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. }, "internal_metadata": { .. serialized internal_metadata .. },
"outlier": true|false,
"rejected_reason": .., // The event.rejected_reason field "rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. }, "context": { .. serialized event context .. },
"requester": { .. serialized requester .. }, "requester": { .. serialized requester .. },
@ -79,7 +80,6 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
ratelimit (bool) ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event extra_users (list(UserID)): Any extra users to notify about event
""" """
serialized_context = await context.serialize(event, store) serialized_context = await context.serialize(event, store)
payload = { payload = {
@ -87,6 +87,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"room_version": event.room_version.identifier, "room_version": event.room_version.identifier,
"event_format_version": event.format_version, "event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(), "internal_metadata": event.internal_metadata.get_dict(),
"outlier": event.internal_metadata.is_outlier(),
"rejected_reason": event.rejected_reason, "rejected_reason": event.rejected_reason,
"context": serialized_context, "context": serialized_context,
"requester": requester.serialize(), "requester": requester.serialize(),
@ -108,6 +109,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
event = make_event_from_dict( event = make_event_from_dict(
event_dict, room_ver, internal_metadata, rejected_reason event_dict, room_ver, internal_metadata, rejected_reason
) )
event.internal_metadata.outlier = content["outlier"]
requester = Requester.deserialize(self.store, content["requester"]) requester = Requester.deserialize(self.store, content["requester"])
context = EventContext.deserialize(self.storage, content["context"]) context = EventContext.deserialize(self.storage, content["context"])

View File

@ -1270,8 +1270,10 @@ class PersistEventsStore:
logger.exception("") logger.exception("")
raise raise
# update the stored internal_metadata to update the "outlier" flag.
# TODO: This is unused as of Synapse 1.31. Remove it once we are happy
# to drop backwards-compatibility with 1.30.
metadata_json = json_encoder.encode(event.internal_metadata.get_dict()) metadata_json = json_encoder.encode(event.internal_metadata.get_dict())
sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?" sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id)) txn.execute(sql, (metadata_json, event.event_id))
@ -1319,6 +1321,19 @@ class PersistEventsStore:
d.pop("redacted_because", None) d.pop("redacted_because", None)
return d return d
def get_internal_metadata(event):
im = event.internal_metadata.get_dict()
# temporary hack for database compatibility with Synapse 1.30 and earlier:
# store the `outlier` flag inside the internal_metadata json as well as in
# the `events` table, so that if anyone rolls back to an older Synapse,
# things keep working. This can be removed once we are happy to drop support
# for that
if event.internal_metadata.is_outlier():
im["outlier"] = True
return im
self.db_pool.simple_insert_many_txn( self.db_pool.simple_insert_many_txn(
txn, txn,
table="event_json", table="event_json",
@ -1327,7 +1342,7 @@ class PersistEventsStore:
"event_id": event.event_id, "event_id": event.event_id,
"room_id": event.room_id, "room_id": event.room_id,
"internal_metadata": json_encoder.encode( "internal_metadata": json_encoder.encode(
event.internal_metadata.get_dict() get_internal_metadata(event)
), ),
"json": json_encoder.encode(event_dict(event)), "json": json_encoder.encode(event_dict(event)),
"format_version": event.format_version, "format_version": event.format_version,

View File

@ -799,6 +799,7 @@ class EventsWorkerStore(SQLBaseStore):
rejected_reason=rejected_reason, rejected_reason=rejected_reason,
) )
original_ev.internal_metadata.stream_ordering = row["stream_ordering"] original_ev.internal_metadata.stream_ordering = row["stream_ordering"]
original_ev.internal_metadata.outlier = row["outlier"]
event_map[event_id] = original_ev event_map[event_id] = original_ev
@ -905,7 +906,8 @@ class EventsWorkerStore(SQLBaseStore):
ej.json, ej.json,
ej.format_version, ej.format_version,
r.room_version, r.room_version,
rej.reason rej.reason,
e.outlier
FROM events AS e FROM events AS e
JOIN event_json AS ej USING (event_id) JOIN event_json AS ej USING (event_id)
LEFT JOIN rooms r ON r.room_id = e.room_id LEFT JOIN rooms r ON r.room_id = e.room_id
@ -929,6 +931,7 @@ class EventsWorkerStore(SQLBaseStore):
"room_version_id": row[5], "room_version_id": row[5],
"rejected_reason": row[6], "rejected_reason": row[6],
"redactions": [], "redactions": [],
"outlier": row[7],
} }
# check for redactions # check for redactions