diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 53615b7ee..dac4fbeef 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -487,15 +487,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # through tokens until we find one that is not None and then # process all previous updates in the batch as if they had the # final token. - if not token or len(batch_updates) > 0: - if token is None: - # Store this update as part of the batch - batch_updates.append(update) - elif current_token <= current_token: - # This batch is older than current_token, dismiss + if token is None: + # Store this update as part of a batch + batch_updates.append(update) + continue + + if len(batch_updates) > 0: + # There is an ongoing batch and this is the end + if current_token <= current_token: + # This batch is older than current_token, dismiss it batch_updates = [] else: - # Append final update of this batch before sending + # This is the end of the batch. Append final update of + # this batch before sending batch_updates.append(update) # Send all updates that are part of this batch with the @@ -505,10 +509,13 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Clear saved batch updates batch_updates = [] - else: - # Only send updates newer than the current token - if token > current_token: - self.send_command(RdataCommand(stream_name, token, update)) + continue + + # This is an update that's not part of a batch. + # + # Only send updates newer than the current token + if token > current_token: + self.send_command(RdataCommand(stream_name, token, update)) # They're now fully subscribed self.replication_streams.add(stream_name)