Merge pull request #3088 from matrix-org/erikj/as_parallel

Send events to ASes concurrently
This commit is contained in:
Erik Johnston 2018-04-12 10:42:36 +01:00 committed by GitHub
commit 971059a733
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -18,7 +18,9 @@ from twisted.internet import defer
import synapse import synapse
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.logcontext import (
make_deferred_yieldable, preserve_fn, run_in_background,
)
import logging import logging
@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
if not events: if not events:
break break
events_by_room = {}
for event in events: for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@defer.inlineCallbacks
def handle_event(event):
# Gather interested services # Gather interested services
services = yield self._get_services_for_event(event) services = yield self._get_services_for_event(event)
if len(services) == 0: if len(services) == 0:
continue # no services need notifying return # no services need notifying
# Do we know this user exists? If not, poke the user # Do we know this user exists? If not, poke the user
# query API for all services which match that user regex. # query API for all services which match that user regex.
@ -108,6 +115,16 @@ class ApplicationServicesHandler(object):
service, event service, event
) )
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
yield make_deferred_yieldable(defer.gatherResults([
run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
], consumeErrors=True))
events_processed_counter.inc_by(len(events)) events_processed_counter.inc_by(len(events))
yield self.store.set_appservice_last_pos(upper_bound) yield self.store.set_appservice_last_pos(upper_bound)