Merge branch 'presence_logging' into develop

This commit is contained in:
Erik Johnston 2014-08-29 12:10:00 +01:00
commit 47fb286184
9 changed files with 345 additions and 162 deletions

View File

@ -543,6 +543,8 @@ class _TransactionQueue(object):
def eb(failure): def eb(failure):
if not deferred.called: if not deferred.called:
deferred.errback(failure) deferred.errback(failure)
else:
logger.exception("Failed to send edu", failure)
self._attempt_new_transaction(destination).addErrback(eb) self._attempt_new_transaction(destination).addErrback(eb)
return deferred return deferred

View File

@ -16,6 +16,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.events import SynapseEvent from synapse.api.events import SynapseEvent
from synapse.util.logutils import log_function
from ._base import BaseHandler from ._base import BaseHandler
@ -44,6 +45,7 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def get_stream(self, auth_user_id, pagin_config, timeout=0): def get_stream(self, auth_user_id, pagin_config, timeout=0):
auth_user = self.hs.parse_userid(auth_user_id) auth_user = self.hs.parse_userid(auth_user_id)
@ -90,13 +92,15 @@ class EventStreamHandler(BaseHandler):
# 10 seconds of grace to allow the client to reconnect again # 10 seconds of grace to allow the client to reconnect again
# before we think they're gone # before we think they're gone
def _later(): def _later():
logger.debug("_later stopped_user_eventstream %s", auth_user)
self.distributor.fire( self.distributor.fire(
"stopped_user_eventstream", auth_user "stopped_user_eventstream", auth_user
) )
del self._stop_timer_per_user[auth_user] del self._stop_timer_per_user[auth_user]
logger.debug("Scheduling _later: for %s", auth_user)
self._stop_timer_per_user[auth_user] = ( self._stop_timer_per_user[auth_user] = (
self.clock.call_later(5, _later) self.clock.call_later(30, _later)
) )

View File

