Merge branch 'develop' into room-initial-sync

Conflicts:
	synapse/handlers/message.py
This commit is contained in:
Paul "LeoNerd" Evans 2014-11-17 16:59:24 +00:00
commit 31a049eb69
197 changed files with 4504 additions and 67061 deletions

View file

@ -14,7 +14,18 @@
# limitations under the License.
from twisted.internet import defer
from synapse.api.errors import LimitExceededError
from synapse.util.async import run_on_reactor
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.events.room import RoomMemberEvent
from synapse.api.constants import Membership
import logging
logger = logging.getLogger(__name__)
class BaseHandler(object):
@ -30,6 +41,9 @@ class BaseHandler(object):
self.clock = hs.get_clock()
self.hs = hs
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
def ratelimit(self, user_id):
time_now = self.clock.time()
allowed, time_allowed = self.ratelimiter.send_message(
@ -44,16 +58,58 @@ class BaseHandler(object):
@defer.inlineCallbacks
def _on_new_room_event(self, event, snapshot, extra_destinations=[],
extra_users=[]):
extra_users=[], suppress_auth=False,
do_invite_host=None):
yield run_on_reactor()
snapshot.fill_out_prev_events(event)
yield self.state_handler.annotate_event_with_state(event)
yield self.auth.add_auth_events(event)
logger.debug("Signing event...")
add_hashes_and_signatures(
event, self.server_name, self.signing_key
)
logger.debug("Signed event.")
if not suppress_auth:
logger.debug("Authing...")
self.auth.check(event, raises=True)
logger.debug("Authed")
else:
logger.debug("Suppressed auth.")
if do_invite_host:
federation_handler = self.hs.get_handlers().federation_handler
invite_event = yield federation_handler.send_invite(
do_invite_host,
event
)
# FIXME: We need to check if the remote changed anything else
event.signatures = invite_event.signatures
yield self.store.persist_event(event)
destinations = set(extra_destinations)
# Send a PDU to all hosts who have joined the room.
destinations.update((yield self.store.get_joined_hosts_for_room(
event.room_id
)))
for k, s in event.state_events.items():
try:
if k[0] == RoomMemberEvent.TYPE:
if s.content["membership"] == Membership.JOIN:
destinations.add(
self.hs.parse_userid(s.state_key).domain
)
except:
logger.warn(
"Failed to get destination from event %s", s.event_id
)
event.destinations = list(destinations)
self.notifier.on_new_room_event(event, extra_users=extra_users)

View file

@ -147,10 +147,8 @@ class DirectoryHandler(BaseHandler):
content={"aliases": aliases},
)
snapshot = yield self.store.snapshot_room(
room_id=room_id,
user_id=user_id,
)
snapshot = yield self.store.snapshot_room(event)
yield self.state_handler.handle_new_event(event, snapshot)
yield self._on_new_room_event(event, snapshot, extra_users=[user_id])
yield self._on_new_room_event(
event, snapshot, extra_users=[user_id], suppress_auth=True
)

View file

@ -17,13 +17,18 @@
from ._base import BaseHandler
from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent
from synapse.api.events.utils import prune_event
from synapse.api.errors import AuthError, FederationError, SynapseError
from synapse.api.events.room import RoomMemberEvent
from synapse.api.constants import Membership
from synapse.util.logutils import log_function
from synapse.federation.pdu_codec import PduCodec
from synapse.api.errors import SynapseError
from synapse.util.async import run_on_reactor
from synapse.crypto.event_signing import (
compute_event_signature, check_event_content_hash
)
from syutil.jsonutil import encode_canonical_json
from twisted.internet import defer, reactor
from twisted.internet import defer
import logging
@ -38,6 +43,8 @@ class FederationHandler(BaseHandler):
of the home server (including auth and state conflict resoultion)
b) converting events that were produced by local clients that may need
to be sent to remote home servers.
c) doing the necessary dances to invite remote users and join remote
rooms.
"""
def __init__(self, hs):
@ -55,12 +62,14 @@ class FederationHandler(BaseHandler):
self.state_handler = hs.get_state_handler()
# self.auth_handler = gs.get_auth_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.lock_manager = hs.get_room_lock_manager()
self.replication_layer.set_handler(self)
self.pdu_codec = PduCodec(hs)
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@log_function
@defer.inlineCallbacks
@ -78,7 +87,9 @@ class FederationHandler(BaseHandler):
processing.
"""
pdu = self.pdu_codec.pdu_from_event(event)
yield run_on_reactor()
pdu = event
if not hasattr(pdu, "destinations") or not pdu.destinations:
pdu.destinations = []
@ -87,97 +98,113 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
def on_receive_pdu(self, pdu, backfilled):
def on_receive_pdu(self, pdu, backfilled, state=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it throught the StateHandler.
do auth checks and put it through the StateHandler.
"""
event = self.pdu_codec.event_from_pdu(pdu)
event = pdu
logger.debug("Got event: %s", event.event_id)
with (yield self.lock_manager.lock(pdu.context)):
if event.is_state and not backfilled:
is_new_state = yield self.state_handler.handle_new_state(
pdu
)
else:
is_new_state = False
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if event.room_id in self.room_queues:
self.room_queues[event.room_id].append(pdu)
return
logger.debug("Processing event: %s", event.event_id)
redacted_event = prune_event(event)
redacted_pdu_json = redacted_event.get_pdu_json()
try:
yield self.keyring.verify_json_for_server(
event.origin, redacted_pdu_json
)
except SynapseError as e:
logger.warn("Signature check failed for %s redacted to %s",
encode_canonical_json(pdu.get_pdu_json()),
encode_canonical_json(redacted_pdu_json),
)
raise FederationError(
"ERROR",
e.code,
e.msg,
affected=event.event_id,
)
if not check_event_content_hash(event):
logger.warn(
"Event content has been tampered, redacting %s, %s",
event.event_id, encode_canonical_json(event.get_full_dict())
)
event = redacted_event
is_new_state = yield self.state_handler.annotate_event_with_state(
event,
old_state=state
)
logger.debug("Event: %s", event)
try:
self.auth.check(event, raises=True)
except AuthError as e:
raise FederationError(
"ERROR",
e.code,
e.msg,
affected=event.event_id,
)
is_new_state = is_new_state and not backfilled
# TODO: Implement something in federation that allows us to
# respond to PDU.
target_is_mine = False
if hasattr(event, "target_host"):
target_is_mine = event.target_host == self.hs.hostname
yield self.store.persist_event(
event,
backfilled,
is_new_state=is_new_state
)
if event.type == InviteJoinEvent.TYPE:
if not target_is_mine:
logger.debug("Ignoring invite/join event %s", event)
return
room = yield self.store.get_room(event.room_id)
# If we receive an invite/join event then we need to join the
# sender to the given room.
# TODO: We should probably auth this or some such
content = event.content
content.update({"membership": Membership.JOIN})
new_event = self.event_factory.create_event(
etype=RoomMemberEvent.TYPE,
state_key=event.user_id,
room_id=event.room_id,
user_id=event.user_id,
membership=Membership.JOIN,
content=content
)
yield self.hs.get_handlers().room_member_handler.change_membership(
new_event,
do_auth=False,
)
else:
with (yield self.room_lock.lock(event.room_id)):
yield self.store.persist_event(
event,
backfilled,
is_new_state=is_new_state
if not room:
# Huh, let's try and get the current state
try:
yield self.replication_layer.get_state_for_context(
event.origin, event.room_id, event.event_id,
)
room = yield self.store.get_room(event.room_id)
if not room:
# Huh, let's try and get the current state
try:
yield self.replication_layer.get_state_for_context(
event.origin, event.room_id
)
hosts = yield self.store.get_joined_hosts_for_room(
event.room_id
)
if self.hs.hostname in hosts:
try:
yield self.store.store_room(
room_id=event.room_id,
room_creator_user_id="",
is_public=False,
)
except:
pass
except:
logger.exception(
"Failed to get current state for room %s",
event.room_id
)
if not backfilled:
extra_users = []
if event.type == RoomMemberEvent.TYPE:
target_user_id = event.state_key
target_user = self.hs.parse_userid(target_user_id)
extra_users.append(target_user)
yield self.notifier.on_new_room_event(
event, extra_users=extra_users
hosts = yield self.store.get_joined_hosts_for_room(
event.room_id
)
if self.hs.hostname in hosts:
try:
yield self.store.store_room(
room_id=event.room_id,
room_creator_user_id="",
is_public=False,
)
except:
pass
except:
logger.exception(
"Failed to get current state for room %s",
event.room_id
)
if not backfilled:
extra_users = []
if event.type == RoomMemberEvent.TYPE:
target_user_id = event.state_key
target_user = self.hs.parse_userid(target_user_id)
extra_users.append(target_user)
yield self.notifier.on_new_room_event(
event, extra_users=extra_users
)
if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN:
@ -189,79 +216,349 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit):
pdus = yield self.replication_layer.backfill(dest, room_id, limit)
""" Trigger a backfill request to `dest` for the given `room_id`
"""
extremities = yield self.store.get_oldest_events_in_room(room_id)
pdus = yield self.replication_layer.backfill(
dest,
room_id,
limit,
extremities=extremities,
)
events = []
for pdu in pdus:
event = self.pdu_codec.event_from_pdu(pdu)
event = pdu
# FIXME (erikj): Not sure this actually works :/
yield self.state_handler.annotate_event_with_state(event)
events.append(event)
yield self.store.persist_event(event, backfilled=True)
defer.returnValue(events)
@defer.inlineCallbacks
def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
Invites must be signed by the invitee's server before distribution.
"""
pdu = yield self.replication_layer.send_invite(
destination=target_host,
context=event.room_id,
event_id=event.event_id,
pdu=event
)
defer.returnValue(pdu)
@defer.inlineCallbacks
def on_event_auth(self, event_id):
auth = yield self.store.get_auth_chain(event_id)
defer.returnValue([e for e in auth])
@log_function
@defer.inlineCallbacks
def do_invite_join(self, target_host, room_id, joinee, content, snapshot):
""" Attempts to join the `joinee` to the room `room_id` via the
server `target_host`.
hosts = yield self.store.get_joined_hosts_for_room(room_id)
if self.hs.hostname in hosts:
# We are already in the room.
logger.debug("We're already in the room apparently")
defer.returnValue(False)
This first triggers a /make_join/ request that returns a partial
event that we can fill out and sign. This is then sent to the
remote server via /send_join/ which responds with the state at that
event and the auth_chains.
# First get current state to see if we are already joined.
try:
yield self.replication_layer.get_state_for_context(
target_host, room_id
)
hosts = yield self.store.get_joined_hosts_for_room(room_id)
if self.hs.hostname in hosts:
# Oh, we were actually in the room already.
logger.debug("We're already in the room apparently")
defer.returnValue(False)
except Exception:
logger.exception("Failed to get current state")
new_event = self.event_factory.create_event(
etype=InviteJoinEvent.TYPE,
target_host=target_host,
room_id=room_id,
user_id=joinee,
content=content
We suspend processing of any received events from this room until we
have finished processing the join.
"""
pdu = yield self.replication_layer.make_join(
target_host,
room_id,
joinee
)
new_event.destinations = [target_host]
logger.debug("Got response to make_join: %s", pdu)
snapshot.fill_out_prev_events(new_event)
yield self.handle_new_event(new_event, snapshot)
event = pdu
# TODO (erikj): Time out here.
d = defer.Deferred()
self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
reactor.callLater(10, d.cancel)
# We should assert some things.
assert(event.type == RoomMemberEvent.TYPE)
assert(event.user_id == joinee)
assert(event.state_key == joinee)
assert(event.room_id == room_id)
event.outlier = False
self.room_queues[room_id] = []
try:
yield d
except defer.CancelledError:
raise SynapseError(500, "Unable to join remote room")
event.event_id = self.event_factory.create_event_id()
event.content = content
try:
yield self.store.store_room(
room_id=room_id,
room_creator_user_id="",
is_public=False
state = yield self.replication_layer.send_join(
target_host,
event
)
except:
pass
logger.debug("do_invite_join state: %s", state)
yield self.state_handler.annotate_event_with_state(
event,
old_state=state
)
logger.debug("do_invite_join event: %s", event)
try:
yield self.store.store_room(
room_id=room_id,
room_creator_user_id="",
is_public=False
)
except:
# FIXME
pass
for e in state:
# FIXME: Auth these.
e.outlier = True
yield self.state_handler.annotate_event_with_state(
e,
)
yield self.store.persist_event(
e,
backfilled=False,
is_new_state=True
)
yield self.store.persist_event(
event,
backfilled=False,
is_new_state=True
)
finally:
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
for p in room_queue:
try:
yield self.on_receive_pdu(p, backfilled=False)
except:
pass
defer.returnValue(True)
@defer.inlineCallbacks
@log_function
def on_make_join_request(self, context, user_id):
""" We've received a /make_join/ request, so we create a partial
join event for the room and return that. We don *not* persist or
process it until the other server has signed it and sent it back.
"""
event = self.event_factory.create_event(
etype=RoomMemberEvent.TYPE,
content={"membership": Membership.JOIN},
room_id=context,
user_id=user_id,
state_key=user_id,
)
snapshot = yield self.store.snapshot_room(event)
snapshot.fill_out_prev_events(event)
yield self.state_handler.annotate_event_with_state(event)
yield self.auth.add_auth_events(event)
self.auth.check(event, raises=True)
pdu = event
defer.returnValue(pdu)
@defer.inlineCallbacks
@log_function
def on_send_join_request(self, origin, pdu):
""" We have received a join event for a room. Fully process it and
respond with the current state and auth chains.
"""
event = pdu
event.outlier = False
is_new_state = yield self.state_handler.annotate_event_with_state(event)
self.auth.check(event, raises=True)
# FIXME (erikj): All this is duplicated above :(
yield self.store.persist_event(
event,
backfilled=False,
is_new_state=is_new_state
)
extra_users = []
if event.type == RoomMemberEvent.TYPE:
target_user_id = event.state_key
target_user = self.hs.parse_userid(target_user_id)
extra_users.append(target_user)
yield self.notifier.on_new_room_event(
event, extra_users=extra_users
)
if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN:
user = self.hs.parse_userid(event.state_key)
self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
)
new_pdu = event
destinations = set()
for k, s in event.state_events.items():
try:
if k[0] == RoomMemberEvent.TYPE:
if s.content["membership"] == Membership.JOIN:
destinations.add(
self.hs.parse_userid(s.state_key).domain
)
except:
logger.warn(
"Failed to get destination from event %s", s.event_id
)
new_pdu.destinations = list(destinations)
yield self.replication_layer.send_pdu(new_pdu)
auth_chain = yield self.store.get_auth_chain(event.event_id)
defer.returnValue({
"state": event.state_events.values(),
"auth_chain": auth_chain,
})
@defer.inlineCallbacks
def on_invite_request(self, origin, pdu):
""" We've got an invite event. Process and persist it. Sign it.
Respond with the now signed event.
"""
event = pdu
event.outlier = True
event.signatures.update(
compute_event_signature(
event,
self.hs.hostname,
self.hs.config.signing_key[0]
)
)
yield self.state_handler.annotate_event_with_state(event)
yield self.store.persist_event(
event,
backfilled=False,
)
target_user = self.hs.parse_userid(event.state_key)
yield self.notifier.on_new_room_event(
event, extra_users=[target_user],
)
defer.returnValue(event)
@defer.inlineCallbacks
def get_state_for_pdu(self, origin, room_id, event_id):
yield run_on_reactor()
in_room = yield self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
state_groups = yield self.store.get_state_groups(
[event_id]
)
if state_groups:
_, state = state_groups.items().pop()
results = {
(e.type, e.state_key): e for e in state
}
event = yield self.store.get_event(event_id)
if hasattr(event, "state_key"):
# Get previous state
if hasattr(event, "replaces_state") and event.replaces_state:
prev_event = yield self.store.get_event(
event.replaces_state
)
results[(event.type, event.state_key)] = prev_event
else:
del results[(event.type, event.state_key)]
defer.returnValue(results.values())
else:
defer.returnValue([])
@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, context, pdu_list, limit):
in_room = yield self.auth.check_host_in_room(context, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
events = yield self.store.get_backfill_events(
context,
pdu_list,
limit
)
defer.returnValue(events)
@defer.inlineCallbacks
@log_function
def get_persisted_pdu(self, origin, event_id):
""" Get a PDU from the database with given origin and id.
Returns:
Deferred: Results in a `Pdu`.
"""
event = yield self.store.get_event(
event_id,
allow_none=True,
)
if event:
in_room = yield self.auth.check_host_in_room(
event.room_id,
origin
)
if not in_room:
raise AuthError(403, "Host not in room.")
defer.returnValue(event)
else:
defer.returnValue(None)
@log_function
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
@log_function
def _on_user_joined(self, user, room_id):
waiters = self.waiting_for_join_list.get((user.to_string(), room_id), [])
waiters = self.waiting_for_join_list.get(
(user.to_string(), room_id),
[]
)
while waiters:
waiters.pop().callback(None)

View file

@ -16,7 +16,6 @@
from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.api.events.room import RoomTopicEvent
from synapse.api.errors import RoomError
from synapse.streams.config import PaginationConfig
from ._base import BaseHandler
@ -26,7 +25,6 @@ import logging
logger = logging.getLogger(__name__)
class MessageHandler(BaseHandler):
def __init__(self, hs):
@ -59,7 +57,8 @@ class MessageHandler(BaseHandler):
# user_id=sender_id
# )
# TODO (erikj): Once we work out the correct c-s api we need to think on how to do this.
# TODO (erikj): Once we work out the correct c-s api we need to think
# on how to do this.
defer.returnValue(None)
@ -81,12 +80,11 @@ class MessageHandler(BaseHandler):
user = self.hs.parse_userid(event.user_id)
assert user.is_mine, "User must be our own: %s" % (user,)
snapshot = yield self.store.snapshot_room(event.room_id, event.user_id)
snapshot = yield self.store.snapshot_room(event)
if not suppress_auth:
yield self.auth.check(event, snapshot, raises=True)
yield self._on_new_room_event(event, snapshot)
yield self._on_new_room_event(
event, snapshot, suppress_auth=suppress_auth
)
self.hs.get_handlers().presence_handler.bump_presence_active_time(
user
@ -111,7 +109,9 @@ class MessageHandler(BaseHandler):
data_source = self.hs.get_event_sources().sources["room"]
if not pagin_config.from_token:
pagin_config.from_token = yield self.hs.get_event_sources().get_current_token()
pagin_config.from_token = (
yield self.hs.get_event_sources().get_current_token()
)
user = self.hs.parse_userid(user_id)
@ -142,66 +142,27 @@ class MessageHandler(BaseHandler):
SynapseError if something went wrong.
"""
snapshot = yield self.store.snapshot_room(
event.room_id,
event.user_id,
state_type=event.type,
state_key=event.state_key,
)
yield self.auth.check(event, snapshot, raises=True)
yield self.state_handler.handle_new_event(event, snapshot)
snapshot = yield self.store.snapshot_room(event)
yield self._on_new_room_event(event, snapshot)
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
event_type=None, state_key="",
public_room_rules=[],
private_room_rules=["join"]):
event_type=None, state_key=""):
""" Get data from a room.
Args:
event : The room path event
public_room_rules : A list of membership states the user can be in,
in order to read this data IN A PUBLIC ROOM. An empty list means
'any state'.
private_room_rules : A list of membership states the user can be
in, in order to read this data IN A PRIVATE ROOM. An empty list
means 'any state'.
Returns:
The path data content.
Raises:
SynapseError if something went wrong.
"""
if event_type == RoomTopicEvent.TYPE:
# anyone invited/joined can read the topic
private_room_rules = ["invite", "join"]
have_joined = yield self.auth.check_joined_room(room_id, user_id)
if not have_joined:
raise RoomError(403, "User not in room.")
# does this room exist
room = yield self.store.get_room(room_id)
if not room:
raise RoomError(403, "Room does not exist.")
# does this user exist in this room
member = yield self.store.get_room_member(
room_id=room_id,
user_id="" if not user_id else user_id)
member_state = member.membership if member else None
if room.is_public and public_room_rules:
# make sure the user meets public room rules
if member_state not in public_room_rules:
raise RoomError(403, "Member does not meet public room rules.")
elif not room.is_public and private_room_rules:
# make sure the user meets private room rules
if member_state not in private_room_rules:
raise RoomError(
403, "Member does not meet private room rules.")
data = yield self.store.get_current_state(
data = yield self.state_handler.get_current_state(
room_id, event_type, state_key
)
defer.returnValue(data)
@ -219,9 +180,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def send_feedback(self, event):
snapshot = yield self.store.snapshot_room(event.room_id, event.user_id)
yield self.auth.check(event, snapshot, raises=True)
snapshot = yield self.store.snapshot_room(event)
# store message in db
yield self._on_new_room_event(event, snapshot)
@ -239,7 +198,7 @@ class MessageHandler(BaseHandler):
yield self.auth.check_joined_room(room_id, user_id)
# TODO: This is duplicating logic from snapshot_all_rooms
current_state = yield self.store.get_current_state(room_id)
current_state = yield self.state_handler.get_current_state(room_id)
defer.returnValue([self.hs.serialize_event(c) for c in current_state])
@defer.inlineCallbacks
@ -289,8 +248,10 @@ class MessageHandler(BaseHandler):
d = {
"room_id": event.room_id,
"membership": event.membership,
"visibility": ("public" if event.room_id in
public_room_ids else "private"),
"visibility": (
"public" if event.room_id in public_room_ids
else "private"
),
}
if event.membership == Membership.INVITE:
@ -316,10 +277,12 @@ class MessageHandler(BaseHandler):
"end": end_token.to_string(),
}
current_state = yield self.store.get_current_state(
current_state = yield self.state_handler.get_current_state(
event.room_id
)
d["state"] = [self.hs.serialize_event(c) for c in current_state]
d["state"] = [
self.hs.serialize_event(c) for c in current_state
]
except:
logger.exception("Failed to get snapshot")

View file

@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.api.constants import Membership
from synapse.api.events.room import RoomMemberEvent
from ._base import BaseHandler
@ -153,10 +152,13 @@ class ProfileHandler(BaseHandler):
if not user.is_mine:
defer.returnValue(None)
(displayname, avatar_url) = yield defer.gatherResults([
self.store.get_profile_displayname(user.localpart),
self.store.get_profile_avatar_url(user.localpart),
])
(displayname, avatar_url) = yield defer.gatherResults(
[
self.store.get_profile_displayname(user.localpart),
self.store.get_profile_avatar_url(user.localpart),
],
consumeErrors=True
)
state["displayname"] = displayname
state["avatar_url"] = avatar_url
@ -196,10 +198,7 @@ class ProfileHandler(BaseHandler):
)
for j in joins:
snapshot = yield self.store.snapshot_room(
j.room_id, j.state_key, RoomMemberEvent.TYPE,
j.state_key
)
snapshot = yield self.store.snapshot_room(j)
content = {
"membership": j.content["membership"],
@ -218,5 +217,6 @@ class ProfileHandler(BaseHandler):
user_id=j.state_key,
)
yield self.state_handler.handle_new_event(new_event, snapshot)
yield self._on_new_room_event(new_event, snapshot)
yield self._on_new_room_event(
new_event, snapshot, suppress_auth=True
)

View file

@ -21,10 +21,10 @@ from synapse.api.constants import Membership, JoinRules
from synapse.api.errors import StoreError, SynapseError
from synapse.api.events.room import (
RoomMemberEvent, RoomCreateEvent, RoomPowerLevelsEvent,
RoomJoinRulesEvent, RoomAddStateLevelEvent, RoomTopicEvent,
RoomSendEventLevelEvent, RoomOpsPowerLevelsEvent, RoomNameEvent,
RoomTopicEvent, RoomNameEvent, RoomJoinRulesEvent,
)
from synapse.util import stringutils
from synapse.util.async import run_on_reactor
from ._base import BaseHandler
import logging
@ -111,26 +111,15 @@ class RoomCreationHandler(BaseHandler):
user, room_id, is_public=is_public
)
if room_alias:
directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
user_id=user_id,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
)
@defer.inlineCallbacks
def handle_event(event):
snapshot = yield self.store.snapshot_room(
room_id=room_id,
user_id=user_id,
)
snapshot = yield self.store.snapshot_room(event)
logger.debug("Event: %s", event)
yield self.state_handler.handle_new_event(event, snapshot)
yield self._on_new_room_event(event, snapshot, extra_users=[user])
yield self._on_new_room_event(
event, snapshot, extra_users=[user], suppress_auth=True
)
for event in creation_events:
yield handle_event(event)
@ -141,7 +130,6 @@ class RoomCreationHandler(BaseHandler):
etype=RoomNameEvent.TYPE,
room_id=room_id,
user_id=user_id,
required_power_level=50,
content={"name": name},
)
@ -153,7 +141,6 @@ class RoomCreationHandler(BaseHandler):
etype=RoomTopicEvent.TYPE,
room_id=room_id,
user_id=user_id,
required_power_level=50,
content={"topic": topic},
)
@ -188,9 +175,18 @@ class RoomCreationHandler(BaseHandler):
join_event,
do_auth=False
)
result = {"room_id": room_id}
if room_alias:
result["room_alias"] = room_alias.to_string()
directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
user_id=user_id,
room_id=room_id,
room_alias=room_alias,
servers=[self.hs.hostname],
)
defer.returnValue(result)
@ -198,7 +194,6 @@ class RoomCreationHandler(BaseHandler):
event_keys = {
"room_id": room_id,
"user_id": creator.to_string(),
"required_power_level": 100,
}
def create(etype, **content):
@ -215,7 +210,21 @@ class RoomCreationHandler(BaseHandler):
power_levels_event = self.event_factory.create_event(
etype=RoomPowerLevelsEvent.TYPE,
content={creator.to_string(): 100, "default": 0},
content={
"users": {
creator.to_string(): 100,
},
"users_default": 0,
"events": {
RoomNameEvent.TYPE: 100,
RoomPowerLevelsEvent.TYPE: 100,
},
"events_default": 0,
"state_default": 50,
"ban": 50,
"kick": 50,
"redact": 50
},
**event_keys
)
@ -225,30 +234,10 @@ class RoomCreationHandler(BaseHandler):
join_rule=join_rule,
)
add_state_event = create(
etype=RoomAddStateLevelEvent.TYPE,
level=100,
)
send_event = create(
etype=RoomSendEventLevelEvent.TYPE,
level=0,
)
ops = create(
etype=RoomOpsPowerLevelsEvent.TYPE,
ban_level=50,
kick_level=50,
redact_level=50,
)
return [
creation_event,
power_levels_event,
join_rules_event,
add_state_event,
send_event,
ops,
]
@ -363,10 +352,8 @@ class RoomMemberHandler(BaseHandler):
"""
target_user_id = event.state_key
snapshot = yield self.store.snapshot_room(
event.room_id, event.user_id,
RoomMemberEvent.TYPE, target_user_id
)
snapshot = yield self.store.snapshot_room(event)
## TODO(markjh): get prev state from snapshot.
prev_state = yield self.store.get_room_member(
target_user_id, event.room_id
@ -375,13 +362,6 @@ class RoomMemberHandler(BaseHandler):
if prev_state:
event.content["prev"] = prev_state.membership
# if prev_state and prev_state.membership == event.membership:
# # treat this event as a NOOP.
# if do_auth: # This is mainly to fix a unit test.
# yield self.auth.check(event, raises=True)
# defer.returnValue({})
# return
room_id = event.room_id
# If we're trying to join a room then we have to do this differently
@ -391,29 +371,17 @@ class RoomMemberHandler(BaseHandler):
yield self._do_join(event, snapshot, do_auth=do_auth)
else:
# This is not a JOIN, so we can handle it normally.
if do_auth:
yield self.auth.check(event, snapshot, raises=True)
# If we're banning someone, set a req power level
if event.membership == Membership.BAN:
if not hasattr(event, "required_power_level") or event.required_power_level is None:
# Add some default required_power_level
user_level = yield self.store.get_power_level(
event.room_id,
event.user_id,
)
event.required_power_level = user_level
if prev_state and prev_state.membership == event.membership:
# double same action, treat this event as a NOOP.
defer.returnValue({})
return
yield self.state_handler.handle_new_event(event, snapshot)
yield self._do_local_membership_update(
event,
membership=event.content["membership"],
snapshot=snapshot,
do_auth=do_auth,
)
defer.returnValue({"room_id": room_id})
@ -443,10 +411,7 @@ class RoomMemberHandler(BaseHandler):
content=content,
)
snapshot = yield self.store.snapshot_room(
room_id, joinee.to_string(), RoomMemberEvent.TYPE,
joinee.to_string()
)
snapshot = yield self.store.snapshot_room(new_event)
yield self._do_join(new_event, snapshot, room_host=host, do_auth=True)
@ -468,9 +433,12 @@ class RoomMemberHandler(BaseHandler):
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
hosts = yield self.store.get_joined_hosts_for_room(room_id)
is_host_in_room = yield self.auth.check_host_in_room(
event.room_id,
self.hs.hostname
)
if self.hs.hostname in hosts:
if is_host_in_room:
should_do_dance = False
elif room_host:
should_do_dance = True
@ -502,14 +470,11 @@ class RoomMemberHandler(BaseHandler):
if not have_joined:
logger.debug("Doing normal join")
if do_auth:
yield self.auth.check(event, snapshot, raises=True)
yield self.state_handler.handle_new_event(event, snapshot)
yield self._do_local_membership_update(
event,
membership=event.content["membership"],
snapshot=snapshot,
do_auth=do_auth,
)
user = self.hs.parse_userid(event.user_id)
@ -553,26 +518,29 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue([r.room_id for r in rooms])
def _do_local_membership_update(self, event, membership, snapshot):
destinations = []
@defer.inlineCallbacks
def _do_local_membership_update(self, event, membership, snapshot,
do_auth):
yield run_on_reactor()
# If we're inviting someone, then we should also send it to that
# HS.
target_user_id = event.state_key
target_user = self.hs.parse_userid(target_user_id)
if membership == Membership.INVITE:
host = target_user.domain
destinations.append(host)
if membership == Membership.INVITE and not target_user.is_mine:
do_invite_host = target_user.domain
else:
do_invite_host = None
# Always include target domain
host = target_user.domain
destinations.append(host)
return self._on_new_room_event(
event, snapshot, extra_destinations=destinations,
extra_users=[target_user]
yield self._on_new_room_event(
event,
snapshot,
extra_users=[target_user],
suppress_auth=(not do_auth),
do_invite_host=do_invite_host,
)
class RoomListHandler(BaseHandler):
@defer.inlineCallbacks