Remove option to skip locking of tables during emulated upserts (#14469)

To perform an emulated upsert into a table safely, we must either:
 * lock the table,
 * be the only writer upserting into the table
 * or rely on another unique index being present.

When the 2nd or 3rd cases were applicable, we previously avoided locking
the table as an optimization. However, as seen in #14406, it is easy to
slip up when adding new schema deltas and corrupt the database.

The only time we lock when performing emulated upserts is while waiting
for background updates on postgres. On sqlite, we do no locking at all.

Let's remove the option to skip locking tables, so that we don't shoot
ourselves in the foot again.

Signed-off-by: Sean Quah <seanq@matrix.org>
This commit is contained in:
Sean Quah 2022-11-28 13:42:06 +00:00 committed by GitHub
parent 2dad42a9fb
commit f792dd74e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 19 additions and 74 deletions

1
changelog.d/14469.misc Normal file
View File

@ -0,0 +1 @@
Remove option to skip locking of tables when performing emulated upserts, to avoid a class of bugs in future.

View File

@ -1129,7 +1129,6 @@ class DatabasePool:
values: Dict[str, Any], values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None, insertion_values: Optional[Dict[str, Any]] = None,
desc: str = "simple_upsert", desc: str = "simple_upsert",
lock: bool = True,
) -> bool: ) -> bool:
"""Insert a row with values + insertion_values; on conflict, update with values. """Insert a row with values + insertion_values; on conflict, update with values.
@ -1154,21 +1153,12 @@ class DatabasePool:
requiring that a unique index exist on the column names used to detect a requiring that a unique index exist on the column names used to detect a
conflict (i.e. `keyvalues.keys()`). conflict (i.e. `keyvalues.keys()`).
If there is no such index, we can "emulate" an upsert with a SELECT followed If there is no such index yet[*], we can "emulate" an upsert with a SELECT
by either an INSERT or an UPDATE. This is unsafe: we cannot make the same followed by either an INSERT or an UPDATE. This is unsafe unless *all* upserters
atomicity guarantees that a native upsert can and are very vulnerable to races run at the SERIALIZABLE isolation level: we cannot make the same atomicity
and crashes. Therefore if we wish to upsert without an appropriate unique index, guarantees that a native upsert can and are very vulnerable to races and
we must either: crashes. Therefore to upsert without an appropriate unique index, we acquire a
table-level lock before the emulated upsert.
1. Acquire a table-level lock before the emulated upsert (`lock=True`), or
2. VERY CAREFULLY ensure that we are the only thread and worker which will be
writing to this table, in which case we can proceed without a lock
(`lock=False`).
Generally speaking, you should use `lock=True`. If the table in question has a
unique index[*], this class will use a native upsert (which is atomic and so can
ignore the `lock` argument). Otherwise this class will use an emulated upsert,
in which case we want the safer option unless we been VERY CAREFUL.
[*]: Some tables have unique indices added to them in the background. Those [*]: Some tables have unique indices added to them in the background. Those
tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES, tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES,
@ -1189,7 +1179,6 @@ class DatabasePool:
values: The nonunique columns and their new values values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting insertion_values: additional key/values to use only when inserting
desc: description of the transaction, for logging and metrics desc: description of the transaction, for logging and metrics
lock: True to lock the table when doing the upsert.
Returns: Returns:
Returns True if a row was inserted or updated (i.e. if `values` is Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True) not empty then this always returns True)
@ -1209,7 +1198,6 @@ class DatabasePool:
keyvalues, keyvalues,
values, values,
insertion_values, insertion_values,
lock=lock,
db_autocommit=autocommit, db_autocommit=autocommit,
) )
except self.engine.module.IntegrityError as e: except self.engine.module.IntegrityError as e:
@ -1232,7 +1220,6 @@ class DatabasePool:
values: Dict[str, Any], values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None, insertion_values: Optional[Dict[str, Any]] = None,
where_clause: Optional[str] = None, where_clause: Optional[str] = None,
lock: bool = True,
) -> bool: ) -> bool:
""" """
Pick the UPSERT method which works best on the platform. Either the Pick the UPSERT method which works best on the platform. Either the
@ -1245,8 +1232,6 @@ class DatabasePool:
values: The nonunique columns and their new values values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert. where_clause: An index predicate to apply to the upsert.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
Returns: Returns:
Returns True if a row was inserted or updated (i.e. if `values` is Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True) not empty then this always returns True)
@ -1270,7 +1255,6 @@ class DatabasePool:
values, values,
insertion_values=insertion_values, insertion_values=insertion_values,
where_clause=where_clause, where_clause=where_clause,
lock=lock,
) )
def simple_upsert_txn_emulated( def simple_upsert_txn_emulated(
@ -1291,14 +1275,15 @@ class DatabasePool:
insertion_values: additional key/values to use only when inserting insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert. where_clause: An index predicate to apply to the upsert.
lock: True to lock the table when doing the upsert. lock: True to lock the table when doing the upsert.
Must not be False unless the table has already been locked.
Returns: Returns:
Returns True if a row was inserted or updated (i.e. if `values` is Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True) not empty then this always returns True)
""" """
insertion_values = insertion_values or {} insertion_values = insertion_values or {}
# We need to lock the table :(, unless we're *really* careful
if lock: if lock:
# We need to lock the table :(
self.engine.lock_table(txn, table) self.engine.lock_table(txn, table)
def _getwhere(key: str) -> str: def _getwhere(key: str) -> str:
@ -1406,7 +1391,6 @@ class DatabasePool:
value_names: Collection[str], value_names: Collection[str],
value_values: Collection[Collection[Any]], value_values: Collection[Collection[Any]],
desc: str, desc: str,
lock: bool = True,
) -> None: ) -> None:
""" """
Upsert, many times. Upsert, many times.
@ -1418,8 +1402,6 @@ class DatabasePool:
value_names: The value column names value_names: The value column names
value_values: A list of each row's value column values. value_values: A list of each row's value column values.
Ignored if value_names is empty. Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
""" """
# We can autocommit if it safe to upsert # We can autocommit if it safe to upsert
@ -1433,7 +1415,6 @@ class DatabasePool:
key_values, key_values,
value_names, value_names,
value_values, value_values,
lock=lock,
db_autocommit=autocommit, db_autocommit=autocommit,
) )
@ -1445,7 +1426,6 @@ class DatabasePool:
key_values: Collection[Iterable[Any]], key_values: Collection[Iterable[Any]],
value_names: Collection[str], value_names: Collection[str],
value_values: Iterable[Iterable[Any]], value_values: Iterable[Iterable[Any]],
lock: bool = True,
) -> None: ) -> None:
""" """
Upsert, many times. Upsert, many times.
@ -1457,8 +1437,6 @@ class DatabasePool:
value_names: The value column names value_names: The value column names
value_values: A list of each row's value column values. value_values: A list of each row's value column values.
Ignored if value_names is empty. Ignored if value_names is empty.
lock: True to lock the table when doing the upsert. Unused when performing
a native upsert.
""" """
if table not in self._unsafe_to_upsert_tables: if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert( return self.simple_upsert_many_txn_native_upsert(
@ -1466,7 +1444,12 @@ class DatabasePool:
) )
else: else:
return self.simple_upsert_many_txn_emulated( return self.simple_upsert_many_txn_emulated(
txn, table, key_names, key_values, value_names, value_values, lock=lock txn,
table,
key_names,
key_values,
value_names,
value_values,
) )
def simple_upsert_many_txn_emulated( def simple_upsert_many_txn_emulated(
@ -1477,7 +1460,6 @@ class DatabasePool:
key_values: Collection[Iterable[Any]], key_values: Collection[Iterable[Any]],
value_names: Collection[str], value_names: Collection[str],
value_values: Iterable[Iterable[Any]], value_values: Iterable[Iterable[Any]],
lock: bool = True,
) -> None: ) -> None:
""" """
Upsert, many times, but without native UPSERT support or batching. Upsert, many times, but without native UPSERT support or batching.
@ -1489,14 +1471,12 @@ class DatabasePool:
value_names: The value column names value_names: The value column names
value_values: A list of each row's value column values. value_values: A list of each row's value column values.
Ignored if value_names is empty. Ignored if value_names is empty.
lock: True to lock the table when doing the upsert.
""" """
# No value columns, therefore make a blank list so that the following # No value columns, therefore make a blank list so that the following
# zip() works correctly. # zip() works correctly.
if not value_names: if not value_names:
value_values = [() for x in range(len(key_values))] value_values = [() for x in range(len(key_values))]
if lock:
# Lock the table just once, to prevent it being done once per row. # Lock the table just once, to prevent it being done once per row.
# Note that, according to Postgres' documentation, once obtained, # Note that, according to Postgres' documentation, once obtained,
# the lock is held for the remainder of the current transaction. # the lock is held for the remainder of the current transaction.

