From 5d8a93a10ebdd58fd15d29f4e6e4f389b65855cb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 May 2016 10:29:21 +0100 Subject: [PATCH 01/12] Add some log information at returned replication streams --- synapse/replication/resource.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index ff78c60f1..d7c49462c 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -159,6 +159,17 @@ class ReplicationResource(Resource): result = yield self.notifier.wait_for_replication(replicate, timeout) + for stream_name, stream_content in result.items(): + logger.info( + "Replicating %d rows of %s from %s -> %s", + len(stream_content["rows"]), + stream_name, + stream_content["position"], + request_streams.get(stream_name), + ) + if stream_content["position"] == request_streams.get(stream_name): + logger.warn("Returning same position for stream: %s", stream_name) + request.write(json.dumps(result, ensure_ascii=False)) finish_request(request) From 9c272da05fcf51534aaa877647bc3b82bf841cf3 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 5 May 2016 13:42:44 +0100 Subject: [PATCH 02/12] Add an openidish mechanism for proving to third parties that you own a given user_id --- synapse/federation/federation_server.py | 5 ++ synapse/federation/transport/server.py | 47 ++++++++++- synapse/rest/__init__.py | 2 + synapse/rest/client/v2_alpha/openid.py | 96 ++++++++++++++++++++++ synapse/storage/__init__.py | 4 +- synapse/storage/openid.py | 32 ++++++++ synapse/storage/schema/delta/32/openid.sql | 9 ++ 7 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 synapse/rest/client/v2_alpha/openid.py create mode 100644 synapse/storage/openid.py create mode 100644 synapse/storage/schema/delta/32/openid.sql diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 429ab6dde..f1d231b9d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -387,6 +387,11 @@ class FederationServer(FederationBase): "events": [ev.get_pdu_json(time_now) for ev in missing_events], }) + @log_function + def on_openid_userinfo(self, token): + ts_now_ms = self._clock.time_msec() + return self.store.get_user_id_for_open_id_token(token, ts_now_ms) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 3e552b6c4..5b6c7d11d 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource -from synapse.http.servlet import parse_json_object_from_request +from synapse.http.servlet import parse_json_object_from_request, parse_string from synapse.util.ratelimitutils import FederationRateLimiter import functools @@ -448,6 +448,50 @@ class On3pidBindServlet(BaseFederationServlet): return code +class OpenIdUserInfo(BaseFederationServlet): + """ + Exchange a bearer token for information about a user. + + The response format should be compatible with: + http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse + + GET /openid/userinfo?access_token=ABDEFGH HTTP/1.1 + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "sub": "@userpart:example.org", + } + """ + + PATH = "/openid/userinfo" + + @defer.inlineCallbacks + def on_GET(self, request): + token = parse_string(request, "access_token") + if token is None: + defer.returnValue((401, { + "errcode": "M_MISSING_TOKEN", "error": "Access Token required" + })) + return + + user_id = yield self.handler.on_openid_userinfo(token) + + if user_id is None: + defer.returnValue((401, { + "errcode": "M_UNKNOWN_TOKEN", + "error": "Access Token unknown or expired" + })) + + defer.returnValue((200, {"sub": user_id})) + + # Avoid doing remote HS authorization checks which are done by default by + # BaseFederationServlet. + def _wrap(self, code): + return code + + SERVLET_CLASSES = ( FederationSendServlet, FederationPullServlet, @@ -468,6 +512,7 @@ SERVLET_CLASSES = ( FederationClientKeysClaimServlet, FederationThirdPartyInviteExchangeServlet, On3pidBindServlet, + OpenIdUserInfo, ) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index e805cb911..8b223e032 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -45,6 +45,7 @@ from synapse.rest.client.v2_alpha import ( tags, account_data, report_event, + openid, ) from synapse.http.server import JsonResource @@ -88,3 +89,4 @@ class ClientRestResource(JsonResource): tags.register_servlets(hs, client_resource) account_data.register_servlets(hs, client_resource) report_event.register_servlets(hs, client_resource) + openid.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/openid.py b/synapse/rest/client/v2_alpha/openid.py new file mode 100644 index 000000000..ddea75032 --- /dev/null +++ b/synapse/rest/client/v2_alpha/openid.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ._base import client_v2_patterns + +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.api.errors import AuthError +from synapse.util.stringutils import random_string + +from twisted.internet import defer + +import logging + +logger = logging.getLogger(__name__) + + +class IdTokenServlet(RestServlet): + """ + Get a bearer token that may be passed to a third party to confirm ownership + of a matrix user id. + + The format of the response could be made compatible with the format given + in http://openid.net/specs/openid-connect-core-1_0.html#TokenResponse + + But instead of returning a signed "id_token" the response contains the + name of the issuing matrix homeserver. This means that for now the third + party will need to check the validity of the "id_token" against the + federation /openid/userinfo endpoint of the homeserver. + + Request: + + POST /user/{user_id}/openid/token?access_token=... HTTP/1.1 + + {} + + Response: + + HTTP/1.1 200 OK + { + "access_token": "ABDEFGH", + "token_type": "Bearer", + "matrix_server_name": "example.com", + "expires_in": 3600, + } + """ + PATTERNS = client_v2_patterns( + "/user/(?P[^/]*)/openid/token" + ) + + EXPIRES_MS = 3600 * 1000 + + def __init__(self, hs): + super(IdTokenServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.clock = hs.get_clock() + self.server_name = hs.config.server_name + + @defer.inlineCallbacks + def on_POST(self, request, user_id): + requester = yield self.auth.get_user_by_req(request) + if user_id != requester.user.to_string(): + raise AuthError(403, "Cannot request tokens for other users.") + + # Parse the request body to make sure it's JSON, but ignore the contents + # for now. + parse_json_object_from_request(request) + + token = random_string(24) + ts_valid_until_ms = self.clock.time_msec() + self.EXPIRES_MS + + yield self.store.insert_open_id_token(token, ts_valid_until_ms, user_id) + + defer.returnValue((200, { + "access_token": token, + "token_type": "Bearer", + "matrix_server_name": self.server_name, + "expires_in": self.EXPIRES_MS / 1000, + })) + + +def register_servlets(hs, http_server): + IdTokenServlet(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7122b0cbb..d970fde9e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,6 +44,7 @@ from .receipts import ReceiptsStore from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore +from .openid import OpenIdStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator @@ -81,7 +82,8 @@ class DataStore(RoomMemberStore, RoomStore, SearchStore, TagsStore, AccountDataStore, - EventPushActionsStore + EventPushActionsStore, + OpenIdStore, ): def __init__(self, db_conn, hs): diff --git a/synapse/storage/openid.py b/synapse/storage/openid.py new file mode 100644 index 000000000..5dabb607b --- /dev/null +++ b/synapse/storage/openid.py @@ -0,0 +1,32 @@ +from ._base import SQLBaseStore + + +class OpenIdStore(SQLBaseStore): + def insert_open_id_token(self, token, ts_valid_until_ms, user_id): + return self._simple_insert( + table="open_id_tokens", + values={ + "token": token, + "ts_valid_until_ms": ts_valid_until_ms, + "user_id": user_id, + }, + desc="insert_open_id_token" + ) + + def get_user_id_for_open_id_token(self, token, ts_now_ms): + def get_user_id_for_token_txn(txn): + sql = ( + "SELECT user_id FROM open_id_tokens" + " WHERE token = ? AND ? <= ts_valid_until_ms" + ) + + txn.execute(sql, (token, ts_now_ms)) + + rows = txn.fetchall() + if not rows: + return None + else: + return rows[0][0] + return self.runInteraction( + "get_user_id_for_token", get_user_id_for_token_txn + ) diff --git a/synapse/storage/schema/delta/32/openid.sql b/synapse/storage/schema/delta/32/openid.sql new file mode 100644 index 000000000..36f37b11c --- /dev/null +++ b/synapse/storage/schema/delta/32/openid.sql @@ -0,0 +1,9 @@ + +CREATE TABLE open_id_tokens ( + token TEXT NOT NULL PRIMARY KEY, + ts_valid_until_ms bigint NOT NULL, + user_id TEXT NOT NULL, + UNIQUE (token) +); + +CREATE index open_id_tokens_ts_valid_until_ms ON open_id_tokens(ts_valid_until_ms); From 8940281d1b2e67720ae257d224dbef7280cfa55c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 May 2016 15:10:03 +0100 Subject: [PATCH 03/12] Don't warn --- synapse/replication/resource.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index d7c49462c..69ad1de86 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -167,8 +167,6 @@ class ReplicationResource(Resource): stream_content["position"], request_streams.get(stream_name), ) - if stream_content["position"] == request_streams.get(stream_name): - logger.warn("Returning same position for stream: %s", stream_name) request.write(json.dumps(result, ensure_ascii=False)) finish_request(request) From 573ef3f1c953542693a1784311154d3345caf5c1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 5 May 2016 15:15:00 +0100 Subject: [PATCH 04/12] Rename openid/token to openid/request_token --- synapse/rest/client/v2_alpha/openid.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/rest/client/v2_alpha/openid.py b/synapse/rest/client/v2_alpha/openid.py index ddea75032..aa1cae8e1 100644 --- a/synapse/rest/client/v2_alpha/openid.py +++ b/synapse/rest/client/v2_alpha/openid.py @@ -42,7 +42,7 @@ class IdTokenServlet(RestServlet): Request: - POST /user/{user_id}/openid/token?access_token=... HTTP/1.1 + POST /user/{user_id}/openid/request_token?access_token=... HTTP/1.1 {} @@ -57,7 +57,7 @@ class IdTokenServlet(RestServlet): } """ PATTERNS = client_v2_patterns( - "/user/(?P[^/]*)/openid/token" + "/user/(?P[^/]*)/openid/request_token" ) EXPIRES_MS = 3600 * 1000 From 56b5e83e36f22f3eab1ebf7a46b9f23f0c1a3e8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:20:18 +0100 Subject: [PATCH 05/12] Reduce database inserts when sending transactions --- synapse/handlers/presence.py | 2 +- synapse/storage/transactions.py | 157 +++++++++++++++++++++++--------- 2 files changed, 114 insertions(+), 45 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d0c8f1328..639567953 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -168,7 +168,7 @@ class PresenceHandler(BaseHandler): # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 0 * 1000, + 30 * 1000, self.clock.looping_call, self._handle_timeouts, 5000, diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index d338dfcf0..17fc60198 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -16,16 +16,54 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached +from twisted.internet import defer, reactor + from canonicaljson import encode_canonical_json + +from collections import namedtuple + +import itertools import logging logger = logging.getLogger(__name__) +_TransactionRow = namedtuple( + "_TransactionRow", ( + "id", "transaction_id", "destination", "ts", "response_code", + "response_json", + ) +) + +_UpdateTransactionRow = namedtuple( + "_TransactionRow", ( + "response_code", "response_json", + ) +) + + class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + def __init__(self, hs): + super(TransactionStore, self).__init__(hs) + + # New transactions that are currently in flights + self.inflight_transactions = {} + + # Newly delievered transactions that *weren't* persisted while in flight + self.new_delivered_transactions = {} + + # Newly delivered transactions that *were* persisted while in flight + self.update_delivered_transactions = {} + + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) + hs.get_clock().looping_call( + self._persist_in_mem_txns, + 1000, + ) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -108,17 +146,28 @@ class TransactionStore(SQLBaseStore): list: A list of previous transaction ids. """ - return self.runInteraction( - "prep_send_transaction", - self._prep_send_transaction, - transaction_id, destination, origin_server_ts + auto_id = self._transaction_id_gen.get_next() + + txn_row = _TransactionRow( + id=auto_id, + transaction_id=transaction_id, + destination=destination, + ts=origin_server_ts, + response_code=0, + response_json=None, ) - def _prep_send_transaction(self, txn, transaction_id, destination, - origin_server_ts): + self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - next_id = self._transaction_id_gen.get_next() + # TODO: Fetch prev_txns + return self.runInteraction( + "prep_send_transaction", + self._get_prevs_txn, + destination, + ) + + def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. @@ -133,23 +182,6 @@ class TransactionStore(SQLBaseStore): prev_txns = [r["transaction_id"] for r in results] - # Actually add the new transaction to the sent_transactions table. - - self._simple_insert_txn( - txn, - table="sent_transactions", - values={ - "id": next_id, - "transaction_id": transaction_id, - "destination": destination, - "ts": origin_server_ts, - "response_code": 0, - "response_json": None, - } - ) - - # TODO Update the tx id -> pdu id mapping - return prev_txns def delivered_txn(self, transaction_id, destination, code, response_dict): @@ -161,27 +193,21 @@ class TransactionStore(SQLBaseStore): code (int) response_json (str) """ - return self.runInteraction( - "delivered_txn", - self._delivered_txn, - transaction_id, destination, code, - buffer(encode_canonical_json(response_dict)), - ) - def _delivered_txn(self, txn, transaction_id, destination, - code, response_json): - self._simple_update_one_txn( - txn, - table="sent_transactions", - keyvalues={ - "transaction_id": transaction_id, - "destination": destination, - }, - updatevalues={ - "response_code": code, - "response_json": None, # For now, don't persist response_json - } - ) + txn_row = self.inflight_transactions.get( + destination, {} + ).pop(transaction_id, None) + + if txn_row: + d = self.new_delivered_transactions.setdefault(destination, {}) + d[transaction_id] = txn_row._replace( + response_code=code, + response_json=None, # For now, don't persist response + ) + else: + d = self.update_delivered_transactions.setdefault(destination, {}) + # For now, don't persist response + d[transaction_id] = _UpdateTransactionRow(code, None) def get_transactions_after(self, transaction_id, destination): """Get all transactions after a given local transaction_id. @@ -305,3 +331,46 @@ class TransactionStore(SQLBaseStore): txn.execute(query, (self._clock.time_msec(),)) return self.cursor_to_dict(txn) + + @defer.inlineCallbacks + def _persist_in_mem_txns(self): + try: + inflight = self.inflight_transactions + new_delivered = self.new_delivered_transactions + update_delivered = self.update_delivered_transactions + + self.inflight_transactions = {} + self.new_delivered_transactions = {} + self.update_delivered_transactions = {} + + full_rows = [ + row._asdict() + for txn_map in itertools.chain(inflight.values(), new_delivered.values()) + for row in txn_map.values() + ] + + def f(txn): + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) + + for dest, txn_map in update_delivered.items(): + for txn_id, update_row in txn_map.items(): + self._simple_update_one_txn( + txn, + table="sent_transactions", + keyvalues={ + "transaction_id": txn_id, + "destination": dest, + }, + updatevalues={ + "response_code": update_row.response_code, + "response_json": None, # For now, don't persist response + } + ) + + yield self.runInteraction("_persist_in_mem_txns", f) + except: + logger.exception("Failed to persist transactions!") From 1d275dba69272f6df5a79871dc4b8d31e71514d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:25:58 +0100 Subject: [PATCH 06/12] Don't needlessly enter transaction --- synapse/storage/transactions.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 17fc60198..ba1969b24 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -159,10 +159,8 @@ class TransactionStore(SQLBaseStore): self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - # TODO: Fetch prev_txns - return self.runInteraction( - "prep_send_transaction", + "_get_prevs_txn", self._get_prevs_txn, destination, ) @@ -350,11 +348,12 @@ class TransactionStore(SQLBaseStore): ] def f(txn): - self._simple_insert_many_txn( - txn=txn, - table="sent_transactions", - values=full_rows - ) + if full_rows: + self._simple_insert_many_txn( + txn=txn, + table="sent_transactions", + values=full_rows + ) for dest, txn_map in update_delivered.items(): for txn_id, update_row in txn_map.items(): @@ -371,6 +370,7 @@ class TransactionStore(SQLBaseStore): } ) - yield self.runInteraction("_persist_in_mem_txns", f) + if full_rows or update_delivered: + yield self.runInteraction("_persist_in_mem_txns", f) except: logger.exception("Failed to persist transactions!") From d13459636fab9637a43c950d17f883ca77b832f7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 11:30:55 +0100 Subject: [PATCH 07/12] Pull prev txn from in memory --- synapse/storage/transactions.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index ba1969b24..6c7481a72 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -58,6 +58,8 @@ class TransactionStore(SQLBaseStore): # Newly delivered transactions that *were* persisted while in flight self.update_delivered_transactions = {} + self.last_transaction = {} + reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns) hs.get_clock().looping_call( self._persist_in_mem_txns, @@ -159,11 +161,15 @@ class TransactionStore(SQLBaseStore): self.inflight_transactions.setdefault(destination, {})[transaction_id] = txn_row - return self.runInteraction( - "_get_prevs_txn", - self._get_prevs_txn, - destination, - ) + prev_txn = self.last_transaction.get(destination) + if prev_txn: + return defer.succeed(prev_txn) + else: + return self.runInteraction( + "_get_prevs_txn", + self._get_prevs_txn, + destination, + ) def _get_prevs_txn(self, txn, destination): # First we find out what the prev_txns should be. @@ -196,6 +202,8 @@ class TransactionStore(SQLBaseStore): destination, {} ).pop(transaction_id, None) + self.last_transaction[destination] = transaction_id + if txn_row: d = self.new_delivered_transactions.setdefault(destination, {}) d[transaction_id] = txn_row._replace( From b6e0be701eb8ae36236242549ef356397f5f1d06 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 14:31:38 +0100 Subject: [PATCH 08/12] Queue events for persistence --- synapse/storage/events.py | 157 +++++++++++++++++++++++++++++++++++--- 1 file changed, 145 insertions(+), 12 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0307b2af3..07e873fce 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,12 +19,14 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event +from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json -from collections import namedtuple +from collections import deque, namedtuple + import logging import math @@ -50,6 +52,80 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +class _EventPeristenceQueue(object): + """Queues up events so that they can be persisted in bulk with only one + concurrent transaction per room. + """ + + _EventPersistQueueItem = namedtuple("_EventPersistQueueItem", ( + "events_and_contexts", "current_state", "backfilled", "deferred", + )) + + def __init__(self): + self._event_persist_queues = {} + self._currently_persisting_rooms = set() + + def add_to_queue(self, room_id, events_and_contexts, backfilled, current_state): + """Add events to the queue, with the given persist_event options. + """ + queue = self._event_persist_queues.setdefault(room_id, deque()) + if queue: + end_item = queue[-1] + if end_item.current_state or current_state: + # We perist events with current_state set to True one at a time + pass + if end_item.backfilled == backfilled: + end_item.events_and_contexts.extend(events_and_contexts) + return end_item.deferred.observe() + + deferred = ObservableDeferred(defer.Deferred()) + + queue.append(self._EventPersistQueueItem( + events_and_contexts=events_and_contexts, + backfilled=backfilled, + current_state=current_state, + deferred=deferred, + )) + + return deferred.observe() + + def handle_queue(self, room_id, callback): + """Attempts to handle the queue for a room if not already being handled. + + The given callback will be invoked with a 'queue' arg, which is a + generator over _EventPersistQueueItem's. The queue will finish if there + are no longer any items in the room queue. + + This function should therefore be called whenever anything is added + to the queue. + + If another callback is currently handling the queue then it will not be + invoked. + """ + + if room_id in self._currently_persisting_rooms: + return + + self._currently_persisting_rooms.add(room_id) + + try: + callback(self._get_drainining_queue(room_id)) + finally: + self._currently_persisting_rooms.discard(room_id) + + def _get_drainining_queue(self, room_id): + queue = self._event_persist_queues.pop(room_id, None) + if not queue: + return + + try: + while True: + yield queue.popleft() + except IndexError: + # Queue has been drained. + pass + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" @@ -59,19 +135,80 @@ class EventsStore(SQLBaseStore): self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts ) - @defer.inlineCallbacks + self._event_persist_queue = _EventPeristenceQueue() + def persist_events(self, events_and_contexts, backfilled=False): """ Write events to the database Args: events_and_contexts: list of tuples of (event, context) backfilled: ? - - Returns: Tuple of stream_orderings where the first is the minimum and - last is the maximum stream ordering assigned to the events when - persisting. - """ + partitioned = {} + for event, ctx in events_and_contexts: + partitioned.setdefault(event.room_id, []).append((event, ctx)) + + deferreds = [] + for room_id, evs_ctxs in partitioned.items(): + d = self._event_persist_queue.add_to_queue( + room_id, evs_ctxs, + backfilled=backfilled, + current_state=None, + ) + deferreds.append(d) + + for room_id in partitioned.keys(): + self._maybe_start_persisting(room_id) + + return defer.gatherResults(deferreds, consumeErrors=True) + + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, current_state=None, backfilled=False): + deferred = self._event_persist_queue.add_to_queue( + event.room_id, [(event, context)], + backfilled=backfilled, + current_state=current_state, + ) + + self._maybe_start_persisting(event.room_id) + + yield deferred + + max_persisted_id = yield self._stream_id_gen.get_current_token() + defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id)) + + def _maybe_start_persisting(self, room_id): + @defer.inlineCallbacks + def persisting_queue(queue): + for item in queue: + try: + ret = None + if item.current_state: + for event, context in item.events_and_contexts: + # There should only ever be one item in + # events_and_contexts when current_state is + # not None + yield self._persist_event( + event, context, + current_state=item.current_state, + backfilled=item.backfilled, + ) + else: + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) + logger.info("Resolving with ret: %r", ret) + item.deferred.callback(ret) + except Exception as e: + logger.exception("Failed to persist events") + item.deferred.errback(e) + + self._event_persist_queue.handle_queue(room_id, persisting_queue) + + @defer.inlineCallbacks + def _persist_events(self, events_and_contexts, backfilled=False): if not events_and_contexts: return @@ -118,8 +255,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks @log_function - def persist_event(self, event, context, current_state=None, backfilled=False): - + def _persist_event(self, event, context, current_state=None, backfilled=False): try: with self._stream_id_gen.get_next() as stream_ordering: with self._state_groups_id_gen.get_next() as state_group_id: @@ -136,9 +272,6 @@ class EventsStore(SQLBaseStore): except _RollbackButIsFineException: pass - max_persisted_id = yield self._stream_id_gen.get_current_token() - defer.returnValue((stream_ordering, max_persisted_id)) - @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, From fd85b167ecb650451bf8bbfc68b771f607bb1d9d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 15:38:42 +0100 Subject: [PATCH 09/12] Pull loop one level up --- synapse/storage/events.py | 77 +++++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 36 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 07e873fce..9f8f0a082 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event -from synapse.util.async import ObservableDeferred +from synapse.util.async import ObservableDeferred, run_on_reactor from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes @@ -89,12 +89,14 @@ class _EventPeristenceQueue(object): return deferred.observe() - def handle_queue(self, room_id, callback): + def handle_queue(self, room_id, per_item_callback): """Attempts to handle the queue for a room if not already being handled. - The given callback will be invoked with a 'queue' arg, which is a - generator over _EventPersistQueueItem's. The queue will finish if there - are no longer any items in the room queue. + The given callback will be invoked with for each item in the queue,1 + of type _EventPersistQueueItem. The per_item_callback will continuously + be called with new items, unless the queue becomnes empty. The return + value of the function will be given to the deferreds waiting on the item, + exceptions will be passed to the deferres as well. This function should therefore be called whenever anything is added to the queue. @@ -108,15 +110,26 @@ class _EventPeristenceQueue(object): self._currently_persisting_rooms.add(room_id) - try: - callback(self._get_drainining_queue(room_id)) - finally: - self._currently_persisting_rooms.discard(room_id) + @defer.inlineCallbacks + def handle_queue_loop(): + try: + queue = self._get_drainining_queue(room_id) + for item in queue: + try: + ret = yield per_item_callback(item) + item.deferred.callback(ret) + except Exception as e: + item.deferred.errback(e) + finally: + queue = self._event_persist_queues.pop(room_id, None) + if queue: + self._event_persist_queues[room_id] = queue + self._currently_persisting_rooms.discard(room_id) + + preserve_fn(handle_queue_loop)() def _get_drainining_queue(self, room_id): - queue = self._event_persist_queues.pop(room_id, None) - if not queue: - return + queue = self._event_persist_queues.setdefault(room_id, deque()) try: while True: @@ -180,30 +193,22 @@ class EventsStore(SQLBaseStore): def _maybe_start_persisting(self, room_id): @defer.inlineCallbacks - def persisting_queue(queue): - for item in queue: - try: - ret = None - if item.current_state: - for event, context in item.events_and_contexts: - # There should only ever be one item in - # events_and_contexts when current_state is - # not None - yield self._persist_event( - event, context, - current_state=item.current_state, - backfilled=item.backfilled, - ) - else: - yield self._persist_events( - item.events_and_contexts, - backfilled=item.backfilled, - ) - logger.info("Resolving with ret: %r", ret) - item.deferred.callback(ret) - except Exception as e: - logger.exception("Failed to persist events") - item.deferred.errback(e) + def persisting_queue(item): + if item.current_state: + for event, context in item.events_and_contexts: + # There should only ever be one item in + # events_and_contexts when current_state is + # not None + yield self._persist_event( + event, context, + current_state=item.current_state, + backfilled=item.backfilled, + ) + else: + yield self._persist_events( + item.events_and_contexts, + backfilled=item.backfilled, + ) self._event_persist_queue.handle_queue(room_id, persisting_queue) From fcb2c3f0db70593709c3bfbfc66772066c4aa061 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 6 May 2016 15:47:40 +0100 Subject: [PATCH 10/12] Remove unused import --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9f8f0a082..a27b7919c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ from twisted.internet import defer, reactor from synapse.events import FrozenEvent, USE_FROZEN_DICTS from synapse.events.utils import prune_event -from synapse.util.async import ObservableDeferred, run_on_reactor +from synapse.util.async import ObservableDeferred from synapse.util.logcontext import preserve_fn, PreserveLoggingContext from synapse.util.logutils import log_function from synapse.api.constants import EventTypes From 4ea762c1a28edbf18d0a183ed35fb8a5a11847c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 May 2016 10:08:21 +0100 Subject: [PATCH 11/12] Add cache to get_user_by_id --- synapse/storage/registration.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7af0cae6a..bda84a744 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -101,6 +101,7 @@ class RegistrationStore(SQLBaseStore): make_guest, appservice_id ) + self.get_user_by_id.invalidate((user_id,)) self.is_guest.invalidate((user_id,)) def _register( @@ -156,6 +157,7 @@ class RegistrationStore(SQLBaseStore): (next_id, user_id, token,) ) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( table="users", @@ -193,6 +195,7 @@ class RegistrationStore(SQLBaseStore): }, { 'password_hash': password_hash }) + self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_ids=[]): From f6ebaf4a3255fa298cb0c8a7f23294265038de7b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 May 2016 10:10:06 +0100 Subject: [PATCH 12/12] Run transaction queue on reactor This ensures that any CPU work that happens doesn't block message sending. --- synapse/federation/transaction_queue.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 1928da03b..5787f854d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -20,6 +20,7 @@ from .persistence import TransactionActions from .units import Transaction from synapse.api.errors import HttpResponseException +from synapse.util.async import run_on_reactor from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.util.retryutils import ( @@ -199,6 +200,8 @@ class TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): + yield run_on_reactor() + # list of (pending_pdu, deferred, order) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending