Move the impl of backfill to use events.

This commit is contained in:
Erik Johnston 2014-10-31 09:59:02 +00:00
parent d9a9e9eb30
commit f2de2d644a
3 changed files with 114 additions and 5 deletions

View File

@ -181,7 +181,7 @@ class ReplicationLayer(object):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def backfill(self, dest, context, limit): def backfill(self, dest, context, limit, extremities):
"""Requests some more historic PDUs for the given context from the """Requests some more historic PDUs for the given context from the
given destination server. given destination server.
@ -189,12 +189,12 @@ class ReplicationLayer(object):
dest (str): The remote home server to ask. dest (str): The remote home server to ask.
context (str): The context to backfill. context (str): The context to backfill.
limit (int): The maximum number of PDUs to return. limit (int): The maximum number of PDUs to return.
extremities (list): List of PDU id and origins of the first pdus
we have seen from the context
Returns: Returns:
Deferred: Results in the received PDUs. Deferred: Results in the received PDUs.
""" """
extremities = yield self.store.get_oldest_pdus_in_context(context)
logger.debug("backfill extrem=%s", extremities) logger.debug("backfill extrem=%s", extremities)
# If there are no extremeties then we've (probably) reached the start. # If there are no extremeties then we've (probably) reached the start.

View File

@ -181,7 +181,17 @@ class FederationHandler(BaseHandler):
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
def backfill(self, dest, room_id, limit): def backfill(self, dest, room_id, limit):
pdus = yield self.replication_layer.backfill(dest, room_id, limit) extremities = yield self.store.get_oldest_events_in_room(room_id)
pdus = yield self.replication_layer.backfill(
dest,
room_id,
limit,
extremities=[
self.pdu_codec.decode_event_id(e)
for e in extremities
]
)
events = [] events = []
@ -390,6 +400,21 @@ class FederationHandler(BaseHandler):
else: else:
defer.returnValue([]) defer.returnValue([])
@defer.inlineCallbacks
@log_function
def on_backfill_request(self, context, pdu_list, limit):
events = yield self.store.get_backfill_events(
context,
[self.pdu_codec.encode_event_id(i, o) for i, o in pdu_list],
limit
)
defer.returnValue([
self.pdu_codec.pdu_from_event(e)
for e in events
])
@log_function @log_function
def _on_user_joined(self, user, room_id): def _on_user_joined(self, user, room_id):
waiters = self.waiting_for_join_list.get((user.to_string(), room_id), []) waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])

View File

@ -24,6 +24,23 @@ logger = logging.getLogger(__name__)
class EventFederationStore(SQLBaseStore): class EventFederationStore(SQLBaseStore):
def get_oldest_events_in_room(self, room_id):
return self.runInteraction(
"get_oldest_events_in_room",
self._get_oldest_events_in_room_txn,
room_id,
)
def _get_oldest_events_in_room_txn(self, txn, room_id):
return self._simple_select_onecol_txn(
txn,
table="event_backward_extremities",
keyvalues={
"room_id": room_id,
},
retcol="event_id",
)
def get_latest_events_in_room(self, room_id): def get_latest_events_in_room(self, room_id):
return self.runInteraction( return self.runInteraction(
"get_latest_events_in_room", "get_latest_events_in_room",
@ -159,4 +176,71 @@ class EventFederationStore(SQLBaseStore):
"AND not events.outlier " "AND not events.outlier "
")" ")"
) )
txn.execute(query) txn.execute(query)
def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occured before (and
including) the pdus in pdu_list. Return a list of max size `limit`.
Args:
txn
room_id (str)
event_list (list)
limit (int)
Return:
list: A list of PduTuples
"""
return self.runInteraction(
"get_backfill_events",
self._get_backfill_events, room_id, event_list, limit
)
def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug(
"_get_backfill_events: %s, %s, %s",
room_id, repr(event_list), limit
)
# We seed the pdu_results with the things from the pdu_list.
event_results = event_list
front = event_list
query = (
"SELECT prev_event_id FROM event_edges "
"WHERE room_id = ? AND event_id = ? "
"LIMIT ?"
)
# We iterate through all event_ids in `front` to select their previous
# events. These are dumped in `new_front`.
# We continue until we reach the limit *or* new_front is empty (i.e.,
# we've run out of things to select
while front and len(event_results) < limit:
new_front = []
for event_id in front:
logger.debug(
"_backfill_interaction: id=%s",
event_id
)
txn.execute(
query,
(room_id, event_id, limit - len(event_results))
)
for row in txn.fetchall():
logger.debug(
"_backfill_interaction: got id=%s",
*row
)
new_front.append(row)
front = new_front
event_results += new_front
# We also want to update the `prev_pdus` attributes before returning.
return self._get_pdu_tuples(txn, event_results)