Rearrange persist_event so that do all the queries that need to be done before returning early if we have already persisted that event.

This commit is contained in:
Erik Johnston 2015-02-04 10:16:51 +00:00
parent 02be8da5e1
commit c0462dbf15
3 changed files with 77 additions and 72 deletions

View File

@ -77,7 +77,7 @@ class EventBase(object):
return self.content["membership"] return self.content["membership"]
def is_state(self): def is_state(self):
return hasattr(self, "state_key") return hasattr(self, "state_key") and self.state_key is not None
def get_dict(self): def get_dict(self):
d = dict(self._event_dict) d = dict(self._event_dict)

View File

@ -515,6 +515,8 @@ class FederationHandler(BaseHandler):
"Failed to get destination from event %s", s.event_id "Failed to get destination from event %s", s.event_id
) )
destinations.remove(origin)
logger.debug( logger.debug(
"on_send_join_request: Sending event: %s, signatures: %s", "on_send_join_request: Sending event: %s, signatures: %s",
event.event_id, event.event_id,

View File

@ -163,19 +163,70 @@ class DataStore(RoomMemberStore, RoomStore,
def _persist_event_txn(self, txn, event, context, backfilled, def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True, stream_ordering=None, is_new_state=True,
current_state=None): current_state=None):
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event) # We purposefully do this first since if we include a `current_state`
elif event.type == EventTypes.Feedback: # key, we *want* to update the `current_state_events` table
self._store_feedback_txn(txn, event) if current_state:
elif event.type == EventTypes.Name: txn.execute(
self._store_room_name_txn(txn, event) "DELETE FROM current_state_events WHERE room_id = ?",
elif event.type == EventTypes.Topic: (event.room_id,)
self._store_room_topic_txn(txn, event) )
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event) for s in current_state:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": s.event_id,
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
},
or_replace=True,
)
if event.is_state() and is_new_state:
if not backfilled and not context.rejected:
self._simple_insert_txn(
txn,
table="state_forward_extremities",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
for prev_state_id, _ in event.prev_state:
self._simple_delete_txn(
txn,
table="state_forward_extremities",
keyvalues={
"event_id": prev_state_id,
}
)
outlier = event.internal_metadata.is_outlier() outlier = event.internal_metadata.is_outlier()
if not outlier:
self._store_state_groups_txn(txn, event, context)
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
self._handle_prev_events(
txn,
outlier=outlier,
event_id=event.event_id,
prev_events=event.prev_events,
room_id=event.room_id,
)
have_persisted = self._simple_select_one_onecol_txn( have_persisted = self._simple_select_one_onecol_txn(
txn, txn,
table="event_json", table="event_json",
@ -209,6 +260,17 @@ class DataStore(RoomMemberStore, RoomStore,
) )
return return
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Feedback:
self._store_feedback_txn(txn, event)
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
event_dict = { event_dict = {
k: v k: v
for k, v in event.get_dict().items() for k, v in event.get_dict().items()
@ -273,41 +335,10 @@ class DataStore(RoomMemberStore, RoomStore,
) )
raise _RollbackButIsFineException("_persist_event") raise _RollbackButIsFineException("_persist_event")
self._handle_prev_events(
txn,
outlier=outlier,
event_id=event.event_id,
prev_events=event.prev_events,
room_id=event.room_id,
)
if not outlier:
self._store_state_groups_txn(txn, event, context)
if context.rejected: if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected) self._store_rejections_txn(txn, event.event_id, context.rejected)
if current_state: if event.is_state():
txn.execute(
"DELETE FROM current_state_events WHERE room_id = ?",
(event.room_id,)
)
for s in current_state:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": s.event_id,
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
},
or_replace=True,
)
is_state = hasattr(event, "state_key") and event.state_key is not None
if is_state:
vals = { vals = {
"event_id": event.event_id, "event_id": event.event_id,
"room_id": event.room_id, "room_id": event.room_id,
@ -315,6 +346,7 @@ class DataStore(RoomMemberStore, RoomStore,
"state_key": event.state_key, "state_key": event.state_key,
} }
# TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"): if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state vals["prev_state"] = event.replaces_state
@ -351,28 +383,6 @@ class DataStore(RoomMemberStore, RoomStore,
or_ignore=True, or_ignore=True,
) )
if not backfilled and not context.rejected:
self._simple_insert_txn(
txn,
table="state_forward_extremities",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
for prev_state_id, _ in event.prev_state:
self._simple_delete_txn(
txn,
table="state_forward_extremities",
keyvalues={
"event_id": prev_state_id,
}
)
for hash_alg, hash_base64 in event.hashes.items(): for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64) hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn( self._store_event_content_hash_txn(
@ -403,13 +413,6 @@ class DataStore(RoomMemberStore, RoomStore,
txn, event.event_id, ref_alg, ref_hash_bytes txn, event.event_id, ref_alg, ref_hash_bytes
) )
if not outlier:
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
def _store_redaction(self, txn, event): def _store_redaction(self, txn, event):
txn.execute( txn.execute(
"INSERT OR IGNORE INTO redactions " "INSERT OR IGNORE INTO redactions "