diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 2f913adf2..34948c30c 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached from syutil.base64util import encode_base64 import logging +from Queue import PriorityQueue logger = logging.getLogger(__name__) @@ -380,41 +381,41 @@ class EventFederationStore(SQLBaseStore): event_results = set(event_list) - front = event_list + # We want to make sure that we do a breadth-first, "depth" ordered + # search. query = ( - "SELECT prev_event_id FROM event_edges " - "WHERE room_id = ? AND event_id = ? " - "LIMIT ?" + "SELECT depth, prev_event_id FROM event_edges" + " INNER JOIN events" + " ON prev_event_id = events.event_id" + " AND event_edges.room_id = events.room_id" + " WHERE event_edges.room_id = ? AND event_edges.event_id = ?" + " LIMIT ?" ) - # We iterate through all event_ids in `front` to select their previous - # events. These are dumped in `new_front`. - # We continue until we reach the limit *or* new_front is empty (i.e., - # we've run out of things to select - while front and len(event_results) < limit: + queue = PriorityQueue() - new_front = [] - for event_id in front: - logger.debug( - "_backfill_interaction: id=%s", - event_id - ) + for event_id in event_list: + txn.execute( + query, + (room_id, event_id, limit - len(event_results)) + ) - txn.execute( - query, - (room_id, event_id, limit - len(event_results)) - ) + for row in txn.fetchall(): + queue.put(row) - for row in txn.fetchall(): - logger.debug( - "_backfill_interaction: got id=%s", - *row - ) - new_front.append(row[0]) + while not queue.empty() and len(event_results) < limit: + _, event_id = queue.get_nowait() - front = new_front - event_results += new_front + event_results.add(event_id) + + txn.execute( + query, + (room_id, event_id, limit - len(event_results)) + ) + + for row in txn.fetchall(): + queue.put(row) return self._get_events_txn(txn, event_results)