Merge branch 'develop' of github.com:matrix-org/synapse into stream_refactor

Conflicts:
	synapse/handlers/events.py
	synapse/rest/events.py
	synapse/rest/room.py
This commit is contained in:
Erik Johnston 2014-08-27 14:13:06 +01:00
commit 47519cd8c2
23 changed files with 778 additions and 755 deletions

View file

@ -162,6 +162,8 @@ class Auth(object):
"""
try:
user_id = yield self.store.get_user_by_token(token=token)
if not user_id:
raise StoreError()
defer.returnValue(self.hs.parse_userid(user_id))
except StoreError:
raise AuthError(403, "Unrecognised access token.",

View file

@ -43,6 +43,22 @@ import re
logger = logging.getLogger(__name__)
SCHEMAS = [
"transactions",
"pdu",
"users",
"profiles",
"presence",
"im",
"room_aliases",
]
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 1
class SynapseHomeServer(HomeServer):
def build_http_client(self):
@ -65,31 +81,39 @@ class SynapseHomeServer(HomeServer):
don't have to worry about overwriting existing content.
"""
logging.info("Preparing database: %s...", self.db_name)
with sqlite3.connect(self.db_name) as db_conn:
c = db_conn.cursor()
c.execute("PRAGMA user_version")
row = c.fetchone()
if row and row[0]:
user_version = row[0]
if user_version < SCHEMA_VERSION:
# TODO(paul): add some kind of intelligent fixup here
raise ValueError("Cannot use this database as the " +
"schema version (%d) does not match (%d)" %
(user_version, SCHEMA_VERSION)
)
else:
for sql_loc in SCHEMAS:
sql_script = read_schema(sql_loc)
c.executescript(sql_script)
db_conn.commit()
c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION)
c.close()
logging.info("Database prepared in %s.", self.db_name)
pool = adbapi.ConnectionPool(
'sqlite3', self.db_name, check_same_thread=False,
cp_min=1, cp_max=1)
schemas = [
"transactions",
"pdu",
"users",
"profiles",
"presence",
"im",
"room_aliases",
]
for sql_loc in schemas:
sql_script = read_schema(sql_loc)
with sqlite3.connect(self.db_name) as db_conn:
c = db_conn.cursor()
c.executescript(sql_script)
c.close()
db_conn.commit()
logging.info("Database prepared in %s.", self.db_name)
return pool
def create_resource_tree(self, web_client, redirect_root_to_web_client):
@ -184,6 +208,7 @@ class SynapseHomeServer(HomeServer):
def start_listening(self, port):
reactor.listenTCP(port, Site(self.root_resource))
logger.info("Synapse now listening on port %d", port)
def setup_logging(verbosity=0, filename=None, config_path=None):
@ -282,7 +307,7 @@ def setup():
redirect_root_to_web_client=True)
hs.start_listening(args.port)
hs.build_db_pool()
hs.get_db_pool()
if args.manhole:
f = twisted.manhole.telnet.ShellFactory()

View file

@ -17,12 +17,13 @@ from .register import RegistrationHandler
from .room import (
MessageHandler, RoomCreationHandler, RoomMemberHandler, RoomListHandler
)
from .events import EventStreamHandler
from .events import EventStreamHandler, EventHandler
from .federation import FederationHandler
from .login import LoginHandler
from .profile import ProfileHandler
from .presence import PresenceHandler
from .directory import DirectoryHandler
from .typing import TypingNotificationHandler
class Handlers(object):
@ -39,9 +40,11 @@ class Handlers(object):
self.room_creation_handler = RoomCreationHandler(hs)
self.room_member_handler = RoomMemberHandler(hs)
self.event_stream_handler = EventStreamHandler(hs)
self.event_handler = EventHandler(hs)
self.federation_handler = FederationHandler(hs)
self.profile_handler = ProfileHandler(hs)
self.presence_handler = PresenceHandler(hs)
self.room_list_handler = RoomListHandler(hs)
self.login_handler = LoginHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.typing_notification_handler = TypingNotificationHandler(hs)

View file

