From 2674aeb96a1b75583dc3ea514cbce580e8ae35c8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Feb 2015 16:16:47 +0000 Subject: [PATCH 01/13] Factor out ExpiringCache from StateHandler --- synapse/state.py | 46 +++----------- synapse/util/expiringcache.py | 115 ++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 37 deletions(-) create mode 100644 synapse/util/expiringcache.py diff --git a/synapse/state.py b/synapse/state.py index fe5f3dc84..80cced351 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor +from synapse.util.expiringcache import ExpiringCache from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.events.snapshot import EventContext @@ -51,7 +52,6 @@ class _StateCacheEntry(object): def __init__(self, state, state_group, ts): self.state = state self.state_group = state_group - self.ts = ts class StateHandler(object): @@ -69,12 +69,15 @@ class StateHandler(object): def start_caching(self): logger.debug("start_caching") - self._state_cache = {} + self._state_cache = ExpiringCache( + cache_name="state_cache", + clock=self.clock, + max_len=SIZE_OF_CACHE, + expiry_ms=EVICTION_TIMEOUT_SECONDS*1000, + reset_expiry_on_get=True, + ) - def f(): - self._prune_cache() - - self.clock.looping_call(f, 5*1000) + self._state_cache.start() @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): @@ -409,34 +412,3 @@ class StateHandler(object): return -int(e.depth), hashlib.sha1(e.event_id).hexdigest() return sorted(events, key=key_func) - - def _prune_cache(self): - logger.debug( - "_prune_cache. before len: %d", - len(self._state_cache.keys()) - ) - - now = self.clock.time_msec() - - if len(self._state_cache.keys()) > SIZE_OF_CACHE: - sorted_entries = sorted( - self._state_cache.items(), - key=lambda k, v: v.ts, - ) - - for k, _ in sorted_entries[SIZE_OF_CACHE:]: - self._state_cache.pop(k) - - keys_to_delete = set() - - for key, cache_entry in self._state_cache.items(): - if now - cache_entry.ts > EVICTION_TIMEOUT_SECONDS*1000: - keys_to_delete.add(key) - - for k in keys_to_delete: - self._state_cache.pop(k) - - logger.debug( - "_prune_cache. after len: %d", - len(self._state_cache.keys()) - ) diff --git a/synapse/util/expiringcache.py b/synapse/util/expiringcache.py new file mode 100644 index 000000000..fb5b27ad1 --- /dev/null +++ b/synapse/util/expiringcache.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + + +logger = logging.getLogger(__name__) + + +class ExpiringCache(object): + def __init__(self, cache_name, clock, max_len=0, expiry_ms=0, + reset_expiry_on_get=False): + """ + Args: + cache_name (str): Name of this cache, used for logging. + clock (Clock) + max_len (int): Max size of dict. If the dict grows larger than this + then the oldest items get automatically evicted. Default is 0, + which indicates there is no max limit. + expiry_ms (int): How long before an item is evicted from the cache + in milliseconds. Default is 0, indicating items never get + evicted based on time. + reset_expiry_on_get (bool): If true, will reset the expiry time for + an item on access. Defaults to False. + + """ + self._cache_name = cache_name + + self._clock = clock + + self._max_len = max_len + self._expiry_ms = expiry_ms + + self._reset_expiry_on_get = reset_expiry_on_get + + self._cache = {} + + def start(self): + if not self._expiry_ms: + # Don't bother starting the loop if things never expire + return + + def f(): + self._prune_cache() + + self._clock.looping_call(f, self._expiry_ms) + + def __setitem__(self, key, value): + now = self._clock.time_msec() + self._cache[key] = _CacheEntry(now, value) + + # Evict if there are now too many items + if self._max_len and len(self._cache.keys()) > self._max_len: + sorted_entries = sorted( + self._cache.items(), + key=lambda k, v: v.time, + ) + + for k, _ in sorted_entries[self._max_len:]: + self._cache.pop(k) + + def __getitem__(self, key): + entry = self._cache[key] + + if self._reset_expiry_on_get: + entry.time = self._clock.time_msec() + + return entry.value + + def get(self, key, default=None): + try: + return self[key] + except KeyError: + return default + + def _purge_cache(self): + if not self._expiry_ms: + # zero expiry time means don't expire. This should never get called + # since we have this check in start too. + return + begin_length = len(self._cache) + + now = self._clock.time_msec() + + keys_to_delete = set() + + for key, cache_entry in self._cache.items(): + if now - cache_entry.time > self._expiry_ms: + keys_to_delete.add(key) + + for k in keys_to_delete: + self._cache.pop(k) + + logger.debug( + "[%s] _prune_cache before: %d, after len: %d", + self._cache_name, begin_length, len(self._cache.keys()) + ) + + +class _CacheEntry(object): + def __init__(self, time, value): + self.time = time + self.value = value From baa5b9a97582d4b3c825be1225aba7863230c047 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Feb 2015 18:02:39 +0000 Subject: [PATCH 02/13] Cache results of get_pdu. --- synapse/federation/federation_client.py | 42 +++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 70c9a6f46..83b4947b9 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,7 +19,8 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu -from synapse.api.errors import CodeMessageException +from synapse.api.errors import CodeMessageException, SynapseError +from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -30,6 +31,20 @@ logger = logging.getLogger(__name__) class FederationClient(FederationBase): + def __init__(self): + self._fail_fetch_pdu_cache = None + + def start_pdu_fail_cache(self): + self._fail_fetch_pdu_cache = ExpiringCache( + cache_name="get_pdu_cache", + clock=self._clock, + max_len=1000, + expiry_ms=120*1000, + reset_expiry_on_get=False, + ) + + self._fail_fetch_pdu_cache.start() + @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -160,6 +175,11 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. + if self._fail_fetch_pdu_cache: + e = self._fail_fetch_pdu_cache.get(event_id) + if e: + defer.returnValue(e) + pdu = None for destination in destinations: try: @@ -181,8 +201,21 @@ class FederationClient(FederationBase): pdu = yield self._check_sigs_and_hash(pdu) break - except CodeMessageException: - raise + except SynapseError: + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue + except CodeMessageException as e: + if 400 <= e.code < 500: + raise + + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -190,6 +223,9 @@ class FederationClient(FederationBase): ) continue + if self._fail_fetch_pdu_cache is not None: + self._fail_fetch_pdu_cache[event_id] = pdu + defer.returnValue(pdu) @defer.inlineCallbacks From 02bfa889de1990eb8dc47770834bef777252dc82 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 13:08:27 +0000 Subject: [PATCH 03/13] Handle recieving failures in transactions --- synapse/federation/federation_server.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e94d0411b..34f5b1744 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -114,7 +114,15 @@ class FederationServer(FederationBase): with PreserveLoggingContext(): dl = [] for pdu in pdu_list: - dl.append(self._handle_new_pdu(transaction.origin, pdu)) + d = self._handle_new_pdu(transaction.origin, pdu) + + def handle_failure(failure): + failure.trap(FederationError) + self.enqueue_failure(failure.value, transaction.origin) + + d.addErrback(handle_failure) + + dl.append(d) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -124,6 +132,9 @@ class FederationServer(FederationBase): edu.content ) + for failure in getattr(transaction, "failures", []): + logger.info("Got failure %r", failure) + results = yield defer.DeferredList(dl, consumeErrors=True) ret = [] From c82e26ad4ba80742adc68a7e8c52441d760e4746 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 13:24:13 +0000 Subject: [PATCH 04/13] Actually respond with JSON to incoming transaction --- synapse/federation/federation_server.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 34f5b1744..c48a41b5d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -147,6 +147,8 @@ class FederationServer(FederationBase): logger.debug("Returning: %s", str(ret)) + response = ret + yield self.transaction_actions.set_response( transaction, 200, response From 659ead082ff11842fea803e44d75667e0ca38d71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 13:58:52 +0000 Subject: [PATCH 05/13] Format the response of transaction request in a nicer way --- synapse/federation/federation_server.py | 19 +++++++++++++++---- synapse/federation/transaction_queue.py | 22 ++++++++++++++++++++-- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index c48a41b5d..d1ec0b9ea 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -118,7 +118,7 @@ class FederationServer(FederationBase): def handle_failure(failure): failure.trap(FederationError) - self.enqueue_failure(failure.value, transaction.origin) + self.send_failure(failure.value, transaction.origin) d.addErrback(handle_failure) @@ -132,7 +132,7 @@ class FederationServer(FederationBase): edu.content ) - for failure in getattr(transaction, "failures", []): + for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) results = yield defer.DeferredList(dl, consumeErrors=True) @@ -143,11 +143,15 @@ class FederationServer(FederationBase): ret.append({}) else: logger.exception(r[1]) - ret.append({"error": str(r[1])}) + ret.append({"error": str(r[1].value)}) logger.debug("Returning: %s", str(ret)) - response = ret + response = { + "pdus": dict(zip( + (p.event_id for p in pdu_list), ret + )), + } yield self.transaction_actions.set_response( transaction, @@ -358,6 +362,13 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) + raise FederationError( + "ERROR", + 403, + "Forbidden", + affected=pdu.event_id, + ) + state = None auth_chain = [] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb20f2eba..6faaa066f 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -91,7 +91,7 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.warn("Failed to send pdu", failure) + logger.warn("Failed to send pdu", failure.value) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -116,7 +116,7 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.warn("Failed to send edu", failure) + logger.warn("Failed to send edu", failure.value) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -133,6 +133,15 @@ class TransactionQueue(object): (failure, deferred) ) + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send failure", failure.value) + + with PreserveLoggingContext(): + self._attempt_new_transaction(destination).addErrback(eb) + yield deferred @defer.inlineCallbacks @@ -249,6 +258,15 @@ class TransactionQueue(object): transaction, json_data_cb ) code = 200 + + if response: + for e_id, r in getattr(response, "pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, + ) + except HttpResponseException as e: code = e.code response = e.response From 4de93001bf6a7d8e770b990ea3546237b2569609 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 15:12:06 +0000 Subject: [PATCH 06/13] Make matrixfederationclient log more nicely --- synapse/http/matrixfederationclient.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 192794800..764b151d9 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -16,6 +16,7 @@ from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError +from twisted.python.failure import Failure from twisted.web.client import readBody, _AgentBase, _URI from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone @@ -146,14 +147,22 @@ class MatrixFederationHttpClient(object): ) raise SynapseError(400, "Domain specified not found.") + if hasattr(e, "reasons"): + reasons = ", ".join( + f.value.message + for f in e.reasons + ) + else: + reasons = e.message + logger.warn( - "Sending request failed to %s: %s %s : %s", + "Sending request failed to %s: %s %s: %s - %s", destination, method, url_bytes, - e + type(e). __name__, + reasons, ) - _print_ex(e) if retries_left: yield sleep(2 ** (5 - retries_left)) @@ -447,14 +456,6 @@ def _readBodyToFile(response, stream, max_size): return d -def _print_ex(e): - if hasattr(e, "reasons") and e.reasons: - for ex in e.reasons: - _print_ex(ex) - else: - logger.warn(e) - - class _JsonProducer(object): """ Used by the twisted http client to create the HTTP body from json """ From 472734a8cc19d4e4b9f5e311b59f58a37142f8a2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 15:13:50 +0000 Subject: [PATCH 07/13] Consume errors in time_bound_deferred --- synapse/util/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index e77eba90a..79109d0b1 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -99,8 +99,6 @@ class Clock(object): except: pass - return res - given_deferred.addCallbacks(callback=sucess, errback=err) timer = self.call_later(time_out, timed_out_fn) From 0647e27a414e5a86cab57bba65931376e855c289 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 15:19:54 +0000 Subject: [PATCH 08/13] Remove unused import --- synapse/http/matrixfederationclient.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 764b151d9..454c3d4ab 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -16,7 +16,6 @@ from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError -from twisted.python.failure import Failure from twisted.web.client import readBody, _AgentBase, _URI from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone From 676e8ee78ae3f51cbc0113c8d810d4bc1e81cdab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 15:22:45 +0000 Subject: [PATCH 09/13] Remove debug raise --- synapse/federation/federation_server.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 679fb141e..22b966383 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -361,13 +361,6 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) - raise FederationError( - "ERROR", - 403, - "Forbidden", - affected=pdu.event_id, - ) - state = None auth_chain = [] From 8b919c00f30c03ddae257f0129f58f2d0285723c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 15:44:01 +0000 Subject: [PATCH 10/13] Start the get_pdu cache --- synapse/app/homeserver.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 27b478a1c..7565d9444 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -275,6 +275,7 @@ def setup(): hs.get_pusherpool().start() hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() + hs.get_replication_layer().start_pdu_fail_cache() if config.daemonize: print config.pid_file From e7e20417ca468c1afe2e27162b9790f860d2da51 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 15:44:26 +0000 Subject: [PATCH 11/13] ExpiringCache: purge every 1/2 interval --- synapse/util/expiringcache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/expiringcache.py b/synapse/util/expiringcache.py index fb5b27ad1..3c8409b16 100644 --- a/synapse/util/expiringcache.py +++ b/synapse/util/expiringcache.py @@ -55,7 +55,7 @@ class ExpiringCache(object): def f(): self._prune_cache() - self._clock.looping_call(f, self._expiry_ms) + self._clock.looping_call(f, self._expiry_ms/2) def __setitem__(self, key, value): now = self._clock.time_msec() From 964bb43fbe15105c58f9550f376fa76709734cd8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 15:44:41 +0000 Subject: [PATCH 12/13] Fix typo in function name --- synapse/util/expiringcache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/expiringcache.py b/synapse/util/expiringcache.py index 3c8409b16..1c7859297 100644 --- a/synapse/util/expiringcache.py +++ b/synapse/util/expiringcache.py @@ -85,7 +85,7 @@ class ExpiringCache(object): except KeyError: return default - def _purge_cache(self): + def _prune_cache(self): if not self._expiry_ms: # zero expiry time means don't expire. This should never get called # since we have this check in start too. From ec847059f3e9b9b5de62aa2f7ad2366c4e883fac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 10:14:10 +0000 Subject: [PATCH 13/13] Rename _fail_fetch_pdu_cache to _get_pdu_cache --- synapse/app/homeserver.py | 2 +- synapse/federation/federation_client.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 7565d9444..7be82d057 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -275,7 +275,7 @@ def setup(): hs.get_pusherpool().start() hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() - hs.get_replication_layer().start_pdu_fail_cache() + hs.get_replication_layer().start_get_pdu_cache() if config.daemonize: print config.pid_file diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 83b4947b9..6042e366b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,10 +32,10 @@ logger = logging.getLogger(__name__) class FederationClient(FederationBase): def __init__(self): - self._fail_fetch_pdu_cache = None + self._get_pdu_cache = None - def start_pdu_fail_cache(self): - self._fail_fetch_pdu_cache = ExpiringCache( + def start_get_pdu_cache(self): + self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", clock=self._clock, max_len=1000, @@ -43,7 +43,7 @@ class FederationClient(FederationBase): reset_expiry_on_get=False, ) - self._fail_fetch_pdu_cache.start() + self._get_pdu_cache.start() @log_function def send_pdu(self, pdu, destinations): @@ -175,8 +175,8 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. - if self._fail_fetch_pdu_cache: - e = self._fail_fetch_pdu_cache.get(event_id) + if self._get_pdu_cache: + e = self._get_pdu_cache.get(event_id) if e: defer.returnValue(e) @@ -223,8 +223,8 @@ class FederationClient(FederationBase): ) continue - if self._fail_fetch_pdu_cache is not None: - self._fail_fetch_pdu_cache[event_id] = pdu + if self._get_pdu_cache is not None: + self._get_pdu_cache[event_id] = pdu defer.returnValue(pdu)