Fix up deferred handling in federation.py

* Avoid preserve_context_over_deferred, which is broken

* set consumeErrors=True on defer.gatherResults, to avoid spurious "unhandled
  failure" erros
This commit is contained in:
Richard van der Hoff 2017-10-06 22:14:24 +01:00
parent 3ddda939d3
commit c8f568ddf9

View File

@ -14,7 +14,6 @@
# limitations under the License. # limitations under the License.
"""Contains handlers for federation events.""" """Contains handlers for federation events."""
import synapse.util.logcontext
from signedjson.key import decode_verify_key_bytes from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64 from unpaddedbase64 import decode_base64
@ -26,10 +25,7 @@ from synapse.api.errors import (
) )
from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError, logcontext
from synapse.util.logcontext import (
preserve_fn, preserve_context_over_deferred
)
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor, Linearizer from synapse.util.async import run_on_reactor, Linearizer
@ -592,9 +588,9 @@ class FederationHandler(BaseHandler):
missing_auth - failed_to_fetch missing_auth - failed_to_fetch
) )
results = yield preserve_context_over_deferred(defer.gatherResults( results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(self.replication_layer.get_pdu)( logcontext.preserve_fn(self.replication_layer.get_pdu)(
[dest], [dest],
event_id, event_id,
outlier=True, outlier=True,
@ -786,10 +782,14 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys()) event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill") logger.debug("calling resolve_state_groups in _maybe_backfill")
states = yield preserve_context_over_deferred(defer.gatherResults([ states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) [
for e in event_ids logcontext.preserve_fn(self.state_handler.resolve_state_groups)(
])) room_id, [e]
)
for e in event_ids
], consumeErrors=True,
))
states = dict(zip(event_ids, [s.state for s in states])) states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events( state_map = yield self.store.get_events(
@ -942,9 +942,7 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually # lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it. # have. Hence we fire off the deferred, but don't wait for it.
synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)( logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
room_queue
)
defer.returnValue(True) defer.returnValue(True)
@ -1438,7 +1436,7 @@ class FederationHandler(BaseHandler):
if not backfilled: if not backfilled:
# this intentionally does not yield: we don't care about the result # this intentionally does not yield: we don't care about the result
# and don't need to wait for it. # and don't need to wait for it.
preserve_fn(self.pusher_pool.on_new_notifications)( logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
event_stream_id, max_stream_id event_stream_id, max_stream_id
) )
@ -1451,16 +1449,16 @@ class FederationHandler(BaseHandler):
a bunch of outliers, but not a chunk of individual events that depend a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations. on each other for state calculations.
""" """
contexts = yield preserve_context_over_deferred(defer.gatherResults( contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[ [
preserve_fn(self._prep_event)( logcontext.preserve_fn(self._prep_event)(
origin, origin,
ev_info["event"], ev_info["event"],
state=ev_info.get("state"), state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"), auth_events=ev_info.get("auth_events"),
) )
for ev_info in event_infos for ev_info in event_infos
] ], consumeErrors=True,
)) ))
yield self.store.persist_events( yield self.store.persist_events(
@ -1768,18 +1766,17 @@ class FederationHandler(BaseHandler):
# Do auth conflict res. # Do auth conflict res.
logger.info("Different auth: %s", different_auth) logger.info("Different auth: %s", different_auth)
different_events = yield preserve_context_over_deferred(defer.gatherResults( different_events = yield logcontext.make_deferred_yieldable(
[ defer.gatherResults([
preserve_fn(self.store.get_event)( logcontext.preserve_fn(self.store.get_event)(
d, d,
allow_none=True, allow_none=True,
allow_rejected=False, allow_rejected=False,
) )
for d in different_auth for d in different_auth
if d in have_events and not have_events[d] if d in have_events and not have_events[d]
], ], consumeErrors=True)
consumeErrors=True ).addErrback(unwrapFirstError)
)).addErrback(unwrapFirstError)
if different_events: if different_events:
local_view = dict(auth_events) local_view = dict(auth_events)