mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-12-10 17:04:21 -05:00
224 lines
8.4 KiB
Python
224 lines
8.4 KiB
Python
# Copyright 2014-2016 OpenMarket Ltd
|
|
# Copyright 2019 New Vector Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import logging
|
|
|
|
from service_identity import VerificationError
|
|
from service_identity.pyopenssl import verify_hostname, verify_ip_address
|
|
from zope.interface import implementer
|
|
|
|
from OpenSSL import SSL, crypto
|
|
from twisted.internet._sslverify import _defaultCurveName
|
|
from twisted.internet.abstract import isIPAddress, isIPv6Address
|
|
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
|
|
from twisted.internet.ssl import (
|
|
CertificateOptions,
|
|
ContextFactory,
|
|
TLSVersion,
|
|
platformTrust,
|
|
)
|
|
from twisted.python.failure import Failure
|
|
from twisted.web.iweb import IPolicyForHTTPS
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
_TLS_VERSION_MAP = {
|
|
"1": TLSVersion.TLSv1_0,
|
|
"1.1": TLSVersion.TLSv1_1,
|
|
"1.2": TLSVersion.TLSv1_2,
|
|
"1.3": TLSVersion.TLSv1_3,
|
|
}
|
|
|
|
|
|
class ServerContextFactory(ContextFactory):
|
|
"""Factory for PyOpenSSL SSL contexts that are used to handle incoming
|
|
connections."""
|
|
|
|
def __init__(self, config):
|
|
self._context = SSL.Context(SSL.SSLv23_METHOD)
|
|
self.configure_context(self._context, config)
|
|
|
|
@staticmethod
|
|
def configure_context(context, config):
|
|
try:
|
|
_ecCurve = crypto.get_elliptic_curve(_defaultCurveName)
|
|
context.set_tmp_ecdh(_ecCurve)
|
|
except Exception:
|
|
logger.exception("Failed to enable elliptic curve for TLS")
|
|
|
|
context.set_options(
|
|
SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
|
|
)
|
|
context.use_certificate_chain_file(config.tls_certificate_file)
|
|
context.use_privatekey(config.tls_private_key)
|
|
|
|
# https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
|
|
context.set_cipher_list(
|
|
"ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES256:ECDH+AES128:!aNULL:!SHA1:!AESCCM"
|
|
)
|
|
|
|
def getContext(self):
|
|
return self._context
|
|
|
|
|
|
@implementer(IPolicyForHTTPS)
|
|
class ClientTLSOptionsFactory(object):
|
|
"""Factory for Twisted SSLClientConnectionCreators that are used to make connections
|
|
to remote servers for federation.
|
|
|
|
Uses one of two OpenSSL context objects for all connections, depending on whether
|
|
we should do SSL certificate verification.
|
|
|
|
get_options decides whether we should do SSL certificate verification and
|
|
constructs an SSLClientConnectionCreator factory accordingly.
|
|
"""
|
|
|
|
def __init__(self, config):
|
|
self._config = config
|
|
|
|
# Check if we're using a custom list of a CA certificates
|
|
trust_root = config.federation_ca_trust_root
|
|
if trust_root is None:
|
|
# Use CA root certs provided by OpenSSL
|
|
trust_root = platformTrust()
|
|
|
|
# "insecurelyLowerMinimumTo" is the argument that will go lower than
|
|
# Twisted's default, which is why it is marked as "insecure" (since
|
|
# Twisted's defaults are reasonably secure). But, since Twisted is
|
|
# moving to TLS 1.2 by default, we want to respect the config option if
|
|
# it is set to 1.0 (which the alternate option, raiseMinimumTo, will not
|
|
# let us do).
|
|
minTLS = _TLS_VERSION_MAP[config.federation_client_minimum_tls_version]
|
|
|
|
self._verify_ssl = CertificateOptions(
|
|
trustRoot=trust_root, insecurelyLowerMinimumTo=minTLS
|
|
)
|
|
self._verify_ssl_context = self._verify_ssl.getContext()
|
|
self._verify_ssl_context.set_info_callback(self._context_info_cb)
|
|
|
|
self._no_verify_ssl = CertificateOptions(insecurelyLowerMinimumTo=minTLS)
|
|
self._no_verify_ssl_context = self._no_verify_ssl.getContext()
|
|
self._no_verify_ssl_context.set_info_callback(self._context_info_cb)
|
|
|
|
def get_options(self, host: bytes):
|
|
|
|
# IPolicyForHTTPS.get_options takes bytes, but we want to compare
|
|
# against the str whitelist. The hostnames in the whitelist are already
|
|
# IDNA-encoded like the hosts will be here.
|
|
ascii_host = host.decode("ascii")
|
|
|
|
# Check if certificate verification has been enabled
|
|
should_verify = self._config.federation_verify_certificates
|
|
|
|
# Check if we've disabled certificate verification for this host
|
|
if should_verify:
|
|
for regex in self._config.federation_certificate_verification_whitelist:
|
|
if regex.match(ascii_host):
|
|
should_verify = False
|
|
break
|
|
|
|
ssl_context = (
|
|
self._verify_ssl_context if should_verify else self._no_verify_ssl_context
|
|
)
|
|
|
|
return SSLClientConnectionCreator(host, ssl_context, should_verify)
|
|
|
|
@staticmethod
|
|
def _context_info_cb(ssl_connection, where, ret):
|
|
"""The 'information callback' for our openssl context object."""
|
|
# we assume that the app_data on the connection object has been set to
|
|
# a TLSMemoryBIOProtocol object. (This is done by SSLClientConnectionCreator)
|
|
tls_protocol = ssl_connection.get_app_data()
|
|
try:
|
|
# ... we further assume that SSLClientConnectionCreator has set the
|
|
# '_synapse_tls_verifier' attribute to a ConnectionVerifier object.
|
|
tls_protocol._synapse_tls_verifier.verify_context_info_cb(
|
|
ssl_connection, where
|
|
)
|
|
except: # noqa: E722, taken from the twisted implementation
|
|
logger.exception("Error during info_callback")
|
|
f = Failure()
|
|
tls_protocol.failVerification(f)
|
|
|
|
def creatorForNetloc(self, hostname, port):
|
|
"""Implements the IPolicyForHTTPS interace so that this can be passed
|
|
directly to agents.
|
|
"""
|
|
return self.get_options(hostname)
|
|
|
|
|
|
@implementer(IOpenSSLClientConnectionCreator)
|
|
class SSLClientConnectionCreator(object):
|
|
"""Creates openssl connection objects for client connections.
|
|
|
|
Replaces twisted.internet.ssl.ClientTLSOptions
|
|
"""
|
|
|
|
def __init__(self, hostname: bytes, ctx, verify_certs: bool):
|
|
self._ctx = ctx
|
|
self._verifier = ConnectionVerifier(hostname, verify_certs)
|
|
|
|
def clientConnectionForTLS(self, tls_protocol):
|
|
context = self._ctx
|
|
connection = SSL.Connection(context, None)
|
|
|
|
# as per twisted.internet.ssl.ClientTLSOptions, we set the application
|
|
# data to our TLSMemoryBIOProtocol...
|
|
connection.set_app_data(tls_protocol)
|
|
|
|
# ... and we also gut-wrench a '_synapse_tls_verifier' attribute into the
|
|
# tls_protocol so that the SSL context's info callback has something to
|
|
# call to do the cert verification.
|
|
setattr(tls_protocol, "_synapse_tls_verifier", self._verifier)
|
|
return connection
|
|
|
|
|
|
class ConnectionVerifier(object):
|
|
"""Set the SNI, and do cert verification
|
|
|
|
This is a thing which is attached to the TLSMemoryBIOProtocol, and is called by
|
|
the ssl context's info callback.
|
|
"""
|
|
|
|
# This code is based on twisted.internet.ssl.ClientTLSOptions.
|
|
|
|
def __init__(self, hostname: bytes, verify_certs):
|
|
self._verify_certs = verify_certs
|
|
|
|
_decoded = hostname.decode("ascii")
|
|
if isIPAddress(_decoded) or isIPv6Address(_decoded):
|
|
self._is_ip_address = True
|
|
else:
|
|
self._is_ip_address = False
|
|
|
|
self._hostnameBytes = hostname
|
|
self._hostnameASCII = self._hostnameBytes.decode("ascii")
|
|
|
|
def verify_context_info_cb(self, ssl_connection, where):
|
|
if where & SSL.SSL_CB_HANDSHAKE_START and not self._is_ip_address:
|
|
ssl_connection.set_tlsext_host_name(self._hostnameBytes)
|
|
|
|
if where & SSL.SSL_CB_HANDSHAKE_DONE and self._verify_certs:
|
|
try:
|
|
if self._is_ip_address:
|
|
verify_ip_address(ssl_connection, self._hostnameASCII)
|
|
else:
|
|
verify_hostname(ssl_connection, self._hostnameASCII)
|
|
except VerificationError:
|
|
f = Failure()
|
|
tls_protocol = ssl_connection.get_app_data()
|
|
tls_protocol.failVerification(f)
|