@ -47,26 +47,81 @@ class EventStreamHandler(BaseHandler):
def get_stream(self, auth_user_id, pagin_config, timeout=0):
auth_user = self.hs.parse_userid(auth_user_id)
if pagin_config.from_token is None:
pagin_config.from_token = None
try:
if auth_user not in self._streams_per_user:
self._streams_per_user[auth_user] = 0
if auth_user in self._stop_timer_per_user:
self.clock.cancel_call_later(
self._stop_timer_per_user.pop(auth_user))
else:
self.distributor.fire(
"started_user_eventstream", auth_user
)
self._streams_per_user[auth_user] += 1
rm_handler = self.hs.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
events, tokens = yield self.notifier.get_events_for(
auth_user, room_ids, pagin_config, timeout
)
if pagin_config.from_token is None:
pagin_config.from_token = None
chunks = [
e.get_dict() if isinstance(e, SynapseEvent) else e
for e in events
]
rm_handler = self.hs.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
chunk = {
"chunk": chunks,
"start": tokens[0].to_string(),
"end": tokens[1].to_string(),
}
events, tokens = yield self.notifier.get_events_for(
auth_user, room_ids, pagin_config, timeout
)
defer.returnValue(chunk)
chunks = [
e.get_dict() if isinstance(e, SynapseEvent) else e
for e in events
]
chunk = {
"chunk": chunks,
"start": tokens[0].to_string(),
"end": tokens[1].to_string(),
}
defer.returnValue(chunk)
finally:
self._streams_per_user[auth_user] -= 1
if not self._streams_per_user[auth_user]:
del self._streams_per_user[auth_user]
# 10 seconds of grace to allow the client to reconnect again
# before we think they're gone
def _later():
self.distributor.fire(
"stopped_user_eventstream", auth_user
)
del self._stop_timer_per_user[auth_user]
self._stop_timer_per_user[auth_user] = (
self.clock.call_later(5, _later)
)
class EventHandler(BaseHandler):
@defer.inlineCallbacks
def get_event(self, user, event_id):
"""Retrieve a single specified event.
Args:
user (synapse.types.UserID): The user requesting the event
event_id (str): The event ID to obtain.
Returns:
dict: An event, or None if there is no event matching this ID.
Raises:
SynapseError if there was a problem retrieving this event, or
AuthError if the user does not have the rights to inspect this
event.
"""
event = yield self.store.get_event(event_id)
if not event:
defer.returnValue(None)
return
yield self.auth.check(event, raises=True)
defer.returnValue(event)

146
synapse/handlers/typing.py Normal file
View file

@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
# Copyright 2014 matrix.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from ._base import BaseHandler
import logging
from collections import namedtuple
logger = logging.getLogger(__name__)
# A tiny object useful for storing a user's membership in a room, as a mapping
# key
RoomMember = namedtuple("RoomMember", ("room_id", "user"))
class TypingNotificationHandler(BaseHandler):
def __init__(self, hs):
super(TypingNotificationHandler, self).__init__(hs)
self.homeserver = hs
self.clock = hs.get_clock()
self.federation = hs.get_replication_layer()
self.federation.register_edu_handler("m.typing", self._recv_edu)
self._member_typing_until = {}
@defer.inlineCallbacks
def started_typing(self, target_user, auth_user, room_id, timeout):
if not target_user.is_mine:
raise SynapseError(400, "User is not hosted on this Home Server")
if target_user != auth_user:
raise AuthError(400, "Cannot set another user's typing state")
until = self.clock.time_msec() + timeout
member = RoomMember(room_id=room_id, user=target_user)
was_present = member in self._member_typing_until
self._member_typing_until[member] = until
if was_present:
# No point sending another notification
defer.returnValue(None)
yield self._push_update(
room_id=room_id,
user=target_user,
typing=True,
)
@defer.inlineCallbacks
def stopped_typing(self, target_user, auth_user, room_id):
if not target_user.is_mine:
raise SynapseError(400, "User is not hosted on this Home Server")
if target_user != auth_user:
raise AuthError(400, "Cannot set another user's typing state")
member = RoomMember(room_id=room_id, user=target_user)
if member not in self._member_typing_until:
# No point
defer.returnValue(None)
yield self._push_update(
room_id=room_id,
user=target_user,
typing=False,
)
@defer.inlineCallbacks
def _push_update(self, room_id, user, typing):
localusers = set()
remotedomains = set()
rm_handler = self.homeserver.get_handlers().room_member_handler
yield rm_handler.fetch_room_distributions_into(room_id,
localusers=localusers, remotedomains=remotedomains,
ignore_user=user)
for u in localusers:
self.push_update_to_clients(
room_id=room_id,
observer_user=u,
observed_user=user,
typing=typing,
)
deferreds = []
for domain in remotedomains:
deferreds.append(self.federation.send_edu(
destination=domain,
edu_type="m.typing",
content={
"room_id": room_id,
"user_id": user.to_string(),
"typing": typing,
},
))
yield defer.DeferredList(deferreds, consumeErrors=False)
@defer.inlineCallbacks
def _recv_edu(self, origin, content):
room_id = content["room_id"]
user = self.homeserver.parse_userid(content["user_id"])
localusers = set()
rm_handler = self.homeserver.get_handlers().room_member_handler
yield rm_handler.fetch_room_distributions_into(room_id,
localusers=localusers)
for u in localusers:
self.push_update_to_clients(
room_id=room_id,
observer_user=u,
observed_user=user,
typing=content["typing"]
)
def push_update_to_clients(self, room_id, observer_user, observed_user,
typing):
# TODO(paul) steal this from presence.py
pass

