Merge branch 'uhoreg/e2e_cross-signing_merged' into cross-signing_keys

This commit is contained in:
Hubert Chathi 2019-08-28 17:36:46 -07:00
commit e3d3fbf63f
207 changed files with 4990 additions and 1521 deletions

View file

@ -20,12 +20,14 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore, db_to_json
class EndToEndKeyWorkerStore(SQLBaseStore):
@trace
@defer.inlineCallbacks
def get_e2e_device_keys(
self, query_list, include_all_devices=False, include_deleted_devices=False
@ -42,6 +44,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
"""
set_tag("query_list", query_list)
if not query_list:
return {}
@ -59,9 +62,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
return results
@trace
def _get_e2e_device_keys_txn(
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
):
set_tag("include_all_devices", include_all_devices)
set_tag("include_deleted_devices", include_deleted_devices)
query_clauses = []
query_params = []
@ -106,6 +113,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
for user_id, device_id in deleted_devices:
result.setdefault(user_id, {})[device_id] = None
log_kv(result)
return result
@defer.inlineCallbacks
@ -131,8 +139,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
desc="add_e2e_one_time_keys_check",
)
return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
return result
@defer.inlineCallbacks
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
@ -148,6 +157,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
"""
def _add_e2e_one_time_keys(txn):
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("new_keys", new_keys)
# We are protected from race between lookup and insertion due to
# a unique constraint. If there is a race of two calls to
# `add_e2e_one_time_keys` then they'll conflict and we will only
@ -204,6 +216,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"""
def _set_e2e_device_keys_txn(txn):
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("time_now", time_now)
set_tag("device_keys", device_keys)
old_key_json = self._simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
@ -217,6 +234,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
if old_key_json == new_key_json:
log_kv({"Message": "Device key already stored."})
return False
self._simple_upsert_txn(
@ -225,7 +243,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
keyvalues={"user_id": user_id, "device_id": device_id},
values={"ts_added_ms": time_now, "key_json": new_key_json},
)
log_kv({"message": "Device keys stored."})
return True
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
@ -233,6 +251,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def claim_e2e_one_time_keys(self, query_list):
"""Take a list of one time keys out of the database"""
@trace
def _claim_e2e_one_time_keys(txn):
sql = (
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
@ -254,7 +273,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
" AND key_id = ?"
)
for user_id, device_id, algorithm, key_id in delete:
log_kv(
{
"message": "Executing claim e2e_one_time_keys transaction on database."
}
)
txn.execute(sql, (user_id, device_id, algorithm, key_id))
log_kv({"message": "finished executing and invalidating cache"})
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
@ -264,6 +289,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
def delete_e2e_keys_by_device(self, user_id, device_id):
def delete_e2e_keys_by_device_txn(txn):
log_kv(
{
"message": "Deleting keys for device",
"device_id": device_id,
"user_id": user_id,
}
)
self._simple_delete_txn(
txn,
table="e2e_device_keys_json",