Avoid locking account_data tables for upserts

This commit is contained in:
Richard van der Hoff 2017-11-16 18:07:01 +00:00
parent 624a8bbd67
commit c46139a17e

View File

@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore):
""" """
content_json = json.dumps(content) content_json = json.dumps(content)
def add_account_data_txn(txn, next_id): with self._account_data_id_gen.get_next() as next_id:
self._simple_upsert_txn( # no need to lock here as room_account_data has a unique constraint
txn, # on (user_id, room_id, account_data_type) so _simple_upsert will
# retry if there is a conflict.
yield self._simple_upsert(
desc="add_room_account_data",
table="room_account_data", table="room_account_data",
keyvalues={ keyvalues={
"user_id": user_id, "user_id": user_id,
@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore):
values={ values={
"stream_id": next_id, "stream_id": next_id,
"content": content_json, "content": content_json,
} },
lock=False,
) )
txn.call_after(
self._account_data_stream_cache.entity_has_changed,
user_id, next_id,
)
txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
self._update_max_stream_id(txn, next_id)
with self._account_data_id_gen.get_next() as next_id: # it's theoretically possible for the above to succeed and the
yield self.runInteraction( # below to fail - in which case we might reuse a stream id on
"add_room_account_data", add_account_data_txn, next_id # restart, and the above update might not get propagated. That
) # doesn't sound any worse than the whole update getting lost,
# which is what would happen if we combined the two into one
# transaction.
yield self._update_max_stream_id(next_id)
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
result = self._account_data_id_gen.get_current_token() result = self._account_data_id_gen.get_current_token()
defer.returnValue(result) defer.returnValue(result)
@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore):
""" """
content_json = json.dumps(content) content_json = json.dumps(content)
def add_account_data_txn(txn, next_id): with self._account_data_id_gen.get_next() as next_id:
self._simple_upsert_txn( # no need to lock here as account_data has a unique constraint on
txn, # (user_id, account_data_type) so _simple_upsert will retry if
# there is a conflict.
yield self._simple_upsert(
desc="add_user_account_data",
table="account_data", table="account_data",
keyvalues={ keyvalues={
"user_id": user_id, "user_id": user_id,
@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore):
values={ values={
"stream_id": next_id, "stream_id": next_id,
"content": content_json, "content": content_json,
} },
lock=False,
) )
txn.call_after(
self._account_data_stream_cache.entity_has_changed, # it's theoretically possible for the above to succeed and the
# below to fail - in which case we might reuse a stream id on
# restart, and the above update might not get propagated. That
# doesn't sound any worse than the whole update getting lost,
# which is what would happen if we combined the two into one
# transaction.
yield self._update_max_stream_id(next_id)
self._account_data_stream_cache.entity_has_changed(
user_id, next_id, user_id, next_id,
) )
txn.call_after(self.get_account_data_for_user.invalidate, (user_id,)) self.get_account_data_for_user.invalidate((user_id,))
txn.call_after( self.get_global_account_data_by_type_for_user.invalidate(
self.get_global_account_data_by_type_for_user.invalidate,
(account_data_type, user_id,) (account_data_type, user_id,)
) )
self._update_max_stream_id(txn, next_id)
with self._account_data_id_gen.get_next() as next_id:
yield self.runInteraction(
"add_user_account_data", add_account_data_txn, next_id
)
result = self._account_data_id_gen.get_current_token() result = self._account_data_id_gen.get_current_token()
defer.returnValue(result) defer.returnValue(result)
def _update_max_stream_id(self, txn, next_id): def _update_max_stream_id(self, next_id):
"""Update the max stream_id """Update the max stream_id
Args: Args:
txn: The database cursor
next_id(int): The the revision to advance to. next_id(int): The the revision to advance to.
""" """
update_max_id_sql = ( def _update(txn):
"UPDATE account_data_max_stream_id" update_max_id_sql = (
" SET stream_id = ?" "UPDATE account_data_max_stream_id"
" WHERE stream_id < ?" " SET stream_id = ?"
" WHERE stream_id < ?"
)
txn.execute(update_max_id_sql, (next_id, next_id))
return self.runInteraction(
"update_account_data_max_stream_id",
_update,
) )
txn.execute(update_max_id_sql, (next_id, next_id))
@cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):