mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-22 21:11:00 -05:00
Improve tracing for to device messages (#9686)
This commit is contained in:
parent
bb0fe02a52
commit
33548f37aa
1
changelog.d/9686.misc
Normal file
1
changelog.d/9686.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Improve Jaeger tracing for `to_device` messages.
|
@ -29,6 +29,7 @@ from synapse.api.presence import UserPresenceState
|
|||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.federation.units import Edu
|
from synapse.federation.units import Edu
|
||||||
from synapse.handlers.presence import format_user_presence_state
|
from synapse.handlers.presence import format_user_presence_state
|
||||||
|
from synapse.logging.opentracing import SynapseTags, set_tag
|
||||||
from synapse.metrics import sent_transactions_counter
|
from synapse.metrics import sent_transactions_counter
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.types import ReadReceipt
|
from synapse.types import ReadReceipt
|
||||||
@ -557,6 +558,13 @@ class PerDestinationQueue:
|
|||||||
contents, stream_id = await self._store.get_new_device_msgs_for_remote(
|
contents, stream_id = await self._store.get_new_device_msgs_for_remote(
|
||||||
self._destination, last_device_stream_id, to_device_stream_id, limit
|
self._destination, last_device_stream_id, to_device_stream_id, limit
|
||||||
)
|
)
|
||||||
|
for content in contents:
|
||||||
|
message_id = content.get("message_id")
|
||||||
|
if not message_id:
|
||||||
|
continue
|
||||||
|
|
||||||
|
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
|
||||||
|
|
||||||
edus = [
|
edus = [
|
||||||
Edu(
|
Edu(
|
||||||
origin=self._server_name,
|
origin=self._server_name,
|
||||||
|
@ -21,10 +21,10 @@ from synapse.api.errors import SynapseError
|
|||||||
from synapse.api.ratelimiting import Ratelimiter
|
from synapse.api.ratelimiting import Ratelimiter
|
||||||
from synapse.logging.context import run_in_background
|
from synapse.logging.context import run_in_background
|
||||||
from synapse.logging.opentracing import (
|
from synapse.logging.opentracing import (
|
||||||
|
SynapseTags,
|
||||||
get_active_span_text_map,
|
get_active_span_text_map,
|
||||||
log_kv,
|
log_kv,
|
||||||
set_tag,
|
set_tag,
|
||||||
start_active_span,
|
|
||||||
)
|
)
|
||||||
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
|
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
|
||||||
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
|
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
|
||||||
@ -183,7 +183,10 @@ class DeviceMessageHandler:
|
|||||||
) -> None:
|
) -> None:
|
||||||
sender_user_id = requester.user.to_string()
|
sender_user_id = requester.user.to_string()
|
||||||
|
|
||||||
set_tag("number_of_messages", len(messages))
|
message_id = random_string(16)
|
||||||
|
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
|
||||||
|
|
||||||
|
log_kv({"number_of_to_device_messages": len(messages)})
|
||||||
set_tag("sender", sender_user_id)
|
set_tag("sender", sender_user_id)
|
||||||
local_messages = {}
|
local_messages = {}
|
||||||
remote_messages = {} # type: Dict[str, Dict[str, Dict[str, JsonDict]]]
|
remote_messages = {} # type: Dict[str, Dict[str, Dict[str, JsonDict]]]
|
||||||
@ -205,23 +208,27 @@ class DeviceMessageHandler:
|
|||||||
"content": message_content,
|
"content": message_content,
|
||||||
"type": message_type,
|
"type": message_type,
|
||||||
"sender": sender_user_id,
|
"sender": sender_user_id,
|
||||||
|
"message_id": message_id,
|
||||||
}
|
}
|
||||||
for device_id, message_content in by_device.items()
|
for device_id, message_content in by_device.items()
|
||||||
}
|
}
|
||||||
if messages_by_device:
|
if messages_by_device:
|
||||||
local_messages[user_id] = messages_by_device
|
local_messages[user_id] = messages_by_device
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"user_id": user_id,
|
||||||
|
"device_id": list(messages_by_device),
|
||||||
|
}
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
destination = get_domain_from_id(user_id)
|
destination = get_domain_from_id(user_id)
|
||||||
remote_messages.setdefault(destination, {})[user_id] = by_device
|
remote_messages.setdefault(destination, {})[user_id] = by_device
|
||||||
|
|
||||||
message_id = random_string(16)
|
|
||||||
|
|
||||||
context = get_active_span_text_map()
|
context = get_active_span_text_map()
|
||||||
|
|
||||||
remote_edu_contents = {}
|
remote_edu_contents = {}
|
||||||
for destination, messages in remote_messages.items():
|
for destination, messages in remote_messages.items():
|
||||||
with start_active_span("to_device_for_user"):
|
log_kv({"destination": destination})
|
||||||
set_tag("destination", destination)
|
|
||||||
remote_edu_contents[destination] = {
|
remote_edu_contents[destination] = {
|
||||||
"messages": messages,
|
"messages": messages,
|
||||||
"sender": sender_user_id,
|
"sender": sender_user_id,
|
||||||
@ -230,7 +237,6 @@ class DeviceMessageHandler:
|
|||||||
"org.matrix.opentracing_context": json_encoder.encode(context),
|
"org.matrix.opentracing_context": json_encoder.encode(context),
|
||||||
}
|
}
|
||||||
|
|
||||||
log_kv({"local_messages": local_messages})
|
|
||||||
stream_id = await self.store.add_messages_to_device_inbox(
|
stream_id = await self.store.add_messages_to_device_inbox(
|
||||||
local_messages, remote_edu_contents
|
local_messages, remote_edu_contents
|
||||||
)
|
)
|
||||||
@ -239,7 +245,6 @@ class DeviceMessageHandler:
|
|||||||
"to_device_key", stream_id, users=local_messages.keys()
|
"to_device_key", stream_id, users=local_messages.keys()
|
||||||
)
|
)
|
||||||
|
|
||||||
log_kv({"remote_messages": remote_messages})
|
|
||||||
if self.federation_sender:
|
if self.federation_sender:
|
||||||
for destination in remote_messages.keys():
|
for destination in remote_messages.keys():
|
||||||
# Enqueue a new federation transaction to send the new
|
# Enqueue a new federation transaction to send the new
|
||||||
|
@ -24,6 +24,7 @@ from synapse.api.constants import AccountDataTypes, EventTypes, Membership
|
|||||||
from synapse.api.filtering import FilterCollection
|
from synapse.api.filtering import FilterCollection
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.logging.context import current_context
|
from synapse.logging.context import current_context
|
||||||
|
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
|
||||||
from synapse.push.clientformat import format_push_rules_for_user
|
from synapse.push.clientformat import format_push_rules_for_user
|
||||||
from synapse.storage.roommember import MemberSummary
|
from synapse.storage.roommember import MemberSummary
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
@ -340,7 +341,14 @@ class SyncHandler:
|
|||||||
full_state: bool = False,
|
full_state: bool = False,
|
||||||
) -> SyncResult:
|
) -> SyncResult:
|
||||||
"""Get the sync for client needed to match what the server has now."""
|
"""Get the sync for client needed to match what the server has now."""
|
||||||
return await self.generate_sync_result(sync_config, since_token, full_state)
|
with start_active_span("current_sync_for_user"):
|
||||||
|
log_kv({"since_token": since_token})
|
||||||
|
sync_result = await self.generate_sync_result(
|
||||||
|
sync_config, since_token, full_state
|
||||||
|
)
|
||||||
|
|
||||||
|
set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
|
||||||
|
return sync_result
|
||||||
|
|
||||||
async def push_rules_for_user(self, user: UserID) -> JsonDict:
|
async def push_rules_for_user(self, user: UserID) -> JsonDict:
|
||||||
user_id = user.to_string()
|
user_id = user.to_string()
|
||||||
@ -964,6 +972,7 @@ class SyncHandler:
|
|||||||
# to query up to a given point.
|
# to query up to a given point.
|
||||||
# Always use the `now_token` in `SyncResultBuilder`
|
# Always use the `now_token` in `SyncResultBuilder`
|
||||||
now_token = self.event_sources.get_current_token()
|
now_token = self.event_sources.get_current_token()
|
||||||
|
log_kv({"now_token": now_token})
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Calculating sync response for %r between %s and %s",
|
"Calculating sync response for %r between %s and %s",
|
||||||
@ -1225,6 +1234,13 @@ class SyncHandler:
|
|||||||
user_id, device_id, since_stream_id, now_token.to_device_key
|
user_id, device_id, since_stream_id, now_token.to_device_key
|
||||||
)
|
)
|
||||||
|
|
||||||
|
for message in messages:
|
||||||
|
# We pop here as we shouldn't be sending the message ID down
|
||||||
|
# `/sync`
|
||||||
|
message_id = message.pop("message_id", None)
|
||||||
|
if message_id:
|
||||||
|
set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Returning %d to-device messages between %d and %d (current token: %d)",
|
"Returning %d to-device messages between %d and %d (current token: %d)",
|
||||||
len(messages),
|
len(messages),
|
||||||
|
@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
|
|||||||
|
|
||||||
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
|
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
|
||||||
from synapse.appservice import ApplicationService
|
from synapse.appservice import ApplicationService
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import (
|
||||||
|
run_as_background_process,
|
||||||
|
wrap_as_background_process,
|
||||||
|
)
|
||||||
from synapse.replication.tcp.streams import TypingStream
|
from synapse.replication.tcp.streams import TypingStream
|
||||||
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
|
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
@ -86,6 +89,7 @@ class FollowerTypingHandler:
|
|||||||
self._member_last_federation_poke = {}
|
self._member_last_federation_poke = {}
|
||||||
self.wheel_timer = WheelTimer(bucket_size=5000)
|
self.wheel_timer = WheelTimer(bucket_size=5000)
|
||||||
|
|
||||||
|
@wrap_as_background_process("typing._handle_timeouts")
|
||||||
def _handle_timeouts(self) -> None:
|
def _handle_timeouts(self) -> None:
|
||||||
logger.debug("Checking for typing timeouts")
|
logger.debug("Checking for typing timeouts")
|
||||||
|
|
||||||
|
@ -259,6 +259,14 @@ except ImportError:
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class SynapseTags:
|
||||||
|
# The message ID of any to_device message processed
|
||||||
|
TO_DEVICE_MESSAGE_ID = "to_device.message_id"
|
||||||
|
|
||||||
|
# Whether the sync response has new data to be returned to the client.
|
||||||
|
SYNC_RESULT = "sync.new_data"
|
||||||
|
|
||||||
|
|
||||||
# Block everything by default
|
# Block everything by default
|
||||||
# A regex which matches the server_names to expose traces for.
|
# A regex which matches the server_names to expose traces for.
|
||||||
# None means 'block everything'.
|
# None means 'block everything'.
|
||||||
|
@ -39,6 +39,7 @@ from synapse.api.errors import AuthError
|
|||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.handlers.presence import format_user_presence_state
|
from synapse.handlers.presence import format_user_presence_state
|
||||||
from synapse.logging.context import PreserveLoggingContext
|
from synapse.logging.context import PreserveLoggingContext
|
||||||
|
from synapse.logging.opentracing import log_kv, start_active_span
|
||||||
from synapse.logging.utils import log_function
|
from synapse.logging.utils import log_function
|
||||||
from synapse.metrics import LaterGauge
|
from synapse.metrics import LaterGauge
|
||||||
from synapse.streams.config import PaginationConfig
|
from synapse.streams.config import PaginationConfig
|
||||||
@ -136,6 +137,15 @@ class _NotifierUserStream:
|
|||||||
self.last_notified_ms = time_now_ms
|
self.last_notified_ms = time_now_ms
|
||||||
noify_deferred = self.notify_deferred
|
noify_deferred = self.notify_deferred
|
||||||
|
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"notify": self.user_id,
|
||||||
|
"stream": stream_key,
|
||||||
|
"stream_id": stream_id,
|
||||||
|
"listeners": self.count_listeners(),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
users_woken_by_stream_counter.labels(stream_key).inc()
|
users_woken_by_stream_counter.labels(stream_key).inc()
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
@ -404,6 +414,13 @@ class Notifier:
|
|||||||
with Measure(self.clock, "on_new_event"):
|
with Measure(self.clock, "on_new_event"):
|
||||||
user_streams = set()
|
user_streams = set()
|
||||||
|
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"waking_up_explicit_users": len(users),
|
||||||
|
"waking_up_explicit_rooms": len(rooms),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
for user in users:
|
for user in users:
|
||||||
user_stream = self.user_to_user_stream.get(str(user))
|
user_stream = self.user_to_user_stream.get(str(user))
|
||||||
if user_stream is not None:
|
if user_stream is not None:
|
||||||
@ -476,12 +493,34 @@ class Notifier:
|
|||||||
(end_time - now) / 1000.0,
|
(end_time - now) / 1000.0,
|
||||||
self.hs.get_reactor(),
|
self.hs.get_reactor(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
with start_active_span("wait_for_events.deferred"):
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"wait_for_events": "sleep",
|
||||||
|
"token": prev_token,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
await listener.deferred
|
await listener.deferred
|
||||||
|
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"wait_for_events": "woken",
|
||||||
|
"token": user_stream.current_token,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
current_token = user_stream.current_token
|
current_token = user_stream.current_token
|
||||||
|
|
||||||
result = await callback(prev_token, current_token)
|
result = await callback(prev_token, current_token)
|
||||||
|
log_kv(
|
||||||
|
{
|
||||||
|
"wait_for_events": "result",
|
||||||
|
"result": bool(result),
|
||||||
|
}
|
||||||
|
)
|
||||||
if result:
|
if result:
|
||||||
break
|
break
|
||||||
|
|
||||||
@ -489,8 +528,10 @@ class Notifier:
|
|||||||
# has happened between the old prev_token and the current_token
|
# has happened between the old prev_token and the current_token
|
||||||
prev_token = current_token
|
prev_token = current_token
|
||||||
except defer.TimeoutError:
|
except defer.TimeoutError:
|
||||||
|
log_kv({"wait_for_events": "timeout"})
|
||||||
break
|
break
|
||||||
except defer.CancelledError:
|
except defer.CancelledError:
|
||||||
|
log_kv({"wait_for_events": "cancelled"})
|
||||||
break
|
break
|
||||||
|
|
||||||
if result is None:
|
if result is None:
|
||||||
|
Loading…
Reference in New Issue
Block a user