Merge branch 'markjh/liberate_presence_handler' into markjh/liberate_sync_handler

This commit is contained in:
Mark Haines 2016-05-16 20:08:32 +01:00
commit 53e171f345
12 changed files with 49 additions and 36 deletions

View File

@ -24,7 +24,6 @@ from .message import MessageHandler
from .events import EventStreamHandler, EventHandler from .events import EventStreamHandler, EventHandler
from .federation import FederationHandler from .federation import FederationHandler
from .profile import ProfileHandler from .profile import ProfileHandler
from .presence import PresenceHandler
from .directory import DirectoryHandler from .directory import DirectoryHandler
from .typing import TypingNotificationHandler from .typing import TypingNotificationHandler
from .admin import AdminHandler from .admin import AdminHandler
@ -53,7 +52,6 @@ class Handlers(object):
self.event_handler = EventHandler(hs) self.event_handler = EventHandler(hs)
self.federation_handler = FederationHandler(hs) self.federation_handler = FederationHandler(hs)
self.profile_handler = ProfileHandler(hs) self.profile_handler = ProfileHandler(hs)
self.presence_handler = PresenceHandler(hs)
self.room_list_handler = RoomListHandler(hs) self.room_list_handler = RoomListHandler(hs)
self.directory_handler = DirectoryHandler(hs) self.directory_handler = DirectoryHandler(hs)
self.typing_notification_handler = TypingNotificationHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs)

View File

@ -58,7 +58,7 @@ class EventStreamHandler(BaseHandler):
If `only_keys` is not None, events from keys will be sent down. If `only_keys` is not None, events from keys will be sent down.
""" """
auth_user = UserID.from_string(auth_user_id) auth_user = UserID.from_string(auth_user_id)
presence_handler = self.hs.get_handlers().presence_handler presence_handler = self.hs.get_presence_handler()
context = yield presence_handler.user_syncing( context = yield presence_handler.user_syncing(
auth_user_id, affect_presence=affect_presence, auth_user_id, affect_presence=affect_presence,

View File

@ -236,7 +236,7 @@ class MessageHandler(BaseHandler):
) )
if event.type == EventTypes.Message: if event.type == EventTypes.Message:
presence = self.hs.get_handlers().presence_handler presence = self.hs.get_presence_handler()
yield presence.bump_presence_active_time(user) yield presence.bump_presence_active_time(user)
def deduplicate_state_event(self, event, context): def deduplicate_state_event(self, event, context):
@ -674,7 +674,7 @@ class MessageHandler(BaseHandler):
and m.content["membership"] == Membership.JOIN and m.content["membership"] == Membership.JOIN
] ]
presence_handler = self.hs.get_handlers().presence_handler presence_handler = self.hs.get_presence_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def get_presence(): def get_presence():

View File

@ -36,8 +36,6 @@ from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id from synapse.types import UserID, get_domain_from_id
import synapse.metrics import synapse.metrics
from ._base import BaseHandler
import logging import logging
@ -73,11 +71,11 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(BaseHandler): class PresenceHandler(object):
def __init__(self, hs): def __init__(self, hs):
super(PresenceHandler, self).__init__(hs) self.is_mine = hs.is_mine
self.hs = hs self.is_mine_id = hs.is_mine_id
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.wheel_timer = WheelTimer() self.wheel_timer = WheelTimer()
@ -138,7 +136,7 @@ class PresenceHandler(BaseHandler):
obj=state.user_id, obj=state.user_id,
then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT, then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
) )
if self.hs.is_mine_id(state.user_id): if self.is_mine_id(state.user_id):
self.wheel_timer.insert( self.wheel_timer.insert(
now=now, now=now,
obj=state.user_id, obj=state.user_id,
@ -228,7 +226,7 @@ class PresenceHandler(BaseHandler):
new_state, should_notify, should_ping = handle_update( new_state, should_notify, should_ping = handle_update(
prev_state, new_state, prev_state, new_state,
is_mine=self.hs.is_mine_id(user_id), is_mine=self.is_mine_id(user_id),
wheel_timer=self.wheel_timer, wheel_timer=self.wheel_timer,
now=now now=now
) )
@ -287,7 +285,7 @@ class PresenceHandler(BaseHandler):
changes = handle_timeouts( changes = handle_timeouts(
states, states,
is_mine_fn=self.hs.is_mine_id, is_mine_fn=self.is_mine_id,
user_to_num_current_syncs=self.user_to_num_current_syncs, user_to_num_current_syncs=self.user_to_num_current_syncs,
now=now, now=now,
) )
@ -427,7 +425,7 @@ class PresenceHandler(BaseHandler):
hosts_to_states = {} hosts_to_states = {}
for room_id, states in room_ids_to_states.items(): for room_id, states in room_ids_to_states.items():
local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states) local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states: if not local_states:
continue continue
@ -436,7 +434,7 @@ class PresenceHandler(BaseHandler):
hosts_to_states.setdefault(host, []).extend(local_states) hosts_to_states.setdefault(host, []).extend(local_states)
for user_id, states in users_to_states.items(): for user_id, states in users_to_states.items():
local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states) local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
if not local_states: if not local_states:
continue continue
@ -611,14 +609,14 @@ class PresenceHandler(BaseHandler):
# don't need to send to local clients here, as that is done as part # don't need to send to local clients here, as that is done as part
# of the event stream/sync. # of the event stream/sync.
# TODO: Only send to servers not already in the room. # TODO: Only send to servers not already in the room.
if self.hs.is_mine(user): if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string()) state = yield self.current_state_for_user(user.to_string())
hosts = yield self.store.get_joined_hosts_for_room(room_id) hosts = yield self.store.get_joined_hosts_for_room(room_id)
self._push_to_remotes({host: (state,) for host in hosts}) self._push_to_remotes({host: (state,) for host in hosts})
else: else:
user_ids = yield self.store.get_users_in_room(room_id) user_ids = yield self.store.get_users_in_room(room_id)
user_ids = filter(self.hs.is_mine_id, user_ids) user_ids = filter(self.is_mine_id, user_ids)
states = yield self.current_state_for_users(user_ids) states = yield self.current_state_for_users(user_ids)
@ -628,7 +626,7 @@ class PresenceHandler(BaseHandler):
def get_presence_list(self, observer_user, accepted=None): def get_presence_list(self, observer_user, accepted=None):
"""Returns the presence for all users in their presence list. """Returns the presence for all users in their presence list.
""" """
if not self.hs.is_mine(observer_user): if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server") raise SynapseError(400, "User is not hosted on this Home Server")
presence_list = yield self.store.get_presence_list( presence_list = yield self.store.get_presence_list(
@ -659,7 +657,7 @@ class PresenceHandler(BaseHandler):
observer_user.localpart, observed_user.to_string() observer_user.localpart, observed_user.to_string()
) )
if self.hs.is_mine(observed_user): if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user) yield self.invite_presence(observed_user, observer_user)
else: else:
yield self.federation.send_edu( yield self.federation.send_edu(
@ -675,11 +673,11 @@ class PresenceHandler(BaseHandler):
def invite_presence(self, observed_user, observer_user): def invite_presence(self, observed_user, observer_user):
"""Handles new presence invites. """Handles new presence invites.
""" """
if not self.hs.is_mine(observed_user): if not self.is_mine(observed_user):
raise SynapseError(400, "User is not hosted on this Home Server") raise SynapseError(400, "User is not hosted on this Home Server")
# TODO: Don't auto accept # TODO: Don't auto accept
if self.hs.is_mine(observer_user): if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user) yield self.accept_presence(observed_user, observer_user)
else: else:
self.federation.send_edu( self.federation.send_edu(
@ -742,7 +740,7 @@ class PresenceHandler(BaseHandler):
Returns: Returns:
A Deferred. A Deferred.
""" """
if not self.hs.is_mine(observer_user): if not self.is_mine(observer_user):
raise SynapseError(400, "User is not hosted on this Home Server") raise SynapseError(400, "User is not hosted on this Home Server")
yield self.store.del_presence_list( yield self.store.del_presence_list(
@ -834,7 +832,11 @@ def _format_user_presence_state(state, now):
class PresenceEventSource(object): class PresenceEventSource(object):
def __init__(self, hs): def __init__(self, hs):
self.hs = hs # We can't call get_presence_handler here because there's a cycle:
#
# Presence -> Notifier -> PresenceEventSource -> Presence
#
self.get_presence_handler = hs.get_presence_handler
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.store = hs.get_datastore() self.store = hs.get_datastore()
@ -860,7 +862,7 @@ class PresenceEventSource(object):
from_key = int(from_key) from_key = int(from_key)
room_ids = room_ids or [] room_ids = room_ids or []
presence = self.hs.get_handlers().presence_handler presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache stream_change_cache = self.store.presence_stream_cache
if not room_ids: if not room_ids:

View File

@ -639,7 +639,7 @@ class SyncHandler(BaseHandler):
# For each newly joined room, we want to send down presence of # For each newly joined room, we want to send down presence of
# existing users. # existing users.
presence_handler = self.hs.get_handlers().presence_handler presence_handler = self.hs.get_presence_handler()
extra_presence_users = set() extra_presence_users = set()
for room_id in newly_joined_rooms: for room_id in newly_joined_rooms:
users = yield self.store.get_users_in_room(event.room_id) users = yield self.store.get_users_in_room(event.room_id)

View File

@ -109,7 +109,7 @@ class ReplicationResource(Resource):
self.version_string = hs.version_string self.version_string = hs.version_string
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.sources = hs.get_event_sources() self.sources = hs.get_event_sources()
self.presence_handler = hs.get_handlers().presence_handler self.presence_handler = hs.get_presence_handler()
self.typing_handler = hs.get_handlers().typing_notification_handler self.typing_handler = hs.get_handlers().typing_notification_handler
self.notifier = hs.notifier self.notifier = hs.notifier
self.clock = hs.get_clock() self.clock = hs.get_clock()

View File

@ -30,20 +30,24 @@ logger = logging.getLogger(__name__)
class PresenceStatusRestServlet(ClientV1RestServlet): class PresenceStatusRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status") PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super(PresenceStatusRestServlet, self).__init__(hs)
self.presence_handler = hs.get_presence_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, user_id): def on_GET(self, request, user_id):
requester = yield self.auth.get_user_by_req(request) requester = yield self.auth.get_user_by_req(request)
user = UserID.from_string(user_id) user = UserID.from_string(user_id)
if requester.user != user: if requester.user != user:
allowed = yield self.handlers.presence_handler.is_visible( allowed = yield self.presence_handler.is_visible(
observed_user=user, observer_user=requester.user, observed_user=user, observer_user=requester.user,
) )
if not allowed: if not allowed:
raise AuthError(403, "You are not allowed to see their presence.") raise AuthError(403, "You are not allowed to see their presence.")
state = yield self.handlers.presence_handler.get_state(target_user=user) state = yield self.presence_handler.get_state(target_user=user)
defer.returnValue((200, state)) defer.returnValue((200, state))
@ -74,7 +78,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
except: except:
raise SynapseError(400, "Unable to parse state") raise SynapseError(400, "Unable to parse state")
yield self.handlers.presence_handler.set_state(user, state) yield self.presence_handler.set_state(user, state)
defer.returnValue((200, {})) defer.returnValue((200, {}))
@ -85,6 +89,10 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
class PresenceListRestServlet(ClientV1RestServlet): class PresenceListRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/list/(?P<user_id>[^/]*)") PATTERNS = client_path_patterns("/presence/list/(?P<user_id>[^/]*)")
def __init__(self, hs):
super(PresenceListRestServlet, self).__init__(hs)
self.presence_handler = hs.get_presence_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, user_id): def on_GET(self, request, user_id):
requester = yield self.auth.get_user_by_req(request) requester = yield self.auth.get_user_by_req(request)
@ -96,7 +104,7 @@ class PresenceListRestServlet(ClientV1RestServlet):
if requester.user != user: if requester.user != user:
raise SynapseError(400, "Cannot get another user's presence list") raise SynapseError(400, "Cannot get another user's presence list")
presence = yield self.handlers.presence_handler.get_presence_list( presence = yield self.presence_handler.get_presence_list(
observer_user=user, accepted=True observer_user=user, accepted=True
) )
@ -123,7 +131,7 @@ class PresenceListRestServlet(ClientV1RestServlet):
if len(u) == 0: if len(u) == 0:
continue continue
invited_user = UserID.from_string(u) invited_user = UserID.from_string(u)
yield self.handlers.presence_handler.send_presence_invite( yield self.presence_handler.send_presence_invite(
observer_user=user, observed_user=invited_user observer_user=user, observed_user=invited_user
) )
@ -134,7 +142,7 @@ class PresenceListRestServlet(ClientV1RestServlet):
if len(u) == 0: if len(u) == 0:
continue continue
dropped_user = UserID.from_string(u) dropped_user = UserID.from_string(u)
yield self.handlers.presence_handler.drop( yield self.presence_handler.drop(
observer_user=user, observed_user=dropped_user observer_user=user, observed_user=dropped_user
) )

View File

@ -570,7 +570,7 @@ class RoomTypingRestServlet(ClientV1RestServlet):
def __init__(self, hs): def __init__(self, hs):
super(RoomTypingRestServlet, self).__init__(hs) super(RoomTypingRestServlet, self).__init__(hs)
self.presence_handler = hs.get_handlers().presence_handler self.presence_handler = hs.get_presence_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_PUT(self, request, room_id, user_id): def on_PUT(self, request, room_id, user_id):

View File

@ -37,7 +37,7 @@ class ReceiptRestServlet(RestServlet):
self.hs = hs self.hs = hs
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.receipts_handler = hs.get_handlers().receipts_handler self.receipts_handler = hs.get_handlers().receipts_handler
self.presence_handler = hs.get_handlers().presence_handler self.presence_handler = hs.get_presence_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request, room_id, receipt_type, event_id): def on_POST(self, request, room_id, receipt_type, event_id):

View File

@ -83,7 +83,7 @@ class SyncRestServlet(RestServlet):
self.sync_handler = hs.get_handlers().sync_handler self.sync_handler = hs.get_handlers().sync_handler
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.filtering = hs.get_filtering() self.filtering = hs.get_filtering()
self.presence_handler = hs.get_handlers().presence_handler self.presence_handler = hs.get_presence_handler()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request): def on_GET(self, request):

View File

@ -27,6 +27,7 @@ from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFa
from synapse.notifier import Notifier from synapse.notifier import Notifier
from synapse.api.auth import Auth from synapse.api.auth import Auth
from synapse.handlers import Handlers from synapse.handlers import Handlers
from synapse.handlers.presence import PresenceHandler
from synapse.state import StateHandler from synapse.state import StateHandler
from synapse.storage import DataStore from synapse.storage import DataStore
from synapse.util import Clock from synapse.util import Clock
@ -78,6 +79,7 @@ class HomeServer(object):
'auth', 'auth',
'rest_servlet_factory', 'rest_servlet_factory',
'state_handler', 'state_handler',
'presence_handler',
'notifier', 'notifier',
'distributor', 'distributor',
'client_resource', 'client_resource',
@ -164,6 +166,9 @@ class HomeServer(object):
def build_state_handler(self): def build_state_handler(self):
return StateHandler(self) return StateHandler(self)
def build_presence_handler(self):
return PresenceHandler(self)
def build_event_sources(self): def build_event_sources(self):
return EventSources(self) return EventSources(self)

View File

@ -78,7 +78,7 @@ class ReplicationResourceCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_presence(self): def test_presence(self):
get = self.get(presence="-1") get = self.get(presence="-1")
yield self.hs.get_handlers().presence_handler.set_state( yield self.hs.get_presence_handler().set_state(
self.user, {"presence": "online"} self.user, {"presence": "online"}
) )
code, body = yield get code, body = yield get