mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-10-01 08:25:44 -04:00
Improve the logging when handling a federation transaction (#3904)
Let's try to rationalise the logging that happens when we are processing an incoming transaction, to make it easier to figure out what is going wrong when they take ages. In particular: - make everything start with a [room_id event_id] prefix - make sure we log a warning when catching exceptions rather than just turning them into other, more cryptic, exceptions.
This commit is contained in:
parent
cb016baa37
commit
642199570c
1
changelog.d/3904.misc
Normal file
1
changelog.d/3904.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Improve the logging when handling a federation transaction
|
@ -69,6 +69,27 @@ from ._base import BaseHandler
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def shortstr(iterable, maxitems=5):
|
||||||
|
"""If iterable has maxitems or fewer, return the stringification of a list
|
||||||
|
containing those items.
|
||||||
|
|
||||||
|
Otherwise, return the stringification of a a list with the first maxitems items,
|
||||||
|
followed by "...".
|
||||||
|
|
||||||
|
Args:
|
||||||
|
iterable (Iterable): iterable to truncate
|
||||||
|
maxitems (int): number of items to return before truncating
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
unicode
|
||||||
|
"""
|
||||||
|
|
||||||
|
items = list(itertools.islice(iterable, maxitems + 1))
|
||||||
|
if len(items) <= maxitems:
|
||||||
|
return str(items)
|
||||||
|
return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
|
||||||
|
|
||||||
|
|
||||||
class FederationHandler(BaseHandler):
|
class FederationHandler(BaseHandler):
|
||||||
"""Handles events that originated from federation.
|
"""Handles events that originated from federation.
|
||||||
Responsible for:
|
Responsible for:
|
||||||
@ -114,7 +135,6 @@ class FederationHandler(BaseHandler):
|
|||||||
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
|
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
|
||||||
def on_receive_pdu(
|
def on_receive_pdu(
|
||||||
self, origin, pdu, get_missing=True, sent_to_us_directly=False,
|
self, origin, pdu, get_missing=True, sent_to_us_directly=False,
|
||||||
):
|
):
|
||||||
@ -130,9 +150,17 @@ class FederationHandler(BaseHandler):
|
|||||||
Returns (Deferred): completes with None
|
Returns (Deferred): completes with None
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
room_id = pdu.room_id
|
||||||
|
event_id = pdu.event_id
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"[%s %s] handling received PDU: %s",
|
||||||
|
room_id, event_id, pdu,
|
||||||
|
)
|
||||||
|
|
||||||
# We reprocess pdus when we have seen them only as outliers
|
# We reprocess pdus when we have seen them only as outliers
|
||||||
existing = yield self.store.get_event(
|
existing = yield self.store.get_event(
|
||||||
pdu.event_id,
|
event_id,
|
||||||
allow_none=True,
|
allow_none=True,
|
||||||
allow_rejected=True,
|
allow_rejected=True,
|
||||||
)
|
)
|
||||||
@ -147,7 +175,7 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
if already_seen:
|
if already_seen:
|
||||||
logger.debug("Already seen pdu %s", pdu.event_id)
|
logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
# do some initial sanity-checking of the event. In particular, make
|
# do some initial sanity-checking of the event. In particular, make
|
||||||
@ -156,6 +184,7 @@ class FederationHandler(BaseHandler):
|
|||||||
try:
|
try:
|
||||||
self._sanity_check_event(pdu)
|
self._sanity_check_event(pdu)
|
||||||
except SynapseError as err:
|
except SynapseError as err:
|
||||||
|
logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
|
||||||
raise FederationError(
|
raise FederationError(
|
||||||
"ERROR",
|
"ERROR",
|
||||||
err.code,
|
err.code,
|
||||||
@ -165,10 +194,12 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
# If we are currently in the process of joining this room, then we
|
# If we are currently in the process of joining this room, then we
|
||||||
# queue up events for later processing.
|
# queue up events for later processing.
|
||||||
if pdu.room_id in self.room_queues:
|
if room_id in self.room_queues:
|
||||||
logger.info("Ignoring PDU %s for room %s from %s for now; join "
|
logger.info(
|
||||||
"in progress", pdu.event_id, pdu.room_id, origin)
|
"[%s %s] Queuing PDU from %s for now: join in progress",
|
||||||
self.room_queues[pdu.room_id].append((pdu, origin))
|
room_id, event_id, origin,
|
||||||
|
)
|
||||||
|
self.room_queues[room_id].append((pdu, origin))
|
||||||
return
|
return
|
||||||
|
|
||||||
# If we're no longer in the room just ditch the event entirely. This
|
# If we're no longer in the room just ditch the event entirely. This
|
||||||
@ -179,7 +210,7 @@ class FederationHandler(BaseHandler):
|
|||||||
# we should check if we *are* in fact in the room. If we are then we
|
# we should check if we *are* in fact in the room. If we are then we
|
||||||
# can magically rejoin the room.
|
# can magically rejoin the room.
|
||||||
is_in_room = yield self.auth.check_host_in_room(
|
is_in_room = yield self.auth.check_host_in_room(
|
||||||
pdu.room_id,
|
room_id,
|
||||||
self.server_name
|
self.server_name
|
||||||
)
|
)
|
||||||
if not is_in_room:
|
if not is_in_room:
|
||||||
@ -188,8 +219,8 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
if was_in_room:
|
if was_in_room:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Ignoring PDU %s for room %s from %s as we've left the room!",
|
"[%s %s] Ignoring PDU from %s as we've left the room",
|
||||||
pdu.event_id, pdu.room_id, origin,
|
room_id, event_id, origin,
|
||||||
)
|
)
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
|
|
||||||
@ -204,8 +235,8 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"_handle_new_pdu min_depth for %s: %d",
|
"[%s %s] min_depth: %d",
|
||||||
pdu.room_id, min_depth
|
room_id, event_id, min_depth,
|
||||||
)
|
)
|
||||||
|
|
||||||
prevs = {e_id for e_id, _ in pdu.prev_events}
|
prevs = {e_id for e_id, _ in pdu.prev_events}
|
||||||
@ -218,17 +249,18 @@ class FederationHandler(BaseHandler):
|
|||||||
# send to the clients.
|
# send to the clients.
|
||||||
pdu.internal_metadata.outlier = True
|
pdu.internal_metadata.outlier = True
|
||||||
elif min_depth and pdu.depth > min_depth:
|
elif min_depth and pdu.depth > min_depth:
|
||||||
if get_missing and prevs - seen:
|
missing_prevs = prevs - seen
|
||||||
|
if get_missing and missing_prevs:
|
||||||
# If we're missing stuff, ensure we only fetch stuff one
|
# If we're missing stuff, ensure we only fetch stuff one
|
||||||
# at a time.
|
# at a time.
|
||||||
logger.info(
|
logger.info(
|
||||||
"Acquiring lock for room %r to fetch %d missing events: %r...",
|
"[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
|
||||||
pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
|
room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
|
||||||
)
|
)
|
||||||
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
|
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
|
||||||
logger.info(
|
logger.info(
|
||||||
"Acquired lock for room %r to fetch %d missing events",
|
"[%s %s] Acquired room lock to fetch %d missing prev_events",
|
||||||
pdu.room_id, len(prevs - seen),
|
room_id, event_id, len(missing_prevs),
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self._get_missing_events_for_pdu(
|
yield self._get_missing_events_for_pdu(
|
||||||
@ -241,19 +273,23 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
if not prevs - seen:
|
if not prevs - seen:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Found all missing prev events for %s", pdu.event_id
|
"[%s %s] Found all missing prev_events",
|
||||||
|
room_id, event_id,
|
||||||
)
|
)
|
||||||
elif prevs - seen:
|
elif missing_prevs:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Not fetching %d missing events for room %r,event %s: %r...",
|
"[%s %s] Not recursively fetching %d missing prev_events: %s",
|
||||||
len(prevs - seen), pdu.room_id, pdu.event_id,
|
room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
|
||||||
list(prevs - seen)[:5],
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if sent_to_us_directly and prevs - seen:
|
if sent_to_us_directly and prevs - seen:
|
||||||
# If they have sent it to us directly, and the server
|
# If they have sent it to us directly, and the server
|
||||||
# isn't telling us about the auth events that it's
|
# isn't telling us about the auth events that it's
|
||||||
# made a message referencing, we explode
|
# made a message referencing, we explode
|
||||||
|
logger.warn(
|
||||||
|
"[%s %s] Failed to fetch %d prev events: rejecting",
|
||||||
|
room_id, event_id, len(prevs - seen),
|
||||||
|
)
|
||||||
raise FederationError(
|
raise FederationError(
|
||||||
"ERROR",
|
"ERROR",
|
||||||
403,
|
403,
|
||||||
@ -270,15 +306,19 @@ class FederationHandler(BaseHandler):
|
|||||||
auth_chains = set()
|
auth_chains = set()
|
||||||
try:
|
try:
|
||||||
# Get the state of the events we know about
|
# Get the state of the events we know about
|
||||||
ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
|
ours = yield self.store.get_state_groups(room_id, list(seen))
|
||||||
state_groups.append(ours)
|
state_groups.append(ours)
|
||||||
|
|
||||||
# Ask the remote server for the states we don't
|
# Ask the remote server for the states we don't
|
||||||
# know about
|
# know about
|
||||||
for p in prevs - seen:
|
for p in prevs - seen:
|
||||||
|
logger.info(
|
||||||
|
"[%s %s] Requesting state at missing prev_event %s",
|
||||||
|
room_id, event_id, p,
|
||||||
|
)
|
||||||
state, got_auth_chain = (
|
state, got_auth_chain = (
|
||||||
yield self.federation_client.get_state_for_room(
|
yield self.federation_client.get_state_for_room(
|
||||||
origin, pdu.room_id, p
|
origin, room_id, p,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
auth_chains.update(got_auth_chain)
|
auth_chains.update(got_auth_chain)
|
||||||
@ -291,19 +331,24 @@ class FederationHandler(BaseHandler):
|
|||||||
ev_ids, get_prev_content=False, check_redacted=False
|
ev_ids, get_prev_content=False, check_redacted=False
|
||||||
)
|
)
|
||||||
|
|
||||||
room_version = yield self.store.get_room_version(pdu.room_id)
|
room_version = yield self.store.get_room_version(room_id)
|
||||||
state_map = yield resolve_events_with_factory(
|
state_map = yield resolve_events_with_factory(
|
||||||
room_version, state_groups, {pdu.event_id: pdu}, fetch
|
room_version, state_groups, {event_id: pdu}, fetch
|
||||||
)
|
)
|
||||||
|
|
||||||
state = (yield self.store.get_events(state_map.values())).values()
|
state = (yield self.store.get_events(state_map.values())).values()
|
||||||
auth_chain = list(auth_chains)
|
auth_chain = list(auth_chains)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
logger.warn(
|
||||||
|
"[%s %s] Error attempting to resolve state at missing "
|
||||||
|
"prev_events",
|
||||||
|
room_id, event_id, exc_info=True,
|
||||||
|
)
|
||||||
raise FederationError(
|
raise FederationError(
|
||||||
"ERROR",
|
"ERROR",
|
||||||
403,
|
403,
|
||||||
"We can't get valid state history.",
|
"We can't get valid state history.",
|
||||||
affected=pdu.event_id,
|
affected=event_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self._process_received_pdu(
|
yield self._process_received_pdu(
|
||||||
@ -322,15 +367,16 @@ class FederationHandler(BaseHandler):
|
|||||||
prevs (set(str)): List of event ids which we are missing
|
prevs (set(str)): List of event ids which we are missing
|
||||||
min_depth (int): Minimum depth of events to return.
|
min_depth (int): Minimum depth of events to return.
|
||||||
"""
|
"""
|
||||||
# We recalculate seen, since it may have changed.
|
|
||||||
|
room_id = pdu.room_id
|
||||||
|
event_id = pdu.event_id
|
||||||
|
|
||||||
seen = yield self.store.have_seen_events(prevs)
|
seen = yield self.store.have_seen_events(prevs)
|
||||||
|
|
||||||
if not prevs - seen:
|
if not prevs - seen:
|
||||||
return
|
return
|
||||||
|
|
||||||
latest = yield self.store.get_latest_event_ids_in_room(
|
latest = yield self.store.get_latest_event_ids_in_room(room_id)
|
||||||
pdu.room_id
|
|
||||||
)
|
|
||||||
|
|
||||||
# We add the prev events that we have seen to the latest
|
# We add the prev events that we have seen to the latest
|
||||||
# list to ensure the remote server doesn't give them to us
|
# list to ensure the remote server doesn't give them to us
|
||||||
@ -338,8 +384,8 @@ class FederationHandler(BaseHandler):
|
|||||||
latest |= seen
|
latest |= seen
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Missing %d events for room %r pdu %s: %r...",
|
"[%s %s]: Requesting %d prev_events: %s",
|
||||||
len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
|
room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: we set timeout to 10s to help workaround
|
# XXX: we set timeout to 10s to help workaround
|
||||||
@ -392,7 +438,7 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
missing_events = yield self.federation_client.get_missing_events(
|
missing_events = yield self.federation_client.get_missing_events(
|
||||||
origin,
|
origin,
|
||||||
pdu.room_id,
|
room_id,
|
||||||
earliest_events_ids=list(latest),
|
earliest_events_ids=list(latest),
|
||||||
latest_events=[pdu],
|
latest_events=[pdu],
|
||||||
limit=10,
|
limit=10,
|
||||||
@ -401,37 +447,46 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Got %d events: %r...",
|
"[%s %s]: Got %d prev_events: %s",
|
||||||
len(missing_events), [e.event_id for e in missing_events[:5]]
|
room_id, event_id, len(missing_events), shortstr(missing_events),
|
||||||
)
|
)
|
||||||
|
|
||||||
# We want to sort these by depth so we process them and
|
# We want to sort these by depth so we process them and
|
||||||
# tell clients about them in order.
|
# tell clients about them in order.
|
||||||
missing_events.sort(key=lambda x: x.depth)
|
missing_events.sort(key=lambda x: x.depth)
|
||||||
|
|
||||||
for e in missing_events:
|
for ev in missing_events:
|
||||||
logger.info("Handling found event %s", e.event_id)
|
logger.info(
|
||||||
|
"[%s %s] Handling received prev_event %s",
|
||||||
|
room_id, event_id, ev.event_id,
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
yield self.on_receive_pdu(
|
yield self.on_receive_pdu(
|
||||||
origin,
|
origin,
|
||||||
e,
|
ev,
|
||||||
get_missing=False
|
get_missing=False
|
||||||
)
|
)
|
||||||
except FederationError as e:
|
except FederationError as e:
|
||||||
if e.code == 403:
|
if e.code == 403:
|
||||||
logger.warn("Event %s failed history check.")
|
logger.warn(
|
||||||
|
"[%s %s] Received prev_event %s failed history check.",
|
||||||
|
room_id, event_id, ev.event_id,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@log_function
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _process_received_pdu(self, origin, pdu, state, auth_chain):
|
def _process_received_pdu(self, origin, event, state, auth_chain):
|
||||||
""" Called when we have a new pdu. We need to do auth checks and put it
|
""" Called when we have a new pdu. We need to do auth checks and put it
|
||||||
through the StateHandler.
|
through the StateHandler.
|
||||||
"""
|
"""
|
||||||
event = pdu
|
room_id = event.room_id
|
||||||
|
event_id = event.event_id
|
||||||
|
|
||||||
logger.debug("Processing event: %s", event)
|
logger.debug(
|
||||||
|
"[%s %s] Processing event: %s",
|
||||||
|
room_id, event_id, event,
|
||||||
|
)
|
||||||
|
|
||||||
# FIXME (erikj): Awful hack to make the case where we are not currently
|
# FIXME (erikj): Awful hack to make the case where we are not currently
|
||||||
# in the room work
|
# in the room work
|
||||||
@ -440,15 +495,16 @@ class FederationHandler(BaseHandler):
|
|||||||
# event.
|
# event.
|
||||||
if state and auth_chain and not event.internal_metadata.is_outlier():
|
if state and auth_chain and not event.internal_metadata.is_outlier():
|
||||||
is_in_room = yield self.auth.check_host_in_room(
|
is_in_room = yield self.auth.check_host_in_room(
|
||||||
event.room_id,
|
room_id,
|
||||||
self.server_name
|
self.server_name
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
is_in_room = True
|
is_in_room = True
|
||||||
|
|
||||||
if not is_in_room:
|
if not is_in_room:
|
||||||
logger.info(
|
logger.info(
|
||||||
"Got event for room we're not in: %r %r",
|
"[%s %s] Got event for room we're not in",
|
||||||
event.room_id, event.event_id
|
room_id, event_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -460,7 +516,7 @@ class FederationHandler(BaseHandler):
|
|||||||
"ERROR",
|
"ERROR",
|
||||||
e.code,
|
e.code,
|
||||||
e.msg,
|
e.msg,
|
||||||
affected=event.event_id,
|
affected=event_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@ -509,12 +565,12 @@ class FederationHandler(BaseHandler):
|
|||||||
affected=event.event_id,
|
affected=event.event_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
room = yield self.store.get_room(event.room_id)
|
room = yield self.store.get_room(room_id)
|
||||||
|
|
||||||
if not room:
|
if not room:
|
||||||
try:
|
try:
|
||||||
yield self.store.store_room(
|
yield self.store.store_room(
|
||||||
room_id=event.room_id,
|
room_id=room_id,
|
||||||
room_creator_user_id="",
|
room_creator_user_id="",
|
||||||
is_public=False,
|
is_public=False,
|
||||||
)
|
)
|
||||||
@ -542,7 +598,7 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
if newly_joined:
|
if newly_joined:
|
||||||
user = UserID.from_string(event.state_key)
|
user = UserID.from_string(event.state_key)
|
||||||
yield self.user_joined_room(user, event.room_id)
|
yield self.user_joined_room(user, room_id)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@ -1459,12 +1515,10 @@ class FederationHandler(BaseHandler):
|
|||||||
else:
|
else:
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
|
|
||||||
@log_function
|
|
||||||
def get_min_depth_for_context(self, context):
|
def get_min_depth_for_context(self, context):
|
||||||
return self.store.get_min_depth(context)
|
return self.store.get_min_depth(context)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
|
||||||
def _handle_new_event(self, origin, event, state=None, auth_events=None,
|
def _handle_new_event(self, origin, event, state=None, auth_events=None,
|
||||||
backfilled=False):
|
backfilled=False):
|
||||||
context = yield self._prep_event(
|
context = yield self._prep_event(
|
||||||
@ -1664,8 +1718,8 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
except AuthError as e:
|
except AuthError as e:
|
||||||
logger.warn(
|
logger.warn(
|
||||||
"Rejecting %s because %s",
|
"[%s %s] Rejecting: %s",
|
||||||
event.event_id, e.msg
|
event.room_id, event.event_id, e.msg
|
||||||
)
|
)
|
||||||
|
|
||||||
context.rejected = RejectedReason.AUTH_ERROR
|
context.rejected = RejectedReason.AUTH_ERROR
|
||||||
|
@ -188,7 +188,7 @@ class RetryDestinationLimiter(object):
|
|||||||
else:
|
else:
|
||||||
self.retry_interval = self.min_retry_interval
|
self.retry_interval = self.min_retry_interval
|
||||||
|
|
||||||
logger.debug(
|
logger.info(
|
||||||
"Connection to %s was unsuccessful (%s(%s)); backoff now %i",
|
"Connection to %s was unsuccessful (%s(%s)); backoff now %i",
|
||||||
self.destination, exc_type, exc_val, self.retry_interval
|
self.destination, exc_type, exc_val, self.retry_interval
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user