Merge remote-tracking branch 'upstream/release-v1.29.0'

Tulir Asokan 2021-03-04 12:49:37 +02:00
commit adb990d8ba
126 changed files with 2399 additions and 765 deletions

View file

@ -49,7 +49,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Connection, Cursor
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import Collection
# python 3 does not have a maximum int value
@ -381,7 +380,10 @@ class DatabasePool:
_TXN_ID = 0
def __init__(
self, hs, database_config: DatabaseConnectionConfig, engine: BaseDatabaseEngine
self,
hs,
database_config: DatabaseConnectionConfig,
engine: BaseDatabaseEngine,
):
self.hs = hs
self._clock = hs.get_clock()
@ -420,16 +422,6 @@ class DatabasePool:
self._check_safe_to_upsert,
)
# We define this sequence here so that it can be referenced from both
# the DataStore and PersistEventStore.
def get_chain_id_txn(txn):
txn.execute("SELECT COALESCE(max(chain_id), 0) FROM event_auth_chains")
return txn.fetchone()[0]
self.event_chain_id_gen = build_sequence_generator(
engine, get_chain_id_txn, "event_auth_chain_id"
)
def is_running(self) -> bool:
"""Is the database pool currently running"""
return self._db_pool.running

View file

@ -79,7 +79,7 @@ class Databases:
# If we're on a process that can persist events also
# instantiate a `PersistEventsStore`
if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main)
persist_events = PersistEventsStore(hs, database, main, db_conn)
if "state" in database_config.databases:
logger.info(

View file

@ -16,7 +16,7 @@
# limitations under the License.
import logging
from typing import Any, Dict, List, Optional, Tuple
from typing import List, Optional, Tuple
from synapse.api.constants import PresenceState
from synapse.config.homeserver import HomeServerConfig
@ -27,7 +27,7 @@ from synapse.storage.util.id_generators import (
MultiWriterIdGenerator,
StreamIdGenerator,
)
from synapse.types import get_domain_from_id
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from .account_data import AccountDataStore
@ -264,7 +264,7 @@ class DataStore(
return [UserPresenceState(**row) for row in rows]
async def get_users(self) -> List[Dict[str, Any]]:
async def get_users(self) -> List[JsonDict]:
"""Function to retrieve a list of users in users table.
Returns:
@ -292,7 +292,7 @@ class DataStore(
name: Optional[str] = None,
guests: bool = True,
deactivated: bool = False,
) -> Tuple[List[Dict[str, Any]], int]:
) -> Tuple[List[JsonDict], int]:
"""Function to retrieve a paginated list of users from
users list. This will return a json list of users and the
total number of users matching the filter criteria.
@ -353,7 +353,7 @@ class DataStore(
"get_users_paginate_txn", get_users_paginate_txn
)
async def search_users(self, term: str) -> Optional[List[Dict[str, Any]]]:
async def search_users(self, term: str) -> Optional[List[JsonDict]]:
"""Function to search users list for one or more users with
the matched term.

View file

@ -42,7 +42,9 @@ from synapse.logging.utils import log_function
from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.storage.util.sequence import SequenceGenerator
from synapse.types import StateMap, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter, sorted_topologically
@ -90,7 +92,11 @@ class PersistEventsStore:
"""
def __init__(
self, hs: "HomeServer", db: DatabasePool, main_data_store: "DataStore"
self,
hs: "HomeServer",
db: DatabasePool,
main_data_store: "DataStore",
db_conn: Connection,
):
self.hs = hs
self.db_pool = db
@ -474,6 +480,7 @@ class PersistEventsStore:
self._add_chain_cover_index(
txn,
self.db_pool,
self.store.event_chain_id_gen,
event_to_room_id,
event_to_types,
event_to_auth_chain,
@ -484,6 +491,7 @@ class PersistEventsStore:
cls,
txn,
db_pool: DatabasePool,
event_chain_id_gen: SequenceGenerator,
event_to_room_id: Dict[str, str],
event_to_types: Dict[str, Tuple[str, str]],
event_to_auth_chain: Dict[str, List[str]],
@ -630,6 +638,7 @@ class PersistEventsStore:
new_chain_tuples = cls._allocate_chain_ids(
txn,
db_pool,
event_chain_id_gen,
event_to_room_id,
event_to_types,
event_to_auth_chain,
@ -768,6 +777,7 @@ class PersistEventsStore:
def _allocate_chain_ids(
txn,
db_pool: DatabasePool,
event_chain_id_gen: SequenceGenerator,
event_to_room_id: Dict[str, str],
event_to_types: Dict[str, Tuple[str, str]],
event_to_auth_chain: Dict[str, List[str]],
@ -880,7 +890,7 @@ class PersistEventsStore:
chain_to_max_seq_no[new_chain_tuple[0]] = new_chain_tuple[1]
# Generate new chain IDs for all unallocated chain IDs.
newly_allocated_chain_ids = db_pool.event_chain_id_gen.get_next_mult_txn(
newly_allocated_chain_ids = event_chain_id_gen.get_next_mult_txn(
txn, len(unallocated_chain_ids)
)
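With this change the chain-ID sequence generator is passed down explicitly rather than looked up on the DatabasePool. A minimal sketch of the resulting call shape, assuming a `store` carrying the `event_chain_id_gen` built in EventsWorkerStore, a live `txn`, and the three event_to_* maps (all placeholder names, not code from this commit):

# Sketch only: the SequenceGenerator now travels as an argument instead of
# being fetched from db_pool.event_chain_id_gen.
PersistEventsStore._add_chain_cover_index(
    txn,
    store.db_pool,
    store.event_chain_id_gen,  # built once in EventsWorkerStore
    event_to_room_id,
    event_to_types,
    event_to_auth_chain,
)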

View file

@ -696,7 +696,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
if not has_event_auth:
for auth_id in event.auth_event_ids():
# Old, dodgy, events may have duplicate auth events, which we
# need to deduplicate as we have a unique constraint.
for auth_id in set(event.auth_event_ids()):
auth_events.append(
{
"room_id": event.room_id,
@ -917,6 +919,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
PersistEventsStore._add_chain_cover_index(
txn,
self.db_pool,
self.event_chain_id_gen,
event_to_room_id,
event_to_types,
event_to_auth_chain,

View file

@ -45,6 +45,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
from synapse.storage.database import DatabasePool
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import Collection, JsonDict, get_domain_from_id
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
@ -156,6 +157,21 @@ class EventsWorkerStore(SQLBaseStore):
self._event_fetch_list = []
self._event_fetch_ongoing = 0
# We define this sequence here so that it can be referenced from both
# the DataStore and PersistEventStore.
def get_chain_id_txn(txn):
txn.execute("SELECT COALESCE(max(chain_id), 0) FROM event_auth_chains")
return txn.fetchone()[0]
self.event_chain_id_gen = build_sequence_generator(
db_conn,
database.engine,
get_chain_id_txn,
"event_auth_chain_id",
table="event_auth_chains",
id_column="chain_id",
)
def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == EventsStream.NAME:
self._stream_id_gen.advance(instance_name, token)

View file

@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional, Tuple
from synapse.storage._base import SQLBaseStore
@ -23,6 +24,22 @@ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD = (
)
class MediaSortOrder(Enum):
"""
Enum to define the sorting method used when returning media with
get_local_media_by_user_paginate
"""
MEDIA_ID = "media_id"
UPLOAD_NAME = "upload_name"
CREATED_TS = "created_ts"
LAST_ACCESS_TS = "last_access_ts"
MEDIA_LENGTH = "media_length"
MEDIA_TYPE = "media_type"
QUARANTINED_BY = "quarantined_by"
SAFE_FROM_QUARANTINE = "safe_from_quarantine"
class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
@ -118,7 +135,12 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
)
async def get_local_media_by_user_paginate(
self, start: int, limit: int, user_id: str
self,
start: int,
limit: int,
user_id: str,
order_by: str = MediaSortOrder.CREATED_TS.value,
direction: str = "f",
) -> Tuple[List[Dict[str, Any]], int]:
"""Get a paginated list of metadata for a local piece of media
which a user_id has uploaded
@ -127,6 +149,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
start: offset in the list
limit: maximum amount of media_ids to retrieve
user_id: fully-qualified user id
order_by: the sort order of the returned list
direction: sort ascending or descending
Returns:
A paginated list of all metadata of user's media,
plus the total count of all the user's media
@ -134,6 +158,14 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
def get_local_media_by_user_paginate_txn(txn):
# Set ordering
order_by_column = MediaSortOrder(order_by).value
if direction == "b":
order = "DESC"
else:
order = "ASC"
args = [user_id]
sql = """
SELECT COUNT(*) as total_media
@ -155,9 +187,12 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
"safe_from_quarantine"
FROM local_media_repository
WHERE user_id = ?
ORDER BY created_ts DESC, media_id DESC
ORDER BY {order_by_column} {order}, media_id ASC
LIMIT ? OFFSET ?
"""
""".format(
order_by_column=order_by_column,
order=order,
)
args += [limit, start]
txn.execute(sql, args)
@ -344,16 +379,16 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
thumbnail_method,
thumbnail_length,
):
await self.db_pool.simple_insert(
"local_media_repository_thumbnails",
{
await self.db_pool.simple_upsert(
table="local_media_repository_thumbnails",
keyvalues={
"media_id": media_id,
"thumbnail_width": thumbnail_width,
"thumbnail_height": thumbnail_height,
"thumbnail_method": thumbnail_method,
"thumbnail_type": thumbnail_type,
"thumbnail_length": thumbnail_length,
},
values={"thumbnail_length": thumbnail_length},
desc="store_local_thumbnail",
)
@ -498,18 +533,18 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
thumbnail_method,
thumbnail_length,
):
await self.db_pool.simple_insert(
"remote_media_cache_thumbnails",
{
await self.db_pool.simple_upsert(
table="remote_media_cache_thumbnails",
keyvalues={
"media_origin": origin,
"media_id": media_id,
"thumbnail_width": thumbnail_width,
"thumbnail_height": thumbnail_height,
"thumbnail_method": thumbnail_method,
"thumbnail_type": thumbnail_type,
"thumbnail_length": thumbnail_length,
"filesystem_id": filesystem_id,
},
values={"thumbnail_length": thumbnail_length},
insertion_values={"filesystem_id": filesystem_id},
desc="store_remote_media_thumbnail",
)
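For reference, a minimal sketch of the extended pagination call, e.g. from an async admin handler; the `store` variable and the concrete values are illustrative assumptions:

async def list_largest_uploads(store, user_id: str):
    # Sketch only: uses the MediaSortOrder enum and the new order_by/direction
    # parameters added above (defaults are CREATED_TS and "f").
    media, total = await store.get_local_media_by_user_paginate(
        start=0,
        limit=10,
        user_id=user_id,
        order_by=MediaSortOrder.MEDIA_LENGTH.value,
        direction="b",  # "b" sorts descending, anything else ascending
    )
    return media, total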

View file

@ -28,7 +28,10 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
) -> Set[int]:
"""Deletes room history before a certain point
"""Deletes room history before a certain point.
Note that only a single purge can occur at once; this is guaranteed via
a higher level (in the PaginationHandler).
Args:
room_id:
@ -52,7 +55,9 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
delete_local_events,
)
def _purge_history_txn(self, txn, room_id, token, delete_local_events):
def _purge_history_txn(
self, txn, room_id: str, token: RoomStreamToken, delete_local_events: bool
) -> Set[int]:
# Tables that should be pruned:
# event_auth
# event_backward_extremities
@ -103,7 +108,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
if max_depth < token.topological:
# We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties)
# having any backwards extremities)
raise SynapseError(
400, "topological_ordering is greater than forward extremeties"
)
@ -154,7 +159,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
logger.info("[purge] Finding new backward extremities")
# We calculate the new entries for the backward extremeties by finding
# We calculate the new entries for the backward extremities by finding
# events to be purged that are pointed to by events we're not going to
# purge.
txn.execute(
@ -296,7 +301,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
"purge_room", self._purge_room_txn, room_id
)
def _purge_room_txn(self, txn, room_id):
def _purge_room_txn(self, txn, room_id: str) -> List[int]:
# First we fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
@ -310,6 +315,31 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
state_groups = [row[0] for row in txn]
# Get all the auth chains that are referenced by events that are to be
# deleted.
txn.execute(
"""
SELECT chain_id, sequence_number FROM events
LEFT JOIN event_auth_chains USING (event_id)
WHERE room_id = ?
""",
(room_id,),
)
referenced_chain_id_tuples = list(txn)
logger.info("[purge] removing events from event_auth_chain_links")
txn.executemany(
"""
DELETE FROM event_auth_chain_links WHERE
(origin_chain_id = ? AND origin_sequence_number = ?) OR
(target_chain_id = ? AND target_sequence_number = ?)
""",
(
(chain_id, seq_num, chain_id, seq_num)
for (chain_id, seq_num) in referenced_chain_id_tuples
),
)
# Now we delete tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
@ -319,6 +349,8 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
"event_reference_hashes",
"event_relations",
"event_to_state_groups",
"event_auth_chains",
"event_auth_chain_to_calculate",
"redactions",
"rejections",
"state_events",

View file

@ -39,6 +39,16 @@ class PusherWorkerStore(SQLBaseStore):
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
)
self.db_pool.updates.register_background_update_handler(
"remove_deactivated_pushers",
self._remove_deactivated_pushers,
)
self.db_pool.updates.register_background_update_handler(
"remove_stale_pushers",
self._remove_stale_pushers,
)
def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
"""JSON-decode the data in the rows returned from the `pushers` table
@ -284,6 +294,101 @@ class PusherWorkerStore(SQLBaseStore):
lock=False,
)
async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
"""A background update that deletes all pushers for deactivated users.
Note that we don't proactively tell the pusherpool that we've deleted
these (just because it's a bit of a faff to do from here), but they will
get cleaned up at the next restart
"""
last_user = progress.get("last_user", "")
def _delete_pushers(txn) -> int:
sql = """
SELECT name FROM users
WHERE deactivated = ? and name > ?
ORDER BY name ASC
LIMIT ?
"""
txn.execute(sql, (1, last_user, batch_size))
users = [row[0] for row in txn]
self.db_pool.simple_delete_many_txn(
txn,
table="pushers",
column="user_name",
iterable=users,
keyvalues={},
)
if users:
self.db_pool.updates._background_update_progress_txn(
txn, "remove_deactivated_pushers", {"last_user": users[-1]}
)
return len(users)
number_deleted = await self.db_pool.runInteraction(
"_remove_deactivated_pushers", _delete_pushers
)
if number_deleted < batch_size:
await self.db_pool.updates._end_background_update(
"remove_deactivated_pushers"
)
return number_deleted
async def _remove_stale_pushers(self, progress: dict, batch_size: int) -> int:
"""A background update that deletes all pushers for logged out devices.
Note that we don't proactively tell the pusherpool that we've deleted
these (just because it's a bit of a faff to do from here), but they will
get cleaned up at the next restart
"""
last_pusher = progress.get("last_pusher", 0)
def _delete_pushers(txn) -> int:
sql = """
SELECT p.id, access_token FROM pushers AS p
LEFT JOIN access_tokens AS a ON (p.access_token = a.id)
WHERE p.id > ?
ORDER BY p.id ASC
LIMIT ?
"""
txn.execute(sql, (last_pusher, batch_size))
pushers = [(row[0], row[1]) for row in txn]
self.db_pool.simple_delete_many_txn(
txn,
table="pushers",
column="id",
iterable=(pusher_id for pusher_id, token in pushers if token is None),
keyvalues={},
)
if pushers:
self.db_pool.updates._background_update_progress_txn(
txn, "remove_stale_pushers", {"last_pusher": pushers[-1][0]}
)
return len(pushers)
number_deleted = await self.db_pool.runInteraction(
"_remove_stale_pushers", _delete_pushers
)
if number_deleted < batch_size:
await self.db_pool.updates._end_background_update("remove_stale_pushers")
return number_deleted
class PusherStore(PusherWorkerStore):
def get_pushers_stream_token(self) -> int:
@ -373,3 +478,46 @@ class PusherStore(PusherWorkerStore):
await self.db_pool.runInteraction(
"delete_pusher", delete_pusher_txn, stream_id
)
async def delete_all_pushers_for_user(self, user_id: str) -> None:
"""Delete all pushers associated with an account."""
# We want to generate a row in `deleted_pushers` for each pusher we're
# deleting, so we fetch the list now so we can generate the appropriate
# number of stream IDs.
#
# Note: technically there could be a race here between adding/deleting
# pushers, but a) the worst case is that we don't stop a pusher until the
# next restart and b) this is only called when we're deactivating an
# account.
pushers = list(await self.get_pushers_by_user_id(user_id))
def delete_pushers_txn(txn, stream_ids):
self._invalidate_cache_and_stream( # type: ignore
txn, self.get_if_user_has_pusher, (user_id,)
)
self.db_pool.simple_delete_txn(
txn,
table="pushers",
keyvalues={"user_name": user_id},
)
self.db_pool.simple_insert_many_txn(
txn,
table="deleted_pushers",
values=[
{
"stream_id": stream_id,
"app_id": pusher.app_id,
"pushkey": pusher.pushkey,
"user_id": user_id,
}
for stream_id, pusher in zip(stream_ids, pushers)
],
)
async with self._pushers_id_gen.get_next_mult(len(pushers)) as stream_ids:
await self.db_pool.runInteraction(
"delete_all_pushers_for_user", delete_pushers_txn, stream_ids
)
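A minimal sketch of how the new helper might be driven from account deactivation; the `store` argument and the surrounding coroutine are assumptions, not code from this commit:

async def on_account_deactivated(store, user_id: str) -> None:
    # Sketch only: delete_all_pushers_for_user fetches the user's pushers first
    # so it can allocate one stream ID per pusher for the deleted_pushers rows.
    await store.delete_all_pushers_for_user(user_id)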

View file

@ -23,7 +23,7 @@ import attr
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage.database import DatabasePool
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.types import Connection, Cursor
@ -70,7 +70,12 @@ class TokenLookupResult:
class RegistrationWorkerStore(CacheInvalidationWorkerStore):
def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.config = hs.config
@ -79,9 +84,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
# call `find_max_generated_user_id_localpart` each time, which is
# expensive if there are many entries.
self._user_id_seq = build_sequence_generator(
db_conn,
database.engine,
find_max_generated_user_id_localpart,
"user_id_seq",
table=None,
id_column=None,
)
self._account_validity = hs.config.account_validity
@ -1036,7 +1044,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self._clock = hs.get_clock()

View file

@ -1,4 +1,4 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -13,5 +13,8 @@
* limitations under the License.
*/
-- We may not have deleted all pushers for deactivated accounts, so we set up a
-- background job to delete them.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(5828, 'rejected_events_metadata', '{}');
(5908, 'remove_deactivated_pushers', '{}');

View file

@ -0,0 +1,20 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Delete all pushers associated with deleted devices. This is to clear up after
-- a bug where they weren't correctly deleted when using workers.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(5908, 'remove_stale_pushers', '{}');

View file

@ -0,0 +1,26 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This originally was in 58/, but landed after 59/ was created, and so some
-- servers running develop didn't run this delta. Running it again should be
-- safe.
--
-- We first delete any in progress `rejected_events_metadata` background update,
-- to ensure that we don't conflict when trying to insert the new one. (We could
-- alternatively do an ON CONFLICT DO NOTHING, but that syntax isn't supported
-- by older SQLite versions. Plus, this should be a rare case).
DELETE FROM background_updates WHERE update_name = 'rejected_events_metadata';
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(5828, 'rejected_events_metadata', '{}');

View file

@ -497,8 +497,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
async def add_users_in_public_rooms(
self, room_id: str, user_ids: Iterable[str]
) -> None:
"""Insert entries into the users_who_share_private_rooms table. The first
user should be a local user.
"""Insert entries into the users_in_public_rooms table.
Args:
room_id
@ -556,6 +555,11 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
self._prefer_local_users_in_search = (
hs.config.user_directory_search_prefer_local_users
)
self._server_name = hs.config.server_name
async def remove_from_user_dir(self, user_id: str) -> None:
def _remove_from_user_dir_txn(txn):
self.db_pool.simple_delete_txn(
@ -665,7 +669,6 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
users.update(rows)
return list(users)
@cached()
async def get_shared_rooms_for_users(
self, user_id: str, other_user_id: str
) -> Set[str]:
@ -754,9 +757,24 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
)
"""
# We allow manipulating the ranking algorithm by injecting statements
# based on config options.
additional_ordering_statements = []
ordering_arguments = ()
if isinstance(self.database_engine, PostgresEngine):
full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
# If enabled, this config option will rank local users higher than those on
# remote instances.
if self._prefer_local_users_in_search:
# This statement checks whether a given user's user ID contains a server name
# that matches the local server
statement = "* (CASE WHEN user_id LIKE ? THEN 2.0 ELSE 1.0 END)"
additional_ordering_statements.append(statement)
ordering_arguments += ("%:" + self._server_name,)
# We order by rank and then if they have profile info
# The ranking algorithm is hand tweaked for "best" results. Broadly
# the idea is we give a higher weight to exact matches.
@ -767,7 +785,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
FROM user_directory_search as t
INNER JOIN user_directory AS d USING (user_id)
WHERE
%s
%(where_clause)s
AND vector @@ to_tsquery('simple', ?)
ORDER BY
(CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
@ -787,33 +805,54 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
8
)
)
%(order_case_statements)s
DESC,
display_name IS NULL,
avatar_url IS NULL
LIMIT ?
""" % (
where_clause,
""" % {
"where_clause": where_clause,
"order_case_statements": " ".join(additional_ordering_statements),
}
args = (
join_args
+ (full_query, exact_query, prefix_query)
+ ordering_arguments
+ (limit + 1,)
)
args = join_args + (full_query, exact_query, prefix_query, limit + 1)
elif isinstance(self.database_engine, Sqlite3Engine):
search_query = _parse_query_sqlite(search_term)
# If enabled, this config option will rank local users higher than those on
# remote instances.
if self._prefer_local_users_in_search:
# This statement checks whether a given user's user ID contains a server name
# that matches the local server
#
# Note that we need to include a comma at the end for valid SQL
statement = "user_id LIKE ? DESC,"
additional_ordering_statements.append(statement)
ordering_arguments += ("%:" + self._server_name,)
sql = """
SELECT d.user_id AS user_id, display_name, avatar_url
FROM user_directory_search as t
INNER JOIN user_directory AS d USING (user_id)
WHERE
%s
%(where_clause)s
AND value MATCH ?
ORDER BY
rank(matchinfo(user_directory_search)) DESC,
%(order_statements)s
display_name IS NULL,
avatar_url IS NULL
LIMIT ?
""" % (
where_clause,
)
args = join_args + (search_query, limit + 1)
""" % {
"where_clause": where_clause,
"order_statements": " ".join(additional_ordering_statements),
}
args = join_args + (search_query,) + ordering_arguments + (limit + 1,)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
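The local-user boost works by matching the user ID suffix against the local server name; a minimal sketch of the pattern involved (the values are illustrative):

# Sketch only: the injected ordering clause compares user_id against a
# LIKE pattern of the form "%:<server_name>".
server_name = "example.com"
like_pattern = "%:" + server_name   # -> "%:example.com"
# "@alice:example.com" matches the pattern and is ranked higher;
# "@bob:matrix.org" does not match and keeps its base rank.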

View file

@ -97,10 +97,12 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
return txn.fetchone()[0]
self._state_group_seq_gen = build_sequence_generator(
self.database_engine, get_max_state_group_txn, "state_group_id_seq"
)
self._state_group_seq_gen.check_consistency(
db_conn, table="state_groups", id_column="id"
db_conn,
self.database_engine,
get_max_state_group_txn,
"state_group_id_seq",
table="state_groups",
id_column="id",
)
@cached(max_entries=10000, iterable=True)

View file

@ -73,9 +73,6 @@ class PurgeEventsStorage:
Returns:
The set of state groups that can be deleted.
"""
# Graph of state group -> previous group
graph = {}
# Set of events that we have found to be referenced by events
referenced_groups = set()
@ -111,8 +108,6 @@ class PurgeEventsStorage:
next_to_search |= prevs
state_groups_seen |= prevs
graph.update(edges)
to_delete = state_groups_seen - referenced_groups
return to_delete

View file

@ -25,7 +25,7 @@ RoomsForUser = namedtuple(
)
GetRoomsForUserWithStreamOrdering = namedtuple(
"_GetRoomsForUserWithStreamOrdering", ("room_id", "event_pos")
"GetRoomsForUserWithStreamOrdering", ("room_id", "event_pos")
)

View file

@ -251,9 +251,14 @@ class LocalSequenceGenerator(SequenceGenerator):
def build_sequence_generator(
db_conn: "LoggingDatabaseConnection",
database_engine: BaseDatabaseEngine,
get_first_callback: GetFirstCallbackType,
sequence_name: str,
table: Optional[str],
id_column: Optional[str],
stream_name: Optional[str] = None,
positive: bool = True,
) -> SequenceGenerator:
"""Get the best impl of SequenceGenerator available
@ -265,8 +270,23 @@ def build_sequence_generator(
get_first_callback: a callback which gets the next sequence ID. Used if
we're on sqlite.
sequence_name: the name of a postgres sequence to use.
table, id_column, stream_name, positive: If set then `check_consistency`
is called on the created sequence. See docstring for
`check_consistency` details.
"""
if isinstance(database_engine, PostgresEngine):
return PostgresSequenceGenerator(sequence_name)
seq = PostgresSequenceGenerator(sequence_name) # type: SequenceGenerator
else:
return LocalSequenceGenerator(get_first_callback)
seq = LocalSequenceGenerator(get_first_callback)
if table:
assert id_column
seq.check_consistency(
db_conn=db_conn,
table=table,
id_column=id_column,
stream_name=stream_name,
positive=positive,
)
return seq
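As a usage reference, a minimal sketch of a call with the new signature, mirroring the events-worker hunk earlier in this commit; the surrounding names (`db_conn`, `database`, `get_chain_id_txn`) follow that hunk and are assumptions here:

# Sketch only: build the generator and have it check itself against the
# backing table in one step; pass table=None / id_column=None to skip the check.
event_chain_id_gen = build_sequence_generator(
    db_conn,
    database.engine,
    get_chain_id_txn,
    "event_auth_chain_id",
    table="event_auth_chains",
    id_column="chain_id",
)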