From 2d20466f9a1349c97d5a3822eb4ee64f19bbdf27 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 15:00:59 +0000 Subject: [PATCH 01/17] Add stub functions and work out execution flow to implement AS event stream polling. --- synapse/handlers/events.py | 3 --- synapse/handlers/room.py | 34 +++++++++++++++++++++++++--------- synapse/storage/appservice.py | 19 +++++++++++++++++++ synapse/storage/stream.py | 21 +++++++++++++++++++++ 4 files changed, 65 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 025e7e7e6..8d5f5c849 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -69,9 +69,6 @@ class EventStreamHandler(BaseHandler): ) self._streams_per_user[auth_user] += 1 - if pagin_config.from_token is None: - pagin_config.from_token = None - rm_handler = self.hs.get_handlers().room_member_handler room_ids = yield rm_handler.get_rooms_for_user(auth_user) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 914742d91..a8b0c9563 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -510,9 +510,16 @@ class RoomMemberHandler(BaseHandler): def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]): """Returns a list of roomids that the user has any of the given membership states in.""" - rooms = yield self.store.get_rooms_for_user_where_membership_is( - user_id=user.to_string(), membership_list=membership_list + + app_service = yield self.store.get_app_service_by_user_id( + user.to_string() ) + if app_service: + rooms = yield self.store.get_app_service_rooms(app_service) + else: + rooms = yield self.store.get_rooms_for_user_where_membership_is( + user_id=user.to_string(), membership_list=membership_list + ) # For some reason the list of events contains duplicates # TODO(paul): work out why because I really don't think it should @@ -559,13 +566,22 @@ class RoomEventSource(object): to_key = yield self.get_current_key() - events, end_key = yield self.store.get_room_events_stream( - user_id=user.to_string(), - from_key=from_key, - to_key=to_key, - room_id=None, - limit=limit, - ) + app_service = self.store.get_app_service_by_user_id(user.to_string()) + if app_service: + events, end_key = yield self.store.get_appservice_room_stream( + service=app_service, + from_key=from_key, + to_key=to_key, + limit=limit, + ) + else: + events, end_key = yield self.store.get_room_events_stream( + user_id=user.to_string(), + from_key=from_key, + to_key=to_key, + room_id=None, + limit=limit, + ) defer.returnValue((events, end_key)) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index dc3666efd..435ccfd6f 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.appservice import ApplicationService +from synapse.storage.roommember import RoomsForUser from ._base import SQLBaseStore @@ -150,6 +151,16 @@ class ApplicationServiceStore(SQLBaseStore): yield self.cache_defer # make sure the cache is ready defer.returnValue(self.services_cache) + @defer.inlineCallbacks + def get_app_service_by_user_id(self, user_id): + yield self.cache_defer # make sure the cache is ready + + for service in self.services_cache: + if service.sender == user_id: + defer.returnValue(service) + return + defer.returnValue(None) + @defer.inlineCallbacks def get_app_service_by_token(self, token, from_cache=True): """Get the application service with the given token. @@ -173,6 +184,14 @@ class ApplicationServiceStore(SQLBaseStore): # TODO: The from_cache=False impl # TODO: This should be JOINed with the application_services_regex table. + @defer.inlineCallbacks + def get_app_service_rooms(self, service): + logger.info("get_app_service_rooms -> %s", service) + + # TODO stub + yield self.cache_defer + defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")]) + @defer.inlineCallbacks def _populate_cache(self): """Populates the ApplicationServiceCache from the database.""" diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3ccb6f8a6..aa3c9f8c9 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -127,6 +127,27 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): class StreamStore(SQLBaseStore): + + def get_appservice_room_stream(self, service, from_key, to_key, limit=0): + # NB this lives here instead of appservice.py so we can reuse the + # 'private' StreamToken class in this file. + logger.info("get_appservice_room_stream -> %s", service) + + if limit: + limit = max(limit, MAX_STREAM_SIZE) + else: + limit = MAX_STREAM_SIZE + + # From and to keys should be integers from ordering. + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) + + if from_key == to_key: + return defer.succeed(([], to_key)) + + # TODO stub + return defer.succeed(([], to_key)) + @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, limit=0, with_feedback=False): From 2b8ca84296b228b7cef09244605e4f2760349538 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 17:15:25 +0000 Subject: [PATCH 02/17] Add support for extracting matching room_ids and room_aliases for a given AS. --- synapse/storage/appservice.py | 50 +++++++++++++++++++++++++++++++++-- synapse/storage/directory.py | 23 ++++++++++++++++ synapse/storage/room.py | 11 ++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 435ccfd6f..c8f0ce44f 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -153,6 +153,19 @@ class ApplicationServiceStore(SQLBaseStore): @defer.inlineCallbacks def get_app_service_by_user_id(self, user_id): + """Retrieve an application service from their user ID. + + All application services have associated with them a particular user ID. + There is no distinguishing feature on the user ID which indicates it + represents an application service. This function allows you to map from + a user ID to an application service. + + Args: + user_id(str): The user ID to see if it is an application service. + Returns: + synapse.appservice.ApplicationService or None. + """ + yield self.cache_defer # make sure the cache is ready for service in self.services_cache: @@ -163,7 +176,7 @@ class ApplicationServiceStore(SQLBaseStore): @defer.inlineCallbacks def get_app_service_by_token(self, token, from_cache=True): - """Get the application service with the given token. + """Get the application service with the given appservice token. Args: token (str): The application service token. @@ -186,10 +199,43 @@ class ApplicationServiceStore(SQLBaseStore): @defer.inlineCallbacks def get_app_service_rooms(self, service): - logger.info("get_app_service_rooms -> %s", service) + """Get a list of RoomsForUser for this application service. + + Application services may be "interested" in lots of rooms depending on + the room ID, the room aliases, or the members in the room. This function + takes all of these into account and returns a list of RoomsForUser which + represent the entire list of room IDs that this application service + wants to know about. + + Args: + service: The application service to get a room list for. + Returns: + A list of RoomsForUser. + """ + # FIXME: This is assuming that this store has methods from + # RoomStore, DirectoryStore, which is a bad assumption to + # make as it makes testing trickier and coupling less obvious. + + # get all rooms matching the room ID regex. + room_entries = yield self.get_all_rooms() # RoomEntry list + matching_room_id_list = [ + r.room_id for r in room_entries if + service.is_interested_in_room(r.room_id) + ] + + # resolve room IDs for matching room alias regex. + room_alias_mappings = yield self.get_all_associations() + matching_alias_list = [ + r.room_id for r in room_alias_mappings if + service.is_interested_in_alias(r.room_alias) + ] + + # get all rooms for every user for this AS. # TODO stub yield self.cache_defer + + defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")]) @defer.inlineCallbacks diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 68b7d5969..e13b33693 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -134,6 +134,29 @@ class DirectoryStore(SQLBaseStore): return room_id + @defer.inlineCallbacks + def get_all_associations(self): + """Retrieve the entire list of room alias -> room ID pairings. + + Returns: + A list of RoomAliasMappings. + """ + results = self._simple_select_list( + "room_aliases", + None, + ["room_alias", "room_id"] + ) + # TODO(kegan): It feels wrong to be specifying no servers here, but + # equally this function isn't required to obtain all servers so + # retrieving them "just for the sake of it" also seems wrong, but we + # want to conform to passing Objects around and not dicts.. + return [ + RoomAliasMapping( + room_id=r["room_id"], room_alias=r["room_alias"], servers="" + ) for r in results + ] + + def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 750b17a45..3a6469340 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -71,6 +71,17 @@ class RoomStore(SQLBaseStore): RoomsTable.decode_single_result, query, room_id, ) + def get_all_rooms(self): + """Retrieve all the rooms. + + Returns: + A list of namedtuples containing the room information. + """ + query = RoomsTable.select_statement() + return self._execute( + RoomsTable.decode_results, query, + ) + @defer.inlineCallbacks def get_rooms(self, is_public): """Retrieve a list of all public rooms. From 2c79c4dc7f638f1cb823903a2f8bb1005fda4a2c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 17:37:14 +0000 Subject: [PATCH 03/17] Fix alias query. --- synapse/storage/directory.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index e13b33693..70c8c8ccd 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -141,20 +141,19 @@ class DirectoryStore(SQLBaseStore): Returns: A list of RoomAliasMappings. """ - results = self._simple_select_list( - "room_aliases", - None, - ["room_alias", "room_id"] + results = yield self._execute_and_decode( + "SELECT room_id, room_alias FROM room_aliases" ) + # TODO(kegan): It feels wrong to be specifying no servers here, but # equally this function isn't required to obtain all servers so # retrieving them "just for the sake of it" also seems wrong, but we # want to conform to passing Objects around and not dicts.. - return [ + defer.returnValue([ RoomAliasMapping( room_id=r["room_id"], room_alias=r["room_alias"], servers="" ) for r in results - ] + ]) def get_aliases_for_room(self, room_id): From 978ce87c86e60fd49f078d5bea79715abea6d236 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 17:37:48 +0000 Subject: [PATCH 04/17] Comment unused variables. --- synapse/storage/stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index aa3c9f8c9..3c8f3320f 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -139,8 +139,8 @@ class StreamStore(SQLBaseStore): limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - from_id = _StreamToken.parse_stream_token(from_key) - to_id = _StreamToken.parse_stream_token(to_key) + # from_id = _StreamToken.parse_stream_token(from_key) + # to_id = _StreamToken.parse_stream_token(to_key) if from_key == to_key: return defer.succeed(([], to_key)) From 29267cf9d7fbacdfcccaaef9160657f24b9aca14 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 25 Feb 2015 17:42:28 +0000 Subject: [PATCH 05/17] PEP8 and pyflakes --- synapse/storage/appservice.py | 8 +++----- synapse/storage/directory.py | 1 - 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c8f0ce44f..017b6d1e8 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -229,12 +229,10 @@ class ApplicationServiceStore(SQLBaseStore): r.room_id for r in room_alias_mappings if service.is_interested_in_alias(r.room_alias) ] + logging.debug(matching_alias_list) + logging.debug(matching_room_id_list) - # get all rooms for every user for this AS. - - # TODO stub - yield self.cache_defer - + # TODO get all rooms for every user for this AS. defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")]) diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 70c8c8ccd..e391239a3 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -155,7 +155,6 @@ class DirectoryStore(SQLBaseStore): ) for r in results ]) - def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", From 92478e96d6f6992146102599ca96b8dcacbf3895 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Feb 2015 14:35:28 +0000 Subject: [PATCH 06/17] Finish impl to extract all room IDs an AS may be interested in when polling the event stream. --- synapse/storage/appservice.py | 35 +++++++++++++++++++++++++++------ synapse/storage/registration.py | 7 +++++++ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 017b6d1e8..d0632d55d 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -213,8 +213,9 @@ class ApplicationServiceStore(SQLBaseStore): A list of RoomsForUser. """ # FIXME: This is assuming that this store has methods from - # RoomStore, DirectoryStore, which is a bad assumption to - # make as it makes testing trickier and coupling less obvious. + # RoomStore, DirectoryStore, RegistrationStore, RoomMemberStore which is + # a bad assumption to make as it makes testing trickier and coupling + # less obvious. # get all rooms matching the room ID regex. room_entries = yield self.get_all_rooms() # RoomEntry list @@ -229,12 +230,34 @@ class ApplicationServiceStore(SQLBaseStore): r.room_id for r in room_alias_mappings if service.is_interested_in_alias(r.room_alias) ] - logging.debug(matching_alias_list) - logging.debug(matching_room_id_list) + room_ids_matching_alias_or_id = set( + matching_room_id_list + matching_alias_list + ) - # TODO get all rooms for every user for this AS. + # get all rooms for every user for this AS. This is scoped to users on + # this HS only. + user_list = yield self.get_all_users() + user_list = [ + u["name"] for u in user_list if + service.is_interested_in_user(u["name"]) + ] + rooms_for_user_matching_user_id = [] # RoomsForUser list + for user_id in user_list: + rooms_for_user = yield self.get_rooms_for_user(user_id) + rooms_for_user_matching_user_id += rooms_for_user + rooms_for_user_matching_user_id = set(rooms_for_user_matching_user_id) - defer.returnValue([RoomsForUser("!foo:bar", service.sender, "join")]) + # make RoomsForUser tuples for room ids and aliases which are not in the + # main rooms_for_user_list - e.g. they are rooms which do not have AS + # registered users in it. + known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id] + missing_rooms_for_user = [ + RoomsForUser(r, service.sender, "join") for r in + room_ids_matching_alias_or_id if r not in known_room_ids + ] + rooms_for_user_matching_user_id |= set(missing_rooms_for_user) + + defer.returnValue(rooms_for_user_matching_user_id) @defer.inlineCallbacks def _populate_cache(self): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 029b07cc6..7aff3dbd3 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -92,6 +92,13 @@ class RegistrationStore(SQLBaseStore): query, user_id ) + def get_all_users(self): + query = ("SELECT users.name FROM users") + return self._execute( + self.cursor_to_dict, + query + ) + def get_user_by_token(self, token): """Get a user from the given access token. From dcec7175dc30754603618351b59bc72ff41d305b Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Feb 2015 16:23:01 +0000 Subject: [PATCH 07/17] Finish impl to get new events for AS. ASes should now be able to poll /events --- synapse/handlers/room.py | 4 ++- synapse/storage/stream.py | 62 ++++++++++++++++++++++++++++++++++----- 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a8b0c9563..80f7ee3f1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -566,7 +566,9 @@ class RoomEventSource(object): to_key = yield self.get_current_key() - app_service = self.store.get_app_service_by_user_id(user.to_string()) + app_service = yield self.store.get_app_service_by_user_id( + user.to_string() + ) if app_service: events, end_key = yield self.store.get_appservice_room_stream( service=app_service, diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3c8f3320f..6946e9fe7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -128,25 +128,73 @@ class _StreamToken(namedtuple("_StreamToken", "topological stream")): class StreamStore(SQLBaseStore): + @defer.inlineCallbacks def get_appservice_room_stream(self, service, from_key, to_key, limit=0): # NB this lives here instead of appservice.py so we can reuse the # 'private' StreamToken class in this file. - logger.info("get_appservice_room_stream -> %s", service) - if limit: limit = max(limit, MAX_STREAM_SIZE) else: limit = MAX_STREAM_SIZE # From and to keys should be integers from ordering. - # from_id = _StreamToken.parse_stream_token(from_key) - # to_id = _StreamToken.parse_stream_token(to_key) + from_id = _StreamToken.parse_stream_token(from_key) + to_id = _StreamToken.parse_stream_token(to_key) if from_key == to_key: - return defer.succeed(([], to_key)) + defer.returnValue(([], to_key)) + return - # TODO stub - return defer.succeed(([], to_key)) + # Logic: + # - We want ALL events which match the AS room_id regex + # - We want ALL events which match the rooms represented by the AS + # room_alias regex + # - We want ALL events for rooms that AS users have joined. + # This is currently supported via get_app_service_rooms (which is used + # for the Notifier listener rooms). We can't reasonably make a SQL + # query for these room IDs, so we'll pull all the events between from/to + # and filter in python. + rooms_for_as = yield self.get_app_service_rooms(service) + room_ids_for_as = [r.room_id for r in rooms_for_as] + + # select all the events between from/to with a sensible limit + sql = ( + "SELECT e.event_id, e.room_id, e.stream_ordering FROM events AS e " + "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " + "ORDER BY stream_ordering ASC LIMIT %(limit)d " + ) % { + "limit": limit + } + + def f(txn): + txn.execute(sql, (from_id.stream, to_id.stream,)) + + rows = self.cursor_to_dict(txn) + + ret = self._get_events_txn( + txn, + # apply the filter on the room id list + [ + r["event_id"] for r in rows + if r["room_id"] in room_ids_for_as + ], + get_prev_content=True + ) + + self._set_before_and_after(ret, rows) + + if rows: + key = "s%d" % max([r["stream_ordering"] for r in rows]) + + else: + # Assume we didn't get anything because there was nothing to + # get. + key = to_key + + return ret, key + + results = yield self.runInteraction("get_appservice_room_stream", f) + defer.returnValue(results) @log_function def get_room_events_stream(self, user_id, from_key, to_key, room_id, From 210ef791007fdd58375db7ce6d7f1b5681e382ca Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Feb 2015 16:25:37 +0000 Subject: [PATCH 08/17] Update CHANGES --- CHANGES.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 9e36f6232..ecc30b467 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -4,6 +4,8 @@ Changes in synapse vx.x.x (x-x-x) * Add support for registration fallback. This is a page hosted on the server which allows a user to register for an account, regardless of what client they are using (e.g. mobile devices). +* Application services can now poll on the CS API ``/events`` for their events, + by providing their application service ``access_token``. Changes in synapse v0.7.1 (2015-02-19) ====================================== From f0995436e7951448d8be3c372f4002845a111a7d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 26 Feb 2015 17:21:17 +0000 Subject: [PATCH 09/17] Check for membership invite events correctly. --- synapse/storage/stream.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 6946e9fe7..5d01ecf20 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -36,12 +36,14 @@ what sort order was used: from twisted.internet import defer from ._base import SQLBaseStore +from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.logutils import log_function from collections import namedtuple import logging +import simplejson logger = logging.getLogger(__name__) @@ -159,13 +161,30 @@ class StreamStore(SQLBaseStore): # select all the events between from/to with a sensible limit sql = ( - "SELECT e.event_id, e.room_id, e.stream_ordering FROM events AS e " + "SELECT e.event_id, e.room_id, e.type, e.unrecognized_keys, " + "e.stream_ordering FROM events AS e " "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { "limit": limit } + + def app_service_interested(row): + if row["room_id"] in room_ids_for_as: + return True + + if row["type"] == EventTypes.Member: + # load up the content to inspect if some user the AS is + # interested in was invited to a room. We'll be passing this + # through _get_events_txn later, so ignore the fact that this + # may be a redacted event. + event_content = simplejson.loads(row["unrecognized_keys"]) + if (service.is_interested_in_user( + event_content.get("state_key"))): + return True + return False + def f(txn): txn.execute(sql, (from_id.stream, to_id.stream,)) @@ -176,7 +195,7 @@ class StreamStore(SQLBaseStore): # apply the filter on the room id list [ r["event_id"] for r in rows - if r["room_id"] in room_ids_for_as + if app_service_interested(r) ], get_prev_content=True ) From 1cc77145d483c4a6cea75fafbe83a1661b8b61cd Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 27 Feb 2015 09:39:12 +0000 Subject: [PATCH 10/17] Notify appservices of invites mid-poll. This requires the notifier to have knowledge of appservice listeners so it can do the regex checks on incoming invites to see if the state_key matches. It isn't enough to just rely on the room listeners and store.get_app_service_rooms as the room will initially not exist or won't be on the ASes radar due to having none of its users in the room. --- synapse/notifier.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 2475f3ffb..09d23e79b 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -36,8 +36,10 @@ class _NotificationListener(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, from_token, limit, timeout, deferred): + def __init__(self, user, rooms, from_token, limit, timeout, deferred, + appservice=None): self.user = user + self.appservice = appservice self.from_token = from_token self.limit = limit self.timeout = timeout @@ -65,6 +67,10 @@ class _NotificationListener(object): lst.discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self) + if self.appservice: + notifier.appservice_to_listeners.get( + self.appservice, set() + ).discard(self) class Notifier(object): @@ -79,6 +85,7 @@ class Notifier(object): self.rooms_to_listeners = {} self.user_to_listeners = {} + self.appservice_to_listeners = {} self.event_sources = hs.get_event_sources() @@ -114,6 +121,17 @@ class Notifier(object): for user in extra_users: listeners |= self.user_to_listeners.get(user, set()).copy() + for appservice in self.appservice_to_listeners: + # TODO (kegan): Redundant appservice listener checks? + # App services will already be in the rooms_to_listeners set, but + # that isn't enough. They need to be checked here in order to + # receive *invites* for users they are interested in. Does this + # make the rooms_to_listeners check somewhat obselete? + if appservice.is_interested(event): + listeners |= self.appservice_to_listeners.get( + appservice, set() + ).copy() + logger.debug("on_new_room_event listeners %s", listeners) # TODO (erikj): Can we make this more efficient by hitting the @@ -280,6 +298,10 @@ class Notifier(object): if not from_token: from_token = yield self.event_sources.get_current_token() + appservice = yield self.hs.get_datastore().get_app_service_by_user_id( + user.to_string() + ) + listener = _NotificationListener( user, rooms, @@ -287,6 +309,7 @@ class Notifier(object): limit, timeout, deferred, + appservice=appservice ) def _timeout_listener(): @@ -319,6 +342,11 @@ class Notifier(object): self.user_to_listeners.setdefault(listener.user, set()).add(listener) + if listener.appservice: + self.appservice_to_listeners.setdefault( + listener.appservice, set() + ).add(listener) + @defer.inlineCallbacks @log_function def _check_for_updates(self, listener): From 0ebd632d39e7c493d26e0dd24876232caf2660b4 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 27 Feb 2015 09:46:38 +0000 Subject: [PATCH 11/17] Fix unit tests --- tests/rest/client/v1/test_presence.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 7c5df5c11..5f2ef64ef 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -295,6 +295,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): self.mock_datastore = hs.get_datastore() self.mock_datastore.get_app_service_by_token = Mock(return_value=None) + self.mock_datastore.get_app_service_by_user_id = Mock( + return_value=defer.succeed(None) + ) def get_profile_displayname(user_id): return defer.succeed("Frank") From 806a6c886aaa695a7dbfd35f71b9cc59941b8366 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Fri, 27 Feb 2015 09:48:57 +0000 Subject: [PATCH 12/17] PEP8 --- synapse/storage/stream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 5d01ecf20..09417bd14 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -169,7 +169,6 @@ class StreamStore(SQLBaseStore): "limit": limit } - def app_service_interested(row): if row["room_id"] in room_ids_for_as: return True From ebc48306662cf8719a0ea64e2955bf8d5e037a8e Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 09:53:00 +0000 Subject: [PATCH 13/17] PR tweaks: set earlier on and use 'as json' for compat --- synapse/storage/appservice.py | 18 +++++++----------- synapse/storage/registration.py | 2 +- synapse/storage/stream.py | 8 ++++---- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index d0632d55d..c6ca2ab04 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -219,20 +219,17 @@ class ApplicationServiceStore(SQLBaseStore): # get all rooms matching the room ID regex. room_entries = yield self.get_all_rooms() # RoomEntry list - matching_room_id_list = [ + matching_room_list = set([ r.room_id for r in room_entries if service.is_interested_in_room(r.room_id) - ] + ]) # resolve room IDs for matching room alias regex. room_alias_mappings = yield self.get_all_associations() - matching_alias_list = [ + matching_room_list |= set([ r.room_id for r in room_alias_mappings if service.is_interested_in_alias(r.room_alias) - ] - room_ids_matching_alias_or_id = set( - matching_room_id_list + matching_alias_list - ) + ]) # get all rooms for every user for this AS. This is scoped to users on # this HS only. @@ -241,11 +238,10 @@ class ApplicationServiceStore(SQLBaseStore): u["name"] for u in user_list if service.is_interested_in_user(u["name"]) ] - rooms_for_user_matching_user_id = [] # RoomsForUser list + rooms_for_user_matching_user_id = set() # RoomsForUser list for user_id in user_list: rooms_for_user = yield self.get_rooms_for_user(user_id) - rooms_for_user_matching_user_id += rooms_for_user - rooms_for_user_matching_user_id = set(rooms_for_user_matching_user_id) + rooms_for_user_matching_user_id |= set(rooms_for_user) # make RoomsForUser tuples for room ids and aliases which are not in the # main rooms_for_user_list - e.g. they are rooms which do not have AS @@ -253,7 +249,7 @@ class ApplicationServiceStore(SQLBaseStore): known_room_ids = [r.room_id for r in rooms_for_user_matching_user_id] missing_rooms_for_user = [ RoomsForUser(r, service.sender, "join") for r in - room_ids_matching_alias_or_id if r not in known_room_ids + matching_room_list if r not in known_room_ids ] rooms_for_user_matching_user_id |= set(missing_rooms_for_user) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7aff3dbd3..9c92575c7 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -93,7 +93,7 @@ class RegistrationStore(SQLBaseStore): ) def get_all_users(self): - query = ("SELECT users.name FROM users") + query = "SELECT users.name FROM users" return self._execute( self.cursor_to_dict, query diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 09417bd14..bad427288 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -43,7 +43,7 @@ from synapse.util.logutils import log_function from collections import namedtuple import logging -import simplejson +import simplejson as json logger = logging.getLogger(__name__) @@ -178,7 +178,7 @@ class StreamStore(SQLBaseStore): # interested in was invited to a room. We'll be passing this # through _get_events_txn later, so ignore the fact that this # may be a redacted event. - event_content = simplejson.loads(row["unrecognized_keys"]) + event_content = json.loads(row["unrecognized_keys"]) if (service.is_interested_in_user( event_content.get("state_key"))): return True @@ -202,7 +202,7 @@ class StreamStore(SQLBaseStore): self._set_before_and_after(ret, rows) if rows: - key = "s%d" % max([r["stream_ordering"] for r in rows]) + key = "s%d" % max(r["stream_ordering"] for r in rows) else: # Assume we didn't get anything because there was nothing to @@ -271,7 +271,7 @@ class StreamStore(SQLBaseStore): self._set_before_and_after(ret, rows) if rows: - key = "s%d" % max([r["stream_ordering"] for r in rows]) + key = "s%d" % max(r["stream_ordering"] for r in rows) else: # Assume we didn't get anything because there was nothing to From 3d73383d185b41b9986366da8123255e3a8ce1e0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 10:16:24 +0000 Subject: [PATCH 14/17] Modify _simple_select_list to allow an empty WHERE clause. Use it for get_all_rooms and get_all_users. --- synapse/storage/_base.py | 22 +++++++++++++++------- synapse/storage/appservice.py | 4 ++-- synapse/storage/registration.py | 7 ++----- synapse/storage/room.py | 5 ++--- 4 files changed, 21 insertions(+), 17 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c98dd36ae..3725c9795 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -450,7 +450,8 @@ class SQLBaseStore(object): Args: table : string giving the table name - keyvalues : dict of column names and values to select the rows with + keyvalues : dict of column names and values to select the rows with, + or None to not apply a WHERE clause. retcols : list of strings giving the names of the columns to return """ return self.runInteraction( @@ -469,13 +470,20 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the rows with retcols : list of strings giving the names of the columns to return """ - sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( - ", ".join(retcols), - table, - " AND ".join("%s = ?" % (k, ) for k in keyvalues) - ) + if keyvalues: + sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + ", ".join(retcols), + table, + " AND ".join("%s = ?" % (k, ) for k in keyvalues) + ) + txn.execute(sql, keyvalues.values()) + else: + sql = "SELECT %s FROM %s ORDER BY rowid asc" % ( + ", ".join(retcols), + table + ) + txn.execute(sql) - txn.execute(sql, keyvalues.values()) return self.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c6ca2ab04..0e3eab942 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -220,8 +220,8 @@ class ApplicationServiceStore(SQLBaseStore): # get all rooms matching the room ID regex. room_entries = yield self.get_all_rooms() # RoomEntry list matching_room_list = set([ - r.room_id for r in room_entries if - service.is_interested_in_room(r.room_id) + r["room_id"] for r in room_entries if + service.is_interested_in_room(r["room_id"]) ]) # resolve room IDs for matching room alias regex. diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 9c92575c7..54cd15bc0 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -93,11 +93,8 @@ class RegistrationStore(SQLBaseStore): ) def get_all_users(self): - query = "SELECT users.name FROM users" - return self._execute( - self.cursor_to_dict, - query - ) + return self._simple_select_list( + table="users", keyvalues=None, retcols=["name"]) def get_user_by_token(self, token): """Get a user from the given access token. diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 3a6469340..6bd0b22ae 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -77,9 +77,8 @@ class RoomStore(SQLBaseStore): Returns: A list of namedtuples containing the room information. """ - query = RoomsTable.select_statement() - return self._execute( - RoomsTable.decode_results, query, + return self._simple_select_list( + table="rooms", keyvalues=None, retcols=["room_id"] ) @defer.inlineCallbacks From b216b3689248094989168c340b60f500c93772a7 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 10:41:35 +0000 Subject: [PATCH 15/17] JOIN state_events rather than parsing unrecognized_keys to pull out member state_keys --- synapse/storage/appservice.py | 2 +- synapse/storage/stream.py | 14 ++++---------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 0e3eab942..3a267d044 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -218,7 +218,7 @@ class ApplicationServiceStore(SQLBaseStore): # less obvious. # get all rooms matching the room ID regex. - room_entries = yield self.get_all_rooms() # RoomEntry list + room_entries = yield self.get_all_rooms() matching_room_list = set([ r["room_id"] for r in room_entries if service.is_interested_in_room(r["room_id"]) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index bad427288..865cb13e9 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -43,7 +43,6 @@ from synapse.util.logutils import log_function from collections import namedtuple import logging -import simplejson as json logger = logging.getLogger(__name__) @@ -161,8 +160,9 @@ class StreamStore(SQLBaseStore): # select all the events between from/to with a sensible limit sql = ( - "SELECT e.event_id, e.room_id, e.type, e.unrecognized_keys, " - "e.stream_ordering FROM events AS e " + "SELECT e.event_id, e.room_id, e.type, s.state_key, " + "e.stream_ordering FROM events AS e LEFT JOIN state_events as s ON " + "e.event_id = s.event_id " "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? " "ORDER BY stream_ordering ASC LIMIT %(limit)d " ) % { @@ -174,13 +174,7 @@ class StreamStore(SQLBaseStore): return True if row["type"] == EventTypes.Member: - # load up the content to inspect if some user the AS is - # interested in was invited to a room. We'll be passing this - # through _get_events_txn later, so ignore the fact that this - # may be a redacted event. - event_content = json.loads(row["unrecognized_keys"]) - if (service.is_interested_in_user( - event_content.get("state_key"))): + if service.is_interested_in_user(row.get("state_key")): return True return False From 377ae369c1275fabdac46fa00c0b2ba238467435 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 11:20:51 +0000 Subject: [PATCH 16/17] Wrap all of get_app_service_rooms in a txn. --- synapse/storage/appservice.py | 38 ++++++++++++++++++--------- synapse/storage/directory.py | 21 --------------- synapse/storage/registration.py | 4 --- synapse/storage/room.py | 10 ------- synapse/storage/roommember.py | 36 ++++++++++++++------------ synapse/storage/stream.py | 46 ++++++++++++++++----------------- 6 files changed, 67 insertions(+), 88 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 3a267d044..97481d113 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -15,6 +15,7 @@ import logging from twisted.internet import defer +from synapse.api.constants import Membership from synapse.api.errors import StoreError from synapse.appservice import ApplicationService from synapse.storage.roommember import RoomsForUser @@ -197,7 +198,6 @@ class ApplicationServiceStore(SQLBaseStore): # TODO: The from_cache=False impl # TODO: This should be JOINed with the application_services_regex table. - @defer.inlineCallbacks def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -212,35 +212,49 @@ class ApplicationServiceStore(SQLBaseStore): Returns: A list of RoomsForUser. """ - # FIXME: This is assuming that this store has methods from - # RoomStore, DirectoryStore, RegistrationStore, RoomMemberStore which is - # a bad assumption to make as it makes testing trickier and coupling - # less obvious. + return self.runInteraction( + "get_app_service_rooms", + self._get_app_service_rooms_txn, + service, + ) + def _get_app_service_rooms_txn(self, txn, service): # get all rooms matching the room ID regex. - room_entries = yield self.get_all_rooms() + room_entries = self._simple_select_list_txn( + txn=txn, table="rooms", keyvalues=None, retcols=["room_id"] + ) matching_room_list = set([ r["room_id"] for r in room_entries if service.is_interested_in_room(r["room_id"]) ]) # resolve room IDs for matching room alias regex. - room_alias_mappings = yield self.get_all_associations() + room_alias_mappings = self._simple_select_list_txn( + txn=txn, table="room_aliases", keyvalues=None, + retcols=["room_id", "room_alias"] + ) matching_room_list |= set([ - r.room_id for r in room_alias_mappings if - service.is_interested_in_alias(r.room_alias) + r["room_id"] for r in room_alias_mappings if + service.is_interested_in_alias(r["room_alias"]) ]) # get all rooms for every user for this AS. This is scoped to users on # this HS only. - user_list = yield self.get_all_users() + user_list = self._simple_select_list_txn( + txn=txn, table="users", keyvalues=None, retcols=["name"] + ) user_list = [ u["name"] for u in user_list if service.is_interested_in_user(u["name"]) ] rooms_for_user_matching_user_id = set() # RoomsForUser list for user_id in user_list: - rooms_for_user = yield self.get_rooms_for_user(user_id) + # FIXME: This assumes this store is linked with RoomMemberStore :( + rooms_for_user = self._get_rooms_for_user_where_membership_is_txn( + txn=txn, + user_id=user_id, + membership_list=[Membership.JOIN] + ) rooms_for_user_matching_user_id |= set(rooms_for_user) # make RoomsForUser tuples for room ids and aliases which are not in the @@ -253,7 +267,7 @@ class ApplicationServiceStore(SQLBaseStore): ] rooms_for_user_matching_user_id |= set(missing_rooms_for_user) - defer.returnValue(rooms_for_user_matching_user_id) + return rooms_for_user_matching_user_id @defer.inlineCallbacks def _populate_cache(self): diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index e391239a3..68b7d5969 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -134,27 +134,6 @@ class DirectoryStore(SQLBaseStore): return room_id - @defer.inlineCallbacks - def get_all_associations(self): - """Retrieve the entire list of room alias -> room ID pairings. - - Returns: - A list of RoomAliasMappings. - """ - results = yield self._execute_and_decode( - "SELECT room_id, room_alias FROM room_aliases" - ) - - # TODO(kegan): It feels wrong to be specifying no servers here, but - # equally this function isn't required to obtain all servers so - # retrieving them "just for the sake of it" also seems wrong, but we - # want to conform to passing Objects around and not dicts.. - defer.returnValue([ - RoomAliasMapping( - room_id=r["room_id"], room_alias=r["room_alias"], servers="" - ) for r in results - ]) - def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 54cd15bc0..029b07cc6 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -92,10 +92,6 @@ class RegistrationStore(SQLBaseStore): query, user_id ) - def get_all_users(self): - return self._simple_select_list( - table="users", keyvalues=None, retcols=["name"]) - def get_user_by_token(self, token): """Get a user from the given access token. diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 6bd0b22ae..750b17a45 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -71,16 +71,6 @@ class RoomStore(SQLBaseStore): RoomsTable.decode_single_result, query, room_id, ) - def get_all_rooms(self): - """Retrieve all the rooms. - - Returns: - A list of namedtuples containing the room information. - """ - return self._simple_select_list( - table="rooms", keyvalues=None, retcols=["room_id"] - ) - @defer.inlineCallbacks def get_rooms(self, is_public): """Retrieve a list of all public rooms. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 58aa376c2..3d0172d09 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -180,6 +180,14 @@ class RoomMemberStore(SQLBaseStore): if not membership_list: return defer.succeed(None) + return self.runInteraction( + "get_rooms_for_user_where_membership_is", + self._get_rooms_for_user_where_membership_is_txn, + user_id, membership_list + ) + + def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, + membership_list): where_clause = "user_id = ? AND (%s)" % ( " OR ".join(["membership = ?" for _ in membership_list]), ) @@ -187,24 +195,18 @@ class RoomMemberStore(SQLBaseStore): args = [user_id] args.extend(membership_list) - def f(txn): - sql = ( - "SELECT m.room_id, m.sender, m.membership" - " FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE %s" - ) % (where_clause,) + sql = ( + "SELECT m.room_id, m.sender, m.membership" + " FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id" + " WHERE %s" + ) % (where_clause,) - txn.execute(sql, args) - return [ - RoomsForUser(**r) for r in self.cursor_to_dict(txn) - ] - - return self.runInteraction( - "get_rooms_for_user_where_membership_is", - f - ) + txn.execute(sql, args) + return [ + RoomsForUser(**r) for r in self.cursor_to_dict(txn) + ] def get_joined_hosts_for_room(self, room_id): return self._simple_select_onecol( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 865cb13e9..09bc52221 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -146,18 +146,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], to_key)) return - # Logic: - # - We want ALL events which match the AS room_id regex - # - We want ALL events which match the rooms represented by the AS - # room_alias regex - # - We want ALL events for rooms that AS users have joined. - # This is currently supported via get_app_service_rooms (which is used - # for the Notifier listener rooms). We can't reasonably make a SQL - # query for these room IDs, so we'll pull all the events between from/to - # and filter in python. - rooms_for_as = yield self.get_app_service_rooms(service) - room_ids_for_as = [r.room_id for r in rooms_for_as] - # select all the events between from/to with a sensible limit sql = ( "SELECT e.event_id, e.room_id, e.type, s.state_key, " @@ -169,20 +157,32 @@ class StreamStore(SQLBaseStore): "limit": limit } - def app_service_interested(row): - if row["room_id"] in room_ids_for_as: - return True - - if row["type"] == EventTypes.Member: - if service.is_interested_in_user(row.get("state_key")): - return True - return False - def f(txn): + # pull out all the events between the tokens txn.execute(sql, (from_id.stream, to_id.stream,)) - rows = self.cursor_to_dict(txn) + # Logic: + # - We want ALL events which match the AS room_id regex + # - We want ALL events which match the rooms represented by the AS + # room_alias regex + # - We want ALL events for rooms that AS users have joined. + # This is currently supported via get_app_service_rooms (which is + # used for the Notifier listener rooms). We can't reasonably make a + # SQL query for these room IDs, so we'll pull all the events between + # from/to and filter in python. + rooms_for_as = self._get_app_service_rooms_txn(txn, service) + room_ids_for_as = [r.room_id for r in rooms_for_as] + + def app_service_interested(row): + if row["room_id"] in room_ids_for_as: + return True + + if row["type"] == EventTypes.Member: + if service.is_interested_in_user(row.get("state_key")): + return True + return False + ret = self._get_events_txn( txn, # apply the filter on the room id list @@ -197,7 +197,6 @@ class StreamStore(SQLBaseStore): if rows: key = "s%d" % max(r["stream_ordering"] for r in rows) - else: # Assume we didn't get anything because there was nothing to # get. @@ -266,7 +265,6 @@ class StreamStore(SQLBaseStore): if rows: key = "s%d" % max(r["stream_ordering"] for r in rows) - else: # Assume we didn't get anything because there was nothing to # get. From cb97ea3ec236c23c745e59c3a857503dd8dc3410 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 11:23:46 +0000 Subject: [PATCH 17/17] PEP8 --- synapse/storage/roommember.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 3d0172d09..65ffb4627 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -187,7 +187,7 @@ class RoomMemberStore(SQLBaseStore): ) def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, - membership_list): + membership_list): where_clause = "user_id = ? AND (%s)" % ( " OR ".join(["membership = ?" for _ in membership_list]), )