Merge branch 'develop' into cross-signing_keys

Hubert Chathi 2019-09-04 19:12:29 -04:00
commit faf72a4c40
132 changed files with 2166 additions and 1701 deletions

View file

@ -51,8 +51,8 @@ class AccountDataEventSource(object):
{"type": account_data_type, "content": content, "room_id": room_id}
)
return (results, current_stream_id)
return results, current_stream_id
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
return ([], config.to_id)
return [], config.to_id

View file

@ -294,12 +294,10 @@ class ApplicationServicesHandler(object):
# we don't know if they are unknown or not since it isn't one of our
# users. We can't poke ASes.
return False
return
user_info = yield self.store.get_user_by_id(user_id)
if user_info:
return False
return
# user not found; could be the AS though, so check.
services = self.store.get_app_services()

View file

@ -280,7 +280,7 @@ class AuthHandler(BaseHandler):
creds,
list(clientdict),
)
return (creds, clientdict, session["id"])
return creds, clientdict, session["id"]
ret = self._auth_dict_for_flows(flows, session)
ret["completed"] = list(creds)
@ -722,7 +722,7 @@ class AuthHandler(BaseHandler):
known_login_type = True
is_valid = yield provider.check_password(qualified_user_id, password)
if is_valid:
return (qualified_user_id, None)
return qualified_user_id, None
if not hasattr(provider, "get_supported_login_types") or not hasattr(
provider, "check_auth"
@ -766,7 +766,7 @@ class AuthHandler(BaseHandler):
)
if canonical_user_id:
return (canonical_user_id, None)
return canonical_user_id, None
if not known_login_type:
raise SynapseError(400, "Unknown login type %s" % login_type)
@ -816,7 +816,7 @@ class AuthHandler(BaseHandler):
result = (result, None)
return result
return (None, None)
return None, None
@defer.inlineCallbacks
def _check_local_password(self, user_id, password):

View file

