Make AppserviceHandler stream events from database

This is for two reasons:

1. Suppresses duplicates correctly, as the notifier doesn't do any
   duplicate suppression.
2. Makes it easier to connect the AppserviceHandler to the replication
   stream.
This commit is contained in:
Erik Johnston 2016-08-18 11:54:41 +01:00
parent 403ecd8a2c
commit 9da84a9a1e
5 changed files with 110 additions and 22 deletions

View File

@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.logcontext import preserve_fn
import logging import logging
@ -45,24 +46,41 @@ class ApplicationServicesHandler(object):
self.clock = hs.get_clock() self.clock = hs.get_clock()
@defer.inlineCallbacks @defer.inlineCallbacks
def notify_interested_services(self, event): def notify_interested_services(self, current_id):
"""Notifies (pushes) all application services interested in this event. """Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any Pushing is done asynchronously, so this method won't block for any
prolonged length of time. prolonged length of time.
Args: Args:
event(Event): The event to push out to interested services. current_id(int): The current maximum ID.
""" """
services = yield self.store.get_app_services()
if not services:
return
with Measure(self.clock, "notify_interested_services"): with Measure(self.clock, "notify_interested_services"):
upper_bound = current_id
limit = 100
while True:
upper_bound, events = yield self.store.get_new_events_for_appservice(
upper_bound, limit
)
logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound)
if not events:
break
for event in events:
# Gather interested services # Gather interested services
services = yield self._get_services_for_event(event) services = yield self._get_services_for_event(event)
if len(services) == 0: if len(services) == 0:
return # no services need notifying continue # no services need notifying
# Do we know this user exists? If not, poke the user query API for # Do we know this user exists? If not, poke the user query API for
# all services which match that user regex. This needs to block as these # all services which match that user regex. This needs to block as
# user queries need to be made BEFORE pushing the event. # these user queries need to be made BEFORE pushing the event.
yield self._check_user_exists(event.sender) yield self._check_user_exists(event.sender)
if event.type == EventTypes.Member: if event.type == EventTypes.Member:
yield self._check_user_exists(event.state_key) yield self._check_user_exists(event.state_key)
@ -73,7 +91,12 @@ class ApplicationServicesHandler(object):
# Fork off pushes to these services # Fork off pushes to these services
for service in services: for service in services:
self.scheduler.submit_event_for_as(service, event) preserve_fn(self.scheduler.submit_event_for_as)(service, event)
yield self.store.set_appservice_last_pos(upper_bound)
if len(events) < limit:
break
@defer.inlineCallbacks @defer.inlineCallbacks
def query_user_exists(self, user_id): def query_user_exists(self, user_id):

View File

@ -214,7 +214,7 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]): def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event""" """Notify any user streams that are interested in this room event"""
# poke any interested application service. # poke any interested application service.
self.appservice_handler.notify_interested_services(event) self.appservice_handler.notify_interested_services(room_stream_id)
if event.type == EventTypes.Member and event.membership == Membership.JOIN: if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id) self._user_joined_room(event.state_key, event.room_id)

View File

@ -352,3 +352,42 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
return 0 return 0
else: else:
return int(last_txn_id[0]) # select 'last_txn' col return int(last_txn_id[0]) # select 'last_txn' col
def set_appservice_last_pos(self, pos):
def set_appservice_last_pos_txn(txn):
txn.execute(
"UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
)
return self.runInteraction(
"set_appservice_last_pos", set_appservice_last_pos_txn
)
@defer.inlineCallbacks
def get_new_events_for_appservice(self, current_id, limit):
"""Get all new evnets"""
def get_new_events_for_appservice_txn(txn):
sql = (
"SELECT e.stream_ordering, e.event_id"
" FROM events AS e, appservice_stream_position AS a"
" WHERE a.stream_ordering < e.stream_ordering AND e.stream_ordering <= ?"
" ORDER BY e.stream_ordering ASC"
" LIMIT ?"
)
txn.execute(sql, (current_id, limit))
rows = txn.fetchall()
upper_bound = current_id
if len(rows) == limit:
upper_bound = rows[-1][0]
return upper_bound, [row[1] for row in rows]
upper_bound, event_ids = yield self.runInteraction(
"get_new_events_for_appservice", get_new_events_for_appservice_txn,
)
events = yield self._get_events(event_ids)
defer.returnValue((upper_bound, events))

View File

@ -0,0 +1,23 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE IF NOT EXISTS appservice_stream_position(
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_ordering BIGINT,
CHECK (Lock='X')
);
INSERT INTO appservice_stream_position (stream_ordering)
SELECT COALESCE(MAX(stream_ordering), 0) FROM events;

View File

@ -53,8 +53,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
type="m.room.message", type="m.room.message",
room_id="!foo:bar" room_id="!foo:bar"
) )
self.mock_store.get_new_events_for_appservice.return_value = (0, [event])
self.mock_as_api.push = Mock() self.mock_as_api.push = Mock()
yield self.handler.notify_interested_services(event) yield self.handler.notify_interested_services(0)
self.mock_scheduler.submit_event_for_as.assert_called_once_with( self.mock_scheduler.submit_event_for_as.assert_called_once_with(
interested_service, event interested_service, event
) )
@ -74,7 +75,8 @@ class AppServiceHandlerTestCase(unittest.TestCase):
) )
self.mock_as_api.push = Mock() self.mock_as_api.push = Mock()
self.mock_as_api.query_user = Mock() self.mock_as_api.query_user = Mock()
yield self.handler.notify_interested_services(event) self.mock_store.get_new_events_for_appservice.return_value = (0, [event])
yield self.handler.notify_interested_services(0)
self.mock_as_api.query_user.assert_called_once_with( self.mock_as_api.query_user.assert_called_once_with(
services[0], user_id services[0], user_id
) )
@ -96,7 +98,8 @@ class AppServiceHandlerTestCase(unittest.TestCase):
) )
self.mock_as_api.push = Mock() self.mock_as_api.push = Mock()
self.mock_as_api.query_user = Mock() self.mock_as_api.query_user = Mock()
yield self.handler.notify_interested_services(event) self.mock_store.get_new_events_for_appservice.return_value = (0, [event])
yield self.handler.notify_interested_services(0)
self.assertFalse( self.assertFalse(
self.mock_as_api.query_user.called, self.mock_as_api.query_user.called,
"query_user called when it shouldn't have been." "query_user called when it shouldn't have been."