Add a few missing yields, Move deferred lists inside PreserveLoggingContext because they don't interact well with the logging contexts

This commit is contained in:
Mark Haines 2014-11-20 16:24:00 +00:00
parent 217950b9ad
commit 32090aee16
15 changed files with 105 additions and 83 deletions

View File

@ -135,7 +135,7 @@ class Keyring(object):
time_now_ms = self.clock.time_msec() time_now_ms = self.clock.time_msec()
self.store.store_server_certificate( yield self.store.store_server_certificate(
server_name, server_name,
server_name, server_name,
time_now_ms, time_now_ms,
@ -143,7 +143,7 @@ class Keyring(object):
) )
for key_id, key in verify_keys.items(): for key_id, key in verify_keys.items():
self.store.store_server_verify_key( yield self.store.store_server_verify_key(
server_name, server_name, time_now_ms, key server_name, server_name, time_now_ms, key
) )

View File

@ -24,6 +24,7 @@ from .units import Transaction, Edu
from .persistence import TransactionActions from .persistence import TransactionActions
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
import logging import logging
@ -319,6 +320,7 @@ class ReplicationLayer(object):
logger.debug("[%s] Transacition is new", transaction.transaction_id) logger.debug("[%s] Transacition is new", transaction.transaction_id)
with PreserveLoggingContext():
dl = [] dl = []
for pdu in pdu_list: for pdu in pdu_list:
dl.append(self._handle_new_pdu(transaction.origin, pdu)) dl.append(self._handle_new_pdu(transaction.origin, pdu))
@ -649,6 +651,7 @@ class _TransactionQueue(object):
(pdu, deferred, order) (pdu, deferred, order)
) )
with PreserveLoggingContext():
self._attempt_new_transaction(destination) self._attempt_new_transaction(destination)
deferreds.append(deferred) deferreds.append(deferred)
@ -669,6 +672,8 @@ class _TransactionQueue(object):
deferred.errback(failure) deferred.errback(failure)
else: else:
logger.exception("Failed to send edu", failure) logger.exception("Failed to send edu", failure)
with PreserveLoggingContext():
self._attempt_new_transaction(destination).addErrback(eb) self._attempt_new_transaction(destination).addErrback(eb)
return deferred return deferred

View File

@ -112,7 +112,7 @@ class BaseHandler(object):
event.destinations = list(destinations) event.destinations = list(destinations)
self.notifier.on_new_room_event(event, extra_users=extra_users) yield self.notifier.on_new_room_event(event, extra_users=extra_users)
federation_handler = self.hs.get_handlers().federation_handler federation_handler = self.hs.get_handlers().federation_handler
yield federation_handler.handle_new_event(event, snapshot) yield federation_handler.handle_new_event(event, snapshot)

View File

@ -56,7 +56,7 @@ class EventStreamHandler(BaseHandler):
self.clock.cancel_call_later( self.clock.cancel_call_later(
self._stop_timer_per_user.pop(auth_user)) self._stop_timer_per_user.pop(auth_user))
else: else:
self.distributor.fire( yield self.distributor.fire(
"started_user_eventstream", auth_user "started_user_eventstream", auth_user
) )
self._streams_per_user[auth_user] += 1 self._streams_per_user[auth_user] += 1
@ -65,8 +65,10 @@ class EventStreamHandler(BaseHandler):
pagin_config.from_token = None pagin_config.from_token = None
rm_handler = self.hs.get_handlers().room_member_handler rm_handler = self.hs.get_handlers().room_member_handler
logger.debug("BETA")
room_ids = yield rm_handler.get_rooms_for_user(auth_user) room_ids = yield rm_handler.get_rooms_for_user(auth_user)
logger.debug("ALPHA")
with PreserveLoggingContext(): with PreserveLoggingContext():
events, tokens = yield self.notifier.get_events_for( events, tokens = yield self.notifier.get_events_for(
auth_user, room_ids, pagin_config, timeout auth_user, room_ids, pagin_config, timeout
@ -93,7 +95,7 @@ class EventStreamHandler(BaseHandler):
logger.debug( logger.debug(
"_later stopped_user_eventstream %s", auth_user "_later stopped_user_eventstream %s", auth_user
) )
self.distributor.fire( yield self.distributor.fire(
"stopped_user_eventstream", auth_user "stopped_user_eventstream", auth_user
) )
del self._stop_timer_per_user[auth_user] del self._stop_timer_per_user[auth_user]

View File

@ -209,7 +209,7 @@ class FederationHandler(BaseHandler):
if event.type == RoomMemberEvent.TYPE: if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
user = self.hs.parse_userid(event.state_key) user = self.hs.parse_userid(event.state_key)
self.distributor.fire( yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id "user_joined_room", user=user, room_id=event.room_id
) )
@ -414,7 +414,7 @@ class FederationHandler(BaseHandler):
if event.type == RoomMemberEvent.TYPE: if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN: if event.membership == Membership.JOIN:
user = self.hs.parse_userid(event.state_key) user = self.hs.parse_userid(event.state_key)
self.distributor.fire( yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id "user_joined_room", user=user, room_id=event.room_id
) )

View File

@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.api.constants import Membership from synapse.api.constants import Membership
from synapse.api.errors import RoomError from synapse.api.errors import RoomError
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler from ._base import BaseHandler
import logging import logging
@ -86,6 +87,7 @@ class MessageHandler(BaseHandler):
event, snapshot, suppress_auth=suppress_auth event, snapshot, suppress_auth=suppress_auth
) )
with PreserveLoggingContext():
self.hs.get_handlers().presence_handler.bump_presence_active_time( self.hs.get_handlers().presence_handler.bump_presence_active_time(
user user
) )

View File

@ -19,6 +19,7 @@ from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState from synapse.api.constants import PresenceState
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler from ._base import BaseHandler
@ -142,7 +143,7 @@ class PresenceHandler(BaseHandler):
return UserPresenceCache() return UserPresenceCache()
def registered_user(self, user): def registered_user(self, user):
self.store.create_presence(user.localpart) return self.store.create_presence(user.localpart)
@defer.inlineCallbacks @defer.inlineCallbacks
def is_presence_visible(self, observer_user, observed_user): def is_presence_visible(self, observer_user, observed_user):
@ -241,14 +242,12 @@ class PresenceHandler(BaseHandler):
was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]] was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]]
now_level = self.STATE_LEVELS[state["presence"]] now_level = self.STATE_LEVELS[state["presence"]]
yield defer.DeferredList([ yield self.store.set_presence_state(
self.store.set_presence_state(
target_user.localpart, state_to_store target_user.localpart, state_to_store
), )
self.distributor.fire( yield self.distributor.fire(
"collect_presencelike_data", target_user, state "collect_presencelike_data", target_user, state
), )
])
if now_level > was_level: if now_level > was_level:
state["last_active"] = self.clock.time_msec() state["last_active"] = self.clock.time_msec()
@ -256,6 +255,7 @@ class PresenceHandler(BaseHandler):
now_online = state["presence"] != PresenceState.OFFLINE now_online = state["presence"] != PresenceState.OFFLINE
was_polling = target_user in self._user_cachemap was_polling = target_user in self._user_cachemap
with PreserveLoggingContext():
if now_online and not was_polling: if now_online and not was_polling:
self.start_polling_presence(target_user, state=state) self.start_polling_presence(target_user, state=state)
elif not now_online and was_polling: elif not now_online and was_polling:
@ -277,7 +277,7 @@ class PresenceHandler(BaseHandler):
self._user_cachemap_latest_serial += 1 self._user_cachemap_latest_serial += 1
statuscache.update(state, serial=self._user_cachemap_latest_serial) statuscache.update(state, serial=self._user_cachemap_latest_serial)
self.push_presence(user, statuscache=statuscache) return self.push_presence(user, statuscache=statuscache)
@log_function @log_function
def started_user_eventstream(self, user): def started_user_eventstream(self, user):
@ -381,8 +381,10 @@ class PresenceHandler(BaseHandler):
yield self.store.set_presence_list_accepted( yield self.store.set_presence_list_accepted(
observer_user.localpart, observed_user.to_string() observer_user.localpart, observed_user.to_string()
) )
with PreserveLoggingContext():
self.start_polling_presence(observer_user, target_user=observed_user) self.start_polling_presence(
observer_user, target_user=observed_user
)
@defer.inlineCallbacks @defer.inlineCallbacks
def deny_presence(self, observed_user, observer_user): def deny_presence(self, observed_user, observer_user):
@ -401,7 +403,10 @@ class PresenceHandler(BaseHandler):
observer_user.localpart, observed_user.to_string() observer_user.localpart, observed_user.to_string()
) )
self.stop_polling_presence(observer_user, target_user=observed_user) with PreserveLoggingContext():
self.stop_polling_presence(
observer_user, target_user=observed_user
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None): def get_presence_list(self, observer_user, accepted=None):
@ -710,6 +715,7 @@ class PresenceHandler(BaseHandler):
if not self._remote_sendmap[user]: if not self._remote_sendmap[user]:
del self._remote_sendmap[user] del self._remote_sendmap[user]
with PreserveLoggingContext():
yield defer.DeferredList(deferreds) yield defer.DeferredList(deferreds)
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.api.constants import Membership from synapse.api.constants import Membership
from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler from ._base import BaseHandler
@ -46,7 +47,7 @@ class ProfileHandler(BaseHandler):
) )
def registered_user(self, user): def registered_user(self, user):
self.store.create_profile(user.localpart) return self.store.create_profile(user.localpart)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_displayname(self, target_user): def get_displayname(self, target_user):
@ -152,6 +153,7 @@ class ProfileHandler(BaseHandler):
if not user.is_mine: if not user.is_mine:
defer.returnValue(None) defer.returnValue(None)
with PreserveLoggingContext():
(displayname, avatar_url) = yield defer.gatherResults( (displayname, avatar_url) = yield defer.gatherResults(
[ [
self.store.get_profile_displayname(user.localpart), self.store.get_profile_displayname(user.localpart),

View File

@ -69,7 +69,7 @@ class RegistrationHandler(BaseHandler):
password_hash=password_hash password_hash=password_hash
) )
self.distributor.fire("registered_user", user) yield self.distributor.fire("registered_user", user)
else: else:
# autogen a random user ID # autogen a random user ID
attempts = 0 attempts = 0

View File

@ -178,7 +178,7 @@ class RoomCreationHandler(BaseHandler):
if room_alias: if room_alias:
result["room_alias"] = room_alias.to_string() result["room_alias"] = room_alias.to_string()
directory_handler.send_room_alias_update_event(user_id, room_id) yield directory_handler.send_room_alias_update_event(user_id, room_id)
defer.returnValue(result) defer.returnValue(result)
@ -480,7 +480,7 @@ class RoomMemberHandler(BaseHandler):
) )
user = self.hs.parse_userid(event.user_id) user = self.hs.parse_userid(event.user_id)
self.distributor.fire( yield self.distributor.fire(
"user_joined_room", user=user, room_id=room_id "user_joined_room", user=user, room_id=room_id
) )

View File

@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.async import run_on_reactor
import logging import logging
@ -96,6 +97,7 @@ class Notifier(object):
listening to the room, and any listeners for the users in the listening to the room, and any listeners for the users in the
`extra_users` param. `extra_users` param.
""" """
yield run_on_reactor()
room_id = event.room_id room_id = event.room_id
room_source = self.event_sources.sources["room"] room_source = self.event_sources.sources["room"]
@ -143,6 +145,7 @@ class Notifier(object):
Will wake up all listeners for the given users and rooms. Will wake up all listeners for the given users and rooms.
""" """
yield run_on_reactor()
presence_source = self.event_sources.sources["presence"] presence_source = self.event_sources.sources["presence"]
listeners = set() listeners = set()

View File

@ -117,8 +117,6 @@ class PresenceListRestServlet(RestServlet):
logger.exception("JSON parse error") logger.exception("JSON parse error")
raise SynapseError(400, "Unable to parse content") raise SynapseError(400, "Unable to parse content")
deferreds = []
if "invite" in content: if "invite" in content:
for u in content["invite"]: for u in content["invite"]:
if not isinstance(u, basestring): if not isinstance(u, basestring):
@ -126,8 +124,9 @@ class PresenceListRestServlet(RestServlet):
if len(u) == 0: if len(u) == 0:
continue continue
invited_user = self.hs.parse_userid(u) invited_user = self.hs.parse_userid(u)
deferreds.append(self.handlers.presence_handler.send_invite( yield self.handlers.presence_handler.send_invite(
observer_user=user, observed_user=invited_user)) observer_user=user, observed_user=invited_user
)
if "drop" in content: if "drop" in content:
for u in content["drop"]: for u in content["drop"]:
@ -136,10 +135,9 @@ class PresenceListRestServlet(RestServlet):
if len(u) == 0: if len(u) == 0:
continue continue
dropped_user = self.hs.parse_userid(u) dropped_user = self.hs.parse_userid(u)
deferreds.append(self.handlers.presence_handler.drop( yield self.handlers.presence_handler.drop(
observer_user=user, observed_user=dropped_user)) observer_user=user, observed_user=dropped_user
)
yield defer.DeferredList(deferreds)
defer.returnValue((200, {})) defer.returnValue((200, {}))

View File

@ -115,7 +115,6 @@ class SQLBaseStore(object):
"[TXN END] {%s} %f", "[TXN END] {%s} %f",
name, end - start name, end - start
) )
with PreserveLoggingContext(): with PreserveLoggingContext():
result = yield self._db_pool.runInteraction( result = yield self._db_pool.runInteraction(
inner_func, *args, **kwargs inner_func, *args, **kwargs

View File

@ -177,8 +177,8 @@ class RoomMemberStore(SQLBaseStore):
return self._get_members_query(clause, vals) return self._get_members_query(clause, vals)
def _get_members_query(self, where_clause, where_values): def _get_members_query(self, where_clause, where_values):
return self._db_pool.runInteraction( return self.runInteraction(
self._get_members_query_txn, "get_members_query", self._get_members_query_txn,
where_clause, where_values where_clause, where_values
) )

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.util.logcontext import PreserveLoggingContext
from twisted.internet import defer from twisted.internet import defer
import logging import logging
@ -91,6 +93,7 @@ class Signal(object):
Each observer callable may return a Deferred.""" Each observer callable may return a Deferred."""
self.observers.append(observer) self.observers.append(observer)
@defer.inlineCallbacks
def fire(self, *args, **kwargs): def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and """Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is kwargs. Exceptions thrown by observers are logged but ignored. It is
@ -98,6 +101,7 @@ class Signal(object):
Returns a Deferred that will complete when all the observers have Returns a Deferred that will complete when all the observers have
completed.""" completed."""
with PreserveLoggingContext():
deferreds = [] deferreds = []
for observer in self.observers: for observer in self.observers:
d = defer.maybeDeferred(observer, *args, **kwargs) d = defer.maybeDeferred(observer, *args, **kwargs)
@ -114,6 +118,7 @@ class Signal(object):
raise failure raise failure
deferreds.append(d.addErrback(eb)) deferreds.append(d.addErrback(eb))
return defer.DeferredList( result = yield defer.DeferredList(
deferreds, fireOnOneErrback=not self.suppress_failures deferreds, fireOnOneErrback=not self.suppress_failures
) )
defer.returnValue(result)