@ -27,6 +27,7 @@ from synapse.api.errors import (
HttpResponseException,
RequestSendFailed,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
@ -47,6 +48,7 @@ class DeviceWorkerHandler(BaseHandler):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
@trace
@defer.inlineCallbacks
def get_devices_by_user(self, user_id):
"""
@ -58,6 +60,7 @@ class DeviceWorkerHandler(BaseHandler):
defer.Deferred: list[dict[str, X]]: info on each device
"""
set_tag("user_id", user_id)
device_map = yield self.store.get_devices_by_user(user_id)
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@ -66,8 +69,10 @@ class DeviceWorkerHandler(BaseHandler):
for device in devices:
_update_device_from_client_ips(device, ips)
log_kv(device_map)
return devices
@trace
@defer.inlineCallbacks
def get_device(self, user_id, device_id):
""" Retrieve the given device
@ -87,9 +92,14 @@ class DeviceWorkerHandler(BaseHandler):
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
set_tag("device", device)
set_tag("ips", ips)
return device
@measure_func("device.get_user_ids_changed")
@trace
@defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly
@ -99,6 +109,9 @@ class DeviceWorkerHandler(BaseHandler):
user_id (str)
from_token (StreamToken)
"""
set_tag("user_id", user_id)
set_tag("from_token", from_token)
now_room_key = yield self.store.get_room_events_max_id()
room_ids = yield self.store.get_rooms_for_user(user_id)
@ -150,6 +163,9 @@ class DeviceWorkerHandler(BaseHandler):
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
@ -202,7 +218,11 @@ class DeviceWorkerHandler(BaseHandler):
possibly_joined = []
possibly_left = []
return {"changed": list(possibly_joined), "left": list(possibly_left)}
result = {"changed": list(possibly_joined), "left": list(possibly_left)}
log_kv(result)
return result
class DeviceHandler(DeviceWorkerHandler):
@ -269,6 +289,7 @@ class DeviceHandler(DeviceWorkerHandler):
raise errors.StoreError(500, "Couldn't generate a device ID.")
@trace
@defer.inlineCallbacks
def delete_device(self, user_id, device_id):
""" Delete the given device
@ -286,6 +307,10 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
set_tag("error", True)
log_kv(
{"reason": "User doesn't have device id.", "device_id": device_id}
)
pass
else:
raise
@ -298,6 +323,7 @@ class DeviceHandler(DeviceWorkerHandler):
yield self.notify_device_update(user_id, [device_id])
@trace
@defer.inlineCallbacks
def delete_all_devices_for_user(self, user_id, except_device_id=None):
"""Delete all of the user's devices
@ -333,6 +359,8 @@ class DeviceHandler(DeviceWorkerHandler):
except errors.StoreError as e:
if e.code == 404:
# no match
set_tag("error", True)
set_tag("reason", "User doesn't have that device id.")
pass
else:
raise
@ -373,6 +401,7 @@ class DeviceHandler(DeviceWorkerHandler):
else:
raise
@trace
@measure_func("notify_device_update")
@defer.inlineCallbacks
def notify_device_update(self, user_id, device_ids):
@ -388,6 +417,8 @@ class DeviceHandler(DeviceWorkerHandler):
hosts.update(get_domain_from_id(u) for u in users_who_share_room)
hosts.discard(self.server_name)
set_tag("target_hosts", hosts)
position = yield self.store.add_device_change_to_streams(
user_id, device_ids, list(hosts)
)
@ -407,6 +438,7 @@ class DeviceHandler(DeviceWorkerHandler):
)
for host in hosts:
self.federation_sender.send_device_messages(host)
log_kv({"message": "sent device update to host", "host": host})
@defer.inlineCallbacks
def notify_user_signature_update(self, from_user_id, user_ids):
@ -468,12 +500,15 @@ class DeviceListUpdater(object):
iterable=True,
)
@trace
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
"""Called on incoming device list update from federation. Responsible
for parsing the EDU and adding to pending updates list.
"""
set_tag("origin", origin)
set_tag("edu_content", edu_content)
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
@ -488,12 +523,30 @@ class DeviceListUpdater(object):
device_id,
origin,
)
set_tag("error", True)
log_kv(
{
"message": "Got a device list update edu from a user and "
"device which does not match the origin of the request.",
"user_id": user_id,
"device_id": device_id,
}
)
return
room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
set_tag("error", True)
log_kv(
{
"message": "Got an update from a user for which "
"we don't share any rooms",
"other user_id": user_id,
}
)
logger.warning(
"Got device list update edu for %r/%r, but don't share a room",
user_id,
@ -595,6 +648,7 @@ class DeviceListUpdater(object):
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
"""
log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
@ -611,13 +665,20 @@ class DeviceListUpdater(object):
# eventually become consistent.
return
except FederationDeniedError as e:
set_tag("error", True)
log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return
except Exception:
except Exception as e:
# TODO: Remember that we are now out of sync and try again
# later
set_tag("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
logger.exception("Failed to handle device list update for %s", user_id)
return
log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]

View file

@ -22,6 +22,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.logging.opentracing import (
get_active_span_text_map,
log_kv,
set_tag,
start_active_span,
whitelisted_homeserver,
@ -86,7 +87,8 @@ class DeviceMessageHandler(object):
@defer.inlineCallbacks
def send_device_message(self, sender_user_id, message_type, messages):
set_tag("number_of_messages", len(messages))
set_tag("sender", sender_user_id)
local_messages = {}
remote_messages = {}
for user_id, by_device in messages.items():
@ -124,6 +126,7 @@ class DeviceMessageHandler(object):
else None,
}
log_kv({"local_messages": local_messages})
stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
@ -132,6 +135,7 @@ class DeviceMessageHandler(object):
"to_device_key", stream_id, users=local_messages.keys()
)
log_kv({"remote_messages": remote_messages})
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.

View file

@ -167,7 +167,6 @@ class EventHandler(BaseHandler):
if not event:
return None
return
users = yield self.store.get_users_in_room(event.room_id)
is_peeking = user.to_string() not in users

View file

@ -1428,7 +1428,7 @@ class FederationHandler(BaseHandler):
assert event.user_id == user_id
assert event.state_key == user_id
assert event.room_id == room_id
return (origin, event, format_ver)
return origin, event, format_ver
@defer.inlineCallbacks
@log_function

View file

@ -282,16 +282,3 @@ class IdentityHandler(BaseHandler):
except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e.to_synapse_error()
class LookupAlgorithm:
"""
Supported hashing algorithms when performing a 3PID lookup.
SHA256 - Hashing an (address, medium, pepper) combo with sha256, then url-safe base64
encoding
NONE - Not performing any hashing. Simply sending an (address, medium) combo in plaintext
"""
SHA256 = "sha256"
NONE = "none"

View file

@ -449,8 +449,7 @@ class InitialSyncHandler(BaseHandler):
# * The user is a guest user, and has joined the room
# else it will throw.
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
return (member_event.membership, member_event.event_id)
return
return member_event.membership, member_event.event_id
except AuthError:
visibility = yield self.state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
@ -459,8 +458,7 @@ class InitialSyncHandler(BaseHandler):
visibility
and visibility.content["history_visibility"] == "world_readable"
):
return (Membership.JOIN, None)
return
return Membership.JOIN, None
raise AuthError(
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
)

View file

@ -255,7 +255,7 @@ class PresenceHandler(object):
self.unpersisted_users_changes = set()
if unpersisted:
logger.info("Persisting %d upersisted presence updates", len(unpersisted))
logger.info("Persisting %d unpersisted presence updates", len(unpersisted))
yield self.store.update_presence(
[self.user_to_current_state[user_id] for user_id in unpersisted]
)
@ -1032,7 +1032,7 @@ class PresenceEventSource(object):
#
# Hence this guard where we just return nothing so that the sync
# doesn't return. C.f. #5503.
return ([], max_token)
return [], max_token
presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
@ -1279,7 +1279,7 @@ def get_interested_parties(store, states):
# Always notify self
users_to_states.setdefault(state.user_id, []).append(state)
return (room_ids_to_states, users_to_states)
return room_ids_to_states, users_to_states
@defer.inlineCallbacks

View file

@ -148,7 +148,7 @@ class ReceiptEventSource(object):
to_key = yield self.get_current_key()
if from_key == to_key:
return ([], to_key)
return [], to_key
events = yield self.store.get_linearized_receipts_for_rooms(
room_ids, from_key=from_key, to_key=to_key

View file

@ -24,13 +24,11 @@ from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
InvalidCaptchaError,
LimitExceededError,
RegistrationError,
SynapseError,
)
from synapse.config.server import is_threepid_reserved
from synapse.http.client import CaptchaServerHttpClient
from synapse.http.servlet import assert_params_in_dict
from synapse.replication.http.login import RegisterDeviceReplicationServlet
from synapse.replication.http.register import (
@ -39,7 +37,6 @@ from synapse.replication.http.register import (
)
from synapse.types import RoomAlias, RoomID, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@ -59,7 +56,6 @@ class RegistrationHandler(BaseHandler):
self._auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
self.user_directory_handler = hs.get_user_directory_handler()
self.captcha_client = CaptchaServerHttpClient(hs)
self.identity_handler = self.hs.get_handlers().identity_handler
self.ratelimiter = hs.get_registration_ratelimiter()
@ -362,70 +358,6 @@ class RegistrationHandler(BaseHandler):
)
return user_id
@defer.inlineCallbacks
def check_recaptcha(self, ip, private_key, challenge, response):
"""
Checks a recaptcha is correct.
Used only by c/s api v1
"""
captcha_response = yield self._validate_captcha(
ip, private_key, challenge, response
)
if not captcha_response["valid"]:
logger.info(
"Invalid captcha entered from %s. Error: %s",
ip,
captcha_response["error_url"],
)
raise InvalidCaptchaError(error_url=captcha_response["error_url"])
else:
logger.info("Valid captcha entered from %s", ip)
@defer.inlineCallbacks
def register_email(self, threepidCreds):
"""
Registers emails with an identity server.
Used only by c/s api v1
"""
for c in threepidCreds:
logger.info(
"validating threepidcred sid %s on id server %s",
c["sid"],
c["idServer"],
)
try:
threepid = yield self.identity_handler.threepid_from_creds(c)
except Exception:
logger.exception("Couldn't validate 3pid")
raise RegistrationError(400, "Couldn't validate 3pid")
if not threepid:
raise RegistrationError(400, "Couldn't validate 3pid")
logger.info(
"got threepid with medium '%s' and address '%s'",
threepid["medium"],
threepid["address"],
)
if not check_3pid_allowed(self.hs, threepid["medium"], threepid["address"]):
raise RegistrationError(403, "Third party identifier is not allowed")
@defer.inlineCallbacks
def bind_emails(self, user_id, threepidCreds):
"""Links emails with a user ID and informs an identity server.
Used only by c/s api v1
"""
# Now we have a matrix ID, bind it to the threepids we were given
for c in threepidCreds:
# XXX: This should be a deferred list, shouldn't it?
yield self.identity_handler.bind_threepid(c, user_id)
def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
# don't allow people to register the server notices mxid
if self._server_notices_mxid is not None:
@ -463,45 +395,8 @@ class RegistrationHandler(BaseHandler):
self._next_generated_user_id += 1
return str(id)
@defer.inlineCallbacks
def _validate_captcha(self, ip_addr, private_key, challenge, response):
"""Validates the captcha provided.
Used only by c/s api v1
Returns:
dict: Containing 'valid'(bool) and 'error_url'(str) if invalid.
"""
response = yield self._submit_captcha(ip_addr, private_key, challenge, response)
# parse Google's response. Lovely format..
lines = response.split("\n")
json = {
"valid": lines[0] == "true",
"error_url": "http://www.recaptcha.net/recaptcha/api/challenge?"
+ "error=%s" % lines[1],
}
return json
@defer.inlineCallbacks
def _submit_captcha(self, ip_addr, private_key, challenge, response):
"""
Used only by c/s api v1
"""
data = yield self.captcha_client.post_urlencoded_get_raw(
"http://www.recaptcha.net:80/recaptcha/api/verify",
args={
"privatekey": private_key,
"remoteip": ip_addr,
"challenge": challenge,
"response": response,
},
)
return data
@defer.inlineCallbacks
def _join_user_to_room(self, requester, room_identifier):
room_id = None
room_member_handler = self.hs.get_room_member_handler()
if RoomID.is_valid(room_identifier):
room_id = room_identifier
@ -622,7 +517,7 @@ class RegistrationHandler(BaseHandler):
initial_display_name=initial_display_name,
is_guest=is_guest,
)
return (r["device_id"], r["access_token"])
return r["device_id"], r["access_token"]
valid_until_ms = None
if self.session_lifetime is not None:
@ -648,9 +543,7 @@ class RegistrationHandler(BaseHandler):
return (device_id, access_token)
@defer.inlineCallbacks
def post_registration_actions(
self, user_id, auth_result, access_token, bind_email, bind_msisdn
):
def post_registration_actions(self, user_id, auth_result, access_token):
"""A user has completed registration
Args:
@ -659,18 +552,10 @@ class RegistrationHandler(BaseHandler):
registered user.
access_token (str|None): The access token of the newly logged in
device, or None if `inhibit_login` enabled.
bind_email (bool): Whether to bind the email with the identity
server.
bind_msisdn (bool): Whether to bind the msisdn with the identity
server.
"""
if self.hs.config.worker_app:
yield self._post_registration_client(
user_id=user_id,
auth_result=auth_result,
access_token=access_token,
bind_email=bind_email,
bind_msisdn=bind_msisdn,
user_id=user_id, auth_result=auth_result, access_token=access_token
)
return
@ -683,13 +568,11 @@ class RegistrationHandler(BaseHandler):
):
yield self.store.upsert_monthly_active_user(user_id)
yield self._register_email_threepid(
user_id, threepid, access_token, bind_email
)
yield self._register_email_threepid(user_id, threepid, access_token)
if auth_result and LoginType.MSISDN in auth_result:
threepid = auth_result[LoginType.MSISDN]
yield self._register_msisdn_threepid(user_id, threepid, bind_msisdn)
yield self._register_msisdn_threepid(user_id, threepid)
if auth_result and LoginType.TERMS in auth_result:
yield self._on_user_consented(user_id, self.hs.config.user_consent_version)
@ -708,14 +591,12 @@ class RegistrationHandler(BaseHandler):
yield self.post_consent_actions(user_id)
@defer.inlineCallbacks
def _register_email_threepid(self, user_id, threepid, token, bind_email):
def _register_email_threepid(self, user_id, threepid, token):
"""Add an email address as a 3pid identifier
Also adds an email pusher for the email address, if configured in the
HS config
Also optionally binds emails to the given user_id on the identity server
Must be called on master.
Args:
@ -723,8 +604,6 @@ class RegistrationHandler(BaseHandler):
threepid (object): m.login.email.identity auth response
token (str|None): access_token for the user, or None if not logged
in.
bind_email (bool): true if the client requested the email to be
bound at the identity server
Returns:
defer.Deferred:
"""
@ -766,29 +645,15 @@ class RegistrationHandler(BaseHandler):
data={},
)
if bind_email:
logger.info("bind_email specified: binding")
logger.debug("Binding emails %s to %s" % (threepid, user_id))
yield self.identity_handler.bind_threepid(
threepid["threepid_creds"], user_id
)
else:
logger.info("bind_email not specified: not binding email")
@defer.inlineCallbacks
def _register_msisdn_threepid(self, user_id, threepid, bind_msisdn):
def _register_msisdn_threepid(self, user_id, threepid):
"""Add a phone number as a 3pid identifier
Also optionally binds msisdn to the given user_id on the identity server
Must be called on master.
Args:
user_id (str): id of user
threepid (object): m.login.msisdn auth response
token (str): access_token for the user
bind_email (bool): true if the client requested the email to be
bound at the identity server
Returns:
defer.Deferred:
"""
@ -804,12 +669,3 @@ class RegistrationHandler(BaseHandler):
yield self._auth_handler.add_threepid(
user_id, threepid["medium"], threepid["address"], threepid["validated_at"]
)
if bind_msisdn:
logger.info("bind_msisdn specified: binding")
logger.debug("Binding msisdn %s to %s", threepid, user_id)
yield self.identity_handler.bind_threepid(
threepid["threepid_creds"], user_id
)
else:
logger.info("bind_msisdn not specified: not binding msisdn")

View file

@ -852,7 +852,6 @@ class RoomContextHandler(object):
)
if not event:
return None
return
filtered = yield (filter_evts([event]))
if not filtered:

View file

@ -29,11 +29,9 @@ from twisted.internet import defer
from synapse import types
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError
from synapse.handlers.identity import LookupAlgorithm
from synapse.types import RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
from synapse.util.hash import sha256_and_url_safe_base64
from ._base import BaseHandler
@ -525,7 +523,7 @@ class RoomMemberHandler(object):
event (SynapseEvent): The membership event.
context: The context of the event.
is_guest (bool): Whether the sender is a guest.
remote_room_hosts (list[str]|None): Homeservers which are likely to already be in
room_hosts ([str]): Homeservers which are likely to already be in
the room, and could be danced with in order to join this
homeserver for the first time.
ratelimit (bool): Whether to rate limit this request.
@ -636,7 +634,7 @@ class RoomMemberHandler(object):
servers.remove(room_alias.domain)
servers.insert(0, room_alias.domain)
return RoomID.from_string(room_id), servers
return (RoomID.from_string(room_id), servers)
@defer.inlineCallbacks
def _get_inviter(self, user_id, room_id):
@ -699,44 +697,6 @@ class RoomMemberHandler(object):
raise SynapseError(
403, "Looking up third-party identifiers is denied from this server"
)
# Check what hashing details are supported by this identity server
use_v1 = False
hash_details = None
try:
hash_details = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server)
)
except (HttpResponseException, ValueError) as e:
# Catch HttpResponseExcept for a non-200 response code
# Catch ValueError for non-JSON response body
# Check if this identity server does not know about v2 lookups
if e.code == 404:
# This is an old identity server that does not yet support v2 lookups
use_v1 = True
else:
logger.warn("Error when looking up hashing details: %s" % (e,))
return None
if use_v1:
return (yield self._lookup_3pid_v1(id_server, medium, address))
return (yield self._lookup_3pid_v2(id_server, medium, address, hash_details))
@defer.inlineCallbacks
def _lookup_3pid_v1(self, id_server, medium, address):
"""Looks up a 3pid in the passed identity server using v1 lookup.
Args:
id_server (str): The server name (including port, if required)
of the identity server to use.
medium (str): The type of the third party identifier (e.g. "email").
address (str): The third party identifier (e.g. "foo@example.com").
Returns:
str: the matrix ID of the 3pid, or None if it is not recognized.
"""
try:
data = yield self.simple_http_client.get_json(
"%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
@ -751,83 +711,8 @@ class RoomMemberHandler(object):
except IOError as e:
logger.warn("Error from identity server lookup: %s" % (e,))
return None
@defer.inlineCallbacks
def _lookup_3pid_v2(self, id_server, medium, address, hash_details):
"""Looks up a 3pid in the passed identity server using v2 lookup.
Args:
id_server (str): The server name (including port, if required)
of the identity server to use.
medium (str): The type of the third party identifier (e.g. "email").
address (str): The third party identifier (e.g. "foo@example.com").
hash_details (dict[str, str|list]): A dictionary containing hashing information
provided by an identity server.
Returns:
Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
"""
# Extract information from hash_details
supported_lookup_algorithms = hash_details["algorithms"]
lookup_pepper = hash_details["lookup_pepper"]
# Check if any of the supported lookup algorithms are present
if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
# Perform a hashed lookup
lookup_algorithm = LookupAlgorithm.SHA256
# Hash address, medium and the pepper with sha256
to_hash = "%s %s %s" % (address, medium, lookup_pepper)
lookup_value = sha256_and_url_safe_base64(to_hash)
elif LookupAlgorithm.NONE in supported_lookup_algorithms:
# Perform a non-hashed lookup
lookup_algorithm = LookupAlgorithm.NONE
# Combine together plaintext address and medium
lookup_value = "%s %s" % (address, medium)
else:
logger.warn(
"None of the provided lookup algorithms of %s%s are supported: %s",
id_server_scheme,
id_server,
hash_details["algorithms"],
)
raise SynapseError(
400,
"Provided identity server does not support any v2 lookup "
"algorithms that this homeserver supports.",
)
try:
lookup_results = yield self.simple_http_client.post_json_get_json(
"%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
{
"addresses": [lookup_value],
"algorithm": lookup_algorithm,
"pepper": lookup_pepper,
},
)
except (HttpResponseException, ValueError) as e:
# Catch HttpResponseExcept for a non-200 response code
# Catch ValueError for non-JSON response body
logger.warn("Error when performing a 3pid lookup: %s" % (e,))
return None
# Check for a mapping from what we looked up to an MXID
if "mappings" not in lookup_results or not isinstance(
lookup_results["mappings"], dict
):
logger.debug("No results from 3pid lookup")
return None
# Return the MXID if it's available, or None otherwise
mxid = lookup_results["mappings"].get(lookup_value)
return mxid
@defer.inlineCallbacks
def _verify_any_signature(self, data, server_hostname):
if server_hostname not in data["signatures"]:
@ -1018,7 +903,7 @@ class RoomMemberHandler(object):
if not public_keys:
public_keys.append(fallback_public_key)
display_name = data["display_name"]
return (token, public_keys, fallback_public_key, display_name)
return token, public_keys, fallback_public_key, display_name
@defer.inlineCallbacks
def _is_host_in_room(self, current_state_ids):

View file

@ -14,15 +14,14 @@
# limitations under the License.
import logging
from collections import Counter
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.constants import EventTypes, Membership
from synapse.handlers.state_deltas import StateDeltasHandler
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@ -62,11 +61,10 @@ class StatsHandler(StateDeltasHandler):
def notify_new_event(self):
"""Called when there may be more deltas to process
"""
if not self.hs.config.stats_enabled:
if not self.hs.config.stats_enabled or self._is_processing:
return
if self._is_processing:
return
self._is_processing = True
@defer.inlineCallbacks
def process():
@ -75,39 +73,72 @@ class StatsHandler(StateDeltasHandler):
finally:
self._is_processing = False
self._is_processing = True
run_as_background_process("stats.notify_new_event", process)
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = yield self.store.get_stats_stream_pos()
# If still None then the initial background update hasn't happened yet
if self.pos is None:
return None
self.pos = yield self.store.get_stats_positions()
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(self.pos)
if not deltas:
return
deltas = yield self.store.get_current_state_deltas(self.pos)
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
if deltas:
logger.debug("Handling %d state deltas", len(deltas))
room_deltas, user_deltas = yield self._handle_deltas(deltas)
self.pos = deltas[-1]["stream_id"]
yield self.store.update_stats_stream_pos(self.pos)
max_pos = deltas[-1]["stream_id"]
else:
room_deltas = {}
user_deltas = {}
max_pos = yield self.store.get_room_max_stream_ordering()
event_processing_positions.labels("stats").set(self.pos)
# Then count deltas for total_events and total_event_bytes.
room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
self.pos, max_pos
)
for room_id, fields in room_count.items():
room_deltas.setdefault(room_id, {}).update(fields)
for user_id, fields in user_count.items():
user_deltas.setdefault(user_id, {}).update(fields)
logger.debug("room_deltas: %s", room_deltas)
logger.debug("user_deltas: %s", user_deltas)
# Always call this so that we update the stats position.
yield self.store.bulk_update_stats_delta(
self.clock.time_msec(),
updates={"room": room_deltas, "user": user_deltas},
stream_id=max_pos,
)
event_processing_positions.labels("stats").set(max_pos)
if self.pos == max_pos:
break
self.pos = max_pos
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
Returns:
Deferred[tuple[dict[str, Counter], dict[str, counter]]]
Resolves to two dicts, the room deltas and the user deltas,
mapping from room/user ID to changes in the various fields.
"""
Called with the state deltas to process
"""
room_to_stats_deltas = {}
user_to_stats_deltas = {}
room_to_state_updates = {}
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
@ -115,11 +146,10 @@ class StatsHandler(StateDeltasHandler):
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
stream_pos = delta["stream_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
token = yield self.store.get_earliest_token_for_room_stats(room_id)
token = yield self.store.get_earliest_token_for_stats("room", room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
@ -131,203 +161,130 @@ class StatsHandler(StateDeltasHandler):
continue
if event_id is None and prev_event_id is None:
# Errr...
logger.error(
"event ID is None and so is the previous event ID. stream_id: %s",
stream_id,
)
continue
event_content = {}
sender = None
if event_id is not None:
event = yield self.store.get_event(event_id, allow_none=True)
if event:
event_content = event.content or {}
sender = event.sender
# We use stream_pos here rather than fetch by event_id as event_id
# may be None
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
# All the values in this dict are deltas (RELATIVE changes)
room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
# quantise time to the nearest bucket
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
room_state = room_to_state_updates.setdefault(room_id, {})
if prev_event_id is None:
# this state event doesn't overwrite another,
# so it is a new effective/current state event
room_stats_delta["current_state_events"] += 1
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
prev_event_content = {}
# We take None rather than leave as a previous membership
# in the absence of a previous event because we do not want to
# reduce the leave count when a new-to-the-room user joins.
prev_membership = None
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
prev_membership = prev_event_content.get(
"membership", Membership.LEAVE
)
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
if prev_membership == membership:
continue
if prev_membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", -1
)
if prev_membership is None:
logger.debug("No previous membership for this user.")
elif membership == prev_membership:
pass # noop
elif prev_membership == Membership.JOIN:
room_stats_delta["joined_members"] -= 1
elif prev_membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", -1
)
room_stats_delta["invited_members"] -= 1
elif prev_membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", -1
)
room_stats_delta["left_members"] -= 1
elif prev_membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", -1
)
room_stats_delta["banned_members"] -= 1
else:
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
logger.error(err)
raise ValueError(err)
raise ValueError(
"%r is not a valid prev_membership" % (prev_membership,)
)
if membership == prev_membership:
pass # noop
if membership == Membership.JOIN:
yield self.store.update_stats_delta(
now, "room", room_id, "joined_members", +1
)
room_stats_delta["joined_members"] += 1
elif membership == Membership.INVITE:
yield self.store.update_stats_delta(
now, "room", room_id, "invited_members", +1
)
room_stats_delta["invited_members"] += 1
if sender and self.is_mine_id(sender):
user_to_stats_deltas.setdefault(sender, Counter())[
"invites_sent"
] += 1
elif membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now, "room", room_id, "left_members", +1
)
room_stats_delta["left_members"] += 1
elif membership == Membership.BAN:
yield self.store.update_stats_delta(
now, "room", room_id, "banned_members", +1
)
room_stats_delta["banned_members"] += 1
else:
err = "%s is not a valid membership" % (repr(membership),)
logger.error(err)
raise ValueError(err)
raise ValueError("%r is not a valid membership" % (membership,))
user_id = state_key
if self.is_mine_id(user_id):
# update user_stats as it's one of our users
public = yield self._is_public_room(room_id)
# this accounts for transitions like leave → ban and so on.
has_changed_joinedness = (prev_membership == Membership.JOIN) != (
membership == Membership.JOIN
)
if membership == Membership.LEAVE:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
-1,
)
elif membership == Membership.JOIN:
yield self.store.update_stats_delta(
now,
"user",
user_id,
"public_rooms" if public else "private_rooms",
+1,
)
if has_changed_joinedness:
delta = +1 if membership == Membership.JOIN else -1
user_to_stats_deltas.setdefault(user_id, Counter())[
"joined_rooms"
] += delta
room_stats_delta["local_users_in_room"] += delta
elif typ == EventTypes.Create:
# Newly created room. Add it with all blank portions.
yield self.store.update_room_state(
room_id,
{
"join_rules": None,
"history_visibility": None,
"encryption": None,
"name": None,
"topic": None,
"avatar": None,
"canonical_alias": None,
},
)
room_state["is_federatable"] = event_content.get("m.federate", True)
if sender and self.is_mine_id(sender):
user_to_stats_deltas.setdefault(sender, Counter())[
"rooms_created"
] += 1
elif typ == EventTypes.JoinRules:
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
room_state["join_rules"] = event_content.get("join_rule")
elif typ == EventTypes.RoomHistoryVisibility:
yield self.store.update_room_state(
room_id,
{"history_visibility": event_content.get("history_visibility")},
room_state["history_visibility"] = event_content.get(
"history_visibility"
)
is_public = yield self._get_key_change(
prev_event_id, event_id, "history_visibility", "world_readable"
)
if is_public is not None:
yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.Encryption:
yield self.store.update_room_state(
room_id, {"encryption": event_content.get("algorithm")}
)
room_state["encryption"] = event_content.get("algorithm")
elif typ == EventTypes.Name:
yield self.store.update_room_state(
room_id, {"name": event_content.get("name")}
)
room_state["name"] = event_content.get("name")
elif typ == EventTypes.Topic:
yield self.store.update_room_state(
room_id, {"topic": event_content.get("topic")}
)
room_state["topic"] = event_content.get("topic")
elif typ == EventTypes.RoomAvatar:
yield self.store.update_room_state(
room_id, {"avatar": event_content.get("url")}
)
room_state["avatar"] = event_content.get("url")
elif typ == EventTypes.CanonicalAlias:
yield self.store.update_room_state(
room_id, {"canonical_alias": event_content.get("alias")}
)
room_state["canonical_alias"] = event_content.get("alias")
elif typ == EventTypes.GuestAccess:
room_state["guest_access"] = event_content.get("guest_access")
@defer.inlineCallbacks
def update_public_room_stats(self, ts, room_id, is_public):
"""
Increment/decrement a user's number of public rooms when a room they are
in changes to/from public visibility.
for room_id, state in room_to_state_updates.items():
yield self.store.update_room_state(room_id, state)
Args:
ts (int): Timestamp in seconds
room_id (str)
is_public (bool)
"""
# For now, blindly iterate over all local users in the room so that
# we can handle the whole problem of copying buckets over as needed
user_ids = yield self.store.get_users_in_room(room_id)
for user_id in user_ids:
if self.hs.is_mine(UserID.from_string(user_id)):
yield self.store.update_stats_delta(
ts, "user", user_id, "public_rooms", +1 if is_public else -1
)
yield self.store.update_stats_delta(
ts, "user", user_id, "private_rooms", -1 if is_public else +1
)
@defer.inlineCallbacks
def _is_public_room(self, room_id):
join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
history_visibility = yield self.state.get_current_state(
room_id, EventTypes.RoomHistoryVisibility
)
if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
(
history_visibility
and history_visibility.content.get("history_visibility")
== "world_readable"
)
):
return True
else:
return False
return room_to_stats_deltas, user_to_stats_deltas

View file

@ -378,7 +378,7 @@ class SyncHandler(object):
event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
return (now_token, ephemeral_by_room)
return now_token, ephemeral_by_room
@defer.inlineCallbacks
def _load_filtered_recents(
@ -578,7 +578,6 @@ class SyncHandler(object):
if not last_events:
return None
return
last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event(
@ -1337,7 +1336,7 @@ class SyncHandler(object):
)
if not tags_by_room:
logger.debug("no-oping sync")
return ([], [], [], [])
return [], [], [], []
ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id=user_id
@ -1647,7 +1646,7 @@ class SyncHandler(object):
)
room_entries.append(entry)
return (room_entries, invited, newly_joined_rooms, newly_left_rooms)
return room_entries, invited, newly_joined_rooms, newly_left_rooms
@defer.inlineCallbacks
def _get_all_rooms(self, sync_result_builder, ignored_users):
@ -1721,7 +1720,7 @@ class SyncHandler(object):
)
)
return (room_entries, invited, [])
return room_entries, invited, []
@defer.inlineCallbacks
def _generate_room_entry(

View file

@ -319,4 +319,4 @@ class TypingNotificationEventSource(object):
return self.get_typing_handler()._latest_room_serial
def get_pagination_rows(self, user, pagination_config, key):
return ([], pagination_config.from_key)
return [], pagination_config.from_key