Merge pull request #1978 from matrix-org/rav/refactor_received_pdu

Refactor FederationServer._handle_new_pdu
This commit is contained in:
Richard van der Hoff 2017-03-13 12:45:46 +00:00 committed by GitHub
commit 2cad971ab4
2 changed files with 207 additions and 163 deletions

View File

@ -52,7 +52,6 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth() self.auth = hs.get_auth()
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
self._server_linearizer = Linearizer("fed_server") self._server_linearizer = Linearizer("fed_server")
# We cache responses to state queries, as they take a while and often # We cache responses to state queries, as they take a while and often
@ -165,7 +164,7 @@ class FederationServer(FederationBase):
) )
try: try:
yield self._handle_new_pdu(transaction.origin, pdu) yield self._handle_received_pdu(transaction.origin, pdu)
results.append({}) results.append({})
except FederationError as e: except FederationError as e:
self.send_failure(e, transaction.origin) self.send_failure(e, transaction.origin)
@ -497,27 +496,16 @@ class FederationServer(FederationBase):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function def _handle_received_pdu(self, origin, pdu):
def _handle_new_pdu(self, origin, pdu, get_missing=True): """ Process a PDU received in a federation /send/ transaction.
# We reprocess pdus when we have seen them only as outliers Args:
existing = yield self._get_persisted_pdu( origin (str): server which sent the pdu
origin, pdu.event_id, do_auth=False pdu (FrozenEvent): received pdu
)
# FIXME: Currently we fetch an event again when we already have it
# if it has been marked as an outlier.
already_seen = (
existing and (
not existing.internal_metadata.is_outlier()
or pdu.internal_metadata.is_outlier()
)
)
if already_seen:
logger.debug("Already seen pdu %s", pdu.event_id)
return
Returns (Deferred): completes with None
Raises: FederationError if the signatures / hash do not match
"""
# Check signature. # Check signature.
try: try:
pdu = yield self._check_sigs_and_hash(pdu) pdu = yield self._check_sigs_and_hash(pdu)
@ -529,143 +517,7 @@ class FederationServer(FederationBase):
affected=pdu.event_id, affected=pdu.event_id,
) )
state = None yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
auth_chain = []
have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)
fetch_state = False
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
min_depth = yield self.handler.get_min_depth_for_context(
pdu.room_id
)
logger.debug(
"_handle_new_pdu min_depth for %s: %d",
pdu.room_id, min_depth
)
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
# message, to work around the fact that some events will
# reference really really old events we really don't want to
# send to the clients.
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
if get_missing and prevs - seen:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
"Acquiring lock for room %r to fetch %d missing events: %r...",
pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
)
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info(
"Acquired lock for room %r to fetch %d missing events",
pdu.room_id, len(prevs - seen),
)
# We recalculate seen, since it may have changed.
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.keys())
if prevs - seen:
latest = yield self.store.get_latest_event_ids_in_room(
pdu.room_id
)
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
latest = set(latest)
latest |= seen
logger.info(
"Missing %d events for room %r: %r...",
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
)
# XXX: we set timeout to 10s to help workaround
# https://github.com/matrix-org/synapse/issues/1733.
# The reason is to avoid holding the linearizer lock
# whilst processing inbound /send transactions, causing
# FDs to stack up and block other inbound transactions
# which empirically can currently take up to 30 minutes.
#
# N.B. this explicitly disables retry attempts.
#
# N.B. this also increases our chances of falling back to
# fetching fresh state for the room if the missing event
# can't be found, which slightly reduces our security.
# it may also increase our DAG extremity count for the room,
# causing additional state resolution? See #1760.
# However, fetching state doesn't hold the linearizer lock
# apparently.
#
# see https://github.com/matrix-org/synapse/pull/1744
missing_events = yield self.get_missing_events(
origin,
pdu.room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
min_depth=min_depth,
timeout=10000,
)
# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
for e in missing_events:
yield self._handle_new_pdu(
origin,
e,
get_missing=False
)
have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
if prevs - seen:
logger.info(
"Still missing %d events for room %r: %r...",
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
)
fetch_state = True
if fetch_state:
# We need to get the state at this event, since we haven't
# processed all the prev events.
logger.debug(
"_handle_new_pdu getting state for %s",
pdu.room_id
)
try:
state, auth_chain = yield self.get_state_for_room(
origin, pdu.room_id, pdu.event_id,
)
except:
logger.exception("Failed to get state for event: %s", pdu.event_id)
yield self.handler.on_receive_pdu(
origin,
pdu,
state=state,
auth_chain=auth_chain,
)
def __str__(self): def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name return "<ReplicationLayer(%s)>" % self.server_name

View File

