Revert "Merge pull request #283 from matrix-org/erikj/atomic_join_federation"

This reverts commit 5879edbb09, reversing
changes made to b43930d4c9.
This commit is contained in:
Daniel Wagner-Hall 2015-10-05 19:10:47 -05:00
parent 58e6a58eb7
commit 34d26d3687

View File

@ -125,72 +125,60 @@ class FederationHandler(BaseHandler):
) )
if not is_in_room and not event.internal_metadata.is_outlier(): if not is_in_room and not event.internal_metadata.is_outlier():
logger.debug("Got event for room we're not in.") logger.debug("Got event for room we're not in.")
current_state = state
try: event_ids = set()
event_stream_id, max_stream_id = yield self._persist_auth_tree( if state:
auth_chain, state, event event_ids |= {e.event_id for e in state}
) if auth_chain:
except AuthError as e: event_ids |= {e.event_id for e in auth_chain}
raise FederationError(
"ERROR",
e.code,
e.msg,
affected=event.event_id,
)
else: seen_ids = set(
event_ids = set() (yield self.store.have_events(event_ids)).keys()
if state: )
event_ids |= {e.event_id for e in state}
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}
seen_ids = set( if state and auth_chain is not None:
(yield self.store.have_events(event_ids)).keys() # If we have any state or auth_chain given to us by the replication
# layer, then we should handle them (if we haven't before.)
event_infos = []
for e in itertools.chain(auth_chain, state):
if e.event_id in seen_ids:
continue
e.internal_metadata.outlier = True
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
event_infos.append({
"event": e,
"auth_events": auth,
})
seen_ids.add(e.event_id)
yield self._handle_new_events(
origin,
event_infos,
outliers=True
) )
if state and auth_chain is not None: try:
# If we have any state or auth_chain given to us by the replication _, event_stream_id, max_stream_id = yield self._handle_new_event(
# layer, then we should handle them (if we haven't before.) origin,
event,
event_infos = [] state=state,
backfilled=backfilled,
for e in itertools.chain(auth_chain, state): current_state=current_state,
if e.event_id in seen_ids: )
continue except AuthError as e:
e.internal_metadata.outlier = True raise FederationError(
auth_ids = [e_id for e_id, _ in e.auth_events] "ERROR",
auth = { e.code,
(e.type, e.state_key): e for e in auth_chain e.msg,
if e.event_id in auth_ids affected=event.event_id,
} )
event_infos.append({
"event": e,
"auth_events": auth,
})
seen_ids.add(e.event_id)
yield self._handle_new_events(
origin,
event_infos,
outliers=True
)
try:
_, event_stream_id, max_stream_id = yield self._handle_new_event(
origin,
event,
state=state,
backfilled=backfilled,
current_state=current_state,
)
except AuthError as e:
raise FederationError(
"ERROR",
e.code,
e.msg,
affected=event.event_id,
)
# if we're receiving valid events from an origin, # if we're receiving valid events from an origin,
# it's probably a good idea to mark it as not in retry-state # it's probably a good idea to mark it as not in retry-state
@ -662,8 +650,35 @@ class FederationHandler(BaseHandler):
# FIXME # FIXME
pass pass
event_stream_id, max_stream_id = yield self._persist_auth_tree( ev_infos = []
auth_chain, state, event for e in itertools.chain(state, auth_chain):
if e.event_id == event.event_id:
continue
e.internal_metadata.outlier = True
auth_ids = [e_id for e_id, _ in e.auth_events]
ev_infos.append({
"event": e,
"auth_events": {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
})
yield self._handle_new_events(origin, ev_infos, outliers=True)
auth_ids = [e_id for e_id, _ in event.auth_events]
auth_events = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
_, event_stream_id, max_stream_id = yield self._handle_new_event(
origin,
new_event,
state=state,
current_state=state,
auth_events=auth_events,
) )
with PreserveLoggingContext(): with PreserveLoggingContext():
@ -1019,76 +1034,6 @@ class FederationHandler(BaseHandler):
is_new_state=(not outliers and not backfilled), is_new_state=(not outliers and not backfilled),
) )
@defer.inlineCallbacks
def _persist_auth_tree(self, auth_events, state, event):
"""Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically.
Persists the event seperately.
Returns:
2-tuple of (event_stream_id, max_stream_id) from the persist_event
call for `event`
"""
events_to_context = {}
for e in itertools.chain(auth_events, state):
ctx = yield self.state_handler.compute_event_context(
e, outlier=True,
)
events_to_context[e.event_id] = ctx
e.internal_metadata.outlier = True
event_map = {
e.event_id: e
for e in auth_events
}
create_event = None
for e in auth_events:
if (e.type, e.state_key) == (EventTypes.Create, ""):
create_event = e
break
for e in itertools.chain(auth_events, state, [event]):
auth_for_e = {
(event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
for e_id, _ in e.auth_events
}
if create_event:
auth_for_e[(EventTypes.Create, "")] = create_event
try:
self.auth.check(e, auth_events=auth_for_e)
except AuthError as err:
logger.warn(
"Rejecting %s because %s",
e.event_id, err.msg
)
if e == event:
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
yield self.store.persist_events(
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
],
is_new_state=False,
)
new_event_context = yield self.state_handler.compute_event_context(
event, old_state=state, outlier=False,
)
event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context,
backfilled=False,
is_new_state=True,
current_state=state,
)
defer.returnValue((event_stream_id, max_stream_id))
@defer.inlineCallbacks @defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, backfilled=False, def _prep_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None): current_state=None, auth_events=None):