Fix some looping_call calls which were broken in #3604

It turns out that looping_call does check the deferred returned by its
callback, and (at least in the case of client_ips), we were relying on this,
and I broke it in #3604.

Update run_as_background_process to return the deferred, and make sure we
return it to clock.looping_call.
This commit is contained in:
Richard van der Hoff 2018-07-26 11:44:26 +01:00
parent 1bcd0490c2
commit 03751a6420
13 changed files with 24 additions and 15 deletions

1
changelog.d/3610.feature Normal file
View File

@ -0,0 +1 @@
Add metrics to track resource usage by background processes

View File

@ -429,7 +429,7 @@ def run(hs):
stats_process = [] stats_process = []
def start_phone_stats_home(): def start_phone_stats_home():
run_as_background_process("phone_stats_home", phone_stats_home) return run_as_background_process("phone_stats_home", phone_stats_home)
@defer.inlineCallbacks @defer.inlineCallbacks
def phone_stats_home(): def phone_stats_home():
@ -502,7 +502,7 @@ def run(hs):
) )
def generate_user_daily_visit_stats(): def generate_user_daily_visit_stats():
run_as_background_process( return run_as_background_process(
"generate_user_daily_visits", "generate_user_daily_visits",
hs.get_datastore().generate_user_daily_visits, hs.get_datastore().generate_user_daily_visits,
) )

View File

@ -153,7 +153,7 @@ class GroupAttestionRenewer(object):
defer.returnValue({}) defer.returnValue({})
def _start_renew_attestations(self): def _start_renew_attestations(self):
run_as_background_process("renew_attestations", self._renew_attestations) return run_as_background_process("renew_attestations", self._renew_attestations)
@defer.inlineCallbacks @defer.inlineCallbacks
def _renew_attestations(self): def _renew_attestations(self):

View File

@ -256,7 +256,7 @@ class ProfileHandler(BaseHandler):
) )
def _start_update_remote_profile_cache(self): def _start_update_remote_profile_cache(self):
run_as_background_process( return run_as_background_process(
"Update remote profile", self._update_remote_profile_cache, "Update remote profile", self._update_remote_profile_cache,
) )

View File

@ -151,13 +151,19 @@ def run_as_background_process(desc, func, *args, **kwargs):
This should be used to wrap processes which are fired off to run in the This should be used to wrap processes which are fired off to run in the
background, instead of being associated with a particular request. background, instead of being associated with a particular request.
It returns a Deferred which completes when the function completes, but it doesn't
follow the synapse logcontext rules, which makes it appropriate for passing to
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
normal synapse inlineCallbacks function).
Args: Args:
desc (str): a description for this background process type desc (str): a description for this background process type
func: a function, which may return a Deferred func: a function, which may return a Deferred
args: positional args for func args: positional args for func
kwargs: keyword args for func kwargs: keyword args for func
Returns: None Returns: Deferred which returns the result of func, but note that it does not
follow the synapse logcontext rules.
""" """
@defer.inlineCallbacks @defer.inlineCallbacks
def run(): def run():
@ -176,4 +182,4 @@ def run_as_background_process(desc, func, *args, **kwargs):
_background_processes[desc].remove(proc) _background_processes[desc].remove(proc)
with PreserveLoggingContext(): with PreserveLoggingContext():
run() return run()

View File

@ -106,7 +106,7 @@ class MediaRepository(object):
) )
def _start_update_recently_accessed(self): def _start_update_recently_accessed(self):
run_as_background_process( return run_as_background_process(
"update_recently_accessed_media", self._update_recently_accessed, "update_recently_accessed_media", self._update_recently_accessed,
) )

View File

@ -373,7 +373,7 @@ class PreviewUrlResource(Resource):
}) })
def _start_expire_url_cache_data(self): def _start_expire_url_cache_data(self):
run_as_background_process( return run_as_background_process(
"expire_url_cache_data", self._expire_url_cache_data, "expire_url_cache_data", self._expire_url_cache_data,
) )

View File

@ -102,7 +102,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
to_update, to_update,
) )
run_as_background_process( return run_as_background_process(
"update_client_ips", update, "update_client_ips", update,
) )

View File

@ -712,7 +712,7 @@ class DeviceStore(SQLBaseStore):
logger.info("Pruned %d device list outbound pokes", txn.rowcount) logger.info("Pruned %d device list outbound pokes", txn.rowcount)
run_as_background_process( return run_as_background_process(
"prune_old_outbound_device_pokes", "prune_old_outbound_device_pokes",
self.runInteraction, self.runInteraction,
"_prune_old_outbound_device_pokes", "_prune_old_outbound_device_pokes",

View File

@ -549,7 +549,7 @@ class EventFederationStore(EventFederationWorkerStore):
sql, sql,
(self.stream_ordering_month_ago, self.stream_ordering_month_ago,) (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
) )
run_as_background_process( return run_as_background_process(
"delete_old_forward_extrem_cache", "delete_old_forward_extrem_cache",
self.runInteraction, self.runInteraction,
"_delete_old_forward_extrem_cache", "_delete_old_forward_extrem_cache",

View File

@ -460,7 +460,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
) )
def _find_stream_orderings_for_times(self): def _find_stream_orderings_for_times(self):
run_as_background_process( return run_as_background_process(
"event_push_action_stream_orderings", "event_push_action_stream_orderings",
self.runInteraction, self.runInteraction,
"_find_stream_orderings_for_times", "_find_stream_orderings_for_times",
@ -790,7 +790,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
""", (room_id, user_id, stream_ordering)) """, (room_id, user_id, stream_ordering))
def _start_rotate_notifs(self): def _start_rotate_notifs(self):
run_as_background_process("rotate_notifs", self._rotate_notifs) return run_as_background_process("rotate_notifs", self._rotate_notifs)
@defer.inlineCallbacks @defer.inlineCallbacks
def _rotate_notifs(self): def _rotate_notifs(self):

View File

@ -273,7 +273,9 @@ class TransactionStore(SQLBaseStore):
return self.cursor_to_dict(txn) return self.cursor_to_dict(txn)
def _start_cleanup_transactions(self): def _start_cleanup_transactions(self):
run_as_background_process("cleanup_transactions", self._cleanup_transactions) return run_as_background_process(
"cleanup_transactions", self._cleanup_transactions,
)
def _cleanup_transactions(self): def _cleanup_transactions(self):
now = self._clock.time_msec() now = self._clock.time_msec()

View File

@ -64,7 +64,7 @@ class ExpiringCache(object):
return return
def f(): def f():
run_as_background_process( return run_as_background_process(
"prune_cache_%s" % self._cache_name, "prune_cache_%s" % self._cache_name,
self._prune_cache, self._prune_cache,
) )