Another batch of type annotations (#12726)

This commit is contained in:
David Robertson 2022-05-13 12:35:31 +01:00 committed by GitHub
parent 39bed28b28
commit aec69d2481
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 144 additions and 79 deletions

View file

@ -24,6 +24,7 @@ from typing import (
Mapping,
Match,
MutableMapping,
NoReturn,
Optional,
Set,
Tuple,
@ -35,6 +36,7 @@ from typing import (
import attr
from frozendict import frozendict
from signedjson.key import decode_verify_key_bytes
from signedjson.types import VerifyKey
from typing_extensions import TypedDict
from unpaddedbase64 import decode_base64
from zope.interface import Interface
@ -55,6 +57,7 @@ from synapse.util.stringutils import parse_and_validate_server_name
if TYPE_CHECKING:
from synapse.appservice.api import ApplicationService
from synapse.storage.databases.main import DataStore, PurgeEventsStore
from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
# Define a state map type from type/state_key to T (usually an event ID or
# event)
@ -114,7 +117,7 @@ class Requester:
app_service: Optional["ApplicationService"]
authenticated_entity: str
def serialize(self):
def serialize(self) -> Dict[str, Any]:
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`
@ -132,7 +135,9 @@ class Requester:
}
@staticmethod
def deserialize(store, input):
def deserialize(
store: "ApplicationServiceWorkerStore", input: Dict[str, Any]
) -> "Requester":
"""Converts a dict that was produced by `serialize` back into a
Requester.
@ -236,10 +241,10 @@ class DomainSpecificString(metaclass=abc.ABCMeta):
domain: str
# Because this is a frozen class, it is deeply immutable.
def __copy__(self):
def __copy__(self: DS) -> DS:
return self
def __deepcopy__(self, memo):
def __deepcopy__(self: DS, memo: Dict[str, object]) -> DS:
return self
@classmethod
@ -729,12 +734,14 @@ class StreamToken:
)
@property
def room_stream_id(self):
def room_stream_id(self) -> int:
return self.room_key.stream
def copy_and_advance(self, key, new_value) -> "StreamToken":
def copy_and_advance(self, key: str, new_value: Any) -> "StreamToken":
"""Advance the given key in the token to a new value if and only if the
new value is after the old value.
:raises TypeError: if `key` is not the one of the keys tracked by a StreamToken.
"""
if key == "room_key":
new_token = self.copy_and_replace(
@ -751,7 +758,7 @@ class StreamToken:
else:
return self
def copy_and_replace(self, key, new_value) -> "StreamToken":
def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken":
return attr.evolve(self, **{key: new_value})
@ -793,14 +800,14 @@ class ThirdPartyInstanceID:
# Deny iteration because it will bite you if you try to create a singleton
# set by:
# users = set(user)
def __iter__(self):
def __iter__(self) -> NoReturn:
raise ValueError("Attempted to iterate a %s" % (type(self).__name__,))
# Because this class is a frozen class, it is deeply immutable.
def __copy__(self):
def __copy__(self) -> "ThirdPartyInstanceID":
return self
def __deepcopy__(self, memo):
def __deepcopy__(self, memo: Dict[str, object]) -> "ThirdPartyInstanceID":
return self
@classmethod
@ -852,25 +859,28 @@ class DeviceListUpdates:
return bool(self.changed or self.left)
def get_verify_key_from_cross_signing_key(key_info):
def get_verify_key_from_cross_signing_key(
key_info: Mapping[str, Any]
) -> Tuple[str, VerifyKey]:
"""Get the key ID and signedjson verify key from a cross-signing key dict
Args:
key_info (dict): a cross-signing key dict, which must have a "keys"
key_info: a cross-signing key dict, which must have a "keys"
property that has exactly one item in it
Returns:
(str, VerifyKey): the key ID and verify key for the cross-signing key
the key ID and verify key for the cross-signing key
"""
# make sure that exactly one key is provided
# make sure that a `keys` field is provided
if "keys" not in key_info:
raise ValueError("Invalid key")
keys = key_info["keys"]
if len(keys) != 1:
raise ValueError("Invalid key")
# and return that one key
for key_id, key_data in keys.items():
# and that it contains exactly one key
if len(keys) == 1:
key_id, key_data = next(iter(keys.items()))
return key_id, decode_verify_key_bytes(key_id, decode_base64(key_data))
else:
raise ValueError("Invalid key")
@attr.s(auto_attribs=True, frozen=True, slots=True)