@ -18,6 +18,8 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState from synapse.api.constants import PresenceState
from synapse.util.logutils import log_function
from ._base import BaseHandler from ._base import BaseHandler
import logging import logging
@ -141,6 +143,10 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def is_presence_visible(self, observer_user, observed_user): def is_presence_visible(self, observer_user, observed_user):
defer.returnValue(True)
# return
# FIXME (erikj): This code path absolutely kills the database.
assert(observed_user.is_mine) assert(observed_user.is_mine)
if observer_user == observed_user: if observer_user == observed_user:
@ -184,7 +190,12 @@ class PresenceHandler(BaseHandler):
defer.returnValue(state) defer.returnValue(state)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def set_state(self, target_user, auth_user, state): def set_state(self, target_user, auth_user, state):
# return
# TODO (erikj): Turn this back on. Why did we end up sending EDUs
# everywhere?
if not target_user.is_mine: if not target_user.is_mine:
raise SynapseError(400, "User is not hosted on this Home Server") raise SynapseError(400, "User is not hosted on this Home Server")
@ -237,33 +248,42 @@ class PresenceHandler(BaseHandler):
self.push_presence(user, statuscache=statuscache) self.push_presence(user, statuscache=statuscache)
@log_function
def started_user_eventstream(self, user): def started_user_eventstream(self, user):
# TODO(paul): Use "last online" state # TODO(paul): Use "last online" state
self.set_state(user, user, {"state": PresenceState.ONLINE}) self.set_state(user, user, {"state": PresenceState.ONLINE})
@log_function
def stopped_user_eventstream(self, user): def stopped_user_eventstream(self, user):
# TODO(paul): Save current state as "last online" state # TODO(paul): Save current state as "last online" state
self.set_state(user, user, {"state": PresenceState.OFFLINE}) self.set_state(user, user, {"state": PresenceState.OFFLINE})
@defer.inlineCallbacks @defer.inlineCallbacks
def user_joined_room(self, user, room_id): def user_joined_room(self, user, room_id):
localusers = set()
remotedomains = set()
rm_handler = self.homeserver.get_handlers().room_member_handler
yield rm_handler.fetch_room_distributions_into(room_id,
localusers=localusers, remotedomains=remotedomains,
ignore_user=user)
if user.is_mine: if user.is_mine:
yield self._send_presence_to_distribution(srcuser=user, self.push_update_to_local_and_remote(
localusers=localusers, remotedomains=remotedomains, observed_user=user,
room_ids=[room_id],
statuscache=self._get_or_offline_usercache(user), statuscache=self._get_or_offline_usercache(user),
) )
for srcuser in localusers: else:
yield self._send_presence(srcuser=srcuser, destuser=user, self.push_update_to_clients(
statuscache=self._get_or_offline_usercache(srcuser), observed_user=user,
room_ids=[room_id],
statuscache=self._get_or_offline_usercache(user),
)
# We also want to tell them about current presence of people.
rm_handler = self.homeserver.get_handlers().room_member_handler
curr_users = yield rm_handler.get_room_members(room_id)
for local_user in [c for c in curr_users if c.is_mine]:
self.push_update_to_local_and_remote(
observed_user=local_user,
users_to_push=[user],
statuscache=self._get_or_offline_usercache(local_user),
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -374,11 +394,13 @@ class PresenceHandler(BaseHandler):
defer.returnValue(presence) defer.returnValue(presence)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def start_polling_presence(self, user, target_user=None, state=None): def start_polling_presence(self, user, target_user=None, state=None):
logger.debug("Start polling for presence from %s", user) logger.debug("Start polling for presence from %s", user)
if target_user: if target_user:
target_users = set([target_user]) target_users = set([target_user])
room_ids = []
else: else:
presence = yield self.store.get_presence_list( presence = yield self.store.get_presence_list(
user.localpart, accepted=True user.localpart, accepted=True
@ -392,23 +414,37 @@ class PresenceHandler(BaseHandler):
rm_handler = self.homeserver.get_handlers().room_member_handler rm_handler = self.homeserver.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(user) room_ids = yield rm_handler.get_rooms_for_user(user)
for room_id in room_ids:
for member in (yield rm_handler.get_room_members(room_id)):
target_users.add(member)
if state is None: if state is None:
state = yield self.store.get_presence_state(user.localpart) state = yield self.store.get_presence_state(user.localpart)
else:
# statuscache = self._get_or_make_usercache(user)
# self._user_cachemap_latest_serial += 1
# statuscache.update(state, self._user_cachemap_latest_serial)
pass
localusers, remoteusers = partitionbool( yield self.push_update_to_local_and_remote(
target_users, observed_user=user,
lambda u: u.is_mine users_to_push=target_users,
room_ids=room_ids,
statuscache=self._get_or_make_usercache(user),
) )
for target_user in localusers: for target_user in target_users:
self._start_polling_local(user, target_user) if target_user.is_mine:
self._start_polling_local(user, target_user)
# We want to tell the person that just came online
# presence state of people they are interested in?
self.push_update_to_clients(
observed_user=target_user,
users_to_push=[user],
statuscache=self._get_or_offline_usercache(target_user),
)
deferreds = [] deferreds = []
remoteusers_by_domain = partition(remoteusers, lambda u: u.domain) remote_users = [u for u in target_users if not u.is_mine]
remoteusers_by_domain = partition(remote_users, lambda u: u.domain)
# Only poll for people in our get_presence_list
for domain in remoteusers_by_domain: for domain in remoteusers_by_domain:
remoteusers = remoteusers_by_domain[domain] remoteusers = remoteusers_by_domain[domain]
@ -430,12 +466,6 @@ class PresenceHandler(BaseHandler):
self._local_pushmap[target_localpart].add(user) self._local_pushmap[target_localpart].add(user)
self.push_update_to_clients(
observer_user=user,
observed_user=target_user,
statuscache=self._get_or_offline_usercache(target_user),
)
def _start_polling_remote(self, user, domain, remoteusers): def _start_polling_remote(self, user, domain, remoteusers):
to_poll = set() to_poll = set()
@ -455,6 +485,7 @@ class PresenceHandler(BaseHandler):
content={"poll": [u.to_string() for u in to_poll]} content={"poll": [u.to_string() for u in to_poll]}
) )
@log_function
def stop_polling_presence(self, user, target_user=None): def stop_polling_presence(self, user, target_user=None):
logger.debug("Stop polling for presence from %s", user) logger.debug("Stop polling for presence from %s", user)
@ -494,6 +525,7 @@ class PresenceHandler(BaseHandler):
if not self._local_pushmap[localpart]: if not self._local_pushmap[localpart]:
del self._local_pushmap[localpart] del self._local_pushmap[localpart]
@log_function
def _stop_polling_remote(self, user, domain, remoteusers): def _stop_polling_remote(self, user, domain, remoteusers):
to_unpoll = set() to_unpoll = set()
@ -514,6 +546,7 @@ class PresenceHandler(BaseHandler):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def push_presence(self, user, statuscache): def push_presence(self, user, statuscache):
assert(user.is_mine) assert(user.is_mine)
@ -529,53 +562,17 @@ class PresenceHandler(BaseHandler):
rm_handler = self.homeserver.get_handlers().room_member_handler rm_handler = self.homeserver.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(user) room_ids = yield rm_handler.get_rooms_for_user(user)
for room_id in room_ids: if not localusers and not room_ids:
yield rm_handler.fetch_room_distributions_into(
room_id, localusers=localusers, remotedomains=remotedomains,
ignore_user=user,
)
if not localusers and not remotedomains:
defer.returnValue(None) defer.returnValue(None)
yield self._send_presence_to_distribution(user, yield self.push_update_to_local_and_remote(
localusers=localusers, remotedomains=remotedomains, observed_user=user,
statuscache=statuscache users_to_push=localusers,
remote_domains=remotedomains,
room_ids=room_ids,
statuscache=statuscache,
) )
def _send_presence(self, srcuser, destuser, statuscache):
if destuser.is_mine:
self.push_update_to_clients(
observer_user=destuser,
observed_user=srcuser,
statuscache=statuscache)
return defer.succeed(None)
else:
return self._push_presence_remote(srcuser, destuser.domain,
state=statuscache.get_state()
)
@defer.inlineCallbacks
def _send_presence_to_distribution(self, srcuser, localusers=set(),
remotedomains=set(), statuscache=None):
for u in localusers:
logger.debug(" | push to local user %s", u)
self.push_update_to_clients(
observer_user=u,
observed_user=srcuser,
statuscache=statuscache,
)
deferreds = []
for domain in remotedomains:
logger.debug(" | push to remote domain %s", domain)
deferreds.append(self._push_presence_remote(srcuser, domain,
state=statuscache.get_state())
)
yield defer.DeferredList(deferreds)
@defer.inlineCallbacks @defer.inlineCallbacks
def _push_presence_remote(self, user, destination, state=None): def _push_presence_remote(self, user, destination, state=None):
if state is None: if state is None:
@ -591,12 +588,17 @@ class PresenceHandler(BaseHandler):
self.clock.time_msec() - state.pop("mtime") self.clock.time_msec() - state.pop("mtime")
) )
user_state = {
"user_id": user.to_string(),
}
user_state.update(**state)
yield self.federation.send_edu( yield self.federation.send_edu(
destination=destination, destination=destination,
edu_type="m.presence", edu_type="m.presence",
content={ content={
"push": [ "push": [
dict(user_id=user.to_string(), **state), user_state,
], ],
} }
) )
@ -615,12 +617,7 @@ class PresenceHandler(BaseHandler):
rm_handler = self.homeserver.get_handlers().room_member_handler rm_handler = self.homeserver.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(user) room_ids = yield rm_handler.get_rooms_for_user(user)
for room_id in room_ids: if not observers and not room_ids:
yield rm_handler.fetch_room_distributions_into(
room_id, localusers=observers, ignore_user=user
)
if not observers:
break break
state = dict(push) state = dict(push)
@ -636,12 +633,12 @@ class PresenceHandler(BaseHandler):
self._user_cachemap_latest_serial += 1 self._user_cachemap_latest_serial += 1
statuscache.update(state, serial=self._user_cachemap_latest_serial) statuscache.update(state, serial=self._user_cachemap_latest_serial)
for observer_user in observers: self.push_update_to_clients(
self.push_update_to_clients( observed_user=user,
observer_user=observer_user, users_to_push=observers,
observed_user=user, room_ids=room_ids,
statuscache=statuscache, statuscache=statuscache,
) )
if state["state"] == PresenceState.OFFLINE: if state["state"] == PresenceState.OFFLINE:
del self._user_cachemap[user] del self._user_cachemap[user]
@ -675,12 +672,53 @@ class PresenceHandler(BaseHandler):
yield defer.DeferredList(deferreds) yield defer.DeferredList(deferreds)
def push_update_to_clients(self, observer_user, observed_user, @defer.inlineCallbacks
statuscache): def push_update_to_local_and_remote(self, observed_user,
statuscache.make_event(user=observed_user, clock=self.clock) users_to_push=[], room_ids=[],
remote_domains=[],
statuscache=None):
localusers, remoteusers = partitionbool(
users_to_push,
lambda u: u.is_mine
)
localusers = set(localusers)
self.push_update_to_clients(
observed_user=observed_user,
users_to_push=localusers,
room_ids=room_ids,
statuscache=statuscache,
)
remote_domains = set(remote_domains)
remote_domains |= set([r.domain for r in remoteusers])
for room_id in room_ids:
remote_domains.update(
(yield self.store.get_joined_hosts_for_room(room_id))
)
remote_domains.discard(self.hs.hostname)
deferreds = []
for domain in remote_domains:
logger.debug(" | push to remote domain %s", domain)
deferreds.append(
self._push_presence_remote(
observed_user, domain, state=statuscache.get_state()
)
)
yield defer.DeferredList(deferreds)
defer.returnValue((localusers, remote_domains))
def push_update_to_clients(self, observed_user, users_to_push=[],
room_ids=[], statuscache=None):
self.notifier.on_new_user_event( self.notifier.on_new_user_event(
[observer_user], users_to_push,
room_ids,
) )

