Merge pull request #3093 from matrix-org/rav/response_cache_wrap

Refactor ResponseCache usage
This commit is contained in:
Richard van der Hoff 2018-04-20 11:31:17 +01:00 committed by GitHub
commit 11a67b7c9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 110 additions and 78 deletions

View File

@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException from synapse.api.errors import CodeMessageException
from synapse.http.client import SimpleHttpClient from synapse.http.client import SimpleHttpClient
from synapse.events.utils import serialize_event from synapse.events.utils import serialize_event
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.caches.response_cache import ResponseCache from synapse.util.caches.response_cache import ResponseCache
from synapse.types import ThirdPartyInstanceID from synapse.types import ThirdPartyInstanceID
@ -194,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
defer.returnValue(None) defer.returnValue(None)
key = (service.id, protocol) key = (service.id, protocol)
result = self.protocol_meta_cache.get(key) return self.protocol_meta_cache.wrap(key, _get)
if not result:
result = self.protocol_meta_cache.set(
key, preserve_fn(_get)()
)
return make_deferred_yieldable(result)
@defer.inlineCallbacks @defer.inlineCallbacks
def push_bulk(self, service, events, txn_id=None): def push_bulk(self, service, events, txn_id=None):

View File

@ -30,7 +30,6 @@ import synapse.metrics
from synapse.types import get_domain_from_id from synapse.types import get_domain_from_id
from synapse.util import async from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in # when processing incoming transactions, we try to handle multiple rooms in
@ -212,16 +211,17 @@ class FederationServer(FederationBase):
if not in_room: if not in_room:
raise AuthError(403, "Host not in room.") raise AuthError(403, "Host not in room.")
result = self._state_resp_cache.get((room_id, event_id)) # we grab the linearizer to protect ourselves from servers which hammer
if not result: # us. In theory we might already have the response to this query
with (yield self._server_linearizer.queue((origin, room_id))): # in the cache so we could return it without waiting for the linearizer
d = self._state_resp_cache.set( # - but that's non-trivial to get right, and anyway somewhat defeats
(room_id, event_id), # the point of the linearizer.
preserve_fn(self._on_context_state_request_compute)(room_id, event_id) with (yield self._server_linearizer.queue((origin, room_id))):
) resp = yield self._state_resp_cache.wrap(
resp = yield make_deferred_yieldable(d) (room_id, event_id),
else: self._on_context_state_request_compute,
resp = yield make_deferred_yieldable(result) room_id, event_id,
)
defer.returnValue((200, resp)) defer.returnValue((200, resp))

View File

