Improve the performance of calculating ignored users in large rooms (#9024)

This allows for efficiently finding which users ignore a particular
user.

Co-authored-by: Erik Johnston <erik@matrix.org>
This commit is contained in:
Patrick Cloke 2021-01-07 08:03:38 -05:00 committed by GitHub
parent 1d5c021a45
commit 23d701864f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 304 additions and 34 deletions

View file

@ -16,7 +16,7 @@
import abc
import logging
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Set, Tuple
from synapse.api.constants import AccountDataTypes
from synapse.storage._base import SQLBaseStore, db_to_json
@ -24,7 +24,7 @@ from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__)
@ -287,23 +287,25 @@ class AccountDataWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
"get_updated_account_data_for_user", get_updated_account_data_for_user_txn
)
@cached(num_args=2, cache_context=True, max_entries=5000)
async def is_ignored_by(
self, ignored_user_id: str, ignorer_user_id: str, cache_context: _CacheContext
) -> bool:
ignored_account_data = await self.get_global_account_data_by_type_for_user(
AccountDataTypes.IGNORED_USER_LIST,
ignorer_user_id,
on_invalidate=cache_context.invalidate,
)
if not ignored_account_data:
return False
@cached(max_entries=5000, iterable=True)
async def ignored_by(self, user_id: str) -> Set[str]:
"""
Get users which ignore the given user.
try:
return ignored_user_id in ignored_account_data.get("ignored_users", {})
except TypeError:
# The type of the ignored_users field is invalid.
return False
Params:
user_id: The user ID which might be ignored.
Return:
The user IDs which ignore the given user.
"""
return set(
await self.db_pool.simple_select_onecol(
table="ignored_users",
keyvalues={"ignored_user_id": user_id},
retcol="ignorer_user_id",
desc="ignored_by",
)
)
class AccountDataStore(AccountDataWorkerStore):
@ -390,18 +392,14 @@ class AccountDataStore(AccountDataWorkerStore):
Returns:
The maximum stream ID.
"""
content_json = json_encoder.encode(content)
async with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
await self.db_pool.simple_upsert(
desc="add_user_account_data",
table="account_data",
keyvalues={"user_id": user_id, "account_data_type": account_data_type},
values={"stream_id": next_id, "content": content_json},
lock=False,
await self.db_pool.runInteraction(
"add_user_account_data",
self._add_account_data_for_user,
next_id,
user_id,
account_data_type,
content,
)
# it's theoretically possible for the above to succeed and the
@ -424,6 +422,71 @@ class AccountDataStore(AccountDataWorkerStore):
return self._account_data_id_gen.get_current_token()
def _add_account_data_for_user(
self,
txn,
next_id: int,
user_id: str,
account_data_type: str,
content: JsonDict,
) -> None:
content_json = json_encoder.encode(content)
# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
self.db_pool.simple_upsert_txn(
txn,
table="account_data",
keyvalues={"user_id": user_id, "account_data_type": account_data_type},
values={"stream_id": next_id, "content": content_json},
lock=False,
)
# Ignored users get denormalized into a separate table as an optimisation.
if account_data_type != AccountDataTypes.IGNORED_USER_LIST:
return
# Insert / delete to sync the list of ignored users.
previously_ignored_users = set(
self.db_pool.simple_select_onecol_txn(
txn,
table="ignored_users",
keyvalues={"ignorer_user_id": user_id},
retcol="ignored_user_id",
)
)
# If the data is invalid, no one is ignored.
ignored_users_content = content.get("ignored_users", {})
if isinstance(ignored_users_content, dict):
currently_ignored_users = set(ignored_users_content)
else:
currently_ignored_users = set()
# Delete entries which are no longer ignored.
self.db_pool.simple_delete_many_txn(
txn,
table="ignored_users",
column="ignored_user_id",
iterable=previously_ignored_users - currently_ignored_users,
keyvalues={"ignorer_user_id": user_id},
)
# Add entries which are newly ignored.
self.db_pool.simple_insert_many_txn(
txn,
table="ignored_users",
values=[
{"ignorer_user_id": user_id, "ignored_user_id": u}
for u in currently_ignored_users - previously_ignored_users
],
)
# Invalidate the cache for any ignored users which were added or removed.
for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
async def _update_max_stream_id(self, next_id: int) -> None:
"""Update the max stream_id