mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-21 16:41:04 -05:00
Merge pull request #2506 from matrix-org/rav/unhandled_failure
Fix up deferred handling in federation.py
This commit is contained in:
commit
c1c81ee2a4
@ -14,7 +14,6 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
"""Contains handlers for federation events."""
|
"""Contains handlers for federation events."""
|
||||||
import synapse.util.logcontext
|
|
||||||
from signedjson.key import decode_verify_key_bytes
|
from signedjson.key import decode_verify_key_bytes
|
||||||
from signedjson.sign import verify_signed_json
|
from signedjson.sign import verify_signed_json
|
||||||
from unpaddedbase64 import decode_base64
|
from unpaddedbase64 import decode_base64
|
||||||
@ -26,10 +25,7 @@ from synapse.api.errors import (
|
|||||||
)
|
)
|
||||||
from synapse.api.constants import EventTypes, Membership, RejectedReason
|
from synapse.api.constants import EventTypes, Membership, RejectedReason
|
||||||
from synapse.events.validator import EventValidator
|
from synapse.events.validator import EventValidator
|
||||||
from synapse.util import unwrapFirstError
|
from synapse.util import unwrapFirstError, logcontext
|
||||||
from synapse.util.logcontext import (
|
|
||||||
preserve_fn, preserve_context_over_deferred
|
|
||||||
)
|
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.async import run_on_reactor, Linearizer
|
from synapse.util.async import run_on_reactor, Linearizer
|
||||||
@ -592,9 +588,9 @@ class FederationHandler(BaseHandler):
|
|||||||
missing_auth - failed_to_fetch
|
missing_auth - failed_to_fetch
|
||||||
)
|
)
|
||||||
|
|
||||||
results = yield preserve_context_over_deferred(defer.gatherResults(
|
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(self.replication_layer.get_pdu)(
|
logcontext.preserve_fn(self.replication_layer.get_pdu)(
|
||||||
[dest],
|
[dest],
|
||||||
event_id,
|
event_id,
|
||||||
outlier=True,
|
outlier=True,
|
||||||
@ -786,10 +782,14 @@ class FederationHandler(BaseHandler):
|
|||||||
event_ids = list(extremities.keys())
|
event_ids = list(extremities.keys())
|
||||||
|
|
||||||
logger.debug("calling resolve_state_groups in _maybe_backfill")
|
logger.debug("calling resolve_state_groups in _maybe_backfill")
|
||||||
states = yield preserve_context_over_deferred(defer.gatherResults([
|
states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
|
[
|
||||||
|
logcontext.preserve_fn(self.state_handler.resolve_state_groups)(
|
||||||
|
room_id, [e]
|
||||||
|
)
|
||||||
for e in event_ids
|
for e in event_ids
|
||||||
]))
|
], consumeErrors=True,
|
||||||
|
))
|
||||||
states = dict(zip(event_ids, [s.state for s in states]))
|
states = dict(zip(event_ids, [s.state for s in states]))
|
||||||
|
|
||||||
state_map = yield self.store.get_events(
|
state_map = yield self.store.get_events(
|
||||||
@ -942,9 +942,7 @@ class FederationHandler(BaseHandler):
|
|||||||
# lots of requests for missing prev_events which we do actually
|
# lots of requests for missing prev_events which we do actually
|
||||||
# have. Hence we fire off the deferred, but don't wait for it.
|
# have. Hence we fire off the deferred, but don't wait for it.
|
||||||
|
|
||||||
synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
|
logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
|
||||||
room_queue
|
|
||||||
)
|
|
||||||
|
|
||||||
defer.returnValue(True)
|
defer.returnValue(True)
|
||||||
|
|
||||||
@ -1438,7 +1436,7 @@ class FederationHandler(BaseHandler):
|
|||||||
if not backfilled:
|
if not backfilled:
|
||||||
# this intentionally does not yield: we don't care about the result
|
# this intentionally does not yield: we don't care about the result
|
||||||
# and don't need to wait for it.
|
# and don't need to wait for it.
|
||||||
preserve_fn(self.pusher_pool.on_new_notifications)(
|
logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
|
||||||
event_stream_id, max_stream_id
|
event_stream_id, max_stream_id
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -1451,16 +1449,16 @@ class FederationHandler(BaseHandler):
|
|||||||
a bunch of outliers, but not a chunk of individual events that depend
|
a bunch of outliers, but not a chunk of individual events that depend
|
||||||
on each other for state calculations.
|
on each other for state calculations.
|
||||||
"""
|
"""
|
||||||
contexts = yield preserve_context_over_deferred(defer.gatherResults(
|
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||||
[
|
[
|
||||||
preserve_fn(self._prep_event)(
|
logcontext.preserve_fn(self._prep_event)(
|
||||||
origin,
|
origin,
|
||||||
ev_info["event"],
|
ev_info["event"],
|
||||||
state=ev_info.get("state"),
|
state=ev_info.get("state"),
|
||||||
auth_events=ev_info.get("auth_events"),
|
auth_events=ev_info.get("auth_events"),
|
||||||
)
|
)
|
||||||
for ev_info in event_infos
|
for ev_info in event_infos
|
||||||
]
|
], consumeErrors=True,
|
||||||
))
|
))
|
||||||
|
|
||||||
yield self.store.persist_events(
|
yield self.store.persist_events(
|
||||||
@ -1768,18 +1766,17 @@ class FederationHandler(BaseHandler):
|
|||||||
# Do auth conflict res.
|
# Do auth conflict res.
|
||||||
logger.info("Different auth: %s", different_auth)
|
logger.info("Different auth: %s", different_auth)
|
||||||
|
|
||||||
different_events = yield preserve_context_over_deferred(defer.gatherResults(
|
different_events = yield logcontext.make_deferred_yieldable(
|
||||||
[
|
defer.gatherResults([
|
||||||
preserve_fn(self.store.get_event)(
|
logcontext.preserve_fn(self.store.get_event)(
|
||||||
d,
|
d,
|
||||||
allow_none=True,
|
allow_none=True,
|
||||||
allow_rejected=False,
|
allow_rejected=False,
|
||||||
)
|
)
|
||||||
for d in different_auth
|
for d in different_auth
|
||||||
if d in have_events and not have_events[d]
|
if d in have_events and not have_events[d]
|
||||||
],
|
], consumeErrors=True)
|
||||||
consumeErrors=True
|
).addErrback(unwrapFirstError)
|
||||||
)).addErrback(unwrapFirstError)
|
|
||||||
|
|
||||||
if different_events:
|
if different_events:
|
||||||
local_view = dict(auth_events)
|
local_view = dict(auth_events)
|
||||||
|
Loading…
Reference in New Issue
Block a user