Avoid fetching unused account data in sync. (#14973)

The per-room account data is no longer unconditionally
fetched, even if all rooms will be filtered out.

Global account data will not be fetched if it will all be
filtered out.
This commit is contained in:
Patrick Cloke 2023-02-10 09:22:16 -05:00 committed by GitHub
parent d793fcd241
commit cf5233b783
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 176 additions and 90 deletions

1
changelog.d/14973.misc Normal file
View File

@ -0,0 +1 @@
Improve performance of `/sync` in a few situations.

View File

@ -219,9 +219,13 @@ class FilterCollection:
self._room_timeline_filter = Filter(hs, room_filter_json.get("timeline", {})) self._room_timeline_filter = Filter(hs, room_filter_json.get("timeline", {}))
self._room_state_filter = Filter(hs, room_filter_json.get("state", {})) self._room_state_filter = Filter(hs, room_filter_json.get("state", {}))
self._room_ephemeral_filter = Filter(hs, room_filter_json.get("ephemeral", {})) self._room_ephemeral_filter = Filter(hs, room_filter_json.get("ephemeral", {}))
self._room_account_data = Filter(hs, room_filter_json.get("account_data", {})) self._room_account_data_filter = Filter(
hs, room_filter_json.get("account_data", {})
)
self._presence_filter = Filter(hs, filter_json.get("presence", {})) self._presence_filter = Filter(hs, filter_json.get("presence", {}))
self._account_data = Filter(hs, filter_json.get("account_data", {})) self._global_account_data_filter = Filter(
hs, filter_json.get("account_data", {})
)
self.include_leave = filter_json.get("room", {}).get("include_leave", False) self.include_leave = filter_json.get("room", {}).get("include_leave", False)
self.event_fields = filter_json.get("event_fields", []) self.event_fields = filter_json.get("event_fields", [])
@ -256,8 +260,10 @@ class FilterCollection:
) -> List[UserPresenceState]: ) -> List[UserPresenceState]:
return await self._presence_filter.filter(presence_states) return await self._presence_filter.filter(presence_states)
async def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]: async def filter_global_account_data(
return await self._account_data.filter(events) self, events: Iterable[JsonDict]
) -> List[JsonDict]:
return await self._global_account_data_filter.filter(events)
async def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]: async def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]:
return await self._room_state_filter.filter( return await self._room_state_filter.filter(
@ -279,7 +285,7 @@ class FilterCollection:
async def filter_room_account_data( async def filter_room_account_data(
self, events: Iterable[JsonDict] self, events: Iterable[JsonDict]
) -> List[JsonDict]: ) -> List[JsonDict]:
return await self._room_account_data.filter( return await self._room_account_data_filter.filter(
await self._room_filter.filter(events) await self._room_filter.filter(events)
) )
@ -292,6 +298,13 @@ class FilterCollection:
or self._presence_filter.filters_all_senders() or self._presence_filter.filters_all_senders()
) )
def blocks_all_global_account_data(self) -> bool:
"""True if all global acount data will be filtered out."""
return (
self._global_account_data_filter.filters_all_types()
or self._global_account_data_filter.filters_all_senders()
)
def blocks_all_room_ephemeral(self) -> bool: def blocks_all_room_ephemeral(self) -> bool:
return ( return (
self._room_ephemeral_filter.filters_all_types() self._room_ephemeral_filter.filters_all_types()
@ -299,6 +312,13 @@ class FilterCollection:
or self._room_ephemeral_filter.filters_all_rooms() or self._room_ephemeral_filter.filters_all_rooms()
) )
def blocks_all_room_account_data(self) -> bool:
return (
self._room_account_data_filter.filters_all_types()
or self._room_account_data_filter.filters_all_senders()
or self._room_account_data_filter.filters_all_rooms()
)
def blocks_all_room_timeline(self) -> bool: def blocks_all_room_timeline(self) -> bool:
return ( return (
self._room_timeline_filter.filters_all_types() self._room_timeline_filter.filters_all_types()

View File

@ -343,10 +343,12 @@ class AccountDataEventSource(EventSource[int, JsonDict]):
} }
) )
( account_data = await self.store.get_updated_global_account_data_for_user(
account_data, user_id, last_stream_id
room_account_data, )
) = await self.store.get_updated_account_data_for_user(user_id, last_stream_id) room_account_data = await self.store.get_updated_room_account_data_for_user(
user_id, last_stream_id
)
for account_data_type, content in account_data.items(): for account_data_type, content in account_data.items():
results.append({"type": account_data_type, "content": content}) results.append({"type": account_data_type, "content": content})

View File

@ -154,9 +154,8 @@ class InitialSyncHandler:
tags_by_room = await self.store.get_tags_for_user(user_id) tags_by_room = await self.store.get_tags_for_user(user_id)
account_data, account_data_by_room = await self.store.get_account_data_for_user( account_data = await self.store.get_global_account_data_for_user(user_id)
user_id account_data_by_room = await self.store.get_room_account_data_for_user(user_id)
)
public_room_ids = await self.store.get_public_room_ids() public_room_ids = await self.store.get_public_room_ids()

View File

@ -484,7 +484,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
user_id: The user's ID. user_id: The user's ID.
""" """
# Retrieve user account data for predecessor room # Retrieve user account data for predecessor room
user_account_data, _ = await self.store.get_account_data_for_user(user_id) user_account_data = await self.store.get_global_account_data_for_user(user_id)
# Copy direct message state if applicable # Copy direct message state if applicable
direct_rooms = user_account_data.get(AccountDataTypes.DIRECT, {}) direct_rooms = user_account_data.get(AccountDataTypes.DIRECT, {})

View File

@ -1444,9 +1444,9 @@ class SyncHandler:
logger.debug("Fetching account data") logger.debug("Fetching account data")
account_data_by_room = await self._generate_sync_entry_for_account_data( # Global account data is included if it is not filtered out.
sync_result_builder if not sync_config.filter_collection.blocks_all_global_account_data():
) await self._generate_sync_entry_for_account_data(sync_result_builder)
# Presence data is included if the server has it enabled and not filtered out. # Presence data is included if the server has it enabled and not filtered out.
include_presence_data = bool( include_presence_data = bool(
@ -1472,9 +1472,7 @@ class SyncHandler:
( (
newly_joined_rooms, newly_joined_rooms,
newly_left_rooms, newly_left_rooms,
) = await self._generate_sync_entry_for_rooms( ) = await self._generate_sync_entry_for_rooms(sync_result_builder)
sync_result_builder, account_data_by_room
)
# Work out which users have joined or left rooms we're in. We use this # Work out which users have joined or left rooms we're in. We use this
# to build the presence and device_list parts of the sync response in # to build the presence and device_list parts of the sync response in
@ -1717,35 +1715,29 @@ class SyncHandler:
async def _generate_sync_entry_for_account_data( async def _generate_sync_entry_for_account_data(
self, sync_result_builder: "SyncResultBuilder" self, sync_result_builder: "SyncResultBuilder"
) -> Dict[str, Dict[str, JsonDict]]: ) -> None:
"""Generates the account data portion of the sync response. """Generates the global account data portion of the sync response.
Account data (called "Client Config" in the spec) can be set either globally Account data (called "Client Config" in the spec) can be set either globally
or for a specific room. Account data consists of a list of events which or for a specific room. Account data consists of a list of events which
accumulate state, much like a room. accumulate state, much like a room.
This function retrieves global and per-room account data. The former is written This function retrieves global account data and writes it to the given
to the given `sync_result_builder`. The latter is returned directly, to be `sync_result_builder`. See `_generate_sync_entry_for_rooms` for handling
later written to the `sync_result_builder` on a room-by-room basis. of per-room account data.
Args: Args:
sync_result_builder sync_result_builder
Returns:
A dictionary whose keys (room ids) map to the per room account data for that
room.
""" """
sync_config = sync_result_builder.sync_config sync_config = sync_result_builder.sync_config
user_id = sync_result_builder.sync_config.user.to_string() user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token since_token = sync_result_builder.since_token
if since_token and not sync_result_builder.full_state: if since_token and not sync_result_builder.full_state:
# TODO Do not fetch room account data if it will be unused. global_account_data = (
( await self.store.get_updated_global_account_data_for_user(
global_account_data, user_id, since_token.account_data_key
account_data_by_room, )
) = await self.store.get_updated_account_data_for_user(
user_id, since_token.account_data_key
) )
push_rules_changed = await self.store.have_push_rules_changed_for_user( push_rules_changed = await self.store.have_push_rules_changed_for_user(
@ -1758,28 +1750,26 @@ class SyncHandler:
sync_config.user sync_config.user
) )
else: else:
# TODO Do not fetch room account data if it will be unused. all_global_account_data = await self.store.get_global_account_data_for_user(
( user_id
global_account_data, )
account_data_by_room,
) = await self.store.get_account_data_for_user(sync_config.user.to_string())
global_account_data = dict(global_account_data) global_account_data = dict(all_global_account_data)
global_account_data["m.push_rules"] = await self.push_rules_for_user( global_account_data["m.push_rules"] = await self.push_rules_for_user(
sync_config.user sync_config.user
) )
account_data_for_user = await sync_config.filter_collection.filter_account_data( account_data_for_user = (
[ await sync_config.filter_collection.filter_global_account_data(
{"type": account_data_type, "content": content} [
for account_data_type, content in global_account_data.items() {"type": account_data_type, "content": content}
] for account_data_type, content in global_account_data.items()
]
)
) )
sync_result_builder.account_data = account_data_for_user sync_result_builder.account_data = account_data_for_user
return account_data_by_room
async def _generate_sync_entry_for_presence( async def _generate_sync_entry_for_presence(
self, self,
sync_result_builder: "SyncResultBuilder", sync_result_builder: "SyncResultBuilder",
@ -1839,9 +1829,7 @@ class SyncHandler:
sync_result_builder.presence = presence sync_result_builder.presence = presence
async def _generate_sync_entry_for_rooms( async def _generate_sync_entry_for_rooms(
self, self, sync_result_builder: "SyncResultBuilder"
sync_result_builder: "SyncResultBuilder",
account_data_by_room: Dict[str, Dict[str, JsonDict]],
) -> Tuple[AbstractSet[str], AbstractSet[str]]: ) -> Tuple[AbstractSet[str], AbstractSet[str]]:
"""Generates the rooms portion of the sync response. Populates the """Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result. `sync_result_builder` with the result.
@ -1852,7 +1840,6 @@ class SyncHandler:
Args: Args:
sync_result_builder sync_result_builder
account_data_by_room: Dictionary of per room account data
Returns: Returns:
Returns a 2-tuple describing rooms the user has joined or left. Returns a 2-tuple describing rooms the user has joined or left.
@ -1865,9 +1852,30 @@ class SyncHandler:
since_token = sync_result_builder.since_token since_token = sync_result_builder.since_token
user_id = sync_result_builder.sync_config.user.to_string() user_id = sync_result_builder.sync_config.user.to_string()
blocks_all_rooms = (
sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
)
# 0. Start by fetching room account data (if required).
if (
blocks_all_rooms
or sync_result_builder.sync_config.filter_collection.blocks_all_room_account_data()
):
account_data_by_room: Mapping[str, Mapping[str, JsonDict]] = {}
elif since_token and not sync_result_builder.full_state:
account_data_by_room = (
await self.store.get_updated_room_account_data_for_user(
user_id, since_token.account_data_key
)
)
else:
account_data_by_room = await self.store.get_room_account_data_for_user(
user_id
)
# 1. Start by fetching all ephemeral events in rooms we've joined (if required). # 1. Start by fetching all ephemeral events in rooms we've joined (if required).
block_all_room_ephemeral = ( block_all_room_ephemeral = (
sync_result_builder.sync_config.filter_collection.blocks_all_rooms() blocks_all_rooms
or sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral() or sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
) )
if block_all_room_ephemeral: if block_all_room_ephemeral:
@ -2294,7 +2302,7 @@ class SyncHandler:
room_builder: "RoomSyncResultBuilder", room_builder: "RoomSyncResultBuilder",
ephemeral: List[JsonDict], ephemeral: List[JsonDict],
tags: Optional[Dict[str, Dict[str, Any]]], tags: Optional[Dict[str, Dict[str, Any]]],
account_data: Dict[str, JsonDict], account_data: Mapping[str, JsonDict],
always_include: bool = False, always_include: bool = False,
) -> None: ) -> None:
"""Populates the `joined` and `archived` section of `sync_result_builder` """Populates the `joined` and `archived` section of `sync_result_builder`

