Merge pull request #92 from matrix-org/application-services-event-stream

Application services event stream support
This commit is contained in:
Kegsay 2015-03-02 12:02:48 +00:00
commit 33f93d389e
9 changed files with 272 additions and 39 deletions

View File

@ -4,6 +4,8 @@ Changes in synapse vx.x.x (x-x-x)
* Add support for registration fallback. This is a page hosted on the server
which allows a user to register for an account, regardless of what client
they are using (e.g. mobile devices).
* Application services can now poll on the CS API ``/events`` for their events,
by providing their application service ``access_token``.
Changes in synapse v0.7.1 (2015-02-19)
======================================

View File

@ -69,9 +69,6 @@ class EventStreamHandler(BaseHandler):
)
self._streams_per_user[auth_user] += 1
if pagin_config.from_token is None:
pagin_config.from_token = None
rm_handler = self.hs.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(auth_user)

View File

@ -510,9 +510,16 @@ class RoomMemberHandler(BaseHandler):
def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]):
"""Returns a list of roomids that the user has any of the given
membership states in."""
rooms = yield self.store.get_rooms_for_user_where_membership_is(
user_id=user.to_string(), membership_list=membership_list
app_service = yield self.store.get_app_service_by_user_id(
user.to_string()
)
if app_service:
rooms = yield self.store.get_app_service_rooms(app_service)
else:
rooms = yield self.store.get_rooms_for_user_where_membership_is(
user_id=user.to_string(), membership_list=membership_list
)
# For some reason the list of events contains duplicates
# TODO(paul): work out why because I really don't think it should
@ -559,13 +566,24 @@ class RoomEventSource(object):
to_key = yield self.get_current_key()
events, end_key = yield self.store.get_room_events_stream(
user_id=user.to_string(),
from_key=from_key,
to_key=to_key,
room_id=None,
limit=limit,
app_service = yield self.store.get_app_service_by_user_id(
user.to_string()
)
if app_service:
events, end_key = yield self.store.get_appservice_room_stream(
service=app_service,
from_key=from_key,
to_key=to_key,
limit=limit,
)
else:
events, end_key = yield self.store.get_room_events_stream(
user_id=user.to_string(),
from_key=from_key,
to_key=to_key,
room_id=None,
limit=limit,
)
defer.returnValue((events, end_key))

View File

@ -36,8 +36,10 @@ class _NotificationListener(object):
so that it can remove itself from the indexes in the Notifier class.
"""
def __init__(self, user, rooms, from_token, limit, timeout, deferred):
def __init__(self, user, rooms, from_token, limit, timeout, deferred,
appservice=None):
self.user = user
self.appservice = appservice
self.from_token = from_token
self.limit = limit
self.timeout = timeout
@ -65,6 +67,10 @@ class _NotificationListener(object):
lst.discard(self)
notifier.user_to_listeners.get(self.user, set()).discard(self)
if self.appservice:
notifier.appservice_to_listeners.get(
self.appservice, set()
).discard(self)
class Notifier(object):
@ -79,6 +85,7 @@ class Notifier(object):
self.rooms_to_listeners = {}
self.user_to_listeners = {}
self.appservice_to_listeners = {}
self.event_sources = hs.get_event_sources()
@ -114,6 +121,17 @@ class Notifier(object):
for user in extra_users:
listeners |= self.user_to_listeners.get(user, set()).copy()
for appservice in self.appservice_to_listeners:
# TODO (kegan): Redundant appservice listener checks?
# App services will already be in the rooms_to_listeners set, but
# that isn't enough. They need to be checked here in order to
# receive *invites* for users they are interested in. Does this
# make the rooms_to_listeners check somewhat obselete?
if appservice.is_interested(event):
listeners |= self.appservice_to_listeners.get(
appservice, set()
).copy()
logger.debug("on_new_room_event listeners %s", listeners)
# TODO (erikj): Can we make this more efficient by hitting the
@ -280,6 +298,10 @@ class Notifier(object):
if not from_token:
from_token = yield self.event_sources.get_current_token()
appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
user.to_string()
)
listener = _NotificationListener(
user,
rooms,
@ -287,6 +309,7 @@ class Notifier(object):
limit,
timeout,
deferred,
appservice=appservice
)
def _timeout_listener():
@ -319,6 +342,11 @@ class Notifier(object):
self.user_to_listeners.setdefault(listener.user, set()).add(listener)
if listener.appservice:
self.appservice_to_listeners.setdefault(
listener.appservice, set()
).add(listener)
@defer.inlineCallbacks
@log_function
def _check_for_updates(self, listener):

View File

@ -450,7 +450,8 @@ class SQLBaseStore(object):
Args:
table : string giving the table name
keyvalues : dict of column names and values to select the rows with
keyvalues : dict of column names and values to select the rows with,
or None to not apply a WHERE clause.
retcols : list of strings giving the names of the columns to return
"""
return self.runInteraction(
@ -469,13 +470,20 @@ class SQLBaseStore(object):
keyvalues : dict of column names and values to select the rows with
retcols : list of strings giving the names of the columns to return
"""
sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
)
if keyvalues:
sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
)
txn.execute(sql, keyvalues.values())
else:
sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
", ".join(retcols),
table
)
txn.execute(sql)
txn.execute(sql, keyvalues.values())
return self.cursor_to_dict(txn)
def _simple_update_one(self, table, keyvalues, updatevalues,

View File

@ -15,8 +15,10 @@
import logging
from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.api.errors import StoreError
from synapse.appservice import ApplicationService
from synapse.storage.roommember import RoomsForUser
from ._base import SQLBaseStore
@ -150,9 +152,32 @@ class ApplicationServiceStore(SQLBaseStore):
yield self.cache_defer # make sure the cache is ready
defer.returnValue(self.services_cache)
@defer.inlineCallbacks
def get_app_service_by_user_id(self, user_id):
"""Retrieve an application service from their user ID.
All application services have associated with them a particular user ID.
There is no distinguishing feature on the user ID which indicates it
represents an application service. This function allows you to map from
a user ID to an application service.
Args:
user_id(str): The user ID to see if it is an application service.
Returns:
synapse.appservice.ApplicationService or None.
"""
yield self.cache_defer # make sure the cache is ready
for service in self.services_cache:
if service.sender == user_id:
defer.returnValue(service)
return
defer.returnValue(None)
@defer.inlineCallbacks
def get_app_service_by_token(self, token, from_cache=True):
"""Get the application service with the given token.
"""Get the application service with the given appservice token.
Args:
token (str): The application service token.
@ -173,6 +198,77 @@ class ApplicationServiceStore(SQLBaseStore):
# TODO: The from_cache=False impl
# TODO: This should be JOINed with the application_services_regex table.
def get_app_service_rooms(self, service):
"""Get a list of RoomsForUser for this application service.
Application services may be "interested" in lots of rooms depending on
the room ID, the room aliases, or the members in the room. This function
takes all of these into account and returns a list of RoomsForUser which
represent the entire list of room IDs that this application service
wants to know about.
Args:
service: The application service to get a room list for.
Returns:
A list of RoomsForUser.
"""
return self.runInteraction(
"get_app_service_rooms",
self._get_app_service_rooms_txn,
service,
)
def _get_app_service_rooms_txn(self, txn, service):
# get all rooms matching the room ID regex.
room_entries = self._simple_select_list_txn(
txn=txn, table="rooms", keyvalues=None, retcols=["room_id"]
)
matching_room_list = set([
r["room_id"] for r in room_entries if
service.is_interested_in_room(r["room_id"])
])
# resolve room IDs for matching room alias regex.
room_alias_mappings = self._simple_select_list_txn(
txn=txn, table="room_aliases", keyvalues=None,
retcols=["room_id", "room_alias"]
)
matching_room_list |= set([
r["room_id"] for r in room_alias_mappings if
service.is_interested_in_alias(r["room_alias"])
])
# get all rooms for every user for this AS. This is scoped to users on
# this HS only.
user_list = self._simple_select_list_txn(
txn=txn, table="users", keyvalues=None, retcols=["name"]
)
user_list = [
u["name"] for u in user_list if
service.is_interested_in_user(u["name"])
]
rooms_for_user_matching_user_id = set() # RoomsForUser list
for user_id in user_list:
# FIXME: This assumes this store is linked with RoomMemberStore :(
rooms_for_user = self._get_rooms_for_user_where_membership_is_txn(
txn=txn,
user_id=user_id,
membership_list=[Membership.JOIN]
)
rooms_for_user_matching_user_id |= set(rooms_for_user)
# make RoomsForUser tuples for room ids and aliases which are not in the
# main rooms_for_user_list - e.g. they are rooms which do not have AS
# registered users in it.
known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id]
missing_rooms_for_user = [
RoomsForUser(r, service.sender, "join") for r in
matching_room_list if r not in known_room_ids
]
rooms_for_user_matching_user_id |= set(missing_rooms_for_user)
return rooms_for_user_matching_user_id
@defer.inlineCallbacks
def _populate_cache(self):
"""Populates the ApplicationServiceCache from the database."""

View File

@ -180,6 +180,14 @@ class RoomMemberStore(SQLBaseStore):
if not membership_list:
return defer.succeed(None)
return self.runInteraction(
"get_rooms_for_user_where_membership_is",
self._get_rooms_for_user_where_membership_is_txn,
user_id, membership_list
)
def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id,
membership_list):
where_clause = "user_id = ? AND (%s)" % (
" OR ".join(["membership = ?" for _ in membership_list]),
)
@ -187,24 +195,18 @@ class RoomMemberStore(SQLBaseStore):
args = [user_id]
args.extend(membership_list)
def f(txn):
sql = (
"SELECT m.room_id, m.sender, m.membership"
" FROM room_memberships as m"
" INNER JOIN current_state_events as c"
" ON m.event_id = c.event_id"
" WHERE %s"
) % (where_clause,)
sql = (
"SELECT m.room_id, m.sender, m.membership"
" FROM room_memberships as m"
" INNER JOIN current_state_events as c"
" ON m.event_id = c.event_id"
" WHERE %s"
) % (where_clause,)
txn.execute(sql, args)
return [
RoomsForUser(**r) for r in self.cursor_to_dict(txn)
]
return self.runInteraction(
"get_rooms_for_user_where_membership_is",
f
)
txn.execute(sql, args)
return [
RoomsForUser(**r) for r in self.cursor_to_dict(txn)
]
def get_joined_hosts_for_room(self, room_id):
return self._simple_select_onecol(

View File

@ -36,6 +36,7 @@ what sort order was used:
from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.util.logutils import log_function
@ -127,6 +128,85 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")):
class StreamStore(SQLBaseStore):
@defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
# NB this lives here instead of appservice.py so we can reuse the
# 'private' StreamToken class in this file.
if limit:
limit = max(limit, MAX_STREAM_SIZE)
else:
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
from_id = _StreamToken.parse_stream_token(from_key)
to_id = _StreamToken.parse_stream_token(to_key)
if from_key == to_key:
defer.returnValue(([], to_key))
return
# select all the events between from/to with a sensible limit
sql = (
"SELECT e.event_id, e.room_id, e.type, s.state_key, "
"e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON "
"e.event_id = s.event_id "
"WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
"ORDER BY stream_ordering ASC LIMIT %(limit)d "
) % {
"limit": limit
}
def f(txn):
# pull out all the events between the tokens
txn.execute(sql, (from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn)
# Logic:
# - We want ALL events which match the AS room_id regex
# - We want ALL events which match the rooms represented by the AS
# room_alias regex
# - We want ALL events for rooms that AS users have joined.
# This is currently supported via get_app_service_rooms (which is
# used for the Notifier listener rooms). We can't reasonably make a
# SQL query for these room IDs, so we'll pull all the events between
# from/to and filter in python.
rooms_for_as = self._get_app_service_rooms_txn(txn, service)
room_ids_for_as = [r.room_id for r in rooms_for_as]
def app_service_interested(row):
if row["room_id"] in room_ids_for_as:
return True
if row["type"] == EventTypes.Member:
if service.is_interested_in_user(row.get("state_key")):
return True
return False
ret = self._get_events_txn(
txn,
# apply the filter on the room id list
[
r["event_id"] for r in rows
if app_service_interested(r)
],
get_prev_content=True
)
self._set_before_and_after(ret, rows)
if rows:
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
key = to_key
return ret, key
results = yield self.runInteraction("get_appservice_room_stream", f)
defer.returnValue(results)
@log_function
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
limit=0, with_feedback=False):
@ -184,8 +264,7 @@ class StreamStore(SQLBaseStore):
self._set_before_and_after(ret, rows)
if rows:
key = "s%d" % max([r["stream_ordering"] for r in rows])
key = "s%d" % max(r["stream_ordering"] for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.

View File

@ -295,6 +295,9 @@ class PresenceEventStreamTestCase(unittest.TestCase):
self.mock_datastore = hs.get_datastore()
self.mock_datastore.get_app_service_by_token = Mock(return_value=None)
self.mock_datastore.get_app_service_by_user_id = Mock(
return_value=defer.succeed(None)
)
def get_profile_displayname(user_id):
return defer.succeed("Frank")