Merge branch 'develop' of github.com:matrix-org/synapse into federation_authorization

Conflicts:
	synapse/federation/transport.py
	synapse/handlers/message.py
This commit is contained in:
Erik Johnston 2014-10-27 10:20:44 +00:00
commit bb4a20174c
57 changed files with 563 additions and 6425 deletions

View file

@ -16,4 +16,4 @@
""" This is a reference implementation of a synapse home server.
"""
__version__ = "0.3.4"
__version__ = "0.4.1"

View file

@ -66,8 +66,8 @@ class EventFactory(object):
if "event_id" not in kwargs:
kwargs["event_id"] = self.create_event_id()
if "ts" not in kwargs:
kwargs["ts"] = int(self.clock.time_msec())
if "origin_server_ts" not in kwargs:
kwargs["origin_server_ts"] = int(self.clock.time_msec())
# The "age" key is a delta timestamp that should be converted into an
# absolute timestamp the minute we see it.

View file

@ -94,7 +94,7 @@ class ServerConfig(Config):
with open(args.signing_key_path, "w") as signing_key_file:
syutil.crypto.signing_key.write_signing_keys(
signing_key_file,
(syutil.crypto.SigningKey.generate("auto"),),
(syutil.crypto.signing_key.generate_singing_key("auto"),),
)
else:
signing_keys = cls.read_file(args.signing_key_path, "signing_key")

View file

@ -16,6 +16,9 @@ from twisted.internet import ssl
from OpenSSL import SSL
from twisted.internet._sslverify import _OpenSSLECCurve, _defaultCurveName
import logging
logger = logging.getLogger(__name__)
class ServerContextFactory(ssl.ContextFactory):
"""Factory for PyOpenSSL SSL contexts that are used to handle incoming
@ -31,7 +34,7 @@ class ServerContextFactory(ssl.ContextFactory):
_ecCurve = _OpenSSLECCurve(_defaultCurveName)
_ecCurve.addECKeyToContext(context)
except:
pass
logger.exception("Failed to enable eliptic curve for TLS")
context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
context.use_certificate(config.tls_certificate)
context.use_privatekey(config.tls_private_key)

View file

@ -38,6 +38,7 @@ class Keyring(object):
@defer.inlineCallbacks
def verify_json_for_server(self, server_name, json_object):
logger.debug("Verifying for %s", server_name)
key_ids = signature_ids(json_object, server_name)
if not key_ids:
raise SynapseError(

View file

@ -96,7 +96,7 @@ class PduCodec(object):
if k not in ["event_id", "room_id", "type", "prev_events"]
})
if "ts" not in kwargs:
kwargs["ts"] = int(self.clock.time_msec())
if "origin_server_ts" not in kwargs:
kwargs["origin_server_ts"] = int(self.clock.time_msec())
return Pdu(**kwargs)

View file

@ -157,7 +157,7 @@ class TransactionActions(object):
transaction.prev_ids = yield self.store.prep_send_transaction(
transaction.transaction_id,
transaction.destination,
transaction.ts,
transaction.origin_server_ts,
[(p["pdu_id"], p["origin"]) for p in transaction.pdus]
)

View file

@ -321,7 +321,7 @@ class ReplicationLayer(object):
if hasattr(transaction, "edus"):
for edu in [Edu(**x) for x in transaction.edus]:
self.received_edu(edu.origin, edu.edu_type, edu.content)
self.received_edu(transaction.origin, edu.edu_type, edu.content)
results = yield defer.DeferredList(dl)
@ -474,7 +474,7 @@ class ReplicationLayer(object):
return Transaction(
origin=self.server_name,
pdus=pdus,
ts=int(self._clock.time_msec()),
origin_server_ts=int(self._clock.time_msec()),
destination=None,
)
@ -654,7 +654,7 @@ class _TransactionQueue(object):
logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new(
ts=self._clock.time_msec(),
origin_server_ts=self._clock.time_msec(),
transaction_id=str(self._next_txn_id),
origin=self.server_name,
destination=destination,

View file

@ -301,6 +301,11 @@ class TransportLayer(object):
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
if not auth_headers:
raise SynapseError(
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
)
for auth in auth_headers:
if auth.startswith("X-Matrix"):
(origin, key, sig) = parse_auth_header(auth)
@ -319,13 +324,13 @@ class TransportLayer(object):
def _with_authentication(self, handler):
@defer.inlineCallbacks
def new_handler(request, *args, **kwargs):
(origin, content) = yield self._authenticate_request(request)
try:
(origin, content) = yield self._authenticate_request(request)
response = yield handler(
origin, content, request.args, *args, **kwargs
)
except:
logger.exception("Callback failed")
logger.exception("_authenticate_request failed")
raise
defer.returnValue(response)
return new_handler
@ -496,9 +501,13 @@ class TransportLayer(object):
defer.returnValue((400, {"error": "Invalid transaction"}))
return
code, response = yield self.received_handler.on_incoming_transaction(
transaction_data
)
try:
code, response = yield self.received_handler.on_incoming_transaction(
transaction_data
)
except:
logger.exception("on_incoming_transaction failed")
raise
defer.returnValue((code, response))

View file

@ -40,7 +40,7 @@ class Pdu(JsonEncodedObject):
{
"pdu_id": "78c",
"ts": 1404835423000,
"origin_server_ts": 1404835423000,
"origin": "bar",
"prev_ids": [
["23b", "foo"],
@ -55,7 +55,7 @@ class Pdu(JsonEncodedObject):
"pdu_id",
"context",
"origin",
"ts",
"origin_server_ts",
"pdu_type",
"destinations",
"transaction_id",
@ -82,7 +82,7 @@ class Pdu(JsonEncodedObject):
"pdu_id",
"context",
"origin",
"ts",
"origin_server_ts",
"pdu_type",
"content",
]
@ -118,6 +118,7 @@ class Pdu(JsonEncodedObject):
"""
if pdu_tuple:
d = copy.copy(pdu_tuple.pdu_entry._asdict())
d["origin_server_ts"] = d.pop("ts")
d["content"] = json.loads(d["content_json"])
del d["content_json"]
@ -156,11 +157,15 @@ class Edu(JsonEncodedObject):
]
required_keys = [
"origin",
"destination",
"edu_type",
]
# TODO: SYN-103: Remove "origin" and "destination" keys.
# internal_keys = [
# "origin",
# "destination",
# ]
class Transaction(JsonEncodedObject):
""" A transaction is a list of Pdus and Edus to be sent to a remote home
@ -182,7 +187,7 @@ class Transaction(JsonEncodedObject):
"transaction_id",
"origin",
"destination",
"ts",
"origin_server_ts",
"previous_ids",
"pdus",
"edus",
@ -199,7 +204,7 @@ class Transaction(JsonEncodedObject):
"transaction_id",
"origin",
"destination",
"ts",
"origin_server_ts",
"pdus",
]
@ -221,10 +226,10 @@ class Transaction(JsonEncodedObject):
@staticmethod
def create_new(pdus, **kwargs):
""" Used to create a new transaction. Will auto fill out
transaction_id and ts keys.
transaction_id and origin_server_ts keys.
"""
if "ts" not in kwargs:
raise KeyError("Require 'ts' to construct a Transaction")
if "origin_server_ts" not in kwargs:
raise KeyError("Require 'origin_server_ts' to construct a Transaction")
if "transaction_id" not in kwargs:
raise KeyError(
"Require 'transaction_id' to construct a Transaction"

View file

@ -64,7 +64,7 @@ class MessageHandler(BaseHandler):
defer.returnValue(None)
@defer.inlineCallbacks
def send_message(self, event=None, suppress_auth=False, stamp_event=True):
def send_message(self, event=None, suppress_auth=False):
""" Send a message.
Args:
@ -72,7 +72,6 @@ class MessageHandler(BaseHandler):
suppress_auth (bool) : True to suppress auth for this message. This
is primarily so the home server can inject messages into rooms at
will.
stamp_event (bool) : True to stamp event content with server keys.
Raises:
SynapseError if something went wrong.
"""
@ -82,9 +81,6 @@ class MessageHandler(BaseHandler):
user = self.hs.parse_userid(event.user_id)
assert user.is_mine, "User must be our own: %s" % (user,)
if stamp_event:
event.content["hsob_ts"] = int(self.clock.time_msec())
snapshot = yield self.store.snapshot_room(event.room_id, event.user_id)
yield self._on_new_room_event(
@ -131,7 +127,7 @@ class MessageHandler(BaseHandler):
defer.returnValue(chunk)
@defer.inlineCallbacks
def store_room_data(self, event=None, stamp_event=True):
def store_room_data(self, event=None):
""" Stores data for a room.
Args:
@ -148,9 +144,6 @@ class MessageHandler(BaseHandler):
state_key=event.state_key,
)
if stamp_event:
event.content["hsob_ts"] = int(self.clock.time_msec())
yield self._on_new_room_event(event, snapshot)
@defer.inlineCallbacks
@ -216,10 +209,7 @@ class MessageHandler(BaseHandler):
defer.returnValue(None)
@defer.inlineCallbacks
def send_feedback(self, event, stamp_event=True):
if stamp_event:
event.content["hsob_ts"] = int(self.clock.time_msec())
def send_feedback(self, event):
snapshot = yield self.store.snapshot_room(event.room_id, event.user_id)
# store message in db

View file

@ -15,6 +15,7 @@
"""Contains functions for registering clients."""
from twisted.internet import defer
from twisted.python import log
from synapse.types import UserID
from synapse.api.errors import (
@ -126,7 +127,7 @@ class RegistrationHandler(BaseHandler):
try:
threepid = yield self._threepid_from_creds(c)
except:
logger.err()
log.err()
raise RegistrationError(400, "Couldn't validate 3pid")
if not threepid:

View file

@ -101,7 +101,9 @@ class BaseHttpClient(object):
while True:
producer = body_callback(method, url_bytes, headers_dict)
producer = None
if body_callback:
producer = body_callback(method, url_bytes, headers_dict)
try:
response = yield self.agent.request(
@ -177,10 +179,6 @@ class MatrixHttpClient(BaseHttpClient):
request = sign_json(request, self.server_name, self.signing_key)
from syutil.jsonutil import encode_canonical_json
logger.debug("Signing " + " " * 11 + "%s %s",
self.server_name, encode_canonical_json(request))
auth_headers = []
for key,sig in request["signatures"][self.server_name].items():
@ -316,6 +314,42 @@ class IdentityServerHttpClient(BaseHttpClient):
defer.returnValue(json.loads(body))
@defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
""" Get's some json from the given host homeserver and path
Args:
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to
None.
**Note**: The value of each key is assumed to be an iterable
and *not* a string.
Returns:
Deferred: Succeeds when we get *any* HTTP response.
The result of the deferred is a tuple of `(code, response)`,
where `response` is a dict representing the decoded JSON body.
"""
logger.debug("get_json args: %s", args)
query_bytes = urllib.urlencode(args, True)
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
response = yield self._create_request(
destination.encode("ascii"),
"GET",
path.encode("ascii"),
query_bytes=query_bytes,
retry_on_dns_fail=retry_on_dns_fail,
body_callback=None
)
body = yield readBody(response)
defer.returnValue(json.loads(body))
class CaptchaServerHttpClient(MatrixHttpClient):
"""Separate HTTP client for talking to google's captcha servers"""

View file

@ -68,7 +68,7 @@ class PresenceStatusRestServlet(RestServlet):
yield self.handlers.presence_handler.set_state(
target_user=user, auth_user=auth_user, state=state)
defer.returnValue((200, ""))
defer.returnValue((200, {}))
def on_OPTIONS(self, request):
return (200, {})
@ -141,7 +141,7 @@ class PresenceListRestServlet(RestServlet):
yield defer.DeferredList(deferreds)
defer.returnValue((200, ""))
defer.returnValue((200, {}))
def on_OPTIONS(self, request):
return (200, {})

View file

@ -36,7 +36,7 @@ class VoipRestServlet(RestServlet):
if not turnUris or not turnSecret or not userLifetime:
defer.returnValue( (200, {}) )
expiry = self.hs.get_clock().time_msec() + userLifetime
expiry = (self.hs.get_clock().time_msec() + userLifetime) / 1000
username = "%d:%s" % (expiry, auth_user.to_string())
mac = hmac.new(turnSecret, msg=username, digestmod=hashlib.sha1)

View file

@ -66,7 +66,7 @@ SCHEMAS = [
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 5
SCHEMA_VERSION = 6
class _RollbackButIsFineException(Exception):
@ -157,6 +157,8 @@ class DataStore(RoomMemberStore, RoomStore,
cols["unrecognized_keys"] = json.dumps(unrec_keys)
cols["ts"] = cols.pop("origin_server_ts")
logger.debug("Persisting: %s", repr(cols))
if pdu.is_state:
@ -454,10 +456,11 @@ def prepare_database(db_conn):
db_conn.commit()
else:
sql_script = "BEGIN TRANSACTION;"
for sql_loc in SCHEMAS:
sql_script = read_schema(sql_loc)
c.executescript(sql_script)
sql_script += read_schema(sql_loc)
sql_script += "COMMIT TRANSACTION;"
c.executescript(sql_script)
db_conn.commit()
c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION)

View file

@ -354,6 +354,7 @@ class SQLBaseStore(object):
d.pop("stream_ordering", None)
d.pop("topological_ordering", None)
d.pop("processed", None)
d["origin_server_ts"] = d.pop("ts", 0)
d.update(json.loads(row_dict["unrecognized_keys"]))
d["content"] = json.loads(d["content"])
@ -361,7 +362,7 @@ class SQLBaseStore(object):
if "age_ts" not in d:
# For compatibility
d["age_ts"] = d["ts"] if "ts" in d else 0
d["age_ts"] = d.get("origin_server_ts", 0)
return self.event_factory.create_event(
etype=d["type"],

View file

@ -0,0 +1,31 @@
/* Copyright 2014 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE IF NOT EXISTS server_tls_certificates(
server_name TEXT, -- Server name.
fingerprint TEXT, -- Certificate fingerprint.
from_server TEXT, -- Which key server the certificate was fetched from.
ts_added_ms INTEGER, -- When the certifcate was added.
tls_certificate BLOB, -- DER encoded x509 certificate.
CONSTRAINT uniqueness UNIQUE (server_name, fingerprint)
);
CREATE TABLE IF NOT EXISTS server_signature_keys(
server_name TEXT, -- Server name.
key_id TEXT, -- Key version.
from_server TEXT, -- Which key server the key was fetched form.
ts_added_ms INTEGER, -- When the key was added.
verify_key BLOB, -- NACL verification key.
CONSTRAINT uniqueness UNIQUE (server_name, key_id)
);

View file

@ -87,7 +87,8 @@ class TransactionStore(SQLBaseStore):
txn.execute(query, (code, response_json, transaction_id, origin))
def prep_send_transaction(self, transaction_id, destination, ts, pdu_list):
def prep_send_transaction(self, transaction_id, destination,
origin_server_ts, pdu_list):
"""Persists an outgoing transaction and calculates the values for the
previous transaction id list.
@ -97,7 +98,7 @@ class TransactionStore(SQLBaseStore):
Args:
transaction_id (str)
destination (str)
ts (int)
origin_server_ts (int)
pdu_list (list)
Returns:
@ -106,11 +107,11 @@ class TransactionStore(SQLBaseStore):
return self.runInteraction(
self._prep_send_transaction,
transaction_id, destination, ts, pdu_list
transaction_id, destination, origin_server_ts, pdu_list
)
def _prep_send_transaction(self, txn, transaction_id, destination, ts,
pdu_list):
def _prep_send_transaction(self, txn, transaction_id, destination,
origin_server_ts, pdu_list):
# First we find out what the prev_txs should be.
# Since we know that we are only sending one transaction at a time,
@ -131,7 +132,7 @@ class TransactionStore(SQLBaseStore):
None,
transaction_id=transaction_id,
destination=destination,
ts=ts,
ts=origin_server_ts,
response_code=0,
response_json=None
))