Merge branch 'develop' into markjh/synchrotronII

This commit is contained in:
Mark Haines 2016-06-03 14:51:33 +01:00
commit dd6f62ed99
14 changed files with 196 additions and 127 deletions

View File

@ -89,7 +89,7 @@ class EmailConfig(Config):
# enable_notifs: false # enable_notifs: false
# smtp_host: "localhost" # smtp_host: "localhost"
# smtp_port: 25 # smtp_port: 25
# notif_from: Your Friendly Matrix Home Server <noreply@example.com> # notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>"
# app_name: Matrix # app_name: Matrix
# template_dir: res/templates # template_dir: res/templates
# notif_template_html: notif_mail.html # notif_template_html: notif_mail.html

View File

@ -140,7 +140,7 @@ class TypingHandler(object):
def user_left_room(self, user, room_id): def user_left_room(self, user, room_id):
user_id = user.to_string() user_id = user.to_string()
if self.is_mine_id(user_id): if self.is_mine_id(user_id):
member = RoomMember(room_id=room_id, user=user_id) member = RoomMember(room_id=room_id, user_id=user_id)
yield self._stopped_typing(member) yield self._stopped_typing(member)
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -33,11 +33,7 @@ from .metric import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# We'll keep all the available metrics in a single toplevel dict, one shared all_metrics = []
# for the entire process. We don't currently support per-HomeServer instances
# of metrics, because in practice any one python VM will host only one
# HomeServer anyway. This makes a lot of implementation neater
all_metrics = {}
class Metrics(object): class Metrics(object):
@ -53,7 +49,7 @@ class Metrics(object):
metric = metric_class(full_name, *args, **kwargs) metric = metric_class(full_name, *args, **kwargs)
all_metrics[full_name] = metric all_metrics.append(metric)
return metric return metric
def register_counter(self, *args, **kwargs): def register_counter(self, *args, **kwargs):
@ -84,12 +80,12 @@ def render_all():
# TODO(paul): Internal hack # TODO(paul): Internal hack
update_resource_metrics() update_resource_metrics()
for name in sorted(all_metrics.keys()): for metric in all_metrics:
try: try:
strs += all_metrics[name].render() strs += metric.render()
except Exception: except Exception:
strs += ["# FAILED to render %s" % name] strs += ["# FAILED to render"]
logger.exception("Failed to render %s metric", name) logger.exception("Failed to render metric")
strs.append("") # to generate a final CRLF strs.append("") # to generate a final CRLF

View File