@ -20,7 +20,6 @@ from ._base import BaseHandler
from synapse.api.constants import ( from synapse.api.constants import (
EventTypes, JoinRules, EventTypes, JoinRules,
) )
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import concurrently_execute from synapse.util.async import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache from synapse.util.caches.response_cache import ResponseCache
@ -78,18 +77,11 @@ class RoomListHandler(BaseHandler):
) )
key = (limit, since_token, network_tuple) key = (limit, since_token, network_tuple)
result = self.response_cache.get(key) return self.response_cache.wrap(
if not result: key,
logger.info("No cached result, calculating one.") self._get_public_room_list,
result = self.response_cache.set( limit, since_token, network_tuple=network_tuple,
key, )
preserve_fn(self._get_public_room_list)(
limit, since_token, network_tuple=network_tuple
)
)
else:
logger.info("Using cached deferred result.")
return make_deferred_yieldable(result)
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None, def _get_public_room_list(self, limit=None, since_token=None,
@ -423,18 +415,14 @@ class RoomListHandler(BaseHandler):
server_name, limit, since_token, include_all_networks, server_name, limit, since_token, include_all_networks,
third_party_instance_id, third_party_instance_id,
) )
result = self.remote_response_cache.get(key) return self.remote_response_cache.wrap(
if not result: key,
result = self.remote_response_cache.set( repl_layer.get_public_rooms,
key, server_name, limit=limit, since_token=since_token,
repl_layer.get_public_rooms( search_filter=search_filter,
server_name, limit=limit, since_token=since_token, include_all_networks=include_all_networks,
search_filter=search_filter, third_party_instance_id=third_party_instance_id,
include_all_networks=include_all_networks, )
third_party_instance_id=third_party_instance_id,
)
)
return result
class RoomListNextBatch(namedtuple("RoomListNextBatch", ( class RoomListNextBatch(namedtuple("RoomListNextBatch", (

View File

@ -15,7 +15,7 @@
from synapse.api.constants import Membership, EventTypes from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute from synapse.util.async import concurrently_execute
from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func from synapse.util.metrics import Measure, measure_func
from synapse.util.caches.response_cache import ResponseCache from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user from synapse.push.clientformat import format_push_rules_for_user
@ -180,15 +180,11 @@ class SyncHandler(object):
Returns: Returns:
A Deferred SyncResult. A Deferred SyncResult.
""" """
result = self.response_cache.get(sync_config.request_key) return self.response_cache.wrap(
if not result: sync_config.request_key,
result = self.response_cache.set( self._wait_for_sync_for_user,
sync_config.request_key, sync_config, since_token, timeout, full_state,
preserve_fn(self._wait_for_sync_for_user)( )
sync_config, since_token, timeout, full_state
)
)
return make_deferred_yieldable(result)
@defer.inlineCallbacks @defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout, def _wait_for_sync_for_user(self, sync_config, since_token, timeout,

View File

@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.types import Requester, UserID from synapse.types import Requester, UserID
@ -118,17 +117,12 @@ class ReplicationSendEventRestServlet(RestServlet):
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000) self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
def on_PUT(self, request, event_id): def on_PUT(self, request, event_id):
result = self.response_cache.get(event_id) return self.response_cache.wrap(
if not result: event_id,
result = self.response_cache.set( self._handle_request,
event_id, request
self._handle_request(request) )
)
else:
logger.warn("Returning cached response")
return make_deferred_yieldable(result)
@preserve_fn
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_request(self, request): def _handle_request(self, request):
with Measure(self.clock, "repl_send_event_parse"): with Measure(self.clock, "repl_send_event_parse"):

View File

@ -12,9 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
from twisted.internet import defer
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.caches import metrics as cache_metrics from synapse.util.caches import metrics as cache_metrics
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
class ResponseCache(object): class ResponseCache(object):
@ -31,6 +37,7 @@ class ResponseCache(object):
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000. self.timeout_sec = timeout_ms / 1000.
self._name = name
self._metrics = cache_metrics.register_cache( self._metrics = cache_metrics.register_cache(
"response_cache", "response_cache",
size_callback=lambda: self.size(), size_callback=lambda: self.size(),
@ -43,15 +50,21 @@ class ResponseCache(object):
def get(self, key): def get(self, key):
"""Look up the given key. """Look up the given key.
Returns a deferred which doesn't follow the synapse logcontext rules, Can return either a new Deferred (which also doesn't follow the synapse
so you'll probably want to make_deferred_yieldable it. logcontext rules), or, if the request has completed, the actual
result. You will probably want to make_deferred_yieldable the result.
If there is no entry for the key, returns None. It is worth noting that
this means there is no way to distinguish a completed result of None
from an absent cache entry.
Args: Args:
key (str): key (hashable):
Returns: Returns:
twisted.internet.defer.Deferred|None: None if there is no entry twisted.internet.defer.Deferred|None|E: None if there is no entry
for this key; otherwise a deferred result. for this key; otherwise either a deferred result or the result
itself.
""" """
result = self.pending_result_cache.get(key) result = self.pending_result_cache.get(key)
if result is not None: if result is not None:
@ -68,19 +81,17 @@ class ResponseCache(object):
you should wrap normal synapse deferreds with you should wrap normal synapse deferreds with
logcontext.run_in_background). logcontext.run_in_background).
Returns a new Deferred which also doesn't follow the synapse logcontext Can return either a new Deferred (which also doesn't follow the synapse
rules, so you will want to make_deferred_yieldable it logcontext rules), or, if *deferred* was already complete, the actual
result. You will probably want to make_deferred_yieldable the result.
(TODO: before using this more widely, it might make sense to refactor
it and get() so that they do the necessary wrapping rather than having
to do it everywhere ResponseCache is used.)
Args: Args:
key (str): key (hashable):
deferred (twisted.internet.defer.Deferred): deferred (twisted.internet.defer.Deferred[T):
Returns: Returns:
twisted.internet.defer.Deferred twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
result.
""" """
result = ObservableDeferred(deferred, consumeErrors=True) result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result self.pending_result_cache[key] = result
@ -97,3 +108,52 @@ class ResponseCache(object):
result.addBoth(remove) result.addBoth(remove)
return result.observe() return result.observe()
def wrap(self, key, callback, *args, **kwargs):
"""Wrap together a *get* and *set* call, taking care of logcontexts
First looks up the key in the cache, and if it is present makes it
follow the synapse logcontext rules and returns it.
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
follow the synapse logcontext rules, and adds the result to the cache.
Example usage:
@defer.inlineCallbacks
def handle_request(request):
# etc
defer.returnValue(result)
result = yield response_cache.wrap(
key,
handle_request,
request,
)
Args:
key (hashable): key to get/set in the cache
callback (callable): function to call if the key is not found in
the cache
*args: positional parameters to pass to the callback, if it is used
**kwargs: named paramters to pass to the callback, if it is used
Returns:
twisted.internet.defer.Deferred: yieldable result
"""
result = self.get(key)
if not result:
logger.info("[%s]: no cached result for [%s], calculating new one",
self._name, key)
d = run_in_background(callback, *args, **kwargs)
result = self.set(key, d)
elif not isinstance(result, defer.Deferred) or result.called:
logger.info("[%s]: using completed cached result for [%s]",
self._name, key)
else:
logger.info("[%s]: using incomplete cached result for [%s]",
self._name, key)
return make_deferred_yieldable(result)