View File

@ -449,9 +449,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
content_json = json_encoder.encode(content) content_json = json_encoder.encode(content)
async with self._account_data_id_gen.get_next() as next_id: async with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as room_account_data has a unique constraint
# on (user_id, room_id, account_data_type) so simple_upsert will
# retry if there is a conflict.
await self.db_pool.simple_upsert( await self.db_pool.simple_upsert(
desc="add_room_account_data", desc="add_room_account_data",
table="room_account_data", table="room_account_data",
@ -461,7 +458,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
"account_data_type": account_data_type, "account_data_type": account_data_type,
}, },
values={"stream_id": next_id, "content": content_json}, values={"stream_id": next_id, "content": content_json},
lock=False,
) )
self._account_data_stream_cache.entity_has_changed(user_id, next_id) self._account_data_stream_cache.entity_has_changed(user_id, next_id)
@ -517,15 +513,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
) -> None: ) -> None:
content_json = json_encoder.encode(content) content_json = json_encoder.encode(content)
# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
self.db_pool.simple_upsert_txn( self.db_pool.simple_upsert_txn(
txn, txn,
table="account_data", table="account_data",
keyvalues={"user_id": user_id, "account_data_type": account_data_type}, keyvalues={"user_id": user_id, "account_data_type": account_data_type},
values={"stream_id": next_id, "content": content_json}, values={"stream_id": next_id, "content": content_json},
lock=False,
) )
# Ignored users get denormalized into a separate table as an optimisation. # Ignored users get denormalized into a separate table as an optimisation.

View File

@ -451,8 +451,6 @@ class ApplicationServiceTransactionWorkerStore(
table="application_services_state", table="application_services_state",
keyvalues={"as_id": service.id}, keyvalues={"as_id": service.id},
values={f"{stream_type}_stream_id": pos}, values={f"{stream_type}_stream_id": pos},
# no need to lock when emulating upsert: as_id is a unique key
lock=False,
desc="set_appservice_stream_type_pos", desc="set_appservice_stream_type_pos",
) )

View File

@ -1744,9 +1744,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
table="device_lists_remote_cache", table="device_lists_remote_cache",
keyvalues={"user_id": user_id, "device_id": device_id}, keyvalues={"user_id": user_id, "device_id": device_id},
values={"content": json_encoder.encode(content)}, values={"content": json_encoder.encode(content)},
# we don't need to lock, because we assume we are the only thread
# updating this user's devices.
lock=False,
) )
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id)) txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
@ -1760,9 +1757,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
table="device_lists_remote_extremeties", table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id}, keyvalues={"user_id": user_id},
values={"stream_id": stream_id}, values={"stream_id": stream_id},
# again, we can assume we are the only thread updating this user's
# extremity.
lock=False,
) )
async def update_remote_device_list_cache( async def update_remote_device_list_cache(
@ -1815,9 +1809,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
table="device_lists_remote_extremeties", table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id}, keyvalues={"user_id": user_id},
values={"stream_id": stream_id}, values={"stream_id": stream_id},
# we don't need to lock, because we can assume we are the only thread
# updating this user's extremity.
lock=False,
) )
async def add_device_change_to_streams( async def add_device_change_to_streams(

View File

@ -1686,7 +1686,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
}, },
insertion_values={}, insertion_values={},
desc="insert_insertion_extremity", desc="insert_insertion_extremity",
lock=False,
) )
async def insert_received_event_to_staging( async def insert_received_event_to_staging(

View File

@ -325,14 +325,11 @@ class PusherWorkerStore(SQLBaseStore):
async def set_throttle_params( async def set_throttle_params(
self, pusher_id: str, room_id: str, params: ThrottleParams self, pusher_id: str, room_id: str, params: ThrottleParams
) -> None: ) -> None:
# no need to lock because `pusher_throttle` has a primary key on
# (pusher, room_id) so simple_upsert will retry
await self.db_pool.simple_upsert( await self.db_pool.simple_upsert(
"pusher_throttle", "pusher_throttle",
{"pusher": pusher_id, "room_id": room_id}, {"pusher": pusher_id, "room_id": room_id},
{"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms}, {"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms},
desc="set_throttle_params", desc="set_throttle_params",
lock=False,
) )
async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int: async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
@ -589,8 +586,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
device_id: Optional[str] = None, device_id: Optional[str] = None,
) -> None: ) -> None:
async with self._pushers_id_gen.get_next() as stream_id: async with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so simple_upsert will retry
await self.db_pool.simple_upsert( await self.db_pool.simple_upsert(
table="pushers", table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
@ -609,7 +604,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
"device_id": device_id, "device_id": device_id,
}, },
desc="add_pusher", desc="add_pusher",
lock=False,
) )
user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate( user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(

View File

@ -1847,9 +1847,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
"creator": room_creator, "creator": room_creator,
"has_auth_chain_index": has_auth_chain_index, "has_auth_chain_index": has_auth_chain_index,
}, },
# rooms has a unique constraint on room_id, so no need to lock when doing an
# emulated upsert.
lock=False,
) )
async def store_partial_state_room( async def store_partial_state_room(
@ -1970,9 +1967,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
"creator": "", "creator": "",
"has_auth_chain_index": has_auth_chain_index, "has_auth_chain_index": has_auth_chain_index,
}, },
# rooms has a unique constraint on room_id, so no need to lock when doing an
# emulated upsert.
lock=False,
) )
async def set_room_is_public(self, room_id: str, is_public: bool) -> None: async def set_room_is_public(self, room_id: str, is_public: bool) -> None:

View File

@ -44,6 +44,4 @@ class RoomBatchStore(SQLBaseStore):
table="event_to_state_groups", table="event_to_state_groups",
keyvalues={"event_id": event_id}, keyvalues={"event_id": event_id},
values={"state_group": state_group_id, "event_id": event_id}, values={"state_group": state_group_id, "event_id": event_id},
# Unique constraint on event_id so we don't have to lock
lock=False,
) )

View File

@ -481,7 +481,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
table="user_directory", table="user_directory",
keyvalues={"user_id": user_id}, keyvalues={"user_id": user_id},
values={"display_name": display_name, "avatar_url": avatar_url}, values={"display_name": display_name, "avatar_url": avatar_url},
lock=False, # We're only inserter
) )
if isinstance(self.database_engine, PostgresEngine): if isinstance(self.database_engine, PostgresEngine):
@ -511,7 +510,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
table="user_directory_search", table="user_directory_search",
keyvalues={"user_id": user_id}, keyvalues={"user_id": user_id},
values={"value": value}, values={"value": value},
lock=False, # We're only inserter
) )
else: else:
# This should be unreachable. # This should be unreachable.