Actually fetch state for new backwards extremeties when backfilling.

This commit is contained in:
Erik Johnston 2015-05-20 11:59:02 +01:00
parent 9084cdd70f
commit 20814fabdd
2 changed files with 108 additions and 62 deletions

View File

@ -169,6 +169,10 @@ class FederationClient(FederationBase):
pdus[i] = yield self._check_sigs_and_hash(pdu)
# FIXME: We should handle signature failures more gracefully.
pdus[:] = yield defer.gatherResults(
[self._check_sigs_and_hash(pdu) for pdu in pdus],
consumeErrors=True,
).addErrback(unwrapFirstError)
defer.returnValue(pdus)

View File

@ -230,27 +230,65 @@ class FederationHandler(BaseHandler):
if not extremities:
extremities = yield self.store.get_oldest_events_in_room(room_id)
pdus = yield self.replication_layer.backfill(
events = yield self.replication_layer.backfill(
dest,
room_id,
limit,
limit=limit,
extremities=extremities,
)
events = []
event_map = {e.event_id: e for e in events}
for pdu in pdus:
event = pdu
event_ids = set(e.event_id for e in events)
# FIXME (erikj): Not sure this actually works :/
context = yield self.state_handler.compute_event_context(event)
edges = [
ev.event_id
for ev in events
if set(e_id for e_id, _ in ev.prev_events) - event_ids
]
events.append((event, context))
# For each edge get the current state.
yield self.store.persist_event(
event,
context=context,
backfilled=True
auth_events = {}
events_to_state = {}
for e_id in edges:
state, auth = yield self.replication_layer.get_state_for_room(
destination=dest,
room_id=room_id,
event_id=e_id
)
auth_events.update({a.event_id: a for a in auth})
events_to_state[e_id] = state
yield defer.gatherResults(
[
self._handle_new_event(dest, a)
for a in auth_events.values()
],
consumeErrors=True,
).addErrback(unwrapFirstError)
yield defer.gatherResults(
[
self._handle_new_event(
dest, event_map[e_id],
state=events_to_state[e_id],
backfilled=True,
)
for e_id in events_to_state
],
consumeErrors=True
).addErrback(unwrapFirstError)
events.sort(key=lambda e: e.depth)
for event in events:
if event in events_to_state:
continue
yield self._handle_new_event(
dest, event,
backfilled=True,
)
defer.returnValue(events)
@ -347,7 +385,7 @@ class FederationHandler(BaseHandler):
logger.info(e.message)
continue
except Exception as e:
logger.warn(
logger.exception(
"Failed to backfill from %s because %s",
dom, e,
)
@ -517,54 +555,9 @@ class FederationHandler(BaseHandler):
# FIXME
pass
auth_ids_to_deferred = {}
def process_auth_ev(ev):
auth_ids = [e_id for e_id, _ in ev.auth_events]
prev_ds = [
auth_ids_to_deferred[i]
for i in auth_ids
if i in auth_ids_to_deferred
]
d = defer.Deferred()
auth_ids_to_deferred[ev.event_id] = d
@defer.inlineCallbacks
def f(*_):
ev.internal_metadata.outlier = True
try:
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, ev, auth_events=auth
yield self._handle_auth_events(
origin, [e for e in auth_chain if e.event_id != event.event_id]
)
except:
logger.exception(
"Failed to handle auth event %s",
ev.event_id,
)
d.callback(None)
if prev_ds:
dx = defer.DeferredList(prev_ds)
dx.addBoth(f)
else:
f()
for e in auth_chain:
if e.event_id == event.event_id:
return
process_auth_ev(e)
yield defer.DeferredList(auth_ids_to_deferred.values())
@defer.inlineCallbacks
def handle_state(e):
@ -1348,3 +1341,52 @@ class FederationHandler(BaseHandler):
},
"missing": [e.event_id for e in missing_locals],
})
@defer.inlineCallbacks
def _handle_auth_events(self, origin, auth_events):
auth_ids_to_deferred = {}
def process_auth_ev(ev):
auth_ids = [e_id for e_id, _ in ev.auth_events]
prev_ds = [
auth_ids_to_deferred[i]
for i in auth_ids
if i in auth_ids_to_deferred
]
d = defer.Deferred()
auth_ids_to_deferred[ev.event_id] = d
@defer.inlineCallbacks
def f(*_):
ev.internal_metadata.outlier = True
try:
auth = {
(e.type, e.state_key): e for e in auth_events
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, ev, auth_events=auth
)
except:
logger.exception(
"Failed to handle auth event %s",
ev.event_id,
)
d.callback(None)
if prev_ds:
dx = defer.DeferredList(prev_ds)
dx.addBoth(f)
else:
f()
for e in auth_events:
process_auth_ev(e)
yield defer.DeferredList(auth_ids_to_deferred.values())