Reduce serialization errors in MultiWriterIdGen (#8456)

We call `_update_stream_positions_table_txn` a lot, which is an UPSERT
that can conflict in `REPEATABLE READ` isolation level. Instead of doing
a transaction consisting of a single query we may as well run it outside
of a transaction.
This commit is contained in:
Erik Johnston 2020-10-07 15:15:57 +01:00
parent d9b55bd830
commit fa8934b175
7 changed files with 109 additions and 5 deletions

1
changelog.d/8456.misc Normal file
View File

@ -0,0 +1 @@
Reduce number of serialization errors of `MultiWriterIdGenerator._update_table`.

View File

@ -403,6 +403,24 @@ class DatabasePool:
*args: Any, *args: Any,
**kwargs: Any **kwargs: Any
) -> R: ) -> R:
"""Start a new database transaction with the given connection.
Note: The given func may be called multiple times under certain
failure modes. This is normally fine when in a standard transaction,
but care must be taken if the connection is in `autocommit` mode that
the function will correctly handle being aborted and retried half way
through its execution.
Args:
conn
desc
after_callbacks
exception_callbacks
func
*args
**kwargs
"""
start = monotonic_time() start = monotonic_time()
txn_id = self._TXN_ID txn_id = self._TXN_ID
@ -508,7 +526,12 @@ class DatabasePool:
sql_txn_timer.labels(desc).observe(duration) sql_txn_timer.labels(desc).observe(duration)
async def runInteraction( async def runInteraction(
self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any self,
desc: str,
func: "Callable[..., R]",
*args: Any,
db_autocommit: bool = False,
**kwargs: Any
) -> R: ) -> R:
"""Starts a transaction on the database and runs a given function """Starts a transaction on the database and runs a given function
@ -518,6 +541,18 @@ class DatabasePool:
database transaction (twisted.enterprise.adbapi.Transaction) as database transaction (twisted.enterprise.adbapi.Transaction) as
its first argument, followed by `args` and `kwargs`. its first argument, followed by `args` and `kwargs`.
db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transactions
that are only a single query.
Currently, this is only implemented for Postgres. SQLite will still
run the function inside a transaction.
WARNING: This means that if func fails half way through then
the changes will *not* be rolled back. `func` may also get
called multiple times if the transaction is retried, so must
correctly handle that case.
args: positional args to pass to `func` args: positional args to pass to `func`
kwargs: named args to pass to `func` kwargs: named args to pass to `func`
@ -538,6 +573,7 @@ class DatabasePool:
exception_callbacks, exception_callbacks,
func, func,
*args, *args,
db_autocommit=db_autocommit,
**kwargs **kwargs
) )
@ -551,7 +587,11 @@ class DatabasePool:
return cast(R, result) return cast(R, result)
async def runWithConnection( async def runWithConnection(
self, func: "Callable[..., R]", *args: Any, **kwargs: Any self,
func: "Callable[..., R]",
*args: Any,
db_autocommit: bool = False,
**kwargs: Any
) -> R: ) -> R:
"""Wraps the .runWithConnection() method on the underlying db_pool. """Wraps the .runWithConnection() method on the underlying db_pool.
@ -560,6 +600,9 @@ class DatabasePool:
database connection (twisted.enterprise.adbapi.Connection) as database connection (twisted.enterprise.adbapi.Connection) as
its first argument, followed by `args` and `kwargs`. its first argument, followed by `args` and `kwargs`.
args: positional args to pass to `func` args: positional args to pass to `func`
db_autocommit: Whether to run the function in "autocommit" mode,
i.e. outside of a transaction. This is useful for transaction
that are only a single query. Currently only affects postgres.
kwargs: named args to pass to `func` kwargs: named args to pass to `func`
Returns: Returns:
@ -575,6 +618,13 @@ class DatabasePool:
start_time = monotonic_time() start_time = monotonic_time()
def inner_func(conn, *args, **kwargs): def inner_func(conn, *args, **kwargs):
# We shouldn't be in a transaction. If we are then something
# somewhere hasn't committed after doing work. (This is likely only
# possible during startup, as `run*` will ensure changes are
# committed/rolled back before putting the connection back in the
# pool).
assert not self.engine.in_transaction(conn)
with LoggingContext("runWithConnection", parent_context) as context: with LoggingContext("runWithConnection", parent_context) as context:
sched_duration_sec = monotonic_time() - start_time sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec) sql_scheduling_timer.observe(sched_duration_sec)
@ -584,7 +634,14 @@ class DatabasePool:
logger.debug("Reconnecting closed database connection") logger.debug("Reconnecting closed database connection")
conn.reconnect() conn.reconnect()
return func(conn, *args, **kwargs) try:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, True)
return func(conn, *args, **kwargs)
finally:
if db_autocommit:
self.engine.attempt_to_set_autocommit(conn, False)
return await make_deferred_yieldable( return await make_deferred_yieldable(
self._db_pool.runWithConnection(inner_func, *args, **kwargs) self._db_pool.runWithConnection(inner_func, *args, **kwargs)

View File

@ -97,3 +97,20 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
"""Gets a string giving the server version. For example: '3.22.0' """Gets a string giving the server version. For example: '3.22.0'
""" """
... ...
@abc.abstractmethod
def in_transaction(self, conn: Connection) -> bool:
"""Whether the connection is currently in a transaction.
"""
...
@abc.abstractmethod
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
"""Attempt to set the connections autocommit mode.
When True queries are run outside of transactions.
Note: This has no effect on SQLite3, so callers still need to
commit/rollback the connections.
"""
...

View File

@ -15,7 +15,8 @@
import logging import logging
from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
from synapse.storage.types import Connection
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -119,6 +120,7 @@ class PostgresEngine(BaseDatabaseEngine):
cursor.execute("SET synchronous_commit TO OFF") cursor.execute("SET synchronous_commit TO OFF")
cursor.close() cursor.close()
db_conn.commit()
@property @property
def can_native_upsert(self): def can_native_upsert(self):
@ -171,3 +173,9 @@ class PostgresEngine(BaseDatabaseEngine):
return "%i.%i" % (numver / 10000, numver % 10000) return "%i.%i" % (numver / 10000, numver % 10000)
else: else:
return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100) return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)
def in_transaction(self, conn: Connection) -> bool:
return conn.status != self.module.extensions.STATUS_READY # type: ignore
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
return conn.set_session(autocommit=autocommit) # type: ignore

View File

@ -17,6 +17,7 @@ import threading
import typing import typing
from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.types import Connection
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
import sqlite3 # noqa: F401 import sqlite3 # noqa: F401
@ -86,6 +87,7 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
db_conn.create_function("rank", 1, _rank) db_conn.create_function("rank", 1, _rank)
db_conn.execute("PRAGMA foreign_keys = ON;") db_conn.execute("PRAGMA foreign_keys = ON;")
db_conn.commit()
def is_deadlock(self, error): def is_deadlock(self, error):
return False return False
@ -105,6 +107,14 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
""" """
return "%i.%i.%i" % self.module.sqlite_version_info return "%i.%i.%i" % self.module.sqlite_version_info
def in_transaction(self, conn: Connection) -> bool:
return conn.in_transaction # type: ignore
def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
# Twisted doesn't let us set attributes on the connections, so we can't
# set the connection to autocommit mode.
pass
# Following functions taken from: https://github.com/coleifer/peewee # Following functions taken from: https://github.com/coleifer/peewee

View File

@ -24,6 +24,7 @@ from typing_extensions import Deque
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import PostgresSequenceGenerator from synapse.storage.util.sequence import PostgresSequenceGenerator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -552,7 +553,7 @@ class MultiWriterIdGenerator:
# do. # do.
break break
def _update_stream_positions_table_txn(self, txn): def _update_stream_positions_table_txn(self, txn: Cursor):
"""Update the `stream_positions` table with newly persisted position. """Update the `stream_positions` table with newly persisted position.
""" """
@ -602,10 +603,13 @@ class _MultiWriterCtxManager:
stream_ids = attr.ib(type=List[int], factory=list) stream_ids = attr.ib(type=List[int], factory=list)
async def __aenter__(self) -> Union[int, List[int]]: async def __aenter__(self) -> Union[int, List[int]]:
# It's safe to run this in autocommit mode as fetching values from a
# sequence ignores transaction semantics anyway.
self.stream_ids = await self.id_gen._db.runInteraction( self.stream_ids = await self.id_gen._db.runInteraction(
"_load_next_mult_id", "_load_next_mult_id",
self.id_gen._load_next_mult_id_txn, self.id_gen._load_next_mult_id_txn,
self.multiple_ids or 1, self.multiple_ids or 1,
db_autocommit=True,
) )
# Assert the fetched ID is actually greater than any ID we've already # Assert the fetched ID is actually greater than any ID we've already
@ -636,10 +640,16 @@ class _MultiWriterCtxManager:
# #
# We only do this on the success path so that the persisted current # We only do this on the success path so that the persisted current
# position points to a persisted row with the correct instance name. # position points to a persisted row with the correct instance name.
#
# We do this in autocommit mode as a) the upsert works correctly outside
# transactions and b) reduces the amount of time the rows are locked
# for. If we don't do this then we'll often hit serialization errors due
# to the fact we default to REPEATABLE READ isolation levels.
if self.id_gen._writers: if self.id_gen._writers:
await self.id_gen._db.runInteraction( await self.id_gen._db.runInteraction(
"MultiWriterIdGenerator._update_table", "MultiWriterIdGenerator._update_table",
self.id_gen._update_stream_positions_table_txn, self.id_gen._update_stream_positions_table_txn,
db_autocommit=True,
) )
return False return False

View File

@ -56,6 +56,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
engine = create_engine(sqlite_config) engine = create_engine(sqlite_config)
fake_engine = Mock(wraps=engine) fake_engine = Mock(wraps=engine)
fake_engine.can_native_upsert = False fake_engine.can_native_upsert = False
fake_engine.in_transaction.return_value = False
db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine) db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine)
db._db_pool = self.db_pool db._db_pool = self.db_pool