make it work and fix pep8

This commit is contained in:
Matthew Hodgson 2017-12-05 21:44:25 +00:00 committed by Hubert Chathi
parent 0bc4627a73
commit 6b8c07abc2
7 changed files with 133 additions and 81 deletions

View File

@ -13,15 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import ujson as json
import logging
from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.api.errors import SynapseError, CodeMessageException
from synapse.api.errors import StoreError
from synapse.util.async import Linearizer
from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@ -29,11 +26,13 @@ logger = logging.getLogger(__name__)
class E2eRoomKeysHandler(object):
def __init__(self, hs):
self.store = hs.get_datastore()
self._upload_linearizer = async.Linearizer("upload_room_keys_lock")
self._upload_linearizer = Linearizer("upload_room_keys_lock")
@defer.inlineCallbacks
def get_room_keys(self, user_id, version, room_id, session_id):
results = yield self.store.get_e2e_room_keys(user_id, version, room_id, session_id)
results = yield self.store.get_e2e_room_keys(
user_id, version, room_id, session_id
)
defer.returnValue(results)
@defer.inlineCallbacks
@ -46,31 +45,49 @@ class E2eRoomKeysHandler(object):
# TODO: Validate the JSON to make sure it has the right keys.
# XXX: perhaps we should use a finer grained lock here?
with (yield self._upload_linearizer.queue(user_id):
with (yield self._upload_linearizer.queue(user_id)):
# go through the room_keys
for room_id in room_keys['rooms']:
for session_id in room_keys['rooms'][room_id]['sessions']:
room_key = room_keys['rooms'][room_id]['sessions'][session_id]
yield self._upload_room_key(
user_id, version, room_id, session_id, room_key
)
@defer.inlineCallbacks
def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
# get the room_key for this particular row
current_room_key = None
try:
current_room_key = yield self.store.get_e2e_room_key(
user_id, version, room_id, session_id
)
except StoreError as e:
if e.code == 404:
pass
else:
raise
# check whether we merge or not. spelling it out with if/elifs rather than
# lots of booleans for legibility.
replace = False
# check whether we merge or not. spelling it out with if/elifs rather
# than lots of booleans for legibility.
upsert = True
if current_room_key:
if room_key['is_verified'] and not current_room_key['is_verified']:
replace = True
elif room_key['first_message_index'] < current_room_key['first_message_index']:
replace = True
pass
elif (
room_key['first_message_index'] <
current_room_key['first_message_index']
):
pass
elif room_key['forwarded_count'] < room_key['forwarded_count']:
replace = True
pass
else:
upsert = False
# if so, we set the new room_key
if replace:
if upsert:
yield self.store.set_e2e_room_key(
user_id, version, room_id, session_id, room_key
)

View File

@ -46,6 +46,7 @@ from synapse.rest.client.v2_alpha import (
receipts,
register,
report_event,
room_keys,
sendtodevice,
sync,
tags,
@ -102,6 +103,7 @@ class ClientRestResource(JsonResource):
auth.register_servlets(hs, client_resource)
receipts.register_servlets(hs, client_resource)
read_marker.register_servlets(hs, client_resource)
room_keys.register_servlets(hs, client_resource)
keys.register_servlets(hs, client_resource)
tokenrefresh.register_servlets(hs, client_resource)
tags.register_servlets(hs, client_resource)

View File

@ -17,26 +17,25 @@ import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.http.servlet import (
RestServlet, parse_json_object_from_request, parse_integer
RestServlet, parse_json_object_from_request
)
from synapse.http.servlet import parse_string
from synapse.types import StreamToken
from ._base import client_v2_patterns
logger = logging.getLogger(__name__)
class RoomKeysServlet(RestServlet):
PATTERNS = client_v2_patterns("/room_keys/keys(/(?P<room_id>[^/]+))?(/(?P<session_id>[^/]+))?$")
PATTERNS = client_v2_patterns(
"/room_keys/keys(/(?P<room_id>[^/]+))?(/(?P<session_id>[^/]+))?$"
)
def __init__(self, hs):
"""
Args:
hs (synapse.server.HomeServer): server
"""
super(RoomKeysUploadServlet, self).__init__()
super(RoomKeysServlet, self).__init__()
self.auth = hs.get_auth()
self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler()
@ -45,24 +44,32 @@ class RoomKeysServlet(RestServlet):
requester = yield self.auth.get_user_by_req(request, allow_guest=False)
user_id = requester.user.to_string()
body = parse_json_object_from_request(request)
version = request.args.get("version", None)
version = request.args.get("version")[0]
if session_id:
body = { "sessions": { session_id : body } }
body = {
"sessions": {
session_id: body
}
}
if room_id:
body = { "rooms": { room_id : body } }
body = {
"rooms": {
room_id: body
}
}
result = yield self.e2e_room_keys_handler.upload_room_keys(
yield self.e2e_room_keys_handler.upload_room_keys(
user_id, version, body
)
defer.returnValue((200, result))
defer.returnValue((200, {}))
@defer.inlineCallbacks
def on_GET(self, request, room_id, session_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=False)
user_id = requester.user.to_string()
version = request.args.get("version", None)
version = request.args.get("version")[0]
room_keys = yield self.e2e_room_keys_handler.get_room_keys(
user_id, version, room_id, session_id
@ -73,7 +80,7 @@ class RoomKeysServlet(RestServlet):
def on_DELETE(self, request, room_id, session_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=False)
user_id = requester.user.to_string()
version = request.args.get("version", None)
version = request.args.get("version")[0]
yield self.e2e_room_keys_handler.delete_room_keys(
user_id, version, room_id, session_id

View File

@ -49,6 +49,7 @@ from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.device import DeviceHandler
from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.handlers.initial_sync import InitialSyncHandler
@ -127,6 +128,7 @@ class HomeServer(object):
'auth_handler',
'device_handler',
'e2e_keys_handler',
'e2e_room_keys_handler',
'event_handler',
'event_stream_handler',
'initial_sync_handler',
@ -288,6 +290,9 @@ class HomeServer(object):
def build_e2e_keys_handler(self):
return E2eKeysHandler(self)
def build_e2e_room_keys_handler(self):
return E2eRoomKeysHandler(self)
def build_application_service_api(self):
return ApplicationServiceApi(self)

View File

@ -30,6 +30,7 @@ from .appservice import ApplicationServiceStore, ApplicationServiceTransactionSt
from .client_ips import ClientIpStore
from .deviceinbox import DeviceInboxStore
from .directory import DirectoryStore
from .e2e_room_keys import EndToEndRoomKeyStore
from .end_to_end_keys import EndToEndKeyStore
from .engines import PostgresEngine
from .event_federation import EventFederationStore
@ -76,6 +77,7 @@ class DataStore(RoomMemberStore, RoomStore,
ApplicationServiceTransactionStore,
ReceiptsStore,
EndToEndKeyStore,
EndToEndRoomKeyStore,
SearchStore,
TagsStore,
AccountDataStore,

View File

@ -15,11 +15,6 @@
from twisted.internet import defer
from synapse.util.caches.descriptors import cached
from canonicaljson import encode_canonical_json
import ujson as json
from ._base import SQLBaseStore
@ -45,29 +40,27 @@ class EndToEndRoomKeyStore(SQLBaseStore):
desc="get_e2e_room_key",
)
defer.returnValue(row);
defer.returnValue(row)
def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
def _set_e2e_room_key_txn(txn):
self._simple_upsert(
self._simple_upsert_txn(
txn,
table="e2e_room_keys",
keyvalues={
"user_id": user_id,
"room_id": room_id,
"session_id": session_id,
}
values=[
{
},
values={
"version": version,
"first_message_index": room_key['first_message_index'],
"forwarded_count": room_key['forwarded_count'],
"is_verified": room_key['is_verified'],
"session_data": room_key['session_data'],
}
],
},
lock=False,
)
@ -77,7 +70,6 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"set_e2e_room_key", _set_e2e_room_key_txn
)
# XXX: this isn't currently used and isn't tested anywhere
# it could be used in future for bulk-uploading new versions of room_keys
# for a user or something though.
@ -85,23 +77,27 @@ class EndToEndRoomKeyStore(SQLBaseStore):
def _set_e2e_room_keys_txn(txn):
self._simple_insert_many_txn(
txn,
table="e2e_room_keys",
values=[
values = []
for room_id in room_keys['rooms']:
for session_id in room_keys['rooms'][room_id]['sessions']:
session = room_keys['rooms'][room_id]['sessions'][session_id]
values.append(
{
"user_id": user_id,
"room_id": room_id,
"session_id": session_id,
"version": version,
"first_message_index": room_keys['rooms'][room_id]['sessions'][session_id]['first_message_index'],
"forwarded_count": room_keys['rooms'][room_id]['sessions'][session_id]['forwarded_count'],
"is_verified": room_keys['rooms'][room_id]['sessions'][session_id]['is_verified'],
"session_data": room_keys['rooms'][room_id]['sessions'][session_id]['session_data'],
"first_message_index": session['first_message_index'],
"forwarded_count": session['forwarded_count'],
"is_verified": session['is_verified'],
"session_data": session['session_data'],
}
for session_id in room_keys['rooms'][room_id]['sessions']
for room_id in room_keys['rooms']
]
)
self._simple_insert_many_txn(
txn,
table="e2e_room_keys",
values=values
)
return True
@ -117,13 +113,18 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"user_id": user_id,
"version": version,
}
if room_id: keyvalues['room_id'] = room_id
if session_id: keyvalues['session_id'] = session_id
if room_id:
keyvalues['room_id'] = room_id
if session_id:
keyvalues['session_id'] = session_id
rows = yield self._simple_select_list(
table="e2e_room_keys",
keyvalues=keyvalues,
retcols=(
"user_id",
"room_id",
"session_id",
"first_message_index",
"forwarded_count",
"is_verified",
@ -132,9 +133,25 @@ class EndToEndRoomKeyStore(SQLBaseStore):
desc="get_e2e_room_keys",
)
sessions = {}
sessions['rooms'][roomId]['sessions'][session_id] = row for row in rows;
defer.returnValue(sessions);
# perlesque autovivification from https://stackoverflow.com/a/19829714/6764493
class AutoVivification(dict):
def __getitem__(self, item):
try:
return dict.__getitem__(self, item)
except KeyError:
value = self[item] = type(self)()
return value
sessions = AutoVivification()
for row in rows:
sessions['rooms'][row['room_id']]['sessions'][row['session_id']] = {
"first_message_index": row["first_message_index"],
"forwarded_count": row["forwarded_count"],
"is_verified": row["is_verified"],
"session_data": row["session_data"],
}
defer.returnValue(sessions)
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id, session_id):
@ -143,8 +160,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"user_id": user_id,
"version": version,
}
if room_id: keyvalues['room_id'] = room_id
if session_id: keyvalues['session_id'] = session_id
if room_id:
keyvalues['room_id'] = room_id
if session_id:
keyvalues['session_id'] = session_id
yield self._simple_delete(
table="e2e_room_keys",

View File

@ -29,7 +29,7 @@ CREATE UNIQUE INDEX e2e_room_keys_user_idx ON e2e_room_keys(user_id);
CREATE UNIQUE INDEX e2e_room_keys_room_idx ON e2e_room_keys(room_id);
CREATE UNIQUE INDEX e2e_room_keys_session_idx ON e2e_room_keys(session_id);
-- the versioning metadata about versions of users' encrypted e2e session backups
-- the metadata for each generation of encrypted e2e session backups
CREATE TABLE e2e_room_key_versions (
user_id TEXT NOT NULL,
version INT NOT NULL,