Require SQLite >= 3.27.0 (#13760)

This commit is contained in:
David Robertson 2022-09-09 11:14:10 +01:00 committed by GitHub
parent 69fa29700e
commit f2d2481e56
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 103 additions and 205 deletions

View file

@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
now = self._clock.time_msec()
token = random_string(6)
if self.db_pool.engine.can_native_upsert:
def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
# We take out the lock if either a) there is no row for the lock
# already, b) the existing row has timed out, or c) the row is
# for this instance (which means the process got killed and
# restarted)
sql = """
INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (lock_name, lock_key)
DO UPDATE
SET
token = EXCLUDED.token,
instance_name = EXCLUDED.instance_name,
last_renewed_ts = EXCLUDED.last_renewed_ts
WHERE
worker_locks.last_renewed_ts < ?
OR worker_locks.instance_name = EXCLUDED.instance_name
"""
txn.execute(
sql,
(
lock_name,
lock_key,
self._instance_name,
token,
now,
now - _LOCK_TIMEOUT_MS,
),
)
# We only acquired the lock if we inserted or updated the table.
return bool(txn.rowcount)
did_lock = await self.db_pool.runInteraction(
"try_acquire_lock",
_try_acquire_lock_txn,
# We can autocommit here as we're executing a single query, this
# will avoid serialization errors.
db_autocommit=True,
)
if not did_lock:
return None
else:
# If we're on an old SQLite we emulate the above logic by first
# clearing out any existing stale locks and then upserting.
def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
sql = """
DELETE FROM worker_locks
WHERE
lock_name = ?
AND lock_key = ?
AND (last_renewed_ts < ? OR instance_name = ?)
"""
txn.execute(
sql,
(lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
)
inserted = self.db_pool.simple_upsert_txn_emulated(
txn,
table="worker_locks",
keyvalues={
"lock_name": lock_name,
"lock_key": lock_key,
},
values={},
insertion_values={
"token": token,
"last_renewed_ts": self._clock.time_msec(),
"instance_name": self._instance_name,
},
)
return inserted
did_lock = await self.db_pool.runInteraction(
"try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
# We take out the lock if either a) there is no row for the lock
# already, b) the existing row has timed out, or c) the row is
# for this instance (which means the process got killed and
# restarted)
sql = """
INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (lock_name, lock_key)
DO UPDATE
SET
token = EXCLUDED.token,
instance_name = EXCLUDED.instance_name,
last_renewed_ts = EXCLUDED.last_renewed_ts
WHERE
worker_locks.last_renewed_ts < ?
OR worker_locks.instance_name = EXCLUDED.instance_name
"""
txn.execute(
sql,
(
lock_name,
lock_key,
self._instance_name,
token,
now,
now - _LOCK_TIMEOUT_MS,
),
)
if not did_lock:
return None
# We only acquired the lock if we inserted or updated the table.
return bool(txn.rowcount)
did_lock = await self.db_pool.runInteraction(
"try_acquire_lock",
_try_acquire_lock_txn,
# We can autocommit here as we're executing a single query, this
# will avoid serialization errors.
db_autocommit=True,
)
if not did_lock:
return None
lock = Lock(
self._reactor,

View file

@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore):
absolutes: Absolute (set) fields
additive_relatives: Fields that will be added onto if existing row present.
"""
if self.database_engine.can_native_upsert:
absolute_updates = [
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
for field in absolutes.keys()
]
absolute_updates = [
"%(field)s = EXCLUDED.%(field)s" % {"field": field}
for field in absolutes.keys()
]
relative_updates = [
"%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
% {"table": table, "field": field}
for field in additive_relatives.keys()
]
relative_updates = [
"%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
% {"table": table, "field": field}
for field in additive_relatives.keys()
]
insert_cols = []
qargs = []
insert_cols = []
qargs = []
for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
):
insert_cols.append(key)
qargs.append(val)
for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
):
insert_cols.append(key)
qargs.append(val)
sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
"key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}
sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
"key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, table)
retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
current_row = self.db_pool.simple_select_one_txn(
txn, table, keyvalues, retcols, allow_none=True
)
if current_row is None:
merged_dict = {**keyvalues, **absolutes, **additive_relatives}
self.db_pool.simple_insert_txn(txn, table, merged_dict)
else:
for (key, val) in additive_relatives.items():
if current_row[key] is None:
current_row[key] = val
else:
current_row[key] += val
current_row.update(absolutes)
self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
txn.execute(sql, qargs)
async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
"""Calculate and insert an entry into room_stats_current.

View file

@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_interval: how long until next retry in ms
"""
if self.database_engine.can_native_upsert:
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_native,
destination,
failure_ts,
retry_last_ts,
retry_interval,
db_autocommit=True, # Safe as its a single upsert
)
else:
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_emulated,
destination,
failure_ts,
retry_last_ts,
retry_interval,
)
await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_native,
destination,
failure_ts,
retry_last_ts,
retry_interval,
db_autocommit=True, # Safe as it's a single upsert
)
def _set_destination_retry_timings_native(
self,
@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_last_ts: int,
retry_interval: int,
) -> None:
assert self.database_engine.can_native_upsert
# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
#