Merge pull request #1031 from matrix-org/erikj/measure_notifier

Add more Measure blocks
This commit is contained in:
Erik Johnston 2016-08-22 12:13:07 +01:00 committed by GitHub
commit e65bc7d315
6 changed files with 182 additions and 178 deletions

View File

@ -22,6 +22,7 @@ from synapse.util.logcontext import (
preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext,
preserve_fn preserve_fn
) )
from synapse.util.metrics import Measure
from twisted.internet import defer from twisted.internet import defer
@ -243,59 +244,60 @@ class Keyring(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def do_iterations(): def do_iterations():
merged_results = {} with Measure(self.clock, "get_server_verify_keys"):
merged_results = {}
missing_keys = {}
for verify_request in verify_requests:
missing_keys.setdefault(verify_request.server_name, set()).update(
verify_request.key_ids
)
for fn in key_fetch_fns:
results = yield fn(missing_keys.items())
merged_results.update(results)
# We now need to figure out which verify requests we have keys
# for and which we don't
missing_keys = {} missing_keys = {}
requests_missing_keys = []
for verify_request in verify_requests: for verify_request in verify_requests:
server_name = verify_request.server_name missing_keys.setdefault(verify_request.server_name, set()).update(
result_keys = merged_results[server_name] verify_request.key_ids
)
if verify_request.deferred.called: for fn in key_fetch_fns:
# We've already called this deferred, which probably results = yield fn(missing_keys.items())
# means that we've already found a key for it. merged_results.update(results)
continue
for key_id in verify_request.key_ids: # We now need to figure out which verify requests we have keys
if key_id in result_keys: # for and which we don't
with PreserveLoggingContext(): missing_keys = {}
verify_request.deferred.callback(( requests_missing_keys = []
server_name, for verify_request in verify_requests:
key_id, server_name = verify_request.server_name
result_keys[key_id], result_keys = merged_results[server_name]
))
break
else:
# The else block is only reached if the loop above
# doesn't break.
missing_keys.setdefault(server_name, set()).update(
verify_request.key_ids
)
requests_missing_keys.append(verify_request)
if not missing_keys: if verify_request.deferred.called:
break # We've already called this deferred, which probably
# means that we've already found a key for it.
continue
for verify_request in requests_missing_keys.values(): for key_id in verify_request.key_ids:
verify_request.deferred.errback(SynapseError( if key_id in result_keys:
401, with PreserveLoggingContext():
"No key for %s with id %s" % ( verify_request.deferred.callback((
verify_request.server_name, verify_request.key_ids, server_name,
), key_id,
Codes.UNAUTHORIZED, result_keys[key_id],
)) ))
break
else:
# The else block is only reached if the loop above
# doesn't break.
missing_keys.setdefault(server_name, set()).update(
verify_request.key_ids
)
requests_missing_keys.append(verify_request)
if not missing_keys:
break
for verify_request in requests_missing_keys.values():
verify_request.deferred.errback(SynapseError(
401,
"No key for %s with id %s" % (
verify_request.server_name, verify_request.key_ids,
),
Codes.UNAUTHORIZED,
))
def on_err(err): def on_err(err):
for verify_request in verify_requests: for verify_request in verify_requests:

View File

@ -464,10 +464,10 @@ class SyncHandler(object):
else: else:
state = {} state = {}
defer.returnValue({ defer.returnValue({
(e.type, e.state_key): e (e.type, e.state_key): e
for e in sync_config.filter_collection.filter_room_state(state.values()) for e in sync_config.filter_collection.filter_room_state(state.values())
}) })
@defer.inlineCallbacks @defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config): def unread_notifs_for_room_id(self, room_id, sync_config):
@ -485,9 +485,9 @@ class SyncHandler(object):
) )
defer.returnValue(notifs) defer.returnValue(notifs)
# There is no new information in this period, so your notification # There is no new information in this period, so your notification
# count is whatever it was last time. # count is whatever it was last time.
defer.returnValue(None) defer.returnValue(None)
@defer.inlineCallbacks @defer.inlineCallbacks
def generate_sync_result(self, sync_config, since_token=None, full_state=False): def generate_sync_result(self, sync_config, since_token=None, full_state=False):

