Merge branch 'develop' of github.com:matrix-org/synapse into erikj/processed_event_lag

This commit is contained in:
Erik Johnston 2018-04-12 11:36:07 +01:00
commit 19ceb4851f
2 changed files with 51 additions and 2 deletions

View File

@ -18,7 +18,9 @@ from twisted.internet import defer
import synapse import synapse
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logcontext import (
make_deferred_yieldable, preserve_fn, run_in_background,
)
import logging import logging
@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
if not events: if not events:
break break
events_by_room = {}
for event in events: for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@defer.inlineCallbacks
def handle_event(event):
# Gather interested services # Gather interested services
services = yield self._get_services_for_event(event) services = yield self._get_services_for_event(event)
if len(services) == 0: if len(services) == 0:
continue # no services need notifying return # no services need notifying
# Do we know this user exists? If not, poke the user # Do we know this user exists? If not, poke the user
# query API for all services which match that user regex. # query API for all services which match that user regex.
@ -108,6 +115,16 @@ class ApplicationServicesHandler(object):
service, event service, event
) )
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
yield make_deferred_yieldable(defer.gatherResults([
run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
], consumeErrors=True))
yield self.store.set_appservice_last_pos(upper_bound) yield self.store.set_appservice_last_pos(upper_bound)
now = self.clock.time_msec() now = self.clock.time_msec()

View File

@ -31,6 +31,18 @@ class ResponseCache(object):
self.timeout_sec = timeout_ms / 1000. self.timeout_sec = timeout_ms / 1000.
def get(self, key): def get(self, key):
"""Look up the given key.
Returns a deferred which doesn't follow the synapse logcontext rules,
so you'll probably want to make_deferred_yieldable it.
Args:
key (str):
Returns:
twisted.internet.defer.Deferred|None: None if there is no entry
for this key; otherwise a deferred result.
"""
result = self.pending_result_cache.get(key) result = self.pending_result_cache.get(key)
if result is not None: if result is not None:
return result.observe() return result.observe()
@ -38,6 +50,26 @@ class ResponseCache(object):
return None return None
def set(self, key, deferred): def set(self, key, deferred):
"""Set the entry for the given key to the given deferred.
*deferred* should run its callbacks in the sentinel logcontext (ie,
you should wrap normal synapse deferreds with
logcontext.run_in_background).
Returns a new Deferred which also doesn't follow the synapse logcontext
rules, so you will want to make_deferred_yieldable it
(TODO: before using this more widely, it might make sense to refactor
it and get() so that they do the necessary wrapping rather than having
to do it everywhere ResponseCache is used.)
Args:
key (str):
deferred (twisted.internet.defer.Deferred):
Returns:
twisted.internet.defer.Deferred
"""
result = ObservableDeferred(deferred, consumeErrors=True) result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result self.pending_result_cache[key] = result