Merge branch 'develop' into storage_transactions

Conflicts:
	synapse/api/auth.py
	synapse/handlers/room.py
	synapse/storage/__init__.py
This commit is contained in:
Mark Haines 2014-08-26 16:07:05 +01:00
commit 4b63b06cad
74 changed files with 3467 additions and 870 deletions

View file

@ -24,4 +24,5 @@ class BaseHandler(object):
self.notifier = hs.get_notifier()
self.room_lock = hs.get_room_lock_manager()
self.state_handler = hs.get_state_handler()
self.distributor = hs.get_distributor()
self.hs = hs

View file

@ -32,6 +32,15 @@ logger = logging.getLogger(__name__)
class FederationHandler(BaseHandler):
"""Handles events that originated from federation."""
def __init__(self, hs):
super(FederationHandler, self).__init__(hs)
self.distributor.observe(
"user_joined_room",
self._on_user_joined
)
self.waiting_for_join_list = {}
@log_function
@defer.inlineCallbacks
@ -56,7 +65,7 @@ class FederationHandler(BaseHandler):
content.update({"membership": Membership.JOIN})
new_event = self.event_factory.create_event(
etype=RoomMemberEvent.TYPE,
target_user_id=event.user_id,
state_key=event.user_id,
room_id=event.room_id,
user_id=event.user_id,
membership=Membership.JOIN,
@ -103,6 +112,13 @@ class FederationHandler(BaseHandler):
if not backfilled:
yield self.notifier.on_new_room_event(event, store_id)
if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN:
user = self.hs.parse_userid(event.target_user_id)
self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
)
@log_function
@defer.inlineCallbacks
@ -152,12 +168,14 @@ class FederationHandler(BaseHandler):
yield federation.handle_new_event(new_event)
store_id = yield self.store.persist_event(new_event)
self.notifier.on_new_room_event(new_event, store_id)
# TODO (erikj): Time out here.
d = defer.Deferred()
self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
yield d
try:
yield self.store.store_room(
event.room_id,
room_id,
"",
is_public=False
)
@ -166,3 +184,10 @@ class FederationHandler(BaseHandler):
defer.returnValue(True)
@log_function
def _on_user_joined(self, user, room_id):
waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
while waiters:
waiters.pop().callback(None)

View file

