diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 6450a1289..68a9de17b 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -150,12 +150,12 @@ class _TransactionController(object): if service_is_up: sent = yield txn.send(self.as_api) if sent: - txn.complete(self.store) + yield txn.complete(self.store) else: - self._start_recoverer(service) + preserve_fn(self._start_recoverer)(service) except Exception as e: logger.exception(e) - self._start_recoverer(service) + preserve_fn(self._start_recoverer)(service) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 1735ca934..d7211ee9b 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -308,15 +308,15 @@ class Keyring(object): @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ - self.store.get_server_verify_keys( + preserve_fn(self.store.get_server_verify_keys)( server_name, key_ids ).addCallback(lambda ks, server: (server, ks), server_name) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(dict(res)) @@ -337,13 +337,13 @@ class Keyring(object): ) defer.returnValue({}) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - get_key(p_name, p_keys) + preserve_fn(get_key)(p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) union_of_keys = {} for result in results: @@ -383,13 +383,13 @@ class Keyring(object): defer.returnValue(keys) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - get_key(server_name, key_ids) + preserve_fn(get_key)(server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) merged = {} for result in results: @@ -466,9 +466,9 @@ class Keyring(object): for server_name, response_keys in processed_response.items(): keys.setdefault(server_name, {}).update(response_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ - self.store_keys( + preserve_fn(self.store_keys)( server_name=server_name, from_server=perspective_name, verify_keys=response_keys, @@ -476,7 +476,7 @@ class Keyring(object): for server_name, response_keys in keys.items() ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(keys) @@ -524,7 +524,7 @@ class Keyring(object): keys.update(response_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store_keys)( server_name=key_server_name, @@ -534,7 +534,7 @@ class Keyring(object): for key_server_name, verify_keys in keys.items() ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(keys) @@ -600,7 +600,7 @@ class Keyring(object): response_keys.update(verify_keys) response_keys.update(old_verify_keys) - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store.store_server_keys_json)( server_name=server_name, @@ -613,7 +613,7 @@ class Keyring(object): for key_id in updated_key_ids ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) results[server_name] = response_keys @@ -702,7 +702,7 @@ class Keyring(object): A deferred that completes when the keys are stored. """ # TODO(markjh): Store whether the keys have expired. - yield defer.gatherResults( + yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self.store.store_server_verify_key)( server_name, server_name, key.time_added, key @@ -710,4 +710,4 @@ class Keyring(object): for key_id, key in verify_keys.items() ], consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index da2f5e8cf..2339cc903 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -23,6 +23,7 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import SynapseError from synapse.util import unwrapFirstError +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -102,10 +103,10 @@ class FederationBase(object): warn, pdu ) - valid_pdus = yield defer.gatherResults( + valid_pdus = yield preserve_context_over_deferred(defer.gatherResults( deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) if include_none: defer.returnValue(valid_pdus) @@ -129,7 +130,7 @@ class FederationBase(object): for pdu in pdus ] - deferreds = self.keyring.verify_json_objects_for_server([ + deferreds = preserve_fn(self.keyring.verify_json_objects_for_server)([ (p.origin, p.get_pdu_json()) for p in redacted_pdus ]) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9ba315171..f2b3aceb4 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -27,6 +27,7 @@ from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.events import FrozenEvent import synapse.metrics @@ -225,10 +226,10 @@ class FederationClient(FederationBase): ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield defer.gatherResults( + pdus[:] = yield preserve_context_over_deferred(defer.gatherResults( self._check_sigs_and_hashes(pdus), consumeErrors=True, - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) defer.returnValue(pdus) @@ -457,14 +458,16 @@ class FederationClient(FederationBase): batch = set(missing_events[i:i + batch_size]) deferreds = [ - self.get_pdu( + preserve_fn(self.get_pdu)( destinations=random_server_list(), event_id=e_id, ) for e_id in batch ] - res = yield defer.DeferredList(deferreds, consumeErrors=True) + res = yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) for success, result in res: if success: signed_events.append(result) @@ -853,14 +856,16 @@ class FederationClient(FederationBase): return srvs deferreds = [ - self.get_pdu( + preserve_fn(self.get_pdu)( destinations=random_server_list(), event_id=e_id, ) for e_id, depth in ordered_missing[:limit - len(signed_events)] ] - res = yield defer.DeferredList(deferreds, consumeErrors=True) + res = yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) for (result, val), (e_id, _) in zip(res, ordered_missing): if result and val: signed_events.append(val) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index dd285452c..306686a38 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -163,10 +163,10 @@ class ApplicationServicesHandler(object): def query_3pe(self, kind, protocol, fields): services = yield self._get_services_for_3pn(protocol) - results = yield defer.DeferredList([ - self.appservice_api.query_3pe(service, kind, protocol, fields) + results = yield preserve_context_over_deferred(defer.DeferredList([ + preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) for service in services - ], consumeErrors=True) + ], consumeErrors=True)) ret = [] for (success, result) in results: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 328f8f484..01a761715 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -26,7 +26,9 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -361,9 +363,9 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch ) - results = yield defer.gatherResults( + results = yield preserve_context_over_deferred(defer.gatherResults( [ - self.replication_layer.get_pdu( + preserve_fn(self.replication_layer.get_pdu)( [dest], event_id, outlier=True, @@ -372,10 +374,10 @@ class FederationHandler(BaseHandler): for event_id in missing_auth - failed_to_fetch ], consumeErrors=True - ).addErrback(unwrapFirstError) - auth_events.update({a.event_id: a for a in results}) + )).addErrback(unwrapFirstError) + auth_events.update({a.event_id: a for a in results if a}) required_auth.update( - a_id for event in results for a_id, _ in event.auth_events + a_id for event in results for a_id, _ in event.auth_events if event ) missing_auth = required_auth - set(auth_events) @@ -552,10 +554,10 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) - states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups(room_id, [e]) + states = yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) for e in event_ids - ]) + ])) states = dict(zip(event_ids, [s[1] for s in states])) for e_id, _ in sorted_extremeties_tuple: @@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler): a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. """ - contexts = yield defer.gatherResults( + contexts = yield preserve_context_over_deferred(defer.gatherResults( [ - self._prep_event( + preserve_fn(self._prep_event)( origin, ev_info["event"], state=ev_info.get("state"), @@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler): ) for ev_info in event_infos ] - ) + )) yield self.store.persist_events( [ @@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.info("Different auth: %s", different_auth) - different_events = yield defer.gatherResults( + different_events = yield preserve_context_over_deferred(defer.gatherResults( [ - self.store.get_event( + preserve_fn(self.store.get_event)( d, allow_none=True, allow_rejected=False, @@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ).addErrback(unwrapFirstError) + )).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index dc76d34a5..4c3cd9d12 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -28,7 +28,8 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -502,15 +503,17 @@ class MessageHandler(BaseHandler): lambda states: states[event.event_id] ) - (messages, token), current_state = yield defer.gatherResults( - [ - self.store.get_recent_events_for_room( - event.room_id, - limit=limit, - end_token=room_end_token, - ), - deferred_room_state, - ] + (messages, token), current_state = yield preserve_context_over_deferred( + defer.gatherResults( + [ + preserve_fn(self.store.get_recent_events_for_room)( + event.room_id, + limit=limit, + end_token=room_end_token, + ), + deferred_room_state, + ] + ) ).addErrback(unwrapFirstError) messages = yield filter_events_for_client( @@ -719,9 +722,9 @@ class MessageHandler(BaseHandler): presence, receipts, (messages, token) = yield defer.gatherResults( [ - get_presence(), - get_receipts(), - self.store.get_recent_events_for_room( + preserve_fn(get_presence)(), + preserve_fn(get_receipts)(), + preserve_fn(self.store.get_recent_events_for_room)( room_id, limit=limit, end_token=now_token.room_key, @@ -755,6 +758,7 @@ class MessageHandler(BaseHandler): defer.returnValue(ret) + @measure_func("_create_new_client_event") @defer.inlineCallbacks def _create_new_client_event(self, builder, prev_event_ids=None): if prev_event_ids: @@ -806,6 +810,7 @@ class MessageHandler(BaseHandler): (event, context,) ) + @measure_func("handle_new_client_event") @defer.inlineCallbacks def handle_new_client_event( self, @@ -934,7 +939,7 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - self.notifier.on_new_room_event( + yield self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) @@ -944,6 +949,6 @@ class MessageHandler(BaseHandler): # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) - federation_handler.handle_new_event( + preserve_fn(federation_handler.handle_new_event)( event, destinations=destinations, ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 4709112a0..8b17632fd 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -59,10 +59,13 @@ class RoomMemberHandler(BaseHandler): prev_event_ids, txn_id=None, ratelimit=True, + content=None, ): + if content is None: + content = {} msg_handler = self.hs.get_handlers().message_handler - content = {"membership": membership} + content["membership"] = membership if requester.is_guest: content["kind"] = "guest" @@ -140,6 +143,7 @@ class RoomMemberHandler(BaseHandler): remote_room_hosts=None, third_party_signed=None, ratelimit=True, + content=None, ): key = (room_id,) @@ -153,6 +157,7 @@ class RoomMemberHandler(BaseHandler): remote_room_hosts=remote_room_hosts, third_party_signed=third_party_signed, ratelimit=ratelimit, + content=content, ) defer.returnValue(result) @@ -168,7 +173,11 @@ class RoomMemberHandler(BaseHandler): remote_room_hosts=None, third_party_signed=None, ratelimit=True, + content=None, ): + if content is None: + content = {} + effective_membership_state = action if action in ["kick", "unban"]: effective_membership_state = "leave" @@ -218,7 +227,7 @@ class RoomMemberHandler(BaseHandler): if inviter and not self.hs.is_mine(inviter): remote_room_hosts.append(inviter.domain) - content = {"membership": Membership.JOIN} + content["membership"] = Membership.JOIN profile = self.hs.get_handlers().profile_handler content["displayname"] = yield profile.get_displayname(target) @@ -272,6 +281,7 @@ class RoomMemberHandler(BaseHandler): txn_id=txn_id, ratelimit=ratelimit, prev_event_ids=latest_event_ids, + content=content, ) @defer.inlineCallbacks diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 5589296c0..46181984c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -16,7 +16,9 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, +) from synapse.util.metrics import Measure from synapse.types import UserID @@ -169,13 +171,13 @@ class TypingHandler(object): deferreds = [] for domain in domains: if domain == self.server_name: - self._push_update_local( + preserve_fn(self._push_update_local)( room_id=room_id, user_id=user_id, typing=typing ) else: - deferreds.append(self.federation.send_edu( + deferreds.append(preserve_fn(self.federation.send_edu)( destination=domain, edu_type="m.typing", content={ @@ -185,7 +187,9 @@ class TypingHandler(object): }, )) - yield defer.DeferredList(deferreds, consumeErrors=True) + yield preserve_context_over_deferred( + defer.DeferredList(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks def _recv_edu(self, origin, content): diff --git a/synapse/notifier.py b/synapse/notifier.py index c48024096..b86648f5e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -19,7 +19,7 @@ from synapse.api.errors import AuthError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from synapse.util.metrics import Measure from synapse.types import StreamToken from synapse.visibility import filter_events_for_client @@ -174,6 +174,7 @@ class Notifier(object): lambda: len(self.user_to_user_stream), ) + @preserve_fn def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): """ Used by handlers to inform the notifier something has happened @@ -195,6 +196,7 @@ class Notifier(object): self.notify_replication() + @preserve_fn def _notify_pending_new_room_events(self, max_room_stream_id): """Notify for the room events that were queued waiting for a previous event to be persisted. @@ -212,6 +214,7 @@ class Notifier(object): else: self._on_new_room_event(event, room_stream_id, extra_users) + @preserve_fn def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. @@ -226,6 +229,7 @@ class Notifier(object): rooms=[event.room_id], ) + @preserve_fn def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. @@ -252,6 +256,7 @@ class Notifier(object): self.notify_replication() + @preserve_fn def on_new_replication_data(self): """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index d555a33e9..becb8ef1a 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -17,14 +17,15 @@ from twisted.internet import defer from synapse.util.presentable_names import ( calculate_room_name, name_from_member_event ) +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @defer.inlineCallbacks def get_badge_count(store, user_id): - invites, joins = yield defer.gatherResults([ - store.get_invited_rooms_for_user(user_id), - store.get_rooms_for_user(user_id), - ], consumeErrors=True) + invites, joins = yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(store.get_invited_rooms_for_user)(user_id), + preserve_fn(store.get_rooms_for_user)(user_id), + ], consumeErrors=True)) my_receipts_by_room = yield store.get_receipts_for_user( user_id, "m.read", diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 54c0f1b84..3837be523 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -17,7 +17,7 @@ from twisted.internet import defer import pusher -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.util.async import run_on_reactor import logging @@ -130,10 +130,12 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - p.on_new_notifications(min_stream_id, max_stream_id) + preserve_fn(p.on_new_notifications)( + min_stream_id, max_stream_id + ) ) - yield defer.gatherResults(deferreds) + yield preserve_context_over_deferred(defer.gatherResults(deferreds)) except: logger.exception("Exception in pusher on_new_notifications") @@ -155,10 +157,10 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - p.on_new_receipts(min_stream_id, max_stream_id) + preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) ) - yield defer.gatherResults(deferreds) + yield preserve_context_over_deferred(defer.gatherResults(deferreds)) except: logger.exception("Exception in pusher on_new_receipts") diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 89c389511..0d8175701 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -268,6 +268,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet): action="join", txn_id=txn_id, remote_room_hosts=remote_room_hosts, + content=content, third_party_signed=content.get("third_party_signed", None), ) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 943f5676a..2121bd75e 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -403,10 +403,9 @@ class RegisterRestServlet(RestServlet): # register the user's device device_id = params.get("device_id") initial_display_name = params.get("initial_device_display_name") - device_id = self.device_handler.check_device_registered( + return self.device_handler.check_device_registered( user_id, device_id, initial_display_name ) - return device_id @defer.inlineCallbacks def _do_guest_registration(self): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 97aef2532..57e500528 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -20,7 +20,9 @@ from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import preserve_fn, PreserveLoggingContext +from synapse.util.logcontext import ( + preserve_fn, PreserveLoggingContext, preserve_context_over_deferred +) from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.api.constants import EventTypes @@ -202,7 +204,7 @@ class EventsStore(SQLBaseStore): deferreds = [] for room_id, evs_ctxs in partitioned.items(): - d = self._event_persist_queue.add_to_queue( + d = preserve_fn(self._event_persist_queue.add_to_queue)( room_id, evs_ctxs, backfilled=backfilled, current_state=None, @@ -212,7 +214,9 @@ class EventsStore(SQLBaseStore): for room_id in partitioned.keys(): self._maybe_start_persisting(room_id) - return defer.gatherResults(deferreds, consumeErrors=True) + return preserve_context_over_deferred( + defer.gatherResults(deferreds, consumeErrors=True) + ) @defer.inlineCallbacks @log_function @@ -225,7 +229,7 @@ class EventsStore(SQLBaseStore): self._maybe_start_persisting(event.room_id) - yield deferred + yield preserve_context_over_deferred(deferred) max_persisted_id = yield self._stream_id_gen.get_current_token() defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) @@ -1088,7 +1092,7 @@ class EventsStore(SQLBaseStore): if not allow_rejected: rows[:] = [r for r in rows if not r["rejects"]] - res = yield defer.gatherResults( + res = yield preserve_context_over_deferred(defer.gatherResults( [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], @@ -1097,7 +1101,7 @@ class EventsStore(SQLBaseStore): for row in rows ], consumeErrors=True - ) + )) defer.returnValue({ e.event.event_id: e diff --git a/synapse/storage/schema/delta/34/received_txn_purge.py b/synapse/storage/schema/delta/34/received_txn_purge.py new file mode 100644 index 000000000..033144341 --- /dev/null +++ b/synapse/storage/schema/delta/34/received_txn_purge.py @@ -0,0 +1,32 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.storage.engines import PostgresEngine + +import logging + +logger = logging.getLogger(__name__) + + +def run_create(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + cur.execute("TRUNCATE received_transactions") + else: + cur.execute("DELETE FROM received_transactions") + + cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)") + + +def run_upgrade(cur, database_engine, *args, **kwargs): + pass diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 862c5c3ea..0577a0525 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -234,12 +234,12 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): - res = yield defer.gatherResults([ + res = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, ) for room_id in rm_ids - ]) + ])) results.update(dict(zip(rm_ids, res))) defer.returnValue(results) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6258ff172..58d4de4f1 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -62,10 +62,9 @@ class TransactionStore(SQLBaseStore): self.last_transaction = {} reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) - hs.get_clock().looping_call( - self._persist_in_mem_txns, - 1000, - ) + self._clock.looping_call(self._persist_in_mem_txns, 1000) + + self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have @@ -127,6 +126,7 @@ class TransactionStore(SQLBaseStore): "origin": origin, "response_code": code, "response_json": buffer(encode_canonical_json(response_dict)), + "ts": self._clock.time_msec(), }, or_ignore=True, desc="set_received_txn_response", @@ -383,3 +383,12 @@ class TransactionStore(SQLBaseStore): yield self.runInteraction("_persist_in_mem_txns", f) except: logger.exception("Failed to persist transactions!") + + def _cleanup_transactions(self): + now = self._clock.time_msec() + month_ago = now - 30 * 24 * 60 * 60 * 1000 + + def _cleanup_transactions_txn(txn): + txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,)) + + return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn) diff --git a/synapse/util/async.py b/synapse/util/async.py index c84b23ff4..347fb1e38 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -146,10 +146,10 @@ def concurrently_execute(func, args, limit): except StopIteration: pass - return defer.gatherResults([ + return preserve_context_over_deferred(defer.gatherResults([ preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) - ], consumeErrors=True).addErrback(unwrapFirstError) + ], consumeErrors=True)).addErrback(unwrapFirstError) class Linearizer(object): @@ -181,7 +181,8 @@ class Linearizer(object): self.key_to_defer[key] = new_defer if current_defer: - yield preserve_context_over_deferred(current_defer) + with PreserveLoggingContext(): + yield current_defer @contextmanager def _ctx_manager(): @@ -264,7 +265,7 @@ class ReadWriteLock(object): curr_readers.clear() self.key_to_current_writer[key] = new_defer - yield defer.gatherResults(to_wait_on) + yield preserve_context_over_deferred(defer.gatherResults(to_wait_on)) @contextmanager def _ctx_manager(): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 7a87045f8..6c83eb213 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -297,12 +297,13 @@ def preserve_context_over_fn(fn, *args, **kwargs): return res -def preserve_context_over_deferred(deferred): +def preserve_context_over_deferred(deferred, context=None): """Given a deferred wrap it such that any callbacks added later to it will be invoked with the current context. """ - current_context = LoggingContext.current_context() - d = _PreservingContextDeferred(current_context) + if context is None: + context = LoggingContext.current_context() + d = _PreservingContextDeferred(context) deferred.chainDeferred(d) return d @@ -316,7 +317,13 @@ def preserve_fn(f): def g(*args, **kwargs): with PreserveLoggingContext(current): - return f(*args, **kwargs) + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred): + return preserve_context_over_deferred( + res, context=LoggingContext.sentinel + ) + else: + return res return g diff --git a/synapse/visibility.py b/synapse/visibility.py index 948ad5177..cc12c0a23 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership, EventTypes -from synapse.util.logcontext import preserve_fn +from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred import logging @@ -55,12 +55,12 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): given events events ([synapse.events.EventBase]): list of events to filter """ - forgotten = yield defer.gatherResults([ + forgotten = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(store.who_forgot_in_room)( room_id, ) for room_id in frozenset(e.room_id for e in events) - ], consumeErrors=True) + ], consumeErrors=True)) # Set of membership event_ids that have been forgotten event_id_forgotten = frozenset(