implement reserved users for mau limits

This commit is contained in:
Neil Johnson 2018-08-07 17:49:43 +01:00
parent a74b25faaa
commit e8eba2b4e3
4 changed files with 99 additions and 13 deletions

View File

@ -518,6 +518,8 @@ def run(hs):
# If you increase the loop period, the accuracy of user_daily_visits # If you increase the loop period, the accuracy of user_daily_visits
# table will decrease # table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
# monthly active user limiting functionality
clock.looping_call( clock.looping_call(
hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60 hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
) )
@ -530,9 +532,13 @@ def run(hs):
current_mau_gauge.set(float(count)) current_mau_gauge.set(float(count))
max_mau_value_gauge.set(float(hs.config.max_mau_value)) max_mau_value_gauge.set(float(hs.config.max_mau_value))
hs.get_datastore().initialise_reserved_users(
hs.config.mau_limits_reserved_threepids
)
generate_monthly_active_users() generate_monthly_active_users()
if hs.config.limit_usage_by_mau: if hs.config.limit_usage_by_mau:
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
# End of monthly active user settings
if hs.config.report_stats: if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals") logger.info("Scheduling stats reporting for 3 hour intervals")

View File

@ -75,7 +75,7 @@ class ServerConfig(Config):
"max_mau_value", 0, "max_mau_value", 0,
) )
self.mau_limits_reserved_threepids = config.get( self.mau_limits_reserved_threepids = config.get(
"mau_limit_reserved_threepid", [] "mau_limit_reserved_threepids", []
) )
# FIXME: federation_domain_whitelist needs sytests # FIXME: federation_domain_whitelist needs sytests

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging
from twisted.internet import defer from twisted.internet import defer
@ -19,6 +20,7 @@ from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
# Number of msec of granularity to store the monthly_active_user timestamp # Number of msec of granularity to store the monthly_active_user timestamp
# This means it is not necessary to update the table on every request # This means it is not necessary to update the table on every request
@ -26,24 +28,31 @@ LAST_SEEN_GRANULARITY = 60 * 60 * 1000
class MonthlyActiveUsersStore(SQLBaseStore): class MonthlyActiveUsersStore(SQLBaseStore):
@defer.inlineCallbacks
def __init__(self, dbconn, hs): def __init__(self, dbconn, hs):
super(MonthlyActiveUsersStore, self).__init__(None, hs) super(MonthlyActiveUsersStore, self).__init__(None, hs)
self._clock = hs.get_clock() self._clock = hs.get_clock()
self.hs = hs self.hs = hs
threepids = self.hs.config.mau_limits_reserved_threepids self.reserved_users = ()
self.reserved_user_ids = set()
@defer.inlineCallbacks
def initialise_reserved_users(self, threepids):
# TODO Why can't I do this in init?
store = self.hs.get_datastore()
reserved_user_list = []
for tp in threepids: for tp in threepids:
user_id = yield hs.get_datastore().get_user_id_by_threepid( user_id = yield store.get_user_id_by_threepid(
tp["medium"], tp["address"] tp["medium"], tp["address"]
) )
if user_id: if user_id:
self.reserved_user_ids.add(user_id) self.upsert_monthly_active_user(user_id)
reserved_user_list.append(user_id)
else: else:
logger.warning( logger.warning(
"mau limit reserved threepid %s not found in db" % tp "mau limit reserved threepid %s not found in db" % tp
) )
self.reserved_users = tuple(reserved_user_list)
@defer.inlineCallbacks
def reap_monthly_active_users(self): def reap_monthly_active_users(self):
""" """
Cleans out monthly active user table to ensure that no stale Cleans out monthly active user table to ensure that no stale
@ -58,8 +67,20 @@ class MonthlyActiveUsersStore(SQLBaseStore):
int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
) )
# Purge stale users # Purge stale users
sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"
txn.execute(sql, (thirty_days_ago,)) # questionmarks is a hack to overcome sqlite not supporting
# tuples in 'WHERE IN %s'
questionmarks = '?' * len(self.reserved_users)
query_args = [thirty_days_ago]
query_args.extend(self.reserved_users)
sql = """
DELETE FROM monthly_active_users
WHERE timestamp < ?
AND user_id NOT IN ({})
""".format(','.join(questionmarks))
txn.execute(sql, query_args)
# If MAU user count still exceeds the MAU threshold, then delete on # If MAU user count still exceeds the MAU threshold, then delete on
# a least recently active basis. # a least recently active basis.
@ -69,6 +90,8 @@ class MonthlyActiveUsersStore(SQLBaseStore):
# While Postgres does not require 'LIMIT', but also does not support # While Postgres does not require 'LIMIT', but also does not support
# negative LIMIT values. So there is no way to write it that both can # negative LIMIT values. So there is no way to write it that both can
# support # support
query_args = [self.hs.config.max_mau_value]
query_args.extend(self.reserved_users)
sql = """ sql = """
DELETE FROM monthly_active_users DELETE FROM monthly_active_users
WHERE user_id NOT IN ( WHERE user_id NOT IN (
@ -76,8 +99,9 @@ class MonthlyActiveUsersStore(SQLBaseStore):
ORDER BY timestamp DESC ORDER BY timestamp DESC
LIMIT ? LIMIT ?
) )
""" AND user_id NOT IN ({})
txn.execute(sql, (self.hs.config.max_mau_value,)) """.format(','.join(questionmarks))
txn.execute(sql, query_args)
yield self.runInteraction("reap_monthly_active_users", _reap_users) yield self.runInteraction("reap_monthly_active_users", _reap_users)
# It seems poor to invalidate the whole cache, Postgres supports # It seems poor to invalidate the whole cache, Postgres supports
@ -136,7 +160,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Arguments: Arguments:
user_id (str): user to add/update user_id (str): user to add/update
Return: Return:
int : timestamp since last seen, None if never seen Deferred[int] : timestamp since last seen, None if never seen
""" """
@ -158,7 +182,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Args: Args:
user_id(str): the user_id to query user_id(str): the user_id to query
""" """
if self.hs.config.limit_usage_by_mau: if self.hs.config.limit_usage_by_mau:
last_seen_timestamp = yield self._user_last_seen_monthly_active(user_id) last_seen_timestamp = yield self._user_last_seen_monthly_active(user_id)
now = self.hs.get_clock().time_msec() now = self.hs.get_clock().time_msec()

