Merge branch 'develop' into pushers

Conflicts:
	synapse/rest/__init__.py
This commit is contained in:
David Baker 2015-01-22 17:46:16 +00:00
commit 5c6189ea3e
91 changed files with 638 additions and 1705 deletions

View file

@ -26,8 +26,8 @@ from twisted.web.resource import Resource
from twisted.web.static import File
from twisted.web.server import Site
from synapse.http.server import JsonResource, RootRedirect
from synapse.media.v0.content_repository import ContentRepoResource
from synapse.media.v1.media_repository import MediaRepositoryResource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
from synapse.http.server_key_resource import LocalKey
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.api.urls import (
@ -241,13 +241,20 @@ def setup():
except UpgradeDatabaseException:
sys.stderr.write(
"\nFailed to upgrade database.\n"
"Have you checked for version specific instructions in UPGRADES.rst?\n"
"Have you checked for version specific instructions in"
" UPGRADES.rst?\n"
)
sys.exit(1)
logger.info("Database prepared in %s.", db_name)
hs.get_db_pool()
db_pool = hs.get_db_pool()
if db_name == ":memory:":
# Memory databases will need to be setup each time they are opened.
reactor.callWhenRunning(
db_pool.runWithConnection, prepare_database
)
if config.manhole:
f = twisted.manhole.telnet.ShellFactory()

View file

@ -20,7 +20,10 @@ import os
class DatabaseConfig(Config):
def __init__(self, args):
super(DatabaseConfig, self).__init__(args)
self.database_path = self.abspath(args.database_path)
if args.database_path == ":memory:":
self.database_path = ":memory:"
else:
self.database_path = self.abspath(args.database_path)
@classmethod
def add_arguments(cls, parser):

View file

@ -33,12 +33,6 @@ class EventBuilder(EventBase):
unsigned=unsigned
)
def update_event_key(self, key, value):
self._event_dict[key] = value
def update_event_keys(self, other_dict):
self._event_dict.update(other_dict)
def build(self):
return FrozenEvent.from_event(self)

View file

