mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-06-13 10:42:51 -04:00
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
This commit is contained in:
commit
076bc0510b
3 changed files with 17 additions and 8 deletions
|
@ -365,6 +365,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||||
self.streamer.new_connection(self)
|
self.streamer.new_connection(self)
|
||||||
|
|
||||||
def on_NAME(self, cmd):
|
def on_NAME(self, cmd):
|
||||||
|
logger.info("[%s] Renamed to %r", self.id(), cmd.data)
|
||||||
self.name = cmd.data
|
self.name = cmd.data
|
||||||
|
|
||||||
def on_USER_SYNC(self, cmd):
|
def on_USER_SYNC(self, cmd):
|
||||||
|
@ -414,16 +415,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||||
token, row = update[0], update[1]
|
token, row = update[0], update[1]
|
||||||
self.send_command(RdataCommand(stream_name, token, row))
|
self.send_command(RdataCommand(stream_name, token, row))
|
||||||
|
|
||||||
# Now we can send any updates that came in while we were subscribing
|
|
||||||
pending_rdata = self.pending_rdata.pop(stream_name, [])
|
|
||||||
for token, update in pending_rdata:
|
|
||||||
self.send_command(RdataCommand(stream_name, token, update))
|
|
||||||
|
|
||||||
# We send a POSITION command to ensure that they have an up to
|
# We send a POSITION command to ensure that they have an up to
|
||||||
# date token (especially useful if we didn't send any updates
|
# date token (especially useful if we didn't send any updates
|
||||||
# above)
|
# above)
|
||||||
self.send_command(PositionCommand(stream_name, current_token))
|
self.send_command(PositionCommand(stream_name, current_token))
|
||||||
|
|
||||||
|
# Now we can send any updates that came in while we were subscribing
|
||||||
|
pending_rdata = self.pending_rdata.pop(stream_name, [])
|
||||||
|
for token, update in pending_rdata:
|
||||||
|
# Only send updates newer than the current token
|
||||||
|
if token > current_token:
|
||||||
|
self.send_command(RdataCommand(stream_name, token, update))
|
||||||
|
|
||||||
# They're now fully subscribed
|
# They're now fully subscribed
|
||||||
self.replication_streams.add(stream_name)
|
self.replication_streams.add(stream_name)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -442,7 +445,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||||
self.send_command(RdataCommand(stream_name, token, data))
|
self.send_command(RdataCommand(stream_name, token, data))
|
||||||
elif stream_name in self.connecting_streams:
|
elif stream_name in self.connecting_streams:
|
||||||
# The client is being subscribed to the stream
|
# The client is being subscribed to the stream
|
||||||
logger.info("[%s] Queuing RDATA %r %r", self.id(), stream_name, token)
|
logger.debug("[%s] Queuing RDATA %r %r", self.id(), stream_name, token)
|
||||||
self.pending_rdata.setdefault(stream_name, []).append((token, data))
|
self.pending_rdata.setdefault(stream_name, []).append((token, data))
|
||||||
else:
|
else:
|
||||||
# The client isn't subscribed
|
# The client isn't subscribed
|
||||||
|
@ -453,7 +456,6 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
|
||||||
|
|
||||||
def on_connection_closed(self):
|
def on_connection_closed(self):
|
||||||
BaseReplicationStreamProtocol.on_connection_closed(self)
|
BaseReplicationStreamProtocol.on_connection_closed(self)
|
||||||
logger.info("[%s] Replication connection closed", self.id())
|
|
||||||
self.streamer.lost_connection(self)
|
self.streamer.lost_connection(self)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -124,7 +124,7 @@ class ReplicationStreamer(object):
|
||||||
# Don't bother if nothing is listening. We still need to advance
|
# Don't bother if nothing is listening. We still need to advance
|
||||||
# the stream tokens otherwise they'll fall beihind forever
|
# the stream tokens otherwise they'll fall beihind forever
|
||||||
for stream in self.streams:
|
for stream in self.streams:
|
||||||
stream.advance_current_token()
|
stream.discard_updates_and_advance()
|
||||||
return
|
return
|
||||||
|
|
||||||
# If we're in the process of checking for new updates, mark that fact
|
# If we're in the process of checking for new updates, mark that fact
|
||||||
|
|
|
@ -89,6 +89,13 @@ class Stream(object):
|
||||||
"""
|
"""
|
||||||
self.upto_token = self.current_token()
|
self.upto_token = self.current_token()
|
||||||
|
|
||||||
|
def discard_updates_and_advance(self):
|
||||||
|
"""Called when the stream should advance but the updates would be discarded,
|
||||||
|
e.g. when there are no currently connected workers.
|
||||||
|
"""
|
||||||
|
self.upto_token = self.current_token()
|
||||||
|
self.last_token = self.upto_token
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_updates(self):
|
def get_updates(self):
|
||||||
"""Gets all updates since the last time this function was called (or
|
"""Gets all updates since the last time this function was called (or
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue