Propagate opentracing contexts through EDUs (#5852)

Propagate opentracing contexts through EDUs
Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
This commit is contained in:
Jorik Schellekens 2019-08-22 18:21:10 +01:00 committed by GitHub
parent 0b39fa53b6
commit 8767b63a82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 234 additions and 94 deletions

1
changelog.d/5852.feature Normal file
View File

@ -0,0 +1 @@
Pass opentracing contexts between servers when transmitting EDUs.

View File

@ -92,6 +92,29 @@ two problems, namely:
but that doesn't prevent another server sending you baggage which will be logged but that doesn't prevent another server sending you baggage which will be logged
to OpenTracing's logs. to OpenTracing's logs.
==========
EDU FORMAT
==========
EDUs can contain tracing data in their content. This is not specced but
it could be of interest for other homeservers.
EDU format (if you're using jaeger):
.. code-block:: json
{
"edu_type": "type",
"content": {
"org.matrix.opentracing_context": {
"uber-trace-id": "fe57cf3e65083289"
}
}
}
Though you don't have to use jaeger you must inject the span context into
`org.matrix.opentracing_context` using the opentracing `Format.TEXT_MAP` inject method.
================== ==================
Configuring Jaeger Configuring Jaeger
================== ==================

View File

@ -43,7 +43,7 @@ from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name from synapse.http.endpoint import parse_server_name
from synapse.logging.context import nested_logging_context from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import log_kv, trace from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function from synapse.logging.utils import log_function
from synapse.replication.http.federation import ( from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet, ReplicationFederationSendEduRestServlet,
@ -811,12 +811,13 @@ class FederationHandlerRegistry(object):
if not handler: if not handler:
logger.warn("No handler registered for EDU type %s", edu_type) logger.warn("No handler registered for EDU type %s", edu_type)
try: with start_active_span_from_edu(content, "handle_edu"):
yield handler(origin, content) try:
except SynapseError as e: yield handler(origin, content)
logger.info("Failed to handle edu %r: %r", edu_type, e) except SynapseError as e:
except Exception: logger.info("Failed to handle edu %r: %r", edu_type, e)
logger.exception("Failed to handle edu %r", edu_type) except Exception:
logger.exception("Failed to handle edu %r", edu_type)
def on_query(self, query_type, args): def on_query(self, query_type, args):
handler = self.query_handlers.get(query_type) handler = self.query_handlers.get(query_type)

View File

@ -14,11 +14,19 @@
# limitations under the License. # limitations under the License.
import logging import logging
from canonicaljson import json
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import HttpResponseException from synapse.api.errors import HttpResponseException
from synapse.federation.persistence import TransactionActions from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Transaction from synapse.federation.units import Transaction
from synapse.logging.opentracing import (
extract_text_map,
set_tag,
start_active_span_follows_from,
tags,
)
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -44,93 +52,109 @@ class TransactionManager(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus): def send_new_transaction(self, destination, pending_pdus, pending_edus):
# Sort based on the order field # Make a transaction-sending opentracing span. This span follows on from
pending_pdus.sort(key=lambda t: t[1]) # all the edus in that transaction. This needs to be done since there is
pdus = [x[0] for x in pending_pdus] # no active span here, so if the edus were not received by the remote the
edus = pending_edus # span would have no causality and it would be forgotten.
# The span_contexts is a generator so that it won't be evaluated if
# opentracing is disabled. (Yay speed!)
success = True span_contexts = (
extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
destination,
txn_id,
len(pdus),
len(edus),
) )
transaction = Transaction.create_new( with start_active_span_follows_from("send_transaction", span_contexts):
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
self._next_txn_id += 1 # Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
logger.info( success = True
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
destination,
txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
# Actually send the transaction logger.debug("TX [%s] _attempt_new_transaction", destination)
# FIXME (erikj): This is a bit of a hack to make the Pdu age txn_id = str(self._next_txn_id)
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try: logger.debug(
response = yield self._transport_layer.send_transaction( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
transaction, json_data_cb destination,
txn_id,
len(pdus),
len(edus),
) )
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code: transaction = Transaction.create_new(
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) origin_server_ts=int(self.clock.time_msec()),
raise e transaction_id=txn_id,
origin=self._server_name,
destination=destination,
pdus=pdus,
edus=edus,
)
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) self._next_txn_id += 1
if code == 200: logger.info(
for e_id, r in response.get("pdus", {}).items(): "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
if "error" in r: destination,
txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
)
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try:
response = yield self._transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response", destination, txn_id, code
)
raise e
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s",
destination,
txn_id,
e_id,
r,
)
else:
for p in pdus:
logger.warn( logger.warn(
"TX [%s] {%s} Remote returned error for %s: %s", "TX [%s] {%s} Failed to send event %s",
destination, destination,
txn_id, txn_id,
e_id, p.event_id,
r,
) )
else: success = False
for p in pdus:
logger.warn(
"TX [%s] {%s} Failed to send event %s",
destination,
txn_id,
p.event_id,
)
success = False
return success set_tag(tags.ERROR, not success)
return success

View File

@ -38,6 +38,9 @@ class Edu(JsonEncodedObject):
internal_keys = ["origin", "destination"] internal_keys = ["origin", "destination"]
def get_context(self):
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
class Transaction(JsonEncodedObject): class Transaction(JsonEncodedObject):
""" A transaction is a list of Pdus and Edus to be sent to a remote home """ A transaction is a list of Pdus and Edus to be sent to a remote home

View File

@ -15,9 +15,17 @@
import logging import logging
from canonicaljson import json
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.logging.opentracing import (
get_active_span_text_map,
set_tag,
start_active_span,
whitelisted_homeserver,
)
from synapse.types import UserID, get_domain_from_id from synapse.types import UserID, get_domain_from_id
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
@ -100,14 +108,21 @@ class DeviceMessageHandler(object):
message_id = random_string(16) message_id = random_string(16)
context = get_active_span_text_map()
remote_edu_contents = {} remote_edu_contents = {}
for destination, messages in remote_messages.items(): for destination, messages in remote_messages.items():
remote_edu_contents[destination] = { with start_active_span("to_device_for_user"):
"messages": messages, set_tag("destination", destination)
"sender": sender_user_id, remote_edu_contents[destination] = {
"type": message_type, "messages": messages,
"message_id": message_id, "sender": sender_user_id,
} "type": message_type,
"message_id": message_id,
"org.matrix.opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
else None,
}
stream_id = yield self.store.add_messages_to_device_inbox( stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents local_messages, remote_edu_contents

View File

@ -149,6 +149,9 @@ unchartered waters will require the enforcement of the whitelist.
``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes ``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes
in a destination and compares it to the whitelist. in a destination and compares it to the whitelist.
Most injection methods take a 'destination' arg. The context will only be injected
if the destination matches the whitelist or the destination is None.
======= =======
Gotchas Gotchas
======= =======
@ -576,6 +579,29 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
) )
def get_active_span_text_map(destination=None):
"""
Gets a span context as a dict. This can be used instead of manually
injecting a span into an empty carrier.
Args:
destination (str): the name of the remote server.
Returns:
dict: the active span's context if opentracing is enabled, otherwise empty.
"""
if not opentracing or (destination and not whitelisted_homeserver(destination)):
return {}
carrier = {}
opentracing.tracer.inject(
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
)
return carrier
def active_span_context_as_string(): def active_span_context_as_string():
""" """
Returns: Returns:

View File

