From c36228c40340f521ad52591ac3eab14946db4be2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 6 Aug 2020 08:20:42 -0400 Subject: [PATCH] Convert run_as_background_process inner function to async. (#8032) --- changelog.d/8032.misc | 1 + synapse/handlers/appservice.py | 2 +- synapse/http/site.py | 5 ++- synapse/metrics/background_process_metrics.py | 34 +++++++------------ 4 files changed, 16 insertions(+), 26 deletions(-) create mode 100644 changelog.d/8032.misc diff --git a/changelog.d/8032.misc b/changelog.d/8032.misc new file mode 100644 index 000000000..dfe4c0317 --- /dev/null +++ b/changelog.d/8032.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index fbc56c351..c9044a501 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -101,7 +101,7 @@ class ApplicationServicesHandler(object): async def start_scheduler(): try: - return self.scheduler.start() + return await self.scheduler.start() except Exception: logger.error("Application Services Failure") diff --git a/synapse/http/site.py b/synapse/http/site.py index 6f3b2258c..f506152fe 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -146,10 +146,9 @@ class SynapseRequest(Request): Returns a context manager; the correct way to use this is: - @defer.inlineCallbacks - def handle_request(request): + async def handle_request(request): with request.processing("FooServlet"): - yield really_handle_the_request() + await really_handle_the_request() Once the context manager is closed, the completion of the request will be logged, and the various metrics will be updated. diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index a9269196b..f766d16db 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -13,16 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect import logging import threading -from asyncio import iscoroutine from functools import wraps from typing import TYPE_CHECKING, Dict, Optional, Set from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer -from twisted.python.failure import Failure from synapse.logging.context import LoggingContext, PreserveLoggingContext @@ -167,7 +166,7 @@ class _BackgroundProcess(object): ) -def run_as_background_process(desc, func, *args, **kwargs): +def run_as_background_process(desc: str, func, *args, **kwargs): """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -179,7 +178,7 @@ def run_as_background_process(desc, func, *args, **kwargs): normal synapse inlineCallbacks function). Args: - desc (str): a description for this background process type + desc: a description for this background process type func: a function, which may return a Deferred or a coroutine args: positional args for func kwargs: keyword args for func @@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs): follow the synapse logcontext rules. """ - @defer.inlineCallbacks - def run(): + async def run(): with _bg_metrics_lock: count = _background_process_counts.get(desc, 0) _background_process_counts[desc] = count + 1 @@ -203,29 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs): try: result = func(*args, **kwargs) - # We probably don't have an ensureDeferred in our call stack to handle - # coroutine results, so we need to ensureDeferred here. - # - # But we need this check because ensureDeferred doesn't like being - # called on immediate values (as opposed to Deferreds or coroutines). - if iscoroutine(result): - result = defer.ensureDeferred(result) + if inspect.isawaitable(result): + result = await result - return (yield result) + return result except Exception: - # failure.Failure() fishes the original Failure out of our stack, and - # thus gives us a sensible stack trace. - f = Failure() - logger.error( - "Background process '%s' threw an exception", - desc, - exc_info=(f.type, f.value, f.getTracebackObject()), + logger.exception( + "Background process '%s' threw an exception", desc, ) finally: _background_process_in_flight_count.labels(desc).dec() with PreserveLoggingContext(): - return run() + # Note that we return a Deferred here so that it can be used in a + # looping_call and other places that expect a Deferred. + return defer.ensureDeferred(run()) def wrap_as_background_process(desc):