View File

@ -19,6 +19,7 @@ from synapse.api.errors import (
) )
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches import intern_dict from synapse.util.caches import intern_dict
from synapse.util.metrics import Measure
import synapse.metrics import synapse.metrics
import synapse.events import synapse.events
@ -74,12 +75,12 @@ response_db_txn_duration = metrics.register_distribution(
_next_request_id = 0 _next_request_id = 0
def request_handler(report_metrics=True): def request_handler(include_metrics=False):
"""Decorator for ``wrap_request_handler``""" """Decorator for ``wrap_request_handler``"""
return lambda request_handler: wrap_request_handler(request_handler, report_metrics) return lambda request_handler: wrap_request_handler(request_handler, include_metrics)
def wrap_request_handler(request_handler, report_metrics): def wrap_request_handler(request_handler, include_metrics=False):
"""Wraps a method that acts as a request handler with the necessary logging """Wraps a method that acts as a request handler with the necessary logging
and exception handling. and exception handling.
@ -103,54 +104,56 @@ def wrap_request_handler(request_handler, report_metrics):
_next_request_id += 1 _next_request_id += 1
with LoggingContext(request_id) as request_context: with LoggingContext(request_id) as request_context:
if report_metrics: with Measure(self.clock, "wrapped_request_handler"):
request_metrics = RequestMetrics() request_metrics = RequestMetrics()
request_metrics.start(self.clock) request_metrics.start(self.clock, name=self.__class__.__name__)
request_context.request = request_id request_context.request = request_id
with request.processing(): with request.processing():
try:
with PreserveLoggingContext(request_context):
yield request_handler(self, request)
except CodeMessageException as e:
code = e.code
if isinstance(e, SynapseError):
logger.info(
"%s SynapseError: %s - %s", request, code, e.msg
)
else:
logger.exception(e)
outgoing_responses_counter.inc(request.method, str(code))
respond_with_json(
request, code, cs_exception(e), send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
version_string=self.version_string,
)
except:
logger.exception(
"Failed handle request %s.%s on %r: %r",
request_handler.__module__,
request_handler.__name__,
self,
request
)
respond_with_json(
request,
500,
{
"error": "Internal server error",
"errcode": Codes.UNKNOWN,
},
send_cors=True
)
finally:
try: try:
if report_metrics: with PreserveLoggingContext(request_context):
request_metrics.stop( if include_metrics:
self.clock, request, self.__class__.__name__ yield request_handler(self, request, request_metrics)
else:
yield request_handler(self, request)
except CodeMessageException as e:
code = e.code
if isinstance(e, SynapseError):
logger.info(
"%s SynapseError: %s - %s", request, code, e.msg
) )
else:
logger.exception(e)
outgoing_responses_counter.inc(request.method, str(code))
respond_with_json(
request, code, cs_exception(e), send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
version_string=self.version_string,
)
except: except:
pass logger.exception(
"Failed handle request %s.%s on %r: %r",
request_handler.__module__,
request_handler.__name__,
self,
request
)
respond_with_json(
request,
500,
{
"error": "Internal server error",
"errcode": Codes.UNKNOWN,
},
send_cors=True
)
finally:
try:
request_metrics.stop(
self.clock, request
)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
return wrapped_request_handler return wrapped_request_handler
@ -220,9 +223,9 @@ class JsonResource(HttpServer, resource.Resource):
# It does its own metric reporting because _async_render dispatches to # It does its own metric reporting because _async_render dispatches to
# a callback and it's the class name of that callback we want to report # a callback and it's the class name of that callback we want to report
# against rather than the JsonResource itself. # against rather than the JsonResource itself.
@request_handler(report_metrics=False) @request_handler(include_metrics=True)
@defer.inlineCallbacks @defer.inlineCallbacks
def _async_render(self, request): def _async_render(self, request, request_metrics):
""" This gets called from render() every time someone sends us a request. """ This gets called from render() every time someone sends us a request.
This checks if anyone has registered a callback for that method and This checks if anyone has registered a callback for that method and
path. path.
@ -231,9 +234,6 @@ class JsonResource(HttpServer, resource.Resource):
self._send_response(request, 200, {}) self._send_response(request, 200, {})
return return
request_metrics = RequestMetrics()
request_metrics.start(self.clock)
# Loop through all the registered callbacks to check if the method # Loop through all the registered callbacks to check if the method
# and path regex match # and path regex match
for path_entry in self.path_regexs.get(request.method, []): for path_entry in self.path_regexs.get(request.method, []):
@ -247,12 +247,6 @@ class JsonResource(HttpServer, resource.Resource):
callback = path_entry.callback callback = path_entry.callback
servlet_instance = getattr(callback, "__self__", None)
if servlet_instance is not None:
servlet_classname = servlet_instance.__class__.__name__
else:
servlet_classname = "%r" % callback
kwargs = intern_dict({ kwargs = intern_dict({
name: urllib.unquote(value).decode("UTF-8") if value else value name: urllib.unquote(value).decode("UTF-8") if value else value
for name, value in m.groupdict().items() for name, value in m.groupdict().items()
@ -263,10 +257,13 @@ class JsonResource(HttpServer, resource.Resource):
code, response = callback_return code, response = callback_return
self._send_response(request, code, response) self._send_response(request, code, response)
try: servlet_instance = getattr(callback, "__self__", None)
request_metrics.stop(self.clock, request, servlet_classname) if servlet_instance is not None:
except: servlet_classname = servlet_instance.__class__.__name__
pass else:
servlet_classname = "%r" % callback
request_metrics.name = servlet_classname
return return
@ -298,11 +295,12 @@ class JsonResource(HttpServer, resource.Resource):
class RequestMetrics(object): class RequestMetrics(object):
def start(self, clock): def start(self, clock, name):
self.start = clock.time_msec() self.start = clock.time_msec()
self.start_context = LoggingContext.current_context() self.start_context = LoggingContext.current_context()
self.name = name
def stop(self, clock, request, servlet_classname): def stop(self, clock, request):
context = LoggingContext.current_context() context = LoggingContext.current_context()
tag = "" tag = ""
@ -316,26 +314,26 @@ class RequestMetrics(object):
) )
return return
incoming_requests_counter.inc(request.method, servlet_classname, tag) incoming_requests_counter.inc(request.method, self.name, tag)
response_timer.inc_by( response_timer.inc_by(
clock.time_msec() - self.start, request.method, clock.time_msec() - self.start, request.method,
servlet_classname, tag self.name, tag
) )
ru_utime, ru_stime = context.get_resource_usage() ru_utime, ru_stime = context.get_resource_usage()
response_ru_utime.inc_by( response_ru_utime.inc_by(
ru_utime, request.method, servlet_classname, tag ru_utime, request.method, self.name, tag
) )
response_ru_stime.inc_by( response_ru_stime.inc_by(
ru_stime, request.method, servlet_classname, tag ru_stime, request.method, self.name, tag
) )
response_db_txn_count.inc_by( response_db_txn_count.inc_by(
context.db_txn_count, request.method, servlet_classname, tag context.db_txn_count, request.method, self.name, tag
) )
response_db_txn_duration.inc_by( response_db_txn_duration.inc_by(
context.db_txn_duration, request.method, servlet_classname, tag context.db_txn_duration, request.method, self.name, tag
) )

View File

@ -20,6 +20,7 @@ from synapse.api.errors import AuthError
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.metrics import Measure
from synapse.types import StreamToken from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
import synapse.metrics import synapse.metrics
@ -231,24 +232,25 @@ class Notifier(object):
Will wake up all listeners for the given users and rooms. Will wake up all listeners for the given users and rooms.
""" """
with PreserveLoggingContext(): with PreserveLoggingContext():
user_streams = set() with Measure(self.clock, "on_new_event"):
user_streams = set()
for user in users: for user in users:
user_stream = self.user_to_user_stream.get(str(user)) user_stream = self.user_to_user_stream.get(str(user))
if user_stream is not None: if user_stream is not None:
user_streams.add(user_stream) user_streams.add(user_stream)
for room in rooms: for room in rooms:
user_streams |= self.room_to_user_streams.get(room, set()) user_streams |= self.room_to_user_streams.get(room, set())
time_now_ms = self.clock.time_msec() time_now_ms = self.clock.time_msec()
for user_stream in user_streams: for user_stream in user_streams:
try: try:
user_stream.notify(stream_key, new_token, time_now_ms) user_stream.notify(stream_key, new_token, time_now_ms)
except: except:
logger.exception("Failed to notify listener") logger.exception("Failed to notify listener")
self.notify_replication() self.notify_replication()
def on_new_replication_data(self): def on_new_replication_data(self):
"""Used to inform replication listeners that something has happend """Used to inform replication listeners that something has happend

View File

@ -22,6 +22,7 @@ from synapse.events.utils import prune_event
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
@ -1132,54 +1133,55 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_event_from_row(self, internal_metadata, js, redacted, def _get_event_from_row(self, internal_metadata, js, redacted,
rejected_reason=None): rejected_reason=None):
d = json.loads(js) with Measure(self._clock, "_get_event_from_row"):
internal_metadata = json.loads(internal_metadata) d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
if rejected_reason: if rejected_reason:
rejected_reason = yield self._simple_select_one_onecol( rejected_reason = yield self._simple_select_one_onecol(
table="rejections", table="rejections",
keyvalues={"event_id": rejected_reason}, keyvalues={"event_id": rejected_reason},
retcol="reason", retcol="reason",
desc="_get_event_from_row_rejected_reason", desc="_get_event_from_row_rejected_reason",
)
original_ev = FrozenEvent(
d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
) )
original_ev = FrozenEvent( redacted_event = None
d, if redacted:
internal_metadata_dict=internal_metadata, redacted_event = prune_event(original_ev)
rejected_reason=rejected_reason,
)
redacted_event = None redaction_id = yield self._simple_select_one_onecol(
if redacted: table="redactions",
redacted_event = prune_event(original_ev) keyvalues={"redacts": redacted_event.event_id},
retcol="event_id",
desc="_get_event_from_row_redactions",
)
redaction_id = yield self._simple_select_one_onecol( redacted_event.unsigned["redacted_by"] = redaction_id
table="redactions", # Get the redaction event.
keyvalues={"redacts": redacted_event.event_id},
retcol="event_id", because = yield self.get_event(
desc="_get_event_from_row_redactions", redaction_id,
check_redacted=False,
allow_none=True,
)
if because:
# It's fine to do add the event directly, since get_pdu_json
# will serialise this field correctly
redacted_event.unsigned["redacted_because"] = because
cache_entry = _EventCacheEntry(
event=original_ev,
redacted_event=redacted_event,
) )
redacted_event.unsigned["redacted_by"] = redaction_id self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
# Get the redaction event.
because = yield self.get_event(
redaction_id,
check_redacted=False,
allow_none=True,
)
if because:
# It's fine to do add the event directly, since get_pdu_json
# will serialise this field correctly
redacted_event.unsigned["redacted_because"] = because
cache_entry = _EventCacheEntry(
event=original_ev,
redacted_event=redacted_event,
)
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
defer.returnValue(cache_entry) defer.returnValue(cache_entry)

View File

@ -87,7 +87,7 @@ class Measure(object):
self.db_txn_duration = self.start_context.db_txn_duration self.db_txn_duration = self.start_context.db_txn_duration
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None or not self.start_context: if isinstance(exc_type, Exception) or not self.start_context:
return return
duration = self.clock.time_msec() - self.start duration = self.clock.time_msec() - self.start