mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-11-11 02:15:03 -05:00
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_federation
This commit is contained in:
commit
ef184caf30
217 changed files with 2601 additions and 2756 deletions
|
|
@ -17,4 +17,4 @@
|
|||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.33.1"
|
||||
__version__ = "0.33.2"
|
||||
|
|
|
|||
|
|
@ -775,11 +775,25 @@ class Auth(object):
|
|||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_auth_blocking(self):
|
||||
def check_auth_blocking(self, user_id=None):
|
||||
"""Checks if the user should be rejected for some external reason,
|
||||
such as monthly active user limiting or global disable flag
|
||||
|
||||
Args:
|
||||
user_id(str|None): If present, checks for presence against existing
|
||||
MAU cohort
|
||||
"""
|
||||
if self.hs.config.hs_disabled:
|
||||
raise AuthError(
|
||||
403, self.hs.config.hs_disabled_message, errcode=Codes.HS_DISABLED
|
||||
)
|
||||
if self.hs.config.limit_usage_by_mau is True:
|
||||
# If the user is already part of the MAU cohort
|
||||
if user_id:
|
||||
timestamp = yield self.store.user_last_seen_monthly_active(user_id)
|
||||
if timestamp:
|
||||
return
|
||||
# Else if there is no room in the MAU bucket, bail
|
||||
current_mau = yield self.store.get_monthly_active_count()
|
||||
if current_mau >= self.hs.config.max_mau_value:
|
||||
raise AuthError(
|
||||
|
|
|
|||
|
|
@ -57,6 +57,7 @@ class Codes(object):
|
|||
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
|
||||
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
|
||||
MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED"
|
||||
HS_DISABLED = "M_HS_DISABLED"
|
||||
UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
|
||||
INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
|
||||
|
||||
|
|
|
|||
|
|
@ -168,11 +168,13 @@ def start(config_options):
|
|||
database_engine = create_engine(config.database_config)
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ss = ClientReaderServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
|
|
|||
|
|
@ -174,11 +174,13 @@ def start(config_options):
|
|||
database_engine = create_engine(config.database_config)
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ss = EventCreatorServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
|
|
|||
|
|
@ -153,11 +153,13 @@ def start(config_options):
|
|||
database_engine = create_engine(config.database_config)
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ss = FederationReaderServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
|
|||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.httpresourcetree import create_resource_tree
|
||||
from synapse.util.logcontext import LoggingContext, run_in_background
|
||||
from synapse.util.manhole import manhole
|
||||
|
|
@ -186,11 +186,13 @@ def start(config_options):
|
|||
config.send_federation = True
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ps = FederationSenderServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
|
|
|||
|
|
@ -208,11 +208,13 @@ def start(config_options):
|
|||
database_engine = create_engine(config.database_config)
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ss = FrontendProxyServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
|
|
|||
|
|
@ -303,8 +303,8 @@ class SynapseHomeServer(HomeServer):
|
|||
|
||||
|
||||
# Gauges to expose monthly active user control metrics
|
||||
current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU")
|
||||
max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit")
|
||||
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
|
||||
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
|
||||
|
||||
|
||||
def setup(config_options):
|
||||
|
|
@ -338,6 +338,7 @@ def setup(config_options):
|
|||
events.USE_FROZEN_DICTS = config.use_frozen_dicts
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
database_engine = create_engine(config.database_config)
|
||||
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
|
||||
|
|
@ -346,6 +347,7 @@ def setup(config_options):
|
|||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
|
@ -530,7 +532,7 @@ def run(hs):
|
|||
if hs.config.limit_usage_by_mau:
|
||||
count = yield hs.get_datastore().get_monthly_active_count()
|
||||
current_mau_gauge.set(float(count))
|
||||
max_mau_value_gauge.set(float(hs.config.max_mau_value))
|
||||
max_mau_gauge.set(float(hs.config.max_mau_value))
|
||||
|
||||
hs.get_datastore().initialise_reserved_users(
|
||||
hs.config.mau_limits_reserved_threepids
|
||||
|
|
|
|||
|
|
@ -155,11 +155,13 @@ def start(config_options):
|
|||
database_engine = create_engine(config.database_config)
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ss = MediaRepositoryServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
|
|
|||
|
|
@ -214,11 +214,13 @@ def start(config_options):
|
|||
config.update_user_directory = True
|
||||
|
||||
tls_server_context_factory = context_factory.ServerContextFactory(config)
|
||||
tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
|
||||
|
||||
ps = UserDirectoryServer(
|
||||
config.server_name,
|
||||
db_config=config.database_config,
|
||||
tls_server_context_factory=tls_server_context_factory,
|
||||
tls_client_options_factory=tls_client_options_factory,
|
||||
config=config,
|
||||
version_string="Synapse/" + get_version_string(synapse),
|
||||
database_engine=database_engine,
|
||||
|
|
|
|||
|
|
@ -193,9 +193,8 @@ def setup_logging(config, use_worker_options=False):
|
|||
|
||||
def sighup(signum, stack):
|
||||
# it might be better to use a file watcher or something for this.
|
||||
logging.info("Reloading log config from %s due to SIGHUP",
|
||||
log_config)
|
||||
load_log_config()
|
||||
logging.info("Reloaded log config from %s due to SIGHUP", log_config)
|
||||
|
||||
load_log_config()
|
||||
|
||||
|
|
|
|||
|
|
@ -78,6 +78,10 @@ class ServerConfig(Config):
|
|||
"mau_limit_reserved_threepids", []
|
||||
)
|
||||
|
||||
# Options to disable HS
|
||||
self.hs_disabled = config.get("hs_disabled", False)
|
||||
self.hs_disabled_message = config.get("hs_disabled_message", "")
|
||||
|
||||
# FIXME: federation_domain_whitelist needs sytests
|
||||
self.federation_domain_whitelist = None
|
||||
federation_domain_whitelist = config.get(
|
||||
|
|
|
|||
|
|
@ -11,19 +11,22 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from zope.interface import implementer
|
||||
|
||||
from OpenSSL import SSL, crypto
|
||||
from twisted.internet import ssl
|
||||
from twisted.internet._sslverify import _defaultCurveName
|
||||
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
|
||||
from twisted.internet.ssl import CertificateOptions, ContextFactory
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ServerContextFactory(ssl.ContextFactory):
|
||||
class ServerContextFactory(ContextFactory):
|
||||
"""Factory for PyOpenSSL SSL contexts that are used to handle incoming
|
||||
connections and to make connections to remote servers."""
|
||||
connections."""
|
||||
|
||||
def __init__(self, config):
|
||||
self._context = SSL.Context(SSL.SSLv23_METHOD)
|
||||
|
|
@ -48,3 +51,78 @@ class ServerContextFactory(ssl.ContextFactory):
|
|||
|
||||
def getContext(self):
|
||||
return self._context
|
||||
|
||||
|
||||
def _idnaBytes(text):
|
||||
"""
|
||||
Convert some text typed by a human into some ASCII bytes. This is a
|
||||
copy of twisted.internet._idna._idnaBytes. For documentation, see the
|
||||
twisted documentation.
|
||||
"""
|
||||
try:
|
||||
import idna
|
||||
except ImportError:
|
||||
return text.encode("idna")
|
||||
else:
|
||||
return idna.encode(text)
|
||||
|
||||
|
||||
def _tolerateErrors(wrapped):
|
||||
"""
|
||||
Wrap up an info_callback for pyOpenSSL so that if something goes wrong
|
||||
the error is immediately logged and the connection is dropped if possible.
|
||||
This is a copy of twisted.internet._sslverify._tolerateErrors. For
|
||||
documentation, see the twisted documentation.
|
||||
"""
|
||||
|
||||
def infoCallback(connection, where, ret):
|
||||
try:
|
||||
return wrapped(connection, where, ret)
|
||||
except: # noqa: E722, taken from the twisted implementation
|
||||
f = Failure()
|
||||
logger.exception("Error during info_callback")
|
||||
connection.get_app_data().failVerification(f)
|
||||
|
||||
return infoCallback
|
||||
|
||||
|
||||
@implementer(IOpenSSLClientConnectionCreator)
|
||||
class ClientTLSOptions(object):
|
||||
"""
|
||||
Client creator for TLS without certificate identity verification. This is a
|
||||
copy of twisted.internet._sslverify.ClientTLSOptions with the identity
|
||||
verification left out. For documentation, see the twisted documentation.
|
||||
"""
|
||||
|
||||
def __init__(self, hostname, ctx):
|
||||
self._ctx = ctx
|
||||
self._hostname = hostname
|
||||
self._hostnameBytes = _idnaBytes(hostname)
|
||||
ctx.set_info_callback(
|
||||
_tolerateErrors(self._identityVerifyingInfoCallback)
|
||||
)
|
||||
|
||||
def clientConnectionForTLS(self, tlsProtocol):
|
||||
context = self._ctx
|
||||
connection = SSL.Connection(context, None)
|
||||
connection.set_app_data(tlsProtocol)
|
||||
return connection
|
||||
|
||||
def _identityVerifyingInfoCallback(self, connection, where, ret):
|
||||
if where & SSL.SSL_CB_HANDSHAKE_START:
|
||||
connection.set_tlsext_host_name(self._hostnameBytes)
|
||||
|
||||
|
||||
class ClientTLSOptionsFactory(object):
|
||||
"""Factory for Twisted ClientTLSOptions that are used to make connections
|
||||
to remote servers for federation."""
|
||||
|
||||
def __init__(self, config):
|
||||
# We don't use config options yet
|
||||
pass
|
||||
|
||||
def get_options(self, host):
|
||||
return ClientTLSOptions(
|
||||
host.decode('utf-8'),
|
||||
CertificateOptions(verify=False).getContext()
|
||||
)
|
||||
|
|
|
|||
|
|
@ -30,14 +30,14 @@ KEY_API_V1 = b"/_matrix/key/v1/"
|
|||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1):
|
||||
def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
|
||||
"""Fetch the keys for a remote server."""
|
||||
|
||||
factory = SynapseKeyClientFactory()
|
||||
factory.path = path
|
||||
factory.host = server_name
|
||||
endpoint = matrix_federation_endpoint(
|
||||
reactor, server_name, ssl_context_factory, timeout=30
|
||||
reactor, server_name, tls_client_options_factory, timeout=30
|
||||
)
|
||||
|
||||
for i in range(5):
|
||||
|
|
|
|||
|
|
@ -512,7 +512,7 @@ class Keyring(object):
|
|||
continue
|
||||
|
||||
(response, tls_certificate) = yield fetch_server_key(
|
||||
server_name, self.hs.tls_server_context_factory,
|
||||
server_name, self.hs.tls_client_options_factory,
|
||||
path=(b"/_matrix/key/v2/server/%s" % (
|
||||
urllib.quote(requested_key_id),
|
||||
)).encode("ascii"),
|
||||
|
|
@ -655,7 +655,7 @@ class Keyring(object):
|
|||
# Try to fetch the key from the remote server.
|
||||
|
||||
(response, tls_certificate) = yield fetch_server_key(
|
||||
server_name, self.hs.tls_server_context_factory
|
||||
server_name, self.hs.tls_client_options_factory
|
||||
)
|
||||
|
||||
# Check the response.
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ from synapse.replication.http.federation import (
|
|||
ReplicationGetQueryRestServlet,
|
||||
)
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util import async
|
||||
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.logutils import log_function
|
||||
|
||||
|
|
@ -71,8 +71,8 @@ class FederationServer(FederationBase):
|
|||
self.auth = hs.get_auth()
|
||||
self.handler = hs.get_handlers().federation_handler
|
||||
|
||||
self._server_linearizer = async.Linearizer("fed_server")
|
||||
self._transaction_linearizer = async.Linearizer("fed_txn_handler")
|
||||
self._server_linearizer = Linearizer("fed_server")
|
||||
self._transaction_linearizer = Linearizer("fed_txn_handler")
|
||||
|
||||
self.transaction_actions = TransactionActions(self.store)
|
||||
|
||||
|
|
@ -204,7 +204,7 @@ class FederationServer(FederationBase):
|
|||
event_id, f.getTraceback().rstrip(),
|
||||
)
|
||||
|
||||
yield async.concurrently_execute(
|
||||
yield concurrently_execute(
|
||||
process_pdus_for_room, pdus_by_room.keys(),
|
||||
TRANSACTION_CONCURRENCY_LIMIT,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -828,12 +828,26 @@ class AuthHandler(BaseHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def delete_threepid(self, user_id, medium, address):
|
||||
"""Attempts to unbind the 3pid on the identity servers and deletes it
|
||||
from the local database.
|
||||
|
||||
Args:
|
||||
user_id (str)
|
||||
medium (str)
|
||||
address (str)
|
||||
|
||||
Returns:
|
||||
Deferred[bool]: Returns True if successfully unbound the 3pid on
|
||||
the identity server, False if identity server doesn't support the
|
||||
unbind API.
|
||||
"""
|
||||
|
||||
# 'Canonicalise' email addresses as per above
|
||||
if medium == 'email':
|
||||
address = address.lower()
|
||||
|
||||
identity_handler = self.hs.get_handlers().identity_handler
|
||||
yield identity_handler.unbind_threepid(
|
||||
result = yield identity_handler.try_unbind_threepid(
|
||||
user_id,
|
||||
{
|
||||
'medium': medium,
|
||||
|
|
@ -841,10 +855,10 @@ class AuthHandler(BaseHandler):
|
|||
},
|
||||
)
|
||||
|
||||
ret = yield self.store.user_delete_threepid(
|
||||
yield self.store.user_delete_threepid(
|
||||
user_id, medium, address,
|
||||
)
|
||||
defer.returnValue(ret)
|
||||
defer.returnValue(result)
|
||||
|
||||
def _save_session(self, session):
|
||||
# TODO: Persistent storage
|
||||
|
|
|
|||
|
|
@ -51,7 +51,8 @@ class DeactivateAccountHandler(BaseHandler):
|
|||
erase_data (bool): whether to GDPR-erase the user's data
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
Deferred[bool]: True if identity server supports removing
|
||||
threepids, otherwise False.
|
||||
"""
|
||||
# FIXME: Theoretically there is a race here wherein user resets
|
||||
# password using threepid.
|
||||
|
|
@ -60,16 +61,22 @@ class DeactivateAccountHandler(BaseHandler):
|
|||
# leave the user still active so they can try again.
|
||||
# Ideally we would prevent password resets and then do this in the
|
||||
# background thread.
|
||||
|
||||
# This will be set to false if the identity server doesn't support
|
||||
# unbinding
|
||||
identity_server_supports_unbinding = True
|
||||
|
||||
threepids = yield self.store.user_get_threepids(user_id)
|
||||
for threepid in threepids:
|
||||
try:
|
||||
yield self._identity_handler.unbind_threepid(
|
||||
result = yield self._identity_handler.try_unbind_threepid(
|
||||
user_id,
|
||||
{
|
||||
'medium': threepid['medium'],
|
||||
'address': threepid['address'],
|
||||
},
|
||||
)
|
||||
identity_server_supports_unbinding &= result
|
||||
except Exception:
|
||||
# Do we want this to be a fatal error or should we carry on?
|
||||
logger.exception("Failed to remove threepid from ID server")
|
||||
|
|
@ -103,6 +110,8 @@ class DeactivateAccountHandler(BaseHandler):
|
|||
# parts users from rooms (if it isn't already running)
|
||||
self._start_user_parting()
|
||||
|
||||
defer.returnValue(identity_server_supports_unbinding)
|
||||
|
||||
def _start_user_parting(self):
|
||||
"""
|
||||
Start the process that goes through the table of users
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes
|
|||
from synapse.api.errors import FederationDeniedError
|
||||
from synapse.types import RoomStreamToken, get_domain_from_id
|
||||
from synapse.util import stringutils
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.metrics import measure_func
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRes
|
|||
from synapse.state import resolve_events_with_factory
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.distributor import user_joined_room
|
||||
from synapse.util.frozenutils import unfreeze
|
||||
from synapse.util.logutils import log_function
|
||||
|
|
|
|||
|
|
@ -137,15 +137,19 @@ class IdentityHandler(BaseHandler):
|
|||
defer.returnValue(data)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def unbind_threepid(self, mxid, threepid):
|
||||
"""
|
||||
Removes a binding from an identity server
|
||||
def try_unbind_threepid(self, mxid, threepid):
|
||||
"""Removes a binding from an identity server
|
||||
|
||||
Args:
|
||||
mxid (str): Matrix user ID of binding to be removed
|
||||
threepid (dict): Dict with medium & address of binding to be removed
|
||||
|
||||
Raises:
|
||||
SynapseError: If we failed to contact the identity server
|
||||
|
||||
Returns:
|
||||
Deferred[bool]: True on success, otherwise False
|
||||
Deferred[bool]: True on success, otherwise False if the identity
|
||||
server doesn't support unbinding
|
||||
"""
|
||||
logger.debug("unbinding threepid %r from %s", threepid, mxid)
|
||||
if not self.trusted_id_servers:
|
||||
|
|
@ -175,11 +179,21 @@ class IdentityHandler(BaseHandler):
|
|||
content=content,
|
||||
destination_is=id_server,
|
||||
)
|
||||
yield self.http_client.post_json_get_json(
|
||||
url,
|
||||
content,
|
||||
headers,
|
||||
)
|
||||
try:
|
||||
yield self.http_client.post_json_get_json(
|
||||
url,
|
||||
content,
|
||||
headers,
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
if e.code in (400, 404, 501,):
|
||||
# The remote server probably doesn't support unbinding (yet)
|
||||
logger.warn("Received %d response while unbinding threepid", e.code)
|
||||
defer.returnValue(False)
|
||||
else:
|
||||
logger.error("Failed to unbind threepid on identity server: %s", e)
|
||||
raise SynapseError(502, "Failed to contact identity server")
|
||||
|
||||
defer.returnValue(True)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
|
|||
from synapse.streams.config import PaginationConfig
|
||||
from synapse.types import StreamToken, UserID
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.async import concurrently_execute
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.snapshot_cache import SnapshotCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ from synapse.events.utils import serialize_event
|
|||
from synapse.events.validator import EventValidator
|
||||
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
|
||||
from synapse.types import RoomAlias, UserID
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.metrics import measure_func
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ from synapse.api.constants import Membership
|
|||
from synapse.api.errors import SynapseError
|
||||
from synapse.events.utils import serialize_event
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.util.async import ReadWriteLock
|
||||
from synapse.util.async_helpers import ReadWriteLock
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.stringutils import random_string
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ from synapse.api.errors import SynapseError
|
|||
from synapse.metrics import LaterGauge
|
||||
from synapse.storage.presence import UserPresenceState
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.util.logcontext import run_in_background
|
||||
from synapse.util.logutils import log_function
|
||||
|
|
@ -95,6 +95,7 @@ class PresenceHandler(object):
|
|||
Args:
|
||||
hs (synapse.server.HomeServer):
|
||||
"""
|
||||
self.hs = hs
|
||||
self.is_mine = hs.is_mine
|
||||
self.is_mine_id = hs.is_mine_id
|
||||
self.clock = hs.get_clock()
|
||||
|
|
@ -230,6 +231,10 @@ class PresenceHandler(object):
|
|||
earlier than they should when synapse is restarted. This affect of this
|
||||
is some spurious presence changes that will self-correct.
|
||||
"""
|
||||
# If the DB pool has already terminated, don't try updating
|
||||
if not self.hs.get_db_pool().running:
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Performing _on_shutdown. Persisting %d unpersisted changes",
|
||||
len(self.user_to_current_state)
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import logging
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ from synapse.api.errors import (
|
|||
)
|
||||
from synapse.http.client import CaptchaServerHttpClient
|
||||
from synapse.types import RoomAlias, RoomID, UserID, create_requester
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.threepids import check_3pid_allowed
|
||||
|
||||
from ._base import BaseHandler
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.api.constants import EventTypes, JoinRules
|
||||
from synapse.types import ThirdPartyInstanceID
|
||||
from synapse.util.async import concurrently_execute
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ import synapse.types
|
|||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.errors import AuthError, Codes, SynapseError
|
||||
from synapse.types import RoomID, UserID
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.distributor import user_joined_room, user_left_room
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ from twisted.internet import defer
|
|||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.push.clientformat import format_push_rules_for_user
|
||||
from synapse.types import RoomStreamToken
|
||||
from synapse.util.async import concurrently_execute
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
|
|
@ -191,6 +191,7 @@ class SyncHandler(object):
|
|||
self.clock = hs.get_clock()
|
||||
self.response_cache = ResponseCache(hs, "sync")
|
||||
self.state = hs.get_state_handler()
|
||||
self.auth = hs.get_auth()
|
||||
|
||||
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
|
||||
self.lazy_loaded_members_cache = ExpiringCache(
|
||||
|
|
@ -198,19 +199,27 @@ class SyncHandler(object):
|
|||
max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
|
||||
full_state=False):
|
||||
"""Get the sync for a client if we have new data for it now. Otherwise
|
||||
wait for new data to arrive on the server. If the timeout expires, then
|
||||
return an empty sync result.
|
||||
Returns:
|
||||
A Deferred SyncResult.
|
||||
Deferred[SyncResult]
|
||||
"""
|
||||
return self.response_cache.wrap(
|
||||
# If the user is not part of the mau group, then check that limits have
|
||||
# not been exceeded (if not part of the group by this point, almost certain
|
||||
# auth_blocking will occur)
|
||||
user_id = sync_config.user.to_string()
|
||||
yield self.auth.check_auth_blocking(user_id)
|
||||
|
||||
res = yield self.response_cache.wrap(
|
||||
sync_config.request_key,
|
||||
self._wait_for_sync_for_user,
|
||||
sync_config, since_token, timeout, full_state,
|
||||
)
|
||||
defer.returnValue(res)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ from twisted.web.http_headers import Headers
|
|||
from synapse.api.errors import Codes, HttpResponseException, SynapseError
|
||||
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
|
||||
from synapse.http.endpoint import SpiderEndpoint
|
||||
from synapse.util.async import add_timeout_to_deferred
|
||||
from synapse.util.async_helpers import add_timeout_to_deferred
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ from twisted.names.error import DNSNameError, DomainError
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
SERVER_CACHE = {}
|
||||
|
||||
# our record of an individual server which can be tried to reach a destination.
|
||||
|
|
@ -103,15 +102,16 @@ def parse_and_validate_server_name(server_name):
|
|||
return host, port
|
||||
|
||||
|
||||
def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
|
||||
def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=None,
|
||||
timeout=None):
|
||||
"""Construct an endpoint for the given matrix destination.
|
||||
|
||||
Args:
|
||||
reactor: Twisted reactor.
|
||||
destination (bytes): The name of the server to connect to.
|
||||
ssl_context_factory (twisted.internet.ssl.ContextFactory): Factory
|
||||
which generates SSL contexts to use for TLS.
|
||||
tls_client_options_factory
|
||||
(synapse.crypto.context_factory.ClientTLSOptionsFactory):
|
||||
Factory which generates TLS options for client connections.
|
||||
timeout (int): connection timeout in seconds
|
||||
"""
|
||||
|
||||
|
|
@ -122,13 +122,13 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
|
|||
if timeout is not None:
|
||||
endpoint_kw_args.update(timeout=timeout)
|
||||
|
||||
if ssl_context_factory is None:
|
||||
if tls_client_options_factory is None:
|
||||
transport_endpoint = HostnameEndpoint
|
||||
default_port = 8008
|
||||
else:
|
||||
def transport_endpoint(reactor, host, port, timeout):
|
||||
return wrapClientTLS(
|
||||
ssl_context_factory,
|
||||
tls_client_options_factory.get_options(host),
|
||||
HostnameEndpoint(reactor, host, port, timeout=timeout))
|
||||
default_port = 8448
|
||||
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ from synapse.api.errors import (
|
|||
from synapse.http import cancelled_to_request_timed_out_error
|
||||
from synapse.http.endpoint import matrix_federation_endpoint
|
||||
from synapse.util import logcontext
|
||||
from synapse.util.async import add_timeout_to_deferred
|
||||
from synapse.util.async_helpers import add_timeout_to_deferred
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -61,14 +61,14 @@ MAX_SHORT_RETRIES = 3
|
|||
|
||||
class MatrixFederationEndpointFactory(object):
|
||||
def __init__(self, hs):
|
||||
self.tls_server_context_factory = hs.tls_server_context_factory
|
||||
self.tls_client_options_factory = hs.tls_client_options_factory
|
||||
|
||||
def endpointForURI(self, uri):
|
||||
destination = uri.netloc
|
||||
|
||||
return matrix_federation_endpoint(
|
||||
reactor, destination, timeout=10,
|
||||
ssl_context_factory=self.tls_server_context_factory
|
||||
tls_client_options_factory=self.tls_client_options_factory
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ from synapse.api.errors import AuthError
|
|||
from synapse.handlers.presence import format_user_presence_state
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.types import StreamToken
|
||||
from synapse.util.async import (
|
||||
from synapse.util.async_helpers import (
|
||||
DeferredTimeoutError,
|
||||
ObservableDeferred,
|
||||
add_timeout_to_deferred,
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ from twisted.internet import defer
|
|||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.event_auth import get_user_power_level
|
||||
from synapse.state import POWER_KEY
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches import register_cache
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ from synapse.push.presentable_names import (
|
|||
name_from_member_event,
|
||||
)
|
||||
from synapse.types import UserID
|
||||
from synapse.util.async import concurrently_execute
|
||||
from synapse.util.async_helpers import concurrently_execute
|
||||
from synapse.visibility import filter_events_for_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
to ensure idempotency when performing PUTs using the REST API."""
|
||||
import logging
|
||||
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
|
|||
|
|
@ -391,10 +391,17 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
|
|||
if not is_admin:
|
||||
raise AuthError(403, "You are not a server admin")
|
||||
|
||||
yield self._deactivate_account_handler.deactivate_account(
|
||||
result = yield self._deactivate_account_handler.deactivate_account(
|
||||
target_user_id, erase,
|
||||
)
|
||||
defer.returnValue((200, {}))
|
||||
if result:
|
||||
id_server_unbind_result = "success"
|
||||
else:
|
||||
id_server_unbind_result = "no-support"
|
||||
|
||||
defer.returnValue((200, {
|
||||
"id_server_unbind_result": id_server_unbind_result,
|
||||
}))
|
||||
|
||||
|
||||
class ShutdownRoomRestServlet(ClientV1RestServlet):
|
||||
|
|
|
|||
|
|
@ -209,10 +209,17 @@ class DeactivateAccountRestServlet(RestServlet):
|
|||
yield self.auth_handler.validate_user_via_ui_auth(
|
||||
requester, body, self.hs.get_ip_from_request(request),
|
||||
)
|
||||
yield self._deactivate_account_handler.deactivate_account(
|
||||
result = yield self._deactivate_account_handler.deactivate_account(
|
||||
requester.user.to_string(), erase,
|
||||
)
|
||||
defer.returnValue((200, {}))
|
||||
if result:
|
||||
id_server_unbind_result = "success"
|
||||
else:
|
||||
id_server_unbind_result = "no-support"
|
||||
|
||||
defer.returnValue((200, {
|
||||
"id_server_unbind_result": id_server_unbind_result,
|
||||
}))
|
||||
|
||||
|
||||
class EmailThreepidRequestTokenRestServlet(RestServlet):
|
||||
|
|
@ -364,7 +371,7 @@ class ThreepidDeleteRestServlet(RestServlet):
|
|||
user_id = requester.user.to_string()
|
||||
|
||||
try:
|
||||
yield self.auth_handler.delete_threepid(
|
||||
ret = yield self.auth_handler.delete_threepid(
|
||||
user_id, body['medium'], body['address']
|
||||
)
|
||||
except Exception:
|
||||
|
|
@ -374,7 +381,14 @@ class ThreepidDeleteRestServlet(RestServlet):
|
|||
logger.exception("Failed to remove threepid")
|
||||
raise SynapseError(500, "Failed to remove threepid")
|
||||
|
||||
defer.returnValue((200, {}))
|
||||
if ret:
|
||||
id_server_unbind_result = "success"
|
||||
else:
|
||||
id_server_unbind_result = "no-support"
|
||||
|
||||
defer.returnValue((200, {
|
||||
"id_server_unbind_result": id_server_unbind_result,
|
||||
}))
|
||||
|
||||
|
||||
class WhoamiRestServlet(RestServlet):
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ from synapse.api.errors import (
|
|||
)
|
||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.logcontext import make_deferred_yieldable
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
from synapse.util.stringutils import is_ascii, random_string
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ from synapse.http.server import (
|
|||
)
|
||||
from synapse.http.servlet import parse_integer, parse_string
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
from synapse.util.stringutils import is_ascii, random_string
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ from synapse import event_auth
|
|||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import AuthError
|
||||
from synapse.events.snapshot import EventContext
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.logutils import log_function
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
<meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
|
||||
<link rel="stylesheet" href="style.css">
|
||||
<script src="js/jquery-2.1.3.min.js"></script>
|
||||
<script src="js/recaptcha_ajax.js"></script>
|
||||
<script src="https://www.google.com/recaptcha/api/js/recaptcha_ajax.js"></script>
|
||||
<script src="register_config.js"></script>
|
||||
<script src="js/register.js"></script>
|
||||
</head>
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
|
|
@ -96,6 +96,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
self._batch_row_update[key] = (user_agent, device_id, now)
|
||||
|
||||
def _update_client_ips_batch(self):
|
||||
|
||||
# If the DB pool has already terminated, don't try updating
|
||||
if not self.hs.get_db_pool().running:
|
||||
return
|
||||
|
||||
def update():
|
||||
to_update = self._batch_row_update
|
||||
self._batch_row_update = {}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ from synapse.storage.background_updates import BackgroundUpdateStore
|
|||
from synapse.storage.event_federation import EventFederationStore
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from synapse.types import RoomStreamToken, get_domain_from_id
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.frozenutils import frozendict_json_encoder
|
||||
from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
|||
tp["medium"], tp["address"]
|
||||
)
|
||||
if user_id:
|
||||
self.upsert_monthly_active_user(user_id)
|
||||
yield self.upsert_monthly_active_user(user_id)
|
||||
reserved_user_list.append(user_id)
|
||||
else:
|
||||
logger.warning(
|
||||
|
|
@ -64,23 +64,27 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
|||
Deferred[]
|
||||
"""
|
||||
def _reap_users(txn):
|
||||
# Purge stale users
|
||||
|
||||
thirty_days_ago = (
|
||||
int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
|
||||
)
|
||||
# Purge stale users
|
||||
|
||||
# questionmarks is a hack to overcome sqlite not supporting
|
||||
# tuples in 'WHERE IN %s'
|
||||
questionmarks = '?' * len(self.reserved_users)
|
||||
query_args = [thirty_days_ago]
|
||||
query_args.extend(self.reserved_users)
|
||||
base_sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"
|
||||
|
||||
sql = """
|
||||
DELETE FROM monthly_active_users
|
||||
WHERE timestamp < ?
|
||||
AND user_id NOT IN ({})
|
||||
""".format(','.join(questionmarks))
|
||||
# Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
|
||||
# when len(reserved_users) == 0. Works fine on sqlite.
|
||||
if len(self.reserved_users) > 0:
|
||||
# questionmarks is a hack to overcome sqlite not supporting
|
||||
# tuples in 'WHERE IN %s'
|
||||
questionmarks = '?' * len(self.reserved_users)
|
||||
|
||||
query_args.extend(self.reserved_users)
|
||||
sql = base_sql + """ AND user_id NOT IN ({})""".format(
|
||||
','.join(questionmarks)
|
||||
)
|
||||
else:
|
||||
sql = base_sql
|
||||
|
||||
txn.execute(sql, query_args)
|
||||
|
||||
|
|
@ -93,16 +97,24 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
|||
# negative LIMIT values. So there is no way to write it that both can
|
||||
# support
|
||||
query_args = [self.hs.config.max_mau_value]
|
||||
query_args.extend(self.reserved_users)
|
||||
sql = """
|
||||
|
||||
base_sql = """
|
||||
DELETE FROM monthly_active_users
|
||||
WHERE user_id NOT IN (
|
||||
SELECT user_id FROM monthly_active_users
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT ?
|
||||
)
|
||||
AND user_id NOT IN ({})
|
||||
""".format(','.join(questionmarks))
|
||||
"""
|
||||
# Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
|
||||
# when len(reserved_users) == 0. Works fine on sqlite.
|
||||
if len(self.reserved_users) > 0:
|
||||
query_args.extend(self.reserved_users)
|
||||
sql = base_sql + """ AND user_id NOT IN ({})""".format(
|
||||
','.join(questionmarks)
|
||||
)
|
||||
else:
|
||||
sql = base_sql
|
||||
txn.execute(sql, query_args)
|
||||
|
||||
yield self.runInteraction("reap_monthly_active_users", _reap_users)
|
||||
|
|
@ -113,7 +125,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
|||
# is racy.
|
||||
# Have resolved to invalidate the whole cache for now and do
|
||||
# something about it if and when the perf becomes significant
|
||||
self._user_last_seen_monthly_active.invalidate_all()
|
||||
self.user_last_seen_monthly_active.invalidate_all()
|
||||
self.get_monthly_active_count.invalidate_all()
|
||||
|
||||
@cached(num_args=0)
|
||||
|
|
@ -152,11 +164,11 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
|||
lock=False,
|
||||
)
|
||||
if is_insert:
|
||||
self._user_last_seen_monthly_active.invalidate((user_id,))
|
||||
self.user_last_seen_monthly_active.invalidate((user_id,))
|
||||
self.get_monthly_active_count.invalidate(())
|
||||
|
||||
@cached(num_args=1)
|
||||
def _user_last_seen_monthly_active(self, user_id):
|
||||
def user_last_seen_monthly_active(self, user_id):
|
||||
"""
|
||||
Checks if a given user is part of the monthly active user group
|
||||
Arguments:
|
||||
|
|
@ -173,7 +185,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
|||
},
|
||||
retcol="timestamp",
|
||||
allow_none=True,
|
||||
desc="_user_last_seen_monthly_active",
|
||||
desc="user_last_seen_monthly_active",
|
||||
))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
@ -185,7 +197,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
|
|||
user_id(str): the user_id to query
|
||||
"""
|
||||
if self.hs.config.limit_usage_by_mau:
|
||||
last_seen_timestamp = yield self._user_last_seen_monthly_active(user_id)
|
||||
last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)
|
||||
now = self.hs.get_clock().time_msec()
|
||||
|
||||
# We want to reduce to the total number of db writes, and are happy
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ from twisted.internet import defer
|
|||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches import intern_string
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.stringutils import to_ascii
|
||||
|
|
|
|||
|
|
@ -188,62 +188,30 @@ class Linearizer(object):
|
|||
# things blocked from executing.
|
||||
self.key_to_defer = {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def queue(self, key):
|
||||
# we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
|
||||
# (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
|
||||
# propagated inside inlineCallbacks until Twisted 18.7)
|
||||
entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
|
||||
|
||||
# If the number of things executing is greater than the maximum
|
||||
# then add a deferred to the list of blocked items
|
||||
# When on of the things currently executing finishes it will callback
|
||||
# When one of the things currently executing finishes it will callback
|
||||
# this item so that it can continue executing.
|
||||
if entry[0] >= self.max_count:
|
||||
new_defer = defer.Deferred()
|
||||
entry[1][new_defer] = 1
|
||||
|
||||
logger.info(
|
||||
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
|
||||
)
|
||||
try:
|
||||
yield make_deferred_yieldable(new_defer)
|
||||
except Exception as e:
|
||||
if isinstance(e, CancelledError):
|
||||
logger.info(
|
||||
"Cancelling wait for linearizer lock %r for key %r",
|
||||
self.name, key,
|
||||
)
|
||||
else:
|
||||
logger.warn(
|
||||
"Unexpected exception waiting for linearizer lock %r for key %r",
|
||||
self.name, key,
|
||||
)
|
||||
|
||||
# we just have to take ourselves back out of the queue.
|
||||
del entry[1][new_defer]
|
||||
raise
|
||||
|
||||
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
|
||||
entry[0] += 1
|
||||
|
||||
# if the code holding the lock completes synchronously, then it
|
||||
# will recursively run the next claimant on the list. That can
|
||||
# relatively rapidly lead to stack exhaustion. This is essentially
|
||||
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
|
||||
#
|
||||
# In order to break the cycle, we add a cheeky sleep(0) here to
|
||||
# ensure that we fall back to the reactor between each iteration.
|
||||
#
|
||||
# (This needs to happen while we hold the lock, and the context manager's exit
|
||||
# code must be synchronous, so this is the only sensible place.)
|
||||
yield self._clock.sleep(0)
|
||||
|
||||
res = self._await_lock(key)
|
||||
else:
|
||||
logger.info(
|
||||
"Acquired uncontended linearizer lock %r for key %r", self.name, key,
|
||||
)
|
||||
entry[0] += 1
|
||||
res = defer.succeed(None)
|
||||
|
||||
# once we successfully get the lock, we need to return a context manager which
|
||||
# will release the lock.
|
||||
|
||||
@contextmanager
|
||||
def _ctx_manager():
|
||||
def _ctx_manager(_):
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
|
|
@ -264,7 +232,64 @@ class Linearizer(object):
|
|||
# map.
|
||||
del self.key_to_defer[key]
|
||||
|
||||
defer.returnValue(_ctx_manager())
|
||||
res.addCallback(_ctx_manager)
|
||||
return res
|
||||
|
||||
def _await_lock(self, key):
|
||||
"""Helper for queue: adds a deferred to the queue
|
||||
|
||||
Assumes that we've already checked that we've reached the limit of the number
|
||||
of lock-holders we allow. Creates a new deferred which is added to the list, and
|
||||
adds some management around cancellations.
|
||||
|
||||
Returns the deferred, which will callback once we have secured the lock.
|
||||
|
||||
"""
|
||||
entry = self.key_to_defer[key]
|
||||
|
||||
logger.info(
|
||||
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
|
||||
)
|
||||
|
||||
new_defer = make_deferred_yieldable(defer.Deferred())
|
||||
entry[1][new_defer] = 1
|
||||
|
||||
def cb(_r):
|
||||
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
|
||||
entry[0] += 1
|
||||
|
||||
# if the code holding the lock completes synchronously, then it
|
||||
# will recursively run the next claimant on the list. That can
|
||||
# relatively rapidly lead to stack exhaustion. This is essentially
|
||||
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
|
||||
#
|
||||
# In order to break the cycle, we add a cheeky sleep(0) here to
|
||||
# ensure that we fall back to the reactor between each iteration.
|
||||
#
|
||||
# (This needs to happen while we hold the lock, and the context manager's exit
|
||||
# code must be synchronous, so this is the only sensible place.)
|
||||
return self._clock.sleep(0)
|
||||
|
||||
def eb(e):
|
||||
logger.info("defer %r got err %r", new_defer, e)
|
||||
if isinstance(e, CancelledError):
|
||||
logger.info(
|
||||
"Cancelling wait for linearizer lock %r for key %r",
|
||||
self.name, key,
|
||||
)
|
||||
|
||||
else:
|
||||
logger.warn(
|
||||
"Unexpected exception waiting for linearizer lock %r for key %r",
|
||||
self.name, key,
|
||||
)
|
||||
|
||||
# we just have to take ourselves back out of the queue.
|
||||
del entry[1][new_defer]
|
||||
return e
|
||||
|
||||
new_defer.addCallbacks(cb, eb)
|
||||
return new_defer
|
||||
|
||||
|
||||
class ReadWriteLock(object):
|
||||
|
|
@ -25,7 +25,7 @@ from six import itervalues, string_types
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.caches import get_cache_factor_for
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import logging
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
from synapse.util.caches import register_cache
|
||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.async_helpers import ObservableDeferred
|
||||
|
||||
|
||||
class SnapshotCache(object):
|
||||
|
|
|
|||
|
|
@ -526,7 +526,7 @@ _to_ignore = [
|
|||
"synapse.util.logcontext",
|
||||
"synapse.http.server",
|
||||
"synapse.storage._base",
|
||||
"synapse.util.async",
|
||||
"synapse.util.async_helpers",
|
||||
]
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue