Initial hack at wiring together pagination and backfill

This commit is contained in:
Erik Johnston 2015-05-11 18:01:31 +01:00
parent 17653a5dfe
commit 84e6b4001f
3 changed files with 141 additions and 5 deletions

View File

@ -218,10 +218,11 @@ class FederationHandler(BaseHandler):
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
def backfill(self, dest, room_id, limit): def backfill(self, dest, room_id, limit, extremities=[]):
""" Trigger a backfill request to `dest` for the given `room_id` """ Trigger a backfill request to `dest` for the given `room_id`
""" """
extremities = yield self.store.get_oldest_events_in_room(room_id) if not extremities:
extremities = yield self.store.get_oldest_events_in_room(room_id)
pdus = yield self.replication_layer.backfill( pdus = yield self.replication_layer.backfill(
dest, dest,
@ -248,6 +249,109 @@ class FederationHandler(BaseHandler):
defer.returnValue(events) defer.returnValue(events)
@defer.inlineCallbacks
def maybe_backfill(self, room_id, current_depth):
"""Checks the database to see if we should backfill before paginating
"""
extremities = yield self.store.get_oldest_events_with_depth_in_room(
room_id
)
logger.debug("Got extremeties: %r", extremities)
if not extremities:
return
# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(
extremities.items(),
key=lambda e: -int(e[1])
)
max_depth = sorted_extremeties_tuple[0][1]
logger.debug("max_depth: %r", max_depth)
if current_depth > max_depth:
return
# Now we need to decide which hosts to hit first.
# First we try hosts that are already in the room, that were around
# at the time. TODO: HEURISTIC ALERT.
curr_state = yield self.state_handler.get_current_state(room_id)
def get_domains_from_state(state):
joined_users = [
(state_key, int(event.depth))
for (e_type, state_key), event in state.items()
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
joined_domains = {}
for u, d in joined_users:
try:
dom = UserID.from_string(u).domain
old_d = joined_domains.get(dom)
if old_d:
joined_domains[dom] = min(d, old_d)
else:
joined_domains[dom] = d
except:
pass
return sorted(joined_domains.items(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
logger.debug("curr_domains: %r", curr_domains)
likely_domains = [
domain for domain, depth in curr_domains
]
@defer.inlineCallbacks
def try_backfill(domains):
# TODO: Should we try multiple of these at a time?
for dom in domains:
events = yield self.backfill(
dom, room_id,
limit=100,
extremities=[e for e in extremities.keys()]
)
if events:
defer.returnValue(True)
defer.returnValue(False)
success = yield try_backfill(likely_domains)
if success:
defer.returnValue(True)
# Huh, well *those* domains didn't work out. Lets try some domains
# from the time.
tried_domains = set(likely_domains)
states = yield defer.gatherResults({
e: self.state_handler.resolve_state_groups([e])[1]
for e in extremities.keys()
})
for e_id, _ in sorted_extremeties_tuple:
likely_domains = get_domains_from_state(states[e_id])[0]
success = yield try_backfill([
dom for dom in likely_domains
if dom not in tried_domains
])
if success:
defer.returnValue(True)
tried_domains.update(likely_domains)
defer.returnValue(False)
@defer.inlineCallbacks @defer.inlineCallbacks
def send_invite(self, target_host, event): def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing. """ Sends the invite to the remote server for signing.

View File

@ -21,7 +21,7 @@ from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import UserID from synapse.types import UserID, RoomStreamToken
from ._base import BaseHandler from ._base import BaseHandler
@ -92,6 +92,14 @@ class MessageHandler(BaseHandler):
yield self.hs.get_event_sources().get_current_token() yield self.hs.get_event_sources().get_current_token()
) )
room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
if room_token.topological is None:
raise SynapseError(400, "Invalid token")
yield self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, room_token.topological
)
user = UserID.from_string(user_id) user = UserID.from_string(user_id)
events, next_key = yield data_source.get_pagination_rows( events, next_key = yield data_source.get_pagination_rows(

View File

@ -79,6 +79,28 @@ class EventFederationStore(SQLBaseStore):
room_id, room_id,
) )
def get_oldest_events_with_depth_in_room(self, room_id):
return self.runInteraction(
"get_oldest_events_with_depth_in_room",
self.get_oldest_events_with_depth_in_room_txn,
room_id,
)
def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
sql = (
"SELECT b.event_id, MAX(e.depth) FROM events as e"
" INNER JOIN event_edges as g"
" ON g.event_id = e.event_id AND g.room_id = e.room_id"
" INNER JOIN event_backward_extremities as b"
" ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
" WHERE b.room_id = ? AND g.is_state is ?"
" GROUP BY b.event_id"
)
txn.execute(sql, (room_id, False,))
return dict(txn.fetchall())
def _get_oldest_events_in_room_txn(self, txn, room_id): def _get_oldest_events_in_room_txn(self, txn, room_id):
return self._simple_select_onecol_txn( return self._simple_select_onecol_txn(
txn, txn,
@ -247,11 +269,13 @@ class EventFederationStore(SQLBaseStore):
do_insert = depth < min_depth if min_depth else True do_insert = depth < min_depth if min_depth else True
if do_insert: if do_insert:
self._simple_insert_txn( self._simple_upsert_txn(
txn, txn,
table="room_depth", table="room_depth",
values={ keyvalues={
"room_id": room_id, "room_id": room_id,
},
values={
"min_depth": depth, "min_depth": depth,
}, },
) )