synapse-product/synapse/logging/_structured.py

375 lines
12 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os.path
import sys
import typing
import warnings
import attr
from constantly import NamedConstant, Names, ValueConstant, Values
from zope.interface import implementer
from twisted.logger import (
FileLogObserver,
FilteringLogObserver,
ILogObserver,
LogBeginner,
Logger,
LogLevel,
LogLevelFilterPredicate,
LogPublisher,
eventAsText,
globalLogBeginner,
jsonFileLogObserver,
)
from synapse.config._base import ConfigError
from synapse.logging._terse_json import (
TerseJSONToConsoleLogObserver,
TerseJSONToTCPLogObserver,
)
from synapse.logging.context import LoggingContext
def stdlib_log_level_to_twisted(level: str) -> LogLevel:
"""
Convert a stdlib log level to Twisted's log level.
"""
lvl = level.lower().replace("warning", "warn")
return LogLevel.levelWithName(lvl)
@attr.s
@implementer(ILogObserver)
class LogContextObserver(object):
"""
An ILogObserver which adds Synapse-specific log context information.
Attributes:
observer (ILogObserver): The target parent observer.
"""
observer = attr.ib()
def __call__(self, event: dict) -> None:
"""
Consume a log event and emit it to the parent observer after filtering
and adding log context information.
Args:
event (dict)
"""
# Filter out some useless events that Twisted outputs
if "log_text" in event:
if event["log_text"].startswith("DNSDatagramProtocol starting on "):
return
if event["log_text"].startswith("(UDP Port "):
return
if event["log_text"].startswith("Timing out client") or event[
"log_format"
].startswith("Timing out client"):
return
context = LoggingContext.current_context()
# Copy the context information to the log event.
if context is not None:
context.copy_to_twisted_log_entry(event)
else:
# If there's no logging context, not even the root one, we might be
# starting up or it might be from non-Synapse code. Log it as if it
# came from the root logger.
event["request"] = None
event["scope"] = None
self.observer(event)
class PythonStdlibToTwistedLogger(logging.Handler):
"""
Transform a Python stdlib log message into a Twisted one.
"""
def __init__(self, observer, *args, **kwargs):
"""
Args:
observer (ILogObserver): A Twisted logging observer.
*args, **kwargs: Args/kwargs to be passed to logging.Handler.
"""
self.observer = observer
super().__init__(*args, **kwargs)
def emit(self, record: logging.LogRecord) -> None:
"""
Emit a record to Twisted's observer.
Args:
record (logging.LogRecord)
"""
self.observer(
{
"log_time": record.created,
"log_text": record.getMessage(),
"log_format": "{log_text}",
"log_namespace": record.name,
"log_level": stdlib_log_level_to_twisted(record.levelname),
}
)
def SynapseFileLogObserver(outFile: typing.io.TextIO) -> FileLogObserver:
"""
A log observer that formats events like the traditional log formatter and
sends them to `outFile`.
Args:
outFile (file object): The file object to write to.
"""
def formatEvent(_event: dict) -> str:
event = dict(_event)
event["log_level"] = event["log_level"].name.upper()
event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + (
event.get("log_format", "{log_text}") or "{log_text}"
)
return eventAsText(event, includeSystem=False) + "\n"
return FileLogObserver(outFile, formatEvent)
class DrainType(Names):
CONSOLE = NamedConstant()
CONSOLE_JSON = NamedConstant()
CONSOLE_JSON_TERSE = NamedConstant()
FILE = NamedConstant()
FILE_JSON = NamedConstant()
NETWORK_JSON_TERSE = NamedConstant()
class OutputPipeType(Values):
stdout = ValueConstant(sys.__stdout__)
stderr = ValueConstant(sys.__stderr__)
@attr.s
class DrainConfiguration(object):
name = attr.ib()
type = attr.ib()
location = attr.ib()
options = attr.ib(default=None)
@attr.s
class NetworkJSONTerseOptions(object):
maximum_buffer = attr.ib(type=int)
DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}
def parse_drain_configs(
drains: dict
) -> typing.Generator[DrainConfiguration, None, None]:
"""
Parse the drain configurations.
Args:
drains (dict): A list of drain configurations.
Yields:
DrainConfiguration instances.
Raises:
ConfigError: If any of the drain configuration items are invalid.
"""
for name, config in drains.items():
if "type" not in config:
raise ConfigError("Logging drains require a 'type' key.")
try:
logging_type = DrainType.lookupByName(config["type"].upper())
except ValueError:
raise ConfigError(
"%s is not a known logging drain type." % (config["type"],)
)
if logging_type in [
DrainType.CONSOLE,
DrainType.CONSOLE_JSON,
DrainType.CONSOLE_JSON_TERSE,
]:
location = config.get("location")
if location is None or location not in ["stdout", "stderr"]:
raise ConfigError(
(
"The %s drain needs the 'location' key set to "
"either 'stdout' or 'stderr'."
)
% (logging_type,)
)
pipe = OutputPipeType.lookupByName(location).value
yield DrainConfiguration(name=name, type=logging_type, location=pipe)
elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]:
if "location" not in config:
raise ConfigError(
"The %s drain needs the 'location' key set." % (logging_type,)
)
location = config.get("location")
if os.path.abspath(location) != location:
raise ConfigError(
"File paths need to be absolute, '%s' is a relative path"
% (location,)
)
yield DrainConfiguration(name=name, type=logging_type, location=location)
elif logging_type in [DrainType.NETWORK_JSON_TERSE]:
host = config.get("host")
port = config.get("port")
maximum_buffer = config.get("maximum_buffer", 1000)
yield DrainConfiguration(
name=name,
type=logging_type,
location=(host, port),
options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer),
)
else:
raise ConfigError(
"The %s drain type is currently not implemented."
% (config["type"].upper(),)
)
def setup_structured_logging(
hs,
config,
log_config: dict,
logBeginner: LogBeginner = globalLogBeginner,
redirect_stdlib_logging: bool = True,
) -> LogPublisher:
"""
Set up Twisted's structured logging system.
Args:
hs: The homeserver to use.
config (HomeserverConfig): The configuration of the Synapse homeserver.
log_config (dict): The log configuration to use.
"""
if config.no_redirect_stdio:
raise ConfigError(
"no_redirect_stdio cannot be defined using structured logging."
)
logger = Logger()
if "drains" not in log_config:
raise ConfigError("The logging configuration requires a list of drains.")
observers = []
for observer in parse_drain_configs(log_config["drains"]):
# Pipe drains
if observer.type == DrainType.CONSOLE:
logger.debug(
"Starting up the {name} console logger drain", name=observer.name
)
observers.append(SynapseFileLogObserver(observer.location))
elif observer.type == DrainType.CONSOLE_JSON:
logger.debug(
"Starting up the {name} JSON console logger drain", name=observer.name
)
observers.append(jsonFileLogObserver(observer.location))
elif observer.type == DrainType.CONSOLE_JSON_TERSE:
logger.debug(
"Starting up the {name} terse JSON console logger drain",
name=observer.name,
)
observers.append(
TerseJSONToConsoleLogObserver(observer.location, metadata={})
)
# File drains
elif observer.type == DrainType.FILE:
logger.debug("Starting up the {name} file logger drain", name=observer.name)
log_file = open(observer.location, "at", buffering=1, encoding="utf8")
observers.append(SynapseFileLogObserver(log_file))
elif observer.type == DrainType.FILE_JSON:
logger.debug(
"Starting up the {name} JSON file logger drain", name=observer.name
)
log_file = open(observer.location, "at", buffering=1, encoding="utf8")
observers.append(jsonFileLogObserver(log_file))
elif observer.type == DrainType.NETWORK_JSON_TERSE:
metadata = {"server_name": hs.config.server_name}
log_observer = TerseJSONToTCPLogObserver(
hs=hs,
host=observer.location[0],
port=observer.location[1],
metadata=metadata,
maximum_buffer=observer.options.maximum_buffer,
)
log_observer.start()
observers.append(log_observer)
else:
# We should never get here, but, just in case, throw an error.
raise ConfigError("%s drain type cannot be configured" % (observer.type,))
publisher = LogPublisher(*observers)
log_filter = LogLevelFilterPredicate()
for namespace, namespace_config in log_config.get(
"loggers", DEFAULT_LOGGERS
).items():
# Set the log level for twisted.logger.Logger namespaces
log_filter.setLogLevelForNamespace(
namespace,
stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
)
# Also set the log levels for the stdlib logger namespaces, to prevent
# them getting to PythonStdlibToTwistedLogger and having to be formatted
if "level" in namespace_config:
logging.getLogger(namespace).setLevel(namespace_config.get("level"))
f = FilteringLogObserver(publisher, [log_filter])
lco = LogContextObserver(f)
if redirect_stdlib_logging:
stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
stdliblogger = logging.getLogger()
stdliblogger.addHandler(stuff_into_twisted)
# Always redirect standard I/O, otherwise other logging outputs might miss
# it.
logBeginner.beginLoggingTo([lco], redirectStandardIO=True)
return publisher
def reload_structured_logging(*args, log_config=None) -> None:
warnings.warn(
"Currently the structured logging system can not be reloaded, doing nothing"
)