Merge branch 'develop' into travis/login-terms

This commit is contained in:
Travis Ralston 2018-10-31 13:15:14 -06:00
commit d1e7b9c44c
91 changed files with 2562 additions and 1263 deletions

View file

@ -43,6 +43,7 @@ class DirectoryHandler(BaseHandler):
self.state = hs.get_state_handler()
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.config = hs.config
self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
@ -111,6 +112,14 @@ class DirectoryHandler(BaseHandler):
403, "This user is not permitted to create this alias",
)
if not self.config.is_alias_creation_allowed(user_id, room_alias.to_string()):
# Lets just return a generic message, as there may be all sorts of
# reasons why we said no. TODO: Allow configurable error messages
# per alias creation rule?
raise SynapseError(
403, "Not allowed to create alias",
)
can_create = yield self.can_modify_alias(
room_alias,
user_id=user_id
@ -129,9 +138,30 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
def delete_association(self, requester, room_alias):
# association deletion for human users
def delete_association(self, requester, room_alias, send_event=True):
"""Remove an alias from the directory
(this is only meant for human users; AS users should call
delete_appservice_association)
Args:
requester (Requester):
room_alias (RoomAlias):
send_event (bool): Whether to send an updated m.room.aliases event.
Note that, if we delete the canonical alias, we will always attempt
to send an m.room.canonical_alias event
Returns:
Deferred[unicode]: room id that the alias used to point to
Raises:
NotFoundError: if the alias doesn't exist
AuthError: if the user doesn't have perms to delete the alias (ie, the user
is neither the creator of the alias, nor a server admin.
SynapseError: if the alias belongs to an AS
"""
user_id = requester.user.to_string()
try:
@ -159,10 +189,11 @@ class DirectoryHandler(BaseHandler):
room_id = yield self._delete_association(room_alias)
try:
yield self.send_room_alias_update_event(
requester,
room_id
)
if send_event:
yield self.send_room_alias_update_event(
requester,
room_id
)
yield self._update_canonical_alias(
requester,

View file

@ -156,7 +156,7 @@ class InitialSyncHandler(BaseHandler):
room_end_token = "s%d" % (event.stream_ordering,)
deferred_room_state = run_in_background(
self.store.get_state_for_events,
[event.event_id], None,
[event.event_id],
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
@ -301,7 +301,7 @@ class InitialSyncHandler(BaseHandler):
def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
membership, member_event_id, is_peeking):
room_state = yield self.store.get_state_for_events(
[member_event_id], None
[member_event_id],
)
room_state = room_state[member_event_id]

View file

@ -35,6 +35,7 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
@ -80,7 +81,7 @@ class MessageHandler(object):
elif membership == Membership.LEAVE:
key = (event_type, state_key)
room_state = yield self.store.get_state_for_events(
[membership_event_id], [key]
[membership_event_id], StateFilter.from_types([key])
)
data = room_state[membership_event_id].get(key)
@ -88,7 +89,7 @@ class MessageHandler(object):
@defer.inlineCallbacks
def get_state_events(
self, user_id, room_id, types=None, filtered_types=None,
self, user_id, room_id, state_filter=StateFilter.all(),
at_token=None, is_guest=False,
):
"""Retrieve all state events for a given room. If the user is
@ -100,13 +101,8 @@ class MessageHandler(object):
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
types(list[(str, str|None)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. If `state_key` is None,
all events are returned of the given type.
May be None, which matches any key.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
state_filter (StateFilter): The state filter used to fetch state
from the database.
at_token(StreamToken|None): the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
@ -139,7 +135,7 @@ class MessageHandler(object):
event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
[event.event_id], types, filtered_types=filtered_types,
[event.event_id], state_filter=state_filter,
)
room_state = room_state[event.event_id]
else:
@ -158,12 +154,12 @@ class MessageHandler(object):
if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
room_id, types, filtered_types=filtered_types,
room_id, state_filter=state_filter,
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], types, filtered_types=filtered_types,
[membership_event_id], state_filter=state_filter,
)
room_state = room_state[membership_event_id]
@ -431,6 +427,9 @@ class EventCreationHandler(object):
if event.is_state():
prev_state = yield self.deduplicate_state_event(event, context)
logger.info(
"Not bothering to persist duplicate state event %s", event.event_id,
)
if prev_state is not None:
defer.returnValue(prev_state)

View file

@ -21,6 +21,7 @@ from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.logcontext import run_in_background
@ -255,16 +256,14 @@ class PaginationHandler(object):
if event_filter and event_filter.lazy_load_members():
# TODO: remove redundant members
types = [
(EventTypes.Member, state_key)
for state_key in set(
event.sender # FIXME: we also care about invite targets etc.
for event in events
)
]
# FIXME: we also care about invite targets etc.
state_filter = StateFilter.from_types(
(EventTypes.Member, event.sender)
for event in events
)
state_ids = yield self.store.get_state_ids_for_event(
events[0].event_id, types=types,
events[0].event_id, state_filter=state_filter,
)
if state_ids:

View file

@ -220,15 +220,42 @@ class RegistrationHandler(BaseHandler):
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = create_requester(user_id)
# try to create the room if we're the first user on the server
should_auto_create_rooms = False
if self.hs.config.autocreate_auto_join_rooms:
count = yield self.store.count_all_users()
should_auto_create_rooms = count == 1
for r in self.hs.config.auto_join_rooms:
try:
yield self._join_user_to_room(fake_requester, r)
if should_auto_create_rooms:
room_alias = RoomAlias.from_string(r)
if self.hs.hostname != room_alias.domain:
logger.warning(
'Cannot create room alias %s, '
'it does not match server domain',
r,
)
else:
# create room expects the localpart of the room alias
room_alias_localpart = room_alias.localpart
# getting the RoomCreationHandler during init gives a dependency
# loop
yield self.hs.get_room_creation_handler().create_room(
fake_requester,
config={
"preset": "public_chat",
"room_alias_name": room_alias_localpart
},
ratelimit=False,
)
else:
yield self._join_user_to_room(fake_requester, r)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
# We used to generate default identicons here, but nowadays
# we want clients to generate their own as part of their branding
# rather than there being consistent matrix-wide ones, so we don't.
defer.returnValue((user_id, token))
@defer.inlineCallbacks

View file

@ -21,7 +21,7 @@ import math
import string
from collections import OrderedDict
from six import string_types
from six import iteritems, string_types
from twisted.internet import defer
@ -32,9 +32,11 @@ from synapse.api.constants import (
JoinRules,
RoomCreationPreset,
)
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@ -72,6 +74,334 @@ class RoomCreationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
# linearizer to stop two upgrades happening at once
self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
@defer.inlineCallbacks
def upgrade_room(self, requester, old_room_id, new_version):
"""Replace a room with a new room with a different version
Args:
requester (synapse.types.Requester): the user requesting the upgrade
old_room_id (unicode): the id of the room to be replaced
new_version (unicode): the new room version to use
Returns:
Deferred[unicode]: the new room id
"""
yield self.ratelimit(requester)
user_id = requester.user.to_string()
with (yield self._upgrade_linearizer.queue(old_room_id)):
# start by allocating a new room id
r = yield self.store.get_room(old_room_id)
if r is None:
raise NotFoundError("Unknown room id %s" % (old_room_id,))
new_room_id = yield self._generate_room_id(
creator_id=user_id, is_public=r["is_public"],
)
logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
# we create and auth the tombstone event before properly creating the new
# room, to check our user has perms in the old room.
tombstone_event, tombstone_context = (
yield self.event_creation_handler.create_event(
requester, {
"type": EventTypes.Tombstone,
"state_key": "",
"room_id": old_room_id,
"sender": user_id,
"content": {
"body": "This room has been replaced",
"replacement_room": new_room_id,
}
},
token_id=requester.access_token_id,
)
)
yield self.auth.check_from_context(tombstone_event, tombstone_context)
yield self.clone_exiting_room(
requester,
old_room_id=old_room_id,
new_room_id=new_room_id,
new_room_version=new_version,
tombstone_event_id=tombstone_event.event_id,
)
# now send the tombstone
yield self.event_creation_handler.send_nonmember_event(
requester, tombstone_event, tombstone_context,
)
old_room_state = yield tombstone_context.get_current_state_ids(self.store)
# update any aliases
yield self._move_aliases_to_new_room(
requester, old_room_id, new_room_id, old_room_state,
)
# and finally, shut down the PLs in the old room, and update them in the new
# room.
yield self._update_upgraded_room_pls(
requester, old_room_id, new_room_id, old_room_state,
)
defer.returnValue(new_room_id)
@defer.inlineCallbacks
def _update_upgraded_room_pls(
self, requester, old_room_id, new_room_id, old_room_state,
):
"""Send updated power levels in both rooms after an upgrade
Args:
requester (synapse.types.Requester): the user requesting the upgrade
old_room_id (unicode): the id of the room to be replaced
new_room_id (unicode): the id of the replacement room
old_room_state (dict[tuple[str, str], str]): the state map for the old room
Returns:
Deferred
"""
old_room_pl_event_id = old_room_state.get((EventTypes.PowerLevels, ""))
if old_room_pl_event_id is None:
logger.warning(
"Not supported: upgrading a room with no PL event. Not setting PLs "
"in old room.",
)
return
old_room_pl_state = yield self.store.get_event(old_room_pl_event_id)
# we try to stop regular users from speaking by setting the PL required
# to send regular events and invites to 'Moderator' level. That's normally
# 50, but if the default PL in a room is 50 or more, then we set the
# required PL above that.
pl_content = dict(old_room_pl_state.content)
users_default = int(pl_content.get("users_default", 0))
restricted_level = max(users_default + 1, 50)
updated = False
for v in ("invite", "events_default"):
current = int(pl_content.get(v, 0))
if current < restricted_level:
logger.info(
"Setting level for %s in %s to %i (was %i)",
v, old_room_id, restricted_level, current,
)
pl_content[v] = restricted_level
updated = True
else:
logger.info(
"Not setting level for %s (already %i)",
v, current,
)
if updated:
try:
yield self.event_creation_handler.create_and_send_nonmember_event(
requester, {
"type": EventTypes.PowerLevels,
"state_key": '',
"room_id": old_room_id,
"sender": requester.user.to_string(),
"content": pl_content,
}, ratelimit=False,
)
except AuthError as e:
logger.warning("Unable to update PLs in old room: %s", e)
logger.info("Setting correct PLs in new room")
yield self.event_creation_handler.create_and_send_nonmember_event(
requester, {
"type": EventTypes.PowerLevels,
"state_key": '',
"room_id": new_room_id,
"sender": requester.user.to_string(),
"content": old_room_pl_state.content,
}, ratelimit=False,
)
@defer.inlineCallbacks
def clone_exiting_room(
self, requester, old_room_id, new_room_id, new_room_version,
tombstone_event_id,
):
"""Populate a new room based on an old room
Args:
requester (synapse.types.Requester): the user requesting the upgrade
old_room_id (unicode): the id of the room to be replaced
new_room_id (unicode): the id to give the new room (should already have been
created with _gemerate_room_id())
new_room_version (unicode): the new room version to use
tombstone_event_id (unicode|str): the ID of the tombstone event in the old
room.
Returns:
Deferred[None]
"""
user_id = requester.user.to_string()
if not self.spam_checker.user_may_create_room(user_id):
raise SynapseError(403, "You are not permitted to create rooms")
creation_content = {
"room_version": new_room_version,
"predecessor": {
"room_id": old_room_id,
"event_id": tombstone_event_id,
}
}
initial_state = dict()
types_to_copy = (
(EventTypes.JoinRules, ""),
(EventTypes.Name, ""),
(EventTypes.Topic, ""),
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.GuestAccess, ""),
(EventTypes.RoomAvatar, ""),
)
old_room_state_ids = yield self.store.get_filtered_current_state_ids(
old_room_id, StateFilter.from_types(types_to_copy),
)
# map from event_id to BaseEvent
old_room_state_events = yield self.store.get_events(old_room_state_ids.values())
for k, old_event_id in iteritems(old_room_state_ids):
old_event = old_room_state_events.get(old_event_id)
if old_event:
initial_state[k] = old_event.content
yield self._send_events_for_new_room(
requester,
new_room_id,
# we expect to override all the presets with initial_state, so this is
# somewhat arbitrary.
preset_config=RoomCreationPreset.PRIVATE_CHAT,
invite_list=[],
initial_state=initial_state,
creation_content=creation_content,
)
# XXX invites/joins
# XXX 3pid invites
@defer.inlineCallbacks
def _move_aliases_to_new_room(
self, requester, old_room_id, new_room_id, old_room_state,
):
directory_handler = self.hs.get_handlers().directory_handler
aliases = yield self.store.get_aliases_for_room(old_room_id)
# check to see if we have a canonical alias.
canonical_alias = None
canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, ""))
if canonical_alias_event_id:
canonical_alias_event = yield self.store.get_event(canonical_alias_event_id)
if canonical_alias_event:
canonical_alias = canonical_alias_event.content.get("alias", "")
# first we try to remove the aliases from the old room (we suppress sending
# the room_aliases event until the end).
#
# Note that we'll only be able to remove aliases that (a) aren't owned by an AS,
# and (b) unless the user is a server admin, which the user created.
#
# This is probably correct - given we don't allow such aliases to be deleted
# normally, it would be odd to allow it in the case of doing a room upgrade -
# but it makes the upgrade less effective, and you have to wonder why a room
# admin can't remove aliases that point to that room anyway.
# (cf https://github.com/matrix-org/synapse/issues/2360)
#
removed_aliases = []
for alias_str in aliases:
alias = RoomAlias.from_string(alias_str)
try:
yield directory_handler.delete_association(
requester, alias, send_event=False,
)
removed_aliases.append(alias_str)
except SynapseError as e:
logger.warning(
"Unable to remove alias %s from old room: %s",
alias, e,
)
# if we didn't find any aliases, or couldn't remove anyway, we can skip the rest
# of this.
if not removed_aliases:
return
try:
# this can fail if, for some reason, our user doesn't have perms to send
# m.room.aliases events in the old room (note that we've already checked that
# they have perms to send a tombstone event, so that's not terribly likely).
#
# If that happens, it's regrettable, but we should carry on: it's the same
# as when you remove an alias from the directory normally - it just means that
# the aliases event gets out of sync with the directory
# (cf https://github.com/vector-im/riot-web/issues/2369)
yield directory_handler.send_room_alias_update_event(
requester, old_room_id,
)
except AuthError as e:
logger.warning(
"Failed to send updated alias event on old room: %s", e,
)
# we can now add any aliases we successfully removed to the new room.
for alias in removed_aliases:
try:
yield directory_handler.create_association(
requester, RoomAlias.from_string(alias),
new_room_id, servers=(self.hs.hostname, ),
send_event=False,
)
logger.info("Moved alias %s to new room", alias)
except SynapseError as e:
# I'm not really expecting this to happen, but it could if the spam
# checking module decides it shouldn't, or similar.
logger.error(
"Error adding alias %s to new room: %s",
alias, e,
)
try:
if canonical_alias and (canonical_alias in removed_aliases):
yield self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.CanonicalAlias,
"state_key": "",
"room_id": new_room_id,
"sender": requester.user.to_string(),
"content": {"alias": canonical_alias, },
},
ratelimit=False
)
yield directory_handler.send_room_alias_update_event(
requester, new_room_id,
)
except SynapseError as e:
# again I'm not really expecting this to fail, but if it does, I'd rather
# we returned the new room to the client at this point.
logger.error(
"Unable to send updated alias events in new room: %s", e,
)
@defer.inlineCallbacks
def create_room(self, requester, config, ratelimit=True,
@ -164,28 +494,7 @@ class RoomCreationHandler(BaseHandler):
visibility = config.get("visibility", None)
is_public = visibility == "public"
# autogen room IDs and try to create it. We may clash, so just
# try a few times till one goes through, giving up eventually.
attempts = 0
room_id = None
while attempts < 5:
try:
random_string = stringutils.random_string(18)
gen_room_id = RoomID(
random_string,
self.hs.hostname,
)
yield self.store.store_room(
room_id=gen_room_id.to_string(),
room_creator_user_id=user_id,
is_public=is_public
)
room_id = gen_room_id.to_string()
break
except StoreError:
attempts += 1
if not room_id:
raise StoreError(500, "Couldn't generate a room ID.")
room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public)
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
@ -215,18 +524,15 @@ class RoomCreationHandler(BaseHandler):
# override any attempt to set room versions via the creation_content
creation_content["room_version"] = room_version
room_member_handler = self.hs.get_room_member_handler()
yield self._send_events_for_new_room(
requester,
room_id,
room_member_handler,
preset_config=preset_config,
invite_list=invite_list,
initial_state=initial_state,
creation_content=creation_content,
room_alias=room_alias,
power_level_content_override=config.get("power_level_content_override", {}),
power_level_content_override=config.get("power_level_content_override"),
creator_join_profile=creator_join_profile,
)
@ -262,7 +568,7 @@ class RoomCreationHandler(BaseHandler):
if is_direct:
content["is_direct"] = is_direct
yield room_member_handler.update_membership(
yield self.room_member_handler.update_membership(
requester,
UserID.from_string(invitee),
room_id,
@ -300,14 +606,13 @@ class RoomCreationHandler(BaseHandler):
self,
creator, # A Requester object.
room_id,
room_member_handler,
preset_config,
invite_list,
initial_state,
creation_content,
room_alias,
power_level_content_override,
creator_join_profile,
room_alias=None,
power_level_content_override=None,
creator_join_profile=None,
):
def create(etype, content, **kwargs):
e = {
@ -323,6 +628,7 @@ class RoomCreationHandler(BaseHandler):
@defer.inlineCallbacks
def send(etype, content, **kwargs):
event = create(etype, content, **kwargs)
logger.info("Sending %s in new room", etype)
yield self.event_creation_handler.create_and_send_nonmember_event(
creator,
event,
@ -345,7 +651,8 @@ class RoomCreationHandler(BaseHandler):
content=creation_content,
)
yield room_member_handler.update_membership(
logger.info("Sending %s in new room", EventTypes.Member)
yield self.room_member_handler.update_membership(
creator,
creator.user,
room_id,
@ -387,7 +694,8 @@ class RoomCreationHandler(BaseHandler):
for invitee in invite_list:
power_level_content["users"][invitee] = 100
power_level_content.update(power_level_content_override)
if power_level_content_override:
power_level_content.update(power_level_content_override)
yield send(
etype=EventTypes.PowerLevels,
@ -426,6 +734,30 @@ class RoomCreationHandler(BaseHandler):
content=content,
)
@defer.inlineCallbacks
def _generate_room_id(self, creator_id, is_public):
# autogen room IDs and try to create it. We may clash, so just
# try a few times till one goes through, giving up eventually.
attempts = 0
while attempts < 5:
try:
random_string = stringutils.random_string(18)
gen_room_id = RoomID(
random_string,
self.hs.hostname,
).to_string()
if isinstance(gen_room_id, bytes):
gen_room_id = gen_room_id.decode('utf-8')
yield self.store.store_room(
room_id=gen_room_id,
room_creator_user_id=creator_id,
is_public=is_public,
)
defer.returnValue(gen_room_id)
except StoreError:
attempts += 1
raise StoreError(500, "Couldn't generate a room ID.")
class RoomContextHandler(object):
def __init__(self, hs):
@ -489,23 +821,24 @@ class RoomContextHandler(object):
else:
last_event_id = event_id
types = None
filtered_types = None
if event_filter and event_filter.lazy_load_members():
members = set(ev.sender for ev in itertools.chain(
results["events_before"],
(results["event"],),
results["events_after"],
))
filtered_types = [EventTypes.Member]
types = [(EventTypes.Member, member) for member in members]
state_filter = StateFilter.from_lazy_load_member_list(
ev.sender
for ev in itertools.chain(
results["events_before"],
(results["event"],),
results["events_after"],
)
)
else:
state_filter = StateFilter.all()
# XXX: why do we return the state as of the last event rather than the
# first? Shouldn't we be consistent with /sync?
# https://github.com/matrix-org/matrix-doc/issues/687
state = yield self.store.get_state_for_events(
[last_event_id], types, filtered_types=filtered_types,
[last_event_id], state_filter=state_filter,
)
results["state"] = list(state[last_event_id].values())

View file

@ -24,6 +24,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@ -324,9 +325,12 @@ class SearchHandler(BaseHandler):
else:
last_event_id = event.event_id
state_filter = StateFilter.from_types(
[(EventTypes.Member, sender) for sender in senders]
)
state = yield self.store.get_state_for_event(
last_event_id,
types=[(EventTypes.Member, sender) for sender in senders]
last_event_id, state_filter
)
res["profile_info"] = {

View file

@ -27,6 +27,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@ -469,25 +470,20 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
def get_state_after_event(self, event, types=None, filtered_types=None):
def get_state_after_event(self, event, state_filter=StateFilter.all()):
"""
Get the room state after the given event
Args:
event(synapse.events.EventBase): event of interest
types(list[(str, str|None)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. If `state_key` is None,
all events are returned of the given type.
May be None, which matches any key.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
state_filter (StateFilter): The state filter used to fetch state
from the database.
Returns:
A Deferred map from ((type, state_key)->Event)
"""
state_ids = yield self.store.get_state_ids_for_event(
event.event_id, types, filtered_types=filtered_types,
event.event_id, state_filter=state_filter,
)
if event.is_state():
state_ids = state_ids.copy()
@ -495,18 +491,14 @@ class SyncHandler(object):
defer.returnValue(state_ids)
@defer.inlineCallbacks
def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()):
""" Get the room state at a particular stream position
Args:
room_id(str): room for which to get state
stream_position(StreamToken): point at which to get state
types(list[(str, str|None)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. If `state_key` is None,
all events are returned of the given type.
filtered_types(list[str]|None): Only apply filtering via `types` to this
list of event types. Other types of events are returned unfiltered.
If None, `types` filtering is applied to all events.
state_filter (StateFilter): The state filter used to fetch state
from the database.
Returns:
A Deferred map from ((type, state_key)->Event)
@ -522,7 +514,7 @@ class SyncHandler(object):
if last_events:
last_event = last_events[-1]
state = yield self.get_state_after_event(
last_event, types, filtered_types=filtered_types,
last_event, state_filter=state_filter,
)
else:
@ -563,10 +555,11 @@ class SyncHandler(object):
last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event(
last_event.event_id, [
last_event.event_id,
state_filter=StateFilter.from_types([
(EventTypes.Name, ''),
(EventTypes.CanonicalAlias, ''),
]
]),
)
# this is heavily cached, thus: fast.
@ -717,8 +710,7 @@ class SyncHandler(object):
with Measure(self.clock, "compute_state_delta"):
types = None
filtered_types = None
members_to_fetch = None
lazy_load_members = sync_config.filter_collection.lazy_load_members()
include_redundant_members = (
@ -729,16 +721,21 @@ class SyncHandler(object):
# We only request state for the members needed to display the
# timeline:
types = [
(EventTypes.Member, state_key)
for state_key in set(
event.sender # FIXME: we also care about invite targets etc.
for event in batch.events
)
]
members_to_fetch = set(
event.sender # FIXME: we also care about invite targets etc.
for event in batch.events
)
# only apply the filtering to room members
filtered_types = [EventTypes.Member]
if full_state:
# always make sure we LL ourselves so we know we're in the room
# (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
# We only need apply this on full state syncs given we disabled
# LL for incr syncs in #3840.
members_to_fetch.add(sync_config.user.to_string())
state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
else:
state_filter = StateFilter.all()
timeline_state = {
(event.type, event.state_key): event.event_id
@ -746,28 +743,19 @@ class SyncHandler(object):
}
if full_state:
if lazy_load_members:
# always make sure we LL ourselves so we know we're in the room
# (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
# We only need apply this on full state syncs given we disabled
# LL for incr syncs in #3840.
types.append((EventTypes.Member, sync_config.user.to_string()))
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
batch.events[-1].event_id, types=types,
filtered_types=filtered_types,
batch.events[-1].event_id, state_filter=state_filter,
)
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
batch.events[0].event_id, state_filter=state_filter,
)
else:
current_state_ids = yield self.get_state_at(
room_id, stream_position=now_token, types=types,
filtered_types=filtered_types,
room_id, stream_position=now_token,
state_filter=state_filter,
)
state_ids = current_state_ids
@ -781,8 +769,7 @@ class SyncHandler(object):
)
elif batch.limited:
state_at_timeline_start = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
batch.events[0].event_id, state_filter=state_filter,
)
# for now, we disable LL for gappy syncs - see
@ -797,17 +784,15 @@ class SyncHandler(object):
# members to just be ones which were timeline senders, which then ensures
# all of the rest get included in the state block (if we need to know
# about them).
types = None
filtered_types = None
state_filter = StateFilter.all()
state_at_previous_sync = yield self.get_state_at(
room_id, stream_position=since_token, types=types,
filtered_types=filtered_types,
room_id, stream_position=since_token,
state_filter=state_filter,
)
current_state_ids = yield self.store.get_state_ids_for_event(
batch.events[-1].event_id, types=types,
filtered_types=filtered_types,
batch.events[-1].event_id, state_filter=state_filter,
)
state_ids = _calculate_state(
@ -821,7 +806,7 @@ class SyncHandler(object):
else:
state_ids = {}
if lazy_load_members:
if types and batch.events:
if members_to_fetch and batch.events:
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
@ -831,8 +816,12 @@ class SyncHandler(object):
# timeline here, and then dedupe any redundant ones below.
state_ids = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=None, # we only want members!
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
(EventTypes.Member, member)
for member in members_to_fetch
),
)
if lazy_load_members and not include_redundant_members: