Convert E2E key and room key handlers to async/await. (#7851)

This commit is contained in:
Patrick Cloke 2020-07-15 08:48:58 -04:00 committed by GitHub
parent 111e70d75c
commit b11450dedc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 522 additions and 362 deletions

View file

@ -16,8 +16,6 @@
import logging
from twisted.internet import defer
from synapse.api.errors import (
Codes,
NotFoundError,
@ -50,8 +48,7 @@ class E2eRoomKeysHandler(object):
self._upload_linearizer = Linearizer("upload_room_keys_lock")
@trace
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
async def get_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
room, or a given session.
See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
@ -71,17 +68,17 @@ class E2eRoomKeysHandler(object):
# we deliberately take the lock to get keys so that changing the version
# works atomically
with (yield self._upload_linearizer.queue(user_id)):
with (await self._upload_linearizer.queue(user_id)):
# make sure the backup version exists
try:
yield self.store.get_e2e_room_keys_version_info(user_id, version)
await self.store.get_e2e_room_keys_version_info(user_id, version)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Unknown backup version")
else:
raise
results = yield self.store.get_e2e_room_keys(
results = await self.store.get_e2e_room_keys(
user_id, version, room_id, session_id
)
@ -89,8 +86,7 @@ class E2eRoomKeysHandler(object):
return results
@trace
@defer.inlineCallbacks
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
async def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
room or a given session.
See EndToEndRoomKeyStore.delete_e2e_room_keys for full details.
@ -109,10 +105,10 @@ class E2eRoomKeysHandler(object):
"""
# lock for consistency with uploading
with (yield self._upload_linearizer.queue(user_id)):
with (await self._upload_linearizer.queue(user_id)):
# make sure the backup version exists
try:
version_info = yield self.store.get_e2e_room_keys_version_info(
version_info = await self.store.get_e2e_room_keys_version_info(
user_id, version
)
except StoreError as e:
@ -121,19 +117,18 @@ class E2eRoomKeysHandler(object):
else:
raise
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
await self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
version_etag = version_info["etag"] + 1
yield self.store.update_e2e_room_keys_version(
await self.store.update_e2e_room_keys_version(
user_id, version, None, version_etag
)
count = yield self.store.count_e2e_room_keys(user_id, version)
count = await self.store.count_e2e_room_keys(user_id, version)
return {"etag": str(version_etag), "count": count}
@trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
async def upload_room_keys(self, user_id, version, room_keys):
"""Bulk upload a list of room keys into a given backup version, asserting
that the given version is the current backup version. room_keys are merged
into the current backup as described in RoomKeysServlet.on_PUT().
@ -169,11 +164,11 @@ class E2eRoomKeysHandler(object):
# TODO: Validate the JSON to make sure it has the right keys.
# XXX: perhaps we should use a finer grained lock here?
with (yield self._upload_linearizer.queue(user_id)):
with (await self._upload_linearizer.queue(user_id)):
# Check that the version we're trying to upload is the current version
try:
version_info = yield self.store.get_e2e_room_keys_version_info(user_id)
version_info = await self.store.get_e2e_room_keys_version_info(user_id)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Version '%s' not found" % (version,))
@ -183,7 +178,7 @@ class E2eRoomKeysHandler(object):
if version_info["version"] != version:
# Check that the version we're trying to upload actually exists
try:
version_info = yield self.store.get_e2e_room_keys_version_info(
version_info = await self.store.get_e2e_room_keys_version_info(
user_id, version
)
# if we get this far, the version must exist
@ -198,7 +193,7 @@ class E2eRoomKeysHandler(object):
# submitted. Then compare them with the submitted keys. If the
# key is new, insert it; if the key should be updated, then update
# it; otherwise, drop it.
existing_keys = yield self.store.get_e2e_room_keys_multi(
existing_keys = await self.store.get_e2e_room_keys_multi(
user_id, version, room_keys["rooms"]
)
to_insert = [] # batch the inserts together
@ -227,7 +222,7 @@ class E2eRoomKeysHandler(object):
# updates are done one at a time in the DB, so send
# updates right away rather than batching them up,
# like we do with the inserts
yield self.store.update_e2e_room_key(
await self.store.update_e2e_room_key(
user_id, version, room_id, session_id, room_key
)
changed = True
@ -246,16 +241,16 @@ class E2eRoomKeysHandler(object):
changed = True
if len(to_insert):
yield self.store.add_e2e_room_keys(user_id, version, to_insert)
await self.store.add_e2e_room_keys(user_id, version, to_insert)
version_etag = version_info["etag"]
if changed:
version_etag = version_etag + 1
yield self.store.update_e2e_room_keys_version(
await self.store.update_e2e_room_keys_version(
user_id, version, None, version_etag
)
count = yield self.store.count_e2e_room_keys(user_id, version)
count = await self.store.count_e2e_room_keys(user_id, version)
return {"etag": str(version_etag), "count": count}
@staticmethod
@ -291,8 +286,7 @@ class E2eRoomKeysHandler(object):
return True
@trace
@defer.inlineCallbacks
def create_version(self, user_id, version_info):
async def create_version(self, user_id, version_info):
"""Create a new backup version. This automatically becomes the new
backup version for the user's keys; previous backups will no longer be
writeable to.
@ -313,14 +307,13 @@ class E2eRoomKeysHandler(object):
# TODO: Validate the JSON to make sure it has the right keys.
# lock everyone out until we've switched version
with (yield self._upload_linearizer.queue(user_id)):
new_version = yield self.store.create_e2e_room_keys_version(
with (await self._upload_linearizer.queue(user_id)):
new_version = await self.store.create_e2e_room_keys_version(
user_id, version_info
)
return new_version
@defer.inlineCallbacks
def get_version_info(self, user_id, version=None):
async def get_version_info(self, user_id, version=None):
"""Get the info about a given version of the user's backup
Args:
@ -339,22 +332,21 @@ class E2eRoomKeysHandler(object):
}
"""
with (yield self._upload_linearizer.queue(user_id)):
with (await self._upload_linearizer.queue(user_id)):
try:
res = yield self.store.get_e2e_room_keys_version_info(user_id, version)
res = await self.store.get_e2e_room_keys_version_info(user_id, version)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Unknown backup version")
else:
raise
res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
res["count"] = await self.store.count_e2e_room_keys(user_id, res["version"])
res["etag"] = str(res["etag"])
return res
@trace
@defer.inlineCallbacks
def delete_version(self, user_id, version=None):
async def delete_version(self, user_id, version=None):
"""Deletes a given version of the user's e2e_room_keys backup
Args:
@ -364,9 +356,9 @@ class E2eRoomKeysHandler(object):
NotFoundError: if this backup version doesn't exist
"""
with (yield self._upload_linearizer.queue(user_id)):
with (await self._upload_linearizer.queue(user_id)):
try:
yield self.store.delete_e2e_room_keys_version(user_id, version)
await self.store.delete_e2e_room_keys_version(user_id, version)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Unknown backup version")
@ -374,8 +366,7 @@ class E2eRoomKeysHandler(object):
raise
@trace
@defer.inlineCallbacks
def update_version(self, user_id, version, version_info):
async def update_version(self, user_id, version, version_info):
"""Update the info about a given version of the user's backup
Args:
@ -393,9 +384,9 @@ class E2eRoomKeysHandler(object):
raise SynapseError(
400, "Version in body does not match", Codes.INVALID_PARAM
)
with (yield self._upload_linearizer.queue(user_id)):
with (await self._upload_linearizer.queue(user_id)):
try:
old_info = yield self.store.get_e2e_room_keys_version_info(
old_info = await self.store.get_e2e_room_keys_version_info(
user_id, version
)
except StoreError as e:
@ -406,7 +397,7 @@ class E2eRoomKeysHandler(object):
if old_info["algorithm"] != version_info["algorithm"]:
raise SynapseError(400, "Algorithm does not match", Codes.INVALID_PARAM)
yield self.store.update_e2e_room_keys_version(
await self.store.update_e2e_room_keys_version(
user_id, version, version_info
)