View File

@ -19,6 +19,8 @@ import tests.unittest
import tests.utils import tests.utils
from tests.utils import setup_test_homeserver from tests.utils import setup_test_homeserver
FORTY_DAYS = 40 * 24 * 60 * 60
class MonthlyActiveUsersTestCase(tests.unittest.TestCase): class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -29,6 +31,56 @@ class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
self.hs = yield setup_test_homeserver() self.hs = yield setup_test_homeserver()
self.store = self.hs.get_datastore() self.store = self.hs.get_datastore()
@defer.inlineCallbacks
def test_initialise_reserved_users(self):
user1 = "@user1:server"
user1_email = "user1@matrix.org"
user2 = "@user2:server"
user2_email = "user2@matrix.org"
threepids = [
{'medium': 'email', 'address': user1_email},
{'medium': 'email', 'address': user2_email}
]
user_num = len(threepids)
yield self.store.register(
user_id=user1,
token="123",
password_hash=None)
yield self.store.register(
user_id=user2,
token="456",
password_hash=None)
now = int(self.hs.get_clock().time_msec())
yield self.store.user_add_threepid(user1, "email", user1_email, now, now)
yield self.store.user_add_threepid(user2, "email", user2_email, now, now)
yield self.store.initialise_reserved_users(threepids)
active_count = yield self.store.get_monthly_active_count()
# Test total counts
self.assertEquals(active_count, user_num)
# Test user is marked as active
timestamp = yield self.store._user_last_seen_monthly_active(user1)
self.assertTrue(timestamp)
timestamp = yield self.store._user_last_seen_monthly_active(user2)
self.assertTrue(timestamp)
# Test that users are never removed from the db.
self.hs.config.max_mau_value = 0
self.hs.get_clock().advance_time(FORTY_DAYS)
yield self.store.reap_monthly_active_users()
active_count = yield self.store.get_monthly_active_count()
self.assertEquals(active_count, user_num)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_can_insert_and_count_mau(self): def test_can_insert_and_count_mau(self):
count = yield self.store.get_monthly_active_count() count = yield self.store.get_monthly_active_count()
@ -63,4 +115,9 @@ class MonthlyActiveUsersTestCase(tests.unittest.TestCase):
self.assertTrue(count, initial_users) self.assertTrue(count, initial_users)
yield self.store.reap_monthly_active_users() yield self.store.reap_monthly_active_users()
count = yield self.store.get_monthly_active_count() count = yield self.store.get_monthly_active_count()
self.assertTrue(count, initial_users - self.hs.config.max_mau_value) self.assertEquals(count, initial_users - self.hs.config.max_mau_value)
self.hs.get_clock().advance_time(FORTY_DAYS)
yield self.store.reap_monthly_active_users()
count = yield self.store.get_monthly_active_count()
self.assertEquals(count, 0)