@ -21,6 +21,11 @@ from canonicaljson import json
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from synapse.logging.opentracing import (
get_active_span_text_map,
trace,
whitelisted_homeserver,
)
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import Cache, SQLBaseStore, db_to_json from synapse.storage._base import Cache, SQLBaseStore, db_to_json
from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.background_updates import BackgroundUpdateStore
@ -73,6 +78,7 @@ class DeviceWorkerStore(SQLBaseStore):
return {d["device_id"]: d for d in devices} return {d["device_id"]: d for d in devices}
@trace
@defer.inlineCallbacks @defer.inlineCallbacks
def get_devices_by_remote(self, destination, from_stream_id, limit): def get_devices_by_remote(self, destination, from_stream_id, limit):
"""Get stream of updates to send to remote servers """Get stream of updates to send to remote servers
@ -127,8 +133,15 @@ class DeviceWorkerStore(SQLBaseStore):
# (user_id, device_id) entries into a map, with the value being # (user_id, device_id) entries into a map, with the value being
# the max stream_id across each set of duplicate entries # the max stream_id across each set of duplicate entries
# #
# maps (user_id, device_id) -> stream_id # maps (user_id, device_id) -> (stream_id, opentracing_context)
# as long as their stream_id does not match that of the last row # as long as their stream_id does not match that of the last row
#
# opentracing_context contains the opentracing metadata for the request
# that created the poke
#
# The most recent request's opentracing_context is used as the
# context which created the Edu.
query_map = {} query_map = {}
for update in updates: for update in updates:
if stream_id_cutoff is not None and update[2] >= stream_id_cutoff: if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@ -136,7 +149,14 @@ class DeviceWorkerStore(SQLBaseStore):
break break
key = (update[0], update[1]) key = (update[0], update[1])
query_map[key] = max(query_map.get(key, 0), update[2])
update_context = update[3]
update_stream_id = update[2]
previous_update_stream_id, _ = query_map.get(key, (0, None))
if update_stream_id > previous_update_stream_id:
query_map[key] = (update_stream_id, update_context)
# If we didn't find any updates with a stream_id lower than the cutoff, it # If we didn't find any updates with a stream_id lower than the cutoff, it
# means that there are more than limit updates all of which have the same # means that there are more than limit updates all of which have the same
@ -171,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore):
List: List of device updates List: List of device updates
""" """
sql = """ sql = """
SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
ORDER BY stream_id ORDER BY stream_id
LIMIT ? LIMIT ?
@ -187,8 +207,9 @@ class DeviceWorkerStore(SQLBaseStore):
Args: Args:
destination (str): The host the device updates are intended for destination (str): The host the device updates are intended for
from_stream_id (int): The minimum stream_id to filter updates by, exclusive from_stream_id (int): The minimum stream_id to filter updates by, exclusive
query_map (Dict[(str, str): int]): Dictionary mapping query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
user_id/device_id to update stream_id user_id/device_id to update stream_id and the relevent json-encoded
opentracing context
Returns: Returns:
List[Dict]: List of objects representing an device update EDU List[Dict]: List of objects representing an device update EDU
@ -210,12 +231,13 @@ class DeviceWorkerStore(SQLBaseStore):
destination, user_id, from_stream_id destination, user_id, from_stream_id
) )
for device_id, device in iteritems(user_devices): for device_id, device in iteritems(user_devices):
stream_id = query_map[(user_id, device_id)] stream_id, opentracing_context = query_map[(user_id, device_id)]
result = { result = {
"user_id": user_id, "user_id": user_id,
"device_id": device_id, "device_id": device_id,
"prev_id": [prev_id] if prev_id else [], "prev_id": [prev_id] if prev_id else [],
"stream_id": stream_id, "stream_id": stream_id,
"org.matrix.opentracing_context": opentracing_context,
} }
prev_id = stream_id prev_id = stream_id
@ -814,6 +836,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
], ],
) )
context = get_active_span_text_map()
self._simple_insert_many_txn( self._simple_insert_many_txn(
txn, txn,
table="device_lists_outbound_pokes", table="device_lists_outbound_pokes",
@ -825,6 +849,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"device_id": device_id, "device_id": device_id,
"sent": False, "sent": False,
"ts": now, "ts": now,
"opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
else None,
} }
for destination in hosts for destination in hosts
for device_id in device_ids for device_id in device_ids

View File

@ -0,0 +1,20 @@
/* Copyright 2019 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Opentracing context data for inclusion in the device_list_update EDUs, as a
* json-encoded dictionary. NULL if opentracing is disabled (or not enabled for this destination).
*/
ALTER TABLE device_lists_outbound_pokes ADD opentracing_context TEXT;