From ef3934ec8f123f6f553b07471588fbcc7f444cd8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 May 2020 19:45:42 +0100 Subject: [PATCH] Ensure we persist and ack the same token --- synapse/app/generic_worker.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 28ebf6745..f3ec2a34e 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -890,16 +890,18 @@ class FederationSenderHandler(object): # we're not being re-entered? with (await self._fed_position_linearizer.queue(None)): + # We persist and ack the same position, so we take a copy of it + # here as otherwise it can get modified from underneath us. + current_position = self.federation_position + await self.store.update_federation_out_pos( - "federation", self.federation_position + "federation", current_position ) # We ACK this token over replication so that the master can drop # its in memory queues - self._hs.get_tcp_replication().send_federation_ack( - self.federation_position - ) - self._last_ack = self.federation_position + self._hs.get_tcp_replication().send_federation_ack(current_position) + self._last_ack = current_position except Exception: logger.exception("Error updating federation stream position")