Fix database performance of read/write worker locks (#16061)
We were seeing serialization errors when taking out multiple read locks. The transactions were retried, so this wasn't causing any failures. Introduced in #15782.
Commit eb0dbab15b (parent 0377cb4fab)
changelog.d/16061.misc (new file, 1 line)
@@ -0,0 +1 @@
+Fix database performance of read/write worker locks.
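For orientation, here is a minimal, self-contained sketch of the change to the acquisition path described in the commit message (sqlite3 as a stand-in, a simplified table layout, and an assumed value for the lock timeout; this is not Synapse's DatabasePool code). Before the change, every acquisition deleted stale rows and inserted its own row in one transaction; afterwards, acquisition only inserts, and stale rows are cleared by a periodic background job (see the reaper hunk further down).

import sqlite3
import time

_LOCK_TIMEOUT_MS = 2 * 60 * 1000  # assumed to match Synapse's constant


def acquire_before(conn, name, key, instance, token):
    # Old shape: DELETE stale rows + INSERT our row in one transaction. With
    # many workers doing this concurrently on Postgres under REPEATABLE READ,
    # the overlapping DELETEs are what produced the serialization errors and
    # retries mentioned in the commit message.
    now = int(time.time() * 1000)
    with conn:
        conn.execute(
            "DELETE FROM worker_read_write_locks "
            "WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?",
            (now - _LOCK_TIMEOUT_MS, name, key),
        )
        conn.execute(
            "INSERT INTO worker_read_write_locks VALUES (?, ?, ?, ?, ?)",
            (name, key, instance, token, now),
        )


def acquire_after(conn, name, key, instance, token):
    # New shape: just the INSERT; stale rows are reaped elsewhere on a timer.
    now = int(time.time() * 1000)
    conn.execute(
        "INSERT INTO worker_read_write_locks VALUES (?, ?, ?, ?, ?)",
        (name, key, instance, token, now),
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE worker_read_write_locks "
    "(lock_name TEXT, lock_key TEXT, instance_name TEXT, token TEXT, "
    "last_renewed_ts INTEGER)"
)
acquire_after(conn, "name", "key", "worker1", "abc123")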
synapse/storage/databases/main/lock.py:

@@ -26,7 +26,6 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.util import Clock
 from synapse.util.stringutils import random_string
 
@@ -96,6 +95,10 @@ class LockStore(SQLBaseStore):
 
         self._acquiring_locks: Set[Tuple[str, str]] = set()
 
+        self._clock.looping_call(
+            self._reap_stale_read_write_locks, _LOCK_TIMEOUT_MS / 10.0
+        )
+
     @wrap_as_background_process("LockStore._on_shutdown")
     async def _on_shutdown(self) -> None:
         """Called when the server is shutting down"""
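The hunk above schedules the new reaper with Synapse's Clock.looping_call, which takes its interval in milliseconds (here _LOCK_TIMEOUT_MS / 10). As a rough stand-alone equivalent using Twisted directly (names and the timeout value are assumptions for the sketch, not Synapse's API):

from twisted.internet import reactor, task

_LOCK_TIMEOUT_MS = 2 * 60 * 1000  # assumed value for illustration


def reap_stale_read_write_locks() -> None:
    # In Synapse this is an async background job that deletes expired lock
    # rows; a print stands in for it here.
    print("reaping stale locks")


# Twisted's LoopingCall.start takes seconds, hence the /1000 conversion; the
# reaper fires ten times per lock timeout, i.e. every 12 seconds here.
loop = task.LoopingCall(reap_stale_read_write_locks)
loop.start(_LOCK_TIMEOUT_MS / 10.0 / 1000.0, now=False)

reactor.callLater(30, reactor.stop)  # let the demo fire a couple of times
reactor.run()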
@@ -216,6 +219,7 @@ class LockStore(SQLBaseStore):
                 lock_name,
                 lock_key,
                 write,
+                db_autocommit=True,
             )
         except self.database_engine.module.IntegrityError:
             return None
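db_autocommit=True tells Synapse's DatabasePool to run this interaction in autocommit mode rather than inside an explicit transaction, which sidesteps the serialization-error-and-retry cycle for what is now a single-statement insert. A driver-level illustration of the difference (psycopg2 with a placeholder DSN; illustrative only, not Synapse code):

import psycopg2

conn = psycopg2.connect("dbname=synapse_test")  # placeholder DSN

# Transactional mode: the statement runs inside a transaction block; under
# REPEATABLE READ (which Synapse uses on Postgres), concurrent writers
# touching the same rows can trigger serialization errors, forcing a retry
# of the whole interaction.
with conn, conn.cursor() as cur:
    cur.execute("SELECT 1")

# Autocommit mode: each statement commits on its own, with no transaction
# snapshot to conflict with, so single-statement work like the lock INSERT
# avoids those retries.
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute("SELECT 1")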
@@ -233,60 +237,21 @@ class LockStore(SQLBaseStore):
         # `worker_read_write_locks` and seeing if that fails any
         # constraints. If it doesn't then we have acquired the lock,
         # otherwise we haven't.
-        #
-        # Before that though we clear the table of any stale locks.
 
         now = self._clock.time_msec()
         token = random_string(6)
 
-        delete_sql = """
-            DELETE FROM worker_read_write_locks
-                WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
-        """
-
-        insert_sql = """
-            INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
-            VALUES (?, ?, ?, ?, ?, ?)
-        """
-
-        if isinstance(self.database_engine, PostgresEngine):
-            # For Postgres we can send these queries at the same time.
-            txn.execute(
-                delete_sql + ";" + insert_sql,
-                (
-                    # DELETE args
-                    now - _LOCK_TIMEOUT_MS,
-                    lock_name,
-                    lock_key,
-                    # UPSERT args
-                    lock_name,
-                    lock_key,
-                    write,
-                    self._instance_name,
-                    token,
-                    now,
-                ),
-            )
-        else:
-            # For SQLite these need to be two queries.
-            txn.execute(
-                delete_sql,
-                (
-                    now - _LOCK_TIMEOUT_MS,
-                    lock_name,
-                    lock_key,
-                ),
-            )
-            txn.execute(
-                insert_sql,
-                (
-                    lock_name,
-                    lock_key,
-                    write,
-                    self._instance_name,
-                    token,
-                    now,
-                ),
-            )
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="worker_read_write_locks",
+            values={
+                "lock_name": lock_name,
+                "lock_key": lock_key,
+                "write_lock": write,
+                "instance_name": self._instance_name,
+                "token": token,
+                "last_renewed_ts": now,
+            },
+        )
 
         lock = Lock(
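simple_insert_txn builds a single parameterized INSERT from the table name and the values dict, so the SQL actually issued is essentially the insert_sql string removed above. A minimal sketch of that helper shape (not Synapse's real implementation, which also handles different placeholder styles and engines):

from typing import Any, Dict, Tuple


def build_insert(table: str, values: Dict[str, Any]) -> Tuple[str, tuple]:
    # Columns come from the dict keys, placeholders from its length.
    columns = ", ".join(values.keys())
    placeholders = ", ".join("?" for _ in values)
    sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
    return sql, tuple(values.values())


sql, args = build_insert(
    "worker_read_write_locks",
    {
        "lock_name": "name",
        "lock_key": "key",
        "write_lock": True,
        "instance_name": "worker1",
        "token": "abc123",
        "last_renewed_ts": 1_690_000_000_000,
    },
)
print(sql)   # INSERT INTO worker_read_write_locks (lock_name, ...) VALUES (?, ...)
print(args)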
@@ -351,6 +316,24 @@ class LockStore(SQLBaseStore):
 
         return locks
 
+    @wrap_as_background_process("_reap_stale_read_write_locks")
+    async def _reap_stale_read_write_locks(self) -> None:
+        delete_sql = """
+            DELETE FROM worker_read_write_locks
+                WHERE last_renewed_ts < ?
+        """
+
+        def reap_stale_read_write_locks_txn(txn: LoggingTransaction) -> None:
+            txn.execute(delete_sql, (self._clock.time_msec() - _LOCK_TIMEOUT_MS,))
+            if txn.rowcount:
+                logger.info("Reaped %d stale locks", txn.rowcount)
+
+        await self.db_pool.runInteraction(
+            "_reap_stale_read_write_locks",
+            reap_stale_read_write_locks_txn,
+            db_autocommit=True,
+        )
+
 
 class Lock:
     """An async context manager that manages an acquired lock, ensuring it is
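The reaper added above issues one DELETE across every lock name and key, instead of a targeted DELETE on each acquisition, and logs how many rows it removed. A self-contained sketch of that query (sqlite3 stand-in, simplified columns, assumed timeout constant):

import sqlite3
import time

_LOCK_TIMEOUT_MS = 2 * 60 * 1000  # assumed to match Synapse's constant

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE worker_read_write_locks "
    "(lock_name TEXT, lock_key TEXT, last_renewed_ts INTEGER)"
)
now = int(time.time() * 1000)
# One lock that was never renewed and is now well past the timeout.
conn.execute(
    "INSERT INTO worker_read_write_locks VALUES ('name', 'key', ?)",
    (now - 10 * _LOCK_TIMEOUT_MS,),
)

cur = conn.execute(
    "DELETE FROM worker_read_write_locks WHERE last_renewed_ts < ?",
    (now - _LOCK_TIMEOUT_MS,),
)
if cur.rowcount:
    print(f"Reaped {cur.rowcount} stale locks")  # -> Reaped 1 stale locks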
Tests (ReadWriteLockTestCase):

@@ -12,13 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
 from twisted.internet import defer, reactor
 from twisted.internet.base import ReactorBase
 from twisted.internet.defer import Deferred
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.server import HomeServer
-from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS
+from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL_MS
 from synapse.util import Clock
 
 from tests import unittest
@@ -380,8 +381,8 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase):
         self.get_success(lock.__aenter__())
 
         # Wait for ages with the lock, we should not be able to get the lock.
-        self.reactor.advance(5 * _LOCK_TIMEOUT_MS / 1000)
-        self.pump()
+        for _ in range(0, 10):
+            self.reactor.advance((_RENEWAL_INTERVAL_MS / 1000))
 
         lock2 = self.get_success(
             self.store.try_acquire_read_write_lock("name", "key", write=True)
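The test now steps the fake clock forward in renewal-interval chunks rather than one big jump, presumably so the renewal background job keeps running (and keeps the lock row fresh) while time passes, now that a reaper deletes anything that misses its renewals. A stand-alone illustration of that mechanism with plain Twisted (task.Clock instead of Synapse's HomeserverTestCase reactor; the 30-second interval is made up for the demo):

from twisted.internet.task import Clock, LoopingCall

clock = Clock()
fired = []

loop = LoopingCall(lambda: fired.append(clock.seconds()))
loop.clock = clock           # drive the loop from the fake clock
loop.start(30.0, now=False)  # pretend the renewal interval is 30 seconds

# Each interval-sized step gives the looping call a chance to fire, mirroring
# how the test interleaves time passing with reactor processing.
for _ in range(10):
    clock.advance(30.0)

print(fired)  # ten renewal ticks, one per step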