mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-06-24 15:00:29 -04:00
Merge remote-tracking branch 'origin/develop' into paul/thirdpartylookup
This commit is contained in:
commit
d5bf7a4a99
47 changed files with 1203 additions and 546 deletions
|
@ -16,7 +16,8 @@
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.appservice import ApplicationService
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.logcontext import preserve_fn
|
||||
|
||||
import logging
|
||||
|
||||
|
@ -42,36 +43,60 @@ class ApplicationServicesHandler(object):
|
|||
self.appservice_api = hs.get_application_service_api()
|
||||
self.scheduler = hs.get_application_service_scheduler()
|
||||
self.started_scheduler = False
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def notify_interested_services(self, event):
|
||||
def notify_interested_services(self, current_id):
|
||||
"""Notifies (pushes) all application services interested in this event.
|
||||
|
||||
Pushing is done asynchronously, so this method won't block for any
|
||||
prolonged length of time.
|
||||
|
||||
Args:
|
||||
event(Event): The event to push out to interested services.
|
||||
current_id(int): The current maximum ID.
|
||||
"""
|
||||
# Gather interested services
|
||||
services = yield self._get_services_for_event(event)
|
||||
if len(services) == 0:
|
||||
return # no services need notifying
|
||||
services = yield self.store.get_app_services()
|
||||
if not services:
|
||||
return
|
||||
|
||||
# Do we know this user exists? If not, poke the user query API for
|
||||
# all services which match that user regex. This needs to block as these
|
||||
# user queries need to be made BEFORE pushing the event.
|
||||
yield self._check_user_exists(event.sender)
|
||||
if event.type == EventTypes.Member:
|
||||
yield self._check_user_exists(event.state_key)
|
||||
with Measure(self.clock, "notify_interested_services"):
|
||||
upper_bound = current_id
|
||||
limit = 100
|
||||
while True:
|
||||
upper_bound, events = yield self.store.get_new_events_for_appservice(
|
||||
upper_bound, limit
|
||||
)
|
||||
|
||||
if not self.started_scheduler:
|
||||
self.scheduler.start().addErrback(log_failure)
|
||||
self.started_scheduler = True
|
||||
logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound)
|
||||
|
||||
# Fork off pushes to these services
|
||||
for service in services:
|
||||
self.scheduler.submit_event_for_as(service, event)
|
||||
if not events:
|
||||
break
|
||||
|
||||
for event in events:
|
||||
# Gather interested services
|
||||
services = yield self._get_services_for_event(event)
|
||||
if len(services) == 0:
|
||||
continue # no services need notifying
|
||||
|
||||
# Do we know this user exists? If not, poke the user query API for
|
||||
# all services which match that user regex. This needs to block as
|
||||
# these user queries need to be made BEFORE pushing the event.
|
||||
yield self._check_user_exists(event.sender)
|
||||
if event.type == EventTypes.Member:
|
||||
yield self._check_user_exists(event.state_key)
|
||||
|
||||
if not self.started_scheduler:
|
||||
self.scheduler.start().addErrback(log_failure)
|
||||
self.started_scheduler = True
|
||||
|
||||
# Fork off pushes to these services
|
||||
for service in services:
|
||||
preserve_fn(self.scheduler.submit_event_for_as)(service, event)
|
||||
|
||||
yield self.store.set_appservice_last_pos(upper_bound)
|
||||
|
||||
if len(events) < limit:
|
||||
break
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def query_user_exists(self, user_id):
|
||||
|
@ -104,11 +129,12 @@ class ApplicationServicesHandler(object):
|
|||
association can be found.
|
||||
"""
|
||||
room_alias_str = room_alias.to_string()
|
||||
alias_query_services = yield self._get_services_for_event(
|
||||
event=None,
|
||||
restrict_to=ApplicationService.NS_ALIASES,
|
||||
alias_list=[room_alias_str]
|
||||
)
|
||||
services = yield self.store.get_app_services()
|
||||
alias_query_services = [
|
||||
s for s in services if (
|
||||
s.is_interested_in_alias(room_alias_str)
|
||||
)
|
||||
]
|
||||
for alias_service in alias_query_services:
|
||||
is_known_alias = yield self.appservice_api.query_alias(
|
||||
alias_service, room_alias_str
|
||||
|
@ -136,34 +162,19 @@ class ApplicationServicesHandler(object):
|
|||
defer.returnValue(results)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_services_for_event(self, event, restrict_to="", alias_list=None):
|
||||
def _get_services_for_event(self, event):
|
||||
"""Retrieve a list of application services interested in this event.
|
||||
|
||||
Args:
|
||||
event(Event): The event to check. Can be None if alias_list is not.
|
||||
restrict_to(str): The namespace to restrict regex tests to.
|
||||
alias_list: A list of aliases to get services for. If None, this
|
||||
list is obtained from the database.
|
||||
Returns:
|
||||
list<ApplicationService>: A list of services interested in this
|
||||
event based on the service regex.
|
||||
"""
|
||||
member_list = None
|
||||
if hasattr(event, "room_id"):
|
||||
# We need to know the aliases associated with this event.room_id,
|
||||
# if any.
|
||||
if not alias_list:
|
||||
alias_list = yield self.store.get_aliases_for_room(
|
||||
event.room_id
|
||||
)
|
||||
# We need to know the members associated with this event.room_id,
|
||||
# if any.
|
||||
member_list = yield self.store.get_users_in_room(event.room_id)
|
||||
|
||||
services = yield self.store.get_app_services()
|
||||
interested_list = [
|
||||
s for s in services if (
|
||||
s.is_interested(event, restrict_to, alias_list, member_list)
|
||||
yield s.is_interested(event, self.store)
|
||||
)
|
||||
]
|
||||
defer.returnValue(interested_list)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue