Merge pull request #998 from matrix-org/erikj/pdu_fail_cache

Various federation /event/ improvements
This commit is contained in:
Erik Johnston 2016-08-10 14:09:14 +01:00 committed by GitHub
commit c9f724caa4
2 changed files with 53 additions and 21 deletions

View File

@ -51,10 +51,34 @@ sent_edus_counter = metrics.register_counter("sent_edus")
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
PDU_RETRY_TIME_MS = 1 * 60 * 1000
class FederationClient(FederationBase): class FederationClient(FederationBase):
def __init__(self, hs): def __init__(self, hs):
super(FederationClient, self).__init__(hs) super(FederationClient, self).__init__(hs)
self.pdu_destination_tried = {}
self._clock.looping_call(
self._clear_tried_cache, 60 * 1000,
)
def _clear_tried_cache(self):
"""Clear pdu_destination_tried cache"""
now = self._clock.time_msec()
old_dict = self.pdu_destination_tried
self.pdu_destination_tried = {}
for event_id, destination_dict in old_dict.items():
destination_dict = {
dest: time
for dest, time in destination_dict.items()
if time + PDU_RETRY_TIME_MS > now
}
if destination_dict:
self.pdu_destination_tried[event_id] = destination_dict
def start_get_pdu_cache(self): def start_get_pdu_cache(self):
self._get_pdu_cache = ExpiringCache( self._get_pdu_cache = ExpiringCache(
cache_name="get_pdu_cache", cache_name="get_pdu_cache",
@ -240,8 +264,15 @@ class FederationClient(FederationBase):
if ev: if ev:
defer.returnValue(ev) defer.returnValue(ev)
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
pdu = None pdu = None
for destination in destinations: for destination in destinations:
now = self._clock.time_msec()
last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now:
continue
try: try:
limiter = yield get_retry_limiter( limiter = yield get_retry_limiter(
destination, destination,
@ -269,25 +300,19 @@ class FederationClient(FederationBase):
break break
pdu_attempts[destination] = now
except SynapseError as e: except SynapseError as e:
logger.info( logger.info(
"Failed to get PDU %s from %s because %s", "Failed to get PDU %s from %s because %s",
event_id, destination, e, event_id, destination, e,
) )
continue
except CodeMessageException as e:
if 400 <= e.code < 500:
raise
logger.info(
"Failed to get PDU %s from %s because %s",
event_id, destination, e,
)
continue
except NotRetryingDestination as e: except NotRetryingDestination as e:
logger.info(e.message) logger.info(e.message)
continue continue
except Exception as e: except Exception as e:
pdu_attempts[destination] = now
logger.info( logger.info(
"Failed to get PDU %s from %s because %s", "Failed to get PDU %s from %s because %s",
event_id, destination, e, event_id, destination, e,
@ -406,7 +431,7 @@ class FederationClient(FederationBase):
events and the second is a list of event ids that we failed to fetch. events and the second is a list of event ids that we failed to fetch.
""" """
if return_local: if return_local:
seen_events = yield self.store.get_events(event_ids) seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values() signed_events = seen_events.values()
else: else:
seen_events = yield self.store.have_events(event_ids) seen_events = yield self.store.have_events(event_ids)

View File

@ -249,7 +249,7 @@ class FederationHandler(BaseHandler):
if ev.type != EventTypes.Member: if ev.type != EventTypes.Member:
continue continue
try: try:
domain = UserID.from_string(ev.state_key).domain domain = get_domain_from_id(ev.state_key)
except: except:
continue continue
@ -1093,16 +1093,17 @@ class FederationHandler(BaseHandler):
) )
if event: if event:
# FIXME: This is a temporary work around where we occasionally if self.hs.is_mine_id(event.event_id):
# return events slightly differently than when they were # FIXME: This is a temporary work around where we occasionally
# originally signed # return events slightly differently than when they were
event.signatures.update( # originally signed
compute_event_signature( event.signatures.update(
event, compute_event_signature(
self.hs.hostname, event,
self.hs.config.signing_key[0] self.hs.hostname,
self.hs.config.signing_key[0]
)
) )
)
if do_auth: if do_auth:
in_room = yield self.auth.check_host_in_room( in_room = yield self.auth.check_host_in_room(
@ -1112,6 +1113,12 @@ class FederationHandler(BaseHandler):
if not in_room: if not in_room:
raise AuthError(403, "Host not in room.") raise AuthError(403, "Host not in room.")
events = yield self._filter_events_for_server(
origin, event.room_id, [event]
)
event = events[0]
defer.returnValue(event) defer.returnValue(event)
else: else:
defer.returnValue(None) defer.returnValue(None)