Call appservices on modern paths, falling back to legacy paths. (#15317)

This uses the specced /_matrix/app/v1/... paths instead of the
"legacy" paths. If the homeserver receives an error it will retry
using the legacy path.
This commit is contained in:
Patrick Cloke 2023-04-03 13:20:32 -04:00 committed by GitHub
parent 56efa9b167
commit cf2f2934ad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 172 additions and 48 deletions

View file

@ -17,6 +17,8 @@ import urllib.parse
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
Iterable,
List,
@ -24,10 +26,11 @@ from typing import (
Optional,
Sequence,
Tuple,
TypeVar,
)
from prometheus_client import Counter
from typing_extensions import TypeGuard
from typing_extensions import Concatenate, ParamSpec, TypeGuard
from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException, HttpResponseException
@ -78,7 +81,11 @@ sent_todevice_counter = Counter(
HOUR_IN_MS = 60 * 60 * 1000
APP_SERVICE_PREFIX = "/_matrix/app/unstable"
APP_SERVICE_PREFIX = "/_matrix/app/v1"
APP_SERVICE_UNSTABLE_PREFIX = "/_matrix/app/unstable"
P = ParamSpec("P")
R = TypeVar("R")
def _is_valid_3pe_metadata(info: JsonDict) -> bool:
@ -121,6 +128,47 @@ class ApplicationServiceApi(SimpleHttpClient):
hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
)
async def _send_with_fallbacks(
self,
service: "ApplicationService",
prefixes: List[str],
path: str,
func: Callable[Concatenate[str, P], Awaitable[R]],
*args: P.args,
**kwargs: P.kwargs,
) -> R:
"""
Attempt to call an application service with multiple paths, falling back
until one succeeds.
Args:
service: The appliacation service, this provides the base URL.
prefixes: A last of paths to try in order for the requests.
path: A suffix to append to each prefix.
func: The function to call, the first argument will be the full
endpoint to fetch. Other arguments are provided by args/kwargs.
Returns:
The return value of func.
"""
for i, prefix in enumerate(prefixes, start=1):
uri = f"{service.url}{prefix}{path}"
try:
return await func(uri, *args, **kwargs)
except HttpResponseException as e:
# If an error is received that is due to an unrecognised path,
# fallback to next path (if one exists). Otherwise, consider it
# a legitimate error and raise.
if i < len(prefixes) and is_unknown_endpoint(e):
continue
raise
except Exception:
# Unexpected exceptions get sent to the caller.
raise
# The function should always exit via the return or raise above this.
raise RuntimeError("Unexpected fallback behaviour. This should never be seen.")
async def query_user(self, service: "ApplicationService", user_id: str) -> bool:
if service.url is None:
return False
@ -128,10 +176,12 @@ class ApplicationServiceApi(SimpleHttpClient):
# This is required by the configuration.
assert service.hs_token is not None
uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
try:
response = await self.get_json(
uri,
response = await self._send_with_fallbacks(
service,
[APP_SERVICE_PREFIX, ""],
f"/users/{urllib.parse.quote(user_id)}",
self.get_json,
{"access_token": service.hs_token},
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
@ -140,9 +190,9 @@ class ApplicationServiceApi(SimpleHttpClient):
except CodeMessageException as e:
if e.code == 404:
return False
logger.warning("query_user to %s received %s", uri, e.code)
logger.warning("query_user to %s received %s", service.url, e.code)
except Exception as ex:
logger.warning("query_user to %s threw exception %s", uri, ex)
logger.warning("query_user to %s threw exception %s", service.url, ex)
return False
async def query_alias(self, service: "ApplicationService", alias: str) -> bool:
@ -152,21 +202,23 @@ class ApplicationServiceApi(SimpleHttpClient):
# This is required by the configuration.
assert service.hs_token is not None
uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
try:
response = await self.get_json(
uri,
response = await self._send_with_fallbacks(
service,
[APP_SERVICE_PREFIX, ""],
f"/rooms/{urllib.parse.quote(alias)}",
self.get_json,
{"access_token": service.hs_token},
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
if response is not None: # just an empty json object
return True
except CodeMessageException as e:
logger.warning("query_alias to %s received %s", uri, e.code)
logger.warning("query_alias to %s received %s", service.url, e.code)
if e.code == 404:
return False
except Exception as ex:
logger.warning("query_alias to %s threw exception %s", uri, ex)
logger.warning("query_alias to %s threw exception %s", service.url, ex)
return False
async def query_3pe(
@ -188,25 +240,24 @@ class ApplicationServiceApi(SimpleHttpClient):
# This is required by the configuration.
assert service.hs_token is not None
uri = "%s%s/thirdparty/%s/%s" % (
service.url,
APP_SERVICE_PREFIX,
kind,
urllib.parse.quote(protocol),
)
try:
args: Mapping[Any, Any] = {
**fields,
b"access_token": service.hs_token,
}
response = await self.get_json(
uri,
response = await self._send_with_fallbacks(
service,
[APP_SERVICE_PREFIX, APP_SERVICE_UNSTABLE_PREFIX],
f"/thirdparty/{kind}/{urllib.parse.quote(protocol)}",
self.get_json,
args=args,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
if not isinstance(response, list):
logger.warning(
"query_3pe to %s returned an invalid response %r", uri, response
"query_3pe to %s returned an invalid response %r",
service.url,
response,
)
return []
@ -216,12 +267,12 @@ class ApplicationServiceApi(SimpleHttpClient):
ret.append(r)
else:
logger.warning(
"query_3pe to %s returned an invalid result %r", uri, r
"query_3pe to %s returned an invalid result %r", service.url, r
)
return ret
except Exception as ex:
logger.warning("query_3pe to %s threw exception %s", uri, ex)
logger.warning("query_3pe to %s threw exception %s", service.url, ex)
return []
async def get_3pe_protocol(
@ -233,21 +284,20 @@ class ApplicationServiceApi(SimpleHttpClient):
async def _get() -> Optional[JsonDict]:
# This is required by the configuration.
assert service.hs_token is not None
uri = "%s%s/thirdparty/protocol/%s" % (
service.url,
APP_SERVICE_PREFIX,
urllib.parse.quote(protocol),
)
try:
info = await self.get_json(
uri,
info = await self._send_with_fallbacks(
service,
[APP_SERVICE_PREFIX, APP_SERVICE_UNSTABLE_PREFIX],
f"/thirdparty/protocol/{urllib.parse.quote(protocol)}",
self.get_json,
{"access_token": service.hs_token},
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
if not _is_valid_3pe_metadata(info):
logger.warning(
"query_3pe_protocol to %s did not return a valid result", uri
"query_3pe_protocol to %s did not return a valid result",
service.url,
)
return None
@ -260,7 +310,9 @@ class ApplicationServiceApi(SimpleHttpClient):
return info
except Exception as ex:
logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex)
logger.warning(
"query_3pe_protocol to %s threw exception %s", service.url, ex
)
return None
key = (service.id, protocol)
@ -274,7 +326,7 @@ class ApplicationServiceApi(SimpleHttpClient):
assert service.hs_token is not None
await self.post_json_get_json(
uri=service.url + "/_matrix/app/unstable/fi.mau.msc2659/ping",
uri=f"{service.url}{APP_SERVICE_UNSTABLE_PREFIX}/fi.mau.msc2659/ping",
post_json={"transaction_id": txn_id},
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
@ -318,8 +370,6 @@ class ApplicationServiceApi(SimpleHttpClient):
)
txn_id = 0
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
# Never send ephemeral events to appservices that do not support it
body: JsonDict = {"events": serialized_events}
if service.supports_ephemeral:
@ -351,8 +401,11 @@ class ApplicationServiceApi(SimpleHttpClient):
}
try:
await self.put_json(
uri=uri,
await self._send_with_fallbacks(
service,
[APP_SERVICE_PREFIX, ""],
f"/transactions/{urllib.parse.quote(str(txn_id))}",
self.put_json,
json_body=body,
args={"access_token": service.hs_token},
headers={"Authorization": [f"Bearer {service.hs_token}"]},
@ -360,7 +413,7 @@ class ApplicationServiceApi(SimpleHttpClient):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"push_bulk to %s succeeded! events=%s",
uri,
service.url,
[event.get("event_id") for event in events],
)
sent_transactions_counter.labels(service.id).inc()
@ -371,7 +424,7 @@ class ApplicationServiceApi(SimpleHttpClient):
except CodeMessageException as e:
logger.warning(
"push_bulk to %s received code=%s msg=%s",
uri,
service.url,
e.code,
e.msg,
exc_info=logger.isEnabledFor(logging.DEBUG),
@ -379,7 +432,7 @@ class ApplicationServiceApi(SimpleHttpClient):
except Exception as ex:
logger.warning(
"push_bulk to %s threw exception(%s) %s args=%s",
uri,
service.url,
type(ex).__name__,
ex,
ex.args,