mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Correctly handle receiving 'missing' Pdus from federation, rather than just discarding them.
This commit is contained in:
parent
e639a3516d
commit
59516a8bb1
@ -100,17 +100,11 @@ class FederationHandler(BaseHandler):
|
|||||||
is_new_state = yield self.state_handler.handle_new_state(
|
is_new_state = yield self.state_handler.handle_new_state(
|
||||||
pdu
|
pdu
|
||||||
)
|
)
|
||||||
if not is_new_state:
|
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
is_new_state = False
|
is_new_state = False
|
||||||
# TODO: Implement something in federation that allows us to
|
# TODO: Implement something in federation that allows us to
|
||||||
# respond to PDU.
|
# respond to PDU.
|
||||||
|
|
||||||
if hasattr(event, "state_key") and not is_new_state:
|
|
||||||
logger.debug("Ignoring old state: %s", event.event_id)
|
|
||||||
return
|
|
||||||
|
|
||||||
target_is_mine = False
|
target_is_mine = False
|
||||||
if hasattr(event, "target_host"):
|
if hasattr(event, "target_host"):
|
||||||
target_is_mine = event.target_host == self.hs.hostname
|
target_is_mine = event.target_host == self.hs.hostname
|
||||||
@ -141,7 +135,11 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
else:
|
else:
|
||||||
with (yield self.room_lock.lock(event.room_id)):
|
with (yield self.room_lock.lock(event.room_id)):
|
||||||
yield self.store.persist_event(event, backfilled)
|
yield self.store.persist_event(
|
||||||
|
event,
|
||||||
|
backfilled,
|
||||||
|
is_new_state=is_new_state
|
||||||
|
)
|
||||||
|
|
||||||
room = yield self.store.get_room(event.room_id)
|
room = yield self.store.get_room(event.room_id)
|
||||||
|
|
||||||
|
@ -68,7 +68,8 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def persist_event(self, event=None, backfilled=False, pdu=None):
|
def persist_event(self, event=None, backfilled=False, pdu=None,
|
||||||
|
is_new_state=True):
|
||||||
stream_ordering = None
|
stream_ordering = None
|
||||||
if backfilled:
|
if backfilled:
|
||||||
if not self.min_token_deferred.called:
|
if not self.min_token_deferred.called:
|
||||||
@ -83,6 +84,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||||||
event=event,
|
event=event,
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
stream_ordering=stream_ordering,
|
stream_ordering=stream_ordering,
|
||||||
|
is_new_state=is_new_state,
|
||||||
)
|
)
|
||||||
except _RollbackButIsFineException as e:
|
except _RollbackButIsFineException as e:
|
||||||
pass
|
pass
|
||||||
@ -109,12 +111,14 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||||||
defer.returnValue(event)
|
defer.returnValue(event)
|
||||||
|
|
||||||
def _persist_pdu_event_txn(self, txn, pdu=None, event=None,
|
def _persist_pdu_event_txn(self, txn, pdu=None, event=None,
|
||||||
backfilled=False, stream_ordering=None):
|
backfilled=False, stream_ordering=None,
|
||||||
|
is_new_state=True):
|
||||||
if pdu is not None:
|
if pdu is not None:
|
||||||
self._persist_event_pdu_txn(txn, pdu)
|
self._persist_event_pdu_txn(txn, pdu)
|
||||||
if event is not None:
|
if event is not None:
|
||||||
return self._persist_event_txn(
|
return self._persist_event_txn(
|
||||||
txn, event, backfilled, stream_ordering
|
txn, event, backfilled, stream_ordering,
|
||||||
|
is_new_state=is_new_state,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _persist_event_pdu_txn(self, txn, pdu):
|
def _persist_event_pdu_txn(self, txn, pdu):
|
||||||
@ -141,7 +145,8 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||||||
self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth)
|
self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None):
|
def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
|
||||||
|
is_new_state=True):
|
||||||
if event.type == RoomMemberEvent.TYPE:
|
if event.type == RoomMemberEvent.TYPE:
|
||||||
self._store_room_member_txn(txn, event)
|
self._store_room_member_txn(txn, event)
|
||||||
elif event.type == FeedbackEvent.TYPE:
|
elif event.type == FeedbackEvent.TYPE:
|
||||||
@ -195,7 +200,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||||||
)
|
)
|
||||||
raise _RollbackButIsFineException("_persist_event")
|
raise _RollbackButIsFineException("_persist_event")
|
||||||
|
|
||||||
if not backfilled and hasattr(event, "state_key"):
|
if is_new_state and hasattr(event, "state_key"):
|
||||||
vals = {
|
vals = {
|
||||||
"event_id": event.event_id,
|
"event_id": event.event_id,
|
||||||
"room_id": event.room_id,
|
"room_id": event.room_id,
|
||||||
|
@ -74,7 +74,9 @@ class FederationTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
yield self.handlers.federation_handler.on_receive_pdu(pdu, False)
|
yield self.handlers.federation_handler.on_receive_pdu(pdu, False)
|
||||||
|
|
||||||
self.datastore.persist_event.assert_called_once_with(ANY, False)
|
self.datastore.persist_event.assert_called_once_with(
|
||||||
|
ANY, False, is_new_state=False
|
||||||
|
)
|
||||||
self.notifier.on_new_room_event.assert_called_once_with(ANY)
|
self.notifier.on_new_room_event.assert_called_once_with(ANY)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
Loading…
Reference in New Issue
Block a user