@ -142,6 +142,10 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def is_presence_visible(self, observer_user, observed_user):
defer.returnValue(True)
return
# FIXME (erikj): This code path absolutely kills the database.
assert(observed_user.is_mine)
if observer_user == observed_user:
@ -155,12 +159,11 @@ class PresenceHandler(BaseHandler):
if allowed_by_subscription:
defer.returnValue(True)
rm_handler = self.homeserver.get_handlers().room_member_handler
for room_id in (yield rm_handler.get_rooms_for_user(observer_user)):
if observed_user in (yield rm_handler.get_room_members(room_id)):
defer.returnValue(True)
share_room = yield self.store.do_users_share_a_room(
[observer_user, observed_user]
)
defer.returnValue(False)
defer.returnValue(share_room)
@defer.inlineCallbacks
def get_state(self, target_user, auth_user):
@ -187,6 +190,10 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks
def set_state(self, target_user, auth_user, state):
return
# TODO (erikj): Turn this back on. Why did we end up sending EDUs
# everywhere?
if not target_user.is_mine:
raise SynapseError(400, "User is not hosted on this Home Server")
@ -667,7 +674,7 @@ class PresenceHandler(BaseHandler):
def push_update_to_clients(self, observer_user, observed_user,
statuscache):
state = statuscache.make_event(user=observed_user, clock=self.clock)
statuscache.make_event(user=observed_user, clock=self.clock)
self.notifier.on_new_user_event(
observer_user.to_string(),

View file

@ -20,15 +20,14 @@ from synapse.types import UserID, RoomAlias, RoomID
from synapse.api.constants import Membership
from synapse.api.errors import RoomError, StoreError, SynapseError
from synapse.api.events.room import (
RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
RoomConfigEvent
RoomTopicEvent, RoomMemberEvent, RoomConfigEvent
)
from synapse.api.streams.event import EventStream, EventsStreamData
from synapse.handlers.presence import PresenceStreamData
from synapse.util import stringutils
from ._base import BaseHandler
import logging
import json
logger = logging.getLogger(__name__)
@ -260,21 +259,38 @@ class MessageHandler(BaseHandler):
membership_list=[Membership.INVITE, Membership.JOIN]
)
ret = []
rooms_ret = []
now_rooms_token = yield self.store.get_room_events_max_id()
# FIXME (erikj): Fix this.
presence_stream = PresenceStreamData(self.hs)
now_presence_token = yield presence_stream.max_token()
presence = yield presence_stream.get_rows(
user_id, 0, now_presence_token, None, None
)
# FIXME (erikj): We need to not generate this token,
now_token = "%s_%s" % (now_rooms_token, now_presence_token)
for event in room_list:
d = {
"room_id": event.room_id,
"membership": event.membership,
}
ret.append(d)
if event.membership == Membership.INVITE:
d["inviter"] = event.user_id
rooms_ret.append(d)
if event.membership != Membership.JOIN:
continue
try:
messages, token = yield self.store.get_recent_events_for_room(
event.room_id,
limit=50,
limit=10,
end_token=now_rooms_token,
)
d["messages"] = {
@ -282,10 +298,15 @@ class MessageHandler(BaseHandler):
"start": token[0],
"end": token[1],
}
current_state = yield self.store.get_current_state(event.room_id)
d["state"] = [c.get_dict() for c in current_state]
except:
logger.exception("Failed to get snapshot")
logger.debug("snapshot_all_rooms returning: %s", ret)
ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token}
# logger.debug("snapshot_all_rooms returning: %s", ret)
defer.returnValue(ret)
@ -377,7 +398,7 @@ class RoomCreationHandler(BaseHandler):
content = {"membership": Membership.JOIN}
join_event = self.event_factory.create_event(
etype=RoomMemberEvent.TYPE,
target_user_id=user_id,
state_key=user_id,
room_id=room_id,
user_id=user_id,
membership=Membership.JOIN,
@ -505,6 +526,7 @@ class RoomMemberHandler(BaseHandler):
Raises:
SynapseError if there was a problem changing the membership.
"""
target_user_id = event.state_key
snapshot = yield self.store.snapshot_room(
event.room_id, event.user_id,
@ -512,7 +534,7 @@ class RoomMemberHandler(BaseHandler):
)
## TODO(markjh): get prev state from snapshot.
prev_state = yield self.store.get_room_member(
event.target_user_id, event.room_id
target_user_id, event.room_id
)
if prev_state:
@ -569,7 +591,7 @@ class RoomMemberHandler(BaseHandler):
content.update({"membership": Membership.JOIN})
new_event = self.event_factory.create_event(
etype=RoomMemberEvent.TYPE,
target_user_id=joinee.to_string(),
state_key=joinee.to_string(),
room_id=room_id,
user_id=joinee.to_string(),
membership=Membership.JOIN,
@ -586,7 +608,7 @@ class RoomMemberHandler(BaseHandler):
@defer.inlineCallbacks
def _do_join(self, event, snapshot, room_host=None, do_auth=True):
joinee = self.hs.parse_userid(event.target_user_id)
joinee = self.hs.parse_userid(event.state_key)
# room_id = RoomID.from_string(event.room_id, self.hs)
room_id = event.room_id
@ -697,16 +719,17 @@ class RoomMemberHandler(BaseHandler):
# If we're inviting someone, then we should also send it to that
# HS.
target_user_id = event.state_key
if membership == Membership.INVITE:
host = UserID.from_string(
event.target_user_id, self.hs
target_user_id, self.hs
).domain
destinations.append(host)
# If we are joining a remote HS, include that.
if membership == Membership.JOIN:
host = UserID.from_string(
event.target_user_id, self.hs
target_user_id, self.hs
).domain
destinations.append(host)