Merge branch 'develop' into markjh/direct_to_device

This commit is contained in:
Mark Haines 2016-08-25 18:34:46 +01:00
commit ab34fdecb7
22 changed files with 238 additions and 103 deletions

View file

@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
import logging
@ -163,10 +163,10 @@ class ApplicationServicesHandler(object):
def query_3pe(self, kind, protocol, fields):
services = yield self._get_services_for_3pn(protocol)
results = yield defer.DeferredList([
self.appservice_api.query_3pe(service, kind, protocol, fields)
results = yield preserve_context_over_deferred(defer.DeferredList([
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
for service in services
], consumeErrors=True)
], consumeErrors=True))
ret = []
for (success, result) in results:
@ -175,6 +175,16 @@ class ApplicationServicesHandler(object):
defer.returnValue(ret)
@defer.inlineCallbacks
def get_3pe_protocols(self):
services = yield self.store.get_app_services()
protocols = {}
for s in services:
for p in s.protocols:
protocols[p] = yield self.appservice_api.get_3pe_protocol(s, p)
defer.returnValue(protocols)
@defer.inlineCallbacks
def _get_services_for_event(self, event):
"""Retrieve a list of application services interested in this event.

View file

@ -26,7 +26,9 @@ from synapse.api.errors import (
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.logcontext import (
PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
)
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
@ -361,9 +363,9 @@ class FederationHandler(BaseHandler):
missing_auth - failed_to_fetch
)
results = yield defer.gatherResults(
results = yield preserve_context_over_deferred(defer.gatherResults(
[
self.replication_layer.get_pdu(
preserve_fn(self.replication_layer.get_pdu)(
[dest],
event_id,
outlier=True,
@ -372,7 +374,7 @@ class FederationHandler(BaseHandler):
for event_id in missing_auth - failed_to_fetch
],
consumeErrors=True
).addErrback(unwrapFirstError)
)).addErrback(unwrapFirstError)
auth_events.update({a.event_id: a for a in results if a})
required_auth.update(
a_id for event in results for a_id, _ in event.auth_events if event
@ -552,10 +554,10 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
states = yield defer.gatherResults([
self.state_handler.resolve_state_groups(room_id, [e])
states = yield preserve_context_over_deferred(defer.gatherResults([
preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
for e in event_ids
])
]))
states = dict(zip(event_ids, [s[1] for s in states]))
for e_id, _ in sorted_extremeties_tuple:
@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler):
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
"""
contexts = yield defer.gatherResults(
contexts = yield preserve_context_over_deferred(defer.gatherResults(
[
self._prep_event(
preserve_fn(self._prep_event)(
origin,
ev_info["event"],
state=ev_info.get("state"),
@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler):
)
for ev_info in event_infos
]
)
))
yield self.store.persist_events(
[
@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler):
# Do auth conflict res.
logger.info("Different auth: %s", different_auth)
different_events = yield defer.gatherResults(
different_events = yield preserve_context_over_deferred(defer.gatherResults(
[
self.store.get_event(
preserve_fn(self.store.get_event)(
d,
allow_none=True,
allow_rejected=False,
@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler):
if d in have_events and not have_events[d]
],
consumeErrors=True
).addErrback(unwrapFirstError)
)).addErrback(unwrapFirstError)
if different_events:
local_view = dict(auth_events)

View file

@ -28,7 +28,8 @@ from synapse.types import (
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@ -502,15 +503,17 @@ class MessageHandler(BaseHandler):
lambda states: states[event.event_id]
)
(messages, token), current_state = yield defer.gatherResults(
[
self.store.get_recent_events_for_room(
event.room_id,
limit=limit,
end_token=room_end_token,
),
deferred_room_state,
]
(messages, token), current_state = yield preserve_context_over_deferred(
defer.gatherResults(
[
preserve_fn(self.store.get_recent_events_for_room)(
event.room_id,
limit=limit,
end_token=room_end_token,
),
deferred_room_state,
]
)
).addErrback(unwrapFirstError)
messages = yield filter_events_for_client(
@ -719,9 +722,9 @@ class MessageHandler(BaseHandler):
presence, receipts, (messages, token) = yield defer.gatherResults(
[
get_presence(),
get_receipts(),
self.store.get_recent_events_for_room(
preserve_fn(get_presence)(),
preserve_fn(get_receipts)(),
preserve_fn(self.store.get_recent_events_for_room)(
room_id,
limit=limit,
end_token=now_token.room_key,
@ -755,6 +758,7 @@ class MessageHandler(BaseHandler):
defer.returnValue(ret)
@measure_func("_create_new_client_event")
@defer.inlineCallbacks
def _create_new_client_event(self, builder, prev_event_ids=None):
if prev_event_ids:
@ -806,6 +810,7 @@ class MessageHandler(BaseHandler):
(event, context,)
)
@measure_func("handle_new_client_event")
@defer.inlineCallbacks
def handle_new_client_event(
self,
@ -934,7 +939,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
self.notifier.on_new_room_event(
yield self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
@ -944,6 +949,6 @@ class MessageHandler(BaseHandler):
# If invite, remove room_state from unsigned before sending.
event.unsigned.pop("invite_room_state", None)
federation_handler.handle_new_event(
preserve_fn(federation_handler.handle_new_event)(
event, destinations=destinations,
)

View file

@ -16,7 +16,9 @@
from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logcontext import (
PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
)
from synapse.util.metrics import Measure
from synapse.types import UserID
@ -169,13 +171,13 @@ class TypingHandler(object):
deferreds = []
for domain in domains:
if domain == self.server_name:
self._push_update_local(
preserve_fn(self._push_update_local)(
room_id=room_id,
user_id=user_id,
typing=typing
)
else:
deferreds.append(self.federation.send_edu(
deferreds.append(preserve_fn(self.federation.send_edu)(
destination=domain,
edu_type="m.typing",
content={
@ -185,7 +187,9 @@ class TypingHandler(object):
},
))
yield defer.DeferredList(deferreds, consumeErrors=True)
yield preserve_context_over_deferred(
defer.DeferredList(deferreds, consumeErrors=True)
)
@defer.inlineCallbacks
def _recv_edu(self, origin, content):