mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-08-08 17:32:13 -04:00
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_out_persistence_store
This commit is contained in:
commit
ec6de1cc7d
146 changed files with 1245 additions and 722 deletions
|
@ -94,13 +94,16 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||
self._all_done = False
|
||||
|
||||
def start_doing_background_updates(self):
|
||||
run_as_background_process("background_updates", self._run_background_updates)
|
||||
run_as_background_process("background_updates", self.run_background_updates)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _run_background_updates(self):
|
||||
def run_background_updates(self, sleep=True):
|
||||
logger.info("Starting background schema updates")
|
||||
while True:
|
||||
yield self.hs.get_clock().sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
|
||||
if sleep:
|
||||
yield self.hs.get_clock().sleep(
|
||||
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0
|
||||
)
|
||||
|
||||
try:
|
||||
result = yield self.do_next_background_update(
|
||||
|
|
|
@ -321,9 +321,17 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
def _delete_e2e_room_keys_version_txn(txn):
|
||||
if version is None:
|
||||
this_version = self._get_current_version(txn, user_id)
|
||||
if this_version is None:
|
||||
raise StoreError(404, "No current backup version")
|
||||
else:
|
||||
this_version = version
|
||||
|
||||
self._simple_delete_txn(
|
||||
txn,
|
||||
table="e2e_room_keys",
|
||||
keyvalues={"user_id": user_id, "version": this_version},
|
||||
)
|
||||
|
||||
return self._simple_update_one_txn(
|
||||
txn,
|
||||
table="e2e_room_keys_versions",
|
||||
|
|
|
@ -248,6 +248,73 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
|||
|
||||
return self.runInteraction("count_e2e_one_time_keys", _count_e2e_one_time_keys)
|
||||
|
||||
def _get_e2e_cross_signing_key_txn(self, txn, user_id, key_type, from_user_id=None):
|
||||
"""Returns a user's cross-signing key.
|
||||
|
||||
Args:
|
||||
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||
user_id (str): the user whose key is being requested
|
||||
key_type (str): the type of key that is being set: either 'master'
|
||||
for a master key, 'self_signing' for a self-signing key, or
|
||||
'user_signing' for a user-signing key
|
||||
from_user_id (str): if specified, signatures made by this user on
|
||||
the key will be included in the result
|
||||
|
||||
Returns:
|
||||
dict of the key data or None if not found
|
||||
"""
|
||||
sql = (
|
||||
"SELECT keydata "
|
||||
" FROM e2e_cross_signing_keys "
|
||||
" WHERE user_id = ? AND keytype = ? ORDER BY stream_id DESC LIMIT 1"
|
||||
)
|
||||
txn.execute(sql, (user_id, key_type))
|
||||
row = txn.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
key = json.loads(row[0])
|
||||
|
||||
device_id = None
|
||||
for k in key["keys"].values():
|
||||
device_id = k
|
||||
|
||||
if from_user_id is not None:
|
||||
sql = (
|
||||
"SELECT key_id, signature "
|
||||
" FROM e2e_cross_signing_signatures "
|
||||
" WHERE user_id = ? "
|
||||
" AND target_user_id = ? "
|
||||
" AND target_device_id = ? "
|
||||
)
|
||||
txn.execute(sql, (from_user_id, user_id, device_id))
|
||||
row = txn.fetchone()
|
||||
if row:
|
||||
key.setdefault("signatures", {}).setdefault(from_user_id, {})[
|
||||
row[0]
|
||||
] = row[1]
|
||||
|
||||
return key
|
||||
|
||||
def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None):
|
||||
"""Returns a user's cross-signing key.
|
||||
|
||||
Args:
|
||||
user_id (str): the user whose self-signing key is being requested
|
||||
key_type (str): the type of cross-signing key to get
|
||||
from_user_id (str): if specified, signatures made by this user on
|
||||
the self-signing key will be included in the result
|
||||
|
||||
Returns:
|
||||
dict of the key data or None if not found
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"get_e2e_cross_signing_key",
|
||||
self._get_e2e_cross_signing_key_txn,
|
||||
user_id,
|
||||
key_type,
|
||||
from_user_id,
|
||||
)
|
||||
|
||||
|
||||
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
||||
def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
|
||||
|
@ -426,73 +493,6 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
|||
key,
|
||||
)
|
||||
|
||||
def _get_e2e_cross_signing_key_txn(self, txn, user_id, key_type, from_user_id=None):
|
||||
"""Returns a user's cross-signing key.
|
||||
|
||||
Args:
|
||||
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||
user_id (str): the user whose key is being requested
|
||||
key_type (str): the type of key that is being set: either 'master'
|
||||
for a master key, 'self_signing' for a self-signing key, or
|
||||
'user_signing' for a user-signing key
|
||||
from_user_id (str): if specified, signatures made by this user on
|
||||
the key will be included in the result
|
||||
|
||||
Returns:
|
||||
dict of the key data or None if not found
|
||||
"""
|
||||
sql = (
|
||||
"SELECT keydata "
|
||||
" FROM e2e_cross_signing_keys "
|
||||
" WHERE user_id = ? AND keytype = ? ORDER BY stream_id DESC LIMIT 1"
|
||||
)
|
||||
txn.execute(sql, (user_id, key_type))
|
||||
row = txn.fetchone()
|
||||
if not row:
|
||||
return None
|
||||
key = json.loads(row[0])
|
||||
|
||||
device_id = None
|
||||
for k in key["keys"].values():
|
||||
device_id = k
|
||||
|
||||
if from_user_id is not None:
|
||||
sql = (
|
||||
"SELECT key_id, signature "
|
||||
" FROM e2e_cross_signing_signatures "
|
||||
" WHERE user_id = ? "
|
||||
" AND target_user_id = ? "
|
||||
" AND target_device_id = ? "
|
||||
)
|
||||
txn.execute(sql, (from_user_id, user_id, device_id))
|
||||
row = txn.fetchone()
|
||||
if row:
|
||||
key.setdefault("signatures", {}).setdefault(from_user_id, {})[
|
||||
row[0]
|
||||
] = row[1]
|
||||
|
||||
return key
|
||||
|
||||
def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None):
|
||||
"""Returns a user's cross-signing key.
|
||||
|
||||
Args:
|
||||
user_id (str): the user whose self-signing key is being requested
|
||||
key_type (str): the type of cross-signing key to get
|
||||
from_user_id (str): if specified, signatures made by this user on
|
||||
the self-signing key will be included in the result
|
||||
|
||||
Returns:
|
||||
dict of the key data or None if not found
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"get_e2e_cross_signing_key",
|
||||
self._get_e2e_cross_signing_key_txn,
|
||||
user_id,
|
||||
key_type,
|
||||
from_user_id,
|
||||
)
|
||||
|
||||
def store_e2e_cross_signing_signatures(self, user_id, signatures):
|
||||
"""Stores cross-signing signatures.
|
||||
|
||||
|
|
|
@ -364,9 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||
)
|
||||
|
||||
def _get_backfill_events(self, txn, room_id, event_list, limit):
|
||||
logger.debug(
|
||||
"_get_backfill_events: %s, %s, %s", room_id, repr(event_list), limit
|
||||
)
|
||||
logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
|
||||
|
||||
event_results = set()
|
||||
|
||||
|
|
|
@ -1881,12 +1881,11 @@ class EventsStore(
|
|||
|
||||
logger.info("[purge] done")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def is_event_after(self, event_id1, event_id2):
|
||||
async def is_event_after(self, event_id1, event_id2):
|
||||
"""Returns True if event_id1 is after event_id2 in the stream
|
||||
"""
|
||||
to_1, so_1 = yield self._get_event_ordering(event_id1)
|
||||
to_2, so_2 = yield self._get_event_ordering(event_id2)
|
||||
to_1, so_1 = await self._get_event_ordering(event_id1)
|
||||
to_2, so_2 = await self._get_event_ordering(event_id2)
|
||||
return (to_1, so_1) > (to_2, so_2)
|
||||
|
||||
@cachedInlineCallbacks(max_entries=5000)
|
||||
|
|
|
@ -201,13 +201,17 @@ class RoomWorkerStore(SQLBaseStore):
|
|||
where_clauses.append(
|
||||
"""
|
||||
(
|
||||
name LIKE ?
|
||||
OR topic LIKE ?
|
||||
OR canonical_alias LIKE ?
|
||||
LOWER(name) LIKE ?
|
||||
OR LOWER(topic) LIKE ?
|
||||
OR LOWER(canonical_alias) LIKE ?
|
||||
)
|
||||
"""
|
||||
)
|
||||
query_args += [search_term, search_term, search_term]
|
||||
query_args += [
|
||||
search_term.lower(),
|
||||
search_term.lower(),
|
||||
search_term.lower(),
|
||||
]
|
||||
|
||||
where_clause = ""
|
||||
if where_clauses:
|
||||
|
|
|
@ -720,7 +720,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
# See bulk_get_push_rules_for_room for how we work around this.
|
||||
assert state_group is not None
|
||||
|
||||
cache = self._get_joined_hosts_cache(room_id)
|
||||
cache = yield self._get_joined_hosts_cache(room_id)
|
||||
joined_hosts = yield cache.get_destinations(state_entry)
|
||||
|
||||
return joined_hosts
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
/* Copyright 2019 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/* delete room keys that belong to deleted room key version, or to room key
|
||||
* versions that don't exist (anymore)
|
||||
*/
|
||||
DELETE FROM e2e_room_keys
|
||||
WHERE version NOT IN (
|
||||
SELECT version
|
||||
FROM e2e_room_keys_versions
|
||||
WHERE e2e_room_keys.user_id = e2e_room_keys_versions.user_id
|
||||
AND e2e_room_keys_versions.deleted = 0
|
||||
);
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from typing import Iterable, Tuple
|
||||
|
||||
from six import iteritems, itervalues
|
||||
from six.moves import range
|
||||
|
@ -23,6 +24,8 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import NotFoundError
|
||||
from synapse.events import EventBase
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
|
||||
|
@ -1215,7 +1218,9 @@ class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore):
|
|||
def __init__(self, db_conn, hs):
|
||||
super(StateStore, self).__init__(db_conn, hs)
|
||||
|
||||
def _store_event_state_mappings_txn(self, txn, events_and_contexts):
|
||||
def _store_event_state_mappings_txn(
|
||||
self, txn, events_and_contexts: Iterable[Tuple[EventBase, EventContext]]
|
||||
):
|
||||
state_groups = {}
|
||||
for event, context in events_and_contexts:
|
||||
if event.internal_metadata.is_outlier():
|
||||
|
|
|
@ -332,7 +332,7 @@ class StatsStore(StateDeltasStore):
|
|||
def _bulk_update_stats_delta_txn(txn):
|
||||
for stats_type, stats_updates in updates.items():
|
||||
for stats_id, fields in stats_updates.items():
|
||||
logger.info(
|
||||
logger.debug(
|
||||
"Updating %s stats for %s: %s", stats_type, stats_id, fields
|
||||
)
|
||||
self._update_stats_delta_txn(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue