diff --git a/changelog.d/11619.misc b/changelog.d/11619.misc new file mode 100644 index 000000000..2125cbddd --- /dev/null +++ b/changelog.d/11619.misc @@ -0,0 +1 @@ +A number of improvements to opentracing support. diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 0bf58dff0..4a32d430b 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -32,7 +32,7 @@ from synapse.appservice import ApplicationService from synapse.events import EventBase from synapse.http import get_request_user_agent from synapse.http.site import SynapseRequest -from synapse.logging import opentracing as opentracing +from synapse.logging.opentracing import active_span, force_tracing, start_active_span from synapse.storage.databases.main.registration import TokenLookupResult from synapse.types import Requester, StateMap, UserID, create_requester from synapse.util.caches.lrucache import LruCache @@ -149,6 +149,42 @@ class Auth: is invalid. AuthError if access is denied for the user in the access token """ + parent_span = active_span() + with start_active_span("get_user_by_req"): + requester = await self._wrapped_get_user_by_req( + request, allow_guest, rights, allow_expired + ) + + if parent_span: + if requester.authenticated_entity in self._force_tracing_for_users: + # request tracing is enabled for this user, so we need to force it + # tracing on for the parent span (which will be the servlet span). + # + # It's too late for the get_user_by_req span to inherit the setting, + # so we also force it on for that. + force_tracing() + force_tracing(parent_span) + parent_span.set_tag( + "authenticated_entity", requester.authenticated_entity + ) + parent_span.set_tag("user_id", requester.user.to_string()) + if requester.device_id is not None: + parent_span.set_tag("device_id", requester.device_id) + if requester.app_service is not None: + parent_span.set_tag("appservice_id", requester.app_service.id) + return requester + + async def _wrapped_get_user_by_req( + self, + request: SynapseRequest, + allow_guest: bool, + rights: str, + allow_expired: bool, + ) -> Requester: + """Helper for get_user_by_req + + Once get_user_by_req has set up the opentracing span, this does the actual work. + """ try: ip_addr = request.getClientIP() user_agent = get_request_user_agent(request) @@ -177,14 +213,6 @@ class Auth: ) request.requester = user_id - if user_id in self._force_tracing_for_users: - opentracing.force_tracing() - opentracing.set_tag("authenticated_entity", user_id) - opentracing.set_tag("user_id", user_id) - if device_id is not None: - opentracing.set_tag("device_id", device_id) - opentracing.set_tag("appservice_id", app_service.id) - return requester user_info = await self.get_user_by_access_token( @@ -242,13 +270,6 @@ class Auth: ) request.requester = requester - if user_info.token_owner in self._force_tracing_for_users: - opentracing.force_tracing() - opentracing.set_tag("authenticated_entity", user_info.token_owner) - opentracing.set_tag("user_id", user_info.user_id) - if device_id: - opentracing.set_tag("device_id", device_id) - return requester except KeyError: raise MissingClientTokenError() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index bcd10cbb3..d24124d6a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -421,7 +421,7 @@ class SyncHandler: span to track the sync. See `generate_sync_result` for the next part of your indoctrination. """ - with start_active_span("current_sync_for_user"): + with start_active_span("sync.current_sync_for_user"): log_kv({"since_token": since_token}) sync_result = await self.generate_sync_result( sync_config, since_token, full_state @@ -1585,7 +1585,8 @@ class SyncHandler: ) logger.debug("Generated room entry for %s", room_entry.room_id) - await concurrently_execute(handle_room_entries, room_entries, 10) + with start_active_span("sync.generate_room_entries"): + await concurrently_execute(handle_room_entries, room_entries, 10) sync_result_builder.invited.extend(invited) sync_result_builder.knocked.extend(knocked) @@ -2045,7 +2046,7 @@ class SyncHandler: since_token = room_builder.since_token upto_token = room_builder.upto_token - with start_active_span("generate_room_entry"): + with start_active_span("sync.generate_room_entry"): set_tag("room_id", room_id) log_kv({"events": len(events or ())}) diff --git a/synapse/http/server.py b/synapse/http/server.py index 7bbbe7648..e30294659 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -58,12 +58,14 @@ from synapse.api.errors import ( ) from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background -from synapse.logging.opentracing import trace_servlet +from synapse.logging.opentracing import active_span, start_active_span, trace_servlet from synapse.util import json_encoder from synapse.util.caches import intern_dict from synapse.util.iterutils import chunk_seq if TYPE_CHECKING: + import opentracing + from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -759,7 +761,20 @@ async def _async_write_json_to_request_in_thread( expensive. """ - json_str = await defer_to_thread(request.reactor, json_encoder, json_object) + def encode(opentracing_span: "Optional[opentracing.Span]") -> bytes: + # it might take a while for the threadpool to schedule us, so we write + # opentracing logs once we actually get scheduled, so that we can see how + # much that contributed. + if opentracing_span: + opentracing_span.log_kv({"event": "scheduled"}) + res = json_encoder(json_object) + if opentracing_span: + opentracing_span.log_kv({"event": "encoded"}) + return res + + with start_active_span("encode_json_response"): + span = active_span() + json_str = await defer_to_thread(request.reactor, encode, span) _write_bytes_to_request(request, json_str) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 8c4b0f6e5..e99a943d0 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -48,6 +48,7 @@ from synapse.handlers.sync import ( from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string from synapse.http.site import SynapseRequest +from synapse.logging.opentracing import trace from synapse.types import JsonDict, StreamToken from synapse.util import json_decoder @@ -222,6 +223,7 @@ class SyncRestServlet(RestServlet): logger.debug("Event formatting complete") return 200, response_content + @trace(opname="sync.encode_response") async def encode_response( self, time_now: int, @@ -332,6 +334,7 @@ class SyncRestServlet(RestServlet): ] } + @trace(opname="sync.encode_joined") async def encode_joined( self, rooms: List[JoinedSyncResult], @@ -368,6 +371,7 @@ class SyncRestServlet(RestServlet): return joined + @trace(opname="sync.encode_invited") async def encode_invited( self, rooms: List[InvitedSyncResult], @@ -406,6 +410,7 @@ class SyncRestServlet(RestServlet): return invited + @trace(opname="sync.encode_knocked") async def encode_knocked( self, rooms: List[KnockedSyncResult], @@ -460,6 +465,7 @@ class SyncRestServlet(RestServlet): return knocked + @trace(opname="sync.encode_archived") async def encode_archived( self, rooms: List[ArchivedSyncResult],