mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-10-01 08:25:44 -04:00
Log contexts and squash things together
This commit is contained in:
parent
639cd07d6d
commit
ee5aef6c72
@ -16,7 +16,9 @@
|
||||
|
||||
from twisted.internet import defer, reactor
|
||||
|
||||
from .logcontext import PreserveLoggingContext, preserve_fn
|
||||
from .logcontext import (
|
||||
PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
|
||||
)
|
||||
from synapse.util import unwrapFirstError
|
||||
|
||||
from contextlib import contextmanager
|
||||
@ -141,14 +143,6 @@ def concurrently_execute(func, args, limit):
|
||||
], consumeErrors=True).addErrback(unwrapFirstError)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _trigger_defer_manager(d):
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
d.callback(None)
|
||||
|
||||
|
||||
class Linearizer(object):
|
||||
"""Linearizes access to resources based on a key. Useful to ensure only one
|
||||
thing is happening at a time on a given resource.
|
||||
@ -177,13 +171,17 @@ class Linearizer(object):
|
||||
new_defer = defer.Deferred()
|
||||
self.key_to_defer[key] = new_defer
|
||||
|
||||
def remove_if_current(_):
|
||||
if current_defer:
|
||||
yield preserve_context_over_deferred(current_defer)
|
||||
|
||||
@contextmanager
|
||||
def _ctx_manager(d):
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
d.callback(None)
|
||||
d = self.key_to_defer.get(key)
|
||||
if d is new_defer:
|
||||
self.key_to_defer.pop(key, None)
|
||||
|
||||
new_defer.addBoth(remove_if_current)
|
||||
|
||||
yield current_defer
|
||||
|
||||
defer.returnValue(_trigger_defer_manager(new_defer))
|
||||
defer.returnValue(_ctx_manager(new_defer))
|
||||
|
Loading…
Reference in New Issue
Block a user