Merge branch 'develop' of github.com:matrix-org/synapse into neilj/fix_mau_init

This commit is contained in:
Neil Johnson 2018-10-24 16:25:39 +01:00
commit 07126e43a4
50 changed files with 1701 additions and 380 deletions

View File

@ -4,6 +4,13 @@ language: python
# tell travis to cache ~/.cache/pip
cache: pip
# only build branches we care about (PRs are built seperately)
branches:
only:
- master
- develop
- /^release-v/
before_script:
- git remote set-branches --add origin develop
- git fetch origin develop

View File

@ -174,6 +174,12 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a
Dockerfile to automate a synapse server in a single Docker image, at
https://hub.docker.com/r/avhost/docker-matrix/tags/
Slavi Pantaleev has created an Ansible playbook,
which installs the offical Docker image of Matrix Synapse
along with many other Matrix-related services (Postgres database, riot-web, coturn, mxisd, SSL support, etc.).
For more details, see
https://github.com/spantaleev/matrix-docker-ansible-deploy
Configuring Synapse
-------------------

1
changelog.d/3698.misc Normal file
View File

@ -0,0 +1 @@
Add information about the [matrix-docker-ansible-deploy](https://github.com/spantaleev/matrix-docker-ansible-deploy) playbook

1
changelog.d/3786.misc Normal file
View File

@ -0,0 +1 @@
Add initial implementation of new state resolution algorithm

1
changelog.d/3969.bugfix Normal file
View File

@ -0,0 +1 @@
Fix HTTP error response codes for federated group requests.

1
changelog.d/4063.misc Normal file
View File

@ -0,0 +1 @@
Refactor room alias creation code

1
changelog.d/4074.bugfix Normal file
View File

@ -0,0 +1 @@
Correctly account for cpu usage by background threads

1
changelog.d/4075.misc Normal file
View File

@ -0,0 +1 @@
Clean up threading and logcontexts in pushers

1
changelog.d/4077.misc Normal file
View File

@ -0,0 +1 @@
Give some more things logcontexts

1
changelog.d/4082.misc Normal file
View File

@ -0,0 +1 @@
Clean up some bits of code which were flagged by the linter

1
changelog.d/4083.bugfix Normal file
View File

@ -0,0 +1 @@
Fix bug which prevented backslashes being used in event field filters

View File

@ -48,7 +48,7 @@ def main():
row.name: row.position
for row in replicate(server, {"streams": "-1"})["streams"].rows
}
except requests.exceptions.ConnectionError as e:
except requests.exceptions.ConnectionError:
time.sleep(0.1)
while True:

View File

@ -501,7 +501,8 @@ class Porter(object):
try:
yield self.postgres_store.runInteraction("alter_table", alter_table)
except Exception as e:
except Exception:
# On Error Resume Next
pass
yield self.postgres_store.runInteraction(

View File

@ -14,17 +14,16 @@ ignore =
pylint.cfg
tox.ini
[pep8]
max-line-length = 90
# W503 requires that binary operators be at the end, not start, of lines. Erik
# doesn't like it. E203 is contrary to PEP8. E731 is silly.
ignore = W503,E203,E731
[flake8]
# note that flake8 inherits the "ignore" settings from "pep8" (because it uses
# pep8 to do those checks), but not the "max-line-length" setting
max-line-length = 90
ignore=W503,E203,E731
# see https://pycodestyle.readthedocs.io/en/latest/intro.html#error-codes
# for error codes. The ones we ignore are:
# W503: line break before binary operator
# W504: line break after binary operator
# E203: whitespace before ':' (which is contrary to pep8?)
# E731: do not assign a lambda expression, use a def
ignore=W503,W504,E203,E731
[isort]
line_length = 89

View File

@ -172,7 +172,10 @@ USER_FILTER_SCHEMA = {
# events a lot easier as we can then use a negative lookbehind
# assertion to split '\.' If we allowed \\ then it would
# incorrectly split '\\.' See synapse.events.utils.serialize_event
"pattern": "^((?!\\\).)*$"
#
# Note that because this is a regular expression, we have to escape
# each backslash in the pattern.
"pattern": r"^((?!\\\\).)*$"
}
}
},

View File

@ -161,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
self.pusher_pool.on_new_notifications(
yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
self.pusher_pool.on_new_receipts(
yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
@ -183,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
def start_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
logger.info("Starting pusher %r / %r", user_id, key)
return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
def start(config_options):

View File

@ -178,7 +178,7 @@ class ContentRepositoryConfig(Config):
def default_config(self, **kwargs):
media_store = self.default_path("media_store")
uploads_path = self.default_path("uploads")
return """
return r"""
# Directory where uploaded images and attachments are stored.
media_store_path: "%(media_store)s"

View File

@ -55,7 +55,7 @@ def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
raise IOError("Cannot get key for %r" % server_name)
except (ConnectError, DomainError) as e:
logger.warn("Error getting key for %r: %s", server_name, e)
except Exception as e:
except Exception:
logger.exception("Error getting key for %r", server_name)
raise IOError("Cannot get key for %r" % server_name)

View File

@ -690,7 +690,7 @@ def auth_types_for_event(event):
auth_types = []
auth_types.append((EventTypes.PowerLevels, "", ))
auth_types.append((EventTypes.Member, event.user_id, ))
auth_types.append((EventTypes.Member, event.sender, ))
auth_types.append((EventTypes.Create, "", ))
if event.type == EventTypes.Member:

View File

@ -800,7 +800,7 @@ class FederationHandlerRegistry(object):
yield handler(origin, content)
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception as e:
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args):

View File

@ -22,7 +22,7 @@ import bcrypt
import pymacaroons
from canonicaljson import json
from twisted.internet import defer, threads
from twisted.internet import defer
from twisted.web.client import PartialDownloadError
import synapse.util.stringutils as stringutils
@ -37,8 +37,8 @@ from synapse.api.errors import (
)
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util import logcontext
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable
from ._base import BaseHandler
@ -884,11 +884,7 @@ class AuthHandler(BaseHandler):
bcrypt.gensalt(self.bcrypt_rounds),
).decode('ascii')
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
),
)
return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@ -913,13 +909,7 @@ class AuthHandler(BaseHandler):
if not isinstance(stored_hash, bytes):
stored_hash = stored_hash.encode('ascii')
return make_deferred_yieldable(
threads.deferToThreadPool(
self.hs.get_reactor(),
self.hs.get_reactor().getThreadPool(),
_do_validate_hash,
),
)
return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
else:
return defer.succeed(False)

View File

@ -17,8 +17,8 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, create_requester
from synapse.util.logcontext import run_in_background
from ._base import BaseHandler
@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler):
None
"""
if not self._user_parter_running:
run_in_background(self._user_parter_loop)
run_as_background_process("user_parter_loop", self._user_parter_loop)
@defer.inlineCallbacks
def _user_parter_loop(self):

View File

@ -80,42 +80,60 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
def create_association(self, user_id, room_alias, room_id, servers=None):
# association creation for human users
# TODO(erikj): Do user auth.
def create_association(self, requester, room_alias, room_id, servers=None,
send_event=True):
"""Attempt to create a new alias
if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
raise SynapseError(
403, "This user is not permitted to create this alias",
)
Args:
requester (Requester)
room_alias (RoomAlias)
room_id (str)
servers (list[str]|None): List of servers that others servers
should try and join via
send_event (bool): Whether to send an updated m.room.aliases event
can_create = yield self.can_modify_alias(
room_alias,
user_id=user_id
)
if not can_create:
raise SynapseError(
400, "This alias is reserved by an application service.",
errcode=Codes.EXCLUSIVE
Returns:
Deferred
"""
user_id = requester.user.to_string()
service = requester.app_service
if service:
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400, "This application service has not reserved"
" this kind of alias.", errcode=Codes.EXCLUSIVE
)
else:
if not self.spam_checker.user_may_create_room_alias(user_id, room_alias):
raise AuthError(
403, "This user is not permitted to create this alias",
)
can_create = yield self.can_modify_alias(
room_alias,
user_id=user_id
)
if not can_create:
raise AuthError(
400, "This alias is reserved by an application service.",
errcode=Codes.EXCLUSIVE
)
yield self._create_association(room_alias, room_id, servers, creator=user_id)
@defer.inlineCallbacks
def create_appservice_association(self, service, room_alias, room_id,
servers=None):
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400, "This application service has not reserved"
" this kind of alias.", errcode=Codes.EXCLUSIVE
if send_event:
yield self.send_room_alias_update_event(
requester,
room_id
)
# association creation for app services
yield self._create_association(room_alias, room_id, servers)
@defer.inlineCallbacks
def delete_association(self, requester, user_id, room_alias):
def delete_association(self, requester, room_alias):
# association deletion for human users
user_id = requester.user.to_string()
try:
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
except StoreError as e:
@ -143,7 +161,6 @@ class DirectoryHandler(BaseHandler):
try:
yield self.send_room_alias_update_event(
requester,
requester.user.to_string(),
room_id
)
@ -261,7 +278,7 @@ class DirectoryHandler(BaseHandler):
)
@defer.inlineCallbacks
def send_room_alias_update_event(self, requester, user_id, room_id):
def send_room_alias_update_event(self, requester, room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
@ -270,7 +287,7 @@ class DirectoryHandler(BaseHandler):
"type": EventTypes.Aliases,
"state_key": self.hs.hostname,
"room_id": room_id,
"sender": user_id,
"sender": requester.user.to_string(),
"content": {"aliases": aliases},
},
ratelimit=False

View File

@ -53,7 +53,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import resolve_events_with_factory
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
@ -384,24 +384,24 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
# Resolve any conflicting state
@defer.inlineCallbacks
def fetch(ev_ids):
fetched = yield self.store.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
)
# add any events we fetch here to the `event_map` so that we
# can use them to build the state event list below.
event_map.update(fetched)
defer.returnValue(fetched)
room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
room_version, state_maps, event_map, fetch,
state_map = yield resolve_events_with_store(
room_version, state_maps, event_map,
state_res_store=StateResolutionStore(self.store),
)
# we need to give _process_received_pdu the actual state events
# We need to give _process_received_pdu the actual state events
# rather than event ids, so generate that now.
# First though we need to fetch all the events that are in
# state_map, so we can build up the state below.
evs = yield self.store.get_events(
list(state_map.values()),
get_prev_content=False,
check_redacted=False,
)
event_map.update(evs)
state = [
event_map[e] for e in six.itervalues(state_map)
]
@ -2520,7 +2520,7 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id)
yield self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the
@ -2553,7 +2553,7 @@ class FederationHandler(BaseHandler):
extra_users=extra_users
)
self.pusher_pool.on_new_notifications(
return self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)

View File

@ -20,7 +20,7 @@ from six import iteritems
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.api.errors import HttpResponseException, SynapseError
from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
@ -37,9 +37,23 @@ def _create_rerouter(func_name):
)
else:
destination = get_domain_from_id(group_id)
return getattr(self.transport_client, func_name)(
d = getattr(self.transport_client, func_name)(
destination, group_id, *args, **kwargs
)
# Capture errors returned by the remote homeserver and
# re-throw specific errors as SynapseErrors. This is so
# when the remote end responds with things like 403 Not
# In Group, we can communicate that to the client instead
# of a 500.
def h(failure):
failure.trap(HttpResponseException)
e = failure.value
if e.code == 403:
raise e.to_synapse_error()
return failure
d.addErrback(h)
return d
return f

View File

@ -779,7 +779,7 @@ class EventCreationHandler(object):
event, context=context
)
self.pusher_pool.on_new_notifications(
yield self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)

View File

@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
"receipt_key", max_batch_id, rooms=affected_room_ids
)
# Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts(
yield self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids,
)

View File

@ -190,10 +190,11 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
user_id=user_id,
requester=requester,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
send_event=False,
)
preset_config = config.get(
@ -289,7 +290,7 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
result["room_alias"] = room_alias.to_string()
yield directory_handler.send_room_alias_update_event(
requester, user_id, room_id
requester, room_id
)
defer.returnValue(result)

View File

@ -20,6 +20,7 @@ from six import iteritems
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
from synapse.util.metrics import Measure
@ -98,7 +99,6 @@ class UserDirectoryHandler(object):
"""
return self.store.search_user_dir(user_id, search_term, limit)
@defer.inlineCallbacks
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
@ -108,11 +108,15 @@ class UserDirectoryHandler(object):
if self._is_processing:
return
@defer.inlineCallbacks
def process():
try:
yield self._unsafe_process()
finally:
self._is_processing = False
self._is_processing = True
try:
yield self._unsafe_process()
finally:
self._is_processing = False
run_as_background_process("user_directory.notify_new_event", process)
@defer.inlineCallbacks
def handle_local_profile_change(self, user_id, profile):

View File

@ -230,7 +230,7 @@ class MatrixFederationHttpClient(object):
Returns:
Deferred: resolves with the http response object on success.
Fails with ``HTTPRequestException``: if we get an HTTP response
Fails with ``HttpResponseException``: if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@ -480,7 +480,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@ -534,7 +534,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@ -589,7 +589,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@ -640,7 +640,7 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
Fails with ``HTTPRequestException`` if we get an HTTP response
Fails with ``HttpResponseException`` if we get an HTTP response
code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
@ -684,7 +684,7 @@ class MatrixFederationHttpClient(object):
Deferred: resolves with an (int,dict) tuple of the file length and
a dict of the response headers.
Fails with ``HTTPRequestException`` if we get an HTTP response code
Fails with ``HttpResponseException`` if we get an HTTP response code
>= 300
Fails with ``NotRetryingDestination`` if we are not yet ready

View File

@ -18,8 +18,7 @@ import logging
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from synapse.metrics.background_process_metrics import run_as_background_process
logger = logging.getLogger(__name__)
@ -71,18 +70,11 @@ class EmailPusher(object):
# See httppusher
self.max_stream_ordering = None
self.processing = False
self._is_processing = False
@defer.inlineCallbacks
def on_started(self):
if self.mailer is not None:
try:
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
yield self._process()
except Exception:
logger.exception("Error starting email pusher")
self._start_processing()
def on_stop(self):
if self.timed_call:
@ -92,43 +84,52 @@ class EmailPusher(object):
pass
self.timed_call = None
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
yield self._process()
self._start_processing()
def on_new_receipts(self, min_stream_id, max_stream_id):
# We could wake up and cancel the timer but there tend to be quite a
# lot of read receipts so it's probably less work to just let the
# timer fire
return defer.succeed(None)
pass
@defer.inlineCallbacks
def on_timer(self):
self.timed_call = None
yield self._process()
self._start_processing()
def _start_processing(self):
if self._is_processing:
return
run_as_background_process("emailpush.process", self._process)
@defer.inlineCallbacks
def _process(self):
if self.processing:
return
# we should never get here if we are already processing
assert not self._is_processing
with LoggingContext("emailpush._process"):
with Measure(self.clock, "emailpush._process"):
try:
self._is_processing = True
if self.throttle_params is None:
# this is our first loop: load up the throttle params
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
self.processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self.processing = False
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):

View File

@ -22,9 +22,8 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from . import push_rule_evaluator, push_tools
@ -61,7 +60,7 @@ class HttpPusher(object):
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.failing_since = pusherdict['failing_since']
self.timed_call = None
self.processing = False
self._is_processing = False
# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
@ -92,34 +91,27 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
@defer.inlineCallbacks
def on_started(self):
try:
yield self._process()
except Exception:
logger.exception("Error starting http pusher")
self._start_processing()
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
yield self._process()
self._start_processing()
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id):
# Note that the min here shouldn't be relied upon to be accurate.
# We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway...
with LoggingContext("push.on_new_receipts"):
with Measure(self.clock, "push.on_new_receipts"):
badge = yield push_tools.get_badge_count(
self.hs.get_datastore(), self.user_id
)
yield self._send_badge(badge)
run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
@defer.inlineCallbacks
def _update_badge(self):
badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
yield self._send_badge(badge)
def on_timer(self):
yield self._process()
self._start_processing()
def on_stop(self):
if self.timed_call:
@ -129,27 +121,31 @@ class HttpPusher(object):
pass
self.timed_call = None
@defer.inlineCallbacks
def _process(self):
if self.processing:
def _start_processing(self):
if self._is_processing:
return
with LoggingContext("push._process"):
with Measure(self.clock, "push._process"):
run_as_background_process("httppush.process", self._process)
@defer.inlineCallbacks
def _process(self):
# we should never get here if we are already processing
assert not self._is_processing
try:
self._is_processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
self.processing = True
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self.processing = False
yield self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self._is_processing = False
@defer.inlineCallbacks
def _unsafe_process(self):

View File

@ -20,24 +20,39 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
class PusherPool:
"""
The pusher pool. This is responsible for dispatching notifications of new events to
the http and email pushers.
It provides three methods which are designed to be called by the rest of the
application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
delegates to each of the relevant pushers.
Note that it is expected that each pusher will have its own 'processing' loop which
will send out the notifications in the background, rather than blocking until the
notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
Pusher.on_new_receipts are not expected to return deferreds.
"""
def __init__(self, _hs):
self.hs = _hs
self.pusher_factory = PusherFactory(_hs)
self.start_pushers = _hs.config.start_pushers
self._should_start_pushers = _hs.config.start_pushers
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.pushers = {}
@defer.inlineCallbacks
def start(self):
pushers = yield self.store.get_all_pushers()
self._start_pushers(pushers)
"""Starts the pushers off in a background process.
"""
if not self._should_start_pushers:
logger.info("Not starting pushers because they are disabled in the config")
return
run_as_background_process("start_pushers", self._start_pushers)
@defer.inlineCallbacks
def add_pusher(self, user_id, access_token, kind, app_id,
@ -86,7 +101,7 @@ class PusherPool:
last_stream_ordering=last_stream_ordering,
profile_tag=profile_tag,
)
yield self._refresh_pusher(app_id, pushkey, user_id)
yield self.start_pusher_by_id(app_id, pushkey, user_id)
@defer.inlineCallbacks
def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
@ -123,45 +138,23 @@ class PusherPool:
p['app_id'], p['pushkey'], p['user_name'],
)
def on_new_notifications(self, min_stream_id, max_stream_id):
run_as_background_process(
"on_new_notifications",
self._on_new_notifications, min_stream_id, max_stream_id,
)
@defer.inlineCallbacks
def _on_new_notifications(self, min_stream_id, max_stream_id):
def on_new_notifications(self, min_stream_id, max_stream_id):
try:
users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id
)
deferreds = []
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
run_in_background(
p.on_new_notifications,
min_stream_id, max_stream_id,
)
)
p.on_new_notifications(min_stream_id, max_stream_id)
yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True),
)
except Exception:
logger.exception("Exception in pusher on_new_notifications")
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
run_as_background_process(
"on_new_receipts",
self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
)
@defer.inlineCallbacks
def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
@ -171,26 +164,20 @@ class PusherPool:
# This returns a tuple, user_id is at index 3
users_affected = set([r[3] for r in updated_receipts])
deferreds = []
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
run_in_background(
p.on_new_receipts,
min_stream_id, max_stream_id,
)
)
p.on_new_receipts(min_stream_id, max_stream_id)
yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True),
)
except Exception:
logger.exception("Exception in pusher on_new_receipts")
@defer.inlineCallbacks
def _refresh_pusher(self, app_id, pushkey, user_id):
def start_pusher_by_id(self, app_id, pushkey, user_id):
"""Look up the details for the given pusher, and start it"""
if not self._should_start_pushers:
return
resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
app_id, pushkey
)
@ -201,34 +188,50 @@ class PusherPool:
p = r
if p:
self._start_pusher(p)
self._start_pushers([p])
@defer.inlineCallbacks
def _start_pushers(self):
"""Start all the pushers
def _start_pushers(self, pushers):
if not self.start_pushers:
logger.info("Not starting pushers because they are disabled in the config")
return
Returns:
Deferred
"""
pushers = yield self.store.get_all_pushers()
logger.info("Starting %d pushers", len(pushers))
for pusherdict in pushers:
try:
p = self.pusher_factory.create_pusher(pusherdict)
except Exception:
logger.exception("Couldn't start a pusher: caught Exception")
continue
if p:
appid_pushkey = "%s:%s" % (
pusherdict['app_id'],
pusherdict['pushkey'],
)
byuser = self.pushers.setdefault(pusherdict['user_name'], {})
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
run_in_background(p.on_started)
self._start_pusher(pusherdict)
logger.info("Started pushers")
def _start_pusher(self, pusherdict):
"""Start the given pusher
Args:
pusherdict (dict):
Returns:
None
"""
try:
p = self.pusher_factory.create_pusher(pusherdict)
except Exception:
logger.exception("Couldn't start a pusher: caught Exception")
return
if not p:
return
appid_pushkey = "%s:%s" % (
pusherdict['app_id'],
pusherdict['pushkey'],
)
byuser = self.pushers.setdefault(pusherdict['user_name'], {})
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
p.on_started()
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
appid_pushkey = "%s:%s" % (app_id, pushkey)

View File

@ -74,38 +74,11 @@ class ClientDirectoryServer(ClientV1RestServlet):
if room is None:
raise SynapseError(400, "Room does not exist")
dir_handler = self.handlers.directory_handler
requester = yield self.auth.get_user_by_req(request)
try:
# try to auth as a user
requester = yield self.auth.get_user_by_req(request)
try:
user_id = requester.user.to_string()
yield dir_handler.create_association(
user_id, room_alias, room_id, servers
)
yield dir_handler.send_room_alias_update_event(
requester,
user_id,
room_id
)
except SynapseError as e:
raise e
except Exception:
logger.exception("Failed to create association")
raise
except AuthError:
# try to auth as an application service
service = yield self.auth.get_appservice_by_req(request)
yield dir_handler.create_appservice_association(
service, room_alias, room_id, servers
)
logger.info(
"Application service at %s created alias %s pointing to %s",
service.url,
room_alias.to_string(),
room_id
)
yield self.handlers.directory_handler.create_association(
requester, room_alias, room_id, servers
)
defer.returnValue((200, {}))
@ -135,7 +108,7 @@ class ClientDirectoryServer(ClientV1RestServlet):
room_alias = RoomAlias.from_string(room_alias)
yield dir_handler.delete_association(
requester, user.to_string(), room_alias
requester, room_alias
)
logger.info(

View File

@ -99,7 +99,7 @@ class AuthRestServlet(RestServlet):
cannot be handled in the normal flow (with requests to the same endpoint).
Current use is for web fallback auth.
"""
PATTERNS = client_v2_patterns("/auth/(?P<stagetype>[\w\.]*)/fallback/web")
PATTERNS = client_v2_patterns(r"/auth/(?P<stagetype>[\w\.]*)/fallback/web")
def __init__(self, hs):
super(AuthRestServlet, self).__init__()

View File

@ -25,7 +25,7 @@ from six.moves.urllib import parse as urlparse
import twisted.internet.error
import twisted.web.http
from twisted.internet import defer, threads
from twisted.internet import defer
from twisted.web.resource import Resource
from synapse.api.errors import (
@ -36,8 +36,8 @@ from synapse.api.errors import (
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import logcontext
from synapse.util.async_helpers import Linearizer
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import is_ascii, random_string
@ -492,10 +492,11 @@ class MediaRepository(object):
))
thumbnailer = Thumbnailer(input_path)
t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
t_byte_source = yield logcontext.defer_to_thread(
self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer, t_width, t_height, t_method, t_type
))
)
if t_byte_source:
try:
@ -534,10 +535,11 @@ class MediaRepository(object):
))
thumbnailer = Thumbnailer(input_path)
t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
t_byte_source = yield logcontext.defer_to_thread(
self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer, t_width, t_height, t_method, t_type
))
)
if t_byte_source:
try:
@ -620,15 +622,17 @@ class MediaRepository(object):
for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
# Generate the thumbnail
if t_method == "crop":
t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
t_byte_source = yield logcontext.defer_to_thread(
self.hs.get_reactor(),
thumbnailer.crop,
t_width, t_height, t_type,
))
)
elif t_method == "scale":
t_byte_source = yield make_deferred_yieldable(threads.deferToThread(
t_byte_source = yield logcontext.defer_to_thread(
self.hs.get_reactor(),
thumbnailer.scale,
t_width, t_height, t_type,
))
)
else:
logger.error("Unrecognized method: %r", t_method)
continue

View File

@ -21,9 +21,10 @@ import sys
import six
from twisted.internet import defer, threads
from twisted.internet import defer
from twisted.protocols.basic import FileSender
from synapse.util import logcontext
from synapse.util.file_consumer import BackgroundFileConsumer
from synapse.util.logcontext import make_deferred_yieldable
@ -64,9 +65,10 @@ class MediaStorage(object):
with self.store_into_file(file_info) as (f, fname, finish_cb):
# Write to the main repository
yield make_deferred_yieldable(threads.deferToThread(
yield logcontext.defer_to_thread(
self.hs.get_reactor(),
_write_file_synchronously, source, f,
))
)
yield finish_cb()
defer.returnValue(fname)

View File

@ -674,7 +674,7 @@ def summarize_paragraphs(text_nodes, min_size=200, max_size=500):
# This splits the paragraph into words, but keeping the
# (preceeding) whitespace intact so we can easily concat
# words back together.
for match in re.finditer("\s*\S+", description):
for match in re.finditer(r"\s*\S+", description):
word = match.group()
# Keep adding words while the total length is less than

View File

@ -17,9 +17,10 @@ import logging
import os
import shutil
from twisted.internet import defer, threads
from twisted.internet import defer
from synapse.config._base import Config
from synapse.util import logcontext
from synapse.util.logcontext import run_in_background
from .media_storage import FileResponder
@ -120,7 +121,8 @@ class FileStorageProviderBackend(StorageProvider):
if not os.path.exists(dirname):
os.makedirs(dirname)
return threads.deferToThread(
return logcontext.defer_to_thread(
self.hs.get_reactor(),
shutil.copyfile, primary_fname, backup_fname,
)

View File

@ -19,13 +19,14 @@ from collections import namedtuple
from six import iteritems, itervalues
import attr
from frozendict import frozendict
from twisted.internet import defer
from synapse.api.constants import EventTypes, RoomVersions
from synapse.events.snapshot import EventContext
from synapse.state import v1
from synapse.state import v1, v2
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
@ -372,15 +373,10 @@ class StateHandler(object):
result = yield self._state_resolution_handler.resolve_state_groups(
room_id, room_version, state_groups_ids, None,
self._state_map_factory,
state_res_store=StateResolutionStore(self.store),
)
defer.returnValue(result)
def _state_map_factory(self, ev_ids):
return self.store.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
)
@defer.inlineCallbacks
def resolve_events(self, room_version, state_sets, event):
logger.info(
@ -398,10 +394,10 @@ class StateHandler(object):
}
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_factory(
new_state = yield resolve_events_with_store(
room_version, state_set_ids,
event_map=state_map,
state_map_factory=self._state_map_factory
state_res_store=StateResolutionStore(self.store),
)
new_state = {
@ -436,7 +432,7 @@ class StateResolutionHandler(object):
@defer.inlineCallbacks
@log_function
def resolve_state_groups(
self, room_id, room_version, state_groups_ids, event_map, state_map_factory,
self, room_id, room_version, state_groups_ids, event_map, state_res_store,
):
"""Resolves conflicts between a set of state groups
@ -454,9 +450,11 @@ class StateResolutionHandler(object):
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_map_factory.
events will be requested via state_res_store.
If None, all events will be fetched via state_map_factory.
If None, all events will be fetched via state_res_store.
state_res_store (StateResolutionStore)
Returns:
Deferred[_StateCacheEntry]: resolved state
@ -480,10 +478,10 @@ class StateResolutionHandler(object):
# start by assuming we won't have any conflicted state, and build up the new
# state map by iterating through the state groups. If we discover a conflict,
# we give up and instead use `resolve_events_with_factory`.
# we give up and instead use `resolve_events_with_store`.
#
# XXX: is this actually worthwhile, or should we just let
# resolve_events_with_factory do it?
# resolve_events_with_store do it?
new_state = {}
conflicted_state = False
for st in itervalues(state_groups_ids):
@ -498,11 +496,11 @@ class StateResolutionHandler(object):
if conflicted_state:
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_factory(
new_state = yield resolve_events_with_store(
room_version,
list(itervalues(state_groups_ids)),
event_map=event_map,
state_map_factory=state_map_factory,
state_res_store=state_res_store,
)
# if the new state matches any of the input state groups, we can
@ -583,7 +581,7 @@ def _make_state_cache_entry(
)
def resolve_events_with_factory(room_version, state_sets, event_map, state_map_factory):
def resolve_events_with_store(room_version, state_sets, event_map, state_res_store):
"""
Args:
room_version(str): Version of the room
@ -599,17 +597,19 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f
If None, all events will be fetched via state_map_factory.
state_map_factory(func): will be called
with a list of event_ids that are needed, and should return with
a Deferred of dict of event_id to event.
state_res_store (StateResolutionStore)
Returns
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,):
return v1.resolve_events_with_factory(
state_sets, event_map, state_map_factory,
if room_version == RoomVersions.V1:
return v1.resolve_events_with_store(
state_sets, event_map, state_res_store.get_events,
)
elif room_version == RoomVersions.VDH_TEST:
return v2.resolve_events_with_store(
state_sets, event_map, state_res_store,
)
else:
# This should only happen if we added a version but forgot to add it to
@ -617,3 +617,54 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f
raise Exception(
"No state resolution algorithm defined for version %r" % (room_version,)
)
@attr.s
class StateResolutionStore(object):
"""Interface that allows state resolution algorithms to access the database
in well defined way.
Args:
store (DataStore)
"""
store = attr.ib()
def get_events(self, event_ids, allow_rejected=False):
"""Get events from the database
Args:
event_ids (list): The event_ids of the events to fetch
allow_rejected (bool): If True return rejected events.
Returns:
Deferred[dict[str, FrozenEvent]]: Dict from event_id to event.
"""
return self.store.get_events(
event_ids,
check_redacted=False,
get_prev_content=False,
allow_rejected=allow_rejected,
)
def get_auth_chain(self, event_ids):
"""Gets the full auth chain for a set of events (including rejected
events).
Includes the given event IDs in the result.
Note that:
1. All events must be state events.
2. For v1 rooms this may not have the full auth chain in the
presence of rejected events
Args:
event_ids (list): The event IDs of the events to fetch the auth
chain for. Must be state events.
Returns:
Deferred[list[str]]: List of event IDs of the auth chain.
"""
return self.store.get_auth_chain_ids(event_ids, include_given=True)

View File

@ -31,7 +31,7 @@ POWER_KEY = (EventTypes.PowerLevels, "")
@defer.inlineCallbacks
def resolve_events_with_factory(state_sets, event_map, state_map_factory):
def resolve_events_with_store(state_sets, event_map, state_map_factory):
"""
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,

544
synapse/state/v2.py Normal file
View File

@ -0,0 +1,544 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import heapq
import itertools
import logging
from six import iteritems, itervalues
from twisted.internet import defer
from synapse import event_auth
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
logger = logging.getLogger(__name__)
@defer.inlineCallbacks
def resolve_events_with_store(state_sets, event_map, state_res_store):
"""Resolves the state using the v2 state resolution algorithm
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
event_map(dict[str,FrozenEvent]|None):
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
events will be requested via state_res_store.
If None, all events will be fetched via state_res_store.
state_res_store (StateResolutionStore)
Returns
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
logger.debug("Computing conflicted state")
# First split up the un/conflicted state
unconflicted_state, conflicted_state = _seperate(state_sets)
if not conflicted_state:
defer.returnValue(unconflicted_state)
logger.debug("%d conflicted state entries", len(conflicted_state))
logger.debug("Calculating auth chain difference")
# Also fetch all auth events that appear in only some of the state sets'
# auth chains.
auth_diff = yield _get_auth_chain_difference(
state_sets, event_map, state_res_store,
)
full_conflicted_set = set(itertools.chain(
itertools.chain.from_iterable(itervalues(conflicted_state)),
auth_diff,
))
events = yield state_res_store.get_events([
eid for eid in full_conflicted_set
if eid not in event_map
], allow_rejected=True)
event_map.update(events)
full_conflicted_set = set(eid for eid in full_conflicted_set if eid in event_map)
logger.debug("%d full_conflicted_set entries", len(full_conflicted_set))
# Get and sort all the power events (kicks/bans/etc)
power_events = (
eid for eid in full_conflicted_set
if _is_power_event(event_map[eid])
)
sorted_power_events = yield _reverse_topological_power_sort(
power_events,
event_map,
state_res_store,
full_conflicted_set,
)
logger.debug("sorted %d power events", len(sorted_power_events))
# Now sequentially auth each one
resolved_state = yield _iterative_auth_checks(
sorted_power_events, unconflicted_state, event_map,
state_res_store,
)
logger.debug("resolved power events")
# OK, so we've now resolved the power events. Now sort the remaining
# events using the mainline of the resolved power level.
leftover_events = [
ev_id
for ev_id in full_conflicted_set
if ev_id not in sorted_power_events
]
logger.debug("sorting %d remaining events", len(leftover_events))
pl = resolved_state.get((EventTypes.PowerLevels, ""), None)
leftover_events = yield _mainline_sort(
leftover_events, pl, event_map, state_res_store,
)
logger.debug("resolving remaining events")
resolved_state = yield _iterative_auth_checks(
leftover_events, resolved_state, event_map,
state_res_store,
)
logger.debug("resolved")
# We make sure that unconflicted state always still applies.
resolved_state.update(unconflicted_state)
logger.debug("done")
defer.returnValue(resolved_state)
@defer.inlineCallbacks
def _get_power_level_for_sender(event_id, event_map, state_res_store):
"""Return the power level of the sender of the given event according to
their auth events.
Args:
event_id (str)
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
Returns:
Deferred[int]
"""
event = yield _get_event(event_id, event_map, state_res_store)
pl = None
for aid, _ in event.auth_events:
aev = yield _get_event(aid, event_map, state_res_store)
if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
pl = aev
break
if pl is None:
# Couldn't find power level. Check if they're the creator of the room
for aid, _ in event.auth_events:
aev = yield _get_event(aid, event_map, state_res_store)
if (aev.type, aev.state_key) == (EventTypes.Create, ""):
if aev.content.get("creator") == event.sender:
defer.returnValue(100)
break
defer.returnValue(0)
level = pl.content.get("users", {}).get(event.sender)
if level is None:
level = pl.content.get("users_default", 0)
if level is None:
defer.returnValue(0)
else:
defer.returnValue(int(level))
@defer.inlineCallbacks
def _get_auth_chain_difference(state_sets, event_map, state_res_store):
"""Compare the auth chains of each state set and return the set of events
that only appear in some but not all of the auth chains.
Args:
state_sets (list)
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
Returns:
Deferred[set[str]]: Set of event IDs
"""
common = set(itervalues(state_sets[0])).intersection(
*(itervalues(s) for s in state_sets[1:])
)
auth_sets = []
for state_set in state_sets:
auth_ids = set(
eid
for key, eid in iteritems(state_set)
if (key[0] in (
EventTypes.Member,
EventTypes.ThirdPartyInvite,
) or key in (
(EventTypes.PowerLevels, ''),
(EventTypes.Create, ''),
(EventTypes.JoinRules, ''),
)) and eid not in common
)
auth_chain = yield state_res_store.get_auth_chain(auth_ids)
auth_ids.update(auth_chain)
auth_sets.append(auth_ids)
intersection = set(auth_sets[0]).intersection(*auth_sets[1:])
union = set().union(*auth_sets)
defer.returnValue(union - intersection)
def _seperate(state_sets):
"""Return the unconflicted and conflicted state. This is different than in
the original algorithm, as this defines a key to be conflicted if one of
the state sets doesn't have that key.
Args:
state_sets (list)
Returns:
tuple[dict, dict]: A tuple of unconflicted and conflicted state. The
conflicted state dict is a map from type/state_key to set of event IDs
"""
unconflicted_state = {}
conflicted_state = {}
for key in set(itertools.chain.from_iterable(state_sets)):
event_ids = set(state_set.get(key) for state_set in state_sets)
if len(event_ids) == 1:
unconflicted_state[key] = event_ids.pop()
else:
event_ids.discard(None)
conflicted_state[key] = event_ids
return unconflicted_state, conflicted_state
def _is_power_event(event):
"""Return whether or not the event is a "power event", as defined by the
v2 state resolution algorithm
Args:
event (FrozenEvent)
Returns:
boolean
"""
if (event.type, event.state_key) in (
(EventTypes.PowerLevels, ""),
(EventTypes.JoinRules, ""),
(EventTypes.Create, ""),
):
return True
if event.type == EventTypes.Member:
if event.membership in ('leave', 'ban'):
return event.sender != event.state_key
return False
@defer.inlineCallbacks
def _add_event_and_auth_chain_to_graph(graph, event_id, event_map,
state_res_store, auth_diff):
"""Helper function for _reverse_topological_power_sort that add the event
and its auth chain (that is in the auth diff) to the graph
Args:
graph (dict[str, set[str]]): A map from event ID to the events auth
event IDs
event_id (str): Event to add to the graph
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
auth_diff (set[str]): Set of event IDs that are in the auth difference.
"""
state = [event_id]
while state:
eid = state.pop()
graph.setdefault(eid, set())
event = yield _get_event(eid, event_map, state_res_store)
for aid, _ in event.auth_events:
if aid in auth_diff:
if aid not in graph:
state.append(aid)
graph.setdefault(eid, set()).add(aid)
@defer.inlineCallbacks
def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_diff):
"""Returns a list of the event_ids sorted by reverse topological ordering,
and then by power level and origin_server_ts
Args:
event_ids (list[str]): The events to sort
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
auth_diff (set[str]): Set of event IDs that are in the auth difference.
Returns:
Deferred[list[str]]: The sorted list
"""
graph = {}
for event_id in event_ids:
yield _add_event_and_auth_chain_to_graph(
graph, event_id, event_map, state_res_store, auth_diff,
)
event_to_pl = {}
for event_id in graph:
pl = yield _get_power_level_for_sender(event_id, event_map, state_res_store)
event_to_pl[event_id] = pl
def _get_power_order(event_id):
ev = event_map[event_id]
pl = event_to_pl[event_id]
return -pl, ev.origin_server_ts, event_id
# Note: graph is modified during the sort
it = lexicographical_topological_sort(
graph,
key=_get_power_order,
)
sorted_events = list(it)
defer.returnValue(sorted_events)
@defer.inlineCallbacks
def _iterative_auth_checks(event_ids, base_state, event_map, state_res_store):
"""Sequentially apply auth checks to each event in given list, updating the
state as it goes along.
Args:
event_ids (list[str]): Ordered list of events to apply auth checks to
base_state (dict[tuple[str, str], str]): The set of state to start with
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
Returns:
Deferred[dict[tuple[str, str], str]]: Returns the final updated state
"""
resolved_state = base_state.copy()
for event_id in event_ids:
event = event_map[event_id]
auth_events = {}
for aid, _ in event.auth_events:
ev = yield _get_event(aid, event_map, state_res_store)
if ev.rejected_reason is None:
auth_events[(ev.type, ev.state_key)] = ev
for key in event_auth.auth_types_for_event(event):
if key in resolved_state:
ev_id = resolved_state[key]
ev = yield _get_event(ev_id, event_map, state_res_store)
if ev.rejected_reason is None:
auth_events[key] = event_map[ev_id]
try:
event_auth.check(
event, auth_events,
do_sig_check=False,
do_size_check=False
)
resolved_state[(event.type, event.state_key)] = event_id
except AuthError:
pass
defer.returnValue(resolved_state)
@defer.inlineCallbacks
def _mainline_sort(event_ids, resolved_power_event_id, event_map,
state_res_store):
"""Returns a sorted list of event_ids sorted by mainline ordering based on
the given event resolved_power_event_id
Args:
event_ids (list[str]): Events to sort
resolved_power_event_id (str): The final resolved power level event ID
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
Returns:
Deferred[list[str]]: The sorted list
"""
mainline = []
pl = resolved_power_event_id
while pl:
mainline.append(pl)
pl_ev = yield _get_event(pl, event_map, state_res_store)
auth_events = pl_ev.auth_events
pl = None
for aid, _ in auth_events:
ev = yield _get_event(aid, event_map, state_res_store)
if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""):
pl = aid
break
mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))}
event_ids = list(event_ids)
order_map = {}
for ev_id in event_ids:
depth = yield _get_mainline_depth_for_event(
event_map[ev_id], mainline_map,
event_map, state_res_store,
)
order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id)
event_ids.sort(key=lambda ev_id: order_map[ev_id])
defer.returnValue(event_ids)
@defer.inlineCallbacks
def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_store):
"""Get the mainline depths for the given event based on the mainline map
Args:
event (FrozenEvent)
mainline_map (dict[str, int]): Map from event_id to mainline depth for
events in the mainline.
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
Returns:
Deferred[int]
"""
# We do an iterative search, replacing `event with the power level in its
# auth events (if any)
while event:
depth = mainline_map.get(event.event_id)
if depth is not None:
defer.returnValue(depth)
auth_events = event.auth_events
event = None
for aid, _ in auth_events:
aev = yield _get_event(aid, event_map, state_res_store)
if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""):
event = aev
break
# Didn't find a power level auth event, so we just return 0
defer.returnValue(0)
@defer.inlineCallbacks
def _get_event(event_id, event_map, state_res_store):
"""Helper function to look up event in event_map, falling back to looking
it up in the store
Args:
event_id (str)
event_map (dict[str,FrozenEvent])
state_res_store (StateResolutionStore)
Returns:
Deferred[FrozenEvent]
"""
if event_id not in event_map:
events = yield state_res_store.get_events([event_id], allow_rejected=True)
event_map.update(events)
defer.returnValue(event_map[event_id])
def lexicographical_topological_sort(graph, key):
"""Performs a lexicographic reverse topological sort on the graph.
This returns a reverse topological sort (i.e. if node A references B then B
appears before A in the sort), with ties broken lexicographically based on
return value of the `key` function.
NOTE: `graph` is modified during the sort.
Args:
graph (dict[str, set[str]]): A representation of the graph where each
node is a key in the dict and its value are the nodes edges.
key (func): A function that takes a node and returns a value that is
comparable and used to order nodes
Yields:
str: The next node in the topological sort
"""
# Note, this is basically Kahn's algorithm except we look at nodes with no
# outgoing edges, c.f.
# https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm
outdegree_map = graph
reverse_graph = {}
# Lists of nodes with zero out degree. Is actually a tuple of
# `(key(node), node)` so that sorting does the right thing
zero_outdegree = []
for node, edges in iteritems(graph):
if len(edges) == 0:
zero_outdegree.append((key(node), node))
reverse_graph.setdefault(node, set())
for edge in edges:
reverse_graph.setdefault(edge, set()).add(node)
# heapq is a built in implementation of a sorted queue.
heapq.heapify(zero_outdegree)
while zero_outdegree:
_, node = heapq.heappop(zero_outdegree)
for parent in reverse_graph[node]:
out = outdegree_map[parent]
out.discard(node)
if len(out) == 0:
heapq.heappush(zero_outdegree, (key(parent), parent))
yield node

View File

@ -34,6 +34,7 @@ from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
@ -731,11 +732,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# Ok, we need to defer to the state handler to resolve our state sets.
def get_events(ev_ids):
return self.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
)
state_groups = {
sg: state_groups_map[sg] for sg in new_state_groups
}
@ -745,7 +741,8 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
room_id, room_version, state_groups, events_map, get_events
room_id, room_version, state_groups, events_map,
state_res_store=StateResolutionStore(self)
)
defer.returnValue((res.state, None))
@ -854,6 +851,27 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
# Insert into event_to_state_groups.
self._store_event_state_mappings_txn(txn, events_and_contexts)
# We want to store event_auth mappings for rejected events, as they're
# used in state res v2.
# This is only necessary if the rejected event appears in an accepted
# event's auth chain, but its easier for now just to store them (and
# it doesn't take much storage compared to storing the entire event
# anyway).
self._simple_insert_many_txn(
txn,
table="event_auth",
values=[
{
"event_id": event.event_id,
"room_id": event.room_id,
"auth_id": auth_id,
}
for event, _ in events_and_contexts
for auth_id, _ in event.auth_events
if event.is_state()
],
)
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
events_and_contexts = self._store_rejected_events_txn(
@ -1329,21 +1347,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
txn, event.room_id, event.redacts
)
self._simple_insert_many_txn(
txn,
table="event_auth",
values=[
{
"event_id": event.event_id,
"room_id": event.room_id,
"auth_id": auth_id,
}
for event, _ in events_and_contexts
for auth_id, _ in event.auth_events
if event.is_state()
],
)
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
self._handle_mult_prev_events(

View File

@ -575,7 +575,7 @@ class RegistrationStore(RegistrationWorkerStore,
def _find_next_generated_user_id(txn):
txn.execute("SELECT name FROM users")
regex = re.compile("^@(\d+):")
regex = re.compile(r"^@(\d+):")
found = set()

View File

@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works.
import logging
import threading
from twisted.internet import defer
from twisted.internet import defer, threads
logger = logging.getLogger(__name__)
@ -562,58 +562,76 @@ def _set_context_cb(result, context):
return result
# modules to ignore in `logcontext_tracer`
_to_ignore = [
"synapse.util.logcontext",
"synapse.http.server",
"synapse.storage._base",
"synapse.util.async_helpers",
]
def logcontext_tracer(frame, event, arg):
"""A tracer that logs whenever a logcontext "unexpectedly" changes within
a function. Probably inaccurate.
Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
def defer_to_thread(reactor, f, *args, **kwargs):
"""
if event == 'call':
name = frame.f_globals["__name__"]
if name.startswith("synapse"):
if name == "synapse.util.logcontext":
if frame.f_code.co_name in ["__enter__", "__exit__"]:
tracer = frame.f_back.f_trace
if tracer:
tracer.just_changed = True
Calls the function `f` using a thread from the reactor's default threadpool and
returns the result as a Deferred.
tracer = frame.f_trace
if tracer:
return tracer
Creates a new logcontext for `f`, which is created as a child of the current
logcontext (so its CPU usage metrics will get attributed to the current
logcontext). `f` should preserve the logcontext it is given.
if not any(name.startswith(ig) for ig in _to_ignore):
return LineTracer()
The result deferred follows the Synapse logcontext rules: you should `yield`
on it.
Args:
reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
the Deferred will be invoked, and whose threadpool we should use for the
function.
Normally this will be hs.get_reactor().
f (callable): The function to call.
args: positional arguments to pass to f.
kwargs: keyword arguments to pass to f.
Returns:
Deferred: A Deferred which fires a callback with the result of `f`, or an
errback if `f` throws an exception.
"""
return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
class LineTracer(object):
__slots__ = ["context", "just_changed"]
def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
"""
A wrapper for twisted.internet.threads.deferToThreadpool, which handles
logcontexts correctly.
def __init__(self):
self.context = LoggingContext.current_context()
self.just_changed = False
Calls the function `f` using a thread from the given threadpool and returns
the result as a Deferred.
def __call__(self, frame, event, arg):
if event in 'line':
if self.just_changed:
self.context = LoggingContext.current_context()
self.just_changed = False
else:
c = LoggingContext.current_context()
if c != self.context:
logger.info(
"Context changed! %s -> %s, %s, %s",
self.context, c,
frame.f_code.co_filename, frame.f_lineno
)
self.context = c
Creates a new logcontext for `f`, which is created as a child of the current
logcontext (so its CPU usage metrics will get attributed to the current
logcontext). `f` should preserve the logcontext it is given.
return self
The result deferred follows the Synapse logcontext rules: you should `yield`
on it.
Args:
reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
the Deferred will be invoked. Normally this will be hs.get_reactor().
threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
running `f`. Normally this will be hs.get_reactor().getThreadPool().
f (callable): The function to call.
args: positional arguments to pass to f.
kwargs: keyword arguments to pass to f.
Returns:
Deferred: A Deferred which fires a callback with the result of `f`, or an
errback if `f` throws an exception.
"""
logcontext = LoggingContext.current_context()
def g():
with LoggingContext(parent_context=logcontext):
return f(*args, **kwargs)
return make_deferred_yieldable(
threads.deferToThreadPool(reactor, threadpool, g)
)

View File

@ -60,7 +60,7 @@ class FilteringTestCase(unittest.TestCase):
invalid_filters = [
{"boom": {}},
{"account_data": "Hello World"},
{"event_fields": ["\\foo"]},
{"event_fields": [r"\\foo"]},
{"room": {"timeline": {"limit": 0}, "state": {"not_bars": ["*"]}}},
{"event_format": "other"},
{"room": {"not_rooms": ["#foo:pik-test"]}},
@ -109,6 +109,16 @@ class FilteringTestCase(unittest.TestCase):
"event_format": "client",
"event_fields": ["type", "content", "sender"],
},
# a single backslash should be permitted (though it is debatable whether
# it should be permitted before anything other than `.`, and what that
# actually means)
#
# (note that event_fields is implemented in
# synapse.events.utils.serialize_event, and so whether this actually works
# is tested elsewhere. We just want to check that it is allowed through the
# filter validation)
{"event_fields": [r"foo\.bar"]},
]
for filter in valid_filters:
try:

View File

@ -67,6 +67,6 @@ class ConfigGenerationTestCase(unittest.TestCase):
with open(log_config_file) as f:
config = f.read()
# find the 'filename' line
matches = re.findall("^\s*filename:\s*(.*)$", config, re.M)
matches = re.findall(r"^\s*filename:\s*(.*)$", config, re.M)
self.assertEqual(1, len(matches))
self.assertEqual(matches[0], expected)

View File

@ -156,7 +156,7 @@ class SerializeEventTestCase(unittest.TestCase):
room_id="!foo:bar",
content={"key.with.dots": {}},
),
["content.key\.with\.dots"],
[r"content.key\.with\.dots"],
),
{"content": {"key.with.dots": {}}},
)
@ -172,7 +172,7 @@ class SerializeEventTestCase(unittest.TestCase):
"nested.dot.key": {"leaf.key": 42, "not_me_either": 1},
},
),
["content.nested\.dot\.key.leaf\.key"],
[r"content.nested\.dot\.key.leaf\.key"],
),
{"content": {"nested.dot.key": {"leaf.key": 42}}},
)

0
tests/state/__init__.py Normal file
View File

663
tests/state/test_v2.py Normal file
View File

@ -0,0 +1,663 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
from six.moves import zip
import attr
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.event_auth import auth_types_for_event
from synapse.events import FrozenEvent
from synapse.state.v2 import lexicographical_topological_sort, resolve_events_with_store
from synapse.types import EventID
from tests import unittest
ALICE = "@alice:example.com"
BOB = "@bob:example.com"
CHARLIE = "@charlie:example.com"
EVELYN = "@evelyn:example.com"
ZARA = "@zara:example.com"
ROOM_ID = "!test:example.com"
MEMBERSHIP_CONTENT_JOIN = {"membership": Membership.JOIN}
MEMBERSHIP_CONTENT_BAN = {"membership": Membership.BAN}
ORIGIN_SERVER_TS = 0
class FakeEvent(object):
"""A fake event we use as a convenience.
NOTE: Again as a convenience we use "node_ids" rather than event_ids to
refer to events. The event_id has node_id as localpart and example.com
as domain.
"""
def __init__(self, id, sender, type, state_key, content):
self.node_id = id
self.event_id = EventID(id, "example.com").to_string()
self.sender = sender
self.type = type
self.state_key = state_key
self.content = content
def to_event(self, auth_events, prev_events):
"""Given the auth_events and prev_events, convert to a Frozen Event
Args:
auth_events (list[str]): list of event_ids
prev_events (list[str]): list of event_ids
Returns:
FrozenEvent
"""
global ORIGIN_SERVER_TS
ts = ORIGIN_SERVER_TS
ORIGIN_SERVER_TS = ORIGIN_SERVER_TS + 1
event_dict = {
"auth_events": [(a, {}) for a in auth_events],
"prev_events": [(p, {}) for p in prev_events],
"event_id": self.node_id,
"sender": self.sender,
"type": self.type,
"content": self.content,
"origin_server_ts": ts,
"room_id": ROOM_ID,
}
if self.state_key is not None:
event_dict["state_key"] = self.state_key
return FrozenEvent(event_dict)
# All graphs start with this set of events
INITIAL_EVENTS = [
FakeEvent(
id="CREATE",
sender=ALICE,
type=EventTypes.Create,
state_key="",
content={"creator": ALICE},
),
FakeEvent(
id="IMA",
sender=ALICE,
type=EventTypes.Member,
state_key=ALICE,
content=MEMBERSHIP_CONTENT_JOIN,
),
FakeEvent(
id="IPOWER",
sender=ALICE,
type=EventTypes.PowerLevels,
state_key="",
content={"users": {ALICE: 100}},
),
FakeEvent(
id="IJR",
sender=ALICE,
type=EventTypes.JoinRules,
state_key="",
content={"join_rule": JoinRules.PUBLIC},
),
FakeEvent(
id="IMB",
sender=BOB,
type=EventTypes.Member,
state_key=BOB,
content=MEMBERSHIP_CONTENT_JOIN,
),
FakeEvent(
id="IMC",
sender=CHARLIE,
type=EventTypes.Member,
state_key=CHARLIE,
content=MEMBERSHIP_CONTENT_JOIN,
),
FakeEvent(
id="IMZ",
sender=ZARA,
type=EventTypes.Member,
state_key=ZARA,
content=MEMBERSHIP_CONTENT_JOIN,
),
FakeEvent(
id="START",
sender=ZARA,
type=EventTypes.Message,
state_key=None,
content={},
),
FakeEvent(
id="END",
sender=ZARA,
type=EventTypes.Message,
state_key=None,
content={},
),
]
INITIAL_EDGES = [
"START", "IMZ", "IMC", "IMB", "IJR", "IPOWER", "IMA", "CREATE",
]
class StateTestCase(unittest.TestCase):
def test_ban_vs_pl(self):
events = [
FakeEvent(
id="PA",
sender=ALICE,
type=EventTypes.PowerLevels,
state_key="",
content={
"users": {
ALICE: 100,
BOB: 50,
}
},
),
FakeEvent(
id="MA",
sender=ALICE,
type=EventTypes.Member,
state_key=ALICE,
content={"membership": Membership.JOIN},
),
FakeEvent(
id="MB",
sender=ALICE,
type=EventTypes.Member,
state_key=BOB,
content={"membership": Membership.BAN},
),
FakeEvent(
id="PB",
sender=BOB,
type=EventTypes.PowerLevels,
state_key='',
content={
"users": {
ALICE: 100,
BOB: 50,
},
},
),
]
edges = [
["END", "MB", "MA", "PA", "START"],
["END", "PB", "PA"],
]
expected_state_ids = ["PA", "MA", "MB"]
self.do_check(events, edges, expected_state_ids)
def test_join_rule_evasion(self):
events = [
FakeEvent(
id="JR",
sender=ALICE,
type=EventTypes.JoinRules,
state_key="",
content={"join_rules": JoinRules.PRIVATE},
),
FakeEvent(
id="ME",
sender=EVELYN,
type=EventTypes.Member,
state_key=EVELYN,
content={"membership": Membership.JOIN},
),
]
edges = [
["END", "JR", "START"],
["END", "ME", "START"],
]
expected_state_ids = ["JR"]
self.do_check(events, edges, expected_state_ids)
def test_offtopic_pl(self):
events = [
FakeEvent(
id="PA",
sender=ALICE,
type=EventTypes.PowerLevels,
state_key="",
content={
"users": {
ALICE: 100,
BOB: 50,
}
},
),
FakeEvent(
id="PB",
sender=BOB,
type=EventTypes.PowerLevels,
state_key='',
content={
"users": {
ALICE: 100,
BOB: 50,
CHARLIE: 50,
},
},
),
FakeEvent(
id="PC",
sender=CHARLIE,
type=EventTypes.PowerLevels,
state_key='',
content={
"users": {
ALICE: 100,
BOB: 50,
CHARLIE: 0,
},
},
),
]
edges = [
["END", "PC", "PB", "PA", "START"],
["END", "PA"],
]
expected_state_ids = ["PC"]
self.do_check(events, edges, expected_state_ids)
def test_topic_basic(self):
events = [
FakeEvent(
id="T1",
sender=ALICE,
type=EventTypes.Topic,
state_key="",
content={},
),
FakeEvent(
id="PA1",
sender=ALICE,
type=EventTypes.PowerLevels,
state_key='',
content={
"users": {
ALICE: 100,
BOB: 50,
},
},
),
FakeEvent(
id="T2",
sender=ALICE,
type=EventTypes.Topic,
state_key="",
content={},
),
FakeEvent(
id="PA2",
sender=ALICE,
type=EventTypes.PowerLevels,
state_key='',
content={
"users": {
ALICE: 100,
BOB: 0,
},
},
),
FakeEvent(
id="PB",
sender=BOB,
type=EventTypes.PowerLevels,
state_key='',
content={
"users": {
ALICE: 100,
BOB: 50,
},
},
),
FakeEvent(
id="T3",
sender=BOB,
type=EventTypes.Topic,
state_key="",
content={},
),
]
edges = [
["END", "PA2", "T2", "PA1", "T1", "START"],
["END", "T3", "PB", "PA1"],
]
expected_state_ids = ["PA2", "T2"]
self.do_check(events, edges, expected_state_ids)
def test_topic_reset(self):
events = [
FakeEvent(
id="T1",
sender=ALICE,
type=EventTypes.Topic,
state_key="",
content={},
),
FakeEvent(
id="PA",
sender=ALICE,
type=EventTypes.PowerLevels,
state_key='',
content={
"users": {
ALICE: 100,
BOB: 50,
},
},
),
FakeEvent(
id="T2",
sender=BOB,
type=EventTypes.Topic,
state_key="",
content={},
),
FakeEvent(
id="MB",
sender=ALICE,
type=EventTypes.Member,
state_key=BOB,
content={"membership": Membership.BAN},
),
]
edges = [
["END", "MB", "T2", "PA", "T1", "START"],
["END", "T1"],
]
expected_state_ids = ["T1", "MB", "PA"]
self.do_check(events, edges, expected_state_ids)
def test_topic(self):
events = [
FakeEvent(
id="T1",
sender=ALICE,
type=EventTypes.Topic,
state_key="",
content={},
),
FakeEvent(
id="PA1",
sender=ALICE,
type=EventTypes.PowerLevels,
state_key='',
content={
"users": {
ALICE: 100,
BOB: 50,
},
},
),
FakeEvent(
id="T2",
sender=ALICE,
type=EventTypes.Topic,
state_key="",
content={},
),
FakeEvent(
id="PA2",
sender=ALICE,
type=EventTypes.PowerLevels,
state_key='',
content={
"users": {
ALICE: 100,
BOB: 0,
},
},
),
FakeEvent(
id="PB",
sender=BOB,
type=EventTypes.PowerLevels,
state_key='',
content={
"users": {
ALICE: 100,
BOB: 50,
},
},
),
FakeEvent(
id="T3",
sender=BOB,
type=EventTypes.Topic,
state_key="",
content={},
),
FakeEvent(
id="MZ1",
sender=ZARA,
type=EventTypes.Message,
state_key=None,
content={},
),
FakeEvent(
id="T4",
sender=ALICE,
type=EventTypes.Topic,
state_key="",
content={},
),
]
edges = [
["END", "T4", "MZ1", "PA2", "T2", "PA1", "T1", "START"],
["END", "MZ1", "T3", "PB", "PA1"],
]
expected_state_ids = ["T4", "PA2"]
self.do_check(events, edges, expected_state_ids)
def do_check(self, events, edges, expected_state_ids):
"""Take a list of events and edges and calculate the state of the
graph at END, and asserts it matches `expected_state_ids`
Args:
events (list[FakeEvent])
edges (list[list[str]]): A list of chains of event edges, e.g.
`[[A, B, C]]` are edges A->B and B->C.
expected_state_ids (list[str]): The expected state at END, (excluding
the keys that haven't changed since START).
"""
# We want to sort the events into topological order for processing.
graph = {}
# node_id -> FakeEvent
fake_event_map = {}
for ev in itertools.chain(INITIAL_EVENTS, events):
graph[ev.node_id] = set()
fake_event_map[ev.node_id] = ev
for a, b in pairwise(INITIAL_EDGES):
graph[a].add(b)
for edge_list in edges:
for a, b in pairwise(edge_list):
graph[a].add(b)
# event_id -> FrozenEvent
event_map = {}
# node_id -> state
state_at_event = {}
# We copy the map as the sort consumes the graph
graph_copy = {k: set(v) for k, v in graph.items()}
for node_id in lexicographical_topological_sort(graph_copy, key=lambda e: e):
fake_event = fake_event_map[node_id]
event_id = fake_event.event_id
prev_events = list(graph[node_id])
if len(prev_events) == 0:
state_before = {}
elif len(prev_events) == 1:
state_before = dict(state_at_event[prev_events[0]])
else:
state_d = resolve_events_with_store(
[state_at_event[n] for n in prev_events],
event_map=event_map,
state_res_store=TestStateResolutionStore(event_map),
)
self.assertTrue(state_d.called)
state_before = state_d.result
state_after = dict(state_before)
if fake_event.state_key is not None:
state_after[(fake_event.type, fake_event.state_key)] = event_id
auth_types = set(auth_types_for_event(fake_event))
auth_events = []
for key in auth_types:
if key in state_before:
auth_events.append(state_before[key])
event = fake_event.to_event(auth_events, prev_events)
state_at_event[node_id] = state_after
event_map[event_id] = event
expected_state = {}
for node_id in expected_state_ids:
# expected_state_ids are node IDs rather than event IDs,
# so we have to convert
event_id = EventID(node_id, "example.com").to_string()
event = event_map[event_id]
key = (event.type, event.state_key)
expected_state[key] = event_id
start_state = state_at_event["START"]
end_state = {
key: value
for key, value in state_at_event["END"].items()
if key in expected_state or start_state.get(key) != value
}
self.assertEqual(expected_state, end_state)
class LexicographicalTestCase(unittest.TestCase):
def test_simple(self):
graph = {
"l": {"o"},
"m": {"n", "o"},
"n": {"o"},
"o": set(),
"p": {"o"},
}
res = list(lexicographical_topological_sort(graph, key=lambda x: x))
self.assertEqual(["o", "l", "n", "m", "p"], res)
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
@attr.s
class TestStateResolutionStore(object):
event_map = attr.ib()
def get_events(self, event_ids, allow_rejected=False):
"""Get events from the database
Args:
event_ids (list): The event_ids of the events to fetch
allow_rejected (bool): If True return rejected events.
Returns:
Deferred[dict[str, FrozenEvent]]: Dict from event_id to event.
"""
return {
eid: self.event_map[eid]
for eid in event_ids
if eid in self.event_map
}
def get_auth_chain(self, event_ids):
"""Gets the full auth chain for a set of events (including rejected
events).
Includes the given event IDs in the result.
Note that:
1. All events must be state events.
2. For v1 rooms this may not have the full auth chain in the
presence of rejected events
Args:
event_ids (list): The event IDs of the events to fetch the auth
chain for. Must be state events.
Returns:
Deferred[list[str]]: List of event IDs of the auth chain.
"""
# Simple DFS for auth chain
result = set()
stack = list(event_ids)
while stack:
event_id = stack.pop()
if event_id in result:
continue
result.add(event_id)
event = self.event_map[event_id]
for aid, _ in event.auth_events:
stack.append(aid)
return list(result)