mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-12-11 20:24:19 -05:00
Various opentracing enhancements (#11619)
* Wrap `auth.get_user_by_req` in an opentracing span give `get_user_by_req` its own opentracing span, since it can result in a non-trivial number of sub-spans which it is useful to group together. This requires a bit of reorganisation because it also sets some tags (and may force tracing) on the servlet span. * Emit opentracing span for encoding json responses This can be a significant time sink. * Rename all sync spans with a prefix * Write an opentracing span for encoding sync response * opentracing span to group generate_room_entries * opentracing spans within sync.encode_response * changelog * Use the `trace` decorator instead of context managers
This commit is contained in:
parent
dd47788752
commit
2215954147
1
changelog.d/11619.misc
Normal file
1
changelog.d/11619.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
A number of improvements to opentracing support.
|
@ -32,7 +32,7 @@ from synapse.appservice import ApplicationService
|
|||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.http import get_request_user_agent
|
from synapse.http import get_request_user_agent
|
||||||
from synapse.http.site import SynapseRequest
|
from synapse.http.site import SynapseRequest
|
||||||
from synapse.logging import opentracing as opentracing
|
from synapse.logging.opentracing import active_span, force_tracing, start_active_span
|
||||||
from synapse.storage.databases.main.registration import TokenLookupResult
|
from synapse.storage.databases.main.registration import TokenLookupResult
|
||||||
from synapse.types import Requester, StateMap, UserID, create_requester
|
from synapse.types import Requester, StateMap, UserID, create_requester
|
||||||
from synapse.util.caches.lrucache import LruCache
|
from synapse.util.caches.lrucache import LruCache
|
||||||
@ -149,6 +149,42 @@ class Auth:
|
|||||||
is invalid.
|
is invalid.
|
||||||
AuthError if access is denied for the user in the access token
|
AuthError if access is denied for the user in the access token
|
||||||
"""
|
"""
|
||||||
|
parent_span = active_span()
|
||||||
|
with start_active_span("get_user_by_req"):
|
||||||
|
requester = await self._wrapped_get_user_by_req(
|
||||||
|
request, allow_guest, rights, allow_expired
|
||||||
|
)
|
||||||
|
|
||||||
|
if parent_span:
|
||||||
|
if requester.authenticated_entity in self._force_tracing_for_users:
|
||||||
|
# request tracing is enabled for this user, so we need to force it
|
||||||
|
# tracing on for the parent span (which will be the servlet span).
|
||||||
|
#
|
||||||
|
# It's too late for the get_user_by_req span to inherit the setting,
|
||||||
|
# so we also force it on for that.
|
||||||
|
force_tracing()
|
||||||
|
force_tracing(parent_span)
|
||||||
|
parent_span.set_tag(
|
||||||
|
"authenticated_entity", requester.authenticated_entity
|
||||||
|
)
|
||||||
|
parent_span.set_tag("user_id", requester.user.to_string())
|
||||||
|
if requester.device_id is not None:
|
||||||
|
parent_span.set_tag("device_id", requester.device_id)
|
||||||
|
if requester.app_service is not None:
|
||||||
|
parent_span.set_tag("appservice_id", requester.app_service.id)
|
||||||
|
return requester
|
||||||
|
|
||||||
|
async def _wrapped_get_user_by_req(
|
||||||
|
self,
|
||||||
|
request: SynapseRequest,
|
||||||
|
allow_guest: bool,
|
||||||
|
rights: str,
|
||||||
|
allow_expired: bool,
|
||||||
|
) -> Requester:
|
||||||
|
"""Helper for get_user_by_req
|
||||||
|
|
||||||
|
Once get_user_by_req has set up the opentracing span, this does the actual work.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
ip_addr = request.getClientIP()
|
ip_addr = request.getClientIP()
|
||||||
user_agent = get_request_user_agent(request)
|
user_agent = get_request_user_agent(request)
|
||||||
@ -177,14 +213,6 @@ class Auth:
|
|||||||
)
|
)
|
||||||
|
|
||||||
request.requester = user_id
|
request.requester = user_id
|
||||||
if user_id in self._force_tracing_for_users:
|
|
||||||
opentracing.force_tracing()
|
|
||||||
opentracing.set_tag("authenticated_entity", user_id)
|
|
||||||
opentracing.set_tag("user_id", user_id)
|
|
||||||
if device_id is not None:
|
|
||||||
opentracing.set_tag("device_id", device_id)
|
|
||||||
opentracing.set_tag("appservice_id", app_service.id)
|
|
||||||
|
|
||||||
return requester
|
return requester
|
||||||
|
|
||||||
user_info = await self.get_user_by_access_token(
|
user_info = await self.get_user_by_access_token(
|
||||||
@ -242,13 +270,6 @@ class Auth:
|
|||||||
)
|
)
|
||||||
|
|
||||||
request.requester = requester
|
request.requester = requester
|
||||||
if user_info.token_owner in self._force_tracing_for_users:
|
|
||||||
opentracing.force_tracing()
|
|
||||||
opentracing.set_tag("authenticated_entity", user_info.token_owner)
|
|
||||||
opentracing.set_tag("user_id", user_info.user_id)
|
|
||||||
if device_id:
|
|
||||||
opentracing.set_tag("device_id", device_id)
|
|
||||||
|
|
||||||
return requester
|
return requester
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise MissingClientTokenError()
|
raise MissingClientTokenError()
|
||||||
|
@ -421,7 +421,7 @@ class SyncHandler:
|
|||||||
span to track the sync. See `generate_sync_result` for the next part of your
|
span to track the sync. See `generate_sync_result` for the next part of your
|
||||||
indoctrination.
|
indoctrination.
|
||||||
"""
|
"""
|
||||||
with start_active_span("current_sync_for_user"):
|
with start_active_span("sync.current_sync_for_user"):
|
||||||
log_kv({"since_token": since_token})
|
log_kv({"since_token": since_token})
|
||||||
sync_result = await self.generate_sync_result(
|
sync_result = await self.generate_sync_result(
|
||||||
sync_config, since_token, full_state
|
sync_config, since_token, full_state
|
||||||
@ -1585,7 +1585,8 @@ class SyncHandler:
|
|||||||
)
|
)
|
||||||
logger.debug("Generated room entry for %s", room_entry.room_id)
|
logger.debug("Generated room entry for %s", room_entry.room_id)
|
||||||
|
|
||||||
await concurrently_execute(handle_room_entries, room_entries, 10)
|
with start_active_span("sync.generate_room_entries"):
|
||||||
|
await concurrently_execute(handle_room_entries, room_entries, 10)
|
||||||
|
|
||||||
sync_result_builder.invited.extend(invited)
|
sync_result_builder.invited.extend(invited)
|
||||||
sync_result_builder.knocked.extend(knocked)
|
sync_result_builder.knocked.extend(knocked)
|
||||||
@ -2045,7 +2046,7 @@ class SyncHandler:
|
|||||||
since_token = room_builder.since_token
|
since_token = room_builder.since_token
|
||||||
upto_token = room_builder.upto_token
|
upto_token = room_builder.upto_token
|
||||||
|
|
||||||
with start_active_span("generate_room_entry"):
|
with start_active_span("sync.generate_room_entry"):
|
||||||
set_tag("room_id", room_id)
|
set_tag("room_id", room_id)
|
||||||
log_kv({"events": len(events or ())})
|
log_kv({"events": len(events or ())})
|
||||||
|
|
||||||
|
@ -58,12 +58,14 @@ from synapse.api.errors import (
|
|||||||
)
|
)
|
||||||
from synapse.http.site import SynapseRequest
|
from synapse.http.site import SynapseRequest
|
||||||
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
|
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
|
||||||
from synapse.logging.opentracing import trace_servlet
|
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
|
||||||
from synapse.util import json_encoder
|
from synapse.util import json_encoder
|
||||||
from synapse.util.caches import intern_dict
|
from synapse.util.caches import intern_dict
|
||||||
from synapse.util.iterutils import chunk_seq
|
from synapse.util.iterutils import chunk_seq
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
import opentracing
|
||||||
|
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -759,7 +761,20 @@ async def _async_write_json_to_request_in_thread(
|
|||||||
expensive.
|
expensive.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
json_str = await defer_to_thread(request.reactor, json_encoder, json_object)
|
def encode(opentracing_span: "Optional[opentracing.Span]") -> bytes:
|
||||||
|
# it might take a while for the threadpool to schedule us, so we write
|
||||||
|
# opentracing logs once we actually get scheduled, so that we can see how
|
||||||
|
# much that contributed.
|
||||||
|
if opentracing_span:
|
||||||
|
opentracing_span.log_kv({"event": "scheduled"})
|
||||||
|
res = json_encoder(json_object)
|
||||||
|
if opentracing_span:
|
||||||
|
opentracing_span.log_kv({"event": "encoded"})
|
||||||
|
return res
|
||||||
|
|
||||||
|
with start_active_span("encode_json_response"):
|
||||||
|
span = active_span()
|
||||||
|
json_str = await defer_to_thread(request.reactor, encode, span)
|
||||||
|
|
||||||
_write_bytes_to_request(request, json_str)
|
_write_bytes_to_request(request, json_str)
|
||||||
|
|
||||||
|
@ -48,6 +48,7 @@ from synapse.handlers.sync import (
|
|||||||
from synapse.http.server import HttpServer
|
from synapse.http.server import HttpServer
|
||||||
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
|
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
|
||||||
from synapse.http.site import SynapseRequest
|
from synapse.http.site import SynapseRequest
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.types import JsonDict, StreamToken
|
from synapse.types import JsonDict, StreamToken
|
||||||
from synapse.util import json_decoder
|
from synapse.util import json_decoder
|
||||||
|
|
||||||
@ -222,6 +223,7 @@ class SyncRestServlet(RestServlet):
|
|||||||
logger.debug("Event formatting complete")
|
logger.debug("Event formatting complete")
|
||||||
return 200, response_content
|
return 200, response_content
|
||||||
|
|
||||||
|
@trace(opname="sync.encode_response")
|
||||||
async def encode_response(
|
async def encode_response(
|
||||||
self,
|
self,
|
||||||
time_now: int,
|
time_now: int,
|
||||||
@ -332,6 +334,7 @@ class SyncRestServlet(RestServlet):
|
|||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@trace(opname="sync.encode_joined")
|
||||||
async def encode_joined(
|
async def encode_joined(
|
||||||
self,
|
self,
|
||||||
rooms: List[JoinedSyncResult],
|
rooms: List[JoinedSyncResult],
|
||||||
@ -368,6 +371,7 @@ class SyncRestServlet(RestServlet):
|
|||||||
|
|
||||||
return joined
|
return joined
|
||||||
|
|
||||||
|
@trace(opname="sync.encode_invited")
|
||||||
async def encode_invited(
|
async def encode_invited(
|
||||||
self,
|
self,
|
||||||
rooms: List[InvitedSyncResult],
|
rooms: List[InvitedSyncResult],
|
||||||
@ -406,6 +410,7 @@ class SyncRestServlet(RestServlet):
|
|||||||
|
|
||||||
return invited
|
return invited
|
||||||
|
|
||||||
|
@trace(opname="sync.encode_knocked")
|
||||||
async def encode_knocked(
|
async def encode_knocked(
|
||||||
self,
|
self,
|
||||||
rooms: List[KnockedSyncResult],
|
rooms: List[KnockedSyncResult],
|
||||||
@ -460,6 +465,7 @@ class SyncRestServlet(RestServlet):
|
|||||||
|
|
||||||
return knocked
|
return knocked
|
||||||
|
|
||||||
|
@trace(opname="sync.encode_archived")
|
||||||
async def encode_archived(
|
async def encode_archived(
|
||||||
self,
|
self,
|
||||||
rooms: List[ArchivedSyncResult],
|
rooms: List[ArchivedSyncResult],
|
||||||
|
Loading…
Reference in New Issue
Block a user