Merge pull request #999 from matrix-org/erikj/measure_more

Measure federation send transaction resources
This commit is contained in:
Erik Johnston 2016-08-10 14:16:14 +01:00 committed by GitHub
commit 5aeadb7414
2 changed files with 21 additions and 5 deletions

View File

@ -26,6 +26,7 @@ from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.retryutils import ( from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination, get_retry_limiter, NotRetryingDestination,
) )
from synapse.util.metrics import measure_func
import synapse.metrics import synapse.metrics
import logging import logging
@ -51,7 +52,7 @@ class TransactionQueue(object):
self.transport_layer = transport_layer self.transport_layer = transport_layer
self._clock = hs.get_clock() self.clock = hs.get_clock()
# Is a mapping from destinations -> deferreds. Used to keep track # Is a mapping from destinations -> deferreds. Used to keep track
# of which destinations have transactions in flight and when they are # of which destinations have transactions in flight and when they are
@ -82,7 +83,7 @@ class TransactionQueue(object):
self.pending_failures_by_dest = {} self.pending_failures_by_dest = {}
# HACK to get unique tx id # HACK to get unique tx id
self._next_txn_id = int(self._clock.time_msec()) self._next_txn_id = int(self.clock.time_msec())
def can_send_to(self, destination): def can_send_to(self, destination):
"""Can we send messages to the given server? """Can we send messages to the given server?
@ -197,6 +198,7 @@ class TransactionQueue(object):
yield deferred yield deferred
@measure_func("attempt_new_transaction")
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def _attempt_new_transaction(self, destination): def _attempt_new_transaction(self, destination):
@ -246,7 +248,7 @@ class TransactionQueue(object):
limiter = yield get_retry_limiter( limiter = yield get_retry_limiter(
destination, destination,
self._clock, self.clock,
self.store, self.store,
) )
@ -262,7 +264,7 @@ class TransactionQueue(object):
logger.debug("TX [%s] Persisting transaction...", destination) logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new( transaction = Transaction.create_new(
origin_server_ts=int(self._clock.time_msec()), origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id, transaction_id=txn_id,
origin=self.server_name, origin=self.server_name,
destination=destination, destination=destination,
@ -293,7 +295,7 @@ class TransactionQueue(object):
# keys work # keys work
def json_data_cb(): def json_data_cb():
data = transaction.get_dict() data = transaction.get_dict()
now = int(self._clock.time_msec()) now = int(self.clock.time_msec())
if "pdus" in data: if "pdus" in data:
for p in data["pdus"]: for p in data["pdus"]:
if "age_ts" in p: if "age_ts" in p:

View File

@ -13,10 +13,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from twisted.internet import defer
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext
import synapse.metrics import synapse.metrics
from functools import wraps
import logging import logging
@ -47,6 +49,18 @@ block_db_txn_duration = metrics.register_distribution(
) )
def measure_func(name):
def wrapper(func):
@wraps(func)
@defer.inlineCallbacks
def measured_func(self, *args, **kwargs):
with Measure(self.clock, name):
r = yield func(self, *args, **kwargs)
defer.returnValue(r)
return measured_func
return wrapper
class Measure(object): class Measure(object):
__slots__ = [ __slots__ = [
"clock", "name", "start_context", "start", "new_context", "ru_utime", "clock", "name", "start_context", "start", "new_context", "ru_utime",