Try to ensure we don't persist an event we have already persisted. In persist_event check if we already have the event, if so then update instead of replacing so that we don't cause a bump of the stream_ordering.

This commit is contained in:
Erik Johnston 2015-02-03 10:38:14 +00:00
parent 40c6fe1b81
commit e7ca813dd4
3 changed files with 68 additions and 19 deletions

View File

@ -112,6 +112,14 @@ class FederationHandler(BaseHandler):
logger.debug("Event: %s", event)
event_ids = set()
if state:
event_ids += {e.event_id for e in state}
if auth_chain:
event_ids += {e.event_id for e in auth_chain}
seen_ids = (yield self.store.have_events(event_ids)).keys()
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
current_state = None
@ -124,20 +132,26 @@ class FederationHandler(BaseHandler):
current_state = state
if state and auth_chain is not None:
for e in state:
e.internal_metadata.outlier = True
try:
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(origin, e, auth_events=auth)
except:
logger.exception(
"Failed to handle state event %s",
e.event_id,
)
for list_of_pdus in [auth_chain, state]:
for e in list_of_pdus:
if e.event_id in seen_ids:
continue
e.internal_metadata.outlier = True
try:
auth_ids = [e_id for e_id, _ in e.auth_events]
auth = {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
yield self._handle_new_event(
origin, e, auth_events=auth
)
except:
logger.exception(
"Failed to handle state event %s",
e.event_id,
)
try:
yield self._handle_new_event(

View File

@ -161,6 +161,39 @@ class DataStore(RoomMemberStore, RoomStore,
outlier = event.internal_metadata.is_outlier()
have_persisted = self._simple_select_one_onecol_txn(
txn,
table="event_json",
keyvalues={"event_id": event.event_id},
retcol="event_id",
allow_none=True,
)
metadata_json = encode_canonical_json(
event.internal_metadata.get_dict()
)
if have_persisted:
if not outlier:
sql = (
"UPDATE event_json SET internal_metadata = ?"
" WHERE event_id = ?"
)
txn.execute(
sql,
(metadata_json.decode("UTF-8"), event.event_id,)
)
sql = (
"UPDATE events SET outlier = 0"
" WHERE event_id = ?"
)
txn.execute(
sql,
(event.event_id,)
)
return
event_dict = {
k: v
for k, v in event.get_dict().items()
@ -170,10 +203,6 @@ class DataStore(RoomMemberStore, RoomStore,
]
}
metadata_json = encode_canonical_json(
event.internal_metadata.get_dict()
)
self._simple_insert_txn(
txn,
table="event_json",
@ -482,6 +511,9 @@ class DataStore(RoomMemberStore, RoomStore,
the rejected reason string if we rejected the event, else maps to
None.
"""
if not event_ids:
return defer.succeed({})
def f(txn):
sql = (
"SELECT e.event_id, reason FROM events as e "

View File

@ -91,7 +91,10 @@ class FederationTestCase(unittest.TestCase):
self.datastore.persist_event.return_value = defer.succeed(None)
self.datastore.get_room.return_value = defer.succeed(True)
self.auth.check_host_in_room.return_value = defer.succeed(True)
self.datastore.have_events.return_value = defer.succeed({})
def have_events(event_ids):
return defer.succeed({})
self.datastore.have_events.side_effect = have_events
def annotate(ev, old_state=None):
context = Mock()