Merge pull request #4081 from matrix-org/neilj/fix_mau_init

fix race condition in calling initialise_reserved_users
This commit is contained in:
Neil Johnson 2018-10-25 16:33:40 +01:00 committed by GitHub
commit 95ad128851
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 94 additions and 33 deletions

2
changelog.d/4081.bugfix Normal file
View File

@ -0,0 +1,2 @@
Fix race condition where config-defined reserved users were not being added to
the monthly active user list prior to the homeserver reactor firing up.

View File

@ -553,14 +553,6 @@ def run(hs):
generate_monthly_active_users, generate_monthly_active_users,
) )
# XXX is this really supposed to be a background process? it looks
# like it needs to complete before some of the other stuff runs.
run_as_background_process(
"initialise_reserved_users",
hs.get_datastore().initialise_reserved_users,
hs.config.mau_limits_reserved_threepids,
)
start_generate_monthly_active_users() start_generate_monthly_active_users()
if hs.config.limit_usage_by_mau: if hs.config.limit_usage_by_mau:
clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)

View File

@ -207,6 +207,7 @@ class HomeServer(object):
logger.info("Setting up.") logger.info("Setting up.")
with self.get_db_conn() as conn: with self.get_db_conn() as conn:
self.datastore = self.DATASTORE_CLASS(conn, self) self.datastore = self.DATASTORE_CLASS(conn, self)
conn.commit()
logger.info("Finished setting up.") logger.info("Finished setting up.")
def get_reactor(self): def get_reactor(self):

View File

@ -33,19 +33,29 @@ class MonthlyActiveUsersStore(SQLBaseStore):
self._clock = hs.get_clock() self._clock = hs.get_clock()
self.hs = hs self.hs = hs
self.reserved_users = () self.reserved_users = ()
# Do not add more reserved users than the total allowable number
self._initialise_reserved_users(
dbconn.cursor(),
hs.config.mau_limits_reserved_threepids[:self.hs.config.max_mau_value],
)
@defer.inlineCallbacks def _initialise_reserved_users(self, txn, threepids):
def initialise_reserved_users(self, threepids): """Ensures that reserved threepids are accounted for in the MAU table, should
store = self.hs.get_datastore() be called on start up.
Args:
txn (cursor):
threepids (list[dict]): List of threepid dicts to reserve
"""
reserved_user_list = [] reserved_user_list = []
# Do not add more reserved users than the total allowable number for tp in threepids:
for tp in threepids[:self.hs.config.max_mau_value]: user_id = self.get_user_id_by_threepid_txn(
user_id = yield store.get_user_id_by_threepid( txn,
tp["medium"], tp["address"] tp["medium"], tp["address"]
) )
if user_id: if user_id:
yield self.upsert_monthly_active_user(user_id) self.upsert_monthly_active_user_txn(txn, user_id)
reserved_user_list.append(user_id) reserved_user_list.append(user_id)
else: else:
logger.warning( logger.warning(
@ -55,8 +65,7 @@ class MonthlyActiveUsersStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def reap_monthly_active_users(self): def reap_monthly_active_users(self):
""" """Cleans out monthly active user table to ensure that no stale
Cleans out monthly active user table to ensure that no stale
entries exist. entries exist.
Returns: Returns:
@ -165,19 +174,44 @@ class MonthlyActiveUsersStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def upsert_monthly_active_user(self, user_id): def upsert_monthly_active_user(self, user_id):
"""Updates or inserts the user into the monthly active user table, which
is used to track the current MAU usage of the server
Args:
user_id (str): user to add/update
""" """
Updates or inserts monthly active user member is_insert = yield self.runInteraction(
Arguments: "upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
user_id (str): user to add/update user_id
Deferred[bool]: True if a new entry was created, False if an )
existing one was updated.
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))
self.get_monthly_active_count.invalidate(())
def upsert_monthly_active_user_txn(self, txn, user_id):
"""Updates or inserts monthly active user member
Note that, after calling this method, it will generally be necessary
to invalidate the caches on user_last_seen_monthly_active and
get_monthly_active_count. We can't do that here, because we are running
in a database thread rather than the main thread, and we can't call
txn.call_after because txn may not be a LoggingTransaction.
Args:
txn (cursor):
user_id (str): user to add/update
Returns:
bool: True if a new entry was created, False if an
existing one was updated.
""" """
# Am consciously deciding to lock the table on the basis that is ought # Am consciously deciding to lock the table on the basis that is ought
# never be a big table and alternative approaches (batching multiple # never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity. # upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more # See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = yield self._simple_upsert( is_insert = self._simple_upsert_txn(
desc="upsert_monthly_active_user", txn,
table="monthly_active_users", table="monthly_active_users",
keyvalues={ keyvalues={
"user_id": user_id, "user_id": user_id,
@ -186,9 +220,8 @@ class MonthlyActiveUsersStore(SQLBaseStore):
"timestamp": int(self._clock.time_msec()), "timestamp": int(self._clock.time_msec()),
}, },
) )
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,)) return is_insert
self.get_monthly_active_count.invalidate(())
@cached(num_args=1) @cached(num_args=1)
def user_last_seen_monthly_active(self, user_id): def user_last_seen_monthly_active(self, user_id):

View File

@ -474,17 +474,44 @@ class RegistrationStore(RegistrationWorkerStore,
@defer.inlineCallbacks @defer.inlineCallbacks
def get_user_id_by_threepid(self, medium, address): def get_user_id_by_threepid(self, medium, address):
ret = yield self._simple_select_one( """Returns user id from threepid
Args:
medium (str): threepid medium e.g. email
address (str): threepid address e.g. me@example.com
Returns:
Deferred[str|None]: user id or None if no user id/threepid mapping exists
"""
user_id = yield self.runInteraction(
"get_user_id_by_threepid", self.get_user_id_by_threepid_txn,
medium, address
)
defer.returnValue(user_id)
def get_user_id_by_threepid_txn(self, txn, medium, address):
"""Returns user id from threepid
Args:
txn (cursor):
medium (str): threepid medium e.g. email
address (str): threepid address e.g. me@example.com
Returns:
str|None: user id or None if no user id/threepid mapping exists
"""
ret = self._simple_select_one_txn(
txn,
"user_threepids", "user_threepids",
{ {
"medium": medium, "medium": medium,
"address": address "address": address
}, },
['user_id'], True, 'get_user_id_by_threepid' ['user_id'], True
) )
if ret: if ret:
defer.returnValue(ret['user_id']) return ret['user_id']
defer.returnValue(None) return None
def user_delete_threepid(self, user_id, medium, address): def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete( return self._simple_delete(

View File

@ -52,7 +52,10 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase):
now = int(self.hs.get_clock().time_msec()) now = int(self.hs.get_clock().time_msec())
self.store.user_add_threepid(user1, "email", user1_email, now, now) self.store.user_add_threepid(user1, "email", user1_email, now, now)
self.store.user_add_threepid(user2, "email", user2_email, now, now) self.store.user_add_threepid(user2, "email", user2_email, now, now)
self.store.initialise_reserved_users(threepids)
self.store.runInteraction(
"initialise", self.store._initialise_reserved_users, threepids
)
self.pump() self.pump()
active_count = self.store.get_monthly_active_count() active_count = self.store.get_monthly_active_count()
@ -199,7 +202,10 @@ class MonthlyActiveUsersTestCase(HomeserverTestCase):
{'medium': 'email', 'address': user2_email}, {'medium': 'email', 'address': user2_email},
] ]
self.hs.config.mau_limits_reserved_threepids = threepids self.hs.config.mau_limits_reserved_threepids = threepids
self.store.initialise_reserved_users(threepids) self.store.runInteraction(
"initialise", self.store._initialise_reserved_users, threepids
)
self.pump() self.pump()
count = self.store.get_registered_reserved_users_count() count = self.store.get_registered_reserved_users_count()
self.assertEquals(self.get_success(count), 0) self.assertEquals(self.get_success(count), 0)