Fix relations in worker mode

Erik Johnston 2019-05-16 10:18:53 +01:00
parent 33453419b0
commit b5c62c6b26
5 changed files with 28 additions and 13 deletions

View File

@@ -23,6 +23,7 @@ from synapse.replication.tcp.streams.events import (
 from synapse.storage.event_federation import EventFederationWorkerStore
 from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.relations import RelationsWorkerStore
 from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
 from synapse.storage.state import StateGroupWorkerStore
@@ -52,6 +53,7 @@ class SlavedEventStore(EventFederationWorkerStore,
                        EventsWorkerStore,
                        SignatureWorkerStore,
                        UserErasureWorkerStore,
+                       RelationsWorkerStore,
                        BaseSlavedStore):
 
     def __init__(self, db_conn, hs):
@@ -89,7 +91,7 @@ class SlavedEventStore(EventFederationWorkerStore,
             for row in rows:
                 self.invalidate_caches_for_event(
                     -token, row.event_id, row.room_id, row.type, row.state_key,
-                    row.redacts,
+                    row.redacts, row.relates_to,
                     backfilled=True,
                 )
         return super(SlavedEventStore, self).process_replication_rows(
@@ -102,7 +104,7 @@ class SlavedEventStore(EventFederationWorkerStore,
         if row.type == EventsStreamEventRow.TypeId:
             self.invalidate_caches_for_event(
                 token, data.event_id, data.room_id, data.type, data.state_key,
-                data.redacts,
+                data.redacts, data.relates_to,
                 backfilled=False,
             )
         elif row.type == EventsStreamCurrentStateRow.TypeId:
@@ -114,7 +116,8 @@ class SlavedEventStore(EventFederationWorkerStore,
             raise Exception("Unknown events stream row type %s" % (row.type, ))
 
     def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
-                                    etype, state_key, redacts, backfilled):
+                                    etype, state_key, redacts, relates_to,
+                                    backfilled):
         self._invalidate_get_event_cache(event_id)
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
@@ -136,3 +139,7 @@ class SlavedEventStore(EventFederationWorkerStore,
                 state_key, stream_ordering
             )
             self.get_invited_rooms_for_user.invalidate((state_key,))
+
+        if relates_to:
+            self.get_relations_for_event.invalidate_many((relates_to,))
+            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
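Passing relates_to through invalidate_caches_for_event is what lets a worker drop its cached relation queries when a new annotation or edit arrives over replication. get_relations_for_event is cached with tree=True (and get_aggregation_groups_for_event presumably likewise), keyed with the parent event ID as the first key element, which is why a one-element tuple is enough for invalidate_many here: it clears every cached combination of filter arguments for that parent. A loose analogy of the prefix semantics, not Synapse's actual cache classes:

# Toy stand-in for a tree cache: keys are tuples, invalidate_many() drops
# every entry whose key starts with the given prefix.
class PrefixCache(object):
    def __init__(self):
        self._entries = {}  # full key tuple -> cached result

    def set(self, key, value):
        self._entries[key] = value

    def get(self, key):
        return self._entries.get(key)

    def invalidate_many(self, prefix):
        self._entries = {
            k: v for k, v in self._entries.items()
            if k[:len(prefix)] != prefix
        }


cache = PrefixCache()
cache.set(("$parent:hs", "m.annotation", "m.reaction"), ["cached result"])
cache.set(("$parent:hs", "m.replace", None), ["cached result"])

# What the worker does when a replication row carries relates_to="$parent:hs":
cache.invalidate_many(("$parent:hs",))
assert cache.get(("$parent:hs", "m.annotation", "m.reaction")) is None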

View File

@@ -32,6 +32,7 @@ BackfillStreamRow = namedtuple("BackfillStreamRow", (
     "type",  # str
     "state_key",  # str, optional
     "redacts",  # str, optional
+    "relates_to",  # str, optional
 ))
 PresenceStreamRow = namedtuple("PresenceStreamRow", (
     "user_id",  # str

View File

@@ -80,11 +80,12 @@ class BaseEventsStreamRow(object):
 class EventsStreamEventRow(BaseEventsStreamRow):
     TypeId = "ev"
 
     event_id = attr.ib()  # str
     room_id = attr.ib()  # str
     type = attr.ib()  # str
     state_key = attr.ib()  # str, optional
     redacts = attr.ib()  # str, optional
+    relates_to = attr.ib()  # str, optional
 
 
 @attr.s(slots=True, frozen=True)
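Workers rebuild these rows from the replication stream, so the field order is effectively part of the wire format; appending relates_to after redacts (here and in BackfillStreamRow above) keeps the existing fields in their old positions. A small illustration with made-up event IDs, restating the attrs class standalone rather than inheriting BaseEventsStreamRow:

import attr


@attr.s(slots=True, frozen=True)
class EventsStreamEventRow(object):
    TypeId = "ev"

    event_id = attr.ib()    # str
    room_id = attr.ib()     # str
    type = attr.ib()        # str
    state_key = attr.ib()   # str, optional
    redacts = attr.ib()     # str, optional
    relates_to = attr.ib()  # str, optional


# Built positionally, the way values coming off the stream would be applied:
row = EventsStreamEventRow(
    "$reaction:example.com",  # event_id
    "!room:example.com",      # room_id
    "m.reaction",             # type
    None,                     # state_key
    None,                     # redacts
    "$parent:example.com",    # relates_to: the event this one annotates
)
assert row.relates_to == "$parent:example.com"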

View File

@@ -1657,10 +1657,11 @@ class EventsStore(
         def get_all_new_forward_event_rows(txn):
             sql = (
                 "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < stream_ordering AND stream_ordering <= ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
@@ -1675,11 +1676,12 @@ class EventsStore(
 
             sql = (
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " INNER JOIN ex_outlier_stream USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < event_stream_ordering"
                 " AND event_stream_ordering <= ?"
                 " ORDER BY event_stream_ordering DESC"
@@ -1700,10 +1702,11 @@ class EventsStore(
         def get_all_new_backfill_event_rows(txn):
             sql = (
                 "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > stream_ordering AND stream_ordering >= ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
@@ -1718,11 +1721,12 @@ class EventsStore(
 
             sql = (
                 "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " INNER JOIN ex_outlier_stream USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > event_stream_ordering"
                 " AND event_stream_ordering >= ?"
                 " ORDER BY event_stream_ordering DESC"

View File

@@ -109,7 +109,7 @@ class AggregationPaginationToken(object):
         return "%d-%d" % (self.count, self.stream)
 
 
-class RelationsStore(SQLBaseStore):
+class RelationsWorkerStore(SQLBaseStore):
     @cached(tree=True)
     def get_relations_for_event(
         self,
@@ -318,6 +318,8 @@ class RelationsStore(SQLBaseStore):
             "get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn
         )
 
+
+class RelationsStore(RelationsWorkerStore):
     def _handle_event_relations(self, txn, event):
         """Handles inserting relation data during peristence of events