diff --git a/changelog.d/5544.misc b/changelog.d/5544.misc new file mode 100644 index 000000000..81d6f74c3 --- /dev/null +++ b/changelog.d/5544.misc @@ -0,0 +1 @@ +Added opentracing and configuration options. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 7fe7c94ac..0462f0a17 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1395,3 +1395,20 @@ password_config: # module: "my_custom_project.SuperRulesSet" # config: # example_option: 'things' + + +## Opentracing ## +# These settings enable opentracing which implements distributed tracing +# This allows you to observe the causal chain of events across servers +# including requests, key lookups etc. across any server running +# synapse or any other other services which supports opentracing. +# (specifically those implemented with jaeger) + +#opentracing: +# # Enable / disable tracer +# tracer_enabled: false +# # The list of homeservers we wish to expose our current traces to. +# # The list is a list of regexes which are matched against the +# # servername of the homeserver +# homeserver_whitelist: +# - ".*" diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 1ebb7ae53..bd285122e 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -243,6 +243,9 @@ def start(hs, listeners=None): # Load the certificate from disk. refresh_certificate(hs) + # Start the tracer + synapse.logging.opentracing.init_tracer(hs.config) + # It is now safe to start your Synapse. hs.start_listening(listeners) hs.get_datastore().start_profiling() diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index acadef4fd..72acad4f1 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -40,6 +40,7 @@ from .spam_checker import SpamCheckerConfig from .stats import StatsConfig from .third_party_event_rules import ThirdPartyRulesConfig from .tls import TlsConfig +from .tracer import TracerConfig from .user_directory import UserDirectoryConfig from .voip import VoipConfig from .workers import WorkerConfig @@ -75,5 +76,6 @@ class HomeServerConfig( ServerNoticesConfig, RoomDirectoryConfig, ThirdPartyRulesConfig, + TracerConfig, ): pass diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py new file mode 100644 index 000000000..63a637984 --- /dev/null +++ b/synapse/config/tracer.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C.d +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config, ConfigError + + +class TracerConfig(Config): + def read_config(self, config, **kwargs): + self.tracer_config = config.get("opentracing") + + self.tracer_config = config.get("opentracing", {"tracer_enabled": False}) + + if self.tracer_config.get("tracer_enabled", False): + # The tracer is enabled so sanitize the config + # If no whitelists are given + self.tracer_config.setdefault("homeserver_whitelist", []) + + if not isinstance(self.tracer_config.get("homeserver_whitelist"), list): + raise ConfigError("Tracer homesererver_whitelist config is malformed") + + def generate_config_section(cls, **kwargs): + return """\ + ## Opentracing ## + # These settings enable opentracing which implements distributed tracing + # This allows you to observe the causal chain of events across servers + # including requests, key lookups etc. across any server running + # synapse or any other other services which supports opentracing. + # (specifically those implemented with jaeger) + + #opentracing: + # # Enable / disable tracer + # tracer_enabled: false + # # The list of homeservers we wish to expose our current traces to. + # # The list is a list of regexes which are matched against the + # # servername of the homeserver + # homeserver_whitelist: + # - ".*" + """ diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2efdcff7e..c45d458d9 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -21,6 +21,7 @@ import re from twisted.internet import defer import synapse +import synapse.logging.opentracing as opentracing from synapse.api.errors import Codes, FederationDeniedError, SynapseError from synapse.api.room_versions import RoomVersions from synapse.api.urls import ( @@ -288,14 +289,29 @@ class BaseFederationServlet(object): logger.warn("authenticate_request failed: %s", e) raise - if origin: - with ratelimiter.ratelimit(origin) as d: - yield d + # Start an opentracing span + with opentracing.start_active_span_from_context( + request.requestHeaders, + "incoming-federation-request", + tags={ + "request_id": request.get_request_id(), + opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_SERVER, + opentracing.tags.HTTP_METHOD: request.get_method(), + opentracing.tags.HTTP_URL: request.get_redacted_uri(), + opentracing.tags.PEER_HOST_IPV6: request.getClientIP(), + "authenticated_entity": origin, + }, + ): + if origin: + with ratelimiter.ratelimit(origin) as d: + yield d + response = yield func( + origin, content, request.args, *args, **kwargs + ) + else: response = yield func( origin, content, request.args, *args, **kwargs ) - else: - response = yield func(origin, content, request.args, *args, **kwargs) defer.returnValue(response) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index dee3710f6..e60334547 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -36,6 +36,7 @@ from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone from twisted.web.http_headers import Headers +import synapse.logging.opentracing as opentracing import synapse.metrics import synapse.util.retryutils from synapse.api.errors import ( @@ -339,9 +340,25 @@ class MatrixFederationHttpClient(object): else: query_bytes = b"" - headers_dict = {b"User-Agent": [self.version_string_bytes]} + # Retreive current span + scope = opentracing.start_active_span( + "outgoing-federation-request", + tags={ + opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_CLIENT, + opentracing.tags.PEER_ADDRESS: request.destination, + opentracing.tags.HTTP_METHOD: request.method, + opentracing.tags.HTTP_URL: request.path, + }, + finish_on_close=True, + ) - with limiter: + # Inject the span into the headers + headers_dict = {} + opentracing.inject_active_span_byte_dict(headers_dict, request.destination) + + headers_dict[b"User-Agent"] = [self.version_string_bytes] + + with limiter, scope: # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: @@ -419,6 +436,10 @@ class MatrixFederationHttpClient(object): response.phrase.decode("ascii", errors="replace"), ) + opentracing.set_tag( + opentracing.tags.HTTP_STATUS_CODE, response.code + ) + if 200 <= response.code < 300: pass else: @@ -499,8 +520,7 @@ class MatrixFederationHttpClient(object): _flatten_response_never_received(e), ) raise - - defer.returnValue(response) + defer.returnValue(response) def build_auth_headers( self, destination, method, url_bytes, content=None, destination_is=None diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index cd8415acd..889038ff2 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -20,6 +20,7 @@ import logging from canonicaljson import json from synapse.api.errors import Codes, SynapseError +from synapse.logging.opentracing import trace_servlet logger = logging.getLogger(__name__) @@ -290,7 +291,11 @@ class RestServlet(object): for method in ("GET", "PUT", "POST", "OPTIONS", "DELETE"): if hasattr(self, "on_%s" % (method,)): method_handler = getattr(self, "on_%s" % (method,)) - http_server.register_paths(method, patterns, method_handler) + http_server.register_paths( + method, + patterns, + trace_servlet(self.__class__.__name__, method_handler), + ) else: raise NotImplementedError("RestServlet must register something.") diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 30dfa1d6b..b456c31f7 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -186,6 +186,7 @@ class LoggingContext(object): "alive", "request", "tag", + "scope", ] thread_local = threading.local() @@ -238,6 +239,7 @@ class LoggingContext(object): self.request = None self.tag = "" self.alive = True + self.scope = None self.parent_context = parent_context @@ -322,10 +324,12 @@ class LoggingContext(object): another LoggingContext """ - # 'request' is the only field we currently use in the logger, so that's - # all we need to copy + # we track the current request record.request = self.request + # we also track the current scope: + record.scope = self.scope + def start(self): if get_thread_id() != self.main_thread: logger.warning("Started logcontext %s on different thread", self) diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py new file mode 100644 index 000000000..f0ceea2a6 --- /dev/null +++ b/synapse/logging/opentracing.py @@ -0,0 +1,362 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C.d +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.import opentracing + + +# NOTE +# This is a small wrapper around opentracing because opentracing is not currently +# packaged downstream (specifically debian). Since opentracing instrumentation is +# fairly invasive it was awkward to make it optional. As a result we opted to encapsulate +# all opentracing state in these methods which effectively noop if opentracing is +# not present. We should strongly consider encouraging the downstream distributers +# to package opentracing and making opentracing a full dependency. In order to facilitate +# this move the methods have work very similarly to opentracing's and it should only +# be a matter of few regexes to move over to opentracing's access patterns proper. + +try: + import opentracing +except ImportError: + opentracing = None +try: + from jaeger_client import Config as JaegerConfig + from synapse.logging.scopecontextmanager import LogContextScopeManager +except ImportError: + JaegerConfig = None + LogContextScopeManager = None + +import contextlib +import logging +import re +from functools import wraps + +from twisted.internet import defer + +logger = logging.getLogger(__name__) + + +class _DumTagNames(object): + """wrapper of opentracings tags. We need to have them if we + want to reference them without opentracing around. Clearly they + should never actually show up in a trace. `set_tags` overwrites + these with the correct ones.""" + + INVALID_TAG = "invalid-tag" + COMPONENT = INVALID_TAG + DATABASE_INSTANCE = INVALID_TAG + DATABASE_STATEMENT = INVALID_TAG + DATABASE_TYPE = INVALID_TAG + DATABASE_USER = INVALID_TAG + ERROR = INVALID_TAG + HTTP_METHOD = INVALID_TAG + HTTP_STATUS_CODE = INVALID_TAG + HTTP_URL = INVALID_TAG + MESSAGE_BUS_DESTINATION = INVALID_TAG + PEER_ADDRESS = INVALID_TAG + PEER_HOSTNAME = INVALID_TAG + PEER_HOST_IPV4 = INVALID_TAG + PEER_HOST_IPV6 = INVALID_TAG + PEER_PORT = INVALID_TAG + PEER_SERVICE = INVALID_TAG + SAMPLING_PRIORITY = INVALID_TAG + SERVICE = INVALID_TAG + SPAN_KIND = INVALID_TAG + SPAN_KIND_CONSUMER = INVALID_TAG + SPAN_KIND_PRODUCER = INVALID_TAG + SPAN_KIND_RPC_CLIENT = INVALID_TAG + SPAN_KIND_RPC_SERVER = INVALID_TAG + + +def only_if_tracing(func): + """Executes the function only if we're tracing. Otherwise return. + Assumes the function wrapped may return None""" + + @wraps(func) + def _only_if_tracing_inner(*args, **kwargs): + if opentracing: + return func(*args, **kwargs) + else: + return + + return _only_if_tracing_inner + + +# Block everything by default +_homeserver_whitelist = None + +tags = _DumTagNames + + +def init_tracer(config): + """Set the whitelists and initialise the JaegerClient tracer + + Args: + config (Config) + The config used by the homeserver. Here it's used to set the service + name to the homeserver's. + """ + global opentracing + if not config.tracer_config.get("tracer_enabled", False): + # We don't have a tracer + opentracing = None + return + + if not opentracing: + logger.error( + "The server has been configure to use opentracing but opentracing is not installed." + ) + raise ModuleNotFoundError("opentracing") + + if not JaegerConfig: + logger.error( + "The server has been configure to use opentracing but opentracing is not installed." + ) + + # Include the worker name + name = config.worker_name if config.worker_name else "master" + + set_homeserver_whitelist(config.tracer_config["homeserver_whitelist"]) + jaeger_config = JaegerConfig( + config={"sampler": {"type": "const", "param": 1}, "logging": True}, + service_name="{} {}".format(config.server_name, name), + scope_manager=LogContextScopeManager(config), + ) + jaeger_config.initialize_tracer() + + # Set up tags to be opentracing's tags + global tags + tags = opentracing.tags + + +@contextlib.contextmanager +def _noop_context_manager(*args, **kwargs): + """Does absolutely nothing really well. Can be entered and exited arbitrarily. + Good substitute for an opentracing scope.""" + yield + + +# Could use kwargs but I want these to be explicit +def start_active_span( + operation_name, + child_of=None, + references=None, + tags=None, + start_time=None, + ignore_active_span=False, + finish_on_close=True, +): + """Starts an active opentracing span. Note, the scope doesn't become active + until it has been entered, however, the span starts from the time this + message is called. + Args: + See opentracing.tracer + Returns: + scope (Scope) or noop_context_manager + """ + if opentracing is None: + return _noop_context_manager() + else: + # We need to enter the scope here for the logcontext to become active + return opentracing.tracer.start_active_span( + operation_name, + child_of=child_of, + references=references, + tags=tags, + start_time=start_time, + ignore_active_span=ignore_active_span, + finish_on_close=finish_on_close, + ) + + +@only_if_tracing +def close_active_span(): + """Closes the active span. This will close it's logcontext if the context + was made for the span""" + opentracing.tracer.scope_manager.active.__exit__(None, None, None) + + +@only_if_tracing +def set_tag(key, value): + """Set's a tag on the active span""" + opentracing.tracer.active_span.set_tag(key, value) + + +@only_if_tracing +def log_kv(key_values, timestamp=None): + """Log to the active span""" + opentracing.tracer.active_span.log_kv(key_values, timestamp) + + +# Note: we don't have a get baggage items because we're trying to hide all +# scope and span state from synapse. I think this method may also be useless +# as a result +@only_if_tracing +def set_baggage_item(key, value): + """Attach baggage to the active span""" + opentracing.tracer.active_span.set_baggage_item(key, value) + + +@only_if_tracing +def set_operation_name(operation_name): + """Sets the operation name of the active span""" + opentracing.tracer.active_span.set_operation_name(operation_name) + + +@only_if_tracing +def set_homeserver_whitelist(homeserver_whitelist): + """Sets the whitelist + + Args: + homeserver_whitelist (iterable of strings): regex of whitelisted homeservers + """ + global _homeserver_whitelist + if homeserver_whitelist: + # Makes a single regex which accepts all passed in regexes in the list + _homeserver_whitelist = re.compile( + "({})".format(")|(".join(homeserver_whitelist)) + ) + + +@only_if_tracing +def whitelisted_homeserver(destination): + """Checks if a destination matches the whitelist + Args: + destination (String)""" + global _homeserver_whitelist + if _homeserver_whitelist: + return _homeserver_whitelist.match(destination) + return False + + +def start_active_span_from_context( + headers, + operation_name, + references=None, + tags=None, + start_time=None, + ignore_active_span=False, + finish_on_close=True, +): + """ + Extracts a span context from Twisted Headers. + args: + headers (twisted.web.http_headers.Headers) + returns: + span_context (opentracing.span.SpanContext) + """ + # Twisted encodes the values as lists whereas opentracing doesn't. + # So, we take the first item in the list. + # Also, twisted uses byte arrays while opentracing expects strings. + if opentracing is None: + return _noop_context_manager() + + header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()} + context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict) + + return opentracing.tracer.start_active_span( + operation_name, + child_of=context, + references=references, + tags=tags, + start_time=start_time, + ignore_active_span=ignore_active_span, + finish_on_close=finish_on_close, + ) + + +@only_if_tracing +def inject_active_span_twisted_headers(headers, destination): + """ + Injects a span context into twisted headers inplace + + Args: + headers (twisted.web.http_headers.Headers) + span (opentracing.Span) + + Returns: + Inplace modification of headers + + Note: + The headers set by the tracer are custom to the tracer implementation which + should be unique enough that they don't interfere with any headers set by + synapse or twisted. If we're still using jaeger these headers would be those + here: + https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py + """ + + if not whitelisted_homeserver(destination): + return + + span = opentracing.tracer.active_span + carrier = {} + opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier) + + for key, value in carrier.items(): + headers.addRawHeaders(key, value) + + +@only_if_tracing +def inject_active_span_byte_dict(headers, destination): + """ + Injects a span context into a dict where the headers are encoded as byte + strings + + Args: + headers (dict) + span (opentracing.Span) + + Returns: + Inplace modification of headers + + Note: + The headers set by the tracer are custom to the tracer implementation which + should be unique enough that they don't interfere with any headers set by + synapse or twisted. If we're still using jaeger these headers would be those + here: + https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py + """ + if not whitelisted_homeserver(destination): + return + + span = opentracing.tracer.active_span + + carrier = {} + opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier) + + for key, value in carrier.items(): + headers[key.encode()] = [value.encode()] + + +def trace_servlet(servlet_name, func): + """Decorator which traces a serlet. It starts a span with some servlet specific + tags such as the servlet_name and request information""" + + @wraps(func) + @defer.inlineCallbacks + def _trace_servlet_inner(request, *args, **kwargs): + with start_active_span_from_context( + request.requestHeaders, + "incoming-client-request", + tags={ + "request_id": request.get_request_id(), + tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, + tags.HTTP_METHOD: request.get_method(), + tags.HTTP_URL: request.get_redacted_uri(), + tags.PEER_HOST_IPV6: request.getClientIP(), + "servlet_name": servlet_name, + }, + ): + result = yield defer.maybeDeferred(func, request, *args, **kwargs) + defer.returnValue(result) + + return _trace_servlet_inner diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py new file mode 100644 index 000000000..91e14462f --- /dev/null +++ b/synapse/logging/scopecontextmanager.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.import logging + +import logging + +from opentracing import Scope, ScopeManager + +import twisted + +from synapse.logging.context import LoggingContext, nested_logging_context + +logger = logging.getLogger(__name__) + + +class LogContextScopeManager(ScopeManager): + """ + The LogContextScopeManager tracks the active scope in opentracing + by using the log contexts which are native to synapse. This is so + that the basic opentracing api can be used across twisted defereds. + (I would love to break logcontexts and this into an OS package. but + let's wait for twisted's contexts to be released.) + """ + + def __init__(self, config): + # Set the whitelists + logger.info(config.tracer_config) + self._homeserver_whitelist = config.tracer_config["homeserver_whitelist"] + + @property + def active(self): + """ + Returns the currently active Scope which can be used to access the + currently active Scope.span. + If there is a non-null Scope, its wrapped Span + becomes an implicit parent of any newly-created Span at + Tracer.start_active_span() time. + + Return: + (Scope) : the Scope that is active, or None if not + available. + """ + ctx = LoggingContext.current_context() + if ctx is LoggingContext.sentinel: + return None + else: + return ctx.scope + + def activate(self, span, finish_on_close): + """ + Makes a Span active. + Args + span (Span): the span that should become active. + finish_on_close (Boolean): whether Span should be automatically + finished when Scope.close() is called. + + Returns: + Scope to control the end of the active period for + *span*. It is a programming error to neglect to call + Scope.close() on the returned instance. + """ + + enter_logcontext = False + ctx = LoggingContext.current_context() + + if ctx is LoggingContext.sentinel: + # We don't want this scope to affect. + logger.error("Tried to activate scope outside of loggingcontext") + return Scope(None, span) + elif ctx.scope is not None: + # We want the logging scope to look exactly the same so we give it + # a blank suffix + ctx = nested_logging_context("") + enter_logcontext = True + + scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close) + ctx.scope = scope + return scope + + +class _LogContextScope(Scope): + """ + A custom opentracing scope. The only significant difference is that it will + close the log context it's related to if the logcontext was created specifically + for this scope. + """ + + def __init__(self, manager, span, logcontext, enter_logcontext, finish_on_close): + """ + Args: + manager (LogContextScopeManager): + the manager that is responsible for this scope. + span (Span): + the opentracing span which this scope represents the local + lifetime for. + logcontext (LogContext): + the logcontext to which this scope is attached. + enter_logcontext (Boolean): + if True the logcontext will be entered and exited when the scope + is entered and exited respectively + finish_on_close (Boolean): + if True finish the span when the scope is closed + """ + super(_LogContextScope, self).__init__(manager, span) + self.logcontext = logcontext + self._finish_on_close = finish_on_close + self._enter_logcontext = enter_logcontext + + def __enter__(self): + if self._enter_logcontext: + self.logcontext.__enter__() + + def __exit__(self, type, value, traceback): + if type == twisted.internet.defer._DefGen_Return: + super(_LogContextScope, self).__exit__(None, None, None) + else: + super(_LogContextScope, self).__exit__(type, value, traceback) + if self._enter_logcontext: + self.logcontext.__exit__(type, value, traceback) + else: # the logcontext existed before the creation of the scope + self.logcontext.scope = None + + def close(self): + if self.manager.active is not self: + logger.error("Tried to close a none active scope!") + return + + if self._finish_on_close: + self.span.finish() diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 6324c00ef..e7618057b 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -95,6 +95,7 @@ CONDITIONAL_REQUIREMENTS = { "url_preview": ["lxml>=3.5.0"], "test": ["mock>=2.0", "parameterized"], "sentry": ["sentry-sdk>=0.7.2"], + "opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"], "jwt": ["pyjwt>=1.6.4"], }