mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-09-18 11:14:40 -04:00
Merge remote-tracking branch 'origin/develop' into rav/warn_on_logcontext_fail
This commit is contained in:
commit
093d8c415a
173 changed files with 3282 additions and 1451 deletions
|
@ -13,7 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.util.logcontext import PreserveLoggingContext
|
||||
|
||||
from twisted.internet import defer, reactor, task
|
||||
|
@ -24,11 +23,6 @@ import logging
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DeferredTimedOutError(SynapseError):
|
||||
def __init__(self):
|
||||
super(DeferredTimedOutError, self).__init__(504, "Timed out")
|
||||
|
||||
|
||||
def unwrapFirstError(failure):
|
||||
# defer.gatherResults and DeferredLists wrap failures.
|
||||
failure.trap(defer.FirstError)
|
||||
|
@ -85,53 +79,3 @@ class Clock(object):
|
|||
except Exception:
|
||||
if not ignore_errs:
|
||||
raise
|
||||
|
||||
def time_bound_deferred(self, given_deferred, time_out):
|
||||
if given_deferred.called:
|
||||
return given_deferred
|
||||
|
||||
ret_deferred = defer.Deferred()
|
||||
|
||||
def timed_out_fn():
|
||||
e = DeferredTimedOutError()
|
||||
|
||||
try:
|
||||
ret_deferred.errback(e)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
given_deferred.cancel()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
timer = None
|
||||
|
||||
def cancel(res):
|
||||
try:
|
||||
self.cancel_call_later(timer)
|
||||
except Exception:
|
||||
pass
|
||||
return res
|
||||
|
||||
ret_deferred.addBoth(cancel)
|
||||
|
||||
def success(res):
|
||||
try:
|
||||
ret_deferred.callback(res)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return res
|
||||
|
||||
def err(res):
|
||||
try:
|
||||
ret_deferred.errback(res)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
given_deferred.addCallbacks(callback=success, errback=err)
|
||||
|
||||
timer = self.call_later(time_out, timed_out_fn)
|
||||
|
||||
return ret_deferred
|
||||
|
|
|
@ -15,9 +15,11 @@
|
|||
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.internet.defer import CancelledError
|
||||
from twisted.python import failure
|
||||
|
||||
from .logcontext import (
|
||||
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
|
||||
PreserveLoggingContext, make_deferred_yieldable, run_in_background
|
||||
)
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
|
||||
|
@ -25,6 +27,8 @@ from contextlib import contextmanager
|
|||
|
||||
import logging
|
||||
|
||||
from six.moves import range
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -156,13 +160,13 @@ def concurrently_execute(func, args, limit):
|
|||
def _concurrently_execute_inner():
|
||||
try:
|
||||
while True:
|
||||
yield func(it.next())
|
||||
yield func(next(it))
|
||||
except StopIteration:
|
||||
pass
|
||||
|
||||
return logcontext.make_deferred_yieldable(defer.gatherResults([
|
||||
preserve_fn(_concurrently_execute_inner)()
|
||||
for _ in xrange(limit)
|
||||
run_in_background(_concurrently_execute_inner)
|
||||
for _ in range(limit)
|
||||
], consumeErrors=True)).addErrback(unwrapFirstError)
|
||||
|
||||
|
||||
|
@ -392,3 +396,68 @@ class ReadWriteLock(object):
|
|||
self.key_to_current_writer.pop(key)
|
||||
|
||||
defer.returnValue(_ctx_manager())
|
||||
|
||||
|
||||
class DeferredTimeoutError(Exception):
|
||||
"""
|
||||
This error is raised by default when a L{Deferred} times out.
|
||||
"""
|
||||
|
||||
|
||||
def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
|
||||
"""
|
||||
Add a timeout to a deferred by scheduling it to be cancelled after
|
||||
timeout seconds.
|
||||
|
||||
This is essentially a backport of deferred.addTimeout, which was introduced
|
||||
in twisted 16.5.
|
||||
|
||||
If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
|
||||
unless a cancelable function was passed to its initialization or unless
|
||||
a different on_timeout_cancel callable is provided.
|
||||
|
||||
Args:
|
||||
deferred (defer.Deferred): deferred to be timed out
|
||||
timeout (Number): seconds to time out after
|
||||
|
||||
on_timeout_cancel (callable): A callable which is called immediately
|
||||
after the deferred times out, and not if this deferred is
|
||||
otherwise cancelled before the timeout.
|
||||
|
||||
It takes an arbitrary value, which is the value of the deferred at
|
||||
that exact point in time (probably a CancelledError Failure), and
|
||||
the timeout.
|
||||
|
||||
The default callable (if none is provided) will translate a
|
||||
CancelledError Failure into a DeferredTimeoutError.
|
||||
"""
|
||||
timed_out = [False]
|
||||
|
||||
def time_it_out():
|
||||
timed_out[0] = True
|
||||
deferred.cancel()
|
||||
|
||||
delayed_call = reactor.callLater(timeout, time_it_out)
|
||||
|
||||
def convert_cancelled(value):
|
||||
if timed_out[0]:
|
||||
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
|
||||
return to_call(value, timeout)
|
||||
return value
|
||||
|
||||
deferred.addBoth(convert_cancelled)
|
||||
|
||||
def cancel_timeout(result):
|
||||
# stop the pending call to cancel the deferred if it's been fired
|
||||
if delayed_call.active():
|
||||
delayed_call.cancel()
|
||||
return result
|
||||
|
||||
deferred.addBoth(cancel_timeout)
|
||||
|
||||
|
||||
def _cancelled_to_timed_out_error(value, timeout):
|
||||
if isinstance(value, failure.Failure):
|
||||
value.trap(CancelledError)
|
||||
raise DeferredTimeoutError(timeout, "Deferred")
|
||||
return value
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015, 2016 OpenMarket Ltd
|
||||
# Copyright 2018 New Vector Ltd
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -39,12 +40,11 @@ _CacheSentinel = object()
|
|||
|
||||
class CacheEntry(object):
|
||||
__slots__ = [
|
||||
"deferred", "sequence", "callbacks", "invalidated"
|
||||
"deferred", "callbacks", "invalidated"
|
||||
]
|
||||
|
||||
def __init__(self, deferred, sequence, callbacks):
|
||||
def __init__(self, deferred, callbacks):
|
||||
self.deferred = deferred
|
||||
self.sequence = sequence
|
||||
self.callbacks = set(callbacks)
|
||||
self.invalidated = False
|
||||
|
||||
|
@ -62,7 +62,6 @@ class Cache(object):
|
|||
"max_entries",
|
||||
"name",
|
||||
"keylen",
|
||||
"sequence",
|
||||
"thread",
|
||||
"metrics",
|
||||
"_pending_deferred_cache",
|
||||
|
@ -80,7 +79,6 @@ class Cache(object):
|
|||
|
||||
self.name = name
|
||||
self.keylen = keylen
|
||||
self.sequence = 0
|
||||
self.thread = None
|
||||
self.metrics = register_cache(name, self.cache)
|
||||
|
||||
|
@ -113,11 +111,10 @@ class Cache(object):
|
|||
callbacks = [callback] if callback else []
|
||||
val = self._pending_deferred_cache.get(key, _CacheSentinel)
|
||||
if val is not _CacheSentinel:
|
||||
if val.sequence == self.sequence:
|
||||
val.callbacks.update(callbacks)
|
||||
if update_metrics:
|
||||
self.metrics.inc_hits()
|
||||
return val.deferred
|
||||
val.callbacks.update(callbacks)
|
||||
if update_metrics:
|
||||
self.metrics.inc_hits()
|
||||
return val.deferred
|
||||
|
||||
val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
|
||||
if val is not _CacheSentinel:
|
||||
|
@ -137,12 +134,9 @@ class Cache(object):
|
|||
self.check_thread()
|
||||
entry = CacheEntry(
|
||||
deferred=value,
|
||||
sequence=self.sequence,
|
||||
callbacks=callbacks,
|
||||
)
|
||||
|
||||
entry.callbacks.update(callbacks)
|
||||
|
||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if existing_entry:
|
||||
existing_entry.invalidate()
|
||||
|
@ -150,13 +144,25 @@ class Cache(object):
|
|||
self._pending_deferred_cache[key] = entry
|
||||
|
||||
def shuffle(result):
|
||||
if self.sequence == entry.sequence:
|
||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if existing_entry is entry:
|
||||
self.cache.set(key, result, entry.callbacks)
|
||||
else:
|
||||
entry.invalidate()
|
||||
existing_entry = self._pending_deferred_cache.pop(key, None)
|
||||
if existing_entry is entry:
|
||||
self.cache.set(key, result, entry.callbacks)
|
||||
else:
|
||||
# oops, the _pending_deferred_cache has been updated since
|
||||
# we started our query, so we are out of date.
|
||||
#
|
||||
# Better put back whatever we took out. (We do it this way
|
||||
# round, rather than peeking into the _pending_deferred_cache
|
||||
# and then removing on a match, to make the common case faster)
|
||||
if existing_entry is not None:
|
||||
self._pending_deferred_cache[key] = existing_entry
|
||||
|
||||
# we're not going to put this entry into the cache, so need
|
||||
# to make sure that the invalidation callbacks are called.
|
||||
# That was probably done when _pending_deferred_cache was
|
||||
# updated, but it's possible that `set` was called without
|
||||
# `invalidate` being previously called, in which case it may
|
||||
# not have been. Either way, let's double-check now.
|
||||
entry.invalidate()
|
||||
return result
|
||||
|
||||
|
@ -168,25 +174,29 @@ class Cache(object):
|
|||
|
||||
def invalidate(self, key):
|
||||
self.check_thread()
|
||||
self.cache.pop(key, None)
|
||||
|
||||
# Increment the sequence number so that any SELECT statements that
|
||||
# raced with the INSERT don't update the cache (SYN-369)
|
||||
self.sequence += 1
|
||||
# if we have a pending lookup for this key, remove it from the
|
||||
# _pending_deferred_cache, which will (a) stop it being returned
|
||||
# for future queries and (b) stop it being persisted as a proper entry
|
||||
# in self.cache.
|
||||
entry = self._pending_deferred_cache.pop(key, None)
|
||||
|
||||
# run the invalidation callbacks now, rather than waiting for the
|
||||
# deferred to resolve.
|
||||
if entry:
|
||||
entry.invalidate()
|
||||
|
||||
self.cache.pop(key, None)
|
||||
|
||||
def invalidate_many(self, key):
|
||||
self.check_thread()
|
||||
if not isinstance(key, tuple):
|
||||
raise TypeError(
|
||||
"The cache key must be a tuple not %r" % (type(key),)
|
||||
)
|
||||
self.sequence += 1
|
||||
self.cache.del_multi(key)
|
||||
|
||||
# if we have a pending lookup for this key, remove it from the
|
||||
# _pending_deferred_cache, as above
|
||||
entry_dict = self._pending_deferred_cache.pop(key, None)
|
||||
if entry_dict is not None:
|
||||
for entry in iterate_tree_cache_entry(entry_dict):
|
||||
|
@ -194,8 +204,10 @@ class Cache(object):
|
|||
|
||||
def invalidate_all(self):
|
||||
self.check_thread()
|
||||
self.sequence += 1
|
||||
self.cache.clear()
|
||||
for entry in self._pending_deferred_cache.itervalues():
|
||||
entry.invalidate()
|
||||
self._pending_deferred_cache.clear()
|
||||
|
||||
|
||||
class _CacheDescriptorBase(object):
|
||||
|
|
|
@ -132,9 +132,13 @@ class DictionaryCache(object):
|
|||
self._update_or_insert(key, value, known_absent)
|
||||
|
||||
def _update_or_insert(self, key, value, known_absent):
|
||||
entry = self.cache.setdefault(key, DictionaryEntry(False, set(), {}))
|
||||
# We pop and reinsert as we need to tell the cache the size may have
|
||||
# changed
|
||||
|
||||
entry = self.cache.pop(key, DictionaryEntry(False, set(), {}))
|
||||
entry.value.update(value)
|
||||
entry.known_absent.update(known_absent)
|
||||
self.cache[key] = entry
|
||||
|
||||
def _insert(self, key, value, known_absent):
|
||||
self.cache[key] = DictionaryEntry(True, known_absent, value)
|
||||
|
|
|
@ -154,14 +154,21 @@ class LruCache(object):
|
|||
def cache_set(key, value, callbacks=[]):
|
||||
node = cache.get(key, None)
|
||||
if node is not None:
|
||||
if value != node.value:
|
||||
# We sometimes store large objects, e.g. dicts, which cause
|
||||
# the inequality check to take a long time. So let's only do
|
||||
# the check if we have some callbacks to call.
|
||||
if node.callbacks and value != node.value:
|
||||
for cb in node.callbacks:
|
||||
cb()
|
||||
node.callbacks.clear()
|
||||
|
||||
if size_callback:
|
||||
cached_cache_len[0] -= size_callback(node.value)
|
||||
cached_cache_len[0] += size_callback(value)
|
||||
# We don't bother to protect this by value != node.value as
|
||||
# generally size_callback will be cheap compared with equality
|
||||
# checks. (For example, taking the size of two dicts is quicker
|
||||
# than comparing them for equality.)
|
||||
if size_callback:
|
||||
cached_cache_len[0] -= size_callback(node.value)
|
||||
cached_cache_len[0] += size_callback(value)
|
||||
|
||||
node.callbacks.update(callbacks)
|
||||
|
||||
|
|
|
@ -12,8 +12,15 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.caches import metrics as cache_metrics
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ResponseCache(object):
|
||||
|
@ -24,20 +31,68 @@ class ResponseCache(object):
|
|||
used rather than trying to compute a new response.
|
||||
"""
|
||||
|
||||
def __init__(self, hs, timeout_ms=0):
|
||||
def __init__(self, hs, name, timeout_ms=0):
|
||||
self.pending_result_cache = {} # Requests that haven't finished yet.
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.timeout_sec = timeout_ms / 1000.
|
||||
|
||||
self._name = name
|
||||
self._metrics = cache_metrics.register_cache(
|
||||
"response_cache",
|
||||
size_callback=lambda: self.size(),
|
||||
cache_name=name,
|
||||
)
|
||||
|
||||
def size(self):
|
||||
return len(self.pending_result_cache)
|
||||
|
||||
def get(self, key):
|
||||
"""Look up the given key.
|
||||
|
||||
Can return either a new Deferred (which also doesn't follow the synapse
|
||||
logcontext rules), or, if the request has completed, the actual
|
||||
result. You will probably want to make_deferred_yieldable the result.
|
||||
|
||||
If there is no entry for the key, returns None. It is worth noting that
|
||||
this means there is no way to distinguish a completed result of None
|
||||
from an absent cache entry.
|
||||
|
||||
Args:
|
||||
key (hashable):
|
||||
|
||||
Returns:
|
||||
twisted.internet.defer.Deferred|None|E: None if there is no entry
|
||||
for this key; otherwise either a deferred result or the result
|
||||
itself.
|
||||
"""
|
||||
result = self.pending_result_cache.get(key)
|
||||
if result is not None:
|
||||
self._metrics.inc_hits()
|
||||
return result.observe()
|
||||
else:
|
||||
self._metrics.inc_misses()
|
||||
return None
|
||||
|
||||
def set(self, key, deferred):
|
||||
"""Set the entry for the given key to the given deferred.
|
||||
|
||||
*deferred* should run its callbacks in the sentinel logcontext (ie,
|
||||
you should wrap normal synapse deferreds with
|
||||
logcontext.run_in_background).
|
||||
|
||||
Can return either a new Deferred (which also doesn't follow the synapse
|
||||
logcontext rules), or, if *deferred* was already complete, the actual
|
||||
result. You will probably want to make_deferred_yieldable the result.
|
||||
|
||||
Args:
|
||||
key (hashable):
|
||||
deferred (twisted.internet.defer.Deferred[T):
|
||||
|
||||
Returns:
|
||||
twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
|
||||
result.
|
||||
"""
|
||||
result = ObservableDeferred(deferred, consumeErrors=True)
|
||||
self.pending_result_cache[key] = result
|
||||
|
||||
|
@ -53,3 +108,52 @@ class ResponseCache(object):
|
|||
|
||||
result.addBoth(remove)
|
||||
return result.observe()
|
||||
|
||||
def wrap(self, key, callback, *args, **kwargs):
|
||||
"""Wrap together a *get* and *set* call, taking care of logcontexts
|
||||
|
||||
First looks up the key in the cache, and if it is present makes it
|
||||
follow the synapse logcontext rules and returns it.
|
||||
|
||||
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
|
||||
follow the synapse logcontext rules, and adds the result to the cache.
|
||||
|
||||
Example usage:
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_request(request):
|
||||
# etc
|
||||
defer.returnValue(result)
|
||||
|
||||
result = yield response_cache.wrap(
|
||||
key,
|
||||
handle_request,
|
||||
request,
|
||||
)
|
||||
|
||||
Args:
|
||||
key (hashable): key to get/set in the cache
|
||||
|
||||
callback (callable): function to call if the key is not found in
|
||||
the cache
|
||||
|
||||
*args: positional parameters to pass to the callback, if it is used
|
||||
|
||||
**kwargs: named paramters to pass to the callback, if it is used
|
||||
|
||||
Returns:
|
||||
twisted.internet.defer.Deferred: yieldable result
|
||||
"""
|
||||
result = self.get(key)
|
||||
if not result:
|
||||
logger.info("[%s]: no cached result for [%s], calculating new one",
|
||||
self._name, key)
|
||||
d = run_in_background(callback, *args, **kwargs)
|
||||
result = self.set(key, d)
|
||||
elif not isinstance(result, defer.Deferred) or result.called:
|
||||
logger.info("[%s]: using completed cached result for [%s]",
|
||||
self._name, key)
|
||||
else:
|
||||
logger.info("[%s]: using incomplete cached result for [%s]",
|
||||
self._name, key)
|
||||
return make_deferred_yieldable(result)
|
||||
|
|
|
@ -15,9 +15,9 @@
|
|||
|
||||
from twisted.internet import threads, reactor
|
||||
|
||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
import Queue
|
||||
from six.moves import queue
|
||||
|
||||
|
||||
class BackgroundFileConsumer(object):
|
||||
|
@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
|
|||
|
||||
# Queue of slices of bytes to be written. When producer calls
|
||||
# unregister a final None is sent.
|
||||
self._bytes_queue = Queue.Queue()
|
||||
self._bytes_queue = queue.Queue()
|
||||
|
||||
# Deferred that is resolved when finished writing
|
||||
self._finished_deferred = None
|
||||
|
@ -70,7 +70,9 @@ class BackgroundFileConsumer(object):
|
|||
|
||||
self._producer = producer
|
||||
self.streaming = streaming
|
||||
self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer)
|
||||
self._finished_deferred = run_in_background(
|
||||
threads.deferToThread, self._writer
|
||||
)
|
||||
if not streaming:
|
||||
self._producer.resumeProducing()
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
from frozendict import frozendict
|
||||
import simplejson as json
|
||||
|
||||
|
||||
def freeze(o):
|
||||
|
@ -49,3 +50,21 @@ def unfreeze(o):
|
|||
pass
|
||||
|
||||
return o
|
||||
|
||||
|
||||
def _handle_frozendict(obj):
|
||||
"""Helper for EventEncoder. Makes frozendicts serializable by returning
|
||||
the underlying dict
|
||||
"""
|
||||
if type(obj) is frozendict:
|
||||
# fishing the protected dict out of the object is a bit nasty,
|
||||
# but we don't really want the overhead of copying the dict.
|
||||
return obj._dict
|
||||
raise TypeError('Object of type %s is not JSON serializable' %
|
||||
obj.__class__.__name__)
|
||||
|
||||
|
||||
# A JSONEncoder which is capable of encoding frozendics without barfing
|
||||
frozendict_json_encoder = json.JSONEncoder(
|
||||
default=_handle_frozendict,
|
||||
)
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.web.resource import Resource
|
||||
from twisted.web.resource import NoResource
|
||||
|
||||
import logging
|
||||
|
||||
|
@ -40,12 +40,15 @@ def create_resource_tree(desired_tree, root_resource):
|
|||
# extra resources to existing nodes. See self._resource_id for the key.
|
||||
resource_mappings = {}
|
||||
for full_path, res in desired_tree.items():
|
||||
# twisted requires all resources to be bytes
|
||||
full_path = full_path.encode("utf-8")
|
||||
|
||||
logger.info("Attaching %s to path %s", res, full_path)
|
||||
last_resource = root_resource
|
||||
for path_seg in full_path.split('/')[1:-1]:
|
||||
for path_seg in full_path.split(b'/')[1:-1]:
|
||||
if path_seg not in last_resource.listNames():
|
||||
# resource doesn't exist, so make a "dummy resource"
|
||||
child_resource = Resource()
|
||||
child_resource = NoResource()
|
||||
last_resource.putChild(path_seg, child_resource)
|
||||
res_id = _resource_id(last_resource, path_seg)
|
||||
resource_mappings[res_id] = child_resource
|
||||
|
@ -57,7 +60,7 @@ def create_resource_tree(desired_tree, root_resource):
|
|||
|
||||
# ===========================
|
||||
# now attach the actual desired resource
|
||||
last_path_seg = full_path.split('/')[-1]
|
||||
last_path_seg = full_path.split(b'/')[-1]
|
||||
|
||||
# if there is already a resource here, thieve its children and
|
||||
# replace it
|
||||
|
|
|
@ -92,6 +92,7 @@ class LoggingContext(object):
|
|||
|
||||
def __nonzero__(self):
|
||||
return False
|
||||
__bool__ = __nonzero__ # python3
|
||||
|
||||
sentinel = Sentinel()
|
||||
|
||||
|
@ -301,31 +302,49 @@ def preserve_fn(f):
|
|||
def run_in_background(f, *args, **kwargs):
|
||||
"""Calls a function, ensuring that the current context is restored after
|
||||
return from the function, and that the sentinel context is set once the
|
||||
deferred returned by the funtion completes.
|
||||
deferred returned by the function completes.
|
||||
|
||||
Useful for wrapping functions that return a deferred which you don't yield
|
||||
on.
|
||||
on (for instance because you want to pass it to deferred.gatherResults()).
|
||||
|
||||
Note that if you completely discard the result, you should make sure that
|
||||
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
|
||||
CRITICAL error about an unhandled error will be logged without much
|
||||
indication about where it came from.
|
||||
"""
|
||||
current = LoggingContext.current_context()
|
||||
res = f(*args, **kwargs)
|
||||
if isinstance(res, defer.Deferred) and not res.called:
|
||||
# The function will have reset the context before returning, so
|
||||
# we need to restore it now.
|
||||
LoggingContext.set_current_context(current)
|
||||
try:
|
||||
res = f(*args, **kwargs)
|
||||
except: # noqa: E722
|
||||
# the assumption here is that the caller doesn't want to be disturbed
|
||||
# by synchronous exceptions, so let's turn them into Failures.
|
||||
return defer.fail()
|
||||
|
||||
# The original context will be restored when the deferred
|
||||
# completes, but there is nothing waiting for it, so it will
|
||||
# get leaked into the reactor or some other function which
|
||||
# wasn't expecting it. We therefore need to reset the context
|
||||
# here.
|
||||
#
|
||||
# (If this feels asymmetric, consider it this way: we are
|
||||
# effectively forking a new thread of execution. We are
|
||||
# probably currently within a ``with LoggingContext()`` block,
|
||||
# which is supposed to have a single entry and exit point. But
|
||||
# by spawning off another deferred, we are effectively
|
||||
# adding a new exit point.)
|
||||
res.addBoth(_set_context_cb, LoggingContext.sentinel)
|
||||
if not isinstance(res, defer.Deferred):
|
||||
return res
|
||||
|
||||
if res.called and not res.paused:
|
||||
# The function should have maintained the logcontext, so we can
|
||||
# optimise out the messing about
|
||||
return res
|
||||
|
||||
# The function may have reset the context before returning, so
|
||||
# we need to restore it now.
|
||||
ctx = LoggingContext.set_current_context(current)
|
||||
|
||||
# The original context will be restored when the deferred
|
||||
# completes, but there is nothing waiting for it, so it will
|
||||
# get leaked into the reactor or some other function which
|
||||
# wasn't expecting it. We therefore need to reset the context
|
||||
# here.
|
||||
#
|
||||
# (If this feels asymmetric, consider it this way: we are
|
||||
# effectively forking a new thread of execution. We are
|
||||
# probably currently within a ``with LoggingContext()`` block,
|
||||
# which is supposed to have a single entry and exit point. But
|
||||
# by spawning off another deferred, we are effectively
|
||||
# adding a new exit point.)
|
||||
res.addBoth(_set_context_cb, ctx)
|
||||
return res
|
||||
|
||||
|
||||
|
@ -340,11 +359,20 @@ def make_deferred_yieldable(deferred):
|
|||
returning a deferred. Then, when the deferred completes, restores the
|
||||
current logcontext before running callbacks/errbacks.
|
||||
|
||||
(This is more-or-less the opposite operation to preserve_fn.)
|
||||
(This is more-or-less the opposite operation to run_in_background.)
|
||||
"""
|
||||
if isinstance(deferred, defer.Deferred) and not deferred.called:
|
||||
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
|
||||
deferred.addBoth(_set_context_cb, prev_context)
|
||||
if not isinstance(deferred, defer.Deferred):
|
||||
return deferred
|
||||
|
||||
if deferred.called and not deferred.paused:
|
||||
# it looks like this deferred is ready to run any callbacks we give it
|
||||
# immediately. We may as well optimise out the logcontext faffery.
|
||||
return deferred
|
||||
|
||||
# ok, we can't be sure that a yield won't block, so let's reset the
|
||||
# logcontext, and add a callback to the deferred to restore it.
|
||||
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
|
||||
deferred.addBoth(_set_context_cb, prev_context)
|
||||
return deferred
|
||||
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
|
||||
import StringIO
|
||||
from six import StringIO
|
||||
import logging
|
||||
import traceback
|
||||
|
||||
|
@ -32,7 +32,7 @@ class LogFormatter(logging.Formatter):
|
|||
super(LogFormatter, self).__init__(*args, **kwargs)
|
||||
|
||||
def formatException(self, ei):
|
||||
sio = StringIO.StringIO()
|
||||
sio = StringIO()
|
||||
(typ, val, tb) = ei
|
||||
|
||||
# log the stack above the exception capture point if possible, but
|
||||
|
|
|
@ -18,7 +18,10 @@ from twisted.internet import defer
|
|||
from synapse.api.errors import LimitExceededError
|
||||
|
||||
from synapse.util.async import sleep
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
from synapse.util.logcontext import (
|
||||
run_in_background, make_deferred_yieldable,
|
||||
PreserveLoggingContext,
|
||||
)
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
|
@ -150,7 +153,7 @@ class _PerHostRatelimiter(object):
|
|||
"Ratelimit [%s]: sleeping req",
|
||||
id(request_id),
|
||||
)
|
||||
ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0)
|
||||
ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
|
||||
|
||||
self.sleeping_requests.add(request_id)
|
||||
|
||||
|
@ -176,6 +179,9 @@ class _PerHostRatelimiter(object):
|
|||
return r
|
||||
|
||||
def on_err(r):
|
||||
# XXX: why is this necessary? this is called before we start
|
||||
# processing the request so why would the request be in
|
||||
# current_processing?
|
||||
self.current_processing.discard(request_id)
|
||||
return r
|
||||
|
||||
|
@ -187,7 +193,7 @@ class _PerHostRatelimiter(object):
|
|||
|
||||
ret_defer.addCallbacks(on_start, on_err)
|
||||
ret_defer.addBoth(on_both)
|
||||
return ret_defer
|
||||
return make_deferred_yieldable(ret_defer)
|
||||
|
||||
def _on_exit(self, request_id):
|
||||
logger.debug(
|
||||
|
@ -197,7 +203,12 @@ class _PerHostRatelimiter(object):
|
|||
self.current_processing.discard(request_id)
|
||||
try:
|
||||
request_id, deferred = self.ready_request_queue.popitem()
|
||||
|
||||
# XXX: why do we do the following? the on_start callback above will
|
||||
# do it for us.
|
||||
self.current_processing.add(request_id)
|
||||
deferred.callback(None)
|
||||
|
||||
with PreserveLoggingContext():
|
||||
deferred.callback(None)
|
||||
except KeyError:
|
||||
pass
|
||||
|
|
|
@ -203,8 +203,8 @@ class RetryDestinationLimiter(object):
|
|||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to store set_destination_retry_timings",
|
||||
"Failed to store destination_retry_timings",
|
||||
)
|
||||
|
||||
# we deliberately do this in the background.
|
||||
synapse.util.logcontext.preserve_fn(store_retry_timings)()
|
||||
synapse.util.logcontext.run_in_background(store_retry_timings)
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
import random
|
||||
import string
|
||||
from six.moves import range
|
||||
|
||||
_string_with_symbols = (
|
||||
string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
|
||||
|
@ -22,12 +23,12 @@ _string_with_symbols = (
|
|||
|
||||
|
||||
def random_string(length):
|
||||
return ''.join(random.choice(string.ascii_letters) for _ in xrange(length))
|
||||
return ''.join(random.choice(string.ascii_letters) for _ in range(length))
|
||||
|
||||
|
||||
def random_string_with_symbols(length):
|
||||
return ''.join(
|
||||
random.choice(_string_with_symbols) for _ in xrange(length)
|
||||
random.choice(_string_with_symbols) for _ in range(length)
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from six.moves import range
|
||||
|
||||
|
||||
class _Entry(object):
|
||||
__slots__ = ["end_key", "queue"]
|
||||
|
@ -68,7 +70,7 @@ class WheelTimer(object):
|
|||
# Add empty entries between the end of the current list and when we want
|
||||
# to insert. This ensures there are no gaps.
|
||||
self.entries.extend(
|
||||
_Entry(key) for key in xrange(last_key, then_key + 1)
|
||||
_Entry(key) for key in range(last_key, then_key + 1)
|
||||
)
|
||||
|
||||
self.entries[-1].queue.append(obj)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue