From 5f0c449dd50fa84ff741e09f34cad5330c6e4745 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 4 Mar 2019 13:56:49 +0000 Subject: [PATCH 1/5] Prevent replication wedging --- synapse/replication/tcp/protocol.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 49ae5b335..a6df04d85 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -451,7 +451,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): @defer.inlineCallbacks def subscribe_to_stream(self, stream_name, token): - """Subscribe the remote to a streams. + """Subscribe the remote to a stream. This invloves checking if they've missed anything and sending those updates down if they have. During that time new updates for the stream @@ -478,10 +478,30 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Now we can send any updates that came in while we were subscribing pending_rdata = self.pending_rdata.pop(stream_name, []) + batch_updates = [] for token, update in pending_rdata: - # Only send updates newer than the current token - if token > current_token: - self.send_command(RdataCommand(stream_name, token, update)) + # If the token is null, it is part of a batch update. Batches + # are multiple updates that share a single token. To denote + # this, the token is set to None for all tokens in the batch + # except for the last. If we find a None token, we keep looking + # through tokens until we find one that is not None and then + # process all previous updates in the batch as if they had the + # final token. + if not token or len(batch_updates) > 0: + batch_updates.append(update) + if token and not token > current_token: + # This batch is older than current_token, dismiss + batch_updates = [] + continue + if token: + # Send all updates that are part of this batch with the + # found token + for update in batch_updates: + self.send_command(RdataCommand(stream_name, token, update)) + else: + # Only send updates newer than the current token + if token > current_token: + self.send_command(RdataCommand(stream_name, token, update)) # They're now fully subscribed self.replication_streams.add(stream_name) From 0bc50fb60a563e33fd37d43276107ffc4b7d2d9a Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 4 Mar 2019 14:05:16 +0000 Subject: [PATCH 2/5] Add changelog --- changelog.d/4792.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/4792.bugfix diff --git a/changelog.d/4792.bugfix b/changelog.d/4792.bugfix new file mode 100644 index 000000000..b127b6254 --- /dev/null +++ b/changelog.d/4792.bugfix @@ -0,0 +1 @@ +Handle batch updates in worker replication protocol. \ No newline at end of file From 9f7cdf3da16e4e6c29229dcc80d9cf060cd64584 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 4 Mar 2019 14:36:52 +0000 Subject: [PATCH 3/5] Clearer branching, fix missing list clear --- synapse/replication/tcp/protocol.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index a6df04d85..53615b7ee 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -488,16 +488,23 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # process all previous updates in the batch as if they had the # final token. if not token or len(batch_updates) > 0: - batch_updates.append(update) - if token and not token > current_token: + if token is None: + # Store this update as part of the batch + batch_updates.append(update) + elif current_token <= current_token: # This batch is older than current_token, dismiss batch_updates = [] - continue - if token: + else: + # Append final update of this batch before sending + batch_updates.append(update) + # Send all updates that are part of this batch with the # found token for update in batch_updates: self.send_command(RdataCommand(stream_name, token, update)) + + # Clear saved batch updates + batch_updates = [] else: # Only send updates newer than the current token if token > current_token: From fe7bd23a85988c5251fe17e78589b69f92f21dd7 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 4 Mar 2019 15:08:15 +0000 Subject: [PATCH 4/5] Clean up logic and add comments --- synapse/replication/tcp/protocol.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 53615b7ee..dac4fbeef 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -487,15 +487,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # through tokens until we find one that is not None and then # process all previous updates in the batch as if they had the # final token. - if not token or len(batch_updates) > 0: - if token is None: - # Store this update as part of the batch - batch_updates.append(update) - elif current_token <= current_token: - # This batch is older than current_token, dismiss + if token is None: + # Store this update as part of a batch + batch_updates.append(update) + continue + + if len(batch_updates) > 0: + # There is an ongoing batch and this is the end + if current_token <= current_token: + # This batch is older than current_token, dismiss it batch_updates = [] else: - # Append final update of this batch before sending + # This is the end of the batch. Append final update of + # this batch before sending batch_updates.append(update) # Send all updates that are part of this batch with the @@ -505,10 +509,13 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Clear saved batch updates batch_updates = [] - else: - # Only send updates newer than the current token - if token > current_token: - self.send_command(RdataCommand(stream_name, token, update)) + continue + + # This is an update that's not part of a batch. + # + # Only send updates newer than the current token + if token > current_token: + self.send_command(RdataCommand(stream_name, token, update)) # They're now fully subscribed self.replication_streams.add(stream_name) From b9f61630927752422fb80cf7ece083741aefd399 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 5 Mar 2019 13:58:30 +0000 Subject: [PATCH 5/5] Simplify token replication logic --- synapse/replication/tcp/protocol.py | 37 +++++++++++------------------ 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index dac4fbeef..55630ba9a 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -478,7 +478,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Now we can send any updates that came in while we were subscribing pending_rdata = self.pending_rdata.pop(stream_name, []) - batch_updates = [] + updates = [] for token, update in pending_rdata: # If the token is null, it is part of a batch update. Batches # are multiple updates that share a single token. To denote @@ -489,34 +489,25 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # final token. if token is None: # Store this update as part of a batch - batch_updates.append(update) + updates.append(update) continue - if len(batch_updates) > 0: - # There is an ongoing batch and this is the end - if current_token <= current_token: - # This batch is older than current_token, dismiss it - batch_updates = [] - else: - # This is the end of the batch. Append final update of - # this batch before sending - batch_updates.append(update) - - # Send all updates that are part of this batch with the - # found token - for update in batch_updates: - self.send_command(RdataCommand(stream_name, token, update)) - - # Clear saved batch updates - batch_updates = [] + if token <= current_token: + # This update or batch of updates is older than + # current_token, dismiss it + updates = [] continue - # This is an update that's not part of a batch. - # - # Only send updates newer than the current token - if token > current_token: + updates.append(update) + + # Send all updates that are part of this batch with the + # found token + for update in updates: self.send_command(RdataCommand(stream_name, token, update)) + # Clear stored updates + updates = [] + # They're now fully subscribed self.replication_streams.add(stream_name) except Exception as e: