mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-12-15 22:23:53 -05:00
Merge branch 'develop' into babolivier/context_filters
This commit is contained in:
commit
9dc84b7989
189 changed files with 4485 additions and 1443 deletions
|
|
@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
|
|||
from synapse.storage.data_stores import DataStores
|
||||
from synapse.storage.data_stores.main import DataStore
|
||||
from synapse.storage.persist_events import EventsPersistenceStorage
|
||||
from synapse.storage.purge_events import PurgeEventsStorage
|
||||
from synapse.storage.state import StateGroupStorage
|
||||
|
||||
__all__ = ["DataStores", "DataStore"]
|
||||
|
|
@ -46,6 +47,7 @@ class Storage(object):
|
|||
self.main = stores.main
|
||||
|
||||
self.persistence = EventsPersistenceStorage(hs, stores)
|
||||
self.purge_events = PurgeEventsStorage(hs, stores)
|
||||
self.state = StateGroupStorage(hs, stores)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -361,14 +361,11 @@ class SQLBaseStore(object):
|
|||
expiration_ts,
|
||||
)
|
||||
|
||||
self._simple_insert_txn(
|
||||
self._simple_upsert_txn(
|
||||
txn,
|
||||
"account_validity",
|
||||
values={
|
||||
"user_id": user_id,
|
||||
"expiration_ts_ms": expiration_ts,
|
||||
"email_sent": False,
|
||||
},
|
||||
keyvalues={"user_id": user_id},
|
||||
values={"expiration_ts_ms": expiration_ts, "email_sent": False},
|
||||
)
|
||||
|
||||
def start_profiling(self):
|
||||
|
|
@ -412,16 +409,15 @@ class SQLBaseStore(object):
|
|||
i = 0
|
||||
N = 5
|
||||
while True:
|
||||
cursor = LoggingTransaction(
|
||||
conn.cursor(),
|
||||
name,
|
||||
self.database_engine,
|
||||
after_callbacks,
|
||||
exception_callbacks,
|
||||
)
|
||||
try:
|
||||
txn = conn.cursor()
|
||||
txn = LoggingTransaction(
|
||||
txn,
|
||||
name,
|
||||
self.database_engine,
|
||||
after_callbacks,
|
||||
exception_callbacks,
|
||||
)
|
||||
r = func(txn, *args, **kwargs)
|
||||
r = func(cursor, *args, **kwargs)
|
||||
conn.commit()
|
||||
return r
|
||||
except self.database_engine.module.OperationalError as e:
|
||||
|
|
@ -459,6 +455,40 @@ class SQLBaseStore(object):
|
|||
)
|
||||
continue
|
||||
raise
|
||||
finally:
|
||||
# we're either about to retry with a new cursor, or we're about to
|
||||
# release the connection. Once we release the connection, it could
|
||||
# get used for another query, which might do a conn.rollback().
|
||||
#
|
||||
# In the latter case, even though that probably wouldn't affect the
|
||||
# results of this transaction, python's sqlite will reset all
|
||||
# statements on the connection [1], which will make our cursor
|
||||
# invalid [2].
|
||||
#
|
||||
# In any case, continuing to read rows after commit()ing seems
|
||||
# dubious from the PoV of ACID transactional semantics
|
||||
# (sqlite explicitly says that once you commit, you may see rows
|
||||
# from subsequent updates.)
|
||||
#
|
||||
# In psycopg2, cursors are essentially a client-side fabrication -
|
||||
# all the data is transferred to the client side when the statement
|
||||
# finishes executing - so in theory we could go on streaming results
|
||||
# from the cursor, but attempting to do so would make us
|
||||
# incompatible with sqlite, so let's make sure we're not doing that
|
||||
# by closing the cursor.
|
||||
#
|
||||
# (*named* cursors in psycopg2 are different and are proper server-
|
||||
# side things, but (a) we don't use them and (b) they are implicitly
|
||||
# closed by ending the transaction anyway.)
|
||||
#
|
||||
# In short, if we haven't finished with the cursor yet, that's a
|
||||
# problem waiting to bite us.
|
||||
#
|
||||
# TL;DR: we're done with the cursor, so we can close it.
|
||||
#
|
||||
# [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
|
||||
# [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
|
||||
cursor.close()
|
||||
except Exception as e:
|
||||
logger.debug("[TXN FAIL] {%s} %s", name, e)
|
||||
raise
|
||||
|
|
@ -854,7 +884,7 @@ class SQLBaseStore(object):
|
|||
allvalues.update(values)
|
||||
latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
|
||||
|
||||
sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % (
|
||||
sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
|
||||
table,
|
||||
", ".join(k for k in allvalues),
|
||||
", ".join("?" for _ in allvalues),
|
||||
|
|
|
|||
|
|
@ -184,14 +184,14 @@ class AccountDataWorkerStore(SQLBaseStore):
|
|||
current_id(int): The position to fetch up to.
|
||||
Returns:
|
||||
A deferred pair of lists of tuples of stream_id int, user_id string,
|
||||
room_id string, type string, and content string.
|
||||
room_id string, and type string.
|
||||
"""
|
||||
if last_room_id == current_id and last_global_id == current_id:
|
||||
return defer.succeed(([], []))
|
||||
|
||||
def get_updated_account_data_txn(txn):
|
||||
sql = (
|
||||
"SELECT stream_id, user_id, account_data_type, content"
|
||||
"SELECT stream_id, user_id, account_data_type"
|
||||
" FROM account_data WHERE ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC LIMIT ?"
|
||||
)
|
||||
|
|
@ -199,7 +199,7 @@ class AccountDataWorkerStore(SQLBaseStore):
|
|||
global_results = txn.fetchall()
|
||||
|
||||
sql = (
|
||||
"SELECT stream_id, user_id, room_id, account_data_type, content"
|
||||
"SELECT stream_id, user_id, room_id, account_data_type"
|
||||
" FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
|
||||
" ORDER BY stream_id ASC LIMIT ?"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -358,8 +358,21 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
|
|||
def _add_messages_to_local_device_inbox_txn(
|
||||
self, txn, stream_id, messages_by_user_then_device
|
||||
):
|
||||
sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?"
|
||||
txn.execute(sql, (stream_id, stream_id))
|
||||
# Compatible method of performing an upsert
|
||||
sql = "SELECT stream_id FROM device_max_stream_id"
|
||||
|
||||
txn.execute(sql)
|
||||
rows = txn.fetchone()
|
||||
if rows:
|
||||
db_stream_id = rows[0]
|
||||
if db_stream_id < stream_id:
|
||||
# Insert the new stream_id
|
||||
sql = "UPDATE device_max_stream_id SET stream_id = ?"
|
||||
else:
|
||||
# No rows, perform an insert
|
||||
sql = "INSERT INTO device_max_stream_id (stream_id) VALUES (?)"
|
||||
|
||||
txn.execute(sql, (stream_id,))
|
||||
|
||||
local_by_user_then_device = {}
|
||||
for user_id, messages_by_device in messages_by_user_then_device.items():
|
||||
|
|
@ -367,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
|
|||
devices = list(messages_by_device.keys())
|
||||
if len(devices) == 1 and devices[0] == "*":
|
||||
# Handle wildcard device_ids.
|
||||
sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
|
||||
sql = "SELECT device_id FROM devices WHERE user_id = ?"
|
||||
txn.execute(sql, (user_id,))
|
||||
message_json = json.dumps(messages_by_device["*"])
|
||||
for row in txn:
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2017 New Vector Ltd
|
||||
# Copyright 2019 Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
|
@ -24,49 +25,8 @@ from synapse.storage._base import SQLBaseStore
|
|||
|
||||
class EndToEndRoomKeyStore(SQLBaseStore):
|
||||
@defer.inlineCallbacks
|
||||
def get_e2e_room_key(self, user_id, version, room_id, session_id):
|
||||
"""Get the encrypted E2E room key for a given session from a given
|
||||
backup version of room_keys. We only store the 'best' room key for a given
|
||||
session at a given time, as determined by the handler.
|
||||
|
||||
Args:
|
||||
user_id(str): the user whose backup we're querying
|
||||
version(str): the version ID of the backup for the set of keys we're querying
|
||||
room_id(str): the ID of the room whose keys we're querying.
|
||||
This is a bit redundant as it's implied by the session_id, but
|
||||
we include for consistency with the rest of the API.
|
||||
session_id(str): the session whose room_key we're querying.
|
||||
|
||||
Returns:
|
||||
A deferred dict giving the session_data and message metadata for
|
||||
this room key.
|
||||
"""
|
||||
|
||||
row = yield self._simple_select_one(
|
||||
table="e2e_room_keys",
|
||||
keyvalues={
|
||||
"user_id": user_id,
|
||||
"version": version,
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
},
|
||||
retcols=(
|
||||
"first_message_index",
|
||||
"forwarded_count",
|
||||
"is_verified",
|
||||
"session_data",
|
||||
),
|
||||
desc="get_e2e_room_key",
|
||||
)
|
||||
|
||||
row["session_data"] = json.loads(row["session_data"])
|
||||
|
||||
return row
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
|
||||
"""Replaces or inserts the encrypted E2E room key for a given session in
|
||||
a given backup
|
||||
def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
|
||||
"""Replaces the encrypted E2E room key for a given session in a given backup
|
||||
|
||||
Args:
|
||||
user_id(str): the user whose backup we're setting
|
||||
|
|
@ -78,7 +38,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
StoreError
|
||||
"""
|
||||
|
||||
yield self._simple_upsert(
|
||||
yield self._simple_update_one(
|
||||
table="e2e_room_keys",
|
||||
keyvalues={
|
||||
"user_id": user_id,
|
||||
|
|
@ -86,21 +46,51 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
},
|
||||
values={
|
||||
updatevalues={
|
||||
"first_message_index": room_key["first_message_index"],
|
||||
"forwarded_count": room_key["forwarded_count"],
|
||||
"is_verified": room_key["is_verified"],
|
||||
"session_data": json.dumps(room_key["session_data"]),
|
||||
},
|
||||
lock=False,
|
||||
desc="update_e2e_room_key",
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Set room key",
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
"room_key": room_key,
|
||||
}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_e2e_room_keys(self, user_id, version, room_keys):
|
||||
"""Bulk add room keys to a given backup.
|
||||
|
||||
Args:
|
||||
user_id (str): the user whose backup we're adding to
|
||||
version (str): the version ID of the backup for the set of keys we're adding to
|
||||
room_keys (iterable[(str, str, dict)]): the keys to add, in the form
|
||||
(roomID, sessionID, keyData)
|
||||
"""
|
||||
|
||||
values = []
|
||||
for (room_id, session_id, room_key) in room_keys:
|
||||
values.append(
|
||||
{
|
||||
"user_id": user_id,
|
||||
"version": version,
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
"first_message_index": room_key["first_message_index"],
|
||||
"forwarded_count": room_key["forwarded_count"],
|
||||
"is_verified": room_key["is_verified"],
|
||||
"session_data": json.dumps(room_key["session_data"]),
|
||||
}
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Set room key",
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
"room_key": room_key,
|
||||
}
|
||||
)
|
||||
|
||||
yield self._simple_insert_many(
|
||||
table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
|
||||
)
|
||||
|
||||
@trace
|
||||
|
|
@ -110,11 +100,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
room, or a given session.
|
||||
|
||||
Args:
|
||||
user_id(str): the user whose backup we're querying
|
||||
version(str): the version ID of the backup for the set of keys we're querying
|
||||
room_id(str): Optional. the ID of the room whose keys we're querying, if any.
|
||||
user_id (str): the user whose backup we're querying
|
||||
version (str): the version ID of the backup for the set of keys we're querying
|
||||
room_id (str): Optional. the ID of the room whose keys we're querying, if any.
|
||||
If not specified, we return the keys for all the rooms in the backup.
|
||||
session_id(str): Optional. the session whose room_key we're querying, if any.
|
||||
session_id (str): Optional. the session whose room_key we're querying, if any.
|
||||
If specified, we also require the room_id to be specified.
|
||||
If not specified, we return all the keys in this version of
|
||||
the backup (or for the specified room)
|
||||
|
|
@ -162,6 +152,95 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
|
||||
return sessions
|
||||
|
||||
def get_e2e_room_keys_multi(self, user_id, version, room_keys):
|
||||
"""Get multiple room keys at a time. The difference between this function and
|
||||
get_e2e_room_keys is that this function can be used to retrieve
|
||||
multiple specific keys at a time, whereas get_e2e_room_keys is used for
|
||||
getting all the keys in a backup version, all the keys for a room, or a
|
||||
specific key.
|
||||
|
||||
Args:
|
||||
user_id (str): the user whose backup we're querying
|
||||
version (str): the version ID of the backup we're querying about
|
||||
room_keys (dict[str, dict[str, iterable[str]]]): a map from
|
||||
room ID -> {"session": [session ids]} indicating the session IDs
|
||||
that we want to query
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
|
||||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"get_e2e_room_keys_multi",
|
||||
self._get_e2e_room_keys_multi_txn,
|
||||
user_id,
|
||||
version,
|
||||
room_keys,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _get_e2e_room_keys_multi_txn(txn, user_id, version, room_keys):
|
||||
if not room_keys:
|
||||
return {}
|
||||
|
||||
where_clauses = []
|
||||
params = [user_id, version]
|
||||
for room_id, room in room_keys.items():
|
||||
sessions = list(room["sessions"])
|
||||
if not sessions:
|
||||
continue
|
||||
params.append(room_id)
|
||||
params.extend(sessions)
|
||||
where_clauses.append(
|
||||
"(room_id = ? AND session_id IN (%s))"
|
||||
% (",".join(["?" for _ in sessions]),)
|
||||
)
|
||||
|
||||
# check if we're actually querying something
|
||||
if not where_clauses:
|
||||
return {}
|
||||
|
||||
sql = """
|
||||
SELECT room_id, session_id, first_message_index, forwarded_count,
|
||||
is_verified, session_data
|
||||
FROM e2e_room_keys
|
||||
WHERE user_id = ? AND version = ? AND (%s)
|
||||
""" % (
|
||||
" OR ".join(where_clauses)
|
||||
)
|
||||
|
||||
txn.execute(sql, params)
|
||||
|
||||
ret = {}
|
||||
|
||||
for row in txn:
|
||||
room_id = row[0]
|
||||
session_id = row[1]
|
||||
ret.setdefault(room_id, {})
|
||||
ret[room_id][session_id] = {
|
||||
"first_message_index": row[2],
|
||||
"forwarded_count": row[3],
|
||||
"is_verified": row[4],
|
||||
"session_data": json.loads(row[5]),
|
||||
}
|
||||
|
||||
return ret
|
||||
|
||||
def count_e2e_room_keys(self, user_id, version):
|
||||
"""Get the number of keys in a backup version.
|
||||
|
||||
Args:
|
||||
user_id (str): the user whose backup we're querying
|
||||
version (str): the version ID of the backup we're querying about
|
||||
"""
|
||||
|
||||
return self._simple_select_one_onecol(
|
||||
table="e2e_room_keys",
|
||||
keyvalues={"user_id": user_id, "version": version},
|
||||
retcol="COUNT(*)",
|
||||
desc="count_e2e_room_keys",
|
||||
)
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
|
|
@ -219,6 +298,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
version(str)
|
||||
algorithm(str)
|
||||
auth_data(object): opaque dict supplied by the client
|
||||
etag(int): tag of the keys in the backup
|
||||
"""
|
||||
|
||||
def _get_e2e_room_keys_version_info_txn(txn):
|
||||
|
|
@ -236,10 +316,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
txn,
|
||||
table="e2e_room_keys_versions",
|
||||
keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
|
||||
retcols=("version", "algorithm", "auth_data"),
|
||||
retcols=("version", "algorithm", "auth_data", "etag"),
|
||||
)
|
||||
result["auth_data"] = json.loads(result["auth_data"])
|
||||
result["version"] = str(result["version"])
|
||||
if result["etag"] is None:
|
||||
result["etag"] = 0
|
||||
return result
|
||||
|
||||
return self.runInteraction(
|
||||
|
|
@ -288,21 +370,33 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
@trace
|
||||
def update_e2e_room_keys_version(self, user_id, version, info):
|
||||
def update_e2e_room_keys_version(
|
||||
self, user_id, version, info=None, version_etag=None
|
||||
):
|
||||
"""Update a given backup version
|
||||
|
||||
Args:
|
||||
user_id(str): the user whose backup version we're updating
|
||||
version(str): the version ID of the backup version we're updating
|
||||
info(dict): the new backup version info to store
|
||||
info (dict): the new backup version info to store. If None, then
|
||||
the backup version info is not updated
|
||||
version_etag (Optional[int]): etag of the keys in the backup. If
|
||||
None, then the etag is not updated
|
||||
"""
|
||||
updatevalues = {}
|
||||
|
||||
return self._simple_update(
|
||||
table="e2e_room_keys_versions",
|
||||
keyvalues={"user_id": user_id, "version": version},
|
||||
updatevalues={"auth_data": json.dumps(info["auth_data"])},
|
||||
desc="update_e2e_room_keys_version",
|
||||
)
|
||||
if info is not None and "auth_data" in info:
|
||||
updatevalues["auth_data"] = json.dumps(info["auth_data"])
|
||||
if version_etag is not None:
|
||||
updatevalues["etag"] = version_etag
|
||||
|
||||
if updatevalues:
|
||||
return self._simple_update(
|
||||
table="e2e_room_keys_versions",
|
||||
keyvalues={"user_id": user_id, "version": version},
|
||||
updatevalues=updatevalues,
|
||||
desc="update_e2e_room_keys_version",
|
||||
)
|
||||
|
||||
@trace
|
||||
def delete_e2e_room_keys_version(self, user_id, version=None):
|
||||
|
|
|
|||
|
|
@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
|||
result.setdefault(user_id, {})[device_id] = None
|
||||
|
||||
# get signatures on the device
|
||||
signature_sql = (
|
||||
"SELECT * " " FROM e2e_cross_signing_signatures " " WHERE %s"
|
||||
) % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
|
||||
signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % (
|
||||
" OR ".join("(" + q + ")" for q in signature_query_clauses)
|
||||
)
|
||||
|
||||
txn.execute(signature_sql, signature_query_params)
|
||||
rows = self.cursor_to_dict(txn)
|
||||
|
|
|
|||
|
|
@ -713,9 +713,7 @@ class EventsStore(
|
|||
|
||||
metadata_json = encode_json(event.internal_metadata.get_dict())
|
||||
|
||||
sql = (
|
||||
"UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
|
||||
)
|
||||
sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
|
||||
txn.execute(sql, (metadata_json, event.event_id))
|
||||
|
||||
# Add an entry to the ex_outlier_stream table to replicate the
|
||||
|
|
@ -732,7 +730,7 @@ class EventsStore(
|
|||
},
|
||||
)
|
||||
|
||||
sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
|
||||
sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
|
||||
txn.execute(sql, (False, event.event_id))
|
||||
|
||||
# Update the event_backward_extremities table now that this
|
||||
|
|
@ -929,6 +927,9 @@ class EventsStore(
|
|||
elif event.type == EventTypes.Redaction:
|
||||
# Insert into the redactions table.
|
||||
self._store_redaction(txn, event)
|
||||
elif event.type == EventTypes.Retention:
|
||||
# Update the room_retention table.
|
||||
self._store_retention_policy_for_room_txn(txn, event)
|
||||
|
||||
self._handle_event_relations(txn, event)
|
||||
|
||||
|
|
@ -1375,6 +1376,10 @@ class EventsStore(
|
|||
if True, we will delete local events as well as remote ones
|
||||
(instead of just marking them as outliers and deleting their
|
||||
state groups).
|
||||
|
||||
Returns:
|
||||
Deferred[set[int]]: The set of state groups that are referenced by
|
||||
deleted events.
|
||||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
|
|
@ -1475,7 +1480,7 @@ class EventsStore(
|
|||
|
||||
# We do joins against events_to_purge for e.g. calculating state
|
||||
# groups to purge, etc., so lets make an index.
|
||||
txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
|
||||
txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
|
||||
|
||||
txn.execute("SELECT event_id, should_delete FROM events_to_purge")
|
||||
event_rows = txn.fetchall()
|
||||
|
|
@ -1511,11 +1516,10 @@ class EventsStore(
|
|||
[(room_id, event_id) for event_id, in new_backwards_extrems],
|
||||
)
|
||||
|
||||
logger.info("[purge] finding redundant state groups")
|
||||
logger.info("[purge] finding state groups referenced by deleted events")
|
||||
|
||||
# Get all state groups that are referenced by events that are to be
|
||||
# deleted. We then go and check if they are referenced by other events
|
||||
# or state groups, and if not we delete them.
|
||||
# deleted.
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT DISTINCT state_group FROM events_to_purge
|
||||
|
|
@ -1528,60 +1532,6 @@ class EventsStore(
|
|||
"[purge] found %i referenced state groups", len(referenced_state_groups)
|
||||
)
|
||||
|
||||
logger.info("[purge] finding state groups that can be deleted")
|
||||
|
||||
_ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
|
||||
state_groups_to_delete, remaining_state_groups = _
|
||||
|
||||
logger.info(
|
||||
"[purge] found %i state groups to delete", len(state_groups_to_delete)
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"[purge] de-delta-ing %i remaining state groups",
|
||||
len(remaining_state_groups),
|
||||
)
|
||||
|
||||
# Now we turn the state groups that reference to-be-deleted state
|
||||
# groups to non delta versions.
|
||||
for sg in remaining_state_groups:
|
||||
logger.info("[purge] de-delta-ing remaining state group %s", sg)
|
||||
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
|
||||
curr_state = curr_state[sg]
|
||||
|
||||
self._simple_delete_txn(
|
||||
txn, table="state_groups_state", keyvalues={"state_group": sg}
|
||||
)
|
||||
|
||||
self._simple_delete_txn(
|
||||
txn, table="state_group_edges", keyvalues={"state_group": sg}
|
||||
)
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
values=[
|
||||
{
|
||||
"state_group": sg,
|
||||
"room_id": room_id,
|
||||
"type": key[0],
|
||||
"state_key": key[1],
|
||||
"event_id": state_id,
|
||||
}
|
||||
for key, state_id in iteritems(curr_state)
|
||||
],
|
||||
)
|
||||
|
||||
logger.info("[purge] removing redundant state groups")
|
||||
txn.executemany(
|
||||
"DELETE FROM state_groups_state WHERE state_group = ?",
|
||||
((sg,) for sg in state_groups_to_delete),
|
||||
)
|
||||
txn.executemany(
|
||||
"DELETE FROM state_groups WHERE id = ?",
|
||||
((sg,) for sg in state_groups_to_delete),
|
||||
)
|
||||
|
||||
logger.info("[purge] removing events from event_to_state_groups")
|
||||
txn.execute(
|
||||
"DELETE FROM event_to_state_groups "
|
||||
|
|
@ -1668,138 +1618,35 @@ class EventsStore(
|
|||
|
||||
logger.info("[purge] done")
|
||||
|
||||
def _find_unreferenced_groups_during_purge(self, txn, state_groups):
|
||||
"""Used when purging history to figure out which state groups can be
|
||||
deleted and which need to be de-delta'ed (due to one of its prev groups
|
||||
being scheduled for deletion).
|
||||
|
||||
Args:
|
||||
txn
|
||||
state_groups (set[int]): Set of state groups referenced by events
|
||||
that are going to be deleted.
|
||||
|
||||
Returns:
|
||||
tuple[set[int], set[int]]: The set of state groups that can be
|
||||
deleted and the set of state groups that need to be de-delta'ed
|
||||
"""
|
||||
# Graph of state group -> previous group
|
||||
graph = {}
|
||||
|
||||
# Set of events that we have found to be referenced by events
|
||||
referenced_groups = set()
|
||||
|
||||
# Set of state groups we've already seen
|
||||
state_groups_seen = set(state_groups)
|
||||
|
||||
# Set of state groups to handle next.
|
||||
next_to_search = set(state_groups)
|
||||
while next_to_search:
|
||||
# We bound size of groups we're looking up at once, to stop the
|
||||
# SQL query getting too big
|
||||
if len(next_to_search) < 100:
|
||||
current_search = next_to_search
|
||||
next_to_search = set()
|
||||
else:
|
||||
current_search = set(itertools.islice(next_to_search, 100))
|
||||
next_to_search -= current_search
|
||||
|
||||
# Check if state groups are referenced
|
||||
sql = """
|
||||
SELECT DISTINCT state_group FROM event_to_state_groups
|
||||
LEFT JOIN events_to_purge AS ep USING (event_id)
|
||||
WHERE ep.event_id IS NULL AND
|
||||
"""
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "state_group", current_search
|
||||
)
|
||||
txn.execute(sql + clause, list(args))
|
||||
|
||||
referenced = set(sg for sg, in txn)
|
||||
referenced_groups |= referenced
|
||||
|
||||
# We don't continue iterating up the state group graphs for state
|
||||
# groups that are referenced.
|
||||
current_search -= referenced
|
||||
|
||||
rows = self._simple_select_many_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
column="prev_state_group",
|
||||
iterable=current_search,
|
||||
keyvalues={},
|
||||
retcols=("prev_state_group", "state_group"),
|
||||
)
|
||||
|
||||
prevs = set(row["state_group"] for row in rows)
|
||||
# We don't bother re-handling groups we've already seen
|
||||
prevs -= state_groups_seen
|
||||
next_to_search |= prevs
|
||||
state_groups_seen |= prevs
|
||||
|
||||
for row in rows:
|
||||
# Note: Each state group can have at most one prev group
|
||||
graph[row["state_group"]] = row["prev_state_group"]
|
||||
|
||||
to_delete = state_groups_seen - referenced_groups
|
||||
|
||||
to_dedelta = set()
|
||||
for sg in referenced_groups:
|
||||
prev_sg = graph.get(sg)
|
||||
if prev_sg and prev_sg in to_delete:
|
||||
to_dedelta.add(sg)
|
||||
|
||||
return to_delete, to_dedelta
|
||||
return referenced_state_groups
|
||||
|
||||
def purge_room(self, room_id):
|
||||
"""Deletes all record of a room
|
||||
|
||||
Args:
|
||||
room_id (str):
|
||||
room_id (str)
|
||||
|
||||
Returns:
|
||||
Deferred[List[int]]: The list of state groups to delete.
|
||||
"""
|
||||
|
||||
return self.runInteraction("purge_room", self._purge_room_txn, room_id)
|
||||
|
||||
def _purge_room_txn(self, txn, room_id):
|
||||
# first we have to delete the state groups states
|
||||
logger.info("[purge] removing %s from state_groups_state", room_id)
|
||||
|
||||
# First we fetch all the state groups that should be deleted, before
|
||||
# we delete that information.
|
||||
txn.execute(
|
||||
"""
|
||||
DELETE FROM state_groups_state WHERE state_group IN (
|
||||
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
|
||||
WHERE events.room_id=?
|
||||
)
|
||||
SELECT DISTINCT state_group FROM events
|
||||
INNER JOIN event_to_state_groups USING(event_id)
|
||||
WHERE events.room_id = ?
|
||||
""",
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
# ... and the state group edges
|
||||
logger.info("[purge] removing %s from state_group_edges", room_id)
|
||||
state_groups = [row[0] for row in txn]
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
DELETE FROM state_group_edges WHERE state_group IN (
|
||||
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
|
||||
WHERE events.room_id=?
|
||||
)
|
||||
""",
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
# ... and the state groups
|
||||
logger.info("[purge] removing %s from state_groups", room_id)
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
DELETE FROM state_groups WHERE id IN (
|
||||
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
|
||||
WHERE events.room_id=?
|
||||
)
|
||||
""",
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
# and then tables which lack an index on room_id but have one on event_id
|
||||
# Now we delete tables which lack an index on room_id but have one on event_id
|
||||
for table in (
|
||||
"event_auth",
|
||||
"event_edges",
|
||||
|
|
@ -1887,6 +1734,165 @@ class EventsStore(
|
|||
|
||||
logger.info("[purge] done")
|
||||
|
||||
return state_groups
|
||||
|
||||
def purge_unreferenced_state_groups(
|
||||
self, room_id: str, state_groups_to_delete
|
||||
) -> defer.Deferred:
|
||||
"""Deletes no longer referenced state groups and de-deltas any state
|
||||
groups that reference them.
|
||||
|
||||
Args:
|
||||
room_id: The room the state groups belong to (must all be in the
|
||||
same room).
|
||||
state_groups_to_delete (Collection[int]): Set of all state groups
|
||||
to delete.
|
||||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"purge_unreferenced_state_groups",
|
||||
self._purge_unreferenced_state_groups,
|
||||
room_id,
|
||||
state_groups_to_delete,
|
||||
)
|
||||
|
||||
def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
|
||||
logger.info(
|
||||
"[purge] found %i state groups to delete", len(state_groups_to_delete)
|
||||
)
|
||||
|
||||
rows = self._simple_select_many_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
column="prev_state_group",
|
||||
iterable=state_groups_to_delete,
|
||||
keyvalues={},
|
||||
retcols=("state_group",),
|
||||
)
|
||||
|
||||
remaining_state_groups = set(
|
||||
row["state_group"]
|
||||
for row in rows
|
||||
if row["state_group"] not in state_groups_to_delete
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"[purge] de-delta-ing %i remaining state groups",
|
||||
len(remaining_state_groups),
|
||||
)
|
||||
|
||||
# Now we turn the state groups that reference to-be-deleted state
|
||||
# groups to non delta versions.
|
||||
for sg in remaining_state_groups:
|
||||
logger.info("[purge] de-delta-ing remaining state group %s", sg)
|
||||
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
|
||||
curr_state = curr_state[sg]
|
||||
|
||||
self._simple_delete_txn(
|
||||
txn, table="state_groups_state", keyvalues={"state_group": sg}
|
||||
)
|
||||
|
||||
self._simple_delete_txn(
|
||||
txn, table="state_group_edges", keyvalues={"state_group": sg}
|
||||
)
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
values=[
|
||||
{
|
||||
"state_group": sg,
|
||||
"room_id": room_id,
|
||||
"type": key[0],
|
||||
"state_key": key[1],
|
||||
"event_id": state_id,
|
||||
}
|
||||
for key, state_id in iteritems(curr_state)
|
||||
],
|
||||
)
|
||||
|
||||
logger.info("[purge] removing redundant state groups")
|
||||
txn.executemany(
|
||||
"DELETE FROM state_groups_state WHERE state_group = ?",
|
||||
((sg,) for sg in state_groups_to_delete),
|
||||
)
|
||||
txn.executemany(
|
||||
"DELETE FROM state_groups WHERE id = ?",
|
||||
((sg,) for sg in state_groups_to_delete),
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_previous_state_groups(self, state_groups):
|
||||
"""Fetch the previous groups of the given state groups.
|
||||
|
||||
Args:
|
||||
state_groups (Iterable[int])
|
||||
|
||||
Returns:
|
||||
Deferred[dict[int, int]]: mapping from state group to previous
|
||||
state group.
|
||||
"""
|
||||
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="state_group_edges",
|
||||
column="prev_state_group",
|
||||
iterable=state_groups,
|
||||
keyvalues={},
|
||||
retcols=("prev_state_group", "state_group"),
|
||||
desc="get_previous_state_groups",
|
||||
)
|
||||
|
||||
return {row["state_group"]: row["prev_state_group"] for row in rows}
|
||||
|
||||
def purge_room_state(self, room_id, state_groups_to_delete):
|
||||
"""Deletes all record of a room from state tables
|
||||
|
||||
Args:
|
||||
room_id (str):
|
||||
state_groups_to_delete (list[int]): State groups to delete
|
||||
"""
|
||||
|
||||
return self.runInteraction(
|
||||
"purge_room_state",
|
||||
self._purge_room_state_txn,
|
||||
room_id,
|
||||
state_groups_to_delete,
|
||||
)
|
||||
|
||||
def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
|
||||
# first we have to delete the state groups states
|
||||
logger.info("[purge] removing %s from state_groups_state", room_id)
|
||||
|
||||
self._simple_delete_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
column="state_group",
|
||||
iterable=state_groups_to_delete,
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
# ... and the state group edges
|
||||
logger.info("[purge] removing %s from state_group_edges", room_id)
|
||||
|
||||
self._simple_delete_many_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
column="state_group",
|
||||
iterable=state_groups_to_delete,
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
# ... and the state groups
|
||||
logger.info("[purge] removing %s from state_groups", room_id)
|
||||
|
||||
self._simple_delete_many_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
column="id",
|
||||
iterable=state_groups_to_delete,
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
async def is_event_after(self, event_id1, event_id2):
|
||||
"""Returns True if event_id1 is after event_id2 in the stream
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ from canonicaljson import json
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventContentFields
|
||||
from synapse.storage._base import make_in_list_sql_clause
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
|
||||
|
|
@ -85,6 +86,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
|
|||
"event_fix_redactions_bytes", self._event_fix_redactions_bytes
|
||||
)
|
||||
|
||||
self.register_background_update_handler(
|
||||
"event_store_labels", self._event_store_labels
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_reindex_fields_sender(self, progress, batch_size):
|
||||
target_min_stream_id = progress["target_min_stream_id_inclusive"]
|
||||
|
|
@ -503,3 +508,68 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
|
|||
yield self._end_background_update("event_fix_redactions_bytes")
|
||||
|
||||
return 1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _event_store_labels(self, progress, batch_size):
|
||||
"""Background update handler which will store labels for existing events."""
|
||||
last_event_id = progress.get("last_event_id", "")
|
||||
|
||||
def _event_store_labels_txn(txn):
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT event_id, json FROM event_json
|
||||
LEFT JOIN event_labels USING (event_id)
|
||||
WHERE event_id > ? AND label IS NULL
|
||||
ORDER BY event_id LIMIT ?
|
||||
""",
|
||||
(last_event_id, batch_size),
|
||||
)
|
||||
|
||||
results = list(txn)
|
||||
|
||||
nbrows = 0
|
||||
last_row_event_id = ""
|
||||
for (event_id, event_json_raw) in results:
|
||||
try:
|
||||
event_json = json.loads(event_json_raw)
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn=txn,
|
||||
table="event_labels",
|
||||
values=[
|
||||
{
|
||||
"event_id": event_id,
|
||||
"label": label,
|
||||
"room_id": event_json["room_id"],
|
||||
"topological_ordering": event_json["depth"],
|
||||
}
|
||||
for label in event_json["content"].get(
|
||||
EventContentFields.LABELS, []
|
||||
)
|
||||
if isinstance(label, str)
|
||||
],
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Unable to load event %s (no labels will be imported): %s",
|
||||
event_id,
|
||||
e,
|
||||
)
|
||||
|
||||
nbrows += 1
|
||||
last_row_event_id = event_id
|
||||
|
||||
self._background_update_progress_txn(
|
||||
txn, "event_store_labels", {"last_event_id": last_row_event_id}
|
||||
)
|
||||
|
||||
return nbrows
|
||||
|
||||
num_rows = yield self.runInteraction(
|
||||
desc="event_store_labels", func=_event_store_labels_txn
|
||||
)
|
||||
|
||||
if not num_rows:
|
||||
yield self._end_background_update("event_store_labels")
|
||||
|
||||
return num_rows
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore):
|
|||
if filter_id_response is not None:
|
||||
return filter_id_response[0]
|
||||
|
||||
sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?"
|
||||
sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
|
||||
txn.execute(sql, (user_localpart,))
|
||||
max_id = txn.fetchone()[0]
|
||||
if max_id is None:
|
||||
|
|
|
|||
|
|
@ -553,6 +553,21 @@ class GroupServerStore(SQLBaseStore):
|
|||
desc="remove_user_from_summary",
|
||||
)
|
||||
|
||||
def get_local_groups_for_room(self, room_id):
|
||||
"""Get all of the local group that contain a given room
|
||||
Args:
|
||||
room_id (str): The ID of a room
|
||||
Returns:
|
||||
Deferred[list[str]]: A twisted.Deferred containing a list of group ids
|
||||
containing this room
|
||||
"""
|
||||
return self._simple_select_onecol(
|
||||
table="group_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcol="group_id",
|
||||
desc="get_local_groups_for_room",
|
||||
)
|
||||
|
||||
def get_users_for_summary_by_role(self, group_id, include_private=False):
|
||||
"""Get the users and roles that should be included in a summary request
|
||||
|
||||
|
|
|
|||
|
|
@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
|||
if len(media_ids) == 0:
|
||||
return
|
||||
|
||||
sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?"
|
||||
sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
|
||||
|
||||
def _delete_url_cache_txn(txn):
|
||||
txn.executemany(sql, [(media_id,) for media_id in media_ids])
|
||||
|
|
@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
|||
return
|
||||
|
||||
def _delete_url_cache_media_txn(txn):
|
||||
sql = "DELETE FROM local_media_repository" " WHERE media_id = ?"
|
||||
sql = "DELETE FROM local_media_repository WHERE media_id = ?"
|
||||
|
||||
txn.executemany(sql, [(media_id,) for media_id in media_ids])
|
||||
|
||||
sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?"
|
||||
sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
|
||||
|
||||
txn.executemany(sql, [(media_id,) for media_id in media_ids])
|
||||
|
||||
|
|
|
|||
|
|
@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
|
|||
args.append(limit)
|
||||
txn.execute(sql, args)
|
||||
|
||||
return (r[0:5] + (json.loads(r[5]),) for r in txn)
|
||||
return list(r[0:5] + (json.loads(r[5]),) for r in txn)
|
||||
|
||||
return self.runInteraction(
|
||||
"get_all_updated_receipts", get_all_updated_receipts_txn
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ import logging
|
|||
import re
|
||||
|
||||
from six import iterkeys
|
||||
from six.moves import range
|
||||
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.defer import Deferred
|
||||
|
|
@ -377,9 +376,7 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
def f(txn):
|
||||
sql = (
|
||||
"SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)"
|
||||
)
|
||||
sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)"
|
||||
txn.execute(sql, (user_id,))
|
||||
return dict(txn)
|
||||
|
||||
|
|
@ -484,30 +481,25 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
"""
|
||||
Gets the localpart of the next generated user ID.
|
||||
|
||||
Generated user IDs are integers, and we aim for them to be as small as
|
||||
we can. Unfortunately, it's possible some of them are already taken by
|
||||
existing users, and there may be gaps in the already taken range. This
|
||||
function returns the start of the first allocatable gap. This is to
|
||||
avoid the case of ID 10000000 being pre-allocated, so us wasting the
|
||||
first (and shortest) many generated user IDs.
|
||||
Generated user IDs are integers, so we find the largest integer user ID
|
||||
already taken and return that plus one.
|
||||
"""
|
||||
|
||||
def _find_next_generated_user_id(txn):
|
||||
# We bound between '@1' and '@a' to avoid pulling the entire table
|
||||
# We bound between '@0' and '@a' to avoid pulling the entire table
|
||||
# out.
|
||||
txn.execute("SELECT name FROM users WHERE '@1' <= name AND name < '@a'")
|
||||
txn.execute("SELECT name FROM users WHERE '@0' <= name AND name < '@a'")
|
||||
|
||||
regex = re.compile(r"^@(\d+):")
|
||||
|
||||
found = set()
|
||||
max_found = 0
|
||||
|
||||
for (user_id,) in txn:
|
||||
match = regex.search(user_id)
|
||||
if match:
|
||||
found.add(int(match.group(1)))
|
||||
for i in range(len(found) + 1):
|
||||
if i not in found:
|
||||
return i
|
||||
max_found = max(int(match.group(1)), max_found)
|
||||
|
||||
return max_found + 1
|
||||
|
||||
return (
|
||||
(
|
||||
|
|
@ -577,6 +569,19 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
return self._simple_delete(
|
||||
"user_threepids",
|
||||
keyvalues={"user_id": user_id, "medium": medium, "address": address},
|
||||
desc="user_delete_threepid",
|
||||
)
|
||||
|
||||
def user_delete_threepids(self, user_id: str):
|
||||
"""Delete all threepid this user has bound
|
||||
|
||||
Args:
|
||||
user_id: The user id to delete all threepids of
|
||||
|
||||
"""
|
||||
return self._simple_delete(
|
||||
"user_threepids",
|
||||
keyvalues={"user_id": user_id},
|
||||
desc="user_delete_threepids",
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -19,10 +19,13 @@ import logging
|
|||
import re
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from six import integer_types
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.data_stores.main.search import SearchStore
|
||||
|
|
@ -300,8 +303,141 @@ class RoomWorkerStore(SQLBaseStore):
|
|||
else:
|
||||
return None
|
||||
|
||||
@cachedInlineCallbacks()
|
||||
def get_retention_policy_for_room(self, room_id):
|
||||
"""Get the retention policy for a given room.
|
||||
|
||||
If no retention policy has been found for this room, returns a policy defined
|
||||
by the configured default policy (which has None as both the 'min_lifetime' and
|
||||
the 'max_lifetime' if no default policy has been defined in the server's
|
||||
configuration).
|
||||
|
||||
Args:
|
||||
room_id (str): The ID of the room to get the retention policy of.
|
||||
|
||||
Returns:
|
||||
dict[int, int]: "min_lifetime" and "max_lifetime" for this room.
|
||||
"""
|
||||
|
||||
def get_retention_policy_for_room_txn(txn):
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT min_lifetime, max_lifetime FROM room_retention
|
||||
INNER JOIN current_state_events USING (event_id, room_id)
|
||||
WHERE room_id = ?;
|
||||
""",
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
return self.cursor_to_dict(txn)
|
||||
|
||||
ret = yield self.runInteraction(
|
||||
"get_retention_policy_for_room", get_retention_policy_for_room_txn,
|
||||
)
|
||||
|
||||
# If we don't know this room ID, ret will be None, in this case return the default
|
||||
# policy.
|
||||
if not ret:
|
||||
defer.returnValue(
|
||||
{
|
||||
"min_lifetime": self.config.retention_default_min_lifetime,
|
||||
"max_lifetime": self.config.retention_default_max_lifetime,
|
||||
}
|
||||
)
|
||||
|
||||
row = ret[0]
|
||||
|
||||
# If one of the room's policy's attributes isn't defined, use the matching
|
||||
# attribute from the default policy.
|
||||
# The default values will be None if no default policy has been defined, or if one
|
||||
# of the attributes is missing from the default policy.
|
||||
if row["min_lifetime"] is None:
|
||||
row["min_lifetime"] = self.config.retention_default_min_lifetime
|
||||
|
||||
if row["max_lifetime"] is None:
|
||||
row["max_lifetime"] = self.config.retention_default_max_lifetime
|
||||
|
||||
defer.returnValue(row)
|
||||
|
||||
|
||||
class RoomStore(RoomWorkerStore, SearchStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(RoomStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.config = hs.config
|
||||
|
||||
self.register_background_update_handler(
|
||||
"insert_room_retention", self._background_insert_retention,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_insert_retention(self, progress, batch_size):
|
||||
"""Retrieves a list of all rooms within a range and inserts an entry for each of
|
||||
them into the room_retention table.
|
||||
NULLs the property's columns if missing from the retention event in the room's
|
||||
state (or NULLs all of them if there's no retention event in the room's state),
|
||||
so that we fall back to the server's retention policy.
|
||||
"""
|
||||
|
||||
last_room = progress.get("room_id", "")
|
||||
|
||||
def _background_insert_retention_txn(txn):
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT state.room_id, state.event_id, events.json
|
||||
FROM current_state_events as state
|
||||
LEFT JOIN event_json AS events ON (state.event_id = events.event_id)
|
||||
WHERE state.room_id > ? AND state.type = '%s'
|
||||
ORDER BY state.room_id ASC
|
||||
LIMIT ?;
|
||||
"""
|
||||
% EventTypes.Retention,
|
||||
(last_room, batch_size),
|
||||
)
|
||||
|
||||
rows = self.cursor_to_dict(txn)
|
||||
|
||||
if not rows:
|
||||
return True
|
||||
|
||||
for row in rows:
|
||||
if not row["json"]:
|
||||
retention_policy = {}
|
||||
else:
|
||||
ev = json.loads(row["json"])
|
||||
retention_policy = json.dumps(ev["content"])
|
||||
|
||||
self._simple_insert_txn(
|
||||
txn=txn,
|
||||
table="room_retention",
|
||||
values={
|
||||
"room_id": row["room_id"],
|
||||
"event_id": row["event_id"],
|
||||
"min_lifetime": retention_policy.get("min_lifetime"),
|
||||
"max_lifetime": retention_policy.get("max_lifetime"),
|
||||
},
|
||||
)
|
||||
|
||||
logger.info("Inserted %d rows into room_retention", len(rows))
|
||||
|
||||
self._background_update_progress_txn(
|
||||
txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
|
||||
)
|
||||
|
||||
if batch_size > len(rows):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
end = yield self.runInteraction(
|
||||
"insert_room_retention", _background_insert_retention_txn,
|
||||
)
|
||||
|
||||
if end:
|
||||
yield self._end_background_update("insert_room_retention")
|
||||
|
||||
defer.returnValue(batch_size)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def store_room(self, room_id, room_creator_user_id, is_public):
|
||||
"""Stores a room.
|
||||
|
|
@ -502,6 +638,35 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
|||
txn, event, "content.body", event.content["body"]
|
||||
)
|
||||
|
||||
def _store_retention_policy_for_room_txn(self, txn, event):
|
||||
if hasattr(event, "content") and (
|
||||
"min_lifetime" in event.content or "max_lifetime" in event.content
|
||||
):
|
||||
if (
|
||||
"min_lifetime" in event.content
|
||||
and not isinstance(event.content.get("min_lifetime"), integer_types)
|
||||
) or (
|
||||
"max_lifetime" in event.content
|
||||
and not isinstance(event.content.get("max_lifetime"), integer_types)
|
||||
):
|
||||
# Ignore the event if one of the value isn't an integer.
|
||||
return
|
||||
|
||||
self._simple_insert_txn(
|
||||
txn=txn,
|
||||
table="room_retention",
|
||||
values={
|
||||
"room_id": event.room_id,
|
||||
"event_id": event.event_id,
|
||||
"min_lifetime": event.content.get("min_lifetime"),
|
||||
"max_lifetime": event.content.get("max_lifetime"),
|
||||
},
|
||||
)
|
||||
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_retention_policy_for_room, (event.room_id,)
|
||||
)
|
||||
|
||||
def add_event_report(
|
||||
self, room_id, event_id, user_id, reason, content, received_ts
|
||||
):
|
||||
|
|
@ -683,3 +848,89 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
|||
remote_media_mxcs.append((hostname, media_id))
|
||||
|
||||
return local_media_mxcs, remote_media_mxcs
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_rooms_for_retention_period_in_range(
|
||||
self, min_ms, max_ms, include_null=False
|
||||
):
|
||||
"""Retrieves all of the rooms within the given retention range.
|
||||
|
||||
Optionally includes the rooms which don't have a retention policy.
|
||||
|
||||
Args:
|
||||
min_ms (int|None): Duration in milliseconds that define the lower limit of
|
||||
the range to handle (exclusive). If None, doesn't set a lower limit.
|
||||
max_ms (int|None): Duration in milliseconds that define the upper limit of
|
||||
the range to handle (inclusive). If None, doesn't set an upper limit.
|
||||
include_null (bool): Whether to include rooms which retention policy is NULL
|
||||
in the returned set.
|
||||
|
||||
Returns:
|
||||
dict[str, dict]: The rooms within this range, along with their retention
|
||||
policy. The key is "room_id", and maps to a dict describing the retention
|
||||
policy associated with this room ID. The keys for this nested dict are
|
||||
"min_lifetime" (int|None), and "max_lifetime" (int|None).
|
||||
"""
|
||||
|
||||
def get_rooms_for_retention_period_in_range_txn(txn):
|
||||
range_conditions = []
|
||||
args = []
|
||||
|
||||
if min_ms is not None:
|
||||
range_conditions.append("max_lifetime > ?")
|
||||
args.append(min_ms)
|
||||
|
||||
if max_ms is not None:
|
||||
range_conditions.append("max_lifetime <= ?")
|
||||
args.append(max_ms)
|
||||
|
||||
# Do a first query which will retrieve the rooms that have a retention policy
|
||||
# in their current state.
|
||||
sql = """
|
||||
SELECT room_id, min_lifetime, max_lifetime FROM room_retention
|
||||
INNER JOIN current_state_events USING (event_id, room_id)
|
||||
"""
|
||||
|
||||
if len(range_conditions):
|
||||
sql += " WHERE (" + " AND ".join(range_conditions) + ")"
|
||||
|
||||
if include_null:
|
||||
sql += " OR max_lifetime IS NULL"
|
||||
|
||||
txn.execute(sql, args)
|
||||
|
||||
rows = self.cursor_to_dict(txn)
|
||||
rooms_dict = {}
|
||||
|
||||
for row in rows:
|
||||
rooms_dict[row["room_id"]] = {
|
||||
"min_lifetime": row["min_lifetime"],
|
||||
"max_lifetime": row["max_lifetime"],
|
||||
}
|
||||
|
||||
if include_null:
|
||||
# If required, do a second query that retrieves all of the rooms we know
|
||||
# of so we can handle rooms with no retention policy.
|
||||
sql = "SELECT DISTINCT room_id FROM current_state_events"
|
||||
|
||||
txn.execute(sql)
|
||||
|
||||
rows = self.cursor_to_dict(txn)
|
||||
|
||||
# If a room isn't already in the dict (i.e. it doesn't have a retention
|
||||
# policy in its state), add it with a null policy.
|
||||
for row in rows:
|
||||
if row["room_id"] not in rooms_dict:
|
||||
rooms_dict[row["room_id"]] = {
|
||||
"min_lifetime": None,
|
||||
"max_lifetime": None,
|
||||
}
|
||||
|
||||
return rooms_dict
|
||||
|
||||
rooms = yield self.runInteraction(
|
||||
"get_rooms_for_retention_period_in_range",
|
||||
get_rooms_for_retention_period_in_range_txn,
|
||||
)
|
||||
|
||||
defer.returnValue(rooms)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,17 @@
|
|||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||
('event_store_labels', '{}');
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
/* Copyright 2019 Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- store the current etag of backup version
|
||||
ALTER TABLE e2e_room_keys_versions ADD COLUMN etag BIGINT;
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
/* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- Tracks the retention policy of a room.
|
||||
-- A NULL max_lifetime or min_lifetime means that the matching property is not defined in
|
||||
-- the room's retention policy state event.
|
||||
-- If a room doesn't have a retention policy state event in its state, both max_lifetime
|
||||
-- and min_lifetime are NULL.
|
||||
CREATE TABLE IF NOT EXISTS room_retention(
|
||||
room_id TEXT,
|
||||
event_id TEXT,
|
||||
min_lifetime BIGINT,
|
||||
max_lifetime BIGINT,
|
||||
|
||||
PRIMARY KEY(room_id, event_id)
|
||||
);
|
||||
|
||||
CREATE INDEX room_retention_max_lifetime_idx on room_retention(max_lifetime);
|
||||
|
||||
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||
('insert_room_retention', '{}');
|
||||
|
|
@ -285,7 +285,11 @@ class StateGroupWorkerStore(
|
|||
room_id (str)
|
||||
|
||||
Returns:
|
||||
Deferred[unicode|None]: predecessor room id
|
||||
Deferred[dict|None]: A dictionary containing the structure of the predecessor
|
||||
field from the room's create event. The structure is subject to other servers,
|
||||
but it is expected to be:
|
||||
* room_id (str): The room ID of the predecessor room
|
||||
* event_id (str): The ID of the tombstone event in the predecessor room
|
||||
|
||||
Raises:
|
||||
NotFoundError if the room is unknown
|
||||
|
|
@ -991,6 +995,29 @@ class StateGroupWorkerStore(
|
|||
|
||||
return self.runInteraction("store_state_group", _store_state_group_txn)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_referenced_state_groups(self, state_groups):
|
||||
"""Check if the state groups are referenced by events.
|
||||
|
||||
Args:
|
||||
state_groups (Iterable[int])
|
||||
|
||||
Returns:
|
||||
Deferred[set[int]]: The subset of state groups that are
|
||||
referenced.
|
||||
"""
|
||||
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="event_to_state_groups",
|
||||
column="state_group",
|
||||
iterable=state_groups,
|
||||
keyvalues={},
|
||||
retcols=("DISTINCT state_group",),
|
||||
desc="get_referenced_state_groups",
|
||||
)
|
||||
|
||||
return set(row["state_group"] for row in rows)
|
||||
|
||||
|
||||
class StateBackgroundUpdateStore(
|
||||
StateGroupBackgroundUpdateStore, BackgroundUpdateStore
|
||||
|
|
@ -1231,7 +1258,7 @@ class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore):
|
|||
# if the event was rejected, just give it the same state as its
|
||||
# predecessor.
|
||||
if context.rejected:
|
||||
state_groups[event.event_id] = context.prev_group
|
||||
state_groups[event.event_id] = context.state_group_before_event
|
||||
continue
|
||||
|
||||
state_groups[event.event_id] = context.state_group
|
||||
|
|
|
|||
|
|
@ -619,7 +619,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
def _get_max_topological_txn(self, txn, room_id):
|
||||
txn.execute(
|
||||
"SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?",
|
||||
"SELECT MAX(topological_ordering) FROM events WHERE room_id = ?",
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
|
|
@ -874,14 +874,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
args.append(int(limit))
|
||||
|
||||
sql = (
|
||||
"SELECT DISTINCT event_id, topological_ordering, stream_ordering"
|
||||
" FROM events"
|
||||
" LEFT JOIN event_labels USING (event_id, room_id, topological_ordering)"
|
||||
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
|
||||
" ORDER BY topological_ordering %(order)s,"
|
||||
" stream_ordering %(order)s LIMIT ?"
|
||||
) % {"bounds": bounds, "order": order}
|
||||
select_keywords = "SELECT"
|
||||
join_clause = ""
|
||||
if event_filter and event_filter.labels:
|
||||
# If we're not filtering on a label, then joining on event_labels will
|
||||
# return as many row for a single event as the number of labels it has. To
|
||||
# avoid this, only join if we're filtering on at least one label.
|
||||
join_clause = """
|
||||
LEFT JOIN event_labels
|
||||
USING (event_id, room_id, topological_ordering)
|
||||
"""
|
||||
if len(event_filter.labels) > 1:
|
||||
# Using DISTINCT in this SELECT query is quite expensive, because it
|
||||
# requires the engine to sort on the entire (not limited) result set,
|
||||
# i.e. the entire events table. We only need to use it when we're
|
||||
# filtering on more than two labels, because that's the only scenario
|
||||
# in which we can possibly to get multiple times the same event ID in
|
||||
# the results.
|
||||
select_keywords += "DISTINCT"
|
||||
|
||||
sql = """
|
||||
%(select_keywords)s event_id, topological_ordering, stream_ordering
|
||||
FROM events
|
||||
%(join_clause)s
|
||||
WHERE outlier = ? AND room_id = ? AND %(bounds)s
|
||||
ORDER BY topological_ordering %(order)s,
|
||||
stream_ordering %(order)s LIMIT ?
|
||||
""" % {
|
||||
"select_keywords": select_keywords,
|
||||
"join_clause": join_clause,
|
||||
"bounds": bounds,
|
||||
"order": order,
|
||||
}
|
||||
|
||||
txn.execute(sql, args)
|
||||
|
||||
|
|
|
|||
|
|
@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
|
|||
)
|
||||
|
||||
def get_tag_content(txn, tag_ids):
|
||||
sql = (
|
||||
"SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?"
|
||||
)
|
||||
sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?"
|
||||
results = []
|
||||
for stream_id, user_id, room_id in tag_ids:
|
||||
txn.execute(sql, (user_id, room_id))
|
||||
|
|
|
|||
|
|
@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
|
|||
# Mark as done.
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)"
|
||||
"INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
|
||||
),
|
||||
(modname, name),
|
||||
)
|
||||
|
|
|
|||
117
synapse/storage/purge_events.py
Normal file
117
synapse/storage/purge_events.py
Normal file
|
|
@ -0,0 +1,117 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PurgeEventsStorage(object):
|
||||
"""High level interface for purging rooms and event history.
|
||||
"""
|
||||
|
||||
def __init__(self, hs, stores):
|
||||
self.stores = stores
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def purge_room(self, room_id: str):
|
||||
"""Deletes all record of a room
|
||||
"""
|
||||
|
||||
state_groups_to_delete = yield self.stores.main.purge_room(room_id)
|
||||
yield self.stores.main.purge_room_state(room_id, state_groups_to_delete)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def purge_history(self, room_id, token, delete_local_events):
|
||||
"""Deletes room history before a certain point
|
||||
|
||||
Args:
|
||||
room_id (str):
|
||||
|
||||
token (str): A topological token to delete events before
|
||||
|
||||
delete_local_events (bool):
|
||||
if True, we will delete local events as well as remote ones
|
||||
(instead of just marking them as outliers and deleting their
|
||||
state groups).
|
||||
"""
|
||||
state_groups = yield self.stores.main.purge_history(
|
||||
room_id, token, delete_local_events
|
||||
)
|
||||
|
||||
logger.info("[purge] finding state groups that can be deleted")
|
||||
|
||||
sg_to_delete = yield self._find_unreferenced_groups(state_groups)
|
||||
|
||||
yield self.stores.main.purge_unreferenced_state_groups(room_id, sg_to_delete)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _find_unreferenced_groups(self, state_groups):
|
||||
"""Used when purging history to figure out which state groups can be
|
||||
deleted.
|
||||
|
||||
Args:
|
||||
state_groups (set[int]): Set of state groups referenced by events
|
||||
that are going to be deleted.
|
||||
|
||||
Returns:
|
||||
Deferred[set[int]] The set of state groups that can be deleted.
|
||||
"""
|
||||
# Graph of state group -> previous group
|
||||
graph = {}
|
||||
|
||||
# Set of events that we have found to be referenced by events
|
||||
referenced_groups = set()
|
||||
|
||||
# Set of state groups we've already seen
|
||||
state_groups_seen = set(state_groups)
|
||||
|
||||
# Set of state groups to handle next.
|
||||
next_to_search = set(state_groups)
|
||||
while next_to_search:
|
||||
# We bound size of groups we're looking up at once, to stop the
|
||||
# SQL query getting too big
|
||||
if len(next_to_search) < 100:
|
||||
current_search = next_to_search
|
||||
next_to_search = set()
|
||||
else:
|
||||
current_search = set(itertools.islice(next_to_search, 100))
|
||||
next_to_search -= current_search
|
||||
|
||||
referenced = yield self.stores.main.get_referenced_state_groups(
|
||||
current_search
|
||||
)
|
||||
referenced_groups |= referenced
|
||||
|
||||
# We don't continue iterating up the state group graphs for state
|
||||
# groups that are referenced.
|
||||
current_search -= referenced
|
||||
|
||||
edges = yield self.stores.main.get_previous_state_groups(current_search)
|
||||
|
||||
prevs = set(edges.values())
|
||||
# We don't bother re-handling groups we've already seen
|
||||
prevs -= state_groups_seen
|
||||
next_to_search |= prevs
|
||||
state_groups_seen |= prevs
|
||||
|
||||
graph.update(edges)
|
||||
|
||||
to_delete = state_groups_seen - referenced_groups
|
||||
|
||||
return to_delete
|
||||
Loading…
Add table
Add a link
Reference in a new issue