From 7e6fa29cb5ba1abd8b4f3873b0ef171c7c8aba26 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 Nov 2017 11:22:42 +0000 Subject: [PATCH 1/2] Remove preserve_context_over_{fn, deferred} Both of these functions ae known to leak logcontexts. Replace the remaining calls to them and kill them off. --- docs/log_contexts.rst | 4 ---- synapse/federation/federation_client.py | 4 ++-- synapse/handlers/appservice.py | 4 ++-- synapse/handlers/initial_sync.py | 4 ++-- synapse/push/pusherpool.py | 6 ++--- synapse/storage/stream.py | 4 ++-- synapse/util/async.py | 6 ++--- synapse/util/distributor.py | 24 +++++++------------ synapse/util/logcontext.py | 31 ------------------------- synapse/visibility.py | 4 ++-- 10 files changed, 24 insertions(+), 67 deletions(-) diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index eb1784e70..b19b7fa1e 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -298,10 +298,6 @@ It can be used like this: # this will now be logged against the request context logger.debug("Request handling complete") -XXX: I think ``preserve_context_over_fn`` is supposed to do the first option, -but the fact that it does ``preserve_context_over_deferred`` on its results -means that its use is fraught with difficulty. - Passing synapse deferreds into third-party functions ---------------------------------------------------- diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7c5e5d957..b8f02f539 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,7 +25,7 @@ from synapse.api.errors import ( from synapse.util import unwrapFirstError, logcontext from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.events import FrozenEvent, builder import synapse.metrics @@ -420,7 +420,7 @@ class FederationClient(FederationBase): for e_id in batch ] - res = yield preserve_context_over_deferred( + res = yield make_deferred_yieldable( defer.DeferredList(deferreds, consumeErrors=True) ) for success, result in res: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 543bf28ae..feca3e4c1 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn import logging @@ -159,7 +159,7 @@ class ApplicationServicesHandler(object): def query_3pe(self, kind, protocol, fields): services = yield self._get_services_for_3pn(protocol) - results = yield preserve_context_over_deferred(defer.DeferredList([ + results = yield make_deferred_yieldable(defer.DeferredList([ preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) for service in services ], consumeErrors=True)) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 9718d4abc..c5267b4b8 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -27,7 +27,7 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -163,7 +163,7 @@ class InitialSyncHandler(BaseHandler): lambda states: states[event.event_id] ) - (messages, token), current_state = yield preserve_context_over_deferred( + (messages, token), current_state = yield make_deferred_yieldable( defer.gatherResults( [ preserve_fn(self.store.get_recent_events_for_room)( diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 7c069b662..34cb108dc 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .pusher import PusherFactory -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.async import run_on_reactor import logging @@ -136,7 +136,7 @@ class PusherPool: ) ) - yield preserve_context_over_deferred(defer.gatherResults(deferreds)) + yield make_deferred_yieldable(defer.gatherResults(deferreds)) except Exception: logger.exception("Exception in pusher on_new_notifications") @@ -161,7 +161,7 @@ class PusherPool: preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) ) - yield preserve_context_over_deferred(defer.gatherResults(deferreds)) + yield make_deferred_yieldable(defer.gatherResults(deferreds)) except Exception: logger.exception("Exception in pusher on_new_receipts") diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index dddd5fc0e..52bdce5be 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -234,7 +234,7 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): - res = yield preserve_context_over_deferred(defer.gatherResults([ + res = yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, ) diff --git a/synapse/util/async.py b/synapse/util/async.py index e786fb38a..0729bb286 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -17,7 +17,7 @@ from twisted.internet import defer, reactor from .logcontext import ( - PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, + PreserveLoggingContext, make_deferred_yieldable, preserve_fn ) from synapse.util import logcontext, unwrapFirstError @@ -351,7 +351,7 @@ class ReadWriteLock(object): # We wait for the latest writer to finish writing. We can safely ignore # any existing readers... as they're readers. - yield curr_writer + yield make_deferred_yieldable(curr_writer) @contextmanager def _ctx_manager(): @@ -380,7 +380,7 @@ class ReadWriteLock(object): curr_readers.clear() self.key_to_current_writer[key] = new_defer - yield preserve_context_over_deferred(defer.gatherResults(to_wait_on)) + yield make_deferred_yieldable(defer.gatherResults(to_wait_on)) @contextmanager def _ctx_manager(): diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index e68f94ce7..734331caa 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -13,32 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - -from synapse.util.logcontext import ( - PreserveLoggingContext, preserve_context_over_fn -) - -from synapse.util import unwrapFirstError - import logging +from twisted.internet import defer + +from synapse.util import unwrapFirstError +from synapse.util.logcontext import PreserveLoggingContext logger = logging.getLogger(__name__) def user_left_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_left_room", user=user, room_id=room_id - ) + with PreserveLoggingContext(): + distributor.fire("user_left_room", user=user, room_id=room_id) def user_joined_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_joined_room", user=user, room_id=room_id - ) + with PreserveLoggingContext(): + distributor.fire("user_joined_room", user=user, room_id=room_id) class Distributor(object): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 9683cc726..92b9413a3 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -291,37 +291,6 @@ class _PreservingContextDeferred(defer.Deferred): return g -def preserve_context_over_fn(fn, *args, **kwargs): - """Takes a function and invokes it with the given arguments, but removes - and restores the current logging context while doing so. - - If the result is a deferred, call preserve_context_over_deferred before - returning it. - """ - with PreserveLoggingContext(): - res = fn(*args, **kwargs) - - if isinstance(res, defer.Deferred): - return preserve_context_over_deferred(res) - else: - return res - - -def preserve_context_over_deferred(deferred, context=None): - """Given a deferred wrap it such that any callbacks added later to it will - be invoked with the current context. - - Deprecated: this almost certainly doesn't do want you want, ie make - the deferred follow the synapse logcontext rules: try - ``make_deferred_yieldable`` instead. - """ - if context is None: - context = LoggingContext.current_context() - d = _PreservingContextDeferred(context) - deferred.chainDeferred(d) - return d - - def preserve_fn(f): """Wraps a function, to ensure that the current context is restored after return from the function, and that the sentinel context is set once the diff --git a/synapse/visibility.py b/synapse/visibility.py index d7dbdc77f..aaca2c584 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership, EventTypes -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn import logging @@ -58,7 +58,7 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state, always_include_ids (set(event_id)): set of event ids to specifically include (unless sender is ignored) """ - forgotten = yield preserve_context_over_deferred(defer.gatherResults([ + forgotten = yield make_deferred_yieldable(defer.gatherResults([ defer.maybeDeferred( preserve_fn(store.who_forgot_in_room), room_id, From b2cd6accf5e584cbaf7f3c6b44addb06a883a222 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 Nov 2017 23:00:10 +0000 Subject: [PATCH 2/2] Remove __PreservingContextDeferred too --- synapse/util/logcontext.py | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 92b9413a3..48c9f6802 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -261,36 +261,6 @@ class PreserveLoggingContext(object): ) -class _PreservingContextDeferred(defer.Deferred): - """A deferred that ensures that all callbacks and errbacks are called with - the given logging context. - """ - def __init__(self, context): - self._log_context = context - defer.Deferred.__init__(self) - - def addCallbacks(self, callback, errback=None, - callbackArgs=None, callbackKeywords=None, - errbackArgs=None, errbackKeywords=None): - callback = self._wrap_callback(callback) - errback = self._wrap_callback(errback) - return defer.Deferred.addCallbacks( - self, callback, - errback=errback, - callbackArgs=callbackArgs, - callbackKeywords=callbackKeywords, - errbackArgs=errbackArgs, - errbackKeywords=errbackKeywords, - ) - - def _wrap_callback(self, f): - def g(res, *args, **kwargs): - with PreserveLoggingContext(self._log_context): - res = f(res, *args, **kwargs) - return res - return g - - def preserve_fn(f): """Wraps a function, to ensure that the current context is restored after return from the function, and that the sentinel context is set once the