Make push rules use proper structures. (#13522)

This improves load times for push rules:

| Version              | Time per user | Time for 1k users | 
| -------------------- | ------------- | ----------------- |
| Before               |       138 µs  |             138ms |
| Now (with custom)    |       2.11 µs |            2.11ms |
| Now (without custom) |       49.7 ns |           0.05 ms |

This therefore has a large impact on send times for rooms
with large numbers of local users in the room.
This commit is contained in:
Erik Johnston 2022-08-16 12:22:17 +01:00 committed by GitHub
parent d642ce4b32
commit 5442891cbc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 495 additions and 334 deletions

View file

@ -14,11 +14,23 @@
# limitations under the License.
import abc
import logging
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Tuple, Union, cast
from typing import (
TYPE_CHECKING,
Any,
Collection,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Union,
cast,
)
from synapse.api.errors import StoreError
from synapse.config.homeserver import ExperimentalConfig
from synapse.push.baserules import list_with_base_rules
from synapse.push.baserules import FilteredPushRules, PushRule, compile_push_rules
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@ -50,60 +62,30 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
def _is_experimental_rule_enabled(
rule_id: str, experimental_config: ExperimentalConfig
) -> bool:
"""Used by `_load_rules` to filter out experimental rules when they
have not been enabled.
"""
if (
rule_id == "global/override/.org.matrix.msc3786.rule.room.server_acl"
and not experimental_config.msc3786_enabled
):
return False
if (
rule_id == "global/underride/.org.matrix.msc3772.thread_reply"
and not experimental_config.msc3772_enabled
):
return False
return True
def _load_rules(
rawrules: List[JsonDict],
enabled_map: Dict[str, bool],
experimental_config: ExperimentalConfig,
) -> List[JsonDict]:
ruleslist = []
for rawrule in rawrules:
rule = dict(rawrule)
rule["conditions"] = db_to_json(rawrule["conditions"])
rule["actions"] = db_to_json(rawrule["actions"])
rule["default"] = False
ruleslist.append(rule)
) -> FilteredPushRules:
"""Take the DB rows returned from the DB and convert them into a full
`FilteredPushRules` object.
"""
# We're going to be mutating this a lot, so copy it. We also filter out
# any experimental default push rules that aren't enabled.
rules = [
rule
for rule in list_with_base_rules(ruleslist)
if _is_experimental_rule_enabled(rule["rule_id"], experimental_config)
ruleslist = [
PushRule(
rule_id=rawrule["rule_id"],
priority_class=rawrule["priority_class"],
conditions=db_to_json(rawrule["conditions"]),
actions=db_to_json(rawrule["actions"]),
)
for rawrule in rawrules
]
for i, rule in enumerate(rules):
rule_id = rule["rule_id"]
push_rules = compile_push_rules(ruleslist)
if rule_id not in enabled_map:
continue
if rule.get("enabled", True) == bool(enabled_map[rule_id]):
continue
filtered_rules = FilteredPushRules(push_rules, enabled_map, experimental_config)
# Rules are cached across users.
rule = dict(rule)
rule["enabled"] = bool(enabled_map[rule_id])
rules[i] = rule
return rules
return filtered_rules
# The ABCMeta metaclass ensures that it cannot be instantiated without
@ -162,7 +144,7 @@ class PushRulesWorkerStore(
raise NotImplementedError()
@cached(max_entries=5000)
async def get_push_rules_for_user(self, user_id: str) -> List[JsonDict]:
async def get_push_rules_for_user(self, user_id: str) -> FilteredPushRules:
rows = await self.db_pool.simple_select_list(
table="push_rules",
keyvalues={"user_name": user_id},
@ -216,11 +198,11 @@ class PushRulesWorkerStore(
@cachedList(cached_method_name="get_push_rules_for_user", list_name="user_ids")
async def bulk_get_push_rules(
self, user_ids: Collection[str]
) -> Dict[str, List[JsonDict]]:
) -> Dict[str, FilteredPushRules]:
if not user_ids:
return {}
results: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids}
raw_rules: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids}
rows = await self.db_pool.simple_select_many_batch(
table="push_rules",
@ -234,11 +216,13 @@ class PushRulesWorkerStore(
rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
for row in rows:
results.setdefault(row["user_name"], []).append(row)
raw_rules.setdefault(row["user_name"], []).append(row)
enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids)
for user_id, rules in results.items():
results: Dict[str, FilteredPushRules] = {}
for user_id, rules in raw_rules.items():
results[user_id] = _load_rules(
rules, enabled_map_by_user.get(user_id, {}), self.hs.config.experimental
)
@ -345,8 +329,8 @@ class PushRuleStore(PushRulesWorkerStore):
user_id: str,
rule_id: str,
priority_class: int,
conditions: List[Dict[str, str]],
actions: List[Union[JsonDict, str]],
conditions: Sequence[Mapping[str, str]],
actions: Sequence[Union[Mapping[str, Any], str]],
before: Optional[str] = None,
after: Optional[str] = None,
) -> None:
@ -817,7 +801,7 @@ class PushRuleStore(PushRulesWorkerStore):
return self._push_rules_stream_id_gen.get_current_token()
async def copy_push_rule_from_room_to_room(
self, new_room_id: str, user_id: str, rule: dict
self, new_room_id: str, user_id: str, rule: PushRule
) -> None:
"""Copy a single push rule from one room to another for a specific user.
@ -827,21 +811,27 @@ class PushRuleStore(PushRulesWorkerStore):
rule: A push rule.
"""
# Create new rule id
rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1])
rule_id_scope = "/".join(rule.rule_id.split("/")[:-1])
new_rule_id = rule_id_scope + "/" + new_room_id
new_conditions = []
# Change room id in each condition
for condition in rule.get("conditions", []):
for condition in rule.conditions:
new_condition = condition
if condition.get("key") == "room_id":
condition["pattern"] = new_room_id
new_condition = dict(condition)
new_condition["pattern"] = new_room_id
new_conditions.append(new_condition)
# Add the rule for the new room
await self.add_push_rule(
user_id=user_id,
rule_id=new_rule_id,
priority_class=rule["priority_class"],
conditions=rule["conditions"],
actions=rule["actions"],
priority_class=rule.priority_class,
conditions=new_conditions,
actions=rule.actions,
)
async def copy_push_rules_from_room_to_room_for_user(
@ -859,8 +849,11 @@ class PushRuleStore(PushRulesWorkerStore):
user_push_rules = await self.get_push_rules_for_user(user_id)
# Get rules relating to the old room and copy them to the new room
for rule in user_push_rules:
conditions = rule.get("conditions", [])
for rule, enabled in user_push_rules:
if not enabled:
continue
conditions = rule.conditions
if any(
(c.get("key") == "room_id" and c.get("pattern") == old_room_id)
for c in conditions