mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Run things as background processes
This fixes #3518, and ensures that we get useful logs and metrics for lots of things that happen in the background. (There are certainly more things that happen in the background; these are just the common ones I've found running a single-process synapse locally).
This commit is contained in:
parent
65d6a0e477
commit
667fba68f3
@ -168,7 +168,7 @@ class TransactionQueue(object):
|
|||||||
|
|
||||||
# fire off a processing loop in the background
|
# fire off a processing loop in the background
|
||||||
run_as_background_process(
|
run_as_background_process(
|
||||||
"process_transaction_queue",
|
"process_event_queue_for_federation",
|
||||||
self._process_event_queue_loop,
|
self._process_event_queue_loop,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -434,14 +434,11 @@ class TransactionQueue(object):
|
|||||||
|
|
||||||
logger.debug("TX [%s] Starting transaction loop", destination)
|
logger.debug("TX [%s] Starting transaction loop", destination)
|
||||||
|
|
||||||
# Drop the logcontext before starting the transaction. It doesn't
|
run_as_background_process(
|
||||||
# really make sense to log all the outbound transactions against
|
"federation_transaction_transmission_loop",
|
||||||
# whatever path led us to this point: that's pretty arbitrary really.
|
self._transaction_transmission_loop,
|
||||||
#
|
destination,
|
||||||
# (this also means we can fire off _perform_transaction without
|
)
|
||||||
# yielding)
|
|
||||||
with logcontext.PreserveLoggingContext():
|
|
||||||
self._transaction_transmission_loop(destination)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _transaction_transmission_loop(self, destination):
|
def _transaction_transmission_loop(self, destination):
|
||||||
|
@ -19,6 +19,8 @@ from canonicaljson import json
|
|||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
|
|
||||||
from . import engines
|
from . import engines
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
|
|
||||||
@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||||||
self._background_update_handlers = {}
|
self._background_update_handlers = {}
|
||||||
self._all_done = False
|
self._all_done = False
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def start_doing_background_updates(self):
|
def start_doing_background_updates(self):
|
||||||
logger.info("Starting background schema updates")
|
run_as_background_process(
|
||||||
|
"background_updates", self._run_background_updates,
|
||||||
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _run_background_updates(self):
|
||||||
|
logger.info("Starting background schema updates")
|
||||||
while True:
|
while True:
|
||||||
yield self.hs.get_clock().sleep(
|
yield self.hs.get_clock().sleep(
|
||||||
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
|
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
|
||||||
|
@ -19,6 +19,7 @@ from six import iteritems
|
|||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||||
|
|
||||||
from . import background_updates
|
from . import background_updates
|
||||||
@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||||||
self._batch_row_update[key] = (user_agent, device_id, now)
|
self._batch_row_update[key] = (user_agent, device_id, now)
|
||||||
|
|
||||||
def _update_client_ips_batch(self):
|
def _update_client_ips_batch(self):
|
||||||
|
def update():
|
||||||
to_update = self._batch_row_update
|
to_update = self._batch_row_update
|
||||||
self._batch_row_update = {}
|
self._batch_row_update = {}
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
|
"_update_client_ips_batch", self._update_client_ips_batch_txn,
|
||||||
|
to_update,
|
||||||
|
)
|
||||||
|
|
||||||
|
run_as_background_process(
|
||||||
|
"update_client_ips", update,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _update_client_ips_batch_txn(self, txn, to_update):
|
def _update_client_ips_batch_txn(self, txn, to_update):
|
||||||
|
@ -33,12 +33,13 @@ from synapse.api.errors import SynapseError
|
|||||||
# these are only included to make the type annotations work
|
# these are only included to make the type annotations work
|
||||||
from synapse.events import EventBase # noqa: F401
|
from synapse.events import EventBase # noqa: F401
|
||||||
from synapse.events.snapshot import EventContext # noqa: F401
|
from synapse.events.snapshot import EventContext # noqa: F401
|
||||||
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage.events_worker import EventsWorkerStore
|
from synapse.storage.events_worker import EventsWorkerStore
|
||||||
from synapse.types import RoomStreamToken, get_domain_from_id
|
from synapse.types import RoomStreamToken, get_domain_from_id
|
||||||
from synapse.util.async import ObservableDeferred
|
from synapse.util.async import ObservableDeferred
|
||||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||||
from synapse.util.frozenutils import frozendict_json_encoder
|
from synapse.util.frozenutils import frozendict_json_encoder
|
||||||
from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
|
from synapse.util.logcontext import make_deferred_yieldable
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
@ -155,11 +156,8 @@ class _EventPeristenceQueue(object):
|
|||||||
self._event_persist_queues[room_id] = queue
|
self._event_persist_queues[room_id] = queue
|
||||||
self._currently_persisting_rooms.discard(room_id)
|
self._currently_persisting_rooms.discard(room_id)
|
||||||
|
|
||||||
# set handle_queue_loop off on the background. We don't want to
|
# set handle_queue_loop off in the background
|
||||||
# attribute work done in it to the current request, so we drop the
|
run_as_background_process("persist_events", handle_queue_loop)
|
||||||
# logcontext altogether.
|
|
||||||
with PreserveLoggingContext():
|
|
||||||
handle_queue_loop()
|
|
||||||
|
|
||||||
def _get_drainining_queue(self, room_id):
|
def _get_drainining_queue(self, room_id):
|
||||||
queue = self._event_persist_queues.setdefault(room_id, deque())
|
queue = self._event_persist_queues.setdefault(room_id, deque())
|
||||||
|
@ -25,6 +25,7 @@ from synapse.events import EventBase # noqa: F401
|
|||||||
from synapse.events import FrozenEvent
|
from synapse.events import FrozenEvent
|
||||||
from synapse.events.snapshot import EventContext # noqa: F401
|
from synapse.events.snapshot import EventContext # noqa: F401
|
||||||
from synapse.events.utils import prune_event
|
from synapse.events.utils import prune_event
|
||||||
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.util.logcontext import (
|
from synapse.util.logcontext import (
|
||||||
LoggingContext,
|
LoggingContext,
|
||||||
PreserveLoggingContext,
|
PreserveLoggingContext,
|
||||||
@ -322,9 +323,10 @@ class EventsWorkerStore(SQLBaseStore):
|
|||||||
should_start = False
|
should_start = False
|
||||||
|
|
||||||
if should_start:
|
if should_start:
|
||||||
with PreserveLoggingContext():
|
run_as_background_process(
|
||||||
self.runWithConnection(
|
"fetch_events",
|
||||||
self._do_fetch
|
self.runWithConnection,
|
||||||
|
self._do_fetch,
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("Loading %d events", len(events))
|
logger.debug("Loading %d events", len(events))
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
import logging
|
import logging
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.util.caches import register_cache
|
from synapse.util.caches import register_cache
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -63,7 +64,10 @@ class ExpiringCache(object):
|
|||||||
return
|
return
|
||||||
|
|
||||||
def f():
|
def f():
|
||||||
self._prune_cache()
|
run_as_background_process(
|
||||||
|
"prune_cache_%s" % self._cache_name,
|
||||||
|
self._prune_cache,
|
||||||
|
)
|
||||||
|
|
||||||
self._clock.looping_call(f, self._expiry_ms / 2)
|
self._clock.looping_call(f, self._expiry_ms / 2)
|
||||||
|
|
||||||
|
@ -75,6 +75,10 @@ class Distributor(object):
|
|||||||
self.pre_registration[name].append(observer)
|
self.pre_registration[name].append(observer)
|
||||||
|
|
||||||
def fire(self, name, *args, **kwargs):
|
def fire(self, name, *args, **kwargs):
|
||||||
|
"""Dispatches the given signal to the registered observers.
|
||||||
|
|
||||||
|
Runs the observers as a background process. Does not return a deferred.
|
||||||
|
"""
|
||||||
if name not in self.signals:
|
if name not in self.signals:
|
||||||
raise KeyError("%r does not have a signal named %s" % (self, name))
|
raise KeyError("%r does not have a signal named %s" % (self, name))
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user