mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-26 09:45:56 -05:00
Fix tight loop handling presence replication. (#9900)
Only affects workers. Introduced in #9819. Fixes #9899.
This commit is contained in:
parent
8ba086980d
commit
e4ab8676b4
1
changelog.d/9900.bugfix
Normal file
1
changelog.d/9900.bugfix
Normal file
@ -0,0 +1 @@
|
||||
Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1.
|
@ -2026,18 +2026,40 @@ class PresenceFederationQueue:
|
||||
)
|
||||
return result["updates"], result["upto_token"], result["limited"]
|
||||
|
||||
# If the from_token is the current token then there's nothing to return
|
||||
# and we can trivially no-op.
|
||||
if from_token == self._next_id - 1:
|
||||
return [], upto_token, False
|
||||
|
||||
# We can find the correct position in the queue by noting that there is
|
||||
# exactly one entry per stream ID, and that the last entry has an ID of
|
||||
# `self._next_id - 1`, so we can count backwards from the end.
|
||||
#
|
||||
# Since we are returning all states in the range `from_token < stream_id
|
||||
# <= upto_token` we look for the index with a `stream_id` of `from_token
|
||||
# + 1`.
|
||||
#
|
||||
# Since the start of the queue is periodically truncated we need to
|
||||
# handle the case where `from_token` stream ID has already been dropped.
|
||||
start_idx = max(from_token - self._next_id, -len(self._queue))
|
||||
start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
|
||||
|
||||
to_send = [] # type: List[Tuple[int, Tuple[str, str]]]
|
||||
limited = False
|
||||
new_id = upto_token
|
||||
for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
|
||||
if stream_id <= from_token:
|
||||
# Paranoia check that we are actually only sending states that
|
||||
# are have stream_id strictly greater than from_token. We should
|
||||
# never hit this.
|
||||
logger.warning(
|
||||
"Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
|
||||
stream_id,
|
||||
from_token,
|
||||
self._next_id,
|
||||
len(self._queue),
|
||||
)
|
||||
continue
|
||||
|
||||
if stream_id > upto_token:
|
||||
break
|
||||
|
||||
|
@ -509,6 +509,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
|
||||
|
||||
self.assertCountEqual(rows, expected_rows)
|
||||
|
||||
now_token = self.queue.get_current_token(self.instance_name)
|
||||
rows, upto_token, limited = self.get_success(
|
||||
self.queue.get_replication_rows("master", upto_token, now_token, 10)
|
||||
)
|
||||
self.assertEqual(upto_token, now_token)
|
||||
self.assertFalse(limited)
|
||||
self.assertCountEqual(rows, [])
|
||||
|
||||
def test_send_and_get_split(self):
|
||||
state1 = UserPresenceState.default("@user1:test")
|
||||
state2 = UserPresenceState.default("@user2:test")
|
||||
@ -538,6 +546,20 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
|
||||
|
||||
self.assertCountEqual(rows, expected_rows)
|
||||
|
||||
now_token = self.queue.get_current_token(self.instance_name)
|
||||
rows, upto_token, limited = self.get_success(
|
||||
self.queue.get_replication_rows("master", upto_token, now_token, 10)
|
||||
)
|
||||
|
||||
self.assertEqual(upto_token, now_token)
|
||||
self.assertFalse(limited)
|
||||
|
||||
expected_rows = [
|
||||
(2, ("dest3", "@user3:test")),
|
||||
]
|
||||
|
||||
self.assertCountEqual(rows, expected_rows)
|
||||
|
||||
def test_clear_queue_all(self):
|
||||
state1 = UserPresenceState.default("@user1:test")
|
||||
state2 = UserPresenceState.default("@user2:test")
|
||||
|
Loading…
x
Reference in New Issue
Block a user