View File

@ -1192,7 +1192,8 @@ class AccountDataRestServlet(RestServlet):
if not await self._store.get_user_by_id(user_id): if not await self._store.get_user_by_id(user_id):
raise NotFoundError("User not found") raise NotFoundError("User not found")
global_data, by_room_data = await self._store.get_account_data_for_user(user_id) global_data = await self._store.get_global_account_data_for_user(user_id)
by_room_data = await self._store.get_room_account_data_for_user(user_id)
return HTTPStatus.OK, { return HTTPStatus.OK, {
"account_data": { "account_data": {
"global": global_data, "global": global_data,

View File

@ -21,6 +21,7 @@ from typing import (
FrozenSet, FrozenSet,
Iterable, Iterable,
List, List,
Mapping,
Optional, Optional,
Tuple, Tuple,
cast, cast,
@ -122,25 +123,25 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
return self._account_data_id_gen.get_current_token() return self._account_data_id_gen.get_current_token()
@cached() @cached()
async def get_account_data_for_user( async def get_global_account_data_for_user(
self, user_id: str self, user_id: str
) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: ) -> Mapping[str, JsonDict]:
""" """
Get all the client account_data for a user. Get all the global client account_data for a user.
If experimental MSC3391 support is enabled, any entries with an empty If experimental MSC3391 support is enabled, any entries with an empty
content body are excluded; as this means they have been deleted. content body are excluded; as this means they have been deleted.
Args: Args:
user_id: The user to get the account_data for. user_id: The user to get the account_data for.
Returns: Returns:
A 2-tuple of a dict of global account_data and a dict mapping from The global account_data.
room_id string to per room account_data dicts.
""" """
def get_account_data_for_user_txn( def get_global_account_data_for_user(
txn: LoggingTransaction, txn: LoggingTransaction,
) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: ) -> Dict[str, JsonDict]:
# The 'content != '{}' condition below prevents us from using # The 'content != '{}' condition below prevents us from using
# `simple_select_list_txn` here, as it doesn't support conditions # `simple_select_list_txn` here, as it doesn't support conditions
# other than 'equals'. # other than 'equals'.
@ -158,10 +159,34 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
txn.execute(sql, (user_id,)) txn.execute(sql, (user_id,))
rows = self.db_pool.cursor_to_dict(txn) rows = self.db_pool.cursor_to_dict(txn)
global_account_data = { return {
row["account_data_type"]: db_to_json(row["content"]) for row in rows row["account_data_type"]: db_to_json(row["content"]) for row in rows
} }
return await self.db_pool.runInteraction(
"get_global_account_data_for_user", get_global_account_data_for_user
)
@cached()
async def get_room_account_data_for_user(
self, user_id: str
) -> Mapping[str, Mapping[str, JsonDict]]:
"""
Get all of the per-room client account_data for a user.
If experimental MSC3391 support is enabled, any entries with an empty
content body are excluded; as this means they have been deleted.
Args:
user_id: The user to get the account_data for.
Returns:
A dict mapping from room_id string to per-room account_data dicts.
"""
def get_room_account_data_for_user_txn(
txn: LoggingTransaction,
) -> Dict[str, Dict[str, JsonDict]]:
# The 'content != '{}' condition below prevents us from using # The 'content != '{}' condition below prevents us from using
# `simple_select_list_txn` here, as it doesn't support conditions # `simple_select_list_txn` here, as it doesn't support conditions
# other than 'equals'. # other than 'equals'.
@ -185,10 +210,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
room_data[row["account_data_type"]] = db_to_json(row["content"]) room_data[row["account_data_type"]] = db_to_json(row["content"])
return global_account_data, by_room return by_room
return await self.db_pool.runInteraction( return await self.db_pool.runInteraction(
"get_account_data_for_user", get_account_data_for_user_txn "get_room_account_data_for_user_txn", get_room_account_data_for_user_txn
) )
@cached(num_args=2, max_entries=5000, tree=True) @cached(num_args=2, max_entries=5000, tree=True)
@ -342,36 +367,61 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
"get_updated_room_account_data", get_updated_room_account_data_txn "get_updated_room_account_data", get_updated_room_account_data_txn
) )
async def get_updated_account_data_for_user( async def get_updated_global_account_data_for_user(
self, user_id: str, stream_id: int self, user_id: str, stream_id: int
) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: ) -> Dict[str, JsonDict]:
"""Get all the client account_data for a that's changed for a user """Get all the global account_data that's changed for a user.
Args: Args:
user_id: The user to get the account_data for. user_id: The user to get the account_data for.
stream_id: The point in the stream since which to get updates stream_id: The point in the stream since which to get updates
Returns: Returns:
A deferred pair of a dict of global account_data and a dict A dict of global account_data.
mapping from room_id string to per room account_data dicts.
""" """
def get_updated_account_data_for_user_txn( def get_updated_global_account_data_for_user(
txn: LoggingTransaction, txn: LoggingTransaction,
) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: ) -> Dict[str, JsonDict]:
sql = ( sql = """
"SELECT account_data_type, content FROM account_data" SELECT account_data_type, content FROM account_data
" WHERE user_id = ? AND stream_id > ?" WHERE user_id = ? AND stream_id > ?
) """
txn.execute(sql, (user_id, stream_id)) txn.execute(sql, (user_id, stream_id))
global_account_data = {row[0]: db_to_json(row[1]) for row in txn} return {row[0]: db_to_json(row[1]) for row in txn}
sql = ( changed = self._account_data_stream_cache.has_entity_changed(
"SELECT room_id, account_data_type, content FROM room_account_data" user_id, int(stream_id)
" WHERE user_id = ? AND stream_id > ?" )
) if not changed:
return {}
return await self.db_pool.runInteraction(
"get_updated_global_account_data_for_user",
get_updated_global_account_data_for_user,
)
async def get_updated_room_account_data_for_user(
self, user_id: str, stream_id: int
) -> Dict[str, Dict[str, JsonDict]]:
"""Get all the room account_data that's changed for a user.
Args:
user_id: The user to get the account_data for.
stream_id: The point in the stream since which to get updates
Returns:
A dict mapping from room_id string to per room account_data dicts.
"""
def get_updated_room_account_data_for_user_txn(
txn: LoggingTransaction,
) -> Dict[str, Dict[str, JsonDict]]:
sql = """
SELECT room_id, account_data_type, content FROM room_account_data
WHERE user_id = ? AND stream_id > ?
"""
txn.execute(sql, (user_id, stream_id)) txn.execute(sql, (user_id, stream_id))
account_data_by_room: Dict[str, Dict[str, JsonDict]] = {} account_data_by_room: Dict[str, Dict[str, JsonDict]] = {}
@ -379,16 +429,17 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
room_account_data = account_data_by_room.setdefault(row[0], {}) room_account_data = account_data_by_room.setdefault(row[0], {})
room_account_data[row[1]] = db_to_json(row[2]) room_account_data[row[1]] = db_to_json(row[2])
return global_account_data, account_data_by_room return account_data_by_room
changed = self._account_data_stream_cache.has_entity_changed( changed = self._account_data_stream_cache.has_entity_changed(
user_id, int(stream_id) user_id, int(stream_id)
) )
if not changed: if not changed:
return {}, {} return {}
return await self.db_pool.runInteraction( return await self.db_pool.runInteraction(
"get_updated_account_data_for_user", get_updated_account_data_for_user_txn "get_updated_room_account_data_for_user",
get_updated_room_account_data_for_user_txn,
) )
@cached(max_entries=5000, iterable=True) @cached(max_entries=5000, iterable=True)
@ -444,7 +495,8 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
self.get_global_account_data_by_type_for_user.invalidate( self.get_global_account_data_by_type_for_user.invalidate(
(row.user_id, row.data_type) (row.user_id, row.data_type)
) )
self.get_account_data_for_user.invalidate((row.user_id,)) self.get_global_account_data_for_user.invalidate((row.user_id,))
self.get_room_account_data_for_user.invalidate((row.user_id,))
self.get_account_data_for_room.invalidate((row.user_id, row.room_id)) self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
self.get_account_data_for_room_and_type.invalidate( self.get_account_data_for_room_and_type.invalidate(
(row.user_id, row.room_id, row.data_type) (row.user_id, row.room_id, row.data_type)
@ -492,7 +544,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
) )
self._account_data_stream_cache.entity_has_changed(user_id, next_id) self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,)) self.get_room_account_data_for_user.invalidate((user_id,))
self.get_account_data_for_room.invalidate((user_id, room_id)) self.get_account_data_for_room.invalidate((user_id, room_id))
self.get_account_data_for_room_and_type.prefill( self.get_account_data_for_room_and_type.prefill(
(user_id, room_id, account_data_type), content (user_id, room_id, account_data_type), content
@ -558,7 +610,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
return None return None
self._account_data_stream_cache.entity_has_changed(user_id, next_id) self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,)) self.get_room_account_data_for_user.invalidate((user_id,))
self.get_account_data_for_room.invalidate((user_id, room_id)) self.get_account_data_for_room.invalidate((user_id, room_id))
self.get_account_data_for_room_and_type.prefill( self.get_account_data_for_room_and_type.prefill(
(user_id, room_id, account_data_type), {} (user_id, room_id, account_data_type), {}
@ -593,7 +645,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
) )
self._account_data_stream_cache.entity_has_changed(user_id, next_id) self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,)) self.get_global_account_data_for_user.invalidate((user_id,))
self.get_global_account_data_by_type_for_user.invalidate( self.get_global_account_data_by_type_for_user.invalidate(
(user_id, account_data_type) (user_id, account_data_type)
) )
@ -761,7 +813,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
return None return None
self._account_data_stream_cache.entity_has_changed(user_id, next_id) self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,)) self.get_global_account_data_for_user.invalidate((user_id,))
self.get_global_account_data_by_type_for_user.prefill( self.get_global_account_data_by_type_for_user.prefill(
(user_id, account_data_type), {} (user_id, account_data_type), {}
) )
@ -822,7 +874,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
txn, self.get_account_data_for_room_and_type, (user_id,) txn, self.get_account_data_for_room_and_type, (user_id,)
) )
self._invalidate_cache_and_stream( self._invalidate_cache_and_stream(
txn, self.get_account_data_for_user, (user_id,) txn, self.get_global_account_data_for_user, (user_id,)
)
self._invalidate_cache_and_stream(
txn, self.get_room_account_data_for_user, (user_id,)
) )
self._invalidate_cache_and_stream( self._invalidate_cache_and_stream(
txn, self.get_global_account_data_by_type_for_user, (user_id,) txn, self.get_global_account_data_by_type_for_user, (user_id,)