From 755def8083ec887feabcb45b3bc111db4aef20ab Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 13:46:47 +0100 Subject: [PATCH 01/24] Add more doc string, reduce C+P boilerplate for getting room list --- synapse/handlers/presence.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a01020e20..ce9dd6439 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -297,7 +297,26 @@ class PresenceHandler(BaseHandler): self.changed_presencelike_data(user, {"last_active": now}) + def get_joined_rooms_for_user(self, user): + """Get the list of rooms a user is joined to. + + Args: + user(UserID): The user. + Returns: + A Deferred of a list of room id strings. + """ + rm_handler = self.homeserver.get_handlers().room_member_handler + return rm_handler.get_joined_rooms_for_user(user) + def changed_presencelike_data(self, user, state): + """Updates the presence state of a local user. + + Args: + user(UserID): The user being updated. + state(dict): The new presence state for the user. + Returns: + A Deferred + """ statuscache = self._get_or_make_usercache(user) self._user_cachemap_latest_serial += 1 @@ -544,8 +563,7 @@ class PresenceHandler(BaseHandler): # Also include people in all my rooms - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if state is None: state = yield self.store.get_presence_state(user.localpart) @@ -745,8 +763,7 @@ class PresenceHandler(BaseHandler): # and also user is informed of server-forced pushes localusers.add(user) - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if not localusers and not room_ids: defer.returnValue(None) @@ -791,8 +808,7 @@ class PresenceHandler(BaseHandler): " | %d interested local observers %r", len(observers), observers ) - rm_handler = self.homeserver.get_handlers().room_member_handler - room_ids = yield rm_handler.get_joined_rooms_for_user(user) + room_ids = yield self.get_joined_rooms_for_user(user) if room_ids: logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids) From e1150cac4bceab88097ea2421323f3b3852028e3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 15:46:37 +0100 Subject: [PATCH 02/24] Move updating the serial and state of the presence cache into a single function --- synapse/handlers/presence.py | 60 ++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0c3290b30..d129d4ca8 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -308,6 +308,11 @@ class PresenceHandler(BaseHandler): rm_handler = self.homeserver.get_handlers().room_member_handler return rm_handler.get_joined_rooms_for_user(user) + def get_joined_users_for_room_id(self, room_id): + rm_handler = self.homeserver.get_handlers().room_member_handler + return rm_handler.get_room_members(room_id) + + @defer.inlineCallbacks def changed_presencelike_data(self, user, state): """Updates the presence state of a local user. @@ -317,12 +322,9 @@ class PresenceHandler(BaseHandler): Returns: A Deferred """ - statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 - statuscache.update(state, serial=self._user_cachemap_latest_serial) - - return self.push_presence(user, statuscache=statuscache) + statuscache = yield self.update_presence_cache(user, state) + yield self.push_presence(user, statuscache=statuscache) @log_function def started_user_eventstream(self, user): @@ -345,13 +347,12 @@ class PresenceHandler(BaseHandler): room_id(str): The room id the user joined. """ if self.hs.is_mine(user): - statuscache = self._get_or_make_usercache(user) - # No actual update but we need to bump the serial anyway for the # event source self._user_cachemap_latest_serial += 1 - statuscache.update({}, serial=self._user_cachemap_latest_serial) - + statuscache = yield self.update_presence_cache( + user, room_ids=[room_id] + ) self.push_update_to_local_and_remote( observed_user=user, room_ids=[room_id], @@ -359,16 +360,17 @@ class PresenceHandler(BaseHandler): ) # We also want to tell them about current presence of people. - rm_handler = self.homeserver.get_handlers().room_member_handler - curr_users = yield rm_handler.get_room_members(room_id) + curr_users = yield self.get_joined_users_for_room_id(room_id) for local_user in [c for c in curr_users if self.hs.is_mine(c)]: - statuscache = self._get_or_offline_usercache(local_user) - statuscache.update({}, serial=self._user_cachemap_latest_serial) + statuscache = yield self.update_presence_cache( + local_user, room_ids=[room_id], add_to_cache=False + ) + self.push_update_to_local_and_remote( observed_user=local_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(local_user), + statuscache=statuscache, ) @defer.inlineCallbacks @@ -829,10 +831,8 @@ class PresenceHandler(BaseHandler): self.clock.time_msec() - state.pop("last_active_ago") ) - statuscache = self._get_or_make_usercache(user) - self._user_cachemap_latest_serial += 1 - statuscache.update(state, serial=self._user_cachemap_latest_serial) + yield self.update_presence_cache(user, state, room_ids=room_ids) if not observers and not room_ids: logger.debug(" | no interested observers or room IDs") @@ -890,6 +890,32 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds, consumeErrors=True) + @defer.inlineCallbacks + def update_presence_cache(self, user, state={}, room_ids=None, + add_to_cache=True): + """Update the presence cache for a user with a new state and bump the + serial to the latest value. + + Args: + user(UserID): The user being updated + state(dict): The presence state being updated + room_ids(None or list of str): A list of room_ids to update. If + room_ids is None then fetch the list of room_ids the user is + joined to. + add_to_cache: Whether to add an entry to the presence cache if the + user isn't already in the cache. + Returns: + A Deferred UserPresenceCache for the user being updated. + """ + if room_ids is None: + room_ids = yield self.get_joined_rooms_for_user(user) + if add_to_cache: + statuscache = self._get_or_make_usercache(user) + else: + statuscache = self._get_or_offline_usercache(user) + statuscache.update(state, serial=self._user_cachemap_latest_serial) + defer.returnValue(statuscache) + @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], From 591c4bf223a4a8698f51ba258984e769f593e32b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 16:21:51 +0100 Subject: [PATCH 03/24] Cache the most recent serial for each room --- synapse/handlers/presence.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d129d4ca8..aa1d73f2f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -146,6 +146,10 @@ class PresenceHandler(BaseHandler): self._user_cachemap = {} self._user_cachemap_latest_serial = 0 + # map room_ids to the latest presence serial for a member of that + # room + self._room_serials = {} + metrics.register_callback( "userCachemap:size", lambda: len(self._user_cachemap), @@ -909,6 +913,9 @@ class PresenceHandler(BaseHandler): """ if room_ids is None: room_ids = yield self.get_joined_rooms_for_user(user) + + for room_id in room_ids: + self._room_serials[room_id] = self._user_cachemap_latest_serial if add_to_cache: statuscache = self._get_or_make_usercache(user) else: @@ -1069,8 +1076,6 @@ class PresenceEventSource(object): def get_new_events_for_user(self, user, from_key, limit): from_key = int(from_key) - observer_user = user - presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap @@ -1079,17 +1084,28 @@ class PresenceEventSource(object): clock = self.clock latest_serial = 0 + presence_list = yield presence.store.get_presence_list( + user.localpart, accepted=True + ) + if presence_list is None: + presence_list = () + user_ids_to_check = set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) + room_ids = yield presence.get_joined_rooms_for_user(user) + for room_id in set(room_ids) & set(presence._room_serials): + if presence._room_serials[room_id] > from_key: + joined = yield presence.get_joined_users_for_room_id(room_id) + user_ids_to_check |= set(joined) + updates = [] # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in cachemap.keys(): + for observed_user in user_ids_to_check & set(cachemap): cached = cachemap[observed_user] if cached.serial <= from_key or cached.serial > max_serial: continue - if not (yield self.is_visible(observer_user, observed_user)): - continue - latest_serial = max(cached.serial, latest_serial) updates.append(cached.make_event(user=observed_user, clock=clock)) From e4c65b338d44fadc058cbd8e4cd79ae1601d3526 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 18 May 2015 18:21:06 +0100 Subject: [PATCH 04/24] Speed up the get_pagination_rows as well --- synapse/handlers/presence.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index aa1d73f2f..6537a3738 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1154,14 +1154,28 @@ class PresenceEventSource(object): presence = self.hs.get_handlers().presence_handler cachemap = presence._user_cachemap + user_ids_to_check = {user} + presence_list = yield presence.store.get_presence_list( + user.localpart, accepted=True + ) + if presence_list is None: + presence_list = () + user_ids_to_check |= set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) + room_ids = yield presence.get_joined_rooms_for_user(user) + for room_id in set(room_ids) & set(presence._room_serials): + if presence._room_serials[room_id] >= from_key: + joined = yield presence.get_joined_users_for_room_id(room_id) + user_ids_to_check |= set(joined) + updates = [] # TODO(paul): use a DeferredList ? How to limit concurrency. - for observed_user in cachemap.keys(): + for observed_user in user_ids_to_check & set(cachemap): if not (to_key < cachemap[observed_user].serial <= from_key): continue - if (yield self.is_visible(observer_user, observed_user)): - updates.append((observed_user, cachemap[observed_user])) + updates.append((observed_user, cachemap[observed_user])) # TODO(paul): limit From e01b825cc929e16b6a60be0688bbe6d8d9b3866e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 20 May 2015 13:21:59 +0100 Subject: [PATCH 05/24] Clean up the presence_list checking logic a bit --- synapse/handlers/presence.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6537a3738..226d6a0f5 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1084,14 +1084,14 @@ class PresenceEventSource(object): clock = self.clock latest_serial = 0 + user_ids_to_check = {user} presence_list = yield presence.store.get_presence_list( user.localpart, accepted=True ) - if presence_list is None: - presence_list = () - user_ids_to_check = set( - UserID.from_string(p["observed_user_id"]) for p in presence_list - ) + if presence_list is not None: + user_ids_to_check |= set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) room_ids = yield presence.get_joined_rooms_for_user(user) for room_id in set(room_ids) & set(presence._room_serials): if presence._room_serials[room_id] > from_key: @@ -1142,8 +1142,6 @@ class PresenceEventSource(object): def get_pagination_rows(self, user, pagination_config, key): # TODO (erikj): Does this make sense? Ordering? - observer_user = user - from_key = int(pagination_config.from_key) if pagination_config.to_key: @@ -1158,11 +1156,10 @@ class PresenceEventSource(object): presence_list = yield presence.store.get_presence_list( user.localpart, accepted=True ) - if presence_list is None: - presence_list = () - user_ids_to_check |= set( - UserID.from_string(p["observed_user_id"]) for p in presence_list - ) + if presence_list is not None: + user_ids_to_check |= set( + UserID.from_string(p["observed_user_id"]) for p in presence_list + ) room_ids = yield presence.get_joined_rooms_for_user(user) for room_id in set(room_ids) & set(presence._room_serials): if presence._room_serials[room_id] >= from_key: From 8eca5bd50abc9deb3cd428f3f6b3b8fbeb8bdee1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 20 May 2015 13:22:18 +0100 Subject: [PATCH 06/24] Fix the presence tests --- tests/handlers/test_presence.py | 13 +++---------- tests/rest/client/v1/test_presence.py | 3 +++ 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index ee773797e..12cf5747a 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -624,6 +624,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): """ PRESENCE_LIST = { 'apple': [ "@banana:test", "@clementine:test" ], + 'banana': [ "@apple:test" ], } @defer.inlineCallbacks @@ -836,12 +837,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): @defer.inlineCallbacks def test_recv_remote(self): - # TODO(paul): Gut-wrenching - potato_set = self.handler._remote_recvmap.setdefault(self.u_potato, - set()) - potato_set.add(self.u_apple) - - self.room_members = [self.u_banana, self.u_potato] + self.room_members = [self.u_apple, self.u_banana, self.u_potato] self.assertEquals(self.event_source.get_current_key(), 0) @@ -886,11 +882,8 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase): @defer.inlineCallbacks def test_recv_remote_offline(self): """ Various tests relating to SYN-261 """ - potato_set = self.handler._remote_recvmap.setdefault(self.u_potato, - set()) - potato_set.add(self.u_apple) - self.room_members = [self.u_banana, self.u_potato] + self.room_members = [self.u_apple, self.u_banana, self.u_potato] self.assertEquals(self.event_source.get_current_key(), 0) diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 29c0038f0..8f3df9241 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -295,6 +295,9 @@ class PresenceEventStreamTestCase(unittest.TestCase): else: return [] hs.handlers.room_member_handler.get_joined_rooms_for_user = get_rooms_for_user + hs.handlers.room_member_handler.get_room_members = ( + lambda r: self.room_members if r == "a-room" else [] + ) self.mock_datastore = hs.get_datastore() self.mock_datastore.get_app_service_by_token = Mock(return_value=None) From 7ae8afb7ef5a0fb3162339737682e9248980600d Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 20 May 2015 14:48:11 +0100 Subject: [PATCH 07/24] Removed unused 'is_visible' method --- synapse/handlers/presence.py | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 226d6a0f5..6c48b1d20 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1045,32 +1045,6 @@ class PresenceEventSource(object): self.hs = hs self.clock = hs.get_clock() - @defer.inlineCallbacks - def is_visible(self, observer_user, observed_user): - if observer_user == observed_user: - defer.returnValue(True) - - presence = self.hs.get_handlers().presence_handler - - if (yield presence.store.user_rooms_intersect( - [u.to_string() for u in observer_user, observed_user])): - defer.returnValue(True) - - if self.hs.is_mine(observed_user): - pushmap = presence._local_pushmap - - defer.returnValue( - observed_user.localpart in pushmap and - observer_user in pushmap[observed_user.localpart] - ) - else: - recvmap = presence._remote_recvmap - - defer.returnValue( - observed_user in recvmap and - observer_user in recvmap[observed_user] - ) - @defer.inlineCallbacks @log_function def get_new_events_for_user(self, user, from_key, limit): @@ -1099,7 +1073,6 @@ class PresenceEventSource(object): user_ids_to_check |= set(joined) updates = [] - # TODO(paul): use a DeferredList ? How to limit concurrency. for observed_user in user_ids_to_check & set(cachemap): cached = cachemap[observed_user] From d61ce3f6707c3f13a21733e356766d0292815ebc Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 May 2015 11:13:19 +0100 Subject: [PATCH 08/24] Add a cache for get_current_state with state_key --- synapse/push/__init__.py | 4 ++++ synapse/storage/_base.py | 5 +++++ synapse/storage/events.py | 6 ++++++ synapse/storage/state.py | 25 ++++++++++++++++++++++++- synapse/util/lrucache.py | 8 +++++++- 5 files changed, 46 insertions(+), 2 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 5575c847f..812598784 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -287,9 +287,13 @@ class Pusher(object): if len(actions) == 0: logger.warn("Empty actions! Using default action.") actions = Pusher.DEFAULT_ACTIONS + if 'notify' not in actions and 'dont_notify' not in actions: logger.warn("Neither notify nor dont_notify in actions: adding default") actions.extend(Pusher.DEFAULT_ACTIONS) + + logger.info("FNARG: %r", actions) + if 'dont_notify' in actions: logger.debug( "%s for %s: dont_notify", diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9e348590b..2210b3ddf 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -124,6 +124,11 @@ class Cache(object): self.sequence += 1 self.cache.pop(keyargs, None) + def invalidate_all(self): + self.check_thread() + self.sequence += 1 + self.cache.clear() + def cached(max_entries=1000, num_args=1, lru=False): """ A method decorator that applies a memoizing cache around the function. diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 38395c66a..52074b4cc 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -107,6 +107,8 @@ class EventsStore(SQLBaseStore): # We purposefully do this first since if we include a `current_state` # key, we *want* to update the `current_state_events` table if current_state: + txn.call_after(self.get_current_state_for_key.invalidate_all) + self._simple_delete_txn( txn, table="current_state_events", @@ -335,6 +337,10 @@ class EventsStore(SQLBaseStore): ) if is_new_state and not context.rejected: + txn.call_after( + self.get_current_state_for_key.invalidate, + event.room_id, event.type, event.state_key + ) self._simple_upsert_txn( txn, "current_state_events", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index dbc0e49c1..6df735055 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from twisted.internet import defer @@ -130,6 +130,12 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): + if event_type and state_key is not None: + result = yield self.get_current_state_for_key( + room_id, event_type, state_key + ) + defer.returnValue(result) + def f(txn): sql = ( "SELECT event_id FROM current_state_events" @@ -153,6 +159,23 @@ class StateStore(SQLBaseStore): events = yield self.runInteraction("get_current_state", f) defer.returnValue(events) + @cached(num_args=3) + @defer.inlineCallbacks + def get_current_state_for_key(self, room_id, event_type, state_key): + def f(txn): + sql = ( + "SELECT event_id FROM current_state_events" + " WHERE room_id = ? AND type = ? AND state_key = ?" + ) + + args = (room_id, event_type, state_key) + txn.execute(sql, args) + results = txn.fetchall() + return [r[0] for r in results] + event_ids = yield self.runInteraction("get_current_state_for_key", f) + events = yield self._get_events(event_ids, get_prev_content=False) + defer.returnValue(events) + def _make_group_id(clock): return str(int(clock.time_msec())) + random_string(5) diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py index 96163c90f..cacd7e45f 100644 --- a/synapse/util/lrucache.py +++ b/synapse/util/lrucache.py @@ -20,7 +20,6 @@ import threading class LruCache(object): """Least-recently-used cache.""" - # TODO(mjark) Add mutex for linked list for thread safety. def __init__(self, max_size): cache = {} list_root = [] @@ -105,6 +104,12 @@ class LruCache(object): else: return default + @synchronized + def cache_clear(): + list_root[NEXT] = list_root + list_root[PREV] = list_root + cache.clear() + @synchronized def cache_len(): return len(cache) @@ -120,6 +125,7 @@ class LruCache(object): self.pop = cache_pop self.len = cache_len self.contains = cache_contains + self.clear = cache_clear def __getitem__(self, key): result = self.get(key, self.sentinel) From 53447e9cd380262c16677cfddeda1d75aeebe38c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 May 2015 15:14:26 +0100 Subject: [PATCH 09/24] Add caches for things requested by the pushers --- synapse/handlers/room.py | 4 +--- synapse/push/__init__.py | 27 +++++++++++---------------- synapse/storage/_base.py | 1 + synapse/storage/events.py | 19 ++++++++++++------- synapse/storage/push_rule.py | 23 ++++++++++++++++------- synapse/storage/room.py | 3 ++- synapse/storage/roommember.py | 2 ++ 7 files changed, 45 insertions(+), 34 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cfa2e38ed..3da08c147 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -531,9 +531,7 @@ class RoomListHandler(BaseHandler): chunk = yield self.store.get_rooms(is_public=True) results = yield defer.gatherResults( [ - self.store.get_users_in_room( - room_id=room["room_id"], - ) + self.store.get_users_in_room(room["room_id"]) for room in chunk ], consumeErrors=True, diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 812598784..d4b376913 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -84,25 +84,20 @@ class Pusher(object): rules = baserules.list_with_base_rules(rawrules, user) + room_id = ev['room_id'] + # get *our* member event for display name matching - member_events_for_room = yield self.store.get_current_state( - room_id=ev['room_id'], - event_type='m.room.member', - state_key=None - ) my_display_name = None - room_member_count = 0 - for mev in member_events_for_room: - if mev.content['membership'] != 'join': - continue + our_member_event = yield self.store.get_current_state( + room_id=room_id, + event_type='m.room.member', + state_key=self.user_name, + ) + if our_member_event: + my_display_name = our_member_event[0].content.get("displayname") - # This loop does two things: - # 1) Find our current display name - if mev.state_key == self.user_name and 'displayname' in mev.content: - my_display_name = mev.content['displayname'] - - # and 2) Get the number of people in that room - room_member_count += 1 + room_members = yield self.store.get_users_in_room(room_id) + room_member_count = len(room_members) for r in rules: if r['rule_id'] in enabled_map: diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2210b3ddf..c8c76e58f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -180,6 +180,7 @@ def cached(max_entries=1000, num_args=1, lru=False): defer.returnValue(ret) wrapped.invalidate = cache.invalidate + wrapped.invalidate_all = cache.invalidate_all wrapped.prefill = cache.prefill return wrapped diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 52074b4cc..1304219e8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -108,6 +108,10 @@ class EventsStore(SQLBaseStore): # key, we *want* to update the `current_state_events` table if current_state: txn.call_after(self.get_current_state_for_key.invalidate_all) + txn.call_after(self.get_rooms_for_user.invalidate_all) + txn.call_after(self.get_users_in_room.invalidate, event.room_id) + txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id) + txn.call_after(self.get_room_name_and_aliases, event.room_id) self._simple_delete_txn( txn, @@ -116,13 +120,6 @@ class EventsStore(SQLBaseStore): ) for s in current_state: - if s.type == EventTypes.Member: - txn.call_after( - self.get_rooms_for_user.invalidate, s.state_key - ) - txn.call_after( - self.get_joined_hosts_for_room.invalidate, s.room_id - ) self._simple_insert_txn( txn, "current_state_events", @@ -341,6 +338,14 @@ class EventsStore(SQLBaseStore): self.get_current_state_for_key.invalidate, event.room_id, event.type, event.state_key ) + + if (event.type == EventTypes.Name + or event.type == EventTypes.Aliases): + txn.call_after( + self.get_room_name_and_aliases.invalidate, + event.room_id + ) + self._simple_upsert_txn( txn, "current_state_events", diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 34805e276..fba5c7e40 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -13,9 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections - -from ._base import SQLBaseStore, Table +from ._base import SQLBaseStore, cached from twisted.internet import defer import logging @@ -41,6 +39,7 @@ class PushRuleStore(SQLBaseStore): defer.returnValue(rows) + @cached() @defer.inlineCallbacks def get_push_rules_enabled_for_user(self, user_name): results = yield self._simple_select_list( @@ -151,6 +150,10 @@ class PushRuleStore(SQLBaseStore): txn.execute(sql, (user_name, priority_class, new_rule_priority)) + txn.call_after( + self.get_push_rules_enabled_for_user.invalidate, user_name + ) + self._simple_insert_txn( txn, table=PushRuleTable.table_name, @@ -179,6 +182,10 @@ class PushRuleStore(SQLBaseStore): new_rule['priority_class'] = priority_class new_rule['priority'] = new_prio + txn.call_after( + self.get_push_rules_enabled_for_user.invalidate, user_name + ) + self._simple_insert_txn( txn, table=PushRuleTable.table_name, @@ -201,6 +208,7 @@ class PushRuleStore(SQLBaseStore): {'user_name': user_name, 'rule_id': rule_id}, desc="delete_push_rule", ) + self.get_push_rules_enabled_for_user.invalidate(user_name) @defer.inlineCallbacks def set_push_rule_enabled(self, user_name, rule_id, enabled): @@ -210,6 +218,9 @@ class PushRuleStore(SQLBaseStore): {'enabled': 1 if enabled else 0}, desc="set_push_rule_enabled", ) + txn.call_after( + self.get_push_rules_enabled_for_user.invalidate, user_name + ) class RuleNotFoundException(Exception): @@ -220,7 +231,7 @@ class InconsistentRuleException(Exception): pass -class PushRuleTable(Table): +class PushRuleTable(object): table_name = "push_rules" fields = [ @@ -233,10 +244,8 @@ class PushRuleTable(Table): "actions", ] - EntryType = collections.namedtuple("PushRuleEntry", fields) - -class PushRuleEnableTable(Table): +class PushRuleEnableTable(object): table_name = "push_rules_enable" fields = [ diff --git a/synapse/storage/room.py b/synapse/storage/room.py index f95637763..4612a8aa8 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached import collections import logging @@ -186,6 +186,7 @@ class RoomStore(SQLBaseStore): } ) + @cached() @defer.inlineCallbacks def get_room_name_and_aliases(self, room_id): def f(txn): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 839c74f63..3691eade0 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -66,6 +66,7 @@ class RoomMemberStore(SQLBaseStore): txn.call_after(self.get_rooms_for_user.invalidate, target_user_id) txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id) + txn.call_after(self.get_users_in_room.invalidate, event.room_id) def get_room_member(self, user_id, room_id): """Retrieve the current state of a room member. @@ -87,6 +88,7 @@ class RoomMemberStore(SQLBaseStore): return self.runInteraction("get_room_member", f) + @cached() def get_users_in_room(self, room_id): def f(txn): From 2043527b9bbea019b8a4ffddcc9e82438d12b1d5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 21 May 2015 16:53:03 +0100 Subject: [PATCH 10/24] Don't try to use a txn when not in one, remove spurious debug logging --- synapse/push/__init__.py | 2 -- synapse/storage/push_rule.py | 4 +--- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index d4b376913..e3dd4ce76 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -287,8 +287,6 @@ class Pusher(object): logger.warn("Neither notify nor dont_notify in actions: adding default") actions.extend(Pusher.DEFAULT_ACTIONS) - logger.info("FNARG: %r", actions) - if 'dont_notify' in actions: logger.debug( "%s for %s: dont_notify", diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index fba5c7e40..88ee21b08 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -218,9 +218,7 @@ class PushRuleStore(SQLBaseStore): {'enabled': 1 if enabled else 0}, desc="set_push_rule_enabled", ) - txn.call_after( - self.get_push_rules_enabled_for_user.invalidate, user_name - ) + self.get_push_rules_enabled_for_user.invalidate(user_name) class RuleNotFoundException(Exception): From f8c2cd129def8a096113f73830f68e53acb01ccf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 17:03:30 +0100 Subject: [PATCH 11/24] Bump version --- synapse/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/__init__.py b/synapse/__init__.py index 041e2151b..68f86138a 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.9.0-r4" +__version__ = "0.9.0-r5" From ee490988439d46a632aaba13b8341b7b6fc660f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 May 2015 17:36:52 +0100 Subject: [PATCH 12/24] Changelog --- CHANGES.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 65970a89c..e1420d7a3 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,9 @@ +Changes in synapse v0.9.0-r5 (2015-05-21) +========================================= + +* Add more database caches to reduce amount of work done for each pusher. This + radically reduces CPU usage when multiple pushers are set up in the same room. + Changes in synapse v0.9.0 (2015-05-07) ====================================== From f43544eecc362943f9d64a004d40984739a68d98 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 11:01:28 +0100 Subject: [PATCH 13/24] Make the appservice use 'users_in_room' rather than get_room_members since it is cached --- synapse/appservice/__init__.py | 6 +++--- synapse/handlers/appservice.py | 5 +---- tests/appservice/test_appservice.py | 15 +++------------ 3 files changed, 7 insertions(+), 19 deletions(-) diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 63a18b802..e3ca45de8 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -148,8 +148,8 @@ class ApplicationService(object): and self.is_interested_in_user(event.state_key)): return True # check joined member events - for member in member_list: - if self.is_interested_in_user(member.state_key): + for user_id in member_list: + if self.is_interested_in_user(user_id): return True return False @@ -173,7 +173,7 @@ class ApplicationService(object): restrict_to(str): The namespace to restrict regex tests to. aliases_for_event(list): A list of all the known room aliases for this event. - member_list(list): A list of all joined room members in this room. + member_list(list): A list of all joined user_ids in this room. Returns: bool: True if this service would like to know about this event. """ diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 355ab317d..05735137d 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -147,10 +147,7 @@ class ApplicationServicesHandler(object): ) # We need to know the members associated with this event.room_id, # if any. - member_list = yield self.store.get_room_members( - room_id=event.room_id, - membership=Membership.JOIN - ) + member_list = yield self.store.get_users_in_room(event.room_id) services = yield self.store.get_app_services() interested_list = [ diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index 62149d690..8ce8dc0a8 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -217,18 +217,9 @@ class ApplicationServiceTestCase(unittest.TestCase): _regex("@irc_.*") ) join_list = [ - Mock( - type="m.room.member", room_id="!foo:bar", sender="@alice:here", - state_key="@alice:here" - ), - Mock( - type="m.room.member", room_id="!foo:bar", sender="@irc_fo:here", - state_key="@irc_fo:here" # AS user - ), - Mock( - type="m.room.member", room_id="!foo:bar", sender="@bob:here", - state_key="@bob:here" - ) + "@alice:here", + "@irc_fo:here", # AS user + "@bob:here", ] self.event.sender = "@xmpp_foobar:matrix.org" From 254aa3c986256de0c71d36078682c7904e27145b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 11:59:48 +0100 Subject: [PATCH 14/24] Revert register_new_matrix_user to use v1 api --- scripts/register_new_matrix_user | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scripts/register_new_matrix_user b/scripts/register_new_matrix_user index 0ca83795a..4a520bdb5 100755 --- a/scripts/register_new_matrix_user +++ b/scripts/register_new_matrix_user @@ -33,9 +33,10 @@ def request_registration(user, password, server_location, shared_secret): ).hexdigest() data = { - "username": user, + "user": user, "password": password, "mac": mac, + "type": "org.matrix.login.shared_secret", } server_location = server_location.rstrip("/") @@ -43,7 +44,7 @@ def request_registration(user, password, server_location, shared_secret): print "Sending registration request..." req = urllib2.Request( - "%s/_matrix/client/v2_alpha/register" % (server_location,), + "%s/_matrix/client/api/v1/register" % (server_location,), data=json.dumps(data), headers={'Content-Type': 'application/json'} ) From b6adfc59f522cfc897616c153f055d669da71a9a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 13:00:50 +0100 Subject: [PATCH 15/24] Invalidate the get_latest_event_ids_in_room cache when deleting from event_forward_extremities --- synapse/storage/event_federation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 5d4b7843f..23573e8b2 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -472,3 +472,4 @@ class EventFederationStore(SQLBaseStore): query = "DELETE FROM event_forward_extremities WHERE room_id = ?" txn.execute(query, (room_id,)) + txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id) From 59a0682f3e9fe64c7ab35c011b0af7b87ee54f71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 13:13:07 +0100 Subject: [PATCH 16/24] Enable changing the interface the metrics listener binds to --- synapse/app/homeserver.py | 2 +- synapse/config/metrics.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fa4321141..70a7be60b 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -277,7 +277,7 @@ class SynapseHomeServer(HomeServer): config, metrics_resource, ), - interface="127.0.0.1", + interface=config.metrics_interface, ) logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port) diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py index 71a1b1d18..c843c079c 100644 --- a/synapse/config/metrics.py +++ b/synapse/config/metrics.py @@ -20,6 +20,7 @@ class MetricsConfig(Config): def read_config(self, config): self.enable_metrics = config["enable_metrics"] self.metrics_port = config.get("metrics_port") + self.metrics_interface = config.get("metrics_interface", "127.0.0.1") def default_config(self, config_dir_path, server_name): return """\ @@ -28,6 +29,9 @@ class MetricsConfig(Config): # Enable collection and rendering of performance metrics enable_metrics: False - # Separate port to accept metrics requests on (on localhost) + # Separate port to accept metrics requests on # metrics_port: 8081 + + # Which interface to bind the metric listener to + # metrics_interface: 127.0.0.1 """ From 1b446a5d85f35d0af016496d9b12733ce667adb1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 14:26:08 +0100 Subject: [PATCH 17/24] Log less lines at INFO level, but include more helpful information --- synapse/federation/transaction_queue.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ca04822fb..afe71897f 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -207,13 +207,13 @@ class TransactionQueue(object): # request at which point pending_pdus_by_dest just keeps growing. # we need application-layer timeouts of some flavour of these # requests - logger.info( + logger.debug( "TX [%s] Transaction already in progress", destination ) return - logger.info("TX [%s] _attempt_new_transaction", destination) + logger.debug("TX [%s] _attempt_new_transaction", destination) # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) @@ -221,11 +221,11 @@ class TransactionQueue(object): pending_failures = self.pending_failures_by_dest.pop(destination, []) if pending_pdus: - logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - destination, len(pending_pdus)) + logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) if not pending_pdus and not pending_edus and not pending_failures: - logger.info("TX [%s] Nothing to send", destination) + logger.debug("TX [%s] Nothing to send", destination) return # Sort based on the order field @@ -275,9 +275,13 @@ class TransactionQueue(object): logger.debug("TX [%s] Persisted transaction", destination) logger.info( - "TX [%s] Sending transaction [%s]", + "TX [%s] Sending transaction [%s]," + " (PDUs: %d, EDUs: %d, failures: %d)", destination, transaction.transaction_id, + len(pending_pdus), + len(pending_edus), + len(pending_failures), ) with limiter: From e70e8e053e2d44d0f7cebe3bee7b4afbe103f0a7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 14:33:11 +0100 Subject: [PATCH 18/24] Add txn_id to some log lines --- synapse/federation/transaction_queue.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index afe71897f..32fa5e8c1 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -242,6 +242,8 @@ class TransactionQueue(object): try: self.pending_transactions[destination] = 1 + txn_id = str(self._next_txn_id) + limiter = yield get_retry_limiter( destination, self._clock, @@ -249,9 +251,9 @@ class TransactionQueue(object): ) logger.debug( - "TX [%s] Attempting new transaction" + "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", - destination, + destination, txn_id, len(pending_pdus), len(pending_edus), len(pending_failures) @@ -261,7 +263,7 @@ class TransactionQueue(object): transaction = Transaction.create_new( origin_server_ts=int(self._clock.time_msec()), - transaction_id=str(self._next_txn_id), + transaction_id=txn_id, origin=self.server_name, destination=destination, pdus=pdus, @@ -275,9 +277,9 @@ class TransactionQueue(object): logger.debug("TX [%s] Persisted transaction", destination) logger.info( - "TX [%s] Sending transaction [%s]," + "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d, failures: %d)", - destination, + destination, txn_id, transaction.transaction_id, len(pending_pdus), len(pending_edus), @@ -317,7 +319,10 @@ class TransactionQueue(object): code = e.code response = e.response - logger.info("TX [%s] got %d response", destination, code) + logger.info( + "TX [%s] {%s} got %d response", + destination, txn_id, code + ) logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) From b21d015c55dde5255c7a63f8d17e77caf535030b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 14:44:25 +0100 Subject: [PATCH 19/24] Log origin and stats of incoming transactions --- synapse/federation/transport/server.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2bfe0f3c9..af87805f3 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -196,6 +196,14 @@ class FederationSendServlet(BaseFederationServlet): transaction_id, str(transaction_data) ) + logger.info( + "Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)", + transaction_id, origin, + len(transaction_data.get("pdus", [])), + len(transaction_data.get("edus", [])), + len(transaction_data.get("failures", [])), + ) + # We should ideally be getting this from the security layer. # origin = body["origin"] From c8135f808b4e0a344e6d5d592049d27cc43af9b1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 14:45:46 +0100 Subject: [PATCH 20/24] Remove unused import --- synapse/handlers/appservice.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 05735137d..8269482e4 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes from synapse.appservice import ApplicationService from synapse.types import UserID From 8bb85c8c5a24f5a937fbd66eace23bd680ca728f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 14:48:06 +0100 Subject: [PATCH 21/24] Update log line --- synapse/app/homeserver.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 70a7be60b..b887562f9 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -279,7 +279,10 @@ class SynapseHomeServer(HomeServer): ), interface=config.metrics_interface, ) - logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port) + logger.info( + "Metrics now running on %s port %d", + config.metrics_interface, config.metrics_port, + ) def run_startup_checks(self, db_conn, database_engine): all_users_native = are_all_users_on_domain( From 1ce1509989ae5eba67acbe0824d82177ab10917f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 14:51:22 +0100 Subject: [PATCH 22/24] s/metric_interface/metric_bind_host/ --- synapse/app/homeserver.py | 4 ++-- synapse/config/metrics.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b887562f9..f3513abb5 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -277,11 +277,11 @@ class SynapseHomeServer(HomeServer): config, metrics_resource, ), - interface=config.metrics_interface, + interface=config.metrics_bind_host, ) logger.info( "Metrics now running on %s port %d", - config.metrics_interface, config.metrics_port, + config.metrics_bind_host, config.metrics_port, ) def run_startup_checks(self, db_conn, database_engine): diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py index c843c079c..0cfb30ce7 100644 --- a/synapse/config/metrics.py +++ b/synapse/config/metrics.py @@ -20,7 +20,7 @@ class MetricsConfig(Config): def read_config(self, config): self.enable_metrics = config["enable_metrics"] self.metrics_port = config.get("metrics_port") - self.metrics_interface = config.get("metrics_interface", "127.0.0.1") + self.metrics_bind_host = config.get("metrics_bind_host", "127.0.0.1") def default_config(self, config_dir_path, server_name): return """\ @@ -32,6 +32,6 @@ class MetricsConfig(Config): # Separate port to accept metrics requests on # metrics_port: 8081 - # Which interface to bind the metric listener to - # metrics_interface: 127.0.0.1 + # Which host to bind the metric listener to + # metrics_bind_host: 127.0.0.1 """ From 284f55a7fbf597e32508301fe9571cd1b8523625 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 15:18:04 +0100 Subject: [PATCH 23/24] Add doc strings --- synapse/federation/federation_client.py | 2 ++ synapse/federation/transport/client.py | 2 ++ synapse/http/matrixfederationclient.py | 3 +++ 3 files changed, 7 insertions(+) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ecb6dbd77..3249060bc 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -190,6 +190,8 @@ class FederationClient(FederationBase): outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` + timeout (int): How long to try (in ms) each destination for before + moving to the next destination. None indicates no timeout. Returns: Deferred: Results in the requested PDU. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index c2b53b78b..610a4c316 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -57,6 +57,8 @@ class TransportLayerClient(object): destination (str): The host name of the remote home server we want to get the state from. event_id (str): The id of the event being requested. + timeout (int): How long to try (in ms) the destination for before + giving up. None indicates no timeout. Returns: Deferred: Results in a dict received from the remote homeserver. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 312bbcc6b..6f976d5ce 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -345,6 +345,9 @@ class MatrixFederationHttpClient(object): path (str): The HTTP path. args (dict): A dictionary used to create query strings, defaults to None. + timeout (int): How long to try (in ms) the destination for before + giving up. None indicates no timeout and that the request will + be retried. Returns: Deferred: Succeeds when we get *any* HTTP response. From 106a3051b88be742d24ace05f72d9ab6bff29dd2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 22 May 2015 15:53:03 +0100 Subject: [PATCH 24/24] Remove spurious TODO comment --- synapse/handlers/presence.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6c48b1d20..670c1d353 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1140,7 +1140,6 @@ class PresenceEventSource(object): user_ids_to_check |= set(joined) updates = [] - # TODO(paul): use a DeferredList ? How to limit concurrency. for observed_user in user_ids_to_check & set(cachemap): if not (to_key < cachemap[observed_user].serial <= from_key): continue