2020-07-16 05:55:51 -04:00
|
|
|
#
|
2023-11-21 15:29:58 -05:00
|
|
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
|
|
#
|
2024-01-23 06:26:48 -05:00
|
|
|
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
2023-11-21 15:29:58 -05:00
|
|
|
# Copyright (C) 2023 New Vector, Ltd
|
|
|
|
#
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU Affero General Public License as
|
|
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
|
|
# License, or (at your option) any later version.
|
|
|
|
#
|
|
|
|
# See the GNU Affero General Public License for more details:
|
|
|
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
|
|
#
|
|
|
|
# Originally licensed under the Apache License, Version 2.0:
|
|
|
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
|
|
|
#
|
|
|
|
# [This file includes modifications made by New Vector Limited]
|
2020-07-16 05:55:51 -04:00
|
|
|
#
|
|
|
|
#
|
|
|
|
import abc
|
2020-09-28 13:00:30 -04:00
|
|
|
import logging
|
2020-07-16 05:55:51 -04:00
|
|
|
import threading
|
2021-01-14 12:19:35 -05:00
|
|
|
from typing import TYPE_CHECKING, Callable, List, Optional
|
2020-07-16 05:55:51 -04:00
|
|
|
|
2020-09-28 13:00:30 -04:00
|
|
|
from synapse.storage.engines import (
|
|
|
|
BaseDatabaseEngine,
|
|
|
|
IncorrectDatabaseSetup,
|
|
|
|
PostgresEngine,
|
|
|
|
)
|
|
|
|
from synapse.storage.types import Connection, Cursor
|
|
|
|
|
2021-01-14 12:19:35 -05:00
|
|
|
if TYPE_CHECKING:
|
|
|
|
from synapse.storage.database import LoggingDatabaseConnection
|
|
|
|
|
2020-09-28 13:00:30 -04:00
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
2021-01-21 10:09:09 -05:00
|
|
|
_INCONSISTENT_STREAM_ERROR = """
|
|
|
|
Postgres sequence '%(seq)s' is inconsistent with associated stream position
|
|
|
|
of '%(stream_name)s' in the 'stream_positions' table.
|
|
|
|
|
|
|
|
This is likely a programming error and should be reported at
|
|
|
|
https://github.com/matrix-org/synapse.
|
|
|
|
|
|
|
|
A temporary workaround to fix this error is to shut down Synapse (including
|
|
|
|
any and all workers) and run the following SQL:
|
|
|
|
|
|
|
|
DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s';
|
|
|
|
|
|
|
|
This will need to be done every time the server is restarted.
|
|
|
|
"""
|
|
|
|
|
2020-07-16 05:55:51 -04:00
|
|
|
|
|
|
|
class SequenceGenerator(metaclass=abc.ABCMeta):
|
|
|
|
"""A class which generates a unique sequence of integers"""
|
|
|
|
|
|
|
|
@abc.abstractmethod
|
|
|
|
def get_next_id_txn(self, txn: Cursor) -> int:
|
|
|
|
"""Gets the next ID in the sequence"""
|
|
|
|
...
|
|
|
|
|
2021-01-21 12:00:12 -05:00
|
|
|
@abc.abstractmethod
|
|
|
|
def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
|
|
|
|
"""Get the next `n` IDs in the sequence"""
|
|
|
|
...
|
|
|
|
|
2020-09-28 13:00:30 -04:00
|
|
|
@abc.abstractmethod
|
|
|
|
def check_consistency(
|
2020-10-02 10:20:45 -04:00
|
|
|
self,
|
2021-01-14 12:19:35 -05:00
|
|
|
db_conn: "LoggingDatabaseConnection",
|
2020-10-02 10:20:45 -04:00
|
|
|
table: str,
|
|
|
|
id_column: str,
|
2021-01-21 10:09:09 -05:00
|
|
|
stream_name: Optional[str] = None,
|
2020-10-02 10:20:45 -04:00
|
|
|
positive: bool = True,
|
2021-10-08 10:25:16 -04:00
|
|
|
) -> None:
|
2020-09-28 13:00:30 -04:00
|
|
|
"""Should be called during start up to test that the current value of
|
|
|
|
the sequence is greater than or equal to the maximum ID in the table.
|
|
|
|
|
2021-01-21 10:09:09 -05:00
|
|
|
This is to handle various cases where the sequence value can get out of
|
|
|
|
sync with the table, e.g. if Synapse gets rolled back to a previous
|
2020-09-28 13:00:30 -04:00
|
|
|
version and the rolled forwards again.
|
2021-01-21 10:09:09 -05:00
|
|
|
|
|
|
|
If a stream name is given then this will check that any value in the
|
|
|
|
`stream_positions` table is less than or equal to the current sequence
|
|
|
|
value. If it isn't then it's likely that streams have been crossed
|
|
|
|
somewhere (e.g. two ID generators have the same stream name).
|
2020-09-28 13:00:30 -04:00
|
|
|
"""
|
|
|
|
...
|
|
|
|
|
2020-07-16 05:55:51 -04:00
|
|
|
|
|
|
|
class PostgresSequenceGenerator(SequenceGenerator):
|
|
|
|
"""An implementation of SequenceGenerator which uses a postgres sequence"""
|
|
|
|
|
|
|
|
def __init__(self, sequence_name: str):
|
|
|
|
self._sequence_name = sequence_name
|
|
|
|
|
|
|
|
def get_next_id_txn(self, txn: Cursor) -> int:
|
|
|
|
txn.execute("SELECT nextval(?)", (self._sequence_name,))
|
2021-02-05 15:39:19 -05:00
|
|
|
fetch_res = txn.fetchone()
|
|
|
|
assert fetch_res is not None
|
|
|
|
return fetch_res[0]
|
2020-07-16 05:55:51 -04:00
|
|
|
|
2020-08-25 12:32:30 -04:00
|
|
|
def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
|
|
|
|
txn.execute(
|
|
|
|
"SELECT nextval(?) FROM generate_series(1, ?)", (self._sequence_name, n)
|
|
|
|
)
|
|
|
|
return [i for (i,) in txn]
|
|
|
|
|
2020-09-28 13:00:30 -04:00
|
|
|
def check_consistency(
|
2020-10-02 10:20:45 -04:00
|
|
|
self,
|
2021-01-14 12:19:35 -05:00
|
|
|
db_conn: "LoggingDatabaseConnection",
|
2020-10-02 10:20:45 -04:00
|
|
|
table: str,
|
|
|
|
id_column: str,
|
2021-01-21 10:09:09 -05:00
|
|
|
stream_name: Optional[str] = None,
|
2020-10-02 10:20:45 -04:00
|
|
|
positive: bool = True,
|
2021-10-08 10:25:16 -04:00
|
|
|
) -> None:
|
2021-01-21 10:09:09 -05:00
|
|
|
"""See SequenceGenerator.check_consistency for docstring."""
|
|
|
|
|
2020-10-02 10:20:45 -04:00
|
|
|
txn = db_conn.cursor(txn_name="sequence.check_consistency")
|
2020-09-28 13:00:30 -04:00
|
|
|
|
|
|
|
# First we get the current max ID from the table.
|
|
|
|
table_sql = "SELECT GREATEST(%(agg)s(%(id)s), 0) FROM %(table)s" % {
|
|
|
|
"id": id_column,
|
|
|
|
"table": table,
|
|
|
|
"agg": "MAX" if positive else "-MIN",
|
|
|
|
}
|
|
|
|
|
|
|
|
txn.execute(table_sql)
|
|
|
|
row = txn.fetchone()
|
|
|
|
if not row:
|
|
|
|
# Table is empty, so nothing to do.
|
|
|
|
txn.close()
|
|
|
|
return
|
|
|
|
|
|
|
|
# Now we fetch the current value from the sequence and compare with the
|
|
|
|
# above.
|
|
|
|
max_stream_id = row[0]
|
|
|
|
txn.execute(
|
|
|
|
"SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name}
|
|
|
|
)
|
2021-02-05 15:39:19 -05:00
|
|
|
fetch_res = txn.fetchone()
|
|
|
|
assert fetch_res is not None
|
|
|
|
last_value, is_called = fetch_res
|
2021-01-21 10:09:09 -05:00
|
|
|
|
|
|
|
# If we have an associated stream check the stream_positions table.
|
|
|
|
max_in_stream_positions = None
|
|
|
|
if stream_name:
|
|
|
|
txn.execute(
|
|
|
|
"SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?",
|
|
|
|
(stream_name,),
|
|
|
|
)
|
|
|
|
row = txn.fetchone()
|
|
|
|
if row:
|
|
|
|
max_in_stream_positions = row[0]
|
|
|
|
|
2020-09-28 13:00:30 -04:00
|
|
|
# If `is_called` is False then `last_value` is actually the value that
|
|
|
|
# will be generated next, so we decrement to get the true "last value".
|
|
|
|
if not is_called:
|
|
|
|
last_value -= 1
|
|
|
|
|
|
|
|
if max_stream_id > last_value:
|
2024-06-14 11:40:29 -04:00
|
|
|
# The sequence is lagging behind the tables. This is probably due to
|
|
|
|
# rolling back to a version before the sequence was used and then
|
|
|
|
# forwards again. We resolve this by setting the sequence to the
|
|
|
|
# right value.
|
2020-09-28 13:00:30 -04:00
|
|
|
logger.warning(
|
2024-06-14 11:40:29 -04:00
|
|
|
"Postgres sequence %s is behind table %s: %d < %d. Updating sequence.",
|
2020-10-12 10:51:41 -04:00
|
|
|
self._sequence_name,
|
|
|
|
table,
|
2020-09-28 13:00:30 -04:00
|
|
|
last_value,
|
|
|
|
max_stream_id,
|
|
|
|
)
|
2024-06-14 11:40:29 -04:00
|
|
|
|
|
|
|
sql = f"""
|
|
|
|
SELECT setval('{self._sequence_name}', GREATEST(
|
|
|
|
(SELECT last_value FROM {self._sequence_name}),
|
|
|
|
({table_sql})
|
|
|
|
));
|
|
|
|
"""
|
|
|
|
txn.execute(sql)
|
|
|
|
|
|
|
|
txn.close()
|
2020-09-28 13:00:30 -04:00
|
|
|
|
2021-01-21 10:09:09 -05:00
|
|
|
# If we have values in the stream positions table then they have to be
|
|
|
|
# less than or equal to `last_value`
|
|
|
|
if max_in_stream_positions and max_in_stream_positions > last_value:
|
|
|
|
raise IncorrectDatabaseSetup(
|
|
|
|
_INCONSISTENT_STREAM_ERROR
|
2021-01-21 11:03:25 -05:00
|
|
|
% {"seq": self._sequence_name, "stream_name": stream_name}
|
2021-01-21 10:09:09 -05:00
|
|
|
)
|
|
|
|
|
2020-07-16 05:55:51 -04:00
|
|
|
|
|
|
|
GetFirstCallbackType = Callable[[Cursor], int]
|
|
|
|
|
|
|
|
|
|
|
|
class LocalSequenceGenerator(SequenceGenerator):
|
|
|
|
"""An implementation of SequenceGenerator which uses local locking
|
|
|
|
|
|
|
|
This only works reliably if there are no other worker processes generating IDs at
|
|
|
|
the same time.
|
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(self, get_first_callback: GetFirstCallbackType):
|
|
|
|
"""
|
|
|
|
Args:
|
|
|
|
get_first_callback: a callback which is called on the first call to
|
2023-03-02 13:27:00 -05:00
|
|
|
get_next_id_txn; should return the current maximum id
|
2020-07-16 05:55:51 -04:00
|
|
|
"""
|
|
|
|
# the callback. this is cleared after it is called, so that it can be GCed.
|
2021-07-15 12:46:54 -04:00
|
|
|
self._callback: Optional[GetFirstCallbackType] = get_first_callback
|
2020-07-16 05:55:51 -04:00
|
|
|
|
|
|
|
# The current max value, or None if we haven't looked in the DB yet.
|
2021-07-15 12:46:54 -04:00
|
|
|
self._current_max_id: Optional[int] = None
|
2020-07-16 05:55:51 -04:00
|
|
|
self._lock = threading.Lock()
|
|
|
|
|
|
|
|
def get_next_id_txn(self, txn: Cursor) -> int:
|
|
|
|
# We do application locking here since if we're using sqlite then
|
|
|
|
# we are a single process synapse.
|
|
|
|
with self._lock:
|
|
|
|
if self._current_max_id is None:
|
|
|
|
assert self._callback is not None
|
|
|
|
self._current_max_id = self._callback(txn)
|
|
|
|
self._callback = None
|
|
|
|
|
|
|
|
self._current_max_id += 1
|
|
|
|
return self._current_max_id
|
|
|
|
|
2021-01-21 12:00:12 -05:00
|
|
|
def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
|
|
|
|
with self._lock:
|
|
|
|
if self._current_max_id is None:
|
|
|
|
assert self._callback is not None
|
|
|
|
self._current_max_id = self._callback(txn)
|
|
|
|
self._callback = None
|
|
|
|
|
|
|
|
first_id = self._current_max_id + 1
|
|
|
|
self._current_max_id += n
|
|
|
|
return [first_id + i for i in range(n)]
|
|
|
|
|
2020-09-28 13:00:30 -04:00
|
|
|
def check_consistency(
|
2021-01-21 10:09:09 -05:00
|
|
|
self,
|
|
|
|
db_conn: Connection,
|
|
|
|
table: str,
|
|
|
|
id_column: str,
|
|
|
|
stream_name: Optional[str] = None,
|
|
|
|
positive: bool = True,
|
2021-10-08 10:25:16 -04:00
|
|
|
) -> None:
|
2020-09-28 13:00:30 -04:00
|
|
|
# There is nothing to do for in memory sequences
|
|
|
|
pass
|
|
|
|
|
2020-07-16 05:55:51 -04:00
|
|
|
|
|
|
|
def build_sequence_generator(
|
2021-02-24 05:13:53 -05:00
|
|
|
db_conn: "LoggingDatabaseConnection",
|
2020-07-16 05:55:51 -04:00
|
|
|
database_engine: BaseDatabaseEngine,
|
|
|
|
get_first_callback: GetFirstCallbackType,
|
|
|
|
sequence_name: str,
|
2021-02-24 05:13:53 -05:00
|
|
|
table: Optional[str],
|
|
|
|
id_column: Optional[str],
|
|
|
|
stream_name: Optional[str] = None,
|
|
|
|
positive: bool = True,
|
2020-07-16 05:55:51 -04:00
|
|
|
) -> SequenceGenerator:
|
|
|
|
"""Get the best impl of SequenceGenerator available
|
|
|
|
|
|
|
|
This uses PostgresSequenceGenerator on postgres, and a locally-locked impl on
|
|
|
|
sqlite.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
database_engine: the database engine we are connected to
|
|
|
|
get_first_callback: a callback which gets the next sequence ID. Used if
|
|
|
|
we're on sqlite.
|
|
|
|
sequence_name: the name of a postgres sequence to use.
|
2021-02-24 05:13:53 -05:00
|
|
|
table, id_column, stream_name, positive: If set then `check_consistency`
|
|
|
|
is called on the created sequence. See docstring for
|
|
|
|
`check_consistency` details.
|
2020-07-16 05:55:51 -04:00
|
|
|
"""
|
|
|
|
if isinstance(database_engine, PostgresEngine):
|
2021-07-15 12:46:54 -04:00
|
|
|
seq: SequenceGenerator = PostgresSequenceGenerator(sequence_name)
|
2020-07-16 05:55:51 -04:00
|
|
|
else:
|
2021-02-24 05:13:53 -05:00
|
|
|
seq = LocalSequenceGenerator(get_first_callback)
|
|
|
|
|
|
|
|
if table:
|
|
|
|
assert id_column
|
|
|
|
seq.check_consistency(
|
|
|
|
db_conn=db_conn,
|
|
|
|
table=table,
|
|
|
|
id_column=id_column,
|
|
|
|
stream_name=stream_name,
|
|
|
|
positive=positive,
|
|
|
|
)
|
|
|
|
|
|
|
|
return seq
|