Merge pull request #3556 from matrix-org/rav/background_processes

Run things as background processes
Richard van der Hoff, 2018-07-19 11:04:18 +01:00, committed by GitHub
commit c754e006f4
9 changed files with 67 additions and 104 deletions

changelog.d/3556.feature (new file)

@@ -0,0 +1 @@
+Add metrics to track resource usage by background processes
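
Every file in this commit follows the same recipe: work that was previously fired off under PreserveLoggingContext (detaching it from the calling request's logcontext) is now handed to run_as_background_process, which runs it under a stable name so its resource usage can be metered. The real helper lives in synapse.metrics.background_process_metrics; what follows is only a rough sketch of the fire-and-forget contract the call sites below rely on, with the metrics bookkeeping omitted:

    import logging

    from twisted.internet import defer

    logger = logging.getLogger(__name__)


    def run_as_background_process(desc, func, *args, **kwargs):
        # Sketch only: start func without waiting for it. The real helper
        # also records per-process CPU and DB usage against `desc`.
        d = defer.maybeDeferred(func, *args, **kwargs)

        # Log failures rather than propagating them, so a crashing
        # background process cannot take down its caller.
        d.addErrback(
            lambda failure: logger.error(
                "Background process %r failed: %s", desc, failure,
            )
        )

Note that the wrapper returns nothing: callers cannot yield on the result, which is exactly the property the hunks below exploit.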

synapse/federation/transaction_queue.py

@@ -168,7 +168,7 @@ class TransactionQueue(object):
         # fire off a processing loop in the background
         run_as_background_process(
-            "process_transaction_queue",
+            "process_event_queue_for_federation",
             self._process_event_queue_loop,
         )

@@ -434,14 +434,11 @@ class TransactionQueue(object):
         logger.debug("TX [%s] Starting transaction loop", destination)

-        # Drop the logcontext before starting the transaction. It doesn't
-        # really make sense to log all the outbound transactions against
-        # whatever path led us to this point: that's pretty arbitrary really.
-        #
-        # (this also means we can fire off _perform_transaction without
-        # yielding)
-        with logcontext.PreserveLoggingContext():
-            self._transaction_transmission_loop(destination)
+        run_as_background_process(
+            "federation_transaction_transmission_loop",
+            self._transaction_transmission_loop,
+            destination,
+        )

     @defer.inlineCallbacks
     def _transaction_transmission_loop(self, destination):

synapse/storage/background_updates.py

@@ -19,6 +19,8 @@ from canonicaljson import json
 from twisted.internet import defer

+from synapse.metrics.background_process_metrics import run_as_background_process
+
 from . import engines
 from ._base import SQLBaseStore

@@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_handlers = {}
         self._all_done = False

-    @defer.inlineCallbacks
     def start_doing_background_updates(self):
-        logger.info("Starting background schema updates")
+        run_as_background_process(
+            "background_updates", self._run_background_updates,
+        )

+    @defer.inlineCallbacks
+    def _run_background_updates(self):
+        logger.info("Starting background schema updates")
         while True:
             yield self.hs.get_clock().sleep(
                 self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)

synapse/storage/client_ips.py

@@ -19,6 +19,7 @@ from six import iteritems
 from twisted.internet import defer

+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import CACHE_SIZE_FACTOR

 from . import background_updates

@@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         self._batch_row_update[key] = (user_agent, device_id, now)

     def _update_client_ips_batch(self):
-        to_update = self._batch_row_update
-        self._batch_row_update = {}
-        return self.runInteraction(
-            "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+        def update():
+            to_update = self._batch_row_update
+            self._batch_row_update = {}
+            return self.runInteraction(
+                "_update_client_ips_batch", self._update_client_ips_batch_txn,
+                to_update,
+            )
+
+        run_as_background_process(
+            "update_client_ips", update,
         )

     def _update_client_ips_batch_txn(self, txn, to_update):

synapse/storage/events.py

@@ -33,12 +33,13 @@ from synapse.api.errors import SynapseError
 # these are only included to make the type annotations work
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
+from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure

@@ -155,11 +156,8 @@ class _EventPeristenceQueue(object):
                     self._event_persist_queues[room_id] = queue
                 self._currently_persisting_rooms.discard(room_id)

-        # set handle_queue_loop off on the background. We don't want to
-        # attribute work done in it to the current request, so we drop the
-        # logcontext altogether.
-        with PreserveLoggingContext():
-            handle_queue_loop()
+        # set handle_queue_loop off in the background
+        run_as_background_process("persist_events", handle_queue_loop)

     def _get_drainining_queue(self, room_id):
         queue = self._event_persist_queues.setdefault(room_id, deque())

synapse/storage/events_worker.py

@@ -25,6 +25,7 @@ from synapse.events import EventBase  # noqa: F401
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.logcontext import (
     LoggingContext,
     PreserveLoggingContext,

@@ -322,9 +323,10 @@ class EventsWorkerStore(SQLBaseStore):
                 should_start = False

         if should_start:
-            with PreserveLoggingContext():
-                self.runWithConnection(
-                    self._do_fetch
-                )
+            run_as_background_process(
+                "fetch_events",
+                self.runWithConnection,
+                self._do_fetch,
+            )

         logger.debug("Loading %d events", len(events))

synapse/util/caches/expiringcache.py

@@ -16,6 +16,7 @@
 import logging
 from collections import OrderedDict

+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import register_cache

 logger = logging.getLogger(__name__)

@@ -63,7 +64,10 @@ class ExpiringCache(object):
             return

         def f():
-            self._prune_cache()
+            run_as_background_process(
+                "prune_cache_%s" % self._cache_name,
+                self._prune_cache,
+            )

         self._clock.looping_call(f, self._expiry_ms / 2)
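
For periodic jobs like this prune, the looping call invokes a plain function, which hands the real work to run_as_background_process so that each run is attributed to prune_cache_<name>. A sketch of the same shape against a bare Twisted LoopingCall (schedule_prune and its parameters are illustrative, not Synapse API):

    from twisted.internet.task import LoopingCall


    def schedule_prune(cache_name, prune_fn, expiry_ms):
        # Hand each run to the metered fire-and-forget wrapper, matching
        # the diff above; prune twice per expiry interval.
        def f():
            run_as_background_process(
                "prune_cache_%s" % cache_name,
                prune_fn,
            )

        # LoopingCall.start takes an interval in seconds
        LoopingCall(f).start(expiry_ms / 2 / 1000.0)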

synapse/util/distributor.py

@@ -17,19 +17,17 @@ import logging

 from twisted.internet import defer

-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background

 logger = logging.getLogger(__name__)


 def user_left_room(distributor, user, room_id):
-    with PreserveLoggingContext():
-        distributor.fire("user_left_room", user=user, room_id=room_id)
+    distributor.fire("user_left_room", user=user, room_id=room_id)


 def user_joined_room(distributor, user, room_id):
-    with PreserveLoggingContext():
-        distributor.fire("user_joined_room", user=user, room_id=room_id)
+    distributor.fire("user_joined_room", user=user, room_id=room_id)

@@ -44,9 +42,7 @@ class Distributor(object):
     model will do for today.
     """

-    def __init__(self, suppress_failures=True):
-        self.suppress_failures = suppress_failures
-
+    def __init__(self):
         self.signals = {}
         self.pre_registration = {}

@@ -56,7 +52,6 @@ class Distributor(object):
         self.signals[name] = Signal(
             name,
-            suppress_failures=self.suppress_failures,
         )

         if name in self.pre_registration:

@@ -75,10 +70,18 @@ class Distributor(object):
             self.pre_registration[name].append(observer)

     def fire(self, name, *args, **kwargs):
+        """Dispatches the given signal to the registered observers.
+
+        Runs the observers as a background process. Does not return a deferred.
+        """
         if name not in self.signals:
             raise KeyError("%r does not have a signal named %s" % (self, name))

-        return self.signals[name].fire(*args, **kwargs)
+        run_as_background_process(
+            name,
+            self.signals[name].fire,
+            *args, **kwargs
+        )


 class Signal(object):

@@ -91,9 +94,8 @@ class Signal(object):
     method into all of the observers.
     """

-    def __init__(self, name, suppress_failures):
+    def __init__(self, name):
         self.name = name
-        self.suppress_failures = suppress_failures
         self.observers = []

     def observe(self, observer):

@@ -103,7 +105,6 @@ class Signal(object):
         Each observer callable may return a Deferred."""
         self.observers.append(observer)

-    @defer.inlineCallbacks
     def fire(self, *args, **kwargs):
         """Invokes every callable in the observer list, passing in the args and
         kwargs. Exceptions thrown by observers are logged but ignored. It is

@@ -121,22 +122,17 @@ class Signal(object):
                         failure.type,
                         failure.value,
                         failure.getTracebackObject()))
-                if not self.suppress_failures:
-                    return failure

             return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)

-        with PreserveLoggingContext():
-            deferreds = [
-                do(observer)
-                for observer in self.observers
-            ]
-
-            res = yield defer.gatherResults(
-                deferreds, consumeErrors=True
-            ).addErrback(unwrapFirstError)
-
-        defer.returnValue(res)
+        deferreds = [
+            run_in_background(do, o)
+            for o in self.observers
+        ]
+
+        return make_deferred_yieldable(defer.gatherResults(
+            deferreds, consumeErrors=True,
+        ))

     def __repr__(self):
         return "<Signal name=%r>" % (self.name,)

tests/test_distributor.py

@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.

@@ -15,8 +16,6 @@
 from mock import Mock, patch

-from twisted.internet import defer
-
 from synapse.util.distributor import Distributor

 from . import unittest

@@ -27,38 +26,15 @@ class DistributorTestCase(unittest.TestCase):
     def setUp(self):
         self.dist = Distributor()

-    @defer.inlineCallbacks
     def test_signal_dispatch(self):
         self.dist.declare("alert")

         observer = Mock()
         self.dist.observe("alert", observer)

-        d = self.dist.fire("alert", 1, 2, 3)
-        yield d
-        self.assertTrue(d.called)
+        self.dist.fire("alert", 1, 2, 3)

         observer.assert_called_with(1, 2, 3)

-    @defer.inlineCallbacks
-    def test_signal_dispatch_deferred(self):
-        self.dist.declare("whine")
-
-        d_inner = defer.Deferred()
-
-        def observer():
-            return d_inner
-
-        self.dist.observe("whine", observer)
-
-        d_outer = self.dist.fire("whine")
-
-        self.assertFalse(d_outer.called)
-
-        d_inner.callback(None)
-        yield d_outer
-        self.assertTrue(d_outer.called)
-
-    @defer.inlineCallbacks
     def test_signal_catch(self):
         self.dist.declare("alarm")

@@ -71,9 +47,7 @@ class DistributorTestCase(unittest.TestCase):
         with patch(
             "synapse.util.distributor.logger", spec=["warning"]
         ) as mock_logger:
-            d = self.dist.fire("alarm", "Go")
-            yield d
-            self.assertTrue(d.called)
+            self.dist.fire("alarm", "Go")

         observers[0].assert_called_once_with("Go")
         observers[1].assert_called_once_with("Go")

@@ -83,34 +57,12 @@ class DistributorTestCase(unittest.TestCase):
             mock_logger.warning.call_args[0][0], str
         )

-    @defer.inlineCallbacks
-    def test_signal_catch_no_suppress(self):
-        # Gut-wrenching
-        self.dist.suppress_failures = False
-
-        self.dist.declare("whail")
-
-        class MyException(Exception):
-            pass
-
-        @defer.inlineCallbacks
-        def observer():
-            raise MyException("Oopsie")
-
-        self.dist.observe("whail", observer)
-
-        d = self.dist.fire("whail")
-        yield self.assertFailure(d, MyException)
-
-        self.dist.suppress_failures = True
-
-    @defer.inlineCallbacks
     def test_signal_prereg(self):
         observer = Mock()
         self.dist.observe("flare", observer)

         self.dist.declare("flare")
-        yield self.dist.fire("flare", 4, 5)
+        self.dist.fire("flare", 4, 5)

         observer.assert_called_with(4, 5)