Merge branch 'develop' into jaywink/admin-forward-extremities

This commit is contained in:
Jason Robinson 2021-01-23 21:41:35 +02:00
commit 8965b6cfec
209 changed files with 7827 additions and 2430 deletions

View file

@ -162,9 +162,13 @@ class DataStore(
database,
stream_name="caches",
instance_name=hs.get_instance_name(),
table="cache_invalidation_stream_by_instance",
instance_column="instance_name",
id_column="stream_id",
tables=[
(
"cache_invalidation_stream_by_instance",
"instance_name",
"stream_id",
)
],
sequence_name="cache_invalidation_stream_seq",
writers=[],
)

View file

@ -14,14 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
from typing import Dict, List, Optional, Set, Tuple
from synapse.api.constants import AccountDataTypes
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@ -30,14 +32,57 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__)
# The ABCMeta metaclass ensures that it cannot be instantiated without
# the abstract methods being implemented.
class AccountDataWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
class AccountDataWorkerStore(SQLBaseStore):
"""This is an abstract base class where subclasses must implement
`get_max_account_data_stream_id` which can be called in the initializer.
"""
def __init__(self, database: DatabasePool, db_conn, hs):
self._instance_name = hs.get_instance_name()
if isinstance(database.engine, PostgresEngine):
self._can_write_to_account_data = (
self._instance_name in hs.config.worker.writers.account_data
)
self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="account_data",
instance_name=self._instance_name,
tables=[
("room_account_data", "instance_name", "stream_id"),
("room_tags_revisions", "instance_name", "stream_id"),
("account_data", "instance_name", "stream_id"),
],
sequence_name="account_data_sequence",
writers=hs.config.worker.writers.account_data,
)
else:
self._can_write_to_account_data = True
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
#
# If this process is the writer than we need to use
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
if hs.get_instance_name() in hs.config.worker.writers.account_data:
self._account_data_id_gen = StreamIdGenerator(
db_conn,
"room_account_data",
"stream_id",
extra_tables=[("room_tags_revisions", "stream_id")],
)
else:
self._account_data_id_gen = SlavedIdTracker(
db_conn,
"room_account_data",
"stream_id",
extra_tables=[("room_tags_revisions", "stream_id")],
)
account_max = self.get_max_account_data_stream_id()
self._account_data_stream_cache = StreamChangeCache(
"AccountDataAndTagsChangeCache", account_max
@ -45,14 +90,13 @@ class AccountDataWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
super().__init__(database, db_conn, hs)
@abc.abstractmethod
def get_max_account_data_stream_id(self):
def get_max_account_data_stream_id(self) -> int:
"""Get the current max stream ID for account data stream
Returns:
int
"""
raise NotImplementedError()
return self._account_data_id_gen.get_current_token()
@cached()
async def get_account_data_for_user(
@ -307,28 +351,26 @@ class AccountDataWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
)
)
class AccountDataStore(AccountDataWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
self._account_data_id_gen = StreamIdGenerator(
db_conn,
"account_data_max_stream_id",
"stream_id",
extra_tables=[
("room_account_data", "stream_id"),
("room_tags_revisions", "stream_id"),
],
)
super().__init__(database, db_conn, hs)
def get_max_account_data_stream_id(self) -> int:
"""Get the current max stream id for the private user data stream
Returns:
The maximum stream ID.
"""
return self._account_data_id_gen.get_current_token()
def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == TagAccountDataStream.NAME:
self._account_data_id_gen.advance(instance_name, token)
for row in rows:
self.get_tags_for_user.invalidate((row.user_id,))
self._account_data_stream_cache.entity_has_changed(row.user_id, token)
elif stream_name == AccountDataStream.NAME:
self._account_data_id_gen.advance(instance_name, token)
for row in rows:
if not row.room_id:
self.get_global_account_data_by_type_for_user.invalidate(
(row.data_type, row.user_id)
)
self.get_account_data_for_user.invalidate((row.user_id,))
self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
self.get_account_data_for_room_and_type.invalidate(
(row.user_id, row.room_id, row.data_type)
)
self._account_data_stream_cache.entity_has_changed(row.user_id, token)
return super().process_replication_rows(stream_name, instance_name, token, rows)
async def add_account_data_to_room(
self, user_id: str, room_id: str, account_data_type: str, content: JsonDict
@ -344,6 +386,8 @@ class AccountDataStore(AccountDataWorkerStore):
Returns:
The maximum stream ID.
"""
assert self._can_write_to_account_data
content_json = json_encoder.encode(content)
async with self._account_data_id_gen.get_next() as next_id:
@ -362,14 +406,6 @@ class AccountDataStore(AccountDataWorkerStore):
lock=False,
)
# it's theoretically possible for the above to succeed and the
# below to fail - in which case we might reuse a stream id on
# restart, and the above update might not get propagated. That
# doesn't sound any worse than the whole update getting lost,
# which is what would happen if we combined the two into one
# transaction.
await self._update_max_stream_id(next_id)
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
self.get_account_data_for_room.invalidate((user_id, room_id))
@ -392,6 +428,8 @@ class AccountDataStore(AccountDataWorkerStore):
Returns:
The maximum stream ID.
"""
assert self._can_write_to_account_data
async with self._account_data_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"add_user_account_data",
@ -402,18 +440,6 @@ class AccountDataStore(AccountDataWorkerStore):
content,
)
# it's theoretically possible for the above to succeed and the
# below to fail - in which case we might reuse a stream id on
# restart, and the above update might not get propagated. That
# doesn't sound any worse than the whole update getting lost,
# which is what would happen if we combined the two into one
# transaction.
#
# Note: This is only here for backwards compat to allow admins to
# roll back to a previous Synapse version. Next time we update the
# database version we can remove this table.
await self._update_max_stream_id(next_id)
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
self.get_account_data_for_user.invalidate((user_id,))
self.get_global_account_data_by_type_for_user.invalidate(
@ -487,23 +513,6 @@ class AccountDataStore(AccountDataWorkerStore):
for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
async def _update_max_stream_id(self, next_id: int) -> None:
"""Update the max stream_id
Args:
next_id: The the revision to advance to.
"""
# Note: This is only here for backwards compat to allow admins to
# roll back to a previous Synapse version. Next time we update the
# database version we can remove this table.
def _update(txn):
update_max_id_sql = (
"UPDATE account_data_max_stream_id"
" SET stream_id = ?"
" WHERE stream_id < ?"
)
txn.execute(update_max_id_sql, (next_id, next_id))
await self.db_pool.runInteraction("update_account_data_max_stream_id", _update)
class AccountDataStore(AccountDataWorkerStore):
pass

View file

@ -407,6 +407,34 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
"_prune_old_user_ips", _prune_old_user_ips_txn
)
async def get_last_client_ip_by_device(
self, user_id: str, device_id: Optional[str]
) -> Dict[Tuple[str, str], dict]:
"""For each device_id listed, give the user_ip it was last seen on.
The result might be slightly out of date as client IPs are inserted in batches.
Args:
user_id: The user to fetch devices for.
device_id: If None fetches all devices for the user
Returns:
A dictionary mapping a tuple of (user_id, device_id) to dicts, with
keys giving the column names from the devices table.
"""
keyvalues = {"user_id": user_id}
if device_id is not None:
keyvalues["device_id"] = device_id
res = await self.db_pool.simple_select_list(
table="devices",
keyvalues=keyvalues,
retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
)
return {(d["user_id"], d["device_id"]): d for d in res}
class ClientIpStore(ClientIpWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
@ -512,18 +540,9 @@ class ClientIpStore(ClientIpWorkerStore):
A dictionary mapping a tuple of (user_id, device_id) to dicts, with
keys giving the column names from the devices table.
"""
ret = await super().get_last_client_ip_by_device(user_id, device_id)
keyvalues = {"user_id": user_id}
if device_id is not None:
keyvalues["device_id"] = device_id
res = await self.db_pool.simple_select_list(
table="devices",
keyvalues=keyvalues,
retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
)
ret = {(d["user_id"], d["device_id"]): d for d in res}
# Update what is retrieved from the database with data which is pending insertion.
for key in self._batch_row_update:
uid, access_token, ip = key
if uid == user_id:

View file

@ -54,9 +54,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
db=database,
stream_name="to_device",
instance_name=self._instance_name,
table="device_inbox",
instance_column="instance_name",
id_column="stream_id",
tables=[("device_inbox", "instance_name", "stream_id")],
sequence_name="device_inbox_sequence",
writers=hs.config.worker.writers.to_device,
)

View file

@ -897,7 +897,7 @@ class DeviceWorkerStore(SQLBaseStore):
DELETE FROM device_lists_outbound_last_success
WHERE destination = ? AND user_id = ?
"""
txn.executemany(sql, ((row[0], row[1]) for row in rows))
txn.execute_batch(sql, ((row[0], row[1]) for row in rows))
logger.info("Pruned %d device list outbound pokes", count)
@ -1343,7 +1343,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Delete older entries in the table, as we really only care about
# when the latest change happened.
txn.executemany(
txn.execute_batch(
"""
DELETE FROM device_lists_stream
WHERE user_id = ? AND device_id = ? AND stream_id < ?

View file

@ -25,6 +25,7 @@ from twisted.enterprise.adbapi import Connection
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, make_in_list_sql_clause
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.types import JsonDict
from synapse.util import json_encoder
@ -513,21 +514,35 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
for user_chunk in batch_iter(user_ids, 100):
clause, params = make_in_list_sql_clause(
txn.database_engine, "k.user_id", user_chunk
)
sql = (
"""
SELECT k.user_id, k.keytype, k.keydata, k.stream_id
FROM e2e_cross_signing_keys k
INNER JOIN (SELECT user_id, keytype, MAX(stream_id) AS stream_id
FROM e2e_cross_signing_keys
GROUP BY user_id, keytype) s
USING (user_id, stream_id, keytype)
WHERE
"""
+ clause
txn.database_engine, "user_id", user_chunk
)
# Fetch the latest key for each type per user.
if isinstance(self.database_engine, PostgresEngine):
# The `DISTINCT ON` clause will pick the *first* row it
# encounters, so ordering by stream ID desc will ensure we get
# the latest key.
sql = """
SELECT DISTINCT ON (user_id, keytype) user_id, keytype, keydata, stream_id
FROM e2e_cross_signing_keys
WHERE %(clause)s
ORDER BY user_id, keytype, stream_id DESC
""" % {
"clause": clause
}
else:
# SQLite has special handling for bare columns when using
# MIN/MAX with a `GROUP BY` clause where it picks the value from
# a row that matches the MIN/MAX.
sql = """
SELECT user_id, keytype, keydata, MAX(stream_id)
FROM e2e_cross_signing_keys
WHERE %(clause)s
GROUP BY user_id, keytype
""" % {
"clause": clause
}
txn.execute(sql, params)
rows = self.db_pool.cursor_to_dict(txn)
@ -707,50 +722,6 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
"""Get the current stream id from the _device_list_id_gen"""
...
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
async def set_e2e_device_keys(
self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
) -> bool:
"""Stores device keys for a device. Returns whether there was a change
or the keys were already in the database.
"""
def _set_e2e_device_keys_txn(txn):
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("time_now", time_now)
set_tag("device_keys", device_keys)
old_key_json = self.db_pool.simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
retcol="key_json",
allow_none=True,
)
# In py3 we need old_key_json to match new_key_json type. The DB
# returns unicode while encode_canonical_json returns bytes.
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
if old_key_json == new_key_json:
log_kv({"Message": "Device key already stored."})
return False
self.db_pool.simple_upsert_txn(
txn,
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
values={"ts_added_ms": time_now, "key_json": new_key_json},
)
log_kv({"message": "Device keys stored."})
return True
return await self.db_pool.runInteraction(
"set_e2e_device_keys", _set_e2e_device_keys_txn
)
async def claim_e2e_one_time_keys(
self, query_list: Iterable[Tuple[str, str, str]]
) -> Dict[str, Dict[str, Dict[str, bytes]]]:
@ -840,6 +811,50 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys
)
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
async def set_e2e_device_keys(
self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
) -> bool:
"""Stores device keys for a device. Returns whether there was a change
or the keys were already in the database.
"""
def _set_e2e_device_keys_txn(txn):
set_tag("user_id", user_id)
set_tag("device_id", device_id)
set_tag("time_now", time_now)
set_tag("device_keys", device_keys)
old_key_json = self.db_pool.simple_select_one_onecol_txn(
txn,
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
retcol="key_json",
allow_none=True,
)
# In py3 we need old_key_json to match new_key_json type. The DB
# returns unicode while encode_canonical_json returns bytes.
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
if old_key_json == new_key_json:
log_kv({"Message": "Device key already stored."})
return False
self.db_pool.simple_upsert_txn(
txn,
table="e2e_device_keys_json",
keyvalues={"user_id": user_id, "device_id": device_id},
values={"ts_added_ms": time_now, "key_json": new_key_json},
)
log_kv({"message": "Device keys stored."})
return True
return await self.db_pool.runInteraction(
"set_e2e_device_keys", _set_e2e_device_keys_txn
)
async def delete_e2e_keys_by_device(self, user_id: str, device_id: str) -> None:
def delete_e2e_keys_by_device_txn(txn):
log_kv(

View file

@ -24,6 +24,8 @@ from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.types import Collection
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
@ -32,6 +34,11 @@ from synapse.util.iterutils import batch_iter
logger = logging.getLogger(__name__)
class _NoChainCoverIndex(Exception):
def __init__(self, room_id: str):
super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
@ -151,15 +158,193 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
The set of the difference in auth chains.
"""
# Check if we have indexed the room so we can use the chain cover
# algorithm.
room = await self.get_room(room_id)
if room["has_auth_chain_index"]:
try:
return await self.db_pool.runInteraction(
"get_auth_chain_difference_chains",
self._get_auth_chain_difference_using_cover_index_txn,
room_id,
state_sets,
)
except _NoChainCoverIndex:
# For whatever reason we don't actually have a chain cover index
# for the events in question, so we fall back to the old method.
pass
return await self.db_pool.runInteraction(
"get_auth_chain_difference",
self._get_auth_chain_difference_txn,
state_sets,
)
def _get_auth_chain_difference_using_cover_index_txn(
self, txn: Cursor, room_id: str, state_sets: List[Set[str]]
) -> Set[str]:
"""Calculates the auth chain difference using the chain index.
See docs/auth_chain_difference_algorithm.md for details
"""
# First we look up the chain ID/sequence numbers for all the events, and
# work out the chain/sequence numbers reachable from each state set.
initial_events = set(state_sets[0]).union(*state_sets[1:])
# Map from event_id -> (chain ID, seq no)
chain_info = {} # type: Dict[str, Tuple[int, int]]
# Map from chain ID -> seq no -> event Id
chain_to_event = {} # type: Dict[int, Dict[int, str]]
# All the chains that we've found that are reachable from the state
# sets.
seen_chains = set() # type: Set[int]
sql = """
SELECT event_id, chain_id, sequence_number
FROM event_auth_chains
WHERE %s
"""
for batch in batch_iter(initial_events, 1000):
clause, args = make_in_list_sql_clause(
txn.database_engine, "event_id", batch
)
txn.execute(sql % (clause,), args)
for event_id, chain_id, sequence_number in txn:
chain_info[event_id] = (chain_id, sequence_number)
seen_chains.add(chain_id)
chain_to_event.setdefault(chain_id, {})[sequence_number] = event_id
# Check that we actually have a chain ID for all the events.
events_missing_chain_info = initial_events.difference(chain_info)
if events_missing_chain_info:
# This can happen due to e.g. downgrade/upgrade of the server. We
# raise an exception and fall back to the previous algorithm.
logger.info(
"Unexpectedly found that events don't have chain IDs in room %s: %s",
room_id,
events_missing_chain_info,
)
raise _NoChainCoverIndex(room_id)
# Corresponds to `state_sets`, except as a map from chain ID to max
# sequence number reachable from the state set.
set_to_chain = [] # type: List[Dict[int, int]]
for state_set in state_sets:
chains = {} # type: Dict[int, int]
set_to_chain.append(chains)
for event_id in state_set:
chain_id, seq_no = chain_info[event_id]
chains[chain_id] = max(seq_no, chains.get(chain_id, 0))
# Now we look up all links for the chains we have, adding chains to
# set_to_chain that are reachable from each set.
sql = """
SELECT
origin_chain_id, origin_sequence_number,
target_chain_id, target_sequence_number
FROM event_auth_chain_links
WHERE %s
"""
# (We need to take a copy of `seen_chains` as we want to mutate it in
# the loop)
for batch in batch_iter(set(seen_chains), 1000):
clause, args = make_in_list_sql_clause(
txn.database_engine, "origin_chain_id", batch
)
txn.execute(sql % (clause,), args)
for (
origin_chain_id,
origin_sequence_number,
target_chain_id,
target_sequence_number,
) in txn:
for chains in set_to_chain:
# chains are only reachable if the origin sequence number of
# the link is less than the max sequence number in the
# origin chain.
if origin_sequence_number <= chains.get(origin_chain_id, 0):
chains[target_chain_id] = max(
target_sequence_number, chains.get(target_chain_id, 0),
)
seen_chains.add(target_chain_id)
# Now for each chain we figure out the maximum sequence number reachable
# from *any* state set and the minimum sequence number reachable from
# *all* state sets. Events in that range are in the auth chain
# difference.
result = set()
# Mapping from chain ID to the range of sequence numbers that should be
# pulled from the database.
chain_to_gap = {} # type: Dict[int, Tuple[int, int]]
for chain_id in seen_chains:
min_seq_no = min(chains.get(chain_id, 0) for chains in set_to_chain)
max_seq_no = max(chains.get(chain_id, 0) for chains in set_to_chain)
if min_seq_no < max_seq_no:
# We have a non empty gap, try and fill it from the events that
# we have, otherwise add them to the list of gaps to pull out
# from the DB.
for seq_no in range(min_seq_no + 1, max_seq_no + 1):
event_id = chain_to_event.get(chain_id, {}).get(seq_no)
if event_id:
result.add(event_id)
else:
chain_to_gap[chain_id] = (min_seq_no, max_seq_no)
break
if not chain_to_gap:
# If there are no gaps to fetch, we're done!
return result
if isinstance(self.database_engine, PostgresEngine):
# We can use `execute_values` to efficiently fetch the gaps when
# using postgres.
sql = """
SELECT event_id
FROM event_auth_chains AS c, (VALUES ?) AS l(chain_id, min_seq, max_seq)
WHERE
c.chain_id = l.chain_id
AND min_seq < sequence_number AND sequence_number <= max_seq
"""
args = [
(chain_id, min_no, max_no)
for chain_id, (min_no, max_no) in chain_to_gap.items()
]
rows = txn.execute_values(sql, args)
result.update(r for r, in rows)
else:
# For SQLite we just fall back to doing a noddy for loop.
sql = """
SELECT event_id FROM event_auth_chains
WHERE chain_id = ? AND ? < sequence_number AND sequence_number <= ?
"""
for chain_id, (min_no, max_no) in chain_to_gap.items():
txn.execute(sql, (chain_id, min_no, max_no))
result.update(r for r, in txn)
return result
def _get_auth_chain_difference_txn(
self, txn, state_sets: List[Set[str]]
) -> Set[str]:
"""Calculates the auth chain difference using a breadth first search.
This is used when we don't have a cover index for the room.
"""
# Algorithm Description
# ~~~~~~~~~~~~~~~~~~~~~

View file

@ -487,7 +487,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
VALUES (?, ?, ?, ?, ?, ?)
"""
txn.executemany(
txn.execute_batch(
sql,
(
_gen_entry(user_id, actions)
@ -803,7 +803,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
],
)
txn.executemany(
txn.execute_batch(
"""
UPDATE event_push_summary
SET notif_count = ?, unread_count = ?, stream_ordering = ?
@ -835,6 +835,52 @@ class EventPushActionsWorkerStore(SQLBaseStore):
(rotate_to_stream_ordering,),
)
def _remove_old_push_actions_before_txn(
self, txn, room_id, user_id, stream_ordering
):
"""
Purges old push actions for a user and room before a given
stream_ordering.
We however keep a months worth of highlighted notifications, so that
users can still get a list of recent highlights.
Args:
txn: The transcation
room_id: Room ID to delete from
user_id: user ID to delete for
stream_ordering: The lowest stream ordering which will
not be deleted.
"""
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id, user_id),
)
# We need to join on the events table to get the received_ts for
# event_push_actions and sqlite won't let us use a join in a delete so
# we can't just delete where received_ts < x. Furthermore we can
# only identify event_push_actions by a tuple of room_id, event_id
# we we can't use a subquery.
# Instead, we look up the stream ordering for the last event in that
# room received before the threshold time and delete event_push_actions
# in the room with a stream_odering before that.
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" stream_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
)
txn.execute(
"""
DELETE FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""",
(room_id, user_id, stream_ordering),
)
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@ -894,52 +940,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
return push_actions
def _remove_old_push_actions_before_txn(
self, txn, room_id, user_id, stream_ordering
):
"""
Purges old push actions for a user and room before a given
stream_ordering.
We however keep a months worth of highlighted notifications, so that
users can still get a list of recent highlights.
Args:
txn: The transcation
room_id: Room ID to delete from
user_id: user ID to delete for
stream_ordering: The lowest stream ordering which will
not be deleted.
"""
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id, user_id),
)
# We need to join on the events table to get the received_ts for
# event_push_actions and sqlite won't let us use a join in a delete so
# we can't just delete where received_ts < x. Furthermore we can
# only identify event_push_actions by a tuple of room_id, event_id
# we we can't use a subquery.
# Instead, we look up the stream ordering for the last event in that
# room received before the threshold time and delete event_push_actions
# in the room with a stream_odering before that.
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" stream_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
)
txn.execute(
"""
DELETE FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""",
(room_id, user_id, stream_ordering),
)
def _action_has_highlight(actions):
for action in actions:

