mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-11-10 14:24:57 -05:00
Merge pull request #3633 from matrix-org/neilj/mau_tracker
API for monthly_active_users table
This commit is contained in:
commit
990fe9fc23
18 changed files with 414 additions and 163 deletions
|
|
@ -213,7 +213,7 @@ class Auth(object):
|
|||
default=[b""]
|
||||
)[0]
|
||||
if user and access_token and ip_addr:
|
||||
self.store.insert_client_ip(
|
||||
yield self.store.insert_client_ip(
|
||||
user_id=user.to_string(),
|
||||
access_token=access_token,
|
||||
ip=ip_addr,
|
||||
|
|
@ -773,3 +773,15 @@ class Auth(object):
|
|||
raise AuthError(
|
||||
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def check_auth_blocking(self):
|
||||
"""Checks if the user should be rejected for some external reason,
|
||||
such as monthly active user limiting or global disable flag
|
||||
"""
|
||||
if self.hs.config.limit_usage_by_mau is True:
|
||||
current_mau = yield self.store.get_monthly_active_count()
|
||||
if current_mau >= self.hs.config.max_mau_value:
|
||||
raise AuthError(
|
||||
403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED
|
||||
)
|
||||
|
|
|
|||
|
|
@ -518,12 +518,15 @@ def run(hs):
|
|||
# If you increase the loop period, the accuracy of user_daily_visits
|
||||
# table will decrease
|
||||
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
|
||||
clock.looping_call(
|
||||
hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def generate_monthly_active_users():
|
||||
count = 0
|
||||
if hs.config.limit_usage_by_mau:
|
||||
count = yield hs.get_datastore().count_monthly_users()
|
||||
count = yield hs.get_datastore().get_monthly_active_count()
|
||||
current_mau_gauge.set(float(count))
|
||||
max_mau_value_gauge.set(float(hs.config.max_mau_value))
|
||||
|
||||
|
|
|
|||
|
|
@ -69,12 +69,12 @@ class ServerConfig(Config):
|
|||
|
||||
# Options to control access by tracking MAU
|
||||
self.limit_usage_by_mau = config.get("limit_usage_by_mau", False)
|
||||
self.max_mau_value = 0
|
||||
if self.limit_usage_by_mau:
|
||||
self.max_mau_value = config.get(
|
||||
"max_mau_value", 0,
|
||||
)
|
||||
else:
|
||||
self.max_mau_value = 0
|
||||
|
||||
# FIXME: federation_domain_whitelist needs sytests
|
||||
self.federation_domain_whitelist = None
|
||||
federation_domain_whitelist = config.get(
|
||||
|
|
|
|||
|
|
@ -520,7 +520,7 @@ class AuthHandler(BaseHandler):
|
|||
"""
|
||||
logger.info("Logging in user %s on device %s", user_id, device_id)
|
||||
access_token = yield self.issue_access_token(user_id, device_id)
|
||||
yield self._check_mau_limits()
|
||||
yield self.auth.check_auth_blocking()
|
||||
|
||||
# the device *should* have been registered before we got here; however,
|
||||
# it's possible we raced against a DELETE operation. The thing we
|
||||
|
|
@ -734,7 +734,7 @@ class AuthHandler(BaseHandler):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def validate_short_term_login_token_and_get_user_id(self, login_token):
|
||||
yield self._check_mau_limits()
|
||||
yield self.auth.check_auth_blocking()
|
||||
auth_api = self.hs.get_auth()
|
||||
user_id = None
|
||||
try:
|
||||
|
|
@ -907,19 +907,6 @@ class AuthHandler(BaseHandler):
|
|||
else:
|
||||
return defer.succeed(False)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _check_mau_limits(self):
|
||||
"""
|
||||
Ensure that if mau blocking is enabled that invalid users cannot
|
||||
log in.
|
||||
"""
|
||||
if self.hs.config.limit_usage_by_mau is True:
|
||||
current_mau = yield self.store.count_monthly_users()
|
||||
if current_mau >= self.hs.config.max_mau_value:
|
||||
raise AuthError(
|
||||
403, "MAU Limit Exceeded", errcode=Codes.MAU_LIMIT_EXCEEDED
|
||||
)
|
||||
|
||||
|
||||
@attr.s
|
||||
class MacaroonGenerator(object):
|
||||
|
|
|
|||
|
|
@ -540,9 +540,7 @@ class RegistrationHandler(BaseHandler):
|
|||
Do not accept registrations if monthly active user limits exceeded
|
||||
and limiting is enabled
|
||||
"""
|
||||
if self.hs.config.limit_usage_by_mau is True:
|
||||
current_mau = yield self.store.count_monthly_users()
|
||||
if current_mau >= self.hs.config.max_mau_value:
|
||||
raise RegistrationError(
|
||||
403, "MAU Limit Exceeded", Codes.MAU_LIMIT_EXCEEDED
|
||||
)
|
||||
try:
|
||||
yield self.auth.check_auth_blocking()
|
||||
except AuthError as e:
|
||||
raise RegistrationError(e.code, str(e), e.errcode)
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ from .filtering import FilteringStore
|
|||
from .group_server import GroupServerStore
|
||||
from .keys import KeyStore
|
||||
from .media_repository import MediaRepositoryStore
|
||||
from .monthly_active_users import MonthlyActiveUsersStore
|
||||
from .openid import OpenIdStore
|
||||
from .presence import PresenceStore, UserPresenceState
|
||||
from .profile import ProfileStore
|
||||
|
|
@ -87,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
UserDirectoryStore,
|
||||
GroupServerStore,
|
||||
UserErasureStore,
|
||||
MonthlyActiveUsersStore,
|
||||
):
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
|
|
@ -94,7 +96,6 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
self._clock = hs.get_clock()
|
||||
self.database_engine = hs.database_engine
|
||||
|
||||
self.db_conn = db_conn
|
||||
self._stream_id_gen = StreamIdGenerator(
|
||||
db_conn, "events", "stream_ordering",
|
||||
extra_tables=[("local_invites", "stream_id")]
|
||||
|
|
@ -267,31 +268,6 @@ class DataStore(RoomMemberStore, RoomStore,
|
|||
|
||||
return self.runInteraction("count_users", _count_users)
|
||||
|
||||
def count_monthly_users(self):
|
||||
"""Counts the number of users who used this homeserver in the last 30 days
|
||||
|
||||
This method should be refactored with count_daily_users - the only
|
||||
reason not to is waiting on definition of mau
|
||||
|
||||
Returns:
|
||||
Defered[int]
|
||||
"""
|
||||
def _count_monthly_users(txn):
|
||||
thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
|
||||
sql = """
|
||||
SELECT COALESCE(count(*), 0) FROM (
|
||||
SELECT user_id FROM user_ips
|
||||
WHERE last_seen > ?
|
||||
GROUP BY user_id
|
||||
) u
|
||||
"""
|
||||
|
||||
txn.execute(sql, (thirty_days_ago,))
|
||||
count, = txn.fetchone()
|
||||
return count
|
||||
|
||||
return self.runInteraction("count_monthly_users", _count_monthly_users)
|
||||
|
||||
def count_r30_users(self):
|
||||
"""
|
||||
Counts the number of 30 day retained users, defined as:-
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ LAST_SEEN_GRANULARITY = 120 * 1000
|
|||
|
||||
class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
|
||||
self.client_ip_last_seen = Cache(
|
||||
name="client_ip_last_seen",
|
||||
keylen=4,
|
||||
|
|
@ -74,6 +75,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
"before", "shutdown", self._update_client_ips_batch
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
|
||||
now=None):
|
||||
if not now:
|
||||
|
|
@ -84,7 +86,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
last_seen = self.client_ip_last_seen.get(key)
|
||||
except KeyError:
|
||||
last_seen = None
|
||||
|
||||
yield self.populate_monthly_active_users(user_id)
|
||||
# Rate-limited inserts
|
||||
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
|
||||
return
|
||||
|
|
|
|||
162
synapse/storage/monthly_active_users.py
Normal file
162
synapse/storage/monthly_active_users.py
Normal file
|
|
@ -0,0 +1,162 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2018 New Vector
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
# Number of msec of granularity to store the monthly_active_user timestamp
|
||||
# This means it is not necessary to update the table on every request
|
||||
LAST_SEEN_GRANULARITY = 60 * 60 * 1000
|
||||
|
||||
|
||||
class MonthlyActiveUsersStore(SQLBaseStore):
|
||||
def __init__(self, dbconn, hs):
|
||||
super(MonthlyActiveUsersStore, self).__init__(None, hs)
|
||||
self._clock = hs.get_clock()
|
||||
self.hs = hs
|
||||
|
||||
def reap_monthly_active_users(self):
|
||||
"""
|
||||
Cleans out monthly active user table to ensure that no stale
|
||||
entries exist.
|
||||
|
||||
Returns:
|
||||
Deferred[]
|
||||
"""
|
||||
def _reap_users(txn):
|
||||
|
||||
thirty_days_ago = (
|
||||
int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
|
||||
)
|
||||
# Purge stale users
|
||||
sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"
|
||||
txn.execute(sql, (thirty_days_ago,))
|
||||
|
||||
# If MAU user count still exceeds the MAU threshold, then delete on
|
||||
# a least recently active basis.
|
||||
# Note it is not possible to write this query using OFFSET due to
|
||||
# incompatibilities in how sqlite and postgres support the feature.
|
||||
# sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present
|
||||
# While Postgres does not require 'LIMIT', but also does not support
|
||||
# negative LIMIT values. So there is no way to write it that both can
|
||||
# support
|
||||
sql = """
|
||||
DELETE FROM monthly_active_users
|
||||
WHERE user_id NOT IN (
|
||||
SELECT user_id FROM monthly_active_users
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT ?
|
||||
)
|
||||
"""
|
||||
txn.execute(sql, (self.hs.config.max_mau_value,))
|
||||
|
||||
yield self.runInteraction("reap_monthly_active_users", _reap_users)
|
||||
# It seems poor to invalidate the whole cache, Postgres supports
|
||||
# 'Returning' which would allow me to invalidate only the
|
||||
# specific users, but sqlite has no way to do this and instead
|
||||
# I would need to SELECT and the DELETE which without locking
|
||||
# is racy.
|
||||
# Have resolved to invalidate the whole cache for now and do
|
||||
# something about it if and when the perf becomes significant
|
||||
self._user_last_seen_monthly_active.invalidate_all()
|
||||
self.get_monthly_active_count.invalidate_all()
|
||||
|
||||
@cached(num_args=0)
|
||||
def get_monthly_active_count(self):
|
||||
"""Generates current count of monthly active users
|
||||
|
||||
Returns:
|
||||
Defered[int]: Number of current monthly active users
|
||||
"""
|
||||
|
||||
def _count_users(txn):
|
||||
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
|
||||
|
||||
txn.execute(sql)
|
||||
count, = txn.fetchone()
|
||||
return count
|
||||
return self.runInteraction("count_users", _count_users)
|
||||
|
||||
def upsert_monthly_active_user(self, user_id):
|
||||
"""
|
||||
Updates or inserts monthly active user member
|
||||
Arguments:
|
||||
user_id (str): user to add/update
|
||||
Deferred[bool]: True if a new entry was created, False if an
|
||||
existing one was updated.
|
||||
"""
|
||||
is_insert = self._simple_upsert(
|
||||
desc="upsert_monthly_active_user",
|
||||
table="monthly_active_users",
|
||||
keyvalues={
|
||||
"user_id": user_id,
|
||||
},
|
||||
values={
|
||||
"timestamp": int(self._clock.time_msec()),
|
||||
},
|
||||
lock=False,
|
||||
)
|
||||
if is_insert:
|
||||
self._user_last_seen_monthly_active.invalidate((user_id,))
|
||||
self.get_monthly_active_count.invalidate(())
|
||||
|
||||
@cached(num_args=1)
|
||||
def _user_last_seen_monthly_active(self, user_id):
|
||||
"""
|
||||
Checks if a given user is part of the monthly active user group
|
||||
Arguments:
|
||||
user_id (str): user to add/update
|
||||
Return:
|
||||
int : timestamp since last seen, None if never seen
|
||||
|
||||
"""
|
||||
|
||||
return(self._simple_select_one_onecol(
|
||||
table="monthly_active_users",
|
||||
keyvalues={
|
||||
"user_id": user_id,
|
||||
},
|
||||
retcol="timestamp",
|
||||
allow_none=True,
|
||||
desc="_user_last_seen_monthly_active",
|
||||
))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def populate_monthly_active_users(self, user_id):
|
||||
"""Checks on the state of monthly active user limits and optionally
|
||||
add the user to the monthly active tables
|
||||
|
||||
Args:
|
||||
user_id(str): the user_id to query
|
||||
"""
|
||||
|
||||
if self.hs.config.limit_usage_by_mau:
|
||||
last_seen_timestamp = yield self._user_last_seen_monthly_active(user_id)
|
||||
now = self.hs.get_clock().time_msec()
|
||||
|
||||
# We want to reduce to the total number of db writes, and are happy
|
||||
# to trade accuracy of timestamp in order to lighten load. This means
|
||||
# We always insert new users (where MAU threshold has not been reached),
|
||||
# but only update if we have not previously seen the user for
|
||||
# LAST_SEEN_GRANULARITY ms
|
||||
if last_seen_timestamp is None:
|
||||
count = yield self.get_monthly_active_count()
|
||||
if count < self.hs.config.max_mau_value:
|
||||
yield self.upsert_monthly_active_user(user_id)
|
||||
elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
|
||||
yield self.upsert_monthly_active_user(user_id)
|
||||
|
|
@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
# Remember to update this number every time a change is made to database
|
||||
# schema files, so the users will be informed on server restarts.
|
||||
SCHEMA_VERSION = 50
|
||||
SCHEMA_VERSION = 51
|
||||
|
||||
dir_path = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
|
|
|||
27
synapse/storage/schema/delta/51/monthly_active_users.sql
Normal file
27
synapse/storage/schema/delta/51/monthly_active_users.sql
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
/* Copyright 2018 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- a table of monthly active users, for use where blocking based on mau limits
|
||||
CREATE TABLE monthly_active_users (
|
||||
user_id TEXT NOT NULL,
|
||||
-- Last time we saw the user. Not guaranteed to be accurate due to rate limiting
|
||||
-- on updates, Granularity of updates governed by
|
||||
-- synapse.storage.monthly_active_users.LAST_SEEN_GRANULARITY
|
||||
-- Measured in ms since epoch.
|
||||
timestamp BIGINT NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX monthly_active_users_users ON monthly_active_users(user_id);
|
||||
CREATE INDEX monthly_active_users_time_stamp ON monthly_active_users(timestamp);
|
||||
Loading…
Add table
Add a link
Reference in a new issue