View File

@ -119,6 +119,7 @@ class Notifier(object):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def on_new_user_event(self, users=[], rooms=[]): def on_new_user_event(self, users=[], rooms=[]):
""" Used to inform listeners that something has happend """ Used to inform listeners that something has happend
presence/user event wise. presence/user event wise.

View File

@ -81,4 +81,4 @@ class PaginationConfig(object):
return ( return (
"<PaginationConfig from_tok=%s, to_tok=%s, " "<PaginationConfig from_tok=%s, to_tok=%s, "
"direction=%s, limit=%s>" "direction=%s, limit=%s>"
) % (self.from_tok, self.to_tok, self.direction, self.limit) ) % (self.from_token, self.to_token, self.direction, self.limit)

View File

@ -18,6 +18,8 @@ from inspect import getcallargs
from functools import wraps from functools import wraps
import logging import logging
import inspect
import traceback
def log_function(f): def log_function(f):
@ -65,4 +67,55 @@ def log_function(f):
return f(*args, **kwargs) return f(*args, **kwargs)
wrapped.__name__ = func_name
return wrapped
def trace_function(f):
func_name = f.__name__
linenum = f.func_code.co_firstlineno
pathname = f.func_code.co_filename
def wrapped(*args, **kwargs):
name = f.__module__
logger = logging.getLogger(name)
level = logging.DEBUG
s = inspect.currentframe().f_back
to_print = [
"\t%s:%s %s. Args: args=%s, kwargs=%s" % (
pathname, linenum, func_name, args, kwargs
)
]
while s:
if True or s.f_globals["__name__"].startswith("synapse"):
filename, lineno, function, _, _ = inspect.getframeinfo(s)
args_string = inspect.formatargvalues(*inspect.getargvalues(s))
to_print.append(
"\t%s:%d %s. Args: %s" % (
filename, lineno, function, args_string
)
)
s = s.f_back
msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print)
record = logging.LogRecord(
name=name,
level=level,
pathname=pathname,
lineno=lineno,
msg=msg,
args=None,
exc_info=None
)
logger.handle(record)
return f(*args, **kwargs)
wrapped.__name__ = func_name
return wrapped return wrapped

View File

@ -193,6 +193,8 @@ class PresenceStateTestCase(unittest.TestCase):
SynapseError SynapseError
) )
test_get_disallowed_state.skip = "Presence permissions are disabled"
@defer.inlineCallbacks @defer.inlineCallbacks
def test_set_my_state(self): def test_set_my_state(self):
mocked_set = self.datastore.set_presence_state mocked_set = self.datastore.set_presence_state
@ -497,6 +499,7 @@ class PresencePushTestCase(unittest.TestCase):
db_pool=None, db_pool=None,
datastore=Mock(spec=[ datastore=Mock(spec=[
"set_presence_state", "set_presence_state",
"get_joined_hosts_for_room",
# Bits that Federation needs # Bits that Federation needs
"prep_send_transaction", "prep_send_transaction",
@ -511,8 +514,12 @@ class PresencePushTestCase(unittest.TestCase):
) )
hs.handlers = JustPresenceHandlers(hs) hs.handlers = JustPresenceHandlers(hs)
def update(*args,**kwargs):
# print "mock_update_client: Args=%s, kwargs=%s" %(args, kwargs,)
return defer.succeed(None)
self.mock_update_client = Mock() self.mock_update_client = Mock()
self.mock_update_client.return_value = defer.succeed(None) self.mock_update_client.side_effect = update
self.datastore = hs.get_datastore() self.datastore = hs.get_datastore()
@ -546,6 +553,14 @@ class PresencePushTestCase(unittest.TestCase):
return defer.succeed([]) return defer.succeed([])
self.room_member_handler.get_room_members = get_room_members self.room_member_handler.get_room_members = get_room_members
def get_room_hosts(room_id):
if room_id == "a-room":
hosts = set([u.domain for u in self.room_members])
return defer.succeed(hosts)
else:
return defer.succeed([])
self.datastore.get_joined_hosts_for_room = get_room_hosts
@defer.inlineCallbacks @defer.inlineCallbacks
def fetch_room_distributions_into(room_id, localusers=None, def fetch_room_distributions_into(room_id, localusers=None,
remotedomains=None, ignore_user=None): remotedomains=None, ignore_user=None):
@ -611,18 +626,10 @@ class PresencePushTestCase(unittest.TestCase):
{"state": ONLINE}) {"state": ONLINE})
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
room_ids=["a-room"],
observed_user=self.u_apple, observed_user=self.u_apple,
statuscache=ANY), # self-reflection statuscache=ANY), # self-reflection
call(observer_user=self.u_banana,
observed_user=self.u_apple,
statuscache=ANY),
call(observer_user=self.u_clementine,
observed_user=self.u_apple,
statuscache=ANY),
call(observer_user=self.u_elderberry,
observed_user=self.u_apple,
statuscache=ANY),
], any_order=True) ], any_order=True)
self.mock_update_client.reset_mock() self.mock_update_client.reset_mock()
@ -651,7 +658,8 @@ class PresencePushTestCase(unittest.TestCase):
], presence) ], presence)
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_banana, call(users_to_push=set([self.u_banana]),
room_ids=[],
observed_user=self.u_banana, observed_user=self.u_banana,
statuscache=ANY), # self-reflection statuscache=ANY), # self-reflection
]) # and no others... ]) # and no others...
@ -659,21 +667,21 @@ class PresencePushTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_push_remote(self): def test_push_remote(self):
put_json = self.mock_http_client.put_json put_json = self.mock_http_client.put_json
put_json.expect_call_and_return( # put_json.expect_call_and_return(
call("remote", # call("remote",
path=ANY, # Can't guarantee which txn ID will be which # path=ANY, # Can't guarantee which txn ID will be which
data=_expect_edu("remote", "m.presence", # data=_expect_edu("remote", "m.presence",
content={ # content={
"push": [ # "push": [
{"user_id": "@apple:test", # {"user_id": "@apple:test",
"state": "online", # "state": "online",
"mtime_age": 0}, # "mtime_age": 0},
], # ],
} # }
) # )
), # ),
defer.succeed((200, "OK")) # defer.succeed((200, "OK"))
) # )
put_json.expect_call_and_return( put_json.expect_call_and_return(
call("farm", call("farm",
path=ANY, # Can't guarantee which txn ID will be which path=ANY, # Can't guarantee which txn ID will be which
@ -681,7 +689,7 @@ class PresencePushTestCase(unittest.TestCase):
content={ content={
"push": [ "push": [
{"user_id": "@apple:test", {"user_id": "@apple:test",
"state": "online", "state": u"online",
"mtime_age": 0}, "mtime_age": 0},
], ],
} }
@ -730,10 +738,8 @@ class PresencePushTestCase(unittest.TestCase):
) )
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=set([self.u_apple]),
observed_user=self.u_potato, room_ids=["a-room"],
statuscache=ANY),
call(observer_user=self.u_banana,
observed_user=self.u_potato, observed_user=self.u_potato,
statuscache=ANY), statuscache=ANY),
], any_order=True) ], any_order=True)
@ -753,19 +759,17 @@ class PresencePushTestCase(unittest.TestCase):
) )
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
# Apple and Elderberry see each other call(room_ids=["a-room"],
call(observer_user=self.u_apple,
observed_user=self.u_elderberry, observed_user=self.u_elderberry,
users_to_push=set(),
statuscache=ANY), statuscache=ANY),
call(observer_user=self.u_elderberry, call(users_to_push=set([self.u_elderberry]),
observed_user=self.u_apple, observed_user=self.u_apple,
room_ids=[],
statuscache=ANY), statuscache=ANY),
# Banana and Elderberry see each other call(users_to_push=set([self.u_elderberry]),
call(observer_user=self.u_banana,
observed_user=self.u_elderberry,
statuscache=ANY),
call(observer_user=self.u_elderberry,
observed_user=self.u_banana, observed_user=self.u_banana,
room_ids=[],
statuscache=ANY), statuscache=ANY),
], any_order=True) ], any_order=True)
@ -887,7 +891,12 @@ class PresencePollingTestCase(unittest.TestCase):
self.datastore.get_received_txn_response = get_received_txn_response self.datastore.get_received_txn_response = get_received_txn_response
self.mock_update_client = Mock() self.mock_update_client = Mock()
self.mock_update_client.return_value = defer.succeed(None)
def update(*args,**kwargs):
# print "mock_update_client: Args=%s, kwargs=%s" %(args, kwargs,)
return defer.succeed(None)
self.mock_update_client.side_effect = update
self.handler = hs.get_handlers().presence_handler self.handler = hs.get_handlers().presence_handler
self.handler.push_update_to_clients = self.mock_update_client self.handler.push_update_to_clients = self.mock_update_client
@ -951,10 +960,10 @@ class PresencePollingTestCase(unittest.TestCase):
# apple should see both banana and clementine currently offline # apple should see both banana and clementine currently offline
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=[self.u_apple],
observed_user=self.u_banana, observed_user=self.u_banana,
statuscache=ANY), statuscache=ANY),
call(observer_user=self.u_apple, call(users_to_push=[self.u_apple],
observed_user=self.u_clementine, observed_user=self.u_clementine,
statuscache=ANY), statuscache=ANY),
], any_order=True) ], any_order=True)
@ -974,10 +983,11 @@ class PresencePollingTestCase(unittest.TestCase):
# apple and banana should now both see each other online # apple and banana should now both see each other online
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=set([self.u_apple]),
observed_user=self.u_banana, observed_user=self.u_banana,
room_ids=[],
statuscache=ANY), statuscache=ANY),
call(observer_user=self.u_banana, call(users_to_push=[self.u_banana],
observed_user=self.u_apple, observed_user=self.u_apple,
statuscache=ANY), statuscache=ANY),
], any_order=True) ], any_order=True)
@ -994,8 +1004,9 @@ class PresencePollingTestCase(unittest.TestCase):
# banana should now be told apple is offline # banana should now be told apple is offline
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_banana, call(users_to_push=set([self.u_banana, self.u_apple]),
observed_user=self.u_apple, observed_user=self.u_apple,
room_ids=[],
statuscache=ANY), statuscache=ANY),
], any_order=True) ], any_order=True)
@ -1008,7 +1019,7 @@ class PresencePollingTestCase(unittest.TestCase):
put_json = self.mock_http_client.put_json put_json = self.mock_http_client.put_json
put_json.expect_call_and_return( put_json.expect_call_and_return(
call("remote", call("remote",
path="/matrix/federation/v1/send/1000000/", path=ANY,
data=_expect_edu("remote", "m.presence", data=_expect_edu("remote", "m.presence",
content={ content={
"poll": [ "@potato:remote" ], "poll": [ "@potato:remote" ],
@ -1018,6 +1029,18 @@ class PresencePollingTestCase(unittest.TestCase):
defer.succeed((200, "OK")) defer.succeed((200, "OK"))
) )
put_json.expect_call_and_return(
call("remote",
path=ANY,
data=_expect_edu("remote", "m.presence",
content={
"push": [ {"user_id": "@clementine:test" }],
},
),
),
defer.succeed((200, "OK"))
)
# clementine goes online # clementine goes online
yield self.handler.set_state( yield self.handler.set_state(
target_user=self.u_clementine, auth_user=self.u_clementine, target_user=self.u_clementine, auth_user=self.u_clementine,
@ -1032,15 +1055,28 @@ class PresencePollingTestCase(unittest.TestCase):
self.assertTrue(self.u_clementine in self.assertTrue(self.u_clementine in
self.handler._remote_recvmap[self.u_potato]) self.handler._remote_recvmap[self.u_potato])
put_json.expect_call_and_return(
call("remote",
path=ANY,
data=_expect_edu("remote", "m.presence",
content={
"push": [ {"user_id": "@fig:test" }],
},
),
),
defer.succeed((200, "OK"))
)
# fig goes online; shouldn't send a second poll # fig goes online; shouldn't send a second poll
yield self.handler.set_state( yield self.handler.set_state(
target_user=self.u_fig, auth_user=self.u_fig, target_user=self.u_fig, auth_user=self.u_fig,
state={"state": ONLINE} state={"state": ONLINE}
) )
reactor.iterate(delay=0) # reactor.iterate(delay=0)
put_json.assert_had_no_calls() yield put_json.await_calls()
# fig goes offline # fig goes offline
yield self.handler.set_state( yield self.handler.set_state(
@ -1054,7 +1090,7 @@ class PresencePollingTestCase(unittest.TestCase):
put_json.expect_call_and_return( put_json.expect_call_and_return(
call("remote", call("remote",
path="/matrix/federation/v1/send/1000001/", path=ANY,
data=_expect_edu("remote", "m.presence", data=_expect_edu("remote", "m.presence",
content={ content={
"unpoll": [ "@potato:remote" ], "unpoll": [ "@potato:remote" ],
@ -1069,7 +1105,7 @@ class PresencePollingTestCase(unittest.TestCase):
target_user=self.u_clementine, auth_user=self.u_clementine, target_user=self.u_clementine, auth_user=self.u_clementine,
state={"state": OFFLINE}) state={"state": OFFLINE})
put_json.await_calls() yield put_json.await_calls()
self.assertFalse(self.u_potato in self.handler._remote_recvmap, self.assertFalse(self.u_potato in self.handler._remote_recvmap,
msg="expected potato not to be in _remote_recvmap" msg="expected potato not to be in _remote_recvmap"

View File

@ -81,7 +81,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
self.replication = hs.get_replication_layer() self.replication = hs.get_replication_layer()
self.replication.send_edu = Mock() self.replication.send_edu = Mock()
self.replication.send_edu.return_value = defer.succeed((200, "OK"))
def send_edu(*args, **kwargs):
# print "send_edu: %s, %s" % (args, kwargs)
return defer.succeed((200, "OK"))
self.replication.send_edu.side_effect = send_edu
def get_profile_displayname(user_localpart): def get_profile_displayname(user_localpart):
return defer.succeed("Frank") return defer.succeed("Frank")
@ -95,11 +99,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
return defer.succeed("http://foo") return defer.succeed("http://foo")
self.datastore.get_profile_avatar_url = get_profile_avatar_url self.datastore.get_profile_avatar_url = get_profile_avatar_url
self.presence_list = [
{"observed_user_id": "@banana:test"},
{"observed_user_id": "@clementine:test"},
]
def get_presence_list(user_localpart, accepted=None): def get_presence_list(user_localpart, accepted=None):
return defer.succeed([ return defer.succeed(self.presence_list)
{"observed_user_id": "@banana:test"},
{"observed_user_id": "@clementine:test"},
])
self.datastore.get_presence_list = get_presence_list self.datastore.get_presence_list = get_presence_list
def do_users_share_a_room(userlist): def do_users_share_a_room(userlist):
@ -109,7 +114,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
self.handlers = hs.get_handlers() self.handlers = hs.get_handlers()
self.mock_update_client = Mock() self.mock_update_client = Mock()
self.mock_update_client.return_value = defer.succeed(None) def update(*args, **kwargs):
# print "mock_update_client: %s, %s" %(args, kwargs)
return defer.succeed(None)
self.mock_update_client.side_effect = update
self.handlers.presence_handler.push_update_to_clients = ( self.handlers.presence_handler.push_update_to_clients = (
self.mock_update_client) self.mock_update_client)
@ -130,6 +138,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_set_my_state(self): def test_set_my_state(self):
self.presence_list = [
{"observed_user_id": "@banana:test"},
{"observed_user_id": "@clementine:test"},
]
mocked_set = self.datastore.set_presence_state mocked_set = self.datastore.set_presence_state
mocked_set.return_value = defer.succeed({"state": OFFLINE}) mocked_set.return_value = defer.succeed({"state": OFFLINE})
@ -142,6 +155,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_push_local(self): def test_push_local(self):
self.presence_list = [
{"observed_user_id": "@banana:test"},
{"observed_user_id": "@clementine:test"},
]
self.datastore.set_presence_state.return_value = defer.succeed( self.datastore.set_presence_state.return_value = defer.succeed(
{"state": ONLINE}) {"state": ONLINE})
@ -173,12 +191,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
presence) presence)
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
room_ids=[],
observed_user=self.u_apple, observed_user=self.u_apple,
statuscache=ANY), # self-reflection statuscache=ANY), # self-reflection
call(observer_user=self.u_banana,
observed_user=self.u_apple,
statuscache=ANY),
], any_order=True) ], any_order=True)
statuscache = self.mock_update_client.call_args[1]["statuscache"] statuscache = self.mock_update_client.call_args[1]["statuscache"]
@ -198,12 +214,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
self.u_apple, "I am an Apple") self.u_apple, "I am an Apple")
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
room_ids=[],
observed_user=self.u_apple, observed_user=self.u_apple,
statuscache=ANY), # self-reflection statuscache=ANY), # self-reflection
call(observer_user=self.u_banana,
observed_user=self.u_apple,
statuscache=ANY),
], any_order=True) ], any_order=True)
statuscache = self.mock_update_client.call_args[1]["statuscache"] statuscache = self.mock_update_client.call_args[1]["statuscache"]
@ -217,6 +231,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_push_remote(self): def test_push_remote(self):
self.presence_list = [
{"observed_user_id": "@potato:remote"},
]
self.datastore.set_presence_state.return_value = defer.succeed( self.datastore.set_presence_state.return_value = defer.succeed(
{"state": ONLINE}) {"state": ONLINE})
@ -247,6 +265,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_recv_remote(self): def test_recv_remote(self):
self.presence_list = [
{"observed_user_id": "@banana:test"},
{"observed_user_id": "@clementine:test"},
]
# TODO(paul): Gut-wrenching # TODO(paul): Gut-wrenching
potato_set = self.handlers.presence_handler._remote_recvmap.setdefault( potato_set = self.handlers.presence_handler._remote_recvmap.setdefault(
self.u_potato, set()) self.u_potato, set())
@ -264,7 +287,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
) )
self.mock_update_client.assert_called_with( self.mock_update_client.assert_called_with(
observer_user=self.u_apple, users_to_push=set([self.u_apple]),
room_ids=[],
observed_user=self.u_potato, observed_user=self.u_potato,
statuscache=ANY) statuscache=ANY)

View File

@ -21,7 +21,7 @@ from synapse.api.events.room import (
RoomMemberEvent, MessageEvent RoomMemberEvent, MessageEvent
) )
from twisted.internet import defer from twisted.internet import defer, reactor
from collections import namedtuple from collections import namedtuple
from mock import patch, Mock from mock import patch, Mock
@ -263,18 +263,43 @@ class DeferredMockCallable(object):
d.callback(None) d.callback(None)
return result return result
raise AssertionError("Was not expecting call(%s)" % failure = AssertionError("Was not expecting call(%s)" %
_format_call(args, kwargs) _format_call(args, kwargs)
) )
for _, _, d in self.expectations:
try:
d.errback(failure)
except:
pass
raise failure
def expect_call_and_return(self, call, result): def expect_call_and_return(self, call, result):
self.expectations.append((call, result, defer.Deferred())) self.expectations.append((call, result, defer.Deferred()))
@defer.inlineCallbacks @defer.inlineCallbacks
def await_calls(self): def await_calls(self, timeout=1000):
while self.expectations: deferred = defer.DeferredList(
(_, _, d) = self.expectations.pop(0) [d for _, _, d in self.expectations],
yield d fireOnOneErrback=True
)
timer = reactor.callLater(
timeout/1000,
deferred.errback,
AssertionError(
"%d pending calls left: %s"% (
len([e for e in self.expectations if not e[2].called]),
[e for e in self.expectations if not e[2].called]
)
)
)
yield deferred
timer.cancel()
self.calls = [] self.calls = []
def assert_had_no_calls(self): def assert_had_no_calls(self):