Merge branch 'develop' of github.com:matrix-org/synapse into develop

Erik Johnston 2015-02-18 10:29:54 +00:00
commit 2635d4e634
8 changed files with 214 additions and 57 deletions

synapse/app/homeserver.py

@@ -277,6 +277,7 @@ def setup():
     hs.get_pusherpool().start()
     hs.get_state_handler().start_caching()
     hs.get_datastore().start_profiling()
+    hs.get_replication_layer().start_get_pdu_cache()
 
     if config.daemonize:
         print config.pid_file

synapse/federation/federation_client.py

@@ -19,7 +19,8 @@ from twisted.internet import defer
 
 from .federation_base import FederationBase
 from .units import Edu
 
-from synapse.api.errors import CodeMessageException
+from synapse.api.errors import CodeMessageException, SynapseError
+from synapse.util.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
@@ -30,6 +31,20 @@ logger = logging.getLogger(__name__)
 
 
 class FederationClient(FederationBase):
+    def __init__(self):
+        self._get_pdu_cache = None
+
+    def start_get_pdu_cache(self):
+        self._get_pdu_cache = ExpiringCache(
+            cache_name="get_pdu_cache",
+            clock=self._clock,
+            max_len=1000,
+            expiry_ms=120*1000,
+            reset_expiry_on_get=False,
+        )
+
+        self._get_pdu_cache.start()
+
     @log_function
     def send_pdu(self, pdu, destinations):
         """Informs the replication layer about a new PDU generated within the
@@ -160,6 +175,11 @@ class FederationClient(FederationBase):
 
         # TODO: Rate limit the number of times we try and get the same event.
 
+        if self._get_pdu_cache:
+            e = self._get_pdu_cache.get(event_id)
+            if e:
+                defer.returnValue(e)
+
         pdu = None
         for destination in destinations:
             try:
@@ -181,8 +201,21 @@ class FederationClient(FederationBase):
                     pdu = yield self._check_sigs_and_hash(pdu)
 
                     break
-            except CodeMessageException:
-                raise
+            except SynapseError as e:
+                logger.info(
+                    "Failed to get PDU %s from %s because %s",
+                    event_id, destination, e,
+                )
+                continue
+            except CodeMessageException as e:
+                if 400 <= e.code < 500:
+                    raise
+
+                logger.info(
+                    "Failed to get PDU %s from %s because %s",
+                    event_id, destination, e,
+                )
+                continue
             except Exception as e:
                 logger.info(
                     "Failed to get PDU %s from %s because %s",
@@ -190,6 +223,9 @@ class FederationClient(FederationBase):
                 )
                 continue
 
+        if self._get_pdu_cache is not None:
+            self._get_pdu_cache[event_id] = pdu
+
         defer.returnValue(pdu)
 
     @defer.inlineCallbacks
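
Taken together, these hunks give get_pdu a short-lived read-through cache: a hit returns immediately, a miss falls through to the per-destination loop, and the result is written back afterwards. A condensed sketch of the pattern, with a plain dict standing in for ExpiringCache and a hypothetical fetch_from callable standing in for the transport call:

    # Condensed sketch of the read-through pattern above. `fetch_from`
    # is a hypothetical stand-in for the per-destination transport call;
    # a plain dict stands in for ExpiringCache.
    def get_pdu(cache, destinations, event_id, fetch_from):
        # Cache hit: return the previously fetched (and verified) PDU.
        if cache is not None:
            cached = cache.get(event_id)
            if cached:
                return cached

        pdu = None
        for destination in destinations:
            try:
                pdu = fetch_from(destination, event_id)
                break
            except Exception:
                # 4xx-style failures would re-raise in the real code;
                # transient ones just move on to the next destination.
                continue

        # The result is cached even if every destination failed (pdu is
        # None), though a None entry never satisfies the truthiness
        # check on the read path above.
        if cache is not None:
            cache[event_id] = pdu
        return pdu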

synapse/federation/federation_server.py

@@ -114,7 +114,15 @@ class FederationServer(FederationBase):
         with PreserveLoggingContext():
             dl = []
             for pdu in pdu_list:
-                dl.append(self._handle_new_pdu(transaction.origin, pdu))
+                d = self._handle_new_pdu(transaction.origin, pdu)
+
+                def handle_failure(failure):
+                    failure.trap(FederationError)
+                    self.send_failure(failure.value, transaction.origin)
+
+                d.addErrback(handle_failure)
+
+                dl.append(d)
 
         if hasattr(transaction, "edus"):
             for edu in [Edu(**x) for x in transaction.edus]:
@@ -124,6 +132,9 @@ class FederationServer(FederationBase):
                     edu.content
                 )
 
+        for failure in getattr(transaction, "pdu_failures", []):
+            logger.info("Got failure %r", failure)
+
         results = yield defer.DeferredList(dl, consumeErrors=True)
 
         ret = []
@@ -132,10 +143,16 @@ class FederationServer(FederationBase):
                 ret.append({})
             else:
                 logger.exception(r[1])
-                ret.append({"error": str(r[1])})
+                ret.append({"error": str(r[1].value)})
 
         logger.debug("Returning: %s", str(ret))
 
+        response = {
+            "pdus": dict(zip(
+                (p.event_id for p in pdu_list), ret
+            )),
+        }
+
         yield self.transaction_actions.set_response(
             transaction,
             200, response
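
These hunks attach an errback to each PDU's deferred so a FederationError raised while handling it is reported back to the origin via send_failure rather than swallowed, and the transaction response now keys each per-PDU result by event_id. A minimal sketch of the trap-and-report idiom in plain Twisted; FederationError here is a placeholder for synapse.api.errors.FederationError, and handle_new_pdu and send_failure are passed in rather than taken from the class:

    from twisted.internet import defer

    class FederationError(Exception):
        """Placeholder for synapse.api.errors.FederationError."""

    def handle_pdus(pdu_list, origin, handle_new_pdu, send_failure):
        dl = []
        for pdu in pdu_list:
            d = handle_new_pdu(origin, pdu)

            def handle_failure(failure):
                # Only FederationErrors are reported back to the origin;
                # trap() re-raises anything else into the errback chain.
                failure.trap(FederationError)
                send_failure(failure.value, origin)

            d.addErrback(handle_failure)
            dl.append(d)

        # consumeErrors=True stops one bad PDU from wedging the batch.
        return defer.DeferredList(dl, consumeErrors=True)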

synapse/federation/transaction_queue.py

@@ -91,7 +91,7 @@ class TransactionQueue(object):
            if not deferred.called:
                deferred.errback(failure)
            else:
-                logger.warn("Failed to send pdu", failure)
+                logger.warn("Failed to send pdu", failure.value)
 
        with PreserveLoggingContext():
            self._attempt_new_transaction(destination).addErrback(eb)
@@ -116,7 +116,7 @@ class TransactionQueue(object):
            if not deferred.called:
                deferred.errback(failure)
            else:
-                logger.warn("Failed to send edu", failure)
+                logger.warn("Failed to send edu", failure.value)
 
        with PreserveLoggingContext():
            self._attempt_new_transaction(destination).addErrback(eb)
@@ -133,6 +133,15 @@ class TransactionQueue(object):
            (failure, deferred)
        )
 
+        def eb(failure):
+            if not deferred.called:
+                deferred.errback(failure)
+            else:
+                logger.warn("Failed to send failure", failure.value)
+
+        with PreserveLoggingContext():
+            self._attempt_new_transaction(destination).addErrback(eb)
+
        yield deferred
 
    @defer.inlineCallbacks
@@ -249,6 +258,15 @@ class TransactionQueue(object):
                 transaction, json_data_cb
             )
             code = 200
+
+            if response:
+                for e_id, r in response.get("pdus", {}).items():
+                    if "error" in r:
+                        logger.warn(
+                            "Transaction returned error for %s: %s",
+                            e_id, r,
+                        )
+
         except HttpResponseException as e:
             code = e.code
             response = e.response
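
After a transaction is sent, the queue now walks the per-PDU results and logs any remote-side errors. A small sketch, assuming the 200 response body has the {"pdus": {event_id: {...}}} shape that the server-side hunk above constructs:

    import logging

    logger = logging.getLogger(__name__)

    def log_remote_pdu_errors(response):
        # `response` is the parsed transaction response body; each entry
        # is {} on success or {"error": "..."} on failure, keyed by the
        # event_id of the PDU it corresponds to.
        if not response:
            return
        for e_id, r in response.get("pdus", {}).items():
            if "error" in r:
                logger.warn(
                    "Transaction returned error for %s: %s", e_id, r,
                )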

synapse/http/matrixfederationclient.py

@@ -146,14 +146,22 @@ class MatrixFederationHttpClient(object):
                    )
                    raise SynapseError(400, "Domain specified not found.")
 
+                if hasattr(e, "reasons"):
+                    reasons = ", ".join(
+                        f.value.message
+                        for f in e.reasons
+                    )
+                else:
+                    reasons = e.message
+
                logger.warn(
-                    "Sending request failed to %s: %s %s : %s",
+                    "Sending request failed to %s: %s %s: %s - %s",
                    destination,
                    method,
                    url_bytes,
-                    e
+                    type(e).__name__,
+                    reasons,
                )
-                _print_ex(e)
 
                if retries_left:
                    yield sleep(2 ** (5 - retries_left))
@@ -447,14 +455,6 @@ def _readBodyToFile(response, stream, max_size):
     return d
 
 
-def _print_ex(e):
-    if hasattr(e, "reasons") and e.reasons:
-        for ex in e.reasons:
-            _print_ex(ex)
-    else:
-        logger.warn(e)
-
-
 class _JsonProducer(object):
     """ Used by the twisted http client to create the HTTP body from json
     """

synapse/state.py

@@ -18,6 +18,7 @@ from twisted.internet import defer
 
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
+from synapse.util.expiringcache import ExpiringCache
 from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError
 from synapse.events.snapshot import EventContext
@@ -51,7 +52,6 @@ class _StateCacheEntry(object):
     def __init__(self, state, state_group, ts):
         self.state = state
         self.state_group = state_group
-        self.ts = ts
 
 
 class StateHandler(object):
@@ -69,12 +69,15 @@ class StateHandler(object):
     def start_caching(self):
         logger.debug("start_caching")
 
-        self._state_cache = {}
+        self._state_cache = ExpiringCache(
+            cache_name="state_cache",
+            clock=self.clock,
+            max_len=SIZE_OF_CACHE,
+            expiry_ms=EVICTION_TIMEOUT_SECONDS*1000,
+            reset_expiry_on_get=True,
+        )
 
-        def f():
-            self._prune_cache()
-
-        self.clock.looping_call(f, 5*1000)
+        self._state_cache.start()
 
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
@@ -409,34 +412,3 @@ class StateHandler(object):
             return -int(e.depth), hashlib.sha1(e.event_id).hexdigest()
 
         return sorted(events, key=key_func)
-
-    def _prune_cache(self):
-        logger.debug(
-            "_prune_cache. before len: %d",
-            len(self._state_cache.keys())
-        )
-
-        now = self.clock.time_msec()
-
-        if len(self._state_cache.keys()) > SIZE_OF_CACHE:
-            sorted_entries = sorted(
-                self._state_cache.items(),
-                key=lambda k, v: v.ts,
-            )
-
-            for k, _ in sorted_entries[SIZE_OF_CACHE:]:
-                self._state_cache.pop(k)
-
-        keys_to_delete = set()
-
-        for key, cache_entry in self._state_cache.items():
-            if now - cache_entry.ts > EVICTION_TIMEOUT_SECONDS*1000:
-                keys_to_delete.add(key)
-
-        for k in keys_to_delete:
-            self._state_cache.pop(k)
-
-        logger.debug(
-            "_prune_cache. after len: %d",
-            len(self._state_cache.keys())
-        )
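
The removed _prune_cache combined two policies, a size cap that drops the stalest entries and a timeout sweep, which is exactly what ExpiringCache now provides behind start(); with reset_expiry_on_get=True, reading an entry also pushes its expiry back, so frequently resolved state groups stay cached. A dependency-free restatement of that eviction policy, for reference:

    # `entries` maps key -> (timestamp_ms, value), standing in for the
    # cache's internal dict of _CacheEntry objects.
    def prune(entries, now_ms, max_len, expiry_ms):
        # Timeout sweep: drop anything older than expiry_ms.
        for key in [k for k, (ts, _) in entries.items()
                    if now_ms - ts > expiry_ms]:
            del entries[key]
        # Size cap: keep only the max_len most recently stamped entries.
        if max_len and len(entries) > max_len:
            by_age = sorted(entries, key=lambda k: entries[k][0],
                            reverse=True)
            for key in by_age[max_len:]:
                del entries[key]
        return entries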

synapse/util/__init__.py

@@ -99,8 +99,6 @@ class Clock(object):
             except:
                 pass
 
-            return res
-
         given_deferred.addCallbacks(callback=sucess, errback=err)
 
         timer = self.call_later(time_out, timed_out_fn)

synapse/util/expiringcache.py (new file)

@@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class ExpiringCache(object):
+    def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
+                 reset_expiry_on_get=False):
+        """
+        Args:
+            cache_name (str): Name of this cache, used for logging.
+            clock (Clock)
+            max_len (int): Max size of dict. If the dict grows larger than
+                this then the oldest items get automatically evicted.
+                Default is 0, which indicates there is no max limit.
+            expiry_ms (int): How long before an item is evicted from the
+                cache in milliseconds. Default is 0, indicating items never
+                get evicted based on time.
+            reset_expiry_on_get (bool): If true, will reset the expiry time
+                for an item on access. Defaults to False.
+        """
+        self._cache_name = cache_name
+
+        self._clock = clock
+
+        self._max_len = max_len
+        self._expiry_ms = expiry_ms
+
+        self._reset_expiry_on_get = reset_expiry_on_get
+
+        self._cache = {}
+
+    def start(self):
+        if not self._expiry_ms:
+            # Don't bother starting the loop if things never expire
+            return
+
+        def f():
+            self._prune_cache()
+
+        self._clock.looping_call(f, self._expiry_ms/2)
+    def __setitem__(self, key, value):
+        now = self._clock.time_msec()
+        self._cache[key] = _CacheEntry(now, value)
+
+        # Evict if there are now too many items
+        if self._max_len and len(self._cache.keys()) > self._max_len:
+            # Sort newest-first and drop everything past max_len, i.e.
+            # the entries with the oldest timestamps.
+            sorted_entries = sorted(
+                self._cache.items(),
+                key=lambda item: item[1].time,
+                reverse=True,
+            )
+
+            for k, _ in sorted_entries[self._max_len:]:
+                self._cache.pop(k)
+    def __getitem__(self, key):
+        entry = self._cache[key]
+
+        if self._reset_expiry_on_get:
+            entry.time = self._clock.time_msec()
+
+        return entry.value
+
+    def get(self, key, default=None):
+        try:
+            return self[key]
+        except KeyError:
+            return default
+
+    def _prune_cache(self):
+        if not self._expiry_ms:
+            # zero expiry time means don't expire. This should never get
+            # called since we have this check in start too.
+            return
+        begin_length = len(self._cache)
+
+        now = self._clock.time_msec()
+
+        keys_to_delete = set()
+
+        for key, cache_entry in self._cache.items():
+            if now - cache_entry.time > self._expiry_ms:
+                keys_to_delete.add(key)
+
+        for k in keys_to_delete:
+            self._cache.pop(k)
+
+        logger.debug(
+            "[%s] _prune_cache before: %d, after len: %d",
+            self._cache_name, begin_length, len(self._cache.keys())
+        )
+
+
+class _CacheEntry(object):
+    def __init__(self, time, value):
+        self.time = time
+        self.value = value
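
A minimal usage sketch of the class above with a stub clock (the real synapse Clock provides time_msec and looping_call; the stub below only fakes them), showing both the size cap and the timed expiry:

    class FakeClock(object):
        """Stub exposing the two methods ExpiringCache relies on."""
        def __init__(self):
            self.now = 0

        def time_msec(self):
            return self.now

        def looping_call(self, f, interval_ms):
            pass  # a real Clock would schedule f every interval_ms

    clock = FakeClock()
    cache = ExpiringCache(
        cache_name="example_cache",
        clock=clock,
        max_len=2,
        expiry_ms=120 * 1000,
        reset_expiry_on_get=False,
    )
    cache.start()

    cache["a"] = 1           # written at t = 0
    clock.now += 1000
    cache["b"] = 2           # t = 1s
    clock.now += 1000
    cache["c"] = 3           # t = 2s; max_len=2 evicts "a", the oldest
    assert cache.get("a") is None

    clock.now += 121 * 1000  # "b" and "c" are now older than 120s
    cache._prune_cache()     # normally run periodically via looping_call
    assert cache.get("b") is None and cache.get("c") is None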