@ -31,7 +31,7 @@ from synapse.util.logcontext import (
) )
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.frozenutils import unfreeze from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import ( from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures, compute_event_signature, add_hashes_and_signatures,
@ -79,12 +79,204 @@ class FederationHandler(BaseHandler):
# When joining a room we need to queue any events for that room up # When joining a room we need to queue any events for that room up
self.room_queues = {} self.room_queues = {}
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@defer.inlineCallbacks
@log_function
def on_receive_pdu(self, origin, pdu, get_missing=True):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
Args:
origin (str): server which initiated the /send/ transaction. Will
be used to fetch missing events or state.
pdu (FrozenEvent): received PDU
get_missing (bool): True if we should fetch missing prev_events
Returns (Deferred): completes with None
"""
# We reprocess pdus when we have seen them only as outliers
existing = yield self.get_persisted_pdu(
origin, pdu.event_id, do_auth=False
)
# FIXME: Currently we fetch an event again when we already have it
# if it has been marked as an outlier.
already_seen = (
existing and (
not existing.internal_metadata.is_outlier()
or pdu.internal_metadata.is_outlier()
)
)
if already_seen:
logger.debug("Already seen pdu %s", pdu.event_id)
return
state = None
auth_chain = []
have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)
fetch_state = False
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
min_depth = yield self.get_min_depth_for_context(
pdu.room_id
)
logger.debug(
"_handle_new_pdu min_depth for %s: %d",
pdu.room_id, min_depth
)
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
# message, to work around the fact that some events will
# reference really really old events we really don't want to
# send to the clients.
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
if get_missing and prevs - seen:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
"Acquiring lock for room %r to fetch %d missing events: %r...",
pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
)
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info(
"Acquired lock for room %r to fetch %d missing events",
pdu.room_id, len(prevs - seen),
)
yield self._get_missing_events_for_pdu(
origin, pdu, prevs, min_depth
)
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
if prevs - seen:
logger.info(
"Still missing %d events for room %r: %r...",
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
)
fetch_state = True
if fetch_state:
# We need to get the state at this event, since we haven't
# processed all the prev events.
logger.debug(
"_handle_new_pdu getting state for %s",
pdu.room_id
)
try:
state, auth_chain = yield self.replication_layer.get_state_for_room(
origin, pdu.room_id, pdu.event_id,
)
except:
logger.exception("Failed to get state for event: %s", pdu.event_id)
yield self._process_received_pdu(
origin,
pdu,
state=state,
auth_chain=auth_chain,
)
@defer.inlineCallbacks
def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
"""
Args:
origin (str): Origin of the pdu. Will be called to get the missing events
pdu: received pdu
prevs (str[]): List of event ids which we are missing
min_depth (int): Minimum depth of events to return.
Returns:
Deferred<dict(str, str?)>: updated have_seen dictionary
"""
# We recalculate seen, since it may have changed.
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.keys())
if not prevs - seen:
# nothing left to do
defer.returnValue(have_seen)
latest = yield self.store.get_latest_event_ids_in_room(
pdu.room_id
)
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
latest = set(latest)
latest |= seen
logger.info(
"Missing %d events for room %r: %r...",
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
)
# XXX: we set timeout to 10s to help workaround
# https://github.com/matrix-org/synapse/issues/1733.
# The reason is to avoid holding the linearizer lock
# whilst processing inbound /send transactions, causing
# FDs to stack up and block other inbound transactions
# which empirically can currently take up to 30 minutes.
#
# N.B. this explicitly disables retry attempts.
#
# N.B. this also increases our chances of falling back to
# fetching fresh state for the room if the missing event
# can't be found, which slightly reduces our security.
# it may also increase our DAG extremity count for the room,
# causing additional state resolution? See #1760.
# However, fetching state doesn't hold the linearizer lock
# apparently.
#
# see https://github.com/matrix-org/synapse/pull/1744
missing_events = yield self.replication_layer.get_missing_events(
origin,
pdu.room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
min_depth=min_depth,
timeout=10000,
)
# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
for e in missing_events:
yield self.on_receive_pdu(
origin,
e,
get_missing=False
)
have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)
defer.returnValue(have_seen)
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None): def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to """ Called when we have a new pdu. We need to do auth checks and put it
do auth checks and put it through the StateHandler. through the StateHandler.
auth_chain and state are None if we already have the necessary state auth_chain and state are None if we already have the necessary state
and prev_events in the db and prev_events in the db
@ -738,7 +930,7 @@ class FederationHandler(BaseHandler):
continue continue
try: try:
self.on_receive_pdu(origin, p) self._process_received_pdu(origin, p)
except: except:
logger.exception("Couldn't handle pdu") logger.exception("Couldn't handle pdu")