diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index fb291d7fb..47f0cf0fa 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from synapse.api.errors import SynapseError +from synapse.storage.presence import UserPresenceState from synapse.types import UserID, RoomID from twisted.internet import defer @@ -253,19 +254,35 @@ class Filter(object): Returns: bool: True if the event matches """ - sender = event.get("sender", None) - if not sender: - # Presence events have their 'sender' in content.user_id - content = event.get("content") - # account_data has been allowed to have non-dict content, so check type first - if isinstance(content, dict): - sender = content.get("user_id") + # We usually get the full "events" as dictionaries coming through, + # except for presence which actually gets passed around as its own + # namedtuple type. + if isinstance(event, UserPresenceState): + sender = event.user_id + room_id = None + ev_type = "m.presence" + is_url = False + else: + sender = event.get("sender", None) + if not sender: + # Presence events had their 'sender' in content.user_id, but are + # now handled above. We don't know if anything else uses this + # form. TODO: Check this and probably remove it. + content = event.get("content") + # account_data has been allowed to have non-dict content, so + # check type first + if isinstance(content, dict): + sender = content.get("user_id") + + room_id = event.get("room_id", None) + ev_type = event.get("type", None) + is_url = "url" in event.get("content", {}) return self.check_fields( - event.get("room_id", None), + room_id, sender, - event.get("type", None), - "url" in event.get("content", {}) + ev_type, + is_url, ) def check_fields(self, room_id, sender, event_type, contains_url): diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index e0ade4c16..10f5f35a6 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -19,6 +19,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.handlers.presence import format_user_presence_state from synapse.streams.config import PaginationConfig from synapse.types import ( UserID, StreamToken, @@ -225,9 +226,17 @@ class InitialSyncHandler(BaseHandler): "content": content, }) + now = self.clock.time_msec() + ret = { "rooms": rooms_ret, - "presence": presence, + "presence": [ + { + "type": "m.presence", + "content": format_user_presence_state(event, now), + } + for event in presence + ], "account_data": account_data_events, "receipts": receipt, "end": now_token.to_string(), diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index da610e430..9cc94287b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -29,6 +29,7 @@ from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState +from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.logcontext import preserve_fn from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -719,9 +720,7 @@ class PresenceHandler(object): for state in updates ]) else: - defer.returnValue([ - format_user_presence_state(state, now) for state in updates - ]) + defer.returnValue(updates) @defer.inlineCallbacks def set_state(self, target_user, state, ignore_status_msg=False): @@ -795,6 +794,9 @@ class PresenceHandler(object): as_event=False, ) + now = self.clock.time_msec() + results[:] = [format_user_presence_state(r, now) for r in results] + is_accepted = { row["observed_user_id"]: row["accepted"] for row in presence_list } @@ -847,6 +849,7 @@ class PresenceHandler(object): ) state_dict = yield self.get_state(observed_user, as_event=False) + state_dict = format_user_presence_state(state_dict, self.clock.time_msec()) self.federation.send_edu( destination=observer_user.domain, @@ -979,14 +982,18 @@ def should_notify(old_state, new_state): return False -def format_user_presence_state(state, now): +def format_user_presence_state(state, now, include_user_id=True): """Convert UserPresenceState to a format that can be sent down to clients and to other servers. + + The "user_id" is optional so that this function can be used to format presence + updates for client /sync responses and for federation /send requests. """ content = { "presence": state.state, - "user_id": state.user_id, } + if include_user_id: + content["user_id"] = state.user_id if state.last_active_ts: content["last_active_ago"] = now - state.last_active_ts if state.status_msg and state.state != PresenceState.OFFLINE: @@ -1025,7 +1032,6 @@ class PresenceEventSource(object): # sending down the rare duplicate is not a concern. with Measure(self.clock, "presence.get_new_events"): - user_id = user.to_string() if from_key is not None: from_key = int(from_key) @@ -1034,18 +1040,7 @@ class PresenceEventSource(object): max_token = self.store.get_current_presence_token() - plist = yield self.store.get_presence_list_accepted(user.localpart) - users_interested_in = set(row["observed_user_id"] for row in plist) - users_interested_in.add(user_id) # So that we receive our own presence - - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) - users_interested_in.update(users_who_share_room) - - if explicit_room_id: - user_ids = yield self.store.get_users_in_room(explicit_room_id) - users_interested_in.update(user_ids) + users_interested_in = yield self._get_interested_in(user, explicit_room_id) user_ids_changed = set() changed = None @@ -1073,16 +1068,13 @@ class PresenceEventSource(object): updates = yield presence.current_state_for_users(user_ids_changed) - now = self.clock.time_msec() - - defer.returnValue(([ - { - "type": "m.presence", - "content": format_user_presence_state(s, now), - } - for s in updates.values() - if include_offline or s.state != PresenceState.OFFLINE - ], max_token)) + if include_offline: + defer.returnValue((updates.values(), max_token)) + else: + defer.returnValue(([ + s for s in updates.itervalues() + if s.state != PresenceState.OFFLINE + ], max_token)) def get_current_key(self): return self.store.get_current_presence_token() @@ -1090,6 +1082,31 @@ class PresenceEventSource(object): def get_pagination_rows(self, user, pagination_config, key): return self.get_new_events(user, from_key=None, include_offline=False) + @cachedInlineCallbacks(num_args=2, cache_context=True) + def _get_interested_in(self, user, explicit_room_id, cache_context): + """Returns the set of users that the given user should see presence + updates for + """ + user_id = user.to_string() + plist = yield self.store.get_presence_list_accepted( + user.localpart, on_invalidate=cache_context.invalidate, + ) + users_interested_in = set(row["observed_user_id"] for row in plist) + users_interested_in.add(user_id) # So that we receive our own presence + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id, on_invalidate=cache_context.invalidate, + ) + users_interested_in.update(users_who_share_room) + + if explicit_room_id: + user_ids = yield self.store.get_users_in_room( + explicit_room_id, on_invalidate=cache_context.invalidate, + ) + users_interested_in.update(user_ids) + + defer.returnValue(users_interested_in) + def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): """Checks the presence of users that have timed out and updates as diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5572cb883..33b7fdfe8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -721,14 +721,14 @@ class SyncHandler(object): extra_users_ids.update(users) extra_users_ids.discard(user.to_string()) - states = yield self.presence_handler.get_states( - extra_users_ids, - as_event=True, - ) - presence.extend(states) + if extra_users_ids: + states = yield self.presence_handler.get_states( + extra_users_ids, + ) + presence.extend(states) - # Deduplicate the presence entries so that there's at most one per user - presence = {p["content"]["user_id"]: p for p in presence}.values() + # Deduplicate the presence entries so that there's at most one per user + presence = {p.user_id: p for p in presence}.values() presence = sync_config.filter_collection.filter_presence( presence diff --git a/synapse/notifier.py b/synapse/notifier.py index 2657dcd8d..31f723d94 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError +from synapse.handlers.presence import format_user_presence_state from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function @@ -412,6 +413,15 @@ class Notifier(object): new_events, is_peeking=is_peeking, ) + elif name == "presence": + now = self.clock.time_msec() + new_events[:] = [ + { + "type": "m.presence", + "content": format_user_presence_state(event, now), + } + for event in new_events + ] events.extend(new_events) end_token = end_token.copy_and_replace(keyname, new_key) diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index eafdce865..47b2dc45e 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -19,6 +19,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.types import UserID +from synapse.handlers.presence import format_user_presence_state from synapse.http.servlet import parse_json_object_from_request from .base import ClientV1RestServlet, client_path_patterns @@ -33,6 +34,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet): def __init__(self, hs): super(PresenceStatusRestServlet, self).__init__(hs) self.presence_handler = hs.get_presence_handler() + self.clock = hs.get_clock() @defer.inlineCallbacks def on_GET(self, request, user_id): @@ -48,6 +50,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet): raise AuthError(403, "You are not allowed to see their presence.") state = yield self.presence_handler.get_state(target_user=user) + state = format_user_presence_state(state, self.clock.time_msec()) defer.returnValue((200, state)) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index b3d800163..a7a9e0a79 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.http.servlet import ( RestServlet, parse_string, parse_integer, parse_boolean ) +from synapse.handlers.presence import format_user_presence_state from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken from synapse.events.utils import ( @@ -28,7 +29,6 @@ from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState from ._base import client_v2_patterns -import copy import itertools import logging @@ -194,12 +194,18 @@ class SyncRestServlet(RestServlet): defer.returnValue((200, response_content)) def encode_presence(self, events, time_now): - formatted = [] - for event in events: - event = copy.deepcopy(event) - event['sender'] = event['content'].pop('user_id') - formatted.append(event) - return {"events": formatted} + return { + "events": [ + { + "type": "m.presence", + "sender": event.user_id, + "content": format_user_presence_state( + event, time_now, include_user_id=False + ), + } + for event in events + ] + } def encode_joined(self, rooms, time_now, token_id, event_fields): """