mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-02 11:06:07 -04:00
Convert simple_select_one and simple_select_one_onecol to async (#8162)
This commit is contained in:
parent
56efa9ec71
commit
4c6c56dc58
25 changed files with 220 additions and 113 deletions
|
@ -15,7 +15,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import Dict, Iterable, List, Optional, Set, Tuple
|
||||
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
from synapse.api.errors import Codes, StoreError
|
||||
from synapse.logging.opentracing import (
|
||||
|
@ -47,7 +47,7 @@ BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
|
|||
|
||||
|
||||
class DeviceWorkerStore(SQLBaseStore):
|
||||
def get_device(self, user_id: str, device_id: str):
|
||||
async def get_device(self, user_id: str, device_id: str) -> Dict[str, Any]:
|
||||
"""Retrieve a device. Only returns devices that are not marked as
|
||||
hidden.
|
||||
|
||||
|
@ -55,11 +55,11 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
user_id: The ID of the user which owns the device
|
||||
device_id: The ID of the device to retrieve
|
||||
Returns:
|
||||
defer.Deferred for a dict containing the device information
|
||||
A dict containing the device information
|
||||
Raises:
|
||||
StoreError: if the device is not found
|
||||
"""
|
||||
return self.db_pool.simple_select_one(
|
||||
return await self.db_pool.simple_select_one(
|
||||
table="devices",
|
||||
keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False},
|
||||
retcols=("user_id", "device_id", "display_name"),
|
||||
|
@ -656,11 +656,13 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def get_device_list_last_stream_id_for_remote(self, user_id: str):
|
||||
async def get_device_list_last_stream_id_for_remote(
|
||||
self, user_id: str
|
||||
) -> Optional[Any]:
|
||||
"""Get the last stream_id we got for a user. May be None if we haven't
|
||||
got any information for them.
|
||||
"""
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="device_lists_remote_extremeties",
|
||||
keyvalues={"user_id": user_id},
|
||||
retcol="stream_id",
|
||||
|
|
|
@ -59,8 +59,8 @@ class DirectoryWorkerStore(SQLBaseStore):
|
|||
|
||||
return RoomAliasMapping(room_id, room_alias.to_string(), servers)
|
||||
|
||||
def get_room_alias_creator(self, room_alias):
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
async def get_room_alias_creator(self, room_alias: str) -> str:
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="room_aliases",
|
||||
keyvalues={"room_alias": room_alias},
|
||||
retcol="creator",
|
||||
|
|
|
@ -223,15 +223,15 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
|
||||
return ret
|
||||
|
||||
def count_e2e_room_keys(self, user_id, version):
|
||||
async def count_e2e_room_keys(self, user_id: str, version: str) -> int:
|
||||
"""Get the number of keys in a backup version.
|
||||
|
||||
Args:
|
||||
user_id (str): the user whose backup we're querying
|
||||
version (str): the version ID of the backup we're querying about
|
||||
user_id: the user whose backup we're querying
|
||||
version: the version ID of the backup we're querying about
|
||||
"""
|
||||
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="e2e_room_keys",
|
||||
keyvalues={"user_id": user_id, "version": version},
|
||||
retcol="COUNT(*)",
|
||||
|
|
|
@ -119,19 +119,19 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
|
||||
super().process_replication_rows(stream_name, instance_name, token, rows)
|
||||
|
||||
def get_received_ts(self, event_id):
|
||||
async def get_received_ts(self, event_id: str) -> Optional[int]:
|
||||
"""Get received_ts (when it was persisted) for the event.
|
||||
|
||||
Raises an exception for unknown events.
|
||||
|
||||
Args:
|
||||
event_id (str)
|
||||
event_id: The event ID to query.
|
||||
|
||||
Returns:
|
||||
Deferred[int|None]: Timestamp in milliseconds, or None for events
|
||||
that were persisted before received_ts was implemented.
|
||||
Timestamp in milliseconds, or None for events that were persisted
|
||||
before received_ts was implemented.
|
||||
"""
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="events",
|
||||
keyvalues={"event_id": event_id},
|
||||
retcol="received_ts",
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from typing import List, Optional, Tuple
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
|
@ -28,8 +28,8 @@ _DEFAULT_ROLE_ID = ""
|
|||
|
||||
|
||||
class GroupServerWorkerStore(SQLBaseStore):
|
||||
def get_group(self, group_id):
|
||||
return self.db_pool.simple_select_one(
|
||||
async def get_group(self, group_id: str) -> Optional[Dict[str, Any]]:
|
||||
return await self.db_pool.simple_select_one(
|
||||
table="groups",
|
||||
keyvalues={"group_id": group_id},
|
||||
retcols=(
|
||||
|
@ -351,8 +351,10 @@ class GroupServerWorkerStore(SQLBaseStore):
|
|||
)
|
||||
return bool(result)
|
||||
|
||||
def is_user_admin_in_group(self, group_id, user_id):
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
async def is_user_admin_in_group(
|
||||
self, group_id: str, user_id: str
|
||||
) -> Optional[bool]:
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="group_users",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcol="is_admin",
|
||||
|
@ -360,10 +362,12 @@ class GroupServerWorkerStore(SQLBaseStore):
|
|||
desc="is_user_admin_in_group",
|
||||
)
|
||||
|
||||
def is_user_invited_to_local_group(self, group_id, user_id):
|
||||
async def is_user_invited_to_local_group(
|
||||
self, group_id: str, user_id: str
|
||||
) -> Optional[bool]:
|
||||
"""Has the group server invited a user?
|
||||
"""
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="group_invites",
|
||||
keyvalues={"group_id": group_id, "user_id": user_id},
|
||||
retcol="user_id",
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.database import DatabasePool
|
||||
|
||||
|
@ -37,12 +39,13 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
|||
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||
super(MediaRepositoryStore, self).__init__(database, db_conn, hs)
|
||||
|
||||
def get_local_media(self, media_id):
|
||||
async def get_local_media(self, media_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get the metadata for a local piece of media
|
||||
|
||||
Returns:
|
||||
None if the media_id doesn't exist.
|
||||
"""
|
||||
return self.db_pool.simple_select_one(
|
||||
return await self.db_pool.simple_select_one(
|
||||
"local_media_repository",
|
||||
{"media_id": media_id},
|
||||
(
|
||||
|
@ -191,8 +194,10 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
|||
desc="store_local_thumbnail",
|
||||
)
|
||||
|
||||
def get_cached_remote_media(self, origin, media_id):
|
||||
return self.db_pool.simple_select_one(
|
||||
async def get_cached_remote_media(
|
||||
self, origin, media_id: str
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
return await self.db_pool.simple_select_one(
|
||||
"remote_media_cache",
|
||||
{"media_origin": origin, "media_id": media_id},
|
||||
(
|
||||
|
|
|
@ -99,17 +99,18 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
|
|||
return users
|
||||
|
||||
@cached(num_args=1)
|
||||
def user_last_seen_monthly_active(self, user_id):
|
||||
async def user_last_seen_monthly_active(self, user_id: str) -> int:
|
||||
"""
|
||||
Checks if a given user is part of the monthly active user group
|
||||
Arguments:
|
||||
user_id (str): user to add/update
|
||||
Return:
|
||||
Deferred[int] : timestamp since last seen, None if never seen
|
||||
Checks if a given user is part of the monthly active user group
|
||||
|
||||
Arguments:
|
||||
user_id: user to add/update
|
||||
|
||||
Return:
|
||||
Timestamp since last seen, None if never seen
|
||||
"""
|
||||
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="monthly_active_users",
|
||||
keyvalues={"user_id": user_id},
|
||||
retcol="timestamp",
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
|
@ -19,7 +20,7 @@ from synapse.storage.databases.main.roommember import ProfileInfo
|
|||
|
||||
|
||||
class ProfileWorkerStore(SQLBaseStore):
|
||||
async def get_profileinfo(self, user_localpart):
|
||||
async def get_profileinfo(self, user_localpart: str) -> ProfileInfo:
|
||||
try:
|
||||
profile = await self.db_pool.simple_select_one(
|
||||
table="profiles",
|
||||
|
@ -38,24 +39,26 @@ class ProfileWorkerStore(SQLBaseStore):
|
|||
avatar_url=profile["avatar_url"], display_name=profile["displayname"]
|
||||
)
|
||||
|
||||
def get_profile_displayname(self, user_localpart):
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
async def get_profile_displayname(self, user_localpart: str) -> str:
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="profiles",
|
||||
keyvalues={"user_id": user_localpart},
|
||||
retcol="displayname",
|
||||
desc="get_profile_displayname",
|
||||
)
|
||||
|
||||
def get_profile_avatar_url(self, user_localpart):
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
async def get_profile_avatar_url(self, user_localpart: str) -> str:
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="profiles",
|
||||
keyvalues={"user_id": user_localpart},
|
||||
retcol="avatar_url",
|
||||
desc="get_profile_avatar_url",
|
||||
)
|
||||
|
||||
def get_from_remote_profile_cache(self, user_id):
|
||||
return self.db_pool.simple_select_one(
|
||||
async def get_from_remote_profile_cache(
|
||||
self, user_id: str
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
return await self.db_pool.simple_select_one(
|
||||
table="remote_profile_cache",
|
||||
keyvalues={"user_id": user_id},
|
||||
retcols=("displayname", "avatar_url"),
|
||||
|
|
|
@ -71,8 +71,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
@cached(num_args=3)
|
||||
def get_last_receipt_event_id_for_user(self, user_id, room_id, receipt_type):
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
async def get_last_receipt_event_id_for_user(
|
||||
self, user_id: str, room_id: str, receipt_type: str
|
||||
) -> Optional[str]:
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="receipts_linearized",
|
||||
keyvalues={
|
||||
"room_id": room_id,
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
import logging
|
||||
import re
|
||||
from typing import Awaitable, Dict, List, Optional
|
||||
from typing import Any, Awaitable, Dict, List, Optional
|
||||
|
||||
from synapse.api.constants import UserTypes
|
||||
from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
|
||||
|
@ -46,8 +46,8 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
@cached()
|
||||
def get_user_by_id(self, user_id):
|
||||
return self.db_pool.simple_select_one(
|
||||
async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
|
||||
return await self.db_pool.simple_select_one(
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
retcols=[
|
||||
|
@ -1259,12 +1259,12 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
|
|||
desc="del_user_pending_deactivation",
|
||||
)
|
||||
|
||||
def get_user_pending_deactivation(self):
|
||||
async def get_user_pending_deactivation(self) -> Optional[str]:
|
||||
"""
|
||||
Gets one user from the table of users waiting to be parted from all the rooms
|
||||
they're in.
|
||||
"""
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
"users_pending_deactivation",
|
||||
keyvalues={},
|
||||
retcol="user_id",
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
|
||||
|
@ -21,8 +22,8 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class RejectionsStore(SQLBaseStore):
|
||||
def get_rejection_reason(self, event_id):
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
async def get_rejection_reason(self, event_id: str) -> Optional[str]:
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="rejections",
|
||||
retcol="reason",
|
||||
keyvalues={"event_id": event_id},
|
||||
|
|
|
@ -73,15 +73,15 @@ class RoomWorkerStore(SQLBaseStore):
|
|||
|
||||
self.config = hs.config
|
||||
|
||||
def get_room(self, room_id):
|
||||
async def get_room(self, room_id: str) -> dict:
|
||||
"""Retrieve a room.
|
||||
|
||||
Args:
|
||||
room_id (str): The ID of the room to retrieve.
|
||||
room_id: The ID of the room to retrieve.
|
||||
Returns:
|
||||
A dict containing the room information, or None if the room is unknown.
|
||||
"""
|
||||
return self.db_pool.simple_select_one(
|
||||
return await self.db_pool.simple_select_one(
|
||||
table="rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcols=("room_id", "is_public", "creator"),
|
||||
|
@ -330,8 +330,8 @@ class RoomWorkerStore(SQLBaseStore):
|
|||
return ret_val
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def is_room_blocked(self, room_id):
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
async def is_room_blocked(self, room_id: str) -> Optional[bool]:
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="blocked_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcol="1",
|
||||
|
|
|
@ -260,8 +260,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
return event.content.get("canonical_alias")
|
||||
|
||||
@cached(max_entries=50000)
|
||||
def _get_state_group_for_event(self, event_id):
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
async def _get_state_group_for_event(self, event_id: str) -> Optional[int]:
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="event_to_state_groups",
|
||||
keyvalues={"event_id": event_id},
|
||||
retcol="state_group",
|
||||
|
|
|
@ -211,11 +211,11 @@ class StatsStore(StateDeltasStore):
|
|||
|
||||
return len(rooms_to_work_on)
|
||||
|
||||
def get_stats_positions(self):
|
||||
async def get_stats_positions(self) -> int:
|
||||
"""
|
||||
Returns the stats processor positions.
|
||||
"""
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="stats_incremental_position",
|
||||
keyvalues={},
|
||||
retcol="stream_id",
|
||||
|
@ -300,7 +300,7 @@ class StatsStore(StateDeltasStore):
|
|||
return slice_list
|
||||
|
||||
@cached()
|
||||
def get_earliest_token_for_stats(self, stats_type, id):
|
||||
async def get_earliest_token_for_stats(self, stats_type: str, id: str) -> int:
|
||||
"""
|
||||
Fetch the "earliest token". This is used by the room stats delta
|
||||
processor to ignore deltas that have been processed between the
|
||||
|
@ -308,11 +308,11 @@ class StatsStore(StateDeltasStore):
|
|||
being calculated.
|
||||
|
||||
Returns:
|
||||
Deferred[int]
|
||||
The earliest token.
|
||||
"""
|
||||
table, id_col = TYPE_TO_TABLE[stats_type]
|
||||
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
"%s_current" % (table,),
|
||||
keyvalues={id_col: id},
|
||||
retcol="completed_delta_stream_id",
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
import logging
|
||||
import re
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from synapse.api.constants import EventTypes, JoinRules
|
||||
from synapse.storage.database import DatabasePool
|
||||
|
@ -527,8 +528,8 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|
|||
)
|
||||
|
||||
@cached()
|
||||
def get_user_in_directory(self, user_id):
|
||||
return self.db_pool.simple_select_one(
|
||||
async def get_user_in_directory(self, user_id: str) -> Optional[Dict[str, Any]]:
|
||||
return await self.db_pool.simple_select_one(
|
||||
table="user_directory",
|
||||
keyvalues={"user_id": user_id},
|
||||
retcols=("display_name", "avatar_url"),
|
||||
|
@ -663,8 +664,8 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
|
|||
users.update(rows)
|
||||
return list(users)
|
||||
|
||||
def get_user_directory_stream_pos(self):
|
||||
return self.db_pool.simple_select_one_onecol(
|
||||
async def get_user_directory_stream_pos(self) -> int:
|
||||
return await self.db_pool.simple_select_one_onecol(
|
||||
table="user_directory_stream_pos",
|
||||
keyvalues={},
|
||||
retcol="stream_id",
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue