Move additional tasks to the background worker (#8458)

This commit is contained in:
Patrick Cloke 2020-10-07 11:27:56 -04:00 committed by GitHub
parent 8dbf62fada
commit e4f72ddc44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 195 additions and 197 deletions

1
changelog.d/8458.feature Normal file
View File

@ -0,0 +1 @@
Allow running background tasks in a separate worker process.

View File

@ -127,6 +127,7 @@ from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer, cache_in_self
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
@ -135,6 +136,7 @@ from synapse.storage.databases.main.monthly_active_users import (
from synapse.storage.databases.main.presence import UserPresenceState
from synapse.storage.databases.main.search import SearchWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.types import ReadReceipt
@ -466,6 +468,7 @@ class GenericWorkerSlavedStore(
SlavedAccountDataStore,
SlavedPusherStore,
CensorEventsStore,
ClientIpWorkerStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
@ -481,6 +484,7 @@ class GenericWorkerSlavedStore(
MediaRepositoryStore,
ServerMetricsStore,
SearchWorkerStore,
TransactionWorkerStore,
BaseSlavedStore,
):
pass

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import math
import resource
@ -19,7 +18,10 @@ import sys
from prometheus_client import Gauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
logger = logging.getLogger("synapse.app.homeserver")
@ -41,6 +43,7 @@ registered_reserved_users_mau_gauge = Gauge(
)
@wrap_as_background_process("phone_stats_home")
async def phone_stats_home(hs, stats, stats_process=_stats_process):
logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time())
@ -143,20 +146,10 @@ def start_phone_stats_home(hs):
(int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
)
def start_phone_stats_home():
return run_as_background_process(
"phone_stats_home", phone_stats_home, hs, stats
)
def generate_user_daily_visit_stats():
return run_as_background_process(
"generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits
)
# Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits
# table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
clock.looping_call(hs.get_datastore().generate_user_daily_visits, 5 * 60 * 1000)
# monthly active user limiting functionality
def reap_monthly_active_users():
@ -167,6 +160,7 @@ def start_phone_stats_home(hs):
clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
reap_monthly_active_users()
@wrap_as_background_process("generate_monthly_active_users")
async def generate_monthly_active_users():
current_mau_count = 0
current_mau_count_by_service = {}
@ -186,19 +180,14 @@ def start_phone_stats_home(hs):
registered_reserved_users_mau_gauge.set(float(len(reserved_users)))
max_mau_gauge.set(float(hs.config.max_mau_value))
def start_generate_monthly_active_users():
return run_as_background_process(
"generate_monthly_active_users", generate_monthly_active_users
)
if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
start_generate_monthly_active_users()
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
generate_monthly_active_users()
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings
if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000, hs, stats)
# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
@ -206,4 +195,4 @@ def start_phone_stats_home(hs):
# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(5 * 60, start_phone_stats_home)
clock.call_later(5 * 60, phone_stats_home, hs, stats)

View File

@ -351,7 +351,63 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
return updated
class ClientIpStore(ClientIpBackgroundUpdateStore):
class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
self.user_ips_max_age = hs.config.user_ips_max_age
if hs.config.run_background_tasks and self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self):
"""Removes entries in user IPs older than the configured period.
"""
if self.user_ips_max_age is None:
# Nothing to do
return
if not await self.db_pool.updates.has_completed_background_update(
"devices_last_seen"
):
# Only start pruning if we have finished populating the devices
# last seen info.
return
# We do a slightly funky SQL delete to ensure we don't try and delete
# too much at once (as the table may be very large from before we
# started pruning).
#
# This works by finding the max last_seen that is less than the given
# time, but has no more than N rows before it, deleting all rows with
# a lesser last_seen time. (We COALESCE so that the sub-SELECT always
# returns exactly one row).
sql = """
DELETE FROM user_ips
WHERE last_seen <= (
SELECT COALESCE(MAX(last_seen), -1)
FROM (
SELECT last_seen FROM user_ips
WHERE last_seen <= ?
ORDER BY last_seen ASC
LIMIT 5000
) AS u
)
"""
timestamp = self.clock.time_msec() - self.user_ips_max_age
def _prune_old_user_ips_txn(txn):
txn.execute(sql, (timestamp,))
await self.db_pool.runInteraction(
"_prune_old_user_ips", _prune_old_user_ips_txn
)
class ClientIpStore(ClientIpWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
self.client_ip_last_seen = Cache(
@ -360,8 +416,6 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
super().__init__(database, db_conn, hs)
self.user_ips_max_age = hs.config.user_ips_max_age
# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}
@ -372,9 +426,6 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
"before", "shutdown", self._update_client_ips_batch
)
if self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
async def insert_client_ip(
self, user_id, access_token, ip, user_agent, device_id, now=None
):
@ -525,49 +576,3 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
}
for (access_token, ip), (user_agent, last_seen) in results.items()
]
@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self):
"""Removes entries in user IPs older than the configured period.
"""
if self.user_ips_max_age is None:
# Nothing to do
return
if not await self.db_pool.updates.has_completed_background_update(
"devices_last_seen"
):
# Only start pruning if we have finished populating the devices
# last seen info.
return
# We do a slightly funky SQL delete to ensure we don't try and delete
# too much at once (as the table may be very large from before we
# started pruning).
#
# This works by finding the max last_seen that is less than the given
# time, but has no more than N rows before it, deleting all rows with
# a lesser last_seen time. (We COALESCE so that the sub-SELECT always
# returns exactly one row).
sql = """
DELETE FROM user_ips
WHERE last_seen <= (
SELECT COALESCE(MAX(last_seen), -1)
FROM (
SELECT last_seen FROM user_ips
WHERE last_seen <= ?
ORDER BY last_seen ASC
LIMIT 5000
) AS u
)
"""
timestamp = self.clock.time_msec() - self.user_ips_max_age
def _prune_old_user_ips_txn(txn):
txn.execute(sql, (timestamp,))
await self.db_pool.runInteraction(
"_prune_old_user_ips", _prune_old_user_ips_txn
)

View File

@ -18,7 +18,7 @@ import time
from typing import Dict
from synapse.metrics import GaugeBucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.event_push_actions import (
@ -57,18 +57,13 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
super().__init__(database, db_conn, hs)
# Read the extrems every 60 minutes
def read_forward_extremities():
# run as a background process to make sure that the database transactions
# have a logcontext to report to
return run_as_background_process(
"read_forward_extremities", self._read_forward_extremities
)
hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
if hs.config.run_background_tasks:
self._clock.looping_call(self._read_forward_extremities, 60 * 60 * 1000)
# Used in _generate_user_daily_visits to keep track of progress
self._last_user_visit_update = self._get_start_of_day()
@wrap_as_background_process("read_forward_extremities")
async def _read_forward_extremities(self):
def fetch(txn):
txn.execute(
@ -274,6 +269,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
return today_start * 1000
@wrap_as_background_process("generate_user_daily_visits")
async def generate_user_daily_visits(self) -> None:
"""
Generates daily visit data for use in cohort/ retention analysis

View File

@ -14,14 +14,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
from synapse.storage.types import Cursor
@ -48,6 +50,21 @@ class RegistrationWorkerStore(SQLBaseStore):
database.engine, find_max_generated_user_id_localpart, "user_id_seq",
)
self._account_validity = hs.config.account_validity
if hs.config.run_background_tasks and self._account_validity.enabled:
self._clock.call_later(
0.0,
run_as_background_process,
"account_validity_set_expiration_dates",
self._set_expiration_date_when_missing,
)
# Create a background job for culling expired 3PID validity tokens
if hs.config.run_background_tasks:
self.clock.looping_call(
self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
)
@cached()
async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
return await self.db_pool.simple_select_one(
@ -778,6 +795,78 @@ class RegistrationWorkerStore(SQLBaseStore):
"delete_threepid_session", delete_threepid_session_txn
)
@wrap_as_background_process("cull_expired_threepid_validation_tokens")
async def cull_expired_threepid_validation_tokens(self) -> None:
"""Remove threepid validation tokens with expiry dates that have passed"""
def cull_expired_threepid_validation_tokens_txn(txn, ts):
sql = """
DELETE FROM threepid_validation_token WHERE
expires < ?
"""
txn.execute(sql, (ts,))
await self.db_pool.runInteraction(
"cull_expired_threepid_validation_tokens",
cull_expired_threepid_validation_tokens_txn,
self.clock.time_msec(),
)
async def _set_expiration_date_when_missing(self):
"""
Retrieves the list of registered users that don't have an expiration date, and
adds an expiration date for each of them.
"""
def select_users_with_no_expiration_date_txn(txn):
"""Retrieves the list of registered users with no expiration date from the
database, filtering out deactivated users.
"""
sql = (
"SELECT users.name FROM users"
" LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
" WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
)
txn.execute(sql, [])
res = self.db_pool.cursor_to_dict(txn)
if res:
for user in res:
self.set_expiration_date_for_user_txn(
txn, user["name"], use_delta=True
)
await self.db_pool.runInteraction(
"get_users_with_no_expiration_date",
select_users_with_no_expiration_date_txn,
)
def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False):
"""Sets an expiration date to the account with the given user ID.
Args:
user_id (str): User ID to set an expiration date for.
use_delta (bool): If set to False, the expiration date for the user will be
now + validity period. If set to True, this expiration date will be a
random value in the [now + period - d ; now + period] range, d being a
delta equal to 10% of the validity period.
"""
now_ms = self._clock.time_msec()
expiration_ts = now_ms + self._account_validity.period
if use_delta:
expiration_ts = self.rand.randrange(
expiration_ts - self._account_validity.startup_job_max_delta,
expiration_ts,
)
self.db_pool.simple_upsert_txn(
txn,
"account_validity",
keyvalues={"user_id": user_id},
values={"expiration_ts_ms": expiration_ts, "email_sent": False},
)
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
@ -911,28 +1000,8 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
self._account_validity = hs.config.account_validity
self._ignore_unknown_session_error = hs.config.request_token_inhibit_3pid_errors
if self._account_validity.enabled:
self._clock.call_later(
0.0,
run_as_background_process,
"account_validity_set_expiration_dates",
self._set_expiration_date_when_missing,
)
# Create a background job for culling expired 3PID validity tokens
def start_cull():
# run as a background process to make sure that the database transactions
# have a logcontext to report to
return run_as_background_process(
"cull_expired_threepid_validation_tokens",
self.cull_expired_threepid_validation_tokens,
)
hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
async def add_access_token_to_user(
self,
user_id: str,
@ -1477,22 +1546,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
start_or_continue_validation_session_txn,
)
async def cull_expired_threepid_validation_tokens(self) -> None:
"""Remove threepid validation tokens with expiry dates that have passed"""
def cull_expired_threepid_validation_tokens_txn(txn, ts):
sql = """
DELETE FROM threepid_validation_token WHERE
expires < ?
"""
txn.execute(sql, (ts,))
await self.db_pool.runInteraction(
"cull_expired_threepid_validation_tokens",
cull_expired_threepid_validation_tokens_txn,
self.clock.time_msec(),
)
async def set_user_deactivated_status(
self, user_id: str, deactivated: bool
) -> None:
@ -1522,61 +1575,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
)
txn.call_after(self.is_guest.invalidate, (user_id,))
async def _set_expiration_date_when_missing(self):
"""
Retrieves the list of registered users that don't have an expiration date, and
adds an expiration date for each of them.
"""
def select_users_with_no_expiration_date_txn(txn):
"""Retrieves the list of registered users with no expiration date from the
database, filtering out deactivated users.
"""
sql = (
"SELECT users.name FROM users"
" LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
" WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
)
txn.execute(sql, [])
res = self.db_pool.cursor_to_dict(txn)
if res:
for user in res:
self.set_expiration_date_for_user_txn(
txn, user["name"], use_delta=True
)
await self.db_pool.runInteraction(
"get_users_with_no_expiration_date",
select_users_with_no_expiration_date_txn,
)
def set_expiration_date_for_user_txn(self, txn, user_id, use_delta=False):
"""Sets an expiration date to the account with the given user ID.
Args:
user_id (str): User ID to set an expiration date for.
use_delta (bool): If set to False, the expiration date for the user will be
now + validity period. If set to True, this expiration date will be a
random value in the [now + period - d ; now + period] range, d being a
delta equal to 10% of the validity period.
"""
now_ms = self._clock.time_msec()
expiration_ts = now_ms + self._account_validity.period
if use_delta:
expiration_ts = self.rand.randrange(
expiration_ts - self._account_validity.startup_job_max_delta,
expiration_ts,
)
self.db_pool.simple_upsert_txn(
txn,
"account_validity",
keyvalues={"user_id": user_id},
values={"expiration_ts_ms": expiration_ts, "email_sent": False},
)
def find_max_generated_user_id_localpart(cur: Cursor) -> int:
"""

View File

@ -61,7 +61,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
self._check_safe_current_state_events_membership_updated_txn(txn)
txn.close()
if self.hs.config.metrics_flags.known_servers:
if (
self.hs.config.run_background_tasks
and self.hs.config.metrics_flags.known_servers
):
self._known_servers_count = 1
self.hs.get_clock().looping_call(
run_as_background_process,

View File

@ -19,7 +19,7 @@ from typing import Iterable, List, Optional, Tuple
from canonicaljson import encode_canonical_json
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
@ -43,15 +43,33 @@ _UpdateTransactionRow = namedtuple(
SENTINEL = object()
class TransactionStore(SQLBaseStore):
class TransactionWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
if hs.config.run_background_tasks:
self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
@wrap_as_background_process("cleanup_transactions")
async def _cleanup_transactions(self) -> None:
now = self._clock.time_msec()
month_ago = now - 30 * 24 * 60 * 60 * 1000
def _cleanup_transactions_txn(txn):
txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
await self.db_pool.runInteraction(
"_cleanup_transactions", _cleanup_transactions_txn
)
class TransactionStore(TransactionWorkerStore):
"""A collection of queries for handling PDUs.
"""
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
self._destination_retry_cache = ExpiringCache(
cache_name="get_destination_retry_timings",
clock=self._clock,
@ -266,22 +284,6 @@ class TransactionStore(SQLBaseStore):
},
)
def _start_cleanup_transactions(self):
return run_as_background_process(
"cleanup_transactions", self._cleanup_transactions
)
async def _cleanup_transactions(self) -> None:
now = self._clock.time_msec()
month_ago = now - 30 * 24 * 60 * 60 * 1000
def _cleanup_transactions_txn(txn):
txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
await self.db_pool.runInteraction(
"_cleanup_transactions", _cleanup_transactions_txn
)
async def store_destination_rooms_entries(
self, destinations: Iterable[str], room_id: str, stream_ordering: int,
) -> None: