Ensure we persist and ack the same token

This commit is contained in:
Erik Johnston 2020-05-27 19:45:42 +01:00
parent 3d7f1b53d9
commit ef3934ec8f

View File

@ -890,16 +890,18 @@ class FederationSenderHandler(object):
# we're not being re-entered? # we're not being re-entered?
with (await self._fed_position_linearizer.queue(None)): with (await self._fed_position_linearizer.queue(None)):
# We persist and ack the same position, so we take a copy of it
# here as otherwise it can get modified from underneath us.
current_position = self.federation_position
await self.store.update_federation_out_pos( await self.store.update_federation_out_pos(
"federation", self.federation_position "federation", current_position
) )
# We ACK this token over replication so that the master can drop # We ACK this token over replication so that the master can drop
# its in memory queues # its in memory queues
self._hs.get_tcp_replication().send_federation_ack( self._hs.get_tcp_replication().send_federation_ack(current_position)
self.federation_position self._last_ack = current_position
)
self._last_ack = self.federation_position
except Exception: except Exception:
logger.exception("Error updating federation stream position") logger.exception("Error updating federation stream position")