View file

@ -48,6 +48,22 @@ class EventStreamRestServlet(RestServlet):
return (200, {})
# TODO: Unit test gets, with and without auth, with different kinds of events.
class EventRestServlet(RestServlet):
PATTERN = client_path_pattern("/events/(?P<event_id>[^/]*)$")
@defer.inlineCallbacks
def on_GET(self, request, event_id):
auth_user = yield self.auth.get_user_by_req(request)
handler = self.handlers.event_handler
event = yield handler.get_event(auth_user, event_id)
if event:
defer.returnValue((200, event.get_dict()))
else:
defer.returnValue((404, "Event not found."))
def register_servlets(hs, http_server):
EventStreamRestServlet(hs).register(http_server)
EventRestServlet(hs).register(http_server)

View file

@ -68,7 +68,7 @@ class PresenceStatusRestServlet(RestServlet):
class PresenceListRestServlet(RestServlet):
PATTERN = client_path_pattern("/presence_list/(?P<user_id>[^/]*)")
PATTERN = client_path_pattern("/presence/list/(?P<user_id>[^/]*)")
@defer.inlineCallbacks
def on_GET(self, request, user_id):

View file

@ -18,11 +18,9 @@ from twisted.internet import defer
from base import RestServlet, client_path_pattern
from synapse.api.errors import SynapseError, Codes
from synapse.api.events.room import (
MessageEvent, RoomMemberEvent, FeedbackEvent
)
from synapse.api.constants import Feedback
from synapse.streams.config import PaginationConfig
from synapse.api.events.room import RoomMemberEvent
from synapse.api.constants import Membership
import json
import logging
@ -36,31 +34,28 @@ class RoomCreateRestServlet(RestServlet):
# No PATTERN; we have custom dispatch rules here
def register(self, http_server):
# /rooms OR /rooms/<roomid>
http_server.register_path("POST",
client_path_pattern("/rooms$"),
self.on_POST)
http_server.register_path("PUT",
client_path_pattern(
"/rooms/(?P<room_id>[^/]*)$"),
self.on_PUT)
PATTERN = "/createRoom"
register_txn_path(self, PATTERN, http_server)
# define CORS for all of /rooms in RoomCreateRestServlet for simplicity
http_server.register_path("OPTIONS",
client_path_pattern("/rooms(?:/.*)?$"),
self.on_OPTIONS)
# define CORS for /createRoom[/txnid]
http_server.register_path("OPTIONS",
client_path_pattern("/createRoom(?:/.*)?$"),
self.on_OPTIONS)
@defer.inlineCallbacks
def on_PUT(self, request, room_id):
room_id = urllib.unquote(room_id)
auth_user = yield self.auth.get_user_by_req(request)
def on_PUT(self, request, txn_id):
try:
defer.returnValue(self.txns.get_client_transaction(request, txn_id))
except KeyError:
pass
if not room_id:
raise SynapseError(400, "PUT must specify a room ID")
response = yield self.on_POST(request)
room_config = self.get_room_config(request)
info = yield self.make_room(room_config, auth_user, room_id)
room_config.update(info)
defer.returnValue((200, info))
self.txns.store_client_transaction(request, txn_id, response)
defer.returnValue(response)
@defer.inlineCallbacks
def on_POST(self, request):
@ -210,24 +205,63 @@ class RoomSendEventRestServlet(RestServlet):
defer.returnValue(response)
# TODO: Needs unit testing for room ID + alias joins
class JoinRoomAliasServlet(RestServlet):
PATTERN = client_path_pattern("/join/(?P<room_alias>[^/]+)$")
def register(self, http_server):
# /join/$room_identifier[/$txn_id]
PATTERN = ("/join/(?P<room_identifier>[^/]*)")
register_txn_path(self, PATTERN, http_server)
@defer.inlineCallbacks
def on_PUT(self, request, room_alias):
def on_POST(self, request, room_identifier):
user = yield self.auth.get_user_by_req(request)
if not user:
defer.returnValue((403, "Unrecognized user"))
# the identifier could be a room alias or a room id. Try one then the
# other if it fails to parse, without swallowing other valid
# SynapseErrors.
logger.debug("room_alias: %s", room_alias)
identifier = None
is_room_alias = False
try:
identifier = self.hs.parse_roomalias(
urllib.unquote(room_identifier)
)
is_room_alias = True
except SynapseError:
identifier = self.hs.parse_roomid(
urllib.unquote(room_identifier)
)
room_alias = self.hs.parse_roomalias(urllib.unquote(room_alias))
# TODO: Support for specifying the home server to join with?
handler = self.handlers.room_member_handler
ret_dict = yield handler.join_room_alias(user, room_alias)
if is_room_alias:
handler = self.handlers.room_member_handler
ret_dict = yield handler.join_room_alias(user, identifier)
defer.returnValue((200, ret_dict))
else: # room id
event = self.event_factory.create_event(
etype=RoomMemberEvent.TYPE,
content={"membership": Membership.JOIN},
room_id=urllib.unquote(identifier.to_string()),
user_id=user.to_string(),
state_key=user.to_string()
)
handler = self.handlers.room_member_handler
yield handler.change_membership(event)
defer.returnValue((200, ""))
defer.returnValue((200, ret_dict))
@defer.inlineCallbacks
def on_PUT(self, request, room_identifier, txn_id):
try:
defer.returnValue(self.txns.get_client_transaction(request, txn_id))
except KeyError:
pass
response = yield self.on_POST(request, room_identifier)
self.txns.store_client_transaction(request, txn_id, response)
defer.returnValue(response)
# TODO: Needs unit testing

View file

@ -28,7 +28,7 @@ from synapse.handlers import Handlers
from synapse.rest import RestServletFactory
from synapse.state import StateHandler
from synapse.storage import DataStore
from synapse.types import UserID, RoomAlias
from synapse.types import UserID, RoomAlias, RoomID
from synapse.util import Clock
from synapse.util.distributor import Distributor
from synapse.util.lockutils import LockManager
@ -119,17 +119,30 @@ class BaseHomeServer(object):
setattr(BaseHomeServer, "get_%s" % (depname), _get)
# TODO: Why are these parse_ methods so high up along with other globals?
# Surely these should be in a util package or in the api package?
# Other utility methods
def parse_userid(self, s):
"""Parse the string given by 's' as a User ID and return a UserID
object."""
return UserID.from_string(s, hs=self)
def parse_roomid(self, s):
"""Parse the string given by 's' as a Room ID and return a RoomID
object."""
return RoomID.from_string(s, hs=self)
def parse_roomalias(self, s):
"""Parse the string given by 's' as a Room Alias and return a RoomAlias
object."""
return RoomAlias.from_string(s, hs=self)
def parse_roomid(self, s):
"""Parse the string given by 's' as a Room ID and return a RoomID
object."""
return RoomID.from_string(s, hs=self)
# Build magic accessors for every dependency
for depname in BaseHomeServer.DEPENDENCIES:
BaseHomeServer._make_dependency_method(depname)

View file

@ -80,7 +80,6 @@ class DataStore(RoomMemberStore, RoomStore,
[
"event_id",
"type",
"sender",
"room_id",
"content",
"unrecognized_keys"