diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index a6df04d85..53615b7ee 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -488,16 +488,23 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # process all previous updates in the batch as if they had the # final token. if not token or len(batch_updates) > 0: - batch_updates.append(update) - if token and not token > current_token: + if token is None: + # Store this update as part of the batch + batch_updates.append(update) + elif current_token <= current_token: # This batch is older than current_token, dismiss batch_updates = [] - continue - if token: + else: + # Append final update of this batch before sending + batch_updates.append(update) + # Send all updates that are part of this batch with the # found token for update in batch_updates: self.send_command(RdataCommand(stream_name, token, update)) + + # Clear saved batch updates + batch_updates = [] else: # Only send updates newer than the current token if token > current_token: