Allow monthly active user limiting support for worker mode, fixes #4639. (#6742)

This commit is contained in:
Neil Johnson 2020-01-22 11:05:14 +00:00 committed by Erik Johnston
parent 5d7a6ad223
commit 5e52d8563b
6 changed files with 100 additions and 82 deletions

1
changelog.d/6742.bugfix Normal file
View File

@ -0,0 +1 @@
Fix monthly active user limiting support for worker mode, fixes [#4639](https://github.com/matrix-org/synapse/issues/4639).

View File

@ -62,6 +62,9 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet from synapse.rest.client.versions import VersionsRestServlet
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -85,6 +88,7 @@ class ClientReaderSlavedStore(
SlavedTransactionStore, SlavedTransactionStore,
SlavedProfileStore, SlavedProfileStore,
SlavedClientIpStore, SlavedClientIpStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore, BaseSlavedStore,
): ):
pass pass

View File

@ -56,6 +56,9 @@ from synapse.rest.client.v1.room import (
RoomStateEventRestServlet, RoomStateEventRestServlet,
) )
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
@ -81,6 +84,7 @@ class EventCreatorSlavedStore(
SlavedEventStore, SlavedEventStore,
SlavedRegistrationStore, SlavedRegistrationStore,
RoomStore, RoomStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore, BaseSlavedStore,
): ):
pass pass

View File

@ -46,6 +46,9 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.key.v2 import KeyApiV2Resource from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -66,6 +69,7 @@ class FederationReaderSlavedStore(
RoomStore, RoomStore,
DirectoryStore, DirectoryStore,
SlavedTransactionStore, SlavedTransactionStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore, BaseSlavedStore,
): ):
pass pass

View File

@ -54,6 +54,9 @@ from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
from synapse.rest.client.v2_alpha import sync from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore,
)
from synapse.storage.data_stores.main.presence import UserPresenceState from synapse.storage.data_stores.main.presence import UserPresenceState
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
@ -77,6 +80,7 @@ class SynchrotronSlavedStore(
SlavedEventStore, SlavedEventStore,
SlavedClientIpStore, SlavedClientIpStore,
RoomStore, RoomStore,
MonthlyActiveUsersWorkerStore,
BaseSlavedStore, BaseSlavedStore,
): ):
pass pass

View File

