diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5787f854d..bbeec61f7 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,7 @@ from synapse.util.logcontext import PreserveLoggingContext from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) +from synapse.util.metrics import measure_func import synapse.metrics import logging @@ -51,7 +52,7 @@ class TransactionQueue(object): self.transport_layer = transport_layer - self._clock = hs.get_clock() + self.clock = hs.get_clock() # Is a mapping from destinations -> deferreds. Used to keep track # of which destinations have transactions in flight and when they are @@ -82,7 +83,7 @@ class TransactionQueue(object): self.pending_failures_by_dest = {} # HACK to get unique tx id - self._next_txn_id = int(self._clock.time_msec()) + self._next_txn_id = int(self.clock.time_msec()) def can_send_to(self, destination): """Can we send messages to the given server? @@ -197,6 +198,7 @@ class TransactionQueue(object): yield deferred + @measure_func("attempt_new_transaction") @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): @@ -246,7 +248,7 @@ class TransactionQueue(object): limiter = yield get_retry_limiter( destination, - self._clock, + self.clock, self.store, ) @@ -262,7 +264,7 @@ class TransactionQueue(object): logger.debug("TX [%s] Persisting transaction...", destination) transaction = Transaction.create_new( - origin_server_ts=int(self._clock.time_msec()), + origin_server_ts=int(self.clock.time_msec()), transaction_id=txn_id, origin=self.server_name, destination=destination, @@ -293,7 +295,7 @@ class TransactionQueue(object): # keys work def json_data_cb(): data = transaction.get_dict() - now = int(self._clock.time_msec()) + now = int(self.clock.time_msec()) if "pdus" in data: for p in data["pdus"]: if "age_ts" in p: diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 0b944d3e6..4d7fee886 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -13,10 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from synapse.util.logcontext import LoggingContext import synapse.metrics +from functools import wraps import logging @@ -47,6 +49,18 @@ block_db_txn_duration = metrics.register_distribution( ) +def measure_func(name): + def wrapper(func): + @wraps(func) + @defer.inlineCallbacks + def measured_func(self, *args, **kwargs): + with Measure(self.clock, name): + r = yield func(self, *args, **kwargs) + defer.returnValue(r) + return measured_func + return wrapper + + class Measure(object): __slots__ = [ "clock", "name", "start_context", "start", "new_context", "ru_utime",