From 32090aee169aafc9da4efa176b50a5fc8ede68d2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 20 Nov 2014 16:24:00 +0000 Subject: [PATCH] Add a few missing yields, Move deferred lists inside PreserveLoggingContext because they don't interact well with the logging contexts --- synapse/crypto/keyring.py | 4 +-- synapse/federation/replication.py | 31 +++++++++++--------- synapse/handlers/_base.py | 2 +- synapse/handlers/events.py | 6 ++-- synapse/handlers/federation.py | 4 +-- synapse/handlers/message.py | 8 ++++-- synapse/handlers/presence.py | 48 +++++++++++++++++-------------- synapse/handlers/profile.py | 18 ++++++------ synapse/handlers/register.py | 2 +- synapse/handlers/room.py | 4 +-- synapse/notifier.py | 3 ++ synapse/rest/presence.py | 14 ++++----- synapse/storage/_base.py | 1 - synapse/storage/roommember.py | 4 +-- synapse/util/distributor.py | 39 ++++++++++++++----------- 15 files changed, 105 insertions(+), 83 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 694aed3a7..ceb03ce6c 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -135,7 +135,7 @@ class Keyring(object): time_now_ms = self.clock.time_msec() - self.store.store_server_certificate( + yield self.store.store_server_certificate( server_name, server_name, time_now_ms, @@ -143,7 +143,7 @@ class Keyring(object): ) for key_id, key in verify_keys.items(): - self.store.store_server_verify_key( + yield self.store.store_server_verify_key( server_name, server_name, time_now_ms, key ) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 65a53ae17..996b8ea5b 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -24,6 +24,7 @@ from .units import Transaction, Edu from .persistence import TransactionActions from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext import logging @@ -319,19 +320,20 @@ class ReplicationLayer(object): logger.debug("[%s] Transacition is new", transaction.transaction_id) - dl = [] - for pdu in pdu_list: - dl.append(self._handle_new_pdu(transaction.origin, pdu)) + with PreserveLoggingContext(): + dl = [] + for pdu in pdu_list: + dl.append(self._handle_new_pdu(transaction.origin, pdu)) - if hasattr(transaction, "edus"): - for edu in [Edu(**x) for x in transaction.edus]: - self.received_edu( - transaction.origin, - edu.edu_type, - edu.content - ) + if hasattr(transaction, "edus"): + for edu in [Edu(**x) for x in transaction.edus]: + self.received_edu( + transaction.origin, + edu.edu_type, + edu.content + ) - results = yield defer.DeferredList(dl) + results = yield defer.DeferredList(dl) ret = [] for r in results: @@ -649,7 +651,8 @@ class _TransactionQueue(object): (pdu, deferred, order) ) - self._attempt_new_transaction(destination) + with PreserveLoggingContext(): + self._attempt_new_transaction(destination) deferreds.append(deferred) @@ -669,7 +672,9 @@ class _TransactionQueue(object): deferred.errback(failure) else: logger.exception("Failed to send edu", failure) - self._attempt_new_transaction(destination).addErrback(eb) + + with PreserveLoggingContext(): + self._attempt_new_transaction(destination).addErrback(eb) return deferred diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 30c673306..d53cd3df3 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -112,7 +112,7 @@ class BaseHandler(object): event.destinations = list(destinations) - self.notifier.on_new_room_event(event, extra_users=extra_users) + yield self.notifier.on_new_room_event(event, extra_users=extra_users) federation_handler = self.hs.get_handlers().federation_handler yield federation_handler.handle_new_event(event, snapshot) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 4993c92b7..d59221a4f 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -56,7 +56,7 @@ class EventStreamHandler(BaseHandler): self.clock.cancel_call_later( self._stop_timer_per_user.pop(auth_user)) else: - self.distributor.fire( + yield self.distributor.fire( "started_user_eventstream", auth_user ) self._streams_per_user[auth_user] += 1 @@ -65,8 +65,10 @@ class EventStreamHandler(BaseHandler): pagin_config.from_token = None rm_handler = self.hs.get_handlers().room_member_handler + logger.debug("BETA") room_ids = yield rm_handler.get_rooms_for_user(auth_user) + logger.debug("ALPHA") with PreserveLoggingContext(): events, tokens = yield self.notifier.get_events_for( auth_user, room_ids, pagin_config, timeout @@ -93,7 +95,7 @@ class EventStreamHandler(BaseHandler): logger.debug( "_later stopped_user_eventstream %s", auth_user ) - self.distributor.fire( + yield self.distributor.fire( "stopped_user_eventstream", auth_user ) del self._stop_timer_per_user[auth_user] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 492005a17..e8fb7eae5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -209,7 +209,7 @@ class FederationHandler(BaseHandler): if event.type == RoomMemberEvent.TYPE: if event.membership == Membership.JOIN: user = self.hs.parse_userid(event.state_key) - self.distributor.fire( + yield self.distributor.fire( "user_joined_room", user=user, room_id=event.room_id ) @@ -414,7 +414,7 @@ class FederationHandler(BaseHandler): if event.type == RoomMemberEvent.TYPE: if event.membership == Membership.JOIN: user = self.hs.parse_userid(event.state_key) - self.distributor.fire( + yield self.distributor.fire( "user_joined_room", user=user, room_id=event.room_id ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index de70486b2..f460657f3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.errors import RoomError from synapse.streams.config import PaginationConfig +from synapse.util.logcontext import PreserveLoggingContext from ._base import BaseHandler import logging @@ -86,9 +87,10 @@ class MessageHandler(BaseHandler): event, snapshot, suppress_auth=suppress_auth ) - self.hs.get_handlers().presence_handler.bump_presence_active_time( - user - ) + with PreserveLoggingContext(): + self.hs.get_handlers().presence_handler.bump_presence_active_time( + user + ) @defer.inlineCallbacks def get_messages(self, user_id=None, room_id=None, pagin_config=None, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fcc92a8e3..b55d589da 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -19,6 +19,7 @@ from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext from ._base import BaseHandler @@ -142,7 +143,7 @@ class PresenceHandler(BaseHandler): return UserPresenceCache() def registered_user(self, user): - self.store.create_presence(user.localpart) + return self.store.create_presence(user.localpart) @defer.inlineCallbacks def is_presence_visible(self, observer_user, observed_user): @@ -241,14 +242,12 @@ class PresenceHandler(BaseHandler): was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]] now_level = self.STATE_LEVELS[state["presence"]] - yield defer.DeferredList([ - self.store.set_presence_state( - target_user.localpart, state_to_store - ), - self.distributor.fire( - "collect_presencelike_data", target_user, state - ), - ]) + yield self.store.set_presence_state( + target_user.localpart, state_to_store + ) + yield self.distributor.fire( + "collect_presencelike_data", target_user, state + ) if now_level > was_level: state["last_active"] = self.clock.time_msec() @@ -256,14 +255,15 @@ class PresenceHandler(BaseHandler): now_online = state["presence"] != PresenceState.OFFLINE was_polling = target_user in self._user_cachemap - if now_online and not was_polling: - self.start_polling_presence(target_user, state=state) - elif not now_online and was_polling: - self.stop_polling_presence(target_user) + with PreserveLoggingContext(): + if now_online and not was_polling: + self.start_polling_presence(target_user, state=state) + elif not now_online and was_polling: + self.stop_polling_presence(target_user) - # TODO(paul): perform a presence push as part of start/stop poll so - # we don't have to do this all the time - self.changed_presencelike_data(target_user, state) + # TODO(paul): perform a presence push as part of start/stop poll so + # we don't have to do this all the time + self.changed_presencelike_data(target_user, state) def bump_presence_active_time(self, user, now=None): if now is None: @@ -277,7 +277,7 @@ class PresenceHandler(BaseHandler): self._user_cachemap_latest_serial += 1 statuscache.update(state, serial=self._user_cachemap_latest_serial) - self.push_presence(user, statuscache=statuscache) + return self.push_presence(user, statuscache=statuscache) @log_function def started_user_eventstream(self, user): @@ -381,8 +381,10 @@ class PresenceHandler(BaseHandler): yield self.store.set_presence_list_accepted( observer_user.localpart, observed_user.to_string() ) - - self.start_polling_presence(observer_user, target_user=observed_user) + with PreserveLoggingContext(): + self.start_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def deny_presence(self, observed_user, observer_user): @@ -401,7 +403,10 @@ class PresenceHandler(BaseHandler): observer_user.localpart, observed_user.to_string() ) - self.stop_polling_presence(observer_user, target_user=observed_user) + with PreserveLoggingContext(): + self.stop_polling_presence( + observer_user, target_user=observed_user + ) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): @@ -710,7 +715,8 @@ class PresenceHandler(BaseHandler): if not self._remote_sendmap[user]: del self._remote_sendmap[user] - yield defer.DeferredList(deferreds) + with PreserveLoggingContext(): + yield defer.DeferredList(deferreds) @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 7853bf509..814b3b68f 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import Membership +from synapse.util.logcontext import PreserveLoggingContext from ._base import BaseHandler @@ -46,7 +47,7 @@ class ProfileHandler(BaseHandler): ) def registered_user(self, user): - self.store.create_profile(user.localpart) + return self.store.create_profile(user.localpart) @defer.inlineCallbacks def get_displayname(self, target_user): @@ -152,13 +153,14 @@ class ProfileHandler(BaseHandler): if not user.is_mine: defer.returnValue(None) - (displayname, avatar_url) = yield defer.gatherResults( - [ - self.store.get_profile_displayname(user.localpart), - self.store.get_profile_avatar_url(user.localpart), - ], - consumeErrors=True - ) + with PreserveLoggingContext(): + (displayname, avatar_url) = yield defer.gatherResults( + [ + self.store.get_profile_displayname(user.localpart), + self.store.get_profile_avatar_url(user.localpart), + ], + consumeErrors=True + ) state["displayname"] = displayname state["avatar_url"] = avatar_url diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 7df9d9b82..c59ac1a3c 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -69,7 +69,7 @@ class RegistrationHandler(BaseHandler): password_hash=password_hash ) - self.distributor.fire("registered_user", user) + yield self.distributor.fire("registered_user", user) else: # autogen a random user ID attempts = 0 diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 7d9458e1d..725205174 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -178,7 +178,7 @@ class RoomCreationHandler(BaseHandler): if room_alias: result["room_alias"] = room_alias.to_string() - directory_handler.send_room_alias_update_event(user_id, room_id) + yield directory_handler.send_room_alias_update_event(user_id, room_id) defer.returnValue(result) @@ -480,7 +480,7 @@ class RoomMemberHandler(BaseHandler): ) user = self.hs.parse_userid(event.user_id) - self.distributor.fire( + yield self.distributor.fire( "user_joined_room", user=user, room_id=room_id ) diff --git a/synapse/notifier.py b/synapse/notifier.py index c310a9fed..0c8ca6ec6 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.async import run_on_reactor import logging @@ -96,6 +97,7 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ + yield run_on_reactor() room_id = event.room_id room_source = self.event_sources.sources["room"] @@ -143,6 +145,7 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ + yield run_on_reactor() presence_source = self.event_sources.sources["presence"] listeners = set() diff --git a/synapse/rest/presence.py b/synapse/rest/presence.py index 138cc88a0..502ed0d4c 100644 --- a/synapse/rest/presence.py +++ b/synapse/rest/presence.py @@ -117,8 +117,6 @@ class PresenceListRestServlet(RestServlet): logger.exception("JSON parse error") raise SynapseError(400, "Unable to parse content") - deferreds = [] - if "invite" in content: for u in content["invite"]: if not isinstance(u, basestring): @@ -126,8 +124,9 @@ class PresenceListRestServlet(RestServlet): if len(u) == 0: continue invited_user = self.hs.parse_userid(u) - deferreds.append(self.handlers.presence_handler.send_invite( - observer_user=user, observed_user=invited_user)) + yield self.handlers.presence_handler.send_invite( + observer_user=user, observed_user=invited_user + ) if "drop" in content: for u in content["drop"]: @@ -136,10 +135,9 @@ class PresenceListRestServlet(RestServlet): if len(u) == 0: continue dropped_user = self.hs.parse_userid(u) - deferreds.append(self.handlers.presence_handler.drop( - observer_user=user, observed_user=dropped_user)) - - yield defer.DeferredList(deferreds) + yield self.handlers.presence_handler.drop( + observer_user=user, observed_user=dropped_user + ) defer.returnValue((200, {})) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 5d4be09a8..2c04a1c5b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -115,7 +115,6 @@ class SQLBaseStore(object): "[TXN END] {%s} %f", name, end - start ) - with PreserveLoggingContext(): result = yield self._db_pool.runInteraction( inner_func, *args, **kwargs diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 93329703a..c37df59d4 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -177,8 +177,8 @@ class RoomMemberStore(SQLBaseStore): return self._get_members_query(clause, vals) def _get_members_query(self, where_clause, where_values): - return self._db_pool.runInteraction( - self._get_members_query_txn, + return self.runInteraction( + "get_members_query", self._get_members_query_txn, where_clause, where_values ) diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index eddbe5837..701ccdb78 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.util.logcontext import PreserveLoggingContext + from twisted.internet import defer import logging @@ -91,6 +93,7 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) + @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -98,22 +101,24 @@ class Signal(object): Returns a Deferred that will complete when all the observers have completed.""" - deferreds = [] - for observer in self.observers: - d = defer.maybeDeferred(observer, *args, **kwargs) + with PreserveLoggingContext(): + deferreds = [] + for observer in self.observers: + d = defer.maybeDeferred(observer, *args, **kwargs) - def eb(failure): - logger.warning( - "%s signal observer %s failed: %r", - self.name, observer, failure, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject())) - if not self.suppress_failures: - raise failure - deferreds.append(d.addErrback(eb)) + def eb(failure): + logger.warning( + "%s signal observer %s failed: %r", + self.name, observer, failure, + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject())) + if not self.suppress_failures: + raise failure + deferreds.append(d.addErrback(eb)) - return defer.DeferredList( - deferreds, fireOnOneErrback=not self.suppress_failures - ) + result = yield defer.DeferredList( + deferreds, fireOnOneErrback=not self.suppress_failures + ) + defer.returnValue(result)