mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-03 02:54:52 -04:00
Merge branch 'cross-signing_hidden' into cross-signing_keys
This commit is contained in:
commit
336c546d6a
298 changed files with 3414 additions and 2145 deletions
|
@ -66,7 +66,7 @@ class DeviceWorkerHandler(BaseHandler):
|
|||
for device in devices:
|
||||
_update_device_from_client_ips(device, ips)
|
||||
|
||||
defer.returnValue(devices)
|
||||
return devices
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_device(self, user_id, device_id):
|
||||
|
@ -87,7 +87,7 @@ class DeviceWorkerHandler(BaseHandler):
|
|||
raise errors.NotFoundError
|
||||
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
|
||||
_update_device_from_client_ips(device, ips)
|
||||
defer.returnValue(device)
|
||||
return device
|
||||
|
||||
@measure_func("device.get_user_ids_changed")
|
||||
@defer.inlineCallbacks
|
||||
|
@ -202,9 +202,7 @@ class DeviceWorkerHandler(BaseHandler):
|
|||
possibly_joined = []
|
||||
possibly_left = []
|
||||
|
||||
defer.returnValue(
|
||||
{"changed": list(possibly_joined), "left": list(possibly_left)}
|
||||
)
|
||||
return {"changed": list(possibly_joined), "left": list(possibly_left)}
|
||||
|
||||
|
||||
class DeviceHandler(DeviceWorkerHandler):
|
||||
|
@ -213,12 +211,12 @@ class DeviceHandler(DeviceWorkerHandler):
|
|||
|
||||
self.federation_sender = hs.get_federation_sender()
|
||||
|
||||
self._edu_updater = DeviceListEduUpdater(hs, self)
|
||||
self.device_list_updater = DeviceListUpdater(hs, self)
|
||||
|
||||
federation_registry = hs.get_federation_registry()
|
||||
|
||||
federation_registry.register_edu_handler(
|
||||
"m.device_list_update", self._edu_updater.incoming_device_list_update
|
||||
"m.device_list_update", self.device_list_updater.incoming_device_list_update
|
||||
)
|
||||
federation_registry.register_query_handler(
|
||||
"user_devices", self.on_federation_query_user_devices
|
||||
|
@ -252,7 +250,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
|||
)
|
||||
if new_device:
|
||||
yield self.notify_device_update(user_id, [device_id])
|
||||
defer.returnValue(device_id)
|
||||
return device_id
|
||||
|
||||
# if the device id is not specified, we'll autogen one, but loop a few
|
||||
# times in case of a clash.
|
||||
|
@ -266,7 +264,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
|||
)
|
||||
if new_device:
|
||||
yield self.notify_device_update(user_id, [device_id])
|
||||
defer.returnValue(device_id)
|
||||
return device_id
|
||||
attempts += 1
|
||||
|
||||
raise errors.StoreError(500, "Couldn't generate a device ID.")
|
||||
|
@ -428,9 +426,7 @@ class DeviceHandler(DeviceWorkerHandler):
|
|||
@defer.inlineCallbacks
|
||||
def on_federation_query_user_devices(self, user_id):
|
||||
stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
|
||||
defer.returnValue(
|
||||
{"user_id": user_id, "stream_id": stream_id, "devices": devices}
|
||||
)
|
||||
return {"user_id": user_id, "stream_id": stream_id, "devices": devices}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def user_left_room(self, user, room_id):
|
||||
|
@ -447,7 +443,7 @@ def _update_device_from_client_ips(device, client_ips):
|
|||
device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
|
||||
|
||||
|
||||
class DeviceListEduUpdater(object):
|
||||
class DeviceListUpdater(object):
|
||||
"Handles incoming device list updates from federation and updates the DB"
|
||||
|
||||
def __init__(self, hs, device_handler):
|
||||
|
@ -540,75 +536,7 @@ class DeviceListEduUpdater(object):
|
|||
logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
|
||||
|
||||
if resync:
|
||||
# Fetch all devices for the user.
|
||||
origin = get_domain_from_id(user_id)
|
||||
try:
|
||||
result = yield self.federation.query_user_devices(origin, user_id)
|
||||
except (
|
||||
NotRetryingDestination,
|
||||
RequestSendFailed,
|
||||
HttpResponseException,
|
||||
):
|
||||
# TODO: Remember that we are now out of sync and try again
|
||||
# later
|
||||
logger.warn("Failed to handle device list update for %s", user_id)
|
||||
# We abort on exceptions rather than accepting the update
|
||||
# as otherwise synapse will 'forget' that its device list
|
||||
# is out of date. If we bail then we will retry the resync
|
||||
# next time we get a device list update for this user_id.
|
||||
# This makes it more likely that the device lists will
|
||||
# eventually become consistent.
|
||||
return
|
||||
except FederationDeniedError as e:
|
||||
logger.info(e)
|
||||
return
|
||||
except Exception:
|
||||
# TODO: Remember that we are now out of sync and try again
|
||||
# later
|
||||
logger.exception(
|
||||
"Failed to handle device list update for %s", user_id
|
||||
)
|
||||
return
|
||||
|
||||
stream_id = result["stream_id"]
|
||||
devices = result["devices"]
|
||||
|
||||
# If the remote server has more than ~1000 devices for this user
|
||||
# we assume that something is going horribly wrong (e.g. a bot
|
||||
# that logs in and creates a new device every time it tries to
|
||||
# send a message). Maintaining lots of devices per user in the
|
||||
# cache can cause serious performance issues as if this request
|
||||
# takes more than 60s to complete, internal replication from the
|
||||
# inbound federation worker to the synapse master may time out
|
||||
# causing the inbound federation to fail and causing the remote
|
||||
# server to retry, causing a DoS. So in this scenario we give
|
||||
# up on storing the total list of devices and only handle the
|
||||
# delta instead.
|
||||
if len(devices) > 1000:
|
||||
logger.warn(
|
||||
"Ignoring device list snapshot for %s as it has >1K devs (%d)",
|
||||
user_id,
|
||||
len(devices),
|
||||
)
|
||||
devices = []
|
||||
|
||||
for device in devices:
|
||||
logger.debug(
|
||||
"Handling resync update %r/%r, ID: %r",
|
||||
user_id,
|
||||
device["device_id"],
|
||||
stream_id,
|
||||
)
|
||||
|
||||
yield self.store.update_remote_device_list_cache(
|
||||
user_id, devices, stream_id
|
||||
)
|
||||
device_ids = [device["device_id"] for device in devices]
|
||||
yield self.device_handler.notify_device_update(user_id, device_ids)
|
||||
|
||||
# We clobber the seen updates since we've re-synced from a given
|
||||
# point.
|
||||
self._seen_updates[user_id] = set([stream_id])
|
||||
yield self.user_device_resync(user_id)
|
||||
else:
|
||||
# Simply update the single device, since we know that is the only
|
||||
# change (because of the single prev_id matching the current cache)
|
||||
|
@ -640,7 +568,7 @@ class DeviceListEduUpdater(object):
|
|||
for _, stream_id, prev_ids, _ in updates:
|
||||
if not prev_ids:
|
||||
# We always do a resync if there are no previous IDs
|
||||
defer.returnValue(True)
|
||||
return True
|
||||
|
||||
for prev_id in prev_ids:
|
||||
if prev_id == extremity:
|
||||
|
@ -650,8 +578,82 @@ class DeviceListEduUpdater(object):
|
|||
elif prev_id in stream_id_in_updates:
|
||||
continue
|
||||
else:
|
||||
defer.returnValue(True)
|
||||
return True
|
||||
|
||||
stream_id_in_updates.add(stream_id)
|
||||
|
||||
defer.returnValue(False)
|
||||
return False
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def user_device_resync(self, user_id):
|
||||
"""Fetches all devices for a user and updates the device cache with them.
|
||||
|
||||
Args:
|
||||
user_id (str): The user's id whose device_list will be updated.
|
||||
Returns:
|
||||
Deferred[dict]: a dict with device info as under the "devices" in the result of this
|
||||
request:
|
||||
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
|
||||
"""
|
||||
# Fetch all devices for the user.
|
||||
origin = get_domain_from_id(user_id)
|
||||
try:
|
||||
result = yield self.federation.query_user_devices(origin, user_id)
|
||||
except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
|
||||
# TODO: Remember that we are now out of sync and try again
|
||||
# later
|
||||
logger.warn("Failed to handle device list update for %s", user_id)
|
||||
# We abort on exceptions rather than accepting the update
|
||||
# as otherwise synapse will 'forget' that its device list
|
||||
# is out of date. If we bail then we will retry the resync
|
||||
# next time we get a device list update for this user_id.
|
||||
# This makes it more likely that the device lists will
|
||||
# eventually become consistent.
|
||||
return
|
||||
except FederationDeniedError as e:
|
||||
logger.info(e)
|
||||
return
|
||||
except Exception:
|
||||
# TODO: Remember that we are now out of sync and try again
|
||||
# later
|
||||
logger.exception("Failed to handle device list update for %s", user_id)
|
||||
return
|
||||
stream_id = result["stream_id"]
|
||||
devices = result["devices"]
|
||||
|
||||
# If the remote server has more than ~1000 devices for this user
|
||||
# we assume that something is going horribly wrong (e.g. a bot
|
||||
# that logs in and creates a new device every time it tries to
|
||||
# send a message). Maintaining lots of devices per user in the
|
||||
# cache can cause serious performance issues as if this request
|
||||
# takes more than 60s to complete, internal replication from the
|
||||
# inbound federation worker to the synapse master may time out
|
||||
# causing the inbound federation to fail and causing the remote
|
||||
# server to retry, causing a DoS. So in this scenario we give
|
||||
# up on storing the total list of devices and only handle the
|
||||
# delta instead.
|
||||
if len(devices) > 1000:
|
||||
logger.warn(
|
||||
"Ignoring device list snapshot for %s as it has >1K devs (%d)",
|
||||
user_id,
|
||||
len(devices),
|
||||
)
|
||||
devices = []
|
||||
|
||||
for device in devices:
|
||||
logger.debug(
|
||||
"Handling resync update %r/%r, ID: %r",
|
||||
user_id,
|
||||
device["device_id"],
|
||||
stream_id,
|
||||
)
|
||||
|
||||
yield self.store.update_remote_device_list_cache(user_id, devices, stream_id)
|
||||
device_ids = [device["device_id"] for device in devices]
|
||||
yield self.device_handler.notify_device_update(user_id, device_ids)
|
||||
|
||||
# We clobber the seen updates since we've re-synced from a given
|
||||
# point.
|
||||
self._seen_updates[user_id] = set([stream_id])
|
||||
|
||||
defer.returnValue(result)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue