diff --git a/changelog.d/7482.bugfix b/changelog.d/7482.bugfix new file mode 100644 index 000000000..018bf5cc8 --- /dev/null +++ b/changelog.d/7482.bugfix @@ -0,0 +1 @@ +Fix Redis reconnection logic that can result in missed updates over replication if master reconnects to Redis without restarting. diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 1b0546848..e6a50aa74 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -151,6 +151,13 @@ class ReplicationCommandHandler: hs.get_reactor().connectTCP(host, port, self._factory) async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand): + self.send_positions_to_connection(conn) + + def send_positions_to_connection(self, conn: AbstractConnection): + """Send current position of all streams this process is source of to + the connection. + """ + # We only want to announce positions by the writer of the streams. # Currently this is just the master process. if not self._is_master: @@ -158,7 +165,7 @@ class ReplicationCommandHandler: for stream_name, stream in self._streams.items(): current_token = stream.current_token(self._instance_name) - self.send_command( + conn.send_command( PositionCommand(stream_name, self._instance_name, current_token) ) diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 55bfa71df..e776b6318 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -70,7 +70,6 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): logger.info("Connected to redis") super().connectionMade() run_as_background_process("subscribe-replication", self._send_subscribe) - self.handler.new_connection(self) async def _send_subscribe(self): # it's important to make sure that we only send the REPLICATE command once we @@ -81,9 +80,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): logger.info( "Successfully subscribed to redis stream, sending REPLICATE command" ) + self.handler.new_connection(self) await self._async_send_command(ReplicateCommand()) logger.info("REPLICATE successfully sent") + # We send out our positions when there is a new connection in case the + # other side missed updates. We do this for Redis connections as the + # otherside won't know we've connected and so won't issue a REPLICATE. + self.handler.send_positions_to_connection(self) + def messageReceived(self, pattern: str, channel: str, message: str): """Received a message from redis. """