forked-synapse/tests/replication/slave/storage/test_events.py
Erik Johnston 28c98e51ff
Add local_current_membership table (#6655)
Currently we rely on `current_state_events` to figure out what rooms a
user was in and their last membership event in there. However, if the
server leaves the room then the table may be cleaned up and that
information is lost. So lets add a table that separately holds that
information.
2020-01-15 14:59:33 +00:00

365 lines
13 KiB
Python

# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from canonicaljson import encode_canonical_json
from synapse.events import FrozenEvent, _EventInternalMetadata
from synapse.events.snapshot import EventContext
from synapse.handlers.room import RoomEventSource
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.storage.roommember import RoomsForUser
from ._base import BaseSlavedStoreTestCase
USER_ID = "@feeling:blue"
USER_ID_2 = "@bright:blue"
OUTLIER = {"outlier": True}
ROOM_ID = "!room:blue"
logger = logging.getLogger(__name__)
def dict_equals(self, other):
me = encode_canonical_json(self.get_pdu_json())
them = encode_canonical_json(other.get_pdu_json())
return me == them
def patch__eq__(cls):
eq = getattr(cls, "__eq__", None)
cls.__eq__ = dict_equals
def unpatch():
if eq is not None:
cls.__eq__ = eq
return unpatch
class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
STORE_TYPE = SlavedEventStore
def setUp(self):
# Patch up the equality operator for events so that we can check
# whether lists of events match using assertEquals
self.unpatches = [patch__eq__(_EventInternalMetadata), patch__eq__(FrozenEvent)]
return super(SlavedEventStoreTestCase, self).setUp()
def tearDown(self):
[unpatch() for unpatch in self.unpatches]
def test_get_latest_event_ids_in_room(self):
create = self.persist(type="m.room.create", key="", creator=USER_ID)
self.replicate()
self.check("get_latest_event_ids_in_room", (ROOM_ID,), [create.event_id])
join = self.persist(
type="m.room.member",
key=USER_ID,
membership="join",
prev_events=[(create.event_id, {})],
)
self.replicate()
self.check("get_latest_event_ids_in_room", (ROOM_ID,), [join.event_id])
def test_redactions(self):
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.member", key=USER_ID, membership="join")
msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
self.replicate()
self.check("get_event", [msg.event_id], msg)
redaction = self.persist(type="m.room.redaction", redacts=msg.event_id)
self.replicate()
msg_dict = msg.get_dict()
msg_dict["content"] = {}
msg_dict["unsigned"]["redacted_by"] = redaction.event_id
msg_dict["unsigned"]["redacted_because"] = redaction
redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict())
self.check("get_event", [msg.event_id], redacted)
def test_backfilled_redactions(self):
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.member", key=USER_ID, membership="join")
msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
self.replicate()
self.check("get_event", [msg.event_id], msg)
redaction = self.persist(
type="m.room.redaction", redacts=msg.event_id, backfill=True
)
self.replicate()
msg_dict = msg.get_dict()
msg_dict["content"] = {}
msg_dict["unsigned"]["redacted_by"] = redaction.event_id
msg_dict["unsigned"]["redacted_because"] = redaction
redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict())
self.check("get_event", [msg.event_id], redacted)
def test_invites(self):
self.persist(type="m.room.create", key="", creator=USER_ID)
self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
self.replicate()
self.check(
"get_invited_rooms_for_local_user",
[USER_ID_2],
[
RoomsForUser(
ROOM_ID,
USER_ID,
"invite",
event.event_id,
event.internal_metadata.stream_ordering,
)
],
)
def test_push_actions_for_user(self):
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.join", key=USER_ID, membership="join")
self.persist(
type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join"
)
event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello")
self.replicate()
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 0, "notify_count": 0},
)
self.persist(
type="m.room.message",
msgtype="m.text",
body="world",
push_actions=[(USER_ID_2, ["notify"])],
)
self.replicate()
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 0, "notify_count": 1},
)
self.persist(
type="m.room.message",
msgtype="m.text",
body="world",
push_actions=[
(USER_ID_2, ["notify", {"set_tweak": "highlight", "value": True}])
],
)
self.replicate()
self.check(
"get_unread_event_push_actions_by_room_for_user",
[ROOM_ID, USER_ID_2, event1.event_id],
{"highlight_count": 1, "notify_count": 2},
)
def test_get_rooms_for_user_with_stream_ordering(self):
"""Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
by rows in the events stream
"""
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.member", key=USER_ID, membership="join")
self.replicate()
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
j2 = self.persist(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
self.replicate()
self.check(
"get_rooms_for_user_with_stream_ordering",
(USER_ID_2,),
{(ROOM_ID, j2.internal_metadata.stream_ordering)},
)
def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self):
"""Check that current_state invalidation happens correctly with multiple events
in the persistence batch.
This test attempts to reproduce a race condition between the event persistence
loop and a worker-based Sync handler.
The problem occurred when the master persisted several events in one batch. It
only updates the current_state at the end of each batch, so the obvious thing
to do is then to issue a current_state_delta stream update corresponding to the
last stream_id in the batch.
However, that raises the possibility that a worker will see the replication
notification for a join event before the current_state caches are invalidated.
The test involves:
* creating a join and a message event for a user, and persisting them in the
same batch
* controlling the replication stream so that updates are sent gradually
* between each bunch of replication updates, check that we see a consistent
snapshot of the state.
"""
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.member", key=USER_ID, membership="join")
self.replicate()
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
# limit the replication rate
repl_transport = self.server_to_client_transport
repl_transport.autoflush = False
# build the join and message events and persist them in the same batch.
logger.info("----- build test events ------")
j2, j2ctx = self.build_event(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
msg, msgctx = self.build_event()
self.get_success(
self.storage.persistence.persist_events([(j2, j2ctx), (msg, msgctx)])
)
self.replicate()
event_source = RoomEventSource(self.hs)
event_source.store = self.slaved_store
current_token = self.get_success(event_source.get_current_key())
# gradually stream out the replication
while repl_transport.buffer:
logger.info("------ flush ------")
repl_transport.flush(30)
self.pump(0)
prev_token = current_token
current_token = self.get_success(event_source.get_current_key())
# attempt to replicate the behaviour of the sync handler.
#
# First, we get a list of the rooms we are joined to
joined_rooms = self.get_success(
self.slaved_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
)
# Then, we get a list of the events since the last sync
membership_changes = self.get_success(
self.slaved_store.get_membership_changes_for_user(
USER_ID_2, prev_token, current_token
)
)
logger.info(
"%s->%s: joined_rooms=%r membership_changes=%r",
prev_token,
current_token,
joined_rooms,
membership_changes,
)
# the membership change is only any use to us if the room is in the
# joined_rooms list.
if membership_changes:
self.assertEqual(
joined_rooms, {(ROOM_ID, j2.internal_metadata.stream_ordering)}
)
event_id = 0
def persist(self, backfill=False, **kwargs):
"""
Returns:
synapse.events.FrozenEvent: The event that was persisted.
"""
event, context = self.build_event(**kwargs)
if backfill:
self.get_success(
self.storage.persistence.persist_events(
[(event, context)], backfilled=True
)
)
else:
self.get_success(self.storage.persistence.persist_event(event, context))
return event
def build_event(
self,
sender=USER_ID,
room_id=ROOM_ID,
type="m.room.message",
key=None,
internal={},
state=None,
depth=None,
prev_events=[],
auth_events=[],
prev_state=[],
redacts=None,
push_actions=[],
**content
):
if depth is None:
depth = self.event_id
if not prev_events:
latest_event_ids = self.get_success(
self.master_store.get_latest_event_ids_in_room(room_id)
)
prev_events = [(ev_id, {}) for ev_id in latest_event_ids]
event_dict = {
"sender": sender,
"type": type,
"content": content,
"event_id": "$%d:blue" % (self.event_id,),
"room_id": room_id,
"depth": depth,
"origin_server_ts": self.event_id,
"prev_events": prev_events,
"auth_events": auth_events,
}
if key is not None:
event_dict["state_key"] = key
event_dict["prev_state"] = prev_state
if redacts is not None:
event_dict["redacts"] = redacts
event = FrozenEvent(event_dict, internal_metadata_dict=internal)
self.event_id += 1
if state is not None:
state_ids = {key: e.event_id for key, e in state.items()}
context = EventContext.with_state(
state_group=None, current_state_ids=state_ids, prev_state_ids=state_ids
)
else:
state_handler = self.hs.get_state_handler()
context = self.get_success(state_handler.compute_event_context(event))
self.master_store.add_push_actions_to_staging(
event.event_id, {user_id: actions for user_id, actions in push_actions}
)
return event, context