diff --git a/synapse/federation/handler.py b/synapse/federation/handler.py index 68243d31d..984c1558e 100644 --- a/synapse/federation/handler.py +++ b/synapse/federation/handler.py @@ -74,10 +74,18 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def backfill(self, room_id, limit): - # TODO: Work out which destinations to ask for backfill - # self.replication_layer.backfill(dest, room_id, limit) - pass + def backfill(self, dest, room_id, limit): + pdus = yield self.replication_layer.backfill(dest, room_id, limit) + + if not pdus: + defer.returnValue([]) + + events = [ + self.pdu_codec.event_from_pdu(pdu) + for pdu in pdus + ] + + defer.returnValue(events) @log_function def get_state_for_room(self, destination, room_id): @@ -87,7 +95,7 @@ class FederationEventHandler(object): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu): + def on_receive_pdu(self, pdu, backfilled): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it throught the StateHandler. """ @@ -95,7 +103,7 @@ class FederationEventHandler(object): try: with (yield self.lock_manager.lock(pdu.context)): - if event.is_state: + if event.is_state and not backfilled: is_new_state = yield self.state_handler.handle_new_state( pdu ) @@ -104,7 +112,7 @@ class FederationEventHandler(object): else: is_new_state = False - yield self.event_handler.on_receive(event, is_new_state) + yield self.event_handler.on_receive(event, is_new_state, backfilled) except AuthError: # TODO: Implement something in federation that allows us to diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index bc9df2f21..3e5f1a410 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -208,7 +208,7 @@ class ReplicationLayer(object): pdus = [Pdu(outlier=False, **p) for p in transaction.pdus] for pdu in pdus: - yield self._handle_new_pdu(pdu) + yield self._handle_new_pdu(pdu, backfilled=True) defer.returnValue(pdus) @@ -415,7 +415,7 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, pdu): + def _handle_new_pdu(self, pdu, backfilled=False): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu(pdu.pdu_id, pdu.origin) @@ -451,7 +451,10 @@ class ReplicationLayer(object): # Persist the Pdu, but don't mark it as processed yet. yield self.pdu_actions.persist_received(pdu) - ret = yield self.handler.on_receive_pdu(pdu) + if not backfilled: + ret = yield self.handler.on_receive_pdu(pdu, backfilled=backfilled) + else: + ret = None yield self.pdu_actions.mark_as_processed(pdu) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7026df90a..ef9ed274d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -35,7 +35,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive(self, event, is_new_state): + def on_receive(self, event, is_new_state, backfilled): if hasattr(event, "state_key") and not is_new_state: logger.debug("Ignoring old state.") return @@ -70,6 +70,21 @@ class FederationHandler(BaseHandler): else: with (yield self.room_lock.lock(event.room_id)): - store_id = yield self.store.persist_event(event) + store_id = yield self.store.persist_event(event, backfilled) - yield self.notifier.on_new_room_event(event, store_id) + if not backfilled: + yield self.notifier.on_new_room_event(event, store_id) + + + @log_function + @defer.inlineCallbacks + def backfill(self, dest, room_id, limit): + events = yield self.hs.get_federation().backfill(dest, room_id, limit) + + for event in events: + try: + yield self.store.persist_event(event, backfilled=True) + except: + logger.debug("Failed to persiste event: %s", event) + + defer.returnValue(events) diff --git a/synapse/rest/room.py b/synapse/rest/room.py index dfb2aabe7..89ea9f0d2 100644 --- a/synapse/rest/room.py +++ b/synapse/rest/room.py @@ -383,6 +383,21 @@ class RoomMessageListRestServlet(RestServlet): defer.returnValue((200, msgs)) +class RoomTriggerBackfill(RestServlet): + PATTERN = client_path_pattern("/rooms/(?P[^/]*)/backfill$") + + @defer.inlineCallbacks + def on_GET(self, request, room_id): + remote_server = urllib.unquote(request.args["remote"][0]) + room_id = urllib.unquote(room_id) + limit = int(request.args["limit"][0]) + + handler = self.handlers.federation_handler + events = yield handler.backfill(remote_server, room_id, limit) + + res = [event.get_dict() for event in events] + defer.returnValue((200, res)) + def _parse_json(request): try: content = json.loads(request.content.read()) @@ -403,3 +418,4 @@ def register_servlets(hs, http_server): RoomMemberListRestServlet(hs).register(http_server) RoomMessageListRestServlet(hs).register(http_server) JoinRoomAliasServlet(hs).register(http_server) + RoomTriggerBackfill(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b846081d4..2243a710d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -20,6 +20,8 @@ from synapse.api.events.room import ( RoomConfigEvent, RoomNameEvent, ) +from synapse.util.logutils import log_function + from .directory import DirectoryStore from .feedback import FeedbackStore from .presence import PresenceStore @@ -32,9 +34,13 @@ from .pdu import StatePduStore, PduStore from .transactions import TransactionStore import json +import logging import os +logger = logging.getLogger(__name__) + + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, PduStore, StatePduStore, TransactionStore, @@ -49,6 +55,7 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token = None @defer.inlineCallbacks + @log_function def persist_event(self, event, backfilled=False): if event.type == RoomMemberEvent.TYPE: yield self._store_room_member(event) @@ -83,6 +90,7 @@ class DataStore(RoomMemberStore, RoomStore, defer.returnValue(event) @defer.inlineCallbacks + @log_function def _store_event(self, event, backfilled): # FIXME (erikj): This should be removed when we start amalgamating # event and pdu storage @@ -101,7 +109,7 @@ class DataStore(RoomMemberStore, RoomStore, if not self.min_token_deferred.called: yield self.min_token_deferred self.min_token -= 1 - vals["token_ordering"] = self.min_token + vals["stream_ordering"] = self.min_token unrec = { k: v @@ -110,7 +118,11 @@ class DataStore(RoomMemberStore, RoomStore, } vals["unrecognized_keys"] = json.dumps(unrec) - yield self._simple_insert("events", vals) + try: + yield self._simple_insert("events", vals) + except: + logger.exception("Failed to persist, probably duplicate") + return if not backfilled and hasattr(event, "state_key"): vals = { @@ -161,10 +173,12 @@ class DataStore(RoomMemberStore, RoomStore, def _get_min_token(self): row = yield self._execute( None, - "SELECT MIN(token_ordering) FROM events" + "SELECT MIN(stream_ordering) FROM events" ) - self.min_token = rows[0][0] if rows and rows[0] else 0 + self.min_token = min(row[0][0], -1) if row and row[0] else -1 + + logger.debug("min_token is: %s", self.min_token) defer.returnValue(self.min_token) diff --git a/synapse/storage/pdu.py b/synapse/storage/pdu.py index a24ce7ab7..7655f43ed 100644 --- a/synapse/storage/pdu.py +++ b/synapse/storage/pdu.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from ._base import SQLBaseStore, Table, JoinHelper from synapse.util.logutils import log_function @@ -319,6 +321,7 @@ class PduStore(SQLBaseStore): return [(row[0], row[1], row[2]) for row in results] + @defer.inlineCallbacks def get_oldest_pdus_in_context(self, context): """Get a list of Pdus that we haven't backfilled beyond yet (and haven't seen). This list is used when we want to backfill backwards and is the @@ -331,17 +334,14 @@ class PduStore(SQLBaseStore): Returns: list: A list of PduIdTuple. """ - return self._db_pool.runInteraction( - self._get_oldest_pdus_in_context, context - ) - - def _get_oldest_pdus_in_context(self, txn, context): - txn.execute( + results = yield self._execute( + None, "SELECT pdu_id, origin FROM %(back)s WHERE context = ?" % {"back": PduBackwardExtremitiesTable.table_name, }, - (context,) + context ) - return [PduIdTuple(i, o) for i, o in txn.fetchall()] + + defer.returnValue([PduIdTuple(i, o) for i, o in results]) def is_pdu_new(self, pdu_id, origin, context, depth): """For a given Pdu, try and figure out if it's 'new', i.e., if it's