diff --git a/changelog.d/10157.bugfix b/changelog.d/10157.bugfix new file mode 100644 index 000000000..6eaaa05b8 --- /dev/null +++ b/changelog.d/10157.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.21.0 which could cause `/sync` to return immediately with an empty response. diff --git a/changelog.d/10157.misc b/changelog.d/10157.misc deleted file mode 100644 index 6c1d0e6e5..000000000 --- a/changelog.d/10157.misc +++ /dev/null @@ -1 +0,0 @@ -Extend `ResponseCache` to pass a context object into the callback. diff --git a/changelog.d/10158.bugfix b/changelog.d/10158.bugfix new file mode 100644 index 000000000..6eaaa05b8 --- /dev/null +++ b/changelog.d/10158.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.21.0 which could cause `/sync` to return immediately with an empty response. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7f2138d80..b9a036105 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -49,7 +49,7 @@ from synapse.types import ( from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache -from synapse.util.caches.response_cache import ResponseCache +from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext from synapse.util.metrics import Measure, measure_func from synapse.visibility import filter_events_for_client @@ -83,12 +83,15 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 +SyncRequestKey = Tuple[Any, ...] + + @attr.s(slots=True, frozen=True) class SyncConfig: user = attr.ib(type=UserID) filter_collection = attr.ib(type=FilterCollection) is_guest = attr.ib(type=bool) - request_key = attr.ib(type=Tuple[Any, ...]) + request_key = attr.ib(type=SyncRequestKey) device_id = attr.ib(type=Optional[str]) @@ -266,9 +269,9 @@ class SyncHandler: self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache( + self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache( hs.get_clock(), "sync" - ) # type: ResponseCache[Tuple[Any, ...]] + ) self.state = hs.get_state_handler() self.auth = hs.get_auth() self.storage = hs.get_storage() @@ -307,6 +310,7 @@ class SyncHandler: since_token, timeout, full_state, + cache_context=True, ) logger.debug("Returning sync response for %s", user_id) return res @@ -314,9 +318,10 @@ class SyncHandler: async def _wait_for_sync_for_user( self, sync_config: SyncConfig, - since_token: Optional[StreamToken] = None, - timeout: int = 0, - full_state: bool = False, + since_token: Optional[StreamToken], + timeout: int, + full_state: bool, + cache_context: ResponseCacheContext[SyncRequestKey], ) -> SyncResult: if since_token is None: sync_type = "initial_sync" @@ -343,13 +348,13 @@ class SyncHandler: if timeout == 0 or since_token is None or full_state: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. - result = await self.current_sync_for_user( + result: SyncResult = await self.current_sync_for_user( sync_config, since_token, full_state=full_state ) else: - def current_sync_callback(before_token, after_token): - return self.current_sync_for_user(sync_config, since_token) + async def current_sync_callback(before_token, after_token) -> SyncResult: + return await self.current_sync_for_user(sync_config, since_token) result = await self.notifier.wait_for_events( sync_config.user.to_string(), @@ -358,6 +363,17 @@ class SyncHandler: from_token=since_token, ) + # if nothing has happened in any of the users' rooms since /sync was called, + # the resultant next_batch will be the same as since_token (since the result + # is generated when wait_for_events is first called, and not regenerated + # when wait_for_events times out). + # + # If that happens, we mustn't cache it, so that when the client comes back + # with the same cache token, we don't immediately return the same empty + # result, causing a tightloop. (#8518) + if result.next_batch == since_token: + cache_context.should_cache = False + if result: if sync_config.filter_collection.lazy_load_members(): lazy_loaded = "true" diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 546231bec..bf361c42d 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -75,11 +75,9 @@ REQUIREMENTS = [ "phonenumbers>=8.2.0", # we use GaugeHistogramMetric, which was added in prom-client 0.4.0. "prometheus_client>=0.4.0", - # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note: - # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33 - # is out in November.) + # we use `order`, which arrived in attrs 19.2.0. # Note: 21.1.0 broke `/sync`, see #9936 - "attrs>=19.1.0,!=21.1.0", + "attrs>=19.2.0,!=21.1.0", "netaddr>=0.7.18", "Jinja2>=2.9", "bleach>=1.4.3", diff --git a/synapse/types.py b/synapse/types.py index 0bdf32659..8d2fa00f7 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -404,7 +404,7 @@ def map_username_to_mxid_localpart( return username.decode("ascii") -@attr.s(frozen=True, slots=True, cmp=False) +@attr.s(frozen=True, slots=True, order=False) class RoomStreamToken: """Tokens are positions between events. The token "s1" comes after event 1. diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index b52f78ba6..012910f13 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -558,3 +558,53 @@ class UnreadMessagesTestCase(unittest.HomeserverTestCase): # Store the next batch for the next request. self.next_batch = channel.json_body["next_batch"] + + +class SyncCacheTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + sync.register_servlets, + ] + + def test_noop_sync_does_not_tightloop(self): + """If the sync times out, we shouldn't cache the result + + Essentially a regression test for #8518. + """ + self.user_id = self.register_user("kermit", "monkey") + self.tok = self.login("kermit", "monkey") + + # we should immediately get an initial sync response + channel = self.make_request("GET", "/sync", access_token=self.tok) + self.assertEqual(channel.code, 200, channel.json_body) + + # now, make an incremental sync request, with a timeout + next_batch = channel.json_body["next_batch"] + channel = self.make_request( + "GET", + f"/sync?since={next_batch}&timeout=10000", + access_token=self.tok, + await_result=False, + ) + # that should block for 10 seconds + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=9900) + channel.await_result(timeout_ms=200) + self.assertEqual(channel.code, 200, channel.json_body) + + # we expect the next_batch in the result to be the same as before + self.assertEqual(channel.json_body["next_batch"], next_batch) + + # another incremental sync should also block. + channel = self.make_request( + "GET", + f"/sync?since={next_batch}&timeout=10000", + access_token=self.tok, + await_result=False, + ) + # that should block for 10 seconds + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=9900) + channel.await_result(timeout_ms=200) + self.assertEqual(channel.code, 200, channel.json_body) diff --git a/tests/server.py b/tests/server.py index 9df8cda24..f32d8dc37 100644 --- a/tests/server.py +++ b/tests/server.py @@ -138,21 +138,19 @@ class FakeChannel: def transport(self): return self - def await_result(self, timeout: int = 100) -> None: + def await_result(self, timeout_ms: int = 1000) -> None: """ Wait until the request is finished. """ + end_time = self._reactor.seconds() + timeout_ms / 1000.0 self._reactor.run() - x = 0 while not self.is_finished(): # If there's a producer, tell it to resume producing so we get content if self._producer: self._producer.resumeProducing() - x += 1 - - if x > timeout: + if self._reactor.seconds() > end_time: raise TimedOutException("Timed out waiting for request to finish.") self._reactor.advance(0.1)