From 377ae369c1275fabdac46fa00c0b2ba238467435 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 2 Mar 2015 11:20:51 +0000 Subject: [PATCH] Wrap all of get_app_service_rooms in a txn. --- synapse/storage/appservice.py | 38 ++++++++++++++++++--------- synapse/storage/directory.py | 21 --------------- synapse/storage/registration.py | 4 --- synapse/storage/room.py | 10 ------- synapse/storage/roommember.py | 36 ++++++++++++++------------ synapse/storage/stream.py | 46 ++++++++++++++++----------------- 6 files changed, 67 insertions(+), 88 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 3a267d044..97481d113 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -15,6 +15,7 @@ import logging from twisted.internet import defer +from synapse.api.constants import Membership from synapse.api.errors import StoreError from synapse.appservice import ApplicationService from synapse.storage.roommember import RoomsForUser @@ -197,7 +198,6 @@ class ApplicationServiceStore(SQLBaseStore): # TODO: The from_cache=False impl # TODO: This should be JOINed with the application_services_regex table. - @defer.inlineCallbacks def get_app_service_rooms(self, service): """Get a list of RoomsForUser for this application service. @@ -212,35 +212,49 @@ class ApplicationServiceStore(SQLBaseStore): Returns: A list of RoomsForUser. """ - # FIXME: This is assuming that this store has methods from - # RoomStore, DirectoryStore, RegistrationStore, RoomMemberStore which is - # a bad assumption to make as it makes testing trickier and coupling - # less obvious. + return self.runInteraction( + "get_app_service_rooms", + self._get_app_service_rooms_txn, + service, + ) + def _get_app_service_rooms_txn(self, txn, service): # get all rooms matching the room ID regex. - room_entries = yield self.get_all_rooms() + room_entries = self._simple_select_list_txn( + txn=txn, table="rooms", keyvalues=None, retcols=["room_id"] + ) matching_room_list = set([ r["room_id"] for r in room_entries if service.is_interested_in_room(r["room_id"]) ]) # resolve room IDs for matching room alias regex. - room_alias_mappings = yield self.get_all_associations() + room_alias_mappings = self._simple_select_list_txn( + txn=txn, table="room_aliases", keyvalues=None, + retcols=["room_id", "room_alias"] + ) matching_room_list |= set([ - r.room_id for r in room_alias_mappings if - service.is_interested_in_alias(r.room_alias) + r["room_id"] for r in room_alias_mappings if + service.is_interested_in_alias(r["room_alias"]) ]) # get all rooms for every user for this AS. This is scoped to users on # this HS only. - user_list = yield self.get_all_users() + user_list = self._simple_select_list_txn( + txn=txn, table="users", keyvalues=None, retcols=["name"] + ) user_list = [ u["name"] for u in user_list if service.is_interested_in_user(u["name"]) ] rooms_for_user_matching_user_id = set() # RoomsForUser list for user_id in user_list: - rooms_for_user = yield self.get_rooms_for_user(user_id) + # FIXME: This assumes this store is linked with RoomMemberStore :( + rooms_for_user = self._get_rooms_for_user_where_membership_is_txn( + txn=txn, + user_id=user_id, + membership_list=[Membership.JOIN] + ) rooms_for_user_matching_user_id |= set(rooms_for_user) # make RoomsForUser tuples for room ids and aliases which are not in the @@ -253,7 +267,7 @@ class ApplicationServiceStore(SQLBaseStore): ] rooms_for_user_matching_user_id |= set(missing_rooms_for_user) - defer.returnValue(rooms_for_user_matching_user_id) + return rooms_for_user_matching_user_id @defer.inlineCallbacks def _populate_cache(self): diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index e391239a3..68b7d5969 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -134,27 +134,6 @@ class DirectoryStore(SQLBaseStore): return room_id - @defer.inlineCallbacks - def get_all_associations(self): - """Retrieve the entire list of room alias -> room ID pairings. - - Returns: - A list of RoomAliasMappings. - """ - results = yield self._execute_and_decode( - "SELECT room_id, room_alias FROM room_aliases" - ) - - # TODO(kegan): It feels wrong to be specifying no servers here, but - # equally this function isn't required to obtain all servers so - # retrieving them "just for the sake of it" also seems wrong, but we - # want to conform to passing Objects around and not dicts.. - defer.returnValue([ - RoomAliasMapping( - room_id=r["room_id"], room_alias=r["room_alias"], servers="" - ) for r in results - ]) - def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 54cd15bc0..029b07cc6 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -92,10 +92,6 @@ class RegistrationStore(SQLBaseStore): query, user_id ) - def get_all_users(self): - return self._simple_select_list( - table="users", keyvalues=None, retcols=["name"]) - def get_user_by_token(self, token): """Get a user from the given access token. diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 6bd0b22ae..750b17a45 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -71,16 +71,6 @@ class RoomStore(SQLBaseStore): RoomsTable.decode_single_result, query, room_id, ) - def get_all_rooms(self): - """Retrieve all the rooms. - - Returns: - A list of namedtuples containing the room information. - """ - return self._simple_select_list( - table="rooms", keyvalues=None, retcols=["room_id"] - ) - @defer.inlineCallbacks def get_rooms(self, is_public): """Retrieve a list of all public rooms. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 58aa376c2..3d0172d09 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -180,6 +180,14 @@ class RoomMemberStore(SQLBaseStore): if not membership_list: return defer.succeed(None) + return self.runInteraction( + "get_rooms_for_user_where_membership_is", + self._get_rooms_for_user_where_membership_is_txn, + user_id, membership_list + ) + + def _get_rooms_for_user_where_membership_is_txn(self, txn, user_id, + membership_list): where_clause = "user_id = ? AND (%s)" % ( " OR ".join(["membership = ?" for _ in membership_list]), ) @@ -187,24 +195,18 @@ class RoomMemberStore(SQLBaseStore): args = [user_id] args.extend(membership_list) - def f(txn): - sql = ( - "SELECT m.room_id, m.sender, m.membership" - " FROM room_memberships as m" - " INNER JOIN current_state_events as c" - " ON m.event_id = c.event_id" - " WHERE %s" - ) % (where_clause,) + sql = ( + "SELECT m.room_id, m.sender, m.membership" + " FROM room_memberships as m" + " INNER JOIN current_state_events as c" + " ON m.event_id = c.event_id" + " WHERE %s" + ) % (where_clause,) - txn.execute(sql, args) - return [ - RoomsForUser(**r) for r in self.cursor_to_dict(txn) - ] - - return self.runInteraction( - "get_rooms_for_user_where_membership_is", - f - ) + txn.execute(sql, args) + return [ + RoomsForUser(**r) for r in self.cursor_to_dict(txn) + ] def get_joined_hosts_for_room(self, room_id): return self._simple_select_onecol( diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 865cb13e9..09bc52221 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -146,18 +146,6 @@ class StreamStore(SQLBaseStore): defer.returnValue(([], to_key)) return - # Logic: - # - We want ALL events which match the AS room_id regex - # - We want ALL events which match the rooms represented by the AS - # room_alias regex - # - We want ALL events for rooms that AS users have joined. - # This is currently supported via get_app_service_rooms (which is used - # for the Notifier listener rooms). We can't reasonably make a SQL - # query for these room IDs, so we'll pull all the events between from/to - # and filter in python. - rooms_for_as = yield self.get_app_service_rooms(service) - room_ids_for_as = [r.room_id for r in rooms_for_as] - # select all the events between from/to with a sensible limit sql = ( "SELECT e.event_id, e.room_id, e.type, s.state_key, " @@ -169,20 +157,32 @@ class StreamStore(SQLBaseStore): "limit": limit } - def app_service_interested(row): - if row["room_id"] in room_ids_for_as: - return True - - if row["type"] == EventTypes.Member: - if service.is_interested_in_user(row.get("state_key")): - return True - return False - def f(txn): + # pull out all the events between the tokens txn.execute(sql, (from_id.stream, to_id.stream,)) - rows = self.cursor_to_dict(txn) + # Logic: + # - We want ALL events which match the AS room_id regex + # - We want ALL events which match the rooms represented by the AS + # room_alias regex + # - We want ALL events for rooms that AS users have joined. + # This is currently supported via get_app_service_rooms (which is + # used for the Notifier listener rooms). We can't reasonably make a + # SQL query for these room IDs, so we'll pull all the events between + # from/to and filter in python. + rooms_for_as = self._get_app_service_rooms_txn(txn, service) + room_ids_for_as = [r.room_id for r in rooms_for_as] + + def app_service_interested(row): + if row["room_id"] in room_ids_for_as: + return True + + if row["type"] == EventTypes.Member: + if service.is_interested_in_user(row.get("state_key")): + return True + return False + ret = self._get_events_txn( txn, # apply the filter on the room id list @@ -197,7 +197,6 @@ class StreamStore(SQLBaseStore): if rows: key = "s%d" % max(r["stream_ordering"] for r in rows) - else: # Assume we didn't get anything because there was nothing to # get. @@ -266,7 +265,6 @@ class StreamStore(SQLBaseStore): if rows: key = "s%d" % max(r["stream_ordering"] for r in rows) - else: # Assume we didn't get anything because there was nothing to # get.