View file

@ -17,7 +17,17 @@
import itertools
import logging
from collections import OrderedDict, namedtuple
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generator,
Iterable,
List,
Optional,
Set,
Tuple,
)
import attr
from prometheus_client import Counter
@ -35,7 +45,7 @@ from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import StateMap, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.iterutils import batch_iter
from synapse.util.iterutils import batch_iter, sorted_topologically
if TYPE_CHECKING:
from synapse.server import HomeServer
@ -366,26 +376,7 @@ class PersistEventsStore:
# Insert into event_to_state_groups.
self._store_event_state_mappings_txn(txn, events_and_contexts)
# We want to store event_auth mappings for rejected events, as they're
# used in state res v2.
# This is only necessary if the rejected event appears in an accepted
# event's auth chain, but its easier for now just to store them (and
# it doesn't take much storage compared to storing the entire event
# anyway).
self.db_pool.simple_insert_many_txn(
txn,
table="event_auth",
values=[
{
"event_id": event.event_id,
"room_id": event.room_id,
"auth_id": auth_id,
}
for event, _ in events_and_contexts
for auth_id in event.auth_event_ids()
if event.is_state()
],
)
self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts])
# _store_rejected_events_txn filters out any events which were
# rejected, and returns the filtered list.
@ -407,6 +398,491 @@ class PersistEventsStore:
# room_memberships, where applicable.
self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
def _persist_event_auth_chain_txn(
self, txn: LoggingTransaction, events: List[EventBase],
) -> None:
# We only care about state events, so this if there are no state events.
if not any(e.is_state() for e in events):
return
# We want to store event_auth mappings for rejected events, as they're
# used in state res v2.
# This is only necessary if the rejected event appears in an accepted
# event's auth chain, but its easier for now just to store them (and
# it doesn't take much storage compared to storing the entire event
# anyway).
self.db_pool.simple_insert_many_txn(
txn,
table="event_auth",
values=[
{
"event_id": event.event_id,
"room_id": event.room_id,
"auth_id": auth_id,
}
for event in events
for auth_id in event.auth_event_ids()
if event.is_state()
],
)
# We now calculate chain ID/sequence numbers for any state events we're
# persisting. We ignore out of band memberships as we're not in the room
# and won't have their auth chain (we'll fix it up later if we join the
# room).
#
# See: docs/auth_chain_difference_algorithm.md
# We ignore legacy rooms that we aren't filling the chain cover index
# for.
rows = self.db_pool.simple_select_many_txn(
txn,
table="rooms",
column="room_id",
iterable={event.room_id for event in events if event.is_state()},
keyvalues={},
retcols=("room_id", "has_auth_chain_index"),
)
rooms_using_chain_index = {
row["room_id"] for row in rows if row["has_auth_chain_index"]
}
state_events = {
event.event_id: event
for event in events
if event.is_state() and event.room_id in rooms_using_chain_index
}
if not state_events:
return
# We need to know the type/state_key and auth events of the events we're
# calculating chain IDs for. We don't rely on having the full Event
# instances as we'll potentially be pulling more events from the DB and
# we don't need the overhead of fetching/parsing the full event JSON.
event_to_types = {
e.event_id: (e.type, e.state_key) for e in state_events.values()
}
event_to_auth_chain = {
e.event_id: e.auth_event_ids() for e in state_events.values()
}
event_to_room_id = {e.event_id: e.room_id for e in state_events.values()}
self._add_chain_cover_index(
txn, self.db_pool, event_to_room_id, event_to_types, event_to_auth_chain,
)
@classmethod
def _add_chain_cover_index(
cls,
txn,
db_pool: DatabasePool,
event_to_room_id: Dict[str, str],
event_to_types: Dict[str, Tuple[str, str]],
event_to_auth_chain: Dict[str, List[str]],
) -> None:
"""Calculate the chain cover index for the given events.
Args:
event_to_room_id: Event ID to the room ID of the event
event_to_types: Event ID to type and state_key of the event
event_to_auth_chain: Event ID to list of auth event IDs of the
event (events with no auth events can be excluded).
"""
# Map from event ID to chain ID/sequence number.
chain_map = {} # type: Dict[str, Tuple[int, int]]
# Set of event IDs to calculate chain ID/seq numbers for.
events_to_calc_chain_id_for = set(event_to_room_id)
# We check if there are any events that need to be handled in the rooms
# we're looking at. These should just be out of band memberships, where
# we didn't have the auth chain when we first persisted.
rows = db_pool.simple_select_many_txn(
txn,
table="event_auth_chain_to_calculate",
keyvalues={},
column="room_id",
iterable=set(event_to_room_id.values()),
retcols=("event_id", "type", "state_key"),
)
for row in rows:
event_id = row["event_id"]
event_type = row["type"]
state_key = row["state_key"]
# (We could pull out the auth events for all rows at once using
# simple_select_many, but this case happens rarely and almost always
# with a single row.)
auth_events = db_pool.simple_select_onecol_txn(
txn, "event_auth", keyvalues={"event_id": event_id}, retcol="auth_id",
)
events_to_calc_chain_id_for.add(event_id)
event_to_types[event_id] = (event_type, state_key)
event_to_auth_chain[event_id] = auth_events
# First we get the chain ID and sequence numbers for the events'
# auth events (that aren't also currently being persisted).
#
# Note that there there is an edge case here where we might not have
# calculated chains and sequence numbers for events that were "out
# of band". We handle this case by fetching the necessary info and
# adding it to the set of events to calculate chain IDs for.
missing_auth_chains = {
a_id
for auth_events in event_to_auth_chain.values()
for a_id in auth_events
if a_id not in events_to_calc_chain_id_for
}
# We loop here in case we find an out of band membership and need to
# fetch their auth event info.
while missing_auth_chains:
sql = """
SELECT event_id, events.type, state_key, chain_id, sequence_number
FROM events
INNER JOIN state_events USING (event_id)
LEFT JOIN event_auth_chains USING (event_id)
WHERE
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "event_id", missing_auth_chains,
)
txn.execute(sql + clause, args)
missing_auth_chains.clear()
for auth_id, event_type, state_key, chain_id, sequence_number in txn:
event_to_types[auth_id] = (event_type, state_key)
if chain_id is None:
# No chain ID, so the event was persisted out of band.
# We add to list of events to calculate auth chains for.
events_to_calc_chain_id_for.add(auth_id)
event_to_auth_chain[auth_id] = db_pool.simple_select_onecol_txn(
txn,
"event_auth",
keyvalues={"event_id": auth_id},
retcol="auth_id",
)
missing_auth_chains.update(
e
for e in event_to_auth_chain[auth_id]
if e not in event_to_types
)
else:
chain_map[auth_id] = (chain_id, sequence_number)
# Now we check if we have any events where we don't have auth chain,
# this should only be out of band memberships.
for event_id in sorted_topologically(event_to_auth_chain, event_to_auth_chain):
for auth_id in event_to_auth_chain[event_id]:
if (
auth_id not in chain_map
and auth_id not in events_to_calc_chain_id_for
):
events_to_calc_chain_id_for.discard(event_id)
# If this is an event we're trying to persist we add it to
# the list of events to calculate chain IDs for next time
# around. (Otherwise we will have already added it to the
# table).
room_id = event_to_room_id.get(event_id)
if room_id:
e_type, state_key = event_to_types[event_id]
db_pool.simple_insert_txn(
txn,
table="event_auth_chain_to_calculate",
values={
"event_id": event_id,
"room_id": room_id,
"type": e_type,
"state_key": state_key,
},
)
# We stop checking the event's auth events since we've
# discarded it.
break
if not events_to_calc_chain_id_for:
return
# Allocate chain ID/sequence numbers to each new event.
new_chain_tuples = cls._allocate_chain_ids(
txn,
db_pool,
event_to_room_id,
event_to_types,
event_to_auth_chain,
events_to_calc_chain_id_for,
chain_map,
)
chain_map.update(new_chain_tuples)
db_pool.simple_insert_many_txn(
txn,
table="event_auth_chains",
values=[
{"event_id": event_id, "chain_id": c_id, "sequence_number": seq}
for event_id, (c_id, seq) in new_chain_tuples.items()
],
)
db_pool.simple_delete_many_txn(
txn,
table="event_auth_chain_to_calculate",
keyvalues={},
column="event_id",
iterable=new_chain_tuples,
)
# Now we need to calculate any new links between chains caused by
# the new events.
#
# Links are pairs of chain ID/sequence numbers such that for any
# event A (CA, SA) and any event B (CB, SB), B is in A's auth chain
# if and only if there is at least one link (CA, S1) -> (CB, S2)
# where SA >= S1 and S2 >= SB.
#
# We try and avoid adding redundant links to the table, e.g. if we
# have two links between two chains which both start/end at the
# sequence number event (or cross) then one can be safely dropped.
#
# To calculate new links we look at every new event and:
# 1. Fetch the chain ID/sequence numbers of its auth events,
# discarding any that are reachable by other auth events, or
# that have the same chain ID as the event.
# 2. For each retained auth event we:
# a. Add a link from the event's to the auth event's chain
# ID/sequence number; and
# b. Add a link from the event to every chain reachable by the
# auth event.
# Step 1, fetch all existing links from all the chains we've seen
# referenced.
chain_links = _LinkMap()
rows = db_pool.simple_select_many_txn(
txn,
table="event_auth_chain_links",
column="origin_chain_id",
iterable={chain_id for chain_id, _ in chain_map.values()},
keyvalues={},
retcols=(
"origin_chain_id",
"origin_sequence_number",
"target_chain_id",
"target_sequence_number",
),
)
for row in rows:
chain_links.add_link(
(row["origin_chain_id"], row["origin_sequence_number"]),
(row["target_chain_id"], row["target_sequence_number"]),
new=False,
)
# We do this in toplogical order to avoid adding redundant links.
for event_id in sorted_topologically(
events_to_calc_chain_id_for, event_to_auth_chain
):
chain_id, sequence_number = chain_map[event_id]
# Filter out auth events that are reachable by other auth
# events. We do this by looking at every permutation of pairs of
# auth events (A, B) to check if B is reachable from A.
reduction = {
a_id
for a_id in event_to_auth_chain.get(event_id, [])
if chain_map[a_id][0] != chain_id
}
for start_auth_id, end_auth_id in itertools.permutations(
event_to_auth_chain.get(event_id, []), r=2,
):
if chain_links.exists_path_from(
chain_map[start_auth_id], chain_map[end_auth_id]
):
reduction.discard(end_auth_id)
# Step 2, figure out what the new links are from the reduced
# list of auth events.
for auth_id in reduction:
auth_chain_id, auth_sequence_number = chain_map[auth_id]
# Step 2a, add link between the event and auth event
chain_links.add_link(
(chain_id, sequence_number), (auth_chain_id, auth_sequence_number)
)
# Step 2b, add a link to chains reachable from the auth
# event.
for target_id, target_seq in chain_links.get_links_from(
(auth_chain_id, auth_sequence_number)
):
if target_id == chain_id:
continue
chain_links.add_link(
(chain_id, sequence_number), (target_id, target_seq)
)
db_pool.simple_insert_many_txn(
txn,
table="event_auth_chain_links",
values=[
{
"origin_chain_id": source_id,
"origin_sequence_number": source_seq,
"target_chain_id": target_id,
"target_sequence_number": target_seq,
}
for (
source_id,
source_seq,
target_id,
target_seq,
) in chain_links.get_additions()
],
)
@staticmethod
def _allocate_chain_ids(
txn,
db_pool: DatabasePool,
event_to_room_id: Dict[str, str],
event_to_types: Dict[str, Tuple[str, str]],
event_to_auth_chain: Dict[str, List[str]],
events_to_calc_chain_id_for: Set[str],
chain_map: Dict[str, Tuple[int, int]],
) -> Dict[str, Tuple[int, int]]:
"""Allocates, but does not persist, chain ID/sequence numbers for the
events in `events_to_calc_chain_id_for`. (c.f. _add_chain_cover_index
for info on args)
"""
# We now calculate the chain IDs/sequence numbers for the events. We do
# this by looking at the chain ID and sequence number of any auth event
# with the same type/state_key and incrementing the sequence number by
# one. If there was no match or the chain ID/sequence number is already
# taken we generate a new chain.
#
# We try to reduce the number of times that we hit the database by
# batching up calls, to make this more efficient when persisting large
# numbers of state events (e.g. during joins).
#
# We do this by:
# 1. Calculating for each event which auth event will be used to
# inherit the chain ID, i.e. converting the auth chain graph to a
# tree that we can allocate chains on. We also keep track of which
# existing chain IDs have been referenced.
# 2. Fetching the max allocated sequence number for each referenced
# existing chain ID, generating a map from chain ID to the max
# allocated sequence number.
# 3. Iterating over the tree and allocating a chain ID/seq no. to the
# new event, by incrementing the sequence number from the
# referenced event's chain ID/seq no. and checking that the
# incremented sequence number hasn't already been allocated (by
# looking in the map generated in the previous step). We generate a
# new chain if the sequence number has already been allocated.
#
existing_chains = set() # type: Set[int]
tree = [] # type: List[Tuple[str, Optional[str]]]
# We need to do this in a topologically sorted order as we want to
# generate chain IDs/sequence numbers of an event's auth events before
# the event itself.
for event_id in sorted_topologically(
events_to_calc_chain_id_for, event_to_auth_chain
):
for auth_id in event_to_auth_chain.get(event_id, []):
if event_to_types.get(event_id) == event_to_types.get(auth_id):
existing_chain_id = chain_map.get(auth_id)
if existing_chain_id:
existing_chains.add(existing_chain_id[0])
tree.append((event_id, auth_id))
break
else:
tree.append((event_id, None))
# Fetch the current max sequence number for each existing referenced chain.
sql = """
SELECT chain_id, MAX(sequence_number) FROM event_auth_chains
WHERE %s
GROUP BY chain_id
"""
clause, args = make_in_list_sql_clause(
db_pool.engine, "chain_id", existing_chains
)
txn.execute(sql % (clause,), args)
chain_to_max_seq_no = {row[0]: row[1] for row in txn} # type: Dict[Any, int]
# Allocate the new events chain ID/sequence numbers.
#
# To reduce the number of calls to the database we don't allocate a
# chain ID number in the loop, instead we use a temporary `object()` for
# each new chain ID. Once we've done the loop we generate the necessary
# number of new chain IDs in one call, replacing all temporary
# objects with real allocated chain IDs.
unallocated_chain_ids = set() # type: Set[object]
new_chain_tuples = {} # type: Dict[str, Tuple[Any, int]]
for event_id, auth_event_id in tree:
# If we reference an auth_event_id we fetch the allocated chain ID,
# either from the existing `chain_map` or the newly generated
# `new_chain_tuples` map.
existing_chain_id = None
if auth_event_id:
existing_chain_id = new_chain_tuples.get(auth_event_id)
if not existing_chain_id:
existing_chain_id = chain_map[auth_event_id]
new_chain_tuple = None # type: Optional[Tuple[Any, int]]
if existing_chain_id:
# We found a chain ID/sequence number candidate, check its
# not already taken.
proposed_new_id = existing_chain_id[0]
proposed_new_seq = existing_chain_id[1] + 1
if chain_to_max_seq_no[proposed_new_id] < proposed_new_seq:
new_chain_tuple = (
proposed_new_id,
proposed_new_seq,
)
# If we need to start a new chain we allocate a temporary chain ID.
if not new_chain_tuple:
new_chain_tuple = (object(), 1)
unallocated_chain_ids.add(new_chain_tuple[0])
new_chain_tuples[event_id] = new_chain_tuple
chain_to_max_seq_no[new_chain_tuple[0]] = new_chain_tuple[1]
# Generate new chain IDs for all unallocated chain IDs.
newly_allocated_chain_ids = db_pool.event_chain_id_gen.get_next_mult_txn(
txn, len(unallocated_chain_ids)
)
# Map from potentially temporary chain ID to real chain ID
chain_id_to_allocated_map = dict(
zip(unallocated_chain_ids, newly_allocated_chain_ids)
) # type: Dict[Any, int]
chain_id_to_allocated_map.update((c, c) for c in existing_chains)
return {
event_id: (chain_id_to_allocated_map[chain_id], seq)
for event_id, (chain_id, seq) in new_chain_tuples.items()
}
def _persist_transaction_ids_txn(
self,
txn: LoggingTransaction,
@ -489,7 +965,7 @@ class PersistEventsStore:
WHERE room_id = ? AND type = ? AND state_key = ?
)
"""
txn.executemany(
txn.execute_batch(
sql,
(
(
@ -508,7 +984,7 @@ class PersistEventsStore:
)
# Now we actually update the current_state_events table
txn.executemany(
txn.execute_batch(
"DELETE FROM current_state_events"
" WHERE room_id = ? AND type = ? AND state_key = ?",
(
@ -520,7 +996,7 @@ class PersistEventsStore:
# We include the membership in the current state table, hence we do
# a lookup when we insert. This assumes that all events have already
# been inserted into room_memberships.
txn.executemany(
txn.execute_batch(
"""INSERT INTO current_state_events
(room_id, type, state_key, event_id, membership)
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
@ -540,7 +1016,7 @@ class PersistEventsStore:
# we have no record of the fact the user *was* a member of the
# room but got, say, state reset out of it.
if to_delete or to_insert:
txn.executemany(
txn.execute_batch(
"DELETE FROM local_current_membership"
" WHERE room_id = ? AND user_id = ?",
(
@ -551,7 +1027,7 @@ class PersistEventsStore:
)
if to_insert:
txn.executemany(
txn.execute_batch(
"""INSERT INTO local_current_membership
(room_id, user_id, event_id, membership)
VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
@ -799,7 +1275,8 @@ class PersistEventsStore:
return [ec for ec in events_and_contexts if ec[0] not in to_remove]
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables
"""Insert new events into the event, event_json, redaction and
state_events tables.
Args:
txn (twisted.enterprise.adbapi.Connection): db connection
@ -871,6 +1348,29 @@ class PersistEventsStore:
updatevalues={"have_censored": False},
)
state_events_and_contexts = [
ec for ec in events_and_contexts if ec[0].is_state()
]
state_values = []
for event, context in state_events_and_contexts:
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
}
# TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
state_values.append(vals)
self.db_pool.simple_insert_many_txn(
txn, table="state_events", values=state_values
)
def _store_rejected_events_txn(self, txn, events_and_contexts):
"""Add rows to the 'rejections' table for received events which were
rejected
@ -987,29 +1487,6 @@ class PersistEventsStore:
txn, [event for event, _ in events_and_contexts]
)
state_events_and_contexts = [
ec for ec in events_and_contexts if ec[0].is_state()
]
state_values = []
for event, context in state_events_and_contexts:
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
}
# TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
state_values.append(vals)
self.db_pool.simple_insert_many_txn(
txn, table="state_events", values=state_values
)
# Prefill the event cache
self._add_to_cache(txn, events_and_contexts)
@ -1350,7 +1827,7 @@ class PersistEventsStore:
"""
if events_and_contexts:
txn.executemany(
txn.execute_batch(
sql,
(
(
@ -1379,7 +1856,7 @@ class PersistEventsStore:
# Now we delete the staging area for *all* events that were being
# persisted.
txn.executemany(
txn.execute_batch(
"DELETE FROM event_push_actions_staging WHERE event_id = ?",
((event.event_id,) for event, _ in all_events_and_contexts),
)
@ -1498,7 +1975,7 @@ class PersistEventsStore:
" )"
)
txn.executemany(
txn.execute_batch(
query,
[
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
@ -1512,7 +1989,7 @@ class PersistEventsStore:
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
)
txn.executemany(
txn.execute_batch(
query,
[
(ev.event_id, ev.room_id)
@ -1520,3 +1997,131 @@ class PersistEventsStore:
if not ev.internal_metadata.is_outlier()
],
)
@attr.s(slots=True)
class _LinkMap:
"""A helper type for tracking links between chains.
"""
# Stores the set of links as nested maps: source chain ID -> target chain ID
# -> source sequence number -> target sequence number.
maps = attr.ib(type=Dict[int, Dict[int, Dict[int, int]]], factory=dict)
# Stores the links that have been added (with new set to true), as tuples of
# `(source chain ID, source sequence no, target chain ID, target sequence no.)`
additions = attr.ib(type=Set[Tuple[int, int, int, int]], factory=set)
def add_link(
self,
src_tuple: Tuple[int, int],
target_tuple: Tuple[int, int],
new: bool = True,
) -> bool:
"""Add a new link between two chains, ensuring no redundant links are added.
New links should be added in topological order.
Args:
src_tuple: The chain ID/sequence number of the source of the link.
target_tuple: The chain ID/sequence number of the target of the link.
new: Whether this is a "new" link, i.e. should it be returned
by `get_additions`.
Returns:
True if a link was added, false if the given link was dropped as redundant
"""
src_chain, src_seq = src_tuple
target_chain, target_seq = target_tuple
current_links = self.maps.setdefault(src_chain, {}).setdefault(target_chain, {})
assert src_chain != target_chain
if new:
# Check if the new link is redundant
for current_seq_src, current_seq_target in current_links.items():
# If a link "crosses" another link then its redundant. For example
# in the following link 1 (L1) is redundant, as any event reachable
# via L1 is *also* reachable via L2.
#
# Chain A Chain B
# | |
# L1 |------ |
# | | |
# L2 |---- | -->|
# | | |
# | |--->|
# | |
# | |
#
# So we only need to keep links which *do not* cross, i.e. links
# that both start and end above or below an existing link.
#
# Note, since we add links in topological ordering we should never
# see `src_seq` less than `current_seq_src`.
if current_seq_src <= src_seq and target_seq <= current_seq_target:
# This new link is redundant, nothing to do.
return False
self.additions.add((src_chain, src_seq, target_chain, target_seq))
current_links[src_seq] = target_seq
return True
def get_links_from(
self, src_tuple: Tuple[int, int]
) -> Generator[Tuple[int, int], None, None]:
"""Gets the chains reachable from the given chain/sequence number.
Yields:
The chain ID and sequence number the link points to.
"""
src_chain, src_seq = src_tuple
for target_id, sequence_numbers in self.maps.get(src_chain, {}).items():
for link_src_seq, target_seq in sequence_numbers.items():
if link_src_seq <= src_seq:
yield target_id, target_seq
def get_links_between(
self, source_chain: int, target_chain: int
) -> Generator[Tuple[int, int], None, None]:
"""Gets the links between two chains.
Yields:
The source and target sequence numbers.
"""
yield from self.maps.get(source_chain, {}).get(target_chain, {}).items()
def get_additions(self) -> Generator[Tuple[int, int, int, int], None, None]:
"""Gets any newly added links.
Yields:
The source chain ID/sequence number and target chain ID/sequence number
"""
for src_chain, src_seq, target_chain, _ in self.additions:
target_seq = self.maps.get(src_chain, {}).get(target_chain, {}).get(src_seq)
if target_seq is not None:
yield (src_chain, src_seq, target_chain, target_seq)
def exists_path_from(
self, src_tuple: Tuple[int, int], target_tuple: Tuple[int, int],
) -> bool:
"""Checks if there is a path between the source chain ID/sequence and
target chain ID/sequence.
"""
src_chain, src_seq = src_tuple
target_chain, target_seq = target_tuple
if src_chain == target_chain:
return target_seq <= src_seq
links = self.get_links_between(src_chain, target_chain)
for link_start_seq, link_end_seq in links:
if link_start_seq <= src_seq and target_seq <= link_end_seq:
return True
return False

View file

@ -14,14 +14,41 @@
# limitations under the License.
import logging
from typing import Dict, List, Optional, Tuple
import attr
from synapse.api.constants import EventContentFields
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import make_event_from_dict
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
from synapse.storage.databases.main.events import PersistEventsStore
from synapse.storage.types import Cursor
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
@attr.s(slots=True, frozen=True)
class _CalculateChainCover:
"""Return value for _calculate_chain_cover_txn.
"""
# The last room_id/depth/stream processed.
room_id = attr.ib(type=str)
depth = attr.ib(type=int)
stream = attr.ib(type=int)
# Number of rows processed
processed_count = attr.ib(type=int)
# Map from room_id to last depth/stream processed for each room that we have
# processed all events for (i.e. the rooms we can flip the
# `has_auth_chain_index` for)
finished_room_map = attr.ib(type=Dict[str, Tuple[int, int]])
class EventsBackgroundUpdatesStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
@ -99,13 +126,19 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
columns=["user_id", "created_ts"],
)
self.db_pool.updates.register_background_update_handler(
"rejected_events_metadata", self._rejected_events_metadata,
)
self.db_pool.updates.register_background_update_handler(
"chain_cover", self._chain_cover_index,
)
async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
INSERT_CLUMP_SIZE = 1000
def reindex_txn(txn):
sql = (
"SELECT stream_ordering, event_id, json FROM events"
@ -143,9 +176,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
clump = update_rows[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
txn.execute_batch(sql, update_rows)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
@ -175,8 +206,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
max_stream_id = progress["max_stream_id_exclusive"]
rows_inserted = progress.get("rows_inserted", 0)
INSERT_CLUMP_SIZE = 1000
def reindex_search_txn(txn):
sql = (
"SELECT stream_ordering, event_id FROM events"
@ -221,9 +250,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
txn.executemany(sql, clump)
txn.execute_batch(sql, rows_to_update)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,
@ -582,3 +609,314 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
await self.db_pool.updates._end_background_update("event_store_labels")
return num_rows
async def _rejected_events_metadata(self, progress: dict, batch_size: int) -> int:
"""Adds rejected events to the `state_events` and `event_auth` metadata
tables.
"""
last_event_id = progress.get("last_event_id", "")
def get_rejected_events(
txn: Cursor,
) -> List[Tuple[str, str, JsonDict, bool, bool]]:
# Fetch rejected event json, their room version and whether we have
# inserted them into the state_events or auth_events tables.
#
# Note we can assume that events that don't have a corresponding
# room version are V1 rooms.
sql = """
SELECT DISTINCT
event_id,
COALESCE(room_version, '1'),
json,
state_events.event_id IS NOT NULL,
event_auth.event_id IS NOT NULL
FROM rejections
INNER JOIN event_json USING (event_id)
LEFT JOIN rooms USING (room_id)
LEFT JOIN state_events USING (event_id)
LEFT JOIN event_auth USING (event_id)
WHERE event_id > ?
ORDER BY event_id
LIMIT ?
"""
txn.execute(sql, (last_event_id, batch_size,))
return [(row[0], row[1], db_to_json(row[2]), row[3], row[4]) for row in txn] # type: ignore
results = await self.db_pool.runInteraction(
desc="_rejected_events_metadata_get", func=get_rejected_events
)
if not results:
await self.db_pool.updates._end_background_update(
"rejected_events_metadata"
)
return 0
state_events = []
auth_events = []
for event_id, room_version, event_json, has_state, has_event_auth in results:
last_event_id = event_id
if has_state and has_event_auth:
continue
room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version)
if not room_version_obj:
# We no longer support this room version, so we just ignore the
# events entirely.
logger.info(
"Ignoring event with unknown room version %r: %r",
room_version,
event_id,
)
continue
event = make_event_from_dict(event_json, room_version_obj)
if not event.is_state():
continue
if not has_state:
state_events.append(
{
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
}
)
if not has_event_auth:
for auth_id in event.auth_event_ids():
auth_events.append(
{
"room_id": event.room_id,
"event_id": event.event_id,
"auth_id": auth_id,
}
)
if state_events:
await self.db_pool.simple_insert_many(
table="state_events",
values=state_events,
desc="_rejected_events_metadata_state_events",
)
if auth_events:
await self.db_pool.simple_insert_many(
table="event_auth",
values=auth_events,
desc="_rejected_events_metadata_event_auth",
)
await self.db_pool.updates._background_update_progress(
"rejected_events_metadata", {"last_event_id": last_event_id}
)
if len(results) < batch_size:
await self.db_pool.updates._end_background_update(
"rejected_events_metadata"
)
return len(results)
async def _chain_cover_index(self, progress: dict, batch_size: int) -> int:
"""A background updates that iterates over all rooms and generates the
chain cover index for them.
"""
current_room_id = progress.get("current_room_id", "")
# Where we've processed up to in the room, defaults to the start of the
# room.
last_depth = progress.get("last_depth", -1)
last_stream = progress.get("last_stream", -1)
result = await self.db_pool.runInteraction(
"_chain_cover_index",
self._calculate_chain_cover_txn,
current_room_id,
last_depth,
last_stream,
batch_size,
single_room=False,
)
finished = result.processed_count == 0
total_rows_processed = result.processed_count
current_room_id = result.room_id
last_depth = result.depth
last_stream = result.stream
for room_id, (depth, stream) in result.finished_room_map.items():
# If we've done all the events in the room we flip the
# `has_auth_chain_index` in the DB. Note that its possible for
# further events to be persisted between the above and setting the
# flag without having the chain cover calculated for them. This is
# fine as a) the code gracefully handles these cases and b) we'll
# calculate them below.
await self.db_pool.simple_update(
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"has_auth_chain_index": True},
desc="_chain_cover_index",
)
# Handle any events that might have raced with us flipping the
# bit above.
result = await self.db_pool.runInteraction(
"_chain_cover_index",
self._calculate_chain_cover_txn,
room_id,
depth,
stream,
batch_size=None,
single_room=True,
)
total_rows_processed += result.processed_count
if finished:
await self.db_pool.updates._end_background_update("chain_cover")
return total_rows_processed
await self.db_pool.updates._background_update_progress(
"chain_cover",
{
"current_room_id": current_room_id,
"last_depth": last_depth,
"last_stream": last_stream,
},
)
return total_rows_processed
def _calculate_chain_cover_txn(
self,
txn: Cursor,
last_room_id: str,
last_depth: int,
last_stream: int,
batch_size: Optional[int],
single_room: bool,
) -> _CalculateChainCover:
"""Calculate the chain cover for `batch_size` events, ordered by
`(room_id, depth, stream)`.
Args:
txn,
last_room_id, last_depth, last_stream: The `(room_id, depth, stream)`
tuple to fetch results after.
batch_size: The maximum number of events to process. If None then
no limit.
single_room: Whether to calculate the index for just the given
room.
"""
# Get the next set of events in the room (that we haven't already
# computed chain cover for). We do this in topological order.
# We want to do a `(topological_ordering, stream_ordering) > (?,?)`
# comparison, but that is not supported on older SQLite versions
tuple_clause, tuple_args = make_tuple_comparison_clause(
self.database_engine,
[
("events.room_id", last_room_id),
("topological_ordering", last_depth),
("stream_ordering", last_stream),
],
)
extra_clause = ""
if single_room:
extra_clause = "AND events.room_id = ?"
tuple_args.append(last_room_id)
sql = """
SELECT
event_id, state_events.type, state_events.state_key,
topological_ordering, stream_ordering,
events.room_id
FROM events
INNER JOIN state_events USING (event_id)
LEFT JOIN event_auth_chains USING (event_id)
LEFT JOIN event_auth_chain_to_calculate USING (event_id)
WHERE event_auth_chains.event_id IS NULL
AND event_auth_chain_to_calculate.event_id IS NULL
AND %(tuple_cmp)s
%(extra)s
ORDER BY events.room_id, topological_ordering, stream_ordering
%(limit)s
""" % {
"tuple_cmp": tuple_clause,
"limit": "LIMIT ?" if batch_size is not None else "",
"extra": extra_clause,
}
if batch_size is not None:
tuple_args.append(batch_size)
txn.execute(sql, tuple_args)
rows = txn.fetchall()
# Put the results in the necessary format for
# `_add_chain_cover_index`
event_to_room_id = {row[0]: row[5] for row in rows}
event_to_types = {row[0]: (row[1], row[2]) for row in rows}
# Calculate the new last position we've processed up to.
new_last_depth = rows[-1][3] if rows else last_depth # type: int
new_last_stream = rows[-1][4] if rows else last_stream # type: int
new_last_room_id = rows[-1][5] if rows else "" # type: str
# Map from room_id to last depth/stream_ordering processed for the room,
# excluding the last room (which we're likely still processing). We also
# need to include the room passed in if it's not included in the result
# set (as we then know we've processed all events in said room).
#
# This is the set of rooms that we can now safely flip the
# `has_auth_chain_index` bit for.
finished_rooms = {
row[5]: (row[3], row[4]) for row in rows if row[5] != new_last_room_id
}
if last_room_id not in finished_rooms and last_room_id != new_last_room_id:
finished_rooms[last_room_id] = (last_depth, last_stream)
count = len(rows)
# We also need to fetch the auth events for them.
auth_events = self.db_pool.simple_select_many_txn(
txn,
table="event_auth",
column="event_id",
iterable=event_to_room_id,
keyvalues={},
retcols=("event_id", "auth_id"),
)
event_to_auth_chain = {} # type: Dict[str, List[str]]
for row in auth_events:
event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"])
# Calculate and persist the chain cover index for this set of events.
#
# Annoyingly we need to gut wrench into the persit event store so that
# we can reuse the function to calculate the chain cover for rooms.
PersistEventsStore._add_chain_cover_index(
txn, self.db_pool, event_to_room_id, event_to_types, event_to_auth_chain,
)
return _CalculateChainCover(
room_id=new_last_room_id,
depth=new_last_depth,
stream=new_last_stream,
processed_count=count,
finished_room_map=finished_rooms,
)

View file

@ -96,9 +96,7 @@ class EventsWorkerStore(SQLBaseStore):
db=database,
stream_name="events",
instance_name=hs.get_instance_name(),
table="events",
instance_column="instance_name",
id_column="stream_ordering",
tables=[("events", "instance_name", "stream_ordering")],
sequence_name="events_stream_seq",
writers=hs.config.worker.writers.events,
)
@ -107,9 +105,7 @@ class EventsWorkerStore(SQLBaseStore):
db=database,
stream_name="backfill",
instance_name=hs.get_instance_name(),
table="events",
instance_column="instance_name",
id_column="stream_ordering",
tables=[("events", "instance_name", "stream_ordering")],
sequence_name="events_backfill_stream_seq",
positive=False,
writers=hs.config.worker.writers.events,

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2020-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -169,7 +170,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
async def get_local_media_before(
self, before_ts: int, size_gt: int, keep_profiles: bool,
) -> Optional[List[str]]:
) -> List[str]:
# to find files that have never been accessed (last_access_ts IS NULL)
# compare with `created_ts`
@ -416,7 +417,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
" WHERE media_origin = ? AND media_id = ?"
)
txn.executemany(
txn.execute_batch(
sql,
(
(time_ms, media_origin, media_id)
@ -429,7 +430,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
" WHERE media_id = ?"
)
txn.executemany(sql, ((time_ms, media_id) for media_id in local_media))
txn.execute_batch(sql, ((time_ms, media_id) for media_id in local_media))
return await self.db_pool.runInteraction(
"update_cached_last_access_time", update_cache_txn
@ -556,7 +557,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
def _delete_url_cache_txn(txn):
txn.executemany(sql, [(media_id,) for media_id in media_ids])
txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
return await self.db_pool.runInteraction(
"delete_url_cache", _delete_url_cache_txn
@ -585,11 +586,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
def _delete_url_cache_media_txn(txn):
sql = "DELETE FROM local_media_repository WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids])
txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
txn.executemany(sql, [(media_id,) for media_id in media_ids])
txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
return await self.db_pool.runInteraction(
"delete_url_cache_media", _delete_url_cache_media_txn

View file

@ -82,7 +82,7 @@ class ProfileWorkerStore(SQLBaseStore):
)
async def set_profile_avatar_url(
self, user_localpart: str, new_avatar_url: str
self, user_localpart: str, new_avatar_url: Optional[str]
) -> None:
await self.db_pool.simple_update_one(
table="profiles",

View file

@ -172,7 +172,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
)
# Update backward extremeties
txn.executemany(
txn.execute_batch(
"INSERT INTO event_backward_extremities (room_id, event_id)"
" VALUES (?, ?)",
[(room_id, event_id) for event_id, in new_backwards_extrems],

View file

@ -17,14 +17,13 @@
import logging
from typing import TYPE_CHECKING, Any, Dict, Iterable, Iterator, List, Optional, Tuple
from canonicaljson import encode_canonical_json
from synapse.push import PusherConfig, ThrottleParams
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
if TYPE_CHECKING:
@ -315,7 +314,7 @@ class PusherStore(PusherWorkerStore):
"device_display_name": device_display_name,
"ts": pushkey_ts,
"lang": lang,
"data": bytearray(encode_canonical_json(data)),
"data": json_encoder.encode(data),
"last_stream_ordering": last_stream_ordering,
"profile_tag": profile_tag,
"id": stream_id,

View file

@ -14,15 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
from typing import Any, Dict, List, Optional, Tuple
from twisted.internet import defer
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
@ -31,28 +33,56 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
logger = logging.getLogger(__name__)
# The ABCMeta metaclass ensures that it cannot be instantiated without
# the abstract methods being implemented.
class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
"""This is an abstract base class where subclasses must implement
`get_max_receipt_stream_id` which can be called in the initializer.
"""
class ReceiptsWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
self._instance_name = hs.get_instance_name()
if isinstance(database.engine, PostgresEngine):
self._can_write_to_receipts = (
self._instance_name in hs.config.worker.writers.receipts
)
self._receipts_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
stream_name="receipts",
instance_name=self._instance_name,
tables=[("receipts_linearized", "instance_name", "stream_id")],
sequence_name="receipts_sequence",
writers=hs.config.worker.writers.receipts,
)
else:
self._can_write_to_receipts = True
# We shouldn't be running in worker mode with SQLite, but its useful
# to support it for unit tests.
#
# If this process is the writer than we need to use
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
# updated over replication. (Multiple writers are not supported for
# SQLite).
if hs.get_instance_name() in hs.config.worker.writers.receipts:
self._receipts_id_gen = StreamIdGenerator(
db_conn, "receipts_linearized", "stream_id"
)
else:
self._receipts_id_gen = SlavedIdTracker(
db_conn, "receipts_linearized", "stream_id"
)
super().__init__(database, db_conn, hs)
self._receipts_stream_cache = StreamChangeCache(
"ReceiptsRoomChangeCache", self.get_max_receipt_stream_id()
)
@abc.abstractmethod
def get_max_receipt_stream_id(self):
"""Get the current max stream ID for receipts stream
Returns:
int
"""
raise NotImplementedError()
return self._receipts_id_gen.get_current_token()
@cached()
async def get_users_with_read_receipts_in_room(self, room_id):
@ -428,19 +458,25 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
self.get_users_with_read_receipts_in_room.invalidate((room_id,))
class ReceiptsStore(ReceiptsWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
# We instantiate this first as the ReceiptsWorkerStore constructor
# needs to be able to call get_max_receipt_stream_id
self._receipts_id_gen = StreamIdGenerator(
db_conn, "receipts_linearized", "stream_id"
def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
self._get_linearized_receipts_for_room.invalidate_many((room_id,))
self.get_last_receipt_event_id_for_user.invalidate(
(user_id, room_id, receipt_type)
)
self._invalidate_get_users_with_receipts_in_room(room_id, receipt_type, user_id)
self.get_receipts_for_room.invalidate((room_id, receipt_type))
super().__init__(database, db_conn, hs)
def process_replication_rows(self, stream_name, instance_name, token, rows):
if stream_name == ReceiptsStream.NAME:
self._receipts_id_gen.advance(instance_name, token)
for row in rows:
self.invalidate_caches_for_receipt(
row.room_id, row.receipt_type, row.user_id
)
self._receipts_stream_cache.entity_has_changed(row.room_id, token)
def get_max_receipt_stream_id(self):
return self._receipts_id_gen.get_current_token()
return super().process_replication_rows(stream_name, instance_name, token, rows)
def insert_linearized_receipt_txn(
self, txn, room_id, receipt_type, user_id, event_id, data, stream_id
@ -452,6 +488,8 @@ class ReceiptsStore(ReceiptsWorkerStore):
otherwise, the rx timestamp of the event that the RR corresponds to
(or 0 if the event is unknown)
"""
assert self._can_write_to_receipts
res = self.db_pool.simple_select_one_txn(
txn,
table="events",
@ -483,28 +521,14 @@ class ReceiptsStore(ReceiptsWorkerStore):
)
return None
txn.call_after(self.get_receipts_for_room.invalidate, (room_id, receipt_type))
txn.call_after(
self._invalidate_get_users_with_receipts_in_room,
room_id,
receipt_type,
user_id,
)
txn.call_after(self.get_receipts_for_user.invalidate, (user_id, receipt_type))
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(
self._get_linearized_receipts_for_room.invalidate_many, (room_id,)
self.invalidate_caches_for_receipt, room_id, receipt_type, user_id
)
txn.call_after(
self._receipts_stream_cache.entity_has_changed, room_id, stream_id
)
txn.call_after(
self.get_last_receipt_event_id_for_user.invalidate,
(user_id, room_id, receipt_type),
)
self.db_pool.simple_upsert_txn(
txn,
table="receipts_linearized",
@ -543,6 +567,8 @@ class ReceiptsStore(ReceiptsWorkerStore):
Automatically does conversion between linearized and graph
representations.
"""
assert self._can_write_to_receipts
if not event_ids:
return None
@ -607,6 +633,8 @@ class ReceiptsStore(ReceiptsWorkerStore):
async def insert_graph_receipt(
self, room_id, receipt_type, user_id, event_ids, data
):
assert self._can_write_to_receipts
return await self.db_pool.runInteraction(
"insert_graph_receipt",
self.insert_graph_receipt_txn,
@ -620,6 +648,8 @@ class ReceiptsStore(ReceiptsWorkerStore):
def insert_graph_receipt_txn(
self, txn, room_id, receipt_type, user_id, event_ids, data
):
assert self._can_write_to_receipts
txn.call_after(self.get_receipts_for_room.invalidate, (room_id, receipt_type))
txn.call_after(
self._invalidate_get_users_with_receipts_in_room,
@ -653,3 +683,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
"data": json_encoder.encode(data),
},
)
class ReceiptsStore(ReceiptsWorkerStore):
pass

View file

@ -1104,7 +1104,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
FROM user_threepids
"""
txn.executemany(sql, [(id_server,) for id_server in id_servers])
txn.execute_batch(sql, [(id_server,) for id_server in id_servers])
if id_servers:
await self.db_pool.runInteraction(

View file

@ -16,7 +16,6 @@
import collections
import logging
import re
from abc import abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
@ -30,6 +29,7 @@ from synapse.storage.databases.main.search import SearchStore
from synapse.types import JsonDict, ThirdPartyInstanceID
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.stringutils import MXC_REGEX
logger = logging.getLogger(__name__)
@ -84,7 +84,7 @@ class RoomWorkerStore(SQLBaseStore):
return await self.db_pool.simple_select_one(
table="rooms",
keyvalues={"room_id": room_id},
retcols=("room_id", "is_public", "creator"),
retcols=("room_id", "is_public", "creator", "has_auth_chain_index"),
desc="get_room",
allow_none=True,
)
@ -660,8 +660,6 @@ class RoomWorkerStore(SQLBaseStore):
The local and remote media as a lists of tuples where the key is
the hostname and the value is the media ID.
"""
mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
sql = """
SELECT stream_ordering, json FROM events
JOIN event_json USING (room_id, event_id)
@ -688,7 +686,7 @@ class RoomWorkerStore(SQLBaseStore):
for url in (content_url, thumbnail_url):
if not url:
continue
matches = mxc_re.match(url)
matches = MXC_REGEX.match(url)
if matches:
hostname = matches.group(1)
media_id = matches.group(2)
@ -1166,6 +1164,37 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
# It's overridden by RoomStore for the synapse master.
raise NotImplementedError()
async def has_auth_chain_index(self, room_id: str) -> bool:
"""Check if the room has (or can have) a chain cover index.
Defaults to True if we don't have an entry in `rooms` table nor any
events for the room.
"""
has_auth_chain_index = await self.db_pool.simple_select_one_onecol(
table="rooms",
keyvalues={"room_id": room_id},
retcol="has_auth_chain_index",
desc="has_auth_chain_index",
allow_none=True,
)
if has_auth_chain_index:
return True
# It's possible that we already have events for the room in our DB
# without a corresponding room entry. If we do then we don't want to
# mark the room as having an auth chain cover index.
max_ordering = await self.db_pool.simple_select_one_onecol(
table="events",
keyvalues={"room_id": room_id},
retcol="MAX(stream_ordering)",
allow_none=True,
desc="upsert_room_on_join",
)
return max_ordering is None
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def __init__(self, database: DatabasePool, db_conn, hs):
@ -1179,12 +1208,21 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
Called when we join a room over federation, and overwrites any room version
currently in the table.
"""
# It's possible that we already have events for the room in our DB
# without a corresponding room entry. If we do then we don't want to
# mark the room as having an auth chain cover index.
has_auth_chain_index = await self.has_auth_chain_index(room_id)
await self.db_pool.simple_upsert(
desc="upsert_room_on_join",
table="rooms",
keyvalues={"room_id": room_id},
values={"room_version": room_version.identifier},
insertion_values={"is_public": False, "creator": ""},
insertion_values={
"is_public": False,
"creator": "",
"has_auth_chain_index": has_auth_chain_index,
},
# rooms has a unique constraint on room_id, so no need to lock when doing an
# emulated upsert.
lock=False,
@ -1219,6 +1257,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
"creator": room_creator_user_id,
"is_public": is_public,
"room_version": room_version.identifier,
"has_auth_chain_index": True,
},
)
if is_public:
@ -1247,6 +1286,11 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
When we receive an invite or any other event over federation that may relate to a room
we are not in, store the version of the room if we don't already know the room version.
"""
# It's possible that we already have events for the room in our DB
# without a corresponding room entry. If we do then we don't want to
# mark the room as having an auth chain cover index.
has_auth_chain_index = await self.has_auth_chain_index(room_id)
await self.db_pool.simple_upsert(
desc="maybe_store_room_on_outlier_membership",
table="rooms",
@ -1256,6 +1300,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
"room_version": room_version.identifier,
"is_public": False,
"creator": "",
"has_auth_chain_index": has_auth_chain_index,
},
# rooms has a unique constraint on room_id, so no need to lock when doing an
# emulated upsert.

View file

@ -873,8 +873,6 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
"max_stream_id_exclusive", self._stream_order_on_start + 1
)
INSERT_CLUMP_SIZE = 1000
def add_membership_profile_txn(txn):
sql = """
SELECT stream_ordering, event_id, events.room_id, event_json.json
@ -915,9 +913,7 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
UPDATE room_memberships SET display_name = ?, avatar_url = ?
WHERE event_id = ? AND room_id = ?
"""
for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
clump = to_update[index : index + INSERT_CLUMP_SIZE]
txn.executemany(to_update_sql, clump)
txn.execute_batch(to_update_sql, to_update)
progress = {
"target_min_stream_id_inclusive": target_min_stream_id,

View file

@ -0,0 +1,16 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE access_tokens DROP COLUMN last_used;

View file

@ -0,0 +1,62 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Dropping last_used column from access_tokens table.
CREATE TABLE access_tokens2 (
id BIGINT PRIMARY KEY,
user_id TEXT NOT NULL,
device_id TEXT,
token TEXT NOT NULL,
valid_until_ms BIGINT,
puppets_user_id TEXT,
last_validated BIGINT,
UNIQUE(token)
);
INSERT INTO access_tokens2(id, user_id, device_id, token)
SELECT id, user_id, device_id, token FROM access_tokens;
DROP TABLE access_tokens;
ALTER TABLE access_tokens2 RENAME TO access_tokens;
CREATE INDEX access_tokens_device_id ON access_tokens (user_id, device_id);
-- Re-adding foreign key reference in event_txn_id table
CREATE TABLE event_txn_id2 (
event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
user_id TEXT NOT NULL,
token_id BIGINT NOT NULL,
txn_id TEXT NOT NULL,
inserted_ts BIGINT NOT NULL,
FOREIGN KEY (event_id)
REFERENCES events (event_id) ON DELETE CASCADE,
FOREIGN KEY (token_id)
REFERENCES access_tokens (id) ON DELETE CASCADE
);
INSERT INTO event_txn_id2(event_id, room_id, user_id, token_id, txn_id, inserted_ts)
SELECT event_id, room_id, user_id, token_id, txn_id, inserted_ts FROM event_txn_id;
DROP TABLE event_txn_id;
ALTER TABLE event_txn_id2 RENAME TO event_txn_id;
CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_event_id ON event_txn_id(event_id);
CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(room_id, user_id, token_id, txn_id);
CREATE INDEX IF NOT EXISTS event_txn_id_ts ON event_txn_id(inserted_ts);

View file

@ -0,0 +1,17 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(5828, 'rejected_events_metadata', '{}');

View file

@ -55,7 +55,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs
# { "ignored_users": "@someone:example.org": {} }
ignored_users = content.get("ignored_users", {})
if isinstance(ignored_users, dict) and ignored_users:
cur.executemany(insert_sql, [(user_id, u) for u in ignored_users])
cur.execute_batch(insert_sql, [(user_id, u) for u in ignored_users])
# Add indexes after inserting data for efficiency.
logger.info("Adding constraints to ignored_users table")

View file

@ -0,0 +1,52 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- See docs/auth_chain_difference_algorithm.md
CREATE TABLE event_auth_chains (
event_id TEXT PRIMARY KEY,
chain_id BIGINT NOT NULL,
sequence_number BIGINT NOT NULL
);
CREATE UNIQUE INDEX event_auth_chains_c_seq_index ON event_auth_chains (chain_id, sequence_number);
CREATE TABLE event_auth_chain_links (
origin_chain_id BIGINT NOT NULL,
origin_sequence_number BIGINT NOT NULL,
target_chain_id BIGINT NOT NULL,
target_sequence_number BIGINT NOT NULL
);
CREATE INDEX event_auth_chain_links_idx ON event_auth_chain_links (origin_chain_id, target_chain_id);
-- Events that we have persisted but not calculated auth chains for,
-- e.g. out of band memberships (where we don't have the auth chain)
CREATE TABLE event_auth_chain_to_calculate (
event_id TEXT PRIMARY KEY,
room_id TEXT NOT NULL,
type TEXT NOT NULL,
state_key TEXT NOT NULL
);
CREATE INDEX event_auth_chain_to_calculate_rm_id ON event_auth_chain_to_calculate(room_id);
-- Whether we've calculated the above index for a room.
ALTER TABLE rooms ADD COLUMN has_auth_chain_index BOOLEAN;

View file

@ -0,0 +1,16 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE SEQUENCE IF NOT EXISTS event_auth_chain_id;

View file

@ -0,0 +1,17 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This is no longer used and was only kept until we bumped the schema version.
DROP TABLE IF EXISTS account_data_max_stream_id;

View file

@ -0,0 +1,17 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- This is no longer used and was only kept until we bumped the schema version.
DROP TABLE IF EXISTS cache_invalidation_stream;

View file

@ -0,0 +1,17 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
(5906, 'chain_cover', '{}', 'rejected_events_metadata');

View file

@ -0,0 +1,20 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE room_account_data ADD COLUMN instance_name TEXT;
ALTER TABLE room_tags_revisions ADD COLUMN instance_name TEXT;
ALTER TABLE account_data ADD COLUMN instance_name TEXT;
ALTER TABLE receipts_linearized ADD COLUMN instance_name TEXT;

View file

@ -0,0 +1,32 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE SEQUENCE IF NOT EXISTS account_data_sequence;
-- We need to take the max across all the account_data tables as they share the
-- ID generator
SELECT setval('account_data_sequence', (
SELECT GREATEST(
(SELECT COALESCE(MAX(stream_id), 1) FROM room_account_data),
(SELECT COALESCE(MAX(stream_id), 1) FROM room_tags_revisions),
(SELECT COALESCE(MAX(stream_id), 1) FROM account_data)
)
));
CREATE SEQUENCE IF NOT EXISTS receipts_sequence;
SELECT setval('receipts_sequence', (
SELECT COALESCE(MAX(stream_id), 1) FROM receipts_linearized
));

View file

@ -0,0 +1,18 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- We incorrectly populated these, so we delete them and let the
-- MultiWriterIdGenerator repopulate it.
DELETE FROM stream_positions WHERE stream_name = 'receipts' OR stream_name = 'account_data';

View file

@ -63,7 +63,7 @@ class SearchWorkerStore(SQLBaseStore):
for entry in entries
)
txn.executemany(sql, args)
txn.execute_batch(sql, args)
elif isinstance(self.database_engine, Sqlite3Engine):
sql = (
@ -75,7 +75,7 @@ class SearchWorkerStore(SQLBaseStore):
for entry in entries
)
txn.executemany(sql, args)
txn.execute_batch(sql, args)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")

View file

@ -183,8 +183,6 @@ class TagsWorkerStore(AccountDataWorkerStore):
)
return {row["tag"]: db_to_json(row["content"]) for row in rows}
class TagsStore(TagsWorkerStore):
async def add_tag_to_room(
self, user_id: str, room_id: str, tag: str, content: JsonDict
) -> int:
@ -199,6 +197,8 @@ class TagsStore(TagsWorkerStore):
Returns:
The next account data ID.
"""
assert self._can_write_to_account_data
content_json = json_encoder.encode(content)
def add_tag_txn(txn, next_id):
@ -223,6 +223,7 @@ class TagsStore(TagsWorkerStore):
Returns:
The next account data ID.
"""
assert self._can_write_to_account_data
def remove_tag_txn(txn, next_id):
sql = (
@ -250,21 +251,12 @@ class TagsStore(TagsWorkerStore):
room_id: The ID of the room.
next_id: The the revision to advance to.
"""
assert self._can_write_to_account_data
txn.call_after(
self._account_data_stream_cache.entity_has_changed, user_id, next_id
)
# Note: This is only here for backwards compat to allow admins to
# roll back to a previous Synapse version. Next time we update the
# database version we can remove this table.
update_max_id_sql = (
"UPDATE account_data_max_stream_id"
" SET stream_id = ?"
" WHERE stream_id < ?"
)
txn.execute(update_max_id_sql, (next_id, next_id))
update_sql = (
"UPDATE room_tags_revisions"
" SET stream_id = ?"
@ -288,3 +280,7 @@ class TagsStore(TagsWorkerStore):
# which stream_id ends up in the table, as long as it is higher
# than the id that the client has.
pass
class TagsStore(TagsWorkerStore):
pass

View file

@ -464,19 +464,17 @@ class TransactionStore(TransactionWorkerStore):
txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
) -> List[str]:
q = """
SELECT destination FROM destinations
WHERE destination IN (
SELECT destination FROM destination_rooms
WHERE destination_rooms.stream_ordering >
destinations.last_successful_stream_ordering
)
AND destination > ?
AND (
retry_last_ts IS NULL OR
retry_last_ts + retry_interval < ?
)
ORDER BY destination
LIMIT 25
SELECT DISTINCT destination FROM destinations
INNER JOIN destination_rooms USING (destination)
WHERE
stream_ordering > last_successful_stream_ordering
AND destination > ?
AND (
retry_last_ts IS NULL OR
retry_last_ts + retry_interval < ?
)
ORDER BY destination
LIMIT 25
"""
txn.execute(
q,