mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-23 07:01:05 -05:00
Ensure that we do not cache empty sync responses after a timeout (#10158)
Fixes #8518 by telling the ResponseCache not to cache the /sync response if the next_batch token in that response is the same as the since token supplied in the request (i.e. the sync timed out without anything new happening).
This commit is contained in:
parent
9cf6e0eae7
commit
fcf3c7032b
1
changelog.d/10157.bugfix
Normal file
1
changelog.d/10157.bugfix
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fix a bug introduced in v1.21.0 which could cause `/sync` to return immediately with an empty response.
|
@ -1 +0,0 @@
|
|||||||
Extend `ResponseCache` to pass a context object into the callback.
|
|
1
changelog.d/10158.bugfix
Normal file
1
changelog.d/10158.bugfix
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fix a bug introduced in v1.21.0 which could cause `/sync` to return immediately with an empty response.
|
@ -49,7 +49,7 @@ from synapse.types import (
|
|||||||
from synapse.util.async_helpers import concurrently_execute
|
from synapse.util.async_helpers import concurrently_execute
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
from synapse.util.caches.lrucache import LruCache
|
from synapse.util.caches.lrucache import LruCache
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
|
||||||
from synapse.util.metrics import Measure, measure_func
|
from synapse.util.metrics import Measure, measure_func
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
|
|
||||||
@ -83,12 +83,15 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
|
|||||||
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
|
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
|
||||||
|
|
||||||
|
|
||||||
|
SyncRequestKey = Tuple[Any, ...]
|
||||||
|
|
||||||
|
|
||||||
@attr.s(slots=True, frozen=True)
|
@attr.s(slots=True, frozen=True)
|
||||||
class SyncConfig:
|
class SyncConfig:
|
||||||
user = attr.ib(type=UserID)
|
user = attr.ib(type=UserID)
|
||||||
filter_collection = attr.ib(type=FilterCollection)
|
filter_collection = attr.ib(type=FilterCollection)
|
||||||
is_guest = attr.ib(type=bool)
|
is_guest = attr.ib(type=bool)
|
||||||
request_key = attr.ib(type=Tuple[Any, ...])
|
request_key = attr.ib(type=SyncRequestKey)
|
||||||
device_id = attr.ib(type=Optional[str])
|
device_id = attr.ib(type=Optional[str])
|
||||||
|
|
||||||
|
|
||||||
@ -266,9 +269,9 @@ class SyncHandler:
|
|||||||
self.presence_handler = hs.get_presence_handler()
|
self.presence_handler = hs.get_presence_handler()
|
||||||
self.event_sources = hs.get_event_sources()
|
self.event_sources = hs.get_event_sources()
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.response_cache = ResponseCache(
|
self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
|
||||||
hs.get_clock(), "sync"
|
hs.get_clock(), "sync"
|
||||||
) # type: ResponseCache[Tuple[Any, ...]]
|
)
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
self.auth = hs.get_auth()
|
self.auth = hs.get_auth()
|
||||||
self.storage = hs.get_storage()
|
self.storage = hs.get_storage()
|
||||||
@ -307,6 +310,7 @@ class SyncHandler:
|
|||||||
since_token,
|
since_token,
|
||||||
timeout,
|
timeout,
|
||||||
full_state,
|
full_state,
|
||||||
|
cache_context=True,
|
||||||
)
|
)
|
||||||
logger.debug("Returning sync response for %s", user_id)
|
logger.debug("Returning sync response for %s", user_id)
|
||||||
return res
|
return res
|
||||||
@ -314,9 +318,10 @@ class SyncHandler:
|
|||||||
async def _wait_for_sync_for_user(
|
async def _wait_for_sync_for_user(
|
||||||
self,
|
self,
|
||||||
sync_config: SyncConfig,
|
sync_config: SyncConfig,
|
||||||
since_token: Optional[StreamToken] = None,
|
since_token: Optional[StreamToken],
|
||||||
timeout: int = 0,
|
timeout: int,
|
||||||
full_state: bool = False,
|
full_state: bool,
|
||||||
|
cache_context: ResponseCacheContext[SyncRequestKey],
|
||||||
) -> SyncResult:
|
) -> SyncResult:
|
||||||
if since_token is None:
|
if since_token is None:
|
||||||
sync_type = "initial_sync"
|
sync_type = "initial_sync"
|
||||||
@ -343,13 +348,13 @@ class SyncHandler:
|
|||||||
if timeout == 0 or since_token is None or full_state:
|
if timeout == 0 or since_token is None or full_state:
|
||||||
# we are going to return immediately, so don't bother calling
|
# we are going to return immediately, so don't bother calling
|
||||||
# notifier.wait_for_events.
|
# notifier.wait_for_events.
|
||||||
result = await self.current_sync_for_user(
|
result: SyncResult = await self.current_sync_for_user(
|
||||||
sync_config, since_token, full_state=full_state
|
sync_config, since_token, full_state=full_state
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
|
||||||
def current_sync_callback(before_token, after_token):
|
async def current_sync_callback(before_token, after_token) -> SyncResult:
|
||||||
return self.current_sync_for_user(sync_config, since_token)
|
return await self.current_sync_for_user(sync_config, since_token)
|
||||||
|
|
||||||
result = await self.notifier.wait_for_events(
|
result = await self.notifier.wait_for_events(
|
||||||
sync_config.user.to_string(),
|
sync_config.user.to_string(),
|
||||||
@ -358,6 +363,17 @@ class SyncHandler:
|
|||||||
from_token=since_token,
|
from_token=since_token,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# if nothing has happened in any of the users' rooms since /sync was called,
|
||||||
|
# the resultant next_batch will be the same as since_token (since the result
|
||||||
|
# is generated when wait_for_events is first called, and not regenerated
|
||||||
|
# when wait_for_events times out).
|
||||||
|
#
|
||||||
|
# If that happens, we mustn't cache it, so that when the client comes back
|
||||||
|
# with the same cache token, we don't immediately return the same empty
|
||||||
|
# result, causing a tight loop. (#8518)
|
||||||
|
if result.next_batch == since_token:
|
||||||
|
cache_context.should_cache = False
|
||||||
|
|
||||||
if result:
|
if result:
|
||||||
if sync_config.filter_collection.lazy_load_members():
|
if sync_config.filter_collection.lazy_load_members():
|
||||||
lazy_loaded = "true"
|
lazy_loaded = "true"
|
||||||
|
@ -75,11 +75,9 @@ REQUIREMENTS = [
|
|||||||
"phonenumbers>=8.2.0",
|
"phonenumbers>=8.2.0",
|
||||||
# we use GaugeHistogramMetric, which was added in prom-client 0.4.0.
|
# we use GaugeHistogramMetric, which was added in prom-client 0.4.0.
|
||||||
"prometheus_client>=0.4.0",
|
"prometheus_client>=0.4.0",
|
||||||
# we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
|
# we use `order`, which arrived in attrs 19.2.0.
|
||||||
# Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
|
|
||||||
# is out in November.)
|
|
||||||
# Note: 21.1.0 broke `/sync`, see #9936
|
# Note: 21.1.0 broke `/sync`, see #9936
|
||||||
"attrs>=19.1.0,!=21.1.0",
|
"attrs>=19.2.0,!=21.1.0",
|
||||||
"netaddr>=0.7.18",
|
"netaddr>=0.7.18",
|
||||||
"Jinja2>=2.9",
|
"Jinja2>=2.9",
|
||||||
"bleach>=1.4.3",
|
"bleach>=1.4.3",
|
||||||
|
@ -404,7 +404,7 @@ def map_username_to_mxid_localpart(
|
|||||||
return username.decode("ascii")
|
return username.decode("ascii")
|
||||||
|
|
||||||
|
|
||||||
@attr.s(frozen=True, slots=True, cmp=False)
|
@attr.s(frozen=True, slots=True, order=False)
|
||||||
class RoomStreamToken:
|
class RoomStreamToken:
|
||||||
"""Tokens are positions between events. The token "s1" comes after event 1.
|
"""Tokens are positions between events. The token "s1" comes after event 1.
|
||||||
|
|
||||||
|
@ -558,3 +558,53 @@ class UnreadMessagesTestCase(unittest.HomeserverTestCase):
|
|||||||
|
|
||||||
# Store the next batch for the next request.
|
# Store the next batch for the next request.
|
||||||
self.next_batch = channel.json_body["next_batch"]
|
self.next_batch = channel.json_body["next_batch"]
|
||||||
|
|
||||||
|
|
||||||
|
class SyncCacheTestCase(unittest.HomeserverTestCase):
|
||||||
|
servlets = [
|
||||||
|
synapse.rest.admin.register_servlets,
|
||||||
|
login.register_servlets,
|
||||||
|
sync.register_servlets,
|
||||||
|
]
|
||||||
|
|
||||||
|
def test_noop_sync_does_not_tightloop(self):
|
||||||
|
"""If the sync times out, we shouldn't cache the result
|
||||||
|
|
||||||
|
Essentially a regression test for #8518.
|
||||||
|
"""
|
||||||
|
self.user_id = self.register_user("kermit", "monkey")
|
||||||
|
self.tok = self.login("kermit", "monkey")
|
||||||
|
|
||||||
|
# we should immediately get an initial sync response
|
||||||
|
channel = self.make_request("GET", "/sync", access_token=self.tok)
|
||||||
|
self.assertEqual(channel.code, 200, channel.json_body)
|
||||||
|
|
||||||
|
# now, make an incremental sync request, with a timeout
|
||||||
|
next_batch = channel.json_body["next_batch"]
|
||||||
|
channel = self.make_request(
|
||||||
|
"GET",
|
||||||
|
f"/sync?since={next_batch}&timeout=10000",
|
||||||
|
access_token=self.tok,
|
||||||
|
await_result=False,
|
||||||
|
)
|
||||||
|
# that should block for 10 seconds
|
||||||
|
with self.assertRaises(TimedOutException):
|
||||||
|
channel.await_result(timeout_ms=9900)
|
||||||
|
channel.await_result(timeout_ms=200)
|
||||||
|
self.assertEqual(channel.code, 200, channel.json_body)
|
||||||
|
|
||||||
|
# we expect the next_batch in the result to be the same as before
|
||||||
|
self.assertEqual(channel.json_body["next_batch"], next_batch)
|
||||||
|
|
||||||
|
# another incremental sync should also block.
|
||||||
|
channel = self.make_request(
|
||||||
|
"GET",
|
||||||
|
f"/sync?since={next_batch}&timeout=10000",
|
||||||
|
access_token=self.tok,
|
||||||
|
await_result=False,
|
||||||
|
)
|
||||||
|
# that should block for 10 seconds
|
||||||
|
with self.assertRaises(TimedOutException):
|
||||||
|
channel.await_result(timeout_ms=9900)
|
||||||
|
channel.await_result(timeout_ms=200)
|
||||||
|
self.assertEqual(channel.code, 200, channel.json_body)
|
||||||
|
@ -138,21 +138,19 @@ class FakeChannel:
|
|||||||
def transport(self):
|
def transport(self):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def await_result(self, timeout: int = 100) -> None:
|
def await_result(self, timeout_ms: int = 1000) -> None:
|
||||||
"""
|
"""
|
||||||
Wait until the request is finished.
|
Wait until the request is finished.
|
||||||
"""
|
"""
|
||||||
|
end_time = self._reactor.seconds() + timeout_ms / 1000.0
|
||||||
self._reactor.run()
|
self._reactor.run()
|
||||||
x = 0
|
|
||||||
|
|
||||||
while not self.is_finished():
|
while not self.is_finished():
|
||||||
# If there's a producer, tell it to resume producing so we get content
|
# If there's a producer, tell it to resume producing so we get content
|
||||||
if self._producer:
|
if self._producer:
|
||||||
self._producer.resumeProducing()
|
self._producer.resumeProducing()
|
||||||
|
|
||||||
x += 1
|
if self._reactor.seconds() > end_time:
|
||||||
|
|
||||||
if x > timeout:
|
|
||||||
raise TimedOutException("Timed out waiting for request to finish.")
|
raise TimedOutException("Timed out waiting for request to finish.")
|
||||||
|
|
||||||
self._reactor.advance(0.1)
|
self._reactor.advance(0.1)
|
||||||
|
Loading…
Reference in New Issue
Block a user