@ -256,23 +256,21 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
def get_state_for_context(self, destination, context, event_id):
"""Requests all of the `current` state PDUs for a given context from
def get_state_for_room(self, destination, room_id, event_id):
"""Requests all of the `current` state PDUs for a given room from
a remote home server.
Args:
destination (str): The remote homeserver to query for the state.
context (str): The context we're interested in.
room_id (str): The id of the room we're interested in.
event_id (str): The id of the event we want the state at.
Returns:
Deferred: Results in a list of PDUs.
"""
result = yield self.transport_layer.get_context_state(
destination,
context,
event_id=event_id,
result = yield self.transport_layer.get_room_state(
destination, room_id, event_id=event_id,
)
pdus = [
@ -288,9 +286,9 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, context, event_id):
def get_event_auth(self, destination, room_id, event_id):
res = yield self.transport_layer.get_event_auth(
destination, context, event_id,
destination, room_id, event_id,
)
auth_chain = [
@ -304,9 +302,9 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
def on_backfill_request(self, origin, context, versions, limit):
def on_backfill_request(self, origin, room_id, versions, limit):
pdus = yield self.handler.on_backfill_request(
origin, context, versions, limit
origin, room_id, versions, limit
)
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict()))
@ -380,12 +378,10 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
def on_context_state_request(self, origin, context, event_id):
def on_context_state_request(self, origin, room_id, event_id):
if event_id:
pdus = yield self.handler.get_state_for_pdu(
origin,
context,
event_id,
origin, room_id, event_id,
)
auth_chain = yield self.store.get_auth_chain(
[pdu.event_id for pdu in pdus]
@ -413,7 +409,7 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
def on_pull_request(self, origin, versions):
raise NotImplementedError("Pull transacions not implemented")
raise NotImplementedError("Pull transactions not implemented")
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
@ -422,30 +418,21 @@ class ReplicationLayer(object):
defer.returnValue((200, response))
else:
defer.returnValue(
(404, "No handler for Query type '%s'" % (query_type, ))
(404, "No handler for Query type '%s'" % (query_type,))
)
@defer.inlineCallbacks
def on_make_join_request(self, context, user_id):
pdu = yield self.handler.on_make_join_request(context, user_id)
def on_make_join_request(self, room_id, user_id):
pdu = yield self.handler.on_make_join_request(room_id, user_id)
time_now = self._clock.time_msec()
defer.returnValue({
"event": pdu.get_pdu_json(time_now),
})
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
@defer.inlineCallbacks
def on_invite_request(self, origin, content):
pdu = self.event_from_pdu_json(content)
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
time_now = self._clock.time_msec()
defer.returnValue(
(
200,
{
"event": ret_pdu.get_pdu_json(time_now),
}
)
)
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
@defer.inlineCallbacks
def on_send_join_request(self, origin, content):
@ -462,26 +449,17 @@ class ReplicationLayer(object):
}))
@defer.inlineCallbacks
def on_event_auth(self, origin, context, event_id):
def on_event_auth(self, origin, room_id, event_id):
time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id)
defer.returnValue(
(
200,
{
"auth_chain": [
a.get_pdu_json(time_now) for a in auth_pdus
],
}
)
)
defer.returnValue((200, {
"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
}))
@defer.inlineCallbacks
def make_join(self, destination, context, user_id):
def make_join(self, destination, room_id, user_id):
ret = yield self.transport_layer.make_join(
destination=destination,
context=context,
user_id=user_id,
destination, room_id, user_id
)
pdu_dict = ret["event"]
@ -494,10 +472,10 @@ class ReplicationLayer(object):
def send_join(self, destination, pdu):
time_now = self._clock.time_msec()
_, content = yield self.transport_layer.send_join(
destination,
pdu.room_id,
pdu.event_id,
pdu.get_pdu_json(time_now),
destination=destination,
room_id=pdu.room_id,
event_id=pdu.event_id,
content=pdu.get_pdu_json(time_now),
)
logger.debug("Got content: %s", content)
@ -507,9 +485,6 @@ class ReplicationLayer(object):
for p in content.get("state", [])
]
# FIXME: We probably want to do something with the auth_chain given
# to us
auth_chain = [
self.event_from_pdu_json(p, outlier=True)
for p in content.get("auth_chain", [])
@ -523,11 +498,11 @@ class ReplicationLayer(object):
})
@defer.inlineCallbacks
def send_invite(self, destination, context, event_id, pdu):
def send_invite(self, destination, room_id, event_id, pdu):
time_now = self._clock.time_msec()
code, content = yield self.transport_layer.send_invite(
destination=destination,
context=context,
room_id=room_id,
event_id=event_id,
content=pdu.get_pdu_json(time_now),
)
@ -657,7 +632,7 @@ class ReplicationLayer(object):
"_handle_new_pdu getting state for %s",
pdu.room_id
)
state, auth_chain = yield self.get_state_for_context(
state, auth_chain = yield self.get_state_for_room(
origin, pdu.room_id, pdu.event_id,
)
@ -816,7 +791,7 @@ class _TransactionQueue(object):
logger.info("TX [%s] is ready for retry", destination)
logger.info("TX [%s] _attempt_new_transaction", destination)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
@ -830,14 +805,15 @@ class _TransactionQueue(object):
pending_failures = self.pending_failures_by_dest.pop(destination, [])
if pending_pdus:
logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus))
logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))
if not pending_pdus and not pending_edus and not pending_failures:
return
logger.debug(
"TX [%s] Attempting new transaction "
"(pdus: %d, edus: %d, failures: %d)",
"TX [%s] Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
destination,
len(pending_pdus),
len(pending_edus),

View file

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The transport layer is responsible for both sending transactions to remote
home servers and receiving a variety of requests from other home servers.
By default this is done over HTTPS (and all home servers are required to
support HTTPS), however individual pairings of servers may decide to
communicate over a different (albeit still reliable) protocol.
"""
from .server import TransportLayerServer
from .client import TransportLayerClient
class TransportLayer(TransportLayerServer, TransportLayerClient):
"""This is a basic implementation of the transport layer that translates
transactions and other requests to/from HTTP.
Attributes:
server_name (str): Local home server host
server (synapse.http.server.HttpServer): the http server to
register listeners on
client (synapse.http.client.HttpClient): the http client used to
send requests
request_handler (TransportRequestHandler): The handler to fire when we
receive requests for data.
received_handler (TransportReceivedHandler): The handler to fire when
we receive data.
"""
def __init__(self, homeserver, server_name, server, client):
"""
Args:
server_name (str): Local home server host
server (synapse.protocol.http.HttpServer): the http server to
register listeners on
client (synapse.protocol.http.HttpClient): the http client used to
send requests
"""
self.keyring = homeserver.get_keyring()
self.server_name = server_name
self.server = server
self.client = client
self.request_handler = None
self.received_handler = None

View file

@ -0,0 +1,215 @@
# -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.util.logutils import log_function
import logging
import json
logger = logging.getLogger(__name__)
class TransportLayerClient(object):
"""Sends federation HTTP requests to other servers"""
@log_function
def get_room_state(self, destination, room_id, event_id):
""" Requests all state for a given room from the given server at the
given event.
Args:
destination (str): The host name of the remote home server we want
to get the state from.
context (str): The name of the context we want the state of
event_id (str): The event we want the context at.
Returns:
Deferred: Results in a dict received from the remote homeserver.
"""
logger.debug("get_room_state dest=%s, room=%s",
destination, room_id)
path = PREFIX + "/state/%s/" % room_id
return self.client.get_json(
destination, path=path, args={"event_id": event_id},
)
@log_function
def get_event(self, destination, event_id):
""" Requests the pdu with give id and origin from the given server.
Args:
destination (str): The host name of the remote home server we want
to get the state from.
event_id (str): The id of the event being requested.
Returns:
Deferred: Results in a dict received from the remote homeserver.
"""
logger.debug("get_pdu dest=%s, event_id=%s",
destination, event_id)
path = PREFIX + "/event/%s/" % (event_id, )
return self.client.get_json(destination, path=path)
@log_function
def backfill(self, destination, room_id, event_tuples, limit):
""" Requests `limit` previous PDUs in a given context before list of
PDUs.
Args:
dest (str)
room_id (str)
event_tuples (list)
limt (int)
Returns:
Deferred: Results in a dict received from the remote homeserver.
"""
logger.debug(
"backfill dest=%s, room_id=%s, event_tuples=%s, limit=%s",
destination, room_id, repr(event_tuples), str(limit)
)
if not event_tuples:
# TODO: raise?
return
path = PREFIX + "/backfill/%s/" % (room_id,)
args = {
"v": event_tuples,
"limit": [str(limit)],
}
return self.client.get_json(
destination,
path=path,
args=args,
)
@defer.inlineCallbacks
@log_function
def send_transaction(self, transaction, json_data_callback=None):
""" Sends the given Transaction to its destination
Args:
transaction (Transaction)
Returns:
Deferred: Results of the deferred is a tuple in the form of
(response_code, response_body) where the response_body is a
python dict decoded from json
"""
logger.debug(
"send_data dest=%s, txid=%s",
transaction.destination, transaction.transaction_id
)
if transaction.destination == self.server_name:
raise RuntimeError("Transport layer cannot send to itself!")
# FIXME: This is only used by the tests. The actual json sent is
# generated by the json_data_callback.
json_data = transaction.get_dict()
code, response = yield self.client.put_json(
transaction.destination,
path=PREFIX + "/send/%s/" % transaction.transaction_id,
data=json_data,
json_data_callback=json_data_callback,
)
logger.debug(
"send_data dest=%s, txid=%s, got response: %d",
transaction.destination, transaction.transaction_id, code
)
defer.returnValue((code, response))
@defer.inlineCallbacks
@log_function
def make_query(self, destination, query_type, args, retry_on_dns_fail):
path = PREFIX + "/query/%s" % query_type
response = yield self.client.get_json(
destination=destination,
path=path,
args=args,
retry_on_dns_fail=retry_on_dns_fail,
)
defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True):
path = PREFIX + "/make_join/%s/%s" % (room_id, user_id)
response = yield self.client.get_json(
destination=destination,
path=path,
retry_on_dns_fail=retry_on_dns_fail,
)
defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def send_join(self, destination, room_id, event_id, content):
path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
code, content = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
if not 200 <= code < 300:
raise RuntimeError("Got %d from send_join", code)
defer.returnValue(json.loads(content))
@defer.inlineCallbacks
@log_function
def send_invite(self, destination, room_id, event_id, content):
path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
code, content = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
if not 200 <= code < 300:
raise RuntimeError("Got %d from send_invite", code)
defer.returnValue(json.loads(content))
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, room_id, event_id):
path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
response = yield self.client.get_json(
destination=destination,
path=path,
)
defer.returnValue(response)