@ -27,12 +27,76 @@ logger = logging.getLogger(__name__)
LAST_SEEN_GRANULARITY = 60 * 60 * 1000 LAST_SEEN_GRANULARITY = 60 * 60 * 1000
class MonthlyActiveUsersStore(SQLBaseStore): class MonthlyActiveUsersWorkerStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs): def __init__(self, database: Database, db_conn, hs):
super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs) super(MonthlyActiveUsersWorkerStore, self).__init__(database, db_conn, hs)
self._clock = hs.get_clock() self._clock = hs.get_clock()
self.hs = hs self.hs = hs
@cached(num_args=0)
def get_monthly_active_count(self):
"""Generates current count of monthly active users
Returns:
Defered[int]: Number of current monthly active users
"""
def _count_users(txn):
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
txn.execute(sql)
(count,) = txn.fetchone()
return count
return self.db.runInteraction("count_users", _count_users)
@defer.inlineCallbacks
def get_registered_reserved_users(self):
"""Of the reserved threepids defined in config, which are associated
with registered users?
Returns:
Defered[list]: Real reserved users
"""
users = []
for tp in self.hs.config.mau_limits_reserved_threepids[
: self.hs.config.max_mau_value
]:
user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
tp["medium"], tp["address"]
)
if user_id:
users.append(user_id)
return users
@cached(num_args=1)
def user_last_seen_monthly_active(self, user_id):
"""
Checks if a given user is part of the monthly active user group
Arguments:
user_id (str): user to add/update
Return:
Deferred[int] : timestamp since last seen, None if never seen
"""
return self.db.simple_select_one_onecol(
table="monthly_active_users",
keyvalues={"user_id": user_id},
retcol="timestamp",
allow_none=True,
desc="user_last_seen_monthly_active",
)
class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
def __init__(self, database: Database, db_conn, hs):
super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
# Do not add more reserved users than the total allowable number # Do not add more reserved users than the total allowable number
# cur = LoggingTransaction(
self.db.new_transaction( self.db.new_transaction(
db_conn, db_conn,
"initialise_mau_threepids", "initialise_mau_threepids",
@ -146,10 +210,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
txn.execute(sql, query_args) txn.execute(sql, query_args)
reserved_users = yield self.get_registered_reserved_users()
yield self.db.runInteraction(
"reap_monthly_active_users", _reap_users, reserved_users
)
# It seems poor to invalidate the whole cache, Postgres supports # It seems poor to invalidate the whole cache, Postgres supports
# 'Returning' which would allow me to invalidate only the # 'Returning' which would allow me to invalidate only the
# specific users, but sqlite has no way to do this and instead # specific users, but sqlite has no way to do this and instead
@ -157,46 +217,15 @@ class MonthlyActiveUsersStore(SQLBaseStore):
# is racy. # is racy.
# Have resolved to invalidate the whole cache for now and do # Have resolved to invalidate the whole cache for now and do
# something about it if and when the perf becomes significant # something about it if and when the perf becomes significant
self.user_last_seen_monthly_active.invalidate_all() self._invalidate_all_cache_and_stream(
self.get_monthly_active_count.invalidate_all() txn, self.user_last_seen_monthly_active
@cached(num_args=0)
def get_monthly_active_count(self):
"""Generates current count of monthly active users
Returns:
Defered[int]: Number of current monthly active users
"""
def _count_users(txn):
sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
txn.execute(sql)
(count,) = txn.fetchone()
return count
return self.db.runInteraction("count_users", _count_users)
@defer.inlineCallbacks
def get_registered_reserved_users(self):
"""Of the reserved threepids defined in config, which are associated
with registered users?
Returns:
Defered[list]: Real reserved users
"""
users = []
for tp in self.hs.config.mau_limits_reserved_threepids[
: self.hs.config.max_mau_value
]:
user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
tp["medium"], tp["address"]
) )
if user_id: self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
users.append(user_id)
return users reserved_users = yield self.get_registered_reserved_users()
yield self.db.runInteraction(
"reap_monthly_active_users", _reap_users, reserved_users
)
@defer.inlineCallbacks @defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id): def upsert_monthly_active_user(self, user_id):
@ -222,23 +251,9 @@ class MonthlyActiveUsersStore(SQLBaseStore):
"upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id "upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id
) )
user_in_mau = self.user_last_seen_monthly_active.cache.get(
(user_id,), None, update_metrics=False
)
if user_in_mau is None:
self.get_monthly_active_count.invalidate(())
self.user_last_seen_monthly_active.invalidate((user_id,))
def upsert_monthly_active_user_txn(self, txn, user_id): def upsert_monthly_active_user_txn(self, txn, user_id):
"""Updates or inserts monthly active user member """Updates or inserts monthly active user member
Note that, after calling this method, it will generally be necessary
to invalidate the caches on user_last_seen_monthly_active and
get_monthly_active_count. We can't do that here, because we are running
in a database thread rather than the main thread, and we can't call
txn.call_after because txn may not be a LoggingTransaction.
We consciously do not call is_support_txn from this method because it We consciously do not call is_support_txn from this method because it
is not possible to cache the response. is_support_txn will be false in is not possible to cache the response. is_support_txn will be false in
almost all cases, so it seems reasonable to call it only for almost all cases, so it seems reasonable to call it only for
@ -269,27 +284,13 @@ class MonthlyActiveUsersStore(SQLBaseStore):
values={"timestamp": int(self._clock.time_msec())}, values={"timestamp": int(self._clock.time_msec())},
) )
return is_insert self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
self._invalidate_cache_and_stream(
@cached(num_args=1) txn, self.user_last_seen_monthly_active, (user_id,)
def user_last_seen_monthly_active(self, user_id):
"""
Checks if a given user is part of the monthly active user group
Arguments:
user_id (str): user to add/update
Return:
Deferred[int] : timestamp since last seen, None if never seen
"""
return self.db.simple_select_one_onecol(
table="monthly_active_users",
keyvalues={"user_id": user_id},
retcol="timestamp",
allow_none=True,
desc="user_last_seen_monthly_active",
) )
return is_insert
@defer.inlineCallbacks @defer.inlineCallbacks
def populate_monthly_active_users(self, user_id): def populate_monthly_active_users(self, user_id):
"""Checks on the state of monthly active user limits and optionally """Checks on the state of monthly active user limits and optionally