Merge branch 'develop' into babolivier/msc2326_bg_update

This commit is contained in:
Brendan Abolivier 2019-11-04 18:09:50 +00:00
commit e252ffadbc
10 changed files with 256 additions and 93 deletions

1
changelog.d/6305.misc Normal file
View File

@ -0,0 +1 @@
Add some documentation about worker replication.

1
changelog.d/6318.misc Normal file
View File

@ -0,0 +1 @@
Remove the dependency on psutil and replace functionality with the stdlib `resource` module.

View File

@ -199,7 +199,20 @@ client (C):
#### REPLICATE (C) #### REPLICATE (C)
Asks the server to replicate a given stream Asks the server to replicate a given stream. The syntax is:
```
REPLICATE <stream_name> <token>
```
Where `<token>` may be either:
* a numeric stream_id to stream updates since (exclusive)
* `NOW` to stream all subsequent updates.
The `<stream_name>` is the name of a replication stream to subscribe
to (see [here](../synapse/replication/tcp/streams/_base.py) for a list
of streams). It can also be `ALL` to subscribe to all known streams,
in which case the `<token>` must be set to `NOW`.
#### USER_SYNC (C) #### USER_SYNC (C)

View File

@ -19,12 +19,13 @@ from __future__ import print_function
import gc import gc
import logging import logging
import math
import os import os
import resource
import sys import sys
from six import iteritems from six import iteritems
import psutil
from prometheus_client import Gauge from prometheus_client import Gauge
from twisted.application import service from twisted.application import service
@ -471,6 +472,87 @@ class SynapseService(service.Service):
return self._port.stopListening() return self._port.stopListening()
# Contains the list of processes we will be monitoring
# currently either 0 or 1
_stats_process = []
@defer.inlineCallbacks
def phone_stats_home(hs, stats, stats_process=_stats_process):
logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time())
uptime = int(now - hs.start_time)
if uptime < 0:
uptime = 0
stats["homeserver"] = hs.config.server_name
stats["server_context"] = hs.config.server_context
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
version = sys.version_info
stats["python_version"] = "{}.{}.{}".format(
version.major, version.minor, version.micro
)
stats["total_users"] = yield hs.get_datastore().count_all_users()
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
for name, count in iteritems(daily_user_type_results):
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count
stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users()
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
for name, count in iteritems(r30_results):
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages
stats["cache_factor"] = CACHE_SIZE_FACTOR
stats["event_cache_size"] = hs.config.event_cache_size
#
# Performance statistics
#
old = stats_process[0]
new = (now, resource.getrusage(resource.RUSAGE_SELF))
stats_process[0] = new
# Get RSS in bytes
stats["memory_rss"] = new[1].ru_maxrss
# Get CPU time in % of a single core, not % of all cores
used_cpu_time = (new[1].ru_utime + new[1].ru_stime) - (
old[1].ru_utime + old[1].ru_stime
)
if used_cpu_time == 0 or new[0] == old[0]:
stats["cpu_average"] = 0
else:
stats["cpu_average"] = math.floor(used_cpu_time / (new[0] - old[0]) * 100)
#
# Database version
#
stats["database_engine"] = hs.get_datastore().database_engine_name
stats["database_server_version"] = hs.get_datastore().get_server_version()
logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
try:
yield hs.get_proxied_http_client().put_json(
hs.config.report_stats_endpoint, stats
)
except Exception as e:
logger.warning("Error reporting stats: %s", e)
def run(hs): def run(hs):
PROFILE_SYNAPSE = False PROFILE_SYNAPSE = False
if PROFILE_SYNAPSE: if PROFILE_SYNAPSE:
@ -497,91 +579,19 @@ def run(hs):
reactor.run = profile(reactor.run) reactor.run = profile(reactor.run)
clock = hs.get_clock() clock = hs.get_clock()
start_time = clock.time()
stats = {} stats = {}
# Contains the list of processes we will be monitoring def performance_stats_init():
# currently either 0 or 1 _stats_process.clear()
stats_process = [] _stats_process.append(
(int(hs.get_clock().time(), resource.getrusage(resource.RUSAGE_SELF)))
)
def start_phone_stats_home(): def start_phone_stats_home():
return run_as_background_process("phone_stats_home", phone_stats_home) return run_as_background_process(
"phone_stats_home", phone_stats_home, hs, stats
@defer.inlineCallbacks
def phone_stats_home():
logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time())
uptime = int(now - start_time)
if uptime < 0:
uptime = 0
stats["homeserver"] = hs.config.server_name
stats["server_context"] = hs.config.server_context
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
version = sys.version_info
stats["python_version"] = "{}.{}.{}".format(
version.major, version.minor, version.micro
) )
stats["total_users"] = yield hs.get_datastore().count_all_users()
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
for name, count in iteritems(daily_user_type_results):
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count
stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users()
stats[
"daily_active_rooms"
] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
for name, count in iteritems(r30_results):
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages
stats["cache_factor"] = CACHE_SIZE_FACTOR
stats["event_cache_size"] = hs.config.event_cache_size
if len(stats_process) > 0:
stats["memory_rss"] = 0
stats["cpu_average"] = 0
for process in stats_process:
stats["memory_rss"] += process.memory_info().rss
stats["cpu_average"] += int(process.cpu_percent(interval=None))
stats["database_engine"] = hs.get_datastore().database_engine_name
stats["database_server_version"] = hs.get_datastore().get_server_version()
logger.info(
"Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats)
)
try:
yield hs.get_proxied_http_client().put_json(
hs.config.report_stats_endpoint, stats
)
except Exception as e:
logger.warning("Error reporting stats: %s", e)
def performance_stats_init():
try:
process = psutil.Process()
# Ensure we can fetch both, and make the initial request for cpu_percent
# so the next request will use this as the initial point.
process.memory_info().rss
process.cpu_percent(interval=None)
logger.info("report_stats can use psutil")
stats_process.append(process)
except (AttributeError):
logger.warning("Unable to read memory/cpu stats. Disabling reporting.")
def generate_user_daily_visit_stats(): def generate_user_daily_visit_stats():
return run_as_background_process( return run_as_background_process(
@ -626,7 +636,7 @@ def run(hs):
if hs.config.report_stats: if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals") logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000) clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)
# We need to defer this init for the cases that we daemonize # We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process # otherwise the process ID we get is that of the non-daemon process
@ -634,7 +644,7 @@ def run(hs):
# We wait 5 minutes to send the first set of stats as the server can # We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes # be quite busy the first few minutes
clock.call_later(5 * 60, start_phone_stats_home) clock.call_later(5 * 60, start_phone_stats_home, hs, stats)
_base.start_reactor( _base.start_reactor(
"synapse-homeserver", "synapse-homeserver",

View File

@ -61,7 +61,6 @@ REQUIREMENTS = [
"bcrypt>=3.1.0", "bcrypt>=3.1.0",
"pillow>=4.3.0", "pillow>=4.3.0",
"sortedcontainers>=1.4.4", "sortedcontainers>=1.4.4",
"psutil>=2.0.0",
"pymacaroons>=0.13.0", "pymacaroons>=0.13.0",
"msgpack>=0.5.2", "msgpack>=0.5.2",
"phonenumbers>=8.2.0", "phonenumbers>=8.2.0",

View File

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
from typing import Dict
import six import six
@ -44,7 +45,14 @@ class BaseSlavedStore(SQLBaseStore):
self.hs = hs self.hs = hs
def stream_positions(self): def stream_positions(self) -> Dict[str, int]:
"""
Get the current positions of all the streams this store wants to subscribe to
Returns:
map from stream name to the most recent update we have for
that stream (ie, the point we want to start replicating from)
"""
pos = {} pos = {}
if self._cache_id_gen: if self._cache_id_gen:
pos["caches"] = self._cache_id_gen.get_current_token() pos["caches"] = self._cache_id_gen.get_current_token()

View File

@ -16,10 +16,17 @@
""" """
import logging import logging
from typing import Dict
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory from twisted.internet.protocol import ReconnectingClientFactory
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.tcp.protocol import (
AbstractReplicationClientHandler,
ClientReplicationStreamProtocol,
)
from .commands import ( from .commands import (
FederationAckCommand, FederationAckCommand,
InvalidateCacheCommand, InvalidateCacheCommand,
@ -27,7 +34,6 @@ from .commands import (
UserIpCommand, UserIpCommand,
UserSyncCommand, UserSyncCommand,
) )
from .protocol import ClientReplicationStreamProtocol
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -42,7 +48,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
maxDelay = 30 # Try at least once every N seconds maxDelay = 30 # Try at least once every N seconds
def __init__(self, hs, client_name, handler): def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler):
self.client_name = client_name self.client_name = client_name
self.handler = handler self.handler = handler
self.server_name = hs.config.server_name self.server_name = hs.config.server_name
@ -68,13 +74,13 @@ class ReplicationClientFactory(ReconnectingClientFactory):
ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
class ReplicationClientHandler(object): class ReplicationClientHandler(AbstractReplicationClientHandler):
"""A base handler that can be passed to the ReplicationClientFactory. """A base handler that can be passed to the ReplicationClientFactory.
By default proxies incoming replication data to the SlaveStore. By default proxies incoming replication data to the SlaveStore.
""" """
def __init__(self, store): def __init__(self, store: BaseSlavedStore):
self.store = store self.store = store
# The current connection. None if we are currently (re)connecting # The current connection. None if we are currently (re)connecting
@ -138,11 +144,13 @@ class ReplicationClientHandler(object):
if d: if d:
d.callback(data) d.callback(data)
def get_streams_to_replicate(self): def get_streams_to_replicate(self) -> Dict[str, int]:
"""Called when a new connection has been established and we need to """Called when a new connection has been established and we need to
subscribe to streams. subscribe to streams.
Returns a dictionary of stream name to token. Returns:
map from stream name to the most recent update we have for
that stream (ie, the point we want to start replicating from)
""" """
args = self.store.stream_positions() args = self.store.stream_positions()
user_account_data = args.pop("user_account_data", None) user_account_data = args.pop("user_account_data", None)

View File

@ -48,7 +48,7 @@ indicate which side is sending, these are *not* included on the wire::
> ERROR server stopping > ERROR server stopping
* connection closed by server * * connection closed by server *
""" """
import abc
import fcntl import fcntl
import logging import logging
import struct import struct
@ -65,6 +65,7 @@ from twisted.python.failure import Failure
from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import LaterGauge from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
from .commands import ( from .commands import (
@ -558,11 +559,80 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.streamer.lost_connection(self) self.streamer.lost_connection(self)
class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
"""
The interface for the handler that should be passed to
ClientReplicationStreamProtocol
"""
@abc.abstractmethod
def on_rdata(self, stream_name, token, rows):
"""Called to handle a batch of replication data with a given stream token.
Args:
stream_name (str): name of the replication stream for this batch of rows
token (int): stream token for this batch of rows
rows (list): a list of Stream.ROW_TYPE objects as returned by
Stream.parse_row.
Returns:
Deferred|None
"""
raise NotImplementedError()
@abc.abstractmethod
def on_position(self, stream_name, token):
"""Called when we get new position data."""
raise NotImplementedError()
@abc.abstractmethod
def on_sync(self, data):
"""Called when get a new SYNC command."""
raise NotImplementedError()
@abc.abstractmethod
def get_streams_to_replicate(self):
"""Called when a new connection has been established and we need to
subscribe to streams.
Returns:
map from stream name to the most recent update we have for
that stream (ie, the point we want to start replicating from)
"""
raise NotImplementedError()
@abc.abstractmethod
def get_currently_syncing_users(self):
"""Get the list of currently syncing users (if any). This is called
when a connection has been established and we need to send the
currently syncing users."""
raise NotImplementedError()
@abc.abstractmethod
def update_connection(self, connection):
"""Called when a connection has been established (or lost with None).
"""
raise NotImplementedError()
@abc.abstractmethod
def finished_connecting(self):
"""Called when we have successfully subscribed and caught up to all
streams we're interested in.
"""
raise NotImplementedError()
class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS
VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS
def __init__(self, client_name, server_name, clock, handler): def __init__(
self,
client_name: str,
server_name: str,
clock: Clock,
handler: AbstractReplicationClientHandler,
):
BaseReplicationStreamProtocol.__init__(self, clock) BaseReplicationStreamProtocol.__init__(self, clock)
self.client_name = client_name self.client_name = client_name

View File

@ -221,6 +221,7 @@ class HomeServer(object):
self.hostname = hostname self.hostname = hostname
self._building = {} self._building = {}
self._listening_services = [] self._listening_services = []
self.start_time = None
self.clock = Clock(reactor) self.clock = Clock(reactor)
self.distributor = Distributor() self.distributor = Distributor()
@ -240,6 +241,7 @@ class HomeServer(object):
datastore = self.DATASTORE_CLASS(conn, self) datastore = self.DATASTORE_CLASS(conn, self)
self.datastores = DataStores(datastore, conn, self) self.datastores = DataStores(datastore, conn, self)
conn.commit() conn.commit()
self.start_time = int(self.get_clock().time())
logger.info("Finished setting up.") logger.info("Finished setting up.")
def setup_master(self): def setup_master(self):

51
tests/test_phone_home.py Normal file
View File

@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import resource
import mock
from synapse.app.homeserver import phone_stats_home
from tests.unittest import HomeserverTestCase
class PhoneHomeStatsTestCase(HomeserverTestCase):
def test_performance_frozen_clock(self):
"""
If time doesn't move, don't error out.
"""
past_stats = [
(self.hs.get_clock().time(), resource.getrusage(resource.RUSAGE_SELF))
]
stats = {}
self.get_success(phone_stats_home(self.hs, stats, past_stats))
self.assertEqual(stats["cpu_average"], 0)
def test_performance_100(self):
"""
1 second of usage over 1 second is 100% CPU usage.
"""
real_res = resource.getrusage(resource.RUSAGE_SELF)
old_resource = mock.Mock(spec=real_res)
old_resource.ru_utime = real_res.ru_utime - 1
old_resource.ru_stime = real_res.ru_stime
old_resource.ru_maxrss = real_res.ru_maxrss
past_stats = [(self.hs.get_clock().time(), old_resource)]
stats = {}
self.reactor.advance(1)
self.get_success(phone_stats_home(self.hs, stats, past_stats))
self.assertApproximates(stats["cpu_average"], 100, tolerance=2.5)