@ -47,9 +47,6 @@ class BaseMetric(object):
for k, v in zip(self.labels, values)]) for k, v in zip(self.labels, values)])
) )
def render(self):
return map_concat(self.render_item, sorted(self.counts.keys()))
class CounterMetric(BaseMetric): class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing """The simplest kind of metric; one that stores a monotonically-increasing
@ -83,6 +80,9 @@ class CounterMetric(BaseMetric):
def render_item(self, k): def render_item(self, k):
return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])] return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
def render(self):
return map_concat(self.render_item, sorted(self.counts.keys()))
class CallbackMetric(BaseMetric): class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever """A metric that returns the numeric value returned by a callback whenever
@ -126,30 +126,30 @@ class DistributionMetric(object):
class CacheMetric(object): class CacheMetric(object):
"""A combination of two CounterMetrics, one to count cache hits and one to __slots__ = ("name", "cache_name", "hits", "misses", "size_callback")
count a total, and a callback metric to yield the current size.
This metric generates standard metric name pairs, so that monitoring rules def __init__(self, name, size_callback, cache_name):
can easily be applied to measure hit ratio."""
def __init__(self, name, size_callback, labels=[]):
self.name = name self.name = name
self.cache_name = cache_name
self.hits = CounterMetric(name + ":hits", labels=labels) self.hits = 0
self.total = CounterMetric(name + ":total", labels=labels) self.misses = 0
self.size = CallbackMetric( self.size_callback = size_callback
name + ":size",
callback=size_callback,
labels=labels,
)
def inc_hits(self, *values): def inc_hits(self):
self.hits.inc(*values) self.hits += 1
self.total.inc(*values)
def inc_misses(self, *values): def inc_misses(self):
self.total.inc(*values) self.misses += 1
def render(self): def render(self):
return self.hits.render() + self.total.render() + self.size.render() size = self.size_callback()
hits = self.hits
total = self.misses + self.hits
return [
"""%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits),
"""%s:total{name="%s"} %d""" % (self.name, self.cache_name, total),
"""%s:size{name="%s"} %d""" % (self.name, self.cache_name, size),
]

View File

@ -99,7 +99,14 @@ class Mailer(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def send_notification_mail(self, app_id, user_id, email_address, def send_notification_mail(self, app_id, user_id, email_address,
push_actions, reason): push_actions, reason):
raw_from = email.utils.parseaddr(self.hs.config.email_notif_from)[1] try:
from_string = self.hs.config.email_notif_from % {
"app": self.app_name
}
except TypeError:
from_string = self.hs.config.email_notif_from
raw_from = email.utils.parseaddr(from_string)[1]
raw_to = email.utils.parseaddr(email_address)[1] raw_to = email.utils.parseaddr(email_address)[1]
if raw_to == '': if raw_to == '':

View File

@ -17,7 +17,7 @@ from twisted.internet import defer
from .appservice import ( from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore ApplicationServiceStore, ApplicationServiceTransactionStore
) )
from ._base import Cache, LoggingTransaction from ._base import LoggingTransaction
from .directory import DirectoryStore from .directory import DirectoryStore
from .events import EventsStore from .events import EventsStore
from .presence import PresenceStore, UserPresenceState from .presence import PresenceStore, UserPresenceState
@ -45,6 +45,7 @@ from .search import SearchStore
from .tags import TagsStore from .tags import TagsStore
from .account_data import AccountDataStore from .account_data import AccountDataStore
from .openid import OpenIdStore from .openid import OpenIdStore
from .client_ips import ClientIpStore
from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
@ -58,12 +59,6 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
LAST_SEEN_GRANULARITY = 120 * 1000
class DataStore(RoomMemberStore, RoomStore, class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, RegistrationStore, StreamStore, ProfileStore,
PresenceStore, TransactionStore, PresenceStore, TransactionStore,
@ -84,6 +79,7 @@ class DataStore(RoomMemberStore, RoomStore,
AccountDataStore, AccountDataStore,
EventPushActionsStore, EventPushActionsStore,
OpenIdStore, OpenIdStore,
ClientIpStore,
): ):
def __init__(self, db_conn, hs): def __init__(self, db_conn, hs):
@ -91,11 +87,6 @@ class DataStore(RoomMemberStore, RoomStore,
self._clock = hs.get_clock() self._clock = hs.get_clock()
self.database_engine = hs.database_engine self.database_engine = hs.database_engine
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
)
self._stream_id_gen = StreamIdGenerator( self._stream_id_gen = StreamIdGenerator(
db_conn, "events", "stream_ordering", db_conn, "events", "stream_ordering",
extra_tables=[("local_invites", "stream_id")] extra_tables=[("local_invites", "stream_id")]
@ -216,39 +207,6 @@ class DataStore(RoomMemberStore, RoomStore,
return [UserPresenceState(**row) for row in rows] return [UserPresenceState(**row) for row in rows]
@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec())
key = (user.to_string(), access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
defer.returnValue(None)
self.client_ip_last_seen.prefill(key, now)
# It's safe not to lock here: a) no unique constraint,
# b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
yield self._simple_upsert(
"user_ips",
keyvalues={
"user_id": user.to_string(),
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
},
values={
"last_seen": now,
},
desc="insert_client_ip",
lock=False,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def count_daily_users(self): def count_daily_users(self):
""" """

View File

@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import SQLBaseStore, Cache
from twisted.internet import defer
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
LAST_SEEN_GRANULARITY = 120 * 1000
class ClientIpStore(SQLBaseStore):
def __init__(self, hs):
self.client_ip_last_seen = Cache(
name="client_ip_last_seen",
keylen=4,
)
super(ClientIpStore, self).__init__(hs)
@defer.inlineCallbacks
def insert_client_ip(self, user, access_token, ip, user_agent):
now = int(self._clock.time_msec())
key = (user.to_string(), access_token, ip)
try:
last_seen = self.client_ip_last_seen.get(key)
except KeyError:
last_seen = None
# Rate-limited inserts
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
defer.returnValue(None)
self.client_ip_last_seen.prefill(key, now)
# It's safe not to lock here: a) no unique constraint,
# b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
yield self._simple_upsert(
"user_ips",
keyvalues={
"user_id": user.to_string(),
"access_token": access_token,
"ip": ip,
"user_agent": user_agent,
},
values={
"last_seen": now,
},
desc="insert_client_ip",
lock=False,
)

View File

@ -102,6 +102,15 @@ class ObservableDeferred(object):
def observers(self): def observers(self):
return self._observers return self._observers
def has_called(self):
return self._result is not None
def has_succeeded(self):
return self._result is not None and self._result[0] is True
def get_result(self):
return self._result[1]
def __getattr__(self, name): def __getattr__(self, name):
return getattr(self._deferred, name) return getattr(self._deferred, name)

View File

@ -24,12 +24,22 @@ DEBUG_CACHES = False
metrics = synapse.metrics.get_metrics_for("synapse.util.caches") metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
caches_by_name = {} caches_by_name = {}
cache_counter = metrics.register_cache( # cache_counter = metrics.register_cache(
# "cache",
# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
# labels=["name"],
# )
def register_cache(name, cache):
caches_by_name[name] = cache
return metrics.register_cache(
"cache", "cache",
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, lambda: len(cache),
labels=["name"], name,
) )
_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR)) _string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
caches_by_name["string_cache"] = _string_cache caches_by_name["string_cache"] = _string_cache

View File

@ -22,7 +22,7 @@ from synapse.util.logcontext import (
PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn
) )
from . import caches_by_name, DEBUG_CACHES, cache_counter from . import DEBUG_CACHES, register_cache
from twisted.internet import defer from twisted.internet import defer
@ -33,6 +33,7 @@ import functools
import inspect import inspect
import threading import threading
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -43,6 +44,15 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
class Cache(object): class Cache(object):
__slots__ = (
"cache",
"max_entries",
"name",
"keylen",
"sequence",
"thread",
"metrics",
)
def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False): def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
if lru: if lru:
@ -59,7 +69,7 @@ class Cache(object):
self.keylen = keylen self.keylen = keylen
self.sequence = 0 self.sequence = 0
self.thread = None self.thread = None
caches_by_name[name] = self.cache self.metrics = register_cache(name, self.cache)
def check_thread(self): def check_thread(self):
expected_thread = self.thread expected_thread = self.thread
@ -74,10 +84,10 @@ class Cache(object):
def get(self, key, default=_CacheSentinel): def get(self, key, default=_CacheSentinel):
val = self.cache.get(key, _CacheSentinel) val = self.cache.get(key, _CacheSentinel)
if val is not _CacheSentinel: if val is not _CacheSentinel:
cache_counter.inc_hits(self.name) self.metrics.inc_hits()
return val return val
cache_counter.inc_misses(self.name) self.metrics.inc_misses()
if default is _CacheSentinel: if default is _CacheSentinel:
raise KeyError() raise KeyError()
@ -293,16 +303,21 @@ class CacheListDescriptor(object):
# cached is a dict arg -> deferred, where deferred results in a # cached is a dict arg -> deferred, where deferred results in a
# 2-tuple (`arg`, `result`) # 2-tuple (`arg`, `result`)
cached = {} results = {}
cached_defers = {}
missing = [] missing = []
for arg in list_args: for arg in list_args:
key = list(keyargs) key = list(keyargs)
key[self.list_pos] = arg key[self.list_pos] = arg
try: try:
res = cache.get(tuple(key)).observe() res = cache.get(tuple(key))
if not res.has_succeeded():
res = res.observe()
res.addCallback(lambda r, arg: (arg, r), arg) res.addCallback(lambda r, arg: (arg, r), arg)
cached[arg] = res cached_defers[arg] = res
else:
results[arg] = res.get_result()
except KeyError: except KeyError:
missing.append(arg) missing.append(arg)
@ -340,12 +355,21 @@ class CacheListDescriptor(object):
res = observer.observe() res = observer.observe()
res.addCallback(lambda r, arg: (arg, r), arg) res.addCallback(lambda r, arg: (arg, r), arg)
cached[arg] = res cached_defers[arg] = res
if cached_defers:
def update_results_dict(res):
results.update(res)
return results
return preserve_context_over_deferred(defer.gatherResults( return preserve_context_over_deferred(defer.gatherResults(
cached.values(), cached_defers.values(),
consumeErrors=True, consumeErrors=True,
).addErrback(unwrapFirstError).addCallback(lambda res: dict(res))) ).addCallback(update_results_dict).addErrback(
unwrapFirstError
))
else:
return results
obj.__dict__[self.orig.__name__] = wrapped obj.__dict__[self.orig.__name__] = wrapped

View File

@ -15,7 +15,7 @@
from synapse.util.caches.lrucache import LruCache from synapse.util.caches.lrucache import LruCache
from collections import namedtuple from collections import namedtuple
from . import caches_by_name, cache_counter from . import register_cache
import threading import threading
import logging import logging
@ -43,7 +43,7 @@ class DictionaryCache(object):
__slots__ = [] __slots__ = []
self.sentinel = Sentinel() self.sentinel = Sentinel()
caches_by_name[name] = self.cache self.metrics = register_cache(name, self.cache)
def check_thread(self): def check_thread(self):
expected_thread = self.thread expected_thread = self.thread
@ -58,7 +58,7 @@ class DictionaryCache(object):
def get(self, key, dict_keys=None): def get(self, key, dict_keys=None):
entry = self.cache.get(key, self.sentinel) entry = self.cache.get(key, self.sentinel)
if entry is not self.sentinel: if entry is not self.sentinel:
cache_counter.inc_hits(self.name) self.metrics.inc_hits()
if dict_keys is None: if dict_keys is None:
return DictionaryEntry(entry.full, dict(entry.value)) return DictionaryEntry(entry.full, dict(entry.value))
@ -69,7 +69,7 @@ class DictionaryCache(object):
if k in entry.value if k in entry.value
}) })
cache_counter.inc_misses(self.name) self.metrics.inc_misses()
return DictionaryEntry(False, {}) return DictionaryEntry(False, {})
def invalidate(self, key): def invalidate(self, key):

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.util.caches import cache_counter, caches_by_name from synapse.util.caches import register_cache
import logging import logging
@ -49,7 +49,7 @@ class ExpiringCache(object):
self._cache = {} self._cache = {}
caches_by_name[cache_name] = self._cache self.metrics = register_cache(cache_name, self._cache)
def start(self): def start(self):
if not self._expiry_ms: if not self._expiry_ms:
@ -78,9 +78,9 @@ class ExpiringCache(object):
def __getitem__(self, key): def __getitem__(self, key):
try: try:
entry = self._cache[key] entry = self._cache[key]
cache_counter.inc_hits(self._cache_name) self.metrics.inc_hits()
except KeyError: except KeyError:
cache_counter.inc_misses(self._cache_name) self.metrics.inc_misses()
raise raise
if self._reset_expiry_on_get: if self._reset_expiry_on_get:

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.util.caches import cache_counter, caches_by_name from synapse.util.caches import register_cache
from blist import sorteddict from blist import sorteddict
@ -42,7 +42,7 @@ class StreamChangeCache(object):
self._cache = sorteddict() self._cache = sorteddict()
self._earliest_known_stream_pos = current_stream_pos self._earliest_known_stream_pos = current_stream_pos
self.name = name self.name = name
caches_by_name[self.name] = self._cache self.metrics = register_cache(self.name, self._cache)
for entity, stream_pos in prefilled_cache.items(): for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos) self.entity_has_changed(entity, stream_pos)
@ -53,19 +53,19 @@ class StreamChangeCache(object):
assert type(stream_pos) is int assert type(stream_pos) is int
if stream_pos < self._earliest_known_stream_pos: if stream_pos < self._earliest_known_stream_pos:
cache_counter.inc_misses(self.name) self.metrics.inc_misses()
return True return True
latest_entity_change_pos = self._entity_to_key.get(entity, None) latest_entity_change_pos = self._entity_to_key.get(entity, None)
if latest_entity_change_pos is None: if latest_entity_change_pos is None:
cache_counter.inc_hits(self.name) self.metrics.inc_hits()
return False return False
if stream_pos < latest_entity_change_pos: if stream_pos < latest_entity_change_pos:
cache_counter.inc_misses(self.name) self.metrics.inc_misses()
return True return True
cache_counter.inc_hits(self.name) self.metrics.inc_hits()
return False return False
def get_entities_changed(self, entities, stream_pos): def get_entities_changed(self, entities, stream_pos):
@ -82,10 +82,10 @@ class StreamChangeCache(object):
self._cache[k] for k in keys[i:] self._cache[k] for k in keys[i:]
).intersection(entities) ).intersection(entities)
cache_counter.inc_hits(self.name) self.metrics.inc_hits()
else: else:
result = entities result = entities
cache_counter.inc_misses(self.name) self.metrics.inc_misses()
return result return result

View File

@ -61,9 +61,6 @@ class CounterMetricTestCase(unittest.TestCase):
'vector{method="PUT"} 1', 'vector{method="PUT"} 1',
]) ])
# Check that passing too few values errors
self.assertRaises(ValueError, counter.inc)
class CallbackMetricTestCase(unittest.TestCase): class CallbackMetricTestCase(unittest.TestCase):
@ -138,27 +135,27 @@ class CacheMetricTestCase(unittest.TestCase):
def test_cache(self): def test_cache(self):
d = dict() d = dict()
metric = CacheMetric("cache", lambda: len(d)) metric = CacheMetric("cache", lambda: len(d), "cache_name")
self.assertEquals(metric.render(), [ self.assertEquals(metric.render(), [
'cache:hits 0', 'cache:hits{name="cache_name"} 0',
'cache:total 0', 'cache:total{name="cache_name"} 0',
'cache:size 0', 'cache:size{name="cache_name"} 0',
]) ])
metric.inc_misses() metric.inc_misses()
d["key"] = "value" d["key"] = "value"
self.assertEquals(metric.render(), [ self.assertEquals(metric.render(), [
'cache:hits 0', 'cache:hits{name="cache_name"} 0',
'cache:total 1', 'cache:total{name="cache_name"} 1',
'cache:size 1', 'cache:size{name="cache_name"} 1',
]) ])
metric.inc_hits() metric.inc_hits()
self.assertEquals(metric.render(), [ self.assertEquals(metric.render(), [
'cache:hits 1', 'cache:hits{name="cache_name"} 1',
'cache:total 2', 'cache:total{name="cache_name"} 2',
'cache:size 1', 'cache:size{name="cache_name"} 1',
]) ])