mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2024-12-11 21:14:18 -05:00
28c98e51ff
Currently we rely on `current_state_events` to figure out what rooms a user was in and their last membership event in there. However, if the server leaves the room then the table may be cleaned up and that information is lost. So lets add a table that separately holds that information.
365 lines
13 KiB
Python
365 lines
13 KiB
Python
# Copyright 2016 OpenMarket Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import logging
|
|
|
|
from canonicaljson import encode_canonical_json
|
|
|
|
from synapse.events import FrozenEvent, _EventInternalMetadata
|
|
from synapse.events.snapshot import EventContext
|
|
from synapse.handlers.room import RoomEventSource
|
|
from synapse.replication.slave.storage.events import SlavedEventStore
|
|
from synapse.storage.roommember import RoomsForUser
|
|
|
|
from ._base import BaseSlavedStoreTestCase
|
|
|
|
USER_ID = "@feeling:blue"
|
|
USER_ID_2 = "@bright:blue"
|
|
OUTLIER = {"outlier": True}
|
|
ROOM_ID = "!room:blue"
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def dict_equals(self, other):
|
|
me = encode_canonical_json(self.get_pdu_json())
|
|
them = encode_canonical_json(other.get_pdu_json())
|
|
return me == them
|
|
|
|
|
|
def patch__eq__(cls):
|
|
eq = getattr(cls, "__eq__", None)
|
|
cls.__eq__ = dict_equals
|
|
|
|
def unpatch():
|
|
if eq is not None:
|
|
cls.__eq__ = eq
|
|
|
|
return unpatch
|
|
|
|
|
|
class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
|
|
|
|
STORE_TYPE = SlavedEventStore
|
|
|
|
def setUp(self):
|
|
# Patch up the equality operator for events so that we can check
|
|
# whether lists of events match using assertEquals
|
|
self.unpatches = [patch__eq__(_EventInternalMetadata), patch__eq__(FrozenEvent)]
|
|
return super(SlavedEventStoreTestCase, self).setUp()
|
|
|
|
def tearDown(self):
|
|
[unpatch() for unpatch in self.unpatches]
|
|
|
|
def test_get_latest_event_ids_in_room(self):
|
|
create = self.persist(type="m.room.create", key="", creator=USER_ID)
|
|
self.replicate()
|
|
self.check("get_latest_event_ids_in_room", (ROOM_ID,), [create.event_id])
|
|
|
|
join = self.persist(
|
|
type="m.room.member",
|
|
key=USER_ID,
|
|
membership="join",
|
|
prev_events=[(create.event_id, {})],
|
|
)
|
|
self.replicate()
|
|
self.check("get_latest_event_ids_in_room", (ROOM_ID,), [join.event_id])
|
|
|
|
def test_redactions(self):
|
|
self.persist(type="m.room.create", key="", creator=USER_ID)
|
|
self.persist(type="m.room.member", key=USER_ID, membership="join")
|
|
|
|
msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
|
|
self.replicate()
|
|
self.check("get_event", [msg.event_id], msg)
|
|
|
|
redaction = self.persist(type="m.room.redaction", redacts=msg.event_id)
|
|
self.replicate()
|
|
|
|
msg_dict = msg.get_dict()
|
|
msg_dict["content"] = {}
|
|
msg_dict["unsigned"]["redacted_by"] = redaction.event_id
|
|
msg_dict["unsigned"]["redacted_because"] = redaction
|
|
redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict())
|
|
self.check("get_event", [msg.event_id], redacted)
|
|
|
|
def test_backfilled_redactions(self):
|
|
self.persist(type="m.room.create", key="", creator=USER_ID)
|
|
self.persist(type="m.room.member", key=USER_ID, membership="join")
|
|
|
|
msg = self.persist(type="m.room.message", msgtype="m.text", body="Hello")
|
|
self.replicate()
|
|
self.check("get_event", [msg.event_id], msg)
|
|
|
|
redaction = self.persist(
|
|
type="m.room.redaction", redacts=msg.event_id, backfill=True
|
|
)
|
|
self.replicate()
|
|
|
|
msg_dict = msg.get_dict()
|
|
msg_dict["content"] = {}
|
|
msg_dict["unsigned"]["redacted_by"] = redaction.event_id
|
|
msg_dict["unsigned"]["redacted_because"] = redaction
|
|
redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict())
|
|
self.check("get_event", [msg.event_id], redacted)
|
|
|
|
def test_invites(self):
|
|
self.persist(type="m.room.create", key="", creator=USER_ID)
|
|
self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
|
|
event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
|
|
|
|
self.replicate()
|
|
|
|
self.check(
|
|
"get_invited_rooms_for_local_user",
|
|
[USER_ID_2],
|
|
[
|
|
RoomsForUser(
|
|
ROOM_ID,
|
|
USER_ID,
|
|
"invite",
|
|
event.event_id,
|
|
event.internal_metadata.stream_ordering,
|
|
)
|
|
],
|
|
)
|
|
|
|
def test_push_actions_for_user(self):
|
|
self.persist(type="m.room.create", key="", creator=USER_ID)
|
|
self.persist(type="m.room.join", key=USER_ID, membership="join")
|
|
self.persist(
|
|
type="m.room.join", sender=USER_ID, key=USER_ID_2, membership="join"
|
|
)
|
|
event1 = self.persist(type="m.room.message", msgtype="m.text", body="hello")
|
|
self.replicate()
|
|
self.check(
|
|
"get_unread_event_push_actions_by_room_for_user",
|
|
[ROOM_ID, USER_ID_2, event1.event_id],
|
|
{"highlight_count": 0, "notify_count": 0},
|
|
)
|
|
|
|
self.persist(
|
|
type="m.room.message",
|
|
msgtype="m.text",
|
|
body="world",
|
|
push_actions=[(USER_ID_2, ["notify"])],
|
|
)
|
|
self.replicate()
|
|
self.check(
|
|
"get_unread_event_push_actions_by_room_for_user",
|
|
[ROOM_ID, USER_ID_2, event1.event_id],
|
|
{"highlight_count": 0, "notify_count": 1},
|
|
)
|
|
|
|
self.persist(
|
|
type="m.room.message",
|
|
msgtype="m.text",
|
|
body="world",
|
|
push_actions=[
|
|
(USER_ID_2, ["notify", {"set_tweak": "highlight", "value": True}])
|
|
],
|
|
)
|
|
self.replicate()
|
|
self.check(
|
|
"get_unread_event_push_actions_by_room_for_user",
|
|
[ROOM_ID, USER_ID_2, event1.event_id],
|
|
{"highlight_count": 1, "notify_count": 2},
|
|
)
|
|
|
|
def test_get_rooms_for_user_with_stream_ordering(self):
|
|
"""Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
|
|
by rows in the events stream
|
|
"""
|
|
self.persist(type="m.room.create", key="", creator=USER_ID)
|
|
self.persist(type="m.room.member", key=USER_ID, membership="join")
|
|
self.replicate()
|
|
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
|
|
|
|
j2 = self.persist(
|
|
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
|
|
)
|
|
self.replicate()
|
|
self.check(
|
|
"get_rooms_for_user_with_stream_ordering",
|
|
(USER_ID_2,),
|
|
{(ROOM_ID, j2.internal_metadata.stream_ordering)},
|
|
)
|
|
|
|
def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(self):
|
|
"""Check that current_state invalidation happens correctly with multiple events
|
|
in the persistence batch.
|
|
|
|
This test attempts to reproduce a race condition between the event persistence
|
|
loop and a worker-based Sync handler.
|
|
|
|
The problem occurred when the master persisted several events in one batch. It
|
|
only updates the current_state at the end of each batch, so the obvious thing
|
|
to do is then to issue a current_state_delta stream update corresponding to the
|
|
last stream_id in the batch.
|
|
|
|
However, that raises the possibility that a worker will see the replication
|
|
notification for a join event before the current_state caches are invalidated.
|
|
|
|
The test involves:
|
|
* creating a join and a message event for a user, and persisting them in the
|
|
same batch
|
|
|
|
* controlling the replication stream so that updates are sent gradually
|
|
|
|
* between each bunch of replication updates, check that we see a consistent
|
|
snapshot of the state.
|
|
"""
|
|
self.persist(type="m.room.create", key="", creator=USER_ID)
|
|
self.persist(type="m.room.member", key=USER_ID, membership="join")
|
|
self.replicate()
|
|
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
|
|
|
|
# limit the replication rate
|
|
repl_transport = self.server_to_client_transport
|
|
repl_transport.autoflush = False
|
|
|
|
# build the join and message events and persist them in the same batch.
|
|
logger.info("----- build test events ------")
|
|
j2, j2ctx = self.build_event(
|
|
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
|
|
)
|
|
msg, msgctx = self.build_event()
|
|
self.get_success(
|
|
self.storage.persistence.persist_events([(j2, j2ctx), (msg, msgctx)])
|
|
)
|
|
self.replicate()
|
|
|
|
event_source = RoomEventSource(self.hs)
|
|
event_source.store = self.slaved_store
|
|
current_token = self.get_success(event_source.get_current_key())
|
|
|
|
# gradually stream out the replication
|
|
while repl_transport.buffer:
|
|
logger.info("------ flush ------")
|
|
repl_transport.flush(30)
|
|
self.pump(0)
|
|
|
|
prev_token = current_token
|
|
current_token = self.get_success(event_source.get_current_key())
|
|
|
|
# attempt to replicate the behaviour of the sync handler.
|
|
#
|
|
# First, we get a list of the rooms we are joined to
|
|
joined_rooms = self.get_success(
|
|
self.slaved_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
|
|
)
|
|
|
|
# Then, we get a list of the events since the last sync
|
|
membership_changes = self.get_success(
|
|
self.slaved_store.get_membership_changes_for_user(
|
|
USER_ID_2, prev_token, current_token
|
|
)
|
|
)
|
|
|
|
logger.info(
|
|
"%s->%s: joined_rooms=%r membership_changes=%r",
|
|
prev_token,
|
|
current_token,
|
|
joined_rooms,
|
|
membership_changes,
|
|
)
|
|
|
|
# the membership change is only any use to us if the room is in the
|
|
# joined_rooms list.
|
|
if membership_changes:
|
|
self.assertEqual(
|
|
joined_rooms, {(ROOM_ID, j2.internal_metadata.stream_ordering)}
|
|
)
|
|
|
|
event_id = 0
|
|
|
|
def persist(self, backfill=False, **kwargs):
|
|
"""
|
|
Returns:
|
|
synapse.events.FrozenEvent: The event that was persisted.
|
|
"""
|
|
event, context = self.build_event(**kwargs)
|
|
|
|
if backfill:
|
|
self.get_success(
|
|
self.storage.persistence.persist_events(
|
|
[(event, context)], backfilled=True
|
|
)
|
|
)
|
|
else:
|
|
self.get_success(self.storage.persistence.persist_event(event, context))
|
|
|
|
return event
|
|
|
|
def build_event(
|
|
self,
|
|
sender=USER_ID,
|
|
room_id=ROOM_ID,
|
|
type="m.room.message",
|
|
key=None,
|
|
internal={},
|
|
state=None,
|
|
depth=None,
|
|
prev_events=[],
|
|
auth_events=[],
|
|
prev_state=[],
|
|
redacts=None,
|
|
push_actions=[],
|
|
**content
|
|
):
|
|
|
|
if depth is None:
|
|
depth = self.event_id
|
|
|
|
if not prev_events:
|
|
latest_event_ids = self.get_success(
|
|
self.master_store.get_latest_event_ids_in_room(room_id)
|
|
)
|
|
prev_events = [(ev_id, {}) for ev_id in latest_event_ids]
|
|
|
|
event_dict = {
|
|
"sender": sender,
|
|
"type": type,
|
|
"content": content,
|
|
"event_id": "$%d:blue" % (self.event_id,),
|
|
"room_id": room_id,
|
|
"depth": depth,
|
|
"origin_server_ts": self.event_id,
|
|
"prev_events": prev_events,
|
|
"auth_events": auth_events,
|
|
}
|
|
if key is not None:
|
|
event_dict["state_key"] = key
|
|
event_dict["prev_state"] = prev_state
|
|
|
|
if redacts is not None:
|
|
event_dict["redacts"] = redacts
|
|
|
|
event = FrozenEvent(event_dict, internal_metadata_dict=internal)
|
|
|
|
self.event_id += 1
|
|
|
|
if state is not None:
|
|
state_ids = {key: e.event_id for key, e in state.items()}
|
|
context = EventContext.with_state(
|
|
state_group=None, current_state_ids=state_ids, prev_state_ids=state_ids
|
|
)
|
|
else:
|
|
state_handler = self.hs.get_state_handler()
|
|
context = self.get_success(state_handler.compute_event_context(event))
|
|
|
|
self.master_store.add_push_actions_to_staging(
|
|
event.event_id, {user_id: actions for user_id, actions in push_actions}
|
|
)
|
|
return event, context
|