From 1f1dee94f6025ce0a6e414cd6098cb766567bdd8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 May 2016 10:13:25 +0100 Subject: [PATCH 01/37] Manually run GC on reactor tick. This also adds a metric for amount of time spent in GC. --- synapse/metrics/__init__.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 5664d5a38..17be491b9 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -22,6 +22,7 @@ import functools import os import stat import time +import gc from twisted.internet import reactor @@ -155,6 +156,7 @@ get_metrics_for("process").register_callback("fds", _process_fds, labels=["type" reactor_metrics = get_metrics_for("reactor") tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +gc_time = reactor_metrics.register_distribution("gc_time") def runUntilCurrentTimer(func): @@ -182,6 +184,18 @@ def runUntilCurrentTimer(func): end = time.time() * 1000 tick_time.inc_by(end - start) pending_calls_metric.inc_by(num_pending) + + threshold = gc.get_threshold() + counts = gc.get_count() + + start = time.time() * 1000 + for i in [2, 1, 0]: + if threshold[i] < counts[i]: + logger.info("Collecting gc %d", i) + gc.collect(i) + end = time.time() * 1000 + gc_time.inc_by(end - start) + return ret return f @@ -196,5 +210,6 @@ try: # runUntilCurrent is called when we have pending calls. It is called once # per iteratation after fd polling. reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent) + gc.disable() except AttributeError: pass From 7d6e89ed22aab4bae9ce033c2e4757a595257942 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 May 2016 16:31:08 +0100 Subject: [PATCH 02/37] Add a comment --- synapse/metrics/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 17be491b9..c82685a52 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -185,6 +185,8 @@ def runUntilCurrentTimer(func): tick_time.inc_by(end - start) pending_calls_metric.inc_by(num_pending) + # Check if we need to do a manual GC (since its been disabled), and do + # one if necessary. threshold = gc.get_threshold() counts = gc.get_count() @@ -210,6 +212,9 @@ try: # runUntilCurrent is called when we have pending calls. It is called once # per iteratation after fd polling. reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent) + + # We manually run the GC each reactor tick so that we can get some metrics + # about time spent doing GC, gc.disable() except AttributeError: pass From 60d53f9e9596520472954831ce8fea251a462d46 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 May 2016 09:32:29 +0100 Subject: [PATCH 03/37] Count number of GC collects --- synapse/metrics/__init__.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index c82685a52..bba265707 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -156,7 +156,12 @@ get_metrics_for("process").register_callback("fds", _process_fds, labels=["type" reactor_metrics = get_metrics_for("reactor") tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") -gc_time = reactor_metrics.register_distribution("gc_time") + +gc_time = ( + reactor_metrics.register_distribution("gc_time_gen0"), + reactor_metrics.register_distribution("gc_time_gen2"), + reactor_metrics.register_distribution("gc_time_gen2"), +) def runUntilCurrentTimer(func): @@ -189,14 +194,15 @@ def runUntilCurrentTimer(func): # one if necessary. threshold = gc.get_threshold() counts = gc.get_count() - - start = time.time() * 1000 for i in [2, 1, 0]: if threshold[i] < counts[i]: logger.info("Collecting gc %d", i) + + start = time.time() * 1000 gc.collect(i) - end = time.time() * 1000 - gc_time.inc_by(end - start) + end = time.time() * 1000 + + gc_time[i].inc_by(end - start) return ret From abb151f3c9bf78f2825dba18da6bbc88ce61d32c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 11:57:26 +0100 Subject: [PATCH 04/37] Add a separate process that can handle /sync requests --- synapse/app/synchrotron.py | 467 +++++++++++++++++++++++++++++++++++++ 1 file changed, 467 insertions(+) create mode 100644 synapse/app/synchrotron.py diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py new file mode 100644 index 000000000..f592ad352 --- /dev/null +++ b/synapse/app/synchrotron.py @@ -0,0 +1,467 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.api.constants import EventTypes +from synapse.config._base import ConfigError +from synapse.config.database import DatabaseConfig +from synapse.config.logger import LoggingConfig +from synapse.config.appservice import AppServiceConfig +from synapse.events import FrozenEvent +from synapse.handlers.presence import PresenceHandler +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.rest.client.v2_alpha import sync +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.storage.presence import UserPresenceState +from synapse.storage.roommember import RoomMemberStore +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.stringutils import random_string +from synapse.util.versionstring import get_version_string + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import contextlib +import ujson as json + +logger = logging.getLogger("synapse.app.synchrotron") + + +class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): + def read_config(self, config): + self.replication_url = config["replication_url"] + self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") + self.daemonize = config.get("daemonize") + self.pid_file = self.abspath(config.get("pid_file")) + self.macaroon_secret_key = config["macaroon_secret_key"] + self.expire_access_token = config.get("expire_access_token", False) + + def default_config(self, server_name, **kwargs): + pid_file = self.abspath("synchroton.pid") + return """\ + # Slave configuration + + # The replication listener on the synapse to talk to. + #replication_url: https://localhost:{replication_port}/_synapse/replication + + server_name: "%(server_name)s" + + listeners: + # Enable a /sync listener on the synchrontron + #- type: http + # port: {http_port} + # bind_address: "" + # Enable a ssh manhole listener on the synchrotron + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Enable a metric listener on the synchrotron + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"] + # compress: False + + report_stats: False + + daemonize: False + + pid_file: %(pid_file)s + """ % locals() + + +class SynchrotronSlavedStore( + SlavedPushRuleStore, + SlavedEventStore, + SlavedReceiptsStore, + SlavedAccountDataStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + SlavedFilteringStore, + SlavedPresenceStore, +): + def get_presence_list_accepted(self, user_localpart): + return () + + def insert_client_ip(self, user, access_token, ip, user_agent): + pass + + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + + +class SynchrotronPresence(object): + def __init__(self, hs): + self.http_client = hs.get_simple_http_client() + self.store = hs.get_datastore() + self.user_to_num_current_syncs = {} + self.syncing_users_url = hs.config.replication_url + "/syncing_users" + self.clock = hs.get_clock() + + active_presence = self.store.take_presence_startup_info() + self.user_to_current_state = { + state.user_id: state + for state in active_presence + } + + self.process_id = random_string(16) + logger.info("Presence process_id is %r", self.process_id) + + def set_state(self, user, state): + # TODO Hows this supposed to work? + pass + + get_states = PresenceHandler.get_states.__func__ + current_state_for_users = PresenceHandler.current_state_for_users.__func__ + + @defer.inlineCallbacks + def user_syncing(self, user_id, affect_presence): + if affect_presence: + curr_sync = self.user_to_num_current_syncs.get(user_id, 0) + self.user_to_num_current_syncs[user_id] = curr_sync + 1 + # TODO: Send this less frequently. + # TODO: Make sure this doesn't race. Currently we can lose updates + # if two users come online in quick sucession and the second http + # to the master completes before the first. + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users() + + def _end(): + if affect_presence: + self.user_to_num_current_syncs[user_id] -= 1 + + @contextlib.contextmanager + def _user_syncing(): + try: + yield + finally: + _end() + + defer.returnValue(_user_syncing()) + + def _send_syncing_users(self): + return self.http_client.post_json_get_json(self.syncing_users_url, { + "process_id": self.process_id, + "syncing_users": [ + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count > 0 + ], + }) + + def process_replication(self, result): + stream = result.get("presence", {"rows": []}) + for row in stream["rows"]: + ( + position, user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) = row + self.user_to_current_state[user_id] = UserPresenceState( + user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) + + +class SynchrotronTyping(object): + def __init__(self, hs): + self._latest_room_serial = 0 + self._room_serials = {} + self._room_typing = {} + + def stream_positions(self): + return {"typing": self._latest_room_serial} + + def process_replication(self, result): + stream = result.get("typing") + if stream: + self._latest_room_serial = int(stream["position"]) + + for row in stream["rows"]: + position, room_id, typing_json = row + typing = json.loads(typing_json) + self._room_serials[room_id] = position + self._room_typing[room_id] = typing + + +class SynchrotronApplicationService(object): + def notify_interested_services(self, event): + pass + + +class SynchrotronServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + sync.register_servlets(self, resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse synchrotron now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + clock = self.get_clock() + notifier = self.get_notifier() + presence_handler = self.get_presence_handler() + typing_handler = self.get_typing_handler() + + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + def notify_from_stream( + result, stream_name, stream_key, room=None, user=None + ): + stream = result.get(stream_name) + if stream: + position_index = stream["field_names"].index("position") + if room: + room_index = stream["field_names"].index(room) + if user: + user_index = stream["field_names"].index(user) + + users = () + rooms = () + for row in stream["rows"]: + position = row[position_index] + + if user: + users = (row[user_index],) + + if room: + rooms = (row[room_index],) + + notifier.on_new_event( + stream_key, position, users=users, rooms=rooms + ) + + def notify(result): + stream = result.get("events") + if stream: + max_position = stream["position"] + for row in stream["rows"]: + position = row[0] + internal = json.loads(row[1]) + event_json = json.loads(row[2]) + event = FrozenEvent(event_json, internal_metadata_dict=internal) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + notifier.on_new_room_event( + event, position, max_position, extra_users + ) + + notify_from_stream( + result, "push_rules", "push_rules_key", user="user_id" + ) + notify_from_stream( + result, "user_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "room_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "tag_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "receipts", "receipt_key", room="room_id" + ) + notify_from_stream( + result, "typing", "typing_key", room="room_id" + ) + + next_expire_broken_caches_ms = 0 + while True: + try: + args = store.stream_positions() + args.update(typing_handler.stream_positions()) + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) + yield store.process_replication(result) + typing_handler.process_replication(result) + presence_handler.process_replication(result) + notify(result) + except: + logger.exception("Error replicating from %r", replication_url) + sleep(5) + + def build_presence_handler(self): + return SynchrotronPresence(self) + + def build_typing_handler(self): + return SynchrotronTyping(self) + + +def setup(config_options): + try: + config = SynchrotronConfig.load_config( + "Synapse synchrotron", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + if not config: + sys.exit(0) + + config.setup_logging() + + database_engine = create_engine(config.database_config) + + ss = SynchrotronServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + application_service_handler=SynchrotronApplicationService(), + ) + + ss.setup() + ss.start_listening() + + change_resource_limit(ss.config.soft_file_limit) + + def start(): + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + return ss + + +if __name__ == '__main__': + with LoggingContext("main"): + ps = setup(sys.argv[1:]) + + if ps.config.daemonize: + def run(): + with LoggingContext("run"): + change_resource_limit(ps.config.soft_file_limit) + reactor.run() + + daemon = Daemonize( + app="synapse-pusher", + pid=ps.config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + + daemon.start() + else: + reactor.run() From a7ff5a17702812ae586228396d534a8ed3d88475 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Jun 2016 13:40:55 +0100 Subject: [PATCH 05/37] Presence metrics. Change def of small delta --- synapse/handlers/presence.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fc8538b41..eb877763e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -50,6 +50,9 @@ timers_fired_counter = metrics.register_counter("timers_fired") federation_presence_counter = metrics.register_counter("federation_presence") bump_active_time_counter = metrics.register_counter("bump_active_time") +full_update_presence_counter = metrics.register_counter("full_update_presence") +partial_update_presence_counter = metrics.register_counter("partial_update_presence") + # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # "currently_active" @@ -974,13 +977,13 @@ class PresenceEventSource(object): user_ids_changed = set() changed = None - if from_key and max_token - from_key < 100: - # For small deltas, its quicker to get all changes and then - # work out if we share a room or they're in our presence list + if from_key: changed = stream_change_cache.get_all_entities_changed(from_key) - # get_all_entities_changed can return None - if changed is not None: + if changed is not None and len(changed) < 100: + # For small deltas, its quicker to get all changes and then + # work out if we share a room or they're in our presence list + partial_update_presence_counter.inc() for other_user_id in changed: if other_user_id in friends: user_ids_changed.add(other_user_id) @@ -992,6 +995,8 @@ class PresenceEventSource(object): else: # Too many possible updates. Find all users we can see and check # if any of them have changed. + full_update_presence_counter.inc() + user_ids_to_check = set() for room_id in room_ids: users = yield self.store.get_users_in_room(room_id) From 4ce84a1acd89a7f61896e92605e5463864848122 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Jun 2016 13:49:16 +0100 Subject: [PATCH 06/37] Change metric style --- synapse/handlers/presence.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index eb877763e..0e19f777b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -50,8 +50,7 @@ timers_fired_counter = metrics.register_counter("timers_fired") federation_presence_counter = metrics.register_counter("federation_presence") bump_active_time_counter = metrics.register_counter("bump_active_time") -full_update_presence_counter = metrics.register_counter("full_update_presence") -partial_update_presence_counter = metrics.register_counter("partial_update_presence") +get_updates_counter = metrics.register_counter("get_updates", labels=["type"]) # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them @@ -980,10 +979,10 @@ class PresenceEventSource(object): if from_key: changed = stream_change_cache.get_all_entities_changed(from_key) - if changed is not None and len(changed) < 100: + if changed is not None and len(changed) < 500: # For small deltas, its quicker to get all changes and then # work out if we share a room or they're in our presence list - partial_update_presence_counter.inc() + get_updates_counter.inc("stream") for other_user_id in changed: if other_user_id in friends: user_ids_changed.add(other_user_id) @@ -995,7 +994,7 @@ class PresenceEventSource(object): else: # Too many possible updates. Find all users we can see and check # if any of them have changed. - full_update_presence_counter.inc() + get_updates_counter.inc("full") user_ids_to_check = set() for room_id in room_ids: From 80aade380545a0b661e2bbef48e175900ed4d41f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:24:19 +0100 Subject: [PATCH 07/37] Send updates to the syncing users every ten seconds or immediately if they've just come online --- synapse/app/synchrotron.py | 53 +++++++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f592ad352..7b45c87a9 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -16,7 +16,7 @@ import synapse -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig @@ -41,7 +41,7 @@ from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string @@ -135,6 +135,8 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) +UPDATE_SYNCING_USERS_MS = 10 * 1000 + class SynchrotronPresence(object): def __init__(self, hs): @@ -153,6 +155,13 @@ class SynchrotronPresence(object): self.process_id = random_string(16) logger.info("Presence process_id is %r", self.process_id) + self._sending_sync = False + self._need_to_send_sync = False + self.clock.looping_call( + self._send_syncing_users_regularly, + UPDATE_SYNCING_USERS_MS, + ) + def set_state(self, user, state): # TODO Hows this supposed to work? pass @@ -165,12 +174,10 @@ class SynchrotronPresence(object): if affect_presence: curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 - # TODO: Send this less frequently. - # TODO: Make sure this doesn't race. Currently we can lose updates - # if two users come online in quick sucession and the second http - # to the master completes before the first. - # TODO: Don't block the sync request on this HTTP hit. - yield self._send_syncing_users() + prev_states = yield self.current_state_for_users([user_id]) + if prev_states[user_id].state == PresenceState.OFFLINE: + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users_now() def _end(): if affect_presence: @@ -185,8 +192,24 @@ class SynchrotronPresence(object): defer.returnValue(_user_syncing()) - def _send_syncing_users(self): - return self.http_client.post_json_get_json(self.syncing_users_url, { + def _send_syncing_users_regularly(self): + # Only send an update if we aren't in the middle of sending one. + if not self._sending_sync: + preserve_fn(self._send_syncing_users_now)() + + @defer.inlineCallbacks + def _send_syncing_users_now(self): + if self._sending_sync: + # We don't want to race with sending another update. + # Instead we wait for that update to finish and send another + # update afterwards. + self._need_to_send_sync = True + return + + # Flag that we are sending an update. + self._sending_sync = True + + yield self.http_client.post_json_get_json(self.syncing_users_url, { "process_id": self.process_id, "syncing_users": [ user_id for user_id, count in self.user_to_num_current_syncs.items() @@ -194,6 +217,16 @@ class SynchrotronPresence(object): ], }) + # Unset the flag as we are no longer sending an update. + self._sending_sync = False + if self._need_to_send_sync: + # If something happened while we were sending the update then + # we might need to send another update. + # TODO: Check if the update that was sent matches the current state + # as we only need to send an update if they are different. + self._need_to_send_sync = False + yield self._send_syncing_users_now() + def process_replication(self, result): stream = result.get("presence", {"rows": []}) for row in stream["rows"]: From eef541a2919649e6d756d45a29d47fe76cfe02e2 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:42:35 +0100 Subject: [PATCH 08/37] Move insert_client_ip to a separate class --- synapse/storage/__init__.py | 48 ++----------------------- synapse/storage/client_ips.py | 68 +++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 45 deletions(-) create mode 100644 synapse/storage/client_ips.py diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6928a213e..e93c3de66 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore ) -from ._base import Cache, LoggingTransaction +from ._base import LoggingTransaction from .directory import DirectoryStore from .events import EventsStore from .presence import PresenceStore, UserPresenceState @@ -45,6 +45,7 @@ from .search import SearchStore from .tags import TagsStore from .account_data import AccountDataStore from .openid import OpenIdStore +from .client_ips import ClientIpStore from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator @@ -58,12 +59,6 @@ import logging logger = logging.getLogger(__name__) -# Number of msec of granularity to store the user IP 'last seen' time. Smaller -# times give more inserts into the database even for readonly API hits -# 120 seconds == 2 minutes -LAST_SEEN_GRANULARITY = 120 * 1000 - - class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, PresenceStore, TransactionStore, @@ -84,6 +79,7 @@ class DataStore(RoomMemberStore, RoomStore, AccountDataStore, EventPushActionsStore, OpenIdStore, + ClientIpStore, ): def __init__(self, db_conn, hs): @@ -91,11 +87,6 @@ class DataStore(RoomMemberStore, RoomStore, self._clock = hs.get_clock() self.database_engine = hs.database_engine - self.client_ip_last_seen = Cache( - name="client_ip_last_seen", - keylen=4, - ) - self._stream_id_gen = StreamIdGenerator( db_conn, "events", "stream_ordering", extra_tables=[("local_invites", "stream_id")] @@ -216,39 +207,6 @@ class DataStore(RoomMemberStore, RoomStore, return [UserPresenceState(**row) for row in rows] - @defer.inlineCallbacks - def insert_client_ip(self, user, access_token, ip, user_agent): - now = int(self._clock.time_msec()) - key = (user.to_string(), access_token, ip) - - try: - last_seen = self.client_ip_last_seen.get(key) - except KeyError: - last_seen = None - - # Rate-limited inserts - if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: - defer.returnValue(None) - - self.client_ip_last_seen.prefill(key, now) - - # It's safe not to lock here: a) no unique constraint, - # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely - yield self._simple_upsert( - "user_ips", - keyvalues={ - "user_id": user.to_string(), - "access_token": access_token, - "ip": ip, - "user_agent": user_agent, - }, - values={ - "last_seen": now, - }, - desc="insert_client_ip", - lock=False, - ) - @defer.inlineCallbacks def count_daily_users(self): """ diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py new file mode 100644 index 000000000..a90990e00 --- /dev/null +++ b/synapse/storage/client_ips.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore, Cache + +from twisted.internet import defer + + +# Number of msec of granularity to store the user IP 'last seen' time. Smaller +# times give more inserts into the database even for readonly API hits +# 120 seconds == 2 minutes +LAST_SEEN_GRANULARITY = 120 * 1000 + + +class ClientIpStore(SQLBaseStore): + + def __init__(self, hs): + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", + keylen=4, + ) + + super(ClientIpStore, self).__init__(hs) + + @defer.inlineCallbacks + def insert_client_ip(self, user, access_token, ip, user_agent): + now = int(self._clock.time_msec()) + key = (user.to_string(), access_token, ip) + + try: + last_seen = self.client_ip_last_seen.get(key) + except KeyError: + last_seen = None + + # Rate-limited inserts + if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: + defer.returnValue(None) + + self.client_ip_last_seen.prefill(key, now) + + # It's safe not to lock here: a) no unique constraint, + # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely + yield self._simple_upsert( + "user_ips", + keyvalues={ + "user_id": user.to_string(), + "access_token": access_token, + "ip": ip, + "user_agent": user_agent, + }, + values={ + "last_seen": now, + }, + desc="insert_client_ip", + lock=False, + ) From 0b3c80a234cd8f16c8714af7e7b719dc2e635b20 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:55:01 +0100 Subject: [PATCH 09/37] Use ClientIpStore to record client ips --- synapse/app/synchrotron.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7b45c87a9..0446a1643 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -27,6 +27,7 @@ from synapse.http.site import SynapseSite from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.rest.client.v2_alpha import sync +from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore @@ -36,6 +37,7 @@ from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.server import HomeServer +from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore @@ -119,13 +121,12 @@ class SynchrotronSlavedStore( SlavedRegistrationStore, SlavedFilteringStore, SlavedPresenceStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStre because the constructor is different ): def get_presence_list_accepted(self, user_localpart): return () - def insert_client_ip(self, user, access_token, ip, user_agent): - pass - # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a # way to invalidate the forgotten rooms cache correctly. From da491e75b2d46c885f7fbb9240501c223e7c59bd Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 14:56:36 +0100 Subject: [PATCH 10/37] Appease flake8 --- synapse/app/synchrotron.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 0446a1643..af06ce70d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -122,7 +122,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStre because the constructor is different + ClientIpStore, # After BaseSlavedStre because the constructor is different ): def get_presence_list_accepted(self, user_localpart): return () From 48340e4f13a8090feac070ebb507e7629d03b530 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 15:02:27 +0100 Subject: [PATCH 11/37] Clear the list of ongoing syncs on shutdown --- synapse/app/synchrotron.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index af06ce70d..f4b416f77 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -163,6 +163,8 @@ class SynchrotronPresence(object): UPDATE_SYNCING_USERS_MS, ) + reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) + def set_state(self, user, state): # TODO Hows this supposed to work? pass @@ -193,6 +195,13 @@ class SynchrotronPresence(object): defer.returnValue(_user_syncing()) + @defer.inlineCallbacks + def _on_shutdown(self): + # When the synchrotron is shutdown tell the master to clear the in + # progress syncs for this process + self.user_to_num_current_syncs.clear() + yield self._send_syncing_users_now() + def _send_syncing_users_regularly(self): # Only send an update if we aren't in the middle of sending one. if not self._sending_sync: From 06d40c8b9841cd877e70e205d55a08f423ff2ec9 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 3 Jun 2016 16:31:23 +0100 Subject: [PATCH 12/37] Add substitutions to email notif From --- synapse/push/mailer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 88402e42a..933a53fc3 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -186,7 +186,7 @@ class Mailer(object): multipart_msg = MIMEMultipart('alternative') multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text) - multipart_msg['From'] = self.hs.config.email_notif_from + multipart_msg['From'] = self.hs.config.email_notif_from % (self.app_name, ) multipart_msg['To'] = email_address multipart_msg['Date'] = email.utils.formatdate() multipart_msg['Message-ID'] = email.utils.make_msgid() From fbf608decbf85051379dc24446b1b6e89ff97e8c Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 3 Jun 2016 16:38:39 +0100 Subject: [PATCH 13/37] Oops, we're using the dict form --- synapse/push/mailer.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 933a53fc3..011bc4d2b 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -186,7 +186,9 @@ class Mailer(object): multipart_msg = MIMEMultipart('alternative') multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text) - multipart_msg['From'] = self.hs.config.email_notif_from % (self.app_name, ) + multipart_msg['From'] = self.hs.config.email_notif_from % { + "app": self.app_name + } multipart_msg['To'] = email_address multipart_msg['Date'] = email.utils.formatdate() multipart_msg['Message-ID'] = email.utils.make_msgid() From 72c4d482e99d30fe96e2b24389629abe5b572626 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 3 Jun 2016 16:39:50 +0100 Subject: [PATCH 14/37] 3rd time lucky: we'd already calculated it above --- synapse/push/mailer.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 011bc4d2b..e5c3929cd 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -186,9 +186,7 @@ class Mailer(object): multipart_msg = MIMEMultipart('alternative') multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text) - multipart_msg['From'] = self.hs.config.email_notif_from % { - "app": self.app_name - } + multipart_msg['From'] = from_string multipart_msg['To'] = email_address multipart_msg['Date'] = email.utils.formatdate() multipart_msg['Message-ID'] = email.utils.make_msgid() From 05e01f21d7012c1853ff566c8a76aa66087bfbd7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Jun 2016 17:12:48 +0100 Subject: [PATCH 15/37] Remove event fetching from DB threads --- synapse/replication/slave/storage/events.py | 5 - synapse/storage/appservice.py | 21 ++- synapse/storage/events.py | 138 -------------------- synapse/storage/room.py | 46 ++++--- synapse/storage/search.py | 29 ++-- synapse/storage/stream.py | 34 +++-- tests/storage/test_appservice.py | 2 +- 7 files changed, 75 insertions(+), 200 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index cbc1ae419..877c68508 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -131,15 +131,10 @@ class SlavedEventStore(BaseSlavedStore): _get_events_from_cache = DataStore._get_events_from_cache.__func__ _invalidate_get_event_cache = DataStore._invalidate_get_event_cache.__func__ - _parse_events_txn = DataStore._parse_events_txn.__func__ - _get_events_txn = DataStore._get_events_txn.__func__ - _get_event_txn = DataStore._get_event_txn.__func__ _enqueue_events = DataStore._enqueue_events.__func__ _do_fetch = DataStore._do_fetch.__func__ - _fetch_events_txn = DataStore._fetch_events_txn.__func__ _fetch_event_rows = DataStore._fetch_event_rows.__func__ _get_event_from_row = DataStore._get_event_from_row.__func__ - _get_event_from_row_txn = DataStore._get_event_from_row_txn.__func__ _get_rooms_for_user_where_membership_is_txn = ( DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index feb9d228a..ffb7d4a25 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -298,6 +298,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): dict(txn_id=txn_id, as_id=service.id) ) + @defer.inlineCallbacks def get_oldest_unsent_txn(self, service): """Get the oldest transaction which has not been sent for this service. @@ -308,12 +309,23 @@ class ApplicationServiceTransactionStore(SQLBaseStore): A Deferred which resolves to an AppServiceTransaction or None. """ - return self.runInteraction( + entry = yield self.runInteraction( "get_oldest_unsent_appservice_txn", self._get_oldest_unsent_txn, service ) + if not entry: + defer.returnValue(None) + + event_ids = json.loads(entry["event_ids"]) + + events = yield self.get_events(event_ids) + + defer.returnValue(AppServiceTransaction( + service=service, id=entry["txn_id"], events=events + )) + def _get_oldest_unsent_txn(self, txn, service): # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) @@ -328,12 +340,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore): entry = rows[0] - event_ids = json.loads(entry["event_ids"]) - events = self._get_events_txn(txn, event_ids) - - return AppServiceTransaction( - service=service, id=entry["txn_id"], events=events - ) + return entry def _get_last_txn(self, txn, service_id): txn.execute( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2b3f79577..b710505a7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -762,41 +762,6 @@ class EventsStore(SQLBaseStore): if e_id in event_map and event_map[e_id] ]) - def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not event_ids: - return [] - - event_map = self._get_events_from_cache( - event_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - missing_events_ids = [e for e in event_ids if e not in event_map] - - if not missing_events_ids: - return [ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ] - - missing_events = self._fetch_events_txn( - txn, - missing_events_ids, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - event_map.update(missing_events) - - return [ - event_map[e_id] for e_id in event_ids - if e_id in event_map and event_map[e_id] - ] - def _invalidate_get_event_cache(self, event_id): for check_redacted in (False, True): for get_prev_content in (False, True): @@ -804,18 +769,6 @@ class EventsStore(SQLBaseStore): (event_id, check_redacted, get_prev_content) ) - def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False): - - events = self._get_events_txn( - txn, [event_id], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - return events[0] if events else None - def _get_events_from_cache(self, events, check_redacted, get_prev_content, allow_rejected): event_map = {} @@ -981,34 +934,6 @@ class EventsStore(SQLBaseStore): return rows - def _fetch_events_txn(self, txn, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): - if not events: - return {} - - rows = self._fetch_event_rows( - txn, events, - ) - - if not allow_rejected: - rows[:] = [r for r in rows if not r["rejects"]] - - res = [ - self._get_event_from_row_txn( - txn, - row["internal_metadata"], row["json"], row["redacts"], - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=row["rejects"], - ) - for row in rows - ] - - return { - r.event_id: r - for r in res - } - @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, check_redacted=True, get_prev_content=False, @@ -1070,69 +995,6 @@ class EventsStore(SQLBaseStore): defer.returnValue(ev) - def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - d = json.loads(js) - internal_metadata = json.loads(internal_metadata) - - if rejected_reason: - rejected_reason = self._simple_select_one_onecol_txn( - txn, - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - ) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - - if check_redacted and redacted: - ev = prune_event(ev) - - redaction_id = self._simple_select_one_onecol_txn( - txn, - table="redactions", - keyvalues={"redacts": ev.event_id}, - retcol="event_id", - ) - - ev.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - - because = self._get_event_txn( - txn, - redaction_id, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = self._get_event_txn( - txn, - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.content - ev.unsigned["prev_sender"] = prev.sender - - self._get_event_cache.prefill( - (ev.event_id, check_redacted, get_prev_content), ev - ) - - return ev - - def _parse_events_txn(self, txn, rows): - event_ids = [r["event_id"] for r in rows] - - return self._get_events_txn(txn, event_ids) - @defer.inlineCallbacks def count_daily_messages(self): """ diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 26933e593..97f9f1929 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -194,32 +194,44 @@ class RoomStore(SQLBaseStore): @cachedInlineCallbacks() def get_room_name_and_aliases(self, room_id): - def f(txn): + def get_room_name(txn): sql = ( - "SELECT event_id FROM current_state_events " - "WHERE room_id = ? " + "SELECT name FROM room_names" + " INNER JOIN current_state_events USING (room_id, event_id)" + " WHERE room_id = ?" + " LIMIT 1" ) - sql += " AND ((type = 'm.room.name' AND state_key = '')" - sql += " OR type = 'm.room.aliases')" - txn.execute(sql, (room_id,)) - results = self.cursor_to_dict(txn) + rows = txn.fetchall() + if rows: + return rows[0][0] + else: + return None - return self._parse_events_txn(txn, results) + return [row[0] for row in txn.fetchall()] - events = yield self.runInteraction("get_room_name_and_aliases", f) + def get_room_aliases(txn): + sql = ( + "SELECT content FROM current_state_events" + " INNER JOIN events USING (room_id, event_id)" + " WHERE room_id = ?" + ) + txn.execute(sql, (room_id,)) + return [row[0] for row in txn.fetchall()] + + name = yield self.runInteraction("get_room_name", get_room_name) + alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases) - name = None aliases = [] - for e in events: - if e.type == 'm.room.name': - if 'name' in e.content: - name = e.content['name'] - elif e.type == 'm.room.aliases': - if 'aliases' in e.content: - aliases.extend(e.content['aliases']) + for c in alias_contents: + try: + content = json.loads(c) + except: + continue + + aliases.extend(content.get('aliases', [])) defer.returnValue((name, aliases)) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 022429962..12941d177 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -21,6 +21,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging import re +import ujson as json logger = logging.getLogger(__name__) @@ -52,7 +53,7 @@ class SearchStore(BackgroundUpdateStore): def reindex_search_txn(txn): sql = ( - "SELECT stream_ordering, event_id FROM events" + "SELECT stream_ordering, event_id, room_id, type, content FROM events" " WHERE ? <= stream_ordering AND stream_ordering < ?" " AND (%s)" " ORDER BY stream_ordering DESC" @@ -61,28 +62,30 @@ class SearchStore(BackgroundUpdateStore): txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) - rows = txn.fetchall() + rows = self.cursor_to_dict(txn) if not rows: return 0 - min_stream_id = rows[-1][0] - event_ids = [row[1] for row in rows] - - events = self._get_events_txn(txn, event_ids) + min_stream_id = rows[-1]["stream_ordering"] event_search_rows = [] - for event in events: + for row in rows: try: - event_id = event.event_id - room_id = event.room_id - content = event.content - if event.type == "m.room.message": + event_id = row["event_id"] + room_id = row["room_id"] + etype = row["type"] + try: + content = json.loads(row["content"]) + except: + continue + + if etype == "m.room.message": key = "content.body" value = content["body"] - elif event.type == "m.room.topic": + elif etype == "m.room.topic": key = "content.topic" value = content["topic"] - elif event.type == "m.room.name": + elif etype == "m.room.name": key = "content.name" value = content["name"] except (KeyError, AttributeError): diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 95b12559a..b9ad965fd 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -132,29 +132,25 @@ class StreamStore(SQLBaseStore): return True return False - ret = self._get_events_txn( - txn, - # apply the filter on the room id list - [ - r["event_id"] for r in rows - if app_service_interested(r) - ], - get_prev_content=True - ) + return [r for r in rows if app_service_interested(r)] - self._set_before_and_after(ret, rows) + rows = yield self.runInteraction("get_appservice_room_stream", f) - if rows: - key = "s%d" % max(r["stream_ordering"] for r in rows) - else: - # Assume we didn't get anything because there was nothing to - # get. - key = to_key + ret = yield self._get_events( + [r["event_id"] for r in rows], + get_prev_content=True + ) - return ret, key + self._set_before_and_after(ret, rows, topo_order=from_id is None) - results = yield self.runInteraction("get_appservice_room_stream", f) - defer.returnValue(results) + if rows: + key = "s%d" % max(r["stream_ordering"] for r in rows) + else: + # Assume we didn't get anything because there was nothing to + # get. + key = to_key + + defer.returnValue((ret, key)) @defer.inlineCallbacks def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0, diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 573419812..f44c4870e 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -357,7 +357,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store._get_events_txn = Mock(return_value=events) + self.store.get_events = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(service.id, 10, events) From 10ea3f46ba3eda2f7c220a5e5902b687feb3042c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Jun 2016 17:55:32 +0100 Subject: [PATCH 16/37] Change the way we cache events --- synapse/storage/events.py | 80 ++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b710505a7..779743b8f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -139,6 +139,9 @@ class _EventPeristenceQueue(object): pass +_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) + + class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" @@ -741,7 +744,6 @@ class EventsStore(SQLBaseStore): event_map = self._get_events_from_cache( event_ids, check_redacted=check_redacted, - get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) @@ -751,40 +753,49 @@ class EventsStore(SQLBaseStore): missing_events = yield self._enqueue_events( missing_events_ids, check_redacted=check_redacted, - get_prev_content=get_prev_content, allow_rejected=allow_rejected, ) event_map.update(missing_events) - defer.returnValue([ + events = [ event_map[e_id] for e_id in event_id_list if e_id in event_map and event_map[e_id] - ]) + ] + + if get_prev_content: + for event in events: + if "replaces_state" in event.unsigned: + prev = yield self.get_event( + event.unsigned["replaces_state"], + get_prev_content=False, + allow_none=True, + ) + if prev: + event.unsigned = dict(event.unsigned) + event.unsigned["prev_content"] = prev.content + event.unsigned["prev_sender"] = prev.sender + + defer.returnValue(events) def _invalidate_get_event_cache(self, event_id): - for check_redacted in (False, True): - for get_prev_content in (False, True): - self._get_event_cache.invalidate( - (event_id, check_redacted, get_prev_content) - ) + self._get_event_cache.invalidate((event_id,)) - def _get_events_from_cache(self, events, check_redacted, get_prev_content, - allow_rejected): + def _get_events_from_cache(self, events, check_redacted, allow_rejected): event_map = {} for event_id in events: - try: - ret = self._get_event_cache.get( - (event_id, check_redacted, get_prev_content,) - ) + ret = self._get_event_cache.get((event_id,), None) + if not ret: + continue - if allow_rejected or not ret.rejected_reason: - event_map[event_id] = ret + if allow_rejected or not ret.event.rejected_reason: + if check_redacted and ret.redacted_event: + event_map[event_id] = ret.redacted_event else: - event_map[event_id] = None - except KeyError: - pass + event_map[event_id] = ret.event + else: + event_map[event_id] = None return event_map @@ -855,8 +866,7 @@ class EventsStore(SQLBaseStore): reactor.callFromThread(fire, event_list) @defer.inlineCallbacks - def _enqueue_events(self, events, check_redacted=True, - get_prev_content=False, allow_rejected=False): + def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): """Fetches events from the database using the _event_fetch_list. This allows batch and bulk fetching of events - it allows us to fetch events without having to create a new transaction for each request for events. @@ -895,7 +905,6 @@ class EventsStore(SQLBaseStore): preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], check_redacted=check_redacted, - get_prev_content=get_prev_content, rejected_reason=row["rejects"], ) for row in rows @@ -936,8 +945,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): + check_redacted=True, rejected_reason=None): d = json.loads(js) internal_metadata = json.loads(internal_metadata) @@ -949,14 +957,17 @@ class EventsStore(SQLBaseStore): desc="_get_event_from_row", ) - ev = FrozenEvent( + original_ev = FrozenEvent( d, internal_metadata_dict=internal_metadata, rejected_reason=rejected_reason, ) + ev = original_ev + redacted_event = None if check_redacted and redacted: ev = prune_event(ev) + redacted_event = ev redaction_id = yield self._simple_select_one_onecol( table="redactions", @@ -979,19 +990,10 @@ class EventsStore(SQLBaseStore): # will serialise this field correctly ev.unsigned["redacted_because"] = because - if get_prev_content and "replaces_state" in ev.unsigned: - prev = yield self.get_event( - ev.unsigned["replaces_state"], - get_prev_content=False, - allow_none=True, - ) - if prev: - ev.unsigned["prev_content"] = prev.content - ev.unsigned["prev_sender"] = prev.sender - - self._get_event_cache.prefill( - (ev.event_id, check_redacted, get_prev_content), ev - ) + self._get_event_cache.prefill((ev.event_id,), _EventCacheEntry( + event=original_ev, + redacted_event=redacted_event, + )) defer.returnValue(ev) From 8f79084bd44f76223048c1bd6d836f904edcc95e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 18:03:40 +0100 Subject: [PATCH 17/37] Add get_presence_list_accepted to the broken caches in synchrotron --- synapse/app/synchrotron.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f4b416f77..c77854fab 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -39,7 +39,7 @@ from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine -from synapse.storage.presence import UserPresenceState +from synapse.storage.presence import PresenceStore, UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree @@ -124,9 +124,6 @@ class SynchrotronSlavedStore( BaseSlavedStore, ClientIpStore, # After BaseSlavedStre because the constructor is different ): - def get_presence_list_accepted(self, user_localpart): - return () - # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a # way to invalidate the forgotten rooms cache correctly. @@ -136,6 +133,13 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["who_forgot_in_room"] ) + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + UPDATE_SYNCING_USERS_MS = 10 * 1000 @@ -357,6 +361,7 @@ class SynchrotronServer(HomeServer): def expire_broken_caches(): store.who_forgot_in_room.invalidate_all() + store.get_presence_list_accepted.invalidate_all() def notify_from_stream( result, stream_name, stream_key, room=None, user=None From ac9716f1546ae486cac435b8a577cc2c54b666d6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 3 Jun 2016 18:10:00 +0100 Subject: [PATCH 18/37] Fix spelling --- synapse/app/synchrotron.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index c77854fab..aa81e1c5d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -122,7 +122,7 @@ class SynchrotronSlavedStore( SlavedFilteringStore, SlavedPresenceStore, BaseSlavedStore, - ClientIpStore, # After BaseSlavedStre because the constructor is different + ClientIpStore, # After BaseSlavedStore because the constructor is different ): # XXX: This is a bit broken because we don't persist forgotten rooms # in a way that they can be streamed. This means that we don't have a From cffe46408f40db082df76adc263cf5014031ae54 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Jun 2016 18:25:21 +0100 Subject: [PATCH 19/37] Don't rely on options when inserting event into cache --- synapse/storage/events.py | 83 ++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 779743b8f..5db24e86f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -741,13 +741,12 @@ class EventsStore(SQLBaseStore): event_id_list = event_ids event_ids = set(event_ids) - event_map = self._get_events_from_cache( + event_entry_map = self._get_events_from_cache( event_ids, - check_redacted=check_redacted, allow_rejected=allow_rejected, ) - missing_events_ids = [e for e in event_ids if e not in event_map] + missing_events_ids = [e for e in event_ids if e not in event_entry_map] if missing_events_ids: missing_events = yield self._enqueue_events( @@ -756,32 +755,40 @@ class EventsStore(SQLBaseStore): allow_rejected=allow_rejected, ) - event_map.update(missing_events) + event_entry_map.update(missing_events) - events = [ - event_map[e_id] for e_id in event_id_list - if e_id in event_map and event_map[e_id] - ] + events = [] + for event_id in event_id_list: + entry = event_entry_map.get(event_id, None) + if not entry: + continue - if get_prev_content: - for event in events: - if "replaces_state" in event.unsigned: - prev = yield self.get_event( - event.unsigned["replaces_state"], - get_prev_content=False, - allow_none=True, - ) - if prev: - event.unsigned = dict(event.unsigned) - event.unsigned["prev_content"] = prev.content - event.unsigned["prev_sender"] = prev.sender + if allow_rejected or not entry.event.rejected_reason: + if check_redacted and entry.redacted_event: + event = entry.redacted_event + else: + event = entry.event + + events.append(event) + + if get_prev_content: + if "replaces_state" in event.unsigned: + prev = yield self.get_event( + event.unsigned["replaces_state"], + get_prev_content=False, + allow_none=True, + ) + if prev: + event.unsigned = dict(event.unsigned) + event.unsigned["prev_content"] = prev.content + event.unsigned["prev_sender"] = prev.sender defer.returnValue(events) def _invalidate_get_event_cache(self, event_id): self._get_event_cache.invalidate((event_id,)) - def _get_events_from_cache(self, events, check_redacted, allow_rejected): + def _get_events_from_cache(self, events, allow_rejected): event_map = {} for event_id in events: @@ -790,10 +797,7 @@ class EventsStore(SQLBaseStore): continue if allow_rejected or not ret.event.rejected_reason: - if check_redacted and ret.redacted_event: - event_map[event_id] = ret.redacted_event - else: - event_map[event_id] = ret.event + event_map[event_id] = ret else: event_map[event_id] = None @@ -904,7 +908,6 @@ class EventsStore(SQLBaseStore): [ preserve_fn(self._get_event_from_row)( row["internal_metadata"], row["json"], row["redacts"], - check_redacted=check_redacted, rejected_reason=row["rejects"], ) for row in rows @@ -913,7 +916,7 @@ class EventsStore(SQLBaseStore): ) defer.returnValue({ - e.event_id: e + e.event.event_id: e for e in res if e }) @@ -945,7 +948,7 @@ class EventsStore(SQLBaseStore): @defer.inlineCallbacks def _get_event_from_row(self, internal_metadata, js, redacted, - check_redacted=True, rejected_reason=None): + rejected_reason=None): d = json.loads(js) internal_metadata = json.loads(internal_metadata) @@ -954,7 +957,7 @@ class EventsStore(SQLBaseStore): table="rejections", keyvalues={"event_id": rejected_reason}, retcol="reason", - desc="_get_event_from_row", + desc="_get_event_from_row_rejected_reason", ) original_ev = FrozenEvent( @@ -963,20 +966,18 @@ class EventsStore(SQLBaseStore): rejected_reason=rejected_reason, ) - ev = original_ev redacted_event = None - if check_redacted and redacted: - ev = prune_event(ev) - redacted_event = ev + if redacted: + redacted_event = prune_event(original_ev) redaction_id = yield self._simple_select_one_onecol( table="redactions", - keyvalues={"redacts": ev.event_id}, + keyvalues={"redacts": redacted_event.event_id}, retcol="event_id", - desc="_get_event_from_row", + desc="_get_event_from_row_redactions", ) - ev.unsigned["redacted_by"] = redaction_id + redacted_event.unsigned["redacted_by"] = redaction_id # Get the redaction event. because = yield self.get_event( @@ -988,14 +989,16 @@ class EventsStore(SQLBaseStore): if because: # It's fine to do add the event directly, since get_pdu_json # will serialise this field correctly - ev.unsigned["redacted_because"] = because + redacted_event.unsigned["redacted_because"] = because - self._get_event_cache.prefill((ev.event_id,), _EventCacheEntry( + cache_entry = _EventCacheEntry( event=original_ev, redacted_event=redacted_event, - )) + ) - defer.returnValue(ev) + self._get_event_cache.prefill((original_ev.event_id,), cache_entry) + + defer.returnValue(cache_entry) @defer.inlineCallbacks def count_daily_messages(self): From 70aee0717c22acf7eabb5f158cbaf527137bc90e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2016 11:08:12 +0100 Subject: [PATCH 20/37] Add events to cache when we persist them --- synapse/storage/events.py | 41 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5db24e86f..16398dc0a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -635,6 +635,8 @@ class EventsStore(SQLBaseStore): ], ) + self._add_to_cache(txn, events_and_contexts) + if backfilled: # Backfilled events come before the current state so we don't need # to update the current state table @@ -676,6 +678,45 @@ class EventsStore(SQLBaseStore): return + def _add_to_cache(self, txn, events_and_contexts): + to_prefill = [] + + rows = [] + N = 200 + for i in range(0, len(events_and_contexts), N): + ev_map = { + e[0].event_id: e[0] + for e in events_and_contexts[i:i + N] + } + if not ev_map: + break + + sql = ( + "SELECT " + " e.event_id as event_id, " + " r.redacts as redacts," + " rej.event_id as rejects " + " FROM events as e" + " LEFT JOIN rejections as rej USING (event_id)" + " LEFT JOIN redactions as r ON e.event_id = r.redacts" + " WHERE e.event_id IN (%s)" + ) % (",".join(["?"] * len(ev_map)),) + + txn.execute(sql, ev_map.keys()) + rows = self.cursor_to_dict(txn) + for row in rows: + event = ev_map[row["event_id"]] + if not row["rejects"] and not row["redacts"]: + to_prefill.append(_EventCacheEntry( + event=event, + redacted_event=None, + )) + + def prefill(): + for cache_entry in to_prefill: + self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry) + txn.call_after(prefill) + def _store_redaction(self, txn, event): # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) From 7aa778fba9bb81087c3a1029e0a0d4ff55b1a065 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2016 11:58:09 +0100 Subject: [PATCH 21/37] Add metric counter for number of persisted events --- synapse/storage/events.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5db24e86f..ff4f742f6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes from canonicaljson import encode_canonical_json from collections import deque, namedtuple +import synapse +import synapse.metrics + import logging import math @@ -35,6 +38,10 @@ import ujson as json logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) +persist_event_counter = metrics.register_counter("persisted_events") + + def encode_json(json_object): if USE_FROZEN_DICTS: # ujson doesn't like frozen_dicts @@ -261,6 +268,7 @@ class EventsStore(SQLBaseStore): events_and_contexts=chunk, backfilled=backfilled, ) + persist_event_counter.inc_by(len(chunk)) @defer.inlineCallbacks @log_function @@ -278,6 +286,7 @@ class EventsStore(SQLBaseStore): current_state=current_state, backfilled=backfilled, ) + persist_event_counter.inc() except _RollbackButIsFineException: pass From 377eb480ca66a376e85cf8927f7f9112ed60e8bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2016 15:14:21 +0100 Subject: [PATCH 22/37] Fire after 30s not 8h --- synapse/handlers/presence.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0e19f777b..2e772da66 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -183,7 +183,7 @@ class PresenceHandler(object): # The initial delay is to allow disconnected clients a chance to # reconnect before we treat them as offline. self.clock.call_later( - 30 * 1000, + 30, self.clock.looping_call, self._handle_timeouts, 5000, From 96dc600579cd6ef9937b0e007f51aa4da0fc122d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2016 15:44:41 +0100 Subject: [PATCH 23/37] Fix typos --- synapse/handlers/presence.py | 70 +++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2e772da66..94160a5be 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -283,44 +283,48 @@ class PresenceHandler(object): """Checks the presence of users that have timed out and updates as appropriate. """ + logger.info("Handling presence timeouts") now = self.clock.time_msec() - with Measure(self.clock, "presence_handle_timeouts"): - # Fetch the list of users that *may* have timed out. Things may have - # changed since the timeout was set, so we won't necessarily have to - # take any action. - users_to_check = set(self.wheel_timer.fetch(now)) + try: + with Measure(self.clock, "presence_handle_timeouts"): + # Fetch the list of users that *may* have timed out. Things may have + # changed since the timeout was set, so we won't necessarily have to + # take any action. + users_to_check = set(self.wheel_timer.fetch(now)) - # Check whether the lists of syncing processes from an external - # process have expired. - expired_process_ids = [ - process_id for process_id, last_update - in self.external_process_last_update.items() - if now - last_update > EXTERNAL_PROCESS_EXPIRY - ] - for process_id in expired_process_ids: - users_to_check.update( - self.external_process_to_current_syncs.pop(process_id, ()) + # Check whether the lists of syncing processes from an external + # process have expired. + expired_process_ids = [ + process_id for process_id, last_update + in self.external_process_last_updated_ms.items() + if now - last_update > EXTERNAL_PROCESS_EXPIRY + ] + for process_id in expired_process_ids: + users_to_check.update( + self.external_process_last_updated_ms.pop(process_id, ()) + ) + self.external_process_last_update.pop(process_id) + + states = [ + self.user_to_current_state.get( + user_id, UserPresenceState.default(user_id) + ) + for user_id in users_to_check + ] + + timers_fired_counter.inc_by(len(states)) + + changes = handle_timeouts( + states, + is_mine_fn=self.is_mine_id, + syncing_user_ids=self.get_currently_syncing_users(), + now=now, ) - self.external_process_last_update.pop(process_id) - states = [ - self.user_to_current_state.get( - user_id, UserPresenceState.default(user_id) - ) - for user_id in users_to_check - ] - - timers_fired_counter.inc_by(len(states)) - - changes = handle_timeouts( - states, - is_mine_fn=self.is_mine_id, - syncing_users=self.get_syncing_users(), - now=now, - ) - - preserve_fn(self._update_states)(changes) + preserve_fn(self._update_states)(changes) + except: + logger.exception("Exception in _handle_timeouts loop") @defer.inlineCallbacks def bump_presence_active_time(self, user): From 216a05b3e39e08b0600a39fc111b4d669d06ff7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Jun 2016 16:00:09 +0100 Subject: [PATCH 24/37] .values() returns list of sets --- synapse/handlers/presence.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 94160a5be..6b70fa381 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -406,7 +406,8 @@ class PresenceHandler(object): user_id for user_id, count in self.user_to_num_current_syncs.items() if count } - syncing_user_ids.update(self.external_process_to_current_syncs.values()) + for user_ids in self.external_process_to_current_syncs.values(): + syncing_user_ids.update(user_ids) return syncing_user_ids @defer.inlineCallbacks From 5ef84da4f11f1b1cceb0c44d9867bb597ee68e64 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Jun 2016 16:05:28 +0100 Subject: [PATCH 25/37] Yield on the sleeps intended to backoff replication --- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f1de1e7ce..3c3fa3805 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -311,7 +311,7 @@ class PusherServer(HomeServer): poke_pushers(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(30) + yield sleep(30) def setup(config_options): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5d..7273055cc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -443,7 +443,7 @@ class SynchrotronServer(HomeServer): notify(result) except: logger.exception("Error replicating from %r", replication_url) - sleep(5) + yield sleep(5) def build_presence_handler(self): return SynchrotronPresence(self) From 4a5bbb1941ae63f1d6632aa35e80274e56c8dbb9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 6 Jun 2016 16:37:12 +0100 Subject: [PATCH 26/37] Fix a KeyError in the synchrotron presence --- synapse/app/synchrotron.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index aa81e1c5d..3d0d5cc15 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -187,7 +187,10 @@ class SynchrotronPresence(object): yield self._send_syncing_users_now() def _end(): - if affect_presence: + # We check that the user_id is in user_to_num_current_syncs because + # user_to_num_current_syncs may have been cleared if we are + # shutting down. + if affect_presence and user_id in self.user_to_num_current_syncs: self.user_to_num_current_syncs[user_id] -= 1 @contextlib.contextmanager From 310197bab5cf8ed2c26fae522f15f092dbcdff58 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 09:34:50 +0100 Subject: [PATCH 27/37] Fix AS retries --- synapse/storage/appservice.py | 4 ++-- tests/storage/test_appservice.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index ffb7d4a25..a28157163 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -320,10 +320,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore): event_ids = json.loads(entry["event_ids"]) - events = yield self.get_events(event_ids) + event_map = yield self.get_events(event_ids) defer.returnValue(AppServiceTransaction( - service=service, id=entry["txn_id"], events=events + service=service, id=entry["txn_id"], events=event_map.values() )) def _get_oldest_unsent_txn(self, txn, service): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index f44c4870e..6db4b966d 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -353,21 +353,21 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_oldest_unsent_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] + events = {"e1": Mock(event_id="e1"), "e2": Mock(event_id="e2")} other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out self.store.get_events = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) - yield self._insert_txn(service.id, 10, events) + yield self._insert_txn(service.id, 10, events.values()) yield self._insert_txn(service.id, 11, other_events) yield self._insert_txn(service.id, 12, other_events) txn = yield self.store.get_oldest_unsent_txn(service) self.assertEquals(service, txn.service) self.assertEquals(10, txn.id) - self.assertEquals(events, txn.events) + self.assertEquals(events.values(), txn.events) @defer.inlineCallbacks def test_get_appservices_by_state_single(self): From 84379062f9ec259abc302af321d4ed8f5a958c01 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 10:24:50 +0100 Subject: [PATCH 28/37] Fix AS retries, but with correct ordering --- synapse/storage/appservice.py | 4 ++-- tests/storage/test_appservice.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index a28157163..d1ee533fa 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -320,10 +320,10 @@ class ApplicationServiceTransactionStore(SQLBaseStore): event_ids = json.loads(entry["event_ids"]) - event_map = yield self.get_events(event_ids) + events = yield self._get_events(event_ids) defer.returnValue(AppServiceTransaction( - service=service, id=entry["txn_id"], events=event_map.values() + service=service, id=entry["txn_id"], events=events )) def _get_oldest_unsent_txn(self, txn, service): diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 6db4b966d..3e2862daa 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -353,21 +353,21 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_oldest_unsent_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = {"e1": Mock(event_id="e1"), "e2": Mock(event_id="e2")} + events = [Mock(event_id="e1"), Mock(event_id="e2")] other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store.get_events = Mock(return_value=events) + self.store._get_events = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) - yield self._insert_txn(service.id, 10, events.values()) + yield self._insert_txn(service.id, 10, events) yield self._insert_txn(service.id, 11, other_events) yield self._insert_txn(service.id, 12, other_events) txn = yield self.store.get_oldest_unsent_txn(service) self.assertEquals(service, txn.service) self.assertEquals(10, txn.id) - self.assertEquals(events.values(), txn.events) + self.assertEquals(events, txn.events) @defer.inlineCallbacks def test_get_appservices_by_state_single(self): From 88625db05f274ad855fb51b33c84c09c947a6bd0 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 7 Jun 2016 11:33:36 +0100 Subject: [PATCH 29/37] Notify users for events in rooms they join. Change how the notifier updates the map from room_id to user streams on receiving a join event. Make it update the map when it notifies for the join event, rather than using the "user_joined_room" distributor signal --- synapse/notifier.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index cbec4d30a..30883a069 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -14,7 +14,7 @@ # limitations under the License. from twisted.internet import defer -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.util.logutils import log_function @@ -152,10 +152,6 @@ class Notifier(object): self.appservice_handler = hs.get_application_service_handler() self.state_handler = hs.get_state_handler() - hs.get_distributor().observe( - "user_joined_room", self._user_joined_room - ) - self.clock.looping_call( self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS ) @@ -248,6 +244,9 @@ class Notifier(object): ) app_streams |= app_user_streams + if event.type == EventTypes.Member and event.membership == Membership.JOIN: + self._user_joined_room(event.state_key, event.room_id) + self.on_new_event( "room_key", room_stream_id, users=extra_users, @@ -483,9 +482,8 @@ class Notifier(object): user_stream.appservice, set() ).add(user_stream) - def _user_joined_room(self, user, room_id): - user = str(user) - new_user_stream = self.user_to_user_stream.get(user) + def _user_joined_room(self, user_id, room_id): + new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams.add(new_user_stream) From 75331c5fca6d2207094b8cbf0b3bb34cc52a4ec4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 13:33:13 +0100 Subject: [PATCH 30/37] Change the way we do stats --- synapse/metrics/__init__.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index f317034b8..ef14bcd84 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -153,11 +153,7 @@ reactor_metrics = get_metrics_for("reactor") tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") -gc_time = ( - reactor_metrics.register_distribution("gc_time_gen0"), - reactor_metrics.register_distribution("gc_time_gen2"), - reactor_metrics.register_distribution("gc_time_gen2"), -) +gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"]) def runUntilCurrentTimer(func): @@ -190,7 +186,7 @@ def runUntilCurrentTimer(func): # one if necessary. threshold = gc.get_threshold() counts = gc.get_count() - for i in [2, 1, 0]: + for i in (0, 1, 2): if threshold[i] < counts[i]: logger.info("Collecting gc %d", i) @@ -198,7 +194,7 @@ def runUntilCurrentTimer(func): gc.collect(i) end = time.time() * 1000 - gc_time[i].inc_by(end - start) + gc_time.inc_by(end - start, i) return ret From 48e65099b52383743a47844b6369e173b9a96f90 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 13:40:22 +0100 Subject: [PATCH 31/37] Also record number of unreachable objects --- synapse/metrics/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index ef14bcd84..b29cec3de 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -154,6 +154,7 @@ tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"]) +gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"]) def runUntilCurrentTimer(func): @@ -186,15 +187,16 @@ def runUntilCurrentTimer(func): # one if necessary. threshold = gc.get_threshold() counts = gc.get_count() - for i in (0, 1, 2): + for i in (2, 1, 0): if threshold[i] < counts[i]: logger.info("Collecting gc %d", i) start = time.time() * 1000 - gc.collect(i) + unreachable = gc.collect(i) end = time.time() * 1000 gc_time.inc_by(end - start, i) + gc_unreachable.inc_by(unreachable, i) return ret From 0b2158719c43eab87ab7a9448ae1d85008b92b92 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 7 Jun 2016 15:07:11 +0100 Subject: [PATCH 32/37] Remove dead code. Loading push rules now happens in the datastore, so we can remove the methods that loaded them outside the datastore. The ``waiting_for_join_list`` in federation handler is populated by anything, so can be removed. The ``_get_members_events_txn`` method isn't called from anywhere so can be removed. --- synapse/handlers/federation.py | 13 ------------ synapse/push/bulk_push_rule_evaluator.py | 8 -------- synapse/push/clientformat.py | 26 ------------------------ synapse/storage/roommember.py | 7 ------- 4 files changed, 54 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 648a505e6..ff83c608e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -66,10 +66,6 @@ class FederationHandler(BaseHandler): self.hs = hs - self.distributor.observe("user_joined_room", self.user_joined_room) - - self.waiting_for_join_list = {} - self.store = hs.get_datastore() self.replication_layer = hs.get_replication_layer() self.state_handler = hs.get_state_handler() @@ -1091,15 +1087,6 @@ class FederationHandler(BaseHandler): def get_min_depth_for_context(self, context): return self.store.get_min_depth(context) - @log_function - def user_joined_room(self, user, room_id): - waiters = self.waiting_for_join_list.get( - (user.to_string(), room_id), - [] - ) - while waiters: - waiters.pop().callback(None) - @defer.inlineCallbacks @log_function def _handle_new_event(self, origin, event, state=None, auth_events=None, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 6e42121b1..756e5da51 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -14,7 +14,6 @@ # limitations under the License. import logging -import ujson as json from twisted.internet import defer @@ -27,13 +26,6 @@ from synapse.visibility import filter_events_for_clients logger = logging.getLogger(__name__) -def decode_rule_json(rule): - rule = dict(rule) - rule['conditions'] = json.loads(rule['conditions']) - rule['actions'] = json.loads(rule['actions']) - return rule - - @defer.inlineCallbacks def _get_rules(room_id, user_ids, store): rules_by_user = yield store.bulk_get_push_rules(user_ids) diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py index b3983f794..e0331b2d2 100644 --- a/synapse/push/clientformat.py +++ b/synapse/push/clientformat.py @@ -13,37 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.push.baserules import list_with_base_rules - from synapse.push.rulekinds import ( PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP ) import copy -import simplejson as json - - -def load_rules_for_user(user, rawrules, enabled_map): - ruleslist = [] - for rawrule in rawrules: - rule = dict(rawrule) - rule["conditions"] = json.loads(rawrule["conditions"]) - rule["actions"] = json.loads(rawrule["actions"]) - ruleslist.append(rule) - - # We're going to be mutating this a lot, so do a deep copy - rules = list(list_with_base_rules(ruleslist)) - - for i, rule in enumerate(rules): - rule_id = rule['rule_id'] - if rule_id in enabled_map: - if rule.get('enabled', True) != bool(enabled_map[rule_id]): - # Rules are cached across users. - rule = dict(rule) - rule['enabled'] = bool(enabled_map[rule_id]) - rules[i] = rule - - return rules def format_push_rules_for_user(user, ruleslist): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 64b4bd371..8bd693be7 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -243,13 +243,6 @@ class RoomMemberStore(SQLBaseStore): user_ids = yield self.get_users_in_room(room_id) defer.returnValue(set(get_domain_from_id(uid) for uid in user_ids)) - def _get_members_events_txn(self, txn, room_id, membership=None, user_id=None): - rows = self._get_members_rows_txn( - txn, - room_id, membership, user_id, - ) - return [r["event_id"] for r in rows] - def _get_members_rows_txn(self, txn, room_id, membership=None, user_id=None): where_clause = "c.room_id = ?" where_values = [room_id] From dded389ac16ec023c986df400d25ca94a4a28677 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 15:45:56 +0100 Subject: [PATCH 33/37] Allow setting of gc.set_thresholds --- synapse/app/homeserver.py | 5 +++++ synapse/app/pusher.py | 5 +++++ synapse/app/synchrotron.py | 15 ++++++++++----- synapse/config/server.py | 19 ++++++++++++++++++- 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index df675c0ed..22e1721fc 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -16,6 +16,7 @@ import synapse +import gc import logging import os import sys @@ -351,6 +352,8 @@ class SynapseService(service.Service): def startService(self): hs = setup(self.config) change_resource_limit(hs.config.soft_file_limit) + if hs.config.gc_thresholds: + gc.set_threshold(*hs.config.gc_thresholds) def stopService(self): return self._port.stopListening() @@ -422,6 +425,8 @@ def run(hs): # sys.settrace(logcontext_tracer) with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) + if hs.config.gc_thresholds: + gc.set_threshold(*hs.config.gc_thresholds) reactor.run() if hs.config.daemonize: diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 3c3fa3805..7e2bf7ecc 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -43,6 +43,7 @@ from twisted.web.resource import Resource from daemonize import Daemonize +import gc import sys import logging @@ -342,6 +343,8 @@ def setup(config_options): ps.start_listening() change_resource_limit(ps.config.soft_file_limit) + if ps.config.gc_thresholds: + gc.set_threshold(*ps.config.gc_thresholds) def start(): ps.replicate() @@ -361,6 +364,8 @@ if __name__ == '__main__': def run(): with LoggingContext("run"): change_resource_limit(ps.config.soft_file_limit) + if ps.config.gc_thresholds: + gc.set_threshold(*ps.config.gc_thresholds) reactor.run() daemon = Daemonize( diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 5c552ffb2..f9673ab8d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -57,6 +57,7 @@ from daemonize import Daemonize import sys import logging import contextlib +import gc import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -484,6 +485,8 @@ def setup(config_options): ss.start_listening() change_resource_limit(ss.config.soft_file_limit) + if ss.config.gc_thresholds: + ss.set_threshold(*ss.config.gc_thresholds) def start(): ss.get_datastore().start_profiling() @@ -496,17 +499,19 @@ def setup(config_options): if __name__ == '__main__': with LoggingContext("main"): - ps = setup(sys.argv[1:]) + ss = setup(sys.argv[1:]) - if ps.config.daemonize: + if ss.config.daemonize: def run(): with LoggingContext("run"): - change_resource_limit(ps.config.soft_file_limit) + change_resource_limit(ss.config.soft_file_limit) + if ss.config.gc_thresholds: + gc.set_threshold(*ss.config.gc_thresholds) reactor.run() daemon = Daemonize( - app="synapse-pusher", - pid=ps.config.pid_file, + app="synapse-synchrotron", + pid=ss.config.pid_file, action=run, auto_close_fds=False, verbose=True, diff --git a/synapse/config/server.py b/synapse/config/server.py index c2d8f8a52..44b8d422e 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import Config +from ._base import Config, ConfigError class ServerConfig(Config): @@ -38,6 +38,20 @@ class ServerConfig(Config): self.listeners = config.get("listeners", []) + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + bind_port = config.get("bind_port") if bind_port: self.listeners = [] @@ -157,6 +171,9 @@ class ServerConfig(Config): # hard limit. soft_file_limit: 0 + # The GC threshold parameters to pass to `gc.set_threshold`, if defined + # gc_thresholds: [700, 10, 10] + # A list of other Home Servers to fetch the public room directory from # and include in the public room directory of this home server # This is a temporary stopgap solution to populate new server with a From 2d1d1025fac846e2746dc627c0ebb6542c1488d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 16:26:25 +0100 Subject: [PATCH 34/37] Add gc_threshold to pusher and synchrotron --- synapse/app/pusher.py | 14 ++++++++++++++ synapse/app/synchrotron.py | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 7e2bf7ecc..4ec23d84c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -65,6 +65,20 @@ class SlaveConfig(DatabaseConfig): self.pid_file = self.abspath(config.get("pid_file")) self.public_baseurl = config["public_baseurl"] + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + # some things used by the auth handler but not actually used in the # pusher codebase self.bcrypt_rounds = None diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index f9673ab8d..297e19945 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -78,6 +78,20 @@ class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): self.macaroon_secret_key = config["macaroon_secret_key"] self.expire_access_token = config.get("expire_access_token", False) + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + def default_config(self, server_name, **kwargs): pid_file = self.abspath("synchroton.pid") return """\ From 64935d11f7730702cafba8591512ddb57e8fadf1 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 7 Jun 2016 16:35:28 +0100 Subject: [PATCH 35/37] Add script for running sytest with dendron --- jenkins-dendron-postgres.sh | 84 +++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100755 jenkins-dendron-postgres.sh diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh new file mode 100755 index 000000000..8e3a4c51a --- /dev/null +++ b/jenkins-dendron-postgres.sh @@ -0,0 +1,84 @@ +#!/bin/bash + +set -eux + +: ${WORKSPACE:="$(pwd)"} + +export PYTHONDONTWRITEBYTECODE=yep +export SYNAPSE_CACHE_FACTOR=1 + +# Output test results as junit xml +export TRIAL_FLAGS="--reporter=subunit" +export TOXSUFFIX="| subunit-1to2 | subunit2junitxml --no-passthrough --output-to=results.xml" +# Write coverage reports to a separate file for each process +export COVERAGE_OPTS="-p" +export DUMP_COVERAGE_COMMAND="coverage help" + +# Output flake8 violations to violations.flake8.log +# Don't exit with non-0 status code on Jenkins, +# so that the build steps continue and a later step can decided whether to +# UNSTABLE or FAILURE this build. +export PEP8SUFFIX="--output-file=violations.flake8.log || echo flake8 finished with status code \$?" + +rm .coverage* || echo "No coverage files to remove" + +tox --notest -e py27 + +TOX_BIN=$WORKSPACE/.tox/py27/bin +python synapse/python_dependencies.py | xargs -n1 $TOX_BIN/pip install +$TOX_BIN/pip install psycopg2 +$TOX_BIN/pip install lxml + +: ${GIT_BRANCH:="origin/$(git rev-parse --abbrev-ref HEAD)"} + +if [[ ! -e .dendron-base ]]; then + git clone https://github.com/matrix-org/dendron.git .dendron-base --mirror +else + (cd .dendron-base; git fetch -p) +fi + +rm -rf dendron +git clone .dendron-base dendron --shared +cd dendron + +: ${GOPATH:=${WORKSPACE}/.gopath} +if [[ "${GOPATH}" != *:* ]]; then + mkdir -p "${GOPATH}" + export PATH="${GOPATH}/bin:${PATH}" +fi +export GOPATH + +git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) + +go get github.com/constabulary/gb/... +gb generate +gb build + +cd .. + + +if [[ ! -e .sytest-base ]]; then + git clone https://github.com/matrix-org/sytest.git .sytest-base --mirror +else + (cd .sytest-base; git fetch -p) +fi + +rm -rf sytest +git clone .sytest-base sytest --shared +cd sytest + +git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) + +: ${PORT_BASE:=8000} + +./jenkins/prep_sytest_for_postgres.sh + +echo >&2 "Running sytest with PostgreSQL"; +./jenkins/install_and_run.sh --python $TOX_BIN/python \ + --synapse-directory $WORKSPACE \ + --dendron $WORKSPACE/dendron/bin/dendron \ + --synchrotron \ + --pusher \ + --port-base $PORT_BASE + +cd .. From 18f0cc7d993408a754e7ff26e9474a969adf762a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 16:51:01 +0100 Subject: [PATCH 36/37] Record some more GC metrics --- synapse/metrics/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index b29cec3de..8f69aa1ff 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -156,6 +156,11 @@ pending_calls_metric = reactor_metrics.register_distribution("pending_calls") gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"]) gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"]) +reactor_metrics.register_callback("gc_total_objects", lambda: len(gc.get_objects())) +reactor_metrics.register_callback( + "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"] +) + def runUntilCurrentTimer(func): From 0f2165ccf4fd0ae6636018cea7e1b91141179e88 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jun 2016 17:00:45 +0100 Subject: [PATCH 37/37] Don't track total objects as its too expensive to calculate --- synapse/metrics/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 8f69aa1ff..bdd7292a3 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -156,7 +156,6 @@ pending_calls_metric = reactor_metrics.register_distribution("pending_calls") gc_time = reactor_metrics.register_distribution("gc_time", labels=["gen"]) gc_unreachable = reactor_metrics.register_counter("gc_unreachable", labels=["gen"]) -reactor_metrics.register_callback("gc_total_objects", lambda: len(gc.get_objects())) reactor_metrics.register_callback( "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"] )