Merge remote-tracking branch 'upstream/release-v1.41'

This commit is contained in:
Tulir Asokan 2021-08-18 18:12:12 +03:00
commit f285b4200c
237 changed files with 9601 additions and 6005 deletions


@ -941,13 +941,13 @@ class DatabasePool:
`lock` should generally be set to True (the default), but can be set
to False if either of the following is true:
* there is a UNIQUE INDEX on the key columns. In this case a conflict
will cause an IntegrityError in which case this function will retry
the update.
* we somehow know that we are the only thread which will be updating
this table.
1. there is a UNIQUE INDEX on the key columns. In this case a conflict
will cause an IntegrityError in which case this function will retry
the update.
2. we somehow know that we are the only thread which will be updating
this table.
As an additional note, this parameter only matters for old SQLite versions
because we will use native upserts otherwise.
Args:
table: The table to upsert into
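To make the conflict-and-retry behaviour described above concrete, here is a minimal sketch of the emulated upsert pattern the docstring refers to, written against a bare sqlite3 connection rather than Synapse's DatabasePool. It is an illustration of the pattern only, not the helper's actual code, and the `emulated_upsert` name is made up for this sketch:

import sqlite3

def emulated_upsert(conn: sqlite3.Connection, table: str, keyvalues: dict, values: dict) -> None:
    # Try an UPDATE first; if no row matched, INSERT. A concurrent insert will
    # trip the UNIQUE index as an IntegrityError, which we handle by retrying
    # the UPDATE -- exactly the situation the `lock` docstring refers to.
    while True:
        try:
            assignments = ", ".join(f"{k} = ?" for k in values)
            where = " AND ".join(f"{k} = ?" for k in keyvalues)
            cur = conn.execute(
                f"UPDATE {table} SET {assignments} WHERE {where}",
                (*values.values(), *keyvalues.values()),
            )
            if cur.rowcount > 0:
                return
            cols = ", ".join([*keyvalues, *values])
            params = (*keyvalues.values(), *values.values())
            placeholders = ", ".join("?" for _ in params)
            conn.execute(f"INSERT INTO {table} ({cols}) VALUES ({placeholders})", params)
            return
        except sqlite3.IntegrityError:
            continue  # lost the race with another writer; retry from the top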


@ -127,9 +127,6 @@ class DataStore(
self._clock = hs.get_clock()
self.database_engine = database.engine
self._public_room_id_gen = StreamIdGenerator(
db_conn, "public_room_list_stream", "stream_id"
)
self._device_list_id_gen = StreamIdGenerator(
db_conn,
"device_lists_stream",
@ -170,6 +167,7 @@ class DataStore(
sequence_name="cache_invalidation_stream_seq",
writers=[],
)
else:
self._cache_id_gen = None


@ -755,81 +755,145 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
"""
@trace
def _claim_e2e_one_time_keys(txn):
sql = (
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
" WHERE user_id = ? AND device_id = ? AND algorithm = ?"
" LIMIT 1"
)
fallback_sql = (
"SELECT key_id, key_json, used FROM e2e_fallback_keys_json"
" WHERE user_id = ? AND device_id = ? AND algorithm = ?"
" LIMIT 1"
)
result = {}
delete = []
used_fallbacks = []
for user_id, device_id, algorithm in query_list:
user_result = result.setdefault(user_id, {})
device_result = user_result.setdefault(device_id, {})
txn.execute(sql, (user_id, device_id, algorithm))
otk_row = txn.fetchone()
if otk_row is not None:
key_id, key_json = otk_row
device_result[algorithm + ":" + key_id] = key_json
delete.append((user_id, device_id, algorithm, key_id))
else:
# no one-time key available, so see if there's a fallback
# key
txn.execute(fallback_sql, (user_id, device_id, algorithm))
fallback_row = txn.fetchone()
if fallback_row is not None:
key_id, key_json, used = fallback_row
device_result[algorithm + ":" + key_id] = key_json
if not used:
used_fallbacks.append(
(user_id, device_id, algorithm, key_id)
)
def _claim_e2e_one_time_key_simple(
txn, user_id: str, device_id: str, algorithm: str
) -> Optional[Tuple[str, str]]:
"""Claim OTK for device for DBs that don't support RETURNING.
# drop any one-time keys that were claimed
sql = (
"DELETE FROM e2e_one_time_keys_json"
" WHERE user_id = ? AND device_id = ? AND algorithm = ?"
" AND key_id = ?"
Returns:
A tuple of key name (algorithm + key ID) and key JSON, if an
OTK was found.
"""
sql = """
SELECT key_id, key_json FROM e2e_one_time_keys_json
WHERE user_id = ? AND device_id = ? AND algorithm = ?
LIMIT 1
"""
txn.execute(sql, (user_id, device_id, algorithm))
otk_row = txn.fetchone()
if otk_row is None:
return None
key_id, key_json = otk_row
self.db_pool.simple_delete_one_txn(
txn,
table="e2e_one_time_keys_json",
keyvalues={
"user_id": user_id,
"device_id": device_id,
"algorithm": algorithm,
"key_id": key_id,
},
)
for user_id, device_id, algorithm, key_id in delete:
log_kv(
{
"message": "Executing claim e2e_one_time_keys transaction on database."
}
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
return f"{algorithm}:{key_id}", key_json
@trace
def _claim_e2e_one_time_key_returning(
txn, user_id: str, device_id: str, algorithm: str
) -> Optional[Tuple[str, str]]:
"""Claim OTK for device for DBs that support RETURNING.
Returns:
A tuple of key name (algorithm + key ID) and key JSON, if an
OTK was found.
"""
# We can use RETURNING to do the fetch and DELETE in one step.
sql = """
DELETE FROM e2e_one_time_keys_json
WHERE user_id = ? AND device_id = ? AND algorithm = ?
AND key_id IN (
SELECT key_id FROM e2e_one_time_keys_json
WHERE user_id = ? AND device_id = ? AND algorithm = ?
LIMIT 1
)
RETURNING key_id, key_json
"""
txn.execute(
sql, (user_id, device_id, algorithm, user_id, device_id, algorithm)
)
otk_row = txn.fetchone()
if otk_row is None:
return None
key_id, key_json = otk_row
return f"{algorithm}:{key_id}", key_json
results = {}
for user_id, device_id, algorithm in query_list:
if self.database_engine.supports_returning:
# If we support the RETURNING clause we can use a single query that
# allows us to use autocommit mode.
_claim_e2e_one_time_key = _claim_e2e_one_time_key_returning
db_autocommit = True
else:
_claim_e2e_one_time_key = _claim_e2e_one_time_key_simple
db_autocommit = False
row = await self.db_pool.runInteraction(
"claim_e2e_one_time_keys",
_claim_e2e_one_time_key,
user_id,
device_id,
algorithm,
db_autocommit=db_autocommit,
)
if row:
device_results = results.setdefault(user_id, {}).setdefault(
device_id, {}
)
txn.execute(sql, (user_id, device_id, algorithm, key_id))
log_kv({"message": "finished executing and invalidating cache"})
self._invalidate_cache_and_stream(
txn, self.count_e2e_one_time_keys, (user_id, device_id)
)
# mark fallback keys as used
for user_id, device_id, algorithm, key_id in used_fallbacks:
self.db_pool.simple_update_txn(
txn,
"e2e_fallback_keys_json",
{
device_results[row[0]] = row[1]
continue
# No one-time key available, so see if there's a fallback
# key
row = await self.db_pool.simple_select_one(
table="e2e_fallback_keys_json",
keyvalues={
"user_id": user_id,
"device_id": device_id,
"algorithm": algorithm,
},
retcols=("key_id", "key_json", "used"),
desc="_get_fallback_key",
allow_none=True,
)
if row is None:
continue
key_id = row["key_id"]
key_json = row["key_json"]
used = row["used"]
# Mark fallback key as used if not already.
if not used:
await self.db_pool.simple_update_one(
table="e2e_fallback_keys_json",
keyvalues={
"user_id": user_id,
"device_id": device_id,
"algorithm": algorithm,
"key_id": key_id,
},
{"used": True},
updatevalues={"used": True},
desc="_get_fallback_key_set_used",
)
self._invalidate_cache_and_stream(
txn, self.get_e2e_unused_fallback_key_types, (user_id, device_id)
await self.invalidate_cache_and_stream(
"get_e2e_unused_fallback_key_types", (user_id, device_id)
)
return result
device_results = results.setdefault(user_id, {}).setdefault(device_id, {})
device_results[f"{algorithm}:{key_id}"] = key_json
return await self.db_pool.runInteraction(
"claim_e2e_one_time_keys", _claim_e2e_one_time_keys
)
return results
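For engines that support it, the RETURNING path above collapses the claim into a single statement. Below is a rough standalone sketch of the same query run against a bare sqlite3 connection (RETURNING needs SQLite 3.35+ or PostgreSQL); it illustrates the pattern rather than the store method itself, and the function name is invented for the sketch:

import sqlite3
from typing import Optional, Tuple

def claim_one_time_key(
    conn: sqlite3.Connection, user_id: str, device_id: str, algorithm: str
) -> Optional[Tuple[str, str]]:
    # Delete at most one matching key and hand back its id and JSON in the
    # same statement, which is what lets the caller run in autocommit mode
    # instead of wrapping a SELECT and a DELETE in one transaction.
    row = conn.execute(
        """
        DELETE FROM e2e_one_time_keys_json
        WHERE user_id = ? AND device_id = ? AND algorithm = ?
          AND key_id IN (
            SELECT key_id FROM e2e_one_time_keys_json
            WHERE user_id = ? AND device_id = ? AND algorithm = ?
            LIMIT 1
          )
        RETURNING key_id, key_json
        """,
        (user_id, device_id, algorithm, user_id, device_id, algorithm),
    ).fetchone()
    if row is None:
        return None
    key_id, key_json = row
    return f"{algorithm}:{key_id}", key_json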
class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):


@ -671,28 +671,98 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# Return all events where not all sets can reach them.
return {eid for eid, n in event_to_missing_sets.items() if n}
async def get_oldest_events_with_depth_in_room(self, room_id):
async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
"""Gets the oldest events(backwards extremities) in the room along with the
aproximate depth.
We use this function so that we can compare and see if someones current
depth at their current scrollback is within pagination range of the
event extremeties. If the current depth is close to the depth of given
oldest event, we can trigger a backfill.
Args:
room_id: Room where we want to find the oldest events
Returns:
Map from event_id to depth
"""
def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
# Assemble a dictionary with event_id -> depth for the oldest events
# we know of in the room. Backwards extremities are the oldest
# events we know of in the room, but we only know of them because
# some other event referenced them by prev_event; they aren't persisted
# in our database yet (meaning we don't know their depth
# specifically). So we need to look for the approximate depth from
# the events connected to the current backwards extremities.
sql = """
SELECT b.event_id, MAX(e.depth) FROM events as e
/**
* Get the edge connections from the event_edges table
* so we can see whether this event's prev_events points
* to a backward extremity in the next join.
*/
INNER JOIN event_edges as g
ON g.event_id = e.event_id
/**
* We find the "oldest" events in the room by looking for
* events connected to backwards extremities (oldest events
* in the room that we know of so far).
*/
INNER JOIN event_backward_extremities as b
ON g.prev_event_id = b.event_id
WHERE b.room_id = ? AND g.is_state is ?
GROUP BY b.event_id
"""
txn.execute(sql, (room_id, False))
return dict(txn)
return await self.db_pool.runInteraction(
"get_oldest_events_with_depth_in_room",
self.get_oldest_events_with_depth_in_room_txn,
"get_oldest_event_ids_with_depth_in_room",
get_oldest_event_ids_with_depth_in_room_txn,
room_id,
)
def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
sql = (
"SELECT b.event_id, MAX(e.depth) FROM events as e"
" INNER JOIN event_edges as g"
" ON g.event_id = e.event_id"
" INNER JOIN event_backward_extremities as b"
" ON g.prev_event_id = b.event_id"
" WHERE b.room_id = ? AND g.is_state is ?"
" GROUP BY b.event_id"
async def get_insertion_event_backwards_extremities_in_room(
self, room_id
) -> Dict[str, int]:
"""Get the insertion events we know about that we haven't backfilled yet.
We use this function so that we can compare and see if someone's current
depth at their current scrollback is within pagination range of the
insertion event. If the current depth is close to the depth of the given
insertion event, we can trigger a backfill.
Args:
room_id: Room where we want to find the oldest events
Returns:
Map from event_id to depth
"""
def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
sql = """
SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
/* We only want insertion events that are also marked as backwards extremities */
INNER JOIN insertion_event_extremities as b USING (event_id)
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
WHERE b.room_id = ?
GROUP BY b.event_id
"""
txn.execute(sql, (room_id,))
return dict(txn)
return await self.db_pool.runInteraction(
"get_insertion_event_backwards_extremities_in_room",
get_insertion_event_backwards_extremities_in_room_txn,
room_id,
)
txn.execute(sql, (room_id, False))
return dict(txn)
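Both queries above produce a map from event_id to approximate depth; per their docstrings, a caller compares those depths against the depth the user has currently scrolled back to in order to decide whether to backfill. A toy sketch of that comparison follows (the real decision logic lives in the federation handler; `window` is a made-up threshold used only for illustration):

from typing import Dict

def should_backfill(current_depth: int, extremity_depths: Dict[str, int], window: int = 100) -> bool:
    # Trigger a backfill if any known backwards extremity or unfilled
    # insertion event sits within `window` of where the user is paginating.
    return any(abs(current_depth - depth) <= window for depth in extremity_depths.values())

# e.g. should_backfill(250, {"$insertion_event": 220, "$ancient_event": 3}) -> True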
async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
"""Returns the event ID and depth for the event that has the max depth from a set of event IDs
@ -1041,7 +1111,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
if row[1] not in event_results:
queue.put((-row[0], row[1]))
# Navigate up the DAG by prev_event
txn.execute(query, (event_id, False, limit - len(event_results)))
prev_event_id_results = txn.fetchall()
logger.debug(
@ -1136,6 +1205,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
_delete_old_forward_extrem_cache_txn,
)
async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
await self.db_pool.simple_upsert(
table="insertion_event_extremities",
keyvalues={"event_id": event_id},
values={
"event_id": event_id,
"room_id": room_id,
},
insertion_values={},
desc="insert_insertion_extremity",
lock=False,
)
async def insert_received_event_to_staging(
self, origin: str, event: EventBase
) -> None:


@ -1845,6 +1845,18 @@ class PersistEventsStore:
},
)
# When we receive an event with a `chunk_id` referencing the
# `next_chunk_id` of the insertion event, we can remove it from the
# `insertion_event_extremities` table.
sql = """
DELETE FROM insertion_event_extremities WHERE event_id IN (
SELECT event_id FROM insertion_events
WHERE next_chunk_id = ?
)
"""
txn.execute(sql, (chunk_id,))
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.
@ -2101,15 +2113,17 @@ class PersistEventsStore:
Forward extremities are handled when we first start persisting the events.
"""
# From the events passed in, add all of the prev events as backwards extremities.
# Ignore any events that are already backwards extremities or outliers.
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" )"
)
@ -2123,6 +2137,8 @@ class PersistEventsStore:
],
)
# Delete all these events that we've already fetched and now know that their
# prev events are the new backwards extremities.
query = (
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"


@ -14,7 +14,6 @@
import logging
import threading
from collections import namedtuple
from typing import (
Collection,
Container,
@ -27,6 +26,7 @@ from typing import (
overload,
)
import attr
from constantly import NamedConstant, Names
from typing_extensions import Literal
@ -42,7 +42,11 @@ from synapse.api.room_versions import (
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.events.utils import prune_event
from synapse.logging.context import PreserveLoggingContext, current_context
from synapse.logging.context import (
PreserveLoggingContext,
current_context,
make_deferred_yieldable,
)
from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
@ -56,6 +60,8 @@ from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
@ -74,7 +80,10 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events
EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events
_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
@attr.s(slots=True, auto_attribs=True)
class _EventCacheEntry:
event: EventBase
redacted_event: Optional[EventBase]
class EventRedactBehaviour(Names):
@ -161,6 +170,13 @@ class EventsWorkerStore(SQLBaseStore):
max_size=hs.config.caches.event_cache_size,
)
# Map from event ID to a deferred that will result in a map from event
# ID to cache entry. Note that the returned dict may not have the
# requested event in it if the event isn't in the DB.
self._current_event_fetches: Dict[
str, ObservableDeferred[Dict[str, _EventCacheEntry]]
] = {}
self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
self._event_fetch_ongoing = 0
@ -476,7 +492,9 @@ class EventsWorkerStore(SQLBaseStore):
return events
async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
async def _get_events_from_cache_or_db(
self, event_ids: Iterable[str], allow_rejected: bool = False
) -> Dict[str, _EventCacheEntry]:
"""Fetch a bunch of events from the cache or the database.
If events are pulled from the database, they will be cached for future lookups.
@ -485,53 +503,107 @@ class EventsWorkerStore(SQLBaseStore):
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
event_ids: The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events. If False,
allow_rejected: Whether to include rejected events. If False,
rejected events are omitted from the response.
Returns:
Dict[str, _EventCacheEntry]:
map from event id to result
map from event id to result
"""
event_entry_map = self._get_events_from_cache(
event_ids, allow_rejected=allow_rejected
event_ids,
)
missing_events_ids = [e for e in event_ids if e not in event_entry_map]
missing_events_ids = {e for e in event_ids if e not in event_entry_map}
# We now check if we're already fetching some of the events in the DB;
# if so, we wait for those lookups to finish instead of pulling the same
# events out of the DB multiple times.
already_fetching: Dict[str, defer.Deferred] = {}
for event_id in missing_events_ids:
deferred = self._current_event_fetches.get(event_id)
if deferred is not None:
# We're already pulling the event out of the DB. Add the deferred
# to the collection of deferreds to wait on.
already_fetching[event_id] = deferred.observe()
missing_events_ids.difference_update(already_fetching)
if missing_events_ids:
log_ctx = current_context()
log_ctx.record_event_fetch(len(missing_events_ids))
# Add entries to `self._current_event_fetches` for each event we're
# going to pull from the DB. We use a single deferred that resolves
# to all the events we pulled from the DB (this will result in this
# function returning more events than requested, but that can happen
# already due to `_get_events_from_db`).
fetching_deferred: ObservableDeferred[
Dict[str, _EventCacheEntry]
] = ObservableDeferred(defer.Deferred())
for event_id in missing_events_ids:
self._current_event_fetches[event_id] = fetching_deferred
# Note that _get_events_from_db is also responsible for turning db rows
# into FrozenEvents (via _get_event_from_row), which involves seeing if
# the events have been redacted, and if so pulling the redaction event out
# of the database to check it.
#
missing_events = await self._get_events_from_db(
missing_events_ids, allow_rejected=allow_rejected
)
try:
missing_events = await self._get_events_from_db(
missing_events_ids,
)
event_entry_map.update(missing_events)
event_entry_map.update(missing_events)
except Exception as e:
with PreserveLoggingContext():
fetching_deferred.errback(e)
raise e
finally:
# Ensure that we mark these events as no longer being fetched.
for event_id in missing_events_ids:
self._current_event_fetches.pop(event_id, None)
with PreserveLoggingContext():
fetching_deferred.callback(missing_events)
if already_fetching:
# Wait for the other event requests to finish and add their results
# to ours.
results = await make_deferred_yieldable(
defer.gatherResults(
already_fetching.values(),
consumeErrors=True,
)
).addErrback(unwrapFirstError)
for result in results:
event_entry_map.update(result)
if not allow_rejected:
event_entry_map = {
event_id: entry
for event_id, entry in event_entry_map.items()
if not entry.event.rejected_reason
}
return event_entry_map
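The in-flight tracking added above means concurrent callers asking for the same events share a single database round trip: the first caller registers an ObservableDeferred for each missing event ID, later callers observe it, and the entries are cleared once the fetch settles. Here is a loose sketch of the same idea using asyncio futures instead of Twisted deferreds; it is an analogy to show the shape of the logic, not the actual store code:

import asyncio
from typing import Awaitable, Callable, Dict, Iterable, Set

class DedupingFetcher:
    def __init__(self, fetch_from_db: Callable[[Set[str]], Awaitable[Dict[str, object]]]):
        self._fetch_from_db = fetch_from_db
        # event_id -> future resolving to the whole dict that fetch produced
        self._in_flight: Dict[str, asyncio.Future] = {}

    async def get_events(self, event_ids: Iterable[str]) -> Dict[str, object]:
        results: Dict[str, object] = {}
        to_wait = []
        to_fetch: Set[str] = set()
        for event_id in event_ids:
            fut = self._in_flight.get(event_id)
            if fut is not None:
                to_wait.append(fut)      # someone else is already fetching this one
            else:
                to_fetch.add(event_id)
        if to_fetch:
            fut = asyncio.get_running_loop().create_future()
            for event_id in to_fetch:
                self._in_flight[event_id] = fut
            try:
                fetched = await self._fetch_from_db(to_fetch)
                fut.set_result(fetched)
                results.update(fetched)
            except Exception as exc:
                fut.set_exception(exc)
                raise
            finally:
                # Mirror the `finally` above: always clear the in-flight entries.
                for event_id in to_fetch:
                    self._in_flight.pop(event_id, None)
        for fut in to_wait:
            results.update(await fut)    # re-raises if the other fetch failed
        return results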
def _invalidate_get_event_cache(self, event_id):
self._get_event_cache.invalidate((event_id,))
def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
"""Fetch events from the caches
def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, _EventCacheEntry]:
"""Fetch events from the caches.
May return rejected events.
Args:
events (Iterable[str]): list of event_ids to fetch
allow_rejected (bool): Whether to return events that were rejected
update_metrics (bool): Whether to update the cache hit ratio metrics
Returns:
dict of event_id -> _EventCacheEntry for each event_id in cache. If
allow_rejected is `False` then there will still be an entry but it
will be `None`
events: list of event_ids to fetch
update_metrics: Whether to update the cache hit ratio metrics
"""
event_map = {}
@ -542,10 +614,7 @@ class EventsWorkerStore(SQLBaseStore):
if not ret:
continue
if allow_rejected or not ret.event.rejected_reason:
event_map[event_id] = ret
else:
event_map[event_id] = None
event_map[event_id] = ret
return event_map
@ -672,23 +741,23 @@ class EventsWorkerStore(SQLBaseStore):
with PreserveLoggingContext():
self.hs.get_reactor().callFromThread(fire, event_list, e)
async def _get_events_from_db(self, event_ids, allow_rejected=False):
async def _get_events_from_db(
self, event_ids: Iterable[str]
) -> Dict[str, _EventCacheEntry]:
"""Fetch a bunch of events from the database.
May return rejected events.
Returned events will be added to the cache for future lookups.
Unknown events are omitted from the response.
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events. If False,
rejected events are omitted from the response.
event_ids: The event_ids of the events to fetch
Returns:
Dict[str, _EventCacheEntry]:
map from event id to result. May return extra events which
weren't asked for.
map from event id to result. May return extra events which
weren't asked for.
"""
fetched_events = {}
events_to_fetch = event_ids
@ -717,9 +786,6 @@ class EventsWorkerStore(SQLBaseStore):
rejected_reason = row["rejected_reason"]
if not allow_rejected and rejected_reason:
continue
# If the event or metadata cannot be parsed, log the error and act
# as if the event is unknown.
try:


@ -29,7 +29,7 @@ from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.types import Connection, Cursor
from synapse.storage.util.id_generators import IdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import UserID
from synapse.types import UserID, UserInfo
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
@ -146,6 +146,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
@cached()
async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Deprecated: use get_userinfo_by_id instead"""
return await self.db_pool.simple_select_one(
table="users",
keyvalues={"name": user_id},
@ -166,6 +167,33 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
desc="get_user_by_id",
)
async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]:
"""Get a UserInfo object for a user by user ID.
Note! Currently uses the cache of `get_user_by_id`. Once that deprecated method is removed,
this method should be cached.
Args:
user_id: The user to fetch user info for.
Returns:
`UserInfo` object if user found, otherwise `None`.
"""
user_data = await self.get_user_by_id(user_id)
if not user_data:
return None
return UserInfo(
appservice_id=user_data["appservice_id"],
consent_server_notice_sent=user_data["consent_server_notice_sent"],
consent_version=user_data["consent_version"],
creation_ts=user_data["creation_ts"],
is_admin=bool(user_data["admin"]),
is_deactivated=bool(user_data["deactivated"]),
is_guest=bool(user_data["is_guest"]),
is_shadow_banned=bool(user_data["shadow_banned"]),
user_id=UserID.from_string(user_data["name"]),
user_type=user_data["user_type"],
)
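A brief usage sketch of the typed result (field names as constructed above; `store` stands in for whatever object exposes this datastore, and the formatting is purely illustrative):

async def describe_user(store, user_id: str) -> str:
    user = await store.get_userinfo_by_id(user_id)
    if user is None:
        return f"{user_id}: not found"
    flags = [
        name
        for name, is_set in (
            ("admin", user.is_admin),
            ("guest", user.is_guest),
            ("deactivated", user.is_deactivated),
            ("shadow-banned", user.is_shadow_banned),
        )
        if is_set
    ]
    return f"{user.user_id.to_string()} created at {user.creation_ts} [{', '.join(flags)}]"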
async def is_trial_user(self, user_id: str) -> bool:
"""Checks if user is in the "trial" period, i.e. within the first
N days of registration defined by `mau_trial_days` config
@ -571,6 +599,28 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
desc="record_user_external_id",
)
async def remove_user_external_id(
self, auth_provider: str, external_id: str, user_id: str
) -> None:
"""Remove a mapping from an external user id to a mxid
If the mapping is not found, this method does nothing.
Args:
auth_provider: identifier for the remote auth provider
external_id: id on that system
user_id: complete mxid that it is mapped to
"""
await self.db_pool.simple_delete(
table="user_external_ids",
keyvalues={
"auth_provider": auth_provider,
"external_id": external_id,
"user_id": user_id,
},
desc="remove_user_external_id",
)
async def get_user_by_external_id(
self, auth_provider: str, external_id: str
) -> Optional[str]:


@ -73,6 +73,40 @@ class RoomWorkerStore(SQLBaseStore):
self.config = hs.config
async def store_room(
self,
room_id: str,
room_creator_user_id: str,
is_public: bool,
room_version: RoomVersion,
):
"""Stores a room.
Args:
room_id: The desired room ID, can be None.
room_creator_user_id: The user ID of the room creator.
is_public: True to indicate that this room should appear in
public room lists.
room_version: The version of the room
Raises:
StoreError if the room could not be stored.
"""
try:
await self.db_pool.simple_insert(
"rooms",
{
"room_id": room_id,
"creator": room_creator_user_id,
"is_public": is_public,
"room_version": room_version.identifier,
"has_auth_chain_index": True,
},
desc="store_room",
)
except Exception as e:
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
async def get_room(self, room_id: str) -> dict:
"""Retrieve a room.
@ -890,55 +924,6 @@ class RoomWorkerStore(SQLBaseStore):
return total_media_quarantined
async def get_all_new_public_rooms(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
"""Get updates for public rooms replication stream.
Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.
Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exist
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
function to get further updates.
The updates are a list of 2-tuples of stream ID and the row data
"""
if last_id == current_id:
return [], current_id, False
def get_all_new_public_rooms(txn):
sql = """
SELECT stream_id, room_id, visibility, appservice_id, network_id
FROM public_room_list_stream
WHERE stream_id > ? AND stream_id <= ?
ORDER BY stream_id ASC
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, limit))
updates = [(row[0], row[1:]) for row in txn]
limited = False
upto_token = current_id
if len(updates) >= limit:
upto_token = updates[-1][0]
limited = True
return updates, upto_token, limited
return await self.db_pool.runInteraction(
"get_all_new_public_rooms", get_all_new_public_rooms
)
async def get_rooms_for_retention_period_in_range(
self, min_ms: Optional[int], max_ms: Optional[int], include_null: bool = False
) -> Dict[str, dict]:
@ -1391,57 +1376,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
lock=False,
)
async def store_room(
self,
room_id: str,
room_creator_user_id: str,
is_public: bool,
room_version: RoomVersion,
):
"""Stores a room.
Args:
room_id: The desired room ID, can be None.
room_creator_user_id: The user ID of the room creator.
is_public: True to indicate that this room should appear in
public room lists.
room_version: The version of the room
Raises:
StoreError if the room could not be stored.
"""
try:
def store_room_txn(txn, next_id):
self.db_pool.simple_insert_txn(
txn,
"rooms",
{
"room_id": room_id,
"creator": room_creator_user_id,
"is_public": is_public,
"room_version": room_version.identifier,
"has_auth_chain_index": True,
},
)
if is_public:
self.db_pool.simple_insert_txn(
txn,
table="public_room_list_stream",
values={
"stream_id": next_id,
"room_id": room_id,
"visibility": is_public,
},
)
async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"store_room_txn", store_room_txn, next_id
)
except Exception as e:
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
async def maybe_store_room_on_outlier_membership(
self, room_id: str, room_version: RoomVersion
):
@ -1470,49 +1404,14 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
lock=False,
)
async def set_room_is_public(self, room_id, is_public):
def set_room_is_public_txn(txn, next_id):
self.db_pool.simple_update_one_txn(
txn,
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"is_public": is_public},
)
async def set_room_is_public(self, room_id: str, is_public: bool) -> None:
await self.db_pool.simple_update_one(
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"is_public": is_public},
desc="set_room_is_public",
)
entries = self.db_pool.simple_select_list_txn(
txn,
table="public_room_list_stream",
keyvalues={
"room_id": room_id,
"appservice_id": None,
"network_id": None,
},
retcols=("stream_id", "visibility"),
)
entries.sort(key=lambda r: r["stream_id"])
add_to_stream = True
if entries:
add_to_stream = bool(entries[-1]["visibility"]) != is_public
if add_to_stream:
self.db_pool.simple_insert_txn(
txn,
table="public_room_list_stream",
values={
"stream_id": next_id,
"room_id": room_id,
"visibility": is_public,
"appservice_id": None,
"network_id": None,
},
)
async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"set_room_is_public", set_room_is_public_txn, next_id
)
self.hs.get_notifier().on_new_replication_data()
async def set_room_is_public_appservice(
@ -1533,68 +1432,33 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
list.
"""
def set_room_is_public_appservice_txn(txn, next_id):
if is_public:
try:
self.db_pool.simple_insert_txn(
txn,
table="appservice_room_list",
values={
"appservice_id": appservice_id,
"network_id": network_id,
"room_id": room_id,
},
)
except self.database_engine.module.IntegrityError:
# We've already inserted, nothing to do.
return
else:
self.db_pool.simple_delete_txn(
txn,
table="appservice_room_list",
keyvalues={
"appservice_id": appservice_id,
"network_id": network_id,
"room_id": room_id,
},
)
entries = self.db_pool.simple_select_list_txn(
txn,
table="public_room_list_stream",
if is_public:
await self.db_pool.simple_upsert(
table="appservice_room_list",
keyvalues={
"room_id": room_id,
"appservice_id": appservice_id,
"network_id": network_id,
"room_id": room_id,
},
retcols=("stream_id", "visibility"),
values={},
insertion_values={
"appservice_id": appservice_id,
"network_id": network_id,
"room_id": room_id,
},
desc="set_room_is_public_appservice_true",
)
else:
await self.db_pool.simple_delete(
table="appservice_room_list",
keyvalues={
"appservice_id": appservice_id,
"network_id": network_id,
"room_id": room_id,
},
desc="set_room_is_public_appservice_false",
)
entries.sort(key=lambda r: r["stream_id"])
add_to_stream = True
if entries:
add_to_stream = bool(entries[-1]["visibility"]) != is_public
if add_to_stream:
self.db_pool.simple_insert_txn(
txn,
table="public_room_list_stream",
values={
"stream_id": next_id,
"room_id": room_id,
"visibility": is_public,
"appservice_id": appservice_id,
"network_id": network_id,
},
)
async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"set_room_is_public_appservice",
set_room_is_public_appservice_txn,
next_id,
)
self.hs.get_notifier().on_new_replication_data()
async def add_event_report(
@ -1787,9 +1651,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
"get_event_reports_paginate", _get_event_reports_paginate_txn
)
def get_current_public_room_stream_id(self):
return self._public_room_id_gen.get_current_token()
async def block_room(self, room_id: str, user_id: str) -> None:
"""Marks the room as blocked. Can be called multiple times.


@ -629,14 +629,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# We don't update the event cache hit ratio as it completely throws off
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = self._get_events_from_cache(
member_event_ids, allow_rejected=False, update_metrics=False
)
event_map = self._get_events_from_cache(member_event_ids, update_metrics=False)
missing_member_event_ids = []
for event_id in member_event_ids:
ev_entry = event_map.get(event_id)
if ev_entry:
if ev_entry and not ev_entry.event.rejected_reason:
if ev_entry.event.membership == Membership.JOIN:
users_in_room[ev_entry.event.state_key] = ProfileInfo(
display_name=ev_entry.event.content.get("displayname", None),


@ -1,4 +1,4 @@
# Synapse Database Schemas
This directory contains the schema files used to build Synapse databases. For more
information, see /docs/development/database_schema.md.
information, see https://matrix-org.github.io/synapse/develop/development/database_schema.html.


@ -12,19 +12,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
SCHEMA_VERSION = 61
SCHEMA_VERSION = 63
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
shape of the database schema (even if those requirements are backwards-compatible with
older versions of Synapse).
See `README.md <synapse/storage/schema/README.md>`_ for more information on how this
works.
See https://matrix-org.github.io/synapse/develop/development/database_schema.html
for more information on how this works.
Changes in SCHEMA_VERSION = 61:
- The `user_stats_historical` and `room_stats_historical` tables are not written and
are not read (previously, they were written but not read).
Changes in SCHEMA_VERSION = 63:
- The `public_room_list_stream` table is no longer written to nor read from
(previously, it was written to and read from, but not for any significant purpose).
https://github.com/matrix-org/synapse/pull/10565
"""


@ -0,0 +1,24 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Add a table that keeps track of which "insertion" events need to be backfilled
CREATE TABLE IF NOT EXISTS insertion_event_extremities(
event_id TEXT NOT NULL,
room_id TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_extremities_event_id ON insertion_event_extremities(event_id);
CREATE INDEX IF NOT EXISTS insertion_event_extremities_room_id ON insertion_event_extremities(room_id);