View file

@ -13,14 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""The transport layer is responsible for both sending transactions to remote
home servers and receiving a variety of requests from other home servers.
Typically, this is done over HTTP (and all home servers are required to
support HTTP), however individual pairings of servers may decide to communicate
over a different (albeit still reliable) protocol.
"""
from twisted.internet import defer
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
@ -35,241 +27,8 @@ import re
logger = logging.getLogger(__name__)
class TransportLayer(object):
"""This is a basic implementation of the transport layer that translates
transactions and other requests to/from HTTP.
Attributes:
server_name (str): Local home server host
server (synapse.http.server.HttpServer): the http server to
register listeners on
client (synapse.http.client.HttpClient): the http client used to
send requests
request_handler (TransportRequestHandler): The handler to fire when we
receive requests for data.
received_handler (TransportReceivedHandler): The handler to fire when
we receive data.
"""
def __init__(self, homeserver, server_name, server, client):
"""
Args:
server_name (str): Local home server host
server (synapse.protocol.http.HttpServer): the http server to
register listeners on
client (synapse.protocol.http.HttpClient): the http client used to
send requests
"""
self.keyring = homeserver.get_keyring()
self.server_name = server_name
self.server = server
self.client = client
self.request_handler = None
self.received_handler = None
@log_function
def get_context_state(self, destination, context, event_id=None):
""" Requests all state for a given context (i.e. room) from the
given server.
Args:
destination (str): The host name of the remote home server we want
to get the state from.
context (str): The name of the context we want the state of
Returns:
Deferred: Results in a dict received from the remote homeserver.
"""
logger.debug("get_context_state dest=%s, context=%s",
destination, context)
subpath = "/state/%s/" % context
args = {}
if event_id:
args["event_id"] = event_id
return self._do_request_for_transaction(
destination, subpath, args=args
)
@log_function
def get_event(self, destination, event_id):
""" Requests the pdu with give id and origin from the given server.
Args:
destination (str): The host name of the remote home server we want
to get the state from.
event_id (str): The id of the event being requested.
Returns:
Deferred: Results in a dict received from the remote homeserver.
"""
logger.debug("get_pdu dest=%s, event_id=%s",
destination, event_id)
subpath = "/event/%s/" % (event_id, )
return self._do_request_for_transaction(destination, subpath)
@log_function
def backfill(self, dest, context, event_tuples, limit):
""" Requests `limit` previous PDUs in a given context before list of
PDUs.
Args:
dest (str)
context (str)
event_tuples (list)
limt (int)
Returns:
Deferred: Results in a dict received from the remote homeserver.
"""
logger.debug(
"backfill dest=%s, context=%s, event_tuples=%s, limit=%s",
dest, context, repr(event_tuples), str(limit)
)
if not event_tuples:
# TODO: raise?
return
subpath = "/backfill/%s/" % (context,)
args = {
"v": event_tuples,
"limit": [str(limit)],
}
return self._do_request_for_transaction(
dest,
subpath,
args=args,
)
@defer.inlineCallbacks
@log_function
def send_transaction(self, transaction, json_data_callback=None):
""" Sends the given Transaction to its destination
Args:
transaction (Transaction)
Returns:
Deferred: Results of the deferred is a tuple in the form of
(response_code, response_body) where the response_body is a
python dict decoded from json
"""
logger.debug(
"send_data dest=%s, txid=%s",
transaction.destination, transaction.transaction_id
)
if transaction.destination == self.server_name:
raise RuntimeError("Transport layer cannot send to itself!")
# FIXME: This is only used by the tests. The actual json sent is
# generated by the json_data_callback.
json_data = transaction.get_dict()
code, response = yield self.client.put_json(
transaction.destination,
path=PREFIX + "/send/%s/" % transaction.transaction_id,
data=json_data,
json_data_callback=json_data_callback,
)
logger.debug(
"send_data dest=%s, txid=%s, got response: %d",
transaction.destination, transaction.transaction_id, code
)
defer.returnValue((code, response))
@defer.inlineCallbacks
@log_function
def make_query(self, destination, query_type, args, retry_on_dns_fail):
path = PREFIX + "/query/%s" % query_type
response = yield self.client.get_json(
destination=destination,
path=path,
args=args,
retry_on_dns_fail=retry_on_dns_fail,
)
defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def make_join(self, destination, context, user_id, retry_on_dns_fail=True):
path = PREFIX + "/make_join/%s/%s" % (context, user_id,)
response = yield self.client.get_json(
destination=destination,
path=path,
retry_on_dns_fail=retry_on_dns_fail,
)
defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def send_join(self, destination, context, event_id, content):
path = PREFIX + "/send_join/%s/%s" % (
context,
event_id,
)
code, content = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
if not 200 <= code < 300:
raise RuntimeError("Got %d from send_join", code)
defer.returnValue(json.loads(content))
@defer.inlineCallbacks
@log_function
def send_invite(self, destination, context, event_id, content):
path = PREFIX + "/invite/%s/%s" % (
context,
event_id,
)
code, content = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
if not 200 <= code < 300:
raise RuntimeError("Got %d from send_invite", code)
defer.returnValue(json.loads(content))
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, context, event_id):
path = PREFIX + "/event_auth/%s/%s" % (
context,
event_id,
)
response = yield self.client.get_json(
destination=destination,
path=path,
)
defer.returnValue(response)
class TransportLayerServer(object):
"""Handles incoming federation HTTP requests"""
@defer.inlineCallbacks
def _authenticate_request(self, request):
@ -373,8 +132,6 @@ class TransportLayer(object):
"""
self.request_handler = handler
# TODO(markjh): Namespace the federation URI paths
# This is for when someone asks us for everything since version X
self.server.register_path(
"GET",
@ -528,34 +285,6 @@ class TransportLayer(object):
defer.returnValue((code, response))
@defer.inlineCallbacks
@log_function
def _do_request_for_transaction(self, destination, subpath, args={}):
"""
Args:
destination (str)
path (str)
args (dict): This is parsed directly to the HttpClient.
Returns:
Deferred: Results in a dict.
"""
data = yield self.client.get_json(
destination,
path=PREFIX + subpath,
args=args,
)
# Add certain keys to the JSON, ready for decoding as a Transaction
data.update(
origin=destination,
destination=self.server_name,
transaction_id=None
)
defer.returnValue(data)
@log_function
def _on_backfill_request(self, origin, context, v_list, limits):
if not limits:

View file

@ -144,7 +144,5 @@ class BaseHandler(object):
yield self.notifier.on_new_room_event(event, extra_users=extra_users)
yield federation_handler.handle_new_event(
event,
None,
destinations=destinations,
event, destinations=destinations,
)

View file

@ -70,10 +70,8 @@ class EventStreamHandler(BaseHandler):
pagin_config.from_token = None
rm_handler = self.hs.get_handlers().room_member_handler
logger.debug("BETA")
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
logger.debug("ALPHA")
with PreserveLoggingContext():
events, tokens = yield self.notifier.get_events_for(
auth_user, room_ids, pagin_config, timeout

View file

@ -75,14 +75,14 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
def handle_new_event(self, event, snapshot, destinations):
def handle_new_event(self, event, destinations):
""" Takes in an event from the client to server side, that has already
been authed and handled by the state module, and sends it to any
remote home servers that may be interested.
Args:
event
snapshot (.storage.Snapshot): THe snapshot the event happened after
event: The event to send
destinations: A list of destinations to send it to
Returns:
Deferred: Resolved when it has successfully been queued for
@ -154,7 +154,7 @@ class FederationHandler(BaseHandler):
replication = self.replication_layer
if not state:
state, auth_chain = yield replication.get_state_for_context(
state, auth_chain = yield replication.get_state_for_room(
origin, context=event.room_id, event_id=event.event_id,
)
@ -281,7 +281,7 @@ class FederationHandler(BaseHandler):
"""
pdu = yield self.replication_layer.send_invite(
destination=target_host,
context=event.room_id,
room_id=event.room_id,
event_id=event.event_id,
pdu=event
)

View file

@ -120,6 +120,10 @@ class TypingNotificationHandler(BaseHandler):
member = RoomMember(room_id=room_id, user=target_user)
if member in self._member_typing_timer:
self.clock.cancel_call_later(self._member_typing_timer[member])
del self._member_typing_timer[member]
yield self._stopped_typing(member)
@defer.inlineCallbacks
@ -142,8 +146,10 @@ class TypingNotificationHandler(BaseHandler):
del self._member_typing_until[member]
self.clock.cancel_call_later(self._member_typing_timer[member])
del self._member_typing_timer[member]
if member in self._member_typing_timer:
# Don't cancel it - either it already expired, or the real
# stopped_typing() will cancel it
del self._member_typing_timer[member]
@defer.inlineCallbacks
def _push_update(self, room_id, user, typing):

View file

@ -72,7 +72,6 @@ class MatrixFederationHttpClient(object):
requests.
"""
def __init__(self, hs):
self.hs = hs
self.signing_key = hs.config.signing_key[0]

View file

@ -244,14 +244,14 @@ class Notifier(object):
)
if timeout:
self.clock.call_later(timeout/1000.0, _timeout_listener)
self._register_with_keys(listener)
yield self._check_for_updates(listener)
if not timeout:
_timeout_listener()
else:
self.clock.call_later(timeout/1000.0, _timeout_listener)
return

View file

@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -11,39 +11,4 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import (
room, events, register, login, profile, presence, initial_sync, directory,
voip, admin, pusher, push_rule
)
class RestServletFactory(object):
""" A factory for creating REST servlets.
These REST servlets represent the entire client-server REST API. Generally
speaking, they serve as wrappers around events and the handlers that
process them.
See synapse.events for information on synapse events.
"""
def __init__(self, hs):
client_resource = hs.get_resource_for_client()
# TODO(erikj): There *must* be a better way of doing this.
room.register_servlets(hs, client_resource)
events.register_servlets(hs, client_resource)
register.register_servlets(hs, client_resource)
login.register_servlets(hs, client_resource)
profile.register_servlets(hs, client_resource)
presence.register_servlets(hs, client_resource)
initial_sync.register_servlets(hs, client_resource)
directory.register_servlets(hs, client_resource)
voip.register_servlets(hs, client_resource)
admin.register_servlets(hs, client_resource)
pusher.register_servlets(hs, client_resource)
push_rule.register_servlets(hs, client_resource)
# limitations under the License.

View file

@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import (
room, events, register, login, profile, presence, initial_sync, directory,
voip, admin, pusher, push_rule
)
class RestServletFactory(object):
""" A factory for creating REST servlets.
These REST servlets represent the entire client-server REST API. Generally
speaking, they serve as wrappers around events and the handlers that
process them.
See synapse.events for information on synapse events.
"""
def __init__(self, hs):
client_resource = hs.get_resource_for_client()
# TODO(erikj): There *must* be a better way of doing this.
room.register_servlets(hs, client_resource)
events.register_servlets(hs, client_resource)
register.register_servlets(hs, client_resource)
login.register_servlets(hs, client_resource)
profile.register_servlets(hs, client_resource)
presence.register_servlets(hs, client_resource)
initial_sync.register_servlets(hs, client_resource)
directory.register_servlets(hs, client_resource)
voip.register_servlets(hs, client_resource)
admin.register_servlets(hs, client_resource)
pusher.register_servlets(hs, client_resource)
push_rule.register_servlets(hs, client_resource)

View file

@ -15,7 +15,7 @@
""" This module contains base REST classes for constructing REST servlets. """
from synapse.api.urls import CLIENT_PREFIX
from synapse.rest.transactions import HttpTransactionStore
from .transactions import HttpTransactionStore
import re
import logging

View file

@ -18,7 +18,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.streams.config import PaginationConfig
from synapse.rest.base import RestServlet, client_path_pattern
from .base import RestServlet, client_path_pattern
import logging

View file

@ -246,7 +246,7 @@ class JoinRoomAliasServlet(RestServlet):
}
)
defer.returnValue((200, {}))
defer.returnValue((200, {"room_id": identifier.to_string()}))
@defer.inlineCallbacks
def on_PUT(self, request, room_identifier, txn_id):

View file

@ -24,7 +24,7 @@ from synapse.events.utils import serialize_event
from synapse.notifier import Notifier
from synapse.api.auth import Auth
from synapse.handlers import Handlers
from synapse.rest import RestServletFactory
from synapse.rest.client.v1 import RestServletFactory
from synapse.state import StateHandler
from synapse.storage import DataStore
from synapse.types import UserID, RoomAlias, RoomID, EventID

View file

@ -58,13 +58,6 @@ class RoomStore(SQLBaseStore):
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
def store_room_config(self, room_id, visibility):
return self._simple_update_one(
table=RoomsTable.table_name,
keyvalues={"room_id": room_id},
updatevalues={"is_public": visibility}
)
def get_room(self, room_id):
"""Retrieve a room.

View file

@ -78,12 +78,6 @@ class StateStore(SQLBaseStore):
f,
)
def store_state_groups(self, event):
return self.runInteraction(
"store_state_groups",
self._store_state_groups_txn, event
)
def _store_state_groups_txn(self, txn, event, context):
if context.current_state is None:
return

View file

@ -39,6 +39,8 @@ from ._base import SQLBaseStore
from synapse.api.errors import SynapseError
from synapse.util.logutils import log_function
from collections import namedtuple
import logging
@ -52,91 +54,79 @@ _STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"
def _parse_stream_token(string):
try:
if string[0] != 's':
raise
return int(string[1:])
except:
raise SynapseError(400, "Invalid token")
class _StreamToken(namedtuple("_StreamToken", "topological stream")):
"""Tokens are positions between events. The token "s1" comes after event 1.
s0 s1
| |
[0] V [1] V [2]
def _parse_topological_token(string):
try:
if string[0] != 't':
raise
parts = string[1:].split('-', 1)
return (int(parts[0]), int(parts[1]))
except:
raise SynapseError(400, "Invalid token")
Tokens can either be a point in the live event stream or a cursor going
through historic events.
When traversing the live event stream events are ordered by when they
arrived at the homeserver.
def is_stream_token(string):
try:
_parse_stream_token(string)
return True
except:
return False
When traversing historic events the events are ordered by their depth in
the event graph "topological_ordering" and then by when they arrived at the
homeserver "stream_ordering".
Live tokens start with an "s" followed by the "stream_ordering" id of the
event it comes after. Historic tokens start with a "t" followed by the
"topological_ordering" id of the event it comes after, follewed by "-",
followed by the "stream_ordering" id of the event it comes after.
"""
__slots__ = []
def is_topological_token(string):
try:
_parse_topological_token(string)
return True
except:
return False
@classmethod
def parse(cls, string):
try:
if string[0] == 's':
return cls(None, int(string[1:]))
if string[0] == 't':
parts = string[1:].split('-', 1)
return cls(int(parts[1]), int(parts[0]))
except:
pass
raise SynapseError(400, "Invalid token %r" % (string,))
@classmethod
def parse_stream_token(cls, string):
try:
if string[0] == 's':
return cls(None, int(string[1:]))
except:
pass
raise SynapseError(400, "Invalid token %r" % (string,))
def _get_token_bound(token, comparison):
try:
s = _parse_stream_token(token)
return "%s %s %d" % ("stream_ordering", comparison, s)
except:
pass
def __str__(self):
if self.topological is not None:
return "t%d-%d" % (self.topological, self.stream)
else:
return "s%d" % (self.stream,)
try:
top, stream = _parse_topological_token(token)
return "%s %s %d AND %s %s %d" % (
"topological_ordering", comparison, top,
"stream_ordering", comparison, stream,
)
except:
pass
def lower_bound(self):
if self.topological is None:
return "(%d < %s)" % (self.stream, "stream_ordering")
else:
return "(%d < %s OR (%d == %s AND %d < %s))" % (
self.topological, "topological_ordering",
self.topological, "topological_ordering",
self.stream, "stream_ordering",
)
raise SynapseError(400, "Invalid token")
def upper_bound(self):
if self.topological is None:
return "(%d >= %s)" % (self.stream, "stream_ordering")
else:
return "(%d > %s OR (%d == %s AND %d >= %s))" % (
self.topological, "topological_ordering",
self.topological, "topological_ordering",
self.stream, "stream_ordering",
)
class StreamStore(SQLBaseStore):
@log_function
def get_room_events(self, user_id, from_key, to_key, room_id, limit=0,
direction='f', with_feedback=False):
# We deal with events request in two different ways depending on if
# this looks like an /events request or a pagination request.
is_events = (
direction == 'f'
and user_id
and is_stream_token(from_key)
and to_key and is_stream_token(to_key)
)
if is_events:
return self.get_room_events_stream(
user_id=user_id,
from_key=from_key,
to_key=to_key,
room_id=room_id,
limit=limit,
with_feedback=with_feedback,
)
else:
return self.paginate_room_events(
from_key=from_key,
to_key=to_key,
room_id=room_id,
limit=limit,
with_feedback=with_feedback,
)
@log_function
def get_room_events_stream(self, user_id, from_key, to_key, room_id,
limit=0, with_feedback=False):
@ -162,8 +152,8 @@ class StreamStore(SQLBaseStore):
limit = MAX_STREAM_SIZE
# From and to keys should be integers from ordering.
from_id = _parse_stream_token(from_key)
to_id = _parse_stream_token(to_key)
from_id = _StreamToken.parse_stream_token(from_key)
to_id = _StreamToken.parse_stream_token(to_key)
if from_key == to_key:
return defer.succeed(([], to_key))
@ -181,7 +171,7 @@ class StreamStore(SQLBaseStore):
}
def f(txn):
txn.execute(sql, (user_id, user_id, from_id, to_id,))
txn.execute(sql, (user_id, user_id, from_id.stream, to_id.stream,))
rows = self.cursor_to_dict(txn)
@ -211,17 +201,21 @@ class StreamStore(SQLBaseStore):
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
from_comp = '<=' if direction == 'b' else '>'
to_comp = '>' if direction == 'b' else '<='
order = "DESC" if direction == 'b' else "ASC"
args = [room_id]
bounds = _get_token_bound(from_key, from_comp)
if to_key:
bounds = "%s AND %s" % (
bounds, _get_token_bound(to_key, to_comp)
)
if direction == 'b':
order = "DESC"
bounds = _StreamToken.parse(from_key).upper_bound()
if to_key:
bounds = "%s AND %s" % (
bounds, _StreamToken.parse(to_key).lower_bound()
)
else:
order = "ASC"
bounds = _StreamToken.parse(from_key).lower_bound()
if to_key:
bounds = "%s AND %s" % (
bounds, _StreamToken.parse(to_key).upper_bound()
)
if int(limit) > 0:
args.append(int(limit))
@ -249,9 +243,13 @@ class StreamStore(SQLBaseStore):
topo = rows[-1]["topological_ordering"]
toke = rows[-1]["stream_ordering"]
if direction == 'b':
topo -= 1
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# when we are going backwards so we subtract one from the
# stream part.
toke -= 1
next_token = "t%s-%s" % (topo, toke)
next_token = str(_StreamToken(topo, toke))
else:
# TODO (erikj): We should work out what to do here instead.
next_token = to_key if to_key else from_key
@ -284,13 +282,14 @@ class StreamStore(SQLBaseStore):
rows.reverse() # As we selected with reverse ordering
if rows:
# XXX: Always subtract 1 since the start token always goes
# backwards (parity with paginate_room_events). It isn't
# obvious that this is correct; we should clarify the algorithm
# used here.
topo = rows[0]["topological_ordering"] - 1
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# since we are going backwards so we subtract one from the
# stream part.
topo = rows[0]["topological_ordering"]
toke = rows[0]["stream_ordering"] - 1
start_token = "t%s-%s" % (topo, toke)
start_token = str(_StreamToken(topo, toke))
token = (start_token, end_token)
else: